ReentrantLock之公平锁源码分析

转自:ReentrantLock 之公平锁源码分析

  本文分析的ReentrantLock所对应的Java版本为JDK8。
  在阅读本文前,读者应该知道什么是CAS、自旋。

本文大纲

  1. ReentrantLock公平锁简介
  2. AQS
  3. lock方法
  4. unlock方法

1. ReentrantLock公平锁简介

  ReentrantLock是JUC(java.util.concurrent)包中Lock接口的一个实现类,它是基于AbstractQueuedSynchronizer(下文简称AQS)来实现锁的功能。ReentrantLock的内部类Sync继承了AbstractQueuedSynchronizer,Sync又有FairSync和NonFairSync两个子类。FairSync实现了公平锁相关的操作,NonFairSync实现了非公平锁相关的操作。它们之间的关系如下:
  

  公平锁的公平之处主要体现在,对于一个新来的线程,如果锁没有被占用,它会判断等待队列中是否还有其它的等待线程,如果有的话,就加入等待队列队尾,否则就去抢占锁。

下面这段代码展示了公平锁的使用方法:

1
2
3
4
5
6
7
8
9
10
private final Lock lock = new ReentrantLock(true); // 参数true代表创建公平锁

public void method() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock();
}
}

2. AQS

  下面简单介绍一下AQS中的Node内部类和几个重要的成员变量。

2.1 Node

  AQS中,维护了一个Node内部类,用于包装我们的线程。我们需要关注Node中的如下属性:

  • pre:当前节点的前驱节点。
  • next:当前节点的后继节点。
  • thread:thread表示被包装的线程。
  • waitStatus:waitStatus是一个int整型,可以被赋予如下几种值:

    static final int CANCELLED =  1; // 线程被取消
    static final int SIGNAL  = -1; // 后继节点中的线程需要被唤醒
    static final int CONDITION = -2; // 暂不关注
    static final int PROPAGATE = -3; // 暂不关注
    

另外,当一个新的Node被创建时,waitStatus是0。

2.2 head

  head指向队列中的队首元素,可以理解为当前持有锁的线程。

2.3 tail

  tail指向队列中的队尾元素。

2.4 state

  state表示在ReentrantLock中可以理解为锁的状态,0表示当前锁没有被占用,大于0的数表示锁被当前线程重入的次数。例如,当state等于2时,表示当前线程在这把锁上进入了两次。

2.5 exclusiveOwnerThread

  表示当前占用锁的线程。

2.6 等待队列

  下图简单展示了AQS中的等待队列:
  

3. lock方法

  有了上面的AQS的基础知识后,我们就可以展开对ReentrantLock公平锁的分析了,先从lock方法入手。

  ReentrantLock中的lock方法很简单,只是调用了Sync类(本文研究公平锁,所以应该是FairSync类)中的lock方法:

public void lock() {
    sync.lock();
}

  我们跟到FairSync的lock方法,代码也很简单,调用了AQS中的acquire方法:

final void lock() {
    acquire(1);
}

  acquire方法:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 调用tryAcquire尝试去获取锁,如果获取成功,则方法结束
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果获取锁失败,执行acquireQueued方法,将把当前线程排入队尾
selfInterrupt();
}

  tryAcquire方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取锁的状态
if (c == 0) { // 如果状态是0,表示锁没有被占用
if (!hasQueuedPredecessors() && // 判断是队列中是否有排队中的线程
compareAndSetState(0, acquires)) { // 队列中没有排队的线程,则尝试用CAS去获取一下锁
setExclusiveOwnerThread(current); // 获取锁成功,则将当前占有锁的线程设置为当前线程
return true;
}
}
// 锁被占用、队列中有排队的线程或者当前线程在获取锁的时候失败将执行下面的代码
else if (current == getExclusiveOwnerThread()) { // 当前线程是否是占有锁的线程
int nextc = c + acquires; // 是的话,表示当前线程是重入这把锁,将锁的状态进行加1
if (nextc < 0)
throw new Error("Maximum lock count exceeded"); // 锁的重入次数超过int能够表示最大的值,抛出异常
setState(nextc); // 设置锁的状态
return true;
}
return false; // 没有获取到锁
}

  hasQueuedPredecessors方法:

1
2
3
4
5
6
7
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && // 队列中的队首和队尾元素不相同
((s = h.next) == null || s.thread != Thread.currentThread()); // 队列中的第二个元素不为null,且第二个元素中的线程不是当前线程。这里如果返回true,说明队列中至少存在tail、head两个节点,就会执行acquireQueued将当前线程加入队尾
}

  如果tryAcquire没有获取到锁,将执行:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

  我们先分析addWaiter方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 将当前线程包装成Node,mode参数值为null,表示独占模式
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred; // 如果队列中的尾节点不为空,将当前node的前驱节点设置为之前队列中的tail
if (compareAndSetTail(pred, node)) { // 用CAS把当前node设置为队尾元素
pred.next = node; // 成功的话,则将之前队尾元素的后继节点设置为当前节点。如果这里不清楚的话,请结合前面讲等待队列的那张图进行理解。
return node;
}
}
enq(node); // 队尾节点为空,或者用CAS设置队尾元素失败,则用自旋的方式入队
return node;
}

  enq方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node())) // 队尾元素为空,创建一个空的Node,并设置为队首
tail = head; // 设置队首和队尾为同一个空Node,进入下一次循环
} else {
node.prev = t; // 如果队列中的尾节点不为空,将当前node的前驱节点设置为之前队列中的tail
if (compareAndSetTail(t, node)) { // 用CAS把当前node设置为队尾元素
t.next = node; // 成功的话,则将之前队尾元素的后继节点设置为当前节点
return t;
}
}
}
}

  下面这张图反应了上面enq方法的处理流程:
  

  经过上面的方法,当前node已经加入等待队列的队尾,接下来将执行acquireQueued方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 获取node的前驱节点
if (p == head && tryAcquire(arg)) { // 如果node的前驱是head,它将去尝试获取锁(tryAcquire方法在前面已经分析过)
setHead(node); // 获取成功,则将node设置为head
p.next = null; // 将之前的head的后继节点置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 当前node的前驱不是head,将为当前node找到一个能够将其唤醒的前驱节点;或者当前node的前驱是head,但是获取锁失败
parkAndCheckInterrupt()) // 将当前线程挂起
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

  shouldParkAfterFailedAcquire方法的作用就是找到一个能够唤醒当前node的节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 开始时是0
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; // 前驱节点的状态是-1,会唤醒后继节点,可以将线程挂起
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev; // 前驱节点中的线程被取消,那就需要一直循环直到找到一个没有被设置为取消状态的前驱节点
} while (pred.waitStatus > 0);
pred.next = node; // 从后向前找,将第一个非取消状态的节点,设置这个节点的后继节点设置为当前node
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // waitStatus是0或者-3的时候,这时waitStatus都将被设置为-1
// 即后继节点需要前驱节点唤醒
}
return false; // 上层代码再进行一次循环,下次进入此方法时,将进入第一个if条件
}

  找到了合适的前驱节点,parkAndCheckInterrupt方法当前线程挂起:

1
2
3
4
private final boolean parkAndCheckInterrupt() { // 将线程挂起,等待前驱节点的唤醒
LockSupport.park(this);
return Thread.interrupted();
}

4. unlock方法

  ReentrantLock的unlock方法调用AQS中的release方法:

1
2
3
public void unlock() {
sync.release(1); // 调用AQS的release方法
}

  release方法:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试去释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 释放锁成功,head不为空,并且head的waitStatus不为0的情况下,将唤醒后继节点
return true;
}
return false;
}

  tryRelease方法:

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 将锁的状态减1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException(); // 准备释放锁的线程不是持有锁的线程,抛出异常
boolean free = false;
if (c == 0) {
free = true; // 锁的状态是0,说明不存在重入的情况了,可以直接释放了
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

  锁释放成功,将唤醒后继节点,unparkSuccessor方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 注意,这个node是head节点
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 当前node的状态是小于0,将其状态设置为0

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // head节点的后继节点
if (s == null || s.waitStatus > 0) {
s = null; // 执行到这表示head的后继节点是1,处于取消的状态
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; // 从等待队列的队尾向前找,找到倒序的最后一个处于非取消状态的节点
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒head后面的处于非取消状态的第一个(正序)节点
}

---------------- The End ----------------
0%