HarmonyOS开发者限时福利来啦!最高10w+现金激励等你拿~ 了解详情
写点什么

从命令式编程到 Fork/Join 再到 Java 8 中的并行 Streams

  • 2014-03-20
  • 本文字数:4465 字

    阅读完需:约 15 分钟

Java 8 带来了很多可以使编码更简洁的特性。例如,像下面的代码:

复制代码
Collections.sort(transactions, new Comparator<Transaction>(){
public int compare(Transaction t1, Transaction t2){
return t1.getValue().compareTo(t2.getValue());
}
});

可以用替换为如下更为紧凑的代码,功能相同,但是读上去与问题语句本身更接近了:

复制代码
transactions.sort(comparing(Transaction::getValue));

Java 8 引入的主要特性是 Lambda 表达式、方法引用和新的 Streams API。它被认为是自 20 年前 Java 诞生以来语言方面变化最大的版本。要想通过详细且实际的例子来了解如何从这些特性中获益,可以参考本文作者和 Alan Mycroft 共同编写的《 Java 8 in Action: Lambdas, Streams and Functional-style programming 》一书。

这些特性支持程序员编写更简洁的代码,还使他们能够受益于多核架构。实际上,编写可以优雅地并行执行的程序还是 Java 专家们的特权。然而,借助新的 Streams API,Java 8 改变了这种状况,让每个人都能够更容易地编写利用多核架构的代码。

在这篇文章中,我们将使用以下三种风格,以不同方法计算一个大数据集的方差,并加以对比。

  1. 命令式风格
  2. Fork/Join 框架
  3. Streams API

方差是统计学中的概念,用于度量一组数的偏离程度。方差可以通过对每个数据与平均值之差的平方和求平均值来计算。例如,给定一组表示人口年龄的数:40、30、50 和 80,我们可以这样计算方差:

  1. 计算平均值:(40 + 30 + 50 + 80) / 4 = 50
  2. 计算每个数据与平均值之差的平方和:(40-50)2 + (30-50)2 + (50-50)2 + (80-50)2 = 1400
  3. 最后平均:1400/4 = 350

命令式风格

下面是计算方差的一种典型的命令式风格实现:

复制代码
public static double varianceImperative(double[] population){
double average = 0.0;
for(double p: population){
average += p;
}
average /= population.length;
double variance = 0.0;
for(double p: population){
variance += (p - average) * (p - average);
}
return variance/population.length;
}

为什么说这是命令式的呢?我们的实现用 _ 修改状态的语句序列 _ 描述了计算过程。这里,我们显式地对人口年龄数组中的每个元素进行迭代,而且每次迭代时更新 average 和 variance 这两个局部变量。这种代码很适合只有一个 CPU 的硬件架构。确实,它可以非常直接地映射到 CPU 的指令集。

Fork/Join 框架

那么,如何编写适合在多核架构上执行的实现代码呢?应该使用线程吗?这些线程是不是要在某个点上同步?Java 7 引入的 Fork/Join 框架缓解了一些困难,所以让我们使用该框架来开发方差算法的一个并行版本吧。

复制代码
public class ForkJoinCalculator extends RecursiveTask<Double> {
public static final long THRESHOLD = 1_000_000;
private final SequentialCalculator sequentialCalculator;
private final double[] numbers;
private final int start;
private final int end;
public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
this(numbers, 0, numbers.length, sequentialCalculator);
}
private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator
sequentialCalculator) {
this.numbers = numbers;
this.start = start;
this.end = end;
this.sequentialCalculator = sequentialCalculator;
}
@Override
protected Double compute() {
int length = end - start;
if (length <= THRESHOLD) {
return sequentialCalculator.computeSequentially(numbers, start, end);
}
ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2,
sequentialCalculator);
leftTask.fork();
ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end,
sequentialCalculator);
Double rightResult = rightTask.compute();
Double leftResult = leftTask.join();
return leftResult + rightResult;
}
}

这里我们编写了一个 _RecursiveTask_ 类的子类,它对一个 double 数组进行切分,当子数组的长度小于等于给定阈值(THRESHOLD)时停止切分。切分完成后,对子数组进行顺序处理,并将下列接口定义的操作应用于子数组。

复制代码
public interface SequentialCalculator {
double computeSequentially(double[] numbers, int start, int end);
}

利用该基础设施,可以按如下方式并行计算方差。

复制代码
public static double varianceForkJoin(double[] population){
final ForkJoinPool forkJoinPool = new ForkJoinPool();
double total = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double total = 0;
for (int i = start; i < end; i++) {
total += numbers[i];
}
return total;
}
}));
final double average = total / population.length;
double variance = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double variance = 0;
for (int i = start; i < end; i++) {
variance += (numbers[i] - average) * (numbers[i] - average);
}
return variance;
}
}));
return variance / population.length;
}

本质上,即便使用 Fork/Join 框架,相对于顺序版本,并行版本的编写和最后的调试仍然困难许多。

并行 Streams

Java 8 让我们可以以不同的方式解决这个问题。不同于编写代码指出计算如何实现,我们可以使用 Streams API 粗线条地描述让它做什么。作为结果,库能够知道如何为我们实现计算,并施以各种各样的优化。这种风格被称为 _ 声明式编程 _。Java 8 有一个为利用多核架构而专门设计的并行 Stream。我们来看一下如何使用它们来更快地计算方差。

假定读者对本节探讨的 Stream 有些了解。作为复习,Stream是 T 类型元素的一个序列,支持聚合操作。我们可以使用这些操作来创建表示计算的一个管道(pipeline)。这里的管道和 UNIX 的命令管道一样。并行 Stream 就是一个可以并行执行管道的 Stream,可以通过在普通的 Stream 上调用 parallel() 方法获得。要复习 Stream,可以参考 Javadoc 文档

好消息是,Java 8 API 内建了一些算术操作,如 max、min 和 average。我们可以使用 Stream 的几种基本类型特化形式来访问前面几个方法:IntStream(int 类型元素)、LongStream(long 类型元素)和 DoubleStream(double 类型元素)。例如,可以使用 IntStream.rangeClosed() 创建一系列数,然后使用 max() 和 min() 方法计算 Stream 中的最大元素和最小元素。

回到最初的问题,我们想使用这些操作来计算一个规模较大的人口年龄数据的方差。第一步是从人口年龄数组创建一个 Stream,可以通过 Arrays.stream() 静态方法实现:

复制代码
DoubleStream populationStream = Arrays.stream(population).parallel();

我们可以使用 DoubleStream 所支持的 average() 方法:

复制代码
double average = populationStream.average().orElse(0.0);

下一步是使用 average 计算方差。人口年龄中的每个元素首先需要减去平均值,然后计算差的平方。可以将其视作一个 Map 操作:使用一个 Lambda 表达式 (double p) -> (p - average) * (p - average) 把每个元素转换为另一个数,这里是转换为该元素与平均值差的平方。一旦转换完成,我们就可以调用 sum() 方法来计算所有结果元素的和了。.

不过别那么着急。Stream 只能消耗一次。如果复用 populationStream,我们会碰到下面这个令人惊讶的错误:

复制代码
java.lang.IllegalStateException: stream has already been operated upon or closed

所以我们需要使用第二个流来计算方差,如下所示:

复制代码
public static double varianceStreams(double[] population){
double average = Arrays.stream(population).parallel().average().orElse(0.0);
double variance = Arrays.stream(population).parallel()
.map(p -> (p - average) * (p - average))
.sum() / population.length;
return variance;
}

通过使用 Streams API 内建的操作,我们以声明式、而且非常简洁的方式重写了最初的命令式风格代码,而且声明式风格读上去几乎就是方差的数学定义。我们再来研究一下三种实现版本的性能。

基准测试

我们以非常不同的风格编写了三个版本的方差算法。Stream 版本是最简洁的,而且是以声明式风格编写的,它让类库去确定具体的实现,并利用多核基础设施。不过你可能想知道它们的执行效果如何。为找出答案,让我们创建一个基准测试,对比一下三个版本的表现。我们先随机生成 1 到 140 之间的 3000 万个人口年龄数据,然后计算其方差。我们使用 jmh 来研究每个版本的性能。Jmh 是 OpenJDK 支持的一个 Java 套件。读者可以从 GitHub 克隆该项目,自己运行基准测试。

基准测试运行的机器是 Macbook Pro,配备 2.3 GHz 的 4 核 Intel Core i7 处理器,16GB 1600MHz DDR3 内存。此外,我们使用的 JDK 8 版本如下:

复制代码
java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

结果用下面的柱状图说明。命令式版本用了 60 毫秒,Fork/Join 版本用了 22 毫秒,而流版本用了 46 毫秒。

这些数据应该谨慎对待。比如,如果在 32 位 JVM 上运行测试,结果很可能有较大的差别。然而有趣的是,使用 Java 8 中的 Streams API 这种不同的编程风格,为在场景背后执行一些优化打开了一扇门,而这在严格的命令式风格中是不可能的;相对于使用 Fork/Join 框架,这种风格也更为直接。

关于作者

Raoul-Gabriel Urma 20 岁开始在剑桥大学攻读计算机科学博士学位。他的研究主要关注的是编程语言与软件工程。他以一等荣誉生的成绩获得了伦敦帝国理工学院的计算机科学工程硕士学位,并赢得一些技术创新奖项。他曾经为 Google、eBay、Oracle 和 Goldman Sachs 等很多大公司工作过,也参与过不少创业项目。此外,他还经常在 Java 开发者会议上发表讲话,也是一位 Java 课程讲师。他的 Twitter: @raoulUK 网站

Mario Fusco 是 Red Hat 的一位高级软件工程师,主要从事 Drools 核心和 JBoss 规则引擎方面的开发工作。作为 Java 开发者,他有着丰富的经验,参与了从媒体公司到金融部门等行业的很多企业级项目,而且经常作为项目的领导者。他的兴趣包括函数式编程和领域特定语言(Domain Specific Language,DSL)。凭借在这两个方面的激情,他创建了开源类库 lambdaj,意在在 Java 中为操作集合提供一种内部 Java DSL,并支持一点函数式编程。他的 Twitter 是 @mariofusco

查看英文原文: From Imperative Programming to Fork/Join to Parallel Streams in Java 8

2014-03-20 00:148273
用户头像
臧秀涛 略懂技术的运营同学。

发布了 300 篇内容, 共 134.3 次阅读, 收获喜欢 35 次。

关注

评论

发布
暂无评论
发现更多内容

如何实现70%丢包下音视频的高可用-信令篇

ZEGO即构

音视频 弱网 QUIC协议

《计算机网络 PDF》搞起!

苹果看辽宁体育

大前端 后端 计算机网络

2021挚物· AIoT 产业领袖峰会亮点:EMQ 映云科技赋能传统工业

EMQ映云科技

物联网 AIOT 云边一体 边云协同

华为首次发布HarmonyOS职业认证,助力开发者实现职业进阶

科技汇

BTAU比特金盾系统软件开发内容

Pravega Flink connector 的过去、现在和未来

Apache Flink

flink

高能预警!以阿里社招前端面试为例,详讲面对面试官到面试中到面试结束

前端依依

程序员 面试 大前端 阿里 经验分享

Gemini Mining双子矿业系统APP开发模板

捕货拼团软件系统开发详情

喜讯:恒拓高科荣获“2020年度华侨城集团优秀数字化服务商”称号

WorkPlus

开源 解决方案 即时通讯 开源软件

架构训练营 - 模块二 - 作业

姑射仙人

架构训练营

DMDOGEplus钻石狗软件系统开发需求

批量下载gitlab代码

阿呆

#GitLab

优评海洋APP系统开发模板

Polar Network/PN币挖矿APP系统开发搭建

OD万基国际系统软件开发搭建

WorkPlus综合企业数字化解决方案—华侨城

WorkPlus

企业 移动开 开源软件

膜拜!阿里内部都在强力进阶学习springboot实战派文档

Java spring 程序员 架构 面试

差点跳起来了!阿里首推22w字Java面试复盘宝典成功助我入职美团

白亦杨

Java 编程 程序员

U评海洋软件系统开发搭建

乐活星球系统APP开发简介

名列GitHub必看榜!腾讯架构师纯手敲Spring Boot高级进阶笔记

Java架构追梦

Java 架构 腾讯 面试 springboot

国内首发!阿里高工手码分布式系统速成笔记!

Java 编程 程序员

PHA挖矿|PHA云算力挖矿系统开发案例

Geek_23f0c3

区块链 云算力挖矿系统开发详解 PHA矿机挖矿

云原生数据库的幕后英雄—浅谈分布式数据库的计算和存储分离

速拼商城APP系统开发介绍

袋鼠云:基于Flink构建实时计算平台的总体架构和关键技术点

Apache Flink

flink

ONE红地球/ONE Network系统APP开发费用

WorkPlus高端制造业数字化解决方案—中集集团

WorkPlus

企业 即时通讯 协同办公 开源软件

全球对话式AI平台评估报告出炉 Gartner:百度位居领先阵营

百度大脑

人工智能 智能客服

IPFS矿机多少钱1T?IPFS矿机多少钱一台?

分布式存储 IPFS fil fil矿机 ipfs矿机

从命令式编程到Fork/Join再到Java 8中的并行Streams_Java_Raoul-Gabriel Urma_InfoQ精选文章