欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

Java高并发编程:Callable、Future和FutureTask

发布时间:2025/4/16 54 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Java高并发编程:Callable、Future和FutureTask 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

1. Callable

泛型接口,用于获取线程执行完的结果,Callable的声明如下

public interface Callable<V> {// 返回 V 类型的结果V call() throws Exception; }

Callable 接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常,而Callable是可返回结果并且可以抛出异常的任务

public class MyCallable implements Callable{private int number;public MyCallable(int number) {this.number = number;}@Overridepublic Integer call() throws Exception {int sum = 0;for (int x = 1; x <= number; x++) {sum += x;}return sum;} }public class CallableDemo {ExecutorService pool = Executors.newFixedThreadPool(2);Future<Integer> future = pool.submit(new SumCallable(10));Integer sum = future.get();pool.shutdown();class SumCallable implements Callable<Integer> {private int number;public SumCallable(int number){this.number = number;}@Overridepublic Integer call() throws Exception {int sum = 0;for (int i=0; i<number; i++){sum += i;}return sum;}} }

2. Future

Future 为线程池提供了一个可管理的任务标准,它提供了对Runnable或Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作

Future 接口表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用get()方法来获取结果,如有必要,计算完成前可以阻塞此方法

Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。Callable要采用ExecutorService的submit()方法提交,返回的future对象可以取消任务。Future声明如下

public interface Future<V> {boolean cancel(boolean var1);// 该任务是否已经取消boolean isCancelled();// 判断是否已经完成boolean isDone();// 获取结果,该方法会阻塞V get() throws InterruptedException, ExecutionException;// 获取结果,如果还未完成那么等待,直到timeout或返回结果,该方法会阻塞V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException; }

Future的简单使用

// 创建线程池对象 ExecutorService pool = Executors.newFixedThreadPool(2); // 可以执行Runnable对象或者Callable对象代表的线程 Future<Integer> future = pool.submit(new MyCallable(100)); Integer i1 = future.get(); pool.shutdown();

Future常用方法

方法声明功能描述
cancel()取消任务
isCanceled()任务是否取消
isDone()任务是否完成
get()获取结果,get()方法会阻塞,直到任务返回结果
set()设置结果

3. FutureTask

Future只是定义了一些接口规范,而FutureTask则是它的实现类

public class FutureTask<V> implements RunnableFuture<V>{// 代码省略 }public interface RunnableFuture<V> extends Runnable, Future<V>{void run(); }

FutureTask会像Thread包装Runnable那样对Runnable和Callable进行包装,Runnable和Callable由构造函数注入

public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable }

由于FutureTask实现了Runnable,因此,它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行,并且还可以直接通过get()方法获取执行结果,该方法会阻塞,直到结果返回。因此,FutureTask既是Future、Runnable,又是包装了Callable,它是两者的合体

public class FutureDemo {// 线程池static ExecutorService mExecutor = Executors.newSingleThreadExecutor();// main函数public static void main(String[] args) {try {futureWithRunnable();futureWithCallable();futureTask();} catch (Exception e) {}}/*** 其中Runnable实现的是void run()方法,无返回值;Callable实现的是 V* call()方法,并且可以返回执行结果。其中Runnable可以提交给Thread来包装下* ,直接启动一个线程来执行,而Callable则一般都是提交给ExecuteService来执行。*/private static void futureWithRunnable() throws InterruptedException, ExecutionException {// 提交runnable则没有返回值, future没有数据Future<?> result = mExecutor.submit(new Runnable() {@Overridepublic void run() {fibc(20);}});System.out.println("future result from runnable : " + result.get());}private static void futureWithCallable() throws InterruptedException, ExecutionException {/*** 提交Callable, 有返回值, future中能够获取返回值*/Future<Integer> result2 = mExecutor.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {return fibc(20);}});System.out.println("future result from callable : "+ result2.get());}private static void futureTask() throws InterruptedException, ExecutionException {/*** FutureTask则是一个RunnableFuture<V>,即实现了Runnbale又实现了Futrue<V>这两个接口,* 另外它还可以包装Runnable(实际上会转换为Callable)和Callable* <V>,所以一般来讲是一个符合体了,它可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行* ,并且还可以通过v get()返回执行结果,在线程体没有执行完成的时候,主线程一直阻塞等待,执行完则直接返回结果。*/FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {return fibc(20);}});// 提交futureTaskmExecutor.submit(futureTask);System.out.println("future result from futureTask : "+ futureTask.get());}// 效率底下的斐波那契数列, 耗时的操作private static int fibc(int num) {if (num == 0) {return 0;}if (num == 1) {return 1;}return fibc(num - 1) + fibc(num - 2);} }

输出结果

future result from runnable : null future result from Callable : 6765 future result from futureTask : 6765

4. CompletionService

CompletionService用于提交一组Callable任务,其take()方法返回已完成的一个Callable任务对应的Future对象。

示例:这里数据的获取好比同时种了好几块地的麦子,然后等待收割,秋收时,哪块先熟,先收割哪块

import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CallableAndFuture { public static void main(String[] args){ //创建一个单独的线程 ExecutorService threadPool = Executors.newSingleThreadExecutor(); //future泛型与Callable的类型一致 Future<String> future = threadPool.submit(new Callable<String>(){ @Override public String call() throws Exception { Thread.sleep(3000); return "hello"; } }); System.out.println("等待結果……"); //在指定时timeout内等待,未等到抛出TimeoutException //System.out.println("拿到结果:" + future.get(long timeout,TimeUnit unit)); try { System.out.println("拿到结果:" + future.get()); //获取线程执行后的结果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } //CompletionService用于提交一组Callable任务, //其take方法返回已完成的一个Callable任务对应的Future对象。 ExecutorService threadPool2 = Executors.newFixedThreadPool(10); CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2); //创建10任务 for(int i=1;i<=10;i++){ final int seq = i; //将任务提交给线程池 completionService.submit(new Callable<Integer>(){ @Override public Integer call() throws Exception { Thread.sleep(new Random().nextInt(5000)); return seq; } }); } //获取结果,哪个结果先返回就先获得 for(int i=0;i<10;i++){ try { System.out.println(completionService.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }

输出结果

等待結果…… 拿到结果:hello 3 6 10 2 4 8 5 1 7 9

总结

以上是生活随笔为你收集整理的Java高并发编程:Callable、Future和FutureTask的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。