同步工具类

下面的方法基本都是会用到下面的两个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); // park阻塞
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 唤醒后续节点
return true;
}
return false;
}
}

Semaphore

通常将Semaphore称为信号量, 可控制同时访问特定资源的线程数,通过协调各个线程,以保证合理的使用资源。常用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Semaphore implements java.io.Serializable {
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// permits表示许可线程数量, fair表示公平性
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 获取一个令牌,在获取到令牌或被其他线程调用中断之前,线程一直处于阻塞状态
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 释放一个令牌,唤醒一个获取令牌不成功的阻塞线程
public void release() {
sync.releaseShared(1);
}
}

当初始资源为1时,Semaphore退化为排他锁,Semaphore实现和锁时分类似,也是基于AQS,同样有公平非公平之分,只有尝试获取锁的地方有公平与非公平之分,且唯一区别在于是否需要去判断队列中是否有元素排队释放锁不区分公平与非公平的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // 可用资源的总数即state的初始值
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 与公平锁的唯一区别在于,这里不需要去判断队列中是否有元素排队
int available = getState(); // 获取当前可用资源数量
int remaining = available - acquires; // remaining小于0直接返回
if (remaining < 0 || compareAndSetState(available, remaining)) // remaining>=0尝试更新资源数
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) { // AQS模板方法
for (;;) {
int current = getState(); // 获取当前可用资源数量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // CAS加操作更新资源数
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) { // AQS模板方法
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) { // AQS模板方法
for (;;) { // 与非公平锁的唯一区别在于,这里需要去判断队列中是否有元素排队
if (hasQueuedPredecessors())
return -1; // 若队列中有元素排队直接返回-1
int available = getState(); // 获取当前可用资源数量
int remaining = available - acquires; // remaining小于0直接返回
if (remaining < 0 || compareAndSetState(available, remaining)) // remaining>=0 CAS减操作更新资源数
return remaining;
}
}
}
}

资源的总数即state的初始值,在acquire中对state进行CAS减操作,减到0之后线程阻塞,release中对state进行CAS加操作。

1
2
3
4
5
6
7
8
9
Semaphore semaphore = new Semaphore(2);
new Thread(() ->{
try {
semaphore.acquire(); // 获取公共资源
Thread.sleep(5000);
semaphore.release(); // 释放公共资源
} catch (InterruptedException e) {
}
}).start();

CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
long startTime = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Integer sleep = new Random().nextInt(5000);
System.out.println("index:" + Thread.currentThread().getName() + " sleep:" + sleep);
Thread.sleep(sleep);
} catch (Exception e) {
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
System.out.println("所有线程执行完毕:" + (System.currentTimeMillis() - startTime));

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。初始值为线程的数量,每完成一个线程后计数器减一,当计数器的值为0时,表示所有的线程都已经完成任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次,当CountDownLatch使用完毕后,它不能再次被使用。CountDownLatch没有公平与非公平之分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) { // AQS模板方法
return (getState() == 0) ? 1 : -1; // 直接判断当前计数器是否为0,若不为0则阻塞
}
protected boolean tryReleaseShared(int releases) { // AQS模板方法
for (;;) {
int c = getState();
if (c == 0) // 若计数器已经为0,直接返回false
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) // CAS减
return nextc == 0; // 判断计数器是否为0
}
}
}

private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
}

CyclicBarrier

CyclicBarrier栅栏屏障让一组线程到达一个屏障时被阻塞,也可叫同步点,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CyclicBarrier cyclicBarrier = new CyclicBarrier(11, () -> System.out.println(Thread.currentThread().getName() + ":所有线程到达屏障,开始执行任务"));
for (int i = 0; i < 10; i++) {
new Thread(() -> {
Integer sleep = new Random().nextInt(10000);
System.out.println("index:" + Thread.currentThread().getName() + " sleep:" + sleep);
try {
Thread.sleep(sleep);
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + ":全部到达屏障.");

CyclicBarrier通过ReentrantLockCondition组合实现的,构造方法中的Runnable参数表示最后一个线程到达要做的任务,线程调用await()表示自己已经到达栅栏BrokenBarrierException表示栅栏已经被破坏,破坏原因可能是其中某个线程await()时被中断或者超时。一旦栅栏被破坏就不能再被重复使用了,所有等待的线程都将抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private static class Generation {
boolean broken = false; // 栅栏是否已经被破坏
}
// 当所有节点已经到达栅栏时,重置栅栏
private void nextGeneration() {
trip.signalAll(); // 将阻塞在Condition等待队列中的元素移动到同步等待队列中
count = parties; // 重置count
generation = new Generation();
}
private void breakBarrier() { // 栅栏被破坏时调用
generation.broken = true; // 标识栅栏已被破坏
count = parties; // 栅栏被破坏后重置count
trip.signalAll(); // 将阻塞在Condition等待队列中的元素移动到同步等待队列中
}
// parties表示屏障拦截的线程数量, Runnable表示最后一个线程到达要做的任务
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // 获取锁
try {
final Generation g = generation;
if (g.broken) // 若发生中断该属性设置为true,直接抛出异常
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier(); // 若发生中断,打破栅栏
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // 判断是否是最后一个到达的线程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 若已经是最后一个线程了,执行设定任务
ranAction = true;
nextGeneration(); // 重置栅栏
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) { // 不是最后一个到达线程
try {
if (!timed) // 是否有超时
trip.await(); // 没有则放入条件队列中等待
else if (nanos > 0L) // 超时时间nanos大于0,则等待设置nanos
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock(); // 唤醒从Condition等待队列中移动到同步队列中的元素
}
}
}

Exchanger

当一个线程运行到exchange()方法时会阻塞,另一个线程运行到exchange()时,二者交换数据,然后执行后面的程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final Exchanger<Integer> exchanger = new Exchanger<Integer>();
for (int i = 0; i < 10; i++) {
final Integer num = i;
new Thread(() -> {
System.out.println("线程:Thread_" + Thread.currentThread().getName() + "数据:" + num);
try {
Integer exchangeNum = exchanger.exchange(num);
Thread.sleep(1000);
System.out.println("线程:Thread_" + Thread.currentThread().getName() + "原先数据:" + num + " , 交换后的数据:" + exchangeNum);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

Phaser