写点什么

RxJava2 实例解析

2017 年 2 月 27 日

本文要点

* 响应式编程是一种处理异步数据流的规范

* 响应式为数据流的转换和聚合以及数据流的控制管理提供了工具支持

* 弹珠交互图(Marble Diagram)以可交互的方式可视化响应式的结构

* 响应式编程风格看起来跟 Java Streams API 有点相似,不过本质上是不一样的

* 如何连接到动态流处理异步数据源

译者注:关注 InfoQ 资讯的读者可能已经留意到,我们前面给出了一篇很好的 RxJava 文章“ RxJava 实例解析”。本文秉承同一风格,用相同的例子循序渐进地介绍了 RxJava2。译者将两篇文章中的不同之处用粗体标识出来,以供读过前篇的读者快速浏览本文。

在高并发编程范式的发展过程中,我们使用过很多工具,比如 java.util.concurrent 包、Akka Streams 框架、CompletableFuture 类以及 Netty 框架。响应式编程近来大受欢迎,这要得益于它强大的功能和健壮的工具包。

响应式编程是一种处理异步数据流的规范,它为数据流的转换和聚合以及数据流的控制管理提供了工具支持,它让考量程序整体设计的工作变得简单。

但它使用起来并不简单,它的学习曲线也并不平坦。对于我们当中的那些数学家来说,学习响应式就好比当初他们从学习标准代数的无向量过渡到学习线性代数的向量、矩阵和张量,它们实际上是被单元化的数据流。传统的编程模式以对象为基础,而响应式以事件流为基础。事件可能以多种形式出现,比如对象、数据源、鼠标移动信息或者异常。在传统的编程范式里,“异常”这个词描述的是对意外情况的处理,因为在这个背景下,没有按照预想发生的情况都算异常。而在响应式编程范式里,异常却是一等公民。因为数据流一般是异步的,所以抛出异常是没有意义的,任何一个异常都会被当成数据流里的一个事件。

在这篇文章里,我们会探讨响应式编程的基本原理,以一种教与学的方式来强化一些重要的概念。

首先要记住的是,响应式里所有的东西都是流。Observable 封装了流,是最基本的单元。流可以包含零个或多个事件,有未完成和已完成两种状态,可以正常结束也可以发生错误。如果一个流正常完成或者发生错误,说明处理结束了,虽然有些工具可以对错误进行重试或者使用不同的流替换发生错误的流。

在运行我们给出的例子之前,需要把 RxJava 的依赖加入到项目里。可以在 Maven 里加入这个依赖:

复制代码
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.5</version>
</dependency>

Observable 类有几个静态工厂方法和实例方法,它们被用来生成各种新的 Observable 对象,或者把 Observable 对象添加到感兴趣的处理流程里。Observable 是可变的,所以针对它们的操作总是会生成新的 Observable 对象。为了更好地理解我们的例子,我们先来温习一下 Observable 的基本操作,因为在后面的例子里会用到它们。

Observable.just 方法生成一个简单对象,然后返回。例如:

复制代码
Observable.just("Howdy!")

这行代码生成一个新的 Observable 对象,在结束之前触发一个单独的事件,生成字符串“Howdy!”。

可以把新生成的 Observable 对象赋给一个 Observable 变量:

复制代码
Observable<String> hello = Observable.just("Howdy!");

不过知道这个还远远不够。就像那个著名的哲学问题一样,森林里的一颗树倒下来,如果周围没有人听见,那么就等于说树的倒下是无声无息的。一个 Observable 对象必须要有一个订阅者来处理它所生成的事件。所幸的是,现在 Java 支持 Lambda 表达式,我们就可以使用简洁的声明式风格来表示订阅操作:

复制代码
Observable<String> howdy = Observable.just("Howdy!");
howdy.subscribe(System.out::println);

这段代码仍然会生成字符串“Howdy!”。

跟 Observable 的其它方法一样,just 方法可以被重载:

复制代码
Observable.just("Hello", "World")
.subscribe(System.out::println);

这行代码会输出

复制代码
Hello
World

just 方法可以被重载,最多可以接收 10 个参数。这里要注意,输出的结果分成两行显示,说明它们是两个独立的事件。

让我们来看看如果使用列表会发生什么情况:

复制代码
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
Observable.just(words)
.subscribe(System.out::println);

这段代码输出一个很平常的结果:

复制代码
[the, quick, brown, fox, jumped, over, the, lazy, dog]

我们本以为每个单词会是一个单独的事件,但实际上整个列表被当成了一个事件。为了达到我们想要的结果,我们引入fromIterable方法:

复制代码
Observable.fromIterable(words)
.subscribe(System.out::println);

(注意在 rxjava1 中,有一个重载过的 from 方法。该方法已被多种 from 方法的具体实现所替代,包括 fromIterable 和 fromArray 等。)

这行代码把数组或者列表转换成一系列事件,每个元素就是一个事件。

执行这行代码会得到我们想要的多行输出:

复制代码
the
quick
brown
fox
jumped
over
the
lazy
dog

为了能从中获取编号,我们要在 Observable 上多做一些工作。

不过在写代码之前,我们先来看看另外两个操作,range 和 zip。range(i,n) 会创建一个包含 n 个数的流,它的第一个数是从 i 开始的。如果我们有办法把这种区间流跟上面的单词流组合在一起,就可以解决编号的问题。

复制代码
Observable.range(1, 5).subscribe(System.out::println);

输出:

复制代码
1
2
3
4
5

如果有一种将区间流与我们的单词流合并的方法,就会解决添加编号的问题。

RX Marbles 这个网站对我们学习响应式编程很有帮助。这个网站使用 JavaScript 渲染大部分响应式操作,而且是可交互的。每个响应式操作使用“弹珠”来描述一个或多个源流(source stream)以及由操作生成的结果流(result stream)。时间从左到右,事件用弹珠表示。单击或者拖动弹珠,可以看到它们是如何影响结果的。

执行一个 zip 操作就跟遵照医嘱一样简单。让我们用弹珠交互图来解释一下这个过程:

zip 操作通过成对的“zip”映射转换把源流的元素跟另一个给定流的元素组合起来,其中的映射可以使用 Lambda 表达式来表示。只要其中的一个流完成操作,整个 zip 操作也跟着停止,另一个未完成的流剩下的事件就会被忽略。zip 可以支持最多 9 个源流的 zip 操作。zipWith 操作可以把一个指定流合并到一个已存在的流里。

现在回到我们的例子上,我们可以使用 range 和 zipWith 操作加入编号,并用 String.format 做映射转换:

复制代码
Observable.fromIterable(words)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count)->String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

复制代码
1. the
2. quick
3. brown
4. fox
5. jumped
6. over
7. the
8. lazy
9. dog

看起来很不错!注意一旦任何一个流的操作完成,zip 和 zipWith 操作就停止对所有流的拉取。这就是为什么不受 Integer.MAX_VALUE 上限限制的原因。

现在假设我们要列出单词里的字母而不是单词本身,这个时候要用到 flatMap,flatMap 会从 Observable 里获取事件源(对象、集合或数组),并把这些元素分别映射成 Observable,然后把这些 Observable 扁平化成一个单独的 Observable。

对于我们的例子来说,我们会先用 split 方法把每个单词拆分成一个字母数组,然后用 flatMap 创建一个新的 Observable 对象,这个 Observable 对象包含了组成这些单词的所有字母:

复制代码
Observable.fromIterable(words)
.flatMap(word -> Observable.fromArray(word.split("")))
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

复制代码
1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
...
30. l
31. a
32. z
33. y
34. d
35. o
36. g

所有单词的字母都出现在这里。不过这样太繁琐了,我们希望相同的字母只出现一次:

复制代码
Observable.fromIterable(words)
.flatMap(word -> Observable.fromArray(word.split("")))
.distinct()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

复制代码
1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
9. b
10. r
11. o
12. w
13. n
14. f
15. x
16. j
17. m
18. p
19. d
20. v
21. l
22. a
23. z
24. y
25. g

我们从小被告知“quick brown fox”这个全字母短句包含了英语里所有的字母,不过在这里我们只看到 25 个,而不是 26 个。现在让我们对这些字母进行排序,找出丢失的那个字母:

复制代码
.flatMap(word -> Observable.fromIterable(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

复制代码
1. a
2. b
3. c
...
17. q
18. r
19. t
20. u
21. v
22. w
23. x
24. y
25. z

看样子是字母“s”丢掉了。为了得到我们期望的结果,需要对数组做一点修改:

复制代码
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dogs"
);
Observable.fromIterable(words)
.flatMap(word -> Observable.fromArray(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

修改后的代码输出为:

复制代码
1. a
2. b
3. c
4. d
5. e
6. f
7. g
8. h
9. i
10. j
11. k
12. l
13. m
14. n
15. o
16. p
17. q
18. r
19. s
20. t
21. u
22. v
23. w
24. x
25. y
26. z

现在好了!

但是到目前为止,所有的代码都跟 Java 8 里引入的 Streams API 很相似,不过这种相似只是一种巧合,因为响应式包含的内容远不止这些。

Java Streams 和 Lambda 表达式为编程语言带来很大的价值,不过归根结底,它们只是提供了一种方式来遍历集合和生成集合。它们的作用很有限,而且缺乏可扩展性和可重用性。尽管 Stream 的 parallel 操作可以并行执行任务,但在返回结果前程序无法对整个过程进行干预。相反,响应式引入了执行时间、节流、流量控制等概念,而且它们可以被连接到“永不停止”的处理流程里。响应式产生的结果虽然不是集合,但你可以用任何期望的方式来处理这些结果。

让我们通过弹珠交互图更好地理解这些概念。

merge 操作可以把最多 9 个源流合并到一个结果里,而且可以保留它们的顺序。无需担心这里会出现竞赛条件,因为所有的事件都被“扁平化”到一个单独的线程里,包括异常事件和结束事件。

debounce 操作会把在一个时间段内紧挨在一起的几个事件看成一个单独事件,这几个事件里只有最后一个会被触发:

可以看到,上下两个图中的“1”之间有一个指定的时间间隔,而 2、3、4、5 之间的时间间隔都小于这个间隔,所以它们被看成单个事件。如果把“5”往右挪一点,结果就不一样了:

另一个有趣的操作是 amb,它是一种不确定性的操作。对应的数组形式操作是 ambArray。

amb 操作会从所有的输入流中选择第一个出现的流,然后忽略其它剩下的流。如下图,第二个流是最先出现的,所以 amb 操作选择了这个流。

如果把第一个流里的“20”往左移动,超过第二个流的第一个元素,那么生成的结果又会不一样:

如果你有一个需要接入到某个数据源的处理流程,比如从消息主题上获取数据,可能是 Bloomberg 或者 Reuters,你并不关心接入的到底是哪一个,只要从中选择一个就可以了。在这种情况下,amb 操作就会很有用。

Tick Tock

现在,我们可以使用这些工具基于流生成各种有意义的结果。在接下来的这个例子里,我们有一个数据源,它会每秒钟生成一个事件。不过为了节省 CPU,我们让它在周末时每三秒生成一次。我们使用混合型的“节奏器”按照一定的节奏生成数据。

首先,我们要创建一个返回 boolean 的方法,它会检查当前时间是否是周末,如果是就返回 true,否则就返回 false:

复制代码
private static boolean isSlowTickTime() {
return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY ||
LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
}

对于边读这篇文章边在 IDE 里执行这段代码的读者来说,他们可能不想等到下个周末才来验证这个方法是否可行,所以可以使用下面的替代实现,这个实现会在一个 15 秒钟内返回 true,在另一个 15 秒钟内返回 false:

复制代码
private static long start = System.currentTimeMillis();
public static Boolean isSlowTickTime() {
return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
}

接下来我们创建两个 Observable 对象,fast 和 slow,然后使用过滤器对它们进行调度,并把它们合并起来。

我们使用 Observable.interval 操作来安排调度,它会在每个指定的时间间隔内产生一次数据(从 0 开始计算)。

复制代码
Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);

fast 每秒生成一个事件,slow 每三秒生成一个事件(我们会忽略事件的值,因为我们只对执行时间感兴趣)。

现在我们把这两个 Observable 合并到一起,通过使用过滤器让 fast 流在工作日生成数据(或者在 15 秒内),slow 流在周末生成数据(或者在另一个 15 秒内)。

复制代码
Observable<Long> clock = Observable.merge(
slow.filter(tick-> isSlowTickTime()),
fast.filter(tick-> !isSlowTickTime())
);

最后,我们要添加一个打印时间的订阅动作。在执行这些代码时,它会根据我们的调度安排打印出系统时间。

clock.subscribe(tick-> System.out.println(new Date()));

为了防止程序中途退出,需要在方法的末尾添加一行代码(注意要处理 InterruptedException 异常)。

复制代码
Thread.sleep(60_000)

运行代码的结果:

复制代码
Fri Sep 16 03:08:18 BST 2016
Fri Sep 16 03:08:19 BST 2016
Fri Sep 16 03:08:20 BST 2016
Fri Sep 16 03:08:21 BST 2016
Fri Sep 16 03:08:22 BST 2016
Fri Sep 16 03:08:23 BST 2016
Fri Sep 16 03:08:24 BST 2016
Fri Sep 16 03:08:25 BST 2016
Fri Sep 16 03:08:26 BST 2016
Fri Sep 16 03:08:27 BST 2016
Fri Sep 16 03:08:28 BST 2016
Fri Sep 16 03:08:29 BST 2016
Fri Sep 16 03:08:30 BST 2016
Fri Sep 16 03:08:31 BST 2016
Fri Sep 16 03:08:32 BST 2016
Fri Sep 16 03:08:35 BST 2016
Fri Sep 16 03:08:38 BST 2016
Fri Sep 16 03:08:41 BST 2016
Fri Sep 16 03:08:44 BST 2016
. . .

可以看到,前面 15 个事件之间的时间间隔都是 1 秒,后面 15 秒内的事件之间的时间间隔是 3 秒,就像我们所期望的那样。

连接到已存在的数据源

以上方法用于创建能够生成静态数据的 Observable 是没有问题的。但如何把 Observable 连接到已有的数据源上,并享受响应式的流量控制和流操作策略为我们带来的好处呢?

在继续介绍之前,我们应该认识一下 RxJava2 新引入的一些类。

静态 Observable、动态 Observable 和 Flowable

在 RxJava 的前期版本中,即使对于无需流控制的小型流,Observable 也给出了流控制方法。为符合响应式的规范,RxJava2 将流控制从 Observable 类中移除,并引入了新的 Flowable 类。Flowable 可以看作是提供流控制的 Observable。

到目前为止我们讨论的都是静态 Observable,它们提供静态的数据,尽管我们可以在执行时间上做一些调节,不过这远远 不够。静态 Observable 只在有订阅者的情况下才会生成事件,而且订阅者收到的是历史数据,不管它们是从何时开始订阅的。相反,动态 Observable 不管有多少个订阅者都会生成数据,而且只生成最新的数据(除非使用了缓存)。可以通过两个步骤把静态 Observable 转化成动态 Observable:

  1. 调用 Observable 的 publish 方法,生成一个新的 ConnectableObservable
  2. 调用 ConnectableObservable 的 connect 方法,开始生成数据

这种方式也能工作,但并不支持任何流控制。要连接到长期运行的现有数据源上,除非是提供背压控制,我们通常会选择使用 Flowable,使用一种 Observable 的并行语法。

1a. 调用 Flowable 的 publish 方法生成一个新的 ConnectableFlowable

2a. 调用 ConnectableFlowable 的 connect 方法开始生成数据

要连接到一个已有的数据源上,可以在这个数据源上添加监听器(如果你喜欢这么做),监听器会把事件传播给订阅者,然后在每个事件发生时调用订阅者的 onNext 方法。在实现监听器的时候要确保每个订阅者仍然处于订阅状态,否则就要停止把事件传播给它,同时要注意回压信号。所幸的是,这些工作可以由 Flowabled 的 create 方法来处理。假设我们有一个叫做 SomeFeed 的数据服务,它会生成报价事件,同时有一个 SomeListener 监听这些报价事件以及其它生命周期事件。在 GitHub 上已经有一个实现,如果你想自己动手运行这些代码,可以去下载。

我们的数据源监听器有两个方法:

复制代码
public void priceTick(PriceTick event);
public void error(Throwable throwable);

PriceTick 类包含了 date、instrument 和 price 字段,还有一个 isLast 方法用来判断它是否是最后一个事件:

让我们来看看如何使用 AsyncEmitter 把 Observable 连接到一个实时的数据源上:

复制代码
SomeFeed<PriceTick> feed = new SomeFeed<>();
Flowable<PriceTick> flowable = Flowable.create(emitter -> {
SomeListener listener = new SomeListener() {
@Override
public void priceTick(PriceTick event) {
emitter.onNext(event);
if (event.isLast()) {
emitter.onComplete();
}
}
@Override
public void error(Throwable e) {
emitter.onError(e);
}
};
feed.register(listener);
}, BackpressureStrategy.BUFFER);
flowable.subscribe(System.out::println);

这段代码几乎是逐字逐句地从Flowable 类的 Javadoc 里摘抄出来的。Flowable封装了监听器(第 3 行)的创建过程,并把它注册到数据源上(第 17 行)。Flowable直接让订阅者对自己进行了订阅。数据源生成的事件被委托给了 listener(第 6 行)。第 18 行告诉观察者要缓冲所有的事件通知,直到它们被订阅者消费。除了缓冲,还有其它几种回压策略:

BackpressureMode.MISSING不使用回压。如果流的速度无法保持同步,可能会抛出 MissingBackpressureException 或 IllegalStateException。

BackpressureStrategy.ERROR会在下游跟不上速度时抛出 MissingBackpressureException。

BackpressureStrategy.DROP会在下游跟不上速度时把 onNext 的值丢弃。

BackpressureStrategy.LATEST会一直保留最新的 onNext 的值,直到被下游消费掉。

这样生成的是静态 Flowable。静态 Observable 在没有订阅者的时候不会生成数据,而且所有订阅者收到的是同样的历史数据,而这不是我们想要的。

为了把它转化成动态 Observable,让所有订阅者可以实时地接收事件通知,我们必须调用 publish 和 connect 方法,就像之前提到的那样:

复制代码
ConnectableFlowable<PriceTick> hotObservable = flowable.publish();
hotObservable.connect();

最后,我们可以对它进行订阅并显示报价:

复制代码
hotObservable.subscribe((priceTick) ->
System.out.printf("%s %4s %6.2f%n", priceTick.getDate(),
priceTick.getInstrument(), priceTick.getPrice()));

关于本文作者

Victor Grazi是 InfoQ 的 Java 版块主编。在 2012 年成为 Java 冠军程序员(Oracle Java Champion)之后,Victor 就在 Nomura Securities 从事核心平台工具相关的工作,同时也担任技术咨询和 Java 布道师。他还经常在技术会议上做演讲。Victor 还负责着名为“Java Concurrent Animated”和“Bytecode Explorer”开源项目。

查看英文原文: RXJava2 by Example

2017 年 2 月 27 日 16:507710

评论

发布
暂无评论
发现更多内容

[python基础]2 python数据类型上篇

我是程序员小贱

航运区块链 抗疫危中有机

CECBC区块链专委会

区块链 航运

JDK中居然也有反模式接口常量

看山

Java 源码阅读

16张图入门Nginx——(前端够用,运维入门)

执鸢者

nginx 运维 前端

[python基础]3 python数据类型下篇(不得不看的字典,列表大总结)

我是程序员小贱

华为的“少年天才”攀登者,出发向智能存储的“奥林帕斯山”

脑极体

Bash 脚本的单元测试

柴锋

bash Linux DevOps Unit Test Shell

spark学习之IDEA配置spark并wordcount提交集群

我是程序员小贱

异常处理的那些事儿

松花皮蛋me

Java 设计模式

Docker搭建PHP+Nginx+MySQL+Redis

书旅

Docker 镜像 lnmp

全面了解CGI、FastCGI、PHP-FPM

书旅

CGI PHP-FPM Fast-CGI

平均负载是什么?

我是程序员小贱

MySQL 基准测试

多选参数

MySQL

结算场景下的跳坑记

墨凡

python必备知识总结

我是程序员小贱

目前数字人民币试点仍是“4+1” 别误读了

CECBC区块链专委会

数字货币 央行 人民币

一次由默认参数引起的思考

Lart

编程 思考

高效程序员的45个习惯:敏捷开发修炼之道(1)

石云升

读书笔记 敏捷开发

区块链技术--公证人机制

CECBC区块链专委会

区块链 数字货币 公证人

正则表达式位置匹配——匹配两个特殊符号中间的内容

jerry.mei

Java 正则表达式 前端 字符串匹配

SpringBoot系列(八):SpringBoot 中的事务处理

xcbeyond

Java 微服务 事务 springboot

敏捷到底是个什么鬼?

刘华Kenneth

程序员 敏捷 change

解析 HashMap 源码概括

shengjk1

Java hashmap

这样看mybatis,谁都会分析源码!

诸葛小猿

源码 mybatis mybatis源码

螺旋矩阵算法,臭代码解析,微服务架构 Service Mesh 服务网格 RPC 协议实现原理 Dubbo 通讯协议,John 易筋 ARTS 打卡 Week 13

John(易筋)

ARTS 打卡计划

这些年看过的Linux相关书籍推荐

我是程序员小贱

SpringBoot系列(七):SpringBoot 中使用Redis缓存

xcbeyond

Java redis 微服务 springboot

解析 HashMap 源码之基本操作 put

shengjk1

Java hashmap

毕玄大佬的分享以及给我的感悟

白色蜗牛

Java 程序员 技术 职场 架构师

解析 hashMap 源码之基本操作 get

shengjk1

Java hashmap

Rust特征与泛型区别点

编号94530

rust 泛型 封装、继承、多态

InfoQ 极客传媒开发者生态共创计划线上发布会

InfoQ 极客传媒开发者生态共创计划线上发布会

RxJava2实例解析-InfoQ