1. 概述与背景
RPS (Receive Packet Steering) 是 Linux 内核中一项软件层面的多队列收包负载均衡技术,用于将网络数据包的处理分发到多个 CPU 核心上。
1.1 为什么需要 RPS?
- 单队列网卡:物理网卡只有一个 RX 队列,所有收包中断都落在一个 CPU 上,导致该 CPU 成为瓶颈
- 多队列网卡队列数 < CPU 数:硬件队列数有限,无法覆盖所有 CPU
- NUMA 亲和性:希望数据包在应用程序所在的 CPU 上处理,减少跨 NUMA 访问
1.2 RPS 工作原理
RPS 在 netif_receive_skb() 中拦截即将进入协议栈的 skb,通过 hash 计算选择一个目标 CPU,将该 skb 挂入目标 CPU 的 backlog 队列,然后通过 IPI 通知目标 CPU 触发软中断来处理该 skb。
1.3 与 RFS 的关系
RPS 解决「哪个 CPU 处理」,RFS (Receive Flow Steering) 增强它:RPS 基于静态 CPU bitmap 随机分发,RFS 则记住「应用程序上次在哪个 CPU 上调用 recvmsg()」,使用应用层 CPU 亲和性来做决策,避免跨 CPU 的缓存失效。
1.4 关键宏控制
// RPS 整个子系统通过 CONFIG_RPS 编译开关控制#ifdef CONFIG_RPS// rps_needed: 启用了 RPS map(rps_cpus sysfs 配置)时打开// rfs_needed: 启用了 RFS 全局流表(rps_sock_flow_entries sysctl)时打开
2. 核心数据结构
2.1 RPS CPU Map —— 每个 RX 队列的 CPU 分配表
// include/net/rps.h: 19-24structrps_map {unsignedint len; // CPU 数量structrcu_headrcu; u16 cpus[]; // CPU 编号数组(变长)};#define RPS_MAP_SIZE(_num) (sizeof(struct rps_map) + ((_num) * sizeof(u16)))
含义:每个网卡 RX 队列可以配置一组目标 CPU。当 hash 命中时,从该数组中选一个 CPU。
2.2 RPS Dev Flow Table —— 每个 RX 队列的 per-flow 状态表
// include/net/rps.h: 31-47structrps_dev_flow { u16 cpu; // 该 flow 当前被分配到的 CPU u16 filter; // aRFS 硬件 filter ID(RPS_NO_FILTER = 0xffff 表示未使用)unsignedint last_qtail; // 上次入队时目标 CPU backlog 队列的 tail 指针};structrps_dev_flow_table { u8 log; // log2(表大小)structrcu_headrcu;structrps_dev_flowflows[];// 流表项数组};
last_qtail 的作用:这是保证同流保序的核心。RFS 如果检测到应用程序换到了另一个 CPU,不能立即切换——必须确保目标 CPU 上该流的「所有旧 skb 已经处理完毕」。last_qtail 记录的是入队时目标 CPU backlog 队列的 head 指针(注意是 head,不是 tail)。当切换 CPU 时,检查「head 是否已经走过了 last_qtail」,如果是,说明旧数据已消费完,可以安全切换。
2.3 RPS Sock Flow Table —— 全局流→CPU 映射(RFS 核心)
// include/net/rps.h: 49-65/* * The rps_sock_flow_table contains mappings of flows to the last CPU * on which they were processed by the application (set in recvmsg). * Each entry is a 32bit value. Upper part is the high-order bits * of flow hash, lower part is CPU number. * rps_cpu_mask is used to partition the space. * 例如,64 个 CPU 时,rps_cpu_mask = 0x3f, * 低 6 位是 CPU 编号,高位是 hash 高位(用于去碰撞)。 */structrps_sock_flow_table {structrcu_headrcu; u32 mask; u32 ents[] ____cacheline_aligned_in_smp;};#define RPS_SOCK_FLOW_TABLE_SIZE(_num) \ (offsetof(struct rps_sock_flow_table, ents[_num]))#define RPS_NO_CPU 0xffff
编码方式:ents[i] = (hash & ~rps_cpu_mask) | smp_processor_id()即高 26 位存 hash 高位(去碰撞),低 6 位存 CPU 编号。
2.4 Per-CPU Softnet Data —— 每个 CPU 的收包基础设施
// include/linux/netdevice.h: 3465-3517structsoftnet_data {structlist_headpoll_list;// NAPI poll 列表structsk_buff_headprocess_queue;// backlog 处理后移交的 skb 队列local_lock_t process_queue_bh_lock;unsignedint processed;unsignedint time_squeeze;#ifdef CONFIG_RPSstructsoftnet_data *rps_ipi_list;// RPS IPI 链表头#endifunsignedint received_rps; // 收到的 RPS 中断次数(统计)bool in_net_rx_action;bool in_napi_threaded_poll;#ifdef CONFIG_RPSunsignedint input_queue_head ____cacheline_aligned_in_smp;// ---- 以下可被其他 CPU 访问(RPS/RFS 跨 CPU) ----call_single_data_t csd ____cacheline_aligned_in_smp;structsoftnet_data *rps_ipi_next;// IPI 链表 next 指针unsignedint cpu;unsignedint input_queue_tail; // 入队时的 tail 编号#endifstructsk_buff_headinput_pkt_queue;// backlog 收包队列(主要的 RPS 目标队列)structnapi_structbacklog;// backlog NAPI 结构// ...};
关键cache line 设计:
input_queue_head 仅由本地 CPU 写,其他 CPU 读 → 独立 cache linecsd / rps_ipi_next / input_queue_tail 跨 CPU 访问 → 独立 cache line
2.5 Netdev RX Queue —— 每个硬件 RX 队列的 RPS 配置
// include/net/netdev_rx_queue.h: 12-31structnetdev_rx_queue {structxdp_rxq_infoxdp_rxq;#ifdef CONFIG_RPSstructrps_map __rcu *rps_map;// CPU mask(sysfs: rps_cpus)structrps_dev_flow_table __rcu *rps_flow_table;// flow table(sysfs: rps_flow_cnt)#endifstructkobjectkobj;structnet_device *dev;#ifdef CONFIG_XDP_SOCKETSstructxsk_buff_pool *pool;#endifstructnapi_struct *napi;} ____cacheline_aligned_in_smp;
2.6 Net Hotdata —— 全局热路径只读数据
// include/net/hotdata.h: 10-41structnet_hotdata {// ...#ifdef CONFIG_RPSstructrps_sock_flow_table __rcu *rps_sock_flow_table;// 全局 RFS 流表 u32 rps_cpu_mask; // = roundup_pow_of_two(nr_cpu_ids) - 1#endifint max_backlog; // /proc/sys/net/core/netdev_max_backlogint dev_rx_weight; // NAPI 每次 poll 的处理配额// ...};
3. 配置接口
3.1 sysfs 配置接口
| | |
|---|
/sys/class/net/<dev>/queues/rx-<n>/rps_cpus | | rxqueue->rps_map |
/sys/class/net/<dev>/queues/rx-<n>/rps_flow_cnt | | rxqueue->rps_flow_table |
写入 rps_cpus 的核心路径
// net/core/net-sysfs.c: 981-1020staticintnetdev_rx_queue_set_rps_mask(struct netdev_rx_queue *queue,cpumask_var_t mask){staticDEFINE_MUTEX(rps_map_mutex);structrps_map *old_map, *map;int cpu, i;// 1. 分配 RPS map,填入 CPU 编号map = kzalloc(max_t(unsignedint, RPS_MAP_SIZE(cpumask_weight(mask)), L1_CACHE_BYTES), GFP_KERNEL); i = 0; for_each_cpu_and(cpu, mask, cpu_online_mask) // 只包含在线 CPUmap->cpus[i++] = cpu;// 2. RCU 替换旧 map mutex_lock(&rps_map_mutex); old_map = rcu_dereference_protected(queue->rps_map, ...); rcu_assign_pointer(queue->rps_map, map);// 3. 更新静态分支(控制快速路径的 if 判断)if (map) static_branch_inc(&rps_needed); // rps_needed++if (old_map) static_branch_dec(&rps_needed); // rps_needed-- mutex_unlock(&rps_map_mutex);// ...}
关键点:rps_needed 是 static_branch,在快速路径中通过 static_branch_unlikely(&rps_needed) 判断。如果没有任何网卡配置 RPS,这个分支在编译时就被优化为 NOP,零开销。
Housekeeping CPU 过滤
// net/core/net-sysfs.c: 1022-1031intrps_cpumask_housekeeping(struct cpumask *mask){if (!cpumask_empty(mask)) {// 只允许 housekeeping CPU(排除 isolated CPU) cpumask_and(mask, mask, housekeeping_cpumask(HK_TYPE_DOMAIN)); cpumask_and(mask, mask, housekeeping_cpumask(HK_TYPE_WQ));if (cpumask_empty(mask))return -EINVAL; }return0;}
3.2 sysctl 配置接口
| | |
|---|
net.core.rps_sock_flow_entries | | net_hotdata.rps_sock_flow_table |
net.core.rps_default_mask | | net->core.rps_default_mask |
net.core.netdev_max_backlog | | net_hotdata.max_backlog |
设置 rps_sock_flow_entries 的初始化路径
// net/core/sysctl_net_core.c: 137-203staticintrps_sock_flow_sysctl(const struct ctl_table *table, int write, ...){// ... net_hotdata.rps_cpu_mask = roundup_pow_of_two(nr_cpu_ids) - 1; sock_table->mask = size - 1;for (i = 0; i < size; i++) sock_table->ents[i] = RPS_NO_CPU; // 初始化为无 CPU rcu_assign_pointer(net_hotdata.rps_sock_flow_table, sock_table); static_branch_inc(&rps_needed); // 开启 RPS 快速路径 static_branch_inc(&rfs_needed); // 开启 RFS 快速路径}
4. CPU 选择算法(get_rps_cpu)
这是 RPS 最核心的函数,在 netif_receive_skb_internal() 中调用,决定 skb 应该被发送到哪个 CPU。
// net/core/dev.c: 4903-4998staticintget_rps_cpu(struct net_device *dev, struct sk_buff *skb, struct rps_dev_flow **rflowp)
4.1 完整决策流程
get_rps_cpu(dev, skb, &rflow) │ ├─ 1. 获取 RX 队列 │ if (skb 携带 rx_queue 信息) │ rxqueue = dev->_rx + skb_get_rx_queue(skb) │ ├─ 2. 快速退出检查 │ flow_table = rxqueue->rps_flow_table │ map = rxqueue->rps_map │ if (!flow_table && !map) // 该 RX 队列未启用任何 RPS │ return -1 // → 在本地 CPU 处理 │ ├─ 3. 计算 hash(hash = skb_get_hash(skb)) │ if (!hash) // hash 为 0,无法做流分类 │ return -1 │ ├─ 4. 【RFS 路径】检查全局流表 (sock_flow_table) │ sock_flow_table = net_hotdata.rps_sock_flow_table │ if (flow_table && sock_flow_table) // 两者都已配置 │ { │ ident = sock_flow_table->ents[hash & mask] │ if ((ident ^ hash) 的高位匹配) // 命中! │ { │ next_cpu = ident & rps_cpu_mask // RFS 建议的 CPU │ rflow = flow_table->flows[hash_slot] │ tcpu = rflow->cpu // 当前 flow 所在的 CPU │ │ // 【决定是否切换 CPU】 │ if (tcpu != next_cpu) { │ 允许切换的条件(三者满足其一): │ ① tcpu >= nr_cpu_ids(从未分配) │ ② !cpu_online(tcpu) (当前 CPU 已下线) │ ③ input_queue_head - last_qtail >= 0 │ (目标 CPU 已消费完该 flow 的所有旧 skb) │ │ if 满足: │ tcpu = next_cpu │ set_rps_cpu(dev, skb, rflow, next_cpu) │ } │ │ if (tcpu 有效且在线) │ return tcpu // RFS 命中! │ } │ } │ ├─ 5. 【RPS 路径】基于 rps_map 的随机分发 │ if (map) // 只有 rps_cpus 配置了但没配置 rps_flow_cnt │ { │ tcpu = map->cpus[reciprocal_scale(hash, map->len)] │ if (cpu_online(tcpu)) │ return tcpu // 随机分发 │ } │ └─ 6. 都不命中 → return -1(本地处理)
4.2 关键源码注释
// net/core/dev.c: 4960-4977/* * If the desired CPU (where last recvmsg was done) is * different from current CPU (one in the rx-queue flow * table entry), switch if one of the following holds: * - Current CPU is unset (>= nr_cpu_ids). * - Current CPU is offline. * - The current CPU's queue tail has advanced beyond the * last packet that was enqueued using this table entry. * This guarantees that all previous packets for the flow * have been dequeued, thus preserving in order delivery. */if (unlikely(tcpu != next_cpu) && (tcpu >= nr_cpu_ids || !cpu_online(tcpu) || ((int)(READ_ONCE(per_cpu(softnet_data, tcpu).input_queue_head) - rflow->last_qtail)) >= 0)) { tcpu = next_cpu; rflow = set_rps_cpu(dev, skb, rflow, next_cpu);}
4.3 set_rps_cpu —— 更新 flow 表项并处理 aRFS
// net/core/dev.c: 4852-4896static struct rps_dev_flow *set_rps_cpu(struct net_device *dev, struct sk_buff *skb, struct rps_dev_flow *rflow, u16 next_cpu){if (next_cpu < nr_cpu_ids) {#ifdef CONFIG_RFS_ACCEL// 【aRFS 路径】如果需要将 flow 转向某个 CPU,// 尝试通过 ndo_rx_flow_steer() 让硬件直接把数据包// 送到 next_cpu 对应的硬件队列 rxq_index = cpu_rmap_lookup_index(dev->rx_cpu_rmap, next_cpu); flow_id = rfs_slot(skb_get_hash(skb), flow_table); rc = dev->netdev_ops->ndo_rx_flow_steer(dev, skb, rxq_index, flow_id);// ...#endif// 记录入队时目标 CPU 的队列 head 位置 head = READ_ONCE(per_cpu(softnet_data, next_cpu).input_queue_head); rps_input_queue_tail_save(&rflow->last_qtail, head); } WRITE_ONCE(rflow->cpu, next_cpu);return rflow;}
aRFS 逻辑:如果网卡支持 NETIF_F_NTUPLE 和 ndo_rx_flow_steer,则设置硬件 filter,让后续的数据包直接通过硬件队列到达目标 CPU,彻底绕过 RPS 软件路径。
4.4 RPS 分发算法
// 使用 reciprocal_scale 实现均匀分布// include/linux/math.h: 205tcpu = map->cpus[reciprocal_scale(hash, map->len)];
reciprocal_scale(val, ep_ro) 将 hash 值均匀映射到 [0, ep_ro) 范围:
- 通过预计算的 reciprocal 值用乘法代替除法取模操作
- 保证同一 hash(同一 flow)总是映射到同一个 CPU 索引
5. 数据包入队(enqueue_to_backlog)
当选定了目标 CPU 后,调用 enqueue_to_backlog() 将 skb 挂入该 CPU 的 backlog 队列。
// net/core/dev.c: 5158-5207staticintenqueue_to_backlog(struct sk_buff *skb, int cpu,unsignedint *qtail)
5.1 完整流程
enqueue_to_backlog(skb, cpu, &rflow->last_qtail) │ ├─ 1. 检查设备是否运行中 │ if (!netif_running(skb->dev)) │ goto bad_dev → kfree_skb → return NET_RX_DROP │ ├─ 2. 获取目标 CPU 的 softnet_data │ sd = &per_cpu(softnet_data, cpu) │ ├─ 3. 快速队列长度检查(无锁) │ qlen = skb_queue_len_lockless(&sd->input_pkt_queue) │ if (qlen > max_backlog) // 超过最大 backlog │ goto cpu_backlog_drop │ ├─ 4. 获取队列锁,再次检查(精确检查) │ backlog_lock_irq_save(sd, &flags) │ qlen = skb_queue_len(&sd->input_pkt_queue) │ if (qlen <= max_backlog && !skb_flow_limit(skb, qlen)) │ { │ if (!qlen) // 队列之前为空,需要调度 NAPI │ { │ if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) │ napi_schedule_rps(sd); // ← 关键:通知目标 CPU │ } │ __skb_queue_tail(&sd->input_pkt_queue, skb); // 入队 │ tail = rps_input_queue_tail_incr(sd); // tail++ │ backlog_unlock_irq_restore(sd, &flags); │ │ rps_input_queue_tail_save(qtail, tail); // 保存 tail 到 rflow │ return NET_RX_SUCCESS; │ } │ └─ 5. 队列满或 flow limit 触发 → drop atomic_inc(&sd->dropped) kfree_skb(skb)return NET_RX_DROP
5.2 napi_schedule_rps —— IPI 通知机制
// net/core/dev.c: 5071-5094staticvoidnapi_schedule_rps(struct softnet_data *sd){structsoftnet_data *mysd = this_cpu_ptr(&softnet_data);#ifdef CONFIG_RPSif (sd != mysd) { // 目标 CPU 不是当前 CPUif (use_backlog_threads()) { __napi_schedule_irqoff(&sd->backlog); // backlog 线程模式return; }// 【经典 RPS 路径】 sd->rps_ipi_next = mysd->rps_ipi_list; // 链入 IPI 链表 mysd->rps_ipi_list = sd;if (!mysd->in_net_rx_action && !mysd->in_napi_threaded_poll) __raise_softirq_irqoff(NET_RX_SOFTIRQ); // 触发软中断return; }#endif __napi_schedule_irqoff(&mysd->backlog); // 目标 CPU 就是当前 CPU}
IPI 链表设计:
当前 CPU (cpu0) 的 rps_ipi_list: sd(cpu0).rps_ipi_list → sd(cpu3) → sd(cpu5) → NULL ↑ rps_ipi_next = sd(cpu5)
- 入队时将目标 CPU 的
softnet_data 链入当前 CPU 的 rps_ipi_list - 在
net_rx_action() 循环末尾,当前 CPU 遍历 rps_ipi_list,向每个远端 CPU 发送 IPI
6. 跨 CPU IPI 通知机制
6.1 IPI 发送
// net/core/dev.c: 6391-6402staticvoidnet_rps_send_ipi(struct softnet_data *remsd){while (remsd) {structsoftnet_data *next = remsd->rps_ipi_next;if (cpu_online(remsd->cpu)) smp_call_function_single_async(remsd->cpu, &remsd->csd); remsd = next; }}
remsd->csd 是一个 call_single_data_t,其中绑定的回调函数是 rps_trigger_softirq:
// net/core/dev.c: 5041-5048staticvoidrps_trigger_softirq(void *data){structsoftnet_data *sd = data; ____napi_schedule(sd, &sd->backlog); // 调度 backlog NAPI WRITE_ONCE(sd->received_rps, sd->received_rps + 1); // 统计}
6.2 IPI 发送时机
// net/core/dev.c: 6408-6422staticvoidnet_rps_action_and_irq_enable(struct softnet_data *sd){structsoftnet_data *remsd = sd->rps_ipi_list;if (!use_backlog_threads() && remsd) { sd->rps_ipi_list = NULL; local_irq_enable(); net_rps_send_ipi(remsd); // 发送 IPI 给所有等待的 CPU } else local_irq_enable();}
调用位置:net_rx_action() 在每轮 poll 循环结束时调用(此时开中断前发送 IPI)。
7. 软中断处理(process_backlog)
目标 CPU 收到 IPI 后,rps_trigger_softirq 调度了 sd->backlog NAPI,在软中断 NET_RX_SOFTIRQ 中由 net_rx_action() → napi_poll() → process_backlog() 处理。
// net/core/dev.c: 6434-6491staticintprocess_backlog(struct napi_struct *napi, int quota){structsoftnet_data *sd = container_of(napi, structsoftnet_data, backlog);bool again = true;int work = 0;// 1. 先检查是否有新的 RPS IPI 需要发送if (sd_has_rps_ipi_waiting(sd)) { local_irq_disable(); net_rps_action_and_irq_enable(sd); // 转发 IPI 到下游 CPU } napi->weight = READ_ONCE(net_hotdata.dev_rx_weight);// 2. 两层循环处理 skbwhile (again) {structsk_buff *skb;// 2a. 处理 process_queue(上一轮从 input_pkt_queue 转入的) local_lock_nested_bh(&softnet_data.process_queue_bh_lock);while ((skb = __skb_dequeue(&sd->process_queue))) { local_unlock_nested_bh(&softnet_data.process_queue_bh_lock); rcu_read_lock(); __netif_receive_skb(skb); // 进入协议栈 rcu_read_unlock();if (++work >= quota) { // 配额用尽 rps_input_queue_head_add(sd, work);return work; } local_lock_nested_bh(&softnet_data.process_queue_bh_lock); } local_unlock_nested_bh(&softnet_data.process_queue_bh_lock);// 2b. 从 input_pkt_queue 批量搬运到 process_queue backlog_lock_irq_disable(sd);if (skb_queue_empty(&sd->input_pkt_queue)) { napi->state &= NAPIF_STATE_THREADED; // 清除 SCHED 标记 again = false; // 没有更多数据 } else {// 批量搬运:将整个 input_pkt_queue 拼接到 process_queue local_lock_nested_bh(&softnet_data.process_queue_bh_lock); skb_queue_splice_tail_init(&sd->input_pkt_queue, &sd->process_queue); local_unlock_nested_bh(&softnet_data.process_queue_bh_lock); } backlog_unlock_irq_enable(sd); }if (work) rps_input_queue_head_add(sd, work); // 更新 head 计数器return work;}
7.1 process_backlog 设计要点
处理流程: input_pkt_queue(入队侧) process_queue(出队侧) ┌────┬────┬────┬────┐ splice ┌────┬────┬────┬────┐ │skb1│skb2│skb3│skb4│ ═══════════════════>│skb1│skb2│skb3│skb4│ └────┴────┴────┴────┘ └────┴────┴────┴────┘ ↑ 被其他 CPU 写入 ↓ 被本 CPU 消费 (加锁保护) __netif_receive_skb()
- 批量搬运
skb_queue_splice_tail_init:一次性将 input_pkt_queue 全部搬入 process_queue,减少锁竞争 - 两层结构:
input_pkt_queue 有锁保护可被其他 CPU 写入,process_queue 仅本地访问 - input_queue_head / input_queue_tail:这两个计数器用于跟踪 RFS flow 切换时的保序条件
8. NET_RX_SOFTIRQ 调度循环(net_rx_action)
// net/core/dev.c: 7657-7723static __latent_entropy voidnet_rx_action(void){structsoftnet_data *sd = this_cpu_ptr(&softnet_data);unsignedlong time_limit = jiffies + usecs_to_jiffies(READ_ONCE(net_hotdata.netdev_budget_usecs));int budget = READ_ONCE(net_hotdata.netdev_budget);start: sd->in_net_rx_action = true; local_irq_disable(); list_splice_init(&sd->poll_list, &list); // 取出 poll 列表 local_irq_enable();for (;;) {structnapi_struct *n;if (list_empty(&list)) {if (list_empty(&repoll)) { sd->in_net_rx_action = false; barrier();// 检查是否有新的 NAPI 被调度if (!list_empty(&sd->poll_list))goto start; // 有新的,重来if (!sd_has_rps_ipi_waiting(sd))goto end; // 全部完成 }break; } n = list_first_entry(&list, struct napi_struct, poll_list); budget -= napi_poll(n, &repoll); // 调用 poll (可能是 process_backlog)// 预算或时间用尽,本轮结束if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit))) { WRITE_ONCE(sd->time_squeeze, sd->time_squeeze + 1);break; } } local_irq_disable(); list_splice_tail_init(&sd->poll_list, &list); // 收集剩余 list_splice_tail(&repoll, &list); list_splice(&list, &sd->poll_list);if (!list_empty(&sd->poll_list)) __raise_softirq_irqoff(NET_RX_SOFTIRQ); // 再次触发else sd->in_net_rx_action = false; net_rps_action_and_irq_enable(sd); // 发送 RPS IPIend:// ...}
8.1 net_rx_action 与 RPS 的交互总结
net_rx_action() on CPU0: | ├─ poll device NAPI (如网卡驱动) │ │ │ └─ napi_gro_receive → netif_receive_skb() │ └─ get_rps_cpu() → 选择 CPU3 │ └─ enqueue_to_backlog(skb, CPU3) │ └─ napi_schedule_rps(sd_CPU3) │ └─ 将 sd_CPU3 链入 CPU0 的 rps_ipi_list │ ├─ poll 循环结束 │ └─ net_rps_action_and_irq_enable(sd_CPU0) └─ 遍历 rps_ipi_list: ├─ smp_call_function_single_async(CPU3, ...) │ └─ 在 CPU3 上执行 rps_trigger_softirq() │ └─ ____napi_schedule(sd_CPU3, &sd_CPU3.backlog) │ └─ 将 backlog 加入 CPU3 的 poll_list │ └─ __raise_softirq_irqoff(NET_RX_SOFTIRQ) on CPU3 │ └─ CPU3 在 NET_RX_SOFTIRQ 中: net_rx_action() └─ napi_poll(&sd_CPU3.backlog) └─ process_backlog() └─ __netif_receive_skb(skb) ← 在 CPU3 上进入协议栈
9. RFS(Receive Flow Steering)
RFS 是 RPS 的增强版,利用应用程序的 CPU 亲和性来决定数据包的目标 CPU。
9.1 应用层记录(recvmsg 触发)
// include/net/rps.h: 88-100staticinlinevoidsock_rps_record_flow_hash(__u32 hash){structrps_sock_flow_table *sock_flow_table;if (!hash) return; rcu_read_lock(); sock_flow_table = rcu_dereference(net_hotdata.rps_sock_flow_table);if (sock_flow_table) rps_record_sock_flow(sock_flow_table, hash); rcu_read_unlock();}
调用链:
应用调用 recvmsg() / read() / splice_read() │ │ TCP: tcp_recvmsg_locked() │ └─ tcp_cleanup_rbuf(sk, copied) │ └─ sock_rps_record_flow(sk) │ └─ if (sk->sk_state == TCP_ESTABLISHED) │ sock_rps_record_flow_hash(READ_ONCE(sk->sk_rxhash)) │ │ UDP: udp_recvmsg() │ └─ sock_rps_record_flow(sk) │ ▼rps_record_sock_flow(table, hash) { index = hash & table->mask; val = (hash & ~rps_cpu_mask) | raw_smp_processor_id(); // 高位是 hash 高位 低 6 位是当前 CPU 编号 WRITE_ONCE(table->ents[index], val); }
9.2 RFS 在 get_rps_cpu 中的使用(回顾)
// 在 get_rps_cpu() 中:ident = READ_ONCE(sock_flow_table->ents[hash & sock_flow_table->mask]);// 验证:高位 hash 必须匹配(去碰撞)if ((ident ^ hash) & ~net_hotdata.rps_cpu_mask)goto try_rps; // 碰撞,走普通 RPSnext_cpu = ident & net_hotdata.rps_cpu_mask; // 提取 CPU 编号// 如果 flow 的当前 CPU 与 RFS 建议不一致// 且满足切换条件(旧数据已消费完),则切换 CPUif (tcpu != next_cpu && (input_queue_head - last_qtail) >= 0) { tcpu = next_cpu; // ← 切换到应用所在的 CPU}
9.3 Connection Cleanup
连接关闭时从 RFS 表中清除 entry:
// include/net/rps.h: 126-148staticinlinevoidsock_rps_delete_flow(const struct sock *sk){// ... table = rcu_dereference(net_hotdata.rps_sock_flow_table);if (table) { index = hash & table->mask;if (READ_ONCE(table->ents[index]) != RPS_NO_CPU) WRITE_ONCE(table->ents[index], RPS_NO_CPU); }}
10. RFS 硬件加速(aRFS)
aRFS (Accelerated RFS) 是 RFS 的硬件辅助实现。当支持 NTUPLE filter 的网卡可用时,内核可以让硬件将特定流的数据包直接送到指定 CPU 的硬件队列,完全跳过软件 RPS。
10.1 触发条件
// net/core/dev.c: 4866-4869// 三个条件必须同时满足:if (!skb_rx_queue_recorded(skb) || // ① skb 记录了来自哪个硬件队列 !dev->rx_cpu_rmap || // ② 网卡提供了 CPU→ 硬件队列反向映射 !(dev->features & NETIF_F_NTUPLE)) // ③ 网卡支持 NTUPLE filtergoto out;
10.2 aRFS 执行
// net/core/dev.c: 4870-4888rxq_index = cpu_rmap_lookup_index(dev->rx_cpu_rmap, next_cpu);// 将 CPU 编号映射为硬件队列索引if (rxq_index == skb_get_rx_queue(skb))goto out; // 已经是目标队列,无需操作flow_id = rfs_slot(skb_get_hash(skb), flow_table);rc = dev->netdev_ops->ndo_rx_flow_steer(dev, skb, rxq_index, flow_id);// 驱动实现:配置硬件 filter,将 hash → flow_id 的数据包// 送到硬件队列 rxq_indexif (rc < 0) goto out;rflow = &flow_table->flows[flow_id];WRITE_ONCE(rflow->filter, rc); // 记录 filter ID
10.3 aRFS Filter 过期检查
// net/core/dev.c: 5013-5036boolrps_may_expire_flow(struct net_device *dev, u16 rxq_index, u32 flow_id, u16 filter_id){structrps_dev_flow *rflow = &flow_table->flows[flow_id]; cpu = READ_ONCE(rflow->cpu);// 检查:该 flow 在最近 N 个包内是否还有活跃流量if (READ_ONCE(rflow->filter) == filter_id && cpu < nr_cpu_ids && ((int)(input_queue_head - last_qtail) < (int)(10 << flow_table->log))) expire = false; // 流量活跃,不过期return expire;}
网卡驱动通常通过定时器周期性调用此函数来清理不活跃的硬件 filter。
11. Flow Limit 机制
当 backlog 队列长度超过 max_backlog / 2 时,启用 flow limit 来防止单个 flow 占用过多队列。
// net/core/dev.c: 5117-5152staticboolskb_flow_limit(struct sk_buff *skb, unsignedint qlen){if (qlen < (READ_ONCE(net_hotdata.max_backlog) >> 1))returnfalse; // 队列不算满,不启用 sd = this_cpu_ptr(&softnet_data); fl = rcu_dereference(sd->flow_limit);if (fl) { new_flow = hash_32(skb_get_hash(skb), fl->log_buckets); old_flow = fl->history[fl->history_head]; // 滑动窗口中最老的 entry fl->history[fl->history_head] = new_flow; fl->history_head = (fl->history_head + 1) & (FLOW_LIMIT_HISTORY - 1);if (likely(fl->buckets[old_flow])) fl->buckets[old_flow]--; // 减去离开窗口的 flowif (++fl->buckets[new_flow] > (FLOW_LIMIT_HISTORY >> 1)) {// 该 flow 在历史窗口中的占比超过 50% fl->count++;returntrue; // 丢弃该包 } }returnfalse;}
算法:维护一个长度为 FLOW_LIMIT_HISTORY 的滑动窗口。当某个 flow 在窗口中的出现次数超过一半时,后续该 flow 的包会被丢弃,以防止单个 flow 占满 backlog 导致其他 flow 饥饿。
12. 完整调用流程图
12.1 RPS 总体架构图
┌──────────────────────────────────────────────────┐ │ 网卡硬件 │ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │RXQ 0│ │RXQ 1│ │RXQ 2│ │RXQ 3│ │ │ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ │ └──────┼────────┼────────┼────────┼───────────────┘ │ │ │ │ ┌──────┼────────┼────────┼────────┼───────────────┐ │ ▼ ▼ ▼ ▼ │ │ CPU 0 CPU 1 CPU 2 CPU 3 │ │ (中断) (中断) (中断) (中断) │ │ │ │ │ │ │ │ │ NAPI poll + GRO │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ netif_receive_skb() │ │ │ │ │ ├── static_branch(rps_needed)? │ │ │ YES ↓ │ │ │ get_rps_cpu(skb) ←── CPU 选择核心 │ │ │ │ │ │ │ ├─ RFS 命中? → 应用所在 CPU │ │ │ ├─ RPS 命中? → reciprocal_scale 随机 CPU │ │ │ └─ 未命中 → 本地 CPU (-1) │ │ │ │ │ ├── cpu >= 0? │ │ │ YES ↓ │ │ │ enqueue_to_backlog(skb, target_cpu) │ │ │ │ │ │ │ ├─ 目标 CPU == 本地? │ │ │ │ YES → __napi_schedule(local_backlog)│ │ │ │ NO → 链入 rps_ipi_list │ │ │ │ │ │ │ └─ 队列入队完成 → 返回 │ │ │ │ │ └── cpu < 0? │ │ YES → __netif_receive_skb(skb) 本地处理 │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ net_rx_action() 每轮结束 │ │ │ │ net_rps_action_and_irq_enable(sd) │ │ │ │ └─ 遍历 rps_ipi_list → 发送 IPI 到远程CPU│ │ │ └─────────────────────────────────────────────┘ │ └───────────────────────────────────────────────────┘ │ IPI ┌────────────────┼───────────────────────────────────┐ │ ▼ │ │ 目标 CPU 收到 IPI: │ │ rps_trigger_softirq(data) │ │ └─ ____napi_schedule(sd, &sd->backlog) │ │ └─ __raise_softirq(NET_RX_SOFTIRQ) │ │ │ │ NET_RX_SOFTIRQ → net_rx_action() │ │ └─ napi_poll(&sd->backlog) │ │ └─ process_backlog(napi, quota) │ │ │ │ │ ├─ sd_has_rps_ipi_waiting? │ │ │ → 递归转发 IPI │ │ │ │ │ ├─ while (process_queue 非空) │ │ │ __netif_receive_skb(skb) │ │ │ (进入协议栈: TCP/IP 处理) │ │ │ │ │ └─ splice input_pkt_queue → process_q │ │ │ │ 应用程序在 recvmsg() 时: │ │ sock_rps_record_flow(sk) │ │ → 更新全局 RFS 表: flow_hash → 当前 CPU │ └───────────────────────────────────────────────────┘
13. 源码文件索引
| |
|---|
include/net/rps.h | RPS/RFS 核心数据结构和辅助函数(rps_map, rps_dev_flow, rps_sock_flow_table, sock_rps_record_flow) |
include/net/netdev_rx_queue.h | netdev_rx_queue 定义(包含 rps_map 和 rps_flow_table 指针) |
include/linux/netdevice.h:3465-3517 | softnet_data 结构(input_pkt_queue, backlog, rps_ipi_list, input_queue_head/tail, csd) |
include/net/hotdata.h:29-32 | net_hotdata.rps_sock_flow_table |
net/core/dev.c:4840-5050 | RPS 核心:get_rps_cpu(), set_rps_cpu(), rps_trigger_softirq(), rps_may_expire_flow() |
net/core/dev.c:5071-5094 | napi_schedule_rps() |
net/core/dev.c:5158-5207 | enqueue_to_backlog() |
net/core/dev.c:6168-6193 | netif_receive_skb_internal() |
net/core/dev.c:6391-6432 | net_rps_send_ipi() / net_rps_action_and_irq_enable() / sd_has_rps_ipi_waiting() |
net/core/dev.c:6434-6491 | process_backlog() |
net/core/dev.c:7657-7723 | net_rx_action() |
net/core/dev.c:5117-5152 | skb_flow_limit() |
net/core/net-sysfs.c:981-1058 | netdev_rx_queue_set_rps_mask() / store_rps_map() / store_rps_dev_flow_table_cnt() |
net/core/sysctl_net_core.c:102-203 | rps_default_mask_sysctl() |
net/ipv4/tcp.c:810,2174 | sock_rps_record_flow(sk) |
附录:关键设计总结
| |
|---|
| 零开销不启用 | static_branch_unlikely(&rps_needed) |
| 保证同流保序 | last_qtail 机制:切换 CPU 前检查旧 CPU 上的数据是否已消费完 |
| 跨 CPU 通知 | IPI 链表(rps_ipi_list)+ smp_call_function_single_async + csd |
| 减少锁竞争 | 两层队列:input_pkt_queue(锁保护)→ 批量 splice → process_queue(无锁) |
| NUMA 友好 | 结合 RFS,将数据包送到应用所在的 CPU,减少跨 NUMA 内存访问 |
| 防止单流饥饿 | Flow Limit:滑动窗口 + 频率计数,超过阈值的 flow 丢包 |
| 硬件加速 | aRFS:通过 ndo_rx_flow_steer 配置硬件 NTUPLE filter,完全跳过软件路径 |
| CPU 隔离兼容 | rps_cpumask_housekeeping() |
| 内存安全 | 所有 RPS 结构使用 RCU 保护,支持热更新配置 |