Condition原理

Condition工作原理

Condition本身也是一个接口,其功能与wait/notify类似

1
2
3
4
5
6
7
8
9
10
11
12
public interface Condition {
void await() throws InterruptedException; // 在接到信号或被中断之前一直处于等待状态
void awaitUninterruptibly(); // 在接到信号前一直处于等待状态,不可被中断
// 在接到信号、被中断或到达指定等待时间之前一直处于等待状态
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 在接到信号、被中断或到达指定等待时间之前一直处于等待状态
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 在接到信号、被中断或到达指定最后期限之前一直处于等待状态
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal(); // 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁
void signalAll(); // 唤醒所有等待线程
}

wait/notify必须与synchronized一起使用,同样Condition也必须和Lock一起使用,因此Lock接口中有一个与Condition相关的接口,所有Condition都是从Lock中构造出来的。Lock持有一个FIFO双向的同步阻塞队列Condition持有一个FIFO单向条件等待队列

1
2
3
4
5
6
7
8
public interface Lock {
void lock(); // 不能被中断
void lockInterruptibly() throws InterruptedException; // 可以被中断
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition(); // 所有Condition都是从Lock中构造出来的
}

Condition使用很简洁,避免了wait/notify的生产者通知生产者、消费者通知消费者的问题;因为Condition也必须和Lock一起使用,故Condition的实现也是Lock的一部份。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ReentrantLock implements Lock, java.io.Serializable {
public Condition newCondition() {
return sync.newCondition();
}
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
public static class ReadLock implements Lock, java.io.Serializable {
public Condition newCondition() {
throw new UnsupportedOperationException(); // 读锁不支持Condition
}
}
public static class WriteLock implements Lock, java.io.Serializable {
public Condition newCondition() {
return sync.newCondition();
}
}
}

ReadLock读锁不支持ConditionWriteLock写锁和互斥锁都支持Condition,虽然他们钩子调用的是自己内部类Sync,但内部类Sync都继承自AQS,sync.newCondition()最终都调用了AQS中的newCondition

1
2
3
4
5
6
7
8
9
10
11
12
final ConditionObject newCondition() {
return new ConditionObject();
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
// 条件等待队列的首节点,每个节点使用Node.nextWaiter保存下一个节点的引用,因此等待队列是一个单向队列
private transient Node firstWaiter;
// 条件等待队列的尾节点
private transient Node lastWaiter;
public ConditionObject() { }
}
}

await实现

每个Condition对象上面都阻塞了多个线程,ConditionObject内部维护了一个单向链表组成的FIFO队列,首先将当前线程封装成node节点,并将节点加入Condition等待队列的尾部,然后释放同步state状态即独占锁,释放锁的同时会将其在同步队列中移除,并非是将同步队列中的节点直接加入等待队列,唤醒同步队列中的后继节点,然后当前线程会进入park等待状态

由同步队列到等待队列的装换

自旋等待直到其他线程调用signal(),将node节点在等待队列上的节点移动到同步队列,或被中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 正要执行await收到中断信号,抛出异常
throw new InterruptedException();
Node node = addConditionWaiter(); // 加入Condition等待队列
int savedState = fullyRelease(node); // 阻塞在Condition之前必须先释放锁,否则会死锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 判断当前节点是否也在同步队列中,signal时会将节点移动到同步队列
LockSupport.park(this); // 若不在同步队列中,则还没有被其他线程signal,则park阻塞自己
// 判断标记两种中断,在被signal前中断还是被signal后中断。分别标记为THROW_IE,REINTERRUPT即-1和1
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break; // 被中断唤醒也会退出自旋
}
// 阻塞当前节点,直到node获取到了锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 重新拿锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 清理Condition队列中被关闭的节点
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); // 根据中断状态来判断是抛出异常,还是执行中断
}
// 将节点添加Condition等待队列中
private Node addConditionWaiter() {
Node t = lastWaiter;
// 若lastWaiter已经被cancelled直接清理出链表
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter; // 执行清理操作后lastWaiter可能变化了,所以重新赋值
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 检查中断,若在发出unpark信号之前被中断,则返回THROW_IE,否则返回REINTERRUPT,若未中断则返回0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
// 在取消等待后将节点转移到同步队列,若线程在发出信号之前被取消,则返回true
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // 若当前状态为CONDITION且更新为初始状态成功,则将其加入同步等待队列
return true;
}
while (!isOnSyncQueue(node))
Thread.yield(); // 若不在同步队列让出等待
return false;
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null; // 断掉中间废弃节点时使用
while (t != null) { // 遍历整个链
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) { // 若当前节点waitStatus不为CONDITION
t.nextWaiter = null; // 将当前节点的nextWaiter置空,即断掉
if (trail == null) // 若头节点都废弃了
firstWaiter = next;
else // 将前一个正常节点的nextWaiter指向下一个节点
trail.nextWaiter = next;
if (next == null) // next为null说明trail节点已经是最后的节点了
lastWaiter = trail;
} else trail = t; // 将trail往后移动
t = next;
}
}
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
final int fullyRelease(Node node) { // 释放所持有全部锁,包括重入的
boolean failed = true;
try {
int savedState = getState(); // 不管重入几次都把state释放为0
if (release(savedState)) { // 释放独占锁,且唤醒同步阻塞队列中的排队元素
failed = false;
return savedState;
} else { // 释放锁失败抛出异常,后续将节点移除
throw new IllegalMonitorStateException();
}
} finally {
if (failed) //若释放锁失败,将waitStatus置为CANCELLED,便于后续清理出Condition等待队列
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
final boolean isOnSyncQueue(Node node) { // 这里的同步队列是只Lock中的FIFO队列
// 若waitStatus为CONDITION或者其前置节点为null,表示没有在同步队列
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // 若后继节点不为空,则一定在同步队列中
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
// 从尾部向前搜索整个队列,查询该节点是否在同步队列中
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
}

线程调用await方法时,肯定已经拿到了锁,故在addConditionWaiter方法中,对双向链表的操作不需要执行CAS操作

线程在执行wait操作前,必须先释放锁,也就是fullyRelease(node),否则会发生死锁,这和wait/notifysynchronized的配合机制一样。

线程wait中被唤醒后,必须用acquireQueued(node, savedState)重新获取锁。

checkInterruptWhileWaiting(node))在LockSupport.park(this)之后,是为了检测在park期间是否收到过中断信号。这里await方法是可以响应中断的,当发现自己是被中断唤醒的,会直接退出while循环,await方法也会返回。

isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列中初始时Node只在Condition队列中,而不在AQS队列中,执行signal操作时会放入AQS同步队列

1
2
3
4
5
6
7
8
9
10
11
12
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted()) // 从park中醒来,收到中断,不退出,继续执行while循环
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

signal实现

signal方法会将等待队列中节点移到同步队列中。这里并不会直接唤醒等待队列中的节点,除非将等待队列中的节点移到同步队列中时,其在同步队列中的前置节点已经被取消,或CAS将前置节点waitStatus更新为SIGNAL时失败,才会执行唤醒当前节点操作。

由等待队列到同步队列的装换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void signal() {
if (!isHeldExclusively()) // 只有持有锁的线程才有资格调用signal
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 唤醒Condition队列中第一个线程
}
private void doSignal(Node first) {
do { // 若first节点被取消,接着唤醒下一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null; // 若已经是最后一个元素了
first.nextWaiter = null; // 唤醒当前节点后要被剔除队列,便于GC所以将nextWaiter置空
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
// 若更新waitStatus失败说明,节点已经被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 把Node放入互斥锁的同步队列中,再调用下面的unpark
Node p = enq(node);
int ws = p.waitStatus;
// 若node前置节点已经被取消,或更新前置节点状态为可唤醒失败,则唤醒当前node
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒当前线程
return true;
}

同await一样,调用signal时必须先拿到锁,因为前面执行await时把锁释放了。从队列中取出firstWait唤醒,在通过unpark唤醒它之前,先调用enq(node)函数将该Node放入AQS锁对应的阻塞队列中,所以await中才while (!isOnSyncQueue(node))判断,若条件满足,说明await线程不是被中断,而是被unpark唤醒的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final void signalAll() {
if (!isHeldExclusively()) // 只有持有锁的线程才有资格调用signal
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}