加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

ThreadPoolExcutor 原理探究 透过 ReentrantLock 分

发布时间:2020-12-15 07:09:27 所属栏目:Java 来源:网络整理
导读:概论 线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核

概论

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。 例如,线程数一般取 cpu 数量 +2 比较合适,线程数过多会导致额外的线程切换开销。

Java 中的线程池是用 ThreadPoolExecutor 类来实现的. 本文就对该类的源码来分析一下这个类内部对于线程的创建,管理以及后台任务的调度等方面的执行原理。

先看一下线程池的类图:

线程池的类图

上图的目的主要是为了让大家知道线程池相关类之间的关系,至少赚个眼熟,以后看到不会有害怕的感觉。


?

Executor 框架接口

Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。

下面是 ThreadPoolExeCutor 类图。Executors 其实是一个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程实例。

从上图也可以看出来,ThreadPoolExeCutor 是线程池的核心。

J.U.C 中有三个 Executor 接口:

  • Executor:一个运行新任务的简单接口;

  • ExecutorService:扩展了 Executor 接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;

  • ScheduledExecutorService:扩展了 ExecutorService。支持 Future 和定期执行任务。

其实通过这些接口就可以看到一些设计思想,每个接口的名字和其任务是完全匹配的。不会因为 Executor 中只有一个方法,就将其放到其他接口中。这也是很重要的单一原则。


?

ThreadPoolExeCutor 分析

在去具体分析?ThreadPoolExeCutor 运行逻辑前,先看下面的流程图:

该图是 ThreadPoolExeCutor 整个运行过程的一个概括,整个源码的核心逻辑总结起来就是:

  1. 创建线程:要知道如何去创建线程,控制线程数量,线程的存活与销毁;

  2. 添加任务:任务添加后如何处理,是立刻执行,还是先保存;

  3. 执行任务:如何获取任务,任务执行失败后如何处理?

下面将进入源码分析,来深入理解?ThreadPoolExeCutor 的设计思想。


?

构造函数

先来看构造函数:

    public ThreadPoolExecutor(int corePoolSize, maximumPoolSize,1)">long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
     // 注意 workQueue,threadFactory,handler 是不可以为null 的,为空会直接抛出错误
if (workQueue == null || threadFactory == null || handler == null NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
  1. corePoolSize 核心线程数表示核心线程池的大小。当提交一个任务时,如果当前核心线程池的线程个数没有达到 corePoolSize,则会创建新的线程来执行所提交的任务,即使当前核心线程池有空闲的线程。如果当前核心线程池的线程个数已经达到了corePoolSize,则不再重新创建线程。如果调用了?prestartCoreThread()?或者 prestartAllCoreThreads(),线程池创建的时候所有的核心线程都会被创建并且启动。若 corePoolSize == 0,则任务执行完之后,没有任何请求进入时,销毁线程池的线程。若 corePoolSize > 0,即使本地任务执行完毕,核心线程也不会被销毁。corePoolSize 其实可以理解为可保留的空闲线程数。

  2. maximumPoolSize: 表示线程池能够容纳同时执行的最大线程数。如果当阻塞队列已满时,并且当前线程池线程个数没有超过 maximumPoolSize 的话,就会创建新的线程来执行任务。注意 maximumPoolSize >= 1 必须大于等于 1。maximumPoolSize == corePoolSize ,即是固定大小线程池。实际上最大容量是由?CAPACITY 控制

  3. keepAliveTime: 线程空闲时间。当空闲时间达到 keepAliveTime值时,线程会被销毁,直到只剩下 corePoolSize 个线程为止,避免浪费内存和句柄资源。默认情况,当线程池的线程数 > corePoolSize 时,keepAliveTime 才会起作用。但当 ThreadPoolExecutor 的 allowCoreThreadTimeOut 变量设置为 true 时,核心线程超时后会被回收。

  4. unit:时间单位。为 keepAliveTime 指定时间单位。

  5. workQueue 缓存队列。当请求的线程数 > corePoolSize 时,线程进入 BlockingQueue 阻塞队列。可以使用 ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,PriorityBlockingQueue。

  6. threadFactory?创建线程的工程类。可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。

  7. handler 执行拒绝策略的对象。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况。采用的策略有这几种:
    • AbortPolicy: 直接拒绝所提交的任务,并抛出?RejectedExecutionException?异常;

    • CallerRunsPolicy:只用调用者所在的线程来执行任务;

    • DiscardPolicy:不处理直接丢弃掉任务;

    • DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务


属性定义

看完构造函数之后,再来看下该类里面的变量,有助于进一步理解整个代码运行逻辑,下面是一些比较重要的变量:

// 用来标记线程池状态(高3位),线程个数(低29位)
默认是 RUNNING 状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));

线程个数掩码位数,整型最大位数-3,可以适用于不同平台
static final int COUNT_BITS = Integer.SIZE - 3;

//线程最大个数(低29位)00022222222222222222222222221111
int CAPACITY   = (1 << COUNT_BITS) - 1(高3位):11100000000000000000000000000000
int RUNNING    = -1 << COUNT_BITS;

(高3位):00000000000000000000000000000000
int SHUTDOWN   =  0 <<(高3位):00100000000000000000000000000000
int STOP       =  1 <<(高3位):01000000000000000000000000000000
int TIDYING    =  2 <<(高3位):01100000000000000000000000000000
int TERMINATED =  3 << 获取高三位 运行状态
int runStateOf(int c)     { return c & ~CAPACITY; }

获取低29位 线程个数
int workerCountOf(int c)  { return c & CAPACITY; }

计算ctl新值,线程状态 与 线程个数
int ctlOf(int rs,int wc) { return rs | wc; }

这里需要对一些操作做些解释。?

  • Integer.SIZE:对于不同平台,其位数不一样,目前常见的是 32 位;

  • (1 << COUNT_BITS) - 1:首先是将 1 左移?COUNT_BITS 位,也就是第?COUNT_BITS + 1 位是1,其余都是 0;-1 操作则是将后面前面的?COUNT_BITS 位都变成 1。

  • -1 << COUNT_BITS:-1 的原码是 10000000 00000000 00000000 00000001 ,反码是 222221111 22222111 22222111 22222110 ,补码 +1,然后左移 29 位是 11100000 00000000 00000000 00000000;这里转为十进制是负数。

  • ~CAPACITY取反,最高三位是1;

总结:这里巧妙利用 bit 操作来将线程数量和运行状态联系在一起,减少了变量的存在和内存的占用。其中五种状态的十进制排序:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED


?

线程池状态

线程池状态含义:

  • RUNNING:接受新任务并且处理阻塞队列里的任务;

  • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务;

  • STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务;

  • TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用 terminated 方法

  • TERMINATED:终止状态。terminated 方法调用完成以后的状态;

线程池状态转换:

  • RUNNING -> SHUTDOWN:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了shutdown()方法。

  • RUNNING or SHUTDOWN)-> STOP:显式 shutdownNow() 方法;

  • SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候;

  • STOP -> TIDYING:当线程池为空的时候;

  • TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候;


?原码,反码,补码知识小剧场:

1. 原码:原码就是符号位加上真值的绝对值,即用第一位表示符号,其余位表示值. 比如如果是 8 位二进制:

[+1]?= 0000 0001

[-1]?= 1000 0001

负数原码第一位是符号位.?

?

2. 反码:反码的表示方法是,正数的反码是其本身,负数的反码是在其原码的基础上,符号位不变,其余各个位取反.

[+1] = [0000 0001]?= [0000 0001]

[-1] = [1000 0001]?= [1111 1110]

?

3. 补码:补码的表示方法是,正数的补码就是其本身,负数的补码是在其原码的基础上,符号位不变,其余各位取反,最后 +1. (即在反码的基础上 +1)

[+1] = [0000 0001]?= [0000 0001]?= [0000 0001]

[-1] = [1000 0001]?= [1111 1110]?= [1111 1111]

4. 总结
在知道一个数原码的情况下:
正数:反码,补码 就是本身自己
负数:反码是高位符号位不变,其余位取反。补码:反码+1

?

?5. 左移:当数值左、右移时,先将数值转化为其补码形式,移完后,再转换成对应的原码

? ? ?左移:高位丢弃,低位补零

? ? ?[+1] ?= [00000001]

? ? ?[0000 0001] << 1 = [0000 0010] = [0000 0010] = [+2]

? ? ?[-1] ?= [1000 0001] = [1111 1111]

? ? ?[1111 1111] << 1 = [1111 1110] = [1000 0010] = [-2]

其中,再次提醒,负数的补码是反码+1;负数的反码是补码-1;

?

?6. 右移:高位保持不变,低位丢弃

? ? ?[+127] = [0111 1111] = [0111 1111]

? ? ?[0111 1111]补 >> 1 = [0011 1111] = [0011 1111] = [+63]

? ? ?[-127] = [1111 1111] = [1000 0001]

? ? ?[1000 0001] >> 1 = [1100 0000] = [1100 0000]原?= [-64]


execute 方法分析

通过 ThreadPoolExecutor 创建线程池后,提交任务后执行过程是怎样的,下面来通过源码来看一看。execute 方法源码如下:

public void execute(Runnable command) {
    if (command == )
         NullPointerException();

     返回包含线程数及线程池状态(头3位)
    int c = ctl.get();
    
     如果工作线程数小于核心线程数,则创建线程任务执行
    if (workerCountOf(c) < corePoolSize) {
        
        if (addWorker(command,1)">true))
            return;
            
         如果创建失败,防止外部已经在线程池中加入新任务,重新获取
        c = ctl.get();
    }
    
     只有线程池处于 RUNNING 状态,且 入队列成功
    if (isRunning(c) && workQueue.offer(command)) {
      // 后面的操作属于double-check
        int recheck = ctl.get();
        
         如果线程池不是 RUNNING 状态,则将刚加入队列的任务移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
            
         如果之前的线程已被消费完,新建一个线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null,1)">false);
    }
     核心池和队列都满了,尝试创建一个新线程
    if (!addWorker(command,1)">))
         如果 addWorker 返回是 false,即创建失败,则唤醒拒绝策略
        reject(command);
}?
execute 方法执行逻辑有这样几种情况:
  1. 如果当前运行的线程少于 corePoolSize,则会创建新的线程来执行新的任务;

  2. 如果运行的线程个数等于或者大于 corePoolSize,则会将提交的任务存放到阻塞队列 workQueue 中;

  3. 如果当前 workQueue 队列已满的话,则会创建新的线程来执行任务;

  4. 如果线程个数已经超过了 maximumPoolSize,则会使用饱和策略 RejectedExecutionHandler 来进行处理。

这里要注意一下?addWorker(null,false)?也就是创建一个线程,但并没有传入任务,因为任务已经被添加到 workQueue 中了,所以 worker 在执行的时候,会直接从 workQueue 中获取任务。所以,在?workerCountOf(recheck) == 0?时执行?addWorker(null,false)?也是为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务。

需要注意的是,线程池的设计思想就是使用了核心线程池 corePoolSize,阻塞队列 workQueue 和线程池 maximumPoolSize,这样的缓存策略来处理任务,实际上这样的设计思想在需要框架中都会使用。

需要注意线程和任务之间的区别,任务是保存在?workQueue 中的,线程是从线程池里面取的,由?CAPACITY 控制容量。


addWorker 方法分析

addWorker 方法的主要工作是在线程池中创建一个新的线程并执行,firstTask 参数用于指定新增的线程执行的第一个任务,core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize,代码如下:

boolean addWorker(Runnable firstTask,1)">boolean core) {
    retry:
    for (;;) {
         ctl.get();
         获取运行状态
        int rs = runStateOf(c);
        
        /*
         * 这个if判断
         * 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
         * 接着判断以下3个条件,只要有1个不满足,则返回false:
         * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
         * 2. firsTask为空
         * 3. 阻塞队列不为空
         * 
         * 首先考虑rs == SHUTDOWN的情况
         * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
         * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
         * 因为队列中已经没有任务了,不需要再添加线程了
         */
         Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return ;
         (;;) {
             获取线程数
            int wc = workerCountOf(c);
             如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
             这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
             如果为false则根据maximumPoolSize来比较。
            // 
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                ;
             尝试增加workerCount,如果成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
             如果增加workerCount失败,则重新获取ctl的值
            c = ctl.get();   Re-read ctl
             如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = ;
    boolean workerAdded = ;
    Worker w = try {
         根据firstTask来创建Worker对象
        w =  Worker(firstTask);
         每一个Worker对象都会创建一个线程
        final Thread t = w.thread;
        if (t != ) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
             {
                 Recheck while holding lock.
                 Back out on ThreadFactory failure or if
                 shut down before lock acquired.
                 runStateOf(ctl.get());
                 rs < SHUTDOWN表示是RUNNING状态;
                 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == )) {
                    if (t.isAlive())  precheck that t is startable
                         IllegalThreadStateException();
                     workers是一个HashSet
                    workers.add(w);
                    int s = workers.size();
                     largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = ;
                }
            } finally {
                mainLock.unlock();
            }
             (workerAdded) {
                 启动线程
                t.start();
                workerStarted = ;
            }
        }
    } if (! workerStarted)
            addWorkerFailed(w);
    }
     workerStarted;
}

这里需要注意有以下几点:

  1. 在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。

  2. ?t.start()会调用 Worker 类中的 run 方法,Worker 本身实现了 Runnable 接口。原因在创建线程得时候,将 Worker 实例传入了 t 当中,可参见?Worker 类的构造函数。

  3. wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) 每次调用 addWorker 来添加线程会先判断当前线程数是否超过了CAPACITY,然后再去判断是否超 corePoolSize 或 maximumPoolSize,说明线程数实际上是由 CAPACITY 来控制的。


内部类 Worker 分析

上面分析过程中,提到了一个 Worker 类,对于某些对源码不是很熟悉得同学可能有点不清楚,下面就来看看 Worker 的源码:

 class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized,but we provide a
         * serialVersionUID to suppress a javac warning.
         long serialVersionUID = 6138294804551838833L;

         Thread this worker is running in.  Null if factory fails. final Thread thread;
         Initial task to run.  Possibly null. */
        Runnable firstTask;
         Per-thread task counter volatile  completedTasks;

        
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         
        Worker(Runnable firstTask) {
            setState(-1);  inhibit interrupts until runWorker
            this.firstTask = firstTask;
       // 注意此处传入的是this
this.thread = getThreadFactory().newThread(); } Delegates main run loop to outer runWorker. */
     // 这里其实会调用外部的 runWorker 方法来执行自己。
run() { runWorker( Lock methods // The value 0 represents the unlocked state. The value 1 represents the locked state. protected isHeldExclusively() { return getState() != 0; } boolean tryAcquire( unused) {
       // 如果已经设置过1了,这时候在设置1就会返回false,也就是不可重入
if (compareAndSetState(0,1)) { setExclusiveOwnerThread(Thread.currentThread()); ; } boolean tryRelease( unused) { setExclusiveOwnerThread(); setState(0); void lock() { acquire(1); } boolean tryLock() { return tryAcquire(1void unlock() { release(1boolean isLocked() { isHeldExclusively(); }      // 提供安全中断线程得方法 interruptIfStarted() { Thread t;
       // 一开始 setstate(-1) 避免了还没开始运行就被中断可能
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { { t.interrupt(); } catch (SecurityException ignore) { } } } }

首先看到的是 Worker 继承了(AbstractQueuedSynchronizer) AQS,并实现了 Runnable 接口,说明 Worker 本身也是线程。然后看其构造函数可以发现,内部有两个属性变量分别是?Runnable 和 Thread 实例,该类其实就是对传进来得属性做了一个封装,并加入了获取锁的逻辑(继承了 AQS )。具体可参考文章:透过 ReentrantLock 分析 AQS 的实现原理

Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的:

  1. lock 方法一旦获取了独占锁,表示当前线程正在执行任务中;

  2. 如果正在执行任务,则不应该中断线程;

  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;

  4. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;

  5. 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁。如果使用 ReentrantLock,它是可重入的,这样如果在任务中调用了如 setCorePoolSize 这类线程池控制的方法,会中断正在运行的线程,因为 size 小了,需要中断一些线程 。

所以,Worker 继承自 AQS,用于判断线程是否空闲以及是否可以被中断。

此外,在构造方法中执行了?setState(-1);,把 state 变量设置为 -1,为什么这么做呢?是因为 AQS 中默认的 state 是 0,如果刚创建了一个 Worker 对象,还没有执行任务时,这时就不应该被中断,看一下 tryAquire 方法:?

 unused) {
    )) {
        setExclusiveOwnerThread(Thread.currentThread());
        ;
    }
    ;
}

正因为如此,在 runWorker 方法中会先调用 Worker 对象的 unlock 方法将 state 设置为 0。tryAcquire 方法是根据 state 是否是 0 来判断的,所以,setState(-1);将 state 设置为 -1 是为了禁止在执行任务前对线程进行中断。


?runWorker 方法分析

前面提到了内部类 Worker 的 run 方法调用了外部类 runWorker,下面来看下 runWork 的具体逻辑。

 runWorker(Worker w) {
       Thread wt = Thread.currentThread();
       Runnable task = w.firstTask;
       w.firstTask = ;
       w.unlock();  status 设置为0,允许中断,也可以避免再次加锁失败
       boolean completedAbruptly = ;
        {
           while (task != null || (task = getTask()) != ) {
               // 要派发task的时候,需要上锁
               w.lock();
                如果线程池当前状态至少是stop,则设置中断标志;
                如果线程池当前状态是RUNNININ,则重置中断标志,重置后需要重新
               检查下线程池状态,因为当重置中断标志时候,可能调用了线程池的shutdown方法
               改变了线程池状态。
               if ((runStateAtLeast(ctl.get(),STOP) ||
                    (Thread.interrupted() &&
                     runStateAtLeast(ctl.get(),STOP))) &&
                   !wt.isInterrupted())
                   wt.interrupt();

                {
                   任务执行前干一些事情
                   beforeExecute(wt,task);
                   Throwable thrown = ;
                    {
                       task.run();执行任务
                   }  (RuntimeException x) {
                       thrown = x; throw x;
                   }  (Error x) {
                       thrown = x;  (Throwable x) {
                       thrown = x;  Error(x);
                   }  {
                       任务执行完毕后干一些事情
                       afterExecute(task,thrown);
                   }
               }  {
                   task = 统计当前worker完成了多少个任务
                   w.completedTasks++;
                   w.unlock();
               }
           }
           completedAbruptly = ;
       }  {

           执行清了工作
           processWorkerExit(w,completedAbruptly);
       }
   }

总结一下 runWorker 方法的执行过程:

  1. while 循环不断地通过 getTask() 方法从阻塞队列中取任务;

  2. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;

  3. 调用?task.run()执行任务;

  4. 如果 task 为 null 则跳出循环,执行 processWorkerExit 方法;

  5. runWorker 方法执行完毕,也代表着 Worker 中的 run 方法执行完毕,销毁线程。

这里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 类中是空的,留给子类来实现。

completedAbruptly 变量来表示在执行任务过程中是否出现了异常,在 processWorkerExit 方法中会对该变量的值进行判断。


?

getTask 方法分析

getTask 方法是从阻塞队列里面获取任务,具体代码逻辑如下:

private Runnable getTask() {
     timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false;  Did the last poll() time out?
     runStateOf(c);
        
         * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
         * 1. rs >= STOP,线程池是否正在stop;
         * 2. 阻塞队列是否为空。
         * 如果以上条件满足,则将workerCount减1并返回null。
         * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            ;
        }
         workerCountOf(c);
         Are workers subject to culling?
         timed变量用于判断是否需要进行超时控制。
         allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
         wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
         对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        
         * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
         * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
         * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
         * 如果减1失败,则返回重试。
         * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
         if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
             (compareAndDecrementWorkerCount(c))
                 {
            
             * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
             * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
             * 
             
            Runnable r = timed ?
                workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != )
                 r;
             如果 r == null,说明已经超时,timedOut设置为true
            timedOut = ;
        }  (InterruptedException retry) {
             如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = ;
        }
    }
}

其实到这里后,你会发现在 ThreadPoolExcute 内部有几个重要的检验:

  • 判断当前的运行状态,根据运行状态来做处理,如果当前都停止运行了,那很多操作也就没必要了;

  • 判断当前线程池的数量,然后将该数据和 corePoolSize 以及?maximumPoolSize 进行比较,然后再去决定下一步该做啥;

首先是第一个 if 判断,当运行状态处于非 RUNNING 状态,此外?rs >= STOP(线程池是否正在 stop)或阻塞队列是否为空。则将 workerCount 减 1 并返回 null。为什么要减 1 呢,因为此处其实是去获取一个 task,但是发现处于停止状态了,也就是没必要再去获取运行任务了,那这个线程就没有存在的意义了。后续也会在?processWorkerExit 将该线程移除。

第二个 if 条件目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于 maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。

什么时候会销毁?当然是 runWorker 方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。

getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法。


?

processWorkerExit 方法

下面在看?processWorkerExit 方法的具体逻辑:

void processWorkerExit(Worker w,1)"> completedAbruptly) {
     如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
     如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。  
    if (completedAbruptly)  If abrupt,then workerCount wasn't adjusted
        decrementWorkerCount();
    .mainLock;
    mainLock.lock();
    统计完成的任务数
        completedTaskCount += w.completedTasks;
         从workers中移除,也就表示着从线程池中移除了一个工作线程
        workers.remove(w);
    }  {
        mainLock.unlock();
    }
     根据线程池状态进行判断是否结束线程池
    tryTerminate();
     ctl.get();
    
     * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
     */
     (runStateLessThan(c,STOP)) {
        completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1if (workerCountOf(c) >= min)
                return;  replacement not needed
        }
        addWorker();
    }
}

至此,processWorkerExit 执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期。但是这有两点需要注意:

  1. 大家想想什么时候才会调用这个方法,任务干完了才会调用。那么没事做了,就需要看下是否有必要结束线程池,这时候就会调用?tryTerminate。

  2. 如果此时线程处于 STOP 状态以下,那么就会判断核心线程数是否达到了规定的数量,没有的话,就会继续创建一个线程。


tryTerminate方法

tryTerminate 方法根据线程池状态进行判断是否结束线程池,代码如下:

 tryTerminate() {
    
         * 当前线程池的状态为以下几种情况时,直接返回:
         * 1. RUNNING,因为还在运行中,不能停止;
         * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
         * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
         if (isRunning(c) ||
            runStateAtLeast(c,TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! 如果线程数量不为0,则中断一个空闲的工作线程,并返回
        if (workerCountOf(c) != 0) {  Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            .mainLock;
        mainLock.lock();
         这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法
            if (ctl.compareAndSet(c,ctlOf(TIDYING,1)">))) {
                 {
                     terminated方法默认什么都不做,留给子类实现
                    terminated();
                }  设置状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED,1)">));
                    termination.signalAll();
                }
                ;
            }
        }  {
            mainLock.unlock();
        }
         else retry on failed CAS
    }
}

interruptIdleWorkers(boolean onlyOne) 如果 ONLY_ONE = true 那么就的最多让一个空闲线程发生中断,ONLY_ONE = false 时是所有空闲线程都会发生中断。那线程什么时候会处于空闲状态呢?

一是线程数量很多,任务都完成了;二是线程在 getTask 方法中执行?workQueue.take()?时,如果不执行中断会一直阻塞。

所以每次在工作线程结束时调用 tryTerminate 方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。


?

shutdown方法

shutdown 方法要将线程池切换到 SHUTDOWN 状态,并调用 interruptIdleWorkers 方法请求中断所有空闲的 worker,最后调用 tryTerminate 尝试结束线程池。

 shutdown() {
     安全策略判断
        checkShutdownAccess();
         切换状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
         中断空闲线程
        interruptIdleWorkers();
        onShutdown();  hook for ScheduledThreadPoolExecutor
    }  尝试结束线程池
    tryTerminate();
}

这里思考一个问题:在 runWorker 方法中,执行任务时对 Worker 对象 w 进行了 lock 操作,为什么要在执行任务的时候对每个工作线程都加锁呢?

下面仔细分析一下:

  • 在 getTask 方法中,如果这时线程池的状态是 SHUTDOWN 并且 workQueue 为空,那么就应该返回 null 来结束这个工作线程,而使线程池进入 SHUTDOWN 状态需要调用shutdown 方法;

  • shutdown 方法会调用 interruptIdleWorkers 来中断空闲的线程,interruptIdleWorkers 持有 mainLock,会遍历 workers 来逐个判断工作线程是否空闲。但 getTask 方法中没有mainLock;

  • 在 getTask 中,如果判断当前线程池状态是 RUNNING,并且阻塞队列为空,那么会调用?workQueue.take()?进行阻塞;

  • 如果在判断当前线程池状态是 RUNNING 后,这时调用了 shutdown 方法把状态改为了 SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了?workQueue.take()?后会一直阻塞而不会被销毁,因为在 SHUTDOWN 状态下不允许再有新的任务添加到 workQueue 中,这样一来线程池永远都关闭不了了;

  • 由上可知,shutdown 方法与 getTask 方法(从队列中获取任务时)存在竞态条件;

  • 解决这一问题就需要用到线程的中断,也就是为什么要用 interruptIdleWorkers 方法。在调用?workQueue.take()?时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出 InterruptedException,解除阻塞的状态;

  • 但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;

  • 所以 Worker 继承自 AQS,在工作线程处理任务时会进行 lock,interruptIdleWorkers 在进行中断时会使用 tryLock 来判断该工作线程是否正在处理任务,如果 tryLock 返回 true,说明该工作线程当前未执行任务,这时才可以被中断。

下面就来分析一下 interruptIdleWorkers 方法。

interruptIdleWorkers方法

 interruptIdleWorkers() {
    interruptIdleWorkers();
}
void interruptIdleWorkers( onlyOne) {
     (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                 (SecurityException ignore) {
                }  {
                    w.unlock();
                }
            }
             (onlyOne)
                ;
        }
    }  {
        mainLock.unlock();
    }
}

interruptIdleWorkers 遍历 workers 中所有的工作线程,若线程没有被中断 tryLock 成功,就中断该线程。

为什么需要持有 mainLock ?因为 workers 是 HashSet 类型的,不能保证线程安全。


?

shutdownNow方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
     {
        checkShutdownAccess();
        advanceRunState(STOP);
         中断所有工作线程,无论是否空闲
        interruptWorkers();
         取出队列中没有被执行的任务
        tasks = drainQueue();
    }  {
        mainLock.unlock();
    }
    tryTerminate();
     tasks;
}

shutdownNow 方法与 shutdown 方法类似,不同的地方在于:

  1. 设置状态为 STOP;

  2. 中断所有工作线程,无论是否是空闲的;

  3. 取出阻塞队列中没有被执行的任务并返回。

shutdownNow 方法执行完之后调用 tryTerminate 方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为 TERMINATED。


?

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • getTaskCount:线程池已经执行的和未执行的任务总数;

  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于 taskCount;

  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;

  • getPoolSize:线程池当前的线程数量;

  • getActiveCount:当前线程池中正在执行任务的线程数量。

通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。

到此,关于 ThreadPoolExecutor 的内容就讲完了。

??

?参考文献

Java中线程池ThreadPoolExecutor原理探究

【Java】 之ThreadPoolExcutor源码浅析

线程池ThreadPoolExecutor实现原理

深入理解Java线程池:ThreadPoolExecutor

?

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读