写点什么

Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现

  • 2019-07-02
  • 本文字数:4045 字

    阅读完需:约 13 分钟

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现

在本系列的第一篇文章,简要介绍了Event Sourcing的基本原理以及如何实现 UID Generator,本文将展开介绍 Event-Sourcing 实现。

实现 Event Soucing

在了解相关基础之后,这里会以最简单的方式实现一个 EventSourcing 的例子,然后逐渐通过之后的过程丰富,本篇内容会实现一个将增删改操作使用 EventSoucing 取代的例子,读取部分暂时不做涉及。

Spring Boot工程搭建

打开 http://start.spring.io/ 选择对应版本(这里是 2.1.5 )以及相应依赖,这里多选了一些之后会用到的服务:



添加配置文件:


yamlspring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect
复制代码


为了便于测试,这里开启了 JPA 自动更新,开发中你可能会使用 flyway 或者其他工具来管理数据库 schema 以及数据迁移。至此,一个简单的服务就搭建完毕。

Axon 依赖和配置

1、依赖添加


搭建好工程之后,我们正式开始做 Axon 相关事情,这个工程用一个简单的 Contract 业务来作为 demo,首先添加依赖:


添加 Axon 依赖:


  <!-- https://mvnrepository.com/artifact/org.axonframework/axon-spring-boot-starter -->  <dependency>    <groupId>org.axonframework</groupId>    <artifactId>axon-spring-boot-starter</artifactId>    <version>4.1.1</version>    <exclusions>      <exclusion>        <groupId>org.axonframework</groupId>        <artifactId>axon-server-connector</artifactId>      </exclusion>    </exclusions>  </dependency>  <!-- https://mvnrepository.com/artifact/com.google.guava/guava axon 依赖了 guava-->  <dependency>    <groupId>com.google.guava</groupId>    <artifactId>guava</artifactId>    <version>27.1-jre</version>  </dependency>
复制代码


解释一下,axon 从 4.0 开始加入了 axon-server ,目的是将 event 存储和分发剥离,以更好地发挥微服务的优点,但是在项目中引用这么一个不透明的东西感觉上不太好,所以这里就不采用 axon-server 了。axon-spring-boot-starter 会采用 EmbeddedEventStore ,默认使用的是 JPA ,启动之后,你也会发现 JPA 在数据库中创建了 5 张表,分别是 association_value_entry domain_event_entry saga_entry snapshot_event_entry token_entrydomain_event_entry 用来存储事件,snapshot_event_entry 用来存储快照,token_entry 用来记录 tracking event 的争夺,其他两张表用来存储 saga。


2、Axon 的配置


axon:  serializer:    general: jackson
复制代码


这里指定使用Jackson来进行序列化,模式 Axon 是使用 XML 进行序列化的,不方便查看,并且之后的事件升级都会很麻烦,所以进行了替换。

Axon Domain Model 定义

1、定义 aggregate


public interface ContractInterface {
Long getId();
@NotBlank String getName();
@NotBlank String getPartyA();
@NotBlank String getPartyB();}
@Getter@Setter@AllArgsConstructor@NoArgsConstructor@Aggregatepublic class ContractAggregate implements ContractInterface {
@AggregateIdentifier private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;}
复制代码


2、定义 commands


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractCommand {    @TargetAggregateIdentifier    private Long identifier;}
@Getter@Setter@NoArgsConstructorpublic class UpdateContractCommand extends AbstractCommand implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public UpdateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class CreateContractCommand extends UpdateContractCommand {
public CreateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@NoArgsConstructor@Getter@Setterpublic class DeleteContractCommand extends AbstractCommand { public DeleteContractCommand(Long identifier) { super(identifier); }}

复制代码


3、定义 events


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractEvent {
@TargetAggregateIdentifier private Long identifier;}

@Getter@Setter@NoArgsConstructorpublic class ContractUpdatedEvent extends AbstractEvent implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public ContractUpdatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class ContractCreatedEvent extends ContractUpdatedEvent {
public ContractCreatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@Getter@Setter@NoArgsConstructorpublic class ContractDeletedEvent extends AbstractEvent {
public ContractDeletedEvent(Long identifier) { super(identifier); }}

复制代码


这里只抽象了统一 command 和 event,在实际业务开发过程中可以有更多抽象。

实现各个 Handler

在这里,我们实现了 Event Soucing 的模式,把各个 Handler 都放在 aggregate 里面。


    @CommandHandler    public ContractAggregate(CreateContractCommand command, MetaData metaData, UIDGenerator generator) {        if (null == command.getIdentifier()) {            command.setIdentifier(generator.getId());        }        AggregateLifecycle.apply(new ContractCreatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);    }
@CommandHandler private void on(UpdateContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractUpdatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData); }
@CommandHandler private void on(DeleteContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractDeletedEvent(command.getIdentifier()), metaData); }
@EventSourcingHandler private void on(ContractCreatedEvent event) { this.setIdentifier(event.getIdentifier()); this.onUpdate(event); }
@EventSourcingHandler private void onUpdate(ContractUpdatedEvent event) { this.setName(event.getName()); this.setPartyA(event.getPartyA()); this.setPartyB(event.getPartyB()); }
@EventSourcingHandler(payloadType = ContractDeletedEvent.class) private void on() { this.setDeleted(true); }
复制代码


  • 这里看到有一个 CommandHandler 注解写在了构造方法上,那么在处理这个 Command 时,将会自动创建一个对象。另外,这里的 MetaData 是在 command 发送时顺带的附加信息,可以是用户信息,机器信息等,后续也会涉及这部分,这里就不深入探讨了。

  • 启动项目后,JPA 应该会在数据库中生成几张表,其中 worker_id 是之前我们编写的 Spring Cloud 的 ID 生成器所产生的,剩余的表都是 Axon 自己产生的,这里我使用的数据库并没有使用mb4编码,因为在mb4编码下 Axon 的索引会过长,这个也不是问题,因为实际开发过程中,我们可以将生成的语句自己修改了下,将主键的长度改小一点即可,后面在完善过程也会涉及。

编写接口

aggreate 以及各 handler 都已经写完了,那么我们开始编写接口,让工程可以顺利跑起来。


@RestController@RequestMapping("/contracts")@AllArgsConstructorpublic class ContractController {
private final CommandGateway commandGateway;
@PostMapping public void createContract(@RequestBody @Valid CreateContractCommand command) { commandGateway.send(command); }
@PutMapping("/{id}") public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) { command.setIdentifier(id); commandGateway.send(command); }
@DeleteMapping("/{id}") public void deleteContract(@PathVariable("id") Long id) { commandGateway.send(new DeleteContractCommand(id)); }}
复制代码


启动工程并顺序执行 POST UPDATE DELETE 操作,你会发现domain_event_entry中多了三条记录,这张表就是用来记录事件的,可以看到这里详细的记录了每个事件的发生时间、内容、附加信息、类型等信息。至此本次 Event Soucing 的例子就结束了,主要将传统的增删改操作改造成了事件记录的形式。下次将会从 CQRS 角度实现数据读取。完整示例 - branch session4


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


2019-07-02 09:098815

评论 1 条评论

发布
用户头像
不错哦 DDD&CQRS&ES 大利器
2019-09-20 15:23
回复
没有更多了
发现更多内容

直播预告 | PolarDB-X 动手实践系列——基于 Prometheus + Grafana 的 PolarDB-X 监控体系

阿里云数据库开源

数据库 阿里云 开源 PolarDB-X 教学

明源云天际PaaS平台,构建零代码、低代码在线协同开发实践

科技热闻

两届获奖选手 手把手教你如何征战华为软件精英挑战赛

科技热闻

平行云CEO 李岩:CloudXR ,开启通往元宇宙的通道

阿里云弹性计算

XR 元宇宙

【直播回顾】参与文档贡献,开启OpenHarmony社区贡献

OpenHarmony开发者

OpenHarmony

照亮旷野的,是少年开发者眼中的炬火

脑极体

阿里云云原生一体化数仓入选 2022数博会“十佳大数据案例”

阿里云大数据AI技术

数据挖掘 大数据 分布式计算 数据处理 MaxCompute

10分钟弄懂云原生网络功能,快来瞧瞧!

VoltDB

云原生 云原生网络 网络功能

图分析的22种算法与图形理解

清林情报分析师

数据分析 知识图谱 图算法 图论 知识结构

netty系列之:在netty中使用UDP协议请求DNS服务器

程序那些事

Java Netty 程序那些事 5月月更

喜报|海泰方圆成功入选中国档案学会单位会员

电子信息发烧客

Hoo联合SwapAll发布赏金活动 用户可体验“救援任务”瓜分赏金奖池

区块链前沿News

SAP Hoo

JVM 线上问题定位实战(CPU 飙升)

Ayue、

JVM

OpenClusterManagement 开源之夏 2022 来了

阿里巴巴云原生

阿里云 云原生 开源之夏

实践GoF的设计模式:工厂方法模式

华为云开发者联盟

设计模式 工厂方法模式

基于信息检索和深度学习结合的单元测试用例断言自动生成

华为云开发者联盟

深度学习 单元测试 信息检索

漫画 | 新一代软件架构会影响到谁?

阿里巴巴云原生

阿里云 云原生 事件总线 EventBridge

Java开发规范(一)

DC.夜猫

开发 规范 开发规范 java

Python图像处理丨图像缩放、旋转、翻转与图像平移

华为云开发者联盟

Python 图像平移 图像缩放

一、KVM虚拟化的功能特性

穿过生命散发芬芳

kvm 5月月更

一个轻量的数据库数据告警器

山河已无恙

Java 数据监控

第二章 启航

Geek_古藤模根

图数据库实战 gremlin 入门 Gremlin

英特尔以“整合论”谋篇布局,加码数据中心

科技之家

相较国外代码托管平台 gitlab,咱们中国自己的代码托管平台有哪些优势?

阿里云云效

云计算 阿里云 代码管理 代码托管 代码安全

这道静态变量题,我居然考了0分

华为云开发者联盟

Java 静态变量 Java static

LinkedHashMap 源码分析-访问最少删除策略

zarmnosaj

5月月更

PolarDB-X迎来开源后首个重大版本升级,2.1版本新增5大特色功能

阿里云数据库开源

数据库 阿里云 开源 国产数据库 PolarDB-X

vue番茄钟&electron打包

空城机

Electron vue cli 5月月更

2022Gartner容器预测:2025年85%的企业将使用容器管理服务

York

容器 云原生 数字化转型

如何实现文档协作共享?

小炮

企业智能化转型meetup回顾|开源BI & AI助力企业转型之旅三阶段!

第四范式开发者社区

人工智能 开源 企业 大数据平台 智能化转型

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现_文化 & 方法_周国勇_InfoQ精选文章