AbstractQueuedSynchronizer 是一个非常重要的类,J.U.C 的很多同步能力(比如锁)都是通过它来实现的。
简介
正如其名,AbstractQueuedSynchronizer 是一个“队列同步器”,可以将它理解为通过队列来实现同步功能的一个工具。
它基于 FIFO 队列,用一个 volatile int 维护同步资源,通过 Unsafe 提供的 CAS 语法结合一定的自旋,实现该 int 的原子性,进一步实现无锁化的同步资源获取。若尝试自旋 CAS 失败,其会通过 LockSupport 类提供的 park API 将线程置为 WAITING 或 TIMED_WAITING 态,在同步资源可用后,使用 unpark API 唤醒线程。与 JVM 层面实现的基于对象监视器和 synchronized 语法的同步不同,AbstractQueuedSynchronizer 是一个 JDK API 层面的同步实现,并且不会造成线程真正的“阻塞”(指 BLOCKED 态),也不申请 mutex lock。
同步器背后的基本思想非常简单,获取同步资源的步骤可以用如下伪代码来描述:
while (拿不到同步资源) {
还没入队则入队;
让当前线程睡眠进入 WAITING 态;
}
如已入队则出队;
释放时:
更新释放同步资源;
if (此次更新可以导致另一个正在“阻塞”的线程拿到同步资源)
唤醒一个或多个线程;
CLH 队列
背景
AbstractQueuedSynchronizer 借鉴了 CLH 队列的思想,CLH 队列基于 CLH 锁算法,该名称来自发明该算法的 Craig、Landin 和 Hagersten。最初,该算法发表于 Craig 名为 《Building FIFO and Priority-Queuing Spin Locks from Atomic Swap》 的论文中,而 Doug Lea 大神在 AbstractQueuedSynchronizer 中借鉴了这个算法的思想,不过相比于原论文,AQS 还是作了些改变。
前提条件
CLH 锁有一个前提,或者说它基于一种假设:锁是会很快释放的,“自旋”并不会比“将线程阻塞然后唤醒”消耗更多的计算资源。这一点有点类似 JDK 1.6 中对 JVM synchronized 机制锁的优化,即在获取锁失败时尝试一定程度的自旋,而不是马上进入阻塞态。
这是一种经验上的优化,它可能是多数场景下的良药,但没有绝对意义上的“更优”,和 synchronized 对象监视器相比,还须结合实际场景和测试结果来选择。
源码解析
AbstractQueuedSynchronizer 继承了 AbstractOwnableSynchronizer,后者提供了将线程设置为“当前同步资源”的独享者功能(对应下文的“独占”模式),也就是将资源与线程绑定,ReentrantLock 使用它来提供“可重入”功能。
具体实现很简单,AbstractOwnableSynchronizer 持有一个 Thread 对象,这里不赘述。
state
前面说到,AQS 用一个 volatile int 维护同步资源,这个属性就是 state,AQS 通过 compareAndSetState 方法使用 Unsafe 机制(JNI)无锁 CAS 设置 state 的值。
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
比如在 ReentrantReadWriteLock 中,同步资源 state 就被分成了两部分,高 16 位 short 代表共享资源持有数(读锁),而低 16 位代表独占资源持有数(写锁)。而 ReentrantLock 中代表
重入次数 + 1
,0 代表未被线程持有。Java 9 之后,unsafe 变成了 VarHandle 类型的 STATE,但本质不变。
模板方法
AQS 的子类只需要实现 state 的变更方式即可(至于获取不到怎么办,由 AQS 帮你解决):
- 尝试获取独占资源。成功则返回 true,失败则返回 false:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- 尝试释放独占资源。成功则返回 true,失败则返回 false:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
- 尝试获取共享资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源:
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
- 尝试释放共享资源,如果释放后允许唤醒后续等待结点则返回 true,否则返回 false:
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
- 线程是否正在占用资源:
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
说明:这里之所以没有定义成 abstract 方法而是直接抛异常,是因为实现类如果不支持独占模式,那么只需实现
tryAcquireShared()
和tryReleaseShared()
,反之如果不支持共享模式,则只需实现tryAcquire()
和tryRelease()
,而isHeldExclusively()
方法只有内部类 ConditionObject 用上了,除非同步器的使用者需要用上 Condition,否则也不需要在子类中实现。这样子类不必被强制做一些无意义的事情。PS:AbstractQueuedSynchronizer 本身没有 abstract 方法。
Node
AQS 的静态内部类 Node 是 CLH 队列节点的抽象。
static final class Node {
/** 用于体现当前 Shared/Exclusive 模式的两个 Node 对象 */
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/** 四种 Node 的等待状态 */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 当前等待状态 */
volatile int waitStatus;
/** 双向链表指针,均为 volatile 修饰的可见 reference */
volatile Node prev;
volatile Node next;
/** Node 所代表的线程 */
volatile Thread thread;
/** 用作 Shared 模式以及 Condition 队列的指针,下文讲述 */
Node nextWaiter;
/**
* 判断当前的模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 获取前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/**
* 头结点、常量 SHARED
*/
Node() {}
/**
* addWaiter
*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/**
* addConditionWaiter
*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
两种模式
AQS 队列有两种工作模式:Exclusive 独占和 Shared 共享,Node 使用特定常量(new Node()
和 null
)去标识 nextWaiter
,以表明当前属于何种模式。
独占模式下,获取同步资源时,其他线程将获取失败,而共享模式下,同步资源被多个线程获取可能(但不一定)成功(受限于资源数)。
基于 Condition 来唤醒的情况只可能是独占模式,而这种模式下只需要构建一个简单的线程队列就行了,不需要在
nextWaiter
赋上节点。
五种状态
- CANCELLED:仍在队列中但已超时或其线程被打断,这是终态,线程不会再次阻塞。
- SIGNAL:后继节点已进入 WAITING 或即将被 park,故本节点必须在获取到同步资源后唤醒后继节点。
- CONDITION:在某 Condition 上等待(条件队列专用)。
- PROPAGATE:下次共享模式下的状态获取将被无条件的传播下去。
- 0:初始态。
核心 API 解析
acquire
以独占模式,为线程获取同步资源:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(
addWaiter(Node.EXCLUSIVE), arg)) {
/*
acquireQueued 返回 true 则线程在休眠期间被别的线程打断了,
这里恢复一下 interrupt 标志位
*/
selfInterrupt();
}
/*
这里一定拿到了同步资源,或者抛了异常
*/
}
上文已说过 tryAcquire()
留给子类实现,子类需要判断当前是否支持独占模式,且合理设置共享资源 state,这里的返回值是 boolean 类型,代表获取资源的成功与否。
acquire()
方法通过 tryAcquire()
先试着 acquire 一次,如果失败则将线程排队然后不断尝试 acquire,成功拿到后,根据这个过程是否被打断过来进行 interrupt 的设置,下文详细说明。
这个过程中,当前线程有可能会多次被其它线程 park 而进入 WAITING 态。
addWaiter:不成功,便入队
CAS 入队:
private Node addWaiter(Node mode) {
// 根据模式(共享 or 独占),用当前线程创建队列节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 先尝试一次 CAS 替换 tail
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
/*
失败的话,说明 tail 尚未初始化,或者有线程并发入队,则
fallback 到 enq
*/
enq(node);
// 返回这个线程的 Node
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq()
与 addWaiter()
的尝试一样,自旋 CAS 设置尾节点,只不过此时可能同步队列尚未使用过,需要初始化一个被 head 和 tail 所共同指向的 Dummy Node,所以 AQS 一旦使用过,同步队列至少有一个节点。
所有队列节点的操作都是 prev 优先,next 滞后(仅 CAS 成功才设置 next),next 节点为 null 时并不代表已在队列尾端。
acquireQueued:睡还是不睡?
核心方法,该方法为已在队列中的线程以排他模式获取同步资源。
此方法不能响应打断(不抛出 InterruptedException),而是将期间被打断与否反映在返回值中,若想立即感知线程中断,请使用
acquireInterruptibly()
。
由于此时代表当前线程的 Node 已在同步队列中,故有两个选择:
- 睡觉
- 尝试获取独占资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
我的前驱节点是 head,那我岂不是 No.2,现在我是醒的,那么可能
是 head 被 interrupt 或者它自己结束了然后叫醒了我,反正我现
在醒着,有资格去获取资源,那就试试呗!
*/
if (p == head && tryAcquire(arg)) {
/*
果然成功了,那我就设置新 head 为自己,取消当前线程与 node
的绑定,清空 新head.prev 和 原head.next
*/
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
/*
没成功,可能我暂时是老二但是上面 tryAcquire 失败了,或者压根儿我不是老二,
这里根据 shouldParkAfterFailedAcquire 的结果判断我是不是可以去睡了,
如果可以,那就 park 直到被其它线程 unpark 唤醒,根据 parkAndCheckInterrupt
方法返回值判断是否被打断过
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 醒来发现 park 期间被打断过,于是设置打断标志
interrupted = true;
// 醒了,或者不能睡,继续循环
}
} finally {
/*
要是最终没拿到(抛了其它异常),则取消这次入队尝试
*/
if (failed)
cancelAcquire(node);
}
}
注:这块代码 JDK 11 与 JDK 8 的源码略有不同,JDK 11 如下:
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } }
虽略有不同,但行为没有区别,只是加了个 selfInterrupt 设置自己的打断标志位。
该方法保证
acquireQueued()
方法要么抛出异常,要么拿到同步资源,调用者会考虑是否需要关心返回值(是否被打断过)。
可以看到,不但使用过的 AQS 至少有一个节点 head,而且 head.thread 大概率是 null,因为已经获取到同步资源的线程是会从 Node 取消绑定的(除非是 setHead()
中途那极短的一个中间状态)。
shouldParkAfterFailedAcquire()
判断本线程是否可以去睡了:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
而 SIGNAL 态的前驱节点会在拿到资源后唤醒其后继节点,也就是
说本 Node 到时候是会被通知到的,不用担心,可以安心 park
*/
return true;
// ==== 只要前驱节点不是 SIGNAL 态,就不能睡 ====
if (ws > 0) {
/*
前驱节点状态为 CANCELLED,不断往前找,直至找到正常未取消的 Node
设为新 prev(前面提到 next 节点为 null 时并不代表已在队
列尾端,所以需要从后往前找),重试自旋
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 前驱节点状态为 0 或 PROPAGATE,尝试改为 SIGNAL,
* 失败也不要紧,这里本线程并不马上睡去,而是继续尝试自旋,
* 要是 tryAcquire 再失败,再次进本方法时,本线程就可以去睡了
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
总之:状态为 SIGNAL 的节点会在释放资源或者取消后唤醒后续节点,所以只要前驱结点的状态不是 SIGNAL,就不能安心 park。
如果前驱节点不是 SIGNAL,但本线程把它设成了 SIGNAL,则很可能前驱节点马上是 head 了,此时再尝试一下 tryAquire()
说不定就拿到同步状态了。这里是一个性能上的优化,认为前驱节点刚被设置为 SIGNAL,自己没必要马上进入阻塞态,而是自旋再试试,说不定 SIGNAL 的前驱节点马上就获取到了资源然后释放了。
上图的“阻塞”指 park 的 WAITING 态。
release
释放独占模式下的同步资源:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 找到下一个需要唤醒的结点唤醒它
unparkSuccessor(h);
return true;
}
return false;
}
同样,tryRelease()
需要子类实现,返回值为 boolean 类型,如果 state 完全释放,则返回 true,否则返回 false。
那么什么情况下需要唤醒后继节点呢?这里没有先判断后继节点是否存在,而是判断了 waitStatus 是否为 0,如果不为 0,(在独占模式下)可能为 CANCELLED、SIGNAL,则需要 尝试 unparkSuccessor
方法。
unparkSuccessor:叫醒后面的人
private void unparkSuccessor(Node node) {
/*
重置头节点 waitStatus 为 0,失败也无所谓
可能是其它线程改变了其状态
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 往后直到找到非 CANCELLED 状态的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
/*
如果后继节点被取消或为 null,则从后往前找未被取消的节点,
这里还是因为前文提到的 prev 优先于 next 被设置的缘故
*/
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 成功找到了一个这样的后继节点
if (s != null)
// 唤醒节点持有的线程
LockSupport.unpark(s.thread);
}
这个方法在 doReleaseShared()
中也会调用。
acquireShared
以共享模式,为线程获取同步资源:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 获取失败
doAcquireShared(arg);
}
tryAcquireShared()
方法需要子类实现,返回负数意味着获取失败,0 意味着当前线程获取成功,但是后续的会失败,因为没有更多资源了,而正数意味着后续的获取也可能成功。
doAcquireShared:有“福”同享
private void doAcquireShared(int arg) {
// 同独占方式中的 addWaiter,添加一个 SHARED 模式的节点到队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 这里还是自旋获取前驱节点,然后判断是否为头节点,与独占模式差不多
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
/*
获取成功后,除了自己要设置为 head 之外,
上文说了,如果 r > 0,意味着后续的获取也
可能成功,还需要传播给更多的节点
*/
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 同独占模式
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
与独占模式不同的是,除 setHead()
之外,setHeadAndPropagate()
还需要调用 doReleaseShared()
尝试后续节点的唤醒。
private void setHeadAndPropagate(Node node, int propagate) {
// 老 head 引用
Node h = head;
// 新节点设为 head
setHead(node);
/*
尝试唤醒后面的节点,因为可能:
1. 还有剩余的资源(propagate > 0);
2. setHead 之前和之后,head 都有可能变化,因为 setHead 后,
本方法可能并发,故只要 head 为 null,不管是新 head 还是
老 head,都可以尝试传播一下;
3. 这里判断 head 的状态是否小于 0 而不是明确的某个状态,还是
因为是考虑了并发中,PROPAGATE 状态的也可能已经变成了
SIGNAL,所以统一用小于 0 来判断。
由于这里的保守(考虑了并发的情况),可能会导致没必要的唤醒(有些被唤
醒的线程是拿不到资源的),但是 AQS 的 CLH 算法本身基于争抢较少的
情况,所以无所谓。
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
/*
后继节点同为 Shared 模式节点,或者为 null(同样是考虑
next 的设置滞后于 prev),继续 release
*/
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared()
方法也会在 释放 共享资源时被调用,见下文。
releaseShared
释放共享模式下的同步资源:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared:传递醒来的消息
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
/*
若头节点状态为 SIGNAL,即已有线程去唤醒后续节点,
将其设为 INITIAL 初始状态。成功,则唤醒后继,否则
继续循环来检查状态
*/
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue;
}
unparkSuccessor(h);
// 若头节点状态为 INITIAL,则将其设为 PROPAGATE,保证 release 的传递
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
continue;
}
}
/*
若 head 被别的线程更改了,即其它线程释放了同步资源,
必须重试循环体保证本次 release 的传播,否则说明没有
要 release 的了,退出循环
*/
if (h == head)
break;
}
}
共享模式下,在自身获取或释放共享资源时,需要唤醒后续的以共享的方式获取资源的节点。
其它 API
acquireInterruptibly:可打断
只是在 parkAndCheckInterrupt()
返回 true 时以“抛异常”取代 acquireQueued()
中的设置 interrupt 标志位。
tryAcquireNanos:实现超时
记录 deadline,每次自旋时判断是否到时间了,且如果剩余时间太长(大于 spinForTimeoutThreshold
,默认 1 毫秒),则以“park 指定时间”来取代“自旋”。
AbstractQueuedLongSynchronizer:更多同步状态
AbstractQueuedLongSynchronizer 作为另一个版本的 AbstractQueuedSynchronizer,与后者具有 完全相同 的结构、属性和方法,不同点只是使用了 64 位的 long 类型的同步状态 state,32 位 int 不够表达资源数量时可以考虑使用它: