【AICon】 如何构建高效的 RAG 系统?RAG 技术在实际应用中遇到的挑战及应对策略?>>> 了解详情
写点什么

Event Sourcing 和 CQRS 落地(六):实现可靠消息

  • 2019-07-08
  • 本文字数:4431 字

    阅读完需:约 15 分钟

Event Sourcing 和 CQRS落地(六):实现可靠消息

在本系列的上一篇文章中,作者介绍了 Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法,本文将主要介绍如何实现可靠消息。

实现可靠消息

什么是可靠消息

微服务盛行的时代下,消息成为了不可缺少的组件,首先我们看一个例子,contract 系统创建了一个合同,然后发送创建合同的消息。看似简单,实际上分析一下它的出错可能性,会有以下几种:


  1. 创建合同成功,发送消息失败;

  2. 创建合同失败,发送消息成功;

  3. 创建合同成功,发送消息成功;

  4. 创建合同失败,发送消息失败;


同时成功或者同时失败,这个情况是一致的,是正确的,我们需要关心的就是不一致的情况。那么最简单的办法,就是让创建合同和发送消息成为一个事务,要么一起成功,要么一起失败,但是这么做的话耦合性太强,本身合同创建成功了,却因为消息发送的失败强制回滚。这个时候,可能就想到了存储消息数据,将合同创建和消息数据的存储作为一个事务,消息发送成功之后再去删除消息数据,定期去扫描未发送的消息数据,来保证消息的发送。但是这么做还是有一定的代价的,需要实现消息的存储,消息存储和合同创建还是耦合在一起的,不过这样的模式到 Event Sourcing 下面那就比较理想了,因为本身消息数据和 event 是一样的,存储了 event 相当于完成了存储消息数据,只需要在 event 下做一个标记即可。


做完了上面这些,就能保证消息一定从 producer 到 broker 这一过程,当然要做到消息不丢,必然产生的结果就是消息可能会重复,情况就是 broker 收到了消息,但是没有通知到 producer,这种情况下 producer 是认为消息没有投递成功的,会出现重复投递的情况。保证了消息一定送达 broker 之后,就是 consumer 和 broker 的关系了,consumer 在消费之后需要告诉 broker 消费成功,否则 broker 需要一直保存这些消息。当然消费端可能需要做更多的事情,比如保证同一 aggregate 事件的顺序消费。下面文章会以在 Axon 框架上做一些拓展,以分别实现 consumer 和 producer。

拓展 DomainEventEntry

上面也说到了,在 Event Sourcing 模式下,我们只需要给事件加上一个是否投递的标志,这里我们就看看如何实现(这里只针对 JPA 做了实现)。


  1. 建立对应的 entity 以取代DomainEventEntry


@Entity(name = "DomainEventEntry")@Getter@Setter@Table(indexes = @Index(columnList = "aggregateIdentifier,sequenceNumber", unique = true))public class CustomDomainEventEntry extends AbstractSequencedDomainEventEntry<byte[]> {
@NotNull @Column(columnDefinition = "tinyint(1) default 0") private boolean sent = false;
public CustomDomainEventEntry(DomainEventMessage<?> eventMessage, Serializer serializer) { super(eventMessage, serializer, byte[].class); this.setSent(false); }
/** * Default constructor required by JPA */ protected CustomDomainEventEntry() { }}
复制代码


  1. 建立对应的 storage 以取代 JpaEventStorageEngine:


public class CustomJpaEventStorageEngine extends JpaEventStorageEngine {
public CustomJpaEventStorageEngine(Builder builder) { super(builder); }
@Override protected Object createEventEntity(EventMessage<?> eventMessage, Serializer serializer) { return new CustomDomainEventEntry(asDomainEventMessage(eventMessage), serializer); }
public static CustomJpaEventStorageEngine.Builder builder() { return new CustomJpaEventStorageEngine.Builder(); }
// 此处略去了 builder 部分代码 public static class Builder extends JpaEventStorageEngine.Builder { ... }}
复制代码


  1. 更新对应的 config:


    @Bean    public EventStorageEngine eventStorageEngine(Serializer defaultSerializer,                                                 PersistenceExceptionResolver persistenceExceptionResolver,                                                 @Qualifier("eventSerializer") Serializer eventSerializer,                                                 EntityManagerProvider entityManagerProvider,                                                 EventUpcaster contractUpCaster,                                                 TransactionManager transactionManager) {        return CustomJpaEventStorageEngine.builder()            .snapshotSerializer(defaultSerializer)            .upcasterChain(contractUpCaster)            .persistenceExceptionResolver(persistenceExceptionResolver)            .eventSerializer(eventSerializer)            .entityManagerProvider(entityManagerProvider)            .transactionManager(transactionManager)            .build();    }
复制代码

handler 实现可靠消息的生产端

做好了准备工作再发送消息就比较清晰了,我们需要做的就是在事件存储后去尝试发送消息,然后标记为已发送即可,在之前的 实现 CQRS 中我们留了一个坑,就是 view 端的更新不是在事件存储之后,这里我们就去实现发消息在事件存储之后,然后 view 层去监听消息更新。具体的实现就是利用 entity postPersist 去监听存储,在 transaction 成功后去尝试发送消息,代码如下:



@EntityListeners(CustomDomainEventEntryListener.class)public class CustomDomainEventEntry extends AbstractSequencedDomainEventEntry<byte[]> { ...}
@Component@Slf4jpublic class CustomDomainEventEntryListener { private static CustomDomainEventEntryRepository customDomainEventEntryRepository;
private static ContractPublisher contractPublisher;
@Autowired public void init(CustomDomainEventEntryRepository customDomainEventEntryRepository, ContractPublisher contractPublisher) { this.customDomainEventEntryRepository = customDomainEventEntryRepository; this.contractPublisher = contractPublisher; }
@PostPersist void onPersist(CustomDomainEventEntry entry) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override public void afterCompletion(int status) { if (status == TransactionSynchronization.STATUS_COMMITTED) { CompletableFuture.runAsync(() -> sendEvent(entry.getEventIdentifier())); } } }); }
@Transactional public void sendEvent(String identifier) { CustomDomainEventEntry eventEntry = customDomainEventEntryRepository.findByEventIdentifier(identifier);
if (!eventEntry.isSent()) { contractPublisher.sendEvent(eventEntry); eventEntry.setSent(true); customDomainEventEntryRepository.save(eventEntry); } }}
@Repositorypublic interface CustomDomainEventEntryRepository extends JpaRepository<CustomDomainEventEntry, String> {
/** * 查找事件 * * @param identifier * * @return */ CustomDomainEventEntry findByEventIdentifier(String identifier);}
@Component@AllArgsConstructor@Slf4jpublic class ContractEventPublisher {
public void sendEvent(DomainEvent event) { // use stream to send message here log.info(MessageFormat.format("prepare to sending message : {0}]", new Gson().toJson(event))); }
public void sendEvent(CustomDomainEventEntry event) { // use com.craftsman.eventsourcing.stream to send message here ObjectMapper mapper = new ObjectMapper();
HashMap payload = null; HashMap metaData = null; try { payload = mapper.readValue(event.getPayload().getData(), HashMap.class); metaData = mapper.readValue(event.getMetaData().getData(), HashMap.class); } catch (Exception exception) { log.error(MessageFormat.format("byte[] to string failed; exception: {0}", exception)); }
if (payload == null || metaData == null) { log.warn(MessageFormat.format("nothing to send; exception: {0}", event.getEventIdentifier())); return; }
DomainEvent domainEvent = new DomainEvent( event.getType(), event.getAggregateIdentifier(), event.getPayload().getType().getName(), event.getPayload().getType().getRevision(), event.getSequenceNumber(), event.getEventIdentifier(), event.getTimestamp(), payload, metaData);
this.sendEvent(domainEvent); }}
复制代码


  • DomainEvent 是为了统一消息的格式包装的类,具体可以看代码这里就不贴了

  • ContractEventPublisher 作为消息统一发送出口,为了不涉及 rabbitmq 暂时以 log 的形式代替消息发送,后续在 Spring Cloud Stream 优化中实现完整的流程

实现消费端

DomainEvent的属性中,我们可以看到有一个sequenceNumber字段,这个字段可以用来保证同一 aggregate 的事件顺序,那么在消费端可以以 type aggregate sequenceNumber 形成一张表,用来记录每个 aggregate 的最新状态,如果 aggregate 数据量比较大,也可以分表存储,一般 aggregate_id 索引之后,单表性能在百万级别,应该都没什么问题。这样在消费的时候先比较 sequenceNumber 差异,只消费差异值为 1 的事件,就可以保证同一 aggregate 的事件被顺序消费。之后会写篇关于 Spring Cloud Stream 的文章,用来作为服务之间的桥梁,并解决框架用 header 作为路由之后引起的问题,这里暂时不做深入。完整的例子 - branch session7


作者介绍:


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


《Event Sourcing 和 CQRS 落地(三):CQRS 实现》


《Event Sourcing 和 CQRS 落地(四):深入使用 -Axon》


《Event Sourcing 和 CQRS 落地(五):Spring-Cloud-Stream 优化》


2019-07-08 10:203863

评论

发布
暂无评论
发现更多内容

新版阿里神级“高并发”教程《基础+实战+源码+面试+架构》

钟奕礼

Java java面试 java编程 程序员、

一步一图带你深入理解 Linux 物理内存管理

bin的技术小屋

内存 内存管理 Linux Kenel 内核 内核源码

一周活动速递|Paper Time第五期;技术征文大赛即将收官

OceanBase 数据库

热门资讯:超大规模数字产业生态正在加快构建!

优秀

数字化转型

“天翼云杯”厦门软件开发大赛开赛 为开发者提供“沃土”

Geek_2d6073

当Kubernetes遇见Macvlan——网络互通

陆云

【原生Ajax】全面了解xhr的概念与使用。

坚毅的小解同志

ajax 11月月更

4.0体验站|OceanBase 4.0 我回来给你点个赞

OceanBase 数据库

SpringBoot邮件发送demo

@下一站

编程 email Java core 11月月更

HashData携手恒丰银行 入选信通院金融科技创新应用五大“最受关注案例”

酷克数据HashData

云数据仓库

百度APP iOS端内存优化实践-内存管控方案

百度Geek说

android 后端 内存管理 企业号十月 PK 榜

小令观点丨现代版 “见令如见人”

令牌云数字身份

python常用的内置对象

乔乔

11月月更

行业分析| 智慧消防对讲

anyRTC开发者

人工智能 监控 消防 调度 快对讲

LR低代码快速开发平台 高效调整企业组织架构

力软低代码开发平台

小令观点 | 从大批QQ账号被盗,看账号安全与数据资产问题

令牌云数字身份

数据安全 账号安全

MySQL中支持的字符集和排序规则

@下一站

MySQL 技术 字符集 11月月更

小令动态 | 令牌云成功通过国家等保三级认证

令牌云数字身份

嘉为科技彭一宽:组织度量,先做造钟人,再做报时人

嘉为蓝鲸

DevOps 度量

京东二面:MySQL 主从延迟,读写分离 7 种解决方案

钟奕礼

Java 程序员 java面试 java编程

源自双11混部实战,Koordinator 如何保障应用服务质量?

阿里技术

云原生 混部技术

Mybatis中使用${}和使用#{}

@下一站

mybatis MyBatis标签 Java core 11月月更

小令动态 | 令牌云新获上海市创新资金立项支持,此前还有......

令牌云数字身份

新力量,新希望|明道云伙伴大会2022秋圆满落幕

明道云

字节跳动基于数据湖技术的近实时场景实践

字节跳动数据平台

数据湖 火山引擎

华为云虚拟专用网络VPN,为企业铺就数据上云的安全路

路过的憨憨

云计算:基于互联网的超级计算

Finovy Cloud

云计算 云渲染

AI机器学习模型部署的典型策略

Baihai IDP

人工智能 AI MLOps 模型部署

DevOps制品管理——软件“工业革命”的里程碑式改革

嘉为蓝鲸

DevOps 制品管理

SAP UI5 应用和 Angular 应用视图里控件 id 生成逻辑的异同比较

Jerry Wang

前端开发 angular SAP SAP UI5 11月月更

JS有哪些变态语法,你知道吗?

千锋IT教育

Event Sourcing 和 CQRS落地(六):实现可靠消息_文化 & 方法_周国勇_InfoQ精选文章