写点什么

测试 RxJava

  • 2016-11-03
  • 本文字数:9057 字

    阅读完需:约 30 分钟

关键要点:

  • RxJava 含有内建的、测试友好的解决方案。
  • 使用 TestSubscriber 去验证 Observable。
  • 使用 TestScheduler 可实现对时间的严格控制。
  • Awaitility 库提供了对测试环境进一步的控制。

你已经阅读过 RxJava 的相关内容,也已经在互联网上体验过像“ RxJava by Example ”中的那些示例,现在打算在自己的代码中探索一下响应式编程了。但是,现在却一直困扰着如何测试那些可能会在代码库中发现的新功能呢?

使用响应式编程,就必须转变对给定问题的推理方式,因为我们要聚焦于作为事件流的流动数据,而非个别数据项。事件通常是被不同的线程所产生和消费,因此在编写测试时必须要对并发问题有着清晰的认识。幸运的是,RxJava 提供了测试 Observable 和 Subscription 的内建支持,并且是直接构建于 RxJava 的核心依赖中。

第一步

让我们回顾一下在“ RxJava by Example ”一文中所给出的那个词汇的例子,看一下如何对该例子作测试。让我们从基础测试工具的设置开始。在我们的测试架构中,使用了 JUnit 作为测试工具。

复制代码
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.plugins.RxJavaHooks;
import rx.schedulers.Schedulers;
import java.util.*;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.*;
import org.junit.Test;
import static org.junit.Assert.assertThat;
public class RxJavaTest {
private static final List<String> WORDS = Arrays.asList(
"the",
"quick",
"brown",
"fox",
"jumped",
"over",
"the",
"lazy",
"dog"
);
}

事实上在没有给定调度器(Scheduler)的情况下,Subscription 将默认运行于调用线程上。因此我们将在首个测试中使用原生的方法。这意味着我们可实现一个 Subscription 接口的对象,在 Subscription 发生后就立刻对其状态做断言(assert)。

复制代码
@Test
public void testInSameThread() {
// given:
List<String> results = new ArrayList<>();
Observable<String> observable = Observable
.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribe(results::add);
// then:
assertThat(results, notNullValue());
assertThat(results, hasSize(9));
assertThat(results, hasItem(" 4. fox"));
}

注意这里使用了显式的 List容器,与实际订阅者一起累计结果。由于给定的测试很简单,所以可能会使你认为这种显式累加器的方法已经足够好了。但是切记产品级的 Observable 中可能封装了错误或可能产生意外的事件。例子中的 Subscriber 与累加器的简单组合并不足以覆盖这种情况。但不用为此烦恼,RxJava 提供的 TestSubscriber 类型就是用于处理这种情况的。下面我们使用 TestSubscriber 类型重构上面的测试。

复制代码
@Test
public void testUsingTestSubscriber() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable
.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribe(subscriber);
// then:
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(9);
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

TestSubscriber 不仅可替代用户累加器,还另给出了一些行为。例如它能够给出接收到的消息和每个事件相关数据的规模,它也可对 Subscription 被完成且在 Observable 消费期间没有错误出现的状态做断言。虽然当前测试中的 Observable 并未生成任何的错误,但是回到“ RxJava by Example ”一文,我们从中得知了 Observable 将例外与数据事件等同对待。我们可通过如下的方式通过连接例外事件而模拟错误:

复制代码
@Test
public void testFailure() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Exception exception = new RuntimeException("boom!");
Observable<String> observable = Observable
.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string))
.concatWith(Observable.error(exception));
// when:
observable.subscribe(subscriber);
// then:
subscriber.assertError(exception);
subscriber.assertNotCompleted();
}

在我们所给出的有限用例中,所有的机制运行良好。但是实际的产品代码可能会完全不同于例子。因此在下文中,我们将考虑一些更加复杂的产品实例。

定制调度器(Scheduler)

在产品代码中,很多用例中的 Observable 都是在特定的线程上执行,这种线程在响应式编程环境中被称为“调度器(Scheduler)”。很多 Observable 操作将可选的调度器参数作为附加参数使用。RxJava 定义了一系列任何时候都可用的命名调度器,包括 IO 调度器(io)、计算调度器(computation,为共享线程)和新线程调度器(newThread)。开发人员也可去实现个人定制的调度器。让我们通过指定计算调度器来修改 Observable 的代码吧。

复制代码
@Test
public void testUsingComputationScheduler() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable
.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable
.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
// then:
subscriber.assertCompleted();
subscriber.assertNoErrors();
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

当运行时就会立刻发现该代码是存在问题的。Subscriber 在测试线程上执行其断言,但是 Observable 在后台线程(计算线程)上生成值。这意味着执行 Subscriber 断言可能先于 Observable 生成所有相关事件,因而导致测试的失败。

为使测试顺利执行,有如下的一些策略可选:

  • 将 Observable 转化为阻塞式的。
  • 强制测试等待,直至给定的条件被满足。
  • 将计算调度器转换为即刻(Schedulers.immediate())调度器。

我们将对每个策略做展开介绍,但将从“将 Observable 转化为阻塞式”开始,因为实现该策略所需做的技术工作最少,这些工作与所使用的调度器无关。我们假设数据在后台线程中生成,这将导致 Subscriber 从同一后台线程得到通知。

我们要做的是强制生成所有的事件,并在下一个声明被执行前就在测试中完成 Observable。这是通过在 Observable 自身上调用 toBlocking() 方法实现的。

复制代码
@Test
public void testUsingBlockingCall() {
// given:
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
Iterable<String> results = observable
.subscribeOn(Schedulers.computation())
.toBlocking()
.toIterable();
// then:
assertThat(results, notNullValue());
assertThat(results, iterableWithSize(9));
assertThat(results, hasItem(" 4. fox"));
}

该方法虽然适用于我们所给出的简单代码,但可能并不适用于实际的产品代码。如果生产者生成所有的数据需要很长的时间,那将会产生什么后果?这将使测试变得非常慢,并增加了编译时间,还可能会有其它的性能问题。这里我推荐一个便利的程序库,就是 Awaitility 。简单地说,Awaitility 是一个以精确、简单易读的方式对异步系统相关期望进行表述的 DSL。在项目中可以用 Maven 添加 Awaitility 的依赖关系。

复制代码
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>

或是使用 Gradle:

testCompile 'org.awaitility:awaitility:2.0.0'Awaitility DSL 的接入点是 org.awaitility.Awaitility.await() 方法(参见下面例子中的第 13 和 14 行代码)。可以使用 Awaitility 定义使测试继续所必须达成的条件,也可在条件中加入超时或其它的时序约束,例如最小、最大或持续范围。对于上面的例子,下面的代码给出了如何在结果中使用 Awaitility:

复制代码
@Test
public void testUsingComputationScheduler() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
await().timeout(2, SECONDS)
.until(subscriber::getValueCount, equalTo(9));
// then:
subscriber.assertCompleted();
subscriber.assertNoErrors();
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

此版本测试并未以任何方式改变 Observable 的本质,这使得你做测试时不必对产品代码做任何改动。该版本测试使用最多 2 秒的等待时间通过检查 Subscriber 状态使 Observable 执行其作业。如果一切进行顺利,在 2 秒内就可将 Subscriber 的状态释放给所有的 9 个事件。

Awaitility 具有和 Hamcrest 的匹配符、Java 8 的 lambda 表达式和方法引用等的良好协作,从而给出精确的和可读的测试条件。Awaitility 还提供了预制扩展,用于那些被广泛使用的 JVM 语言,其中包括 Groovy 和 Scala。

我们要给出最后一个策略中使用了 RxJava 的扩展机制,该扩展是以 RxJava API 的组成部分发布的。RxJava 中定义了一系列的扩展点,允许对几乎任何默认的 RxJava 行为进行微调。这种扩展机制使我们可以针对特定的 RxJava 特性提供修改过的值。利用该机制,在无需关心生成代码中所指定的调度器的情况下,我们可在测试中注入选定的调度器。这正是我们所寻找的方法,该方法被封装在 RxJavaHooks 类中。假设产品代码依赖于计算调度器,我们将覆盖它的默认值,返回一个调度器,它作为被调用的代码使事件处理发生,这是即刻调度器(Schedulers.immediate())。下面给出测试的代码:

复制代码
@Test
public void testUsingRxJavaHooksWithImmediateScheduler() {
// given:
RxJavaHooks.setOnComputationScheduler(scheduler -> Schedulers.immediate());
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
try {
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
// then:
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(9);
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
} finally {
RxJavaHooks.reset();
}
}

在测试中,产品代码察觉不到计算调度器是即刻的。请注意钩子函数必须被重置,否则即刻调度器的设置可能会发生泄漏,导致在各处的测试被破坏。使用 try/finall 代码块会在一定程度上模糊了测试的目的,但是幸运的是我们可以使用 JUnit 规则重构该行为,使测试更加精炼,结果更可读。下面给出使用上述规则的一种可能的实现代码:

复制代码
public class ImmediateSchedulersRule implements TestRule {
@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaHooks
.setOnIOScheduler(scheduler -> Schedulers.immediate());
RxJavaHooks
.setOnComputationScheduler(scheduler -> Schedulers.immediate());
RxJavaHooks
.setOnNewThreadScheduler(scheduler -> Schedulers.immediate());
try {
base.evaluate();
} finally {
RxJavaHooks.reset(); }
}
};
}
}

此外,我们还对另外两个调度器的生成方法做了重写。该规则对此后其它的测试目标更为通用。在新的测试用例类中,该规则的使用方法很直接,只需简单地定义一个域,并将其中新类型标注为 @Rule 即可。示例代码如下:

复制代码
@Rule
public final ImmediateSchedulersRule schedulers = new ImmediateSchedulersRule();
@Test
public void testUsingImmediateSchedulersRule() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, index) -> String.format("%2d. %s", index, string));
// when:
observable.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
// then:
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValueCount(9);
assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

最终我们可得到与前面测试一样的行为,却没有像前面测试那样的杂乱。下面用一些篇幅来回顾一下我们目前已经做到的事情:

  • Subscribers 将在同一线程中处理数据,只要没有使用特定的调度器。这意味着在 Subscriber 向 Observable 做订阅后,我们就可在该 Subscriber 上做断言。
  • TestSubscriber 可累计事件,并给出自身状态的追加断言。
  • 任何 Observable 都可转换为阻塞式的,这使得无论 Observable 使用何种调度器,我们都可以同步等待事件的生成。
  • RxJava 提供了扩展机制,允许开发人员重写其默认方法,并以适当的方式注入到产品代码中。
  • 并发代码可使用 Awaitility DSL 测试。

上述的每个技术都作用于不同的场景中,但是所有技术都是通过“共同的线程”(译者注:作者在原文中指出 common thread 是作为双关语使用的,其另一个意思是“类似的思路”)相关联:在对 Subscriber 状态做断言之前,测试代码需等待 Observable 完成。考虑到 Observable 的行为会生成数据,是否有方法对该行为进行检查呢?换句话说,是否可以用编程的方式做 Observable 的现场调试?我们将在后文中给出这样的技术。

操控时间

到目前为止我们已用黑箱方式测试了 Observable 和 Subscription。下面我们将考虑另外一种操控时间的技术,该技术使我们可以在 Observable 依然处于活动状态时,打开引擎盖去查看 Subscriber 状态。换句话说,我们将使用采用了 RxJava 的 TestScheduler 类白箱测试技术,这可以说是 RxJava 再一次来救场。这种特定的调度器可精确地设定时间的内部使用方式,例如可将时间提前半秒,或是使时间跳跃 5 秒。我们将首先给出这种新调度器实例的创建方法,然后再讨论代码的测试。

复制代码
@Test
public void testUsingTestScheduler() {
// given:
TestScheduler scheduler = new TestScheduler();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable.interval(1, SECONDS, scheduler);
Observable<String> observable = Observable.from(WORDS)
.zipWith(tick, (string, index) -> String.format("%2d. %s", index, string));
observable.subscribeOn(scheduler)
.subscribe(subscriber);
// expect:
subscriber.assertNoValues();
subscriber.assertNotCompleted();
// when:
scheduler.advanceTimeBy(1, SECONDS);
// then:
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues(" 0. the");
// when:
scheduler.advanceTimeTo(9, SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(9);
}

该“产品”代码有了略微的改变,这是由于我们使用了绑定到调度器时隙(interval())的方法生成计数(第 6 行),而非生成一个计数的范围。但这样做具有一个副作用,就是计数是从零开始生成的,而非从 1 开始。一旦配置了 Observable 和测试调度器,我们立刻做出这样的断言,即假定 Subscriber 不具有值(第 15 行)且没有被完成或生成任何的错误(第 16 行)。这是一个完整性测试,因为此时调度器并没有被移动,因而没有任何值被 Observable 产生或是被 Subscriber 接收到。

下面将时间向前调 1 整秒(第 19 行),该操作将会导致 Observable 生成第一个值,这正是随后的断言集所要检查的(第 22 到 24 行)。

下面将时间从当前时间调到 9 秒。需要注意的是,这意味着将时间准确地调整为调度器启动后的第 9 秒(并非是向前调 1 秒后再向前调 9 秒,即调度器检查启动后的第 10 秒)。换句话说,advanceTimeBy() 方法将调度器的时间调整为相对于当前位置的时间,而 advanceTimeTo() 以绝对的方式调整时间。此后我们做出下一轮的断言(第 28 到 20 行),用于确保所有的数据由 Observable 生成且被 Subscriber 消费。另一件需要说明的事情就是使用 TestScheduler 时,真实的时间是立刻发生调整的,这着意味着测试并不用实际等待 9 秒才去完成。

正如你所看到的,该调度器的使用是非常便利的,仅需将该调度器提供给正在测试的 Observable 即可。但是对使用了指定类型调度器的 Observable,该调度器并不能很好地适用。但是稍等一下,之前我们看到的是如何使用 RxJavaHooks 切换一个不影响生产代码的调度器,而这一次是提供一个代替即刻调度器的 TestScheduler(第 13 到 15 行)。我们甚至可以 apply 定制 JUnit 规则同样的技术,使之前的代码可以用更重用的方式予以重写。首先该新规则为:

复制代码
public class TestSchedulerRule implements TestRule {
private final TestScheduler testScheduler = new TestScheduler();
public TestScheduler getTestScheduler() {
return testScheduler;
}
@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaHooks.setOnIOScheduler(scheduler -> testScheduler);
RxJavaHooks.setOnComputationScheduler(scheduler -> testScheduler);
RxJavaHooks.setOnNewThreadScheduler(scheduler -> testScheduler);
try { base.evaluate(); }
finally { RxJavaHooks.reset(); }
}
};
}
}

紧接着是实际的测试代码(在一个新的测试用例类中),去使用我们的测试规则:

复制代码
@Rule
public final TestSchedulerRule testSchedulerRule = new TestSchedulerRule();
@Test
public void testUsingTestSchedulersRule() {
// given:
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<String> observable = Observable.from(WORDS)
.zipWith(Observable.interval(1, SECONDS),
(string, index) -> String.format("%2d. %s", index, string));
observable.subscribeOn(Schedulers.computation())
.subscribe(subscriber);
// expect
subscriber.assertNoValues();
subscriber.assertNotCompleted();
// when:
testSchedulerRule.getTestScheduler().advanceTimeBy(1, SECONDS);
// then:
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues(" 0. the");
// when:
testSchedulerRule.getTestScheduler().advanceTimeTo(9, SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(9);
}

这样你就成功地实现了它。使用经由 RxJavaHooks 注入 TestScheduler 的方法,可在无需更改原始 Observable 组合的情况下编写测试代码,此外它给出了一种在 observable 自身执行期间改变时间、并在特定点上做断言的方法。在本文中给出的所有这些技术,应该足够你选择用来测试 RxJava 的代码了。

未来

RxJava 是最先为 Java 提供响应式编程能力的程序库之一。为了使 RxJava API 更好地符合 Reactive Streams 规范,即将推出的 2.0 版将会是重新设计的。Reactive Streams 规范以 Java 和 JavaScript 运行时为目标,提供了使用非阻塞背压机制(back pressure)的异步流处理标准。这意味着下一版的 RxJava 中将会出现一些 API 改进。对这些改进的详细描述参见 RxJava wiki

对于测试而言,这些核心类型(Observable、Maybe 和 Single)现在都给出了便利易用的 test() 方法,实现现场创建 TestSubscriber 实例。也可在 TestSubscriber 上链接方法调用,对这类用法也有一些新的断言方法。

关于作者

Andres Almiray是一位 Java/Groovy 开发者和 Java 冠军程序员,具有超过 17 年的软件设计和开发经验。他在 Java 早期推出的时代就参与了 Web 和桌面应用的开发。他是开源软件的忠实信徒,参与了 Groovy、JMatter、Asciiidoctor 等广为人知的项目。他是 Griffon 架构的创始者和现任项目领导者,还是 JSR 377 规范的牵头者。

查看英文原文: Testing RxJava


感谢冬雨对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们。

2016-11-03 18:274168
用户头像

发布了 227 篇内容, 共 74.2 次阅读, 收获喜欢 28 次。

关注

评论

发布
暂无评论
发现更多内容

Docker发布/上传镜像到dockerhub&&下载/拉取镜像&&删除dockerhub镜像

A-刘晨阳

Docker Linux 运维 11月月更

追求极致性能!RocketMQ消息通信详解

Java全栈架构师

Java 程序员 面试 RocketMQ 消息中间件

docker修改容器的端口、容器名、映射地址......

A-刘晨阳

Docker Linux 运维 11月月更

Docker——denied: requested access to the resource is denied问题以及解决方法

A-刘晨阳

Docker Linux 运维 11月月更

太强了!终于有人整理出了仿京东电商项目,再次开源了

钟奕礼

Java 编程 架构 项目 java程序员

RxJS 全面解析

PingCode研发中心

响应式编程 RXJS reactivex

Kotlin函数声明与闭包

子不语Any

android kotlin 11月月更

python小知识-python时间操作

AIWeker

Python python小知识 11月月更

自制操作系统日记(7):字符串显示

操作系统

Dragonfly 中 P2P 传输协议优化

SOFAStack

开源

互联网公司网络堡垒机首选哪家品牌?有什么优势?

行云管家

互联网 网络安全 信息安全 堡垒机

想要设计一个良好的接口至少要考虑这14点!

程序员小毕

Java 编程 程序员 程序人生 java面试

旺链科技出席Hyperledger区块链技术峰会,分享数字乡村新业态

旺链科技

区块链 hyperledger 产业区块链 企业号十月PK榜

RxJS 全面解析

阿杰

JavaScript 响应式编程 RXJS

2023年语言和框架我们值得关注什么?

阿里巴巴终端技术

框架 语言 & 开发

【C语言】for 关键字

謓泽

11月月更

OpenHarmony开发之MQTT讲解

OpenHarmony开发者

OpenHarmony

从零到一构建完整知识体系!阿里巴巴Java并发编程技术内幕全网首次公开

Java全栈架构师

源码 程序员 程序人生 Java并发 java面试

Alibaba最新推出的Spring Cloud手册惨遭开源

小小怪下士

Java 程序员 阿里 SpringCloud

Kubectl 命令总结

蜗牛也是牛

如何杜绝 spark history server ui 的未授权访问?

明哥的IT随笔

hadoop spark

LED透明屏焊接和插接安装以及三招提升稳定性

Dylan

LED LED显示屏 led显示屏厂家

制造业行业现状及智能生产管理系统一体化解决方案

优秀

制造业 生产管理系统

微服务熔断限流的一些使用场景

Java永远的神

Java 程序员 微服务 程序人生 架构师

这次,听人大教授讲讲分布式数据库的多级一致性|TDSQL关键技术突破

腾讯云数据库

腾讯云 tdsql 腾讯云数据库 多级一致性 中国人民大学

手慢无!清华大牛熬夜整理Spring微服务架构设计第2版文档,限时删

钟奕礼

Java 编程 架构 计算机 java程序员

全国首个AIGC创作大赛开赛,创作者可靠“AI打工人”躺赚

科技热闻

华为云开发者日震撼来袭!11月20日,上海见!

华为云开发者联盟

开发者 华为云

极客时间架构训练营模块五作业

李晨

架构

高可用性集群软件就选Skybility HA!优势多多!

行云管家

高可用 双机热备

欢迎来嫖!阿里P8高级技术专家携这份818页Java核心技术重磅来袭

钟奕礼

Java 编程 计算机 java程序员 java架构

测试RxJava_Java_Andres Almiray_InfoQ精选文章