线程安全之 ReentrantLock 完全解析

线程互斥同步除了使用最基本的 synchronized 关键字外(关于 synchronized 关键字的实现原理,请看之前写的线程安全之 synchronized 关键字), Java 5 之后还提供了 API 可以实现同样的功能,java.util.concurrent(简称 J.U.C)下的重入锁 ReentrantLock 不仅实现可重入的互斥锁,还有几个高级功能:等待可中断、可实现公平锁、锁可绑定多个条件、可限定最大等待时间。下面从基本使用到内部实现,层层分析 ReentrantLock 原理。

ReentrantLock 的用法

ReentrantLock 文档中写明了在 lock() 方法后,用 try 把同步代码块包起来,然后在 finally 中调用 unlock()。这样做的目的是保证解锁操作一定会被调用,防止死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
// block until condition holds
lock.lock();
try {
// ... method body
} finally {
lock.unlock();
}
}
}

ReentrantLock 还可以绑定多个条件,下面使用 Condition 文档中的例子来说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// notFull 是 buffer 没有到最大值的条件
final Condition notFull = lock.newCondition();
// notEmpty 是 buffer 不为空的条件
final Condition notEmpty = lock.newCondition();
// buffer 最大值为 100
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
// buffer 满了就挂起,直到收到 notFull 的信号
notFull.await();
items[putptr] = x;
if (++ putptr == items.length) putptr = 0;
++ count;
// buffer 新增 item,发送 notEmpty 信号
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
// buffer 为空就挂起,直到收到 notEmpty 的信号
notEmpty.await();
Object x = items[takeptr];
if (++ takeptr == items.length) takeptr = 0;
-- count;
// buffer 取走 item,发送 notFull 信号
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

ReentrantLock 的 API

ReentrantLock 实现了 Lock 和 Serializable 接口,下面是它的一些关键 API。

ReentrantLock() – 默认使用非公平锁

ReentrantLock(boolean fair) – 是否使用公平锁

void lock() – 获取锁,如果锁被其他线程持有,则阻塞该线程

void lockInterruptibly() – 获取锁,如果锁被其他线程持有,则阻塞该线程,直到获取锁或被其他线程中断;如果获取锁之前或者在获取过程的过程中线程中断,则抛出中断异常

boolean tryLock() – 如果直接获取锁成功则返回 true;如果锁被其他线程持有,返回 false

boolean tryLock(long timeout, TimeUnit unit) – 在等待时间内获取到锁并且线程没有被中断,返回 true;否则返回 false

void unlock() – 释放锁,如果该线程没有持有锁,则抛出异常

Condition newCondition() – 返回一个与锁关联的 Condition 实例

boolean isHeldByCurrentThread() – 当前线程是否持有锁

boolean isLocked() – 锁是否被任意线程持有

ReentrantLock 的内部实现

先总体描述下 ReentrantLock 的大致实现,有一个成员属性 sync,所有的方法都是调用该属性的方法。Sync 继承 AbstractQueuedSynchronizer(简称 AQS),AQS 封装了锁和线程等待队列的基本实现。Sync 有两个子类 NonfairSyncFairSync,分别对应非公平锁和公平锁。AQS 内部使用volatile int state表示同步状态,在 ReentrantLock 中 state 表示占有线程对锁的持有数量,为 0 表示锁未被持有,为 1 表示锁被某个线程持有,> 1 表示锁被某个线程持有多次(即重入)。

默认非公平锁的 lock()

非公平锁的 lock() 的方法路线如下:

1
2
3
lock() -> NonfairSync.lock() -> AQS.compareAndSetState(0, 1)
-> AQS.acquire(1) -> NonfairSync.tryAcquire(1) -> Sync.nonfairTryAcquire(1)
-> AQS.acquireQueued(addWaiter(Node.EXCLUSIVE), 1)

下面一步步分析源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final void NonfairSync.lock() {
// 锁未被持有,则获取锁,并将当前线程设置为锁的独占线程
// 这里可能为其他线程刚刚释放锁,还有其他线程在等待,但这时直接获取,所以是不公平的
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// 若锁被持有,则调用 AQS.acquire(1) 方法
else
acquire(1);
}
protected final boolean AQS.compareAndSetState(int expect, int update) {
// 利用 sun.misc.Unsafe 的 CAS 原子操作
// 如果 state 的当前值为 expect,则修改为 update,返回 true
// 如果 state 的当前值不为 expect,返回 false
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
public final void AQS.acquire(int arg) {
// NonfairSync.tryAcquire(1) 方法只是调用了 Sync.nonfairTryAcquire(1)
// 先尝试获取锁
if (!tryAcquire(arg) &&
// 获取失败则把线程添加到等待队列中,并阻塞该线程直到获取成功
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean Sync.nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果锁未被持有,则直接获取
// 这里可能为其他线程刚刚释放锁,还有其他线程在等待,但这时直接获取,所以是不公平的
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 锁被当前线程持有,属于重入,state ++
int nextc = c + acquires;
// 如果 state > 2 ^ 31 - 1, 则抛出异常,这也是最大重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

所以非公平锁的 lock() 的大致逻辑为:如果锁未被持有,不管等待队列中的线程直接获取;如果锁被自己(当前线程)持有,则把 state 加 1;否则将当前线程加入到等待队列中,并阻塞该线程直到获取成功。

关于AQS.acquireQueued()的内部实现在下一篇文章中专门分析 AQS 的内部原理,阻塞线程是调用LockSupport.park()方法实现的。

LockSupport.park() 与线程中断的关系

使用 Object.wait() 阻塞线程后,中断阻塞线程会唤醒它并且清除中断状态然后抛出 InterruptedException。而 LockSupport.park() 阻塞线程后,线程中断只会唤醒被阻塞的线程,没有其他行为,和 unpark() 行为一致,所以需要判断 Thread.interrupted() 来确定是否由中断唤醒的。

公平锁的 lock()

公平锁的 lock() 方法路线如下:

1
2
lock() -> FairSync.lock() -> AQS.acquire(1) -> FairSync.tryAcquire(1)
-> AQS.acquireQueued(addWaiter(Node.EXCLUSIVE), 1)

公平锁与非公平锁的主要区别在于 FairSync.tryAcquire(1) 这一步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final boolean FairSync.tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果锁未被持有,并且当前线程在等待队列的头部或者等待队列为空,则获取锁
// 保证了没有线程等待时间超过当前线程,所以是公平的
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 锁被当前线程持有,属于重入,state ++
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

所以公平锁的 lock() 的大致逻辑为:如果锁未被持有,并且当前线程在等待队列的头部或者等待队列为空,则获取锁;如果锁被自己(当前线程)持有,则把 state 加 1;否则将当前线程加入到等待队列中,并阻塞该线程。

可中断的 lockInterruptibly()

lockInterruptibly() 方法的文档介绍是获取锁除非线程中断,首先看它的方法路线:

1
2
3
lockInterruptibly() -> AQS.acquireInterruptibly(1) -> throw new InterruptedException()
-> NonfairSync.tryAcquire(1) or FairSync.tryAcquire(1)
-> AQS.doAcquireInterruptibly(1)

下面看 acquireInterruptibly() 的源码:

1
2
3
4
5
6
7
8
9
10
public final void AQS.acquireInterruptibly(int arg)
throws InterruptedException {
// 如果当前线程是中断的,抛出 InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,即如果锁未被持有或者已被当前线程持有,直接获取
if (!tryAcquire(arg))
// 获取锁失败,把线程添加到等待队列,阻塞线程,直到获取成功或者线程中断,线程中断也会抛出 InterruptedException
doAcquireInterruptibly(arg);
}

从上面实现,可以发现 lockInterruptibly() 与 lock() 的主要区别有两点:(1)如果此时线程是中断的,那么直接抛出 InterruptedException 异常;(2)如果线程被阻塞,在等待过程中线程中断,抛出 InterruptedException 并取消获取,从等待队列中删除。该方法可以用线程中断防止长时间阻塞,也可以以此退出死锁。

非公平的 tryLock()

不管是公平锁或者非公平锁,tryLock() 方法都是使用非公平策略来尝试获取锁,看它的路线图:

1
tryLock() -> Sync.nonfairTryAcquire(1)

Sync.nonfairTryAcquire(1) 方法在默认非公平锁的 lock() 中分析过了,如果锁未被其他线程持有(两种情况:1. 未被持有 2. 被自己持有),则获取锁并返回 true,否则返回 false。tryLock() 方法只是尝试获取锁,获取失败就会返回不会阻塞线程,而使用 synchronized 关键字则会阻塞直到获取锁。

在限定时间内的 tryLock(long timeout, TimeUnit unit)

先从方法实现看看与 tryLock() 的区别:

1
2
3
tryLock(long timeout, TimeUnit unit) -> AQS.tryAcquireNanos(1, unit.toNanos(timeout)) -> throw new InterruptedException()
-> NonfairSync.tryAcquire(1) or FairSync.tryAcquire(1)
-> AQS.doAcquireNanos(1, nanosTimeout)

AQS 的 tryAcquireNanos 方法源码如下:

1
2
3
4
5
6
7
8
9
10
public final boolean AQS.tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果当前线程是中断的,抛出 InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,即如果锁未被持有或者已被当前线程持有,直接获取
return tryAcquire(arg) ||
// 获取失败,把线程添加到等待队列,阻塞线程,直到限定时间、线程中断或者在此之前获取成功,线程中断也会抛出 InterruptedException
doAcquireNanos(arg, nanosTimeout);
}

可以看出 AQS.tryAcquireNanos(arg, nanosTimeout) 方法与 AQS.acquireInterruptibly(arg) 类似,都支持线程中断,还加上了一个限定时间。如果限定时间为 0,那么就相当于调用 tryAcquire(1) 方法。上面的 tryLock() 方法在公平锁中还是使用非公平策略,但是 tryLock(0, TimeUnit.SECONDS) 在公平锁中可以实现公平的 tryLock() 方法。

unlock()

unlock() 释放持有的锁,从获取锁的过程可以猜测到其中肯定会将 state 减 1,但是具体的方法路线是如何呢?

1
2
unlock() -> AQS.release(1) -> Sync.tryRelease(1)
-> AQS.unparkSuccessor(head)

下面具体源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final boolean AQS.release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 释放成功,并等待队列的第一个节点不为空,使用 LockSupport.unpark() 唤醒第一个节点的线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean Sync.tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 当前线程没有持有锁,抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// state 为 0,锁才是自由的,否则只是退出一次重入,锁的被持有线程不变
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
// 返回释放后锁是否自由,即未被持有
return free;
}

所以 unlock() 方法实际是将 state 减 1,之后如果锁是自由的,则会唤起等待队列的头节点中的线程。不过在两者中间,如果有其他线程获取锁的话,公平锁会判断是否有线程等待,而非公平锁则直接获取该锁。

isHeldByCurrentThread() 与 isLocked()

这两个方法就比较简单了,直接看对应的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
protected final boolean Sync.isHeldExclusively() {
// 判断锁的被持有线程是否为当前线程
return getExclusiveOwnerThread() == Thread.currentThread();
}
public boolean isLocked() {
return sync.isLocked();
}
final boolean Sync.isLocked() {
return getState() != 0;
}

newCondition() 方法会在下面单独描述,而其他方法不是很重要,这里就不再分析了。

Condition

Condition 的作用与 Object 的 wait、notify、notifyAll 类似,用以线程间协作。调用 Condition.await() 或 Object.wait() 将阻塞线程等待其他线程的通知,调用 Conditon.signal()、Condition.signalAll()、Object.nofity()、Object.notifyAll() 将唤起 wait 的线程。

下面有几个相关疑问,可以仔细琢磨下。

为什么 Condition 与锁相关,Object 的 wait、notify、notifyAll 与对象相关

先思考为什么 wait、notify、notifyAll 是 Object 的方法,如果它们不和对象相关联,wait() 阻塞线程后,notify() 唤起线程时不知道究竟唤醒哪些 wait 的线程,所以与某一对象对应可以帮助 notify() 时唤醒的也是与该对象相关的等待线程。

为什么 await、signal 方法需要先获取锁,wait、notify 方法需要先获取对象锁

这样做的好处是保证 wait 和 notify 的过程是互斥的,而它们又要与某一个东西相关联,所以直接的方法与对象锁相关联,实际不是与对象相关。所以 Condition 和 lock 相关联。

Condition 的 API

await() – 释放相关的锁,然后阻塞当前线程直到被 singal 通知或者线程中断

awaitUninterruptibly() – 释放相关的锁,阻塞当前线程直到被 singal 通知

awaitNanos(long nanosTimeout)、await(long time, TimeUnit unit)、awaitUntil(Date deadline) – 释放相关的锁,阻塞当前线程直到被 singal 通知、线程中断或限定时间到

signal() – 唤醒一个等待的线程,被唤醒的线程返回 await() 方法前需要重新获取锁

singalAll() – 唤醒所有等待的线程,所有被唤醒的线程返回 await() 方法前需要重新获取锁

ReentrantLock 的 Condition 的内部实现

下面看 await()、signal() 两个方法的实现细节,ReentrantLock 返回的 Condition 是 AQS.ConditionObject 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void AQS.ConditionObject.await() throws InterruptedException {
// 如果线程中断,直接抛出 InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 先把当前线程添加到 condition 的等待队列中
Node node = addConditionWaiter();
// 释放线程当前持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断线程是否被通知想重新获取锁
while (!isOnSyncQueue(node)) {
// 阻塞线程
LockSupport.park(this);
// 阻塞线程被唤醒后,如果此时线程中断,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 所以阻塞线程被 signal 唤醒后,或者线程中断后可以跳出循环
// 重新获取锁,获取失败则阻塞加入阻塞队列直到获取成功
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 如果获取的过程中线程中断,设置 interruptMode 为 REINTERRUPT
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清楚等待队列中的取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 如果 interruptMode 为 REINTERRUPT, 再次中断线程
// 如果 interruptMode 为 THROW_IE,抛出 InterruptedException
reportInterruptAfterWait(interruptMode);
}

所以 awit() 的大致逻辑为:释放锁,并且阻塞自己并添加到 condition 的等待队列,被 signal 通知或线程中断后唤醒线程,重新获取锁。

下面再看 signal() 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final void AQS.ConditionObject.signal() {
// 锁不是互斥独占锁时,抛出 IllegalMonitorStateException 异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 如果等待队列不为空,
doSignal(first);
}
private void AQS.ConditionObject.doSignal(Node first) {
do {
// 把 first 节点从队列中移除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 循环找到第一个未取消的节点,把该节点从 condition 队列添加到 sync 等待队列(lock 队列)
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

可以看到 signal() 并没有唤起 wait 的线程,只是把等待时间最长的未取消线程添加到 sync 等待队列,等待获取锁。

而 signalAll() 方法的区别时将 condition 等待队列中所有节点移到 sync 等待队列。

现在再来分析下,一开始提供的 Condition 的 BoundedBuffer 示例,假设现在 BoundedBuffer 中 items 为空:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public Object take() throws InterruptedException {
lock.lock();
// 此时,线程 A 获取到 lock
try {
while (count == 0)
// 因为 buffer 为空,释放获取的 lock,阻塞线程,添加到 notEmpty 等待队列
notEmpty.await();
Object x = items[takeptr];
if (++ takeptr == items.length) takeptr = 0;
-- count;
// buffer 取走 item,发送 notFull 信号
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
public void put(Object x) throws InterruptedException {
lock.lock();
// 然后,线程 B 获取到 lock
try {
while (count == items.length)
// buffer 满了就挂起,直到收到 notFull 的信号
notFull.await();
items[putptr] = x;
if (++ putptr == items.length) putptr = 0;
++ count;
// 把 notEmpty 等待队列中的线程 A 移到 lock 的等待队列
notEmpty.signal();
} finally {
// 线程 B 释放锁,唤醒 lock 等待队列中的线程 A,线程 A 获取到 lock 然后从 await() 方法返回
lock.unlock();
}
}

总结

ReentrantLock 是 API 的重入锁,相对 synchronized 关键字来说,额外支持公平锁(synchronized 是非公平的)、获取锁可中断、可以限定获取的最大时间、可以关联多个 Condition。内部主要实现细节是基于 AQS 的,等待队列是用链表结构存储的,阻塞队列使用 LockSupport.park() 实现。

什么时候用 ReentrantLock?

JDK 1.6 之后,synchronized 的性能优化得和 ReentrantLock 差不多,所以在 synchronized 可以满足条件的情况话,优先使用 synchronized。

END
Johnny Shieh wechat
我的公众号,不只有技术,还有咖啡和彩蛋!