1. 概述

Java 8引入了Stream API,可以轻松地将集合迭代为数据流。创建并行执行并利用多个处理器内核的流也非常容易。

我们可能会认为在更多内核上分配工作总是更快。但事实往往并非如此。

在本教程中,我们将探讨顺序流和并行流之间的差异。我们将首先查看并行流使用的默认分叉联接池。

我们还将考虑使用并行流的性能影响,包括内存局部性和拆分/合并成本。

最后,我们将建议何时将顺序流隐蔽为并行流。

2. Java 中的流

Java 中的流只是数据源的包装器,允许我们以方便的方式对数据执行批量操作。

它不会存储数据或对基础数据源进行任何更改。相反,它增加了对数据管道的功能样式操作的支持。

2.1. 顺序流

默认情况下,Java 中的任何流操作都是按顺序处理的,除非明确指定为并行。

顺序流使用单个线程来处理管道:

List listOfNumbers = Arrays.asList(1, 2, 3, 4);

listOfNumbers.stream().forEach(number ->

System.out.println(number + " " + Thread.currentThread().getName())

);Copy

此顺序流的输出是可预测的。列表元素将始终按顺序打印:

1 main

2 main

3 main

4 mainCopy

2.2. 并行流

Java 中的任何流都可以轻松地从顺序转换为并行。

我们可以通过将并行方法添加到顺序流或使用集合的parallelStream方法创建流来实现这一点:

List listOfNumbers = Arrays.asList(1, 2, 3, 4);

listOfNumbers.parallelStream().forEach(number ->

System.out.println(number + " " + Thread.currentThread().getName())

);Copy

并行流使我们能够在单独的内核上并行执行代码。最终结果是每个单独结果的组合。

但是,执行顺序不受我们控制。每次我们运行程序时,它都可能会更改:

4 ForkJoinPool.commonPool-worker-3

2 ForkJoinPool.commonPool-worker-5

1 ForkJoinPool.commonPool-worker-7

3 mainCopy

3. 分叉连接框架

并行流利用fork-join框架及其公共工作线程池。

fork-join 框架被添加到 Java 7 中的java.util.parallel中,以处理多个线程之间的任务管理。

3.1. 拆分源

fork-join 框架负责在工作线程之间拆分源数据,并在任务完成时处理回调。

让我们看一个并行计算整数总和的示例。

我们将使用reduce方法,在起始值为5基础上,再一起求和,而不是从零开始:

List listOfNumbers = Arrays.asList(1, 2, 3, 4);

int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum);

assertThat(sum).isNotEqualTo(15);Copy

在顺序流中,此操作的结果将为 15。

但是由于reduce操作是并行处理的,因此数字 5 实际上在每个工作线程中相加:

 

 

实际结果可能会有所不同,具体取决于公共分叉联接池中使用的线程数。

为了解决此问题,应在并行流之外添加数字 5:

List listOfNumbers = Arrays.asList(1, 2, 3, 4);

int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;

assertThat(sum).isEqualTo(15);Copy

因此,我们需要注意哪些操作可以并行运行。

3.2. 公共线程池

公共池中的线程数等于(处理器内核数 -1)。

但是,API 允许我们通过传递 JVM 参数来指定它将使用的线程数:

-D java.util.concurrent.ForkJoinPool.common.parallelism=4Copy

请务必记住,这是一个全局设置,它将影响所有并行流和使用公共池的任何其他分叉联接任务。我们强烈建议不要修改此参数,除非我们有很好的理由这样做。

3.3. 自定义线程池

除了在默认的公共线程池中,还可以在自定义线程池中运行并行流:

List listOfNumbers = Arrays.asList(1, 2, 3, 4);

ForkJoinPool customThreadPool = new ForkJoinPool(4);

int sum = customThreadPool.submit(

() -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get();

customThreadPool.shutdown();

assertThat(sum).isEqualTo(10);Copy

请注意,Oracle 建议使用公共线程池。我们应该有一个很好的理由在自定义线程池中运行并行流。

4. 性能影响

并行处理可能有利于充分利用多个内核。但是我们还需要考虑管理多个线程、内存局部性、拆分源和合并结果的开销。

4.1. 开销

让我们看一个示例整数流。

我们将对顺序和并行缩减操作运行基准测试:

IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);

IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);Copy

在这种简单的求和缩减中,将顺序流转换为并行流会导致性能下降:

Benchmark Mode Cnt Score Error Units

SplittingCosts.sourceSplittingIntStreamParallel avgt 25 35476,283 ± 204,446 ns/op

SplittingCosts.sourceSplittingIntStreamSequential avgt 25 68,274 ± 0,963 ns/opCopy

这背后的原因是,有时管理线程、源和结果的开销比执行实际工作的成本更高。

4.2. 分摊成本

均匀拆分数据源是启用并行执行的必要成本,但某些数据源的拆分效果优于其他数据源。

让我们使用ArrayList和LinkedList 来演示这一点:

private static final List arrayListOfNumbers = new ArrayList<>();

private static final List linkedListOfNumbers = new LinkedList<>();

static {

IntStream.rangeClosed(1, 1_000_000).forEach(i -> {

arrayListOfNumbers.add(i);

linkedListOfNumbers.add(i);

});

}Copy

我们将对两种类型的列表进行顺序和并行缩减操作运行基准测试:

arrayListOfNumbers.stream().reduce(0, Integer::sum)

arrayListOfNumbers.parallelStream().reduce(0, Integer::sum);

linkedListOfNumbers.stream().reduce(0, Integer::sum);

linkedListOfNumbers.parallelStream().reduce(0, Integer::sum);Copy

我们的结果表明,将顺序流转换为并行流只会给ArrayList 带来性能优势:

Benchmark Mode Cnt Score Error Units

DifferentSourceSplitting.differentSourceArrayListParallel avgt 25 2004849,711 ± 5289,437 ns/op

DifferentSourceSplitting.differentSourceArrayListSequential avgt 25 5437923,224 ± 37398,940 ns/op

DifferentSourceSplitting.differentSourceLinkedListParallel avgt 25 13561609,611 ± 275658,633 ns/op

DifferentSourceSplitting.differentSourceLinkedListSequential avgt 25 10664918,132 ± 254251,184 ns/opCopy

这背后的原因是数组可以廉价而均匀地拆分,而 LinkedList没有这些属性。TreeMap和HashSet比LinkedList拆分得更好,但不如数组。

4.3. 合并成本

每次我们拆分源进行并行计算时,我们还需要确保最终合并结果。

让我们在顺序和并行流上运行一个基准测试,将总和和分组作为不同的合并操作:

arrayListOfNumbers.stream().reduce(0, Integer::sum);

arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum);

arrayListOfNumbers.stream().collect(Collectors.toSet());

arrayListOfNumbers.stream().parallel().collect(Collectors.toSet())Copy

我们的结果表明,将顺序流转换为并行流只会为求和运算带来性能优势:

Benchmark Mode Cnt Score Error Units

MergingCosts.mergingCostsGroupingParallel avgt 25 135093312,675 ± 4195024,803 ns/op

MergingCosts.mergingCostsGroupingSequential avgt 25 70631711,489 ± 1517217,320 ns/op

MergingCosts.mergingCostsSumParallel avgt 25 2074483,821 ± 7520,402 ns/op

MergingCosts.mergingCostsSumSequential avgt 25 5509573,621 ± 60249,942 ns/opCopy

合并操作对于某些操作(例如缩减和添加)非常便宜,但是合并操作(如分组到集合或映射)可能非常昂贵。

4.4. 内存局部性

现代计算机使用复杂的多级缓存将常用数据保存在处理器附近。当检测到线性内存访问模式时,硬件会预取下一行数据,前提是可能很快就会需要这些数据。

当我们可以让处理器内核忙于做有用的工作时,并行性会带来性能优势。由于等待缓存未命中不是有用的工作,因此我们需要考虑内存带宽作为限制因素。

让我们使用两个数组来演示这一点,一个使用基元类型,另一个使用对象数据类型:

private static final int[] intArray = new int[1_000_000];

private static final Integer[] integerArray = new Integer[1_000_000];

static {

IntStream.rangeClosed(1, 1_000_000).forEach(i -> {

intArray[i-1] = i;

integerArray[i-1] = i;

});

}Copy

我们将对两个阵列的顺序和并行归约操作运行基准测试:

Arrays.stream(intArray).reduce(0, Integer::sum);

Arrays.stream(intArray).parallel().reduce(0, Integer::sum);

Arrays.stream(integerArray).reduce(0, Integer::sum);

Arrays.stream(integerArray).parallel().reduce(0, Integer::sum);Copy

我们的结果表明,当使用基元数组时,将顺序流转换为并行流会带来更多的性能优势:

Benchmark Mode Cnt Score Error Units

MemoryLocalityCosts.localityIntArrayParallel sequential stream avgt 25 116247,787 ± 283,150 ns/op

MemoryLocalityCosts.localityIntArraySequential avgt 25 293142,385 ± 2526,892 ns/op

MemoryLocalityCosts.localityIntegerArrayParallel avgt 25 2153732,607 ± 16956,463 ns/op

MemoryLocalityCosts.localityIntegerArraySequential avgt 25 5134866,640 ± 148283,942 ns/opCopy

一系列原语带来了Java中最好的局部性。一般来说,数据结构中的指针越多,我们给内存施加的压力就越大,以获取引用对象。这可能会对并行化产生负面影响,因为多个内核同时从内存中获取数据。

4.5.NQ模型

Oracle 提出了一个简单的模型,可以帮助我们确定并行性是否可以为我们提供性能提升。在NQ模型中,N表示源数据元素的数量,而Q表示每个数据元素执行的计算量。

N*Q 的乘积越大,我们就越有可能从并行化中获得性能提升。对于Q 非常小的问题,例如对数字求和,经验法则是N应大于 10,000。随着计算数量的增加,从并行性中获得性能提升所需的数据大小会减少。

4.6. 文件搜索成本

与顺序流相比,使用并行流的文件搜索性能更好。让我们在顺序和并行流上运行一个基准测试,以搜索超过 1500 个文本文件:

Files.walk(Paths.get("src/main/resources/")).map(Path::normalize).filter(Files::isRegularFile)

.filter(path -> path.getFileName().toString().endsWith(".txt")).collect(Collectors.toList());

Files.walk(Paths.get("src/main/resources/")).parallel().map(Path::normalize).filter(Files::

isRegularFile).filter(path -> path.getFileName().toString().endsWith(".txt")).

collect(Collectors.toList());Copy

我们的结果表明,在搜索大量文件时,将顺序流转换为并行流会带来更多的性能优势:

Benchmark Mode Cnt Score Error Units

FileSearchCost.textFileSearchParallel avgt 25 10808832.831 ± 446934.773 ns/op

FileSearchCost.textFileSearchSequential avgt 25 13271799.599 ± 245112.749 ns/opCopy

5. 何时使用并行流

正如我们所看到的,在使用并行流时,我们需要非常体贴。

在某些用例中,并行性可以带来性能优势。但并行流不能被视为神奇的性能助推器。因此,在开发过程中仍应将顺序流用作默认值。

当我们有实际的性能要求时,可以将顺序流转换为并行流。鉴于这些要求,我们应该首先运行性能测量,并将并行性视为可能的优化策略。

大量的数据和每个元素完成的大量计算表明并行性可能是一个不错的选择。

另一方面,少量数据、不均匀拆分源、昂贵的合并操作和较差的内存局部性表明并行执行存在潜在问题。

6. 结论

在本文中,我们探讨了 Java 中顺序流和并行流之间的区别。我们了解到并行流使用默认的分叉联接池及其工作线程。

然后,我们看到了并行流并不总是带来性能优势。我们考虑了管理多个线程、内存局部性、拆分源和合并结果的开销。我们看到数组是并行执行的绝佳数据源,因为它们带来了最佳局部性,并且可以廉价且均匀地拆分。

最后,我们研究了NQ模型,并建议仅在有实际性能要求时才使用并行流。

相关链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。