HarmonyOS开发者限时福利来啦!最高10w+现金激励等你拿~ 了解详情
写点什么

Event Sourcing 和 CQRS 落地(五):Spring-Cloud-Stream 优化

  • 2019-07-05
  • 本文字数:7749 字

    阅读完需:约 25 分钟

Event Sourcing 和 CQRS落地(五):Spring-Cloud-Stream 优化

本系列的上一篇文章重点介绍了 Axon 实现,本文将主要介绍Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法。

Spring Cloud Stream 优化

问题

Spring Cloud Stream(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:


  1. StreamListener用来做事件路由分发并不是很理想,SPEL 可能会写的很长(我尝试过用自定义注解代替原生的注解,从而达到简化的目的,但是会出现一些莫名其妙的事件混乱)。

  2. 如果配合之前的模型使用,我们需要保证消息的顺序消费,每个方法都需要去 check 事件的当前 seq,很不方便。

  3. 在没有 handler 处理某个 type 的事件时,框架会给出一个 warn,然而这个事件可能在 consumer 这里根本不关心。

解决方案

为了解决上面的问题,我们可以这么处理,先统一一个入口将 SCS 的消息接收,然后我们自己构建一个路由系统,将请求分发到我们自己定义的注解方法上,并且在这个过程中将 seq 的检查也给做了,大体的流程是这个样子的:



这样以上几点问题都会得到解决,下面我们来看看具体如何实现:


  • 首先定义一个注解用于接受自己分发的事件:



@Target( {ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface StreamEventHandler {
String[] payloadTypes() default {""};
String[] types();}

复制代码


types 对应 Stream 本身 Inuput 的类型, payloadTypes 对应事件类型,比如 ContractCreated,我们要做的效果是这个 payloadTypes 可以不写,直接从方法的第一个参数读取 class 的 simapleName。


  • 定义用于记录 aggregate sequenceNumber 的 entity 和 repository :


@Entity@Table(indexes = @Index(columnList = "aggregateIdentifier,type", unique = true))@Getter@Setter@NoArgsConstructorpublic class DomainAggregateSequence {
@Id @GeneratedValue private Long id;
private Long sequenceNumber;
private Long aggregateIdentifier;
private String type;}
@Repositorypublic interface DomainAggregateSequenceRepository extends JpaRepository<DomainAggregateSequence, Long> {
/** * 根据 aggregate id 和 type 找到对应的记录 * * @param identifier * @param type * * @return */ DomainAggregateSequence findByAggregateIdentifierAndType(Long identifier, String type);
}
复制代码


  • 由于暂时没有找到监听所有已绑定 channel 的事件的方法,这里实现一个类提供一个 dispatch 的方法用于分发:


@Slf4j@Component@AllArgsConstructorpublic class StreamDomainEventDispatcher implements BeanPostProcessor {
private final ObjectMapper mapper;
private final DomainAggregateSequenceRepository domainAggregateSequenceRepository;
private HashMap<Object, List<Method>> beanHandlerMap = new HashMap<>();
@Autowired public StreamDomainEventDispatcher(ObjectMapper mapper, DomainAggregateSequenceRepository domainAggregateSequenceRepository) { this.mapper = mapper; this.domainAggregateSequenceRepository = domainAggregateSequenceRepository; }
@Transactional public void dispatchEvent(DomainEvent event, String type) {
log.info(MessageFormat.format("message [{0}] received", event.getEventIdentifier()));
// 1. 检查是否是乱序事件或者重复事件 Long aggregateIdentifier = Long.parseLong(event.getAggregateIdentifier()); String eventType = event.getType(); Long eventSequence = event.getSequenceNumber();
DomainAggregateSequence sequenceObject = domainAggregateSequenceRepository.findByAggregateIdentifierAndType(aggregateIdentifier, eventType);
if (sequenceObject == null) { sequenceObject = new DomainAggregateSequence(); sequenceObject.setSequenceNumber(eventSequence); sequenceObject.setAggregateIdentifier(aggregateIdentifier); sequenceObject.setType(eventType); } else if (sequenceObject.getSequenceNumber() + 1 != eventSequence) { // 重复事件,直接忽略 if (sequenceObject.getSequenceNumber().equals(eventSequence)) { log.warn(MessageFormat.format("repeat event ignored, type[{0}] aggregate[{1}] seq[{2}] , ignored", event.getType(), event.getAggregateIdentifier(), event.getSequenceNumber())); return; } throw new StreamEventSequenceException(MessageFormat.format("sequence error, db [{0}], current [{1}]", sequenceObject.getSequenceNumber(), eventSequence)); } else { sequenceObject.setSequenceNumber(eventSequence); }
domainAggregateSequenceRepository.save(sequenceObject);
// 2. 分发事件到各个 handler beanHandlerMap.forEach((key, value) -> { Optional<Method> matchedMethod = getMatchedMethods(value, type, event.getPayloadType());
matchedMethod.ifPresent(method -> { try { invoke(key, method, event); } catch (IllegalAccessException | InvocationTargetException e) {
throw new StreamHandlerException(MessageFormat.format("[{0}] invoke error", method.getName()), e); } });
if (!matchedMethod.isPresent()) { log.info(MessageFormat.format("message [{0}] has no listener", event.getEventIdentifier())); } });
log.info(MessageFormat.format("message [{0}] handled", event.getEventIdentifier())); }
@Transactional public Optional<Method> getMatchedMethods(List<Method> methods, String type, String payloadType) { // 这里应该只有一个方法,因为将 stream 的单个事件分成多个之后,无法保证一致性 List<Method> results = methods.stream().filter(m -> { StreamEventHandler handler = m.getAnnotation(StreamEventHandler.class); List<String> types = new ArrayList<>(Arrays.asList(handler.types())); List<String> payloadTypes = new ArrayList<>(Arrays.asList(handler.payloadTypes()));
types.removeIf(StringUtils::isBlank); payloadTypes.removeIf(StringUtils::isBlank);
if (CollectionUtils.isEmpty(payloadTypes) && m.getParameterTypes().length != 0) { payloadTypes = Collections.singletonList(m.getParameterTypes()[0].getSimpleName()); }
boolean isTypeMatch = types.contains(type);
String checkedPayloadType = payloadType; if (StringUtils.contains(checkedPayloadType, ".")) { checkedPayloadType = StringUtils.substringAfterLast(checkedPayloadType, "."); } boolean isPayloadTypeMatch = payloadTypes.contains(checkedPayloadType);
return isTypeMatch && isPayloadTypeMatch; }).collect(Collectors.toList());
if (results.size() > 1) { throw new StreamHandlerException(MessageFormat.format("type[{0}] event[{1}] has more than one handler", type, payloadType)); }
return results.size() == 1 ? Optional.of(results.get(0)) : Optional.empty(); }
@Transactional public void invoke(Object bean, Method method, DomainEvent event) throws IllegalAccessException, InvocationTargetException {
int count = method.getParameterCount();
if (count == 0) { method.invoke(bean); } else if (count == 1) { Class<?> payloadType = method.getParameterTypes()[0];
if (payloadType.equals(DomainEvent.class)) { method.invoke(bean, mapper.convertValue(event.getPayload(), DomainEvent.class)); } else { method.invoke(bean, mapper.convertValue(event.getPayload(), payloadType)); }
} else if (count == 2) { Class<?> payloadType0 = method.getParameterTypes()[0]; Class<?> payloadType1 = method.getParameterTypes()[1];
Object firstParameterValue = mapper.convertValue(event.getPayload(), payloadType0); Object secondParameterValue = event.getMetaData();
// 如果是 DomainEvent 类型则优先传递该类型,另外一个参数按照 payloadType > metaData 优先级传入 if (payloadType0.equals(DomainEvent.class)) { firstParameterValue = mapper.convertValue(event, payloadType0); secondParameterValue = mapper.convertValue(event.getPayload(), payloadType1); } if (payloadType1.equals(DomainEvent.class)) { secondParameterValue = mapper.convertValue(event, payloadType1); } method.invoke(bean, firstParameterValue, secondParameterValue); } }

@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; }
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);
List<Method> methods = new ArrayList<>(); for (Method method : uniqueDeclaredMethods) { StreamEventHandler streamListener = AnnotatedElementUtils.findMergedAnnotation(method, StreamEventHandler.class); if (streamListener != null) { methods.add(method); } } if (!CollectionUtils.isEmpty(methods)) { beanHandlerMap.put(bean, methods); } return bean; }
}
复制代码


这里参照了 SCS 本身手机 handler 的方式,会将有 @StreamEventHandler 注解的方法都找出来做一个记录。在 dispatchEvent 的时候会更新事件的 seq 并且按照 type 去调用各个标有注解的方法。


  • 实现一个比较简单的例子:


@Slf4j@Component@Transactional@AllArgsConstructorpublic class DomainEventDispatcher {
private final StreamDomainEventDispatcher streamDomainEventDispatcher;
@StreamListener(target = ChannelDefinition.CONTRACTS_INPUT, condition = "headers['messageType']=='eventSourcing'") public void handleBuilding(@Payload DomainEvent event) { streamDomainEventDispatcher.dispatchEvent(event, ChannelDefinition.CONTRACTS_INPUT); }}
@Component@AllArgsConstructor@Transactionalpublic class ContractEventHandler { @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT) public void handle(ContractCreatedEvent event) { // 实现你的 view 层更新业务 }}
复制代码


注意:


  • AbstractDomainEventDispatcher中监听所有 bean 加载完成不能用 InitializingBean 接口,否则@Transactional会失效,这个有兴趣的同学可以研究一下@Transactional的机制。


至此以上几点就优化完了。

其他优化

错误处理

基于 SCS 的默认配置,存在一个致命的问题,那就是当消息处理失败(重试三次)之后,消息直接没了,这个相当于就是消息丢失了。那么解决方案其实也是比较简单的,一般有两种解决方案:


  1. 拒绝这个消息,丢在 broker 原先的队列里。

  2. 将这个消息记录到一个错误的 queue 中等待修复,后续可能将消息转发回去,也可能直接就删除了消息(比如重复的消息)。


方案 1 这么做可能会出的问题就是,这个消息反复消费,反复失败,引起循环问题从而导致服务出现问题,这个就需要在 broker 做一些策略配置了,为了让 broker 尽可能的简单,我们这里采用方案 2,要实现的流程是这样的:



  • 首先让 SCS 为我们自动生成一个 DLQ


spring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect  rabbitmq:    host: localhost    port: 5672    username: creams_user    password: Souban701  cloud:    stream.bindings:      contract-events: # 这个名字对应代码中@input("value") 的 value        destination: contract-events # 这个对应 rabbit 中的 channel        contentType: application/json # 这个指定传输类型,其实可以默认指定,但是目前每个地方都写了,所以统一下      contract-events-input:        destination: contract-events        contentType: application/json        group: event-sourcing-service        durableSubscription: true    stream.rabbit.bindings.contract-events-input.consumer:      autoBindDlq: true      republishToDlq: true      deadLetterQueueName: contract-error.dlqlogging:  level.org:    springframework:      web: INFO      cloud.sleuth: INFO    apache.ibatis: DEBUG    java.sql: DEBUG    hibernate:      SQL: DEBUG      type.descriptor.sql: TRACE
axon: serializer: general: jackson

复制代码


加上这个配置之后,rabbit 会给这个队列创建一个 .dlq 后缀的队列,异常消息都会被塞到这个队列里面(消息中包含了异常信息以及来源),等待我们处理,deadLetterQueueName指定了 DLQ 的名称,这样所有的失败消息都会存放到同一个 queue 中。大部分的情况下,消息的异常都是由于 consumer 逻辑错误引起的,所以我们需要一个处理这些失败的消息的地方,比如在启动的时候自动拉取 DLQ 中的消息然后转发到原来的 queue 中去远程原有的业务逻辑,如果处理不了那么还是会继续进入到 DLQ 中。


  • 在启动的时候拉取 DLQ 中的消息转发到原来的 queue 中。


@Componentpublic class DLXHandler implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {
private final RabbitTemplate rabbitTemplate;
private ApplicationContext applicationContext;
private static final String DLQ = "contract-error.dlq";
@Autowired public DLXHandler(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }
@Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; }
@Override public void onApplicationEvent(ContextRefreshedEvent event) { // SCS 会创建一个 child context ,这里需要判断下正确的 context 初始化完成 if (event.getApplicationContext().equals(this.applicationContext)) { // 启动后获取 dlq 中所有的消息,进行消费 Message message = rabbitTemplate.receive(DLQ); while (message != null) { rabbitTemplate.send(message.getMessageProperties().getReceivedRoutingKey(), message); message = rabbitTemplate.receive(DLQ); } }
}}
复制代码


由于 SCS 没有提供给我们类似的接口,这里使用了 rabbitmq 的接口来获取消息。

完善之前的 CQRS 例子

经常上述这些基础操作之后,汇过来实现 CQRS 就比较清晰了,只需要监听相关的事件,然后更新视图层即可。


  1. 添加时间的监听


    @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT)    public void handle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent) {        QueryContractCommand command = new QueryContractCommand(event.getIdentifier(), domainEvent.getTimestamp());
ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join();
ContractView view = new ContractView(); view.setIndustryName(aggregate.getIndustryName()); view.setId(aggregate.getIdentifier()); view.setPartyB(aggregate.getPartyB()); view.setPartyA(aggregate.getPartyA()); view.setName(aggregate.getName()); view.setDeleted(aggregate.isDeleted());
contractViewRepository.save(view); }
复制代码


StreamDomainEventDispatcher 对传参做了一些处理,当有两个参数的时候会将 DomainEvent 传递,因为有些时候可能会用到一些字段,比如时间、附加信息等等。这里在消费事件的时候,可以根据时间去查询 aggregate 的状态,然后直接做一个映射,也可以根据事件直接对 view 层做 CUD ,个人觉得在性能和速度不存在大问题的时候直接去查询一下 aggregate 当时的状态做一个映射即可,毕竟比较简单。


  1. 删除原来的 ContractViewHandler 即可。完整的例子 - branch session6


作者介绍:


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


《Event Sourcing 和 CQRS 落地(三):CQRS 实现》


《Event Sourcing 和 CQRS 落地(四):深入使用 -Axon》


2019-07-05 09:027479

评论

发布
暂无评论
发现更多内容

MongoDB源码学习:执行创建Collection命令

云里有只猫

mongodb 源码解析

vivo 短视频用户访问体验优化实践

vivo互联网技术

CDN HTTP 优化 DNS 实践

低代码四大典型使用场景,你都知道吗?

SoFlu软件机器人

cortex ingester 基于 hash ring 进行 token 管理

jupiter

Prometheus 一致性hash Cortex Mimir

openGemini正式加入openEuler SIG-DB ,携手开展全方面技术创新

openEuler

数据库 Linux 开源 操作系统 openEuler

基于 RocketMQ Connect 构建数据流转处理平台

Apache RocketMQ

"鸿蒙生态专家面对面"三月专场等你前来!

HarmonyOS开发者

使用 Athena (Presto) 分析本地 Oracle 数据库导出的数据

亚马逊云科技 (Amazon Web Services)

中国半导体市场份额进一步提升,2023年将迎全新发展良机

华秋电子

NodeJS 实战系列:模块设计与文件分类

光毅

JavaScript node.js

优秀软件工程师必备的五大技能,快看你还差什么?

SoFlu软件机器人

Spring Boot中如何优雅地实现异步调用?

JAVA旭阳

Java springboot

100Wqps短链系统,怎么设计?

小小怪下士

Java 程序员 后端 QPS

Toast的基本使用

梦笔生花

android Adapter toast

深圳.NET线下技术沙龙倒计时一天

MASA技术团队

.net MASA

如何测试一个AI系统?

陈磊@Criss

AI 测试

PyTorch深度学习实战 | 基于ResNet的人脸关键点检测

TiAmo

深度学习 人脸识别 PyTorch

测试的底层逻辑

京东科技开发者

Java 测试 代码 企业号 3 月 PK 榜

硬核!阿里大佬都在内卷的SpringBoot从入门到实战笔记

程序知音

Java 编程语言 springboot Java进阶 后端技术

在 windows 上连接 wsl 和直接打开 ubantu 有什么区别?

玄兴梦影

wsl window

如何快速理解网络IO模型

Dinfan

Netty 事件循环 IO模型 Reactor多线程 网络io模型

Matlab常用图像处理命令108例(七)

timerring

图像处理

【微电平台】-高并发实战经验-奇葩问题解决之旅

京东科技开发者

架构 服务端 js 平台

视频下载软件:MediaHuman YouTube Downloader 中文版

真大的脸盆

Mac 视频下载 Mac 软件 下载视频 视频下载工具

板边器件距离不够,导致元器件无法焊接,怎么办?

华秋电子

前端进阶:在 Web 中使用 C++,我让学妹另眼相看 | 技术分享

LigaAI

c++ 程序人生 前端 webassembly 企业号 3 月 PK 榜

聊聊「订单」业务的设计与实现

Java 架构 订单管理 订单系统 订单

【干货】常见库存设计方案-各种方案对比总有一个适合你

Java你猿哥

Java 架构 微服务 系统设计 后端

内部开发者门户是什么?

SEAL安全

微服务 企业号 3 月 PK 榜 内部开发者门户 信息碎片化

Nacos心跳机制实现快速上下线

做梦都在改BUG

Java Spring Cloud nacos 心跳机制

索信达再度打造智能营销标杆案例,这次是头部券商!

索信达控股

Event Sourcing 和 CQRS落地(五):Spring-Cloud-Stream 优化_文化 & 方法_周国勇_InfoQ精选文章