AbstractQueuedSynchronizer 浅析

AbstractQueuedSynchronizer 是一个非常重要的类,J.U.C 的很多同步能力(比如锁)都是通过它来实现的。

简介

正如其名,AbstractQueuedSynchronizer 是一个“队列同步器”,可以将它理解为通过队列来实现同步功能的一个工具。

CLH

它基于 FIFO 队列,用一个 volatile int 维护同步资源,通过 Unsafe 提供的 CAS 语法结合一定的自旋,实现该 int 的原子性,进一步实现无锁化的同步资源获取。若尝试自旋 CAS 失败,其会通过 LockSupport 类提供的 park API 将线程置为 WAITING 或 TIMED_WAITING 态,在同步资源可用后,使用 unpark API 唤醒线程。与 JVM 层面实现的基于对象监视器和 synchronized 语法的同步不同,AbstractQueuedSynchronizer 是一个 JDK API 层面的同步实现,并且不会造成线程真正的“阻塞”(指 BLOCKED 态),也不申请 mutex lock。

同步器背后的基本思想非常简单,获取同步资源的步骤可以用如下伪代码来描述:

while (拿不到同步资源) {
    还没入队则入队;
    让当前线程睡眠进入 WAITING 态;
}
如已入队则出队;

释放时:

更新释放同步资源;
if (此次更新可以导致另一个正在“阻塞”的线程拿到同步资源)
    唤醒一个或多个线程;

CLH 队列

背景

AbstractQueuedSynchronizer 借鉴了 CLH 队列的思想,CLH 队列基于 CLH 锁算法,该名称来自发明该算法的 Craig、Landin 和 Hagersten。最初,该算法发表于 Craig 名为 《Building FIFO and Priority-Queuing Spin Locks from Atomic Swap》 的论文中,而 Doug Lea 大神在 AbstractQueuedSynchronizer 中借鉴了这个算法的思想,不过相比于原论文,AQS 还是作了些改变。

前提条件

CLH 锁有一个前提,或者说它基于一种假设:锁是会很快释放的,“自旋”并不会比“将线程阻塞然后唤醒”消耗更多的计算资源。这一点有点类似 JDK 1.6 中对 JVM synchronized 机制锁的优化,即在获取锁失败时尝试一定程度的自旋,而不是马上进入阻塞态。

这是一种经验上的优化,它可能是多数场景下的良药,但没有绝对意义上的“更优”,和 synchronized 对象监视器相比,还须结合实际场景和测试结果来选择。

源码解析

AbstractQueuedSynchronizer 继承了 AbstractOwnableSynchronizer,后者提供了将线程设置为“当前同步资源”的独享者功能(对应下文的“独占”模式),也就是将资源与线程绑定,ReentrantLock 使用它来提供“可重入”功能。

具体实现很简单,AbstractOwnableSynchronizer 持有一个 Thread 对象,这里不赘述。

state

前面说到,AQS 用一个 volatile int 维护同步资源,这个属性就是 state,AQS 通过 compareAndSetState 方法使用 Unsafe 机制(JNI)无锁 CAS 设置 state 的值。

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

比如在 ReentrantReadWriteLock 中,同步资源 state 就被分成了两部分,高 16 位 short 代表共享资源持有数(读锁),而低 16 位代表独占资源持有数(写锁)。而 ReentrantLock 中代表 重入次数 + 1,0 代表未被线程持有。

Java 9 之后,unsafe 变成了 VarHandle 类型的 STATE,但本质不变。

模板方法

AQS 的子类只需要实现 state 的变更方式即可(至于获取不到怎么办,由 AQS 帮你解决):

  1. 尝试获取独占资源。成功则返回 true,失败则返回 false:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
  1. 尝试释放独占资源。成功则返回 true,失败则返回 false:

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
  1. 尝试获取共享资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源:

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
  1. 尝试释放共享资源,如果释放后允许唤醒后续等待结点则返回 true,否则返回 false:

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
  1. 线程是否正在占用资源:

    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

说明:这里之所以没有定义成 abstract 方法而是直接抛异常,是因为实现类如果不支持独占模式,那么只需实现 tryAcquireShared()tryReleaseShared(),反之如果不支持共享模式,则只需实现 tryAcquire()tryRelease(),而 isHeldExclusively() 方法只有内部类 ConditionObject 用上了,除非同步器的使用者需要用上 Condition,否则也不需要在子类中实现。这样子类不必被强制做一些无意义的事情。

PS:AbstractQueuedSynchronizer 本身没有 abstract 方法。

Node

AQS 的静态内部类 Node 是 CLH 队列节点的抽象。

static final class Node {
    /** 用于体现当前 Shared/Exclusive 模式的两个 Node 对象 */
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    
    /** 四种 Node 的等待状态 */
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
  
    /** 当前等待状态 */
    volatile int waitStatus;

    /** 双向链表指针,均为 volatile 修饰的可见 reference */
    volatile Node prev;
    volatile Node next;

    /** Node 所代表的线程 */
    volatile Thread thread;

    /** 用作 Shared 模式以及 Condition 队列的指针,下文讲述 */
    Node nextWaiter;

    /**
     * 判断当前的模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    /**
     * 获取前驱节点
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    
    /**
     * 头结点、常量 SHARED
     */
    Node() {}

    /**
     * addWaiter
     */
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    /**
     * addConditionWaiter
     */
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

两种模式

AQS 队列有两种工作模式:Exclusive 独占和 Shared 共享,Node 使用特定常量(new Node()null)去标识 nextWaiter,以表明当前属于何种模式。

独占模式下,获取同步资源时,其他线程将获取失败,而共享模式下,同步资源被多个线程获取可能(但不一定)成功(受限于资源数)。

基于 Condition 来唤醒的情况只可能是独占模式,而这种模式下只需要构建一个简单的线程队列就行了,不需要在 nextWaiter 赋上节点。

五种状态

  • CANCELLED:仍在队列中但已超时或其线程被打断,这是终态,线程不会再次阻塞。
  • SIGNAL:后继节点已进入 WAITING 或即将被 park,故本节点必须在获取到同步资源后唤醒后继节点。
  • CONDITION:在某 Condition 上等待(条件队列专用)。
  • PROPAGATE:下次共享模式下的状态获取将被无条件的传播下去。
  • 0:初始态。

核心 API 解析

acquire

以独占模式,为线程获取同步资源:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(
        addWaiter(Node.EXCLUSIVE), arg)) {
        /* 
          acquireQueued 返回 true 则线程在休眠期间被别的线程打断了,
          这里恢复一下 interrupt 标志位
         */
        selfInterrupt();
    }
    /*
      这里一定拿到了同步资源,或者抛了异常
     */
}

上文已说过 tryAcquire() 留给子类实现,子类需要判断当前是否支持独占模式,且合理设置共享资源 state,这里的返回值是 boolean 类型,代表获取资源的成功与否。

acquire() 方法通过 tryAcquire() 先试着 acquire 一次,如果失败则将线程排队然后不断尝试 acquire,成功拿到后,根据这个过程是否被打断过来进行 interrupt 的设置,下文详细说明。

这个过程中,当前线程有可能会多次被其它线程 park 而进入 WAITING 态。

addWaiter:不成功,便入队

CAS 入队:

private Node addWaiter(Node mode) {
    // 根据模式(共享 or 独占),用当前线程创建队列节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        // 先尝试一次 CAS 替换 tail
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    /*
      失败的话,说明 tail 尚未初始化,或者有线程并发入队,则
      fallback 到 enq
     */
    enq(node);
    // 返回这个线程的 Node
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq()addWaiter() 的尝试一样,自旋 CAS 设置尾节点,只不过此时可能同步队列尚未使用过,需要初始化一个被 head 和 tail 所共同指向的 Dummy Node,所以 AQS 一旦使用过,同步队列至少有一个节点

所有队列节点的操作都是 prev 优先,next 滞后(仅 CAS 成功才设置 next),next 节点为 null 时并不代表已在队列尾端。

1

2

3

4

acquireQueued:睡还是不睡?

核心方法,该方法为已在队列中的线程以排他模式获取同步资源。

此方法不能响应打断(不抛出 InterruptedException),而是将期间被打断与否反映在返回值中,若想立即感知线程中断,请使用 acquireInterruptibly()

由于此时代表当前线程的 Node 已在同步队列中,故有两个选择:

  • 睡觉
  • 尝试获取独占资源
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /*
              我的前驱节点是 head,那我岂不是 No.2,现在我是醒的,那么可能
              是 head 被 interrupt 或者它自己结束了然后叫醒了我,反正我现
              在醒着,有资格去获取资源,那就试试呗!
             */
            if (p == head && tryAcquire(arg)) {
                /*
                  果然成功了,那我就设置新 head 为自己,取消当前线程与 node
                  的绑定,清空 新head.prev 和 原head.next
                 */
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            /* 
              没成功,可能我暂时是老二但是上面 tryAcquire 失败了,或者压根儿我不是老二,
              这里根据 shouldParkAfterFailedAcquire 的结果判断我是不是可以去睡了,
              如果可以,那就 park 直到被其它线程 unpark 唤醒,根据 parkAndCheckInterrupt
              方法返回值判断是否被打断过
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 醒来发现 park 期间被打断过,于是设置打断标志
                interrupted = true;
            
            // 醒了,或者不能睡,继续循环
        }
    } finally {
        /*
          要是最终没拿到(抛了其它异常),则取消这次入队尝试
         */
        if (failed)
            cancelAcquire(node);
    }
}

注:这块代码 JDK 11 与 JDK 8 的源码略有不同,JDK 11 如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

虽略有不同,但行为没有区别,只是加了个 selfInterrupt 设置自己的打断标志位。

该方法保证 acquireQueued() 方法要么抛出异常,要么拿到同步资源,调用者会考虑是否需要关心返回值(是否被打断过)。

可以看到,不但使用过的 AQS 至少有一个节点 head,而且 head.thread 大概率是 null,因为已经获取到同步资源的线程是会从 Node 取消绑定的(除非是 setHead() 中途那极短的一个中间状态)。

shouldParkAfterFailedAcquire() 判断本线程是否可以去睡了:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前驱节点的等待状态
    int ws = pred.waitStatus;
    
    if (ws == Node.SIGNAL)
        /*
          而 SIGNAL 态的前驱节点会在拿到资源后唤醒其后继节点,也就是
          说本 Node 到时候是会被通知到的,不用担心,可以安心 park
         */
        return true;
        
    // ==== 只要前驱节点不是 SIGNAL 态,就不能睡 ====
  
    if (ws > 0) {
        /*
          前驱节点状态为 CANCELLED,不断往前找,直至找到正常未取消的 Node
          设为新 prev(前面提到 next 节点为 null 时并不代表已在队
          列尾端,所以需要从后往前找),重试自旋
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /* 
         * 前驱节点状态为 0 或 PROPAGATE,尝试改为 SIGNAL,
         * 失败也不要紧,这里本线程并不马上睡去,而是继续尝试自旋,
         * 要是 tryAcquire 再失败,再次进本方法时,本线程就可以去睡了
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

总之:状态为 SIGNAL 的节点会在释放资源或者取消后唤醒后续节点,所以只要前驱结点的状态不是 SIGNAL,就不能安心 park。

如果前驱节点不是 SIGNAL,但本线程把它设成了 SIGNAL,则很可能前驱节点马上是 head 了,此时再尝试一下 tryAquire() 说不定就拿到同步状态了。这里是一个性能上的优化,认为前驱节点刚被设置为 SIGNAL,自己没必要马上进入阻塞态,而是自旋再试试,说不定 SIGNAL 的前驱节点马上就获取到了资源然后释放了。

tryAcquire

上图的“阻塞”指 park 的 WAITING 态。

release

释放独占模式下的同步资源:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 找到下一个需要唤醒的结点唤醒它
            unparkSuccessor(h);
        return true;
    }
    return false;
}

同样,tryRelease() 需要子类实现,返回值为 boolean 类型,如果 state 完全释放,则返回 true,否则返回 false。

那么什么情况下需要唤醒后继节点呢?这里没有先判断后继节点是否存在,而是判断了 waitStatus 是否为 0,如果不为 0,(在独占模式下)可能为 CANCELLED、SIGNAL,则需要 尝试 unparkSuccessor 方法。

unparkSuccessor:叫醒后面的人
private void unparkSuccessor(Node node) {
    /*
       重置头节点 waitStatus 为 0,失败也无所谓
       可能是其它线程改变了其状态
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 往后直到找到非 CANCELLED 状态的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        /*
           如果后继节点被取消或为 null,则从后往前找未被取消的节点,
           这里还是因为前文提到的 prev 优先于 next 被设置的缘故
         */
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 成功找到了一个这样的后继节点
    if (s != null)
        // 唤醒节点持有的线程
        LockSupport.unpark(s.thread);
}

这个方法在 doReleaseShared() 中也会调用。

acquireShared

以共享模式,为线程获取同步资源:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        // 获取失败
        doAcquireShared(arg);
}

tryAcquireShared() 方法需要子类实现,返回负数意味着获取失败,0 意味着当前线程获取成功,但是后续的会失败,因为没有更多资源了,而正数意味着后续的获取也可能成功。

doAcquireShared:有“福”同享
private void doAcquireShared(int arg) {
    // 同独占方式中的 addWaiter,添加一个 SHARED 模式的节点到队列尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 这里还是自旋获取前驱节点,然后判断是否为头节点,与独占模式差不多
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    /*
                       获取成功后,除了自己要设置为 head 之外,
                       上文说了,如果 r > 0,意味着后续的获取也
                       可能成功,还需要传播给更多的节点
                     */
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 同独占模式
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

与独占模式不同的是,除 setHead() 之外,setHeadAndPropagate() 还需要调用 doReleaseShared() 尝试后续节点的唤醒。

private void setHeadAndPropagate(Node node, int propagate) {
    // 老 head 引用
    Node h = head;
    // 新节点设为 head
    setHead(node);
    /*
       尝试唤醒后面的节点,因为可能:
         1. 还有剩余的资源(propagate > 0);
         2. setHead 之前和之后,head 都有可能变化,因为 setHead 后,
            本方法可能并发,故只要 head 为 null,不管是新 head 还是
            老 head,都可以尝试传播一下;
         3. 这里判断 head 的状态是否小于 0 而不是明确的某个状态,还是
            因为是考虑了并发中,PROPAGATE 状态的也可能已经变成了
            SIGNAL,所以统一用小于 0 来判断。

       由于这里的保守(考虑了并发的情况),可能会导致没必要的唤醒(有些被唤
       醒的线程是拿不到资源的),但是 AQS 的 CLH 算法本身基于争抢较少的
       情况,所以无所谓。
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        /*
           后继节点同为 Shared 模式节点,或者为 null(同样是考虑
           next 的设置滞后于 prev),继续 release
         */
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

doReleaseShared() 方法也会在 释放 共享资源时被调用,见下文。

releaseShared

释放共享模式下的同步资源:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
doReleaseShared:传递醒来的消息
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            /*
               若头节点状态为 SIGNAL,即已有线程去唤醒后续节点,
               将其设为 INITIAL 初始状态。成功,则唤醒后继,否则
               继续循环来检查状态
             */
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                    continue;
                }
                unparkSuccessor(h);
            // 若头节点状态为 INITIAL,则将其设为 PROPAGATE,保证 release 的传递
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                continue;
            }
        }
        /*
           若 head 被别的线程更改了,即其它线程释放了同步资源,
           必须重试循环体保证本次 release 的传播,否则说明没有
           要 release 的了,退出循环
         */
        if (h == head)
            break;
    }
}

共享模式下,在自身获取或释放共享资源时,需要唤醒后续的以共享的方式获取资源的节点。

其它 API

acquireInterruptibly:可打断

只是在 parkAndCheckInterrupt() 返回 true 时以“抛异常”取代 acquireQueued() 中的设置 interrupt 标志位。

doAcquireInterruptibly

tryAcquireNanos:实现超时

记录 deadline,每次自旋时判断是否到时间了,且如果剩余时间太长(大于 spinForTimeoutThreshold,默认 1 毫秒),则以“park 指定时间”来取代“自旋”。

doAcquireNanos

AbstractQueuedLongSynchronizer:更多同步状态

AbstractQueuedLongSynchronizer 作为另一个版本的 AbstractQueuedSynchronizer,与后者具有 完全相同 的结构、属性和方法,不同点只是使用了 64 位的 long 类型的同步状态 state,32 位 int 不够表达资源数量时可以考虑使用它:

AbstractQueuedLongSynchronizer