免费下载案例集|20+数字化领先企业人才培养实践经验 了解详情
写点什么

从命令式编程到 Fork/Join 再到 Java 8 中的并行 Streams

  • 2014-03-20
  • 本文字数:4465 字

    阅读完需:约 15 分钟

Java 8 带来了很多可以使编码更简洁的特性。例如,像下面的代码:

复制代码
Collections.sort(transactions, new Comparator<Transaction>(){
public int compare(Transaction t1, Transaction t2){
return t1.getValue().compareTo(t2.getValue());
}
});

可以用替换为如下更为紧凑的代码,功能相同,但是读上去与问题语句本身更接近了:

复制代码
transactions.sort(comparing(Transaction::getValue));

Java 8 引入的主要特性是 Lambda 表达式、方法引用和新的 Streams API。它被认为是自 20 年前 Java 诞生以来语言方面变化最大的版本。要想通过详细且实际的例子来了解如何从这些特性中获益,可以参考本文作者和 Alan Mycroft 共同编写的《 Java 8 in Action: Lambdas, Streams and Functional-style programming 》一书。

这些特性支持程序员编写更简洁的代码,还使他们能够受益于多核架构。实际上,编写可以优雅地并行执行的程序还是 Java 专家们的特权。然而,借助新的 Streams API,Java 8 改变了这种状况,让每个人都能够更容易地编写利用多核架构的代码。

在这篇文章中,我们将使用以下三种风格,以不同方法计算一个大数据集的方差,并加以对比。

  1. 命令式风格
  2. Fork/Join 框架
  3. Streams API

方差是统计学中的概念,用于度量一组数的偏离程度。方差可以通过对每个数据与平均值之差的平方和求平均值来计算。例如,给定一组表示人口年龄的数:40、30、50 和 80,我们可以这样计算方差:

  1. 计算平均值:(40 + 30 + 50 + 80) / 4 = 50
  2. 计算每个数据与平均值之差的平方和:(40-50)2 + (30-50)2 + (50-50)2 + (80-50)2 = 1400
  3. 最后平均:1400/4 = 350

命令式风格

下面是计算方差的一种典型的命令式风格实现:

复制代码
public static double varianceImperative(double[] population){
double average = 0.0;
for(double p: population){
average += p;
}
average /= population.length;
double variance = 0.0;
for(double p: population){
variance += (p - average) * (p - average);
}
return variance/population.length;
}

为什么说这是命令式的呢?我们的实现用 _ 修改状态的语句序列 _ 描述了计算过程。这里,我们显式地对人口年龄数组中的每个元素进行迭代,而且每次迭代时更新 average 和 variance 这两个局部变量。这种代码很适合只有一个 CPU 的硬件架构。确实,它可以非常直接地映射到 CPU 的指令集。

Fork/Join 框架

那么,如何编写适合在多核架构上执行的实现代码呢?应该使用线程吗?这些线程是不是要在某个点上同步?Java 7 引入的 Fork/Join 框架缓解了一些困难,所以让我们使用该框架来开发方差算法的一个并行版本吧。

复制代码
public class ForkJoinCalculator extends RecursiveTask<Double> {
public static final long THRESHOLD = 1_000_000;
private final SequentialCalculator sequentialCalculator;
private final double[] numbers;
private final int start;
private final int end;
public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
this(numbers, 0, numbers.length, sequentialCalculator);
}
private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator
sequentialCalculator) {
this.numbers = numbers;
this.start = start;
this.end = end;
this.sequentialCalculator = sequentialCalculator;
}
@Override
protected Double compute() {
int length = end - start;
if (length <= THRESHOLD) {
return sequentialCalculator.computeSequentially(numbers, start, end);
}
ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2,
sequentialCalculator);
leftTask.fork();
ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end,
sequentialCalculator);
Double rightResult = rightTask.compute();
Double leftResult = leftTask.join();
return leftResult + rightResult;
}
}

这里我们编写了一个 _RecursiveTask_ 类的子类,它对一个 double 数组进行切分,当子数组的长度小于等于给定阈值(THRESHOLD)时停止切分。切分完成后,对子数组进行顺序处理,并将下列接口定义的操作应用于子数组。

复制代码
public interface SequentialCalculator {
double computeSequentially(double[] numbers, int start, int end);
}

利用该基础设施,可以按如下方式并行计算方差。

复制代码
public static double varianceForkJoin(double[] population){
final ForkJoinPool forkJoinPool = new ForkJoinPool();
double total = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double total = 0;
for (int i = start; i < end; i++) {
total += numbers[i];
}
return total;
}
}));
final double average = total / population.length;
double variance = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double variance = 0;
for (int i = start; i < end; i++) {
variance += (numbers[i] - average) * (numbers[i] - average);
}
return variance;
}
}));
return variance / population.length;
}

本质上,即便使用 Fork/Join 框架,相对于顺序版本,并行版本的编写和最后的调试仍然困难许多。

并行 Streams

Java 8 让我们可以以不同的方式解决这个问题。不同于编写代码指出计算如何实现,我们可以使用 Streams API 粗线条地描述让它做什么。作为结果,库能够知道如何为我们实现计算,并施以各种各样的优化。这种风格被称为 _ 声明式编程 _。Java 8 有一个为利用多核架构而专门设计的并行 Stream。我们来看一下如何使用它们来更快地计算方差。

假定读者对本节探讨的 Stream 有些了解。作为复习,Stream是 T 类型元素的一个序列,支持聚合操作。我们可以使用这些操作来创建表示计算的一个管道(pipeline)。这里的管道和 UNIX 的命令管道一样。并行 Stream 就是一个可以并行执行管道的 Stream,可以通过在普通的 Stream 上调用 parallel() 方法获得。要复习 Stream,可以参考 Javadoc 文档

好消息是,Java 8 API 内建了一些算术操作,如 max、min 和 average。我们可以使用 Stream 的几种基本类型特化形式来访问前面几个方法:IntStream(int 类型元素)、LongStream(long 类型元素)和 DoubleStream(double 类型元素)。例如,可以使用 IntStream.rangeClosed() 创建一系列数,然后使用 max() 和 min() 方法计算 Stream 中的最大元素和最小元素。

回到最初的问题,我们想使用这些操作来计算一个规模较大的人口年龄数据的方差。第一步是从人口年龄数组创建一个 Stream,可以通过 Arrays.stream() 静态方法实现:

复制代码
DoubleStream populationStream = Arrays.stream(population).parallel();

我们可以使用 DoubleStream 所支持的 average() 方法:

复制代码
double average = populationStream.average().orElse(0.0);

下一步是使用 average 计算方差。人口年龄中的每个元素首先需要减去平均值,然后计算差的平方。可以将其视作一个 Map 操作:使用一个 Lambda 表达式 (double p) -> (p - average) * (p - average) 把每个元素转换为另一个数,这里是转换为该元素与平均值差的平方。一旦转换完成,我们就可以调用 sum() 方法来计算所有结果元素的和了。.

不过别那么着急。Stream 只能消耗一次。如果复用 populationStream,我们会碰到下面这个令人惊讶的错误:

复制代码
java.lang.IllegalStateException: stream has already been operated upon or closed

所以我们需要使用第二个流来计算方差,如下所示:

复制代码
public static double varianceStreams(double[] population){
double average = Arrays.stream(population).parallel().average().orElse(0.0);
double variance = Arrays.stream(population).parallel()
.map(p -> (p - average) * (p - average))
.sum() / population.length;
return variance;
}

通过使用 Streams API 内建的操作,我们以声明式、而且非常简洁的方式重写了最初的命令式风格代码,而且声明式风格读上去几乎就是方差的数学定义。我们再来研究一下三种实现版本的性能。

基准测试

我们以非常不同的风格编写了三个版本的方差算法。Stream 版本是最简洁的,而且是以声明式风格编写的,它让类库去确定具体的实现,并利用多核基础设施。不过你可能想知道它们的执行效果如何。为找出答案,让我们创建一个基准测试,对比一下三个版本的表现。我们先随机生成 1 到 140 之间的 3000 万个人口年龄数据,然后计算其方差。我们使用 jmh 来研究每个版本的性能。Jmh 是 OpenJDK 支持的一个 Java 套件。读者可以从 GitHub 克隆该项目,自己运行基准测试。

基准测试运行的机器是 Macbook Pro,配备 2.3 GHz 的 4 核 Intel Core i7 处理器,16GB 1600MHz DDR3 内存。此外,我们使用的 JDK 8 版本如下:

复制代码
java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

结果用下面的柱状图说明。命令式版本用了 60 毫秒,Fork/Join 版本用了 22 毫秒,而流版本用了 46 毫秒。

这些数据应该谨慎对待。比如,如果在 32 位 JVM 上运行测试,结果很可能有较大的差别。然而有趣的是,使用 Java 8 中的 Streams API 这种不同的编程风格,为在场景背后执行一些优化打开了一扇门,而这在严格的命令式风格中是不可能的;相对于使用 Fork/Join 框架,这种风格也更为直接。

关于作者

Raoul-Gabriel Urma 20 岁开始在剑桥大学攻读计算机科学博士学位。他的研究主要关注的是编程语言与软件工程。他以一等荣誉生的成绩获得了伦敦帝国理工学院的计算机科学工程硕士学位,并赢得一些技术创新奖项。他曾经为 Google、eBay、Oracle 和 Goldman Sachs 等很多大公司工作过,也参与过不少创业项目。此外,他还经常在 Java 开发者会议上发表讲话,也是一位 Java 课程讲师。他的 Twitter: @raoulUK 网站

Mario Fusco 是 Red Hat 的一位高级软件工程师,主要从事 Drools 核心和 JBoss 规则引擎方面的开发工作。作为 Java 开发者,他有着丰富的经验,参与了从媒体公司到金融部门等行业的很多企业级项目,而且经常作为项目的领导者。他的兴趣包括函数式编程和领域特定语言(Domain Specific Language,DSL)。凭借在这两个方面的激情,他创建了开源类库 lambdaj,意在在 Java 中为操作集合提供一种内部 Java DSL,并支持一点函数式编程。他的 Twitter 是 @mariofusco

查看英文原文: From Imperative Programming to Fork/Join to Parallel Streams in Java 8

2014-03-20 00:148261
用户头像
臧秀涛 略懂技术的运营同学。

发布了 300 篇内容, 共 133.9 次阅读, 收获喜欢 35 次。

关注

评论

发布
暂无评论
发现更多内容

DOM(文档对象模型):理解网页结构与内容操作的关键技术

小万哥

xml 程序人生 编程语言 软件工程 前端开发

一文搞懂 Kafka consumer 与 broker 交互机制与原理

AutoMQ

大数据 kafka 云原生 broker AutoMQ

双料荣誉!微帧科技荣获「实力先锋企业」&「数智出海服务企业奖」

微帧Visionular

010 Editor for Mac(最好用的十六进制编辑器)v14.0激活版

iMac小白

实例操作教你爬取京东的商品数据

技术冰糖葫芦

Redis 开源协议变更背后:开源软件与云计算巨头的竞争博弈

AutoMQ

redis 大数据 云原生 AutoMQ BSL

回南天、沙尘天轮番来袭?华为天气这份每日早报请及时查收!

最新动态

第十一届网络视听大会:鸿蒙激活产业创新,以系统级AI技术赋能体验升级

最新动态

HN 热帖|替换 Redis 的一场赛跑

小猿姐

数据库 redis 开源协议

解决centos离线安装cmake找不到OpenSSL问题

百度搜索:蓝易云

Linux centos 运维 openssl cmake

JavaScript代码安全性提升:选择和使用JS混淆工具的指南

Macos思维导图工具:XMind for Mac v24.01中文版

iMac小白

Mac专业级音频制作工具:Logic Pro X for Mac中文版 支持M1

iMac小白

60万年薪失业后,我对"成长、价值、时间"的理解

Jackchang234987

职场发展 成长性 成长感悟

containerd快速安装指南🚀

GousterCloud

Docker cRI Containerd

低代码与系统集成:革新企业应用开发的新动力

不在线第一只蜗牛

低代码 系统集成 项目开发

五款常用在线JavaScript加密混淆工具详解:jscrambler、JShaman、jsfack、ipaguard和jjencode

雪奈椰子

Macos数据库管理工具:Valentina Studio Pro for Mac激活版 支持M1

iMac小白

低代码助力企业打造业务管理云平台

快乐非自愿限量之名

低代码 业务管理

AI时代来临我们要如何面对?

高端章鱼哥

实时渲染是什么意思?实时渲染和离线渲染的区别

3DCAT实时渲染

实时渲染

查看服务器/IIS日志、log、访问信息基本方法

百度搜索:蓝易云

云计算 运维 服务器 IIS 云服务器

Docker技术全景:推动云原生架构的关键力量

快乐非自愿限量之名

Docker 容器 云原生

Timecho 联合组织 Apache PLC4X 社区 Meetup:探索工业物联网的未来

Apache IoTDB

你知道APP主要关心MobPush消息推送的哪些能力吗?

MobTech袤博科技

ios 开发者 智能推送

低代码与数智化OA:重塑企业办公新生态

EquatorCoco

低代码 数智化 OA

慢工之旅:婺源的故事

明道云

Mysql/etc/my.cnf参数详解

百度搜索:蓝易云

MySQL Linux 运维 云服务器 /etc/my.cnf

【Ubuntu20.04】安装gcc11 g++11, Ubuntu18.04

百度搜索:蓝易云

Linux ubuntu 运维 云服务器 gcc11

使用git克隆仓库报错:Warning: Permanently added‘github.com’ to the .....(ssh )

百度搜索:蓝易云

云计算 Linux 运维 云服务器 ECS

在iPhone / iPad上轻松模拟GPS位置:AnyGo for Mac 支持M1

iMac小白

从命令式编程到Fork/Join再到Java 8中的并行Streams_Java_Raoul-Gabriel Urma_InfoQ精选文章