写点什么

通过 CQRS/ 事件溯源框架 Reveno 实现高负载交易事务处理

  • 2016-06-14
  • 本文字数:8467 字

    阅读完需:约 28 分钟

在当今世界,事务的处理每时每刻都在发生,其范围包括使用关系型数据库进行订购处理的个体零售网站,乃至每秒进行 10 多万次处理的实时交易系统。

Reveno 是一个全新的无锁事务处理框架,它支持 JVM 平台,并基于 CQRS 及事件溯源模式实现。虽然它只是一个简单而强大的工具,但在性能方面也毫不逊色。所有事务都被持久化为只读的日志,并且可以通过按顺序重演事件的方式恢复领域模型的最新状态。所有的运行时操作都是在内存中执行的,从而使其吞吐量可达到每秒种几百万次事务的数量级,平均延迟时间则限制在微秒级。虽然 Reveno 的功能如此强大,但它仍然是一个通用目的的框架,涵盖了大量不同种类的用例,并提供了丰富的引擎配置选项。举例来说,你可以调整它的持久性配置,在极其随意(为了提高系统吞吐量)与高度受限(对于数据丢失的情况具有较低的容忍度)之间进行选择。

对于不同的用例来说,其需求也是千差万别的。作为一个通用框架,它需要考虑所有不同的可能性。在本文中,我将通过示例为读者展现如何使用 Reveno 框架开发一个简单的交易系统。

首先让我们来了解一下 Reveno 如何处理你的领域模型。作为一个基于 CQRS 的框架,Reveno 会将你的领域切分为 _ 事务模型与查询模型 _。对这两个模型的定义没有任何限制,你无需使用任何强制性的注解、基类,甚至不需要实现 Serializable 接口,只需使用最简单的 POJO 就可以完成任务。

没有任何一种单一的途径能够应对所有不同的用例,因此应用程序的设计者需要决定如何处理事务模型的回滚操作。Reveno 为实现模型对象提供了两种主要方式:可变与不可变模型。这两种方式各有不同的底层处理机制,并且各有其优缺点。在 Java 中,不可变对象的开销很低,并且无需进行同步操作,这种方式目前非常流行。Reveno 能够非常高效地处理不可变对象,但每种使用不可变类型的领域都会产生额外的垃圾对象,Reveno 也不例外。因此,如果你能够接受一些额外的GC 开销,那么就应当将这种方式作为默认的选择。反之,如果使用可变模型,那么在进行事务运行时,就要为已使用的对象生成额外的序列化快照,而这将影响到你的性能。幸运的是,如果你坚持使用可变模型,并且仍需要保证性能的最大化及GC 影响的最小化,那么你可以选择一种额外的可变模型特性,“补偿行为”(Compensating Actions)。简单来说,补偿行为是指在实现常规的事务处理函数时,一并实现的一种手动回滚行为。如果读者想了解这方面的更多细节,请参考 Reveno 的官方文档页面

现在,我们已经设计好了一些基本规则,那么让我们进行一些实际的编码工作吧。在我们所设计的交易系统中存在大量的帐户,每个帐户可以有零至多个订单。我们必须提供各种维护性操作,例如帐户的创建和订单的处理等等。在实际应用中,这种类型的系统可能需要应对大量的负载,例如每秒种处理 10 个直至 50 万个事务,甚至更多。更复杂的是,此类系统对于延迟非常敏感,而频繁出现的负载峰值可能会直接造成财务损失。

安装

如果你正在使用一些流行的构建工具,例如 Maven、Gradle、Sbt 等等,那么你可以在 Maven Central 中加入一个 Reveno 的依赖。目前为止,共有 3 个可用的库能够选择:

  • reveno-core —— 包括所有 Reveno 核心功能包,负责引擎的初始化和事务处理等等。
  • reveno-metrics —— 这个库中的包负责从运行中的引擎中收集各种指标,并将这些指标传递给 Graphite、Slf4j 等工具。
  • reveno-cluster —— 支持在主 - 从架构的集群中运行 Reveno,以提供故障转移的能力。

你可以在 Reveno 安装页面中找到完整的安装指南与示例。

定义事务模型

让我们首先从领域模型的定义开始我们的开发过程。正如我们之前所说,领域模型将通过简单的 POJO 进行创建。我个人倾向于使用不可变对象,因为这将大大简化整个工作。它们不仅能够绕开各种并发问题,并且最重要的一点在于,由于不需要保留已访问对象的快照,因此它在 Reveno 中有非常出色的性能表现。Reveno 允许我们直接使用不可变对象(可以说,Reveno 也成为了一个帮助我们学习如何在常规的 Java 应用中处理不可变性的优秀教程)。

让我们首先定义一个表现系统中典型的交易帐户的实体(为了简单起见,我们将实例变量都定义为 public,但在实际应用中并不存在这种限制):

复制代码
public class TradeAccount {
public final long id;
public final long balance;
public final String currency;
private final LongSet orders;
public TradeAccount(long id, String currency) {
this(id, 0, currency, new LongOpenHashSet());
}
private TradeAccount(long id, long balance,
String currency, LongSet orders) {
this.id = id;
this.balance = balance;
this.currency = currency;
this.orders = orders;
}
public LongSet orders() {
return new LongOpenHashSet(orders);
}
}

正如我们所见,这个类是不可变的。但这种值对象并不具备任何功能,往往因此被人称为“贫血”对象。因此,更好的方式是让TradeAccount 类能够实现一些实用的功能,例如处理订单以及进行货币计算:

复制代码
public class TradeAccount {
public final long id;
public final long balance;
public final String currency;
private final LongSet orders;
public TradeAccount(long id, String currency) {
this(id, 0, currency, new LongOpenHashSet());
}
private TradeAccount(long id, long balance,
String currency, LongSet orders) {
this.id = id;
this.balance = balance;
this.currency = currency;
this.orders = orders;
}
public TradeAccount addBalance(long amount) {
return new TradeAccount(id, balance + amount, currency, orders);
}
public TradeAccount addOrder(long orderId) {
LongSet orders = new LongOpenHashSet(this.orders);
orders.add(orderId);
return new TradeAccount(id, balance, currency, orders);
}
public TradeAccount removeOrder(long orderId) {
LongSet orders = new LongOpenHashSet(this.orders);
orders.remove(orderId);
return new TradeAccount(id, balance, currency, orders);
}
public LongCollection orders() {
return new LongOpenHashSet(orders);
}
}

现在,这个类就变得非常实用了。在开始讲述实际的订单处理细节之前,首先要说明一下 Reveno 如何使用它的事务模型。所有的实体都会保存在某个 repository 中,任何类型的处理函数都可以访问该 repository(我们稍后将对此进行详细地讲解)。这些实体相互之前通过 ID 进行引用,并通过 ID 在 repository 中进行访问。由于内部的性能优化机制,所有的 ID 都限制为 long 类型。

Order 类的定义也与之类似,为了简便起见,我们将忽略这部分源代码。不过,你可以在 GitHub 上下载完整的示例代码,并在本文的末尾找到更多的链接。

定义查询模型

我们已经简单地探索了如何创建 Reveno 中的事务模型。从逻辑上说,查询功能的定义也同样关键。在 Reveno 中,查询是通过“视图”的定义而创建的,每个视图都表现了事务模型中的某些实体。除了定义视图类之外,你还应当为每种视图类型提供映射器。我们稍后将对细节进行深入讲解。

当一个事务成功地完成之后,Reveno 将对所改变的实体进行映射操作,以保证视图的更新发生在命令完成之前。在默认情况下,Reveno 中的查询模型是保存在内存中的。让我们为TradingAccount 类定义一个视图:

复制代码
public class TradeAccountView {
public final double balance;
public final Set<OrderView> orders;
public TradeAccountView(double balance, Set<OrderView> orders) {
this.balance = balance;
this.orders = orders;
}
}

TradingAccountView 类中还包括其他种类视图(在这个示例中对应着 OrderView)的一个集合,在进行查询、序列化、JSON 格式转换等操作时,这种方式能够带来很大的便利。Reveno 映射器支持多种实用的方法,以简化将 ID 的集合映射到视图的集合等操作。我们稍后将进行一些实际操作。

定义命令与事务行为

为了在 Reveno 中执行事务,我们必须首先执行一个“命令”对象。命令对象本身可以是一个简单的 POJO,它需要在系统中注册一个特定的处理函数。通常来说,命令将用于执行某些聚合与校验逻辑,以只读方式访问 repository。但最重要的是,命令需要履行它的职责,以发送各种“事务行为”(因此命令也被称为“状态转变器”)。

事务行为是用于在领域模型中进行状态改变的组件,它通过对 repository 的读 - 写访问以执行。事务行为对象本身可以表现为一个 POJO,并在系统中注册对应的处理函数。所有行为组合在一起成为一个单一的原子性事务,它包含在当前所执行命令的范围内。在成功执行完成之后,事务行为将被持久化至底层的存储引擎中,并且在重启或发生任何故障之后重演其状态。

在这个交易系统中,我们需要创建新的交易帐户,并设定初始的余额。与之前的做法一样,我们首先要定义一个事务命令:

复制代码
public class CreateAccount {
public final String currency;
public final double initialBalance;
public CreateAccount(String currency, double initialBalance) {
this.currency = currency;
this.initialBalance = initialBalance;
}
public static class CreateAccountAction {
public final CreateAccount info;
public final long id;
public CreateAccountAction(CreateAccount info, long id) {
this.info = info;
this.id = id;
}
}
}

我们实际上共创建了两个类。CreateAccount 作为命令,CreateAccountAction 则作为事务行为。通常来说,这种切分方式并非强制性的。如果命令与事务行为数据完全匹配,那么你可以放心地重用同一个类。但在这个示例中,我们所获取的货币数值类型为 double(例如来自于某些遗留的终端系统),而在内部引擎中,货币的数值将保存为 long,以确保其精确度能够完全匹配。

现在,我们就可以初始化一个 Reveno 引擎,并定义命令与事务行为的处理函数了:

复制代码
Reveno reveno = new Engine(pathToEngineFolder);
reveno.domain().command(CreateAccount.class, long.class, (c, ctx) -> {
long accountId = ctx.id(TradeAccount.class);
ctx.executeTxAction(new CreateAccount.CreateAccountAction(c, accountId));
if (c.initialBalance > 0) {
ctx.executeTxAction(new ChangeBalance(
accountId, toLong(c.initialBalance)));
}
return accountId;
});
reveno.domain().transactionAction(CreateAccount.CreateAccountAction.class,
(a, ctx) -> ctx.repo().store(a.id,
new TradeAccount(a.id, a.info.currency)));
reveno.domain().transactionAction(ChangeBalance.class,
(a, ctx) -> ctx.repo().
remap(a.accountId, TradeAccount.class,
(id, e) -> e.addBalance(a.amount))
);

这段代码包括的内容很多,让我们仔细分析一下。首先,我们定义了一个 CreateAccount 命令处理函数,它负责生成下一个帐户 ID,并执行了一个事务命令以进行帐户的创建。如果在定义时传入了初始的余额,则还需执行 ChangeBalance 这个事务行为。需要指出的是,ctx.executeTxAction 这个方法调用不会阻塞。当命令处理函数成功完成之后,所有事务行为都会在一个单一的进程中执行。因此,如果需要在任何一个 TxAction 处理函数中进行回滚操作,那么这些事务行为所生成的改动都将被顺利回滚(实际的回滚机制是基于事务模型等实现的)。

将实体映射至查询模型

由于事务模型与查询模型是分离的,因此我们需要定义一些映射器,将实体转换为对应的视图表现。不过,我们并不需要在代码中明确地调用这些映射方法,因为 Reveno 会自动发现 repository 中的“脏”实体,并调用相应的映射方法。让我们看看 TraceAccount 是如何映射到 TradeAccountView 的:

复制代码
reveno.domain().viewMapper(TradeAccount.class,
TradeAccountView.class, (id,e,r) ->
new TradeAccountView(fromLong(e.balance),
r.linkSet(e.orders(), OrderView.class)));

这段代码中的 id 指代实体的标识符,e 指代实体本身,而 r 指代一个特殊的映射上下文,其中包含各种实用的方法。实际完成映射工作的是 r.linkSet(…) 方法,它将以延迟的方式将 ID 指针的集合映射至实际视图的集合。

我们可以通过相同的方式定义 Order 至 OrderView 之间的映射:

复制代码
reveno.domain().viewMapper(Order.class, OrderView.class, (id,e,r) ->
new OrderView(fromLong(e.price), e.size, e.symbol,
r.get(TradeAccountView.class, e.accountId)));

正如你所见,我们的查询模型也是通过不可变对象组成的,这一点与事务模型中的实体一样,它将极大地简化映射逻辑。再次强调,虽然这一点并非强制约束,但如若不然,则我们必须自行负责映射逻辑的正确性。

执行命令

Reveno 中的事务处理操作默认就是异步的。当你在一个运行中的引擎中执行某个命令时,该方法调用将立即返回一个 CompletableFuture 对象,最终将在将来某一时刻返回结果。Reveno 在内部定义了一个具有多个阶段的“管道”,每个阶段将处理各自的线程。在这些管道中如果选择逐个传递对象会产生很大的消耗,因此在这里就可以使用批处理方式。在高负载情况下,Reveno 会在每个阶段处理进行批处理。正因为如此,Reveno 在一开始就为系统提供了高吞吐能力。

在完成了所有定义与业务逻辑实现之后,我们可以开始使用这个引擎了。首先我们需要启动它:

复制代码
reveno.startup();

随后,我们就可以在系统中创建一个新的交易帐户了。你应该留意一点,executeCommand() 方法也存在一个同步的版本,它对于测试以及编写示例非常有用:

复制代码
long accountId = reveno.executeSync(new CreateAccount("USD", 5.15));

在这个示例中,Reveno 内部将调用相应的命令以及事务行为处理函数,后者将为你创建一个新的美元帐户,并将初始余额设为 5.15 美元。我们可以通过以下方式检查它的正确性:

复制代码
System.out.println(reveno.query().find(TradeAccountView.class,
accountId).balance);

这段代码将打印出“5.15”。为了让这个示例看起来更有趣,让我们为这个帐户添加一个新的订单:

复制代码
long orderId = reveno.executeSync(
new MakeOrder(accountId, "EUR/USD", 1, 1.213));

我们在这里创建了一个以 1.213 美元购买 1 欧元的新订单。随后,我们可以再次检查帐户信息,以了解其中的变化:

复制代码
System.out.println(reveno.query().find(TradeAccountView.class,
accountId).orders.size());

这一次所打印的结果是“1”,这表示这个帐户有一个未完成的订单。最后,让我们关闭这个订单,完成这个以美元购买欧元的操作,它将会使帐户中的余额减少 1.213,最终余额为 3.937 美元。

复制代码
reveno.executeSync(new ExecuteOrder(orderId));
// the balance is expected to be 3.937, after order successfully executed
System.out.println(reveno.query().find(TradeAccountView.class,
accountId).balance);

持久化

正如我们在介绍部分所说的一样,Reveno 首先是一个事务处理框架。你可以在同一个目录下重启你的引擎,而仍然能看到模型的最新状态。我们尽可能使该框架的每个部分都具有可配置性,而一点与持久性同样相关。你可以通过调用 reveno.config() 方法查看所有的可选项。

发布事件

Reveno 本身自带一个事件处理子系统。你的任务就是为它定义自己的事件类(通常来说定义为 POJO)以及处理函数,并发布这些事件以处理事务行为。需要指出的重点在于,事件只有在命令执行成功之后才会被发布,并且完全是异步的。所有的视图映射过程都严格地发生于事件处理函数执行之前。

事件的执行结果同样会被持久化到存储引擎中,如果事件成功地完成了,那么在引擎重启之后通常也不会被再次处理。但这种行为并不能得到严格的保障,因此,如果你需要确保处理函数是 100% 幂等的,则应当检查 EventMetadata.isReplay 这个标记,在每个事件处理函数中都可以访问它。

让我们再次扩展一下这个示例,让它对于某个交易帐户中的余额变化事件进行发布与处理。首先,我们将定义这个事件,并添加适当的字段:

复制代码
public class BalanceChangedEvent {
public final long accountId;
public BalanceChangedEvent(long accountId) {
this.accountId = accountId;
}
}

当某个帐户的余额产生变化时,我们只需要了解帐户的 ID,因为我们可以在处理函数中查询相应的视图。我们将通过以下方法定义事件处理函数:

复制代码
reveno.events().eventHandler(BalanceChangedEvent.class, (e, m) -> {
TradeAccountView account = reveno.query().find(TradeAccountView.class,
e.accountId);
System.out.println(String.format(
"New balance of account %s from event is: %s",
e.accountId, account.balance));
});

相应地,我们也需要在 ChangeBalance 事务行为处理函数的定义中添加一行代码:

复制代码
reveno.domain().transactionAction(ChangeBalance.class, (a, ctx) -> {
ctx.repo().remap(a.accountId, TradeAccount.class,
(id, e) -> e.addBalance(a.amount));
// publish an event to all listeners
ctx.eventBus().publishEvent(new BalanceChangedEvent(a.accountId));
});

由于 ChangeBalance 这个事务行为出现在多个命令中,当添加了这段事件发布代码后,我们就会不断收到它的事件。还有一点需要注意,publishEvent 调用会立即返回,而事件的发布则是 _ 最终 _ 某一时刻才会发生的。最终,我们将看到以下输出:

New balance of account 1 from event is: 5.15

New balance of account 1 from event is: 3.937

性能检测

现在,整个示例已经可以运行了,那么让我们来看看这个应用能够处理怎样的负载。Reveno 提供了一个非常实用的reveno-metrics库,能够帮助你追踪某个运行中引擎的性能指标。与其他部分一样,这个指标库也经过了优化,包括堆外内存(off-heap memory)的使用以及无锁的代码,因此它对于整体性能所造成的影响非常小。它还支持与某些流行的监控系统进行集成,例如 Graphite。

(需要指出的是,reveno-metrics 总的来说是一个性能监控工具,而不是一个微基准测试框架。如果要获取准确的基准测试结果,可以考虑使用 JMH 或类似的工具。)

我们选择的环境 MacBook Pro 2.7 GHz i5 CPU,首先要在代码中对指标集合进行初始化,以使用 Reveno Slf4j 这个 sink ,随后重复运行 ChangeBalance 这个命令 4 千 5 百万次(包括用于预热的迭代):

  • reveno.instances.MAC-15_local.default.latency.mean: 68804
  • reveno.instances.MAC-15_local.default.latency.min: 775
  • reveno.instances.MAC-15_local.default.latency.max: 522265
  • reveno.instances.MAC-15_local.default.throughput.hits: 1183396

这些数字表示平均延迟时间约 69 微秒,最小值为 775 纳秒,而最高值则为 522 微秒,总吞量是每秒运行 1183396 次事务。考虑到后台所需完成的各种工作以及持久性级别,这一结果令人印象十分深刻。

结论

Reveno 框架目前才刚刚崭露头角,但它的发展十分迅速。你可以访问我们的官方网站以学习更多的相关知识。我们对于任何建议以及反馈都保持开发的态度。你也可以加入我们的 Google 讨论小组,在 Issues 页面中提交 bug,或是向 mailto:support@reveno.org 提交非公开问题或其他任何问题。

本文中所描述的 Demo 的完整代码可以在 GitHub 上找到,你还能够找到使用 Reveno 的各种示例(或者自行提交一个pull request)。

关于作者

Artem Dmitriev目前在 GetIntent 这家广告科技创业公司担任软件工程师,他的工作是交付大规模实时投标这方面需求的平台。近几年来,Artem 致力于在 JVM 平台上创建高负载的系统。他的工作背景包括为多市场交易平台开发核心引擎,这些平台通常对于延迟与吞吐量有很高的要求。Artem 对于开源软件及其开发充满热情,他热诚欢迎用户的反馈,可以通过 art.dm.ser@gmail.com 向他发送邮件。

查看英文原文 High Load Trading Transaction Processing with Reveno CQRS/Event Sourcing Framework

2016-06-14 19:217678
用户头像

发布了 428 篇内容, 共 180.3 次阅读, 收获喜欢 39 次。

关注

评论

发布
暂无评论
发现更多内容
通过CQRS/事件溯源框架Reveno实现高负载交易事务处理_Java_Artem Dmitriev_InfoQ精选文章