线程池与连接池等的思想差不多,就是一种“池化”技术,预先定义好一系列的线程对象,支持复用。
简介
ThreadPoolExecutor 拥有一定数量的 Worker(Runnable 类型),这些 Worker 被用作不断地执行提交上来的 task(Runnable)。ThreadPoolExecutor 实现了 ExecutorService 接口,该接口定义了管理异步执行的任务的通用服务,在提交任务后返回 Future 对象支持异步化。
本篇主要解析 JDK 的线程池实现类 ThreadPoolExecutor,至于其子类 ScheduledExecutorService,是通过 DelayedWorkQueue 实现延时和周期循环的,具体可以查看内部类 ScheduledFutureTask 及相应源码以及后续博文。
继承体系
Executor
仅定义有空返回值的
execute
方法,执行传入的 Runnable 任务。ExecutorService
定义了管理终止 Executor 相关的方法,还定义了返回 Future 对象的
submit
/invoke
方法用于(批量)提交任务,而 Future 可以方便 追踪 已提交的 异步 任务。AbstractExecutorService
主要实现
submit
/invoke
方法,用于将newTaskFor
方法(私有)包装好的任务传给execute
方法执行。ScheduledExecutorService
定义带延时时间 delay 和周期间隔 period 的
schedule
方法。
源码解析
构造方法
其所有重载的构造方法最终调用的都是这一个:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
该构造器的参数含义如下:
corePoolSize
核心线程数,这部分线程在线程池生命周期内会一直保持运行。
maximumPoolSize
最大线程数,核心线程池忙不过来时(且队列已满)会添加临时的线程去支持,这部分“临时线程”的最大值即
(maximumPoolSize - corePoolSize)
。keepAliveTime
除 corePoolSize 核心线程 之外 的线程在超过 keepAliveTime 时间之后会被回收。
unit
keepAliveTime 的时间单位。
workQueue
等待队列,有以下常用实现:
ArrayBlockingQueue
数组实现的有界阻塞循环队列。
LinkedBlockingQueue
链表实现的阻塞队列(可定义上限),
Executors.newFixedThreadPool()
使用了这个队列。SynchronousQueue
数据转移同步点,本身不缓存数据,
Executors.newCachedThreadPool()
使用了这个队列。PriorityBlockingQueue
堆结构实现的无界阻塞优先队列。
threadFactory
线程工厂,通过设置产生的线程“组名”可以方便内存分析。
handler
RejectedExecutionHandler 类型,即线程池所使用的拒绝策略处理器,执行相应拒绝策略,ThreadPoolExecutor 中实现了以下策略:
AbortPolicy
直接抛出异常,默认策略。
CallerRunsPolicy
只用调用者所在线程来运行任务。
DiscardOldestPolicy
丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy
不处理,丢弃掉。当然也可以根据应用场景需要来实现
ctl
ctl 是核心成员变量,其为 AtomicInteger 类型,按位(32 位)包含两个含义:runState(高 3 位)和 workerCount(低 29 位),后者代表 Worker 线程数,前者 runState 有以下状态:
RUNNING
接受新 task 且处理队列中的 task;
SHUTDOWN
不接受新 task 但处理队列中的 task;
STOP
不接受新 task 也不处理队列中的 task,且中断正在运行的 task;
TIDYING
所有 task 都终止了,workerCount 为 0,线程调用
terminated()
钩子;TERMINATED
terminated()
完成。
其状态变更如下:
- 调用
shutdown()
:RUNNING -> SHUTDOWN - 调用
shutdownNow()
:(RUNNING or SHUTDOWN) -> STOP - 队列和线程池都为空:SHUTDOWN -> TIDYING
- 线程池为空:STOP -> TIDYING
terminated()
调用完成:TIDYING -> TERMINATED
addWorker()
方法根据 runState 添加新 Worker 线程。
添加任务
execute
实现 Executor 接口基本定义。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/*
正在运行的线程少于 core 数量(取 ctl 的低 29 位),尝试
启动一个新的 core 线程并将当前 command 作为其 firstTask
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果添加 core worker 失败,再次获取 ctl
c = ctl.get();
}
// 任务入队成功
if (isRunning(c) && workQueue.offer(command)) {
// double-check
int recheck = ctl.get();
// runState 不为 RUNNING,回滚入队操作
if (!isRunning(recheck) && remove(command))
// 调用相应 RejectedExecutionHandler 的策略
reject(command);
// SHUTDOWN,刚被 shutdown()了,允许添加 worker 但任务会被舍弃
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 其余情况就让 command 在队列中好好呆着就行
} else if (!addWorker(command, false)) {
/*
状态不为 RUNNING,或者入队失败,
尝试添加非核心 Worker(使用maximumPoolSize 判断)。
若还失败,说明这个线程池完完全全的满了,或者状态不对(被 shutdown 了),
此时会调用构造时指定的 RejectedExecutionHandler 的 rejectedExecution()
方法,执行拒绝策略
*/
reject(command);
}
}
addWorker
简化如下:
private boolean addWorker(Runnable firstTask, boolean core) {
...
/*
已是 SHUTDOWN 后的状态了(通过 shutdown() 或 shutdownNow()),
如果线程池是通过 shutdown() 结束的,且任务队列非空,则意味着还有
任务没处理完,新任务不会被执行,但是新 Worker 线程还是会 使用
maximumPoolSize 策略被添加,这样能争取尽早完成任务队列中的任务。
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
// 将 Runnable 的任务包装成 Worker
w = new Worker(firstTask);
// 封装的线程
final Thread t = w.thread;
// CAS ctl 线程数...
// 添加到 HashSet 列表
workers.add(w);
// 启动新线程
t.start();
}
Worker 构造器:
Worker(Runnable firstTask) {
// 启动之前禁止打断,见 shutdownNow() 的解析
setState(-1);
// 将第一个任务存下来
this.firstTask = firstTask;
// 将 Worker 本身传入 ThreadFactory,获取 target 为本 Worker 的 Thread
this.thread = getThreadFactory().newThread(this);
}
Worker 内部类继承了 AbstractQueuedSynchronizer 以实现同步,本文省略了加锁同步的逻辑。
t.start()
交给 CPU 启动一个线程后回调 Worker 的 run()
方法,这里统一调用了 ThreadPoolExecutor 的 runWorker()
:
public void run() {
runWorker(this);
}
runWorker()
中,线程池的核心线程在 第一个 task 运行完后会进 getTask()
方法继续拿任务:
// 允许打断,见 shutdownNow() 的解析
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// ...
task.run();
// ...
}
completedAbruptly = false;
} finally {
/*
如果执行 task 发生了异常,线程池会重新添加一个初始任务
为 null 的 Worker,随之启动一个新的线程
*/
processWorkerExit(w, completedAbruptly);
}
getTask
从初始化 ThreadPoolExecutor 时设置的 BlockingQueue 拿任务:
int rs = runStateOf(ctl.get());
// 状态为 SHUTDOWN 且队列为空,或者状态为 STOP 及后续
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
// 使上面的 runWorker 跳出 while 循环,从而停止线程
return null;
}
for (;;) {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
}
停止线程池
调用 interrupt()
可以修改中断标志位,从而利用逻辑使线程退出,而对于停止线程池,ThreadPoolExecutor 提供了两种方式。
shutdown
// CAS ctl 状态到 SHUTDOWN
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
// 留给 ScheduledThreadPoolExecutor 实现的回调方法
onShutdown();
tryTerminate();
interruptIdleWorkers
通过 Thread.interrupt()
,打断所有空闲线程。
private void interruptIdleWorkers(boolean onlyOne) {
for (Worker w : workers) {
Thread t = w.thread;
/*
tryLock 保证被打断打线程是 idle 的,
此时持有 AQS 同步资源的线程将被跳过(tryLock 立即返回)
*/
if (!t.isInterrupted() && w.tryLock()) {
t.interrupt();
}
}
}
tryTerminate
如果已经为 SHUTDOWN 状态且池和队列均为空,或者为 STOP 状态且池为空,则将状态转换为 TERMINATED,完成线程池的停止。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
1. RUNNING 无需 terminate;
2. TIDYING、TERMINATED 状态的线程说明已执行过了;
3. 状态为 SHUTDOWN 但任务队列还非空的情况,应该
是通过 shutdown() 停止的,但是队列中还是有等待
被执行的任务,直接返回
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
/*
workerCount 非零,尝试中断一个空闲线程,之后直接返回。
等最后一个 worker 结束后,其 processWorkerExit 再
一次调用 tryTerminate(),workers 数量为 0,才会走下
面的 CAS 逻辑
*/
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// CAS c -> TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
// 子类回调方法,默认空实现
terminated();
// CAS TIDYING -> TERMINATED (有 Lock 不用 CAS)
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒调用 awaitTermination() 方法的线程
termination.signalAll();
return;
}
// CAS 失败,retry
}
}
可能造成线程池停止运行的所有操作都会尝试调用
tryTerminate()
,比如移除 worker 的方法remove()
、run 中的getTask()
方法都有调用。
shutdownNow
// CAS ctl 状态到 STOP
advanceRunState(STOP);
interruptWorkers();
List<Runnable> tasks = drainQueue();
// 后续和 shutdown() 一样,见上文
tryTerminate();
return tasks;
interruptWorkers
通过 Thread.interrupt()
,打断所有已启动的线程。
for (Worker w : workers)
w.interruptIfStarted();
interruptIfStarted()
是 Worker 的方法,用于打断线程:
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
t.interrupt();
}
}
为了防止
runWorker()
之前线程被打断(打断也没用,所以这里用 state 来简化此类判断),Worker 构造器中设置的 AQS 初始同步状态 state 的值是-1
,在runWorker()
中改成了 0。
drainQueue
将队列中的元素取出放到一个 ArrayList 中返回。
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
/*
drainTo 失败,可能队列是特殊类型(如 DelayQueue,
它只会 drain 已经超时的元素),此时尝试一个个删
*/
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
对比
shutdown
shutdown()
方法会等待正在执行的任务完成,如果等待队列不为空,会等任务被执行完,任务队列为空后,才进入到 SHUTDOWN 的后续状态。注意,本次
shutdown()
并不会阻塞等待这件事的发生,如果需要在调用shutdown()
方法的线程中等待线程池的关闭,可以调用利用了 Condition 条件的awaitTermination()
方法。shutdownNow
shutdownNow()
会 interrupt 正在执行 的任务,而在队列中等待执行的任务会被转移到集合中被返回(注意到此方法是有返回值的,和 shutdown 不一样)。
为什么要使用线程池
- 减小线程创建和销毁带来的消耗;
- 方便控制并发,分配 CPU 资源;
- 传统的 new 一个线程给
run
逻辑交于 CPU 去跑的方式中,我们是将 “线程要做的事” 和 ”做这件事的线程“ 绑定在了一起,而使用线程池是将这两件事 解耦 开了。
线程池的合理配置
任务分类
任务一般可以这样分类:
根据性质
CPU 密集型、IO 密集型、混合型。
根据优先级
根据执行时长
根据依赖性
是否依赖其他系统资源,如数据库连接。
不同的应对方式
不同的任务可以用不同规模的线程池分开处理。
比如针对 CPU 密集型任务,可以配置尽可能小的线程池,减少线程上下文切换,如配置 Ncpu+1
个核心线程。而针对 IO 密集型任务,由于线程 并不是一直在使用 CPU, 可以配置较多线程,如 2*Ncpu
。
混合型的任务,如果可以 拆分,则将其拆分成 CPU 密集型任务和 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率将高于串行执行的吞吐率。如果这两个任务执行时间相差太大,没必要进行分解。
可以通过
Runtime.getRuntime().availableProcessors()
获得当前设备的 CPU 个数。
对于高并发、执行时间长的任务,应先考虑能否使用中间件对任务进行拆分和解耦、数据能否做缓存、能否加机器等,再来考虑线程池。
为什么不能无限的创建线程
- 栈空间有限,内存不够了会 OOM。
- CPU 中执行上下文的切换会导致 CPU 中的 指令流水线(Instruction Pipeline) 的中断和 CPU 缓存 的失效,如果线程太多,线程切换的时间会比线程执行的时间还要长,严重浪费了 CPU 资源,得不偿失。
- 若线程间有竞态,对于共享资源的同步操作(如使用锁)会导致线程切换开销急剧增加。