ThreadPoolExecutor 浅析

线程池与连接池等的思想差不多,就是一种“池化”技术,预先定义好一系列的线程对象,支持复用。

简介

ThreadPoolExecutor 拥有一定数量的 Worker(Runnable 类型),这些 Worker 被用作不断地执行提交上来的 task(Runnable)。ThreadPoolExecutor 实现了 ExecutorService 接口,该接口定义了管理异步执行的任务的通用服务,在提交任务后返回 Future 对象支持异步化。

本篇主要解析 JDK 的线程池实现类 ThreadPoolExecutor,至于其子类 ScheduledExecutorService,是通过 DelayedWorkQueue 实现延时和周期循环的,具体可以查看内部类 ScheduledFutureTask 及相应源码以及后续博文。

继承体系

Executor 继承体系

  • Executor

    仅定义有空返回值的 execute 方法,执行传入的 Runnable 任务。

  • ExecutorService

    定义了管理终止 Executor 相关的方法,还定义了返回 Future 对象的 submit/invoke 方法用于(批量)提交任务,而 Future 可以方便 追踪 已提交的 异步 任务。

  • AbstractExecutorService

    主要实现 submit/invoke 方法,用于将 newTaskFor 方法(私有)包装好的任务传给 execute 方法执行。

  • ScheduledExecutorService

    定义带延时时间 delay 和周期间隔 period 的 schedule 方法。

源码解析

构造方法

其所有重载的构造方法最终调用的都是这一个:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
        
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

该构造器的参数含义如下:

  • corePoolSize

    核心线程数,这部分线程在线程池生命周期内会一直保持运行。

  • maximumPoolSize

    最大线程数,核心线程池忙不过来时(且队列已满)会添加临时的线程去支持,这部分“临时线程”的最大值即 (maximumPoolSize - corePoolSize)

  • keepAliveTime

    除 corePoolSize 核心线程 之外 的线程在超过 keepAliveTime 时间之后会被回收。

  • unit

    keepAliveTime 的时间单位。

  • workQueue

    等待队列,有以下常用实现:

    • ArrayBlockingQueue

      数组实现的有界阻塞循环队列。

    • LinkedBlockingQueue

      链表实现的阻塞队列(可定义上限),Executors.newFixedThreadPool() 使用了这个队列。

    • SynchronousQueue

      数据转移同步点,本身不缓存数据,Executors.newCachedThreadPool() 使用了这个队列。

    • PriorityBlockingQueue

      堆结构实现的无界阻塞优先队列。

  • threadFactory

    线程工厂,通过设置产生的线程“组名”可以方便内存分析。

  • handler

    RejectedExecutionHandler 类型,即线程池所使用的拒绝策略处理器,执行相应拒绝策略,ThreadPoolExecutor 中实现了以下策略:

    • AbortPolicy

      直接抛出异常,默认策略。

    • CallerRunsPolicy

      只用调用者所在线程来运行任务。

    • DiscardOldestPolicy

      丢弃队列里最近的一个任务,并执行当前任务。

    • DiscardPolicy

      不处理,丢弃掉。当然也可以根据应用场景需要来实现

ctl

ctl 是核心成员变量,其为 AtomicInteger 类型,按位(32 位)包含两个含义:runState(高 3 位)和 workerCount(低 29 位),后者代表 Worker 线程数,前者 runState 有以下状态:

  • RUNNING

    接受新 task 且处理队列中的 task;

  • SHUTDOWN

    不接受新 task 但处理队列中的 task;

  • STOP

    不接受新 task 也不处理队列中的 task,且中断正在运行的 task;

  • TIDYING

    所有 task 都终止了,workerCount 为 0,线程调用 terminated() 钩子;

  • TERMINATED

    terminated() 完成。

其状态变更如下:

  • 调用 shutdown()RUNNING -> SHUTDOWN
  • 调用 shutdownNow():(RUNNING or SHUTDOWN) -> STOP
  • 队列和线程池都为空:SHUTDOWN -> TIDYING
  • 线程池为空:STOP -> TIDYING
  • terminated() 调用完成:TIDYING -> TERMINATED

addWorker() 方法根据 runState 添加新 Worker 线程。

添加任务

execute

实现 Executor 接口基本定义。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    /* 
       正在运行的线程少于 core 数量(取 ctl 的低 29 位),尝试
       启动一个新的 core 线程并将当前 command 作为其 firstTask
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
            
        // 如果添加 core worker 失败,再次获取 ctl
        c = ctl.get();
    }
    
    // 任务入队成功
    if (isRunning(c) && workQueue.offer(command)) {
        // double-check
        int recheck = ctl.get();
        
        // runState 不为 RUNNING,回滚入队操作
        if (!isRunning(recheck) && remove(command))
            // 调用相应 RejectedExecutionHandler 的策略
            reject(command);
            
        // SHUTDOWN,刚被 shutdown()了,允许添加 worker 但任务会被舍弃
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
            
        // 其余情况就让 command 在队列中好好呆着就行
        
    } else if (!addWorker(command, false)) {
        /*
           状态不为 RUNNING,或者入队失败,
           尝试添加非核心 Worker(使用maximumPoolSize 判断)。
           若还失败,说明这个线程池完完全全的满了,或者状态不对(被 shutdown 了),
           此时会调用构造时指定的 RejectedExecutionHandler 的 rejectedExecution()
           方法,执行拒绝策略
         */
        reject(command);
    }
}
addWorker

简化如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    ...
        
    /* 
       已是 SHUTDOWN 后的状态了(通过 shutdown() 或 shutdownNow()),
       如果线程池是通过 shutdown() 结束的,且任务队列非空,则意味着还有
       任务没处理完,新任务不会被执行,但是新 Worker 线程还是会 使用
       maximumPoolSize 策略被添加,这样能争取尽早完成任务队列中的任务。
     */
    if (rs >= SHUTDOWN &&
        !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
    return false;
    
    // 将 Runnable 的任务包装成 Worker
    w = new Worker(firstTask);
    
    // 封装的线程
    final Thread t = w.thread;
    
    // CAS ctl 线程数...
    
    // 添加到 HashSet 列表
    workers.add(w);
    
    // 启动新线程
    t.start();
}

Worker 构造器:

Worker(Runnable firstTask) {
    // 启动之前禁止打断,见 shutdownNow() 的解析
    setState(-1);
    // 将第一个任务存下来
    this.firstTask = firstTask;
    // 将 Worker 本身传入 ThreadFactory,获取 target 为本 Worker 的 Thread
    this.thread = getThreadFactory().newThread(this);
}

Worker 内部类继承了 AbstractQueuedSynchronizer 以实现同步,本文省略了加锁同步的逻辑。

t.start() 交给 CPU 启动一个线程后回调 Worker 的 run() 方法,这里统一调用了 ThreadPoolExecutor 的 runWorker()

public void run() {
    runWorker(this);
}

runWorker() 中,线程池的核心线程在 第一个 task 运行完后会进 getTask() 方法继续拿任务

// 允许打断,见 shutdownNow() 的解析
w.unlock();

boolean completedAbruptly = true;
try {
    while (task != null || (task = getTask()) != null) {
        // ...
        task.run();
        // ...
    }
    completedAbruptly = false;
} finally {
    /*
       如果执行 task 发生了异常,线程池会重新添加一个初始任务
       为 null 的 Worker,随之启动一个新的线程
     */
    processWorkerExit(w, completedAbruptly);
}
getTask

从初始化 ThreadPoolExecutor 时设置的 BlockingQueue 拿任务:

int rs = runStateOf(ctl.get());

// 状态为 SHUTDOWN 且队列为空,或者状态为 STOP 及后续
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    // 使上面的 runWorker 跳出 while 循环,从而停止线程
    return null;
}

for (;;) {
    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
}

execute 流程

停止线程池

调用 interrupt() 可以修改中断标志位,从而利用逻辑使线程退出,而对于停止线程池,ThreadPoolExecutor 提供了两种方式。

shutdown

// CAS ctl 状态到 SHUTDOWN
advanceRunState(SHUTDOWN);

interruptIdleWorkers();

// 留给 ScheduledThreadPoolExecutor 实现的回调方法
onShutdown();

tryTerminate();
interruptIdleWorkers

通过 Thread.interrupt(),打断所有空闲线程。

private void interruptIdleWorkers(boolean onlyOne) {
    for (Worker w : workers) {
        Thread t = w.thread;
        /* 
           tryLock 保证被打断打线程是 idle 的,
           此时持有 AQS 同步资源的线程将被跳过(tryLock 立即返回)
         */
        if (!t.isInterrupted() && w.tryLock()) {
            t.interrupt();
        }
    }
}
tryTerminate

如果已经为 SHUTDOWN 状态且池和队列均为空,或者为 STOP 状态且池为空,则将状态转换为 TERMINATED,完成线程池的停止。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        
        /* 
           1. RUNNING 无需 terminate;
           2. TIDYING、TERMINATED 状态的线程说明已执行过了;
           3. 状态为 SHUTDOWN 但任务队列还非空的情况,应该
              是通过 shutdown() 停止的,但是队列中还是有等待
              被执行的任务,直接返回
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        
        /*
           workerCount 非零,尝试中断一个空闲线程,之后直接返回。
           等最后一个 worker 结束后,其 processWorkerExit 再
           一次调用 tryTerminate(),workers 数量为 0,才会走下
           面的 CAS 逻辑
         */
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // CAS c -> TIDYING
        if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
            // 子类回调方法,默认空实现
            terminated();
            // CAS TIDYING -> TERMINATED (有 Lock 不用 CAS)
            ctl.set(ctlOf(TERMINATED, 0));
            // 唤醒调用 awaitTermination() 方法的线程
            termination.signalAll();
            return;
        }

        // CAS 失败,retry
    }
}

可能造成线程池停止运行的所有操作都会尝试调用 tryTerminate(),比如移除 worker 的方法 remove()、run 中的 getTask() 方法都有调用。

shutdownNow

// CAS ctl 状态到 STOP
advanceRunState(STOP);

interruptWorkers();

List<Runnable> tasks = drainQueue();

// 后续和 shutdown() 一样,见上文
tryTerminate();

return tasks;
interruptWorkers

通过 Thread.interrupt(),打断所有已启动的线程。

for (Worker w : workers)
    w.interruptIfStarted();

interruptIfStarted() 是 Worker 的方法,用于打断线程:

void interruptIfStarted() {
     Thread t;
     if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
         t.interrupt();
     }
 }

为了防止 runWorker() 之前线程被打断(打断也没用,所以这里用 state 来简化此类判断),Worker 构造器中设置的 AQS 初始同步状态 state 的值是 -1,在 runWorker() 中改成了 0。

drainQueue

将队列中的元素取出放到一个 ArrayList 中返回。

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    /*
       drainTo 失败,可能队列是特殊类型(如 DelayQueue,
       它只会 drain 已经超时的元素),此时尝试一个个删
     */
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

对比

  • shutdown

    shutdown() 方法会等待正在执行的任务完成,如果等待队列不为空,会等任务被执行完,任务队列为空后,才进入到 SHUTDOWN 的后续状态。

    注意,本次 shutdown() 并不会阻塞等待这件事的发生,如果需要在调用 shutdown() 方法的线程中等待线程池的关闭,可以调用利用了 Condition 条件的 awaitTermination() 方法。

  • shutdownNow

    shutdownNow() 会 interrupt 正在执行 的任务,而在队列中等待执行的任务会被转移到集合中被返回(注意到此方法是有返回值的,和 shutdown 不一样)。

为什么要使用线程池

  1. 减小线程创建和销毁带来的消耗;
  2. 方便控制并发,分配 CPU 资源;
  3. 传统的 new 一个线程给 run 逻辑交于 CPU 去跑的方式中,我们是将 “线程要做的事” 和 ”做这件事的线程“ 绑定在了一起,而使用线程池是将这两件事 解耦 开了。

线程池的合理配置

任务分类

任务一般可以这样分类:

  • 根据性质

    CPU 密集型、IO 密集型、混合型。

  • 根据优先级

  • 根据执行时长

  • 根据依赖性

    是否依赖其他系统资源,如数据库连接。

不同的应对方式

不同的任务可以用不同规模的线程池分开处理。

比如针对 CPU 密集型任务,可以配置尽可能小的线程池,减少线程上下文切换,如配置 Ncpu+1 个核心线程。而针对 IO 密集型任务,由于线程 并不是一直在使用 CPU, 可以配置较多线程,如 2*Ncpu

混合型的任务,如果可以 拆分,则将其拆分成 CPU 密集型任务和 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率将高于串行执行的吞吐率。如果这两个任务执行时间相差太大,没必要进行分解。

可以通过 Runtime.getRuntime().availableProcessors() 获得当前设备的 CPU 个数。

对于高并发、执行时间长的任务,应先考虑能否使用中间件对任务进行拆分和解耦、数据能否做缓存、能否加机器等,再来考虑线程池

为什么不能无限的创建线程

  • 栈空间有限,内存不够了会 OOM。
  • CPU 中执行上下文的切换会导致 CPU 中的 指令流水线(Instruction Pipeline) 的中断和 CPU 缓存 的失效,如果线程太多,线程切换的时间会比线程执行的时间还要长,严重浪费了 CPU 资源,得不偿失。
  • 若线程间有竞态,对于共享资源的同步操作(如使用锁)会导致线程切换开销急剧增加。