Condition 源码浅析

Condition API 方便了 JDK 层的锁等待逻辑,实现了 JDK 层的“对象监视器”。

对象监视器与线程通信

在使用 JVM 对象监视器提供的锁语义时,线程通信是通过 Object 类中的 wait()notify() 等方法实现的,如下例所示:

public static void main(String[] args) throws InterruptedException {
    Object locker = new Object();
    AtomicInteger resource = new AtomicInteger();
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            try {
                // 让 main 先 release
                Thread.sleep(100L);
            } catch (InterruptedException ignored) {
            }
            synchronized (locker) {
                while (resource.get() <= 0) {
                    try {
                        locker.wait();
                    } catch (InterruptedException ignored) {
                    }
                }
                resource.decrementAndGet();
                System.out.println(Thread.currentThread().getName() + " got one.");
                locker.notifyAll();
            }
        }, "Thread-" + i).start();
    }
    synchronized (locker) {
        for (int i = 0; i < 3; i++) {
            System.out.println("Main release one.");
            resource.incrementAndGet();
            locker.notifyAll();
            while (resource.get() > 0) {
                locker.wait();
            }
            Thread.sleep(1000L);
        }
    }
}

image-20200617023251364

我们知道,以上代码需要使用 synchronized 关键字提供的语义,那么当我们使用 AbstractQueuedSynchronizer 的同步语义时,是否也有类似的 API 功能呢?

答案当然是有的。

Lock + Condition 的实现方式

JDK 将 API 层面的锁等待条件抽象为 Condition 接口,通过它实现 “锁等待队列” 的效果。如果说 Lock 替换了 synchronized,则 Condition 替换了 Object 中的 wait()notify() 等方法。

使用 Condition 替换上文实现:

public static void main(String[] args) throws InterruptedException {
    Lock locker = new ReentrantLock();
    AtomicInteger resource = new AtomicInteger();
    // 获取一个与 locker 绑定的 Condition 对象
    Condition cond = locker.newCondition();
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException ignored) {
            }
            locker.lock();
            try {
                while (resource.get() <= 0) {
                    try {
                        cond.await();
                    } catch (InterruptedException ignored) {
                    }
                }
                resource.decrementAndGet();
                System.out.println(Thread.currentThread().getName() + " got one.");
                cond.signalAll();
            } finally {
                locker.unlock();
            }
        }, "Thread-" + i).start();
    }
    locker.lock();
    try {
        for (int i = 0; i < 3; i++) {
            System.out.println("Main release one.");
            resource.incrementAndGet();
            cond.signalAll();
            while (resource.get() > 0) {
                cond.await();
            }
            Thread.sleep(1000L);
        }
    } finally {
        locker.unlock();
    }
}

结果相同:

image-20200617023425916

源码解析

我们以 AbstractQueuedSynchronizer 的内部类 ConditionObject 为主进行源码解析。

ConditionObject 维护一个 单向双端 队列用以实现线程排队,其持有的两个引用都是 AQS 的内部类 Node 类型,分别代表 condition 队列的头和尾:

private transient Node firstWaiter;
private transient Node lastWaiter;

image-20200617004619344

这里为了不增加复杂性,源码中直接借用了 AQS 的 CLH 队列节点类作为 Condition 队列的节点类,在 Node 类的状态中也维护了对应的 status - CONDITION,参考 AQS 的笔记。

只可能在独占模式下使用 Condition。

await

public final void await() throws InterruptedException {
    // 检查 interrupt
    if (Thread.interrupted())
        throw new InterruptedException();
    
    // 向 condition 队列添加节点
    Node node = addConditionWaiter();
    
    // 从 AQS 队列中 release 当前节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 循环判断当前节点是否在 AQS 的同步队列中
    while (!isOnSyncQueue(node)) {
        // 没在同步队列中,意味着需要“await”,阻塞当前线程
        LockSupport.park(this);
        /*
          若被唤醒之前就已被打断则返回 THROW_IE,若唤醒后
          被打断返回 REINTERRUPT,否则返回 0,不为 0 则
          意味着被打断过,直接跳出循环
         */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // acquireQueued 时也被打断且在唤醒之前没被打断过,则设为 REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        // 清理 cancelled 节点
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 根据 interruptMode 选择抛出被打断异常还是重新设置打断标志位
        reportInterruptAfterWait(interruptMode);
}

这里的 while 循环判断是为了避免 虚假唤醒,因为线程可能由于别的 Condition 而唤醒,但是不满足当前 Condition 的唤醒条件,所以要判断是否在当前 AQS 的 CLH 队列中。

acquireQueued() 方法的解析见之前的文章。

addConditionWaiter

private Node addConditionWaiter() {
    // 队尾
    Node t = lastWaiter;
    // 清除队列中的已取消节点,拿到正常的队尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 构建当前线程对应的 Node 类
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 节点接到队列尾部,特别地,当队列为空时,同设为头节点
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
unlinkCancelledWaiters
private void unlinkCancelledWaiters() {
    // 头节点
    Node t = firstWaiter;
    // 保留的上个遍历节点的指针
    Node trail = null;
    // 完整遍历
    while (t != null) {
        // 下一个节点
        Node next = t.nextWaiter;
        // 当前节点非 Condition 状态
        if (t.waitStatus != Node.CONDITION) {
            // 后继指针置空
            t.nextWaiter = null;
            // 通过 trail 来“越过”已取消的节点
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

由于可能出现因等待节点取消或超时而导致 signal 方法不被调用的情况,故每次 unlink 都需要完整遍历一遍队列。

fullyRelease

这个方法将独占模式已获取的同步资源全部释放。

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

release() 方法的解析见之前的文章。

isOnSyncQueue

Condition 队列头节点若正处于 acquire 同步队列的状态,则返回 true。

final boolean isOnSyncQueue(Node node) {
    /* 
       状态为 CONDITION 或者前驱节点不存在(注意这个 prev 是
       用于同步队列的),则意味着当前节点没被唤醒而进入同步队列,
       仍然需要阻塞
     */
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 有后继节点则一定在同步队列中,可以尝试 acquire 了
    if (node.next != null)
        return true;
    // 在同步队列中寻找
    return findNodeFromTail(node);
}

image-20200617022801661

signal

把等的最久的那个线程的节点从 Condition 队列移回同步队列。

public final void signal() {
    // 当前线程并未独占同步资源(Lock),无权 signal,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Condition 队列头节点,即等的最久的那个
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

doSignal

private void doSignal(Node first) {
    do {
        // firstWaiter 指向当前头节点的后继节点,若为 null 则置空 lastWaiter
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 断开 first
        first.nextWaiter = null;
    // 若被提前 cancel 则继续往后找
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
transferForSignal
final boolean transferForSignal(Node node) {
    // 失败意味着状态为 CANCELLED,返回 false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
       此时状态正常的 Node 已找到,并且已经从 Condition 队列移除,
       将其转到同步队列,如果前驱节点状态不正常(CANCELLED)或者设置
       其状态为 SIGNAL 失败,则唤醒 Node 的线程以再次同步(短暂的状
       态值不准确不要紧)
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

image-20200617022640927