欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程语言 > java >内容正文

java

java用不用stream_Java parallelStream不使用预期的线程数

发布时间:2025/3/19 java 44 豆豆
生活随笔 收集整理的这篇文章主要介绍了 java用不用stream_Java parallelStream不使用预期的线程数 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

Java 8 parallelStream似乎使用的线程数多于系统属性java.util.concurrent.ForkJoinPool.common.parallelism指定的线程数.这些单元测试显示我使用自己的ForkJoinPool使用所需数量的线程处理任务,但是当使用parallelStream时,线程数高于预期.

import org.junit.Test;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;

public class ParallelStreamTest {

private static final int TOTAL_TASKS = 1000;

@Test

public void testParallelStreamWithParallelism1() throws InterruptedException {

final Integer maxThreads = 1;

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", maxThreads.toString());

List objects = new ArrayList<>();

for (int i = 0; i < 1000; i++) {

objects.add(i);

}

final AtomicInteger concurrentThreads = new AtomicInteger(0);

final AtomicInteger taskCount = new AtomicInteger(0);

objects.parallelStream().forEach(i -> {

processTask(concurrentThreads, maxThreads); //expected to be called one at the time

taskCount.addAndGet(1);

});

assertTrue(taskCount.get() == TOTAL_TASKS);

}

@Test

public void testMyOwnForkJoinPoolWithParallelism1() throws InterruptedException {

final Integer threads = 1;

List objects = new ArrayList<>();

for (int i = 0; i < TOTAL_TASKS; i++) {

objects.add(i);

}

ForkJoinPool forkJoinPool = new ForkJoinPool(1);

final AtomicInteger concurrentThreads = new AtomicInteger(0);

final AtomicInteger taskCount = new AtomicInteger(0);

forkJoinPool.submit(() -> objects.parallelStream().forEach(i -> {

processTask(concurrentThreads, threads); //expected to be called one at the time

taskCount.addAndGet(1);

}));

forkJoinPool.shutdown();

forkJoinPool.awaitTermination(1, TimeUnit.MINUTES);

assertTrue(taskCount.get() == TOTAL_TASKS);

}

/**

* It simply processes a task increasing first the concurrentThreads count

*

* @param concurrentThreads Counter for threads processing tasks

* @param maxThreads Maximum number of threads that are expected to be used for processing tasks

*/

private void processTask(AtomicInteger concurrentThreads, int maxThreads) {

int currentConcurrentThreads = concurrentThreads.addAndGet(1);

if (currentConcurrentThreads > maxThreads) {

throw new IllegalStateException("There should be no more than " + maxThreads + " concurrent thread(s) but found " + currentConcurrentThreads);

}

// actual processing would go here

concurrentThreads.decrementAndGet();

}

}

应该只有一个线程用于处理任务,因为ForkJoinPool具有parallelism = 1和java.util.concurrent.ForkJoinPool.common.parallelism = 1.因此,两个测试都应该通过,但testParallelStreamWithParallelism1失败:

java.lang.IllegalStateException: There should be no more than 1 concurrent thread(s) but found 2

似乎设置java.util.concurrent.ForkJoinPool.common.parallelism = 1没有按预期工作,并且同时处理了多个并发任务.

有任何想法吗?

解决方法:

Fork / Join池的并行性设置确定了池工作线程的数量,但是从调用者线程开始,例如,主线程也将在作业上工作,使用公共池时总会有一个线程.这就是default setting of the common pool is “number of cores minus one”让实际工作线程数等于核心数的原因.

使用自定义Fork / Join池,流操作的调用者线程已经是池的工作线程,因此,利用它来处理作业不会增加实际工作线程数.

必须强调的是,Stream实现和Fork / Join池之间的交互完全没有指定,因为流使用Fork / Join框架的事实是一个实现细节.无法保证更改默认池的属性对流有任何影响,也不保证在自定义Fork / Join池的任务中调用流操作将使用该自定义池.

标签:java,java-8,java-stream,multithreading

来源: https://codeday.me/bug/20190828/1753500.html

总结

以上是生活随笔为你收集整理的java用不用stream_Java parallelStream不使用预期的线程数的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。