BlockingQueue阻塞队列二

SynchronousQueue

SynchronousQueue是一种特殊的BlockingQueue,其本身没有容量先调用put方法线程会阻塞,直到另一个线程调用了take方法,两个线程才同时解锁,反之亦然。通过构造函数可知,和锁一样SynchronousQueue也有公平非公平模式,若是公平模式,则用TransferQueue实现,若是非公平模式,则用TransferStack实现

1
2
3
4
5
6
7
private transient volatile Transferer<E> transferer;
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

put和take都调用了transfer方法,而TransferQueue和TransferStack分别实现了该接口,若是put第一个参数就是对应元素,若是take则第一个参数为null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}

若是公平模式,则第一个调用put的线程会在队列头部第一个来到的take线程和它进行配对,遵循先到先配对原则,若是非公平模式,则最后一个调用put的线程会在栈顶,第一个来到的take线程会和它进行配对,遵循后道先配对原则

TransferQueue

TransferQueue是一个基于单向链表实现的队列,通过head和tail两个指针记录头部和尾部,初始时head和tail会指向一个空节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
volatile QNode next; // 单向链表
volatile Object item; // 若是put则item不为空,若是take则item为空
volatile Thread waiter; // put或take对应的阻塞线程
final boolean isData; // 若是put则isData为true否则为false
}
transient volatile QNode head; // 单向链表的队头
transient volatile QNode tail; // 单向链表的队尾
transient volatile QNode cleanMe;
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
}

put节点和take节点一旦相遇,就会配对出队列,故队列中不可能同事存在put节点和take节点,故要么所有节点都是put节点,要么所有节点都是take节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 队列还未初始化,自旋等待
continue; // spin

if (h == t || t.isData == isData) { // 队列为空或当前线程和队列中元素为同一中模式
QNode tn = t.next;
if (t != tail) // 不一致读,重新执行for循环
continue;
if (tn != null) { // 若t.next不为空,则替换尾部节点
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData); // 新建一个节点
if (!t.casNext(null, s)) // 加入尾部
continue;

advanceTail(t, s); // 后移tail指针
Object x = awaitFulfill(s, e, timed, nanos); // 进入阻塞状态
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}

if (!s.isOffList()) { // 从队列中唤醒,确定已经处于队列中的第一个元素
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;

} else { // 当前线程可以和队列中的第一个元素进行配对
QNode m = h.next; // 取出队列中第一个元素
if (t != tail || m == null || h != head) // 不一致读,重新执行for循环
continue; // inconsistent read

Object x = m.item;
if (isData == (x != null) || // 已经配对过了
x == m || // m cancelled
!m.casItem(x, e)) { // 尝试配对
advanceHead(h, m); // 已经配对过,直接出队列
continue;
}

advanceHead(h, m); // 配对成功,出队列
LockSupport.unpark(m.waiter); // 唤醒队列中与第一个元素对应的线程
return (x != null) ? (E)x : e;
}
}
}
void advanceTail(QNode t, QNode nt) {
if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}

若当前线程和队列中的元素时同一种模式,则与当前线程对应的节点被加入队列尾部并阻塞,若不是同一种模式,则选取队列头部的第一元素进行配对。这里配对是通过m.casItem(x, e)把自己的item x换成对方的item e,若CAS操作成功,则配对成功,若是put节点,则isData=true,item!=null,若是take节点,则isData=false,item=null,若CAS操作不成功,则isData和item之间将不一致,也就是isData == (x != null),通过该条件可以判断节点是否已经配对成功过了。

TransferStack

TransferStack也是一个单向链表,不同于队列,其只需要一个head指针就能实现入栈和出栈操作,链表中节点有三种状态,REQUEST表示take节点,DATA表示put节点,二者配对成功会生成一个FULFILLING节点,入栈,然后FULFILLING节点和被配对的节点一起出栈。与TransferQueue不同TransferStack没有空的头结点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static final class TransferStack<E> extends Transferer<E> {
static final int REQUEST = 0;
static final int DATA = 1;
static final int FULFILLING = 2;
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
static final class SNode {
volatile SNode next; // 单向链表
volatile SNode match; // 配对的节点
volatile Thread waiter; // 对应的阻塞线程
Object item; // data; or null for REQUESTs
int mode; // 三种模式
}
volatile SNode head;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // 同一种模式
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) { // 入栈
SNode m = awaitFulfill(s, timed, nanos); // 阻塞等待
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // 非同一种模式
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // 生成一个FULFILLING节点入栈
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // 两个节点一起出栈
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // 已经匹配过了,出栈
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // 配对一起出栈
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}

BlockingDeque

BlockingDeque定义了一个阻塞的双端队列,该接口继承了BlockingQueue的同时,增加了对应的双端队列的操作接口,该接口只有一个实现LinkedBlockingDeque

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
void addFirst(E e);
void addLast(E e);
boolean offerFirst(E e);
boolean offerLast(E e);
void putFirst(E e) throws InterruptedException;
void putLast(E e) throws InterruptedException;
boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException;
E takeFirst() throws InterruptedException;
E takeLast() throws InterruptedException;
E pollFirst(long timeout, TimeUnit unit) throws InterruptedException;
E pollLast(long timeout, TimeUnit unit) throws InterruptedException;
boolean removeFirstOccurrence(Object o);
boolean removeLastOccurrence(Object o);
}

LinkedBlockingDeque的核心数据结构如下,是一个双向链表,对应的原理和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {
static final class Node<E> { // 双向链表的Node
E item;
Node<E> prev; // 前置节点
Node<E> next; // 后置节点
Node(E x) {
item = x;
}
}
transient Node<E> first; // 队列的头
transient Node<E> last; // 队列的尾
private transient int count; // 元素个数
private final int capacity; // 容量
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
}

入队操作

入队的基础操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private boolean linkFirst(Node<E> node) {
if (count >= capacity)
return false;
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
notEmpty.signal();
return true;
}
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}

出队操作

出队的基础操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}
private E unlinkLast() {
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

CopyOnWrite

CopyOnWrite机制的核心思想是,读写分离空间换时间,避免为保证并发安全导致的激烈的锁竞争。适用于读多写少的情况,最大程度的提高读的效率;

CopyOnWrite是最终一致性,在写的过程中,原有的读的数据是不会发生更新的,只有新的读才能读到最新数据;写的时候不能并发写,需要对写操作进行加锁;使用volatile变量,使其他线程能够及时读到新的数据;

CopyOnWrite机制在Java并发包中有CopyOnWriteArrayList和CopyOnWriteArraySet两种实现。CopyOnWriteArraySet底层也是通过CopyOnWriteArrayList来实现的。

CopyOnWrite机制

集合框架中的ArrayList是非线程安全的,Vector虽是线程安全的,但由于简单粗暴的锁同步机制,性能较差。CopyOnWriteArrayList容器允许并发读,读操作是无锁的,性能较高。至于写操作,如向容器中添加一个元素,则首先将当前容器复制一份,然后在新副本上执行写操作,结束之后再将原容器的引用指向新容器。

由于每次都要拷贝一份数据,对内存压力较大,甚至可能导致频繁GC,且无法保证实时性。添加的逻辑很简单,先将原容器copy一份,然后在新副本上执行写操作,之后再切换引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 拷贝原容器,长度为原容器长度加一
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 在新副本上执行添加操作
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

删除操作同理,将除要删除元素之外的其他元素拷贝到新副本中,然后切换引用,将原容器引用指向新副本。同属写操作,需要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
// 如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
setArray(Arrays.copyOf(elements, len - 1));
else {
// 将除要删除元素之外的其他元素拷贝到新副本中,并切换引用
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index, numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}

读操作是无锁的

1
2
3
4
5
6
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}