2025 AI基础设施风向标,不看必后悔!#AI基础设施峰会 了解详情
写点什么

RXJava 实例解析

  • 2016-11-16
  • 本文字数:7256 字

    阅读完需:约 24 分钟

要点

  • 响应式编程是一种处理异步数据流的规范
  • 响应式为数据流的转换和聚合以及数据流的控制管理提供了工具支持
  • 弹珠交互图(Marble Diagram)以可交互的方式可视化响应式的结构
  • 响应式编程风格看起来跟 Java Streams API 有点相似,不过本质上是不一样的
  • 如何连接到动态流处理异步数据源

在高并发编程范式的发展过程中,我们使用过很多工具,比如 java.util.concurrent 包、Akka Streams 框架、CompletableFuture 类以及 Netty 框架。响应式编程近来大受欢迎,这要得益于它强大的功能和健壮的工具包。

响应式编程是一种处理异步数据流的规范,它为数据流的转换和聚合以及数据流的控制管理提供了工具支持,它让考量程序整体设计的工作变得简单。

但它使用起来并不简单,它的学习曲线也并不平坦。对于我们当中的那些数学家来说,学习响应式就好比当初他们从学习标准代数的无向量过渡到学习线性代数的向量、矩阵和张量,它们实际上是被单元化的数据流。传统的编程模式以对象为基础,而响应式以事件流为基础。事件可能以多种形式出现,比如对象、数据源、鼠标移动信息或者异常。在传统的编程范式里,“异常”这个词描述的是对意外情况的处理,因为在这个背景下,没有按照预想发生的情况都算异常。而在响应式编程范式里,异常却是一等公民。因为数据流一般是异步的,所以抛出异常是没有意义的,任何一个异常都会被当成数据流里的一个事件。

在这篇文章里,我们会探讨响应式编程的基本原理,以一种教与学的方式来强化一些重要的概念。

首先要记住的是,响应式里所有的东西都是流。Observable 封装了流,是最基本的单元。流可以包含零个或多个事件,有未完成和已完成两种状态,可以正常结束也可以发生错误。如果一个流正常完成或者发生错误,说明处理结束了,虽然有些工具可以对错误进行重试或者使用不同的流替换发生错误的流。

在运行我们给出的例子之前,需要把 RxJava 的依赖加入到项目里。可以在 Maven 里加入这个依赖:

复制代码
<dependency>
<groupId>io.reactivex.rxjava</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>

Observable 类有几个静态工厂方法和实例方法,它们被用来生成各种新的 Observable 对象,或者把 Observable 对象添加到感兴趣的处理流程里。Observable 是可变的,所以针对它们的操作总是会生成新的 Observable 对象。为了更好地理解我们的例子,我们先来温习一下 Observable 的基本操作,因为在后面的例子里会用到它们。

Observable.just 方法生成一个简单对象,然后返回。例如:

复制代码
Observable.just("Howdy!")

这行代码生成一个新的 Observable 对象,在结束之前触发一个单独的事件,生成字符串“Howdy!”。

可以把新生成的 Observable 对象赋给一个 Observable 变量:

复制代码
Observable<String> hello = Observable.just("Howdy!");

不过知道这个还远远不够。就像那个著名的哲学问题一样,森林里的一颗树倒下来,如果周围没有人听见,那么就等于说树的倒下是无声无息的。一个 Observable 对象必须要有一个订阅者来处理它所生成的事件。所幸的是,现在 Java 支持 Lambda 表达式,我们就可以使用简洁的声明式风格来表示订阅操作:

复制代码
Observable<String> howdy = Observable.just("Howdy!");
howdy.subscribe(System.out::println);

这段代码仍然会生成字符串“Howdy!”。

跟 Observable 的其它方法一样,just 方法可以被重载:

复制代码
Observable.just("Hello", "World").subscribe(System.out::println);

这行代码会输出

复制代码
Hello
World

just 方法可以被重载,最多可以接收 10 个参数。这里要注意,输出的结果分成两行显示,说明它们是两个独立的事件。

让我们来看看如果使用列表会发生什么情况:

复制代码
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
Observable.just(words).subscribe(word->System.out.println(word));

这段代码输出一个很平常的结果:

复制代码
[the, quick, brown, fox, jumped, over, the, lazy, dog]

我们本以为每个单词会是一个单独的事件,但实际上整个列表被当成了一个事件。为了达到我们想要的结果,我们引入 from 方法:

复制代码
Observable.from(words).subscribe(System.out::println);

这行代码把数组或者列表转换成一系列事件,每个元素就是一个事件。

执行这行代码会得到我们想要的多行输出:

复制代码
the
quick
brown
fox
jumped
over
the
lazy
dog

为了能从中获取编号,我们要在 Observable 上多做一些工作。

不过在写代码之前,我们先来看看另外两个操作,range 和 zip。range(i,n) 会创建一个包含 n 个数的流,它的第一个数是从 i 开始的。如果我们有办法把这种区间流跟上面的单词流组合在一起,就可以解决编号的问题。

RX Marbles 这个网站对我们学习响应式编程很有帮助。这个网站使用 JavaScript 渲染大部分响应式操作,而且是可交互的。每个响应式操作使用“弹珠”来描述一个或多个源流(source stream)以及由操作生成的结果流(result stream)。时间从左到右,事件用弹珠表示。单击或者拖动弹珠,可以看到它们是如何影响结果的。

执行一个 zip 操作就跟遵照医嘱一样简单。让我们用弹珠交互图来解释一下这个过程:

zip 操作通过成对的“zip”映射转换把源流的元素跟另一个给定流的元素组合起来,其中的映射可以使用 Lambda 表达式来表示。只要其中的一个流完成操作,整个 zip 操作也跟着停止,另一个未完成的流剩下的事件就会被忽略。zip 可以支持最多 9 个源流的 zip 操作。zipWith 操作可以把一个指定流合并到一个已存在的流里。

现在回到我们的例子上,我们可以使用 range 和 zipWith 操作加入编号,并用 String.format 做映射转换:

复制代码
Observable.from(words)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count)->String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

复制代码
1. the
2. quick
3. brown
4. fox
5. jumped
6. over
7. the
8. lazy
9. dog

看起来很不错!现在假设我们要列出单词里的字母而不是单词本身,这个时候要用到 flatMap,flatMap 会从 Observable 里获取事件源(对象、集合或数组),并把这些元素分别映射成 Observable,然后把这些 Observable 扁平化成一个单独的 Observable。

对于我们的例子来说,我们会先用 split 方法把每个单词拆分成一个字母数组,然后用 flatMap 创建一个新的 Observable 对象,这个 Observable 对象包含了组成这些单词的所有字母:

复制代码
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码会输出:

复制代码
1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
...
30. l
31. a
32. z
33. y
34. d
35. o
36. g

所有单词的字母都出现在这里。不过这样太繁琐了,我们希望相同的字母只出现一次:

复制代码
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

复制代码
1. t
2. h
3. e
4. q
5. u
6. i
7. c
8. k
9. b
10. r
11. o
12. w
13. n
14. f
15. x
16. j
17. m
18. p
19. d
20. v
21. l
22. a
23. z
24. y
25. g

我们从小被告知“quick brown fox”这个全字母短句包含了英语里所有的字母,不过在这里我们只看到 25 个,而不是 26 个。现在让我们对这些字母进行排序,找出丢失的那个字母:

复制代码
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);

这段代码输出:

复制代码
1. a
2. b
3. c
...
17. q
18. r
19. t
20. u
21. v
22. w
23. x
24. y
25. z

看样子是字母“s”丢掉了。为了得到我们期望的结果,需要对数组做一点修改:

复制代码
List<String> words = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dogs"
);
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);
1. a
2. b
3. c
4. d
5. e
6. f
7. g
8. h
9. i
10. j
11. k
12. l
13. m
14. n
15. o
16. p
17. q
18. r
19. s
20. t
21. u
22. v
23. w
24. x
25. y
26. z

现在好了!

到目前为止,所有的代码都跟 Java 8 里引入的 Streams API 很相似,不过这种相似只是一种巧合,因为响应式包含的内容远不止这些。

Java Streams 和 Lambda 表达式为编程语言带来很大的价值,不过归根结底,它们只是提供了一种方式来遍历集合和生成集合。它们的作用很有限,而且缺乏可扩展性和可重用性。尽管 Stream 的 parallel 操作可以并行执行任务,但在返回结果前程序无法对整个过程进行干预。相反,响应式引入了执行时间、节流、流量控制等概念,而且它们可以被连接到“永不停止”的处理流程里。响应式产生的结果虽然不是集合,但你可以用任何期望的方式来处理这些结果。

让我们通过弹珠交互图更好地理解这些概念。

merge 操作可以把最多 9 个源流合并到一个结果里,而且可以保留它们的顺序。无需担心这里会出现竞赛条件,因为所有的事件都被“扁平化”到一个单独的线程里,包括异常事件和结束事件。

debounce 操作会把在一个时间段内紧挨在一起的几个事件看成一个单独事件,这几个事件里只有最后一个会被触发:

可以看到,上下两个图中的“1”之间有一个指定的时间间隔,而 2、3、4、5 之间的时间间隔都小于这个间隔,所以它们被看成单个事件。如果把“5”往右挪一点,结果就不一样了:

另一个有趣的操作是 amb,它是一种不确定性的操作。

amb 操作会从所有的输入流中选择第一个出现的流,然后忽略其它剩下的流。如下图,第二个流是最先出现的,所以 amb 操作选择了这个流。

如果把第一个流里的“20”往左移动,超过第二个流的第一个元素,那么生成的结果又会不一样:

如果你有一个需要接入到某个数据源的处理流程,比如从消息主题上获取数据,可能是 Bloomberg 或者 Reuters,你并不关心接入的到底是哪一个,只要从中选择一个就可以了。在这种情况下,amb 操作就会很有用。

Tick Tock

现在,我们可以使用这些工具基于流生成各种有意义的结果。在接下来的这个例子里,我们有一个数据源,它会每秒钟生成一个事件。不过为了节省 CPU,我们让它在周末时每三秒生成一次。我们使用混合型的“节奏器”按照一定的节奏生成数据。

首先,我们要创建一个返回 boolean 的方法,它会检查当前时间是否是周末,如果是就返回 true,否则就返回 false:

复制代码
private static boolean isSlowTickTime() {
return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY ||
LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
}

对于边读这篇文章边在 IDE 里执行这段代码的读者来说,他们可能不想等到下个周末才来验证这个方法是否可行,所以可以使用下面的替代实现,这个实现会在一个 15 秒钟内返回 true,在另一个 15 秒钟内返回 false:

复制代码
private static long start = System.currentTimeMillis();
public static Boolean isSlowTime() {
return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
}

接下来我们创建两个 Observable 对象,fast 和 slow,然后使用过滤器对它们进行调度,并把它们合并起来。

我们使用 Observable.interval 操作来安排调度,它会在每个指定的时间间隔内产生一次数据(从 0 开始计算)。

复制代码
Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);

fast 每秒生成一个事件,slow 每三秒生成一个事件(我们会忽略事件的值,因为我们只对执行时间感兴趣)。

现在我们把这两个 Observable 合并到一起,通过使用过滤器让 fast 流在工作日生成数据(或者在 15 秒内),slow 流在周末生成数据(或者在另一个 15 秒内)。

复制代码
Observable<Long> clock = Observable.merge(
slow.filter(tick-> isSlowTickTime()),
fast.filter(tick-> !isSlowTickTime())
);

最后,我们要添加一个打印时间的订阅动作。在执行这些代码时,它会根据我们的调度安排打印出系统时间。

复制代码
clock.subscribe(tick-> System.out.println(new Date()));

为了防止程序中途退出,需要在方法的末尾添加一行代码(注意要处理 InterruptedException 异常)。

复制代码
Thread.sleep(60_000);

运行代码的结果:

复制代码
Fri Sep 16 03:08:18 BST 2016
Fri Sep 16 03:08:19 BST 2016
Fri Sep 16 03:08:20 BST 2016
Fri Sep 16 03:08:21 BST 2016
Fri Sep 16 03:08:22 BST 2016
Fri Sep 16 03:08:23 BST 2016
Fri Sep 16 03:08:24 BST 2016
Fri Sep 16 03:08:25 BST 2016
Fri Sep 16 03:08:26 BST 2016
Fri Sep 16 03:08:27 BST 2016
Fri Sep 16 03:08:28 BST 2016
Fri Sep 16 03:08:29 BST 2016
Fri Sep 16 03:08:30 BST 2016
Fri Sep 16 03:08:31 BST 2016
Fri Sep 16 03:08:32 BST 2016
Fri Sep 16 03:08:35 BST 2016
Fri Sep 16 03:08:38 BST 2016
Fri Sep 16 03:08:41 BST 2016
Fri Sep 16 03:08:44 BST 2016
. . .

可以看到,前面 15 个事件之间的时间间隔都是 1 秒,后面 15 秒内的事件之间的时间间隔是 3 秒,就像我们所期望的那样。

连接到已存在的数据源

以上方法用于创建能够生成静态数据的 Observable 是没有问题的。但如何把 Observable 连接到已有的数据源上,并享受响应式的流量控制和流操作策略为我们带来的好处呢?

静态 Observable 和动态 Observable

首先我们先岔开话题,来介绍一下静态 Observable 和动态 Observable 之间的区别。

到目前为止我们讨论的都是静态 Observable,它们提供静态的数据,尽管我们可以在执行时间上做一些调节,不过这远远 不够。静态 Observable 只在有订阅者的情况下才会生成事件,而且订阅者收到的是历史数据,不管它们是从何时开始订阅的。相反,动态 Observable 不管有多少个订阅者都会生成数据,而且只生成最新的数据(除非使用了缓存)。可以通过两个步骤把静态 Observable 转化成动态 Observable:

  1. 调用 Observable 的 publish 方法,生成一个新的 ConnectableObservable
  2. 调用 ConnectableObservable 的 connect 方法,开始生成数据

要连接到一个已有的数据源上,可以在这个数据源上添加监听器(如果你喜欢这么做),监听器会把事件传播给订阅者,然后在每个事件发生时调用订阅者的 onNext 方法。在实现监听器的时候要确保每个订阅者仍然处于订阅状态,否则就要停止把事件传播给它,同时要注意回压信号。所幸的是,这些工作可以由 RxJava 的 AsyncEmitter 来处理。假设我们有一个叫做 SomeFeed 的数据服务,它会生成报价事件,同时有一个 SomeListener 监听这些报价事件以及其它生命周期事件。在 GitHub 上已经有一个实现,如果你想自己动手运行这些代码,可以去下载。

我们的数据源监听器有两个方法:

复制代码
public void priceTick(PriceTick event);
public void error(Throwable throwable);

PriceTick 类包含了 date、instrument 和 price 字段,还有一个 isLast 方法用来判断它是否是最后一个事件:

(点击放大图像)

让我们来看看如何使用 AsyncEmitter 把 Observable 连接到一个实时的数据源上:

复制代码
SomeFeed<PriceTick> feed = new SomeFeed<>();
Observable<PriceTick> obs =
Observable.fromEmitter((AsyncEmitter<PriceTick> emitter) ->
{
SomeListener listener = new SomeListener() {
@Override
public void priceTick(PriceTick event) {
emitter.onNext(event);
if (event.isLast()) {
emitter.onCompleted();
}
}
{1}
@Override
public void error(Throwable e) {
emitter.onError(e);
}
};
feed.register(listener);
}, AsyncEmitter.BackpressureMode.BUFFER);

这段代码几乎是逐字逐句地从 Observable 类的 Javadoc 里摘抄出来的。AsyncEmitter 封装了监听器(第 5 行)的创建过程,并把它注册到数据源上(第 19 行)。Observable 直接让订阅者对自己进行了订阅。数据源生成的事件被委托给了 AsyncEmitter(第 8 行)。第 20 行告诉观察者要缓冲所有的事件通知,直到它们被订阅者消费。除了缓冲,还有其它几种回压策略:

BackpressureMode.NONE 不使用回压。如果流的速度无法保持同步,可能会抛出 MissingBackpressureException 或 IllegalStateException。

BackpressureMode.ERROR 会在下游跟不上速度时抛出 MissingBackpressureException。

BackpressureMode.DROP 会在下游跟不上速度时把 onNext 的值丢弃。

BackpressureMode.LATEST 会一直保留最新的 onNext 的值,直到被下游消费掉。

这样生成的是静态 Observable。静态 Observable 在没有订阅者的时候不会生成数据,而且所有订阅者收到的是同样的历史数据,而这不是我们想要的。

为了把它转化成动态 Observable,让所有订阅者可以实时地接收事件通知,我们必须调用 publish 和 connect 方法,就像之前提到的那样:

复制代码
ConnectableObservable<PriceTick> hotObservable = obs.publish();
hotObservable.connect();

最后,我们可以对它进行订阅并显示报价:

复制代码
hotObservable.subscribe((priceTick) ->
System.out.printf("%s %4s %6.2f%n", priceTick.getDate(),
priceTick.getInstrument(), priceTick.getPrice()));

关于作者

Victor Grazi是 InfoQ 的 Java 消息队列负责人。他是 2012 年的 Oracle Java Champion,在 Nomura Securities 负责核心平台工具的开发工作,同时是一名技术顾问和 Java 布道师。他还经常在技术大会上做演讲,是“Java Concurrent Animated”和“Bytecode Explorer”开源项目的负责人。

查看英文原文: RXJava by Example

2016-11-16 16:487781
用户头像

发布了 322 篇内容, 共 146.7 次阅读, 收获喜欢 148 次。

关注

评论

发布
暂无评论
发现更多内容

企业微信接入系列-扫码绑定/登录

六月的雨在InfoQ

企业微信 三周年连更 企业微信扫码 企业微信接入

阿里十亿级并发系统设计+java性能优化实战

做梦都在改BUG

Java 性能调优 并发系统设计

GrowingIO—UEI模型:万物皆可“事件化”

科技热闻

全栈开发实战|​名片管理系统的设计与实现(SSM + JSP)

TiAmo

管理系统 数据库开发 全栈开发 SQL Server 三周年连更

2023年超全前端面试题-背完稳稳拿offer(欢迎补充)

肥晨

三周年连更

Web平台规模化部署高效编码格式的实践和思考

阿里技术

视频编解码

Redis布隆过滤器的原理和应用场景,解决缓存穿透

做梦都在改BUG

Java redis 缓存 布隆过滤器

AI 圈新晋顶流:AutoGPT,Star量近8万,远超PyTorch

Openlab_cosmoplat

人工智能 开源社区 autogpt

实例分享| anyRTC 部署南京某区城市运行“一网统管”综合调度系统

anyRTC开发者

音视频 指挥调度 融合通信 快对讲 综合调度

阿里P8面试官推荐学习的11大专题:java面试精讲框架文档

做梦都在改BUG

Java java面试 框架

华为云数据灾备方案,助力政企数据无忧

神奇视野

华为云数据灾备方案,为数据安全铸造铜墙铁壁

神奇视野

防患于未然,华为云数据灾备解决方案保护企业数据安全

YG科技

MYSQL 主从复制如何保证数据一致性

做梦都在改BUG

Java MySQL 数据库 主从复制

终于学完阿里架构师推荐413页微服务分布式架构基础与实战笔记

做梦都在改BUG

Java 架构 分布式 微服务

React Native 9个好用的开发工具盘点

Onegun

React Native 移动开发 跨端框架

开源即时通讯IM框架MobileIMSDK的微信小程序端技术概览

JackJiang

网络编程 IM 即时通信

Lambda 应用介绍及实现原理剖析

架构精进之路

Java 后端 Lamdba表达式 三周年连更

产教融合| 赛意信息·讯方·深信息产教融合交流研讨会暨国产工业软件人才培养战略合作示范基地揭牌仪式圆满举办

科技热闻

华为云数据灾备,助力企业业务极速恢复

YG科技

抵御数据风险、保障业务安全,就看华为云数据灾备

YG科技

手机穿戴设备能力共享,提升丰富交互体验

HarmonyOS SDK

HMS Core

Shell在日常工作中的应用实践

京东科技开发者

Linux Shell 服务器 shell脚本编程 企业号 4 月 PK 榜

等保2.0来临,华为云助力企业更好应对等保合规

神奇视野

等保2.0时代,华为云助力客户做好等保合规

神奇视野

华为云安全建设安全云生态 保全企业运营安全

神奇视野

Mybatis 通过接口实现 sql 执行原理解析

做梦都在改BUG

Java mybatis SQL执行

浅论分布式训练中的recompute机制

百度Geek说

机器学习 深度学习 分布式 企业号 4 月 PK 榜

关于软件测试领域的 Happy Path

汪子熙

软件测试 测试 自动化测试 测试自动化 三周年连更

迎政策东风,华为云为企业“等保”建设打开想象空间

神奇视野

GreptimeDB v0.2 正式发布 | 50%+ PromQL 兼容、写入性能优化、Dashboard with Playground

Greptime 格睿科技

云原生 时序数据库 PromQL 国产时序数据库

RXJava实例解析_Java_Victor Grazi_InfoQ精选文章