❝"不用担心数据竞争,但还是要小心死锁。" —— 每个 Rust 程序员的心声
引言:为什么要学并发?
想象你在一家餐厅,厨房里只有一个厨师(单线程)。客人点了三道菜:沙拉、牛排和甜点。这个厨师必须做完沙拉,再做牛排,最后做甜点。结果客人等了一个小时才吃上饭。
现在想象厨房里有三个厨师(多线程),他们可以同时准备不同的菜。结果?客人 20 分钟就能享用完整的一餐。这就是并发的魅力。
但问题来了:如果三个厨师都想用同一把刀,怎么办?如果他们不小心把盐罐和糖罐搞混了,怎么办?这就是并发编程需要解决的问题,而 Rust 在这方面简直是个天才。
第一章:线程 (Threads) —— 你的第一个并发程序
What - 什么是线程?
线程是程序中独立执行的最小单位。在 Rust 中,线程使用操作系统的原生线程。每个线程都有自己的栈空间,可以独立运行代码。
Why - 为什么需要线程?
- 充分利用 CPU:现代 CPU 都是多核的,不用就是浪费
- 提高响应性:一个线程处理耗时任务时,其他线程可以响应用户
How - 怎么创建线程?
use std::thread;use std::time::Duration;fnmain() {// 创建一个新线程let handle = thread::spawn(|| {for i in1..10 {println!("子线程: 数字 {}", i); thread::sleep(Duration::from_millis(1)); } });// 主线程继续执行for i in1..5 {println!("主线程: 数字 {}", i); thread::sleep(Duration::from_millis(1)); }// 等待子线程完成 handle.join().unwrap();}
关键点:
thread::spawn 接受一个闭包,在新线程中执行
线程与所有权
这是 Rust 的精髓所在。看看这个错误示例:
fnmain() {let v = vec![1, 2, 3];let handle = thread::spawn(|| {println!("向量: {:?}", v); // ❌ 编译错误! }); handle.join().unwrap();}
编译器会说:"兄弟,我不知道这个线程会运行多久,万一主线程把 v 释放了怎么办?"
正确做法 —— 使用 move:
fnmain() {let v = vec![1, 2, 3];let handle = thread::spawn(move || {println!("向量: {:?}", v); // ✅ 所有权转移到线程内 }); handle.join().unwrap();// v 在这里已经不可用了}
最佳实践:
第二章:Channel —— 线程间的"传声筒"
What - 什么是 Channel?
Channel 是一种消息传递机制,允许单向信息流动,包含发送端(Sender)和接收端(Receiver)。就像邮局:一个线程把消息投进邮筒(发送),另一个线程从信箱取出(接收)。
Why - 为什么用 Channel?
Go 语言有句名言:"不要通过共享内存来通信,而要通过通信来共享内存。" Channel 就是这个理念的实现:
How - 怎么使用 Channel?
基础用法
use std::sync::mpsc; // mpsc = Multiple Producer, Single Consumeruse std::thread;fnmain() {let (tx, rx) = mpsc::channel(); thread::spawn(move || {let val = String::from("你好"); tx.send(val).unwrap();// val 的所有权已经转移,这里不能再用 });let received = rx.recv().unwrap();println!("收到: {}", received);}
多个生产者
use std::sync::mpsc;use std::thread;fnmain() {let (tx, rx) = mpsc::channel();// 克隆发送者,创建多个生产者for i in0..3 {let tx_clone = tx.clone(); thread::spawn(move || { tx_clone.send(format!("线程 {} 发来消息", i)).unwrap(); }); }// 必须丢弃原始的 tx,否则 rx 会一直等待drop(tx);// 接收所有消息for received in rx {println!("收到: {}", received); }}
同步 Channel (有界队列)
use std::sync::mpsc;use std::thread;use std::time::Duration;fnmain() {// 创建容量为 2 的同步 channellet (tx, rx) = mpsc::sync_channel(2); thread::spawn(move || {for i in1..=5 {println!("发送: {}", i); tx.send(i).unwrap(); // 超过容量会阻塞 thread::sleep(Duration::from_millis(100)); } }); thread::sleep(Duration::from_secs(1));for received in rx {println!("接收: {}", received); thread::sleep(Duration::from_millis(500)); }}
Channel 类型对比:
| | | |
|---|
channel() | | | |
sync_channel(0) | | | |
sync_channel(n) | | | |
最佳实践:
第三章:共享状态并发 —— Mutex 和 Arc
What - 什么是共享状态?
有时候,channel 太麻烦了。比如你有一个计数器,10 个线程都要修改它。用 channel 的话,你得发送消息、接收消息、更新计数器、再发回去……太繁琐了!
这时候就需要共享状态:多个线程直接访问同一块内存。
Why - 为什么需要 Mutex?
想象十个人同时修改一个文档,没有任何协调。结果?一团糟。
Mutex(互斥锁)确保在任意时刻只有一个线程能访问数据。它像一个守门员:"一次只能进一个人!"
How - Mutex 基础
use std::sync::Mutex;fnmain() {let m = Mutex::new(5); {letmut num = m.lock().unwrap(); // 获取锁 *num = 6; } // 锁在这里自动释放println!("m = {:?}", m);}
关键概念:
MutexGuard 实现了 Deref,可以像普通引用一样使用- 离开作用域时,
MutexGuard 自动释放锁(RAII)
Arc + Mutex 组合拳
问题: Mutex 不能直接在线程间共享,怎么办?
答案: 用 Arc! Arc 是原子引用计数指针,是线程安全的智能指针。
use std::sync::{Arc, Mutex};use std::thread;fnmain() {let counter = Arc::new(Mutex::new(0));letmut handles = vec![];for _ in0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {letmut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); }for handle in handles { handle.join().unwrap(); }println!("结果: {}", *counter.lock().unwrap());}
工作原理:
Arc::new(Mutex::new(0)) 创建共享的互斥计数器Arc::clone(&counter) 克隆 Arc,增加引用计数- 每个线程持有一个 Arc 副本,指向同一个 Mutex
RwLock —— 读多写少的优化
RwLock 允许多个读者或单个写者,适合读操作远多于写操作的场景。
use std::sync::{Arc, RwLock};use std::thread;fnmain() {let data = Arc::new(RwLock::new(vec![1, 2, 3]));// 多个读线程letmut handles = vec![];for _ in0..5 {let data = Arc::clone(&data);let h = thread::spawn(move || {let r = data.read().unwrap();println!("读: {:?}", *r); }); handles.push(h); }// 一个写线程let data = Arc::clone(&data); handles.push(thread::spawn(move || {letmut w = data.write().unwrap(); w.push(4); }));for h in handles { h.join().unwrap(); }}
Mutex vs RwLock:
最佳实践:
- 只在真正需要共享可变状态时使用 Arc,不要过度包装
- 读操作 > 写操作 10 倍以上时,考虑 RwLock
第四章:Arc + Mutex 模式深度剖析
为什么是 Arc + Mutex?
这是 Rust 并发编程的黄金组合。让我们理解每个部分的作用:
Arc (Atomic Reference Counting):
Mutex:
- 提供内部可变性(interior mutability)
组合效果:
Arc<Mutex<T>> = 多所有权 + 可变性 + 线程安全
常见模式:共享配置
use std::sync::{Arc, Mutex};use std::thread;#[derive(Clone)]structConfig { timeout: u64, max_connections: usize,}fnmain() {let config = Arc::new(Mutex::new(Config { timeout: 30, max_connections: 100, }));letmut handles = vec![];// 10 个工作线程读取配置for i in0..10 {let config = Arc::clone(&config);let h = thread::spawn(move || {let cfg = config.lock().unwrap();println!("线程 {} 使用超时: {}s", i, cfg.timeout); }); handles.push(h); }// 主线程更新配置 {letmut cfg = config.lock().unwrap(); cfg.timeout = 60; cfg.max_connections = 200; }for h in handles { h.join().unwrap(); }}
细粒度锁 vs 粗粒度锁
粗粒度锁(不推荐):
structDatabase { users: Vec<User>, posts: Vec<Post>, comments: Vec<Comment>,}let db = Arc::new(Mutex::new(Database { ... }));// 即使只修改 users,整个数据库都被锁住letmut db = db.lock().unwrap();db.users.push(new_user);
细粒度锁(推荐):
structDatabase { users: Arc<Mutex<Vec<User>>>, posts: Arc<Mutex<Vec<Post>>>, comments: Arc<Mutex<Vec<Comment>>>,}// 只锁需要的部分letmut users = db.users.lock().unwrap();users.push(new_user);// posts 和 comments 仍然可用
手动释放锁
有时你需要提前释放锁:
use std::sync::{Arc, Mutex};fnmain() {let data = Arc::new(Mutex::new(0)); {letmut num = data.lock().unwrap(); *num += 1;// 方法 1: 显式 dropdrop(num);// 现在锁已释放,可以再次获取let num2 = data.lock().unwrap();println!("{}", *num2); }// 方法 2: 创建内部作用域 {letmut num = data.lock().unwrap(); *num += 1; } // 锁在这里释放 do_something_else();}
处理锁中毒(Poisoning)
当持有锁的线程 panic 时,锁会被标记为"中毒":
use std::sync::{Arc, Mutex};use std::thread;fnmain() {let data = Arc::new(Mutex::new(0));let data_clone = Arc::clone(&data);let _ = thread::spawn(move || {letmut num = data_clone.lock().unwrap(); *num += 1;panic!("糟糕!"); // 锁中毒 }).join();// 处理中毒的锁match data.lock() {Ok(mut guard) => { *guard += 1;println!("正常: {}", *guard); }Err(poisoned) => {letmut guard = poisoned.into_inner(); *guard = 0; // 重置数据 data.clear_poison();println!("已恢复中毒锁"); } }}
第五章:并发陷阱与最佳实践
陷阱 1: 死锁 (Deadlock)
什么是死锁?
两个或多个线程互相等待对方释放资源,导致无限等待。
经典死锁示例:
use std::sync::{Arc, Mutex};use std::thread;use std::time::Duration;fnmain() {let resource1 = Arc::new(Mutex::new(1));let resource2 = Arc::new(Mutex::new(2));let r1 = Arc::clone(&resource1);let r2 = Arc::clone(&resource2);let handle1 = thread::spawn(move || {let _g1 = r1.lock().unwrap();println!("线程1获取resource1"); thread::sleep(Duration::from_millis(100));let _g2 = r2.lock().unwrap(); // 等待 resource2println!("线程1获取resource2"); });let r1 = Arc::clone(&resource1);let r2 = Arc::clone(&resource2);let handle2 = thread::spawn(move || {let _g2 = r2.lock().unwrap();println!("线程2获取resource2"); thread::sleep(Duration::from_millis(100));let _g1 = r1.lock().unwrap(); // 等待 resource1println!("线程2获取resource1"); }); handle1.join().unwrap(); handle2.join().unwrap();}
结果: 程序卡死,两个线程永远等待。
解决方案: 统一锁顺序
// ✅ 总是按相同顺序获取锁fntransfer(from: &Arc<Mutex<i32>>, to: &Arc<Mutex<i32>>, amount: i32) {// 始终先锁地址小的,再锁地址大的let (first, second) = if Arc::as_ptr(from) < Arc::as_ptr(to) { (from, to) } else { (to, from) };letmut f = first.lock().unwrap();letmut s = second.lock().unwrap(); *f -= amount; *s += amount;}
防止死锁的其他策略:
陷阱 2: 数据竞争 vs 竞态条件
很多人混淆这两个概念:
数据竞争(Data Race):
竞态条件(Race Condition):
竞态条件示例:
use std::sync::{Arc, Mutex};use std::thread;fnmain() {let balance = Arc::new(Mutex::new(100));let b1 = Arc::clone(&balance);let h1 = thread::spawn(move || {let current = *b1.lock().unwrap();if current >= 50 {// 这里可能被打断! thread::sleep(std::time::Duration::from_millis(10)); *b1.lock().unwrap() -= 50; } });let b2 = Arc::clone(&balance);let h2 = thread::spawn(move || {let current = *b2.lock().unwrap();if current >= 50 { thread::sleep(std::time::Duration::from_millis(10)); *b2.lock().unwrap() -= 50; } }); h1.join().unwrap(); h2.join().unwrap();// 余额可能是 0(正确)或 50(错误)println!("余额: {}", *balance.lock().unwrap());}
解决方案: 原子操作
use std::sync::{Arc, Mutex};fnwithdraw(balance: &Arc<Mutex<i32>>, amount: i32) -> bool {letmut bal = balance.lock().unwrap();if *bal >= amount { *bal -= amount;true } else {false }} // 锁在函数结束时释放,保证原子性
陷阱 3: 过度使用 Arc
错误做法:
// ❌ 不需要共享的数据也用 Arcfnprocess_data(data: Arc<Vec<i32>>) {for num in data.iter() {println!("{}", num); }}
正确做法:
// ✅ 只在需要时使用 Arcfnprocess_data(data: &[i32]) {for num in data {println!("{}", num); }}fnmain() {let data = vec![1, 2, 3];// 不跨线程,直接用引用 process_data(&data);// 跨线程时才用 Arclet data = Arc::new(data);let data_clone = Arc::clone(&data); thread::spawn(move || { process_data(&data_clone); });}
陷阱 4: 在锁内调用未知代码
use std::sync::{Arc, Mutex};structProcessor { callback: Box<dynFn(i32) + Send>,}impl Processor {fnprocess(&self, data: &Arc<Mutex<i32>>) {letmut num = data.lock().unwrap(); *num += 1;// ❌ 危险!如果 callback 内部也尝试获取同一个锁? (self.callback)(*num);// 或者 callback 非常慢,导致锁被长时间持有 }}
正确做法:
impl Processor {fnprocess(&self, data: &Arc<Mutex<i32>>) {let value = {letmut num = data.lock().unwrap(); *num += 1; *num }; // 锁在这里释放// ✅ 在锁外调用 callback (self.callback)(value); }}
陷阱 5: Send 和 Sync 误用
Send: 可以安全地在线程间转移所有权Sync: 可以安全地在线程间共享引用
常见错误:
use std::rc::Rc;use std::thread;fnmain() {let rc = Rc::new(5);let rc_clone = Rc::clone(&rc);// ❌ Rc 不是 Send,不能发送到其他线程 thread::spawn(move || {println!("{}", rc_clone); });}
正确:
use std::sync::Arc;use std::thread;fnmain() {let arc = Arc::new(5);let arc_clone = Arc::clone(&arc);// ✅ Arc 是 Send thread::spawn(move || {println!("{}", arc_clone); }).join().unwrap();}
第六章:高级技巧与模式
1. 原子类型 —— 无锁并发
对于简单的整数操作,使用原子类型比 Mutex 更高效:
use std::sync::atomic::{AtomicUsize, Ordering};use std::sync::Arc;use std::thread;fnmain() {let counter = Arc::new(AtomicUsize::new(0));letmut handles = vec![];for _ in0..10 {let counter = Arc::clone(&counter);let h = thread::spawn(move || {for _ in0..1000 { counter.fetch_add(1, Ordering::SeqCst); } }); handles.push(h); }for h in handles { h.join().unwrap(); }println!("结果: {}", counter.load(Ordering::SeqCst));}
何时使用:
2. 条件变量 —— 等待特定条件
use std::sync::{Arc, Mutex, Condvar};use std::thread;fnmain() {let pair = Arc::new((Mutex::new(false), Condvar::new()));let pair2 = Arc::clone(&pair); thread::spawn(move || {let (lock, cvar) = &*pair2;letmut started = lock.lock().unwrap(); *started = true; cvar.notify_one(); // 唤醒等待的线程 });let (lock, cvar) = &*pair;letmut started = lock.lock().unwrap();while !*started { started = cvar.wait(started).unwrap(); // 等待条件满足 }println!("线程已启动!");}
3. 线程池模式
use std::sync::{Arc, Mutex};use std::sync::mpsc;use std::thread;typeJob = Box<dynFnOnce() + Send + 'static>;structThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>,}structWorker { id: usize, thread: thread::JoinHandle<()>,}impl ThreadPool {fnnew(size: usize) -> ThreadPool {let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));letmut workers = Vec::with_capacity(size);for id in0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } }fnexecute<F>(&self, f: F)where F: FnOnce() + Send + 'static, {let job = Box::new(f);self.sender.send(job).unwrap(); }}impl Worker {fnnew(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv();match job {Ok(job) => {println!("工作者 {} 执行任务", id); job(); }Err(_) => break, } }); Worker { id, thread } }}
实战建议
何时用 Channel,何时用 Arc + Mutex?
使用 Channel:
使用 Arc + Mutex:
决策树:
需要跨线程传递数据?├─ 是,单向传递 → Channel├─ 是,需要共享可变状态 → Arc + Mutex└─ 否,只在单线程内 → 普通引用或 Rc
性能优化检查清单
- ✅ 考虑使用 RwLock 替代 Mutex(读多写少场景)
调试并发问题的技巧
1. 使用 ThreadSanitizer
RUSTFLAGS="-Z sanitizer=thread" cargo run
2. 打印线程 ID
use std::thread;println!("线程 {:?} 执行", thread::current().id());
3. 使用 tracing 记录并发事件
use tracing::{info, instrument};#[instrument]fnprocess_data(id: usize) { info!("开始处理 {}", id);// ... info!("完成处理 {}", id);}
4. 压力测试
#[test]fnstress_test() {for _ in0..1000 {// 运行你的并发代码 }}
常见问题 FAQ
Q1: Arc<Mutex> 和 Mutex<Arc> 有什么区别?
A:
Arc<Mutex<T>>: 共享一个可变的 T(正确用法)Mutex<Arc<T>>: 互斥地访问一个共享的 T(几乎从不需要)
Q2: 为什么不能用 Rc 代替 Arc?
A: Rc 的引用计数不是原子的,在多线程环境下会导致数据竞争。Arc 使用原子操作,线程安全。
Q3: 如何避免克隆 Arc?
A:
// ❌ 不必要的克隆for i in0..10 {let data = Arc::clone(&data); spawn(move || { ... });}// ✅ 只克隆需要的次数let handles: Vec<_> = (0..10) .map(|i| {let data = Arc::clone(&data); spawn(move || { ... }) }) .collect();
Q4: send() 和 recv() 什么时候会阻塞?
A:
send(): 使用 sync_channel 且队列满时阻塞recv(): 队列空时阻塞,直到有消息或发送端全部关闭
Q5: 可以在异步代码中使用这些并发原语吗?
A:
std::sync::Mutex: ❌ 会阻塞执行器tokio::sync::Mutex: ✅ 异步版本,不会阻塞Channel: 使用 tokio::sync::mpsc
总结:并发编程的禅意
Rust 并发编程的核心理念:
最后的建议:
❝"并发很难,但 Rust 让它变得不那么难。不要害怕尝试,编译器是你最好的老师。当程序通过编译时,你已经避免了 90% 的并发 bug。剩下的 10%,就是你需要用脑子思考的部分了。"
记住:Rust 不能让并发变简单,但它能让并发变安全。这已经是巨大的进步了!
推荐资源
- 📖 The Rust Book - Fearless Concurrency[1]
- 📖 Rust Atomics and Locks[2]
- 🎥 Crust of Rust: Channels[3]
- 📝 Tokio Tutorial[4] (异步并发)
现在,去写一些并发代码吧!记住:编译器是你的朋友,不是敌人。🦀
[1] The Rust Book - Fearless Concurrency: https://doc.rust-lang.org/book/ch16-00-concurrency.html
[2] Rust Atomics and Locks: https://marabos.nl/atomics/
[3] Crust of Rust: Channels: https://www.youtube.com/watch?v=b4mS5UPHh20
[4] Tokio Tutorial: https://tokio.rs/tokio/tutorial