写点什么

从命令式编程到 Fork/Join 再到 Java 8 中的并行 Streams

  • 2014-03-20
  • 本文字数:4465 字

    阅读完需:约 15 分钟

Java 8 带来了很多可以使编码更简洁的特性。例如,像下面的代码:

复制代码
Collections.sort(transactions, new Comparator<Transaction>(){
public int compare(Transaction t1, Transaction t2){
return t1.getValue().compareTo(t2.getValue());
}
});

可以用替换为如下更为紧凑的代码,功能相同,但是读上去与问题语句本身更接近了:

复制代码
transactions.sort(comparing(Transaction::getValue));

Java 8 引入的主要特性是 Lambda 表达式、方法引用和新的 Streams API。它被认为是自 20 年前 Java 诞生以来语言方面变化最大的版本。要想通过详细且实际的例子来了解如何从这些特性中获益,可以参考本文作者和 Alan Mycroft 共同编写的《 Java 8 in Action: Lambdas, Streams and Functional-style programming 》一书。

这些特性支持程序员编写更简洁的代码,还使他们能够受益于多核架构。实际上,编写可以优雅地并行执行的程序还是 Java 专家们的特权。然而,借助新的 Streams API,Java 8 改变了这种状况,让每个人都能够更容易地编写利用多核架构的代码。

在这篇文章中,我们将使用以下三种风格,以不同方法计算一个大数据集的方差,并加以对比。

  1. 命令式风格
  2. Fork/Join 框架
  3. Streams API

方差是统计学中的概念,用于度量一组数的偏离程度。方差可以通过对每个数据与平均值之差的平方和求平均值来计算。例如,给定一组表示人口年龄的数:40、30、50 和 80,我们可以这样计算方差:

  1. 计算平均值:(40 + 30 + 50 + 80) / 4 = 50
  2. 计算每个数据与平均值之差的平方和:(40-50)2 + (30-50)2 + (50-50)2 + (80-50)2 = 1400
  3. 最后平均:1400/4 = 350

命令式风格

下面是计算方差的一种典型的命令式风格实现:

复制代码
public static double varianceImperative(double[] population){
double average = 0.0;
for(double p: population){
average += p;
}
average /= population.length;
double variance = 0.0;
for(double p: population){
variance += (p - average) * (p - average);
}
return variance/population.length;
}

为什么说这是命令式的呢?我们的实现用 _ 修改状态的语句序列 _ 描述了计算过程。这里,我们显式地对人口年龄数组中的每个元素进行迭代,而且每次迭代时更新 average 和 variance 这两个局部变量。这种代码很适合只有一个 CPU 的硬件架构。确实,它可以非常直接地映射到 CPU 的指令集。

Fork/Join 框架

那么,如何编写适合在多核架构上执行的实现代码呢?应该使用线程吗?这些线程是不是要在某个点上同步?Java 7 引入的 Fork/Join 框架缓解了一些困难,所以让我们使用该框架来开发方差算法的一个并行版本吧。

复制代码
public class ForkJoinCalculator extends RecursiveTask<Double> {
public static final long THRESHOLD = 1_000_000;
private final SequentialCalculator sequentialCalculator;
private final double[] numbers;
private final int start;
private final int end;
public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
this(numbers, 0, numbers.length, sequentialCalculator);
}
private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator
sequentialCalculator) {
this.numbers = numbers;
this.start = start;
this.end = end;
this.sequentialCalculator = sequentialCalculator;
}
@Override
protected Double compute() {
int length = end - start;
if (length <= THRESHOLD) {
return sequentialCalculator.computeSequentially(numbers, start, end);
}
ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2,
sequentialCalculator);
leftTask.fork();
ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end,
sequentialCalculator);
Double rightResult = rightTask.compute();
Double leftResult = leftTask.join();
return leftResult + rightResult;
}
}

这里我们编写了一个 _RecursiveTask_ 类的子类,它对一个 double 数组进行切分,当子数组的长度小于等于给定阈值(THRESHOLD)时停止切分。切分完成后,对子数组进行顺序处理,并将下列接口定义的操作应用于子数组。

复制代码
public interface SequentialCalculator {
double computeSequentially(double[] numbers, int start, int end);
}

利用该基础设施,可以按如下方式并行计算方差。

复制代码
public static double varianceForkJoin(double[] population){
final ForkJoinPool forkJoinPool = new ForkJoinPool();
double total = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double total = 0;
for (int i = start; i < end; i++) {
total += numbers[i];
}
return total;
}
}));
final double average = total / population.length;
double variance = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
@Override
public double computeSequentially(double[] numbers, int start, int end) {
double variance = 0;
for (int i = start; i < end; i++) {
variance += (numbers[i] - average) * (numbers[i] - average);
}
return variance;
}
}));
return variance / population.length;
}

本质上,即便使用 Fork/Join 框架,相对于顺序版本,并行版本的编写和最后的调试仍然困难许多。

并行 Streams

Java 8 让我们可以以不同的方式解决这个问题。不同于编写代码指出计算如何实现,我们可以使用 Streams API 粗线条地描述让它做什么。作为结果,库能够知道如何为我们实现计算,并施以各种各样的优化。这种风格被称为 _ 声明式编程 _。Java 8 有一个为利用多核架构而专门设计的并行 Stream。我们来看一下如何使用它们来更快地计算方差。

假定读者对本节探讨的 Stream 有些了解。作为复习,Stream是 T 类型元素的一个序列,支持聚合操作。我们可以使用这些操作来创建表示计算的一个管道(pipeline)。这里的管道和 UNIX 的命令管道一样。并行 Stream 就是一个可以并行执行管道的 Stream,可以通过在普通的 Stream 上调用 parallel() 方法获得。要复习 Stream,可以参考 Javadoc 文档

好消息是,Java 8 API 内建了一些算术操作,如 max、min 和 average。我们可以使用 Stream 的几种基本类型特化形式来访问前面几个方法:IntStream(int 类型元素)、LongStream(long 类型元素)和 DoubleStream(double 类型元素)。例如,可以使用 IntStream.rangeClosed() 创建一系列数,然后使用 max() 和 min() 方法计算 Stream 中的最大元素和最小元素。

回到最初的问题,我们想使用这些操作来计算一个规模较大的人口年龄数据的方差。第一步是从人口年龄数组创建一个 Stream,可以通过 Arrays.stream() 静态方法实现:

复制代码
DoubleStream populationStream = Arrays.stream(population).parallel();

我们可以使用 DoubleStream 所支持的 average() 方法:

复制代码
double average = populationStream.average().orElse(0.0);

下一步是使用 average 计算方差。人口年龄中的每个元素首先需要减去平均值,然后计算差的平方。可以将其视作一个 Map 操作:使用一个 Lambda 表达式 (double p) -> (p - average) * (p - average) 把每个元素转换为另一个数,这里是转换为该元素与平均值差的平方。一旦转换完成,我们就可以调用 sum() 方法来计算所有结果元素的和了。.

不过别那么着急。Stream 只能消耗一次。如果复用 populationStream,我们会碰到下面这个令人惊讶的错误:

复制代码
java.lang.IllegalStateException: stream has already been operated upon or closed

所以我们需要使用第二个流来计算方差,如下所示:

复制代码
public static double varianceStreams(double[] population){
double average = Arrays.stream(population).parallel().average().orElse(0.0);
double variance = Arrays.stream(population).parallel()
.map(p -> (p - average) * (p - average))
.sum() / population.length;
return variance;
}

通过使用 Streams API 内建的操作,我们以声明式、而且非常简洁的方式重写了最初的命令式风格代码,而且声明式风格读上去几乎就是方差的数学定义。我们再来研究一下三种实现版本的性能。

基准测试

我们以非常不同的风格编写了三个版本的方差算法。Stream 版本是最简洁的,而且是以声明式风格编写的,它让类库去确定具体的实现,并利用多核基础设施。不过你可能想知道它们的执行效果如何。为找出答案,让我们创建一个基准测试,对比一下三个版本的表现。我们先随机生成 1 到 140 之间的 3000 万个人口年龄数据,然后计算其方差。我们使用 jmh 来研究每个版本的性能。Jmh 是 OpenJDK 支持的一个 Java 套件。读者可以从 GitHub 克隆该项目,自己运行基准测试。

基准测试运行的机器是 Macbook Pro,配备 2.3 GHz 的 4 核 Intel Core i7 处理器,16GB 1600MHz DDR3 内存。此外,我们使用的 JDK 8 版本如下:

复制代码
java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

结果用下面的柱状图说明。命令式版本用了 60 毫秒,Fork/Join 版本用了 22 毫秒,而流版本用了 46 毫秒。

这些数据应该谨慎对待。比如,如果在 32 位 JVM 上运行测试,结果很可能有较大的差别。然而有趣的是,使用 Java 8 中的 Streams API 这种不同的编程风格,为在场景背后执行一些优化打开了一扇门,而这在严格的命令式风格中是不可能的;相对于使用 Fork/Join 框架,这种风格也更为直接。

关于作者

Raoul-Gabriel Urma 20 岁开始在剑桥大学攻读计算机科学博士学位。他的研究主要关注的是编程语言与软件工程。他以一等荣誉生的成绩获得了伦敦帝国理工学院的计算机科学工程硕士学位,并赢得一些技术创新奖项。他曾经为 Google、eBay、Oracle 和 Goldman Sachs 等很多大公司工作过,也参与过不少创业项目。此外,他还经常在 Java 开发者会议上发表讲话,也是一位 Java 课程讲师。他的 Twitter: @raoulUK 网站

Mario Fusco 是 Red Hat 的一位高级软件工程师,主要从事 Drools 核心和 JBoss 规则引擎方面的开发工作。作为 Java 开发者,他有着丰富的经验,参与了从媒体公司到金融部门等行业的很多企业级项目,而且经常作为项目的领导者。他的兴趣包括函数式编程和领域特定语言(Domain Specific Language,DSL)。凭借在这两个方面的激情,他创建了开源类库 lambdaj,意在在 Java 中为操作集合提供一种内部 Java DSL,并支持一点函数式编程。他的 Twitter 是 @mariofusco

查看英文原文: From Imperative Programming to Fork/Join to Parallel Streams in Java 8

2014-03-20 00:148394
用户头像
臧秀涛 略懂技术的运营同学。

发布了 300 篇内容, 共 137.7 次阅读, 收获喜欢 35 次。

关注

评论

发布
暂无评论
发现更多内容

读《非暴力沟通》

箭上有毒

读书笔记 4月日更

Redis 最后一课

escray

redis 学习 极客时间 Redis 核心技术与实战 4月日更

微服务网关:Spring Cloud Gateway —— Zuul

程序员架构进阶

微服务 网关 28天写作 4月日更

一篇文章告诉你什么是EGG Network(阿凡提)以及什么是EFTalk

币圈那点事

NA(Nirvana)Chain“以应用而生”如何强势突围

区块链第一资讯

Dubbo 学习笔记(一) Hello,Dubbo

U2647

dubbo 4月日更

Go1.16 中模块的新变化

Rayjun

Go 语言

线上PHP服务故障排查之路

风翱

PHP-FPM 线上事故 4月日更

《几何代数计算入门(计算机视觉)》

计算机与AI

计算机视觉 计算机图形学

用OpenCV制作庆祝武汉重启一周年短视频

老猿Python

Python OpenCV 音视频 图形图像处理 引航计划

redis Redis缓存穿透解决方案

Sakura

4月日更

浅论结构体与联合体

Integer

c

CI/CD之基于Jenkins的发布平台实践

小江

DevOps jenkins CI/CD 发布流程

使用Composition API在Vue3中创建防抖搜索输入框

devpoint

vite Vue3 防抖

Boss直聘转发超120W次Java全栈面试题!已帮我拿下5个Offer!

Java架构追梦

Java 面试 架构师 阿里巴巴面经总结

如何引入TDD实践

顿晓

TDD 4月日更

和老大的相爱相杀中,让我终于搞懂了函数式接口

麦洛

Java 函数式接口 Lambda java8

DEX领域第一个运用整合思维的DeFi协议 SumSwwap潜力巨大

币圈资讯

那束漂亮的手捧花

小天同学

爱情 4月日更 幸福 传递

使用FFmpeg开发的那些事

Bob

音视频 ffmpeg 开源文化

区块链技术驱动商业银行开展供应链金融业务的创新路径

CECBC

如何从Telegram下载一整套可爱的猫猫表情包?

彭宏豪95

GitHub 效率 社交 4月日更

翻译:《实用的Python编程》08_01_Testing

codists

Python

深圳龙华携手腾讯云 加快推进区块链先行试验区建设

CECBC

Linux mkdir 命令

一个大红包

4月日更

MapReduce优化

大数据技术指南

hadoop 4月日更

聊聊云厂商的指标监控组件

耳东@Erdong

Prometheus 4月日更 #Grafana

spring的IOC使用以及原理

邱学喆

spring ioc 对象创建 属性注入

「开源免费」基于Vue和Quasar的前端SPA项目crudapi后台管理系统实战之动态表单设计器(五)

crudapi

Vue crud 动态表单 quasar cruapi

你看起来很美味?独家揭露视频推荐系统AI秘方

白洞计划

OpenHarmony 1.1.0 LTS 版本正式发布

开放原子开源基金会

开源 开放原子开源基金会 OpenHarmony

从命令式编程到Fork/Join再到Java 8中的并行Streams_Java_Raoul-Gabriel Urma_InfoQ精选文章