超声波
红外测距
里程计
IMU(惯性测量单元)
激光雷达感知
每种传感器,走独立环形队列
融合线程周期性读取最新数据并生成融合状态
控制线程根据状态机和 PID 输出电机控制
健康检查线程通过 timerfd 周期检查各线程/各传感器是否正常
watchdog 线程在系统异常时触发安全停车
守护进程负责:
接收主进程状态
接收本地调试命令
热重载 JSON 配置
回推配置给主进程
主进程和守护进程之间用 Socket
agv_robot_pro/├── include/│ ├── protocol.h│ ├── log.h│ ├── queue.h│ ├── config_json.h│ ├── sensor.h│ ├── fusion.h│ ├── state_machine.h│ ├── pid.h│ ├── health.h│ ├── watchdog.h│ ├── ipc.h│ ├── daemon.h│ └── app.h├── src/│ ├── log.c│ ├── queue.c│ ├── config_json.c│ ├── sensor.c│ ├── fusion.c│ ├── state_machine.c│ ├── pid.c│ ├── health.c│ ├── watchdog.c│ ├── ipc.c│ ├── daemon.c│ └── main.c├── tests/│ ├── test_queue.c│ └── test_pid.c└── Makefile目录结构主要包含:
协议层、基础设施层、感知层、决策层、控制层、运维层、测试层
超声波线程 ----> ultrasonic_queue ---\红外线程 ----> infrared_queue ----- \里程计线程 ----> odom_queue -----------> 融合线程 ---> 状态机 ---> 控制线程(PID)IMU线程 ----> imu_queue ---------- /激光线程 ----> lidar_queue --------/健康检查线程(timerfd) ---> 检查各传感器最近更新时间 ---> 喂狗watchdog线程 -----------> 超时则切换 SAFE_STOP主进程状态/告警 ---> Unix Domain Socket ---> 守护进程守护进程 <--- 本地命令 Unix Socket 客户端守护进程 <--- SIGHUP 重载 JSON 配置守护进程 ---> 新配置 ---> Unix Domain Socket ---> 主进程数据链路说明:
高频实时链路放在同进程多线程中,用独立环形队列解耦;
低频运维链路拆成守护进程,通过 Socket 做本地 IPC;
热配置通过 signal 触发守护进程 reload,再回推主进程生效。
#ifndef PROTOCOL_H#define PROTOCOL_H#include <stdint.h> #include <time.h> //引入 timespec 时间戳结构//定义传感器名字最大长度#define SENSOR_NAME_LEN 32//定义状态机名字最大长度#define STATE_SENSOR_LEN 32//定义控制动作字符串最大长度#define DECISION_LEN 32//定义本地Unix Socket 路径#define APP_DAEMON_SOCKET_PATH "/tmp/agv_robot_daemon.sock"//定义配置文件路径#define APP_CONFIG_JSON_PATH "/tmp/agv_robot.json"//定义传感器类型枚举typedef enum{ SENSOR_ULTRASONIC = 1, //超声波 SENSOR_INFRARED = 2, //红外 SENSOR_ODOMETRY = 3, //里程计 SENSOR_IMU = 4, //IMU SENSOR_LIDAR = 5 //激光雷达}SensorType;//定义应用层消息类型,主进程与守护进程通信会用到typedef enum{ MSG_STATUS = 1, //状态上报 MSG_CONFIG = 2, //配置回传 MSG_ALERT = 3 //告警}MessageType;//定义IMU 数据结构typedef struct{float yaw_deg; //偏航角float gyro_z_dps; //z轴角度float accel_x_mps2; //x轴角度}ImuData;//定义激光雷达数据结构typedef struct{float min_front_dist_cm; //前方最小障碍距离}LidarData;//定义同用传感器包typedef struct{ uint64_t seq; //数据序号 SensorType type; //传感器类型 char Sensor_name[SENSOR_NAME_LEN];//传感器名称 struct timespec ts; //时间戳 union{float ultrasonic_cm; //超声波距离 int infrared_hit; //红外是否触发float odom_speed_cm_s; //里程计速度 ImuData imu; //IMU数据 LidarData lidar; //激光雷达数据 }data;}SensorPacket;//定义融合状态typedef struct{ uint16_t fusion_seq; //融合序号 struct timespec ts; //融合时间戳float front_distance_cm; //前方融合后距离float speed_cm_s; //当前速度float yaw_deg; //当前姿态角 int obstacle_detected; //是否检测到距离 int sensor_fault; //是否有传感器故障}FusionState;//定义状态机状态typedef enum{ ROBOT_STATE_IDLE = 0, //空闲 ROBOT_STATE_FORWARD, //前进 ROBOT_STATE_SLOW, //减速 ROBOT_STATE_AVOID, //避障 ROBOT_STATE_SAFE_STOP, //安全停车 ROBOT_STATE_FAULT //故障}RobotState;//定义控制命令typedef struct{ uint64_t cmd_seq; //控制命令序号 struct timespec ts; //控制输出时间 RobotState state; //当前状态机状态 char decision[DECISION_LEN]; //文本动作,例如FORWARD / STOPfloat target_speed_cm_s; //目标速度float actual_speed_cm_s; //实际速度 int motor_left_pwm; //左轮 PWM int motor_right_pwm; //右轮 pwm}ContronlCommand;//定义运行配置typedef struct{ int danger_distance_cm; //危险距离阈值 int slow_distance_cm; //减速距离阈值 int health_check_ms; //健康检查周期 int watchdog_timeout_ms; //watchdog 超时时间float pid_kp; //pid k值float pid_ki; //pid i值 flaot pid_kd; //pid d值}RobotConfig;//定义发给守护进程状态typedef struct{ MessageType msg_type; //消息类型 FusionState fusion; //融合状态 ContronlCommand cmd; //控制命令}StatusMessage;//定义守护进程发回主进程的配置信息typedef struct{ MessageType msg_type; //消息类型 RobotConfig cfg; //配置内容}ConfigMessage;//定义告警消息typedef struct{ MessageType msg_type; //消息类型 char tetx[128]; //告警文本}AlterMessage;#endif这里是把TLS放进去,多线程模块不做这一步,后面日志全乱。
include/log.h#ifndef LOG_H#define LOG_H//TLS:每个线程各自持有自己的名字extern __thread const char *tls_thread_name;void log_set_thread_name(const char *name);void log_info(const char *fmt,...);void log_warn(const char *fmt,...);void log_error(const char *fmt,...);#end ifsrc/log.c#include "log.h"#include <stdio.h>#include <stdarg.h>#include <time.h>//TLS:线程局部存储__thread const char *tls_thread_name = "unknown";//设置当前线程名字void log_setthread_name(const char *name){ tls_thread_name = name;}//内部统一打印日志static void log_vprint(const char *level,const char *fmt,va_list ap){ struct timespec ts; clock_gettime(CLOCK_REALTIME,&ts); struct tm tm_info; localtime_r(&ts.tv_sec,&tm_info); char time_buf[64]; strftime(time_buf,sizeof(time_buf),"%H:%M:%S",&tm_info);printf("[%s.%03ld] [%-5s] [%-12s]",time_buf, ts.tv_nsec / 1000000, level, tls_thread_name); vprintf(fmt,ap);printf("\n"); fflush(stdout);}void log_info(const char *fmt,...){ va_list ap; va_start(ap,fmt); log_vprintf("INIFO",fmt,ap); va_end(ap);}void log_warn(const char *fmt,...){ va_list ap; va_start(ap,fmt); long_vprintf("WARN",fmt,ap); va_end(ap);}void log_error(const char *fmt,...){ va_list ap; va_start(ap,fmt); long_vprintf("ERROR",fmt,ap); va_end(ap);}/* 每个线程设置自己的线程名 日志自动带线程身份 不需要每次都手动传线程名*/include/config.h#ifndef CONFIG_H#define CONFIG_H#include "protocol.h"void config_set_default(RobotConfig, *cfg);int config_load(const char *path,RobotConfig *cfg);int config_save(const char *path,const RobotConfig *cfg);#endifsrc/config.c#include "config.h"#include <stdio.h>void config_set_fault(RobotConfig *cfg){if(cfg == NULL) {return; } cfg->danger_distance_cm = 25; //危险距离阈值设定 cfg->slow_distance_cm = 45; //减速距离阈值设定 cfg->log_level = 1; //日志等级 1}//从文件读取配置int config_load(const char *path,RobotConfig *cfg){if(path == NULL || cfg == NULL) {return -1; } FILE *fp = fopen(path,"r");if(fp == NULL){return -1; } config_set_default(cfg); char line[128]; int value = 0;while(fgets(line,sizeof(line),fp) != NULL){if(sscanf(line,"danger_distance_cm=%d",&value)==1){ cfg->danger_distance_cm = value; }elseif(sscanf(line,"slow_distance_cm=%d",&value)==1){ cfg->slow_distance_cm = value; }elseif(sscanf(line,"log_level=%d",&value)==1){ cfg->log_level = value; } } fclose(fp);return 0;}//保存配置int config_save(const char *path,const RobotConfig *cfg){if(path == NULL || cfg == NULL){return -1; } FILE *fp = fopen(path,"w");if(fp == NULL){return -1; } fprintf(fp,"danger_distance_cm=%d\n",cfg->danger_distance_cm); fprintf(fp,"slow_distance_cm=%d\n",cfg->slow_distance_cm); fprintf(fp,"log_level = %d\n",cfg->log_level); flcose(fp);return 0;}/* 配置读写和业务逻辑分离 守护进程只关心“加载配置” 处理线程只关心“拿配置做决策”*/ring_buffer.h / ring_buffer.c
这里是线程通信的核心 采集线程和处理线程之间,就靠它。
include/ring_buffer.h#ifndef RING_BUFFER_H#define RING_BUFFER_H#include "protocol.h"#include <pthread.h>#define RING_CAPACITY 64typedef struct{ SensorFrame data[RING_CAPAICITY]; int head; //出队位置 int tail; //入队位置 int count; //当前元素个数 int stop; //停止标记 pthread_mutex_t mutex; pthread_cond_t not_empty; pthread_cond_t not_full;}SensorRingBuffer;int ring_buffer_init(SensorRingBuffer *rb);void ring_buffer_destroy(SensorRingBuffer *rb);int ring_buffer_push(SensorRingBuffer *rb, const SensorFrame *frame);int ring_buffer_pop(SensorRingBuffer *rb, SensorFrame *frame);void ring_buffer_stop(SensorRingBuffer *rb);#endifsrc/ring_buffer.c#include "ring_buffer.h"#include <string.h>int ring_buffer_init(SensorRingBuffer *rb){if (rb == NULL) {return -1; } memset(rb, 0, sizeof(*rb)); pthread_mutex_init(&rb->mutex, NULL); pthread_cond_init(&rb->not_empty, NULL); pthread_cond_init(&rb->not_full, NULL);return 0;}void ring_buffer_destroy(SensorRingBuffer *rb){if (rb == NULL) {return; } pthread_mutex_destroy(&rb->mutex); pthread_cond_destroy(&rb->not_empty); pthread_cond_destroy(&rb->not_full);}/* 生产者入队 */int ring_buffer_push(SensorRingBuffer *rb, const SensorFrame *frame){if (rb == NULL || frame == NULL) {return -1; } pthread_mutex_lock(&rb->mutex); /* 队列满则等待 */while (rb->count == RING_CAPACITY && !rb->stop) { pthread_cond_wait(&rb->not_full, &rb->mutex); }if (rb->stop) { pthread_mutex_unlock(&rb->mutex);return -1; } rb->data[rb->tail] = *frame; rb->tail = (rb->tail + 1) % RING_CAPACITY; rb->count++; /* 通知消费者:队列非空 */ pthread_cond_signal(&rb->not_empty); pthread_mutex_unlock(&rb->mutex);return 0;}/* 消费者出队 */int ring_buffer_pop(SensorRingBuffer *rb, SensorFrame *frame){if (rb == NULL || frame == NULL) {return -1; } pthread_mutex_lock(&rb->mutex); /* 队列空则等待 */while (rb->count == 0 && !rb->stop) { pthread_cond_wait(&rb->not_empty, &rb->mutex); }if (rb->count == 0 && rb->stop) { pthread_mutex_unlock(&rb->mutex);return -1; } *frame = rb->data[rb->head]; rb->head = (rb->head + 1) % RING_CAPACITY; rb->count--; /* 通知生产者:队列非满 */ pthread_cond_signal(&rb->not_full); pthread_mutex_unlock(&rb->mutex);return 0;}/* 停止队列,唤醒等待线程 */void ring_buffer_stop(SensorRingBuffer *rb){if (rb == NULL) {return; } pthread_mutex_lock(&rb->mutex); rb->stop = 1; pthread_cond_broadcast(&rb->not_empty); pthread_cond_broadcast(&rb->not_full); pthread_mutex_unlock(&rb->mutex);}原理:
共享内存:环形数组就是共享缓冲区
互斥锁:保护 head/tail/count
条件变量:实现非空/非满等待通知
采集线程和处理线程之间,不能直接共享一个全局变量,而是采用带互斥锁和条件变量的环形缓冲区,既能保护并发安全,也能吸收采集与处理速度不一致带来的瞬时抖动。
思路说明:把所有传感器都抽象成同一套接口,线程函数只能依赖统一结构。
#ifndef SERSOR_H#define SERSOR_H#include "protocol.h"#include <ring_buffer.h>#include <pthread.h>typedef struct { SersorType type; const char *name; int period_ms; //采样周期 SersorRingBuffer *rb; //采样结果写入哪个环形队列 pthread_t tid; int running;}SersorDevice;int sersor_start(SersorDevice *dev);void Sersor_stop(SersorDevice *dev);void Sersor_join(SersorDevice *dev);#endif#include "serosr.h"#include "log.h"#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <pthread.h>#include <time.h>//模拟不同传感器读数static float fake_read_sersor(SerosorType type){ switch(type){case SERSOR_ULTRASOIC: //超声波:10—100cmreturn 10 + rand()%91;case SERSOR_INFARED: //红外:0/1模拟近距离障碍触发return rand()%2;case SERSOR_ODOMETRY: //里程计:模拟车速0-80cm/sreturn rand()%81; default:return 0; }}//统一采集线程函数static void *sersor_thread_func(*arg){ SersorDevice *dev = (SersorDevice *)arg; int seq = 0; log_set_thread_name(dev->name); log_info("sersor thread start");while(dev->running){ SersorFrame frame; memset(&frame,0,sizeof(frame)); frame.seq = ++seq; frame.type = dev->type; strnpy(frame.sensor_name,dev->name,sizeof(frame.sersor_name)-1); frame.value = fake_read_sersor(dev->type); clock_gettime(CLOCK_REALTIME,&frame.ts);if(ring_buffer_push(dev->rb,&frame) != 0){break; } log_info("push frame:seq = %d type = %d value = %0.2f",frame.seq,frame.type,frame.value); usleep(dev->period_ms *1000); } log_info("sersor thread exit");return NUll;}int sersor_start(SersorDevice *dev){if(dev == NULL || dev->rb == NULL){return -1; } dev->runnig = 1;if(pthread_create(&dev->tid,NULL,sersor_thread_func,dev) != 0){ dev->running = 0;return -1; }return 0;}void sersor_stop(SersorDevice *dev){if(dev != NULL){ pthread_join(dev->tid,NULL); }}为什么这样设计扩展?
1.新增一种SersorType;
2.补一个fake_read_sersor()分支或者真实驱动读数;
3.在主线程里再创建一个SersorDevice,不用改融合的主框架;
4.整体上传感器做成统一抽象,避免不同传感器线程各写一套独立逻辑,这样以后增加不同型号的传感器:激光雷达,IMU,摄像头都能沿用同一个采集模型,便于扩展。
融合模块负责从各个传感器数据中形成”环境状态“
include/fusion.h#ifndef FUSION_H#define FUSION_H#include "protocol.h"typedef struct{float lastest_ultrasonic; int lastest_infrared;float lastest_speed; int fusion_seq;}FusionContext;void fusion_init(FusionContext *ctx);void fusion_update(FusionContext *ctx,const SersorFrame *frame);void fusion_build_state(FusionContext *ctx,FusionState,*state);#endif;src/fusion.c#include "fusion.h"#include "string.h"#include <time.h>void fusion_init(FusionContext *ctx){if(ctx == NULL){return; } memset(ctx,0,sizeof(*ctx)); ctx->lastest_ultrasonic = 100.0f; ctx->lastest_infrared = 0; ctx->lastest_speed = 0.0f; ctx->fusion.seq = 0; }//根据不同传感器帧更新融合上下文void fusion_update(FusionContext *ctx, const SensorFrame *frame){if (ctx == NULL || frame == NULL) {return; } switch (frame->type) {case SENSOR_ULTRASONIC: ctx->latest_ultrasonic = frame->value;break;case SENSOR_INFRARED: ctx->latest_infrared = (int)frame->value;break;case SENSOR_ODOMETRY: ctx->latest_speed = frame->value;break; default:break; }}/* 基于上下文构造融合状态 */void fusion_build_state(FusionContext *ctx, FusionState *state){if (ctx == NULL || state == NULL) {return; } memset(state, 0, sizeof(*state)); ctx->fusion_seq++; state->frame_seq = ctx->fusion_seq; state->front_distance_cm = ctx->latest_ultrasonic; state->speed_cm_s = ctx->latest_speed; /* 简单规则: * 超声波距离近,或者红外触发,则判断前方有障碍 */if (ctx->latest_ultrasonic < 30.0f || ctx->latest_infrared == 1) { state->obstacle_detected = 1; } else { state->obstacle_detected = 0; } clock_gettime(CLOCK_REALTIME, &state->ts);}融合线程不是直接一帧一帧做业务,而是维护一个融合上下文:
最近一次超声波距离
最近一次红外状态
最近一次里程计速度
任何一帧到来,都先更新上下文,再输出一个融合状态。
负责动作控制决策
#ifndef CONTROL_H#define CONTROL_H#include "protocol.h"void control_decide(const FusionState *state, const RobotConfig *cfg, ControlCommand *cmd);#endif#include "control.h"#include <string.h>#include <time.h>void control_decide(const FusionState *state, const RobotConfig *cfg, ControlCommand *cmd){if (state == NULL || cfg == NULL || cmd == NULL) {return; } memset(cmd, 0, sizeof(*cmd)); cmd->frame_seq = state->frame_seq; clock_gettime(CLOCK_REALTIME, &cmd->ts);if (state->front_distance_cm <= cfg->danger_distance_cm) { strncpy(cmd->decision, "STOP", sizeof(cmd->decision) - 1); cmd->motor_pwm = 0; } elseif (state->front_distance_cm <= cfg->slow_distance_cm) { strncpy(cmd->decision, "SLOW", sizeof(cmd->decision) - 1); cmd->motor_pwm = 30; } elseif (state->obstacle_detected) { strncpy(cmd->decision, "TURN", sizeof(cmd->decision) - 1); cmd->motor_pwm = 20; } else { strncpy(cmd->decision, "FORWARD", sizeof(cmd->decision) - 1); cmd->motor_pwm = 60; }}这个模块是工程核心。 它负责:
管理多个传感器线程
管理处理线程
管理控制线程
管理运行配置
向守护进程发送状态
#ifndef WORKER_H#define WORKER_H#include "protocol.h"int worker_init(const RobotConfig *cfg, int status_write_fd);int worker_start(void);void worker_stop(void);void worker_join(void);void worker_update_config(const RobotConfig *cfg);#endif#include "worker.h"#include "ring_buffer.h"#include "sensor.h"#include "fusion.h"#include "control.h"#include "log.h"#include <pthread.h>#include <string.h>#include <unistd.h>#include <errno.h>/* 全局环形缓冲区:所有传感器共用,处理线程统一消费 */static SensorRingBuffer g_rb;/* 多传感器设备 */static SensorDevice g_ultrasonic;static SensorDevice g_infrared;static SensorDevice g_odometry;/* 融合处理线程 */static pthread_t g_fusion_tid;/* 当前运行配置 */static RobotConfig g_cfg;static pthread_mutex_t g_cfg_mutex = PTHREAD_MUTEX_INITIALIZER;/* 写给守护进程的 pipe */static int g_status_write_fd = -1;/* 运行标志 */static volatile int g_running = 0;/* 配置安全读取 */static RobotConfig get_cfg_copy(void){ RobotConfig cfg; pthread_mutex_lock(&g_cfg_mutex); cfg = g_cfg; pthread_mutex_unlock(&g_cfg_mutex);return cfg;}/* 主进程收到守护进程配置回传后,更新到 worker */void worker_update_config(const RobotConfig *cfg){if (cfg == NULL) {return; } pthread_mutex_lock(&g_cfg_mutex); g_cfg = *cfg; pthread_mutex_unlock(&g_cfg_mutex); log_info("config updated: danger=%d slow=%d log_level=%d", cfg->danger_distance_cm, cfg->slow_distance_cm, cfg->log_level);}/* 封装完整写,避免短写 */static int write_full(int fd, const void *buf, size_t len){ const char *p = (const char *)buf; size_t left = len;while (left > 0) { ssize_t n = write(fd, p, left);if (n < 0) {if (errno == EINTR) {continue; }return -1; } left -= (size_t)n; p += n; }return 0;}/* 融合处理线程: * 统一消费所有传感器帧 -> 更新融合上下文 -> 输出控制决策 */static void *fusion_thread_func(void *arg){ (void)arg; log_set_thread_name("fusion"); FusionContext fctx; fusion_init(&fctx);while (g_running) { SensorFrame frame;if (ring_buffer_pop(&g_rb, &frame) != 0) {break; } /* 1. 用传感器帧更新融合上下文 */ fusion_update(&fctx, &frame); /* 2. 构造融合状态 */ FusionState state; fusion_build_state(&fctx, &state); /* 3. 读取最新配置 */ RobotConfig cfg = get_cfg_copy(); /* 4. 做控制决策 */ ControlCommand cmd; control_decide(&state, &cfg, &cmd); log_info("fusion state: seq=%d dist=%.2f speed=%.2f obstacle=%d -> cmd=%s pwm=%d", state.frame_seq, state.front_distance_cm, state.speed_cm_s, state.obstacle_detected, cmd.decision, cmd.motor_pwm); /* 5. 发给守护进程 */if (g_status_write_fd >= 0) { RobotStatus status; status.fusion = state; status.cmd = cmd;if (write_full(g_status_write_fd, &status, sizeof(status)) != 0) { log_warn("write status to daemon failed"); } } } log_info("fusion thread exit");return NULL;}int worker_init(const RobotConfig *cfg, int status_write_fd){if (cfg == NULL) {return -1; }if (ring_buffer_init(&g_rb) != 0) {return -1; } pthread_mutex_lock(&g_cfg_mutex); g_cfg = *cfg; pthread_mutex_unlock(&g_cfg_mutex); g_status_write_fd = status_write_fd; /* 初始化多个传感器设备 */ g_ultrasonic.type = SENSOR_ULTRASONIC; g_ultrasonic.name = "ultrasonic"; g_ultrasonic.period_ms = 200; g_ultrasonic.rb = &g_rb; g_infrared.type = SENSOR_INFRARED; g_infrared.name = "infrared"; g_infrared.period_ms = 300; g_infrared.rb = &g_rb; g_odometry.type = SENSOR_ODOMETRY; g_odometry.name = "odometry"; g_odometry.period_ms = 500; g_odometry.rb = &g_rb;return 0;}int worker_start(void){ g_running = 1;if (sensor_start(&g_ultrasonic) != 0) {return -1; }if (sensor_start(&g_infrared) != 0) { sensor_stop(&g_ultrasonic); sensor_join(&g_ultrasonic);return -1; }if (sensor_start(&g_odometry) != 0) { sensor_stop(&g_ultrasonic); sensor_stop(&g_infrared); sensor_join(&g_ultrasonic); sensor_join(&g_infrared);return -1; }if (pthread_create(&g_fusion_tid, NULL, fusion_thread_func, NULL) != 0) { sensor_stop(&g_ultrasonic); sensor_stop(&g_infrared); sensor_stop(&g_odometry); sensor_join(&g_ultrasonic); sensor_join(&g_infrared); sensor_join(&g_odometry);return -1; }return 0;}void worker_stop(void){ g_running = 0; sensor_stop(&g_ultrasonic); sensor_stop(&g_infrared); sensor_stop(&g_odometry); ring_buffer_stop(&g_rb);}void worker_join(void){ sensor_join(&g_ultrasonic); sensor_join(&g_infrared); sensor_join(&g_odometry); pthread_join(g_fusion_tid, NULL); ring_buffer_destroy(&g_rb);}模块说明: 把多传感器采集线程和融合决策线程分开,传感器线程只负责采样和投递,融合线程只负责消费和决策。这样职责边界清晰,也便于后续替换融合算法。
守护进程负责:
读主进程发来的状态 pipe
读 FIFO 调试命令
收 SIGHUP 进行热重载
把新配置通过 pipe 回传给主进程
用 epoll 统一事件管理
#ifndef DAEMON_H#define DAEMON_Hint daemon_run(int status_read_fd, int config_write_fd, const char *fifo_path, const char *config_path);#endif#include "daemon.h"#include "protocol.h"#include "config.h"#include "log.h"#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <signal.h>#include <fcntl.h>#include <errno.h>#include <sys/stat.h>#include <sys/epoll.h>static int g_signal_pipe[2] = {-1, -1};static volatile sig_atomic_t g_daemon_stop = 0;/* 可靠写 */static int write_full(int fd, const void *buf, size_t len){ const char *p = (const char *)buf; size_t left = len;while (left > 0) { ssize_t n = write(fd, p, left);if (n < 0) {if (errno == EINTR) {continue; }return -1; } left -= (size_t)n; p += n; }return 0;}/* signal handler 里只做最小动作:写 self-pipe */static void daemon_signal_handler(int sig){ char ch = 0;if (sig == SIGHUP) { ch = 'R'; /* reload */ } elseif (sig == SIGTERM || sig == SIGINT) { ch = 'Q'; /* quit */ } else {return; }if (g_signal_pipe[1] >= 0) { (void)write(g_signal_pipe[1], &ch, 1); }}static int set_nonblock(int fd){ int flags = fcntl(fd, F_GETFL, 0);if (flags < 0) {return -1; }return fcntl(fd, F_SETFL, flags | O_NONBLOCK);}/* 把新配置回传给主进程 */static void send_config_back(int fd, const RobotConfig *cfg){if (write_full(fd, cfg, sizeof(*cfg)) != 0) { log_warn("send config back failed"); } else { log_info("send config back: danger=%d slow=%d log_level=%d", cfg->danger_distance_cm, cfg->slow_distance_cm, cfg->log_level); }}static void reload_config_and_notify(const char *config_path, int config_write_fd, RobotConfig *cfg){ RobotConfig new_cfg;if (config_load(config_path, &new_cfg) != 0) { log_warn("reload config failed");return; } *cfg = new_cfg; log_info("reload config success: danger=%d slow=%d log_level=%d", cfg->danger_distance_cm, cfg->slow_distance_cm, cfg->log_level); send_config_back(config_write_fd, cfg);}static void handle_fifo_command(const char *cmd, RobotConfig *cfg, const char *config_path, int config_write_fd){if (strcmp(cmd, "status") == 0) { log_info("fifo cmd=status"); log_info("current config: danger=%d slow=%d log_level=%d", cfg->danger_distance_cm, cfg->slow_distance_cm, cfg->log_level); } elseif (strcmp(cmd, "reload") == 0) { reload_config_and_notify(config_path, config_write_fd, cfg); } elseif (strncmp(cmd, "set danger ", 11) == 0) { int value = atoi(cmd + 11);if (value > 0) { cfg->danger_distance_cm = value; config_save(config_path, cfg); send_config_back(config_write_fd, cfg); log_info("set danger success: %d", value); } } elseif (strncmp(cmd, "set slow ", 9) == 0) { int value = atoi(cmd + 9);if (value > 0) { cfg->slow_distance_cm = value; config_save(config_path, cfg); send_config_back(config_write_fd, cfg); log_info("set slow success: %d", value); } } elseif (strcmp(cmd, "quit") == 0) { log_info("fifo cmd=quit"); g_daemon_stop = 1; } else { log_warn("unknown fifo cmd: %s", cmd); }}int daemon_run(int status_read_fd, int config_write_fd, const char *fifo_path, const char *config_path){ log_set_thread_name("daemon"); RobotConfig cfg;if (config_load(config_path, &cfg) != 0) { config_set_default(&cfg); config_save(config_path, &cfg); } /* 启动先发一次配置给主进程 */ send_config_back(config_write_fd, &cfg);if (pipe(g_signal_pipe) != 0) { perror("pipe");return -1; } signal(SIGHUP, daemon_signal_handler); signal(SIGTERM, daemon_signal_handler); signal(SIGINT, daemon_signal_handler); unlink(fifo_path);if (mkfifo(fifo_path, 0666) < 0 && errno != EEXIST) { perror("mkfifo");return -1; } int fifo_rd = open(fifo_path, O_RDONLY | O_NONBLOCK); int fifo_dummy_wr = open(fifo_path, O_WRONLY | O_NONBLOCK); set_nonblock(status_read_fd); set_nonblock(g_signal_pipe[0]); int epfd = epoll_create1(0);if (epfd < 0) { perror("epoll_create1");return -1; } struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = status_read_fd; epoll_ctl(epfd, EPOLL_CTL_ADD, status_read_fd, &ev); ev.events = EPOLLIN; ev.data.fd = fifo_rd; epoll_ctl(epfd, EPOLL_CTL_ADD, fifo_rd, &ev); ev.events = EPOLLIN; ev.data.fd = g_signal_pipe[0]; epoll_ctl(epfd, EPOLL_CTL_ADD, g_signal_pipe[0], &ev); log_info("daemon started"); log_info("fifo path: %s", fifo_path);while (!g_daemon_stop) { struct epoll_event events[8]; int n = epoll_wait(epfd, events, 8, 1000);if (n < 0) {if (errno == EINTR) {continue; }break; }for (int i = 0; i < n; i++) { int fd = events[i].data.fd; /* 1. 状态 pipe */if (fd == status_read_fd) {while (1) { RobotStatus status; ssize_t ret = read(status_read_fd, &status, sizeof(status));if (ret == (ssize_t)sizeof(status)) { log_info("robot status: fusion_seq=%d dist=%.2f speed=%.2f obstacle=%d cmd=%s pwm=%d", status.fusion.frame_seq, status.fusion.front_distance_cm, status.fusion.speed_cm_s, status.fusion.obstacle_detected, status.cmd.decision, status.cmd.motor_pwm); } elseif (ret < 0 && errno == EAGAIN) {break; } elseif (ret == 0) { log_warn("status pipe closed"); g_daemon_stop = 1;break; } else {break; } } } /* 2. FIFO 命令 */elseif (fd == fifo_rd) { char buf[256] = {0}; ssize_t ret = read(fifo_rd, buf, sizeof(buf) - 1);if (ret > 0) { buf[ret] = '\0'; buf[strcspn(buf, "\r\n")] = '\0'; handle_fifo_command(buf, &cfg, config_path, config_write_fd); } } /* 3. signal self-pipe */elseif (fd == g_signal_pipe[0]) { char sigbuf[32]; ssize_t ret = read(g_signal_pipe[0], sigbuf, sizeof(sigbuf));if (ret > 0) {for (ssize_t j = 0; j < ret; j++) {if (sigbuf[j] == 'R') { log_info("receive SIGHUP"); reload_config_and_notify(config_path, config_write_fd, &cfg); } elseif (sigbuf[j] == 'Q') { log_info("receive quit signal"); g_daemon_stop = 1; } } } } } } close(epfd); close(fifo_rd); close(fifo_dummy_wr); close(g_signal_pipe[0]); close(g_signal_pipe[1]); unlink(fifo_path); log_info("daemon exit");return 0;}守护进程不是简单 while+阻塞读,而是用 epoll 管理状态 pipe、FIFO 调试接口和 signal 事件。Signal 处理采用 self-pipe trick,把异步信号转成普通 FD 事件,在 epoll 主循环里完成热重载逻辑,这样更安全、更工程化。
主线程职责:
加载初始配置
创建两根 pipe
主进程 → 守护进程:状态
守护进程 → 主进程:配置回传
fork() 守护进程
启动 worker
监听配置回传,实时更新运行参数
#include "protocol.h"#include "config.h"#include "log.h"#include "worker.h"#include "daemon.h"#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <signal.h>#include <fcntl.h>#include <errno.h>#include <sys/epoll.h>#include <sys/wait.h>#define FIFO_PATH "/tmp/agv_robot_fifo"#define CONFIG_PATH "/tmp/agv_robot.conf"static volatile sig_atomic_t g_main_stop = 0;static void main_signal_handler(int sig){ (void)sig; g_main_stop = 1;}static int set_nonblock(int fd){ int flags = fcntl(fd, F_GETFL, 0);if (flags < 0) {return -1; }return fcntl(fd, F_SETFL, flags | O_NONBLOCK);}int main(void){ log_set_thread_name("main"); signal(SIGINT, main_signal_handler); signal(SIGTERM, main_signal_handler); RobotConfig init_cfg;if (config_load(CONFIG_PATH, &init_cfg) != 0) { config_set_default(&init_cfg); config_save(CONFIG_PATH, &init_cfg); } /* pipe_status: 主进程写状态,守护进程读 */ int pipe_status[2]; /* pipe_config: 守护进程写配置,主进程读 */ int pipe_config[2];if (pipe(pipe_status) != 0 || pipe(pipe_config) != 0) { perror("pipe");return 1; } pid_t pid = fork();if (pid < 0) { perror("fork");return 1; }if (pid == 0) { /* 子进程:守护进程 */ close(pipe_status[1]); close(pipe_config[0]); daemon_run(pipe_status[0], pipe_config[1], FIFO_PATH, CONFIG_PATH); close(pipe_status[0]); close(pipe_config[1]);exit(0); } /* 父进程 */ close(pipe_status[0]); close(pipe_config[1]);if (worker_init(&init_cfg, pipe_status[1]) != 0) { log_error("worker_init failed");return 1; }if (worker_start() != 0) { log_error("worker_start failed");return 1; } set_nonblock(pipe_config[0]); int epfd = epoll_create1(0);if (epfd < 0) { perror("epoll_create1");return 1; } struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = pipe_config[0]; epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_config[0], &ev); log_info("AGV robot system started"); log_info("daemon pid=%d", pid); log_info("fifo cmd example: echo status > %s", FIFO_PATH); log_info("reload example: kill -HUP %d", pid);while (!g_main_stop) { struct epoll_event events[4]; int n = epoll_wait(epfd, events, 4, 1000);if (n < 0) {if (errno == EINTR) {continue; }break; }for (int i = 0; i < n; i++) {if (events[i].data.fd == pipe_config[0]) {while (1) { RobotConfig cfg; ssize_t ret = read(pipe_config[0], &cfg, sizeof(cfg));if (ret == (ssize_t)sizeof(cfg)) { log_info("receive config from daemon: danger=%d slow=%d log_level=%d", cfg.danger_distance_cm, cfg.slow_distance_cm, cfg.log_level); worker_update_config(&cfg); } elseif (ret < 0 && errno == EAGAIN) {break; } elseif (ret == 0) { log_warn("config pipe closed"); g_main_stop = 1;break; } else {break; } } } } } log_info("system stopping..."); worker_stop(); worker_join(); close(pipe_status[1]); close(pipe_config[0]); close(epfd);kill(pid, SIGTERM); waitpid(pid, NULL, 0); log_info("AGV robot system exit");return 0;}CC = gccCFLAGS = -Wall -Wextra -O2 -IincludeLDFLAGS = -lpthreadTARGET = agv_robotSRC = src/main.c src/log.c src/config.c src/ring_buffer.c src/sensor.c src/fusion.c src/control.c src/worker.c src/daemon.call: $(TARGET)$(TARGET): $(SRC) $(CC) $(CFLAGS) -o $@ $(SRC) $(LDFLAGS)clean: rm -f $(TARGET).PHONY: all clean总结:
机器人小车的多线程/多进程控制项目。
主进程里把多个传感器采集拆成独立线程,例如:超声波、红外和里程计,每个采集线程通过共享环形缓冲区把数据投递给融合线程。
环形缓冲区内部用互斥锁和条件变量保证线程安全。 融合线程维护一个多传感器上下文,实时更新前方距离和速度状态,再结合运行配置做控制决策,输出 STOP、SLOW、FORWARD 等控制命令。
主进程把融合状态通过 pipe 发给后台守护进程,守护进程采用 epoll 统一监听状态 pipe、FIFO 调试接口以及 signal 事件。
在配置热更新上,使用 SIGHUP 通知守护进程重载配置文件,守护进程再把新配置通过另一根 pipe 回传给主进程,主进程动态更新控制阈值。
此外,为了让多线程日志更易排障,使用了 TLS 保存线程上下文,让每条日志自带线程身份。