【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细)
线程池
线程池初始化时是没有创建线程的,线程池里的线程的初始化与其他线程一样,但是在完成任务以后,该线程不会自行销毁,而是以挂起的状态返回到线程池。直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。这样既节省了建立线程所造成的性能损耗,也可以让多个任务反复重用同一线程,从而在应用程序生存期内节约大量开销
public ThreadPoolExecutor(// 线程池核心线程数int corePoolSize, // 线程池最大数int maximumPoolSize, // 空闲线程存活时间long keepAliveTime, // 时间单位TimeUnit unit,// 线程池所使用的缓冲队列BlockingQueue<Runnable> workQueue,// 线程池创建线程使用的工厂ThreadFactory threadFactory,// 线程池对拒绝任务的处理策略RejectedExecutionHandler handler)ThreadPoolExecutor 源码分析
线程池状态
ThreadPoolExecutor有一个AtomicInteger变量,叫ctl(control的简写),一共32位,高3位存线程池状态runState(一共5种状态:Running,Shutdown,Stop,Tidying,Terminate),低29位存当前有效线程数workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//COUNT_BITS = 29private static final int COUNT_BITS = Integer.SIZE - 3;// 线程池最大线程数 = 536870911(2^29-1)// 将 1 的二进制向右位移 29 位,再减 1 表示最大线程容量// CAPACITY :00011111111111111111111111111111// 高3位000,低29位全为1private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 运行状态保存在 int 值的高 3 位 (所有数值左移 29 位)// RUNNING :11100000000000000000000000000000// 高3位111,低29位全为0private static final int RUNNING = -1 << COUNT_BITS;// SHUTDOWN :00000000000000000000000000000000// 高3位000,低29位全为0private static final int SHUTDOWN = 0 << COUNT_BITS;// STOP :00100000000000000000000000000000// 高3位001,低29位全为0private static final int STOP = 1 << COUNT_BITS;// TIDYING :01000000000000000000000000000000// 高3位010,低29位全为0private static final int TIDYING = 2 << COUNT_BITS;// TERMINATED:01100000000000000000000000000000// 高3位011,低29位全为0private static final int TERMINATED = 3 << COUNT_BITS;分析:
事实上COUNT_BITS = 29,而上面的5重线程状态实际上是使用32位中的高3位来表示,低29位存线程数,这样线程池的状态和线程数量就由一个变量存储,即:
- RUNNING = 111: 线程池正常运行,可以接受新的任务并处理队列中的任务。
- SHUTDOWN = 000:关闭状态,不再接受新的任务,但是会执行队列中的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
- STOP = 001:不再接受新任务,不处理队列中的任务,中断进行中的任务。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
- TIDYING = 010:所有任务已经终止,workerCount为0,线程状态转换到TIDYING,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
- TERMINATED = 011:terminate()函数执行完成后进入该状态。
execute 方法
public void execute(Runnable command) {/*** 源代码阅读说明:* 1、这里的command就是task;因为这里使用了命令模式,所以用command命名;执行task,task被包装成了一个work。* 2、isRunning检查当前ThreadPoolExecutor是否是运行状态* 3、workerCountOf会根据ThreadPoolExecutor的状态值获得当前正的正在执行的和等待的work数量。* 这里先不用深究isRunning和workerCounterOf方法的实现。* 4、addWork方法会新增一个线程,返回true表示新增成功,返回false表示新增失败*/if (command == null)// 如果任务为null,则抛出 NullPointerException 异常throw new NullPointerException();// 获取当前线程池的状态 + 线程个数变量的组合值int c = ctl.get();// 当前线程池线程个数是否小于 corePoolSize,小于则开启新线程运行if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 如果线程池处于RUNNING状态,核心池已满,但任务队列未满,则添加任务到阻塞队列if (isRunning(c) && workQueue.offer(command)) {// 任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了int recheck = ctl.get();// 如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 否者如果当前线程池线程空,则添加一个线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 核心池已满,队列已满,试着创建一个新线程else if (!addWorker(command, false))// 如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务reject(command);}execute 方法 执行流程
1、首先判断任务是否为空,空则抛出空指针异常
2、不为空则获取线程池控制状态,判断小于corePoolSize(核心线程),添加到worker集合当中执行,
- 如成功,则返回
- 失败的话再接着获取线程池控制状态,因为只有状态变了才会失败,所以重新获取
3、判断线程池是否处于运行状态,是的话则添加 command 到阻塞队列,加入时也会再次获取状态并且检测状态是否不处于运行状态,不处于的话则将 command 从阻塞队列移除,并且拒绝任务
4、如果线程池里没有了线程,则创建新的线程去执行获取阻塞队列的任务执行
5、如果以上都没执行成功,则需要开启最大线程池里的线程来执行任务,失败的话就丢弃
addWorker方法
如果工作线程数小于核心线程数的话,会调用 addWorker ,顾名思义,其实就是要创建一个工作线程。我们来看看源码的实现
源码比较长,其实就做了两件事。
- 才用循环 CAS 操作来将线程数加 1;
- 新建一个线程并启用。
addWorker方法 执行流程
1、获取线程池的控制状态,进行判断,不符合则返回false,符合则下一步
2、死循环,判断workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,没有的话则对workerCount+1操作,
3、如果不符合上述判断或+1操作失败,再次获取线程池的控制状态,获取runState与刚开始获取的runState相比,不一致则跳出内层循环继续外层循环,否则继续内层循环
4、+1操作成功后,使用重入锁ReentrantLock来保证往workers当中添加worker实例,添加成功就启动该实例。
Worker内部类
可以发现 addWorker 方法只是构造了一个 Worker,并且把 firstTask 封装到 worker 中,它是做什么的呢?我们来瞧瞧
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** * Thread this worker is running in. Null if factory fails. * 此工作线程正在运行的线程。如果工厂失败,则为null。*/// 这才是真正执行 task 的线程,从构造函数可知是由ThreadFactury 创建的final Thread thread;/** * Initial task to run. Possibly null. * 要运行的初始任务。可能为空*/// 这就是需要执行的 taskRunnable firstTask;/** * Per-thread task counter * 每线程任务计数器*/// 完成的任务数,用于线程池统计volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*//*** 创建并初始化第一个任务,使用线程工厂来创建线程* 初始化有3步* 1、设置AQS的同步状态为-1,表示该对象需要被唤醒* 2、初始化第一个任务* 3、调用ThreadFactory来使自身创建一个线程,并赋值给worker的成员变量thread*/Worker(Runnable firstTask) {// 初始状态 -1,防止在调用 runWorker(),也就是真正执行 task前中断 thread。setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 创建一个线程this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */// 重写Runnable的run方法public void run() {// 调用ThreadPoolExecutor的runWorker方法runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.// 代表是否独占锁,0-非独占 1-独占protected boolean isHeldExclusively() {return getState() != 0;}// 重写AQS的tryAcquire方法尝试获取锁protected boolean tryAcquire(int unused) {// 尝试将AQS的同步状态从0改为1if (compareAndSetState(0, 1)) {// 如果改变成,则将当前独占模式的线程设置为当前线程并返回truesetExclusiveOwnerThread(Thread.currentThread());// 成功获得锁return true;}// 线程进入等待队列return false;}// 重写AQS的tryRelease尝试释放锁protected boolean tryRelease(int unused) {// 设置当前独占模式的线程为nullsetExclusiveOwnerThread(null);// 设置AQS同步状态为0setState(0);// 返回truereturn true;}// 获取锁public void lock() { acquire(1); }// 尝试获取锁public boolean tryLock() { return tryAcquire(1); }// 释放锁public void unlock() { release(1); }// 是否被独占public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}addWorkerFailed方法
addWorker方法添加worker失败,并且没有成功启动任务的时候,就会调用此方法,将任务从workers中移除,并且workerCount做-1操作。
private void addWorkerFailed(Worker w) {// 重入锁final ReentrantLock mainLock = this.mainLock;// 获取锁mainLock.lock();try {// 如果worker不为nullif (w != null)// workers移除workerworkers.remove(w);// 通过CAS操作,workerCount-1decrementWorkerCount();tryTerminate();} finally {// 释放锁mainLock.unlock();}}tryTerminate方法
当对线程池执行了非正常成功逻辑的操作时,都会需要执行tryTerminate尝试终止线程池
final void tryTerminate() {// 死循环for (;;) {// 获取线程池控制状态int c = ctl.get();/** 线程池处于RUNNING状态* 线程池状态最小大于TIDYING* 线程池==SHUTDOWN并且workQUeue不为空* 直接return,不能终止*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果workerCount不为0if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}// 获取线程池的锁final ReentrantLock mainLock = this.mainLock;// 获取锁mainLock.lock();try {// 通过CAS操作,设置线程池状态为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {// 设置线程池的状态为TERMINATEDctl.set(ctlOf(TERMINATED, 0));// 发送释放信号给在termination条件上等待的线程termination.signalAll();}return;}} finally {// 释放锁mainLock.unlock();}// else retry on failed CAS}}runWorker方法
该方法的作用就是去执行任务
final void runWorker(Worker w) {// 获取当前线程Thread wt = Thread.currentThread();// 获取worker里的任务Runnable task = w.firstTask;// 将worker实例的任务赋值为nullw.firstTask = null;/*** unlock方法会调用AQS的release方法* release方法会调用具体实现类也就是Worker的tryRelease方法* 也就是将AQS状态置为0,允许中断*/w.unlock(); // allow interrupts// 是否突然完成boolean completedAbruptly = true;try {// worker实例的task不为空,或者通过getTask获取的不为空while (task != null || (task = getTask()) != null) {// 获取锁w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt/*** 获取线程池的控制状态,至少要大于STOP状态* 如果状态不对,检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于STOP* 如果上述满足,检查该对象是否处于中断状态,不清除中断标记*/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中断改对象wt.interrupt();try {// 执行前的方法,由子类具体实现beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行完后调用的方法,也是由子类具体实现afterExecute(task, thrown);}} finally { // 执行完后// task设置为nulltask = null;// 已完成任务数+1w.completedTasks++;// 释放锁w.unlock();}}completedAbruptly = false;} finally {// 处理并退出当前workerprocessWorkerExit(w, completedAbruptly);}}runWorker方法 执行流程
1,首先在方法一进来,就执行了w.unlock(),这是为了将AQS的状态改为0,因为只有getState() >= 0的时候,线程才可以被中断;
2,判断firstTask是否为空,为空则通过getTask()获取任务,不为空接着往下执行
3,判断是否符合中断状态,符合的话设置中断标记
4,执行beforeExecute(),task.run(),afterExecute()方法
5,任何一个出异常都会导致任务执行的终止;进入processWorkerExit来退出任务
6,正常执行的话会接着回到步骤2
getTask方法
在上面的runWorker方法当中我们可以看出,当firstTask为空的时候,会通过该方法来接着获取任务去执行,那我们就看看获取任务这个方法到底是怎么样的?
private Runnable getTask() {// 标志是否获取任务超时boolean timedOut = false; // Did the last poll() time out?for (;;) {// 获取线程池的控制状态int c = ctl.get();// 获取线程池的runStateint rs = runStateOf(c);// Check if queue empty only if necessary./*** 判断线程池的状态,出现以下两种情况* 1、runState大于等于SHUTDOWN状态* 2、runState大于等于STOP或者阻塞队列为空* 将会通过CAS操作,进行workerCount-1并返回null*/if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}// 获取线程池的workerCountint wc = workerCountOf(c);// Are workers subject to culling?/*** allowCoreThreadTimeOut:是否允许core Thread超时,默认false* workerCount是否大于核心核心线程池*/boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 1、wc大于maximumPoolSize或者已超时* 2、队列不为空时保证至少有一个任务*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {/*** 通过CAS操作,workerCount-1* 能进行-1操作,证明wc大于maximumPoolSize或者已经超时*/if (compareAndDecrementWorkerCount(c))// -1操作成功,返回nullreturn null;// -1操作失败,继续循环continue;}try {/*** wc大于核心线程池* 执行poll方法* 小于核心线程池* 执行take方法*/Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 判断任务不为空返回任务if (r != null)return r;// 获取一段时间没有获取到,获取超时timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}processWorkerExit方法
明显的,在执行任务当中,会去获取任务进行执行,那既然是执行任务,肯定就会有执行完或者出现异常中断执行的时候,那这时候肯定也会有相对应的操作,至于具体操作是怎么样的,我们还是直接去看源码最实际。
private void processWorkerExit(Worker w, boolean completedAbruptly) {/*** completedAbruptly:在runWorker出现,代表是否突然完成的意思* 也就是在执行任务过程当中出现异常,就会突然完成,传true** 如果是突然完成,需要通过CAS操作,workerCount-1* 不是突然完成,则不需要-1,因为getTask方法当中已经-1** 下面的代码注释貌似与代码意思相反了*/if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();// 生成重入锁final ReentrantLock mainLock = this.mainLock;// 获取锁mainLock.lock();try {// 线程池统计的完成任务数completedTaskCount加上worker当中完成的任务数completedTaskCount += w.completedTasks;// 从HashSet<Worker>中移除workers.remove(w);} finally {// 释放锁mainLock.unlock();}// 因为上述操作是释放任务或线程,所以会判断线程池状态,尝试终止线程池tryTerminate();// 获取线程池的控制状态int c = ctl.get();// 判断runState是否小于STOP,即是RUNNING或者SHUTDOWN// 如果是RUNNING或者SHUTDOWN,代表没有成功终止线程池if (runStateLessThan(c, STOP)) {/** 是否突然完成* 如若不是,代表已经没有任务可获取完成,因为getTask当中是while循环*/if (!completedAbruptly) {/*** allowCoreThreadTimeOut:是否允许core thread超时,默认false* min-默认是corePoolSize*/int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允许core thread超时并且队列不为空// min为0,即允许core thread超时,这样就不需要维护核心核心线程池了// 如果workQueue不为空,则至少保持一个线程存活if (min == 0 && ! workQueue.isEmpty())min = 1;// 如果workerCount大于min,则表示满足所需,可以直接返回if (workerCountOf(c) >= min)return; // replacement not needed}// 如果是突然完成,添加一个空任务的worker线程addWorker(null, false);}}总结
在文章的最后,我们先对 ThreadPoolExecutor 的关键信息做一些总结:
线程池解决两个问题:
- 通过减少任务间的调度开销 (主要是通过线程池中的线程被重复使用的方式),来提高大量任务时的执行性能
- 提供了一种方式来管理线程和消费,维护基本数据统计等工作,比如统计已完成的任务数;
线程池容量相关参数:
- coreSize:当新任务提交时,发现运行的线程数小于 coreSize,一个新的线程将被创建,即使这时候其它工作线程是空闲的,可以通过 getCorePoolSize 方法获得 coreSize
- maxSize: 当任务提交时,coreSize < 运行线程数 <= maxSize,但队列没有满时,任务提交到队列中,如果队列满了,在 maxSize 允许的范围内新建线程;
- 一般来说,coreSize 和 maxSize 在线程池初始化时就已经设定了,但我们也可以通过 setCorePoolSize、setMaximumPoolSize 方法动态的修改这两个值;
Keep-alive times 参数:
- 作用: 如果当前线程池中有超过 coreSize 的线程,并且线程空闲的时间超过 keepAliveTime,当前线程就会被回收,这样可以避免线程没有被使用时的资源浪费;
- 通过 setKeepAliveTime 方法可以动态的设置 keepAliveTime 的值;
- 如果设置 allowCoreThreadTimeOut 为 ture 的话,core thread 空闲时间超过 keepAliveTime 的话,也会被回收;
线程池新建时的队列选择有很多,比如:
- ArrayBlockingQueue,有界队列,可以防止资源被耗尽;
- LinkedBlockingQueue,无界队列,未消费的任务可以在队列中等待
- SynchronousQueue,为了避免任务被拒绝,要求线程池的 maxSize 无界,缺点是当任务提交的速度超过消费的速度时,可能出现无限制的线程增长
拒绝策略:在 Executor 已经关闭或对最大线程和最大队列都使用饱和时,可以使用 RejectedExecutionHandler 类进行异常捕捉。有如下四种处理策略:
- AbortPolicy(默认):抛出异常
- CallerRunsPolicy:不使用线程池,主线程来执行
- DiscardPolicy:直接丢弃任务
- DiscardOldestPolicy:丢弃队列中最老任务
ExecutorService 使用线程池中的线程执行提交的任务,线程池我们可以使用 Executors 进行配置.Executors 为常用的场景设定了可直接初始化线程池的方法,比如:
- Executors#newCachedThreadPool 无界的线程池,并且可以自动回收
- Executors#newFixedThreadPool 固定大小线程池
- Executors#newSingleThreadExecutor 单个线程的线程池;
另外,线程池提供了很多可供扩展的钩子函数,比如有:
- 提供在每个任务执行之前 beforeExecute 和执行之后 afterExecute 的钩子方法,主要用于操作执行环境,比如初始化 ThreadLocals、收集统计数据、添加日志条目等
- 如果在执行器执行完成之后想干一些事情,可以实现 terminated 方法,如果钩子方法执行时发生异常,工作线程可能会失败并立即终止。
总结
以上是生活随笔为你收集整理的【多线程】ThreadPoolExecutor类万字源码解析(注解超级详细)的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 【多线程】ThreadPoolExecu
- 下一篇: 【多线程】ThreadPoolExecu