写点什么

RXJava 实例解析

  • 2016-11-16
  • 本文字数:7256 字

    阅读完需:约 24 分钟

要点

  • 响应式编程是一种处理异步数据流的规范
  • 响应式为数据流的转换和聚合以及数据流的控制管理提供了工具支持
  • 弹珠交互图(Marble Diagram)以可交互的方式可视化响应式的结构
  • 响应式编程风格看起来跟 Java Streams API 有点相似,不过本质上是不一样的
  • 如何连接到动态流处理异步数据源

在高并发编程范式的发展过程中,我们使用过很多工具,比如 java.util.concurrent 包、Akka Streams 框架、CompletableFuture 类以及 Netty 框架。响应式编程近来大受欢迎,这要得益于它强大的功能和健壮的工具包。

响应式编程是一种处理异步数据流的规范,它为数据流的转换和聚合以及数据流的控制管理提供了工具支持,它让考量程序整体设计的工作变得简单。

但它使用起来并不简单,它的学习曲线也并不平坦。对于我们当中的那些数学家来说,学习响应式就好比当初他们从学习标准代数的无向量过渡到学习线性代数的向量、矩阵和张量,它们实际上是被单元化的数据流。传统的编程模式以对象为基础,而响应式以事件流为基础。事件可能以多种形式出现,比如对象、数据源、鼠标移动信息或者异常。在传统的编程范式里,“异常”这个词描述的是对意外情况的处理,因为在这个背景下,没有按照预想发生的情况都算异常。而在响应式编程范式里,异常却是一等公民。因为数据流一般是异步的,所以抛出异常是没有意义的,任何一个异常都会被当成数据流里的一个事件。

在这篇文章里,我们会探讨响应式编程的基本原理,以一种教与学的方式来强化一些重要的概念。

首先要记住的是,响应式里所有的东西都是流。Observable 封装了流,是最基本的单元。流可以包含零个或多个事件,有未完成和已完成两种状态,可以正常结束也可以发生错误。如果一个流正常完成或者发生错误,说明处理结束了,虽然有些工具可以对错误进行重试或者使用不同的流替换发生错误的流。

在运行我们给出的例子之前,需要把 RxJava 的依赖加入到项目里。可以在 Maven 里加入这个依赖:

复制代码
<dependency>
<groupId>io.reactivex.rxjava</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>

Observable 类有几个静态工厂方法和实例方法,它们被用来生成各种新的 Observable 对象,或者把 Observable 对象添加到感兴趣的处理流程里。Observable 是可变的,所以针对它们的操作总是会生成新的 Observable 对象。为了更好地理解我们的例子,我们先来温习一下 Observable 的基本操作,因为在后面的例子里会用到它们。

Observable.just 方法生成一个简单对象,然后返回。例如:

复制代码
Observable.just("Howdy!")

这行代码生成一个新的 Observable 对象,在结束之前触发一个单独的事件,生成字符串“Howdy!”。

可以把新生成的 Observable 对象赋给一个 Observable 变量:

复制代码
Observable<String> hello = Observable.just("Howdy!");

不过知道这个还远远不够。就像那个著名的哲学问题一样,森林里的一颗树倒下来,如果周围没有人听见,那么就等于说树的倒下是无声无息的。一个 Observable 对象必须要有一个订阅者来处理它所生成的事件。所幸的是,现在 Java 支持 Lambda 表达式,我们就可以使用简洁的声明式风格来表示订阅操作:

复制代码
Observable<String> howdy = Observable.just("Howdy!");
howdy.subscribe(System.out::println);

这段代码仍然会生成字符串“Howdy!”。

跟 Observable 的其它方法一样,just 方法可以被重载:

复制代码
Observable.just("Hello", "World").subscribe(System.out::println);

这行代码会输出

复制代码
Hello
World

just 方法可以被重载,最多可以接收 10 个参数。这里要注意,输出的结果分成两行显示,说明它们是两个独立的事件。

让我们来看看如果使用列表会发生什么情况:

复制代码
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
Observable.just(words).subscribe(word->System.out.println(word));

这段代码输出一个很平常的结果:

复制代码
[the, quick, brown, fox, jumped, over, the, lazy, dog]

我们本以为每个单词会是一个单独的事件,但实际上整个列表被当成了一个事件。为了达到我们想要的结果,我们引入 from 方法:

复制代码
Observable.from(words).subscribe(System.out::println);

这行代码把数组或者列表转换成一系列事件,每个元素就是一个事件。

执行这行代码会得到我们想要的多行输出:

复制代码
the
quick
brown
fox
jumped
over
the
lazy
dog

为了能从中获取编号,我们要在 Observable 上多做一些工作。

不过在写代码之前,我们先来看看另外两个操作,range 和 zip。range(i,n) 会创建一个包含 n 个数的流,它的第一个数是从 i 开始的。如果我们有办法把这种区间流跟上面的单词流组合在一起,就可以解决编号的问题。

RX Marbles 这个网站对我们学习响应式编程很有帮助。这个网站使用 JavaScript 渲染大部分响应式操作,而且是可交互的。每个响应式操作使用“弹珠”来描述一个或多个源流(source stream)以及由操作生成的结果流(result stream)。时间从左到右,事件用弹珠表示。单击或者拖动弹珠,可以看到它们是如何影响结果的。

执行一个 zip 操作就跟遵照医嘱一样简单。让我们用弹珠交互图来解释一下这个过程:

zip 操作通过成对的“zip”映射转换把源流的元素跟另一个给定流的元素组合起来,其中的映射可以使用 Lambda 表达式来表示。只要其中的一个流完成操作,整个 zip 操作也跟着停止,另一个未完成的流剩下的事件就会被忽略。zip 可以支持最多 9 个源流的 zip 操作。zipWith 操作可以把一个指定流合并到一个已存在的流里。

现在回到我们的例子上,我们可以使用 range 和 zipWith 操作加入编号,并用 String.format 做映射转换:

复制代码
Observable.from(words)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count)->String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

复制代码
1. the
2. quick
3. brown
4. fox
5. jumped
6. over
7. the
8. lazy
9. dog

看起来很不错!现在假设我们要列出单词里的字母而不是单词本身,这个时候要用到 flatMap,flatMap 会从 Observable 里获取事件源(对象、集合或数组),并把这些元素分别映射成 Observable,然后把这些 Observable 扁平化成一个单独的 Observable。

对于我们的例子来说,我们会先用 split 方法把每个单词拆分成一个字母数组,然后用 flatMap 创建一个新的 Observable 对象,这个 Observable 对象包含了组成这些单词的所有字母:

复制代码
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

复制代码
1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
...
30. l
31. a
32. z
33. y
34. d
35. o
36. g

所有单词的字母都出现在这里。不过这样太繁琐了,我们希望相同的字母只出现一次:

复制代码
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

复制代码
1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
9. b
10. r
11. o
12. w
13. n
14. f
15. x
16. j
17. m
18. p
19. d
20. v
21. l
22. a
23. z
24. y
25. g

我们从小被告知“quick brown fox”这个全字母短句包含了英语里所有的字母,不过在这里我们只看到 25 个,而不是 26 个。现在让我们对这些字母进行排序,找出丢失的那个字母:

复制代码
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

复制代码
1. a
2. b
3. c
...
17. q
18. r
19. t
20. u
21. v
22. w
23. x
24. y
25. z

看样子是字母“s”丢掉了。为了得到我们期望的结果,需要对数组做一点修改:

复制代码
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dogs"
);
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);
1. a
2. b
3. c
4. d
5. e
6. f
7. g
8. h
9. i
10. j
11. k
12. l
13. m
14. n
15. o
16. p
17. q
18. r
19. s
20. t
21. u
22. v
23. w
24. x
25. y
26. z

现在好了!

到目前为止,所有的代码都跟 Java 8 里引入的 Streams API 很相似,不过这种相似只是一种巧合,因为响应式包含的内容远不止这些。

Java Streams 和 Lambda 表达式为编程语言带来很大的价值,不过归根结底,它们只是提供了一种方式来遍历集合和生成集合。它们的作用很有限,而且缺乏可扩展性和可重用性。尽管 Stream 的 parallel 操作可以并行执行任务,但在返回结果前程序无法对整个过程进行干预。相反,响应式引入了执行时间、节流、流量控制等概念,而且它们可以被连接到“永不停止”的处理流程里。响应式产生的结果虽然不是集合,但你可以用任何期望的方式来处理这些结果。

让我们通过弹珠交互图更好地理解这些概念。

merge 操作可以把最多 9 个源流合并到一个结果里,而且可以保留它们的顺序。无需担心这里会出现竞赛条件,因为所有的事件都被“扁平化”到一个单独的线程里,包括异常事件和结束事件。

debounce 操作会把在一个时间段内紧挨在一起的几个事件看成一个单独事件,这几个事件里只有最后一个会被触发:

可以看到,上下两个图中的“1”之间有一个指定的时间间隔,而 2、3、4、5 之间的时间间隔都小于这个间隔,所以它们被看成单个事件。如果把“5”往右挪一点,结果就不一样了:

另一个有趣的操作是 amb,它是一种不确定性的操作。

amb 操作会从所有的输入流中选择第一个出现的流,然后忽略其它剩下的流。如下图,第二个流是最先出现的,所以 amb 操作选择了这个流。

如果把第一个流里的“20”往左移动,超过第二个流的第一个元素,那么生成的结果又会不一样:

如果你有一个需要接入到某个数据源的处理流程,比如从消息主题上获取数据,可能是 Bloomberg 或者 Reuters,你并不关心接入的到底是哪一个,只要从中选择一个就可以了。在这种情况下,amb 操作就会很有用。

Tick Tock

现在,我们可以使用这些工具基于流生成各种有意义的结果。在接下来的这个例子里,我们有一个数据源,它会每秒钟生成一个事件。不过为了节省 CPU,我们让它在周末时每三秒生成一次。我们使用混合型的“节奏器”按照一定的节奏生成数据。

首先,我们要创建一个返回 boolean 的方法,它会检查当前时间是否是周末,如果是就返回 true,否则就返回 false:

复制代码
private static boolean isSlowTickTime() {
return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY ||
LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
}

对于边读这篇文章边在 IDE 里执行这段代码的读者来说,他们可能不想等到下个周末才来验证这个方法是否可行,所以可以使用下面的替代实现,这个实现会在一个 15 秒钟内返回 true,在另一个 15 秒钟内返回 false:

复制代码
private static long start = System.currentTimeMillis();
public static Boolean isSlowTime() {
return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
}

接下来我们创建两个 Observable 对象,fast 和 slow,然后使用过滤器对它们进行调度,并把它们合并起来。

我们使用 Observable.interval 操作来安排调度,它会在每个指定的时间间隔内产生一次数据(从 0 开始计算)。

复制代码
Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);

fast 每秒生成一个事件,slow 每三秒生成一个事件(我们会忽略事件的值,因为我们只对执行时间感兴趣)。

现在我们把这两个 Observable 合并到一起,通过使用过滤器让 fast 流在工作日生成数据(或者在 15 秒内),slow 流在周末生成数据(或者在另一个 15 秒内)。

复制代码
Observable<Long> clock = Observable.merge(
slow.filter(tick-> isSlowTickTime()),
fast.filter(tick-> !isSlowTickTime())
);

最后,我们要添加一个打印时间的订阅动作。在执行这些代码时,它会根据我们的调度安排打印出系统时间。

复制代码
clock.subscribe(tick-> System.out.println(new Date()));

为了防止程序中途退出,需要在方法的末尾添加一行代码(注意要处理 InterruptedException 异常)。

复制代码
Thread.sleep(60_000);

运行代码的结果:

复制代码
Fri Sep 16 03:08:18 BST 2016
Fri Sep 16 03:08:19 BST 2016
Fri Sep 16 03:08:20 BST 2016
Fri Sep 16 03:08:21 BST 2016
Fri Sep 16 03:08:22 BST 2016
Fri Sep 16 03:08:23 BST 2016
Fri Sep 16 03:08:24 BST 2016
Fri Sep 16 03:08:25 BST 2016
Fri Sep 16 03:08:26 BST 2016
Fri Sep 16 03:08:27 BST 2016
Fri Sep 16 03:08:28 BST 2016
Fri Sep 16 03:08:29 BST 2016
Fri Sep 16 03:08:30 BST 2016
Fri Sep 16 03:08:31 BST 2016
Fri Sep 16 03:08:32 BST 2016
Fri Sep 16 03:08:35 BST 2016
Fri Sep 16 03:08:38 BST 2016
Fri Sep 16 03:08:41 BST 2016
Fri Sep 16 03:08:44 BST 2016
. . .

可以看到,前面 15 个事件之间的时间间隔都是 1 秒,后面 15 秒内的事件之间的时间间隔是 3 秒,就像我们所期望的那样。

连接到已存在的数据源

以上方法用于创建能够生成静态数据的 Observable 是没有问题的。但如何把 Observable 连接到已有的数据源上,并享受响应式的流量控制和流操作策略为我们带来的好处呢?

静态 Observable 和动态 Observable

首先我们先岔开话题,来介绍一下静态 Observable 和动态 Observable 之间的区别。

到目前为止我们讨论的都是静态 Observable,它们提供静态的数据,尽管我们可以在执行时间上做一些调节,不过这远远 不够。静态 Observable 只在有订阅者的情况下才会生成事件,而且订阅者收到的是历史数据,不管它们是从何时开始订阅的。相反,动态 Observable 不管有多少个订阅者都会生成数据,而且只生成最新的数据(除非使用了缓存)。可以通过两个步骤把静态 Observable 转化成动态 Observable:

  1. 调用 Observable 的 publish 方法,生成一个新的 ConnectableObservable
  2. 调用 ConnectableObservable 的 connect 方法,开始生成数据

要连接到一个已有的数据源上,可以在这个数据源上添加监听器(如果你喜欢这么做),监听器会把事件传播给订阅者,然后在每个事件发生时调用订阅者的 onNext 方法。在实现监听器的时候要确保每个订阅者仍然处于订阅状态,否则就要停止把事件传播给它,同时要注意回压信号。所幸的是,这些工作可以由 RxJava 的 AsyncEmitter 来处理。假设我们有一个叫做 SomeFeed 的数据服务,它会生成报价事件,同时有一个 SomeListener 监听这些报价事件以及其它生命周期事件。在 GitHub 上已经有一个实现,如果你想自己动手运行这些代码,可以去下载。

我们的数据源监听器有两个方法:

复制代码
public void priceTick(PriceTick event);
public void error(Throwable throwable);

PriceTick 类包含了 date、instrument 和 price 字段,还有一个 isLast 方法用来判断它是否是最后一个事件:

(点击放大图像)

让我们来看看如何使用 AsyncEmitter 把 Observable 连接到一个实时的数据源上:

复制代码
SomeFeed<PriceTick> feed = new SomeFeed<>();
Observable<PriceTick> obs =
Observable.fromEmitter((AsyncEmitter<PriceTick> emitter) ->
{
SomeListener listener = new SomeListener() {
@Override
public void priceTick(PriceTick event) {
emitter.onNext(event);
if (event.isLast()) {
emitter.onCompleted();
}
}
{1}
@Override
public void error(Throwable e) {
emitter.onError(e);
}
};
feed.register(listener);
}, AsyncEmitter.BackpressureMode.BUFFER);

这段代码几乎是逐字逐句地从 Observable 类的 Javadoc 里摘抄出来的。AsyncEmitter 封装了监听器(第 5 行)的创建过程,并把它注册到数据源上(第 19 行)。Observable 直接让订阅者对自己进行了订阅。数据源生成的事件被委托给了 AsyncEmitter(第 8 行)。第 20 行告诉观察者要缓冲所有的事件通知,直到它们被订阅者消费。除了缓冲,还有其它几种回压策略:

BackpressureMode.NONE 不使用回压。如果流的速度无法保持同步,可能会抛出 MissingBackpressureException 或 IllegalStateException。

BackpressureMode.ERROR 会在下游跟不上速度时抛出 MissingBackpressureException。

BackpressureMode.DROP 会在下游跟不上速度时把 onNext 的值丢弃。

BackpressureMode.LATEST 会一直保留最新的 onNext 的值,直到被下游消费掉。

这样生成的是静态 Observable。静态 Observable 在没有订阅者的时候不会生成数据,而且所有订阅者收到的是同样的历史数据,而这不是我们想要的。

为了把它转化成动态 Observable,让所有订阅者可以实时地接收事件通知,我们必须调用 publish 和 connect 方法,就像之前提到的那样:

复制代码
ConnectableObservable<PriceTick> hotObservable = obs.publish();
hotObservable.connect();

最后,我们可以对它进行订阅并显示报价:

复制代码
hotObservable.subscribe((priceTick) ->
System.out.printf("%s %4s %6.2f%n", priceTick.getDate(),
priceTick.getInstrument(), priceTick.getPrice()));

关于作者

Victor Grazi是 InfoQ 的 Java 消息队列负责人。他是 2012 年的 Oracle Java Champion,在 Nomura Securities 负责核心平台工具的开发工作,同时是一名技术顾问和 Java 布道师。他还经常在技术大会上做演讲,是“Java Concurrent Animated”和“Bytecode Explorer”开源项目的负责人。

查看英文原文: RXJava by Example

2016-11-16 16:487478
用户头像

发布了 322 篇内容, 共 139.3 次阅读, 收获喜欢 145 次。

关注

评论

发布
暂无评论
发现更多内容

你听过CatBoost吗?本文教你如何使用CatBoost进行快速梯度提升

计算机与AI

Python 学习 优化

mongodb内核源码实现、性能调优、最佳运维实践系列-百万级高并发mongodb集群性能数十倍提升优化实践(上篇)

杨亚洲(专注MongoDB及高性能中间件)

MySQL 数据库 nosql mongodb 分布式数据库mongodb

LeetCode题解:98. 验证二叉搜索树,递归,JavaScript,详细注释

Lee Chen

大前端 LeetCode

面试大厂被面试官用MyBatis怼到“哑口无言”?这份MyBatis源码笔记助你吊打面试官!

Java架构之路

Java 程序员 架构 面试 编程语言

程序员的美丽假期(并不)

Philips

敏捷开发 快速开发

第四周作业

熊桂平

极客大学架构师训练营

第四周学习心得

熊桂平

极客大学架构师训练营

Go发起HTTP2.0请求流程分析(中篇)——数据帧&流控制

Gopher指北

后端 HTTP2.0 Go 语言

台湾地区为什么会丢包高?

德胜网络-阳

Week 4 命题作业及总结

阿泰

SpringBoot有多重要?面试用SpringBoot把面试官唬住了要30k都行!

Java架构之路

Java 程序员 架构 面试 编程语言

从理论到工具:带你全面了解自动化测试框架

禅道项目管理

开源 DevOps 工具 自动化测试

成为 Apache 贡献者,So easy!

代立冬

Apache 贡献

Github惊现高星神作,两份算法宝典让你横扫大厂算法面试题

编程 程序员 算法 计算机

jvm笔记

pCat

Java JVM

成为一名合格的技术类产品经理

小清新同学

产品经理

华为鲲鹏专家解读:90%代码如何移植到鲲鹏平台

华为云开发者联盟

软件 鲲鹏

让黑产无处遁形 京东智联云推出风险识别服务

京东科技开发者

人工智能 学习 风险识别

阿里P8大牛呕心沥血总结整理的《Java面经手册》,通过实践的方式向你深度讲解Java核心知识点

Java架构之路

Java 程序员 架构 面试 编程语言

mongodb 源码实现、调优、最佳实践系列-百万级高并发mongodb集群性能数十倍提升优化实践(下篇)

杨亚洲(专注MongoDB及高性能中间件)

MySQL nosql mongodb 架构 分布式 分布式数据库mongodb

深入理解 JVM 垃圾回收算法 - 复制算法

Java架构师迁哥

出炉!华为18A自爆SpringCloud微服务分布式笔记

996小迁

Java 编程 架构 面试 SpringCloud

直播预告 | 云时代的数据库客户端——CloudQuery最佳实践

BinTools图尔兹

数据库 sql 安全 工具软件

java安全编码指南之:Thread API调用规则

程序那些事

Java并发 多线程 java安全编码 java安全编码指南 java编码规范

灯下黑中的自己

非著名程序员

个人成长 管理 管理者

日常工作问题集锦

hasWhere

华为云数据安全中心正式公测,8大核心数据安全能力守护你的数据

华为云开发者联盟

华为 安全 数据

详细分析定制企业应用的价格

Learun

敏捷开发 快速开发 软件架构

几行代码轻松实现跨系统传递 traceId,再也不用担心对不上日志了!

程序员小航

Java 日志 链路追踪 工作笔记 traceId

想要高效搭建企业信息平台?教你轻松选择开发框架!

Marilyn

敏捷开发 快速开发

Linux下diff的操作详解

良知犹存

Linux

RXJava实例解析_Java_Victor Grazi_InfoQ精选文章