ReentrantReadWriteLock原理

和互斥锁相比,ReentrantReadWriteLock读写锁是读线程之间不相互互斥,写线程与写线程和读线程间互斥,ReentrantReadWriteLock的父接口是ReadWriteLock。使用ReadWriteLock时是获得内部的读锁和写锁,然后分别调用lock/unlock。

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

表面上看readLock和writeLock是两把锁,实际上它们只是同一把锁的两个视图,线程分为两类,读线程和写线程,读线程间不互斥,读线程与写线程互斥,写线程间互斥。从源码看readLock和writeLock实际共用同一个sync对象。Sync对象同互斥锁一样分为公平锁非公平锁两种策略,且继承自AQS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}
}

同互斥锁一样,读写锁也是用state变量来表示锁状态的,但含义完全不同,在Sync内部对state进行了重新定义,由于无法用一次CAS同事操作两个int变量,故state低16位用来记录写锁,若低16位的值等于5,表示一个写线程重入了5次;高16为用来记录读锁,若高16位的值等于5,表示5个线程都拿到了该锁或一个读线程重入了5次;state=0时表示没有线程持有锁state>0表示要么有线程持有读锁,要么有线程持有写锁

1
2
3
4
5
6
7
8
9
10
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 持有读锁的线程重入次数,高16位记录读锁
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 持有写锁的线程重入次数,低16位记录写锁
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

在AQS中有acquire/releaseacquireShared/releaseShared两对模板方法,互斥锁和读写锁中的写锁都是基于acquire/release模板方法来实现的,读写锁中的读锁是基于acquireShared/releaseShared模板方法来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
}

将读/写、公平与非公平进行排列组合,有四种组合方式,tryAcquireShared、tryAcquire都是在Sync中实现的,Sync中的这两个方法又是模板方法,在FairSync和NonfairSync中分别实现。

  • 读锁的公平实现Sync.tryAcquireShared + FairSync中两个覆写的子函数
  • 读锁的非公平实现Sync.tryAcquireShared + NonfairSync中两个覆写的子函数
  • 写锁的公平实现Sync.tryAcquire+ FairSync中两个覆写的子函数
  • 写锁的非公平实现Sync.tryAcquire+ NonfairSync中两个覆写的子函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() { // 写线程抢锁时是否应该阻塞
return false; // 写线程在抢锁之前永远不被阻塞,是非公平
}
final boolean readerShouldBlock() { // 读线程抢锁之前是否应该阻塞
return apparentlyFirstQueuedIsExclusive(); // 读线程在抢锁时,当队列中第一个元素是写线程时,要阻塞
}
}

static final class FairSync extends Sync {
final boolean writerShouldBlock() { // 写线程抢锁时是否应该阻塞
return hasQueuedPredecessors(); // 写线程在抢锁之前,若队列中有其他线程排队,要阻塞,是公平的
}
final boolean readerShouldBlock() { // 读线程抢锁之前是否应该阻塞
return hasQueuedPredecessors(); // 读线程在抢锁之前,若队列中有其他线程排队,要阻塞,是公平的
}
}
// 查询当队列中第一个元素是否是独占模式
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() && // 第一元素为非共享模式
s.thread != null;
}
// 查询当前队列中是否有元素排队
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

对于公平锁,不论是读锁还是写锁,只要队列中有其他线程在排队,都不能直接去抢锁,要排在队列尾部

对于非公平锁,读锁与写锁实现略有差异,state=0时写线程能抢锁,state>0且持有写锁的线程是它自己能再次重入;因为读线程间不互斥,若当前是读线程持有锁,若其他读线程还非公平去一直抢锁,可能导致写线程永远拿不到锁,故对读线程的非公平做了一些约束。当队列第一个元素时写线程时,读线程也要阻塞一下不能直接的去抢锁

WriteLock公平与非公平实现

WriteLock写锁是排他锁,实现策略与互斥锁类似,重写了tryAcquire/tryRelease方法,若获取锁失败同互斥锁一样将该任务放入队列中排队处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
public static class WriteLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquire(1);
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 写线程只能有一个,但写线程能多次重入
if (c != 0) { // 说明写线程或读线程持有锁
// w == 0,说明锁被读线程持有直接,返回去队列中排队
// w != 0,且持有写锁的线程不是当前线程,返回去队列中排队
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 超过了最大重入次数
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// c == 0 没有线程持有该锁,判断是否需要被阻塞,否则尝试获取锁,若获取锁失败返回false
// writerShouldBlock()四种不同的实现策略:非公平直接返回false,公平判断队列中是否有元素排队
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current); // 拿锁成功将独占锁设置为当前线程
return true;
}
}
}

c != 0w == 0,一定是读线程拿着锁

c != 0w != 0,当前一定是写线程拿着锁,若current != getExclusiveOwnerThread()说明持有写锁的线程不是当前线程

c == 0,说明当前没有线程持有该锁,可以通过CAS抢锁,抢锁成功将持有锁的线程设置为自己

WriteLock写锁是排他锁,公平与非公平实现几乎一模一样,只是writerShouldBlock()分别被FairSyncNonfairSync实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
public static class WriteLock implements Lock, java.io.Serializable {
public void unlock() {
sync.release(1);
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 释放锁成功,唤醒后续节点
return true;
}
return false;
}
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 低16位用来记录写锁,且写锁只能重入,故直接减
boolean free = exclusiveCount(nextc) == 0; // 是否将写锁完全释放锁成功
if (free)
setExclusiveOwnerThread(null);
setState(nextc); // 因为写线程是排他锁,故state操作不需要CAS
return free;
}
}
}

ReadLock公平与非公平实现

ReadLock读锁是共享锁,重写了tryAcquireShared/tryReleaseShared方法,这里读锁唤醒队列中紧接着排队的所有读线程的关键代码在setHeadAndPropagate(node, r)方法中,该方法会调用doReleaseShared从而调用unparkSuccessor去唤醒后继节点,这里也是共享锁独占锁唤醒后继线程的最大区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
public static class ReadLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquireShared(1);
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg); // 获取锁失败
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 将节点以共享模型添加入队列尾部
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 获取前置节点
if (p == head) { // 若前置节点为head
int r = tryAcquireShared(arg); // 再次尝试获取读锁
if (r >= 0) { // 若获取读锁成功
setHeadAndPropagate(node, r); // 该处是去唤醒后续排队的读线程
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 第一次将前置节点waitStatus设置为SIGNAL,第二次park阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 若后继节点是共享模式,则继续唤醒后继节点
doReleaseShared();
}
}
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 写锁被某线程持有,且该线程还不是自己,读锁肯定拿不到直接返回
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 非公平锁判断队列中第一个元素是否是写锁,公平锁判断队列中是否有元素排队
if (!readerShouldBlock() && r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // CAS拿读锁高16位加1,读锁状态在高16位,故须把1左移16位再加一
if (r == 0) { // r之前等于0,说明这是第一个拿到读锁的线程
firstReader = current; // 记录第一拿取读锁的线程
firstReaderHoldCount = 1; // firstReader的持有计数
} else if (firstReader == current) { // 不是第一个
firstReaderHoldCount++; // firstReader的持有计数加一
} else { // firstReader不为当前线程,说明已经有别人先占读锁了
HoldCounter rh = cachedHoldCounter; // 缓存最后一个获取读锁的线程;
// rh中的线程不是当前线程
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh); // 初始化ThreadLocal将rh保存进去
rh.count++;
}
return 1; // 说明成功获取了读锁
}
return fullTryAcquireShared(current); // 拿读锁失败,进入该函数不断自旋拿读锁
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { // 自旋抢读锁,直到写锁被人占了,且不是当前线程占的写锁
int c = getState();
// 判断写锁是否被占用
if (exclusiveCount(c) != 0) {
// 若线程拿到了写锁,且不是自己
if (getExclusiveOwnerThread() != current)
return -1;
// 非公平锁判断队列中第一个元素是否是写锁,公平锁判断队列中是否有元素排队
} else if (readerShouldBlock()) {
// 当前线程就是第一个获取读锁的线程(重入)
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
// 若当前线程未初始化过ThreadLocal中的值,get()会执行初始化
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
// 若count==0,则是上一行代码初始化的,那么执行remove
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS尝试获取锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) { // 获取锁成功,且当前无读线程持有锁
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 重入
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
}
}

低16位不等于0,说明写线程持有锁,且getExclusiveOwnerThread() != current才返回-1,若getExclusiveOwnerThread() == current说明当前线程是一个写线程,即一个线程持有WriteLock写锁后可以再去调用ReadLock.lock

这里的firstReader和cachedHoldCounter之类的变量,只是一些统计变量,在ReentrantReadWriteLock对外的一些查询函数中会用到,对整个读写互斥机制没有影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
public static class ReadLock implements Lock, java.io.Serializable {
public void unlock() {
sync.releaseShared(1);
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 若没有线程再持有读锁了
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 若队列中还有元素排队
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 且头节点waitStatus为-1
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 唤醒头节点前,将waitStatus设置为0
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒头节点
}
// 若头节点状态已经是0了,则将头节点waitStatus修改为-3
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; // 退出循环
}
}
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) { // 第一个持有读锁的线程是否是当前线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--; // 说明是重入锁 释放一次
} else {
HoldCounter rh = cachedHoldCounter; // 最后一个获取读锁的线程
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { // 自旋
int c = getState();
int nextc = c - SHARED_UNIT; // 读锁在高16,故需要左移16位,再减
if (compareAndSetState(c, nextc))
return nextc == 0; // 减1后判断,是否还有线程持有读锁
}
}
}
}

因为读锁是共享锁,多个线程会同时持有读锁,所以对读锁的释放不能直接减一需要通过一个for循环加CAS操作不断重试,这是tryReleaseShared与tryRelease根本差异所在。