本文最初发布于 GO-JEK 的博客站点,经原作者授权由InfoQ 中文站翻译并分享。
如果你还没有接触RxJava 或者刚刚开始使用它的话,那么你会发现始终会有很多新东西要学习。我们在GO-JEK 的App 中需要执行很多的异步操作,而且我们无法在UI 的速度和流畅性上妥协。
编写大量多线程的Android App 是一件很困难和很有挑战的事情,因为在这个过程中有很多的部件需要进行处理。这连同一些其他的原因促使我们在Android App 中大量使用了RxJava。
在本文中,我们将会讨论如何使用RxJava 真正的多线程功能,它会让复杂的App 开发过程再次变得简单、容易和有趣。本文中的所有代码样例都会关注RxJava,但是文中讨论的概念同样适用于其他的反应式扩展(Reactive Extension)。
为何要进行反应式编程?
任何一篇关于RxJava 的文章都会包含一个“为什么要进行反应式编程”的章节,我们也不打算破坏这个约定。在Android 中采用反应式的方式构建App 会带来多项收益,接下来我们讨论几项你_ 真正_ 值得关注的好处。
不再会有回调地狱
如果你已经做过一段时间的Android 开发的话,你肯定会明白嵌套回调会快速地让所有事情失去控制。
如果你想按顺序执行一系列的异步操作并且下一步操作的行为要依赖于上一步操作的结果,那么你就会遇到这种状况。几乎瞬间,代码就会变得超级丑陋和难以管理。
简单的错误处理
在命令编程的世界中,如果你要执行大量复杂、异步的操作,错误可能会在任意的地方出现,为了处理这些场景,我们就需要将大量补丁式的代码到处放得到处都是,从而形成大量重复和繁琐的代码。
超级简单的多线程
我们都知道(私下也承认)在Java 中编写多线程代码有多困难。在后台线程中执行一段代码并在UI 线程中获取结果,这听起来似乎很简单,但实际上有很多复杂的情况需要处理。
通过使用RxJava,你可以非常容易地在任意线程中执行复杂的操作,它会维持适当的状态同步,并能够让你无缝地切换线程。
RxJava 所带来的好处是无穷无尽的,就此可以谈论几个小时,但是现在我们要更深入地探索一下它为多线程编程所带来的真正威力。
默认情况下,RxJava 并不是多线程的
是的,你没有看错。RxJava 默认情况下跟多线程一点关系都没有。在官方网站上,RxJava 是这样定义的:
借助 observable 序列,在 Java VM 上组合异步和基于事件的程序的库。
看到“异步”这个词,很多人就形成了一种误解,认为 RxJava 默认就是多线程的。的确,它支持多线程并且提供了一些强大的特性以便于我们执行异步操作,但是不能就此断定它的默认行为就是多线程的。
如果你多少了解一些 RxJava 的话,会知道它的基本构造为:
- 源 Observable
- 一个或多个操作符(Operator)
- 目标 Subscriber
Observable.just(1, 2, 3, 4, 5) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { println("Emitting item on: " + currentThread().getName()); } }) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { println("Processing item on: " + currentThread().getName()); return integer * 2; } }) .subscribeWith(new DisposableObserver<Integer>() { @Override public void onNext(@NonNull Integer integer) { println("Consuming item on: " + currentThread().getName()); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
如果你运行上面的代码片段的话,你会发现所有的执行过程都在应用的主线程中执行(请关注打印出来的日志中的线程名)。这表明在默认情况下,RxJava 是阻塞的。所有的过程都是在代码运行所在的线程上执行的。
额外的福利:想知道doOnNext()
是什么吗?它只是一个副作用操作符,能够让你从 observable 链中脱离出来并执行一些不那么纯的操作,你可以阅读本文了解更多的信息。
让我们开始一些简单的多线程操作
如果我们想要在Android 中使用RxJava 做一些基本的多线程操作,需要做的就是熟悉 Schedulers和observeOn/subscribeOn 操作符,这样的话,就可以开始了。
现在,看一个最简单的多线程用例。假设,我们想要通过网络获取一个Book
列表,并且要在应用的 UI 线程中展现这个列表,我们就采用这个简单直接的用例作为开始。
getBooks().subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<Book>() { @Override public void onNext(@NonNull Book book) { // You can access your Book objects here } @Override public void onError(@NonNull Throwable e) { // Handler errors here } @Override public void onComplete() { // All your book objects have been fetched. Done! } });
在这里,有一个getBooks()
方法,它会发起网络调用并获取图书列表。网络调用会耗费一定的时间(通常几毫秒到几秒钟的时间),因此,我们使用subscribeOn()
并指定Schedulers.io()
Scheduler 在 I/O 线程中执行操作。
我们同时还使用了observeOn()
操作符和AndroidSchedulers.mainThread()
Scheduler,以便于在主线程中消费结果并将图书列表填充到应用的 UI 之中。这些知识可能你之前已经了解过或使用过了。
不必担心,我们马上就会进入更高级的内容。现在展现的这些内容只是为了确保我们在同一个起跑线上,在深入介绍更深入内容之前能有一个基本的认识。
使用 Scheduler
RxJava 中的多线程操作主要是由强大的 Scheduler 集合提供的。在 RxJava 中,我们无法直接访问或操作线程。如果想要使用线程的话,必须要通过内置的 Scheduler 来实现。
你可以将 Scheduler 视为线程或线程池(一个线程的集合),能够执行不同类型的工作。
简而言之,如果你需要在特定的线程中执行任务的话,我们就需要此选择恰当的 Scheduler,Scheduler 接下来会从它的池中获取一个可用的线程,并基于该线程执行任务。
在 RxJava 框架中有多种类型的 Scheduler,但是这里比较有技巧的一点就是为合适的工作选择恰当的 Scheduler。如果你没有选择恰当的 Scheduler 的话,那么任务就无法最优地运行,所以接下来,我们尝试理解每一个 Scheduler。
Schedulers.io()
这是由无边界线程池作为支撑的一个 Scheduler,它适用于非 CPU 密集的 I/O 工作,比如访问文件系统、执行网络调用、访问数据库等等。这个 Scheduler 是没有限制的,它的线程池可以按需一直增长。
Schedulers.computation()
这个 Scheduler 用于执行 CPU 密集的工作,比如处理大规模的数据集、图像处理等等。它由一个有界的线程池作为支撑,线程的最大数量就是可用的处理器数量。
因为这个 Scheduler 只适用于 CPU 密集的任务,我们希望限制线程的数量,这样的话,它们不会彼此抢占 CPU 时间或出现线程饿死的现象。
Schedulers.newThread()
这个 Scheduler 每次都会创建一个全新的线程来完成一组工作。它不会从任何线程池中受益,线程的创建和销毁都是很昂贵的,所以你需要非常小心,不要衍生出太多的线程,导致服务器系统变慢或出现内存溢出的错误。
理想情况下,你应该很少使用这个 Scheduler,它大多用于在一个完全分离的线程中开始一项长时间运行、隔离的一组任务。
Schedulers.single()
这个 Scheduler 是 RxJava 2 新引入的,它的背后只有一个线程作为支撑,只能按照有序的方式执行任务。如果你有一组后台任务要在 App 的不同地方执行,但是同时只能承受一个任务执行的话,那么这个 Scheduler 就可以派上用场了。
Schedulers.from(Executor executor)
我们可以使用它创建自定义的 Scheduler,它是由我们自己的Executor
作为支撑的。在有些场景下,我们希望创建自定义的 Scheduler 为 App 执行特定的任务,这些任务可能需要自定义的线程逻辑。
假设,我们想要限制 App 中并行网络请求的数量,那么我们就可以创建一个自定义的 Scheduler,使其具有一个固定线程池大小的 Executor:Scheduler.from(Executors.newFixedThreadPool(n))
,然后将其应用到代码中所有网络相关的 Observable 上。
AndroidSchedulers.mainThread()
这是一个特殊的 Scheduler,它无法在核心 RxJava 库中使用,要使用它,必须要借助 RxAndroid 扩展库。这个 Scheduler 对 Android App 特别有用,它能够在应用的主线程中执行基于 UI 的任务。
默认情况下,它会在应用主线程关联的 looper 中进行任务排队,但是它有一个其他的变种,允许我们以 API 的形式使用任意的 Looper:AndroidSchedulers.from(Looper looper)
。
注意:在使用无边界线程池支撑的 Scheduler 时,比如Schedulers.io()
,我们要特别小心,因为它有可能会导致线程池无限增长,使系统中出现大量的线程。
理解 subscribeOn() 和 observeOn()
现在,我们对不同类型的 Scheduler 已经有了一个清晰的理解,并且掌握了何时该使用哪种 Scheduler,我们继续前进,接下来会详细介绍subscribeOn()
和observeOn()
操作符。
要理解 RxJava 真正的多线程功能,我们需要深入理解这两个操作符分别如何运行以及何时要将它们组合起来。
subscribeOn()
简而言之,这个操作符指定源 observable 要在哪个线程中发出各个条目。读者需要理解这里“源(source)” observable 的含义,如果你有一个 observable 链的话,源 observable 始终位于根部或者说是链的顶部,也就是产出物(emission)生成的地方。
读者也看到了,如果我们不使用subscribeOn()
的话,所有的产出都是直接在代码执行的线程中生成的(在我们的场景中,也就是main
线程)。
接下来,我们将所有的产出物在计算线程中生成,这需要使用subscribeOn()
和 Schedulers.computation()
Scheduler。如果你运行下面的代码片段,所有产出物是在线程池中某个可用的计算线程中生成的,RxComputationThreadPool-1
。
简洁起见,我们没有使用完整的DisposableSubscriber
,因为在这种简单的场景中,我们不需要每次都处理onError()
和onComplete()
,我们只需要处理onNext()
,它只是一个简单的消费者。
Observable.just(1, 2, 3, 4, 5, 6) .subscribeOn(Schedulers.computation()) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
在链中,将subscribeOn()
放到什么位置其实无关紧要。它只会影响源 observable 并控制在哪个线程中生成条目。
在上面的样例中,读者可能会注意到map()
和filter()
操作符也会生成其他的 observable,subscribeOn()
放到了链的底部。但是,如果你运行下面的代码片段的话,就会发现它只会影响源 observable。如果将observeOn()
同时加入到链中,会更加清晰。即便我们将subscribeOn()
放到observeOn()
下面,它也只会影响源 observable。
Observable.just(1, 2, 3, 4, 5, 6) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .map(integer -> integer * 3) .filter(integer -> integer % 2 == 0) .subscribeOn(Schedulers.computation()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
另外需要注意的是,我们不能在链中多次使用subscribeOn()
。从技术上讲,你可以这样做,但是它并不会产生额外的作用。在下面的代码片段中,我们将三个不同的 Scheduler 连接到了一起,你能猜出来,哪个Scheduler
会成为源 observable 吗?
Observable.just(1, 2, 3, 4, 5, 6) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.newThread()) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
如果你的答案是Schedulers.io()
,那么恭喜你答对了!
即便我们在链中放置多个subscribeOn()
操作符,只有最靠近源 observable 的那一个会发挥作用。
背后的原理
我们值得花一些时间更深入地理解一下上述的场景。为什么Schedulers.io()
Scheduler 会发挥作用,而不是其他的 Scheduler?正常情况下,你可能会认为Schedulers.newThread()
会生效,因为它是在链的最后才添加上去的。
我们必须要理解,在 RxJava 中,订阅(subscription)必须是基于上游 observable 实例的。如下的代码与我们前面看到的非常类似,只是更加繁琐一些。
Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5); Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0); Observable<Integer> o3 = o2.map(integer -> integer * 10); o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
我们从片段的最后一行开始理解这段代码。在这里,目标订阅者(或者说链中最下游的 observer)基于 observable o3
调用subscribe()
方法,这样会隐式地对它的直接上游 observable o2
调用subscribe()
方法。o3
所提供的 observer 实现会将生成的数字乘以 10。
这个过程会重复进行,o2
会隐式地基于o1
调用 subscribe,所传入的 observer 实现会将偶数过滤出来并允许其通过。现在,我们到达了根部,源 observable o1
没有任何上游 observable 来调用subscribe
。这实际上就完成了 observable 链,源 observable 就能生成其条目了。
对于 RxJava 中订阅是如何实现的,读者应该就会比较清楚了。到目前为止,对 observable 链如何组成以及事件如何从源 observable 开始在链中传播应该有了一个基本的了解。
observeOn()
正如我们刚刚看到的,subscribeOn()
能够指明源 observable 要在一个特定的线程中生成其条目,这个线程还会负责将这些条目一直推送到 sink Subscriber
中。因此,默认情况下,订阅者也会在这个线程中消费这些条目。
但是,我们的应用所期望的行为往往并非总是如此。假设,我们想要通过网络获取一些数据并在 App 的 UI 中展现。
本质上,我们有两件事情需要完成:
- 在非阻塞的 I/O 线程上执行网络调用;
- 在应用的主(UI)线程上消费得到的结果。
我们需要有一个 observable 在 I/O 线程进行网络调用,并将产出传递给目标订阅者。如果你只是使用subscribeOn()
和Schedulers.io()
的话,最终的订阅者将会在 I/O 线程中进行操作。我们无法在主线程之外的其他线程中访问 UI 组件,因此我们如果这样做的话,会遇到麻烦。
现在,我们迫切需要切换线程,而这就是observeOn()
操作符能够发挥作用的地方。在 observable 链中,如果遇到observeOn()
,那么产出物将会立即切换到所指定的线程中。
getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
在这个稍微有点牵强的样例中,我们有一个 observable 从网络上获取整数所组成的流。在实际的用例中,这可以换成任意的异步操作,比如读取大文件或从数据库获取数据等等。你可以尝试该代码片段并查看结果,需要关注的是日志中的线程名。
现在,我们看一个稍微复杂一点的样例,这里会使用多个observeOn()
操作符,这样在我们的 observable 链中会多次切换线程。
getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .map(integer -> { println("Mapping item " + integer + " on: " + currentThread().getName()); return integer * integer; }) .observeOn(Schedulers.newThread()) .filter(integer -> { println("Filtering item " + integer + " on: " + currentThread().getName()); return integer % 2 == 0; }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
在上面的代码片段中,源 observable 在 I/O 线程生成其条目,因为这里组合使用了subscribeOn()
和Schedulers.io()
。现在,我们想要使用map()
操作符来转换每个条目,只不过需要在计算线程中进行。为了实现这一点,我们可以在map()
操作符之前组合使用observeOn()
和Schedulers.computation()
,这样的话就能切换线程并将产出物传递到计算线程中了。
接下来,需要过滤条目,但是因为某些原因,我们想在一个全新的线程中进行操作。这样的话,我们可以在filter()
操作符前面组合使用observeOn()
和Schedulers.newThread()
,这样的话,就能为每个条目切换至一个新的线程。
最后,我们希望订阅者消费最终处理后的条目并在 UI 上展现结果,为了实现这一点,我们需要再次切换线程,不过这一次需要切换至主线程,这里需要组合使用observeOn()
和AndroidSchedulers.mainThread()
Scheduler。
但是,如果我们多次地连续使用observeOn()
会怎样呢?在下面的代码片段中,最终的订阅者在消费结果的时候,到底使用的是哪个线程呢?是第一个还是最后一个observeOn()
能发挥作用呢?
getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.single()) .observeOn(Schedulers.computation()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
如果你运行这个代码片段的话,你会发现所有的条目都是在RxComputationThreadPool-1
线程中消费的,这意味着最后的observeOn()
和Schedulers.computation()
发挥了作用。但是,为什么呢?
背后的原理
你可能已经猜到,为何是最后的observeOn()
,而不是其他的observeOn()
发挥作用了。我们已经知道,订阅只能针对上游才能发生,另外一方面,产出物只能面对下游才能发挥作用。它们从源 observable 开始,一路沿着链向下,直至最后的 sink subscriber。
observeOn()
操作符只能针对下游 observable 才有效,所以最后的observeOn()
和Schedulers.computation()
覆盖了前面声明的observeOn()
操作符。因此,每当你想为特定的 observable 切换线程时,需要做的就是指定observeOn()
操作符。同步、状态不一致性、竞态条件以及其他线程相关的问题都会在幕后自动处理好了。
到目前为止,相信你已经对如何使用 RxJava 编写多线程的应用并保证 App 的快速和流畅有了很好的理解。
如果你还没有充分理解和吸收文中的知识的话,其实也不要紧,读者可以多读几遍并亲自尝试一下样例代码,这样的话,会有更深刻的理解。对于一篇文章来说,这些内容确实有些多,你可以在这上面多花一点时间。
感谢覃云对本文的审校。
评论