RxJava 是 Java 对于反应式编程的一个实现框架,是一个基于事件的、提供实现强大且优雅的异步调用程序的代码库。18 年以来,由淘宝技术部发起的应用架构升级项目,希望通过反应式架构、全异步化的改造,提升系统整体性能和机器资源利用率,减少网络延时,资源的重复使用,并为业务快速创新提供敏捷的架构支撑。在闲鱼的基础链路诸如商品批量更新、订单批量查询等,都利用了 RxJava 的异步编程能力。
不过,RxJava 是入门容易精通难,一不小心遍地坑。今天来一起看下 RxJava 的使用方式、基本原理、注意事项。
开始之前
让我们先看下,使用 RxJava 之前,我们曾经写过的回调代码存在的痛点。当我们的应用需要处理用户事件、异步调用时,随着流式事件的复杂性和处理逻辑的复杂性的增加,代码的实现难度将爆炸式增长。比如我们有时需要处理多个事件流的组合、处理事件流的异常或超时、在事件流结束后做清理工作等,如果需要我们从零实现,势必要小心翼翼地处理回调、监听、并发等很多棘手问题。还有一个被称作“回调地狱”的问题,描述的是代码的不可读性。
Code 1.1
以上 js 代码有两个明显槽点:1.由于传入的层层回调方法,代码结尾出现一大堆的 }) ;2. 代码书写的顺序与代码执行的顺序相反:后面出现回调函数会先于之前行的代码先执行。
而如果使用了 RxJava,我们处理回调、异常等将得心应手。
引入 RxJava
假设现在要异步地获得一个用户列表,然后将结果进行处理,比如展示到 ui 或者写到缓存,我们使用 RxJava 后代码如下:
Code 2.1
userService.getAllUser()是一个普通的同步方法,但是我们把它包到了一个 Observable 中,当有结果返回时,将 user 逐个发送至监听者。第一个监听者更新 ui,第二个监听者写到缓存。并且当上游发生异常时,进行打印;在事件流结束时,打印 finish。
另外还可以很方便的配置上游超时时间、调用线程池、fallback 结果等,是不是非常强大。
需要注意的是,RxJava 代码就像上面例子中看起来很容易上手,可读性也很强,但是如果理解不充分,很容易出现意想不到的 bug:初学者可能会认为,上面的代码中,一个 user 列表返回后,每个元素会被异步地发送给两个下游的观察者,这两个观察者在各自的线程内打印结果。但事实却不是这样:userService.getAllUser()会被调用两次(每当建立订阅关系时方法 getAllUser()都会被重新调用),而 user 列表被查询出后,会同步的发送给两个观察者,观察者也是同步地打印出每个元素。即 sub1 = user1,sub1 = user2,sub1 = user3,sub2 = user1,sub2 = user2,sub2 = user3。
可见,如果没有其他配置,RxJava 默认是同步阻塞的!!!那么,我们如何使用它的异步非阻塞能力呢,我们接着往下看。
Code 2.2
我们用 Observable.fromCallable()代替 code2.1 中最底层的 Observable.create 方法,来创建了一个 Observable(即被观察者)。fromCallable 方法创建的是一个 lazy 的 Observable,只有当有人监听它时,传入的代码才被执行。(关于这一点,我们后面会讲,这里只是为了展示有很多种创建 Observable 的方式)。
然后通过 subscribeOn(Schedulers.io())指定了被观察者执行的线程池。observeOn(Schedulers.single())指定了下游观察者(map 方法实际也是一个观察者)执行的线程池。map 方法如同很多流式编程 api 一样,将上游的每个元素转化成另一个元素。最后又通过 observeOn(Schedulers.newThread())制定了当前下游的观察者,即最后的 subscribe 中传入的观察者(lambda 方式)执行的线程池。
上面的代码执行后,通过打印的线程名可以看出,被观察者、map、观察者均是不同的线程,并且,主线程最后的"end"会先执行,也就是实现了异步非阻塞。
使用方式
本文不是 RxJava 的接口文档,不会详细介绍每个 api,只是简单讲下一些常见或者特殊 api,进一步阐述 RxJava 的能力。
基本组件
RxJava 的核心原理其实非常简单。可类比观察者模式。Observable 是被观察者,作为数据源产生数据。Observer 是观察者,消费上游的数据源。
每个 Observable 可注册多个 Observer。但是默认情况下,每当有注册发生时,Observable 的生产方法 subscribe 都会被调用。如果想只生产一次,可以调用 Observable.cached 方法。
被观察者 Observable 还有多个变体,如 Single、Flowable。Single 代表只产生一个元素的数据源。Flowable 是支持背压的数据源。通过背压设计,下游监听者可以向上游反馈信息,可以达到控制发送速率的功能。
Observable 和 Observer 是通过装饰器模式层层包装达到从而串联起来。转换 API 如 map 等,会创建一个新的 ObservableMap(基层自 Observable),包装原始的 Observable 作为 source,而在真正执行时,先做转换操作,再发给下游的观察者。
Scheduler 是 RxJava 为多线程执行提供的支持类,它将可以将生产者或者消费者的执行逻辑包装成一个 Worker,提交到框架提供的公共线程池中,如 Schedulers.io()、Schedulers.newThread()等。便于理解,可以将 Schedulers 类比做线程池,Worker 类比做线程池中的线程。可以通过 Observable.subscribeOn 和 Observable.observeOn 分别制定被观察者和观察者执行的线程,来达到异步非阻塞。
RxJava 核心架构图如下:
转换 API
map:见 Code 2.2,一对一转换,如同很多流式编程 api 一样,将上游的每个元素转化成另一个元素
flatMap:一对多转换,将上游的每个元素转化成 0 到多个元素。类比 Java8:Stream.flatMap 内返回的是 stream,Observerable.flatMap 内返回的是 Observerable。注意,本方法非常强大,很多 api 底层都是基于此方法。并且由于 flatMap 返回的多个 Observerable 是相互独立的,可以基于这个特点,实现并发。
组合 API
merge:将两个事件流合并成一个时间流,合并后的事件流的顺序,与上流两个流中元素到来的时间顺序一致。
zip:逐个接收上游多个流的每个元素,并且一对一的组合起来,转换后发送给下游。示例见 code3.1
code 3.1
代码 code 3.1 看起来没什么问题,两个流并发执行,最后用 zip 等待他们的结果。但是却隐藏了一个很重要的问题:RxJava 默认是同步、阻塞的!!当我们想去仿照上面的方式并发发送多个请求,最后用 zip 监听所有结果时,很容易发先一个诡异的现象, code 3.2 的代码中,ob2 的代码总是在 ob1 执行之后才会执行,并不是我们预期的两个请求并发执行。而打印出来的线程名也可以看到,两个 Single 是在同一个线程中顺序执行的!
code 3.2
那为什么 code 3.1 的两个流能够并发执行呢?阅读源码可以发现 zip 的实现其实就是先订阅第一个流,再订阅第二个流,那么默认当然是顺序执行。但是通过 Observable.interval 创建的流,默认会被提交到 Schedulers.computation()提供的线程池中。关于线程池,本文后面会讲解。
创建 API
create :最原始的 create 和 subscribe,其他创建方法都基于此
code 3.3
just :Observable.just("e1","e2"); 简单的创建一个 Observable,发出指定的 n 个元素。
interval:code 3.1 已给出示例,创建一个按一定间隔不断产生元素的 Observable,默认执行在 Schedulers.comutation()提供的线程池中
defer:产生一个延迟创建的 Observable。有点绕:Observable.create 等创建出来的被观察者虽然是延迟执行的,只有有人订阅的时候才会真正开始生成数据。但是创建 Observable 的方法却是立即执行的。而 Observable.defer 方法会在有人订阅的时候才开始创建 Observable。如代码 Code3.4
fromCallable :产生一个延迟创建的 Observable,简化的 defer 方法。Observable.fromCallable(() -> myFun()) 等同于 Observable.defer(() -> Observable.just(myFun()) );
基本原理
RxJava 的代码,就是观察者模式+装饰器模式的体现。
Observable.create
见代码 code 3.3,create 方法接收一个 ObserverableOnSubscribe 接口对象,我们定义了了发送元素的代码,create 方法返回一个 ObserverableCreate 类型对象(继承自 Observerable 抽象类)。跟进 create 方法原码,直接返回 new 出来的 ObserverableCreate,它包装了一个 source 对象,即传入的 ObserverableOnSubscribe。
code4.1
Create 方法就这么简单,只需要记住它返回了一个包装了 source 的 Observerble。
4.2 Observerable.subscribe(observer)
看下 code3.3 中创建订阅关系时(observalbe.subscribe)发生了什么:
code4.2
Observable 是一个抽象类,定义了 subscribe 这个 final 方法,最终会调用 subscribeActual(observer);而 subscribeActual 是由子类实现的方法,自然我们需要看 ObserverableCreate 实现的该方法。
code4.3
1.将观察者 observer 包装到一个 CreateEmitter 里。
2.调用 observer 的 onSubscribe 方法,传入这个 emitter。
3.调用 source(即生产代码接口)的 subscribe 方法,传入这个 emitter。
第二步中,直接调用了我们写的消费者的 onSubscribe 方法,很好理解,即创建订阅关系的回调方法。
重点在第三步,source.subscribe(parent); 这个 parent 是包装了 observer 的 emitter。还记得 source 就是我们写的发送事件的代码。其中手动调用了 emitter.onNext()来发送数据。那么我们 CreateEmitter.onNext()做了什么
code4.4
!isDisposed()判断若订阅关系还没取消,则调用 observer.onNext(t);这个 observer 就是我们写的消费者,code 3.3 中我们重写了它的 onNext 方法来 print 接收到的元素。
以上就是 RxJava 最基本的原理,其实逻辑很简单,就是在创建订阅关系的时候,直接调用生产逻辑代码,然后再生产逻辑的 onNext 中,调用了观察者 observer.onNext。时序图如下。
显然,最基本的原理,完全解耦了和异步回调、多线程的关系。
Observable.map
通过最简答的 map 方法,看下转换 api 做了什么。
如 Code2.1 中,调用 map 方法,传入一个转换函数,可以一对一地将上游的元素转换成另一种类型的元素。
code4.5
code4.5 是 Observable 定义的 final 的 map 方法,可见 map 方法将 this(即原始的 observer)和转换函数 mapper 包装到一个 ObservableMap 中(ObservableMap 也继承 Observable),然后返回这个 ObservableMap(onAssembly 默认什么都不做)。
由于 ObservableMap 也是一个 Observable,所以他的 subscribe 方法会在创建订阅者时被层层调用到,subscribe 是 Observable 定义的 final 方法,最终会调用到他实现的 subscribeAcutal 方法。
code4.6
可以看到 ObservableMap 的 subscribeActual 中,将原始的观察者 t 和变换函数 function 包装到了一个新的观察者 MapObserver 中,并将它订阅到被观察者 source 上。
我们知道,发送数据的时候,观察者的 onNext 会被调用,所以看下 MapObserver 的 onNext 方法。
code4.7
code4.7 中可以看到 mapper.apply(t)将变换函数 mapper 施加到每个元素 t 上,变换后得到 v,最后调用 actual.onNext(v)将 v 发送给下游观察者 actual(actual 为 code4.6 中创建 MapObserver 时传入的 t)。
总结一下例如 map 之类的变换 api 的原理:
1.map 方法返回一个 ObservableMap,包装了原始的观察者 t 和变换函数 function
2.ObservableMap 继承自 AbstractObservableWithUpstream(它继承自 Observable)
3.订阅发生时,observable 的 final 方法 subscribe()会调用实现类的 subscribeActual
4.ObservableMap.subscribeActual 中创建 MapObserver(包装了原 observer),订阅到原 Observable
5.发送数据 onNext 被调用时,先 apply 变换操作,再调用原 observer 的 onNext,即传给下游观察者
线程调度
代码 Code 2.2 中给出了线程调度的示例。subscribeOn(Schedulers.io())指定了被观察者执行的线程池。observeOn(Schedulers.single())指定了下游观察者执行的线程池。经过了上面的学习,很自然的能够明白,原理还是通过装饰器模式,将 Observable 和 Observer 层层包装,丢到线程池里执行。我们以 observeOn()为例,见 code4.8。
1.observeOn(Scheduler) 返回 ObservableObserveOn
2.ObservableObserveOn 继承自 Observable
3.所以 subscribe 方法最终会调用到 ObservableObserveOn 重写的 subscribeActual 方法
4.subscribeActual 返回一个 ObserveOnObserver(是一个 Observer)包装了真实的 observer 和 worker
根据 Observer 的逻辑,发送数据时 onNext 方法会被调用,所以要看下 ObserveOnObserver 的 onNext 方法:
code4.9
1.最终生产者代码中调用 onNext 时,会调用 schedule 方法
2.schedule 方法中,会提交自身(ObserveOnObserver)到线程池
3.而 run 方法会调用 onNext(emmiter)
可见,RxJava 线程调度的机制就是通过 observeOn(Scheduler)将发送元素的代码 onNext(emmiter)提交到线程池里执行。
使用注意
最后,给出几个我们在开发中总结的注意事项,避免大家踩坑。
适用场景
并不是所有的 IO 操作、异步回调都需要使用 RxJava 来解决,比如如果我们只是一两个 RPC 服务的调用组合,或者每个请求都是独立的处理逻辑,那么引入 RxJava 并不会带来多大的收益。下面给出几个最佳的适用场景。
处理 UI 事件
异步响应和处理 IO 结果
事件或数据 是由无法控制的生产者推送过来的
组合接收到的事件
下面给一个闲鱼商品批量补数据的使用场景:
背景:算法推荐了用户的一些商品,目前只有基础信息,需要调用多个业务接口,补充用户和商品的附加业务信息,如用户头像、商品视频连接、商品首图等。并且根据商品的类型不同,填充不同的垂直业务信息。
难点:1. 多个接口存在前后依赖甚至交叉依赖;2. 每个接口都有可能超时或者报错,继而影响后续逻辑;3.根据不同的依赖接口特点,需要单独控制超时和 fallback。整个接口也需要设置整体的超时和 fallback。
方案:如果只是多个接口独立的异步查询,那么完全可以使用 CompletableFuture。但基于它对组合、超时、fallback 支持不友好,并不适用于此场景。我们最终采用 RxJava 来实现。下面是大致的代码逻辑。代码中的 HsfInvoker 是阿里内部将普通 HSF 接口转为 Rx 接口的工具类,默认运行到单独的线程池中,所以能实现并发调用。
可以看到,通过引入 RxJava,对于超时控制、兜底策略、请求回调、结果组合都能更方便的支持。
Scheduler 线程池
RxJava2 内置多个 Scheduler 的实现,但是我们建议使用 Schedulers.from(executor)指定线程池,这样可以避免使用框架提供的默认公共线程池,防止单个长尾任务 block 其他线程执行,或者创建了过多的线程导致 OOM。
CompletableFuture
当我们的逻辑比较简单,只想异步调用一两个 RPC 服务的时,完全可以考虑使用 Java8 提供的 CompletableFuture 实现,它相较于 Future 是异步执行的,也可以实现简单的组合逻辑。
并发
单个 Observable 始终是顺序执行的,不允许并发地调用 onNext()。
code5.1
但是,每个 Observable 可以独立的并发执行。
code5.2
ob3 中组合了 ob1 和 ob2 两个流,每个流是独立的。(这里需要注意,这两个流能并发执行,还有一个条件是他们的发送代码运行在不同线程,就如果 code3.1 和 code3.2 中的示例一样,虽然两个流是独立的,但是如果不提交到不同的线程中,还是顺序执行的)。
背压
在 RxJava 2.x 中,只有 Flowable 类型支持背压。当然,Observable 能解决的问题,对于 Flowable 也都能解决。但是,其为了支持背压而新增的额外逻辑导致 Flowable 运行性能要比 Observable 慢得多,因此,只有在需要处理背压场景时,才建议使用 Flowable。如果能够确定上下游在同一个线程中工作,或者上下游工作在不同的线程中,而下游处理数据的速度高于上游发射数据的速度,则不会产生背压问题,就没有必要使用 Flowable。关于 Flowable 的使用,由于篇幅原因,就不在本文阐述。
超时
强烈建议设置异步调用的超时时间,用 timeout 和 onErrorReturn 方法设置超时的兜底逻辑,否则这个请求将一直占用一个 Observable 线程,当大量请求到来时,也会导致 OOM。
结语
目前,闲鱼的多个业务场景都采用 RxJava 做异步化,大大降低了开发同学的异步开发成本。同时在多请求响应组合、并发处理都有很好的性能表现。自带的超时逻辑和兜底策略,在批量业务数据处理中能保证可靠性,是用户流畅体验的强力支撑。
本文转载自:闲鱼技术(ID:XYtech_Alibaba)
评论