ScheduledThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
// 延迟任务,非周期
scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("延迟3s执行");
}, 5000, TimeUnit.MILLISECONDS);
// 周期任务,与任务本身执行时间有关
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println("1s后执行然后每2s执行一次");
}, 1000, 2000, TimeUnit.MILLISECONDS);
// 周期任务,与任务本身执行时间无关,任务执行时间必须小于间隔时间
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println("1s后执行然后每2s执行一次");
}, 1000, 2000, TimeUnit.MILLISECONDS);

ScheduledThreadPoolExecutor是用来处理延时任务定时任务周期任务。延迟执行任务是依靠DelayQueue,而周期性执行任务是执行完一个任务后,再把任务仍回到任务队列中。任务的执行过程还是复用ThreadPoolExecutor,延迟的过程是在DelayedWorkQueue内部完成。

ScheduledThreadPoolExecutor内部有实现了一个特定的DelayQueueDelayedWorkQueue,其原理与DelayQueue一样,但对任务的取消进行了优化,DelayedWorkQueue是一个无界队列,内部实现了一个PriorityQueue,它会根据time的先后时间排序,time小的排在前面,若time相同则根据sequenceNumber排序,sequenceNumber小的排在前面;

工作线程会从DelayedWorkQueue取已经到期的任务去执行,执行结束后重新设置任务的到期时间,再次放回DelayedWorkQueue。

1
2
3
4
5
6
7
8
9
10
11
12
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}

ScheduledThreadPoolExecutor接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,提交任务时会将任务封装成SchduledFutureTask类型的任务,然后直接将任务放入DelayedWorkQueue优先级队列中,当然若核心创建的任务线程小于corePoolSize会创建任务线程,但当前线程不会直接执行任务,而非是直接创建限制来直接当前提交的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}

schedule最终任务的执行是通过ScheduledFutureTask的run方法来完成的。对于周期任务在run方法中通过ScheduledFutureTask.super.runAndReset()的返回值来确实是否要继续将任务添加到下一次周期中,返回false说明执行的周期任务抛出了异常,则不再将其添加到队列中。但不影响其他周期任务的执行

setNextRunTime中对time的处理差异是scheduleAtFixedRatescheduleWithFixedDelay的区别所在,当period为正数时是通过time直接加上period,而period为负数时triggerTime方法里面是now()加上-period

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 序列号,打破FIFO
private final long sequenceNumber;
// 启用执行任务的时间,即延迟时间
private long time;
// 重复任务的周期,正值表示固定速率执行,负值表示固定延迟执行,0表示非重复任务。
private final long period;
RunnableScheduledFuture<V> outerTask = this;
// 延迟队列的索引
int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
// 比较两个ScheduledFutureTask的大小,在DelayedWorkQueue的siftUp方法中调用,用于排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
public boolean isPeriodic() {
return period != 0;
}
// 设置下一次执行的时间,scheduleAtFixedRate与scheduleWithFixedDelay的区别所在
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p; // 上一次的时间加上周期时间
else
time = triggerTime(-p); // 当前时间加上周期时间
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0) remove(this);
return cancelled;
}
//
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run(); // 若为非重复任务,则直接执行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); // 若为周期任务,执行完后重新计算延迟时间,再扔回队列
reExecutePeriodic(outerTask); // 将当前任务再次入队
}
}
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task); // 将任务重新丢入队列中
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
long triggerTime(long delay) {
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

这里需要特别说明的一点是在执行周期任务时,若任务抛出异常,则将不能设置下一次执行时间,则任务将不能正常周期执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW; // 若执行异常,这里返回false
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

scheduleAtFixedRate只能提交固定速率执行任务。与任务本身执行时间无关,任务执行时间必须小于间隔时间

1
2
3
4
5
6
7
8
9
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
if (period <= 0) throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

scheduleWithFixedDelay只能提交固定延迟执行任务,这里对delay取了负数,与任务本身执行时间有关,若任务执行时间是10s间隔时间是2s则下一次执行时间为12s。

1
2
3
4
5
6
7
8
9
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
if (delay <= 0) throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

DelayedWorkQueue

1
2
3
4
5
6
7
8
9
10
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16; // 队列初始容量
// 根据初始容量创建RunnableScheduledFuture类型的数组
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
private Thread leader = null; // leader线程
// 当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
private final Condition available = lock.newCondition();
}

往队列中添加元素是通过add方法,而add方法最终是调用offer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null) throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow(); // 若当前元素数量大于队列长度则进行扩容,扩容50%
size = i + 1; // 元素数量加1
if (i == 0) {
queue[0] = e;
setIndex(e, 0); // 记录索引
} else { //把任务加入堆中,并调整堆结构,这里就会根据任务的触发时间排列
siftUp(i, e); // 把需要最早执行的任务放在前面
}
// 若新加入的元素就是队列头:用户提交的第一个任务,或新任务进行堆调整以后,排在队列头
if (queue[0] == e) {
// leader设置为null为了使在take方法中的线程在通过available.signal();
// 后会执行available.awaitNanos(delay);
leader = null;
available.signal(); // 加入元素以后,唤醒worker线程
}
} finally {
lock.unlock();
}
return true;
}
private void grow() { // 扩容50%
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
// 循环的根据key节点与其父节点来判断,若key节点执行时间小于父节点,则将两个节点交换,使执行时间靠前的节点排列在队列的前面。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) { // 找到父节点的索引
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// 若key节点的执行时间大于父节点的执行时间,不需要再排序了,直接放数组最后面
if (key.compareTo(e) >= 0) break;
// 若key.compareTo(e)<0,说明key节点执行时间小于父节点执行时间,需要把父节点移到后面
queue[k] = e;
setIndex(e, k);
k = parent; // 设置索引为k
}
queue[k] = key; // key设置为排序后的位置中
setIndex(key, k);
}

在ThreadPoolExecutor中getTask方法,工作线程会循环地调用take方法从workQueue中取任务,但定时任务却不同,若一旦getTask方法取出了任务就开始执行了,而这时可能还没有到执行的时间,故在take方法中要保证只有在到指定的执行时间时任务才可以被取走

leader是为了减少不必要的定时等待,当一个线程成为leader时,它只等待下一个节点的时间间隔,但其它线程无限期等待。 leader线程必须在从take或poll返回之前signal其它线程,除非其他线程成为了leader。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) // 堆的更节点都为空,说明队列为空需要阻塞
available.await();
else {
// 计算当前时间到执行时间的时间间隔
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null) // leader不为空,阻塞线程
available.await();
else {
// leader为空,则把leader设置为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 阻塞到执行时间
} finally {
// 设置leader = null,让其他线程执行available.awaitNanos(delay);
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader不为空,则说明leader的线程正在执行available.awaitNanos(delay);
// 如果queue[0] == null,说明队列为空
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size; // 数组长度-1
RunnableScheduledFuture<?> x = queue[s]; // 取出最后一个节点
queue[s] = null;
if (s != 0) // 长度不为0,则从第一个元素开始排序,目的是要把最后一个节点放到合适的位置上
siftDown(0, x);
setIndex(f, -1);
return f;
}
private void siftDown(int k, RunnableScheduledFuture<?> key) { // 使堆从k开始向下调整
int half = size >>> 1; // 根据二叉树的特性,数组长度除以2,表示取有子节点的索引
while (k < half) { // 判断索引为k的节点是否有子节点
int child = (k << 1) + 1; // 左子节点的索引
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1; // 右子节点的索引
// 如果有右子节点并且左子节点的时间间隔大于右子节点,取时间间隔最小的节点
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0) // 如果key的时间间隔小于等于c的时间间隔,跳出循环
break;
queue[k] = c; // 设置要移除索引的节点为其子节点
setIndex(c, k);
k = child;
}
queue[k] = key; // 将key放入索引为k的位置
setIndex(key, k);
}

siftdown方法在执行完并不是有序的,但子节点的下次执行时间一定比父节点的下次执行时间要大,由于每次都会取左子节点和右子节点中下次执行时间最小的节点,故可以保证在take和poll时出队是有序的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;

setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement); // 从i开始向下调整
if (queue[i] == replacement) // 如果queue[i]==replacement,说明i是叶子节点
siftUp(i, replacement); // 不能保证子节点的下次执行时间比父节点的大,需要进行一次向上调整
}
return true;
} finally {
lock.unlock();
}
}