本文要点:
- 在进行响应式设计之前,确保你的项目的确适合使用响应式编程
- 响应式方法总会返回一些什么,因为它们构建了一个执行框架,但是不是开始去执行
- 响应式编程允许你声明操作之间的状态序列和并行关系,将执行优化推给了框架。
- 在流中,错误是头等大事,应该立即得到处理
- 由于许多流是异步的,所以在使用同步框架进行测试时必须特别小心。
这篇文章来自于 SpringOne 的一个演讲。你可以在
这里看到这个演讲的视频。
在过去的几年里,Java 世界中在大力推动响应式编程的。无论是NodeJS 开发人员使用非阻塞api 的成功,还是引发延迟的微服务的爆炸式增长,还是仅仅是想要更有效地利用计算资源,许多开发人员都开始将响应式编程看作一种可行的编程模型。
幸运的是,涉及到响应式框架以及如何正确使用它们时,Java 开发人员被选择给宠坏了。没有太多编写响应式代码的“错误”方法,但是,这同时也是问题所在;也没多少编写响应式代码的“正确”方法。
在本文中,我们的目的是给你一些关于如何编写响应式代码的意见。这些观点来自多年来开发一个大规模的响应式API 的经验,虽然它们可能并不适合你,但我们希望它们在你开始你的响应式之旅时能给你一些方向。
本文中的示例都来自于Cloud Foundry Java 客户端。这个项目使用Reactor 项目的响应式框架。我们为这个Java 客户端选择Reactor 的原因,是因为它与Spring 团队有紧密的集成,但是我们讨论的所有概念也都适用于其他的响应式框架,比如RxJava。如果你对Cloud Foundry 有一些了解,这将很有帮助,但这不是必需的。这些例子有自解释性命名,在解释每个响应式概念时它们将助你更好地理解。
响应式编程是一个巨大的主题,它远远超出了本文的范围,但是为了实现我们的目的,让我们宽泛地把它定义为一种用更流畅的方式定义事件驱动系统的方法,而不是传统的命令式编程风格。其目标是将命令式逻辑转换为异步、非阻塞、函数式的样式,这种样式更容易理解和推理。
为这些做法(threads、NIO、callbacks 等等)设计的命令式API 并未考虑如何正确、可靠和方便地使用,许多情况下,在应用程序代码中使用这些API 仍需要大量显式地管理。响应式框架的承诺是,这些关注点可以在幕后处理,从而让开发人员能够把主力精力放在应用程序功能代码的编写上。
我应该使用响应式编程吗?
在设计响应式API 时,首先要问自己的问题是,你是否想要一个响应式API! 响应式api 不可能适用于所有的一切。响应式编程有显而易见的缺点(目前最大的问题是调试,但框架和 ide 都正在积极解决此问题)。相反,当价值明显大于缺点时,你就选择响应式 API 吧。在作出这个判断时,有几个用于响应式编程的模式非常适合。
网络化
网络请求本质上就撇不开 (相对) 较大的延迟,而且等待这些响应返回通常是系统中最大的资源浪费。在非响应式应用程序中,那些等待中的请求通常会阻塞线程并消耗堆栈内存,空闲着等待响应到达。远程故障和超时通常没有得到系统地、明确地处理,因为提供的 API 不容易做到这一点。最后,远程调用的负载通常是未知的、无边界的,导致堆内存耗尽。响应式编程与非阻塞 IO 相结合,解决了这类问题,因为它为你提供了一个清晰的和显式的 API。
高并发操作
它也很适合用于协调高并发操作 (如网络请求或可并行化 cpu 密集型计算)。响应式框架,虽然允许显式管理线程,但采用自动线程管理也很出色。像.flatmap() 这样的操作符透明地并行化行为,最大化地利用可用资源。
大规模可扩展应用
每个链接一个线程的 servlet 模型已经为我们服务了很多年了。但是,随着微服务的出现,我们已经开始看到应用程序大规模地扩展 (25、50 甚至 100 个单个无状态应用程序的实例) 来处理连接负载,即使 CPU 使用率处于空闲状态。选择非阻塞 IO 加响应式编程效果更佳,打破了链接与线程间的这种联系,使可用资源得到更有效的利用。很明显,这样的优势通常是惊人的。它常常需要在 Tomcat 上构建一个应用程序的更多实例,这些应用程序需要成百上千的线程来处理相同的负载,就像同一应用程序构建在拥有 8 个线程的 Netty 上一样。
虽然以上所列不能完全用来评判响应式编程在哪里适用,但关键是要记住,如果你的应用不适合以上任何一种,那么你用它可能只是徒增复杂度,而不会增加任何价值。
响应式 API 应该返回什么?
如果你回答了第一个问题,判定出你的应用会从响应式 API 得到收益,那么就到了设计 API 的时候了。决定你的响应式 API 应该返回什么基本类型是一个好的起点。
Java 世界中的所有响应式框架 (包括 Java 9 的 Flow) 都是在响应式流规范之上通信的。这个规范定义了一个低级的交互API,但是它不被认为是一个响应式框架(也就是说,它未针对流指定可用的操作符)。
在Reactor 项目中有两种主要的类型。Flux
Flux<Application> listApplications() {...} Flux<String> listApplicationNames() { return listApplications() .map(Application::getName); } void printApplicationName() { listApplicationNames() .subscribe(System.out::println); }
在本例中,listApplications() 方法执行一个网络调用,并返回 0 到 N 个应用程序实例的 Flux。然后,我们使用.map() 操作符将每个应用程序转换为其名称的字符串。然后将以应用程序命名的 Flux 消费并输出到控制台。
Flux<Application> listApplications() {...} Mono<List<String>> listApplicationNames() { return listApplications() .map(Application::getName) .collectList(); } Mono<Boolean> doesApplicationExist(String name) { return listApplicationNames() .map(names -> names.contains(name)); }
Mono 并不像 Flux 那样有一个流,但是因为它们在概念上是一个元素的流,所以我们使用的操作符通常有相同的名称。在这个例子中,除了映射到应用程序名称的 Flux 之外,我们还将这些名称收集到一个 List 中。在这种情况下,包含该列表的 Mono 可以被转换为一个 boolean 值,表示其中是否包含某个名称。这可能与直觉不符,但是如果你正在处理的项目在逻辑上是一个项目的集合,而不是它们的流,那么返回一个集合的 Mono 也很正常 (例如 Mono<List
与命令式 API 不同,void 不是一个适当的响应式返回类型。相反,每一个方法都必须返回一个 Flux 或者一个 Mono。这可能看起来很奇怪 (仍然有一些行为没有任何返回呀!),但这是一个响应流基本操作的结果。调用响应式 API 的代码执行 (例如.flatmap ().map()…) 是构建了一个数据到流的结构,但实际上并没有转换数据。只有在最后,当.subscribe() 被调用时,数据才会开始向流转换,并在随之完成转换。这种惰性执行正是为什么基于 lambdas 构建响应式编程的原因,以及为什么总要有返回类型,因为必须得有一些东西去.subscribe()。
void delete(String id) { this.restTemplate.delete(URI, id); } public void cleanup(String[] args) { delete("test-id"); }
上面这种的命令式阻塞示例可以返回 void,因为它的网络调用会立即开始执行,直到接收到响应时才返回。
Mono<Void> delete(String id) { return this.httpClient.delete(URI, id); } public void cleanup(String[] args) { CountDownLatch latch = new CountDownLatch(1); delete("test-id") .subscribe(n -> {}, Throwable::printStackTrace, () -> latch::countDown); {1} latch.await(); } {1}
在这个响应式示例中,网络调用直到.subscribe() 被调用后才开始,在 delete() 之后返回,因为它是用来生成调用的结构,而不是调用本身的结果。在本例中,我们使用返回 0 个条目的 Mono
方法的范围
一旦你决定了你的 API 需要返回什么,你就需要考虑你的每个方法 (API 和实现) 将会做什么了。在该 Java 客户端上,我们发现把方法设计小且可复用会带来收益。它使每一种方法更容易组成更大的操作。这还能让它们更灵活地组合成并行或顺序操作。此外,它还使潜在的复杂流程更具可读性。
Mono<ListApplicationsResponse> getPage(int page) { return this.client.applicationsV2() .list(ListApplicationsRequest.builder() .page(page) .build()); } void getResources() { getPage(1) .flatMapMany(response -> Flux.range(2, response.getTotalPages() - 1) .flatMap(page -> getPage(page)) .startWith(response)) .subscribe(System.out::println); }
这个例子演示了我们如何调用一个分页的 API。第一个 getPage() 请求检索结果的第一页。在结果的第一页中包括我们需要检索的页面总数,以获得完整的结果。因为 getPage() 方法是小的、可重用的,而且没有其他额外作用,所以我们可以重用该方法,并可以通过 totalPages 并行为第 2 页进行调用!
顺序和并行协调
现在,几乎所有显著的性能改进都来自对并发性的提升。我们知道这一点,但许多系统的并发要么仅涉及传入的连接,要么根本不并发。大部分这种情况都是源自这样一个事实,那就是实现一个高度并发的系统又困难又容易出错。响应式编程的一个重要优点是,你可以定义操作之间的顺序和并行关系,并让框架确定利用可用资源的最佳方式。
再看一遍前面的例子 ; 保证第一个 getPage() 调用在针对每个附加页面的后续调用之前发生。此外,由于后续对 getPage() 的调用是在.Flatmapmany() 中的,所以由框架负责优化多线程执行,并将结果汇到一起返回,传播可能发生的任何错误。
条件逻辑
与命令式编程不同,在响应式编程中错误是作为一种值来考虑的。这意味着它们是通过流操作来传递的。这些错误可以通过所有方式传递给消费者,或者流可以基于它们改变行为。这种行为变化可以表现为错误的转换或基于错误产生新的结果。
public Mono<AppStatsResponse> getApplication(GetAppRequest request) { return client.applications() .statistics(AppStatsRequest.builder() .applicationId(request.id()) .build()) .onErrorResume(ExceptionUtils.statusCode(APP_STOPPED_ERROR), t -> Mono.just(AppStatsResponse.builder().build())); }
在本例中,我们要求为正在运行的应用程序获取统计信息。如果一切正常,响应就会传回给消费者。但是,如果接收到一个错误 (带有特定的状态代码),则返回一个空响应。使用者永远不会看到错误和执行过程中的默认值,就好像从来没有发出过错误信号一样。
如前所述,一个流完成时未发送任何条目也是有效的。通常,这就相当于返回 null(其中 void 返回类型是一种特殊情况)。像以上这种出错的情况一样,没有任何条目的完成结果可以一直传递给消费者,或者流可以基于它们改变行为。
public Flux<GetDomainsResponse> getDomains(GetDomainsRequest request) { return requestPrivateDomains(request.getId()) .switchIfEmpty(requestSharedDomains(request.getId())); }
在本例中,getDomains() 返回一个域,该域可以位于两个不同的桶中。首先搜索私有域,如果成功完成,即使没有结果,也会搜索共享域。
public Mono<String> getDomainId(GetDomainIdRequest request) { return getPrivateDomainId(request.getName()) .switchIfEmpty(getSharedDomainId(request.getName())) .switchIfEmpty(ExceptionUtils.illegalState( "Domain %s not found", request.getName())); }
也可以用无条目表示一种错误条件。在这个示例中,如果没有找到私有或共享域,就会生成一个新的 IllegalStateException 并传递给使用者。
然而有时,你希望根据无错误或空来做决策,但不是根据值本身。虽然可以使用操作符来实现这个逻辑,但人们常常发现,其复杂度要远远高于其价值。在本例中,你应该只使用命令式条件语句。
public Mono<String> getDomainId(String domain, String organizationId) { return Mono.just(domain) .filter(d -> d == null) .then(getSharedDomainIds() .switchIfEmpty(getPrivateDomainIds(organizationId)) .next() // select first returned .switchIfEmpty(ExceptionUtils.illegalState("Domain not found"))) .switchIfEmpty(getPrivateDomainId(domain, organizationId) .switchIfEmpty(getSharedDomainId(domain)) .switchIfEmpty( ExceptionUtils.illegalState("Domain %s not found", domain))); }
这个示例返回给定的组织 (一个分级容器) 中给定域名的 id。这里有两个分支:如果域为空,则返回组织范围内第一个共享域或私有域的 id。如果域不为空,则搜索显式的域名,并返回它的 id。如果你觉得这段代码令人迷惑难懂,不要绝望,我们也一样!
public Mono<String> getDomainId(String domain, String organizationId) { if (domain == null) { return getSharedDomainIds() .switchIfEmpty(getPrivateDomainIds(organizationId)) .next() .switchIfEmpty(ExceptionUtils.illegalState("Domain not found")); } else { return getPrivateDomainId(domain, organizationId) .switchIfEmpty(getSharedDomainId(domain)) .switchIfEmpty( ExceptionUtils.illegalState("Domain %s not found", domain)); } }
这个示例效果一样,但使用的是命令式条件语句。但更容易理解得多了,你觉得呢?
测试
实际上,大多数有用的流都是异步的。这在测试中是有问题的,因为测试框架往往都是同步的,注册是通过了还是失败了,在异步结果返回之前就应该有结果了。为了弥补这一点,你必须阻塞主线程,直到返回结果,然后将这些结果发至断言的主线程中。
@Test public void noLatch() { Mono.just("alpha") .subscribeOn(Schedulers.single()) .subscribe(s -> assertEquals("bravo", s)); }
这个示例在非主线程上发出一个字符串,出人意料地是,通过了测试。这个测试通过的根本原因,就是当它显然不应该通过的时候,noLatch 方法将会完成执行,而没有抛出一个 AssertionError。
@Test public void latch() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicReference<String> actual = new AtomicReference<>(); Mono.just("alpha") .subscribeOn(Schedulers.single()) .subscribe(actual::set, t -> latch.countDown(), latch::countDown); latch.await(); assertEquals("bravo", actual.get()); }
这个例子,它使用一个 CountDownLatch 来确保 latch() 方法在流完成之后才返回,虽然不可否认它很笨拙。一旦 latch 释放,主线程中的断言就会抛出一个 AssertionError,导致测试失败。
如果你看了这些代码,拒绝以这种方式实现你所有的测试,大家一定会体谅你的,我们保证。幸运的是,Reactor 提供了一个 StepVerifier 类来辅助测试。
对响应式设计的测试需要的不仅仅是阻塞。你通常需要对多个值和预期错误进行断言,同时确保意外错误会导致测试失败。StepVerifier 对每一项都有所考虑。
@Test public void testMultipleValues() { Flux.just("alpha", "bravo") .as(StepVerifier::create) .expectNext("alpha") .expectNext("bravo") .expectComplete() .verify(Duration.ofSeconds(5)); }
在这个示例中,使用 StepVerifier 来预期精确发出了 alpha 和 bravo,然后流完成。如果其中一个没有发出,发出了一个额外的元素,或者产生一个错误,测试就会失败。
@Test public void shareFails() { this.domains .share(ShareDomainRequest.builder() .domain("test-domain") .organization("test-organization") .build()) .as(StepVerifier::create) .consumeErrorWith(t -> assertThat(t) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Private domain test-domain does not exist")) .verify(Duration.ofSeconds(5)); }
这个例子使用了一些更高级的 StepVerifier 特性,并不仅断言已经发出了一个错误信号,而且它还是一个 IllegalArgumentException,并且消息匹配预期结果。
CountDownLatches
关于响应式框架的一个关键问题是,它们只能协调自己的操作和线程模型。许多响应式编程的执行环境将不仅仅只有一个线程 (例如 Servlet 容器)。在这些环境中,响应式编程天然的异步属性并不是问题。但是,有一些环境,比如上面的测试示例,那里的进程将在任何单独的线程之前结束。
public static void main(String[] args) { Mono.just("alpha") .delaySubscription(Duration.ofSeconds(1)) .subscribeOn(Schedulers.single()) .subscribe(System.out::println); }
就像该测试方法一样,这个 main() 方法将在 alpha 发出之前终止。
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Mono.just("alpha") .delaySubscription(Duration.ofSeconds(1)) .subscribeOn(Schedulers.single()) .subscribe(System.out::println, t -> latch.countDown(), latch::countDown); latch.await(); }
就像在该测试示例中一样,一个 CountDownLatch 可以确保主线程在流终止之前不会终止,不管它是在什么线程上执行的。
阻塞流
在可预见的将来,在响应式编程中与阻塞 api 交互会成为一种常见现象。为了在两者之间架起桥梁,在等待结果的时候会适当地进行阻塞。但是,当以这种方式连接到阻塞 API 时,会丢失响应式编程的一些好处,比如有效的资源使用。因此,你将希望尽可能长地保持代码的响应性,直到最后一刻才阻塞。同样值得注意的是,这个想法的逻辑总结一下就是,一个响应式的 API 可以被阻塞,但是一个阻塞的 API 永远不能成为响应式。
Mono<User> requestUser(String name) {...} User getUser(String name) { return requestUser(name) .block(); }
在这个例子中,.block() 用于桥接 Mono 的结果到必须的返回类型。
Flux<User> requestUsers() {...} List<User> listUsers() { return requestUsers() .collectList() .block(); }
和前面的例子一样,.block() 用于将结果桥接到必须的返回类型,但在此之前,流必须被收集到一个列表中。
错误处理
如前所述,错误是流经系统的值。这意味着一直都没有一个合适的点来捕获异常。但是,你应该将它们作为流的一部分处理,或者作为订阅者。.Subscribe() 方法有 0 到 3 个参数,这些参数允许你处理每个条目,如果错误成了就对它进行处理,并对流的完成情况进行处理。
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Flux.concat(Mono.just("alpha"), Mono.error(new IllegalStateException())) .subscribe(System.out::println, t -> { t.printStackTrace(); latch.countDown(); }, latch::countDown); latch.await(); }
在本例中,值和错误都传递给了订阅服务器。需要记住的是,当使用 CountDownLatch 时,只有一个 onError() 或 onComplete() 被调用。因此,在错误和成功的情况下你都必须要释放锁。
可组合方法引用
正如你所想象的,任何严重依赖于 lambdas 的编程模型都容易受到“回调地狱”的影响。但是,具有一定的规范和方法引用,就不是问题了。所有明智的 Ruby 开发人员都会告诉你的是,小的私有方法 (甚至只有一行程序!) 在可读性方面非常有价值。如果你很好地为方法命了名并使用方法引用语法,那么你就可以创建出非常可读的流。
public Flux<ApplicationSummary> list() { return Mono .zip(this.cloudFoundryClient, this.spaceId) .flatMap(function(DefaultApplications::requestSpaceSummary)) .flatMapMany(DefaultApplications::extractApplications) .map(DefaultApplications::toApplicationSummary); }
在这个例子中,这个流就很易读。为了获得一个 Flux
点自由风格
在本文中,你可能已经注意到我们使用了非常紧凑的风格。 这叫做 Pointfree style 。它的主要好处是它帮助开发人员站在编写函数 (高层关注) 的角度来思考,而不是摆弄数据 (低层次的关注)。我们不会说在编写响应式编程时这是一个硬性要求,但是我们发现大多数人最终都更喜欢它。
Mono<Void> deleteApplication(String name) { return PaginationUtils .requestClientV2Resources(page -> this.client.applicationsV2() .list(ListApplicationsRequest.builder() .name(name) .page(page) .build())) .single() .map(applicationResource -> applicationResource.getMetadata().getId()) .flatMap(applicationId -> this.client.applicationsV2() .delete(DeleteApplicationRequest.builder() .applicationId(applicationId) .build())); }
如果你看一下这个例子,你会发现许多地方在分配变量、返回结果,通常这让它看起来更像传统的命令式代码。而这,并不会增加它的可读性。相反,添加更多的大括号、分号、等号和返回语句,虽然识别了数据来自哪里,希望能够更加明确,但可能会混淆流本身实际的重点。
响应式编程是一个巨大的课题,几乎每个人都在开始接触它。在编写响应式代码时,“错误”的选择非常少,但同时,大量的选择会让许多开发人员感到困惑,不知道什么是最好的入门方法。我们的意见来自于一个大型项目的经验,我们希望它能对你的响应式之旅有所帮助,我们鼓励你通过实验来推动技术发展,并将你的发现回馈给社区。
关于作者
Ben Hale 是 Pivotal Cloud Foundry Java 体验团队的领导者,他负责运行在 Cloud Foundry 上的 Java 应用程序相关的生态系统。
Paul Harris 是 Pivotal Cloud Foundry Java 客户端的首席开发人员,他的职责是使 Java 应用程序能够协调和管理 Cloud Foundry。
评论