ThreadPoolExecutor 浅析

线程池与连接池等的思想差不多,就是一种“池化”技术,预先定义好一系列的线程对象,支持复用。

简介

ThreadPoolExecutor 拥有一定数量的 Worker(Runnable 类型),这些 Worker 被用作不断地执行提交上来的 task(Runnable)。ThreadPoolExecutor 实现了 ExecutorService 接口,该接口定义了管理异步执行的任务的通用服务,在提交任务后返回 Future 对象支持异步化。

本篇主要解析 JDK 的线程池实现类 ThreadPoolExecutor,至于其子类 ScheduledExecutorService,是通过 DelayedWorkQueue 实现延时和周期循环的,具体可以查看内部类 ScheduledFutureTask 及相应源码以及后续博文。

继承体系

Executor 继承体系

  • Executor

    仅定义有空返回值的 execute 方法,执行传入的 Runnable 任务。

  • ExecutorService

    定义了管理终止 Executor 相关的方法,还定义了返回 Future 对象的 submit/invoke 方法用于(批量)提交任务,而 Future 可以方便 追踪 已提交的 异步 任务。

  • AbstractExecutorService

    主要实现 submit/invoke 方法,用于将 newTaskFor 方法(私有)包装好的任务传给 execute 方法执行。

  • ScheduledExecutorService

    定义带延时时间 delay 和周期间隔 period 的 schedule 方法。

源码解析

构造方法

其所有重载的构造方法最终调用的都是这一个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

该构造器的参数含义如下:

  • corePoolSize

    核心线程数,这部分线程在线程池生命周期内会一直保持运行。

  • maximumPoolSize

    最大线程数,核心线程池忙不过来时(且队列已满)会添加临时的线程去支持,这部分“临时线程”的最大值即 (maximumPoolSize - corePoolSize)

  • keepAliveTime

    除 corePoolSize 核心线程 之外 的线程在超过 keepAliveTime 时间之后会被回收。

  • unit

    keepAliveTime 的时间单位。

  • workQueue

    等待队列,有以下常用实现:

    • ArrayBlockingQueue

      数组实现的有界阻塞循环队列。

    • LinkedBlockingQueue

      链表实现的阻塞队列(可定义上限),Executors.newFixedThreadPool() 使用了这个队列。

    • SynchronousQueue

      数据转移同步点,本身不缓存数据,Executors.newCachedThreadPool() 使用了这个队列。

    • PriorityBlockingQueue

      堆结构实现的无界阻塞优先队列。

  • threadFactory

    线程工厂,通过设置产生的线程“组名”可以方便内存分析。

  • handler

    RejectedExecutionHandler 类型,即线程池所使用的拒绝策略处理器,执行相应拒绝策略,ThreadPoolExecutor 中实现了以下策略:

    • AbortPolicy

      直接抛出异常,默认策略。

    • CallerRunsPolicy

      只用调用者所在线程来运行任务。

    • DiscardOldestPolicy

      丢弃队列里最近的一个任务,并执行当前任务。

    • DiscardPolicy

      不处理,丢弃掉。当然也可以根据应用场景需要来实现

ctl

ctl 是核心成员变量,其为 AtomicInteger 类型,按位(32 位)包含两个含义:runState(高 3 位)和 workerCount(低 29 位),后者代表 Worker 线程数,前者 runState 有以下状态:

  • RUNNING

    接受新 task 且处理队列中的 task;

  • SHUTDOWN

    不接受新 task 但处理队列中的 task;

  • STOP

    不接受新 task 也不处理队列中的 task,且中断正在运行的 task;

  • TIDYING

    所有 task 都终止了,workerCount 为 0,线程调用 terminated() 钩子;

  • TERMINATED

    terminated() 完成。

其状态变更如下:

  • 调用 shutdown()RUNNING -> SHUTDOWN
  • 调用 shutdownNow():(RUNNING or SHUTDOWN) -> STOP
  • 队列和线程池都为空:SHUTDOWN -> TIDYING
  • 线程池为空:STOP -> TIDYING
  • terminated() 调用完成:TIDYING -> TERMINATED

addWorker() 方法根据 runState 添加新 Worker 线程。

添加任务

execute

实现 Executor 接口基本定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
/*
正在运行的线程少于 core 数量(取 ctl 的低 29 位),尝试
启动一个新的 core 线程并将当前 command 作为其 firstTask
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;

// 如果添加 core worker 失败,再次获取 ctl
c = ctl.get();
}

// 任务入队成功
if (isRunning(c) && workQueue.offer(command)) {
// double-check
int recheck = ctl.get();

// runState 不为 RUNNING,回滚入队操作
if (!isRunning(recheck) && remove(command))
// 调用相应 RejectedExecutionHandler 的策略
reject(command);

// SHUTDOWN,刚被 shutdown()了,允许添加 worker 但任务会被舍弃
else if (workerCountOf(recheck) == 0)
addWorker(null, false);

// 其余情况就让 command 在队列中好好呆着就行

} else if (!addWorker(command, false)) {
/*
状态不为 RUNNING,或者入队失败,
尝试添加非核心 Worker(使用maximumPoolSize 判断)。
若还失败,说明这个线程池完完全全的满了,或者状态不对(被 shutdown 了),
此时会调用构造时指定的 RejectedExecutionHandler 的 rejectedExecution()
方法,执行拒绝策略
*/
reject(command);
}
}
addWorker

简化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private boolean addWorker(Runnable firstTask, boolean core) {
...

/*
已是 SHUTDOWN 后的状态了(通过 shutdown() 或 shutdownNow()),
如果线程池是通过 shutdown() 结束的,且任务队列非空,则意味着还有
任务没处理完,新任务不会被执行,但是新 Worker 线程还是会 使用
maximumPoolSize 策略被添加,这样能争取尽早完成任务队列中的任务。
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;

// 将 Runnable 的任务包装成 Worker
w = new Worker(firstTask);

// 封装的线程
final Thread t = w.thread;

// CAS ctl 线程数...

// 添加到 HashSet 列表
workers.add(w);

// 启动新线程
t.start();
}

Worker 构造器:

1
2
3
4
5
6
7
8
Worker(Runnable firstTask) {
// 启动之前禁止打断,见 shutdownNow() 的解析
setState(-1);
// 将第一个任务存下来
this.firstTask = firstTask;
// 将 Worker 本身传入 ThreadFactory,获取 target 为本 Worker 的 Thread
this.thread = getThreadFactory().newThread(this);
}

Worker 内部类继承了 AbstractQueuedSynchronizer 以实现同步,本文省略了加锁同步的逻辑。

t.start() 交给 CPU 启动一个线程后回调 Worker 的 run() 方法,这里统一调用了 ThreadPoolExecutor 的 runWorker()

1
2
3
public void run() {
runWorker(this);
}

runWorker() 中,线程池的核心线程在 第一个 task 运行完后会进 getTask() 方法继续拿任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 允许打断,见 shutdownNow() 的解析
w.unlock();

boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// ...
task.run();
// ...
}
completedAbruptly = false;
} finally {
/*
如果执行 task 发生了异常,线程池会重新添加一个初始任务
为 null 的 Worker,随之启动一个新的线程
*/
processWorkerExit(w, completedAbruptly);
}
getTask

从初始化 ThreadPoolExecutor 时设置的 BlockingQueue 拿任务:

1
2
3
4
5
6
7
8
9
10
11
12
int rs = runStateOf(ctl.get());

// 状态为 SHUTDOWN 且队列为空,或者状态为 STOP 及后续
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
// 使上面的 runWorker 跳出 while 循环,从而停止线程
return null;
}

for (;;) {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
}

execute 流程

停止线程池

调用 interrupt() 可以修改中断标志位,从而利用逻辑使线程退出,而对于停止线程池,ThreadPoolExecutor 提供了两种方式。

shutdown

1
2
3
4
5
6
7
8
9
// CAS ctl 状态到 SHUTDOWN
advanceRunState(SHUTDOWN);

interruptIdleWorkers();

// 留给 ScheduledThreadPoolExecutor 实现的回调方法
onShutdown();

tryTerminate();
interruptIdleWorkers

通过 Thread.interrupt(),打断所有空闲线程。

1
2
3
4
5
6
7
8
9
10
11
12
private void interruptIdleWorkers(boolean onlyOne) {
for (Worker w : workers) {
Thread t = w.thread;
/*
tryLock 保证被打断打线程是 idle 的,
此时持有 AQS 同步资源的线程将被跳过(tryLock 立即返回)
*/
if (!t.isInterrupted() && w.tryLock()) {
t.interrupt();
}
}
}
tryTerminate

如果已经为 SHUTDOWN 状态且池和队列均为空,或者为 STOP 状态且池为空,则将状态转换为 TERMINATED,完成线程池的停止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final void tryTerminate() {
for (;;) {
int c = ctl.get();

/*
1. RUNNING 无需 terminate;
2. TIDYING、TERMINATED 状态的线程说明已执行过了;
3. 状态为 SHUTDOWN 但任务队列还非空的情况,应该
是通过 shutdown() 停止的,但是队列中还是有等待
被执行的任务,直接返回
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;

/*
workerCount 非零,尝试中断一个空闲线程,之后直接返回。
等最后一个 worker 结束后,其 processWorkerExit 再
一次调用 tryTerminate(),workers 数量为 0,才会走下
面的 CAS 逻辑
*/
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}

// CAS c -> TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
// 子类回调方法,默认空实现
terminated();
// CAS TIDYING -> TERMINATED (有 Lock 不用 CAS)
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒调用 awaitTermination() 方法的线程
termination.signalAll();
return;
}

// CAS 失败,retry
}
}

可能造成线程池停止运行的所有操作都会尝试调用 tryTerminate(),比如移除 worker 的方法 remove()、run 中的 getTask() 方法都有调用。

shutdownNow

1
2
3
4
5
6
7
8
9
10
11
// CAS ctl 状态到 STOP
advanceRunState(STOP);

interruptWorkers();

List tasks = drainQueue();

// 后续和 shutdown() 一样,见上文
tryTerminate();

return tasks;
interruptWorkers

通过 Thread.interrupt(),打断所有已启动的线程。

1
2
for (Worker w : workers)
w.interruptIfStarted();

interruptIfStarted() 是 Worker 的方法,用于打断线程:

1
2
3
4
5
6
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
t.interrupt();
}
}

为了防止 runWorker() 之前线程被打断(打断也没用,所以这里用 state 来简化此类判断),Worker 构造器中设置的 AQS 初始同步状态 state 的值是 -1,在 runWorker() 中改成了 0。

drainQueue

将队列中的元素取出放到一个 ArrayList 中返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private List drainQueue() {
BlockingQueue q = workQueue;
ArrayList taskList = new ArrayList();
q.drainTo(taskList);
/*
drainTo 失败,可能队列是特殊类型(如 DelayQueue,
它只会 drain 已经超时的元素),此时尝试一个个删
*/
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

对比

  • shutdown

    shutdown() 方法会等待正在执行的任务完成,如果等待队列不为空,会等任务被执行完,任务队列为空后,才进入到 SHUTDOWN 的后续状态。

    注意,本次 shutdown() 并不会阻塞等待这件事的发生,如果需要在调用 shutdown() 方法的线程中等待线程池的关闭,可以调用利用了 Condition 条件的 awaitTermination() 方法。

  • shutdownNow

    shutdownNow() 会 interrupt 正在执行 的任务,而在队列中等待执行的任务会被转移到集合中被返回(注意到此方法是有返回值的,和 shutdown 不一样)。

为什么要使用线程池

  1. 减小线程创建和销毁带来的消耗;
  2. 方便控制并发,分配 CPU 资源;
  3. 传统的 new 一个线程给 run 逻辑交于 CPU 去跑的方式中,我们是将 “线程要做的事” 和 ”做这件事的线程“ 绑定在了一起,而使用线程池是将这两件事 解耦 开了。

线程池的合理配置

任务分类

任务一般可以这样分类:

  • 根据性质

    CPU 密集型、IO 密集型、混合型。

  • 根据优先级

  • 根据执行时长

  • 根据依赖性

    是否依赖其他系统资源,如数据库连接。

不同的应对方式

不同的任务可以用不同规模的线程池分开处理。

比如针对 CPU 密集型任务,可以配置尽可能小的线程池,减少线程上下文切换,如配置 Ncpu+1 个核心线程。而针对 IO 密集型任务,由于线程 并不是一直在使用 CPU, 可以配置较多线程,如 2*Ncpu

混合型的任务,如果可以 拆分,则将其拆分成 CPU 密集型任务和 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率将高于串行执行的吞吐率。如果这两个任务执行时间相差太大,没必要进行分解。

可以通过 Runtime.getRuntime().availableProcessors() 获得当前设备的 CPU 个数。

对于高并发、执行时间长的任务,应先考虑能否使用中间件对任务进行拆分和解耦、数据能否做缓存、能否加机器等,再来考虑线程池

为什么不能无限的创建线程

  • 栈空间有限,内存不够了会 OOM。
  • CPU 中执行上下文的切换会导致 CPU 中的 指令流水线(Instruction Pipeline) 的中断和 CPU 缓存 的失效,如果线程太多,线程切换的时间会比线程执行的时间还要长,严重浪费了 CPU 资源,得不偿失。
  • 若线程间有竞态,对于共享资源的同步操作(如使用锁)会导致线程切换开销急剧增加。