In the previous installments we built a gateway that is fast (Hyper), secure (Rustls), highly available (Retry & Failover), and observable. But it still has a serious gap: changing the configuration requires a restart, which disqualifies it as infrastructure meant to run 24/7. In this installment we will use a Rust concurrency primitive (ArcSwap) and an async control tool (CancellationToken) to implement the "hot reload" capability a gateway must have.
Architecture Design
Hot reload needs three pieces: watch the config file for changes, then atomically swap in the updated values, and finally cancel any task still probing the old addresses.
- Atomic updates: introduce ArcSwap. A traditional RwLock suffers heavy lock contention under frequent reads, while ArcSwap is built on an RCU-style mechanism where reads are nearly lock-free, a perfect fit for the read-heavy, write-rarely configuration scenario (a minimal demo follows this list).
- Task orchestration: use CancellationToken to gracefully shut down the old health check task and spawn a new one.
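To make the ArcSwap semantics concrete, here is a minimal, self-contained sketch (not part of the gateway code): readers take a cheap snapshot, a writer swaps the whole Arc atomically, and snapshots taken earlier remain valid.

use std::sync::Arc;
use arc_swap::ArcSwap;

fn main() {
    // Readers take a cheap snapshot; a writer swaps the whole Arc atomically.
    let config = ArcSwap::from_pointee(String::from("v1"));

    let snapshot = config.load(); // near lock-free read, essentially a refcount bump
    println!("reader sees: {}", *snapshot); // v1

    config.store(Arc::new(String::from("v2"))); // atomic replace
    println!("old snapshot still sees: {}", *snapshot); // v1, unaffected
    println!("new reader sees: {}", *config.load()); // v2
}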
Core Code
Add the dependencies
notify = "8"
tokio-util = "0.7"
notify: watches the config file for changes
tokio-util: provides CancellationToken, used to build cancellable tasks
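As a quick primer (a standalone sketch, not gateway code), this is the CancellationToken pattern the rest of the article relies on: a task races its work against token.cancelled() in tokio::select!, and the owner calls token.cancel() to stop it.

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.clone(); // clones observe the same cancellation state

    let task = tokio::spawn(async move {
        tokio::select! {
            _ = child.cancelled() => println!("task cancelled"),
            _ = tokio::time::sleep(Duration::from_secs(60)) => println!("task finished"),
        }
    });

    token.cancel();      // signal the task to stop
    task.await.unwrap(); // prints "task cancelled" almost immediately
}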
Atomic state management
Rework how the global state is stored.
Change the type of config from Arc<AppConfig> to Arc<ArcSwap<AppConfig>>.
Wrapping AppConfig in ArcSwap lets a background task atomically replace the entire AppConfig while threads currently reading the configuration are completely unaffected.
Change AppState to hold an atomic map
use arc_swap::ArcSwap;

struct AppState {
    // Map from upstream name to its runtime state, replaceable as a whole.
    upstreams: ArcSwap<HashMap<String, Arc<UpstreamState>>>,
}
upstreams: a map that supports atomic replacement
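For reference, UpstreamState was defined in the earlier installments; judging from how create_state_from_config builds it below, it looks roughly like this (a sketch, with field types inferred from the code that follows):

use std::sync::atomic::AtomicUsize;
use arc_swap::ArcSwap;

struct UpstreamState {
    active_urls: ArcSwap<Vec<String>>, // currently healthy URLs, swapped atomically
    counter: AtomicUsize,              // round-robin cursor for load balancing
}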
Helper functions for loading config and creating state
fn load_config(path: &str) -> Result<AppConfig, ProxyError> {
    let config_content = fs::read_to_string(path)
        .map_err(|e| error(format!("Failed to read config file: {}", e)))?;
    let config: AppConfig = toml::from_str(&config_content)
        .map_err(|e| error(format!("Failed to parse config TOML: {}", e)))?;
    Ok(config)
}
fn create_state_from_config(config: &AppConfig) -> HashMap<String, Arc<UpstreamState>> {
    let mut upstreams_state = HashMap::new();
    for (name, u_conf) in &config.upstreams {
        upstreams_state.insert(
            name.clone(),
            Arc::new(UpstreamState {
                active_urls: ArcSwap::from_pointee(u_conf.urls.clone()),
                counter: AtomicUsize::new(0),
            }),
        );
    }
    upstreams_state
}
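If you are following along without the earlier installments, the config shape these helpers consume can be approximated like this (a minimal sketch with a hypothetical derive setup; the real AppConfig also carries server and routes sections):

use std::collections::HashMap;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct AppConfig {
    upstreams: HashMap<String, Upstream>,
}

#[derive(Debug, Deserialize)]
struct Upstream {
    urls: Vec<String>,
}

fn main() {
    let toml_src = r#"
        [upstreams.httpbin_cluster]
        urls = ["https://httpbin.org", "https://postman-echo.com"]
    "#;
    let config: AppConfig = toml::from_str(toml_src).unwrap();
    println!("{:?}", config.upstreams["httpbin_cluster"].urls);
}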
Load the initial configuration in main
let args = Cli::parse();
let config_path = args.config.clone();
let initial_config = load_config(&config_path)?;
let initial_state_map = create_state_from_config(&initial_config);
let app_config = Arc::new(ArcSwap::from_pointee(initial_config));
let app_state = Arc::new(AppState {
    upstreams: ArcSwap::new(Arc::new(initial_state_map)),
});
let current_conf = app_config.load();
let certs = load_certs(&current_conf.server.cert_file)?;
let key = load_private_key(&current_conf.server.key_file)?;
Initialize the global state: both Config and AppState go into an ArcSwap so they can be replaced atomically.
current_conf is a point-in-time snapshot taken out of the container. Note that certificate loading does not support hot reload yet.
Update the proxy_handler logic
Update the function signature
#[instrument(skip(client, config, state, req), fields(method = %req.method(), uri = %req.uri()))]
async fn proxy_handler(
    req: Request<Incoming>,
    client: Arc<HttpClient>,
    config: Arc<ArcSwap<AppConfig>>,
    state: Arc<AppState>,
    remote_addr: SocketAddr,
) -> Result<Response<BoxBody<Bytes, ProxyError>>, ProxyError> {
    // ... body unchanged except for the snapshot loads shown below
}
The config parameter type becomes Arc<ArcSwap<AppConfig>>.
Call .load() inside proxy_handler to read the config
let current_config = config.load();
for route in &current_config.routes { ... }
config.load(): grabs a snapshot of the current config; it only bumps a reference count, so it is extremely fast
&current_config.routes: route matching then runs against the snapshot
Fetch the upstream state
let current_state_map = state.upstreams.load();
state.upstreams.load(): loads a snapshot of the state map; the code that reads and uses upstreams afterwards is identical to the previous version (a round-robin sketch follows).
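To illustrate what "identical to the previous version" means in practice, here is a standalone sketch of round-robin selection over the active_urls snapshot (the function name pick_url is hypothetical; the real handler inlines this logic):

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use arc_swap::ArcSwap;

// Pick the next healthy URL in round-robin order from a snapshot.
fn pick_url(active_urls: &ArcSwap<Vec<String>>, counter: &AtomicUsize) -> Option<String> {
    let urls = active_urls.load(); // snapshot: unaffected by concurrent swaps
    if urls.is_empty() {
        return None; // every node failed its health check
    }
    let idx = counter.fetch_add(1, Ordering::Relaxed) % urls.len();
    Some(urls[idx].clone())
}

fn main() {
    let urls = ArcSwap::from_pointee(vec!["https://a".to_string(), "https://b".to_string()]);
    let counter = AtomicUsize::new(0);
    for _ in 0..3 {
        println!("{:?}", pick_url(&urls, &counter)); // a, b, a
    }
}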
Config hot reload and health check management
Watch for file changes
Open a channel that carries file change events from the watcher.
In main, after initializing the hyper_util::client::legacy::Client, add the following logic.
let (tx, mut rx) = mpsc::channel(1);
let mut watcher = RecommendedWatcher::new(
    move |res| {
        // notify runs this callback on its own thread, so blocking_send is fine here
        let _ = tx.blocking_send(res);
    },
    notify::Config::default(),
)?;
watcher.watch(Path::new(&config_path), RecursiveMode::NonRecursive)?;
let manager_config = app_config.clone();
let manager_state = app_state.clone();
let manager_client = client.clone();
let config_file_path = config_path.clone();
watcher.watch(...): starts the file watcher on the config file (non-recursively, since it is a single file)
Spawn the "config management + health check" task
tokio::spawn(async move {
    let mut cancel_token = CancellationToken::new();
    let mut health_handle = tokio::spawn(start_health_check_loop(
        manager_config.clone(),
        manager_state.clone(),
        manager_client.clone(),
        cancel_token.clone(),
    ));
    info!("Configuration watcher started for: {}", config_file_path);
    while let Some(res) = rx.recv().await {
        match res {
            Ok(event) => {
                if matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)) {
                    info!("Config file changed, reloading...");
                    // Brief delay in case the editor has not finished writing the file
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    match load_config(&config_file_path) {
                        Ok(new_conf) => {
                            info!("Config reloaded successfully!");
                            manager_config.store(Arc::new(new_conf.clone()));
                            let new_state_map = create_state_from_config(&new_conf);
                            manager_state.upstreams.store(Arc::new(new_state_map));
                            cancel_token.cancel();
                            let _ = health_handle.await;
                            info!("Old health check task stopped.");
                            info!("Starting new health check task...");
                            cancel_token = CancellationToken::new();
                            health_handle = tokio::spawn(start_health_check_loop(
                                manager_config.clone(),
                                manager_state.clone(),
                                manager_client.clone(),
                                cancel_token.clone(),
                            ));
                        }
                        Err(e) => {
                            error!("Failed to reload config: {}. Keeping old config.", e);
                        }
                    }
                }
            }
            Err(e) => error!("Watch error: {:?}", e),
        }
    }
});
let mut cancel_token = CancellationToken::new();: creates the cancellation token used to control the health check task
let mut health_handle = tokio::spawn(start_health_check_loop(..., cancel_token.clone()));: spawns the initial task and keeps its handle
while let Some(res) = rx.recv().await: loops, waiting for file change events
matches!(event.kind, EventKind::Modify(_) | EventKind::Create(_)): editors save files in different ways, sometimes as a Modify, sometimes as a Rename/Create, so matching both Modify and Create covers most cases; note that a single save can also fire several events in a row (see the debounce sketch after this list)
manager_config.store(Arc::new(new_conf.clone()));: atomically replaces the Config
manager_state.upstreams.store(Arc::new(new_state_map));: replaces the State
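Because one save can emit several events (you can see the reload firing twice in the demo logs later), it may be worth coalescing them. A minimal sketch, assuming the same rx channel as above and deliberately ignoring event kinds for brevity:

use std::time::Duration;
use tokio::sync::mpsc::Receiver;

// After the first event, wait briefly for the write to settle,
// then drain whatever else queued up so one save triggers one reload.
async fn next_change(rx: &mut Receiver<notify::Result<notify::Event>>) -> Option<()> {
    rx.recv().await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    while rx.try_recv().is_ok() {}
    Some(())
}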
Restarting the health check task
cancel_token.cancel();: sends the cancellation signal to the old task
let _ = health_handle.await;: waits for the old task to fully exit
health_handle = tokio::spawn(start_health_check_loop(...)): spawns the new task
Note ⚠️ the health_handle.await line: it guarantees we never run two health check tasks at once, avoiding concurrent conflicts.
Cancellable health check logic
async fn start_health_check_loop(
    config: Arc<ArcSwap<AppConfig>>,
    state: Arc<AppState>,
    client: Arc<HttpClient>,
    token: CancellationToken,
) {
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _ = interval.tick() => {
                // fall through and run this round of checks
            }
            _ = token.cancelled() => {
                info!("Health check loop stopped.");
                return;
            }
        }
        let current_config = config.load();
        let current_state_map = state.upstreams.load();
        for (name, upstream_config) in &current_config.upstreams {
            if let Some(upstream_state) = current_state_map.get(name) {
                let mut healthy_urls = Vec::new();
                for url in &upstream_config.urls {
                    if check_upstream(&client, url).await {
                        healthy_urls.push(url.clone());
                    } else {
                        warn!("Health check failed for: {}", url);
                    }
                }
                if healthy_urls.is_empty() {
                    warn!("All nodes down for upstream: {}", name);
                    upstream_state.active_urls.store(Arc::new(vec![]));
                } else {
                    let current_len = upstream_state.active_urls.load().len();
                    if current_len != healthy_urls.len() {
                        info!(
                            "Upstream '{}' healthy nodes changed: {} -> {:?}",
                            name, current_len, healthy_urls
                        );
                    }
                    upstream_state.active_urls.store(Arc::new(healthy_urls));
                }
            }
        }
    }
}
token: CancellationToken,: the function gains a cancellation token parameter
tokio::select!: waits simultaneously on the next tick and on cancellation; when token.cancelled() fires, the loop exits
config and state: loaded on each round so every check runs against fresh snapshots of the current config and state
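check_upstream comes from the failover installment and probes a node through the shared client. As a stand-in for readers without that code, a simplified TCP-level reachability probe could look like this (tcp_probe is a hypothetical name and a much weaker check than a real HTTP health request):

use std::time::Duration;
use tokio::net::TcpStream;

// Simplified stand-in: just test TCP reachability with a timeout.
async fn tcp_probe(host: &str, port: u16) -> bool {
    tokio::time::timeout(Duration::from_secs(2), TcpStream::connect((host, port)))
        .await
        .map(|conn| conn.is_ok())
        .unwrap_or(false)
}

#[tokio::main]
async fn main() {
    println!("httpbin.org:443 reachable: {}", tcp_probe("httpbin.org", 443).await);
}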
Verification and testing: a zero-downtime demo
1. Start the gateway
At this point config.toml contains:
[upstreams]
[upstreams.httpbin_cluster]
urls = [
"https://httpbin.org",
"https://postman-echo.com"
]
The log output:
2026-01-21T14:18:23.132429Z INFO hyper_proxy_tool: Metrics endpoint exposed at http://0.0.0.0:9000/metrics
2026-01-21T14:18:23.244641Z INFO hyper_proxy_tool: Configuration watcher started for: config.toml
2026-01-21T14:18:23.244641Z INFO hyper_proxy_tool: HTTPS Proxy Server listening on https://0.0.0.0:8443
2026-01-21T14:18:25.247743Z WARN hyper_proxy_tool: Health check failed for: https://httpbin.org
2026-01-21T14:18:26.548561Z INFO hyper_proxy_tool: Upstream 'httpbin_cluster' healthy nodes changed: 2 -> ["https://postman-echo.com"]
2026-01-21T14:18:26.549310Z WARN hyper_proxy_tool: Health check failed for: http://127.0.0.1:9001
2026-01-21T14:18:26.549359Z WARN hyper_proxy_tool: All nodes down for upstream: local_ws
2026-01-21T14:18:29.381095Z INFO hyper_proxy_tool: Upstream 'httpbin_cluster' healthy nodes changed: 1 -> ["https://httpbin.org", "https://postman-echo.com"]
The gateway starts up normally and reports the reachable upstream nodes.
2. Modify the config
Change postman-echo to postman-echo1:
[upstreams]
[upstreams.httpbin_cluster]
urls = [
"https://httpbin.org",
"https://postman-echo1.com"
]
3. Watch the logs
The log output:
2026-01-21T14:20:49.231583Z INFO hyper_proxy_tool: Config file changed, reloading...
2026-01-21T14:20:49.333868Z INFO hyper_proxy_tool: Config reloaded successfully!
2026-01-21T14:20:49.333938Z INFO hyper_proxy_tool: Health check loop stopped.
2026-01-21T14:20:49.333956Z INFO hyper_proxy_tool: Old health check task stopped.
2026-01-21T14:20:49.333960Z INFO hyper_proxy_tool: Starting new health check task...
2026-01-21T14:20:49.334008Z INFO hyper_proxy_tool: Config file changed, reloading...
2026-01-21T14:20:49.436454Z INFO hyper_proxy_tool: Config reloaded successfully!
2026-01-21T14:20:49.866433Z WARN hyper_proxy_tool: Health check failed for: https://postman-echo1.com
2026-01-21T14:20:49.866472Z INFO hyper_proxy_tool: Upstream 'httpbin_cluster' healthy nodes changed: 2 -> ["https://httpbin.org"]
Config file changed, reloading...
The configuration is hot-reloaded in real time.
Old health check task stopped.
The old health check task has been stopped.
Config reloaded successfully!
The new configuration loaded successfully.
Health check failed for: https://postman-echo1.com
The health checks are now running against the new configuration.
4. Send a curl request
curl -v -k https://127.0.0.1:8443/api/v1/get
The log output:
2026-01-21T14:28:27.841426Z INFO proxy_handler{remote_addr=127.0.0.1:63905 method=GET uri=https://127.0.0.1:8443/api/v1/get}: hyper_proxy_tool: Attempt 1/1: Routing to https://httpbin.org (Buffered)
Attempt 1/1: shows that only a single healthy node is available now.
5. Verify the result
- New requests: a freshly issued curl request immediately follows the new routing rules and is forwarded only to httpbin.org, never to the old postman-echo.com.
This is the power of the RCU mechanism.
Full code
https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=8ea5c6e0ce4d6ea2141318a08c12c218
Summary
- ArcSwap: lock-free state sharing for read-heavy, write-rarely scenarios.
- CancellationToken: graceful termination and restart of async tasks.
- Channel + Watcher: event-driven control flow.
Our program can now update the gateway's routes and upstream configuration on the fly, without interrupting existing connections: true zero-downtime operations.
Happy Coding with Rust🦀!