Dubbo 压测插件已开源,本文涉及代码详见 gatling-dubbo(点击阅读原文)
Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩展分布式能力,开发了自己的全链路压测引擎 MAXIM。全链路压测中我们主要模拟用户实际使用场景,使用 HTTP 接口作为压测入口,但有赞目前后端服务中 Dubbo 应用比重越来越高,如果可以知道 Dubbo 应用单机水位将对我们把控系统后端服务能力大有裨益。基于 Gatling 的优势和在有赞的使用基础,我们扩展 Gatling 开发了 gatling-dubbo 压测插件。
插件主要结构
实现 Dubbo 压测插件,需实现以下四部分内容:
Protocol 和 ProtocolBuild
协议部分,这里主要定义 Dubbo 客户端相关内容,如协议、泛化调用、服务 URL、注册中心等内容,ProtocolBuild 则为 DSL 使用 Protocol 的辅助类
Action 和 ActionBuild
执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类
Check 和 CheckBuild
检查部分,全链路压测中我们都使用 JsonPath
检查请求结果,这里我们实现了一样的检查逻辑。CheckBuild 则为 DSL 使用 Check 的辅助类
DSL
Dubbo 插件的领域特定语言,我们提供了一套简单易用的 API 方便编写 Duboo 压测脚本,风格上与原生 HTTP DSL 保持一致
Protocol
协议部分由 5 个属性组成,这些属性将在 Action 初始化 Dubbo 客户端时使用,分别是:
如果是测试 Dubbo 单机水位,则设置 url,注册中心设置为空;如果是测试 Dubbo 集群水位,则设置注册中心(目前支持 ETCD3),url 设置为空。由于目前注册中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏灵活性,所以我们又实现了客户端层面的负载均衡,如此便可抛开特定的注册中心来测试 Dubbo 集群水位。该特性目前正在内测中。
object DubboProtocol {
val DubboProtocolKey = new ProtocolKey {
type Protocol = DubboProtocol
type Components = DubboComponents
def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]
def defaultProtocolValue(configuration: GatlingConfiguration): DubboProtocol = throw new IllegalStateException("Can't provide a default value for DubboProtocol")
def newComponents(system: ActorSystem, coreComponents: CoreComponents): DubboProtocol => DubboComponents = {
dubboProtocol => DubboComponents(dubboProtocol)
}
}
}
case class DubboProtocol(
protocol: String, //dubbo
generic: String, //泛化调用?
url: String, //use url or
registryProtocol: String, //use registry
registryAddress: String //use registry
) extends Protocol {
type Components = DubboComponents
}
复制代码
为了方便 Action 中使用上面这些属性,我们将其装进了 Gatling 的 ProtocolComponents:
case class DubboComponents(dubboProtocol: DubboProtocol) extends ProtocolComponents {
def onStart: Option[Session => Session] = None
def onExit: Option[Session => Unit] = None
}
复制代码
以上就是关于 Protocol 的定义。为了能在 DSL 中配置上述 Protocol,我们定义了 DubboProtocolBuilder,包含了 5 个方法分别设置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 个属性。
object DubboProtocolBuilderBase {
def protocol(protocol: String) = DubboProtocolBuilderGenericStep(protocol)
}
case class DubboProtocolBuilderGenericStep(protocol: String) {
def generic(generic: String) = DubboProtocolBuilderUrlStep(protocol, generic)
}
case class DubboProtocolBuilderUrlStep(protocol: String, generic: String) {
def url(url: String) = DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url)
}
case class DubboProtocolBuilderRegistryProtocolStep(protocol: String, generic: String, url: String) {
def registryProtocol(registryProtocol: String) = DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol)
}
case class DubboProtocolBuilderRegistryAddressStep(protocol: String, generic: String, url: String, registryProtocol: String) {
def registryAddress(registryAddress: String) = DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress)
}
case class DubboProtocolBuilder(protocol: String, generic: String, url: String, registryProtocol: String, registryAddress: String) {
def build = DubboProtocol(
protocol = protocol,
generic = generic,
url = url,
registryProtocol = registryProtocol,
registryAddress = registryAddress
)
}
复制代码
Action
DubboAction 包含了 Duboo 请求逻辑、请求结果校验逻辑以及压力控制逻辑,需要扩展 ExitableAction 并实现 execute 方法。
DubboAction 类的域 argTypes、argValues 分别是泛化调用请求参数类型和请求参数值,需为 Expression[] 类型,这样当使用数据 Feeder 作为压测脚本参数输入时,可以使用类似 ${args_types}
、 ${args_values}
这样的表达式从数据 Feeder 中解析对应字段的值。
execute 方法必须以异步方式执行 Dubbo 请求,这样前一个 Dubbo 请求执行后但还未等响应返回时虚拟用户就可以通过 AKKA Message 立即发起下一个请求,如此一个虚拟用户可以在很短的时间内构造大量请求。请求方式方面,相比于泛化调用,原生 API 调用需要客户端载入 Dubbo 服务相应的 API 包,但有时候却拿不到,此外,当被测 Dubbo 应用多了,客户端需要载入多个 API 包,所以出于使用上的便利性,Dubbo 压测插件使用泛化调用发起请求。
异步请求响应后会执行 onComplete 方法,校验请求结果,并根据校验结果记录请求成功或失败日志,压测报告就是使用这些日志统计计算的。
为了控制压测时的 RPS,则需要实现 throttle 逻辑。实践中发现,高并发情况下,泛化调用性能远不如原生 API 调用性能,且响应时间成倍增长(如此不能表征 Dubbo 应用的真正性能),导致 Dubbo 压测插件压力控制不准,解决办法是优化泛化调用性能,使之与原生 API 调用的性能相近,请参考 dubbo 泛化调用性能优化。
class DubboAction(
interface: String,
method: String,
argTypes: Expression[Array[String]],
argValues: Expression[Array[Object]],
genericService: GenericService,
checks: List[DubboCheck],
coreComponents: CoreComponents,
throttled: Boolean,
val objectMapper: ObjectMapper,
val next: Action
) extends ExitableAction with NameGen {
override def statsEngine: StatsEngine = coreComponents.statsEngine
override def name: String = genName("dubboRequest")
override def execute(session: Session): Unit = recover(session) {
argTypes(session) flatMap { argTypesArray =>
argValues(session) map { argValuesArray =>
val startTime = System.currentTimeMillis()
val f = Future {
try {
genericService.$invoke(method, argTypes(session).get, argValues(session).get)
} finally {
}
}
f.onComplete {
case Success(result) =>
val endTime = System.currentTimeMillis()
val resultMap = result.asInstanceOf[JMap[String, Any]]
val resultJson = objectMapper.writeValueAsString(resultMap)
val (newSession, error) = Check.check(resultJson, session, checks)
error match {
case None =>
statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("OK"), None, None)
throttle(newSession(session))
case Some(Failure(errorMessage)) =>
statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage))
throttle(newSession(session).markAsFailed)
}
case FuFailure(e) =>
val endTime = System.currentTimeMillis()
statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage))
throttle(session.markAsFailed)
}
}
}
}
private def throttle(s: Session): Unit = {
if (throttled) {
coreComponents.throttler.throttle(s.scenario, () => next ! s)
} else {
next ! s
}
}
}
复制代码
DubboActionBuilder 则是获取 Protocol 属性并初始化 Dubbo 客户端:
case class DubboActionBuilder(interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], checks: List[DubboCheck]) extends ActionBuilder {
private def components(protocolComponentsRegistry: ProtocolComponentsRegistry): DubboComponents =
protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey)
override def build(ctx: ScenarioContext, next: Action): Action = {
import ctx._
val protocol = components(protocolComponentsRegistry).dubboProtocol
//Dubbo客户端配置
val reference = new ReferenceConfig[GenericService]
val application = new ApplicationConfig
application.setName("gatling-dubbo")
reference.setApplication(application)
reference.setProtocol(protocol.protocol)
reference.setGeneric(protocol.generic)
if (protocol.url == "") {
val registry = new RegistryConfig
registry.setProtocol(protocol.registryProtocol)
registry.setAddress(protocol.registryAddress)
reference.setRegistry(registry)
} else {
reference.setUrl(protocol.url)
}
reference.setInterface(interface)
val cache = ReferenceConfigCache.getCache
val genericService = cache.get(reference)
val objectMapper: ObjectMapper = new ObjectMapper()
new DubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next)
}
}
复制代码
LambdaProcessBuilder 则提供了设置 Dubbo 泛化调用入参的 DSL 以及接下来要介绍的 Check 部分的 DSL:
case class DubboProcessBuilder(interface: String, method: String, argTypes: Expression[Array[String]] = _ => Success(Array.empty[String]), argValues: Expression[Array[Object]] = _ => Success(Array.empty[Object]), checks: List[DubboCheck] = Nil) extends DubboCheckSupport {
def argTypes(argTypes: Expression[Array[String]]): DubboProcessBuilder = copy(argTypes = argTypes)
def argValues(argValues: Expression[Array[Object]]): DubboProcessBuilder = copy(argValues = argValues)
def check(dubboChecks: DubboCheck*): DubboProcessBuilder = copy(checks = checks ::: dubboChecks.toList)
def build(): ActionBuilder = DubboActionBuilder(interface, method, argTypes, argValues, checks)
}
复制代码
Check
全链路压测中,我们都使用 JsonPath
校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于 JsonPath
的校验。实现 Check,必须实现 Gatling check 中的 Extender 和 Preparer:
package object dubbo {
type DubboCheck = Check[String]
val DubboStringExtender: Extender[DubboCheck, String] =
(check: DubboCheck) => check
val DubboStringPreparer: Preparer[String, String] =
(result: String) => Success(result)
}
复制代码
基于 JsonPath
的校验逻辑:
trait DubboJsonPathOfType {
self: DubboJsonPathCheckBuilder[String] =>
def ofType[](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[](path, jsonParsers)
}
object DubboJsonPathCheckBuilder {
val CharsParsingThreshold = 200 * 1000
def preparer(jsonParsers: JsonParsers): Preparer[String, Any] =
response => {
if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson)
jsonParsers.safeParseJackson(response)
else
jsonParsers.safeParseBoon(response)
}
def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
new DubboJsonPathCheckBuilder[](path, jsonParsers) with DubboJsonPathOfType
}
class DubboJsonPathCheckBuilder[X: JsonFilter](
private[check] val path: Expression[String],
private[check] val jsonParsers: JsonParsers
)(implicit extractorFactory: JsonPathExtractorFactory)
extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X](
DubboStringExtender,
DubboJsonPathCheckBuilder.preparer(jsonParsers)
) {
import extractorFactory._
def findExtractor(occurrence: Int) = path.map(newSingleExtractor[](_, occurrence))
def findAllExtractor = path.map(newMultipleExtractor[X])
def countExtractor = path.map(newCountExtractor)
}
复制代码
DubboCheckSupport 则提供了设置 jsonPath 表达式的 DSL:
trait DubboCheckSupport {
def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
DubboJsonPathCheckBuilder.jsonPath(path)
}
复制代码
Dubbo 压测脚本中可以设置一个或多个 check 校验请求结果,使用 DSL check 方法
DSL
trait AwsDsl
提供顶层 DSL。我们还定义了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 两个 Scala 隐式方法,以自动构造 DubboProtocol 和 ActionBuilder。
此外,泛化调用中使用的参数类型为 Java 类型,而我们的压测脚本使用 Scala 编写,所以这里需要做两种语言间的类型转换,所以我们定义了 transformJsonDubboData 方法。
trait DubboDsl extends DubboCheckSupport {
val Dubbo = DubboProtocolBuilderBase
def dubbo(interface: String, method: String) = DubboProcessBuilder(interface, method)
implicit def dubboProtocolBuilder2DubboProtocol(builder: DubboProtocolBuilder): DubboProtocol = builder.build
implicit def dubboProcessBuilder2ActionBuilder(builder: DubboProcessBuilder): ActionBuilder = builder.build()
def transformJsonDubboData(argTypeName: String, argValueName: String, session: Session): Session = {
session.set(argTypeName, toArray(session(argTypeName).as[JList[String]]))
.set(argValueName, toArray(session(argValueName).as[JList[Any]]))
}
private def toArray[](value: JList[T]): Array[T] = {
value.asScala.toArray
}
}
object Predef extends DubboDsl
复制代码
Dubbo 压测脚本和数据 Feeder 示例
压测脚本示例:
import io.gatling.core.Predef._
import io.gatling.dubbo.Predef._
import scala.concurrent.duration._
class DubboTest extends Simulation {
val dubboConfig = Dubbo
.protocol("dubbo")
.generic("true")
//直连某台Dubbo机器,只单独压测一台机器的水位
.url("dubbo://IP地址:端口")
//或设置注册中心,压测该Dubbo应用集群的水位,支持ETCD3注册中心
.registryProtocol("")
.registryAddress("")
val jsonFileFeeder = jsonFile("data.json").circular //数据Feeder
val dubboScenario = scenario("load test dubbo")
.forever("repeated") {
feed(jsonFileFeeder)
.exec(session => transformJsonDubboData("args_types1", "args_values1", session))
.exec(dubbo("com.xxx.xxxService", "methodName")
.argTypes("${args_types1}")
.argValues("${args_values1}")
.check(jsonPath("$.code").is("200"))
)
}
setUp(
dubboScenario.inject(atOnceUsers(10))
.throttle(
reachRps(10) in (1 seconds),
holdFor(30 seconds))
).protocols(dubboConfig)
}
复制代码
data.json 示例:
[
{
"args_types1": ["com.xxx.xxxDTO"],
"args_values1": [{
"field1": "111",
"field2": "222",
"field3": "333"
}]
}
]
复制代码
Dubbo 压测报告示例
评论