In the previous part we implemented a Round-Robin load-balancing strategy with an atomic AtomicUsize counter. That only solves traffic distribution; it does nothing for fault tolerance. In the current program, if the load balancer happens to route a request to node A and node A happens to be down, the gateway returns a 502 straight away, and service availability drops.
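As a refresher, here is a minimal sketch of that selection logic (placeholder URLs; the real code keeps the counter inside the upstream config):

use std::sync::atomic::{AtomicUsize, Ordering};

// Each request atomically bumps a shared counter and maps it onto the node list.
let counter = AtomicUsize::new(0);
let urls = ["https://a.internal", "https://b.internal"];
let index = counter.fetch_add(1, Ordering::Relaxed) % urls.len();
let picked = urls[index];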
To make the gateway more robust, we introduce a two-level fault-tolerance mechanism in code:
1. Retry: tower's retry middleware, which retries transient network jitter or server-busy (5xx) responses against the same node.
2. Failover: once retries on the current node are exhausted, switch to the next upstream node.

The tower::retry middleware is driven by a custom Policy. The first step is to define a struct and implement the tower::retry::Policy trait, supplying the logic for retry ("when to retry") and clone_request ("how to clone the request").

Retry policy logic:
- result is Err (e.g., connection timeout, DNS error) -> retry
- result is Ok but the status code is 5xx (server error) -> retry

The implementation:
use hyper::{body::Incoming, Request, Response};
use tower::retry::Policy;
use tower::BoxError;

/// Retries transport errors and 5xx responses, up to `max_attempts` times.
#[derive(Clone)]
struct ProxyRetryPolicy {
    max_attempts: usize, // remaining retry budget
}

impl ProxyRetryPolicy {
    fn new(max_attempts: usize) -> Self {
        Self { max_attempts }
    }
}

impl<B, E> Policy<Request<B>, Response<Incoming>, E> for ProxyRetryPolicy
where
    B: Clone,
    E: Into<BoxError>,
{
    type Future = std::future::Ready<()>;

    fn retry(
        &mut self,
        _req: &mut Request<B>,
        result: &mut Result<Response<Incoming>, E>,
    ) -> Option<Self::Future> {
        // Budget exhausted: stop retrying
        if self.max_attempts == 0 {
            return None;
        }
        let should_retry = match result {
            // Ok, but the upstream answered with a 5xx server error
            Ok(res) => res.status().is_server_error(),
            // Transport-level failure (connection timeout, DNS error, ...)
            Err(_) => true,
        };
        if should_retry {
            self.max_attempts -= 1;
            Some(std::future::ready(()))
        } else {
            None
        }
    }

    fn clone_request(&mut self, req: &Request<B>) -> Option<Request<B>> {
        // Rebuild the request from its metadata and a cloned body
        let mut builder = Request::builder()
            .method(req.method())
            .uri(req.uri())
            .version(req.version());
        for (k, v) in req.headers() {
            builder = builder.header(k, v);
        }
        builder.body(req.body().clone()).ok()
    }
}
- The retry method decides whether to retry.
- res.status().is_server_error(): true when the status code is in the 500-599 range.
- Returning Some(std::future::ready(())) triggers a retry; returning None gives up.
- clone_request defines how to clone the request, which is why the body type B must implement Clone. Note that this rebuild copies the method, URI, version, and headers, but not request extensions; clone those as well if your handlers depend on them.
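A hypothetical walk-through of the retry budget (the io::Error stands in for any transport error; a Response<Incoming> is never actually constructed here):

use bytes::Bytes;
use http_body_util::Full;
use hyper::{body::Incoming, Request, Response};
use tower::retry::Policy;

let mut policy = ProxyRetryPolicy::new(2);
let mut req: Request<Full<Bytes>> = Request::new(Full::new(Bytes::new()));
let mut result: Result<Response<Incoming>, std::io::Error> =
    Err(std::io::Error::other("connection refused"));
assert!(policy.retry(&mut req, &mut result).is_some()); // budget 2 -> 1, retry
assert!(policy.retry(&mut req, &mut result).is_some()); // budget 1 -> 0, retry
assert!(policy.retry(&mut req, &mut result).is_none()); // budget exhausted, give up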
The hard part of failover is that the same HTTP request may need to be sent more than once (after node A fails, it goes to node B).

Hyper's default body type Incoming is a stream. It is one-shot: reading it consumes it, and it cannot be cloned. Retrying it directly triggers compile error E0382: use of moved value.
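To make the failure concrete, this is the shape of code that does not compile (an illustration, not code from the project):

// Incoming is consumed by the first send:
// let res_a = client.request(req).await; // `req` moved here
// let res_b = client.request(req).await; // error[E0382]: use of moved value: `req`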
The solution: split the request with req.into_parts() into metadata (Parts) and the data stream (Body), then buffer the stream into a Full<Bytes>. That type implements Clone, so it can be sent any number of times. Modify the preamble of proxy_handler accordingly:
// ...
// Path rewriting
// if route.strip_prefix { ... }
// (the logic above is unchanged from before)
let (parts, body) = req.into_parts();
let body_bytes = body.collect().await?.to_bytes();
let body_full = Full::new(body_bytes);
- req.into_parts() takes ownership of req and splits it into parts (metadata) and body.
- body.collect().await?.to_bytes() buffers the stream into Bytes, which we wrap in a Full<Bytes>.
- Note ⚠️: from this point on the original req variable is gone; every later request must be rebuilt from parts and body_full.
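The buffering step relies on the http_body_util crate; a sketch of the imports it assumes:

use bytes::Bytes;
use http_body_util::{BodyExt, Full}; // BodyExt provides `.collect()`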
Then proxy_handler loops over the list of upstream nodes. Only after the RetryLayer has exhausted every attempt against the current node does the loop advance, i.e., switch to the next node.
let max_failover_attempts = upstream_config.urls.len();
// (assumes `let mut last_error: Option<ProxyError> = None;` was declared earlier)
for attempt in 0..max_failover_attempts {
    // Round-Robin: atomically pick the next node
    let current_count = upstream_config.counter.fetch_add(1, Ordering::Relaxed);
    let index = current_count % upstream_config.urls.len();
    let upstream_url_str = &upstream_config.urls[index];
    let upstream_base = match upstream_url_str.parse::<Uri>() {
        Ok(u) => u,
        Err(e) => {
            error!("Invalid upstream URL: {}", e);
            continue;
        }
    };
    info!(
        "Attempt {}/{}: Routing to {}",
        attempt + 1,
        max_failover_attempts,
        upstream_url_str
    );
    let base_str = upstream_base.to_string();
    let base_trimmed = base_str.trim_end_matches('/');
    let uri_string = format!("{}{}", base_trimmed, final_path); // URL concatenation
    let new_uri = match uri_string.parse::<Uri>() {
        Ok(u) => u,
        Err(e) => {
            error!("Invalid constructed URI: {}", e);
            continue;
        }
    };
    // Rebuild the request from the split-out parts and the cloneable body
    let mut builder = Request::builder()
        .method(parts.method.clone())
        .uri(new_uri)
        .version(parts.version);
    for (k, v) in &parts.headers {
        builder = builder.header(k, v);
    }
    let retry_req = builder.body(body_full.clone())?;
    // Wrap the client call in a RetryLayer driven by our policy
    let retry_service = ServiceBuilder::new()
        .layer(tower::retry::RetryLayer::new(ProxyRetryPolicy::new(3)))
        .service(service_fn(|req: Request<Full<Bytes>>| {
            let client = client.clone();
            async move {
                let (p, b) = req.into_parts();
                // Convert Full<Bytes> back into the BoxBody the client expects
                let boxed = b.map_err(|e| match e {}).boxed();
                client.request(Request::from_parts(p, boxed)).await
            }
        }));
    match retry_service.oneshot(retry_req).await {
        Ok(res) => {
            // Success: return the response and end the loop
            let res_boxed = res.map(|b| b.map_err(|e| Box::new(e) as ProxyError).boxed());
            return Ok(res_boxed);
        }
        Err(err) => {
            // This node failed for good (retries exhausted):
            // log it and fall through to the next loop iteration
            warn!(
                "Upstream {} failed after retries: {:?}. Trying next node...",
                upstream_url_str, err
            );
            last_error = Some(Box::new(err));
        }
    }
}
// The loop finished without returning: every node is unavailable
error!(
    "All upstreams failed for route '{}'. Last error: {:?}",
    route.path, last_error
);
record_metrics(&method_str, "502", upstream_name, start);
let body = Empty::<Bytes>::new().map_err(|e| match e {}).boxed();
let mut resp = Response::new(body);
*resp.status_mut() = StatusCode::BAD_GATEWAY;
Ok(resp)
- max_failover_attempts: the number of attempts equals the number of nodes, so every node gets a chance to serve the request.
- upstream_config.counter.fetch_add(...): the usual atomic Round-Robin node selection, followed by URL parsing and path concatenation.
- The new request is built from the split-out parts and the cloneable body_full.
- retry_service: a tower service stack that layers the retry policy over the client call.
- ProxyRetryPolicy::new(3): up to 3 retries within a single node.
- client.request(Request::from_parts(p, boxed)): converts the Full<Bytes> body back into the BoxBody the Client expects and sends the request.
- retry_service.oneshot: drives the request once through the stack, including the retry logic.
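For reference, the failover block assumes roughly these imports (a sketch; module paths may differ in your project):

use std::sync::atomic::Ordering;
use bytes::Bytes;
use http_body_util::{BodyExt, Empty, Full};
use hyper::{Request, Response, StatusCode, Uri};
use tower::{service_fn, ServiceBuilder, ServiceExt}; // ServiceExt provides `oneshot`
use tracing::{error, info, warn};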
To test failover, we deliberately misspell https://postman-echo.com as https://postman-echo1.com in the upstream config:

[upstreams]
[upstreams.httpbin_cluster]
urls = [
    "https://httpbin.org",
    "https://postman-echo1.com"
]

Send a request:
curl -v -k https://127.0.0.1:8443/api/v1/get
Program log output:
2026-01-06T15:48:51.832962Z INFO hyper_proxy_tool: Serving connection using HTTP/2
2026-01-06T15:48:51.834642Z INFO proxy_handler{remote_addr=127.0.0.1:64580 method=GET uri=https://127.0.0.1:8443/api/v1/get}: hyper_proxy_tool: Attempt 1/2: Routing to https://httpbin.org
Correct: Round-Robin picked the first node, and https://httpbin.org responded successfully. Send the request again:
curl -v -k https://127.0.0.1:8443/api/v1/get
Program log output:
2026-01-06T15:48:58.509269Z INFO hyper_proxy_tool: Serving connection using HTTP/2
2026-01-06T15:48:58.509990Z INFO proxy_handler{remote_addr=127.0.0.1:64610 method=GET uri=https://127.0.0.1:8443/api/v1/get}: hyper_proxy_tool: Attempt 1/2: Routing to https://postman-echo1.com
2026-01-06T15:48:58.547419Z WARN proxy_handler{remote_addr=127.0.0.1:64610 method=GET uri=https://127.0.0.1:8443/api/v1/get}: hyper_proxy_tool: Upstream https://postman-echo1.com failed after retries: hyper_util::client::legacy::Error(Connect, ConnectError("dns error", Custom { kind: Uncategorized, error: "failed to lookup address information: nodename nor servname provided, or not known" })). Trying next node...
2026-01-06T15:48:58.547560Z INFO proxy_handler{remote_addr=127.0.0.1:64610 method=GET uri=https://127.0.0.1:8443/api/v1/get}: hyper_proxy_tool: Attempt 2/2: Routing to https://httpbin.org
This time the request is routed to https://postman-echo1.com, the bad upstream. The gateway logs Trying next node... and then routes to the next node, https://httpbin.org. Correct.
We leveraged tower's powerful middleware capabilities to implement retry and failover, and used into_parts to move ownership of the original req, avoiding E0382 moved-value errors and making it possible to rebuild a valid HTTP request on every loop iteration. Our hyper-proxy-tool can now shift traffic seamlessly to healthy nodes when some upstream nodes go down, preserving service continuity.
Happy Coding with Rust🦀!