【一起学 Python】第 71 天:Python 3.14 异步 Socket 编程,用 asyncio 写出高性能网络程序
欢迎来到 Python 学习计划的第 71 天!🎉
昨天我们学习了 Socket 编程基础,掌握了 TCP/UDP 通信的核心 API。今天,我们将结合第 69 天学过的 asyncio 异步编程(参考 [File 100](100-Asyncio 基础:async 与 await.md)),学习如何实现高并发 Socket 服务器。
这是现代 Python 网络编程的关键技能!掌握异步 Socket,你将能够轻松处理成千上万的并发连接!
💡 为什么需要异步网络编程?
同步 vs 异步模型对比

关键差异:
同步模型(传统多线程/多进程):
- 100 个连接 = 100 个线程(内存占用 GB 级)
异步模型(asyncio 单线程事件循环):
- 10,000 个连接 = 1 个线程(内存占用 MB 级)
场景选择:
- 同步适合:连接数少(< 100),代码简单优先
- 异步适合:连接数多(> 1000),性能优先
🔑 核心概念回顾
事件循环运作原理
异步 Socket 编程的核心是事件循环:
- 轮询检查:事件循环定期检查所有连接是否有事件就绪
- 任务调度:当连接可读/可写时,事件循环执行对应的协程
- 任务挂起:遇到 await时暂停协程,切换到其他就绪任务
- 事件驱动:网络事件(数据到达、连接可写)自动唤醒对应协程
📚 StreamReader 和 StreamWriter API
核心 API 对比
StreamReader - 异步读取数据
# 创建连接获得 readerreader, writer = await asyncio.open_connection('127.0.0.1', 8888)# 读取最多 n 字节data = await reader.read(1024)# 读取一行(遇到 \n 停止)line = await reader.readline()# 精确读取 n 字节,否则抛异常exact_data = await reader.readexactly(100)# 读取直到遇到分隔符(包括分隔符)data = await reader.readuntil(b'\n')# 检查是否已到达流末尾if reader.at_eof(): print("连接已关闭")
关键特性:
- 如果数据不可用,会暂停当前协程,让出时间给其他任务
StreamWriter - 异步写入数据
# 创建连接获得 writerreader, writer = await asyncio.open_connection('127.0.0.1', 8888)# 写入数据(不保证立即发送)writer.write(b"Hello")# 写入多行数据writer.writelines([b"line1\n", b"line2\n"])# 等待缓冲区清空(流量控制)- 非常重要!await writer.drain()# 获取连接信息peer_addr = writer.get_extra_info('peername')print(f"连接来自: {peer_addr}")# 关闭连接(第一步)writer.close()# 等待关闭完成(第二步)- 必须做!await writer.wait_closed()
关键特性:
write()drain()- 关闭时必须同时调用
close() 和 await wait_closed()
🖥️ 异步 TCP 服务器
基本异步服务器
import asyncioasync def handle_client(reader, writer): """处理单个客户端连接""" addr = writer.get_extra_info('peername') print(f"✅ 客户端连接: {addr}") try: while True: # 异步读取数据 data = await reader.read(1024) # 空数据表示连接关闭 if not data: print(f"❌ 客户端断开: {addr}") break # 处理数据 message = data.decode('utf-8') print(f"📨 收到 {addr}: {message}") # 发送响应 response = f"Echo: {message}\n" writer.write(response.encode('utf-8')) # 重要:流量控制 await writer.drain() except asyncio.CancelledError: print(f"⚠️ 任务被取消: {addr}") raise except Exception as e: print(f"❌ 错误 {addr}: {e}") finally: # 关闭连接 writer.close() await writer.wait_closed()async def async_server(): """启动异步服务器""" # 创建服务器 server = await asyncio.start_server( handle_client, '127.0.0.1', 8888 ) addr = server.sockets[0].getsockname() print(f"🚀 异步服务器启动: {addr}") # 使用 async with 自动管理资源 async with server: await server.serve_forever()if __name__ == "__main__": asyncio.run(async_server())
核心设计模式:
带超时的服务器
import asyncioasync def handle_client_with_timeout(reader, writer): """带超时的客户端处理""" addr = writer.get_extra_info('peername') try: while True: try: # 设置读取超时为 30 秒 data = await asyncio.wait_for( reader.read(1024), timeout=30.0 ) except asyncio.TimeoutError: print(f"⏱️ 客户端 {addr} 超时") break if not data: break # 处理数据 response = f"Echo: {data.decode()}" writer.write(response.encode()) # 写入超时也很重要 try: await asyncio.wait_for( writer.drain(), timeout=10.0 ) except asyncio.TimeoutError: print(f"⏱️ 发送超时: {addr}") break finally: writer.close() await writer.wait_closed()async def main(): server = await asyncio.start_server( handle_client_with_timeout, '127.0.0.1', 8888 ) async with server: await server.serve_forever()asyncio.run(main())
限制并发连接数
import asyncioclass LimitedAsyncServer: def __init__(self, max_connections=100): # 使用信号量限制并发连接数 self.semaphore = asyncio.Semaphore(max_connections) self.connections = set() async def handle_client(self, reader, writer): async with self.semaphore: # 获取信号量 addr = writer.get_extra_info('peername') self.connections.add(addr) print(f"✅ 连接: {addr} (当前: {len(self.connections)}/{self.semaphore._value})") try: while True: data = await reader.read(1024) if not data: break writer.write(data) await writer.drain() finally: self.connections.discard(addr) writer.close() await writer.wait_closed() async def run(self, host='127.0.0.1', port=8888): server = await asyncio.start_server( self.handle_client, host, port ) addr = server.sockets[0].getsockname() print(f"🚀 服务器启动: {addr}, 最大连接: {self.semaphore._value}") async with server: await server.serve_forever()if __name__ == "__main__": server = LimitedAsyncServer(max_connections=1000) asyncio.run(server.run())
💻 异步 TCP 客户端
基本异步客户端
import asyncioasync def async_client(): """异步 TCP 客户端""" try: # 创建连接 reader, writer = await asyncio.open_connection( '127.0.0.1', 8888 ) print(f"✅ 已连接到服务器") # 发送消息 message = "Hello, Async Server!" writer.write(message.encode('utf-8')) await writer.drain() print(f"📤 发送: {message}") # 接收响应 response = await reader.read(1024) print(f"📥 收到: {response.decode('utf-8')}") except Exception as e: print(f"❌ 错误: {e}") finally: # 关闭连接 if 'writer' in locals(): writer.close() await writer.wait_closed()if __name__ == "__main__": asyncio.run(async_client())
并发发起多个连接
import asyncioasync def fetch_data(host, port, message): """获取单个服务器的数据""" try: reader, writer = await asyncio.open_connection(host, port) # 发送请求 writer.write(message.encode() + b'\n') await writer.drain() # 接收响应 response = await reader.readline() writer.close() await writer.wait_closed() return response.decode().strip() except Exception as e: return f"Error: {e}"async def main(): """并发连接多个服务器""" servers = [ ('127.0.0.1', 8888, 'request1'), ('127.0.0.1', 8889, 'request2'), ('127.0.0.1', 8890, 'request3'), ] # 并发执行所有任务 tasks = [ fetch_data(host, port, msg) for host, port, msg in servers ] results = await asyncio.gather(*tasks) for i, result in enumerate(results): print(f"服务器 {i+1}: {result}")if __name__ == "__main__": asyncio.run(main())
🏗️ 完整示例:异步聊天应用
聊天服务器(chat_server.py)
import asynciofrom typing import Setclass AsyncChatServer: def __init__(self): self.clients: Set[asyncio.StreamWriter] = set() async def broadcast(self, message: str, sender=None): """广播消息给所有客户端""" for client in self.clients: if client != sender: try: client.write(message.encode('utf-8') + b'\n') await client.drain() except: # 客户端已断开,稍后清理 pass async def handle_client(self, reader, writer): """处理单个客户端""" addr = writer.get_extra_info('peername') self.clients.add(writer) print(f"✅ {addr} 加入聊天室") await self.broadcast(f"[系统] {addr} 加入了聊天室") try: while True: # 读取客户端消息 data = await reader.readline() if not data: print(f"❌ {addr} 断开连接") break message = data.decode('utf-8').strip() if message: # 广播消息 broadcast_msg = f"[{addr[0]}:{addr[1]}] {message}" print(broadcast_msg) await self.broadcast(broadcast_msg, writer) finally: # 客户端离开 self.clients.discard(writer) await self.broadcast(f"[系统] {addr} 离开了聊天室") writer.close() await writer.wait_closed() async def run(self, host='127.0.0.1', port=8888): server = await asyncio.start_server( self.handle_client, host, port ) addr = server.sockets[0].getsockname() print(f"🚀 聊天服务器启动: {addr}") async with server: await server.serve_forever()if __name__ == "__main__": chat = AsyncChatServer() asyncio.run(chat.run())
聊天客户端(chat_client.py)
import asyncioasync def send_messages(writer): """发送用户输入的消息""" loop = asyncio.get_running_loop() while True: try: # 在线程池中执行阻塞的 input() message = await loop.run_in_executor( None, input, "输入消息: " ) if message.lower() == 'quit': break writer.write(message.encode('utf-8') + b'\n') await writer.drain() except Exception as e: print(f"❌ 发送错误: {e}") breakasync def receive_messages(reader): """接收服务器消息""" try: while True: message = await reader.readline() if not message: print("❌ 服务器关闭连接") break print(f"\n📨 {message.decode('utf-8').strip()}") print("输入消息: ", end='', flush=True) except Exception as e: print(f"❌ 接收错误: {e}")async def main(): try: # 连接到服务器 reader, writer = await asyncio.open_connection( '127.0.0.1', 8888 ) print("✅ 已连接到聊天服务器") # 并发运行发送和接收任务 send_task = asyncio.create_task(send_messages(writer)) recv_task = asyncio.create_task(receive_messages(reader)) # 等待任意任务完成 done, pending = await asyncio.wait( [send_task, recv_task], return_when=asyncio.FIRST_COMPLETED ) # 取消其他任务 for task in pending: task.cancel() finally: writer.close() await writer.wait_closed()if __name__ == "__main__": asyncio.run(main())
🎯 异步 TCP 完整生命周期
⚡ 最佳实践和常见错误
常见错误 1:阻塞事件循环
# ❌ 错误:直接使用 time.sleep()async def bad_delay(): import time time.sleep(1) # 阻塞整个事件循环!# ✅ 正确:使用 asyncio.sleep()async def good_delay(): await asyncio.sleep(1) # 不阻塞,让其他任务运行
常见错误 2:忘记调用 drain()
# ❌ 错误:大数据发送不调用 drain()async def bad_send(writer, data): for chunk in split_data(data): writer.write(chunk) # 缓冲区满了会导致内存溢出!# ✅ 正确:定期调用 drain()async def good_send(writer, data): for chunk in split_data(data): writer.write(chunk) await writer.drain() # 流量控制
常见错误 3:不完整的连接关闭
# ❌ 错误:只关闭不等待async def bad_close(writer): writer.close() # 直接返回,可能资源未释放# ✅ 正确:close 后还要 wait_closed()async def good_close(writer): writer.close() await writer.wait_closed() # 确保资源完全释放
常见错误 4:缺少超时保护
# ❌ 错误:无限期等待async def bad_read(reader): while True: data = await reader.read(1024) # 如果客户端卡住,会永久等待# ✅ 正确:设置超时async def good_read(reader): try: data = await asyncio.wait_for( reader.read(1024), timeout=30.0 ) except asyncio.TimeoutError: print("读取超时")
常见错误 5:不限制并发连接
# ❌ 错误:无限制接受连接async def bad_server(handle_client): server = await asyncio.start_server( handle_client, '0.0.0.0', 8888 ) # 可能被 DoS 攻击# ✅ 正确:使用信号量限制semaphore = asyncio.Semaphore(1000)async def good_server(handle_client): async def limited_handler(reader, writer): async with semaphore: await handle_client(reader, writer) server = await asyncio.start_server( limited_handler, '0.0.0.0', 8888 )
🏗️ 高性能异步网络架构
架构特点:
- 单线程事件循环
- 任务就绪队列
- 非阻塞 I/O
- 高效多路复用 - epoll/kqueue 提供的 OS 级支持
- 零线程切换开销
🐛 调试和监控
监控连接数和任务数
import asyncioclass MonitoredAsyncServer: def __init__(self): self.active_connections = 0 async def handle_client(self, reader, writer): self.active_connections += 1 addr = writer.get_extra_info('peername') print(f"连接数: {self.active_connections}, 任务数: {len(asyncio.all_tasks())}") try: while True: data = await reader.read(1024) if not data: break writer.write(data) await writer.drain() finally: self.active_connections -= 1 writer.close() await writer.wait_closed()# 定期打印状态async def monitor(): while True: await asyncio.sleep(5) tasks = asyncio.all_tasks() print(f"活跃任务: {len(tasks)}")
📊 性能对比数据
指标 | 同步多线程 | 异步单线程 | 优势 |
|---|
1000 连接的内存占用 | ~50MB | ~5MB | 10x |
10000 连接吞吐量 | 1000 req/s | 10000 req/s | 10x |
平均响应延迟 | 100ms | 10ms | 10x |
CPU 使用率 | 80% | 20% | 4x |
编程复杂度 | ⭐⭐ | ⭐⭐⭐⭐ | - |
🎉 模块学习路线
┌─────────────────────────────────────────────────────────┐│ Socket 编程基础模块(第 70-73 天) │├─────────────────────────────────────────────────────────┤│ ││ 第 70 天 ✓ Socket API 基础:创建 TCP 与 UDP 连接 ││ │ • Socket 概念与原理 ││ │ • TCP vs UDP 对比 ││ │ • 服务器与客户端实现 ││ │ ││ ▼ ││ 第 71 天 ✓ Python 3.14 中的异步 Socket 编程 ││ │ • asyncio 与 Socket 结合 ││ │ • 高并发服务器实现 ││ │ • 连接池与超时控制 ││ │ ││ ▼ ││ 第 72 天 Socket 错误处理与超时机制 ││ │ • 网络异常处理 ││ │ • 超时设置与重连 ││ │ ││ ▼ ││ 第 73 天 基于 Socket 的简单聊天应用实现 ││ • 综合实战项目 ││ • 多客户端聊天室 ││ │└─────────────────────────────────────────────────────────┘
📌 明日预告:Socket 错误处理与超时机制
明天我们将进入 Socket 编程模块第三天!
- 主题:Socket 错误处理与超时机制
- 核心问题:
- 常见的网络异常有哪些?
- 如何设置连接超时和读取超时?
- 如何实现自动重连机制?
- 如何处理连接断开和重试?
- 如何构建健壮的网络应用?
💡 提前思考:
- 如果服务器突然断开,客户端应该如何处理?
- 如何避免连接无限等待?
- 重连时应该使用指数退避吗?