很多人一提到毫米波雷达测人体的呼吸、测心率,第一反应往往是听起来很高级,但真正落到工程代码里,到底在做什么呢?本文将为你细细道来。
最近我使用Python语言实现了60GHz毫米波雷达单人生命体征检测的程序,demo演示效果如上视频所示,看起来效果还不错。它不是论文里的理想模型,也不是实验室里只看离线数据的演示脚本,而是一份更接近实际工程的在线实时处理程序。
硬件平台的采用RavSense公司的60G毫米波雷达模组,链上硬件,烧录准备好提供的程序就可以使用本文的Pyhton代码实时跑起来了(感兴趣的读者购买后可以联系我,发送本文的代码)。
在本项目的程序中,Python程序一边从雷达串口实时收数据,一边从指夹式心电仪(血氧仪)读取参考心率,再把目标距离、雷达呼吸、雷达心率、心电仪心率以及二者差值,统一显示在同一个GUI界面里,如下图所示。

读取指夹式心电仪(血氧仪)的参考心率是一个必须的操作步骤,只有这样才可以更加便利地将雷达的测量结果与参考心率进行对比,以此来初步判断雷达算法的准确性。指夹式心电仪(血氧仪)采用下面这款:

京东链接:https://item.jd.com/3209242.html?pcdk=jMGN0WI4a-w7fdrQg8ldKwDDgxU1kevQnMDe-cYtnt-qwE1MVXDAto0PEF9YL01A.rQ4a.tlbT&spmTag=YTAyMTkuYjAwMjM1Ni5jMDAwMDcyMTAuc2VhcmNoX2J1dHRvbiUyQ2EwMjQwLmIwMDI0OTMuYzAwMDA0MDI3LjElMjNza3VfY2FyZA#switch-sku
如果只用一句话概括这套Python程序的核心逻辑,那就是先在距离维找到“人”所在的距离门,再从这个距离门的复数相位里提取胸腔微动,随后把慢时间相位信号分成低频的呼吸成分和较高频的心跳成分,最后再用一套稳定化策略,尽量避免心率结果乱跳。
一、Python程序结构分析
这份代码其实分成两条并行链路。
第一条链路是心电仪参考链路。程序通过 HID 接口连接一台指夹式设备,先发送两组初始化序列 SEQ_A 和 SEQ_B,随后周期发送 keepalive,维持设备在线。接收到数据后,会优先从 0xEB 0x01 0x05 这一类数据包中解析出 (HR, SpO2),同时也兼容 F3 和 F0 头的数据更新。也就是说,这台心电仪在这里扮演的是“实时参考值”的角色。
第二条链路才是60GHz 雷达生命体征检测主链路。程序使用串口接收雷达帧数据,固定寻找 00 01 02 03 04 05 06 07 作为帧同步标记。同步成功后,按固定长度连续读取每一帧,再校验帧尾,只有通过校验的数据才会进入后续处理。
这两条链路最终在 GUI 层汇合:界面上不仅显示雷达测得的呼吸和心率,还同步显示心电仪心率和两者差值,同时给出时间距离谱、呼吸波形、心跳波形以及“雷达心率和心电仪心率”的滑窗对比曲线。工程上,这种做法很实用,因为它不只是“测出来了”,而是随时能看到自己测得准不准。
二、雷达算法程序结构分析
很多人容易忽略这一点,生命体征算法的上限,往往先被数据接收链路决定。
这份代码里,雷达端的参数设置是 60GHz 载频、4GHz 带宽、50ms 帧周期,同时配置为 3 路接收通道、每帧 256 个 chirp。程序收到串口字节流后,不是直接把它当作整数去算,而是先按 9 字节一组,重组出 12bit ADC 数据,再恢复成三路接收通道数据。这个步骤决定了后面 FFT 和相位提取有没有意义。
从工程角度看,这一步其实就是在回答一个基础问题,你后面要分析的,到底是不是“真实的雷达慢时间信号”?如果帧同步错了、字节重组错了、12bit 拼接错了,那么后面的呼吸、心率再漂亮,也只是“在错误数据上的漂亮结果”。
当原始数据被重组成 Rx1 / Rx2 / Rx3 后,程序并没有同时对三路做复杂融合,而是沿用较典型、也更直接的工程思路:以 Rx3 为主通道,构造慢时间历史矩阵 Rx3_m。每来一帧,就把当前 Rx3 数据按列拼接进历史矩阵。
接下来,对 Rx3_m 做 256 点距离 FFT,并经过 fftshift 保留有效距离区。到这里,程序得到的其实已经不是“原始 ADC”,而是一张随时间变化的距离谱。但生命体征并不需要整张图,它真正关心的是哪一个距离门,最像“人体所在的位置”?
因此,代码又做了一个非常工程化的动作,对每个距离门减去时间均值,形成一个简化的 MTI 结果。这个操作的直观意义是把静态背景压掉,只保留随时间化更明显的部分。然后,在最近一段时间窗内,对每个距离门的动态幅度求和,选出能量最大的那个距离门,作为当前人体所在门。
这一步很关键,因为后面的相位提取、呼吸检测、心率检测,都是围绕这个“最佳距离门”展开的。换句话说,这套算法不是在整幅距离图里找呼吸心率,而是先“锁定人”,再去看这个位置的细微运动。
毫米波雷达做生命体征检测,本质上不是看人有多亮,而是看人有没有发生极小的周期性位移。在这份代码里,程序取出目标距离门对应的复数序列 phase_data,然后做 unwrap(angle(...)),得到连续相位序列。angle() 提取的是复数相位,unwrap() 负责消除相位缠绕,避免出现突然的跳变。
接着,代码对相位做一阶差分,得到 phase_object。这一步的目的,是把“缓慢累积的相位变化”变成“更容易观察的小幅动态变化”。换句话说,它把胸腔的微小位移,映射成了一条慢时间上的微动信号。到这里,呼吸和心跳其实已经“藏”在这条相位信号里了,问题不再是有没有,而是怎么把它们干净地分出来。
三、呼吸心率提取过程
生命体征算法里最麻烦的,往往不是“没有信号”,而是“信号里夹了很多不该有的东西”,所以这份代码在相位链路里,专门加了两层噪声抑制。
第一层是脉冲噪声移除。程序先把相位差分信号转成角度,然后用一个 3 点判决函数 filter_remove_impulse_noise() 去检查:如果当前点相对前后点都突然大幅偏离,并且超过阈值 90°,那它大概率不是正常胸腔微动,而是一个异常毛刺。这时程序就不保留原值,而是用前后两点做线性插值替换。
第二层是后来又加进去的相位二次平滑。phase_noise_suppress() 会先做中值中心化,再根据统计阈值找异常点,把异常点替换成相邻均值,最后再做一次 [0.2, 0.6, 0.2] 的卷积平滑。这个处理不是为了“让曲线更好看”,而是为了降低后面峰值法和 FFT 法被瞬时毛刺带偏的概率。
从工程视角看,这两步很有代表性: 真正能落地的生命体征算法,往往不是某一个高大上的模型,而是若干个“看起来朴素但很有效”的抗干扰环节叠加出来的。
呼吸支路并没有走频谱主峰法,而是采用了更直观的时域方法。
程序先对相位信号做 0.1–0.5 Hz 的 2 阶 Butterworth 带通,保留典型呼吸频段,然后在滤波后波形上做峰值检测。峰值之间的时间间隔,实际上就是一个呼吸周期;取相邻峰时间差的平均值,再换算成每分钟次数,就得到了呼吸率。
为什么这套方法在工程里常见?因为呼吸的主频低、周期长、形态相对明显。只要信号质量还可以,用峰间距去估计呼吸,往往比直接看频谱更容易解释,也更方便现场调参数。
当然,这种方法也有代价:一旦波形幅值起伏太大、峰值不稳定,或者人的呼吸模式发生变化,结果就容易受阈值和最小峰间距影响。所以它是一个很典型的工程友好型方案,好理解、好调试,但不是万能。
和呼吸不同,心跳信号更弱、频率更高,也更容易被呼吸谐波、身体微动和瞬时噪声淹没。因此,这份代码在心率支路上采取的是先把相位信号通过 0.8–2.0 Hz 带通滤波,再乘汉宁窗,随后做 1024 点 FFT,得到心率频谱。接着,程序不是在全频范围乱找,而是只在 0.8–2.0 Hz 的正频段内找主峰,并通过三点抛物线插值对峰值位置做细化,最后换算成 BPM。
但真正让这份代码更像“工程版”的,不是 FFT 本身,而是后面的心率稳定器。代码里显式加了这样几层约束:如果呼吸率为 0,或者心率落在 45–140 BPM 以外,就先判无效;如果当前心率和上一时刻相差太大,但频谱质量又不够高,就不相信新值,而是保留上一值;如果新值可以接受,也还要限制每一步最大变化量,最后再对最近几次结果做中值平滑。
说白了,这一段不是在“提升理论分辨率”,而是在解决一个非常现实的问题,哪怕频谱偶尔找错峰,也尽量别让界面上的心率一秒钟从 95 掉到 50。
四、心电仪参考心率
因为工程里最怕的不是“没结果”,而是“你以为结果对”。
这份代码里,心电仪线程实时读取 HR,GUI 顶部同步显示“雷达心率、心电仪心率、心率差值”,右上角还专门画了一幅滑窗对比图,让雷达结果和参考值放在同一坐标系里一起看。
这意味着这套程序并不只是一个“单机演示界面”,它其实已经具备了现场联调的味道: 算法一边跑,参考值一边进,差值一边算。 这种结构最大的意义,是能让开发者快速判断:问题究竟出在雷达目标门选择、相位质量、滤波参数,还是心率稳定策略本身。
从原始数据接收、距离门选择、相位提取、呼吸分离、心率分离,到最终的参考心率对比,每一步都能在界面上看到痕迹。你不是只拿到一个“呼吸 = 14,心率 = 78”的数字,而是能看到这个数字是从哪一段波形、哪一个距离门、哪一次频谱峰里来的。
但目前版本也确实还不够完美,还需要继续优化,因为从代码上看,这份实现依然偏向单目标、单主通道的工程逻辑。核心生命体征提取主要围绕 Rx3 展开,多通道 1T3R 的信息并没有被充分融合。静态杂波抑制本质上还是“按时间均值做减法”的简化 MTI,呼吸支路仍然是峰值法,心率支路虽然加了稳定器,但本质上依旧是单通道相位频谱主峰法。
这就决定了它的定位更适合被理解为一套“已经能跑、也能看见问题”的工程实现,离“复杂环境下长期稳定、绝对精确”的产品级方案,还有继续打磨的空间。
五、总结
如果只看结果,很多生命体征程序最终都会落到两个数字:呼吸多少、心率多少。
但真正决定一套系统有没有工程价值的,往往不是这两个数字本身,而是它背后的方法链是否完整、是否可解释、是否方便调试、是否便于对照参考值持续优化。
这份代码最值得肯定的地方,就在于它已经不再停留在“离线算一个结果”,而是走到了更接近工程现场的形态, 雷达在线采集,心电仪在线对照,GUI 在线显示,结果在线修正。
从这个意义上说,它不仅是一份程序,更像是一套小型实验平台。 而所有真正成熟的生命体征算法,往往都是从这样一套“能边跑边看边改”的工程平台里,一点一点长出来的。

感兴趣的读者,可以先看看部分代码,完整代码请扫描上方二维码联系博主。
# -*- coding: utf-8 -*-"""指夹血氧/心率仪 HID 数据采集 + 雷达信号处理 工具整合GUI绘图、串口、HID设备通信、信号滤波功能"""# ===================== 1. 标准库导入 =====================import osimport timeimport threadingfrom typing import List, Optional, Tuplefrom dataclasses import dataclass, fieldfrom collections import dequefrom datetime import datetimefrom queue import Empty, Queue# ===================== 2. 第三方库导入 =====================import numpy as npimport serialfrom serial.tools import list_portsfrom scipy.io import savematfrom scipy.signal import butter, find_peaks, sosfilt, sosfiltfilt# 可选依赖:HID设备通信try:import hidexcept ImportError:hid = None# ===================== 3. GUI/绘图库导入 =====================import tkinter as tkfrom tkinter import filedialog, messagebox, ttkfrom matplotlib.backends.backend_tkagg import FigureCanvasTkAggfrom matplotlib.figure import Figurefrom matplotlib import rcParams# ===================== 4. Matplotlib 全局配置 =====================rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Noto Sans CJK SC', 'Arial Unicode MS']rcParams['axes.unicode_minus'] = False# ===================== 5. 全局常量 =====================# 指夹血氧仪 HID 设备标识VID = 0x28E9PID = 0x028A# 设备初始化指令序列SEQ_A = ["7d81a78080808080807d81a2808080808080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","82020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","80000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","831a011c121d3844056a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","9f1f0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","8e031100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","81010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",]SEQ_B = ["80000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","81010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","82020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","9f1f0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","8e071500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","8e031100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","9f1f0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","9b001b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","9b011c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",]# 心跳包配置KEEPALIVE_INTERVAL = 5.0KEEPALIVE_PAYLOAD64 = bytes.fromhex("9a1a" + "00" * 62)# ===================== 6. 通用工具函数 =====================def get_timestamp() -> str:"""获取格式化时间戳"""return datetime.now().strftime("%Y-%m-%d %H:%M:%S")def safe_db(x: np.ndarray) -> np.ndarray:"""安全计算分贝值,避免log(0)报错"""return 20.0 * np.log10(np.maximum(np.abs(x), np.finfo(float).eps))# ===================== 7. 指夹血氧/心率仪 HID 采集模块 =====================def hex_to_64_bytes(hexstr: str) -> bytes:"""将十六进制字符串转换为64字节固定长度数据"""data = bytes.fromhex(hexstr)return (data + b"\x00" * 64)[:64]def write_report65(dev: "hid.device", payload64: bytes) -> int:"""向HID设备写入65字节报告(1字节报告ID + 64字节数据)"""return dev.write(b"\x00" + payload64)def parse_eb_frame(data: bytes) -> Tuple[Optional[Tuple[int, int]], List[int]]:"""解析EB帧数据,提取心率、血氧和波形数据"""vitals = Nonewave_data = []index = 0data_len = len(data)while index < data_len:if data[index] != 0xEB:index += 1continue# 解析心率血氧数据if index + 8 <= data_len and data[index+1] == 0x01 and data[index+2] == 0x05:vitals = (data[index+3], data[index+4])index += 8continue# 解析波形数据if index + 6 <= data_len and data[index+1] == 0x00:value = data[index+3] | (data[index+4] << 8)wave_data.append(value)index += 6continueindex += 1return vitals, wave_data@dataclassclass FingerState:"""血氧仪状态数据类(线程安全)"""hr: Optional[float] = Nonespo2: Optional[float] = Nonesource: str = ""last_update_time: float = field(default_factory=time.time)lock: threading.Lock = field(default_factory=threading.Lock)class FingerClipReader(threading.Thread):"""血氧仪数据读取线程(后台守护线程)"""def __init__(self, state: FingerState, stop_event: threading.Event, status_queue: Queue):super().__init__(daemon=True)self.state = stateself.stop_event = stop_eventself.status_queue = status_queueself.device: Optional["hid.device"] = Nonedef log(self, msg: str) -> None:"""发送日志到状态队列"""self.status_queue.put(("status", f"[{get_timestamp()}] [心电仪] {msg}"))def send_init_sequence(self, seq: List[str], label: str) -> None:"""发送设备初始化指令序列"""self.log(f"初始化 {label} 指令...")for cmd in seq:payload = hex_to_64_bytes(cmd)write_report65(self.device, payload)time.sleep(0.01)def _safe_close(self) -> None:"""安全关闭HID设备连接"""try:if self.device:self.device.close()except Exception:passfinally:self.device = Nonedef _stream_loop(self) -> bool:"""设备数据循环读取"""last_keepalive = time.time()last_rx_time = time.time()last_spo2 = Nonewhile not self.stop_event.is_set():current_time = time.time()# 发送心跳包if current_time - last_keepalive >= KEEPALIVE_INTERVAL:try:write_report65(self.device, KEEPALIVE_PAYLOAD64)except Exception:return Falselast_keepalive = current_time# 读取设备数据try:raw_data = self.device.read(64, timeout_ms=200)except Exception:return Falseif raw_data:last_rx_time = current_timebyte_data = bytes(raw_data)head = byte_data[0]# 解析EB帧if head == 0xEB:vitals, _ = parse_eb_frame(byte_data)if vitals:hr, spo2 = vitalslast_spo2 = spo2with self.state.lock:self.state.hr = float(hr)self.state.spo2 = float(spo2)self.state.source = "EB0105"self.state.last_update_time = time.time()# 解析F3帧(心率)elif head == 0xF3 and len(byte_data) >= 3:hr_val = int(byte_data[2])if 35 <= hr_val <= 220:with self.state.lock:self.state.hr = float(hr_val)if last_spo2 is not None:self.state.spo2 = float(last_spo2)self.state.source = "F3"self.state.last_update_time = time.time()# 解析F0帧(血氧)elif head == 0xF0 and len(byte_data) >= 2:spo2_val = int(byte_data[1])if 50 <= spo2_val <= 100:last_spo2 = spo2_valwith self.state.lock:self.state.spo2 = float(spo2_val)self.state.source = "F0/F3"self.state.last_update_time = time.time()# 超时断开if current_time - last_rx_time > 3.0:return Falsereturn Truedef _open_and_stream_once(self) -> bool:"""单次尝试连接并启动数据读取"""if hid is None:self.log("未安装hid库,无法连接设备")return Truedevice_list = hid.enumerate(VID, PID)if not device_list:self.log(f"未找到设备 VID=0x{VID:04X} PID=0x{PID:04X}")return Falsefor dev_info in device_list:if self.stop_event.is_set():return Trueself._safe_close()try:self.device = hid.device()self.device.open_path(dev_info["path"])self.device.set_nonblocking(False)# 发送初始化指令self.send_init_sequence(SEQ_A, "序列A")time.sleep(2.2)self.send_init_sequence(SEQ_B, "序列B")write_report65(self.device, KEEPALIVE_PAYLOAD64)self.log("设备连接成功")return self._stream_loop()except Exception as e:self.log(f"设备初始化失败: {str(e)}")self._safe_close()continuereturn Falsedef run(self) -> None:"""线程主函数:循环重连设备"""try:while not self.stop_event.is_set():if self._open_and_stream_once():breakif self.stop_event.is_set():breaktime.sleep(2.0)except Exception as e:self.log(f"线程异常退出: {repr(e)}")finally:self._safe_close()# ===================== 8. 雷达信号处理模块 =====================def remove_impulse_noise(data_prev2: float,data_prev1: float,data_curr: float,threshold: float) -> Tuple[float, float, float]:"""脉冲噪声滤波返回:滤波后值, 后向差分, 前向差分"""backward_diff = data_prev1 - data_prev2forward_diff = data_prev1 - data_curr# 检测脉冲噪声并插值if ((forward_diff > threshold) and (backward_diff > threshold)) or \((forward_diff < -threshold) and (backward_diff < -threshold)):filtered_val = data_prev2 + (data_curr - data_prev2) / 2else:filtered_val = data_prev1return filtered_val, backward_diff, forward_diffdef suppress_phase_noise(data: np.ndarray) -> np.ndarray:"""相位噪声抑制:中值滤波+阈值去噪+卷积平滑"""data = np.asarray(data, dtype=np.float64).copy()data_len = data.sizeif data_len <= 2:return data# 零均值化data -= np.median(data)abs_data = np.abs(data)med_abs = np.median(abs_data)# 计算噪声阈值if med_abs < np.finfo(float).eps:threshold = max(0.06, 3.0 * np.std(data))else:threshold = max(0.06, 5.0 * med_abs / 0.6745)# 异常值插值修复outlier_indices = np.where(abs_data > threshold)[0]for idx in outlier_indices:if 0 < idx < data_len - 1:data[idx] = 0.5 * (data[idx-1] + data[idx+1])elif idx == 0 and data_len > 1:data[idx] = data[idx+1]elif idx == data_len - 1 and data_len > 1:data[idx] = data[idx-1]# 加权卷积平滑kernel = np.array([0.2, 0.6, 0.2], dtype=np.float64)return np.convolve(data, kernel, mode='same')
欢迎点击下方名片,关注调皮连续波: