我们之前的回显服务器虽然允许多个客户端连接,但当连接超过一个时,会出现问题:一个客户端可能让其他客户端陷入等待。我们可以通过将套接字设为非阻塞模式来解决这个问题。当套接字处于非阻塞模式时,任何调用可能会阻塞的方法(比如 recv)都保证会立即返回。如果套接字有数据可供处理,那么我们会像使用阻塞套接字一样获得数据。如果没有数据,套接字会立即告诉我们它没有数据可用,这时我们就可以自由地去执行其他代码。
import socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_socket.bind(('127.0.0.1', 8000))server_socket.listen()server_socket.setblocking(False)
从根本上说,创建非阻塞套接字和创建阻塞套接字没有区别,只是我们必须调用 setblocking(False)。默认情况下,套接字的这个值是 True,表示它是阻塞的。现在,让我们看看在原始应用中这样做的效果。这能解决问题吗?
import socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('127.0.0.1', 8000)server_socket.bind(server_address)server_socket.listen()server_socket.setblocking(False) # ❶connections = []try: while True: connection, client_address = server_socket.accept() connection.setblocking(False) # ❷ print(f'I got a connection from {client_address}!') connections.append(connection) for connection in connections: buffer = b'' while buffer[-2:] != b'\r\n': data = connection.recv(2) if not data: break else: print(f'I got data: {data}!') buffer = buffer + data print(f"All the data is: {buffer}") connection.send(buffer)finally: server_socket.close()
当我们运行列表 3.5 时,会立刻注意到不同之处。我们的应用几乎瞬间就崩溃了!我们会被抛出一个 BlockingIOError 异常,因为我们的服务器套接字还没有连接,因此没有数据可处理:
Traceback (most recent call last): File "echo_server.py", line 14, in <module> connection, client_address = server_socket.accept() File " python3.8/socket.py", line 292, in accept fd, addr = self._accept()BlockingIOError: [Errno 35] Resource temporarily unavailable
这是套接字一种不太直观的方式来告诉我们:“我还没数据,稍后再试吧。” 我们没法轻易判断套接字是否有数据,所以一个解决方案就是直接捕获这个异常,忽略它,然后继续循环,直到我们真的有数据。用这个策略,我们会以最快的速度不断检查新连接和数据。这应该能解决我们阻塞套接字回显服务器的问题。
import socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server_address = ('127.0.0.1', 8000)server_socket.bind(server_address)server_socket.listen()server_socket.setblocking(False)connections = []try: while True: try: connection, client_address = server_socket.accept() connection.setblocking(False) print(f'I got a connection from {client_address}!') connections.append(connection) except BlockingIOError: pass for connection in connections: try: buffer = b'' while buffer[-2:] != b'\r\n': data = connection.recv(2) if not data: break else: print(f'I got data: {data}!') buffer = buffer + data print(f"All the data is: {buffer}") connection.send(buffer) except BlockingIOError: passfinally: server_socket.close()
每次我们进入无限循环的一次迭代时,我们对 accept 或 recv 的任何调用都不会阻塞,要么立即抛出我们忽略的异常,要么就有数据准备就绪,我们可以立即处理。这个循环的每一次迭代都非常快,我们完全不依赖于任何客户端发送数据来推进下一行代码。这解决了我们阻塞服务器的问题,允许多个客户端同时连接并并发发送数据。
这种方法虽然可行,但它也有代价。第一是代码质量。只要可能没有数据,就捕获异常,很快就会变得冗长且容易出错。第二是资源问题。如果你在笔记本电脑上运行这个程序,几秒钟后可能会发现风扇声音变大了。这个应用会持续几乎 100% 地占用我们 CPU 的处理能力(见图 3.3)。这是因为我们一直在循环,并在应用内部尽可能快地获取异常,导致工作负载非常耗 CPU。
图 3.3 在循环并捕获异常时,CPU 使用率飙升至 100% 并保持不变。
之前我们提到过,操作系统有特定的事件通知系统,可以通知我们套接字有数据可处理。这些系统依赖于硬件级的通知,而不是像我们刚才那样用 while 循环轮询。Python 内置了一个库来使用这种事件通知系统。接下来,我们将使用它来解决我们的 CPU 利用率问题,并构建一个用于套接字事件的微型事件循环。