ReentrantLock 与 AQS

借助 AbstractQueuedSynchronizer,ReentrantLock 很方便地实现了 Java API 层面的“可重入锁”逻辑。

ReentrantLock 是 JDK 实现的 API 层面上的可重入锁,实现了 Lock 接口,提供 timeout、Condition、公平非公平等功能特性,内部使用 AQS 作为同步器,是并发编程中除 JVM synchronized 语义对象监视器实现外的另一个选择。其源码很简单,因为 AQS 做的实在太多了。

内部的 AQS 实现 - Sync

ReentrantLock 唯一的成员变量就是一个 Sync:

private final Sync sync;

这个 Sync 是 AQS 进一步的抽象实现类:

abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 此方法会尝试一次直接 CAS 更改同步资源数 state 并立即返回成功与否,
* 所以是“非公平地”尝试获取锁,这里的参数 acquire 在本类“锁”语义下就是 1
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// state 代表“重入次数 + 1”,0 代表未被线程持有
if (compareAndSetState(0, acquires)) {
// 直接将本线程设为当前独占线程
setExclusiveOwnerThread(current);
return true;
}
}
// 本线程就是当前占用锁的线程,由于可重入,这里 state 累加即可
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // int 溢出,太多次重入了
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

/**
* 按 AQS 要求进行实现,这里是直接修改 state,
* 只有 state 完全释放才返回 true
*/
protected final boolean tryRelease(int releases) {
// 释放资源就是减操作
int c = getState() - releases;
// 自己的锁自己释放,别人没权力
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// state 如果为 0,则意味着锁已释放,将当前独占线程置空,否则意味着退出一次重入
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

/**
* 是否当前线程独占,AbstractOwnableSynchronizer 持有一个 thread,
* 表示当前独占的线程,这个方法也是实现 AQS 模版方法
*/
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}

/**
* 可以看到,在ReentrantLock 中,Condition 的实现类就是 AQS 的内部类 ConditionObject
*/
final ConditionObject newCondition() {
return new ConditionObject();
}

// ==== 以下方法提供给外面的 ReentrantLock 调用 ====

/**
* 获取锁的持有者(当前独占线程)
*/
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

/**
* 获取进行持有操作的次数,也就是 state,即“重入次数 + 1”
*/
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

/**
* 是否“锁住”
*/
final boolean isLocked() {
return getState() != 0;
}

/**
* 反序列化,主要重置了 state(序列化再反序列化,线程状况早都不一样了)
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}

如何体现“公平”

我们知道,AQS 的 acquire() 方法会在 tryAcquire() 后回退到 CLH 队列排队,而“怎么实现 tryAcquire() ”决定了锁的“公平”与否。

/**
* “非公平”的 Sync 实现
*/
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
// 不作任何判断,上来就尝试一次“插队”,体现“非公平”
return nonfairTryAcquire(acquires);
}
}

/**
* “公平”的 Sync 实现
*/
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 最重要的是,即使“没锁住”,也要遵守规矩排队,队里没其它线程的情况下才会尝试 CAS
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

可以看到,虽然“非公平”的 NonfairSync 也会来一轮 FairSync 里类似的判断,但是差别最大的是,FairSync 会查看队伍的排队情况,也即判断 hasQueuedPredecessors(),这才是真正“公平”和“非公平”的不同点。

常用的 ReentrantLock 空参构造器选的是“不公平”的 Sync 实现:

public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

“锁”语义相关 API

有了上面的基础,锁语义相关的 API 就很简单了:

/**
* 获取锁,可重入,不可打断
* 若锁被其他线程锁住了,则进入线程调度
* 对比 synchronized 语义,此方法不会阻塞(WAITING 取代 BLOCKING)
*/
public void lock() {
// acquire 为 AQS 实现,其中调用了子类实现的 tryAcquire()
sync.acquire(1);
}

/**
* 获取锁,可重入,可打断
* 详见 AQS 篇
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

/**
* 不管 Sync 的类型,“非公平”地尝试获取锁,立即返回
*/
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

/**
* 不管 Sync 的类型,“非公平”地尝试获取锁,尝试一段时间
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

/**
* 解锁,release 虽有返回值,但是不用关心,因为只要返回了,一定成功 release 了
* 即使一开始没 release,也是 WAITING 切走后被唤醒,然后成功了
*/
public void unlock() {
sync.release(1);
}

/**
* 这没什么说的,就是获取 ConditionObject
*/
public Condition newCondition() {
return sync.newCondition();
}

// ==== 其它 API 都是 Sync 或 AQS 中的方法,见名知意,这里不赘述 ====
public int getHoldCount() {...}
public boolean isHeldByCurrentThread() {...}
public boolean isLocked() {...}
public final boolean isFair() {...}
protected Thread getOwner() {...}
public final boolean hasQueuedThreads() {...}
public final boolean hasQueuedThread(Thread thread) {...}
public final int getQueueLength() {...}
protected Collection<Thread> getQueuedThreads() {...}
public boolean hasWaiters(Condition condition) {...}
public int getWaitQueueLength(Condition condition) {...}
protected Collection<Thread> getWaitingThreads(Condition condition) {...}
public String toString() {...} // toString 方法可以方便的看到“锁住”状态

最佳实践

最佳实践是在 lock() 后一定 unlock()

Lock locker = new ReentrantLock();
locker.lock();
try {
// ...
} finally {
locker.unlock();
}

根据不同情况,可以使用带超时时间的 lock() 和能检测打断动作的 lockInterruptibly()

总结

ReentrantLock 本身并没有什么代码,只是实现了公平和非公平地 tryAcquire(),然后利用 AQS 同步器的独占模式去获取同步状态,并且利用 AbstractOwnableSynchronizer 提供的机制实现“可重入”。