欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

ThreadPoolExecutor(二)——execute

发布时间:2024/3/13 编程问答 33 豆豆
生活随笔 收集整理的这篇文章主要介绍了 ThreadPoolExecutor(二)——execute 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

 1.execute方法

/*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

看方法注释:

执行给定的任务在将来的某个时间。该任务可能会用一个新的线程来执行,也有可能用线程池中一个已有的线程来执行。

如果该executor已经被shutdown了,或者因为容量已满,任务不会被执行,通过RejectedExecutionHandler来处理剩下流程。

将来的某个时间执行:说的是任务会入队列,然后根据线程池目前的各项指标状况来决定何时执行。

新的线程或已有线程:根据线程池的各项指标状况来决定是唤醒线程池中一个已有的阻塞线程来执行还是new一个Thread来执行任务。

2.execute方法的三个步骤

看方法内部注释:

1.如果当前正在run的线程数小于corePoolSize,那么就调用addWorker方法来new一个线程用来执行传入的任务。

对应代码片:

int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}

2.如果addWorker方法执行失败了,任务要入队列,如果成功入队列了,需要做double check来处理一些极端情况,比如线程池是否shutdown了等等。

对应代码片:

if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}

3.如果任务无法入队列,再次尝试addWorker,这次是用正在run的线程数和maximumPoolSize比,如果超过了maximumPoolSize则reject任务,说明线程池已经饱和了。

对应代码片:

else if (!addWorker(command, false))reject(command);

3.addWorker

/** Methods for creating, running and cleaning up after workers*//*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked, which requires a* backout of workerCount, and a recheck for termination, in case* the existence of this worker was holding up termination.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}Worker w = new Worker(firstTask);Thread t = w.thread;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;} finally {mainLock.unlock();}t.start();// It is possible (but unlikely) for a thread to have been// added to workers, but not yet started, during transition to// STOP, which could result in a rare missed interrupt,// because Thread.interrupt is not guaranteed to have any effect// on a non-yet-started Thread (see Thread#interrupt).if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())t.interrupt();return true;}

先看注释:

检查根据当前线程池的状态是否允许添加一个新的Worker,如果可以,调整wc(workerCount,以后都用wc表示),代码块:

if (compareAndIncrementWorkerCount(c))break retry;

如果线程被stop了或者可以shutdown,addWorker方法返回false。

如果thread工厂创建线程失败,需要a backout of workerCount(这是个啥?!),代码块:

// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}

3.1.addWorker局部分析(一)

// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;

这个代码块的判断,如果是STOP,TIDYING和TERMINATED这三种状态,都会返回false。

如果是SHUTDOWN,还要判断firstTask和workQueue的状况,如果firstTask不是null,返回false。

如果firstTask是null,判断workQueue的状况,workQueue是空的时候,返回false。

这个还要再看看,SHUTDOWN状态下为什么要判断firstTask和队列,是要保证在SHUTDOWN的时候,新添加进来和队列中剩余的task要正常执行完吗

3.2.addWorker局部分析(二)

for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}

这个代码块比上一个代码块更容易理解,是正常流程,当线程池没有处于RUNNING之外的几种状态的时候。

这时候的处理流程就是线程池是否创建线程的正常语义,依次进行下列比较:

1.和CAPACITY比较

如果当前wc超过CAPACITY(这个基本上不可能),返回false。

2.入参core为true,表示

转存失败重新上传取消addWorker的时候,wc还没到达corePoolSize,和corePoolSize比较

如果超过corePoolSize,返回false。否则原子操作compareAndIncrementWorkerCount修改wc值。

3.入参core为false,表示addWorker的时候队列已满,wc和maximumPoolSize比较

如果超过maximumPoolSize,返回false,否则原子操作compareAndIncrementWorkerCount修改wc值。

3.3.addWorker局部分析(三)

final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;} finally {mainLock.unlock();}

这个代码块,首先加锁,整个类用到这个锁的地方,除了获取该线程池的一些关键参数之外,就是shutdown和terminate等相关操作。

注释里说,Back out on ThreadFactory failure or if shut down before lock acquired,需要再看看。

如果这个if判断没有走,用该task构建的Worker就可以正常添加到workers这个HashSet中。

4.ctl

ctl是控制线程池状态的变量,由两部分组成,runState(高位)和workerCount(低28位),

private static int CAPACITY = (1 << COUNT_BITS) - 1;

CAPACITY是1左移COUNT_BITS,然后减一。
 

private static int COUNT_BITS = Integer.SIZE - 3;

COUNT_BITS是Integer的位数减去3,即29。

所以CAPACITY是100000000000000000000000000000-1=11111111111111111111111111111,表示低28位。这28位是表示运行的worker个数的。

private static final int RUNNING = -1 << COUNT_BITS;

二进制,1111111111111111111111111111111111100000000000000000000000000000,和CAPACITY互补,所以~CAPACITY也是这个值。
 

private static final int STOP = 1 << COUNT_BITS;

二进制,100000000000000000000000000000000
 

private static final int TIDYING = 2 << COUNT_BITS;

二进制,1000000000000000000000000000000000

private static final int TERMINATED = 3 << COUNT_BITS;

二进制,1100000000000000000000000000000000

放在一起对比看,状态位置看标红三位:

1111111111111111111111111111111111100000000000000000000000000000,RUNNING

0000000000000000000000000000000000000000000000000000000000000000,SHUTDOWN

0000000000000000000000000000000100000000000000000000000000000000,STOP

0000000000000000000000000000001000000000000000000000000000000000,TIDYING

0000000000000000000000000000001100000000000000000000000000000000,TERMINATED

5.和ctl相关的几组方法

private static int runStateOf(int c) { return c & ~CAPACITY; }

把低28位都清掉了,只拿高位的运行状态。

private static int workerCountOf(int c) { return c & CAPACITY; }

只取低28位,即拿workerCount。
 

 

总结

以上是生活随笔为你收集整理的ThreadPoolExecutor(二)——execute的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。