Callable与Future

JDK只提供了两种线程启动方式,这两种方式中的run()方法的返回值是void类型Callable不算是线程启动方式,Thread类也并没有接收Callable参数的构造方法,只接收Runnable接口参数的构造方法,若要将结果值返回,需要用到一个包装类FutrueTask将Callable包装成Runnable,然后传递给Thread的构造方法即可:

1
2
3
4
5
6
7
8
9
new Thread().start();
new Thread(new Runnable() {
@Override
public void run() {}
}).start();

FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
Integer result = futureTask.get();

对于线程池来说,线程池execute(Runnable command)接口是无返回值的,与之相对应的是一个有返回值的接口Future<T> submit(Callable<T> task),该方法并不是在ThreadPoolExecutor中直接实现的,而是在其父类AbstractExecutorService中实现的。

1
2
3
4
5
6
7
8
9
10
11
public abstract class AbstractExecutorService implements ExecutorService {
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 把Callable转换成Runnable
execute(ftask); // 调用ThreadPoolExecutor的execute(Runnable command)接口
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
}

Callable其实是用Runnable实现的,在submit内部把Callable通过FutureTask这个Adapter转换成Runnable,然后通过execute执行。FutureTask是一个Adapter对象,即实现了Runnable接口也实现了Future接口,其内部包含了一个Callable对象,从而实现了Callable转换成Runnable。

FutureTask任务初始运行状态NEW,是构造函数保证的,INTERRUPTINGINTERRUPTED是任务中间状态,其余四种是任务终止状态,任务的中间状态是一个瞬态,非常短暂。且任务中间态并不代表任务正在执行,而是任务已执行完,正在设置最终返回结果,故只要state不处于NEW状态,就说明任务已经执行完毕,这里的执行完毕是指传入的Callable对象的call方法执行完毕,或者抛出了异常,运行状态仅在setsetExceptioncancel方法转换为终止状态

所有等待任务执行完毕的线程的集合是通过单向链表实现的队列,FutureTask中将WaitNode单向链表当做来使用,确切来说是当做Treiber来使用的,使用CAS来完成入栈出栈操作

同时可能有多个线程都在获取任务的执行结果,若任务还在执行过程中,则将获取结果的线程包装成WaitNode扔到Treiber栈的栈顶,即完成入栈操作。由于FutureTask中的队列本质上是一个Treiber栈,则使用该队列只需要一个指向栈顶节点的指针就行了,在FutureTask中waiters属性即是指向栈顶的指针:

1
2
3
4
5
6
private volatile WaitNode waiters; // 指向栈顶节点的指针
static final class WaitNode {
volatile Thread thread; // 记录线程的thread属性
volatile WaitNode next; // 指向下一个节点的指针
WaitNode() { thread = Thread.currentThread(); }
}

callable表示要执行的任务本身runner是执行callable的线程,为了中断取消任务做准备,在运行时执行run方法时通过CAS设置的。outcome是用来存储Callable的执行结果抛出的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state; // CAS state变量 + LockSupport.park/unpark
private static final int NEW = 0; // 任务初始运行状态
private static final int COMPLETING = 1; // 正在设置任务结果
private static final int NORMAL = 2; // 任务正常执行完毕
private static final int EXCEPTIONAL = 3; // 任务执行过程中发生异常
private static final int CANCELLED = 4; // 任务被取消
private static final int INTERRUPTING = 5; // 正在中断运行任务的线程
private static final int INTERRUPTED = 6; // 任务被中断

private Callable<V> callable; // 要执行的任务本身
private Object outcome; // Callable的执行结果,或抛出的异常
private volatile Thread runner; // 执行callable的线程,为了中断或取消任务做准备
private volatile WaitNode waiters; // 指向栈顶节点的指针
public FutureTask(Callable<V> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 确保callable的可见性
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // 确保callable的可见性
}
}

执行run方法时,使用CAS操作将runner属性设置位当前线程,可见runner属性是在运行时被初始化的。putOrderedIntputIntVolatile是等价的,保证了state状态对其他线程的可见性。handlePossibleCancellationInterrupt方法要结合后面的cancel方法来看,检测发现s = INTERRUPTING,说明cancel方法还没有执行到中断当前线程的地方,那就等待它将state状态设置成INTERRUPTED

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // 关键点在于把Callable的call转换成Runnable的run
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 任务执行异常,把异常栈存入outcome变量
}
if (ran)
set(result); // 任务执行成功,把返回值存入outcome变量
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}

首先将waiters属性的值由原值设置为null,waiters属性指向了Treiber栈的栈顶节点,将该值设为null的目的就是清空整个栈。

设置不成功则if语句块不会被执行,进行下一轮for循环,而下一轮for循环的判断条件又是waiters!=null ,则说明该方法只是为了确保waiters属性被成功设置成null。若设置成功,内层循环的作用是遍历链表中所有等待的线程,并唤醒他们

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}

任务的取消,若任务已经执行完成、任务已经被取消过了、任务因为某种原因不能被取消cancel操作一定失败。cancel操作返回true并不代表任务真的被取消了,这取决于发起cancel时,任务所处的状态。

若发起cancel时任务还没有开始运行,则随后任务就不会被执行,若发起cancel时任务已经在运行了,若mayInterruptIfRunning为true,则当前在执行的任务会被中断,否则可以允许正在执行的任务继续运行,直到它执行完。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

FutureTask中会涉及到两类线程,一类是执行任务的线程只有一个,FutureTask的run方法就由该线程来执行;一类是获取任务执行结果的线程可以有多个,这些线程可并发执行,每一个线程都是独立的,都可以调用get方法来获取任务的执行结果。故FutureTask中使用CAS state变量加LockSupport.park/unpark来实现阻塞唤醒机制。若任务还没有执行完,则这些线程就需要进入Treiber栈中挂起,直到任务执行结束,或者等待的线程自身被中断。

使用awaitDone方法等待任务进入终止态,awaitDone的返回值是任务的状态,而不是任务的结果,最终的结果是通过report方法根据awaitDone放回的状态来判断从而返回具体的响应结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null) throw new NullPointerException();
int s = state;
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) { // 先检测当前线程是否被中断了
removeWaiter(q); // 将当前节点从队列中移除,若是被中断唤醒的q!=null
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { // 若任务已经进入终止态,则直接返回任务的状态;
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 若任务正在设置执行结果,则让出当前线程的CPU资源继续等待
Thread.yield();
else if (q == null) // 当前线程还没有进入等待队列,则新建了一个WaitNode
q = new WaitNode();
else if (!queued) // 当前线程还没有入队,则入队
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
} else
// 成功将任务放入队列后,将该线程挂起
// 任务执行完毕在finishCompletion方法中会唤醒所有在Treiber栈中等待的线程
// 等待的线程自身因为被中断等原因而被唤醒。
LockSupport.park(this);
}
}
// 它根据当前state状态,返回正常执行的结果,或者抛出指定的异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null; // 这里将要移除链表的节点持有的线程置空,便于后面将其从链表移除判断
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null) // 将链表中所有thread == null的节点移除
pred = q;
else if (pred != null) {
pred.next = s; // 将链表中所有thread == null的节点移除,要移除的节点不在栈顶
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
}