欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

高并发编程-自定义简易的线程池(2),体会原理

发布时间:2025/3/21 编程问答 60 豆豆
生活随笔 收集整理的这篇文章主要介绍了 高并发编程-自定义简易的线程池(2),体会原理 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

文章目录

  • 概述
  • 示例

概述

在《高并发编程-自定义简易的线程池(1),体会原理》中只实现了任务队列,这里把其余几个功能也补充进来:

  • 拒绝策略
  • 关闭线程池
  • 最小 最大 活动线程数

示例

比较简单,直接上代码,见注释

package com.artisan.test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A deliberately small thread pool for studying how pools work.
 *
 * <p>The pool object is itself a thread (the "manager"): every five seconds it compares the
 * number of queued tasks with the {@code active} and {@code max} thresholds and grows or
 * shrinks the set of worker threads. Workers take tasks from a bounded FIFO queue; when the
 * queue is full, {@link #submit(Runnable)} delegates to the configured {@link DiscardPolicy}.
 *
 * <p>Each pool owns its own task queue and worker list. Lock order is always
 * worker list first, task queue second.
 */
public class SimpleThreadPool extends Thread {

    /** Queue capacity used by the no-argument constructor. */
    private static final int DEFAULT_TASK_QUEUE_SIZE = 2000;

    /** Pause between two sizing passes of the manager thread. */
    private static final long MANAGER_INTERVAL_MILLIS = 5_000L;

    /** Source of the numeric suffix in worker names; atomic so names never collide. */
    private static final AtomicInteger SEQ = new AtomicInteger();

    private static final String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";

    private static final ThreadGroup GROUP = new ThreadGroup("Pool_Group");

    /** Default rejection policy: always throws {@link DiscardException}. */
    public static final DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
        throw new DiscardException("Discard This Task.");
    };

    /** Pending tasks. Doubles as the monitor that idle workers wait on. */
    private final LinkedList<Runnable> taskQueue = new LinkedList<>();

    /** Workers currently owned by this pool. Guarded by its own monitor. */
    private final List<WorkerTask> workers = new ArrayList<>();

    /** Maximum number of tasks that may be queued at once. */
    private final int queueSize;

    /** Invoked when a task is submitted while the queue is full. */
    private final DiscardPolicy discardPolicy;

    /** Number of workers started by the constructor. */
    private final int min;

    /** Worker count the pool grows to under load and shrinks back to when idle. */
    private final int active;

    /** Upper bound on the worker count. */
    private final int max;

    /** Current worker count; written only while holding the {@code workers} monitor. */
    private volatile int size;

    /** Set once {@link #shutdown()} has been requested. */
    private volatile boolean destroy = false;

    /**
     * Creates a pool with 4 initial workers, an active size of 8, a maximum of 12, a queue
     * capacity of 2000 and {@link #DEFAULT_DISCARD_POLICY}.
     */
    public SimpleThreadPool() {
        this(4, 8, 12, DEFAULT_TASK_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);
    }

    /**
     * Creates a pool, starts {@code min} workers and starts the manager thread.
     *
     * @param min           number of workers started immediately; at least 1
     * @param active        worker count targeted under load; at least {@code min}
     * @param max           largest worker count; at least {@code active}
     * @param queueSize     maximum number of queued tasks; at least 1
     * @param discardPolicy action taken when the queue is full; must not be {@code null}
     * @throws IllegalArgumentException if the sizes violate
     *                                  {@code 1 <= min <= active <= max} or {@code queueSize < 1}
     * @throws NullPointerException     if {@code discardPolicy} is {@code null}
     */
    public SimpleThreadPool(int min, int active, int max, int queueSize, DiscardPolicy discardPolicy) {
        if (min < 1 || min > active || active > max) {
            throw new IllegalArgumentException(
                    "Expected 1 <= min <= active <= max but got min=" + min
                            + ", active=" + active + ", max=" + max);
        }
        if (queueSize < 1) {
            throw new IllegalArgumentException("queueSize must be >= 1 but was " + queueSize);
        }
        this.min = min;
        this.active = active;
        this.max = max;
        this.queueSize = queueSize;
        this.discardPolicy = Objects.requireNonNull(discardPolicy, "discardPolicy");
        init();
    }

    /** Starts the initial {@code min} workers and then the manager thread. */
    private void init() {
        synchronized (workers) {
            for (int i = 0; i < min; i++) {
                createWorkTask();
            }
            size = min;
        }
        start();
    }

    /**
     * Queues a task for execution by a worker.
     *
     * <p>If the queue already holds {@code queueSize} tasks the task is not queued and the
     * discard policy is invoked instead; with the default policy this throws
     * {@link DiscardException}.
     *
     * @param runnable the task to run
     * @throws IllegalStateException if {@link #shutdown()} has already been requested
     * @throws DiscardException      if the queue is full and the policy throws
     */
    public void submit(Runnable runnable) {
        boolean rejected;
        synchronized (taskQueue) {
            // Checked under the queue lock so a task can never slip in behind shutdown's drain.
            if (destroy) {
                throw new IllegalStateException(
                        "The thread pool already destroy and not allow submit task.");
            }
            rejected = taskQueue.size() >= queueSize;
            if (!rejected) {
                taskQueue.addLast(runnable);
                taskQueue.notifyAll();
            }
        }
        if (rejected) {
            // The policy is caller-supplied code, so it runs outside the lock.
            discardPolicy.discard();
        }
    }

    /**
     * Manager loop: prints the pool statistics, sleeps for the manager interval and then
     * resizes the pool, until shutdown has been requested.
     */
    @Override
    public void run() {
        while (!destroy) {
            System.out.printf("Pool#Min:%d,Active:%d,Max:%d,Current:%d,QueueSize:%d\n",
                    this.min, this.active, this.max, this.size, pendingTasks());
            try {
                Thread.sleep(MANAGER_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // shutdown() interrupts this thread to end the sleep early;
                // the loop condition decides whether to stop.
                continue;
            }
            resize();
        }
    }

    /** Returns the number of queued tasks, read under the queue lock. */
    private int pendingTasks() {
        synchronized (taskQueue) {
            return taskQueue.size();
        }
    }

    /** Grows the pool toward {@code active}/{@code max} under load, or shrinks it when idle. */
    private void resize() {
        synchronized (workers) {
            if (destroy) {
                // shutdown() takes this lock after setting the flag, so no worker is created late.
                return;
            }
            int pending = pendingTasks();
            if (pending > active && size < active) {
                growTo(active);
                System.out.println("The pool incremented to active.");
            } else if (pending > max && size < max) {
                growTo(max);
                System.out.println("The pool incremented to max.");
            } else if (pending == 0 && size > active) {
                System.out.println("=========Reduce========");
                shrinkTo(active);
            }
        }
    }

    /** Starts workers until the pool has {@code target} of them. Caller holds the workers lock. */
    private void growTo(int target) {
        for (int i = size; i < target; i++) {
            createWorkTask();
        }
        size = target;
    }

    /** Retires workers until {@code target} remain. Caller holds the workers lock. */
    private void shrinkTo(int target) {
        int surplus = size - target;
        for (Iterator<WorkerTask> it = workers.iterator(); it.hasNext() && surplus > 0; surplus--) {
            // A retired worker finishes the task it is running, if any, and then exits.
            it.next().close();
            it.remove();
        }
        size = target;
        wakeIdleWorkers();
    }

    /** Wakes every thread waiting on the queue so closed workers notice their flag. */
    private void wakeIdleWorkers() {
        synchronized (taskQueue) {
            taskQueue.notifyAll();
        }
    }

    /** Creates, starts and registers one worker. Caller holds the workers lock. */
    private void createWorkTask() {
        WorkerTask task = new WorkerTask(GROUP, THREAD_PREFIX + SEQ.getAndIncrement(), taskQueue);
        task.start();
        workers.add(task);
    }

    /**
     * Shuts the pool down gracefully.
     *
     * <p>New submissions are refused from the moment this method is entered. The call then
     * blocks until the queue is empty and every worker has finished its current task and
     * exited, and finally wakes the manager thread so that it terminates as well. Running
     * tasks are not interrupted. Calling the method again after it has completed finds
     * nothing left to stop.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        synchronized (taskQueue) {
            destroy = true;
            // Workers notify this monitor when they take the last queued task.
            while (!taskQueue.isEmpty()) {
                taskQueue.wait();
            }
        }

        List<WorkerTask> retiring;
        synchronized (workers) {
            retiring = new ArrayList<>(workers);
            workers.clear();
            for (WorkerTask worker : retiring) {
                worker.close();
            }
            size = 0;
        }
        wakeIdleWorkers();

        // Join outside the locks so workers can still reach the queue monitor to exit.
        for (WorkerTask worker : retiring) {
            worker.join();
        }

        if (Thread.currentThread() != this) {
            // End the manager's sleep so it observes the destroy flag immediately.
            interrupt();
        }
        System.out.println(GROUP.activeCount());
        System.out.println("The thread pool disposed.");
    }

    /** Returns the maximum number of tasks that may be queued. */
    public int getQueueSize() {
        return queueSize;
    }

    /** Returns the current worker count. */
    public int getSize() {
        return size;
    }

    /** Returns {@code true} once {@link #shutdown()} has been requested. */
    public boolean isDestroy() {
        return this.destroy;
    }

    /** Returns the number of workers started by the constructor. */
    public int getMin() {
        return min;
    }

    /** Returns the upper bound on the worker count. */
    public int getMax() {
        return max;
    }

    /** Returns the worker count targeted under load. */
    public int getActive() {
        return active;
    }

    /** Thrown by a {@link DiscardPolicy} to signal that a task was rejected. */
    public static class DiscardException extends RuntimeException {
        public DiscardException(String message) {
            super(message);
        }
    }

    /** Strategy invoked by {@link #submit(Runnable)} when the task queue is full. */
    @FunctionalInterface
    public interface DiscardPolicy {
        /**
         * Handles a rejected task.
         *
         * @throws DiscardException if the rejection should be reported to the submitter
         */
        void discard() throws DiscardException;
    }

    /**
     * Worker thread: repeatedly takes the next task from the pool's queue and runs it until
     * {@link #close()} has been called.
     */
    private static final class WorkerTask extends Thread {

        /** The owning pool's task queue; also the monitor this worker waits on. */
        private final LinkedList<Runnable> queue;

        /** Set by {@link #close()}; never reset, so a close request cannot be lost. */
        private volatile boolean closed;

        WorkerTask(ThreadGroup group, String name, LinkedList<Runnable> queue) {
            super(group, name);
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                Runnable runnable;
                synchronized (queue) {
                    while (!closed && queue.isEmpty()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            // Only close() stops a worker; the loop re-checks the flag.
                        }
                    }
                    if (closed) {
                        System.out.println("Closed.");
                        return;
                    }
                    runnable = queue.removeFirst();
                    if (queue.isEmpty()) {
                        // Lets a shutdown() that is waiting for the queue to drain proceed.
                        queue.notifyAll();
                    }
                }
                if (runnable != null) {
                    try {
                        runnable.run();
                    } catch (RuntimeException e) {
                        // Report the failure the way an uncaught exception would be reported,
                        // but keep the worker alive so the pool does not silently shrink.
                        getUncaughtExceptionHandler().uncaughtException(this, e);
                    }
                }
            }
        }

        /**
         * Asks this worker to exit once it is idle or has finished its current task. The
         * caller must afterwards notify the queue monitor to wake a waiting worker.
         */
        void close() {
            this.closed = true;
        }
    }

    /**
     * Demo: submits 40 three-second tasks, waits ten seconds and shuts the pool down.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        SimpleThreadPool threadPool = new SimpleThreadPool();
        for (int i = 0; i < 40; i++) {
            threadPool.submit(() -> {
                System.out.println("The runnable be serviced by " + Thread.currentThread() + " start.");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("The runnable be serviced by " + Thread.currentThread() + " finished.");
            });
        }
        Thread.sleep(10000);
        threadPool.shutdown();
        // Submitting after shutdown() throws IllegalStateException.
    }
}

运行日志:

"E:\Program Files\Java\jdk1.8.0_161\bin\java" "-javaagent:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\lib\idea_rt.jar=53638:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\bin" -Dfile.encoding=UTF-8 -classpath "E:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\IdeaProjects\mvc\target\classes" com.artisan.test.SimpleThreadPool The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. Pool#Min:4,Active:8,Max:12,Current:4,QueueSize:38 The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. 
The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The pool incremented to active. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] start. Pool#Min:4,Active:8,Max:12,Current:8,QueueSize:31 The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] start. 
The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-8,5,Pool_Group] start. The pool incremented to max. Pool#Min:4,Active:8,Max:12,Current:12,QueueSize:15 The runnable be serviced by Thread[SIMPLE_THREAD_POOL-9,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-10,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-11,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] start. 
The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-8,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-9,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-8,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-9,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-10,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-11,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-10,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-11,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] finished. Closed. Closed. Closed. Closed. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. Closed. Closed. Closed. Closed. 
The runnable be serviced by Thread[SIMPLE_THREAD_POOL-8,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-10,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-11,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-9,5,Pool_Group] finished. Closed. 4 =========Reduce======== The thread pool disposed. Closed. Closed. Closed.Process finished with exit code 0

总结

以上是生活随笔为你收集整理的高并发编程-自定义简易的线程池(2),体会原理的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。