ThreadPoolExecutor(三)——Worker
1.Worker
Worker是ThreadPoolExecutor的一个内部类,实现了AbstractQueuedSynchronizer抽象类。
/*** Class Worker mainly maintains interrupt control state for* threads running tasks, along with other minor bookkeeping.* This class opportunistically extends AbstractQueuedSynchronizer* to simplify acquiring and releasing a lock surrounding each* task execution. This protects against interrupts that are* intended to wake up a worker thread waiting for a task from* instead interrupting a task being run. We implement a simple* non-reentrant mutual exclusion lock rather than use ReentrantLock* because we do not want worker tasks to be able to reacquire the* lock when they invoke pool control methods like setCorePoolSize.*/看注释,该类主要是控制在线程执行任务时的interrupt操作的。它继承了AbstractQueuedSynchronizer类,实现了一个非重入的锁。该锁会保护一个正在等待任务被执行的Worker不被interrupt操作打断。
为什么不用ReentrantLock,要用非重入的锁?因为作者不想让这个Worker task在setCorePoolSize这种线程池控制方法调用时能重新获取到锁。
2.构造方法
/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}用自己作为task构造一个线程,同时把外层任务赋值给自己的task成员变量,相当于对task做了一个包装。
3.run方法
run方法调用了ThreadPoolExecutor的runWorker方法,
4.runWorker方法
/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don't need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and* clearInterruptsForTaskRun called to ensure that unless pool is* stopping, this thread does not have its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to* afterExecute. We separately handle RuntimeException, Error* (both of which the specs guarantee that we trap) and arbitrary* Throwables. Because we cannot rethrow Throwables within* Runnable.run, we wrap them within Errors on the way out (to the* thread's UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread's UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** @param w the worker*/final void runWorker(Worker w) {Runnable task = w.firstTask;w.firstTask = null;boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();clearInterruptsForTaskRun();try {beforeExecute(w.thread, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}这是执行task的主流程。
上面说过,addWorker会用当前task创建一个Worker对象,相当于对task的包装,然后用Worker对象作为task创建一个Thread,该Thread保存在Worker的thread成员变量中。在addWorker中启动了这个线程,线程中执行runWorker方法。
先看注释:
1.首先取传入的task执行,如果task是null,只要该线程池处于运行状态,就会通过getTask方法从workQueue中取任务。ThreadPoolExecutor的execute方法会在无法产生core线程的时候向workQueue队列中offer任务。
getTask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果getTask方法结束,返回的是null,runWorker循环结束,执行processWorkerExit方法。
至此,该线程结束自己的使命,从线程池中“消失”。
2.在开始执行任务之前,会调用Worker的lock方法,目的是阻止task正在被执行的时候被interrupt,通过调用clearInterruptsForTaskRun方法来保证的(后面可以看一下这个方法),该线程没有自己的interrupt set了。
3.beforeExecute和afterExecute方法用于在执行任务前后执行一些自定义的操作,这两个方法是空的,留给继承类去填充功能。
我们可以在beforeExecute方法中抛出异常,这样task不会被执行,而且在跳出该循环的时候completedAbruptly的值是true,表示the worker died due to user exception,会用decrementWorkerCount调整wc。
4.因为Runnable的run方法不能抛出Throwables异常,所以这里重新包装异常然后抛出,抛出的异常会使当当前线程死掉,可以在afterExecute中对异常做一些处理。
5.afterExecute方法也可能抛出异常,也可能使当前线程死掉。
总结
以上是生活随笔为你收集整理的ThreadPoolExecutor(三)——Worker的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 设计模式(二)简单工厂模式
- 下一篇: 2021年缆索式起重机司机考试内容及缆索