欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程语言 > java >内容正文

java

java8并行流_Java 8:CompletableFuture与并行流

发布时间:2023/12/3 java 45 豆豆
生活随笔 收集整理的这篇文章主要介绍了 java8并行流_Java 8:CompletableFuture与并行流 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

java8并行流

这篇文章展示了Java 8的CompletableFuture在执行异步计算时如何与并行流进行比较。

我们将使用以下类对长时间运行的任务进行建模:

class MyTask {private final int duration;public MyTask(int duration) {this.duration = duration;}public int calculate() {System.out.println(Thread.currentThread().getName());try {Thread.sleep(duration * 1000);} catch (final InterruptedException e) {throw new RuntimeException(e);}return duration;} }

让我们创建十个任务,每个任务持续1秒:

List<MyTask> tasks = IntStream.range(0, 10).mapToObj(i -> new MyTask(1)).collect(toList());

我们如何有效地计算任务清单?

方法1:依次

您首先想到的是按顺序计算任务,如下所示:

public static void runSequentially(List<MyTask> tasks) {long start = System.nanoTime();List<Integer> result = tasks.stream().map(MyTask::calculate).collect(toList());long duration = (System.nanoTime() - start) / 1_000_000;System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);System.out.println(result); }

如您所料,这需要10秒钟才能运行,因为每个任务都在main线程上一个接一个地运行。

方法2:使用并行流

快速改进是将您的代码转换为使用并行流,如下所示:

public static void useParallelStream(List<MyTask> tasks) {long start = System.nanoTime();List<Integer> result = tasks.parallelStream().map(MyTask::calculate).collect(toList());long duration = (System.nanoTime() - start) / 1_000_000;System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);System.out.println(result); }

输出是

main ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-2 main ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 main Processed 10 tasks in 3043 millis

这次花了3秒,因为并行执行了4个任务(使用了来自ForkJoinPool三个线程以及main线程)。

方法3:使用CompletableFutures

让我们看看CompletableFuture的性能是否更好:

public static void useCompletableFuture(List<MyTask> tasks) {long start = System.nanoTime();List<CompletableFuture<Integer>> futures =tasks.stream().map(t -> CompletableFuture.supplyAsync(() -> t.calculate())).collect(Collectors.toList());List<Integer> result =futures.stream().map(CompletableFuture::join).collect(Collectors.toList());long duration = (System.nanoTime() - start) / 1_000_000;System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);System.out.println(result); }

在上面的代码中,我们首先获取CompletableFuture的列表,然后在每个CompletableFuture调用join方法以等待它们CompletableFuture完成。 请注意, join与get相同,唯一的区别是前者不引发任何检查的异常,因此在lambda表达式中更为方便。

另外,您必须使用两个单独的流管道,而不是将两个map操作彼此放在后面,因为中间流操作是惰性的,您将不得不按顺序处理任务! 这就是为什么您首先需要在列表中收集CompletableFuture ,以允许它们在等待完成之前启动。

输出是

ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-1 Processed 10 tasks in 4010 millis

处理10个任务花了4秒钟。 您会注意到,仅使用了3个ForkJoinPool线程,并且与并行流不同,没有使用main线程。

方法4:将CompletableFutures与自定义执行器一起使用

与并行流相比, CompletableFuture的优点之一是它们允许您指定其他Executor来向其提交任务。 这意味着您可以根据应用程序选择更合适的线程数。 由于我的示例不是很占用CPU,因此可以选择将线程数增加到大于Runtime.getRuntime().getAvailableProcessors() ,如下所示:

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {long start = System.nanoTime();ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));List<CompletableFuture<Integer>> futures =tasks.stream().map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor)).collect(Collectors.toList());List<Integer> result =futures.stream().map(CompletableFuture::join).collect(Collectors.toList());long duration = (System.nanoTime() - start) / 1_000_000;System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);System.out.println(result);executor.shutdown(); }

输出是

pool-1-thread-2 pool-1-thread-4 pool-1-thread-3 pool-1-thread-1 pool-1-thread-5 pool-1-thread-6 pool-1-thread-7 pool-1-thread-8 pool-1-thread-9 pool-1-thread-10 Processed 10 tasks in 1009 millis

经过改进,现在仅需1秒即可处理10个任务。

如您所见, CompletableFuture s提供了对线程池大小的更多控制,如果您的任务涉及I / O,则应使用CompletableFuture 。 但是,如果您要执行CPU密集型操作,则线程数不会超过处理器没有意义,因此请选择并行流,因为它更易于使用。

翻译自: https://www.javacodegeeks.com/2016/06/java-8-completablefuture-vs-parallel-stream.html

java8并行流

总结

以上是生活随笔为你收集整理的java8并行流_Java 8:CompletableFuture与并行流的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。