用 Python 异步编程解决嵌入式串口通信难题
在嵌入式开发中,串口通信是最基础也最容易踩坑的环节。传统阻塞式写法在单任务场景下没问题,但当你需要同时监听多个串口、处理协议解析、驱动 OLED 显示时,阻塞式就成了瓶颈。
本文介绍一种基于 Python asyncio 的异步串口通信架构,让你的上位机工具从「能用」升级为「好用」。
传统方案的痛点
先看一段典型的串口读写代码:
importserialimporttimeser=serial.Serial('/dev/ttyUSB0',115200,timeout=1)# 阻塞式:发一条,等一条ser.write(b'AT+RST\r\n')time.sleep(0.5)response=ser.readline()print(response)
这段代码有三个致命问题:
- 1. 阻塞等待 —
readline() 期间 CPU 空转
在工业现场,串口通信往往需要 7×24 小时稳定运行。阻塞式方案在高并发场景下的可靠性堪忧。
异步方案架构
异步方案的核心思想是:用协程替代线程,用事件循环替代轮询。
核心代码
异步串口通信的关键是使用 asyncio.StreamReader 包装串口:
importasyncioimportserial_asyncioasyncdefread_sensor(reader,name:str):"""异步读取传感器数据"""whileTrue:line=awaitreader.readline()ifline:data=line.decode().strip()print(f"[{name}] 收到: {data}")asyncdefsend_command(writer,cmd:str):"""异步发送命令"""writer.write(f"{cmd}\r\n".encode())awaitwriter.drain()print(f"已发送: {cmd}")asyncdefmain():# 同时打开两个串口reader1,writer1=awaitserial_asyncio.open_serial_connection(url='/dev/ttyUSB0',baudrate=115200)reader2,writer2=awaitserial_asyncio.open_serial_connection(url='/dev/ttyUSB1',baudrate=9600)# 并发运行三个任务awaitasyncio.gather(read_sensor(reader1,"温度传感器"),read_sensor(reader2,"湿度传感器"),periodic_command(writer1),)
协议解析器
实际项目中,串口数据通常需要按帧解析。下面是一个通用的帧解析器:
classFrameParser:"""基于状态机的帧解析器帧格式: [0xAA] [0x55] [LEN] [DATA...] [CHECKSUM]"""def__init__(self):self._buffer=bytearray()self._state="WAIT_HEADER"deffeed(self,data:bytes)->list[dict]:"""喂入原始字节,返回解析出的帧列表"""self._buffer.extend(data)frames=[]whilelen(self._buffer)>0:ifself._state=="WAIT_HEADER":idx=self._buffer.find(b'\xaa\x55')ifidx<0:self._buffer.clear()breakself._buffer=self._buffer[idx:]self._state="READ_LENGTH"elifself._state=="READ_LENGTH":iflen(self._buffer)<3:break# 数据不够,等下次length=self._buffer[2]self._state="READ_DATA"self._expected=lengthelifself._state=="READ_DATA":total=3+self._expected+1# header + len + data + checksumiflen(self._buffer)<total:breakframe_data=self._buffer[3:3+self._expected]checksum=self._buffer[3+self._expected]ifself._verify_checksum(frame_data,checksum):frames.append({"data":bytes(frame_data)})self._buffer=self._buffer[total:]self._state="WAIT_HEADER"returnframes
性能对比
在实测环境中(STM32F407 + USB 转串口),异步方案的优势明显:
关键发现:当并发串口数超过 4 个时,阻塞式方案的帧丢失率急剧上升,而 asyncio 方案在 32 路并发下仍然保持稳定。
生产环境注意事项
在实际部署时,需要注意以下几点:
- 2. 重连策略 — 串口断线后自动重连,推荐使用指数退避
- 3. 数据持久化 — 关键数据写入 SQLite 或 InfluxDB
- 4. 日志记录 — 使用
structlog 结构化日志,便于排查问题
asyncdefheartbeat(writer,interval:float=30.0):"""定期发送心跳"""whileTrue:awaitasyncio.sleep(interval)awaitsend_command(writer,"PING")
总结
从阻塞式到异步,串口通信的架构升级并不复杂。核心收益:
如果你的上位机工具还在用 time.sleep() 等待串口响应,是时候试试 asyncio 了。
本文代码已在 Python 3.11 + pyserial-asyncio 0.6 环境下验证通过。完整代码见 GitHub 仓库[1]。