from zlgcan import *import threadingimport timeclass CANFDDevice: def __init__(self, device_type=ZCAN_USBCANFD_200U, device_index=0): """ 初始化CANFDDevice对象 Args: device_type: 设备类型,默认ZCAN_USBCANFD_200U device_index: 设备索引,默认0 """ self.device_type = device_type self.device_index = device_index self.zcanlib = None self.device_handle = None self.chn_handles = {} self.receive_callbacks = {} self.running = True def open_device(self): """ 打开设备 Returns: bool: 打开成功返回True,失败返回False """ print("正在打开设备...") self.zcanlib = ZCAN() self.device_handle = self.zcanlib.OpenDevice(self.device_type, self.device_index, 0) if self.device_handle == INVALID_DEVICE_HANDLE: print("打开设备失败") return False print(f"设备打开成功,句柄: {self.device_handle}") # 获取设备信息 info = self.zcanlib.GetDeviceInf(self.device_handle) print(f"设备信息: {info}") return True def open_channel(self, channel=0, bit_rate=500000, data_rate=2000000, res=1): """ 打开通道 Args: channel: 通道号,默认0 bit_rate: 仲裁域波特率,默认500000 data_rate: 数据域波特率,默认2000000 res: 终端电阻,默认1(启用) Returns: bool: 打开成功返回True,失败返回False """ print(f"正在配置通道{channel}...") # 设置仲裁域波特率 和 数据域波特率 ret = self.zcanlib.ZCAN_SetValue(self.device_handle, str(channel) + "/canfd_abit_baud_rate", str(bit_rate).encode("utf-8")) ret = self.zcanlib.ZCAN_SetValue(self.device_handle, str(channel) + "/canfd_dbit_baud_rate", str(data_rate).encode("utf-8")) if ret != ZCAN_STATUS_OK: print("设置波特率失败!") return False # 终端电阻 ret = self.zcanlib.ZCAN_SetValue(self.device_handle, str(channel) + "/initenal_resistance", str(res).encode("utf-8")) if ret != ZCAN_STATUS_OK: print("设置终端电阻失败!") return False # 初始化通道 chn_init_cfg = ZCAN_CHANNEL_INIT_CONFIG() chn_init_cfg.can_type = ZCAN_TYPE_CANFD # USBCANFD 必须选择CANFD chn_init_cfg.config.canfd.mode = 0 # 0-正常模式 1-只听模式 chn_handle = self.zcanlib.InitCAN(self.device_handle, channel, chn_init_cfg) if chn_handle is None: print("初始化通道失败!") return False # 启动通道 ret = self.zcanlib.StartCAN(chn_handle) if ret != ZCAN_STATUS_OK: print("启动通道失败!") return False self.chn_handles[channel] = chn_handle print(f"通道{channel}启动成功,句柄: {chn_handle}") return True def send_can_message(self, channel=0, can_id=0x123, data=None): """ 发送CAN消息 Args: channel: 通道号,默认0 can_id: CAN ID,默认0x123 data: 数据 Returns: int: 发送成功的消息数,失败返回0 """ if channel not in self.chn_handles: print(f"通道{channel}未打开") return 0 if data is None: print("数据不能为空") return 0 chn_handle = self.chn_handles[channel] # 发送CAN报文 transmit_num = 1 msgs = (ZCAN_Transmit_Data * transmit_num)() msgs[0].transmit_type = 0 # 0-正常发送,2-自发自收 msgs[0].frame.can_id = can_id # 发送id msgs[0].frame.can_dlc = len(data) # 数据长度 msgs[0].frame._pad |= 0x20 # 发送回显 for j in range(msgs[0].frame.can_dlc): msgs[0].frame.data[j] = data[j] ret = self.zcanlib.Transmit(chn_handle, msgs, transmit_num) if ret > 0: print(f"成功发送 {ret} 条CAN报文,ID: 0x{can_id:X}") else: print(f"发送CAN报文失败,返回值: {ret}") return ret def send_canfd_message(self, channel=0, can_id=0x123, data=None, brs=1): """ 发送CANFD消息 Args: channel: 通道号,默认0 can_id: CAN ID,默认0x123 data: 数据 brs: BRS标志,默认1(加速) Returns: int: 发送成功的消息数,失败返回0 """ if channel not in self.chn_handles: print(f"通道{channel}未打开") return 0 if data is None: print("数据不能为空") return 0 chn_handle = self.chn_handles[channel] # 发送CANFD报文 transmit_canfd_num = 1 canfd_msgs = (ZCAN_TransmitFD_Data * transmit_canfd_num)() canfd_msgs[0].transmit_type = 0 # 0-正常发送,2-自发自收 canfd_msgs[0].frame.can_id = can_id # ID canfd_msgs[0].frame.len = len(data) # 长度 canfd_msgs[0].frame.flags |= 0x20 # 发送回显 if brs: canfd_msgs[0].frame.flags |= 0x1 # BRS 加速标志位:0不加速,1加速 for j in range(canfd_msgs[0].frame.len): canfd_msgs[0].frame.data[j] = data[j] ret = self.zcanlib.TransmitFD(chn_handle, canfd_msgs, transmit_canfd_num) if ret > 0: print(f"成功发送 {ret} 条CANFD报文,ID: 0x{can_id:X}") else: print(f"发送CANFD报文失败,返回值: {ret}") return ret def receive_can_message(self, channel=0, timeout=100): """ 接收CAN消息 Args: channel: 通道号,默认0 timeout: 超时时间,默认100ms Returns: list: 接收到的消息列表 """ if channel not in self.chn_handles: print(f"通道{channel}未打开") return [] chn_handle = self.chn_handles[channel] messages = [] # 接收CAN消息 rcv_num = self.zcanlib.GetReceiveNum(chn_handle, ZCAN_TYPE_CAN) if rcv_num: if rcv_num > 100: rcv_msg, rcv_num = self.zcanlib.Receive(chn_handle, 100, timeout) else: rcv_msg, rcv_num = self.zcanlib.Receive(chn_handle, rcv_num, timeout) for msg in rcv_msg[:rcv_num]: frame = msg.frame direction = "TX" if frame._pad & 0x20 else "RX" frame_type = "扩展帧" if frame.can_id & (1 << 31) else "标准帧" frame_format = "远程帧" if frame.can_id & (1 << 30) else "数据帧" can_id = frame.can_id & 0x1FFFFFFF if frame.can_id & (1 << 30): data = [] dlc = 0 else: dlc = frame.can_dlc data = list(frame.data[:dlc]) message = { "timestamp": msg.timestamp, "channel": channel, "type": "CAN", "direction": direction, "frame_type": frame_type, "frame_format": frame_format, "can_id": can_id, "dlc": dlc, "data": data } messages.append(message) # 接收CANFD消息 rcv_canfd_num = self.zcanlib.GetReceiveNum(chn_handle, ZCAN_TYPE_CANFD) if rcv_canfd_num: if rcv_canfd_num > 100: rcv_canfd_msgs, rcv_canfd_num = self.zcanlib.ReceiveFD(chn_handle, 100, timeout) else: rcv_canfd_msgs, rcv_canfd_num = self.zcanlib.ReceiveFD(chn_handle, rcv_canfd_num, timeout) for msg in rcv_canfd_msgs[:rcv_canfd_num]: frame = msg.frame brs = "加速" if frame.flags & 0x1 else " " direction = "TX" if frame.flags & 0x20 else "RX" frame_type = "扩展帧" if frame.can_id & (1 << 31) else "标准帧" frame_format = "远程帧" if frame.can_id & (1 << 30) else "数据帧" can_id = frame.can_id & 0x1FFFFFFF data = list(frame.data[:frame.len]) message = { "timestamp": msg.timestamp, "channel": channel, "type": "CANFD" + brs, "direction": direction, "frame_type": frame_type, "frame_format": frame_format, "can_id": can_id, "dlc": frame.len, "data": data } messages.append(message) return messages def start_receive_thread(self, channel=0, callback=None): """ 启动接收线程 Args: channel: 通道号,默认0 callback: 回调函数,接收消息列表作为参数 """ if channel not in self.chn_handles: print(f"通道{channel}未打开") return False self.receive_callbacks[channel] = callback def receive_loop(): while self.running: if channel not in self.chn_handles: time.sleep(0.005) continue messages = self.receive_can_message(channel) if messages and callback: callback(messages) time.sleep(0.005) thread = threading.Thread(target=receive_loop, daemon=True) thread.start() print(f"通道{channel}接收线程已启动") return True def close_channel(self, channel=0): """ 关闭通道 Args: channel: 通道号,默认0 Returns: bool: 关闭成功返回True,失败返回False """ if channel not in self.chn_handles: print(f"通道{channel}未打开") return False chn_handle = self.chn_handles[channel] ret = self.zcanlib.ResetCAN(chn_handle) if ret == 1: del self.chn_handles[channel] if channel in self.receive_callbacks: del self.receive_callbacks[channel] print(f"通道{channel}关闭成功") return True else: print(f"通道{channel}关闭失败") return False def close_device(self): """ 关闭设备 Returns: bool: 关闭成功返回True,失败返回False """ # 停止接收线程 self.running = False time.sleep(0.1) # 等待接收线程停止 # 关闭所有通道 for channel in list(self.chn_handles.keys()): self.close_channel(channel) # 关闭设备 if self.device_handle: ret = self.zcanlib.CloseDevice(self.device_handle) if ret == 1: print("设备关闭成功") self.device_handle = None self.zcanlib = None return True else: print("设备关闭失败") return False else: print("设备未打开") return Falsedef main(): """ 主函数,测试CANFDDevice类 """ print("周立功USBCANFD200U设备测试") print("=" * 70) # 创建设备对象 device = CANFDDevice() # 打开设备 if not device.open_device(): return # 打开通道0 if not device.open_channel(0): device.close_device() return # 定义接收回调函数 def receive_callback(messages): for msg in messages: data_str = " ".join([f"{num:02X}" for num in msg["data"]]) print(f"[{msg['timestamp']}] CAN{msg['channel']}{msg['type']}\t{msg['direction']} ID: 0x{msg['can_id']:X}\t{msg['frame_type']}{msg['frame_format']}") print(f" DLC: {msg['dlc']}\tDATA(hex): {data_str}") print("-" * 70) # 启动接收线程 device.start_receive_thread(0, receive_callback) # 发送测试消息 print("\n发送测试消息...") # 发送CAN消息 can_data = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08] device.send_can_message(0, 0x123, can_data) # 发送CANFD消息 canfd_data = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xAA] device.send_canfd_message(0, 0x456, canfd_data) # 等待用户输入停止 print("\n按回车键停止测试") input() # 关闭设备 device.close_device()if __name__ == "__main__": main()