AbstractQueuedSynchronizer 浅析

AQS 一个是非常重要的类,JUC 很多功能都是借由它的帮助来实现本身对同步的需求。

简介

AbstractQueuedSynchronizer 类如其名,是一个“队列同步器”,可以理解为通过队列来实现同步的一个工具,它基于 FIFO 队列,用单个原子的 int 维护同步状态(或者说同步资源),使用自旋通过 Unsafe 提供的硬件级别 CAS 实现无锁化资源获取,在尝试自旋 CAS 失败后通过 LockSupport 提供的 park 机制实现线程睡眠,并在 API 层实现唤醒逻辑以达到 block/unblock 效果。

同步器背后的基本思想非常简单,acquire 操作如下:

1
2
3
4
5
while (拿不到同步资源) {
还没入队则入队;
让当前线程睡眠进入 WAITING 态;
}
如已入队则出队;

release 操作如下:

1
2
3
更新释放同步资源;
if (此次更新可以导致另一个正在“阻塞”的线程拿到同步资源)
唤醒一个或多个线程;

背景

CLH

AbstractQueuedSynchronizer 使用 CLH 队列实现同步,CLH 队列基于 CLH 锁算法,这三个字母是该自旋锁算法的提出者名字首字母(共三人,分别是 Craig、Landin 和 Hagersten),最早这个思想发表于 Craig 的论文 《Building FIFO and Priority-Queuing Spin Locks from Atomic Swap》,Doug Lea 写的 AbstractQueuedSynchronizer 实际上借鉴了这个算法的思想,但本身改动还是挺大的。

前提条件

CLH 锁有一个前提,或者说它基于一种场景,即:锁是会很快释放的,“自旋”并不会比“将线程阻塞然后唤醒”消耗更多的计算资源。这一点有点类似 JDK1.6 对 JVM 的锁优化:在获取锁失败时尝试一定的自旋获取,而不是马上进入阻塞状态等待释放锁的线程唤醒它。

这是一种经验上的优化,它可能是多数场景下的良药,但并没有绝对意义上的“好坏”。

源码解析

AbstractQueuedSynchronizer 继承了 AbstractOwnableSynchronizer,后者提供线程专享功能,这在 ReentrantLock 中被用来提供“可重入”功能。

state

前面说到,AQS 用单个原子的 int 维护同步状态,即 state 成员变量,compareAndSetState 方法使用 Unsafe 机制(JNI)无锁 CAS 设置 state 的值。

1
2
3
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

子类(同步器实现)只需要实现资源同步状态 state 的变更方式即可(至于获取不到怎么办,由 AQS 帮你解决):

  1. 尝试获取独占资源。成功则返回 true,失败则返回 false:

    1
    2
    3
    protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
    }
  1. 尝试释放独占资源。成功则返回 true,失败则返回 false:

    1
    2
    3
    protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
    }
  1. 尝试获取共享资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源:

    1
    2
    3
    protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
    }
  1. 尝试释放共享资源,如果释放后允许唤醒后续等待结点则返回 true,否则返回 false:

    1
    2
    3
    protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
    }
  1. 线程是否正在占用资源:

    1
    2
    3
    protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
    }

之所以没有定义成 abstract 方法而是直接抛异常,是因为实现类如果不支持独占模式,那么只需实现 tryAcquireShared 和 tryReleaseShared,如果不支持共享模式,则只需实现 tryAcquire 和 tryRelease,而 isHeldExclusively 方法只有内部类 ConditionObject 用上了,除非同步器的使用者需要用上 Condition,否则也不需要在子类中实现,这样能够不强制子类做一些事情,而在 API 层规定模板方法,更加灵活。

AbstractQueuedSynchronizer 本身没有 abstract 方法。

Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static final class Node {
/** 标记模式所用的节点:Shared/Exclusive */
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;


/** 枚举四种等待状态 */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;


// ==================== 双向链表 ====================
volatile Node prev;
volatile Node next;


/** 绑定的线程 */
volatile Thread thread;



/** 用作 Shared 模式以及 Condition 队列 */
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}


/**
* 拿到前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}


// ==================== 构造器 ====================
// 用以建立头结点或者 Shared 模式的 Marker
Node() {}
// addWaiter 时使用
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// addConditionWaiter 时使用
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

AQS

两种模式

队列有两种模式:Exclusive(独占)模式和 Shared(共享)模式,Node 使用特定常量(new Node()null)去标识 nextWaiter,以表明当前属于何种模式。

基于 Condition 只可能是独占模式,这种模式下只需要构建一个简单的线程队列就行了,不需要在 nextWaiter 赋上节点。

五种状态

  • CANCELLED:仍在队列中但已超时或线程被打断,这是终态,线程不会再次阻塞。
  • SIGNAL:后继节点已 park(或即将 park),本节点必须在获取到同步状态后 unpark 后继节点。
  • CONDITION:等待在 Condition 上(ConditionObject 队列节点专用)。
  • PROPAGATE:下次共享模式下的状态获取将被无条件的传播下去。
  • 0:初始态。

acquire

以独占模式,为线程获取同步资源:

1
2
3
4
5
6
7
8
9
10
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(
addWaiter(Node.EXCLUSIVE), arg)) {
/*
acquireQueued 返回 true 则线程在休眠期间被别的线程打断了,
这里恢复一下 interrupt 标志位
*/
selfInterrupt();
}
}

上文已说过 tryAcquire() 留给子类实现,子类需要判断当前是否支持独占模式,且合理设置共享资源 state,这里的返回值是 boolean 类型,代表获取资源的成功与否。

acquire() 方法相当于试一次 tryAcquire(),如果失败则将线程排队然后不断尝试获取锁,成功拿到后,根据这个过程是否被打断过来进行 interrupt 的设置,下文详细说明。

addWaiter:不成功,便入队

CAS 入队:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private Node addWaiter(Node mode) {
// 根据模式(共享 or 独占),用当前线程创建队列节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 先尝试一次 CAS 替换 tail
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
/*
失败的话,说明 tail 尚未初始化,或者有线程并发入队,则
fallback 到 enq
*/
enq(node);
// 返回这个线程的 Node
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

enq()addWaiter() 的尝试一样,自旋 CAS 设置尾节点,只不过此时可能同步队列尚未使用过,需要初始化一个 head 和 tail 共同指向的 Dummy Node(new Node()),所以 AQS 一旦使用过,同步队列至少有一个节点。

所有队列节点的操作都是 prev 优先,next 滞后(仅 CAS 成功才设置 next),next 节点为 null 时并不代表已在队列尾端。

1

2

3

4

acquireQueued:睡还是不睡?

核心方法。此时线程已在队列中,它有两个选择:睡觉 or 尝试获取独占资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
我的前驱节点是 head,那我岂不是是 No.2,我是醒的,可能是 head
被 interrupt 或者它自己好了,然后叫醒了我,反正我现在有资格去
尝试获取资源!
*/
if (p == head && tryAcquire(arg)) {
/*
成功!设置新 head 为自己,取消当前线程与 node 的绑定,
清空原 head 的 prev 指针,让原 head 尽快 GC 掉
*/
setHead(node);
// 先 prev 后 next
p.next = null;
failed = false;
return interrupted;
}
/*
老二失败了,或者我还不是老二,经判断我可以去睡了,
那就 park 进入 Waiting 态直至被 unpark 唤醒或者 interrupt
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// park 期间被打断(parkAndCheckInterrupt 返回 true)
interrupted = true;

// 其他情况继续循环:
}
} finally {
if (failed)
cancelAcquire(node);
}
}

可以看到,不但使用过的 AQS 至少有一个节点 head,而且 head.thread 大概率是 null,因为已经获取到同步资源的线程是会从 Node 取消绑定的(除非是 setHead() 中途那极短的一个中间状态)。

判断是否本节点(线程)可以休息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的等待状态
int ws = pred.waitStatus;

if (ws == Node.SIGNAL)
/*
只要前驱节点不是 SIGNAL 态,就 return false 继续试,
否则前驱节点已被设为 SIGNAL,即前驱节点已被唤醒,并且在“拿到
资源后会唤醒其后继节点”,也就是说本 Node 到时候是会被通知
到的,不用担心,所以可以 park
*/
return true;

if (ws > 0) {
/*
前驱节点取消了,只有不断往前找,直至找到正常未取消的 Node
设为新 prev(前面提到 next 节点为 null 时并不代表已在队
列尾端,所以需要从后往前找)
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 前驱节点状态为 0 或者 PROPAGATE,则将其状态 CAS 成 SIGNAL
* 但本线程并不马上阻塞,而是继续尝试自旋查看前驱是否为 head
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

总之:状态为 SIGNAL 的节点会在释放资源或者取消后唤醒后续节点,所以只要前驱结点的状态不是 SIGNAL,就不能安心 park。

如果前驱节点不是 SIGNAL,但本线程把它设成了 SIGNAL,则很可能前驱节点马上是 head 了,此时再尝试一下 tryAquire() 说不定就拿到同步状态了。这里是一个性能上的优化,认为前驱节点刚被设置为 SIGNAL,自己没必要马上进入阻塞态,而是自旋再试试,说不定 SIGNAL 的前驱节点马上就获取到了资源然后释放了。

如果前驱节点是 SIGNAL,则正常让线程 park,进入 Waiting 阻塞态:

1
2
3
4
5
6
7
8
9
private final boolean parkAndCheckInterrupt() {
/**
* 利用 Unsafe 机制阻塞线程,并把 blocker 设为
* AQS 自己(以允许监控和诊断工具识别线程是被 AQS 阻塞的)
*/
LockSupport.park(this);
// 返回是否线程被打断
return Thread.interrupted();
}

tryAcquire

稍微总结一下就是:一旦头节点获取同步状态成功后,其后继节点就会被唤醒,自旋地尝试获取同步状态(因为一旦头节点获取成功,意味着释放同步状态也就变的可能了)。

若想立即感知线程中断,需要调用 acquireInterruptibly()

release

释放独占模式下的同步资源:

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 找到下一个需要唤醒的结点唤醒它
unparkSuccessor(h);
return true;
}
return false;
}

同样,tryRelease() 需要子类实现,返回值为 boolean 类型,如果 state 完全释放,则返回 true,否则返回 false。

那么什么情况下需要唤醒后继节点呢?这里没有先判断后继节点是否存在,而是判断了 waitStatus 是否为 0,如果不为 0,(在独占模式下)可能为 CANCELLED、SIGNAL,则需要 尝试 unparkSuccessor 方法。

unparkSuccessor:叫醒后面的人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void unparkSuccessor(Node node) {
/*
重置头节点 waitStatus 为 0,失败也无所谓
可能是其它线程改变了其状态
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// 往后直到找到非 CANCELLED 状态的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
/*
如果后继节点被取消或为 null,则从后往前找未被取消的节点,
这里还是因为前文提到的 prev 优先于 next 被设置的缘故
*/
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 成功找到了一个这样的后继节点
if (s != null)
// 唤醒节点持有的线程
LockSupport.unpark(s.thread);
}

这个方法在 doReleaseShared() 中也会调用。

acquireShared

以共享模式,为线程获取同步资源:

1
2
3
4
5
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 获取失败
doAcquireShared(arg);
}

tryAcquireShared() 方法需要子类实现,返回负数意味着获取失败,0 意味着当前线程获取成功,但是后续的会失败,因为没有更多资源了,而正数意味着后续的获取也可能成功。

doAcquireShared:有“福”同享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void doAcquireShared(int arg) {
// 同独占方式中的 addWaiter,添加一个 SHARED 模式的节点到队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 这里还是自旋获取前驱节点,然后判断是否为头节点,与独占模式差不多
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
/*
获取成功后,除了自己要设置为 head 之外,
上文说了,如果 r > 0,意味着后续的获取也
可能成功,还需要传播给更多的节点
*/
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 同独占模式
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

与独占模式不同的是,除 setHead() 之外,setHeadAndPropagate() 还需要调用 doReleaseShared() 尝试后续节点的唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void setHeadAndPropagate(Node node, int propagate) {
// 老 head 引用
Node h = head;
// 新节点设为 head
setHead(node);
/*
尝试唤醒后面的节点,因为可能:
1. 还有剩余的资源(propagate > 0);
2. setHead 之前和之后,head 都有可能变化,因为 setHead 后,
本方法可能并发,故只要 head 为 null,不管是新 head 还是
老 head,都可以尝试传播一下;
3. 这里判断 head 的状态是否小于 0 而不是明确的某个状态,还是
因为是考虑了并发中,PROPAGATE 状态的也可能已经变成了
SIGNAL,所以统一用小于 0 来判断。

由于这里的保守(考虑了并发的情况),可能会导致没必要的唤醒(有些被唤
醒的线程是拿不到资源的),但是 AQS 的 CLH 算法本身基于争抢较少的
情况,所以无所谓。
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
/*
后继节点同为 Shared 模式节点,或者为 null(同样是考虑
next 的设置滞后于 prev),继续 release
*/
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

doReleaseShared() 方法也会在 释放 共享资源时被调用,见下文。

releaseShared

释放共享模式下的同步资源:

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

doReleaseShared:传递醒来的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
/*
若头节点状态为 SIGNAL,即已有线程去唤醒后续节点,
将其设为 INITIAL 初始状态。成功,则唤醒后继,否则
继续循环来检查状态
*/
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue;
}
unparkSuccessor(h);
// 若头节点状态为 INITIAL,则将其设为 PROPAGATE,保证 release 的传递
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
continue;
}
}
/*
若 head 被别的线程更改了,即其它线程释放了同步资源,
必须重试循环体保证本次 release 的传播,否则说明没有
要 release 的了,退出循环
*/
if (h == head)
break;
}
}

共享模式下,在自身获取或释放共享资源时,需要唤醒后续的以共享的方式获取资源的节点。

其它

acquireInterruptibly:可打断

只是在 parkAndCheckInterrupt() 返回 true 时以“抛异常”取代 acquireQueued() 中的设置 interrupt 标志位。

doAcquireInterruptibly

tryAcquireNanos:实现超时

记录 deadline,每次自旋时判断是否到时间了,且如果剩余时间太长(大于 spinForTimeoutThreshold,默认 1 毫秒),则以“park 指定时间”来取代“自旋”。

doAcquireNanos

AbstractQueuedLongSynchronizer:更多同步状态

AbstractQueuedLongSynchronizer 作为另一个版本的 AbstractQueuedSynchronizer,与后者具有 完全相同 的结构、属性和方法,不同点只是使用了 64 位的 long 类型的同步状态 state,32 位 int 不够表达资源数量时可以考虑使用它:

AbstractQueuedLongSynchronizer