BlockingQueue阻塞队列一

BlockingQueue是一个带阻塞功能的队列,入队时若队列已满阻塞调用者出队时若队列为空阻塞调用者。该接口和JDK中Queue接口是兼容的,在其基础上增加了阻塞功能,addoffer是无阻塞的,offer带超时时间的是阻塞的,put是阻塞的,removepeekpoll是非阻塞的,poll带超时时间的是阻塞的,take是阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e); // 将元素插入队列,队列没满的话,放入成功。否则抛出异常。非条件阻塞
boolean offer(E e); // 将元素插入队列,若队列可容纳返回true,否则false,非条件阻塞
void put(E e) throws InterruptedException; // 将元素插入队列,若队列已满,条件阻塞
// // 将元素插入队列,若队列可容纳返回true,否则false,指定时间内未能入队抛出异常,条件阻塞
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException; // 取出队列首位的元素,若队列为空,条件阻塞
// 取出队列首位元素,若不能立即取出,则可以等timeout时间,取不到则抛出异常;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o); // 将元素从队列中移除,非阻塞
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}

BlockingQueue各种实现类

队列的实质是一种存储数据的结构,通常用数组或链表实现,一般而言队列具备FIFO先进先出特性,也有双端队列和优先级队列,主要是入队EnQueue和出队DeQueue操作。队列分为几乎可以无限增长无限队列定义了最大容量优先队列

ArrayBlockingQueue

基于数组有界阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,按照FIFO原则对元素进行排序,以便缓存队列中的数据对象,一旦创建就不能再增加容量。由构造函数可以其实现是基于一把锁和两个条件。支持对等待的生产者线程和消费者线程可选公平策略

1
2
3
4
5
6
7
8
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition(); // 试图从空队列中检索元素将导致类似阻塞
notFull = lock.newCondition(); // 试图向已满队列中放入元素会导致放入操作受阻塞
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
// 真正存储数据的数组
final Object[] items;
// take、poll、peek、remove的下一个索引
int takeIndex;
// put、offer、add的下一个索引
int putIndex;
// 队列中元素总个数
int count;
// 可重入锁,插入和获取数据都需要获取该锁
final ReentrantLock lock;
// 队列不为空的条件
private final Condition notEmpty;
// 队列未满的条件
private final Condition notFull;
}

入队操作

入队操作的基础方法,putIndex相当于是一个环形操作,每调用一次enqueue方法则会给notEmpty条件队列发送一次信号。

1
2
3
4
5
6
7
8
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) // 入队操作的下一个索引加一
putIndex = 0; // 若下一个索引等于数组长度,则从对头继续插入
count++; // 总数加一
notEmpty.signal(); // 通知消费者线程
}

add方法最终调用的是offer的无超时时间的方法,若队列已满直接返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public boolean add(E e) {
return super.add(e); // 最终也是调用的offer方法
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock(); // 获取独占锁,不能被中断
try {
if (count == items.length) // 若队列已满则返回false,入队失败
return false;
else {
enqueue(e); // 将数据插入队列尾部
return true;
}
} finally {
lock.unlock(); // 释放独占锁
}
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 可以被中断
try {
while (count == items.length) { // 若队列已满,自旋等待到超时时间结束
if (nanos <= 0)
return false; // 若超时时间设置为小于等于0,则直接返回失败
// 条件阻塞,超过等待时间后再次去获取到锁时抛出异常(当队列不再是满队列时被放入队列中排队唤醒)
nanos = notFull.awaitNanos(nanos);
}
enqueue(e); // 插入队列
return true;
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 条件阻塞,当队列不再是满队列时被放入队列中排队唤醒
enqueue(e);
} finally {
lock.unlock();
}
}

出队操作

出队的基础方法,这里的takeIndex也是环形操作,每调用一次dequeue都会给notFull条件队列发送一次信号。

1
2
3
4
5
6
7
8
9
10
11
12
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null; // 直接将该索引处的元素置空
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 消费元素后队列不再是满的,通知阻塞的生产者
return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 若队列中没有元素则返回空,否则将队列中元素出队
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
// 条件阻塞,超过等待时间后再次去获取到锁时抛出异常(当队列不再是空队列时被放入队列中排队唤醒)
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 若队列为空,消费线程进入等待队列
return dequeue();
} finally {
lock.unlock();
}
}
public E peek() { // 只查询队头数据,不删除数据
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

LinkedBlockingQueue是一种基于单向链表的阻塞队列,理论上是有界的,队头和队尾是2个指针分开操作,故用了2把锁2个条件,分别用来控制元素入队和出队的原子性,以及对应的条件变量保证多线程先入队出队操作的线程安全,同时有一个AtomicInteger原子变量记录count数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> { // 单向链表
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 队列容量
private final int capacity;
// 队列元素个数
private final AtomicInteger count = new AtomicInteger();
// 存放首节点
transient Node<E> head;
// 存放尾节点
private transient Node<E> last;
// 执行take, poll等操作时候需要获取该锁
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列为空时候执行出队操作(比如take)的线程会被放入这个条件队列进行等待
private final Condition notEmpty = takeLock.newCondition();
// 执行put, offer等操作时候需要获取该锁
private final ReentrantLock putLock = new ReentrantLock();
// 当队列满时候执行进队操作(比如put)的线程会被放入这个条件队列进行等待
private final Condition notFull = putLock.newCondition();
}

可以很明显看到若不指定队列容量,则默认容量Integer.MAX_VALUE相当与无界队列了。

1
2
3
4
5
6
7
8
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

入队操作

入队基础函数功能很简单,直接将最后个元素的next指向插入的元素,且将last指向最新的尾节点

1
2
3
private void enqueue(Node<E> node) {
last = last.next = node;
}

插入队列的元素不能为null,在往队列中put放数据时,若队列是满的通过notFull条件阻塞等待,消费线程从队列中取出一个元素后,判断取之前队列是满的,取完后通知生产者从notFull条件阻塞等待中唤醒,然后继续执行添加过程,发现添加后队列还没有满,则继续通知其他生产者从notFull条件阻塞等待中唤醒,添加完成后判断添加之前队列若是空的,则通知消费者notEmpty条件等待队列唤醒,继续出队操作。

为了提高并发度,用2把锁分别控制队头和队尾的操作,意味着put和put之间take和take之间是互斥的put和take之间不互斥,因为各自拿了一把锁,故当需要调用对方的condition的signal时,还必须加上对方的锁,也就是signalNotEmptysignalNotFull方法。不仅put会通知take,take也会通知put,当put发现非满时也会通知其他put线程,当take发现非空时,也会通知其他take线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await(); // 若队列已满,则放入条件队列中排队
}
enqueue(node); // 入队
c = count.getAndIncrement(); // 返回入队前队列中元素的个数
if (c + 1 < capacity)
notFull.signal(); // 若发现队列未满,发信号去唤醒排队的生产者线程
} finally {
putLock.unlock();
}
if (c == 0) // 若队列之前元素个数为0,现插入了一个元素则需要通知消费者
signalNotEmpty();
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false; // 若队列已经满了,直接返回false
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal(); // 通知消费者线程
} finally {
takeLock.unlock();
}
}

出队操作

出队操作与入队操作是相反的,通过take从队列中取数据时,若队列是空的通过notEmpty条件阻塞等待,生产线程往队列中添加一个元素后,判断添加之前队列是空的,添加后通知生产者从notFull条件阻塞等待中唤醒,然后继续执行添加过程,发现出队前元素个数大于1,则继续通知其他消费者线程notEmpty条件阻塞等待线程唤醒,取出完成后判断取出之前队列若是满的,则通知生产者线程从notFull条件等待队列唤醒,继续入队操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await(); // 当队列为空是条件阻塞take
}
x = dequeue(); // 队列非空时,出队
c = count.getAndDecrement();
if (c > 1) // 出队之前队列中元素大于1
notEmpty.signal(); // 通知其他take线程
} finally {
takeLock.unlock();
}
if (c == capacity) // 若出队前队列是满的,出队后队列就不是满的了,这时需要通知put线程
signalNotFull();
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0) // 若传入时间小于当前时间,则直接返回空
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue(); // 出队
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

PriorityBlockingQueue

队列通常是先进先出的,但PriorityQueue是按照元素的优先级从小到大出队列的,PriorityQueue中的两个元素之间需要可以比较大小,并现实了Comparable接口

allocationSpinLock自旋锁,用CAS操作来保证只有一个线程可以扩容队列,状态为0或者1,其中0表示当前没有在进行扩容,1表示当前正在扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
private static final int DEFAULT_INITIAL_CAPACITY = 11; // 默认初始容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private transient Object[] queue; // 用数组实现的二叉小根堆
private transient int size; // 队列中元素个数
private transient Comparator<? super E> comparator; // 比较操作符,若未定义则使用元素自带的比较功能
private final ReentrantLock lock; // 一把锁加一个条件,没有非满的条件
private final Condition notEmpty; // 当队列中元素为空时条件阻塞
private transient volatile int allocationSpinLock; // 自旋锁,用CAS操作来保证只有一个线程可以扩容队列
private PriorityQueue<E> q;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
if (initialCapacity < 1) throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
}

PriorityBlockingQueue是使用数组实现的二叉小根堆堆算法保证每次出队都是优先级最高的元素,基于一把锁加一个非空条件,无非满的条件,若不指定初始大小,内部会设定默认大小11,当元素个数超过该大小后,会自动扩容

在阻塞的实现方面,和ArrayBlockingQueue的机制相似,主要区别是用数组实现了一个二叉最小根堆,从而实现按优先级从小到大出队列,另一个区别是没有notFull条件,当元素个数超出数组长度时进行扩容处理。

入队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap); // size超出了数组的长度,则扩容
try {
Comparator<? super E> cmp = comparator;
if (cmp == null) // 若没定义比较操作符,使用元素自带的比较功能
siftUpComparable(n, e, array); // 执行siftUp操作将元素入堆
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // 必须释放然后重新获取主锁,这里通过CAS来保证并发的扩容的安全性
Object[] newArray = null;
if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
try {
// 若旧容量小于64则新容量为2*oldCap + 2,否则扩容50%
int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // 新的容量可能溢出了,溢出抛出OOM异常
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null)
Thread.yield(); // 说明另一个线程正在分配,则退出
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}

tryGrow扩容会先释放锁,然后用CAS控制只有一个线程可以扩容成功,扩容时若不释放锁,其他线程就不能入队和出队,大大降低了并发度,CAS失败的线程会调用Thread.yield()让出CPU,目的是为了让扩容线程扩容后优先调用lock.lock重新获取锁。

有可能yield的线程在扩容线程扩容完成前已经退出,并获取到了锁。若当前数组扩容还没完毕,当前线程会再次调用tryGrow方法。

出队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0]; // 因为是二叉最小根堆,即堆定即是要出队的元素
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n); // 执行siftDown操作调整堆
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = array[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}

DelayQueue

DelayQueue延迟队列,是一个按时间从小到大出队的PriorityQueue优先级队列,所谓的延迟时间,就是通过未来将要执行的时间减去当前时间,故放入DelayQueue中的元素,必须实现Delayed接口

1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

getDelay返回值小于或等于0,则说明元素到期,需要从队列中拿出来执行,该接口首先继承了Comparable接口,故要实现该接口,必须实现Comparable接口。DelayQueue是基于一把锁一个非空条件

1
2
3
4
5
6
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final PriorityQueue<E> q = new PriorityQueue<E>(); // 优先级队列
private Thread leader = null;
private final transient ReentrantLock lock = new ReentrantLock(); // 一把锁 + 一个非空条件
private final Condition available = lock.newCondition();
}

出队操作

不仅队列为空时阻塞,且堆顶元素的延迟时间没有到时也会阻塞,使用leader变量记录了等待堆顶元素的第一个线程通过getDelay可知堆顶元素何时到期,不必无限期等待,可通过available.awaitNanos(delay)等待一个有期限的时间,只有还有其他线程也在等待堆顶元素时即leader != null,才需要无限期等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); // 取出二叉堆堆顶元素,也就是延迟最小的元素
if (first == null)
available.await(); // 若队列为空则take线程阻塞
else {
long delay = first.getDelay(NANOSECONDS); // 计算延时时间
if (delay <= 0) // 堆顶元素的延时时间小于或等于0,出队列,返回
return q.poll();
first = null; // 等待时不要保留引用
if (leader != null) // 若已经有其他线程在等待该元素,则无限期阻塞
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 否则阻塞有限期时间
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); // 自己是leader,已经获取了堆顶元素,唤醒其他线程
lock.unlock();
}
}

入队操作

不是每放入一个元素都需要通知等待线程,只有当延迟时间是最小的,在堆顶时,才有必要通知等待线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 元素放入二叉堆
if (q.peek() == e) { // 若放进去的元素刚好在堆顶,则说明放入的元素延迟时间最小,则需要通知等待线程
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}