写点什么

携程基于 Quasar 协程的 NIO 实践

  • 2020-09-03
  • 本文字数:6236 字

    阅读完需:约 20 分钟

携程基于Quasar协程的NIO实践

IO 密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA 上成熟的非阻塞 IO(NIO)技术可解决该问题。目前 Java 项目对接 NIO 的方式主要依靠回调,代码复杂度高,降低了代码可读性与可维护性。近年来 Golang、Kotlin 等语言的协程(Coroutine)能达到高性能与可读性的兼顾。


本文利用开源的 Quasar 框架提供的协程对系统进行 NIO 改造,解决以下两个问题:


1)提升单机任务的吞吐量,保证业务请求突增时系统的可伸缩性。


2)使用更轻量的协程同步等待 IO,替代处理 NIO 常用的异步回调。

一、Java 异步编程与非阻塞 IO

本文改造的系统处理来自前台的任务,通过 HTTP 请求对端服务,还通过 RPC 调用内部服务。当业务高峰时,系统会遇到瞬时并发任务量数十倍激增的情况,系统的线程数量急剧增加造成性能下降。为此,不得不扩容以保证业务高峰时期的性能。



基于 epoll 的 NIO 框架 Netty 在一些框架级别的应用中已经得到了广泛使用,但在快速迭代的业务系统中的应用依然有一定的局限性。NIO 消除了线程的同步阻塞,意味着只能异步处理 IO 的结果,这与业务开发者顺序化的思维模式有一定差异。当业务逻辑复杂以及出现多次远程调用的情况下,多级回调难以实现和维护。

1.1 Java 中的异步工具

Java 项目大多使用 JDK8,除线程外可以获得的异步的编程支持包括 CompletableFuture,以及开源的 RxJava、Vert.x 等反应式编程框架等。这些工具使用了基于响应式编程的链式调用逐级传递事件,未从根本解决回调问题。


如下为将一段简单的逻辑判断使用 CompletableFuture 进行异步改造后的对比。原始版本使用 getA 方法获得第一步的请求结果,根据其相应选择使用 getB1 还是 getB2 获取第二步的响应作为结果。


HttpResponse a = getA();
HttpResponse b ;if(a.getBody().equals("1")){ b=getB1();}else{ b=getB2();}
String ans=b.getBody();
复制代码


首先将三个获取响应的方法改为异步。此处假设 getB1 与 getB2 内部已经具有复杂逻辑,且不属于同一领域,不适合合并为一个方法。


private CompletableFuture<HttpResponse> getA();private CompletableFuture<HttpResponse> getB1();private CompletableFuture<HttpResponse> getB2();
复制代码


然后使用 CompletableFuture 的链式调用,将两个步骤组合起来:


String ans = getA()        .thenCompose(a -> {            if (a.getBody().equals("1")) {                return getB1();            } else {                return getB2();            }        }).get()        .getBody();
复制代码


使用 CompletableFuture 的链式回调后,代码变得不友好。RxJava 等框架同样具有这个问题。这类反应式的编程工具更适合于数据流的传递。对于 if/else、switch/case,乃至 while/for、break/continue 这类过程控制语句,实现与维护的难度都很大。业务系统需要类似于线程的同步等待,同时具有低资源消耗的编码工具,配合 NIO 使用。当时使用 NIO 时,由于可以不占用线程,可以使用一种资源消耗更小的协程来等待。

1.2 协程

协程是一种进程自身来调度任务的调度模式。协程与线程不同之处在于,线程由内核调度,而协程的调度是进程自身完成的。协程只是一种抽象,最终的执行者是线程,每个线程只能同时执行一个协程,但大量的协程可以只拥有少量几个线程执行者,协程的调度器负责决定当前线程在执行那个协程,其余协程处于休眠并被调度器保存在内存中。


和线程类似,协程挂起时需要记录栈信息,以及方法执行的位置,这些信息会被协程调度器保存。协程从挂起到重新被执行不需要执行重量级的内核调用,而是直接将状态信息还原到执行线程的栈,高并发场景下,协程极大地避免了切换线程的开销。下图展示了协程调度器内部任务的流转。



协程中调用的方法是可以挂起的。不同于线程的阻塞会使线程休眠,协程在等待异步任务的结果时,会通知调度器将自己放入挂起队列,释放占用的线程以处理其他的协程。异步任务完毕后,通过回调将异步结果告知协程,并通知调度器将协程重新加入就绪队列执行。

1.3 Quasar 任务调度原理

Quasar(https://github.com/puniverse/quasar)是一个开源的 Java 协程框架,通过利用 Java instrument 技术对字节码进行修改,使方法挂起前后可以保存和恢复 JVM 栈帧,方法内部已执行到的字节码位置也通过增加状态机的方式记录,在下次恢复执行可直接跳转至最新位置。以如下方法为例,该方法分为两步,第一步为 initial 初始化,第二部为通过 NIO 获取网络响应。


public String instrumentDemo(){    initial();    String ans = getFromNIO();    return ans;}
复制代码



Quasar 会在 initial 前增加一个 flag 字段,表明当前方法执行的位置。第一次执行方法时,检查到 flag 为 0,修改 flag 为 1 并继续往下执行 initial 方法。执行 getFromNIO 方法前插入字节码指令将栈帧中的数据全部保存在一个 Quasar 自定义的栈结构中,在执行 getFromNIO 后,挂起协程,让出线程资源。直至 NIO 异步完成后,协程调度器将第二次执行该方法,检测到 flag 为 1,将会调用 jump 指令跳转到 returnans 语句前,并将保存的栈结构还原到当前栈中,最后调用人 return ans 语句,方法执行完毕。

二、系统异步 IO 改造

在项目中添加 Quasar 依赖后,可以使用 Fiber 类新建协程。建立的方法与线程类似。


new Fiber(()->{    //方法体}).start();
复制代码

2.1 整合 Netty 与 Quasar

系统使用的 Http 框架是基于 Netty 的 async-http-client(https://github.com/AsyncHttpClient/async-http-client),该框架提供了异步回调和 CompletableFuture 两种对响应的异步处理方式。


CompletableFuture 自 JDK8 推出,与之前的 Future 类最大的不同在于,提供了异步任务跨线程的通知和控制机制。即,任务的等待者可以在 CompletableFuture 注册任务完成或异常时的回调,而执行者也可以通过它通知等待者。Quaasr 框架对它也做了支持,提供了 API 用于在协程中等待 CompletableFuture 的结果。调用后,协程将挂起,直至 future 状态为已完成。


AsyncCompletionStage.get(future)
复制代码


通过 CompletableFuture 作为通知中介,我们可以将 AsyncHttpClient 与 Quasar 做整合,挂起协程等待 IO 结果。


//创建HttpClientAsyncHttpClient httpClient = Dsl.asyncHttpClient();//创建请求Request request = createRequest();//将网络请求交给HttpClient执行CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture();//通过Quasar挂起协程Response response = AsyncCompletionStage.get(future);//获取网络结果后,通过future传递response并唤醒协程重新执行deal(response);
复制代码


过程可由下图表示。



Quasar 框架 AsyncCompletionStage.get 内部完成的工作相当于,在 HttpClient 返回的 future 上注册回调,回调的内容是“IO 操作完成后通知调度器唤醒协程”,这样将 NIO 异步回调全部操作封装在协程调度器中,用户代码看起来是同步等待的形式,避免了自行实现回调处理带来的繁琐,解决了前文所述的回调地狱。

2.2 声明挂起方法

Quasar 需要织入字节码接管挂起方法的调度,在项目主 pom 下添加 quasar-maven-plugin 插件,该插件将在编译后的 class 文件中修改字节码。


<plugin>    <groupId>com.vlkan</groupId>    <artifactId>quasar-maven-plugin</artifactId>    <version>0.7.9</version>    <executions>        <execution>            <goals>                <goal>instrument</goal>            </goals>        </execution>    </executions></plugin>
复制代码


Quasar 通过识别方法是否抛出了该框架定义的 SuspendExecution 异常决定是否修改字节码。Quasar 框架在 AsyncCompletionStage.get 方法上声明了 SuspendExceution 异常,该异常是捕获异常,但仅作为识别挂起方法的声明,在运行时不会实际抛出。使用者必须逐层抛出该异常直至新建协程的一层。当方法内部存在 try/catch 语句时,也必须抛出该异常。


public void startFiber() throws ExecutionException, InterruptedException {    Fiber<Void> fiber = new Fiber<Void>(() -> {        //不用继续抛出异常        Response response = waitNextLayer1();        deal(response);    }).start();}
private Response waitNextLayer1() throws SuspendExecution { return waitNextLayer2();}
private Response waitNextLayer2() throws SuspendExecution { CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture(); try { // Quasar框架工具类抛出SuspendExecution return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码

2.3 异步 RPC 调用

目前主流的 RPC 框架都基于 NIO 实现,支持异步回调,有的 RPC 框架已经直接提供了返回 CompletableFuture 或 ListenableFuture(Guava 工具类提供)的异步接口,通过使用 ComplatableFuture,可以按前文类似的方法将 Quasar 与 RPC 框架结合起来。当 RPC 框架没有该返回类型时,一般会提供如下类似的带泛型的异步回调接口:


interface Callback<TResponse> {    void callback(TResponse TResponse, Exception e);}
复制代码


这种情况,可以使用者自己创建 ComplatableFuture,在回调中设置其状态,并调用 AsyncCompletionStage.get 等待这个 future。


CompletableFuture<Response> future=new CompletableFuture<>();//调用hello接口的异步APInew RpcClient().helloAsync(request, new Callback<Response>() {    public void callback(Response response, Exception e) {        if (e == null) future.complete(response);        else future.completeExceptionally(e);    }});//在此处调用Quasar的API,挂起直至RPC调用完成Response response = AsyncCompletionStage.get(future);
复制代码


上述代码依然具有异步回调不直观的缺点,通过 JDK8 的函数式接口可以实现一个通用的调用模板,将异步回调变为同步等待的形式。


@FunctionalInterfaceprivate interface RpcAsyncCall<TRequest, TResponse> {    void request(TRequest request, Callback<TResponse> callback);}public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {    CompletableFuture<TResponse> future = new CompletableFuture<>();
call.request(request, (response, e) -> { if (e == null) future.complete(response); else future.completeExceptionally(e); });
try { //使用Quasar等待Future结果 return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码


最后的调用可简化一行代码,该方法适用于所有该 Rpc 框架提供的异步接口。


Response response= waitRpc(new RpcClient()::helloAsync, request);
复制代码

2.4 阻塞操作的处理

Quasar 协程使用的时候有一定的限制,由于调度器线程池大小固定,在协程中不能阻塞线程,执行线程将被占用。对于某些暂时只能依靠阻塞 IO 的调用,如数据库,消息队列等,无法使用协程等待其结果,当这些阻塞操作量不大的情况下,可使用另一个可伸缩的线程池等待结果,避免对协程调度器的影响。


public void waitBlocking() throws SuspendExecution {    //从DB获取结果    String ans = waitBlocking(this::selectFromDB);}
private ExecutorService threadPool = Executors.newCachedThreadPool();
private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution { CompletableFuture<T> future = new CompletableFuture<>(); threadPool.submit(() -> { T ans = supplier.get(); future.complete(ans); });
try { return AsyncCompletionStage.get(future); } catch (Exception e) { return null; }}
复制代码

2.5 并发工具的使用

协程对并发锁的使用有比较大的限制,需要使用者理解线程锁与协程的调度机制。在 synchronized 同步块的内部,不能包含挂起协程的语句。当持有锁的协程挂起后会让出线程资源,由于锁的可重入性,另一个运行在同一个线程上的协程再加锁时同样会成功。另一方面,协程挂起后恢复执行时,也可能会在另一个线程上运行。出现两个线程操作共享资源的异常。同时未持有锁的线程释放时,会出现 IllegalMonitorStateException 异常。



但如果同步块的内部没有挂起协程的语句,则线程锁的机制仍然有效。线程的在执行过程中可能切换,而协程的调度在每个执行线程上是串行的,协程持有的锁在不包含挂起操作时,会在占用线程执行完毕直到退出同步块为止,不会发生锁失效的情况。


JDK 并发包中的工具可分为两类,一类是 Lock、Semaphore、CountDownLatch 等具有线程可重入性的工具,不能在未释放资源前使用挂起协程的操作,而另一类则是原子变量、并发容器等不会让出线程的工具,仍可正常使用,但要注意高并发的情况下锁的性能。此外,在使用并发工具的阻塞方法,如 await 时,可能导致协程的执行线程中发生阻塞。

三、总结

系统运行在 4 核心的主机上,线程池构成如下。



业务逻辑运行在 Quasar 的协程调度线程池中,线程池大小为 CPU 核数。HTTP 请求与 RPC 调用均通过内部的 NIO 线程池管理。此外定义了一个 core size 为 8 的可伸缩的线程池用于少量消息队列、DB 等阻塞 IO 的操作。其余的线程是系统中引入的其他组件所新建的线程,正常情况下不会成为系统性能的瓶颈。


改造后,在业务高峰流量激增数十倍的情况下线程数量依然稳定,而 CPU 利用率也从平均 5%以下提升至 10%-60%,在瞬时与高峰流量下能保持稳定。集群 CPU 核数在保留一定的业务冗余以应对业务高峰的情况下,缩减至 1/5。

3.1 限制与风险

Quasar 协程不是 Java 的语言标准,没有 JVM 层面的支持,使用时必须手动抛出异常声明每一个挂起方法,对代码有一定的侵入性。使用不当时,可能出现异常。


代码的 try/catch 时可能同时捕获 SuspendExecution 异常,从而忘记标记方法,此方法字节码不会被修改,结合 Quasar 的原理不难看出,当没有织入字节码时,挂起方法恢复执行,无法还原方法栈帧和执行状态,将会出现语句被重复执行、空指针等错误。运行时空指针、死循环的症状,排查的重点是是否漏加 SuspendExecution 标记。


在新线程而不是新协程中使用挂起方法时,会出现同样的问题。Thread 的构造方法中传入的是 Runnable 接口对象,其 run 方法没有声明 SuspendExecution 异常,run 内部的语句不会被织入字节码,造成上述异常。

3.2 总结与展望

协程使得 NIO 能够更好地应用在 Java 中,比回调方法更易读易维护。对系统的改造集中在底层通信封装和对方法的标记上,业务逻辑无需修改。虽然具有一定的代码侵入性和理解成本,但这种学习成本能逐渐被代码的可维护性优势抵消。


异步编程最佳的实现方式是:“Codes Like Sync,Works Like Async”,即以同步的方式编码,达到异步的效果与性能,兼顾可维护性与可伸缩性。OpenJDK 在 2018 年创建了 Loom 项目(https://wiki.openjdk.java.net/display/loom),目标是在 JVM 上实现轻量级的线程,并解除 JVM 线程与内核线程的映射。相信会给 Java 生态带来巨大的改变。


作者介绍


Ryan,携程 Java 开发工程师,对高并发、网络编程等领域有浓厚兴趣。


本文转载自公众号携程技术(ID:ctriptech)。


原文链接


携程基于Quasar协程的NIO实践


2020-09-03 10:043131

评论 2 条评论

发布
用户头像
https://github.com/puniverse/quasar/issues/350

quasar代码的最后一次提交还是在2018年,项目看起来已经废弃了
2022-02-25 12:04
回复
用户头像
c# 的await模型不香吗
2020-09-04 09:00
回复
没有更多了
发现更多内容

蓝凌生态OA,重新定义中大型企业数字化办公

科技怪咖

从工程预算到项目管理,『蓝凌低代码』让房企管理更简单

科技怪咖

学习 Go 语言数据结构:实现双链表

宇宙之一粟

数据结构 双向链表 8月月更

干货|为什么说开源基金会的选择很关键?(上)

Orillusion

开源 WebGL 渲染引擎 webgpu web3d

Docker下Prometheus和Grafana三部曲之三:自定义监控项开发和配置

程序员欣宸

Grafana Prometheus 8月月更

规范代码命名,让你的代码阅读起来更愉悦!

岛上码农

flutter 前端 移动端开发 跨平台开发 8月月更

高效率团队为啥都会选择Jenkins?一文带您了解Jenkins

wljslmz

持续集成 jenkins 8月月更

重学网络系列之(UDP)

自然

网络 8月月更

有人相爱,有人年少财务自由,有人数据结构都背不出来

浅羽技术

Java 数据结构 队列 红黑树 8月月更

leetcode 242. Valid Anagram 有效的字母异位词(简单)

okokabcd

LeetCode 算法与数据结构

C/C++size(),sizeof(),length(),strlen()对比分析详解

CtrlX

c c++ 进阶 热门活动 8月月更

Java集合之map集合

楠羽

#开源

重学网络系列之(Ping与网关)

自然

网络 8月月更

重学网络系列之(TCP)

自然

网络 8月月更

每日一R「15」实践课之 kv-server(一)

Samson

学习笔记 8月月更 ​Rust

头脑风暴:二叉搜索树的最小绝对差

HelloWorld杰少

算法 LeetCode 8月月更

Nexus 私服Prometheus+Grafana

CTO技术共享

Docker 端口映射重大安全漏洞

CTO技术共享

「美团 CodeM 资格赛」数码 详解

Five

c++ 算法题 8月月更

ISO文件怎么管?“筷子第一股”双枪科技教你1招!

科技怪咖

从 Multirepo 到 Monorepo 袋鼠云数栈前端研发效率提升探索之路

袋鼠云数栈

解析大型电商网站系统架构分层设计

穿过生命散发芬芳

网站架构 8月月更

【Python编程技巧】简单理解和使用Python中@property

迷彩

@PropertySource 8月月更 Python编程技巧

Zabbix 监控系统保姆及教程

CTO技术共享

React在实际开发中Variables与Prop的实战运用

恒山其若陋兮

8月月更

阿里云-建站小能手快速体验

凌云Cloud

阿里云 网站建设

负载均衡算法

源字节1号

程序员 软件开发

再深一点:如何给女朋友解释什么是微服务?

浅羽技术

微服务 微服务架构 单体架构 微服务框架 8月月更

华为云构建“好用的化工数字化”

IT资讯搬运工

Polkadot + DeFi | 透明公平、高效交易的去中心化金融未来可期

One Block Community

区块链 金融创新 defi 波卡生态

蓝凌“智慧云脑”,助力水务、燃气等集团服务民生

科技怪咖

携程基于Quasar协程的NIO实践_开源_Ryan_InfoQ精选文章