在多线程并发编程中,线程并非孤立执行,常常需要协同工作完成复杂任务。例如生产者线程生产数据,消费者线程消费数据,二者需要通过通信协调生产和消费的节奏,避免数据积压或消费空数据。Java 提供了多种线程通信工具,本文将从基础的 wait/notify 机制,到进阶的 Condition 接口,再到实用的并发工具类( CountDownLatch 、 CyclicBarrier 等),全面解析线程通信的实现原理与实战场景。- 等待 / 唤醒:线程 A 执行到某一阶段后,需要等待线程 B 完成任务才能继续执行,线程 B 完成后唤醒线程 A。
- 生产 / 消费:生产者线程生产数据后,唤醒消费者线程消费;消费者线程消费完数据后,唤醒生产者线程生产。
- 多线程同步:多个线程需要等待所有线程都完成某一阶段任务后,再一起执行后续操作。
没有通信的多线程,只能通过轮询的方式检测状态,这会浪费大量 CPU 资源。而合理的通信机制,能让线程在需要等待时释放资源,被唤醒时恢复执行,提升程序性能。wait() 、 notify() 、 notifyAll() 是 Object 类的成员方法,所有 Java 对象都自带这些方法,它们必须在 synchronized 同步代码块 / 方法中使用,否则会抛出 IllegalMonitorStateException 异常。- wait():线程调用对象的 wait() 方法时,会释放该对象的锁,并进入该对象的等待队列,直到被其他线程唤醒。
- notify():线程调用对象的 notify() 方法时,会唤醒该对象等待队列中的一个随机线程,被唤醒的线程需要重新竞争锁才能继续执行。
- notifyAll():唤醒该对象等待队列中的所有线程,所有被唤醒的线程会竞争锁,只有获取到锁的线程能继续执行。
2. 经典案例:wait/notify 实现生产者 - 消费者模型import java.util.LinkedList;import java.util.Queue;/** * 生产者-消费者模型:仓库容量为3,生产满时生产者等待,消费空时消费者等待 */public class ProducerConsumerWaitNotify { // 仓库:存储产品 private static final Queue<String> WAREHOUSE = new LinkedList<>(); // 仓库最大容量 private static final int MAX_CAPACITY = 3; // 锁对象 private static final Object LOCK = new Object(); // 生产者线程:生产产品 static class Producer extends Thread { @Override public void run() { int count = 0; while (true) { synchronized (LOCK) { // 仓库满时,生产者等待 while (WAREHOUSE.size() == MAX_CAPACITY) { try { System.out.println("仓库已满,生产者等待..."); LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 生产产品 String product = "产品" + (++count); WAREHOUSE.add(product); System.out.println(Thread.currentThread().getName() + " 生产了:" + product + ",仓库容量:" + WAREHOUSE.size()); // 唤醒消费者线程 LOCK.notifyAll(); } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } // 消费者线程:消费产品 static class Consumer extends Thread { @Override public void run() { while (true) { synchronized (LOCK) { // 仓库空时,消费者等待 while (WAREHOUSE.isEmpty()) { try { System.out.println("仓库已空,消费者等待..."); LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 消费产品 String product = WAREHOUSE.poll(); System.out.println(Thread.currentThread().getName() + " 消费了:" + product + ",仓库容量:" + WAREHOUSE.size()); // 唤醒生产者线程 LOCK.notifyAll(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { new Producer().start(); new Consumer().start(); }}
- 必须在同步代码块中使用:调用 wait() / notify() 前,线程必须获取该对象的锁,否则会抛出异常。
- 使用 while 循环判断条件:不能用 if 判断,因为线程被唤醒后可能条件仍不满足(如多个消费者被唤醒,仓库已空), while 会重新检查条件。
- notify() 与 notifyAll() 的选择: notify() 可能导致线程 “假死”(唤醒的是同类线程,如生产者唤醒生产者),推荐使用 notifyAll() 保证唤醒正确的线程。
Condition 是 java.util.concurrent.locks 包下的接口,是 Lock 锁的配套通信工具,相比 wait/notify 更加灵活,支持精准唤醒指定线程组 。- Condition 对象由 Lock 对象创建,一个 Lock 可以创建多个 Condition 对象,对应多个等待队列。
- await():类似 wait() ,线程释放锁并进入 Condition 的等待队列。
- signal():类似 notify() ,唤醒 Condition 等待队列中的一个线程。
- signalAll():类似 notifyAll() ,唤醒 Condition 等待队列中的所有线程。
2. 经典案例:Condition 实现精准唤醒的生产者 - 消费者模型import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * Condition实现生产者-消费者:精准唤醒生产者/消费者,避免唤醒同类线程 */public class ProducerConsumerCondition { private static final Queue<String> WAREHOUSE = new LinkedList<>(); private static final int MAX_CAPACITY = 3; // 创建可重入锁 private static final Lock LOCK = new ReentrantLock(); // 生产者等待的Condition private static final Condition PRODUCER_COND = LOCK.newCondition(); // 消费者等待的Condition private static final Condition CONSUMER_COND = LOCK.newCondition(); static class Producer extends Thread { @Override public void run() { int count = 0; while (true) { LOCK.lock(); try { while (WAREHOUSE.size() == MAX_CAPACITY) { System.out.println("仓库已满,生产者等待..."); // 生产者进入PRODUCER_COND等待队列 PRODUCER_COND.await(); } String product = "产品" + (++count); WAREHOUSE.add(product); System.out.println(Thread.currentThread().getName() + " 生产了:" + product + ",仓库容量:" + WAREHOUSE.size()); // 精准唤醒消费者线程 CONSUMER_COND.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { LOCK.unlock(); } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer extends Thread { @Override public void run() { while (true) { LOCK.lock(); try { while (WAREHOUSE.isEmpty()) { System.out.println("仓库已空,消费者等待..."); // 消费者进入CONSUMER_COND等待队列 CONSUMER_COND.await(); } String product = WAREHOUSE.poll(); System.out.println(Thread.currentThread().getName() + " 消费了:" + product + ",仓库容量:" + WAREHOUSE.size()); // 精准唤醒生产者线程 PRODUCER_COND.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { LOCK.unlock(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { new Producer().start(); new Consumer().start(); }}
3. Condition 与 wait/notify 的核心对比 | | |
|---|
| | |
| | |
| | |
| | |
| | 通过Lock创建,需在lock/unlock之间使用 |
除了基础的等待唤醒机制,JUC 包还提供了一些高级工具类,简化多线程通信与同步的实现。1. CountDownLatch:倒计时门闩,等待多个线程完成任务核心作用:让一个或多个线程等待其他多个线程完成任务后,再继续执行。原理:初始化一个计数器,线程完成任务后调用 countDown() 方法让计数器减 1,等待线程调用 await() 方法阻塞,直到计数器变为 0。适用场景:主线程等待多个子线程初始化完成后,再执行后续逻辑。代码案例:CountDownLatch 实现线程初始化等待import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo { // 计数器:需要3个线程初始化完成 private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(3); static class InitThread extends Thread { private final String threadName; public InitThread(String threadName) { this.threadName = threadName; } @Override public void run() { System.out.println(threadName + " 开始初始化..."); try { // 模拟初始化耗时 Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(threadName + " 初始化完成!"); // 计数器减1 COUNT_DOWN_LATCH.countDown(); } } public static void main(String[] args) throws InterruptedException { new InitThread("初始化线程1").start(); new InitThread("初始化线程2").start(); new InitThread("初始化线程3").start(); System.out.println("主线程等待所有初始化线程完成..."); // 主线程阻塞,直到计数器为0 COUNT_DOWN_LATCH.await(); System.out.println("所有线程初始化完成,主线程开始执行任务!"); }}
2. CyclicBarrier:循环屏障,让多个线程到达同一屏障点后再继续核心作用:让多个线程相互等待,直到所有线程都到达指定的 “屏障点”,再一起继续执行。原理:初始化屏障的线程数量,线程到达屏障点时调用 await() 方法阻塞,当所有线程都调用 await() 后,屏障打开,所有线程继续执行。适用场景:多个线程需要协同完成任务,必须等所有线程都准备好后才能开始执行。与 CountDownLatch 的区别: CyclicBarrier 可以重复使用(调用 reset() 重置屏障), CountDownLatch 的计数器只能使用一次。代码案例:CyclicBarrier 实现多线程协同执行import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo { // 屏障:3个线程到达后一起执行 private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(3, () -> { System.out.println("所有线程已到达屏障点,开始执行任务!"); }); static class WorkerThread extends Thread { private final String threadName; public WorkerThread(String threadName) { this.threadName = threadName; } @Override public void run() { System.out.println(threadName + " 正在前往屏障点..."); try { Thread.sleep((long) (Math.random() * 1000)); System.out.println(threadName + " 到达屏障点,等待其他线程..."); // 到达屏障点,等待其他线程 CYCLIC_BARRIER.await(); // 屏障打开后执行的任务 System.out.println(threadName + " 执行任务ing..."); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { new WorkerThread("工作线程1").start(); new WorkerThread("工作线程2").start(); new WorkerThread("工作线程3").start(); }}
3. Semaphore:信号量,控制同时访问资源的线程数量核心作用:控制同时访问特定资源的线程数量,通过信号量的 “许可” 机制实现。原理:初始化许可数量,线程访问资源前调用 acquire() 获取许可,访问完成后调用 release() 释放许可;若没有可用许可,线程会阻塞直到有许可被释放。适用场景:限流场景,如限制同时连接数据库的线程数量、限制接口的并发访问量。import java.util.concurrent.Semaphore;public class SemaphoreDemo { // 信号量:最多允许2个线程同时访问资源 private static final Semaphore SEMAPHORE = new Semaphore(2); static class AccessThread extends Thread { private final String threadName; public AccessThread(String threadName) { this.threadName = threadName; } @Override public void run() { try { // 获取许可 SEMAPHORE.acquire(); System.out.println(threadName + " 获取到许可,开始访问资源..."); Thread.sleep(1000); System.out.println(threadName + " 访问资源完成,释放许可!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放许可 SEMAPHORE.release(); } } } public static void main(String[] args) { // 5个线程竞争2个许可 for (int i = 1; i <= 5; i++) { new AccessThread("访问线程" + i).start(); } }}
- 基础场景:简单的生产者 - 消费者模型,使用 wait/notify 即可满足需求,注意在同步代码块中使用并通过 while 判断条件。
- 进阶场景:需要精准唤醒指定线程组时,优先使用 Condition 接口,搭配 ReentrantLock 锁,灵活性更高。
- 复杂场景:多线程同步等待(如初始化、协同执行)使用 CountDownLatch / CyclicBarrier ;限流场景使用 Semaphore 。
- 核心原则:线程通信的本质是 状态共享与协同 ,无论使用哪种工具,都要保证共享状态的线程安全,避免出现数据错乱。
本文详细讲解了 Java 线程通信的多种方式,从基础的 wait/notify 机制,到进阶的 Condition 接口,再到实用的 JUC 工具类,每种方式都有其适用场景。掌握这些通信工具,能帮助我们更好地实现多线程的协同工作,解决复杂的并发问题。下一篇文章,我们将深入讲解 Java 线程池的核心原理、参数配置与实战应用,这是高并发开发中必不可少的技术。