线程池原理

线程

线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模型Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有一个对应的线程。

线程状态切换

线程池

线程池可以重用存在的线程,减少线程创建,消亡的开销,提高性能提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池框架中最基础的接口是Executor,其定义了用于执行Runnable的execute方法,在其子类ExecutorService中定义了线程池的具体行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
void shutdown(); // 在完成已提交的任务后封闭办事,不再接管新任务
List<Runnable> shutdownNow(); //停止所有正在履行的任务并封闭办事
boolean isShutdown(); // 测试是否该ExecutorService已被关闭
boolean isTerminated(); // 测试是否所有任务都履行完毕了
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task); // 可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
<T> Future<T> submit(Runnable task, T result); // 可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
Future<?> submit(Runnable task); // 可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

线程池关闭

线程池的运行状态和线程池中有效线程数都是他通过ctl一个字段来保存的,其3保存运行状态29保存有效线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadPoolExecutor extends AbstractExecutorService {
// 线程池的运行状态和线程池中有效线程的数量,高3位保存runState,低29位保存workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程池容量
// 初始化状态,能够接收新任务,以及对已添加的任务进行处理
private static final int RUNNING = -1 << COUNT_BITS; //高3位为111
// 不接收新任务,但能处理已添加的任务,调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN
private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位为000
// 不接收新任务,不处理已添加任务,且中断正在处理的任务,调用shutdownNow()时线程池由(RUNNING or SHUTDOWN) -> STOP。
private static final int STOP = 1 << COUNT_BITS; //高3位为001
// 当所有的任务已终止,线程池会变为TIDYING状态,会执行钩子函数terminated(),默认是空函数
// SHUTDOWN状态下,阻塞队列为空且线程池中执行的任务也为空时,则由SHUTDOWN -> TIDYING,在STOP状态下,线程池中执行的任务为空时,则由STOP -> TIDYING。
private static final int TIDYING = 2 << COUNT_BITS; //高3位为010
// 线程池彻底终止,则为TERMINATED状态,TIDYING状态下,执行完terminated()之后,由TIDYING -> TERMINATED
private static final int TERMINATED = 3 << COUNT_BITS; //高3位为011
// 获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 取出低29位的值,表示当前活动的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) { return rs | wc; }
}

关闭一个线程池时,有的线程还在执行某个任务,有的调用者正在向线程池提交任务,且队列中可能还有未执行的任务,故关闭过程不可能瞬时,而是需要一个平滑的过渡。

线程池状态流转

RUNNING:线程池的初始化状态,能够接收新任务,能对已添加的任务进行处理;

SHUTDOWN:调用线程池的shutdown接口时,线程池由RUNNING切换成SHUTDOWN状态,不接收新任务,但能处理已添加的任务;

STOP:调用线程池shutdownNow接口,线程池由RUNNINGSHUTDOWN切换成STOP状态,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务;

TIDYING:线程池在SHUTDOWN状态,阻塞队列为空并且线程池中执行的任务也为空,由SHUTDOWN切换为TIDYING状态;线程池在STOP状态,线程池中执行的任务为空,由STOP切换为TIDYING状态;当所有的任务已终止,ctl记录的任务数量为0,线程池切换为TIDYING状态。切换为TIDYING状态时,执行钩子函数terminated()terminated()ThreadPoolExecutor类中是空的,若想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()方法;

TERMINATED:线程池彻底终止状态,线程池在TIDYING状态时,执行terminated()之后,就会由TIDYING切换为TERMINATED状态;

状态迁移只会从小到大-1,0,1,2,3迁移不会逆向迁移。除terminated之外,线程池还提供其他几个钩子函数,这些钩子函数实现都是空的,若要实现自己的线程池,可重写这几个函数。

1
2
3
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

调用showdownshutdownNow后,线程池并不会立即关闭,还需要循环调用awaitTermination来等待线程池真正终止。正确关闭线程池的步骤如下:

1
2
3
4
5
6
7
8
9
executor.showdown(); // 或者executor.shutdownNow(); 
try {
boolean loop = true;
do { // 阻塞等待,直到线程池所有任务结束
loop = !executor.awaitTermination(2, TimeUnit.SECONDS);
} while (loop);
} catch (InterruptedException e) {
e.printStackTrace();
}

awaitTermination内部是通过不断循环判断线程池是否到达最终状态TERMINATED,若是则返回,否则通过termination条件变量阻塞一段时间,后继续判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private final Condition termination = mainLock.newCondition();
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED)) // 判断状态是否是TERMINATED
return true; // 若是TERMINATED状态直接返回true
if (nanos <= 0)
return false; // 超时返回false
nanos = termination.awaitNanos(nanos); // 条件等待
}
} finally {
mainLock.unlock();
}
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

可从下面Worker的工作情况可知,Worker初始状态state-1,只有调用unlockstate才变为0,一个Worker执行任务前会加锁,意味着可以通过是否持有锁判断出线程是否处于空闲状态tryLock调用成功说明线程处于空闲状态,向其发送中断信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检测是否有关闭线程池的权限
advanceRunState(SHUTDOWN); // 把状态设置到SHUTDOWN
interruptIdleWorkers(); // 只中断空闲线程
onShutdown(); // 钩子函数,空实现
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// tryLock调用成功说明线程处于空闲状态,调用不成功说明线程当前持有锁,正在执行某个任务
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}

shutdownNow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检测是否有关闭线程池的权限
advanceRunState(STOP); // 把状态设置到STOP
interruptWorkers(); // 只中断所有线程
tasks = drainQueue(); // 清空队列
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted(); // 不管线程是否在执行中,一律发送中断信号
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

shutdownshutdownNow都调用了tryTerminate方法,tryTerminate不会强制终止线程池,只是做一下检测,当worker0时,workQueue为空时,先把状态切换到TIDYING,然后调用钩子函数terminated(),当执行完成时把状态改为TERMINATED,接着调用termination.signalAll()通知前面阻塞在awaitTermination的所有调用者线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return; // 若线程池为RUNNING、TIDYING、TERMINATED等状态,或线程池为SHUTDOWN状态且队列不为空,则直接退出
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

核心配置参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

一般任务性质类型分为CPU密集型也叫计算密集型IO密集型CPU密集型的任务线程数一般设置为:线程数 = CPU核数 + 1IO密集型任务线程数一般设置为:线程数 = ((线程等待时间 / 线程CPU时间) + 1) * CPU数目

核心数据结构

通过阻塞队列workQueue存放线程池排队任务,通过mainLock独占锁对线程池内部各种变量进行互斥访问控制。

通过Worker封装线程,线程池中的每一个线程被封装成一个Worker对象,线程池维护的其实就是一组Worker对象,Worker中firstTask用来保存传入任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。通过getThreadFactory().newThread(this)来新建一个线程,newThread方法传入的参数是this,因为Worker本身实现了Runnable接口,也就是一个线程,所以一个Worker对象在启动时会调用Worker类的run方法

Worker继承于AQS,用于判断线程是否空闲以及是否可被中断,从tryAcquire方法可看出Worker是不可重入的,这也是为什么不直接使用ReentrantLock的原因。之所以设置为不可重入,是因为不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁,否则会中断正在运行的线程

构造方法中执行了setState(-1)state变量设置为-1,是为了禁止在执行任务前对线程进行中断AQS中默认的state0,若刚创建一个Worker对象,还没有执行任务时,不应该被中断。所以在runWorker方法中会先调用Worker对象的unlock方法将state设置为0

lock方法一旦获取了独占锁,表示当前线程正在执行任务中,则不应该中断线程;若该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;

线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThreadPoolExecutor extends AbstractExecutorService {
private final BlockingQueue<Runnable> workQueue; // 存放任务的阻塞队列
private final ReentrantLock mainLock = new ReentrantLock(); // 对线程池内部各种变量进行互斥访问控制
private final HashSet<Worker> workers = new HashSet<Worker>(); // 线程集合
private final Condition termination = mainLock.newCondition();
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // Worker封装的线程
Runnable firstTask; // Worker接收到的第一个任务,即保存传入的任务
volatile long completedTasks; // Worker执行完毕的任务个数
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 以当前Worker创建一个thread
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) { // 不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
}

在Worker的run方法中调用runWorker方法while循环从队列中获取任务,执行原始任务run方法而不是start方法,从而达到Worker线程复用的目的,如果任务执行异常会导致Worker线程退出,在finally中会先移除就的Worker再创建一个新的Worker线程

线程池原理

每次向线程池提交任务时,首先判断当前线程数是否大于或等于corePoolSize,若小于则新建线程执行,若大于则判断队列是否已满,若未满则放入队列,若已满,判断当前线程数是否大于或等于maximumPoolSize,若小于则新建线程执行,若大于则执行拒绝策略

线程池原理

值得注意的是,当往阻塞队列中放任务时addWorker(null, false)并没有传入任务,因为任务已经被添加到workQueue中了,worker在执行时会直接workQueue中获取任务。也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 当前活动线程数小于corePoolSize,则新建一个线程放入线程池中,并把任务添加到该线程中
if (workerCountOf(c) < corePoolSize) {
// 第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断
if (addWorker(command, true))
return; // 若添加失败直接返回
// 若果添加失败,则重新获取ctl值
c = ctl.get();
}
// 若当前线程数大于或等于corePoolSize,且通过阻塞队列offer方法将任务放入队列中
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl值
int recheck = ctl.get();
// 再次判断线程池的运行状态,若不是运行状态,由于之前把command已添加到workQueue中,需移除该command,执行后通过拒绝策略对该任务进行处理,整个方法返回
if (!isRunning(recheck) && remove(command))
reject(command); // 调用具体配置的拒绝策略
// 获取线程池中有效线程数,若为0则执行addWorker方法,若workerCount大于0则直接返回,在workQueue中新增的command会在将来的某个时刻被执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 放入队列失败:若线程池已经不是RUNNING状态,或workerCount >= corePoolSize且workQueue已满,
else if (!addWorker(command, false))
reject(command); // 添加失败直接执行拒绝策略
}
// firstTask新开一个线程,若core为true则corePoolSize为上界,否则maximumPoolSize为上界
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 获取运行状态
// 只要状态大于或等于SHUTDOWN,则说明线程池进入了关闭过程,此时不再接收新任务,则添加失败
// SHUTDOWN状态下firstTask不为空,则返回false,firstTask为空且workQueue为空,队列中已经无任务,无需再添加线程
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c); // 当前活动的线程数
// 若工作线程数大于容量或大于设定的核心线程数或最大线程数,则添加失败
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 若将工作线程数加一成功,则直接退出内存和外层自旋,执行下面的逻辑
if (compareAndIncrementWorkerCount(c))
break retry; // 退出内层和外层循环
c = ctl.get(); // Re-read ctl
// 若运行状态发生改变,需要重试外循环
if (runStateOf(c) != rs)
continue retry;
// 否则CAS由于workerCount变化而失败;重试内循环
}
}
// workerCount成功加一,则开始添加线程操作
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 根据firstTask来创建Worker对象
final Thread t = w.thread; // 每一个Worker对象都会创建一个线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 若rs是RUNNING状态或rs是SHUTDOWN状态且firstTask为null,向线程池中添加线程
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 预先检查t是否可启动
throw new IllegalThreadStateException();
workers.add(w); // 把线程加入到线程集合
int s = workers.size();
if (s > largestPoolSize) // largestPoolSize记录着线程池中出现过的最大线程数量
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 若成功加入到线程集合则启动线程,这里启动是调用Worker的run方法
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 加入失败或启动失败,将workerCount减一且将
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null) // 若是启动失败造成的需要将worker移除
workers.remove(w);
decrementWorkerCount(); // workerCount减一
tryTerminate();
} finally {
mainLock.unlock();
}
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

addWorker方法的主要工作是在线程池中创建一个新的线程并执行firstTask参数用于指定新增的线程执行的第一个任务core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSizefalse表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

rs == SHUTDOWN的情况下不会接受新提交的任务,故在firstTask不为空时会=返回false;若firstTask为空,且workQueue也为空,队列中已经没有任务了,不需要再添加线程了故返回false

任务执行过程

任务提交过程中会开启一个新的Worker线程,并把任务本身作为firstTask赋给该Worker,但对于一个Worker不仅执行一个任务,而是不断从队列中取出任务执行

getTask第二个if判断目的是控制线程池的有效线程数量。在执行execute方法时,若当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,且workQueue已满时,则可以增加工作线程,但这时若超时没有获取到任务,也就是timedOuttrue的情况,说明workQueue已经为空了,说明当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。是runWorker方法执行完之后,也就是Workerrun方法执行完,由JVM自动回收会销毁空闲线程

wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法,timed && timedOut如果为true,表示当前操作需要进行超时控制,且上次从阻塞队列中获取任务发生了超时wc == 1时说明当前线程是线程池中唯一的一个线程

getTask方法返回null时,在runWorker方法中会跳出while循环,该while循环就是线程复用的关键,然后会执行processWorkerExit方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 获取第一个任务
w.firstTask = null;
w.unlock(); // 初始时state变量为-1,不允许被中断,unlock方法将state设置为0,允许中断
boolean completedAbruptly = true; // 是否因为异常退出循环
try {
while (task != null || (task = getTask()) != null) {
w.lock(); // 执行任务前加锁,对应shutdown时的tryLock
// 若池正在停止,确保线程被中断;若没有则要确保线程不被中断。需要在第二种情况下重新检查以在清除中断的同时处理 shutdownNow竞争
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt(); // 给自己发中断信号
try {
beforeExecute(wt, task); // 任务之前的钩子函数,目前实现为空
Throwable thrown = null;
try {
task.run(); // 执行任务代码,线程复用的关键
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 任务之后的钩子函数,目前实现为空
}
} finally {
task = null;
w.completedTasks++; // 成功完成任务,completedTasks累加
w.unlock(); // 释放锁
}
}
completedAbruptly = false; // 用于判断Worker是正常退出还是中断或异常退出
} finally {
processWorkerExit(w, completedAbruptly); // Worker退出
}
}
private Runnable getTask() {
boolean timedOut = false; // timeOut表示上次从阻塞队列中取任务时是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 非RUNNING状态,rs>=STOP表示线程池是否正在stop,
// 当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed变量用于判断是否需要进行超时控制,对于超过核心线程数的线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // 释放空闲线程,keepAliveTime的效用
return null;
continue;
}
try {
// 通过阻塞队列的poll方法进行超时控制,若在keepAliveTime时间内没有获取到任务,则返回null
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true; // 若 r == null,说明已经超时,timedOut设置为true
} catch (InterruptedException retry) {
timedOut = false; // 若获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 若线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了
if (completedAbruptly) // 若为中断或异常退出,需要将workerCount减1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; //统计完成的任务数
workers.remove(w); // 从workers中移除,也就表示着从线程池中移除了一个工作线程
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池,和shutdown、shutdownNow一样,每个线程在结束时都会尝试调用该函数,看是否可以终止整个线程池
tryTerminate();
// 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker
// 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker
// 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

拒绝策略

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

AbortPolicy是线程池的默认拒绝策略,直接抛出异常

1
2
3
4
5
6
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}

CallerRunsPolicy使用调用者线程执行任务

1
2
3
4
5
6
7
8
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

DiscardOldestPolicy丢弃队列尾部排队的线程,即排队最久的,不抛异常

1
2
3
4
5
6
7
8
9
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

DiscardPolicy丢弃当前任务,不抛异常

1
2
3
4
5
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}