ReentrantLock 与 AQS

借助 AbstractQueuedSynchronizer,ReentrantLock 很方便地实现了 Java API 层面的“可重入锁”逻辑。

ReentrantLock 是 JDK 实现的 API 层面上的可重入锁,实现了 Lock 接口,提供 timeout、Condition、公平非公平等功能特性,内部使用 AQS 作为同步器,是并发编程中除 JVM synchronized 语义对象监视器实现外的另一个选择。其源码很简单,因为 AQS 做的实在太多了。

内部的 AQS 实现 - Sync

ReentrantLock 唯一的成员变量就是一个 Sync:

private final Sync sync;

这个 Sync 是 AQS 进一步的抽象实现类:

abstract static class Sync extends AbstractQueuedSynchronizer {
    /**
     * 此方法会尝试一次直接 CAS 更改同步资源数 state 并立即返回成功与否,
     * 所以是“非公平地”尝试获取锁,这里的参数 acquire 在本类“锁”语义下就是 1
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // state 代表“重入次数 + 1”,0 代表未被线程持有
            if (compareAndSetState(0, acquires)) {
                // 直接将本线程设为当前独占线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 本线程就是当前占用锁的线程,由于可重入,这里 state 累加即可
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // int 溢出,太多次重入了
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    /**
     * 按 AQS 要求进行实现,这里是直接修改 state,
     * 只有 state 完全释放才返回 true
     */
    protected final boolean tryRelease(int releases) {
        // 释放资源就是减操作
        int c = getState() - releases;
        // 自己的锁自己释放,别人没权力
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // state 如果为 0,则意味着锁已释放,将当前独占线程置空,否则意味着退出一次重入
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    /**
     * 是否当前线程独占,AbstractOwnableSynchronizer 持有一个 thread,
     * 表示当前独占的线程,这个方法也是实现 AQS 模版方法
     */
    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    /**
     * 可以看到,在ReentrantLock 中,Condition 的实现类就是 AQS 的内部类 ConditionObject
     */
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // ==== 以下方法提供给外面的 ReentrantLock 调用 ====

    /**
     * 获取锁的持有者(当前独占线程)
     */
    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    /**
     * 获取进行持有操作的次数,也就是 state,即“重入次数 + 1”
     */
    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    /**
     * 是否“锁住”
     */
    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * 反序列化,主要重置了 state(序列化再反序列化,线程状况早都不一样了)
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0);
    }
}

如何体现“公平”

我们知道,AQS 的 acquire() 方法会在 tryAcquire() 后回退到 CLH 队列排队,而“怎么实现 tryAcquire() ”决定了锁的“公平”与否。

/**
 * “非公平”的 Sync 实现
 */
static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        // 不作任何判断,上来就尝试一次“插队”,体现“非公平”
        return nonfairTryAcquire(acquires);
    }
}

/**
 * “公平”的 Sync 实现
 */
static final class FairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 最重要的是,即使“没锁住”,也要遵守规矩排队,队里没其它线程的情况下才会尝试 CAS
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

可以看到,虽然“非公平”的 NonfairSync 也会来一轮 FairSync 里类似的判断,但是差别最大的是,FairSync 会查看队伍的排队情况,也即判断 hasQueuedPredecessors(),这才是真正“公平”和“非公平”的不同点。

常用的 ReentrantLock 空参构造器选的是“不公平”的 Sync 实现:

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

“锁”语义相关 API

有了上面的基础,锁语义相关的 API 就很简单了:

/**
 * 获取锁,可重入,不可打断
 * 若锁被其他线程锁住了,则进入线程调度
 * 对比 synchronized 语义,此方法不会阻塞(WAITING 取代 BLOCKING)
 */
public void lock() {
    // acquire 为 AQS 实现,其中调用了子类实现的 tryAcquire()
    sync.acquire(1);
}

/**
 * 获取锁,可重入,可打断
 * 详见 AQS 篇
 */
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

/**
 * 不管 Sync 的类型,“非公平”地尝试获取锁,立即返回
 */
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

/**
 * 不管 Sync 的类型,“非公平”地尝试获取锁,尝试一段时间
 */
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

/**
 * 解锁,release 虽有返回值,但是不用关心,因为只要返回了,一定成功 release 了
 * 即使一开始没 release,也是 WAITING 切走后被唤醒,然后成功了
 */
public void unlock() {
    sync.release(1);
}

/**
 * 这没什么说的,就是获取 ConditionObject
 */
public Condition newCondition() {
    return sync.newCondition();
}

// ==== 其它 API 都是 Sync 或 AQS 中的方法,见名知意,这里不赘述 ====
public int getHoldCount() {...}
public boolean isHeldByCurrentThread() {...}
public boolean isLocked() {...}
public final boolean isFair() {...}
protected Thread getOwner() {...}
public final boolean hasQueuedThreads() {...}
public final boolean hasQueuedThread(Thread thread) {...}
public final int getQueueLength() {...}
protected Collection<Thread> getQueuedThreads() {...}
public boolean hasWaiters(Condition condition) {...}
public int getWaitQueueLength(Condition condition) {...}
protected Collection<Thread> getWaitingThreads(Condition condition) {...}
public String toString() {...} // toString 方法可以方便的看到“锁住”状态

最佳实践

最佳实践是在 lock() 后一定 unlock()

Lock locker = new ReentrantLock();
locker.lock();
try {
    // ...
} finally {
    locker.unlock();
}

根据不同情况,可以使用带超时时间的 lock() 和能检测打断动作的 lockInterruptibly()

总结

ReentrantLock 本身并没有什么代码,只是实现了公平和非公平地 tryAcquire(),然后利用 AQS 同步器的独占模式去获取同步状态,并且利用 AbstractOwnableSynchronizer 提供的机制实现“可重入”。