MicroPython 在 Raspberry Pi Pico 上完全支持 PIO,可以使用 Python 语法编写 PIO 程序,无需单独的汇编文件。基础概念
MicroPython PIO 装饰器MicroPython 使用 @rp2.asm_pio 装饰器定义 PIO 程序:
import rp2@rp2.asm_pio(...)defmy_pio_program():# PIO 指令(Python 函数调用形式)pass
装饰器参数配置状态机的行为(移位方向、自动推送等)。
PIO UART TX(发送)实现
完整代码
import rp2from machine import Pin# PIO UART 发送程序@rp2.asm_pio( out_init=rp2.PIO.OUT_HIGH, # TX 引脚初始为高电平(空闲状态) out_shiftdir=rp2.PIO.SHIFT_RIGHT, # LSB 先发 sideset_init=rp2.PIO.OUT_HIGH,)defuart_tx():# 每 bit 8 个时钟周期 pull() .side(1) [7] # 从 FIFO 取数据,保持高电平(空闲)set(x, 7) .side(0) [7] # 发送起始位(低电平),计数器=7 label("bitloop") out(pins, 1) [6] # 输出 1 bit 数据 jmp(x_dec, "bitloop") # 循环 8 次# 停止位由下一次 pull 的 side(1) 自动产生# 初始化 PIO UART TXclassPIO_UART_TX:def__init__(self, pin_num, baudrate=115200): self.pin = Pin(pin_num, Pin.OUT, value=1)# 计算分频器:系统时钟 / (8 * 波特率)# Pico 默认 125MHz freq = baudrate * 8# 创建状态机 self.sm = rp2.StateMachine(0, # 状态机编号 0-7 uart_tx, # PIO 程序 freq=freq, # 状态机频率 out_base=self.pin, # 输出引脚 sideset_base=self.pin # sideset 引脚 ) self.sm.active(1) # 启动状态机defwrite(self, data):"""发送字节或字节数组"""ifisinstance(data, int): self.sm.put(data)else:for byte in data: self.sm.put(byte)defprint(self, text):"""发送字符串""" self.write(text.encode('utf-8'))# 使用示例if __name__ == "__main__":# 在 GP0 上创建 UART TX,波特率 115200 uart = PIO_UART_TX(pin_num=0, baudrate=115200)# 发送数据 uart.print("Hello from PIO UART!\r\n") uart.write(b"Raw bytes: \x41\x42\x43\r\n") uart.write(0x55) # 发送单个字节
PIO UART RX(接收)实现
完整代码
import rp2from machine import Pin# PIO UART 接收程序@rp2.asm_pio( in_shiftdir=rp2.PIO.SHIFT_RIGHT, # LSB 先入 push_thresh=8, # 8 位后自动 push autopush=True, # 自动推送到 FIFO)defuart_rx(): label("start") wait(0, pin, 0) # 等待起始位(低电平)set(x, 7) [10] # 延迟到第一个数据位中心 label("bitloop") in_(pins, 1) # 采样 1 bit jmp(x_dec, "bitloop") [6] # 循环 8 次# autopush 自动将数据推送到 FIFO# wrap 自动返回 start# 初始化 PIO UART RXclassPIO_UART_RX:def__init__(self, pin_num, baudrate=115200): self.pin = Pin(pin_num, Pin.IN, Pin.PULL_UP) freq = baudrate * 8 self.sm = rp2.StateMachine(1, # 状态机编号(与 TX 不同) uart_rx, freq=freq, in_base=self.pin, # 输入引脚 jmp_pin=self.pin # 用于条件跳转的引脚 ) self.sm.active(1)defany(self):"""检查是否有数据可读"""return self.sm.rx_fifo() > 0defread(self, count=1):"""读取指定数量的字节""" data = []for _ inrange(count): data.append(self.sm.get() & 0xFF)returnbytes(data)defreadline(self, timeout_ms=1000):"""读取一行(以 \\n 结尾)"""import time line = b"" start = time.ticks_ms()whileTrue:if self.any(): char = self.sm.get() & 0xFF line += bytes([char])if char == ord('\n'):break start = time.ticks_ms() # 重置超时if time.ticks_diff(time.ticks_ms(), start) > timeout_ms:breakreturn line# 使用示例if __name__ == "__main__": rx = PIO_UART_RX(pin_num=1, baudrate=115200)whileTrue:if rx.any(): data = rx.read(1)print("Received:", data)
完整双向 PIO UART 类
import rp2from machine import Pinimport time# ========== PIO 程序定义 ==========@rp2.asm_pio( out_init=rp2.PIO.OUT_HIGH, out_shiftdir=rp2.PIO.SHIFT_RIGHT, sideset_init=rp2.PIO.OUT_HIGH,)defuart_tx_prog(): pull() .side(1) [7]set(x, 7) .side(0) [7] label("bitloop") out(pins, 1) [6] jmp(x_dec, "bitloop")@rp2.asm_pio( in_shiftdir=rp2.PIO.SHIFT_RIGHT, push_thresh=8, autopush=True,)defuart_rx_prog(): label("start") wait(0, pin, 0)set(x, 7) [10] label("bitloop") in_(pins, 1) jmp(x_dec, "bitloop") [6]# ========== PIO UART 类 ==========classPIO_UART:""" 完整的 PIO UART 实现,支持发送和接收 参数: tx_pin: 发送引脚编号 rx_pin: 接收引脚编号 baudrate: 波特率(默认 115200) tx_sm: TX 状态机编号(0-7) rx_sm: RX 状态机编号(0-7) """def__init__(self, tx_pin, rx_pin, baudrate=115200, tx_sm=0, rx_sm=1): self.baudrate = baudrate freq = baudrate * 8# 初始化 TX self.tx_pin = Pin(tx_pin, Pin.OUT, value=1) self.sm_tx = rp2.StateMachine( tx_sm, uart_tx_prog, freq=freq, out_base=self.tx_pin, sideset_base=self.tx_pin ) self.sm_tx.active(1)# 初始化 RX self.rx_pin = Pin(rx_pin, Pin.IN, Pin.PULL_UP) self.sm_rx = rp2.StateMachine( rx_sm, uart_rx_prog, freq=freq, in_base=self.rx_pin, jmp_pin=self.rx_pin ) self.sm_rx.active(1)# ========== 发送方法 ==========defwrite(self, data):"""发送字节、字节数组或字符串"""ifisinstance(data, str): data = data.encode('utf-8')elifisinstance(data, int): data = bytes([data])for byte in data: self.sm_tx.put(byte)defwriteline(self, text):"""发送一行文本(自动添加 \\r\\n)""" self.write(text + "\r\n")# ========== 接收方法 ==========defany(self):"""返回 RX FIFO 中的字节数"""return self.sm_rx.rx_fifo()defread(self, count=1):"""阻塞读取指定数量的字节""" data = bytearray()for _ inrange(count): data.append(self.sm_rx.get() & 0xFF)returnbytes(data)defread_nonblocking(self, count=1):"""非阻塞读取,返回可用的数据""" data = bytearray()for _ inrange(min(count, self.any())): data.append(self.sm_rx.get() & 0xFF)returnbytes(data)defreadline(self, timeout_ms=1000):"""读取一行(以 \\n 结尾),带超时""" line = bytearray() start = time.ticks_ms()whileTrue:if self.any(): char = self.sm_rx.get() & 0xFF line.append(char)if char == ord('\n'):break start = time.ticks_ms()if time.ticks_diff(time.ticks_ms(), start) > timeout_ms:breakreturnbytes(line)# ========== 便捷方法 ==========defflush_rx(self):"""清空 RX FIFO"""while self.any(): self.sm_rx.get()defdeinit(self):"""停止状态机""" self.sm_tx.active(0) self.sm_rx.active(0)# ========== 使用示例 ==========if __name__ == "__main__":# 创建 PIO UART 实例# TX = GP0, RX = GP1, 波特率 115200 uart = PIO_UART(tx_pin=0, rx_pin=1, baudrate=115200)# 发送欢迎消息 uart.writeline("PIO UART Ready!") uart.write(b"Hello World!\r\n")# 回显循环print("Starting echo loop...")whileTrue:if uart.any(): data = uart.read_nonblocking(uart.any())print("Received:", data) uart.write(data) # 回显 time.sleep_ms(10)
多路 PIO UART 示例
import rp2from machine import Pinimport time# 使用前面定义的 uart_tx_prog 和 uart_rx_progclassMultiPIO_UART:""" 管理多路 PIO UART Pico 有 8 个状态机(PIO0: SM0-3, PIO1: SM4-7) 每路 UART 需要 2 个 SM(TX + RX) 所以最多支持 4 路全双工 UART """def__init__(self): self.uarts = {} self.next_sm = 0defadd_uart(self, name, tx_pin, rx_pin, baudrate=115200):"""添加一路 UART"""if self.next_sm >= 8:raise RuntimeError("No more state machines available") uart = PIO_UART( tx_pin=tx_pin, rx_pin=rx_pin, baudrate=baudrate, tx_sm=self.next_sm, rx_sm=self.next_sm + 1 ) self.uarts[name] = uart self.next_sm += 2return uartdefget(self, name):"""获取指定名称的 UART"""return self.uarts.get(name)deflist_uarts(self):"""列出所有 UART"""returnlist(self.uarts.keys())# 使用示例:创建 4 路 UARTif __name__ == "__main__": multi = MultiPIO_UART()# 创建 4 路 UART(使用全部 8 个状态机) uart0 = multi.add_uart("UART0", tx_pin=0, rx_pin=1, baudrate=115200) uart1 = multi.add_uart("UART1", tx_pin=2, rx_pin=3, baudrate=9600) uart2 = multi.add_uart("UART2", tx_pin=4, rx_pin=5, baudrate=115200) uart3 = multi.add_uart("UART3", tx_pin=6, rx_pin=7, baudrate=19200)print("Available UARTs:", multi.list_uarts())# 向每路 UART 发送消息 uart0.writeline("Hello from UART0") uart1.writeline("Hello from UART1") uart2.writeline("Hello from UART2") uart3.writeline("Hello from UART3")# 轮询接收whileTrue:for name, uart in multi.uarts.items():if uart.any(): data = uart.read_nonblocking(uart.any())print(f"{name} received: {data}") time.sleep_ms(10)
PIO 指令参考(MicroPython 语法)
| | |
|---|
jmp | jmp(condition, "label") | |
wait | wait(polarity, src, index) | |
in | in_(source, bit_count) | |
out | out(dest, bit_count) | |
push | push() | |
pull | pull() | |
mov | mov(dest, source) | |
irq | irq(mode, index) | |
set | set(dest, value) | |
nop | nop() | |
wrap_target | wrap_target() | |
wrap | wrap() | |
.side(n) | .side(n) | |
[delay] | [n] | |
常见问题与注意事项
1. 状态机编号- Pico 共有 8 个状态机:
0-3(PIO0)和 4-7(PIO1)
2. 波特率精度波特率计算公式:
fsm=baudrate×8fsm=baudrate×8
系统时钟 $125,$MHz 时,常见波特率精度良好:
- 9600: fsm=76800fsm=76800 Hz ✓
- 115200: fsm=921600fsm=921600 Hz ✓
- 1000000: fsm=8000000fsm=8000000 Hz ✓
3. FIFO 深度- 每个状态机有 4 级 TX FIFO 和 4 级 RX FIFO
4. 与硬件 UART 对比