深入浅出AQS:Java并发编程的核心基石
今天我们来聊聊Java并发编程中那个既神秘又重要的核心组件——AQS(AbstractQueuedSynchronizer)。它就像是并发世界的"隐形守护者",虽然你看不见它,但ReentrantLock、CountDownLatch、Semaphore等并发工具都离不开它的支持。
一、AQS是什么?为什么重要?
AQS,全称AbstractQueuedSynchronizer(抽象队列同步器),是Java并发包java.util.concurrent.locks的核心框架。它提供了一个基于FIFO等待队列的同步器框架,通过一个int类型的状态变量state来表示同步状态。
简单来说,**AQS就是Java并发工具类的"骨架"**。当你使用ReentrantLock时,背后就是AQS在默默工作。

二、AQS的核心设计思想
2.1 状态管理:state变量
AQS使用一个volatile int state变量来表示同步状态并通过CAS更新状态:
state的不同含义:
- 在ReentrantLock中:state=0表示锁空闲,state>0表示锁被持有,可重入

- 在CountDownLatch中:state表示计数器值
State变量的设计哲学
一个state变量,多种语义:通过将多个状态编码到一个int中,减少了多个变量间的同步问题。
CAS代替锁:通过CAS操作更新state,避免了使用重量级锁,提高了并发性能。
模板方法模式:AQS定义了state的管理框架,具体语义由子类实现。
位运算优化:使用位运算将多个状态压缩到一个变量中,如读写锁的高低16位分离。
2.2 CLH队列:线程等待的"候车室"
AQS内部维护了一个FIFO双向队列(CLH锁的变体),当线程获取同步状态失败时,会被包装成Node节点加入队列:
staticfinalclassNode{
// 节点模式
staticfinal Node SHARED = new Node(); // 共享模式
staticfinal Node EXCLUSIVE = null; // 独占模式
// 等待状态
staticfinalint CANCELLED = 1; // 取消
staticfinalint SIGNAL = -1; // 需要唤醒后继节点
staticfinalint CONDITION = -2; // 在条件队列中等待
staticfinalint PROPAGATE = -3; // 共享模式下传播
volatileint waitStatus; // 等待状态
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 节点关联的线程
Node nextWaiter; // 下一个等待者
}

队列的状态转换全流程

三、AQS的两种模式
3.1 独占模式(Exclusive)
同一时刻只有一个线程能获取同步状态,如ReentrantLock:
// 独占模式获取锁
publicfinalvoidacquire(int arg){
if (!tryAcquire(arg) && // 尝试获取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取失败,加入队列
selfInterrupt(); // 恢复中断状态
}
独占模式获取流程:

3.2 共享模式(Shared)
多个线程可以同时获取同步状态,如Semaphore、CountDownLatch:
// 共享模式获取
publicfinalvoidacquireShared(int arg){
if (tryAcquireShared(arg) < 0) // 尝试获取
doAcquireShared(arg); // 获取失败,加入队列等待
}
四、源码深度解析:以ReentrantLock为例
4.1 ReentrantLock的公平锁实现
// 公平锁的tryAcquire实现
protectedfinalbooleantryAcquire(int acquires){
final Thread current = Thread.currentThread();
int c = getState(); // 获取当前状态
if (c == 0) { // 锁未被占用
if (!hasQueuedPredecessors() && // 公平性:检查是否有前驱节点
compareAndSetState(0, acquires)) { // CAS设置状态
setExclusiveOwnerThread(current); // 设置当前线程为持有者
returntrue;
}
}
elseif (current == getExclusiveOwnerThread()) { // 重入
int nextc = c + acquires;
if (nextc < 0)
thrownew Error("Maximum lock count exceeded");
setState(nextc);
returntrue;
}
returnfalse;
}
4.2 acquireQueued:队列中的等待与唤醒
finalbooleanacquireQueued(final Node node, int arg){
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋
final Node p = node.predecessor(); // 获取前驱节点
// 如果前驱是头节点,尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node); // 获取成功,设置为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否需要park,然后park当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 取消获取
}
}
4.3 解锁过程:release
publicfinalbooleanrelease(int arg){
if (tryRelease(arg)) { // 尝试释放
Node h = head; // 获取头节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒后继节点
returntrue;
}
returnfalse;
}
// 唤醒后继节点
privatevoidunparkSuccessor(Node node){
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 清除状态
Node s = node.next; // 后继节点
if (s == null || s.waitStatus > 0) { // 如果为空或已取消
s = null;
// 从后向前查找有效的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒线程
}
六、AQS的条件变量:ConditionObject
AQS还提供了条件变量的支持,用于实现等待/通知机制:
publicclassConditionObjectimplementsCondition{
// 条件队列(单向链表)
privatetransient Node firstWaiter;
privatetransient Node lastWaiter;
// 等待方法
publicfinalvoidawait()throws InterruptedException {
if (Thread.interrupted())
thrownew InterruptedException();
Node node = addConditionWaiter(); // 加入条件队列
int savedState = fullyRelease(node); // 完全释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 不在同步队列中
LockSupport.park(this); // 阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 清理已取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 通知方法
publicfinalvoidsignal(){
if (!isHeldExclusively())
thrownew IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 唤醒条件队列中的第一个节点
}
}

七、AQS的最佳实践与注意事项
7.1 性能优化要点
- 减少CAS操作:在tryAcquire中先进行状态检查,再考虑CAS
- 避免过度阻塞:适当使用tryAcquire和超时机制
- 合理设置state范围:根据业务需求设计state的语义
7.2 常见陷阱
// 错误示例:没有正确处理中断
publicvoidwrongLockMethod(){
lock.lock();
try {
// 业务逻辑
Thread.sleep(1000); // 可能被中断,但没有处理
} finally {
lock.unlock();
}
}
// 正确示例:正确处理中断
publicvoidcorrectLockMethod()throws InterruptedException {
lock.lockInterruptibly(); // 可中断的获取锁
try {
// 业务逻辑
Thread.sleep(1000);
} finally {
lock.unlock();
}
}
7.3 调试技巧
// 通过反射获取AQS内部状态
Field syncField = ReentrantLock.class.getDeclaredField("sync");
syncField.setAccessible(true);
Object sync = syncField.get(lock);
Field stateField = sync.getClass().getSuperclass().getDeclaredField("state");
stateField.setAccessible(true);
int state = (int) stateField.get(sync);
System.out.println("AQS state: " + state);
# 生成线程转储
jstack <pid> > thread_dump.txt
# 查找AQS相关的等待线程
grep -A 5 "AbstractQueuedSynchronizer" thread_dump.txt
八、总结
AQS是Java并发编程的基石,它通过精妙的设计解决了同步问题:
- 状态管理:使用volatile state + CAS操作保证原子性
理解AQS不仅有助于我们更好地使用Java并发工具,还能让我们在遇到复杂的并发问题时,有能力自己设计合适的同步器。