1. WebSocket简介
什么是WebSocket?
WebSocket是一种在单个TCP连接上进行全双工通信的协议,2011年被IETF标准化为RFC 6455。它允许服务器主动向客户端推送数据,是实现实时Web应用的关键技术。
与传统HTTP的区别
主要特点
建立在TCP之上
与HTTP兼容(使用HTTP升级握手)
低延迟,适合实时应用
支持文本和二进制数据
2. WebSocket协议原理
握手过程
# 客户端请求(HTTP升级)GET /chat HTTP/1.1Host: server.example.comUpgrade: websocketConnection: UpgradeSec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==Sec-WebSocket-Version: 13# 服务器响应HTTP/1.1 101 Switching ProtocolsUpgrade: websocketConnection: UpgradeSec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
数据帧格式
WebSocket使用帧(frame)传输数据,包含:
FIN位(标记消息结束)
操作码(文本/二进制/关闭等)
掩码(客户端到服务器的消息需要掩码)
负载数据
3. Python中的WebSocket实现
常用库介绍
websockets - 异步WebSocket库
websocket-client - 客户端库
Django Channels - Django的WebSocket支持
Socket.IO - 更高级的实时通信库
安装
pip install websocketspip install websocket-client
4. 使用websockets库实现WebSocket
服务端实现
import asyncioimport websocketsimport jsonasync def echo(websocket, path): """ 简单的回显服务器 """ async for message in websocket: print(f"收到消息: {message}") await websocket.send(f"服务器回复: {message}")async def chat_server(websocket, path): """ 聊天室服务器 """ # 获取客户端连接信息 client_id = id(websocket) print(f"客户端 {client_id} 已连接") try: # 发送欢迎消息 welcome_msg = { "type": "system", "message": f"欢迎来到聊天室!你是用户{client_id}", "client_id": client_id } await websocket.send(json.dumps(welcome_msg)) # 处理消息 async for message in websocket: data = json.loads(message) if data.get("type") == "chat": # 广播消息给所有连接的客户端 broadcast_msg = { "type": "chat", "from": client_id, "message": data["message"], "timestamp": asyncio.get_event_loop().time() } # 这里应该保存连接并广播给所有用户 # 简化版本,直接回发给发送者 await websocket.send(json.dumps(broadcast_msg)) except websockets.exceptions.ConnectionClosed: print(f"客户端 {client_id} 断开连接")async def main(): # 启动WebSocket服务器 server = await websockets.serve( chat_server, "localhost", 8765 ) print("WebSocket服务器启动在 ws://localhost:8765") # 保持服务器运行 await server.wait_closed()if __name__ == "__main__": asyncio.run(main())
客户端实现
import asyncioimport websocketsimport jsonimport threadingclass WebSocketClient: def __init__(self, uri="ws://localhost:8765"): self.uri = uri self.websocket = None self.running = False async def connect(self): """连接到WebSocket服务器""" try: self.websocket = await websockets.connect(self.uri) self.running = True print(f"已连接到 {self.uri}") # 启动接收消息的协程 asyncio.create_task(self.receive_messages()) except Exception as e: print(f"连接失败: {e}") async def receive_messages(self): """接收服务器消息""" try: async for message in self.websocket: data = json.loads(message) self.handle_message(data) except websockets.exceptions.ConnectionClosed: print("连接已关闭") self.running = False def handle_message(self, data): """处理接收到的消息""" msg_type = data.get("type", "unknown") if msg_type == "system": print(f"[系统] {data['message']}") elif msg_type == "chat": print(f"[用户{data['from']}] {data['message']}") else: print(f"[未知类型] {data}") async def send_message(self, message): """发送消息到服务器""" if self.websocket and self.running: msg_data = { "type": "chat", "message": message } await self.websocket.send(json.dumps(msg_data)) async def disconnect(self): """断开连接""" if self.websocket: await self.websocket.close() self.running = False print("已断开连接")# 简单的交互式客户端async def interactive_client(): client = WebSocketClient() # 连接服务器 await client.connect() try: # 交互式发送消息 while client.running: message = input("输入消息 (输入 'quit' 退出): ") if message.lower() == 'quit': break if message.strip(): await client.send_message(message) finally: await client.disconnect()if __name__ == "__main__": asyncio.run(interactive_client())
5. 使用websocket-client库(同步客户端)
import websocketimport jsonimport threadingimport timeclass SyncWebSocketClient: def __init__(self, url="ws://localhost:8765"): self.url = url self.ws = None self.running = False def on_message(self, ws, message): """收到消息时的回调""" try: data = json.loads(message) print(f"收到: {data}") except: print(f"收到原始消息: {message}") def on_error(self, ws, error): """发生错误时的回调""" print(f"错误: {error}") def on_close(self, ws, close_status_code, close_msg): """连接关闭时的回调""" print("连接关闭") self.running = False def on_open(self, ws): """连接打开时的回调""" print("连接已建立") self.running = True # 发送测试消息 test_msg = { "type": "chat", "message": "Hello from sync client!" } ws.send(json.dumps(test_msg)) def connect(self): """连接到WebSocket服务器""" # 设置WebSocket self.ws = websocket.WebSocketApp( self.url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close ) # 运行WebSocket(会阻塞) self.ws.run_forever() def send_message(self, message): """发送消息""" if self.ws and self.running: data = { "type": "chat", "message": message } self.ws.send(json.dumps(data)) def disconnect(self): """断开连接""" if self.ws: self.ws.close()# 使用示例if __name__ == "__main__": client = SyncWebSocketClient() # 在新线程中运行WebSocket客户端 thread = threading.Thread(target=client.connect) thread.daemon = True thread.start() # 等待连接建立 time.sleep(2) # 发送消息 if client.running: client.send_message("这是另一条消息") # 保持运行 try: while client.running: time.sleep(1) except KeyboardInterrupt: client.disconnect()
6. 实际应用示例:实时股票价格推送
import asyncioimport websocketsimport jsonimport randomfrom datetime import datetimeimport timeclass StockPriceServer: def __init__(self): self.stocks = { "AAPL": 150.0, "GOOGL": 2800.0, "TSLA": 700.0, "AMZN": 3300.0, "MSFT": 300.0 } self.clients = set() async def register(self, websocket): """注册新客户端""" self.clients.add(websocket) print(f"新客户端连接,当前客户端数: {len(self.clients)}") async def unregister(self, websocket): """移除客户端""" self.clients.remove(websocket) print(f"客户端断开,剩余客户端数: {len(self.clients)}") async def update_stock_prices(self): """模拟更新股票价格""" while True: await asyncio.sleep(1) # 每秒更新一次 # 随机更新股票价格 for symbol in self.stocks: change = random.uniform(-2.0, 2.0) self.stocks[symbol] = max(1.0, self.stocks[symbol] + change) # 广播给所有客户端 if self.clients: message = { "type": "stock_update", "timestamp": datetime.now().isoformat(), "stocks": self.stocks } message_json = json.dumps(message) # 发送给所有连接的客户端 tasks = [ client.send(message_json) for client in self.clients ] if tasks: await asyncio.gather(*tasks) async def handler(self, websocket, path): """处理客户端连接""" await self.register(websocket) try: # 发送初始股票数据 initial_data = { "type": "init", "stocks": self.stocks } await websocket.send(json.dumps(initial_data)) # 保持连接,等待客户端消息 async for message in websocket: data = json.loads(message) # 处理客户端请求(如订阅特定股票) if data.get("type") == "subscribe": # 这里可以添加订阅逻辑 pass except websockets.exceptions.ConnectionClosed: pass finally: await self.unregister(websocket) async def main(self, host="localhost", port=8765): """启动服务器""" # 启动股票价格更新任务 update_task = asyncio.create_task(self.update_stock_prices()) # 启动WebSocket服务器 async with websockets.serve(self.handler, host, port): print(f"股票价格服务器启动在 ws://{host}:{port}") await asyncio.Future() # 永久运行 # 取消更新任务 update_task.cancel()if __name__ == "__main__": server = StockPriceServer() asyncio.run(server.main())
7. WebSocket最佳实践
错误处理
async def robust_websocket_handler(websocket, path): try: async for message in websocket: try: # 处理消息 await process_message(message) except Exception as e: # 发送错误信息给客户端 error_msg = {"error": str(e)} await websocket.send(json.dumps(error_msg)) except websockets.exceptions.ConnectionClosedError: print("连接异常关闭") except Exception as e: print(f"未处理的异常: {e}")
心跳机制
async def heartbeat(websocket): """保持连接活跃""" try: while True: await asyncio.sleep(30) # 每30秒发送一次心跳 await websocket.ping() except websockets.exceptions.ConnectionClosed: print("心跳停止")
消息重连机制
async def resilient_websocket_client(url, max_retries=5): """具有重连机制的WebSocket客户端""" retry_count = 0 while retry_count < max_retries: try: async with websockets.connect(url) as websocket: print("连接成功") retry_count = 0 # 重置重试计数 async for message in websocket: # 处理消息 handle_message(message) except websockets.exceptions.ConnectionClosed: retry_count += 1 wait_time = 2 ** retry_count # 指数退避 print(f"连接断开,{wait_time}秒后重试...") await asyncio.sleep(wait_time) print("达到最大重试次数,连接失败")
8. WebSocket安全考虑
1. 身份验证
async def authenticated_handler(websocket, path): """需要身份验证的WebSocket处理器""" try: # 检查认证令牌 auth_token = await websocket.recv() if not validate_token(auth_token): await websocket.close(code=1008, reason="认证失败") return # 认证通过,继续处理 async for message in websocket: await process_message(message) except: pass
2. 数据验证
def validate_websocket_message(data): """验证WebSocket消息""" # 检查消息大小 if len(data) > 1024 * 1024: # 1MB限制 return False # 检查消息格式 try: parsed = json.loads(data) if "type" not in parsed: return False return True except: return False
9. 性能优化技巧
使用消息队列
import asynciofrom collections import defaultdictclass WebSocketManager: def __init__(self): self.clients = defaultdict(set) self.message_queue = asyncio.Queue() async def broadcaster(self): """专门的消息广播器""" while True: room_id, message = await self.message_queue.get() if room_id in self.clients: tasks = [ client.send(message) for client in self.clients[room_id] ] if tasks: await asyncio.gather(*tasks, return_exceptions=True)
二进制数据传输
async def send_binary_data(websocket): """发送二进制数据""" # 文本数据 text_data = "Hello, WebSocket!" await websocket.send(text_data) # 二进制数据 binary_data = b'\x00\x01\x02\x03\x04' await websocket.send(binary_data)
10. 调试和测试
测试工具
# 使用pytest测试WebSocketimport pytestimport asyncioimport websockets@pytest.mark.asyncioasync def test_websocket_echo(): """测试回显功能""" async with websockets.serve(echo, "localhost", 8765): async with websockets.connect("ws://localhost:8765") as ws: test_message = "测试消息" await ws.send(test_message) response = await ws.recv() assert test_message in response
调试技巧
# 启用详细日志import logginglogging.basicConfig(level=logging.DEBUG)# 使用Wireshark等工具捕获WebSocket流量
总结
WebSocket是现代Web应用中实现实时通信的核心技术。通过学习今天的内容,你应该掌握:
实践任务
创建一个简单的WebSocket聊天室
实现一个实时数据推送服务(如实时天气、股票价格)
为WebSocket服务添加身份验证机制
明天我们将学习RPC框架基础,了解如何在分布式系统中进行远程过程调用。