在此前中,为了支持请求重试与故障转移,我们采用了全量缓冲策略,即把 Request Body完整读入内存。这种设计虽然保证了高可用性,但在处理大文件上传或高并发大流量场景下,存在严重的内存溢出风险。今天我们将根据请求特征动态选择“内存缓冲”或“零拷贝流式传输”,在系统稳定性与资源效率之间取得平衡。
技术债务
在分布式系统中,重试的前提是请求必须具备可重放性。
在 Hyper 1.0中,默认的 IncomingBody 是一个数据流。流具有“一次性消费”的特征,一旦被读取发送给上游节点 A,数据就从网关内存中消失了。若节点 A 返回失败,网关无法“倒带”数据流发给节点 B。
为了解决这个问题,我们在之前引入了缓冲机制:
let body_bytes = req.collect().await?.to_bytes();
let body_full = Full::new(body_bytes); //
上述代码的写法是无差别全量缓冲
body_full: 可克隆,可重试
这种设计的技术债务在于我们假设所有请求体都很小,但是网络请求会分为以下两种场景。
- 场景一:
API JSON调用(< 10KB)。缓冲开销忽略不计,重试收益极高。 - 场景二:文件上传 1GB 的大小。网关需要申请 1GB 堆内存。若并发 10 个此类请求,服务将因 OOM崩溃。
架构修改:有条件的重试
为了消除 OOM 隐患,我们需要有条件的重试。核心策略是根据请求特征将流量分叉:
- 触发条件:明确的
Content-Length且小于阈值(如 64KB);或是无 Body 的请求(GET/HEAD)。 - 特性:支持
Tower层的重试和集群节点之间的故障转移。
- 触发条件:
Content-Length超过阈值;或是 Transfer-Encoding: chunked(未知长度)。 - 特性:放弃重试。一旦连接建立并开始传输,若中途失败,网关直接向客户端返回错误,由客户端负责断点续传或重试。
代码实现
请求特征判别
定义缓冲阈值,并引入 HTTP Method以判断请求方式。
标准的 GET/DELETE请求通常不带 Content-Length头,但它们属于“小请求”,此类请求也是是要被缓冲的。
在 proxy_handler中判断,MAX_BUFFER_SIZE定义在 main之外。
const MAX_BUFFER_SIZE: u64 = 64 * 1024; // 64KB 阈值
let content_length = req.headers().get(CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok());
let should_buffer = ifletSome(len) = content_length {
len <= MAX_BUFFER_SIZE
} else {
match *req.method() {
Method::GET | Method::HEAD | Method::OPTIONS | Method::DELETE => true,
_ => false,
}
};
场景 A: 有明确长度 (如 POST JSON),检查是否小于阈值
场景 B: 没有 Content-Length 的请求,比如 GET/HEAD/OPTIONS/DELETE,通常无 Body,视为小请求,需要缓冲
场景 C: 走到 else分支的 _ => false这个分叉说明是 POST/PUT且无 Length,通常是 Chunked大文件传输,需要流式处理
双路径分发
根据 should_buffer的结果,代码逻辑进入两个完全独立的分支。
分支 A:缓冲模式
此分支的逻辑是高可用模式,将复用之前的故障转移循环逻辑。关键代码是 body.collect()。
if should_buffer {
let body_bytes = body.collect().await?.to_bytes();
let body_full = Full::new(body_bytes);
let max_failover_attempts = active_urls.len();
letmut last_error: Option<ProxyError> = None;
for attempt in0..max_failover_attempts {
let current_count = upstream_state.counter.fetch_add(1, Ordering::Relaxed);
let index = current_count % active_urls.len();
let upstream_url_str = &active_urls[index];
let upstream_base = match upstream_url_str.parse::<Uri>() {
Ok(u) => u,
Err(_) => continue,
};
let base_str = upstream_base.to_string();
let base_trimmed = base_str.trim_end_matches('/');
let uri_string = format!("{}{}", base_trimmed, final_path);
let new_uri = match uri_string.parse::<Uri>() {
Ok(u) => u,
Err(_) => continue,
};
info!(
"Attempt {}/{}: Routing to {} (Buffered)",
attempt + 1,
max_failover_attempts,
upstream_url_str
);
letmut builder = Request::builder()
.method(parts.method.clone())
.uri(new_uri)
.version(parts.version);
for (k, v) in &parts.headers {
builder = builder.header(k, v);
}
ifletSome(host) = upstream_base.host() {
builder = builder.header("Host", host);
}
builder = builder.header("x-forwarded-for", remote_addr.ip().to_string());
let retry_req = match builder.body(body_full.clone()) {
Ok(r) => r,
Err(_) => continue,
};
// 使用 Tower Retry
let retry_service = ServiceBuilder::new()
.layer(tower::retry::RetryLayer::new(ProxyRetryPolicy::new(3)))
.service(service_fn(|req: Request<Full<Bytes>>| {
let client = client.clone();
asyncmove {
let (parts, body) = req.into_parts();
let boxed_body = body.map_err(|e| match e {}).boxed();
client.request(Request::from_parts(parts, boxed_body)).await
}
}));
match retry_service.oneshot(retry_req).await {
Ok(res) => {
let status = res.status().as_u16().to_string();
record_metrics(&method_str, &status, upstream_name, start);
returnOk(res.map(|b| b.map_err(|e| Box::new(e) as ProxyError).boxed()));
}
Err(err) => {
warn!(
"Upstream {} failed: {:?}. Switching node...",
upstream_url_str, err
);
last_error = Some(Box::new(err));
}
}
}
// 循环结束仍失败
error!("All upstreams failed. Last: {:?}", last_error);
record_metrics(&method_str, "502", upstream_name, start);
returnOk(Response::builder()
.status(502)
.body(Empty::new().map_err(|e| match e {}).boxed())
.unwrap());
}
重要:if should_buffer: 和上一版的重试和故障转移逻辑一致
body.collect().await?.to_bytes(): 内存缓冲,消耗 Stream,生成 Bytes
builder.body(body_full.clone()): 构建可克隆请求,此时 body_full实现了 Clone,可以多次用于构造 Request
retry_service.oneshot...: 调用带 RetryLayer的 Service
分支 B:流式模式
此分支的逻辑是高性能模式,追求极致的低内存占用与低延迟。直接将 IncomingBody 封装后发送,不经过重试和故障转移层。
else {
let current_count = upstream_state.counter.fetch_add(1, Ordering::Relaxed);
let index = current_count % active_urls.len();
let upstream_url_str = &active_urls[index];
let upstream_base = match upstream_url_str.parse::<Uri>() {
Ok(u) => u,
Err(_) => {
returnOk(Response::builder()
.status(502)
.body(Empty::new().map_err(|e| match e {}).boxed())
.unwrap());
}
};
let base_str = upstream_base.to_string();
let base_trimmed = base_str.trim_end_matches('/');
let uri_string = format!("{}{}", base_trimmed, final_path);
let new_uri = match uri_string.parse::<Uri>() {
Ok(u) => u,
Err(_) => {
returnOk(Response::builder()
.status(502)
.body(Empty::new().map_err(|e| match e {}).boxed())
.unwrap());
}
};
info!("Streaming Large Request to {} (No Retry)", upstream_url_str);
letmut builder = Request::builder()
.method(parts.method)
.uri(new_uri)
.version(parts.version);
for (k, v) in &parts.headers {
builder = builder.header(k, v);
}
ifletSome(host) = upstream_base.host() {
builder = builder.header("Host", host);
}
builder = builder.header("x-forwarded-for", remote_addr.ip().to_string());
let streaming_req = match builder.body(body.map_err(|e| Box::new(e) as ProxyError).boxed())
{
Ok(r) => r,
Err(_) => {
returnOk(Response::builder()
.status(500)
.body(Empty::new().map_err(|e| match e {}).boxed())
.unwrap());
}
};
match client.request(streaming_req).await {
Ok(res) => {
let status = res.status().as_u16().to_string();
record_metrics(&method_str, &status, upstream_name, start);
Ok(res.map(|b| b.map_err(|e| Box::new(e) as ProxyError).boxed()))
}
Err(err) => {
error!("Streaming request failed: {:?}", err);
record_metrics(&method_str, "502", upstream_name, start);
Ok(Response::builder()
.status(502)
.body(Empty::new().map_err(|e| match e {}).boxed())
.unwrap())
}
}
}
upstream_state.counter.fetch_add...: 选址只选一次,不支持故障转移,因为 body流不能倒带
equest::builder().method(parts.method): 直接 move parts,不需要 clone
match builder.body: 直接使用原始 body流,通过 map_err适配错误类型
client.request(streaming_req): 直接发送,不经过 RetryLayer
Response::builder().status(502): 流式传输中途失败,只能返回 502,无法挽回
验证测试
普通 API 请求(缓冲模式)
curl -v -k https://127.0.0.1:8443/api/v1/get
日志输出如下:
2026-01-15T14:09:17.743211Z INFO hyper_proxy_tool: Serving connection using HTTP/2
2026-01-15T14:09:17.743797Z INFO proxy_handler{remote_addr=127.0.0.1:54646 method=GET uri=https://127.0.0.1:8443/api/v1/get}: hyper_proxy_tool: Attempt 1/1: Routing to https://httpbin.org (Buffered)
若首个节点超时,网关会自动切换至备用节点,调用者是无感知的。
大文件上传(流式模式)
构造一个超过 100KB 的文件 yes "A" | head -c 102400 > large.txt
curl -v -k -X POST -H "Content-Type: text/plain" --data-binary @large.txt https://127.0.0.1:8443/api/v1/post
日志输出如下:
2026-01-15T14:18:44.114528Z INFO hyper_proxy_tool: Serving connection using HTTP/2
2026-01-15T14:18:44.115373Z INFO proxy_handler{remote_addr=127.0.0.1:55188 method=POST uri=https://127.0.0.1:8443/api/v1/post}: hyper_proxy_tool: Streaming Large Request to https://httpbin.org (No Retry)
网关进程的内存占用保持平稳,并不会随文件大小进行线性增长。
总结
网关程序针对两种不同形式的请求,我们将进行自适应分发处理:
- 控制流:如
REST API,提供最大程度的可靠性保障,进行重试 + 故障转移。 - 数据流:如文件传输,提供最大程度的资源效率保障,进行零拷贝 + 流式透传。
不要在网关层缓冲大文件,将重试的责任上移至客户端(断点续传),或下移至 TCP 协议层(重传)。
Happy Coding with Rust🦀!
往期文章