基于 Linux 主线源码系统梳理 Generic Receive Offload 的核心数据结构、接收路径、协议层处理、flush 时机与聚合算法,并给出网卡驱动接入 GRO 的工程要点与典型陷阱。
0. GRO 是什么,为什么需要它
GRO(Generic Receive Offload) 是 Linux 内核在接收路径上对相邻小包进行聚合的软件机制,与硬件 LRO(Large Receive Offload)对应但更通用、更安全。它的核心目标是:
- 降低协议栈处理开销:N 个 MSS 大小的段聚合成 1 个大 skb,只走 1 次 IP/TCP/Netfilter/路由查找,CPU 开销从 O(N) 降到 O(1)。
- 提升吞吐:10Gbps+ 链路在小包场景下,单核处理 skb 的开销会成为瓶颈,GRO 可以让单核吞吐提升数倍。
- 保留语义正确性:不像 LRO 那样可能破坏 TCP 语义(如 PAWS、ACK 边界),GRO 严格按协议规则聚合,可安全用于转发路径。
GRO 在协议栈中的位置:
NIC RX → 驱动 napi_poll → [驱动调用 napi_gro_receive / napi_gro_frags] → dev_gro_receive() → 协议 offload 回调(inet_gro_receive → tcp4_gro_receive) → 聚合或暂存到 napi->gro.hash[] → 适时 gro_complete() → 上交协议栈
1. 核心数据结构
1.1 struct gro_node —— GRO 实例的容器
每个 NAPI 实例内嵌一个 gro_node(include/linux/netdevice.h:356):
#define GRO_HASH_BUCKETS 8 /* 哈希桶数量,受 bitmask 位数限制 */structgro_list {structlist_headlist;/* 同一 flow 的 skb 链表 */int count; /* 当前桶中 skb 数量 */};structgro_node {unsignedlong bitmask; /* 标记哪些桶非空 */structgro_listhash[GRO_HASH_BUCKETS];/* 按 flow 哈希分桶 */structlist_headrx_list;/* GRO_NORMAL 包批量上交队列 */ u32 rx_count; /* rx_list 当前长度 */ u32 cached_napi_id; /* busy polling 用 */};
设计要点:
- 8 个哈希桶:基于
skb_get_hash_raw(skb) & (GRO_HASH_BUCKETS - 1) 分桶,相同 flow 落同桶,避免全表遍历。 - **
bitmask**:bitmap 标记哪些桶非空,__gro_flush() 用 ffs() 快速定位非空桶,O(桶数) 而非 O(包数)。 - **
rx_list**:未命中 GRO 的包(GRO_NORMAL)批量上交协议栈,避免逐包调用 netif_receive_skb(),详见 §4.4。
1.2 struct napi_gro_cb —— skb 的 GRO 控制块
每个进入 GRO 路径的 skb,其 skb->cb[] 会被复用为 napi_gro_cb(include/net/gro.h:17):
structnapi_gro_cb {union {struct {void *frag0; /* 直接指向首个 frag 的虚拟地址(快速路径)*/unsignedint frag0_len; };struct {structsk_buff *last;/* 链尾 skb(聚合用)*/unsignedlong age; /* 首包入队 jiffies(老化用)*/ }; };int data_offset; /* 当前处理位置相对 skb->data 的偏移 */ u16 flush; /* 非 0 表示不可合并 */ u16 count; /* 聚合的段数 */ u16 proto; /* 内层协议号(隧道用)*/ struct_group(zeroed, /* 每次 GRO 迭代清零的字段 */ u16 gro_remcsum_start; u8 same_flow:1; /* 是否与现有 flow 同流 */ u8 encap_mark:1; /* 隧道标记,避免重复解封装 */ u8 csum_valid:1; u8 csum_cnt:3; /* CHECKSUM_UNNECESSARY 嵌套层数 */ u8 free:2; /* 释放方式:FREE / FREE_STOLEN_HEAD */ u8 is_ipv6:1; u8 is_fou:1; u8 ip_fixedid:1; /* IP ID 是否固定(非自增)*/ u8 recursion_counter:4; /* 隧道递归深度,上限 15 */ u8 is_flist:1; /* 使用 frag_list 链接(UDP)*/ ); __wsum csum; /* CHECKSUM_COMPLETE 用 */ u16 network_offsets[2]; /* 外层/内层网络头偏移 */};#define NAPI_GRO_CB(skb) ((struct napi_gro_cb *)(skb)->cb)
关键技巧:
frag0 / frag0_len:当 skb 的线性区为空,但首个 frag 在低端内存时,直接用 frag0 指向 frag 的虚拟地址,避免 pskb_may_pull() 的内存拷贝(见 §3.2 的快速路径)。struct_group(zeroed, ...):编译期保证这 32 位字段连续对齐,dev_gro_receive() 中通过 *(u32 *)&NAPI_GRO_CB(skb)->zeroed = 0 一次清零,避免逐字段赋值。recursion_counter:防止隧道封装嵌套过深导致栈溢出,GRO_RECURSION_LIMIT = 15。
1.3 enum gro_result —— GRO 返回值
// include/linux/netdevice.h:446enum gro_result { GRO_MERGED, // skb 已合并到现有 flow,skb 本身可释放 GRO_MERGED_FREE, // 已合并,且 skb 头部被"偷走"(head_frag 优化),需特殊释放 GRO_HELD, // skb 暂存到 gro_hash,等待后续包聚合 GRO_NORMAL, // 走传统路径,加入 rx_list 批量上交 GRO_CONSUMED, // 已被消费(如 IPsec 异步处理中)};
gro_skb_finish() 根据返回值决定 skb 命运(net/core/gro.c:596):
staticgro_result_tgro_skb_finish(struct gro_node *gro, struct sk_buff *skb, gro_result_t ret){switch (ret) {case GRO_NORMAL: gro_normal_one(gro, skb, 1); // 加入 rx_list,达 batch 则上交break;case GRO_MERGED_FREE:if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD) napi_skb_free_stolen_head(skb); // 头部被偷,特殊释放elseif (skb->fclone != SKB_FCLONE_UNAVAILABLE) __kfree_skb(skb);else __napi_kfree_skb(skb, SKB_CONSUMED);break;case GRO_HELD:case GRO_MERGED:case GRO_CONSUMED:break; // 无操作 }return ret;}
2. GRO 接收入口:驱动如何接入
驱动有两种接入方式,对应两个 API。
2.1 napi_gro_receive() —— 线性 skb 路径
// include/linux/netdevice.h:4164staticinlinegro_result_tnapi_gro_receive(struct napi_struct *napi, struct sk_buff *skb){return gro_receive_skb(&napi->gro, skb);}
驱动典型用法(以 igb 为例,drivers/net/ethernet/intel/igb/igb_main.c:8934):
/* 驱动在收包过程中: * 1. 分配 skb 并填写数据 * 2. 设置 protocol、checksum、vlan 等元数据 * 3. 调用 napi_gro_receive 上交 */skb->protocol = eth_type_trans(skb, rx_ring->netdev);igb_process_skb_fields(rx_ring, rx_desc, skb); // checksum/vlan/timestampnapi_gro_receive(&q_vector->napi, skb);
2.2 napi_gro_frags() —— 零拷贝 frags 路径
当驱动将数据保留在 DMA page 中而不拷贝到 skb 线性区时,使用此接口:
// net/core/gro.c:755gro_result_tnapi_gro_frags(struct napi_struct *napi){structsk_buff *skb = napi_frags_skb(napi);// 构造 skb(仅含 ethhdr) ret = napi_frags_finish(napi, skb, dev_gro_receive(&napi->gro, skb));return ret;}
驱动典型用法(以 gve 为例,drivers/net/ethernet/google/gve/gve_rx.c:460):
skb = napi_get_frags(napi); // 分配一个空 skb(仅 GRO_MAX_HEAD 大小的 head)if (!skb) goto alloc_fail;/* 把 DMA page 作为 frag 填入 skb_shinfo(skb)->frags[] */skb_fill_page_desc(skb, nr_frags, page, offset, len);/* 多段包累加 frags,最后一包调用: */napi_gro_frags(napi);
两种模式对比:
| napi_gro_receive | napi_gro_frags |
|---|
| | |
| | |
| igb/igc/e1000e(build_skb 模式) | |
| | |
| | |
3. GRO 核心流程:dev_gro_receive()
这是 GRO 的「大脑」,位于 net/core/gro.c:460。逐段解析:
3.1 总体结构
staticenum gro_result dev_gro_receive(struct gro_node *gro, struct sk_buff *skb){ u32 bucket = skb_get_hash_raw(skb) & (GRO_HASH_BUCKETS - 1);structgro_list *gro_list = &gro->hash[bucket];/* ... *//* (1) GRO 跳过判定:设备未启用 GRO 或挂了 XDP 程序 */if (netif_elide_gro(skb->dev))goto normal;/* (2) 预扫描现有 flow,标记 same_flow */ gro_list_prepare(&gro_list->list, skb);/* (3) 查找协议 offload handler */ list_for_each_entry_rcu(ptype, head, list) {if (ptype->type == type && ptype->callbacks.gro_receive)goto found_ptype; }goto normal; /* 无 offload handler,走普通路径 */found_ptype:/* (4) 初始化 NAPI_GRO_CB,设置 checksum *//* (5) 调用协议 gro_receive 回调 */ pp = INDIRECT_CALL_INET(ptype->callbacks.gro_receive, ipv6_gro_receive, inet_gro_receive, &gro_list->list, skb);/* (6) 处理返回值:same_flow / flush / held / complete */}
3.2 gro_list_prepare() —— same_flow 预判
在调用协议回调前,先用便宜的字段做粗筛(net/core/gro.c:342):
staticvoidgro_list_prepare(const struct list_head *head, const struct sk_buff *skb){unsignedint maclen = skb->dev->hard_header_len; u32 hash = skb_get_hash_raw(skb);structsk_buff *p; list_for_each_entry(p, head, list) {unsignedlong diffs;/* 第一关:hash 不同直接跳过 */if (hash != skb_get_hash_raw(p)) { NAPI_GRO_CB(p)->same_flow = 0;continue; }/* 第二关:dev / vlan / metadata 比对 */ diffs = (unsignedlong)p->dev ^ (unsignedlong)skb->dev; diffs |= p->vlan_all ^ skb->vlan_all; diffs |= skb_metadata_differs(p, skb);/* 第三关:MAC 头比对(以太网用 compare_ether_header,一次比较 12 字节)*/if (maclen == ETH_HLEN) diffs |= compare_ether_header(skb_mac_header(p), skb_mac_header(skb));elseif (!diffs) diffs = memcmp(skb_mac_header(p), skb_mac_header(skb), maclen);/* 慢路径:sk / dst / conntrack / tc ext 等少量场景 */if (!diffs && unlikely(skb->slow_gro | p->slow_gro)) { diffs |= p->sk != skb->sk; diffs |= skb_metadata_dst_cmp(p, skb); diffs |= skb_get_nfct(p) ^ skb_get_nfct(skb); diffs |= gro_list_prepare_tc_ext(skb, p, diffs); } NAPI_GRO_CB(p)->same_flow = !diffs; // 标记给后续协议回调用 }}
设计精髓:
skb_get_hash_raw 粗筛:hash 不同的 flow 不可能同流,直接跳过协议层的逐字段比较。compare_ether_header 一次比对 12 字节:源/目的 MAC 用一条指令比较,比 memcmp 快。slow_gro 旁路:绝大多数包 slow_gro=0,跳过 sk/dst/nfct 等冷字段,命中 fast path。
3.3 skb_gro_reset_offset() —— 设置 frag0 快速路径
staticinlinevoidskb_gro_reset_offset(struct sk_buff *skb, u32 nhoff){ NAPI_GRO_CB(skb)->network_offset = 0; NAPI_GRO_CB(skb)->data_offset = 0; headlen = skb_headlen(skb); NAPI_GRO_CB(skb)->frag0 = skb->data; // 默认指向线性区 NAPI_GRO_CB(skb)->frag0_len = headlen;if (headlen)return; // 线性区有数据,直接用/* 线性区为空(frags 模式),尝试直接用首个 frag 的虚拟地址 */ pinfo = skb_shinfo(skb); frag0 = &pinfo->frags[0];if (pinfo->nr_frags && skb_frag_page(frag0) && !PageHighMem(skb_frag_page(frag0)) && // 不能是高端内存 (!NET_IP_ALIGN || !((skb_frag_off(frag0) + nhoff) & 3))) { // 对齐 NAPI_GRO_CB(skb)->frag0 = skb_frag_address(frag0); NAPI_GRO_CB(skb)->frag0_len = min_t(unsignedint, skb_frag_size(frag0), skb->end - skb->tail); }}
关键点:高端内存(PageHighMem)不能直接 dereference,必须走 kmap_atomic 慢路径。这就是为什么 napi_gro_frags() 要求 DMA buffer 在低端内存。
3.4 协议 offload 回调链
GRO 是分层架构,每一层有自己的 gro_receive / gro_complete 回调,通过 struct packet_offload(L2)和 struct net_offload(L3+)注册:
// 注册 L2 offload(按 ethertype 分发)structpacket_offload { __be16 type; u16 priority;structoffload_callbacks {structsk_buff *(*gro_receive)(structlist_head *head, structsk_buff *skb);int (*gro_complete)(struct sk_buff *skb, int nhoff); } callbacks;structlist_headlist;};voiddev_add_offload(struct packet_offload *po);// 注册 L4 offload(按 IP protocol 分发)structnet_offload {structoffload_callbackscallbacks;};inet_add_offload(const struct net_offload *ops, int protocol);
调用链示例(TCP over IPv4 over Ethernet):
dev_gro_receive() └─ ptype->callbacks.gro_receive = inet_gro_receive (ETH_P_IP) └─ ops->callbacks.gro_receive = tcp4_gro_receive (IPPROTO_TCP) └─ tcp_gro_receive()
3.5 返回值处理与 held 包入队
// net/core/gro.c:519-565same_flow = NAPI_GRO_CB(skb)->same_flow;ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED;if (pp) { // pp 非空:协议层要求 flush 一个旧 flow skb_list_del_init(pp); gro_complete(gro, pp); // 完成聚合,上交协议栈 gro_list->count--;}if (same_flow)goto ok; // 已合并,结束if (NAPI_GRO_CB(skb)->flush)goto normal; // 不可合并也不可暂存,走普通路径/* 新 flow:加入 gro_list,但桶内最多 MAX_GRO_SKBS=8 个 flow */if (unlikely(gro_list->count >= MAX_GRO_SKBS)) gro_flush_oldest(gro, &gro_list->list); // 桶满,挤掉最老的else gro_list->count++;gro_try_pull_from_frag0(skb);NAPI_GRO_CB(skb)->age = jiffies; // 记录入队时间,老化用NAPI_GRO_CB(skb)->last = skb; // last 指向自己(链尾)if (!skb_is_gso(skb)) skb_shinfo(skb)->gso_size = skb_gro_len(skb); // 记录 MSSlist_add(&skb->list, &gro_list->list); // 加入桶头部(最新)ret = GRO_HELD;
两个核心约束:
**MAX_GRO_SKBS = 8**:每个桶最多 8 个不同 flow 同时暂存。第 9 个 flow 进来时,会把最老的 flow 强制 gro_complete() 上交。这是为了防止 GRO 占用过多内存。
gso_size 记录 MSS:聚合时所有段必须同 MSS,tcp_gro_receive() 会校验 skb_shinfo(p)->gso_size 一致。这个 gso_size 后续会被 tcp_gro_complete() 写入 shinfo->gso_segs,告诉协议栈这是一个 GSO 包(虚拟分段)。
4. 协议层 GRO 实现
4.1 IP 层:inet_gro_receive()
位于 net/ipv4/af_inet.c:1464,核心逻辑:
struct sk_buff *inet_gro_receive(struct list_head *head, struct sk_buff *skb){ off = skb_gro_offset(skb); iph = skb_gro_header(skb, off + sizeof(*iph), off); // 拉取 IP 头if (!iph) goto out;/* 校验:版本/IHL 必须是 0x45(即 IPv4 无选项)*/if (*(u8 *)iph != 0x45) goto out;/* 分片包不聚合(ip_is_fragment)*/if (ip_is_fragment(iph)) goto out;/* IP 头校验和 */if (unlikely(ip_fast_csum((u8 *)iph, 5))) goto out; NAPI_GRO_CB(skb)->proto = iph->protocol;/* flush 判定:version/IHL/totlen/id 等字段一致性 * 关键:iph->id 的处理见 inet_gro_flush() */ flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (ntohl(*(__be32 *)&iph->id) & ~IP_DF));/* 与同流包比对:protocol/saddr/daddr 必须一致 */ list_for_each_entry(p, head, list) {if (!NAPI_GRO_CB(p)->same_flow) continue; iph2 = (struct iphdr *)(p->data + off);if ((iph->protocol ^ iph2->protocol) | (iph->saddr ^ iph2->saddr) | (iph->daddr ^ iph2->daddr)) { NAPI_GRO_CB(p)->same_flow = 0;continue; } } NAPI_GRO_CB(skb)->flush |= flush; skb_gro_pull(skb, sizeof(*iph)); // 推进 offset 越过 IP 头 skb_set_transport_header(skb, skb_gro_offset(skb));/* 进入 L4:tcp4_gro_receive 或 udp4_gro_receive */ pp = indirect_call_gro_receive(tcp4_gro_receive, udp4_gro_receive, ops->callbacks.gro_receive, head, skb);out: skb_gro_flush_final(skb, pp, flush);return pp;}
IP ID 处理的特殊性(include/net/gro.h:447):
staticinlineintinet_gro_flush(const struct iphdr *iph, const struct iphdr *iph2, struct sk_buff *p, bool outer){const u32 id = ntohl(*(__be32 *)&iph->id);const u32 id2 = ntohl(*(__be32 *)&iph2->id);const u16 ipid_offset = (id >> 16) - (id2 >> 16);const u16 count = NAPI_GRO_CB(p)->count;const u32 df = id & IP_DF;int flush;/* TTL / TOS / DF 必须一致 */ flush = (iph->ttl ^ iph2->ttl) | (iph->tos ^ iph2->tos) | (df ^ (id2 & IP_DF));if (flush | (outer && df))return flush;/* 第二包决定策略:DF=1 且 id 相同 → ip_fixedid 模式 * 否则按 ipid 自增校验 */if (count == 1 && df && !ipid_offset) NAPI_GRO_CB(p)->ip_fixedid = true;return ipid_offset ^ (count * !NAPI_GRO_CB(p)->ip_fixedid);}
为什么这么复杂? 不同 OS / 设备的 IP ID 行为不同:
- Linux 默认:IP ID 按字节流自增(每发一个包 +1)
- 部分 OS:DF 位置 1 时 IP ID 固定为 0
GRO 需要兼容所有情况,所以引入 ip_fixedid 标志:第二包来时若 DF=1 且 ID 相同,则后续都按 fixedid 处理,否则严格校验 ipid_offset == count。
4.2 TCP 层:tcp_gro_receive()
位于 net/ipv4/tcp_offload.c:312,TCP 聚合的校验最严格:
struct sk_buff *tcp_gro_receive(struct list_head *head, struct sk_buff *skb, struct tcphdr *th){unsignedint thlen = th->doff * 4;/* ... *//* 查找同流包(同 4 元组 + 同窗口)*/ p = tcp_gro_lookup(head, th);if (!p)goto out_check_final; // 新 flow th2 = tcp_hdr(p);/* (1) TCP 标志位:除 FIN/PSH 外必须一致 * 任何 CWR/URG/RST/SYN 都会触发 flush */ flush = (__force int)(flags & TCP_FLAG_CWR); flush |= (__force int)((flags ^ tcp_flag_word(th2)) & ~(TCP_FLAG_FIN | TCP_FLAG_PSH));/* (2) ACK 序号必须一致(同属一个 ACK 段)*/ flush |= (__force int)(th->ack_seq ^ th2->ack_seq);/* (3) TCP 选项必须完全一致(除 FIN/PSH)*/for (i = sizeof(*th); i < thlen; i += 4) flush |= *(u32 *)((u8 *)th + i) ^ *(u32 *)((u8 *)th2 + i);/* (4) 网络层一致性(IP ID/TTL/TOS)*/ flush |= gro_receive_network_flush(th, th2, p);/* (5) MSS 一致性:GSO 包校验 gso_size,普通包校验 len <= mss */ mss = skb_shinfo(p)->gso_size;if (unlikely(skb_is_gso(skb))) flush |= (mss != skb_shinfo(skb)->gso_size);else flush |= (len - 1) >= mss;/* (6) 序号连续性:th2->seq + skb_gro_len(p) == th->seq * 用 XOR 而非 !=,避免分支预测 */ flush |= (ntohl(th2->seq) + skb_gro_len(p)) ^ ntohl(th->seq);/* (7) 解密状态一致(TLS offload)*/ flush |= skb_cmp_decrypted(p, skb);/* 若所有检查通过,调用 skb_gro_receive() 合并 */if (flush || skb_gro_receive(p, skb)) { mss = 1;goto out_check_final; }/* 合并 FIN/PSH 标志到头包 */ tcp_flag_word(th2) |= flags & (TCP_FLAG_FIN | TCP_FLAG_PSH);out_check_final:/* 末段小于 mss → 触发 flush(可能是最后一段)*/ flush = len < mss; flush |= (__force int)(flags & (TCP_FLAG_URG | TCP_FLAG_PSH | TCP_FLAG_RST | TCP_FLAG_SYN | TCP_FLAG_FIN));if (p && (!NAPI_GRO_CB(skb)->same_flow || flush)) pp = p; // 返回 p,让 dev_gro_receive 触发 gro_complete NAPI_GRO_CB(skb)->flush |= (flush != 0);return pp;}
TCP GRO 的关键约束:
- 序号必须严格连续:
th2->seq + len(p) == th->seq,不允许乱序聚合(OFO 包不进 GRO) - ACK 序号必须相同:不同 ACK 号的段属于不同 ACK 周期,不能合并
- TCP 选项必须完全一致:TS 选项的时间戳值若不同会 flush
- 末段 < MSS 触发 flush:暗示发送方发完了(如 HTTP 响应最后一个不满 MSS 的包)
4.3 UDP 层:udp_gro_receive()
UDP GRO 比 TCP 更复杂,因为 UDP 无序号、无连接,聚合策略不同(net/ipv4/udp_offload.c:699):
static struct sk_buff *udp_gro_receive_segment(struct list_head *head, struct sk_buff *skb){structudphdr *uh = udp_gro_udphdr(skb);/* ... *//* UDP 校验和必须非零(与 GSO 对称)*/if (!uh->check) { NAPI_GRO_CB(skb)->flush = 1;returnNULL; }/* UDP 长度必须等于 GRO payload 长度(防 padding)*/ ulen = ntohs(uh->len);if (ulen <= sizeof(*uh) || ulen != skb_gro_len(skb)) { NAPI_GRO_CB(skb)->flush = 1;returnNULL; } skb_gro_pull(skb, sizeof(struct udphdr)); list_for_each_entry(p, head, list) {if (!NAPI_GRO_CB(p)->same_flow) continue; uh2 = udp_hdr(p);/* 端口(源+目的)必须一致 */if (*(u32 *)&uh->source != *(u32 *)&uh2->source) { NAPI_GRO_CB(p)->same_flow = 0;continue; }/* frag_list 模式 vs frag 模式不能混用 */if (NAPI_GRO_CB(skb)->is_flist != NAPI_GRO_CB(p)->is_flist) { NAPI_GRO_CB(skb)->flush = 1;return p; } flush = gro_receive_network_flush(uh, uh2, p);/* UDP 长度不一致或网络层不同 → 触发 flush */if (ulen > ntohs(uh2->len) || flush) { pp = p; } else {/* frag_list 模式:链表链接,保留每个 skb 头 */if (NAPI_GRO_CB(skb)->is_flist) { ret = skb_gro_receive_list(p, skb); } else {/* frag 模式:合并 frags */ skb_gro_postpull_rcsum(skb, uh, sizeof(struct udphdr)); ret = skb_gro_receive(p, skb); } } }/* ... */}
UDP GRO 的两种合并模式:
- frag 模式(默认):用
skb_gro_receive() 把数据合并到头包的 frags,单个大 skb。 - frag_list 模式:用
skb_gro_receive_list() 把 skb 链接到 shinfo->frag_list,保留每个 UDP 段的头部。适用于需要保留每段边界的场景(如一些 QUIC 实现)。
4.4 skb_gro_receive() —— 实际合并操作
这是 GRO 最核心的内存操作(net/core/gro.c:91),有三种路径:
intskb_gro_receive(struct sk_buff *p, struct sk_buff *skb){/* (0) 大小检查:不超过 gro_max_size */if (unlikely(p->len + len >= netif_get_gro_max_size(p->dev, p) || NAPI_GRO_CB(skb)->flush))return -E2BIG;/* IPv6 jumbo 边界:超过 65536 字节仅允许特定场景 */if (unlikely(p->len + len >= GRO_LEGACY_MAX_SIZE)) {if (NAPI_GRO_CB(skb)->proto != IPPROTO_TCP || (p->protocol == htons(ETH_P_IPV6) && skb_headroom(p) < sizeof(struct hop_jumbo_hdr)) || p->encapsulation)return -E2BIG; }/* (1) Page Pool 兼容性:pp_recycle 标志必须一致 */if (p->pp_recycle != skb->pp_recycle)return -ETOOMANYREFS;if (headlen <= offset) {/* 路径 A:skb 数据全在 frags 中 * 直接把 frags 拼接到 p 的 frags 数组 * 释放 skb 头部(NAPI_GRO_FREE) */ } elseif (skb->head_frag) {/* 路径 B:skb 线性区来自 page(build_skb 模式) * 把线性区作为新 frag 加入,释放 skb 控制结构 * 但保留 page(NAPI_GRO_FREE_STOLEN_HEAD) */ } else {/* 路径 C:fallback,把整个 skb 挂到 frag_list * 用 __skb_header_release(skb) 放弃头所有权 */merge: skb->destructor = NULL; skb->sk = NULL; __skb_pull(skb, offset);if (NAPI_GRO_CB(p)->last == p) skb_shinfo(p)->frag_list = skb;else NAPI_GRO_CB(p)->last->next = skb; NAPI_GRO_CB(p)->last = skb; __skb_header_release(skb); }/* 更新计数与长度 */ NAPI_GRO_CB(p)->count += segs; p->data_len += len; p->truesize += delta_truesize; p->len += len; NAPI_GRO_CB(skb)->same_flow = 1;return0;}
路径选择优先级:A > B > C。A/B 路径只搬运 frag 描述符(几十字节),不拷贝数据;C 路径用 frag_list 链接,几乎零成本但会保留多个 skb 头。
Page Pool 兼容性(p->pp_recycle != skb->pp_recycle):Page Pool 的页面引用计数语义与普通 page 不同(不通过 put_page 而是归还 pool),不能与普通 page 混用 frags。这是近年为支持 Page Pool 引入的检查。
5. GRO 的 flush 时机
GRO 包不能无限期暂存,需要适当时机 flush 上交协议栈。flush 分为两类:
5.1 主动 flush(gro_complete)
当协议层判定一个 flow 已经"足够"或"结束"时,立即完成聚合:
- TCP flags 含 FIN/RST/SYN/URG:连接状态变化
skb_gro_receive 返回 -E2BIG:达到 gro_max_sizeMAX_GRO_SKBS 限制:桶内 flow 数超 8,挤掉最老- 不同流出现:协议层返回
pp 强制 flush 旧 flow
5.2 被动 flush(NAPI 轮询结束)
napi_complete_done() 在 NAPI 轮询结束时调用 gro_flush_normal()(net/core/dev.c:6593):
// net/core/dev.c:6576 (napi_complete_done)if (work_done) {if (n->gro.bitmask) timeout = napi_get_gro_flush_timeout(n);/* ... */}/* 关键:无论是否 timeout,都会调用 gro_flush_normal */gro_flush_normal(&n->gro, !!timeout);
gro_flush_normal() 做两件事:
staticinlinevoidgro_flush_normal(struct gro_node *gro, bool flush_old){ gro_flush(gro, flush_old); // (1) flush 所有 held 包 gro_normal_list(gro); // (2) 上交 rx_list 中的 GRO_NORMAL 包}
5.3 gro_flush_timeout —— 延迟 flush
若设置了 gro_flush_timeout(默认 0,可通过 ethtool -C 或 sysfs 设置),NAPI 不会立即重新使能中断,而是启动定时器:
// net/core/dev.c:6577-6585if (work_done) {if (n->gro.bitmask) timeout = napi_get_gro_flush_timeout(n); n->defer_hard_irqs_count = napi_get_defer_hard_irqs(n);}if (n->defer_hard_irqs_count > 0) { n->defer_hard_irqs_count--; timeout = napi_get_gro_flush_timeout(n);if (timeout) ret = false; // 不调用 napi_complete,保持轮询状态}
作用:在流量稳定时,NAPI 保持轮询状态一段时间,让 GRO 有机会聚合更多包,减少中断次数。代价是增加延迟(默认 0 即关闭,开启后需谨慎选择 timeout 值)。
5.4 gro_normal_batch —— 批量上交
未命中 GRO 的包(GRO_NORMAL)不会立即上交协议栈,而是加入 rx_list,达到 gro_normal_batch(默认 64)才批量调用 netif_receive_skb_list_internal():
staticinlinevoidgro_normal_one(struct gro_node *gro, struct sk_buff *skb, int segs){ list_add_tail(&skb->list, &gro->rx_list); gro->rx_count += segs;if (gro->rx_count >= READ_ONCE(net_hotdata.gro_normal_batch)) gro_normal_list(gro); // 批量上交}
为什么这么做?netif_receive_skb_list_internal() 接受 skb 列表,可以批量处理(如批量调用协议 handler、批量 GRO 软中断调度),减少 per-skb 开销。这对非 GRO 流量(如小包 UDP)尤其重要。
6. gro_complete() —— 聚合完成
当一个 flow 被 flush 时,gro_complete() 被调用(net/core/gro.c:253):
staticvoidgro_complete(struct gro_node *gro, struct sk_buff *skb){structpacket_offload *ptype; __be16 type = skb->protocol;int err = -ENOENT;/* 单段包:无需 gro_complete,直接上交 */if (NAPI_GRO_CB(skb)->count == 1) { skb_shinfo(skb)->gso_size = 0;goto out; }/* 反向调用协议层 gro_complete 回调,自顶向下 */ list_for_each_entry_rcu(ptype, head, list) {if (ptype->type != type || !ptype->callbacks.gro_complete)continue; err = INDIRECT_CALL_INET(ptype->callbacks.gro_complete, ipv6_gro_complete, inet_gro_complete, skb, 0);break; }if (err) { WARN_ON(&ptype->list == head); kfree_skb(skb);return; }out: gro_normal_one(gro, skb, NAPI_GRO_CB(skb)->count);}
6.1 inet_gro_complete() —— 修复 IP 头
聚合后 IP 头需要修复(net/ipv4/af_inet.c:1590):
intinet_gro_complete(struct sk_buff *skb, int nhoff){structiphdr *iph = (structiphdr *)(skb->data + nhoff); __be16 totlen = iph->tot_len;/* 修改 tot_len 为聚合后的总长度 */ iph_set_totlen(iph, skb->len - nhoff);/* 增量更新校验和(只改了 tot_len 字段)*/ csum_replace2(&iph->check, totlen, iph->tot_len);/* 调用 L4 gro_complete(tcp4_gro_complete / udp4_gro_complete)*/ err = INDIRECT_CALL_2(ops->callbacks.gro_complete, tcp4_gro_complete, udp4_gro_complete, skb, nhoff + sizeof(*iph));return err;}
6.2 tcp_gro_complete() —— 设置 GSO 元数据
voidtcp_gro_complete(struct sk_buff *skb){structtcphdr *th = tcp_hdr(skb);structskb_shared_info *shinfo; skb->csum_start = (unsignedchar *)th - skb->head; skb->csum_offset = offsetof(struct tcphdr, check); skb->ip_summed = CHECKSUM_PARTIAL; // 标记需硬件计算校验和 shinfo = skb_shinfo(skb); shinfo->gso_segs = NAPI_GRO_CB(skb)->count; // 段数/* gso_size 在 inet_gro_receive 时已设置 *//* shinfo->gso_type 已包含 SKB_GSO_TCPV4 */}
关键:聚合后的 skb 被标记为 CHECKSUM_PARTIAL + gso_size != 0,对协议栈而言它就是一个 GSO 包。后续路径(路由、netfilter、driver)会按 GSO 处理:发送时若网卡支持 TSO 则直接发,否则软件分段。这意味着 GRO 包在内核中始终以"虚拟大包"形式存在,直到发送或交付用户态才还原。
7. 驱动实现 GRO 的注意事项
这是本文的重点。基于源码分析,总结驱动开发者接入 GRO 时的关键注意点。
7.1 启用 GRO 特性
驱动注册时必须声明 NETIF_F_GRO:
// 典型驱动注册代码dev->features |= NETIF_F_GRO | NETIF_F_GRO_HW;dev->hw_features |= NETIF_F_GRO | NETIF_F_GRO_HW;dev->wanted_features |= NETIF_F_GRO;
netif_elide_gro() 检查 NETIF_F_GRO 标志,未启用则直接走 GRO_NORMAL:
staticinlineboolnetif_elide_gro(const struct net_device *dev){if (!(dev->features & NETIF_F_GRO) || dev->xdp_prog)returntrue;returnfalse;}
注意:挂载 XDP 程序时会自动禁用 GRO(dev->xdp_prog 非 NULL)。这是设计上的取舍——XDP 已经在更早路径处理了包,GRO 聚合反而会破坏 XDP 的 per-packet 语义。
7.2 正确设置 skb 元数据(关键!)
调用 napi_gro_receive() 前,驱动必须正确填写以下字段,否则 GRO 会因 same_flow 判定失败而退化:
| | |
|---|
skb->protocol | eth_type_trans(skb, dev) | |
skb->dev | | gro_list_prepare |
skb->vlan_all | __vlan_hwaccel_put_tag() | |
skb->csum | | |
skb->hash | | flow 分桶的关键 |
**特别强调 skb->hash**:dev_gro_receive() 第一行就是 bucket = skb_get_hash_raw(skb) & (GRO_HASH_BUCKETS - 1),gro_list_prepare() 也优先用 hash 粗筛。如果驱动不设置 hash,所有包会落到同一个桶(hash=0),且每次都要全表扫描 same_flow,GRO 性能急剧下降。
// 正确做法(igb 示例)if (igb_test_staterr(rx_desc, E1000_RXDADV_RSSTYPE_MASK)) igb_rx_hash(rx_ring, rx_desc, skb); // 从描述符提取 RSS hash
7.3 校验和的正确设置
GRO 校验和处理比普通路径复杂,因为要支持部分校验和(硬件 offload):
// dev_gro_receive 中的初始化switch (skb->ip_summed) {case CHECKSUM_COMPLETE: NAPI_GRO_CB(skb)->csum = skb->csum; NAPI_GRO_CB(skb)->csum_valid = 1;break;case CHECKSUM_UNNECESSARY: NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1; // 嵌套层数break;}
驱动必须正确报告:
- **
CHECKSUM_COMPLETE**:硬件已计算整个包的校验和(含伪首部),存于 skb->csum。GRO 会用这个值增量更新。 - **
CHECKSUM_UNNECESSARY**:硬件验证了某层校验和。csum_level 表示验证了几层(如隧道内外层)。 - **
CHECKSUM_NONE**:硬件无校验和 offload。GRO 会触发软件校验,性能下降。
常见错误:驱动把 CHECKSUM_UNNECESSARY 错误地用于部分校验场景,导致 GRO 聚合后校验和错误,协议栈丢弃整个聚合包。
7.4 napi_gro_frags() 的使用约束
使用 frags 模式时,驱动需保证:
DMA page 在低端内存:gro_list_prepare 中 !PageHighMem(skb_frag_page(frag0)) 是 frag0 快速路径的前提。高端内存 page 会触发 kmap_atomic 慢路径,性能急剧下降。
napi_get_frags() 分配的 skb head 足够大:GRO_MAX_HEAD = MAX_HEADER + 128,内核已保证。但驱动不能自己分配 skb 替代,必须用 napi_get_frags(),因为它内部会把 skb 缓存到 napi->skb 复用(napi_reuse_skb)。
多段包的 frag 链接:当单个 skb 的 MAX_SKB_FRAGS(通常 16)不够时,用 frag_list 链接多个 skb。gve 驱动的做法:
if (num_frags == MAX_SKB_FRAGS) { skb = napi_alloc_skb(napi, 0); skb_shinfo(ctx->skb_tail)->frag_list = skb; ctx->skb_tail = skb;}
napi_reuse_skb() 的限制:GRO 合并后会调用 napi_reuse_skb() 回收 skb。但 pfmemalloc 包不能复用(net/core/gro.c:640),驱动需注意内存压力场景。
7.5 gro_max_size 与 MTU
GRO 聚合后的包大小受 gro_max_size 限制(include/linux/netdevice.h:5341):
staticinlineunsignedintnetif_get_gro_max_size(const struct net_device *dev, const struct sk_buff *skb){return skb->protocol == htons(ETH_P_IPV6) ? READ_ONCE(dev->gro_max_size) : // IPv6 默认 65536 READ_ONCE(dev->gro_ipv4_max_size); // IPv4 默认 65536,可调到更大}
调优:
# 默认 65536,对于 IPv4 可调大(需网卡支持)ip link set eth0 gro_ipv4_max_size 185000# 现代网卡(如 mlx5)支持 200K+ GRO 包
驱动注意:若网卡 TSO 支持的超大分段(如 256K),应同步设置 gro_max_size,否则 GRO 聚合到 64K 就 flush,无法发挥 TSO 优势。
7.6 与 XDP 的兼容性
staticinlineboolnetif_elide_gro(const struct net_device *dev){if (!(dev->features & NETIF_F_GRO) || dev->xdp_prog)returntrue;returnfalse;}
挂载 XDP 程序后 GRO 自动禁用。驱动无需特殊处理,但需注意:
- XDP 程序返回
XDP_PASS 的包会走 GRO_NORMAL 路径(gro_normal_one),不再聚合。 - 若驱动同时支持 XDP 和 GRO,需保证 XDP 处理在
napi_gro_receive() 之前。
7.7 NAPI poll 的预算控制
GRO 在 NAPI 轮询上下文执行,gro_complete() 会调用 gro_normal_one() 上交包。若 NAPI 预算(budget,默认 64)耗尽但 GRO 还有 held 包,会导致延迟。
驱动最佳实践:
staticintmy_napi_poll(struct napi_struct *napi, int budget){int work_done = 0;while (work_done < budget) {/* 收包 */ skb = my_rx(skb); napi_gro_receive(napi, skb); work_done++; }/* 若收完包,调用 napi_complete_done 触发 GRO flush */if (work_done < budget) { napi_complete_done(napi, work_done); // 内部调用 gro_flush_normal }return work_done;}
关键:必须在 work_done < budget 时才调用 napi_complete_done(),否则 NAPI 永远不会 flush GRO,导致包堆积。
7.8 gro_flush_timeout 的考量
驱动可设置默认 gro_flush_timeout:
// net/core/dev.c:11770netdev_set_gro_flush_timeout(dev, 20000); // 20us,某些驱动默认值
权衡:
- 大 timeout:聚合率高,吞吐好,但延迟增加(包在 GRO 层滞留)
- 小 timeout / 0:延迟低,但聚合率下降,吞吐受损
对延迟敏感场景(如 HFT、游戏):应设为 0 或不设置。对吞吐敏感场景(如存储、视频流):20-40us 是合理值。
7.9 软件慢路径的 fallback
gro_complete() 中若协议层 gro_complete 回调失败,会 kfree_skb(skb):
if (err) { WARN_ON(&ptype->list == head); kfree_skb(skb);return;}
驱动无需处理,但需知道:GRO 聚合后的包可能因协议层错误被丢弃。这通常意味着驱动设置了错误的协议字段(如错误的 skb->protocol),导致找不到对应的 gro_complete handler。
7.10 Page Pool 与 GRO 的兼容
现代驱动普遍使用 Page Pool(struct page_pool)管理 RX buffer。GRO 与 Page Pool 的交互需注意:
// skb_gro_receive 中的检查if (p->pp_recycle != skb->pp_recycle)return -ETOOMANYREFS;
pp_recycle 标志必须一致:Page Pool 页面通过 page_pool_put_page() 归还,而非 put_page()。若把 Page Pool 页面混入普通 skb 的 frags,会导致引用计数错乱,内存泄漏或 use-after-free。
驱动最佳实践:
- 若 RX 路径全用 Page Pool,确保所有 skb 的
pp_recycle 都设置(skb_mark_for_recycler()) - 不要在同一 NAPI 的 RX 路径混用 Page Pool 和普通 alloc_page 分配的 buffer
8. GRO 调试与可观测性
8.1 查看 GRO 状态
# 网卡 GRO 是否启用ethtool -k eth0 | grep gro# generic-receive-offload: on# rx-gro-hw: off [fixed]# GRO 最大尺寸ip link show eth0# gro_max_size 65536 gro_ipv4_max_size 65536# gro_flush_timeoutcat /sys/class/net/eth0/gro_flush_timeout
8.2 GRO 聚合效果统计
# /proc/net/netstat 中的 GRO 相关nstat -az | grep -i -E 'TcpInSegs|TcpOutSegs'# 配合网卡 RX packets 数对比:# 若 RX packets >> TcpInSegs,说明 GRO 聚合率高# 若 RX packets ≈ TcpInSegs,说明 GRO 未生效# 软中断统计cat /proc/net/softnet_stat# 第一列 processed,第二列 dropped
8.3 ftrace 跟踪 GRO
# 跟踪 napi_gro_receive 入口echo 1 > /sys/kernel/debug/tracing/events/napi/napi_gro_receive_entry/enableecho 1 > /sys/kernel/debug/tracing/events/napi/napi_gro_receive_exit/enablecat /sys/kernel/debug/tracing/trace_pipe# 跟踪 skb_gro_receive(实际合并操作)echo 1 > /sys/kernel/debug/tracing/events/net/netif_receive_skb/enable
8.4 perf 采样 GRO 热点
# 采样 GRO 相关函数perf record -g -a -e cycles:pp -- napi_gro_receive,dev_gro_receive,skb_gro_receiveperf report# 常见热点:# - skb_gro_receive: 内存操作密集# - gro_list_prepare: same_flow 比较# - inet_gro_receive: IP 头校验# - tcp_gro_receive: TCP 标志/序号校验
9. 常见问题与排查
9.1 GRO 不生效,吞吐上不去
排查步骤:
确认 GRO 已启用:
ethtool -k eth0 | grep gro# 必须为 on
确认无 XDP 程序:
ip link show dev eth0# 若有 xdpgen/id 字段,XDP 已挂载,GRO 自动禁用
确认驱动设置了 skb->hash:
# 通过 perf 采样 skb_get_hash_raw 调用频率perf record -g -a -e cycles:pp -- skb_get_hash# 若驱动未设置 hash,会走软件 hash 计算慢路径
- TCP/UDP over IPv4/IPv6:支持
- ICMP、SCTP、其他协议:不支持(无 offload handler)
- 分片包:不支持(
ip_is_fragment 检查)
9.2 GRO 导致丢包
典型场景:聚合包过大,超过 socket 接收缓冲区或 qdisc 队列。
排查:
# 查看是否有 GRO 相关的 drop reasoncat /sys/kernel/debug/tracing/events/skb/kfree_skb/enable# 关注 reason=PKT_TOO_BIG 或 reason=SOCKET_RCVBUFF
解决:调整 gro_max_size 或 socket buffer。
9.3 GRO 导致延迟抖动
典型场景:gro_flush_timeout 设置过大,包在 GRO 层滞留。
排查:
cat /sys/class/net/eth0/gro_flush_timeout# 若 > 0,可能是延迟来源
解决:对延迟敏感场景设为 0,或减小到 10us 以内。
9.4 GRO 与 conntrack 冲突
聚合后的 GSO 包经过 netfilter 时,conntrack 只看到 1 个"大包",但实际是 N 个 TCP 段。若规则基于 per-segment 语义(如限速),结果会偏差。
解决:在 forward 链对 GRO 包做特殊处理,或用 nft 的 tcp option 匹配。
10. 总结
GRO 是 Linux 网络栈的性能基石之一,理解其工作原理对驱动开发和性能调优都至关重要。
10.1 GRO 的核心设计哲学
分层协议回调:L2/L3/L4 各层独立注册 gro_receive / gro_complete,通过 INDIRECT_CALL 优化分发,兼顾通用性与性能。
fast path / slow path 分离:frag0 快速路径、slow_gro 旁路、struct_group(zeroed) 一次性清零,处处体现"common case fast, rare case correct"的设计原则。
严格协议语义:GRO 不是无脑合并,TCP 序号连续性、ACK 一致性、IP ID 一致性等校验确保聚合后的包在协议层语义正确,可安全用于转发路径(LRO 不行)。
延迟与吞吐的权衡:gro_flush_timeout、MAX_GRO_SKBS=8、gro_normal_batch=64 等参数让用户在延迟和吞吐之间精细调节。
10.2 驱动开发者的核心要点
| |
|---|
| |
| RSS hash 是 GRO 分桶的关键,缺失导致性能崩塌 |
| CHECKSUM_COMPLETE / CHECKSUM_UNNECESSARY 必须准确 |
napi_gro_frags | |
| pp_recycle |
NAPI poll 配合 napi_complete_done | |
gro_max_size | |
| |
10.3 调优建议
| |
|---|
| gro_max_size 调到 185K,gro_flush_timeout=20us |
| gro_flush_timeout=0 |
| 确认 UDP GRO 已启用,gro_normal_batch=64 |
| 确认 virtio/netdevice 透传 NETIF_F_GRO |
| |
附录:关键源码索引
| | |
|---|
| net/core/gro.c | dev_gro_receive(), gro_receive_skb(), skb_gro_receive(), gro_complete(), __gro_flush() |
| include/net/gro.h | struct napi_gro_cb, gro_list_prepare 等 inline |
| include/linux/netdevice.h | struct gro_node, napi_gro_receive(), napi_gro_frags() |
| net/core/dev.c:6576 | napi_complete_done() |
| net/ipv4/af_inet.c:1464 | inet_gro_receive() |
| net/ipv6/ip6_offload.c | ipv6_gro_receive() |
| net/ipv4/tcp_offload.c:312 | tcp_gro_receive() |
| net/ipv4/udp_offload.c:699 | udp_gro_receive_segment() |
| include/net/gro_cells.h | gro_cells_receive() |
| drivers/net/ethernet/intel/igb/igb_main.c:9138 | napi_gro_receive() |
| drivers/net/ethernet/google/gve/gve_rx.c:460 | napi_get_frags() |