欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用...

发布时间:2025/4/14 编程问答 30 豆豆
生活随笔 收集整理的这篇文章主要介绍了 获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用... 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取:

 

方式一:

通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 

Java代码  
  • public class CompletionServiceTest {  
  •   
  •     static class Task implements Callable<String>{  
  •         private int i;  
  •           
  •         public Task(int i){  
  •             this.i = i;  
  •         }  
  •   
  •         @Override  
  •         public String call() throws Exception {  
  •             Thread.sleep(10000);  
  •             return Thread.currentThread().getName() + "执行完任务:" + i;  
  •         }     
  •     }  
  •       
  •     public static void main(String[] args){  
  •         testUseFuture();  
  •     }  
  •       
  •     private static void testUseFuture(){  
  •         int numThread = 5;  
  •         ExecutorService executor = Executors.newFixedThreadPool(numThread);  
  •         List<Future<String>> futureList = new ArrayList<Future<String>>();  
  •         for(int i = 0;i<numThread;i++ ){  
  •             Future<String> future = executor.submit(new CompletionServiceTest.Task(i));  
  •             futureList.add(future);  
  •         }  
  •                   
  •         while(numThread > 0){  
  •             for(Future<String> future : futureList){  
  •                 String result = null;  
  •                 try {  
  •                     result = future.get(0, TimeUnit.SECONDS);  
  •                 } catch (InterruptedException e) {  
  •                     e.printStackTrace();  
  •                 } catch (ExecutionException e) {  
  •                     e.printStackTrace();  
  •                 } catch (TimeoutException e) {  
  •                     //超时异常直接忽略  
  •                 }  
  •                 if(null != result){  
  •                     futureList.remove(future);  
  •                     numThread--;  
  •                     System.out.println(result);  
  •                     //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)  
  •                     break;  
  •                 }  
  •             }  
  •         }  
  •     }  
  • }  
  •  方式二:

    第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

    Java代码  
  • public class CompletionServiceTest {  
  •   
  •     static class Task implements Callable<String>{  
  •         private int i;  
  •           
  •         public Task(int i){  
  •             this.i = i;  
  •         }  
  •   
  •         @Override  
  •         public String call() throws Exception {  
  •             Thread.sleep(10000);  
  •             return Thread.currentThread().getName() + "执行完任务:" + i;  
  •         }     
  •     }  
  •       
  •     public static void main(String[] args) throws InterruptedException, ExecutionException{  
  •         testExecutorCompletionService();  
  •     }  
  •       
  •     private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{  
  •         int numThread = 5;  
  •         ExecutorService executor = Executors.newFixedThreadPool(numThread);  
  •         CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);  
  •         for(int i = 0;i<numThread;i++ ){  
  •             completionService.submit(new CompletionServiceTest.Task(i));  
  •         }  
  • }  
  •           
  •         for(int i = 0;i<numThread;i++ ){       
  •             System.out.println(completionService.take().get());  
  •         }  
  •           
  •     }  
  •  

    ExecutorCompletionService分析:

     CompletionService是Executor和BlockingQueue的结合体。

    Java代码  
  • public ExecutorCompletionService(Executor executor) {  
  •         if (executor == null)  
  •             throw new NullPointerException();  
  •         this.executor = executor;  
  •         this.aes = (executor instanceof AbstractExecutorService) ?  
  •             (AbstractExecutorService) executor : null;  
  •         this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
  •     }  
  •  任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture,

    Java代码  
  • public Future<V> submit(Callable<V> task) {  
  •         if (task == null) throw new NullPointerException();  
  •         RunnableFuture<V> f = newTaskFor(task);  
  •         executor.execute(new QueueingFuture(f));  
  •         return f;  
  •     }  
  •  QueueingFuture是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

     

    Java代码  
  • private class QueueingFuture extends FutureTask<Void> {  
  •         QueueingFuture(RunnableFuture<V> task) {  
  •             super(task, null);  
  •             this.task = task;  
  •         }  
  •         protected void done() { completionQueue.add(task); }  
  •         private final Future<V> task;  
  •     }  
  •  而通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

    Java代码  
  • public Future<V> take() throws InterruptedException {  
  •     return completionQueue.take();  
  • }  
  •   
  • public Future<V> poll() {  
  •     return completionQueue.poll();  
  • 原文:http://xw-z1985.iteye.com/blog/1997077
  • 转载于:https://www.cnblogs.com/langtianya/p/5013353.html

    总结

    以上是生活随笔为你收集整理的获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用...的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。