Dubbo 压测插件 2.0 —— 基于普通 API 调用

2020 年 3 月 15 日

Dubbo 压测插件 2.0 —— 基于普通 API 调用

插件已开源,详见 gatling-dubbo:https://github.com/youzan/gatling-dubbo.git


上一篇《Dubbo压测插件的实现——基于Gatling》中,我们介绍了基于 Dubbo 泛化调用实现的 Gatling Dubbo 压测插件,使用泛化调用发起 Dubbo 压测请求,consumer 端不需要拿到 provider 端的 API 包,使用上很便利,但是众所周知,Dubbo 泛化调用的性能不如普通 API 调用,虽然可以优化并使之达到与普通 API 调用相近的性能,但仍存在一些局限性。生产中除了网关等特殊应用外,一般很少使用泛化调用,如果以泛化调用的性能来表征生产中普通 API 调用的性能,其压测结论很难令人信服。做压测的时候,一般要求各种条件如环境等都尽可能保持一致。所以,我们又开发了基于普通 API 调用的 Gatling Dubbo 压测插件,即 gatling-dubbo2.0。此外,依托于 Gatling 强大的基础能力, gatling-dubbo2.0 相比于 Jmeter 还存在以下几方面的优势:


  • 更强的场景编排能力,支持多场景同时编排,如仿真电商业务中同时存在普通下单、团购、秒杀等多种交易类型的场景

  • 支持设置场景内流量模型,如漏斗模型,仿真用户从商品浏览 -> 加入购物车 -> 下单 -> 支付过程中的各级转化率

  • 不需要安装额外插件,原生支持设置压力模型,如设置压测需要达到的目标 RPS,甚至逐级加压进行梯度压力测试

  • 更低的资源消耗,更高的并发能力


一、插件主要组成


Action 和 ActionBuild


执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类


Check 和 CheckBuild


校验部分,全链路压测中我们使用 json path 校验 HTTP 请求结果,这里我们实现了一样的校验方法,而且,对于一些不规范的返回结果(如返回了基本数据类型),还增加了自定义校验方法。CheckBuild 则为 DSL 使用 Check 的辅助类。


DSL


插件的领域特定语言,提供简单易用的 API 方便编写 Dubbo 压测脚本。



1.1 Action


DubboAction 包含了发起 Dubbo 请求、请求结果校验以及压力控制逻辑,需要扩展 Gatling 的 ExitableAction 并实现 execute 方法。


DubboAction 的入参 f 是一个函数,从压测脚本传入,函数负责组织 Dubbo 请求,从 session 中取值并动态构造请求参数。这一过程类似于使用 Jmeter 压测 Java 接口,即扩展 AbstractJavaSamplerClient。所以,gatling-dubbo 2.0 也支持非 dubbo 的其他 java 调用压测,因为 f 怎么写的控制权完全掌握在写压测脚本的人手里(本质上,远程调用和本地调用的客户端使用方式上并没有区别)。


所有虚拟用户以并发方式执行 execute 方法,每个用户又以异步方式执行 Dubbo 请求,且无论请求是否正确返回,都需要记录相应的成功或失败日志,失败可能是由于请求失败了,也可能是请求成功了,但是校验请求结果失败了。下一步就是准备发起新的 Dubbo 请求,如果开启了 Rps 阀门(throttled),则会根据当前的 Rps 和 Rps 阀门阈值动态调整发送请求的频率,在施压机(consumer)未达到性能瓶颈的情况下,可以很稳定的保持在设置的 Rps 目标值上进行压测。如果 Rps 阀门未开启,则直接发起新的 Dubbo 请求(通过 AKKA Message 触发)。


class DubboAction[A]( requestName:   Expression[String],           f:        (Session) => A,           val executor:   ExecutorService,           val objectMapper: ObjectMapper,           checks:      List[DubboCheck],           coreComponents:  CoreComponents,           throttled:    Boolean,           val next:     Action          ) extends ExitableAction with NameGen { ...... override def execute(session: Session): Unit = recover(session) {  requestName(session) map { reqName =>   val startTime = System.currentTimeMillis()   val fu = Future {    try {     f(session)    } finally {    }   }
fu.onComplete { case Success(result) => val endTime = System.currentTimeMillis() val resultJson = objectMapper.writeValueAsString(result) val (newSession, error) = Check.check(resultJson, session, checks) error match { case None => statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("OK"), None, None) throttle(newSession(session)) case Some(Failure(errorMessage)) => statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage)) throttle(newSession(session).markAsFailed) }
case UFailure(e) => val endTime = System.currentTimeMillis() statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage)) throttle(session.markAsFailed) } } }
private def throttle(s: Session): Unit = { if (throttled) { coreComponents.throttler.throttle(s.scenario, () => next ! s) } else { next ! s } }}
复制代码


DubboActionBuilder 负责创建线程池并实例化 DubboAction:


case class DubboActionBuilder[](requestName: Expression[String], f: (Session) => A, checks: List[DubboCheck], threadPoolSize: Int) extends ActionBuilder { override def build(ctx: ScenarioContext, next: Action): Action = {  import ctx._  val executor = Executors.newFixedThreadPool(threadPoolSize)  val objectMapper: ObjectMapper = new ObjectMapper()  new DubboAction[](requestName, f, executor, objectMapper, checks, coreComponents, throttled, next) }}
复制代码


LambdaProcessBuilder 提供了设置 check 条件的 DSL 和 设置线程池大小的 DSL:


有赞的施压机是 4 核 8Gb 内存的,我们为其设置的默认线程池大小为 200,与 Dubbo 应用部署环境一致。你可以使用 DSL threadPoolSize(threadPoolSize: Int) 按照你的机器配置设置一个合适的线程池大小。如果施压机成了性能瓶颈,你可以考虑将其改造成集群来施压,具体可参考《有赞全链路压测引擎的设计与实现》


case class DubboProcessBuilder[](requestName: Expression[String], f: (Session) => A, checks: List[DubboCheck] = Nil, threadPoolSize: Int = 200) extends DubboCheckSupport { def check(dubboChecks: DubboCheck*): DubboProcessBuilder[](checks = checks ::: dubboChecks.toList)
def threadPoolSize(threadPoolSize: Int): DubboProcessBuilder[](threadPoolSize = threadPoolSize)
def build(): ActionBuilder = DubboActionBuilder[](requestName, f, checks, threadPoolSize)}
复制代码


1.2 Check


全链路压测中,我们使用 json path 校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于 json path 的校验方法:


package object dubbo { type DubboCheck = Check[String]
val DubboStringExtender: Extender[DubboCheck, String] = (check: DubboCheck) => check
val DubboStringPreparer: Preparer[String, String] = (result: String) => Success(result)}trait DubboJsonPathOfType { self: DubboJsonPathCheckBuilder[String] =>
def ofType[](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[](path, jsonParsers)}
object DubboJsonPathCheckBuilder { val CharsParsingThreshold = 200 * 1000
def preparer(jsonParsers: JsonParsers): Preparer[String, Any] = response => { if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson) jsonParsers.safeParseJackson(response) else jsonParsers.safeParseBoon(response) }
def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) = new DubboJsonPathCheckBuilder[](path, jsonParsers) with DubboJsonPathOfType}
class DubboJsonPathCheckBuilder[X: JsonFilter]( private[check] val path: Expression[String], private[check] val jsonParsers: JsonParsers)(implicit extractorFactory: JsonPathExtractorFactory) extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X]( DubboStringExtender, DubboJsonPathCheckBuilder.preparer(jsonParsers) ) { import extractorFactory._
def findExtractor(occurrence: Int) = path.map(newSingleExtractor[](_, occurrence)) def findAllExtractor = path.map(newMultipleExtractor[X]) def countExtractor = path.map(newCountExtractor)}
复制代码


但有时候存在一些不规范的情况,dubbo 接口的返回结果并不能直接转化为 json,如返回了基本数据类型,所以我们还提供了自定义校验方法,可以将这样的返回结果转化为 String 类型,并使用字符串比较、正则表达式匹配等方法校验返回结果:


case class DubboCustomCheck(func: String => Boolean, failureMessage: String = "Dubbo check failed") extends DubboCheck { override def check(response: String, session: Session)(implicit cache: mutable.Map[Any, Any]): Validation[CheckResult] = {  func(response) match {   case true => CheckResult.NoopCheckResultSuccess   case _  => Failure(failureMessage)  } }}
复制代码


DubboCheckSupport 则提供了 json pathcustom 两种检验方式的 DSL


trait DubboCheckSupport { def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =  DubboJsonPathCheckBuilder.jsonPath(path)
def custom = DubboCustomCheck}
复制代码


Dubbo 压测脚本中可以设置一个或多个 check 来校验请求结果


1.3 DSL


DubboDsl 提供顶层 DSL,隐式方法 dubboProcessBuilder2ActionBuilderScala 用于自动从 DubboProcessBuilder 构造 ActionBuilder


trait DubboDsl extends DubboCheckSupport { def dubbo[](requestName: Expression[String], f: (Session) => A) = DubboProcessBuilder[](requestName, f)
implicit def dubboProcessBuilder2ActionBuilder[](builder: DubboProcessBuilder[A]): ActionBuilder = builder.build()}
复制代码


二、示例


2.1 压测脚本示例


class Mix extends Simulation { val application = new ApplicationConfig() application.setName("gatling-dubbo")
// 初始化 AService val referenceAService = new ReferenceConfig[AService] referenceAService.setApplication(application) referenceAService.setUrl("dubbo://IP:PORT/com.xxx.service.AService") referenceAService.setInterface(classOf[AService]) val aService = referenceAService.get()
// 初始化 BService val referenceBService = new ReferenceConfig[BService] referenceBService.setApplication(application) referenceBService.setUrl("dubbo://IP:PORT/com.yyy.service.BService") referenceBService.setInterface(classOf[BService]) val bService = referenceBService.get()
// 设置数据源 val jsonFileFeeder = jsonFile("data.json").shuffle.circular val mixScenario = scenario("scenario of mix") .forever("tripsCount") { feed(jsonFileFeeder) .randomSwitch(11d -> exec( dubbo("com.xxx.service.AService.aMethod", fAMethod) .check(jsonPath("$.success").is("true")) ) ) .randomSwitch(4d -> exec( dubbo("com.yyy.service.BService.bMethod", fBMethod) .check(jsonPath("$.success").is("true")) ) ) .randomSwitch(5d -> exec( ...... ) ...... ) }
setUp(mixScenario.inject(constantUsersPerSec(100) during (10 seconds)).throttle(reachRps(1000) in (1 seconds), holdFor(120 seconds)))
// 设置 aMethod 的请求参数并调用 def fAMethod(session: Session): Object = { val aParam = new AParam() aParam.setName("A Name"); // 从 session 中获取动态参数并设置 aParam.setAId(session.attributes("aId").asInstanceOf[Integer].toLong); aService.aMethod(aParam); }
// 设置 bMethod 的请求参数并调用 def fBMethod(session: Session): Object = { val bParam = new BParam() bParam.setAge(26) // 从 session 中获取动态参数并设置 bParam.setBId(session.attributes("bId").asInstanceOf[Integer].toLong) bService.bMethod(bParam); }
def fXxx(session: Session): Object = { ...... }}
复制代码


randomSwitch 的作用

以上示例其实是 gatling-dubbo 在有赞的一个典型使用场景,即评估一个应用的单实例性能。按生产环境真实的接口调用比例请求各个接口(该比例由场景执行各个请求的概率分布模拟),这样的压测结果就可以真实反映生产环境应用的单实例性能,并为容量报警、生产扩容等提供参考依据。


2.2 压测数据示例


[ {  "aId": 160,  "bId": 859296 }, {  "aId": 160,  "bId": 1019040 }, {  "aId": 160,  "bId": 1221792 }, ......]
复制代码


压测数据使用 Json 数组保存,其中每一个 Json 对象都包含了一次压测请求所需的所有动态参数,且为了方便通过 session 设置动态参数,Json 对象中不再嵌套其他 Json 对象。


2.3 压测报告示例


1、应用基线性能评估,用于精准扩容:



2、中心化限流效果验证:



2020 年 3 月 15 日 20:20106

评论

发布
暂无评论
发现更多内容

2020.06.04,我在《架构师训练营》的学习历程:架构方法

罗祥

极客大学架构师训练营

C++:两百字三段代码解决函数返回局部变量问题

韩小非

c++ 函数栈调用 返回局部变量

分布式事务 - 分布式事务框架Seata

Java收录阁

分布式事务

如果我能找到工作,那么你也行

escray

TCP 半连接队列和全连接队列满了会发生什么?又该如何应对?

小林coding

Linux TCP 网络安全 计算机网络 网络协议

我是一个连地摊都不会摆的废人

Neco.W

创业 投机 投机者 地摊

别再说你不懂Linux内存管理了,10张图给你安排的明明白白

柠檬橙

Linux 后台开发

原创 | TDD工具集:JUnit、AssertJ和Mockito (十七)编写测试-标签和过滤

编程道与术

Java 编程 TDD 单元测试 JUnit

ARTS - Week 2

Khirye

ARTS 打卡计划 arts

机器学习算法评估指标——2D 目标跟踪

做技术BP的文案Gou

学习 2D 评估标准

别做误人子弟的「职业导师」

Tony Wu

职业成长 导师 教练

5G时代,如何彻底搞定海量数据库的设计与实践

奈学教育

海量数据库的设计与实践

六处提及区块链!海南自贸港区块链产业应用先行,与“币”划清界限

CECBC区块链专委会

区块链技术 海南方案 严控 产业

判例学习(一)梨视频诉字节跳动帮助侵权二审判决

尹晓铁

学习 读书笔记 互联网 知识产权 法律

Silicon Labs Gecko bootloader 简介

taox

zigbee bootlaoder

使用ADMT和PES实现window AD账户跨域迁移-介绍篇

Young先生

windows AD ADMT PES 迁移

hexo博客系统的实现原理与搭建

音视频专家-李超

Hexo 博客

大数据中台之Kafka,到底好在哪里?

奈学教育

kafka

白话说流——什么是流,从批认识流(一)

KAMI

大数据 flink 流计算

极客大学架构师训练营 听课总结 -- 第一课

John(易筋)

极客时间 架构 极客大学 架构师 极客大学架构师训练营

地摊经济一千年:从《韩熙载夜宴图》到木屋烧烤“撸串”

夜来妖

产品经理 商业 新闻动态 新基建 地摊

不到100行代码的iOS组件化你怕了么?

毒手疯波

ios 组件化 url scheme scheme

分布式架构,刚性事务-2PC必须注意的问题及3PC详细解

奈学教育

分布式架构 2PC注意事项 3PC详解

万字总结——反射(框架之魂)

学习Java的小姐姐

Java 反射 Java 25 周年

python3.8.3安装ipython和jupyter

LinkPwd

python3.x Jupyter Notebook

【写作群星榜】5.29~6.4写作平台优秀作者&文章排名

InfoQ写作平台

写作平台 排行榜

centos6.9开机启动服务说明

唯爱

强烈安利第一个画图工具!

我是程序员小贱

高效工作 高效

预告|2020中国CRM品牌测评报告

人称T客

NIO 看破也说破(五): 搞,今天就搞,搞懂Buffer

小眼睛聊技术

Java 学习 读书笔记 架构 后端

HTML5 && CSS

shirley

html5 css3

Dubbo 压测插件 2.0 —— 基于普通 API 调用-InfoQ