ThreadPoolExcutor 原理探究 透过 ReentrantLock 分
概论线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。 例如,线程数一般取 cpu 数量 +2 比较合适,线程数过多会导致额外的线程切换开销。 Java 中的线程池是用 ThreadPoolExecutor 类来实现的. 本文就对该类的源码来分析一下这个类内部对于线程的创建,管理以及后台任务的调度等方面的执行原理。 先看一下线程池的类图: 上图的目的主要是为了让大家知道线程池相关类之间的关系,至少赚个眼熟,以后看到不会有害怕的感觉。 ? Executor 框架接口Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。 下面是 ThreadPoolExeCutor 类图。Executors 其实是一个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程实例。 从上图也可以看出来,ThreadPoolExeCutor 是线程池的核心。 J.U.C 中有三个 Executor 接口:
其实通过这些接口就可以看到一些设计思想,每个接口的名字和其任务是完全匹配的。不会因为 Executor 中只有一个方法,就将其放到其他接口中。这也是很重要的单一原则。 ? ThreadPoolExeCutor 分析在去具体分析?ThreadPoolExeCutor 运行逻辑前,先看下面的流程图: 该图是 ThreadPoolExeCutor 整个运行过程的一个概括,整个源码的核心逻辑总结起来就是:
下面将进入源码分析,来深入理解?ThreadPoolExeCutor 的设计思想。 ? 构造函数先来看构造函数: public ThreadPoolExecutor(int corePoolSize, maximumPoolSize,1)">long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException();
属性定义看完构造函数之后,再来看下该类里面的变量,有助于进一步理解整个代码运行逻辑,下面是一些比较重要的变量: // 用来标记线程池状态(高3位),线程个数(低29位) 默认是 RUNNING 状态,线程个数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0)); 线程个数掩码位数,整型最大位数-3,可以适用于不同平台 static final int COUNT_BITS = Integer.SIZE - 3; //线程最大个数(低29位)00022222222222222222222222221111 int CAPACITY = (1 << COUNT_BITS) - 1(高3位):11100000000000000000000000000000 int RUNNING = -1 << COUNT_BITS; (高3位):00000000000000000000000000000000 int SHUTDOWN = 0 <<(高3位):00100000000000000000000000000000 int STOP = 1 <<(高3位):01000000000000000000000000000000 int TIDYING = 2 <<(高3位):01100000000000000000000000000000 int TERMINATED = 3 << 获取高三位 运行状态 int runStateOf(int c) { return c & ~CAPACITY; } 获取低29位 线程个数 int workerCountOf(int c) { return c & CAPACITY; } 计算ctl新值,线程状态 与 线程个数 int ctlOf(int rs,int wc) { return rs | wc; } 这里需要对一些操作做些解释。?
总结:这里巧妙利用 bit 操作来将线程数量和运行状态联系在一起,减少了变量的存在和内存的占用。其中五种状态的十进制排序:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED ? 线程池状态线程池状态含义:
线程池状态转换:
?原码,反码,补码知识小剧场:1. 原码:原码就是符号位加上真值的绝对值,即用第一位表示符号,其余位表示值. 比如如果是 8 位二进制: [+1]原?= 0000 0001 [-1]原?= 1000 0001 负数原码第一位是符号位.? ? 2. 反码:反码的表示方法是,正数的反码是其本身,负数的反码是在其原码的基础上,符号位不变,其余各个位取反. [+1] = [0000 0001]原?= [0000 0001]反 [-1] = [1000 0001]原?= [1111 1110]反 ? 3. 补码:补码的表示方法是,正数的补码就是其本身,负数的补码是在其原码的基础上,符号位不变,其余各位取反,最后 +1. (即在反码的基础上 +1) [+1] = [0000 0001]原?= [0000 0001]反?= [0000 0001]补 [-1] = [1000 0001]原?= [1111 1110]反?= [1111 1111]补 4. 总结 ? ?5. 左移:当数值左、右移时,先将数值转化为其补码形式,移完后,再转换成对应的原码 ? ? ?左移:高位丢弃,低位补零 ? ? ?[+1] ?= [00000001]补 ? ? ?[0000 0001]补 << 1 = [0000 0010]补 = [0000 0010]原 = [+2] ? ? ?[-1] ?= [1000 0001]原 = [1111 1111]补 ? ? ?[1111 1111]补 << 1 = [1111 1110]补 = [1000 0010]原 = [-2] 其中,再次提醒,负数的补码是反码+1;负数的反码是补码-1; ? ?6. 右移:高位保持不变,低位丢弃 ? ? ?[+127] = [0111 1111]原 = [0111 1111]补 ? ? ?[0111 1111]补 >> 1 = [0011 1111]补 = [0011 1111]原 = [+63] ? ? ?[-127] = [1111 1111]原 = [1000 0001]补 ? ? ?[1000 0001]补 >> 1 = [1100 0000]补 = [1100 0000]原?= [-64] execute 方法分析通过 ThreadPoolExecutor 创建线程池后,提交任务后执行过程是怎样的,下面来通过源码来看一看。execute 方法源码如下: public void execute(Runnable command) { if (command == ) NullPointerException(); 返回包含线程数及线程池状态(头3位) int c = ctl.get(); 如果工作线程数小于核心线程数,则创建线程任务执行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command,1)">true)) return; 如果创建失败,防止外部已经在线程池中加入新任务,重新获取 c = ctl.get(); } 只有线程池处于 RUNNING 状态,且 入队列成功 if (isRunning(c) && workQueue.offer(command)) { // 后面的操作属于double-check int recheck = ctl.get(); 如果线程池不是 RUNNING 状态,则将刚加入队列的任务移除 if (! isRunning(recheck) && remove(command)) reject(command); 如果之前的线程已被消费完,新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null,1)">false); } 核心池和队列都满了,尝试创建一个新线程 if (!addWorker(command,1)">)) 如果 addWorker 返回是 false,即创建失败,则唤醒拒绝策略 reject(command); }? execute 方法执行逻辑有这样几种情况:
这里要注意一下? 需要注意的是,线程池的设计思想就是使用了核心线程池 corePoolSize,阻塞队列 workQueue 和线程池 maximumPoolSize,这样的缓存策略来处理任务,实际上这样的设计思想在需要框架中都会使用。 需要注意线程和任务之间的区别,任务是保存在?workQueue 中的,线程是从线程池里面取的,由?CAPACITY 控制容量。 addWorker 方法分析addWorker 方法的主要工作是在线程池中创建一个新的线程并执行,firstTask 参数用于指定新增的线程执行的第一个任务,core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize,代码如下: boolean addWorker(Runnable firstTask,1)">boolean core) { retry: for (;;) { ctl.get(); 获取运行状态 int rs = runStateOf(c); /* * 这个if判断 * 如果rs >= SHUTDOWN,则表示此时不再接收新任务; * 接着判断以下3个条件,只要有1个不满足,则返回false: * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务 * 2. firsTask为空 * 3. 阻塞队列不为空 * * 首先考虑rs == SHUTDOWN的情况 * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false; * 然后,如果firstTask为空,并且workQueue也为空,则返回false, * 因为队列中已经没有任务了,不需要再添加线程了 */ Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return ; (;;) { 获取线程数 int wc = workerCountOf(c); 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较, 如果为false则根据maximumPoolSize来比较。 // if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) ; 尝试增加workerCount,如果成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; 如果增加workerCount失败,则重新获取ctl的值 c = ctl.get(); Re-read ctl 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = ; boolean workerAdded = ; Worker w = try { 根据firstTask来创建Worker对象 w = Worker(firstTask); 每一个Worker对象都会创建一个线程 final Thread t = w.thread; if (t != ) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); { Recheck while holding lock. Back out on ThreadFactory failure or if shut down before lock acquired. runStateOf(ctl.get()); rs < SHUTDOWN表示是RUNNING状态; 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == )) { if (t.isAlive()) precheck that t is startable IllegalThreadStateException(); workers是一个HashSet workers.add(w); int s = workers.size(); largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = ; } } finally { mainLock.unlock(); } (workerAdded) { 启动线程 t.start(); workerStarted = ; } } } if (! workerStarted) addWorkerFailed(w); } workerStarted; } 这里需要注意有以下几点:
内部类 Worker 分析上面分析过程中,提到了一个 Worker 类,对于某些对源码不是很熟悉得同学可能有点不清楚,下面就来看看 Worker 的源码: class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized,but we provide a * serialVersionUID to suppress a javac warning. long serialVersionUID = 6138294804551838833L; Thread this worker is running in. Null if factory fails. final Thread thread; Initial task to run. Possibly null. */ Runnable firstTask; Per-thread task counter volatile completedTasks; * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) Worker(Runnable firstTask) { setState(-1); inhibit interrupts until runWorker this.firstTask = firstTask; 首先看到的是 Worker 继承了(AbstractQueuedSynchronizer) AQS,并实现了 Runnable 接口,说明 Worker 本身也是线程。然后看其构造函数可以发现,内部有两个属性变量分别是?Runnable 和 Thread 实例,该类其实就是对传进来得属性做了一个封装,并加入了获取锁的逻辑(继承了 AQS )。具体可参考文章:透过 ReentrantLock 分析 AQS 的实现原理 Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的:
所以,Worker 继承自 AQS,用于判断线程是否空闲以及是否可以被中断。 此外,在构造方法中执行了? unused) { )) { setExclusiveOwnerThread(Thread.currentThread()); ; } ; } 正因为如此,在 runWorker 方法中会先调用 Worker 对象的 unlock 方法将 state 设置为 0。tryAcquire 方法是根据 state 是否是 0 来判断的,所以, ?runWorker 方法分析前面提到了内部类 Worker 的 run 方法调用了外部类 runWorker,下面来看下 runWork 的具体逻辑。 runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = ; w.unlock(); status 设置为0,允许中断,也可以避免再次加锁失败 boolean completedAbruptly = ; { while (task != null || (task = getTask()) != ) { // 要派发task的时候,需要上锁 w.lock(); 如果线程池当前状态至少是stop,则设置中断标志; 如果线程池当前状态是RUNNININ,则重置中断标志,重置后需要重新 检查下线程池状态,因为当重置中断标志时候,可能调用了线程池的shutdown方法 改变了线程池状态。 if ((runStateAtLeast(ctl.get(),STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(),STOP))) && !wt.isInterrupted()) wt.interrupt(); { 任务执行前干一些事情 beforeExecute(wt,task); Throwable thrown = ; { task.run();执行任务 } (RuntimeException x) { thrown = x; throw x; } (Error x) { thrown = x; (Throwable x) { thrown = x; Error(x); } { 任务执行完毕后干一些事情 afterExecute(task,thrown); } } { task = 统计当前worker完成了多少个任务 w.completedTasks++; w.unlock(); } } completedAbruptly = ; } { 执行清了工作 processWorkerExit(w,completedAbruptly); } } 总结一下 runWorker 方法的执行过程:
这里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 类中是空的,留给子类来实现。 completedAbruptly 变量来表示在执行任务过程中是否出现了异常,在 processWorkerExit 方法中会对该变量的值进行判断。 ? getTask 方法分析getTask 方法是从阻塞队列里面获取任务,具体代码逻辑如下: private Runnable getTask() { timeOut变量的值表示上次从阻塞队列中取任务时是否超时 boolean timedOut = false; Did the last poll() time out? runStateOf(c); * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断: * 1. rs >= STOP,线程池是否正在stop; * 2. 阻塞队列是否为空。 * 如果以上条件满足,则将workerCount减1并返回null。 * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); ; } workerCountOf(c); Are workers subject to culling? timed变量用于判断是否需要进行超时控制。 allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; 对于超过核心线程数量的这些线程,需要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法; * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时 * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1; * 如果减1失败,则返回重试。 * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { (compareAndDecrementWorkerCount(c)) { * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null; * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。 * Runnable r = timed ? workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) : workQueue.take(); if (r != ) r; 如果 r == null,说明已经超时,timedOut设置为true timedOut = ; } (InterruptedException retry) { 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 timedOut = ; } } } 其实到这里后,你会发现在 ThreadPoolExcute 内部有几个重要的检验:
首先是第一个 if 判断,当运行状态处于非 RUNNING 状态,此外?rs >= STOP(线程池是否正在 stop)或阻塞队列是否为空。则将 workerCount 减 1 并返回 null。为什么要减 1 呢,因为此处其实是去获取一个 task,但是发现处于停止状态了,也就是没必要再去获取运行任务了,那这个线程就没有存在的意义了。后续也会在?processWorkerExit 将该线程移除。 第二个 if 条件目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于 maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。 什么时候会销毁?当然是 runWorker 方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。 getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法。 ? processWorkerExit 方法下面在看?processWorkerExit 方法的具体逻辑: void processWorkerExit(Worker w,1)"> completedAbruptly) { 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1; 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 if (completedAbruptly) If abrupt,then workerCount wasn't adjusted decrementWorkerCount(); .mainLock; mainLock.lock(); 统计完成的任务数 completedTaskCount += w.completedTasks; 从workers中移除,也就表示着从线程池中移除了一个工作线程 workers.remove(w); } { mainLock.unlock(); } 根据线程池状态进行判断是否结束线程池 tryTerminate(); ctl.get(); * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker; * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker; * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。 */ (runStateLessThan(c,STOP)) { completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1if (workerCountOf(c) >= min) return; replacement not needed } addWorker(); } } 至此,processWorkerExit 执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期。但是这有两点需要注意:
tryTerminate方法tryTerminate 方法根据线程池状态进行判断是否结束线程池,代码如下: tryTerminate() { * 当前线程池的状态为以下几种情况时,直接返回: * 1. RUNNING,因为还在运行中,不能停止; * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了; * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task; if (isRunning(c) || runStateAtLeast(c,TIDYING) || (runStateOf(c) == SHUTDOWN && ! 如果线程数量不为0,则中断一个空闲的工作线程,并返回 if (workerCountOf(c) != 0) { Eligible to terminate interruptIdleWorkers(ONLY_ONE); .mainLock; mainLock.lock(); 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法 if (ctl.compareAndSet(c,ctlOf(TIDYING,1)">))) { { terminated方法默认什么都不做,留给子类实现 terminated(); } 设置状态为TERMINATED ctl.set(ctlOf(TERMINATED,1)">)); termination.signalAll(); } ; } } { mainLock.unlock(); } else retry on failed CAS } }
一是线程数量很多,任务都完成了;二是线程在 getTask 方法中执行? 所以每次在工作线程结束时调用 tryTerminate 方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。 ? shutdown方法shutdown 方法要将线程池切换到 SHUTDOWN 状态,并调用 interruptIdleWorkers 方法请求中断所有空闲的 worker,最后调用 tryTerminate 尝试结束线程池。 shutdown() { 安全策略判断 checkShutdownAccess(); 切换状态为SHUTDOWN advanceRunState(SHUTDOWN); 中断空闲线程 interruptIdleWorkers(); onShutdown(); hook for ScheduledThreadPoolExecutor } 尝试结束线程池 tryTerminate(); } 这里思考一个问题:在 runWorker 方法中,执行任务时对 Worker 对象 w 进行了 lock 操作,为什么要在执行任务的时候对每个工作线程都加锁呢? 下面仔细分析一下:
下面就来分析一下 interruptIdleWorkers 方法。 interruptIdleWorkers方法interruptIdleWorkers() { interruptIdleWorkers(); } void interruptIdleWorkers( onlyOne) { (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { (SecurityException ignore) { } { w.unlock(); } } (onlyOne) ; } } { mainLock.unlock(); } } interruptIdleWorkers 遍历 workers 中所有的工作线程,若线程没有被中断 tryLock 成功,就中断该线程。 为什么需要持有 mainLock ?因为 workers 是 HashSet 类型的,不能保证线程安全。 ? shutdownNow方法public List<Runnable> shutdownNow() { List<Runnable> tasks; { checkShutdownAccess(); advanceRunState(STOP); 中断所有工作线程,无论是否空闲 interruptWorkers(); 取出队列中没有被执行的任务 tasks = drainQueue(); } { mainLock.unlock(); } tryTerminate(); tasks; } shutdownNow 方法与 shutdown 方法类似,不同的地方在于:
shutdownNow 方法执行完之后调用 tryTerminate 方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为 TERMINATED。 ? 线程池的监控通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。 到此,关于 ThreadPoolExecutor 的内容就讲完了。 ?? ?参考文献Java中线程池ThreadPoolExecutor原理探究【Java】 之ThreadPoolExcutor源码浅析线程池ThreadPoolExecutor实现原理深入理解Java线程池:ThreadPoolExecutor?(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |