50万奖金+官方证书,深圳国际金融科技大赛正式启动,点击报名 了解详情
写点什么

Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现

  • 2019-07-02
  • 本文字数:4045 字

    阅读完需:约 13 分钟

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现

在本系列的第一篇文章,简要介绍了Event Sourcing的基本原理以及如何实现 UID Generator,本文将展开介绍 Event-Sourcing 实现。

实现 Event Soucing

在了解相关基础之后,这里会以最简单的方式实现一个 EventSourcing 的例子,然后逐渐通过之后的过程丰富,本篇内容会实现一个将增删改操作使用 EventSoucing 取代的例子,读取部分暂时不做涉及。

Spring Boot工程搭建

打开 http://start.spring.io/ 选择对应版本(这里是 2.1.5 )以及相应依赖,这里多选了一些之后会用到的服务:



添加配置文件:


yamlspring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect
复制代码


为了便于测试,这里开启了 JPA 自动更新,开发中你可能会使用 flyway 或者其他工具来管理数据库 schema 以及数据迁移。至此,一个简单的服务就搭建完毕。

Axon 依赖和配置

1、依赖添加


搭建好工程之后,我们正式开始做 Axon 相关事情,这个工程用一个简单的 Contract 业务来作为 demo,首先添加依赖:


添加 Axon 依赖:


  <!-- https://mvnrepository.com/artifact/org.axonframework/axon-spring-boot-starter -->  <dependency>    <groupId>org.axonframework</groupId>    <artifactId>axon-spring-boot-starter</artifactId>    <version>4.1.1</version>    <exclusions>      <exclusion>        <groupId>org.axonframework</groupId>        <artifactId>axon-server-connector</artifactId>      </exclusion>    </exclusions>  </dependency>  <!-- https://mvnrepository.com/artifact/com.google.guava/guava axon 依赖了 guava-->  <dependency>    <groupId>com.google.guava</groupId>    <artifactId>guava</artifactId>    <version>27.1-jre</version>  </dependency>
复制代码


解释一下,axon 从 4.0 开始加入了 axon-server ,目的是将 event 存储和分发剥离,以更好地发挥微服务的优点,但是在项目中引用这么一个不透明的东西感觉上不太好,所以这里就不采用 axon-server 了。axon-spring-boot-starter 会采用 EmbeddedEventStore ,默认使用的是 JPA ,启动之后,你也会发现 JPA 在数据库中创建了 5 张表,分别是 association_value_entry domain_event_entry saga_entry snapshot_event_entry token_entrydomain_event_entry 用来存储事件,snapshot_event_entry 用来存储快照,token_entry 用来记录 tracking event 的争夺,其他两张表用来存储 saga。


2、Axon 的配置


axon:  serializer:    general: jackson
复制代码


这里指定使用Jackson来进行序列化,模式 Axon 是使用 XML 进行序列化的,不方便查看,并且之后的事件升级都会很麻烦,所以进行了替换。

Axon Domain Model 定义

1、定义 aggregate


public interface ContractInterface {
Long getId();
@NotBlank String getName();
@NotBlank String getPartyA();
@NotBlank String getPartyB();}
@Getter@Setter@AllArgsConstructor@NoArgsConstructor@Aggregatepublic class ContractAggregate implements ContractInterface {
@AggregateIdentifier private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;}
复制代码


2、定义 commands


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractCommand {    @TargetAggregateIdentifier    private Long identifier;}
@Getter@Setter@NoArgsConstructorpublic class UpdateContractCommand extends AbstractCommand implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public UpdateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class CreateContractCommand extends UpdateContractCommand {
public CreateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@NoArgsConstructor@Getter@Setterpublic class DeleteContractCommand extends AbstractCommand { public DeleteContractCommand(Long identifier) { super(identifier); }}

复制代码


3、定义 events


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractEvent {
@TargetAggregateIdentifier private Long identifier;}

@Getter@Setter@NoArgsConstructorpublic class ContractUpdatedEvent extends AbstractEvent implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public ContractUpdatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class ContractCreatedEvent extends ContractUpdatedEvent {
public ContractCreatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@Getter@Setter@NoArgsConstructorpublic class ContractDeletedEvent extends AbstractEvent {
public ContractDeletedEvent(Long identifier) { super(identifier); }}

复制代码


这里只抽象了统一 command 和 event,在实际业务开发过程中可以有更多抽象。

实现各个 Handler

在这里,我们实现了 Event Soucing 的模式,把各个 Handler 都放在 aggregate 里面。


    @CommandHandler    public ContractAggregate(CreateContractCommand command, MetaData metaData, UIDGenerator generator) {        if (null == command.getIdentifier()) {            command.setIdentifier(generator.getId());        }        AggregateLifecycle.apply(new ContractCreatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);    }
@CommandHandler private void on(UpdateContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractUpdatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData); }
@CommandHandler private void on(DeleteContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractDeletedEvent(command.getIdentifier()), metaData); }
@EventSourcingHandler private void on(ContractCreatedEvent event) { this.setIdentifier(event.getIdentifier()); this.onUpdate(event); }
@EventSourcingHandler private void onUpdate(ContractUpdatedEvent event) { this.setName(event.getName()); this.setPartyA(event.getPartyA()); this.setPartyB(event.getPartyB()); }
@EventSourcingHandler(payloadType = ContractDeletedEvent.class) private void on() { this.setDeleted(true); }
复制代码


  • 这里看到有一个 CommandHandler 注解写在了构造方法上,那么在处理这个 Command 时,将会自动创建一个对象。另外,这里的 MetaData 是在 command 发送时顺带的附加信息,可以是用户信息,机器信息等,后续也会涉及这部分,这里就不深入探讨了。

  • 启动项目后,JPA 应该会在数据库中生成几张表,其中 worker_id 是之前我们编写的 Spring Cloud 的 ID 生成器所产生的,剩余的表都是 Axon 自己产生的,这里我使用的数据库并没有使用mb4编码,因为在mb4编码下 Axon 的索引会过长,这个也不是问题,因为实际开发过程中,我们可以将生成的语句自己修改了下,将主键的长度改小一点即可,后面在完善过程也会涉及。

编写接口

aggreate 以及各 handler 都已经写完了,那么我们开始编写接口,让工程可以顺利跑起来。


@RestController@RequestMapping("/contracts")@AllArgsConstructorpublic class ContractController {
private final CommandGateway commandGateway;
@PostMapping public void createContract(@RequestBody @Valid CreateContractCommand command) { commandGateway.send(command); }
@PutMapping("/{id}") public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) { command.setIdentifier(id); commandGateway.send(command); }
@DeleteMapping("/{id}") public void deleteContract(@PathVariable("id") Long id) { commandGateway.send(new DeleteContractCommand(id)); }}
复制代码


启动工程并顺序执行 POST UPDATE DELETE 操作,你会发现domain_event_entry中多了三条记录,这张表就是用来记录事件的,可以看到这里详细的记录了每个事件的发生时间、内容、附加信息、类型等信息。至此本次 Event Soucing 的例子就结束了,主要将传统的增删改操作改造成了事件记录的形式。下次将会从 CQRS 角度实现数据读取。完整示例 - branch session4


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


2019-07-02 09:099030

评论 1 条评论

发布
用户头像
不错哦 DDD&CQRS&ES 大利器
2019-09-20 15:23
回复
没有更多了
发现更多内容

直播带货app源码搭建中,直播CDN的原理是什么?

开源直播系统源码

软件开发 直播带货 直播系统 app源码

5 个关于 NFT 的技术漏洞

devpoint

区块链 以太坊 NFT 6月月更

大一学生课设c——服装管理系统

工程师日月

6月月更

开发增效利器—2022年VsCode插件分享

中原银行

ide vscode 插件 中原银行 降本增效

Kafka ETL 之后,我们将如何定义新一代实时数据集成解决方案?

tapdata

kafka ETL 数据集成 实时数据 DaaS

Linux开发_摄像头编程(实现拍照、网页监控功能)

DS小龙哥

6月月更

【云舟说直播间】-数字安全专场明天下午正式上线

云计算

消息队列的丢失、重复与积压问题

Damon

6月月更

《Java编程思想》作者Bruce Eckel新作,到底做了哪些升级?

图灵教育

Java

quarkus+saas多租户动态数据源切换实现简单完美

weir威尔

SaaS 多租户 Quarkus 动态数据源

如何使用 Django Forms 创建表单?

海拥(haiyong.site)

Python django 6月月更

Rancher 2.6 全新 Monitoring 快速入门

Rancher

Kubernetes k8s rancher

“芯”有灵“蜥”,万人在线!龙蜥社区走进 Intel MeetUp 精彩回顾

OpenAnolis小助手

开源 直播 Meetup 龙蜥社区 走进 Intel

使用Mycat进行MySQL单库分表

迷彩

架构 运维 mycat 分布式数据库中间件 6月月更

实战监听Eureka client的缓存更新

程序员欣宸

Java SpringCloud 6月月更

坚持五件事,带你走出迷茫困境!

博文视点Broadview

不止于观测|阿里云可观测套件正式发布

阿里巴巴云原生

阿里云 云原生 可观测 套件

java培训 | Java设计模式之装饰者设计模式

@零度

JAVA开发

想学习eTS开发?教你开发一款IQ-EQ测试应用

HarmonyOS开发者

HarmonyOS

得物多活架构设计之路由服务设计

得物技术

架构 高可用 架构设计 双活 路由

NodeJS 防止xss攻击

德育处主任

Node 6月月更

如何用 Redis 实现一个分布式锁

Ayue、

redis 分布式锁

Vone新闻 | 旺链科技赋能众享链网自组织管理,打造企业级联盟DAO

旺链科技

区块链 产业区块链 DAO 自组织协作

并购增资或将有望启动东软越通新动能?

E科讯

java程序员培训 | Java设计模式之桥接模式

@零度

设计模式 JAVA开发

运行时应用自我保护(RASP):应用安全的自我修养

SEAL安全

RASP

攻防演练合集 | 3个阶段,4大要点,蓝队防守全流程纲要解读

青藤云安全

网络安全 网络攻防 安全服务 攻防演练

大数据培训 | Flink如何监控恶意登录

@零度

大数据

成熟的知识管理,应具备哪些条件?

小炮

DevEco Device Tool 助力OpenHarmony设备开发

OpenHarmony开发者

OpenHarmony

Angular 服务器端渲染应用一个常见的内存泄漏问题

汪子熙

typescript 前端开发 angular Spartacus 6月月更

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现_文化 & 方法_周国勇_InfoQ精选文章