目录
一.背景
二. 硬件环境
1.CAN通信设备之PCAN
2.物理架构图
三. 资料分析
四. 二次开发
五. 应用
六. 总结
一.背景
在汽车电子领域中CAN是一种被广泛应用的通信协议,CAN 是controller area network 的缩写(以下称为can),是iso国际标准化的串行通信协议。
关于CAN的介绍可以参考这边博客:一文读懂CAN总线协议 (超详细配34张高清图)_can总线协议详解-CSDN博客
由于项目(UWB项目)需要对部分CAN Device进行通信以及相关功能性测试,所以就需要开发出一套基于CAN的测试工具来对多个Device进行CAN通信测试!
关于UWB(Ultra-Wideband,超宽带)是一种无线通信技术,详见这篇文章:
UWB技术了解_uwb pdoa-CSDN博客
二. 硬件环境
1.CAN通信设备之PCAN
我这边使用的CAN通信设备是PCAN(购买自淘宝),如下图:
将PCAN 的两个pin脚(CAN high 和CAN low)分别接到自己CAN device的 high 和 low两个pin 即可。
因为CAN通信本身就是通过双绞线进行通信的;至于接地的线可接可不接!
关于PCAN的更多详细资料可以问淘宝卖家索取;资料包会包含PCAN的客户端应用程序(PCAN-View),一些文档以及示例代码。
关于PCAN-View的使用在此就不过多赘述了,卖家的资料包里有相应的手册。
备注:关于CAN通信设备的选择只要支持CAN FD通信都可以,并不是一定要上图的PCAN设备!
2.物理架构图
关于硬件测试环境的架构图见下图(纯手绘),主要是便于各位理解硬件环境,请忽略图的美丑..
简单介绍一下:
硬件环境主要是由两台CAN Devices 各通过一个PCAN连接PC,PC上的测试工具会通过PCAN的USB口来区分两台CAN Devices;也可以通过两台CAN Device的CAN ID(CAN ID需不同)来区分Device。
三. 资料分析
在PCAN的资料包中包含一些供二次开发的库以及example code,其中PCANBasic.dll是官方封装好的动态链接库,其中提供了一些开放的供调用的接口,包括read & write等基础的CAN通信操作
PCANBasic.py文件是python编写的调用PCANBasic.dll封装了一些python的方法,包括open/ read/write/close等等
class PCANBasic: """ PCAN-Basic API class implementation """ def __init__(self): # 初始化操作,检查dll文件是否存在 # Loads the PCANBasic API # if platform.system() == 'Windows': # Loads the API on Windows self.__m_dllBasic = windll.LoadLibrary("PCANBasic") elif platform.system() == 'Linux': # Loads the API on Linux self.__m_dllBasic = cdll.LoadLibrary("libpcanbasic.so") elif platform.system() == 'Darwin': # Loads the API on Mac # # NOTE: # ~~~~~ # The macOS library for PCAN-USB interfaces from PEAK-System, PCBUSB library, # is a third-party software creaded and mantained by the MacCAN project. For # information and support, please contact MacCAN (info@mac-can). # self.__m_dllBasic = cdll.LoadLibrary("libPCBUSB.dylib") if self.__m_dllBasic == None: print ("Exception: The PCAN-Basic DLL couldn't be loaded!") # Initializes a PCAN Channel # def Initialize( self, Channel, Btr0Btr1, HwType = TPCANType(0), IOPort = c_uint(0), Interrupt = c_ushort(0)): """ Initializes a PCAN Channel Parameters: Channel : A TPCANHandle representing a PCAN Channel Btr0Btr1 : The speed for the communication (BTR0BTR1 code) HwType : Non-PnP: The type of hardware and operation mode IOPort : Non-PnP: The I/O address for the parallel port Interrupt: Non-PnP: Interrupt number of the parallel port Returns: A TPCANStatus error code """ try: res = self.__m_dllBasic.CAN_Initialize(Channel,Btr0Btr1,HwType,IOPort,Interrupt) return TPCANStatus(res) except: print ("Exception on PCANBasic.Initialize") raise # Initializes a FD capable PCAN Channel # def InitializeFD( self, Channel, BitrateFD): """ Initializes a FD capable PCAN Channel Parameters: Channel : The handle of a FD capable PCAN Channel BitrateFD : The speed for the communication (FD bit rate string) Remarks: See PCAN_BR_* values. * parameter and values must be separated by '=' * Couples of Parameter/value must be separated by ',' * Following Parameter must be filled out: f_clock, data_brp, data_sjw, data_tseg1, data_tseg2, nom_brp, nom_sjw, nom_tseg1, nom_tseg2. * Following Parameters are optional (not used yet): data_ssp_offset, nom_sam Example: f_clock=80000000,nom_brp=10,nom_tseg1=5,nom_tseg2=2,nom_sjw=1,data_brp=4,data_tseg1=7,data_tseg2=2,data_sjw=1 Returns: A TPCANStatus error code """ try: res = self.__m_dllBasic.CAN_InitializeFD(Channel,BitrateFD) return TPCANStatus(res) except: print ("Exception on PCANBasic.InitializeFD") raise # Uninitializes one or all PCAN Channels initialized by CAN_Initialize # def Uninitialize( self, Channel): """ Uninitializes one or all PCAN Channels initialized by CAN_Initialize Remarks: Giving the TPCANHandle value "PCAN_NONEBUS", uninitialize all initialized channels Parameters: Channel : A TPCANHandle representing a PCAN Channel Returns: A TPCANStatus error code """ try: res = self.__m_dllBasic.CAN_Uninitialize(Channel) return TPCANStatus(res) except: print ("Exception on PCANBasic.Uninitialize") raise # Resets the receive and transmit queues of the PCAN Channel # def Reset( self, Channel): """ Resets the receive and transmit queues of the PCAN Channel Remarks: A reset of the CAN controller is not performed Parameters: Channel : A TPCANHandle representing a PCAN Channel Returns: A TPCANStatus error code """ try: res = self.__m_dllBasic.CAN_Reset(Channel) return TPCANStatus(res) except: print ("Exception on PCANBasic.Reset") raise # Gets the current status of a PCAN Channel # def GetStatus( self, Channel): """ Gets the current status of a PCAN Channel Parameters: Channel : A TPCANHandle representing a PCAN Channel Returns: A TPCANStatus error code """ try: res = self.__m_dllBasic.CAN_GetStatus(Channel) return TPCANStatus(res) except: print ("Exception on PCANBasic.GetStatus") raise # Reads a CAN message from the receive queue of a PCAN Channel # def Read( self, Channel): """ Reads a CAN message from the receive queue of a PCAN Channel Remarks: The return value of this method is a 3-touple, where the first value is the result (TPCANStatus) of the method. The order of the values are: [0]: A TPCANStatus error code [1]: A TPCANMsg structure with the CAN message read [2]: A TPCANTimestamp structure with the time when a message was read Parameters: Channel : A TPCANHandle representing a PCAN Channel Returns: A touple with three values """ try: msg = TPCANMsg() timestamp = TPCANTimestamp() res = self.__m_dllBasic.CAN_Read(Channel,byref(msg),byref(timestamp)) return TPCANStatus(res),msg,timestamp except: print ("Exception on PCANBasic.Read") raise # Reads a CAN message from the receive queue of a FD capable PCAN Channel # def ReadFD( self, Channel): """ Reads a CAN message from the receive queue of a FD capable PCAN Channel Remarks: The return value of this method is a 3-touple, where the first value is the result (TPCANStatus) of the method. The order of the values are: [0]: A TPCANStatus error code [1]: A TPCANMsgFD structure with the CAN message read [2]: A TPCANTimestampFD that is the time when a message was read Parameters: Channel : The handle of a FD capable PCAN Channel Returns: A touple with three values """ try: msg = TPCANMsgFD() timestamp = TPCANTimestampFD() res = self.__m_dllBasic.CAN_ReadFD(Channel,byref(msg),byref(timestamp)) return TPCANStatus(res),msg,timestamp except: print ("Exception on PCANBasic.ReadFD") raise # Transmits a CAN message # def Write( self, Channel, MessageBuffer): """ Transmits a CAN message Parameters: Channel : A TPCANHandle representing a PCAN Channel MessageBuffer: A TPCANMsg representing the CAN message to be sent Returns: A TPCANStatus error code """ try: res = self.__m_dllBasic.CAN_Write(Channel,byref(MessageBuffer)) return TPCANStatus(res) except: print ("Exception on PCANBasic.Write") raise # Transmits a CAN message over a FD capable PCAN Channel # def WriteFD( self, Channel, MessageBuffer): """ Transmits a CAN message over a FD capable PCAN Channel Parameters: Channel : The handle of a FD capable PCAN Channel MessageBuffer: A TPCANMsgFD buffer with the message to be sent Returns: A TPCANStatus error code """ try: res = self.__m_dllBasic.CAN_WriteFD(Channel,byref(MessageBuffer)) return TPCANStatus(res) except: print ("Exception on PCANBasic.WriteFD") raise # Configures the reception filter # def FilterMessages( self, Channel, FromID, ToID, Mode): """ Configures the reception filter Remarks: The message filter will be expanded with every call to this function. If it is desired to reset the filter, please use the 'SetValue' function. Parameters: Channel : A TPCANHandle representing a PCAN Channel FromID : A c_uint value with the lowest CAN ID to be received ToID : A c_uint value with the highest CAN ID to be received Mode : A TPCANMode representing the message type (Standard, 11-bit identifier, or Extended, 29-bit identifier) Returns: A TPCANStatus error code """ try: res = self.__m_dllBasic.CAN_FilterMessages(Channel,FromID,ToID,Mode) return TPCANStatus(res) except: print ("Exception on PCANBasic.FilterMessages") raise # Retrieves a PCAN Channel value # def GetValue( self, Channel, Parameter): """ Retrieves a PCAN Channel value Remarks: Parameters can be present or not according with the kind of Hardware (PCAN Channel) being used. If a parameter is not available, a PCAN_ERROR_ILLPARAMTYPE error will be returned. The return value of this method is a 2-touple, where the first value is the result (TPCANStatus) of the method and the second one, the asked value Parameters: Channel : A TPCANHandle representing a PCAN Channel Parameter : The TPCANParameter parameter to get Returns: A touple with 2 values """ try: if Parameter == PCAN_API_VERSION or Parameter == PCAN_HARDWARE_NAME or Parameter == PCAN_CHANNEL_VERSION or Parameter == PCAN_LOG_LOCATION or Parameter == PCAN_TRACE_LOCATION or Parameter == PCAN_BITRATE_INFO_FD or Parameter == PCAN_IP_ADDRESS or Parameter == PCAN_FIRMWARE_VERSION or Parameter == PCAN_DEVICE_PART_NUMBER: mybuffer = create_string_buffer(256) elif Parameter == PCAN_ATTACHED_CHANNELS: res = self.GetValue(Channel, PCAN_ATTACHED_CHANNELS_COUNT) if TPCANStatus(res[0]) != PCAN_ERROR_OK: return TPCANStatus(res[0]), mybuffer = (TPCANChannelInformation * res[1])() elif Parameter == PCAN_ACCEPTANCE_FILTER_11BIT or Parameter == PCAN_ACCEPTANCE_FILTER_29BIT: mybuffer = c_int64(0) else: mybuffer = c_int(0) res = self.__m_dllBasic.CAN_GetValue(Channel,Parameter,byref(mybuffer),sizeof(mybuffer)) if Parameter == PCAN_ATTACHED_CHANNELS: return TPCANStatus(res),mybuffer else: return TPCANStatus(res),mybuffer.value except: print ("Exception on PCANBasic.GetValue") raise # Returns a descriptive text of a given TPCANStatus # error code, in any desired language # def SetValue( self, Channel, Parameter, Buffer): """ Configures or sets a PCAN Channel value Remarks: Parameters can be present or not according with the kind of Hardware (PCAN Channel) being used. If a parameter is not available, a PCAN_ERROR_ILLPARAMTYPE error will be returned. Parameters: Channel : A TPCANHandle representing a PCAN Channel Parameter : The TPCANParameter parameter to set Buffer : Buffer with the value to be set BufferLength : Size in bytes of the buffer Returns: A TPCANStatus error code """ try: if Parameter == PCAN_LOG_LOCATION or Parameter == PCAN_LOG_TEXT or Parameter == PCAN_TRACE_LOCATION: mybuffer = create_string_buffer(256) elif Parameter == PCAN_ACCEPTANCE_FILTER_11BIT or Parameter == PCAN_ACCEPTANCE_FILTER_29BIT: mybuffer = c_int64(0) else: mybuffer = c_int(0) mybuffer.value = Buffer res = self.__m_dllBasic.CAN_SetValue(Channel,Parameter,byref(mybuffer),sizeof(mybuffer)) return TPCANStatus(res) except: print ("Exception on PCANBasic.SetValue") raise def GetErrorText( self, Error, Language = 0): """ Returns a descriptive text of a given TPCANStatus error code, in any desired language Remarks: The current languages available for translation are: Neutral (0x00), German (0x07), English (0x09), Spanish (0x0A), Italian (0x10) and French (0x0C) The return value of this method is a 2-touple, where the first value is the result (TPCANStatus) of the method and the second one, the error text Parameters: Error : A TPCANStatus error code Language : Indicates a 'Primary language ID' (Default is Neutral(0)) Returns: A touple with 2 values """ try: mybuffer = create_string_buffer(256) res = self.__m_dllBasic.CAN_GetErrorText(Error,Language,byref(mybuffer)) return TPCANStatus(res),mybuffer.value except: print ("Exception on PCANBasic.GetErrorText") raise def LookUpChannel( self, Parameters): """ Finds a PCAN-Basic channel that matches with the given parameters Remarks: The return value of this method is a 2-touple, where the first value is the result (TPCANStatus) of the method and the second one a TPCANHandle value Parameters: Parameters : A comma separated string contained pairs of parameter-name/value to be matched within a PCAN-Basic channel Returns: A touple with 2 values """ try: mybuffer = TPCANHandle(0) res = self.__m_dllBasic.CAN_LookUpChannel(Parameters,byref(mybuffer)) return TPCANStatus(res),mybuffer except: print ("Exception on PCANBasic.LookUpChannel") raise
关于函数的作用就不过多解释了,注释部分写的相当清楚!
四. 二次开发
虽然有了官方提供的PCANBasic.py的封装,但是距离我们实际使用还有一些距离,官方提供的读写操作方法用起来还有诸多不便,所以在此基础上结合我自己的项目进行了二次开发,封装后可以直接读写我封装好的CAN 帧数据(不同项目对CAN Frame data的定义是不同的)。
由于一台PC可能连接多个CAN Device,所以需要一个 PcanDeviceManager的类来获取device locations
import enumimport sysimport timeimport mathfrom enum import IntEnumimport PCANBasic as pcanclass PcanDeviceManager: # gets the unique locations of connected PCAN devices @staticmethod def get_device_locations(): try: pcan_interface = pcan.PCANBasic() except Exception: raise LookupError("Unable to find PCANBasic.dll library!") handle = [] for channel in EnumPcanUSBChannel: api_result = pcan_interface.GetValue(channel.value, Parameter=pcan.PCAN_HARDWARE_NAME) if api_result[0] != pcan.PCAN_ERROR_OK: continue # PCAN channel get hardware name failed if api_result[1].decode('utf-8') != 'PCAN-USB FD': raise AssertionError('Deteced channel is not PCAN-USB FD') handle.append(channel.value.value) return handle
接下来就是基于官方的PCANBasic.py文件二次开发的pcan_device.py的代码:
class PcanDevice: pcan_interface = object device_id = b"" max_can_payload_bytes = None is_connected = False handle = None is_fd = None def __init__(self, device_id, device_channel, max_can_payload_bytes: int, is_fd: bool = True, use_extended_id: bool = False, enable_warnings: bool = True, en_fragmented_messages: bool = True) -> None: try: self.pcan_interface = pcan.PCANBasic() except Exception as err: raise LookupError("Unable to find PCANBasic.dll library:{}!".format(err)) assert is_fd or max_can_payload_bytes <= 8, "Classical CAN supports maximum 8 payload bytes" assert max_can_payload_bytes <= 64, "CAN-FD supports a maximum of 64 payload bytes" self.device_id = device_id self.max_can_payload_bytes = max_can_payload_bytes self.is_connected = False self.device_channel = device_channel self.is_fd = is_fd self.enable_warnings = enable_warnings self.en_fragmented_messages = en_fragmented_messages # 帧长度是否大于64字节 if use_extended_id: self.extended_id_bit = pcan.PCAN_MESSAGE_EXTENDED.value else: self.extended_id_bit = pcan.PCAN_MESSAGE_STANDARD.value # Opens the initialized device based on input parameters and the pre-initialized device_location def open(self): """ Setup connection with CAN device Returns: status """ can_fd_bitrate_string = self.get_can_fd_bitrate_params() if self.is_fd: api_result = self.pcan_interface.InitializeFD(self.device_channel, can_fd_bitrate_string) self.pcan_interface.ReadFD(self.device_channel) time.sleep(0.01) else: api_result = self.pcan_interface.Initialize(self.device_channel, pcan.PCAN_BAUD_1M) if api_result != pcan.PCAN_ERROR_OK: return EnumPcanStatus.PCAN_STATUS_ERR_GENERAL if api_result != pcan.PCAN_ERROR_OK: return EnumPcanStatus.PCAN_STATUS_ERR_GENERAL self.is_connected = True return EnumPcanStatus.PCAN_STATUS_OK def close(self): """ Close CAN device connection Returns: status """ self.pcan_interface.Uninitialize(self.device_channel) self.is_connected = False return EnumPcanStatus.PCAN_STATUS_OK def reset(self): """ Reset CAN Device to init state """ self.pcan_interface.Reset(self.device_channel) def get_device_info(self): """ Get the CAN device information, for example: device id, device channel, and so on. Returns: a dict contains device information. """ device_info = {} device_name_resp = self.pcan_interface.GetValue(self.device_channel, Parameter=pcan.PCAN_HARDWARE_NAME) if device_name_resp[0] != pcan.PCAN_ERROR_OK: raise AssertionError('Exception on PCANBasic.GetValue(PCAN_HARDWARE_NAME)') device_info['device_name'] = device_name_resp[1].decode('utf-8') device_id_resp = self.pcan_interface.GetValue(self.device_channel, Parameter=pcan.PCAN_DEVICE_ID) if device_id_resp[0] != pcan.PCAN_ERROR_OK: raise AssertionError('Exception on PCANBasic.GetValue(PCAN_DEVICE_ID)') device_info['device_id'] = device_id_resp[1] channel_version_resp = self.pcan_interface.GetValue(self.device_channel, Parameter=pcan.PCAN_CHANNEL_VERSION) if channel_version_resp[0] != pcan.PCAN_ERROR_OK: raise AssertionError('Exception on PCANBasic.GetValue(PCAN_CHANNEL_VERSION)') device_info['channel'] = self.device_channel device_info['channel_version'] = channel_version_resp[1] return device_info def transmit_can_message(self, uci_message: list[int], append_crc: bool = False, timeout_ms: int = 500) -> list[ EnumPcanStatus | list[int] | bool]: """ Transmit message to CAN devices Args: uci_message: can message, type: list append_crc: append crc to CAN message timeout_ms: timeout, unit: ms Returns: [status, uci_message, is_crc_valid] """ def transmit_CAN_message(uci_fragment: list[int]): assert uci_fragment is not None, "uci_message shall not be empty" assert len(uci_fragment) <= 8, "Fragmentation failed: Classic CAN messages have a maximum length of 8 bytes" msgCanMessage = pcan.TPCANMsg() msgCanMessage.ID = self.device_id msgCanMessage.LEN = len(uci_fragment) msgCanMessage.MSGTYPE = self.extended_id_bit # Assign and pad CAN message buffer for i in range(8): if i < len(uci_fragment): msgCanMessage.DATA[i] = uci_fragment[i] else: msgCanMessage.DATA[i] = CAN_PADDING_BYTE_VALUE # Transfer and process resulting status status_pcan_api = self.pcan_interface.Write(self.device_channel, msgCanMessage) match status_pcan_api: case pcan.PCAN_ERROR_OK: status = EnumPcanStatus.PCAN_STATUS_OK transmitted_data = list(msgCanMessage.DATA[0:len(uci_fragment)]) case _: status = EnumPcanStatus.PCAN_STATUS_ERR_GENERAL transmitted_data = None if self.enable_warnings: console.print(f"PCAN error code: {status_pcan_api}", console.StrColor.RED) return [status, transmitted_data] def transmit_CAN_FD_message(uci_fragment: list[int]): assert uci_fragment is not None, "uci_message shall not be empty" assert len(uci_fragment) <= 64, "Fragmentation failed: CAN-FD messages have a maximum length of 64 bytes" msgCanMessageFD = pcan.TPCANMsgFD() msgCanMessageFD.ID = self.device_id msgCanMessageFD.DLC = self.encode_fd_dlc(len(uci_fragment)) msgCanMessageFD.MSGTYPE = pcan.PCAN_MESSAGE_FD # self.extended_id_bit | pcan.PCAN_MESSAGE_FD.value | pcan.PCAN_MESSAGE_BRS.value # Assign and pad CAN message buffer for i in range(self.decode_fd_dlc(msgCanMessageFD.DLC)): if i < len(uci_fragment): msgCanMessageFD.DATA[i] = uci_fragment[i] else: msgCanMessageFD.DATA[i] = CAN_PADDING_BYTE_VALUE # Transfer and process resulting status status_pcan_api = self.pcan_interface.WriteFD(self.device_channel, msgCanMessageFD) match status_pcan_api: case pcan.PCAN_ERROR_OK: status = EnumPcanStatus.PCAN_STATUS_OK transmitted_data = list(msgCanMessageFD.DATA[0:len(uci_fragment)]) case _: status = EnumPcanStatus.PCAN_STATUS_ERR_GENERAL transmitted_data = None if self.enable_warnings: console.print(f"PCAN error code: {status_pcan_api}", console.StrColor.RED) return [status, transmitted_data] uci_message = list(uci_message) # Append CRC16 if required if append_crc == True: crc = nxp_crc.calculate_crc(frame=uci_message) uci_message = uci_message + crc.to_bytes(length=2, byteorder="little", signed=False) if self.is_fd: max_can_message_len = 64 else: max_can_message_len = 8 uci_fragments = [] if not self.en_fragmented_messages: # 判断是否是可分割的消息 uci_fragments.append(uci_message) else: uci_message_len = len(uci_message) uci_header_len = 4 uci_payload_len = uci_message_len - uci_header_len max_payload_len = max_can_message_len - uci_header_len num_fragments = max(math.ceil(uci_payload_len / max_payload_len), 1) last_payload_len = uci_payload_len - (num_fragments - 1) * max_payload_len header_with_pbf = [uci_message[0] | 0x10, uci_message[1], max_payload_len >> 8 & 0xFF, max_payload_len & 0xFF] header_without_pbf = [uci_message[0] & 0xEF, uci_message[1], last_payload_len >> 8 & 0xFF, last_payload_len & 0xFF] for i in range(num_fragments): payload_start_idx = uci_header_len + i * max_payload_len payload_end_idx = min(payload_start_idx + max_payload_len, uci_message_len) if i + 1 < num_fragments: curr_header = header_with_pbf else: curr_header = header_without_pbf curr_fragment = curr_header + uci_message[payload_start_idx: payload_end_idx] uci_fragments.append(curr_fragment) for uci_fragment in uci_fragments: if self.is_fd: [status, transmitted_data] = transmit_CAN_FD_message(uci_fragment) else: [status, transmitted_data] = transmit_CAN_message(uci_fragment) pass time.sleep(0.02) is_crc_valid = None # Implemented for compatibility with nxp_ft4222, serves no real purpose return [status, uci_message, is_crc_valid] def receive_can_message(self, timeout_ms: int = 500, crc_enable=False) -> list[EnumPcanStatus | list[int] | int]: """ Receive CAN message Args: timeout_ms: timeout, unit:ms crc_enable: enable CRC, True or False Returns: Received message """ def receive_CAN_message(): response_pcan_api = self.pcan_interface.Read(self.device_channel) status_pcan_api = response_pcan_api[0] match status_pcan_api: case pcan.PCAN_ERROR_OK: status = EnumPcanStatus.PCAN_STATUS_OK received_data = list(response_pcan_api[1].DATA[0:response_pcan_api[1].LEN]) case pcan.PCAN_ERROR_QRCVEMPTY: status = EnumPcanStatus.PCAN_STATUS_WAITING received_data = None case _: status = EnumPcanStatus.PCAN_STATUS_ERR_GENERAL received_data = None if self.enable_warnings: console.print("PCAN error: " + status.name, console.StrColor.RED) message_id = response_pcan_api[1].ID return [status, received_data, message_id] def receive_CAN_FD_message(): response_pcan_api = self.pcan_interface.ReadFD(self.device_channel) status_pcan_api = response_pcan_api[0] if response_pcan_api[0] == pcan.PCAN_ERROR_QRCVEMPTY: print('BBBBB :response pcan status:{}\n msg.ID:{}\n msg.dlc:{}\n msg.DATA:{}\n'.format(response_pcan_api[0], response_pcan_api[1].ID, response_pcan_api[1].DLC, list(response_pcan_api[1].DATA))) match status_pcan_api: case pcan.PCAN_ERROR_OK: status = EnumPcanStatus.PCAN_STATUS_OK received_data = list(response_pcan_api[1].DATA[0:self.decode_fd_dlc(response_pcan_api[1].DLC)]) case pcan.PCAN_ERROR_QRCVEMPTY: status = EnumPcanStatus.PCAN_STATUS_RCVEMPTY #received_data = None received_data = list(response_pcan_api[1].DATA[0:self.decode_fd_dlc(response_pcan_api[1].DLC)]) # return [status, received_data, response_pcan_api[1].ID] case pcan.PCAN_ERROR_BUSWARNING | pcan.PCAN_ERROR_BUSLIGHT | pcan.PCAN_ERROR_BUSHEAVY | pcan.PCAN_ERROR_BUSPASSIVE: status = EnumPcanStatus.PCAN_STATUS_WAITING received_data = None case 0x40008: status = EnumPcanStatus.PCAN_STATUS_WAITING received_data = None case _: status = EnumPcanStatus.PCAN_STATUS_ERR_GENERAL received_data = None if self.enable_warnings: console.print("PCAN error: " + status.name, console.StrColor.RED) message_id = response_pcan_api[1].ID return [status, received_data, message_id] message_id = None final_received = False uci_fragments = [] uci_header = [] uci_payload = [] # Receive message(s) while not final_received: # Try to receive a message if self.is_fd: [status, received_data_fragment, message_id_new] = receive_CAN_FD_message() else: [status, received_data_fragment, message_id_new] = receive_CAN_message() # Proceed according to status match status: case EnumPcanStatus.PCAN_STATUS_OK: # Check if message ID is consistent if message_id is not None and not message_id == message_id_new: assert AssertionError("Fragmented message was interrupted by a new message. This is currently not supported.") message_id = message_id_new uci_fragments.append(received_data_fragment) uci_header = received_data_fragment[0:4] uci_payload = uci_payload + received_data_fragment[4:] pbf = (uci_header[0] & 0x10) >> 4 final_received = not int(pbf) # pbf: o means the last segment; 1 means the packet is a segment of a UCI message and further packets will be transmitted case EnumPcanStatus.PCAN_STATUS_ERR_GENERAL: status = EnumPcanStatus.PCAN_STATUS_ERR_GENERAL uci_payload = None return [status, uci_payload, uci_fragments, message_id] case EnumPcanStatus.PCAN_STATUS_RCVEMPTY: return [status, [], None] # return empty uci_message case _: assert AssertionError("Uncaught EnumPcanStatus. This point should never be reached") status = EnumPcanStatus.PCAN_STATUS_OK if uci_header: payload_len = len(uci_payload) uci_header = [uci_header[0], uci_header[1], payload_len >> 8 & 0xFF, payload_len & 0xFF] uci_message = uci_header + uci_payload else: uci_message = None is_crc_valid = None if crc_enable: received_crc = uci_message[-2] | (uci_message[-1] << 8) is_crc_valid = nxp_crc.is_crc_valid(uci_message[:-2], received_crc) # Not returned, but kept for future integration return [status, uci_message, is_crc_valid] # TODO: implement custom CAN frequency. # currently, settings for CAN-FD arbitration field 1Mbps and data rate 2Mbps bitrates are hard-coded def get_can_fd_bitrate_params(self): """ 当前项目默认的时钟频率以及波特率等参数,这里返回一个固定值 Returns: arb_1mbps_data_5mbps """ arb_1mbps_data_2mbps = b'f_clock=80000000,nom_brp=10,nom_tseg1=5,nom_tseg2=2,nom_sjw=1,data_brp=4,data_tseg1=7,data_tseg2=2,data_sjw=1' arb_1mbps_data_5mbps = b'f_clock_mhz=80, nom_brp=1, nom_tseg1=59, nom_tseg2=20, nom_sjw=20, data_brp=1, data_tseg1=11, data_tseg2=4, data_sjw=4' # PCAN configuration for NCJ29D6 return arb_1mbps_data_5mbps def encode_fd_dlc(self, data_length_bytes): """ Args: data_length_bytes: The length of the transmitted data Returns: CAN FD data byte count """ dlc = 0 if data_length_bytes <= 8: dlc = data_length_bytes elif data_length_bytes <= 12: dlc = 9 elif data_length_bytes <= 16: dlc = 10 elif data_length_bytes <= 20: dlc = 11 elif data_length_bytes <= 24: dlc = 12 elif data_length_bytes <= 32: dlc = 13 elif data_length_bytes <= 48: dlc = 14 else: dlc = 15 return dlc def decode_fd_dlc(self, dlc): """ According the CAN data count decode out actual data length Args: dlc:CAN data length count Returns: data length bytes """ data_length_bytes = 0 if dlc <= 8: data_length_bytes = dlc elif dlc == 9: data_length_bytes = 12 elif dlc == 10: data_length_bytes = 16 elif dlc == 11: data_length_bytes = 20 elif dlc == 12: data_length_bytes = 24 elif dlc == 13: data_length_bytes = 32 elif dlc == 14: data_length_bytes = 48 else: data_length_bytes = 64 return data_length_bytes
关于函数的用法可以参考注释部分,注释写的也算详尽了。
五. 应用
接下来就可以在实际的项目中使用pcan_device.py来和CAN Device进行通信了;可以直接传入CAN数据帧,然后接受即可!
下面是一个简单的demo:
if __name__ == "__main__": # 默认的CAN ID CAN_DEVICE_ID = 16 # 初始化 device = PcanDevice(device_id=CAN_DEVICE_ID, device_channel=81, max_can_payload_bytes=64, is_fd=True, use_extended_id=False, en_fragmented_messages=False) # 打开CAN Device device.open() # 定义CAN数据帧 data = [42, 2, 0, 0] # 发送CAN数据帧 device.transmit_can_message(data) # 接收CAN数据帧 res = device.receive_can_message() print('Received msg:{}'.format([hex(data) for data in res[1]])) # 获取CAN Device information info = device.get_device_info()
返回的数据是:
Received msg:['0x4a', '0x2', '0x1', '0x94', '0x0', '0x11', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x0', '0x14'........]
上面example演示的是发送的[42, 2, 0, 0]其实是一个读取版本号的数据帧([2A, 02, 00, 00])
返回的版本号信息是:[4A, 02, 01 .......]
由于返回的数据帧过长(有近400个bytes),所以后面部分我就省略了。由此可见封装以后针对设备进行CAN FD的读写变得非常简单,不过想把原始的CAN数据帧翻译成能看懂的信息还需要进一步的解析工作,对于CAN数据帧解析部分由于保密问题在此就不做分享了。
所以,若需要实现更复杂的CAN FD通信只需要根据功能需要来组装特定的CAN数据帧,然后即可实现与设备的通信。然后按照不同的功能将脚本分成多个.py文件,最后为了方便用户的使用,编写了一个.bat脚本作为tool入口,将测按功能作为子菜单,输入菜单号即可执行:
六. 总结
在接触此项目之前,我对CAN的了解仅仅知道CAN是汽车电子领域一种通信协议,除此以外基本一无所知。 但是经过一段时间的学习和摸索也算对CAN有了初步了解,并且利用PCAN实现了CAN通信自由,真的是一把鼻涕一把泪啊!
有需要的同学可+ wx一起讨论: xgh321324