写点什么

Event Sourcing 和 CQRS 落地(五):Spring-Cloud-Stream 优化

  • 2019-07-05
  • 本文字数:7749 字

    阅读完需:约 25 分钟

Event Sourcing 和 CQRS落地(五):Spring-Cloud-Stream 优化

本系列的上一篇文章重点介绍了 Axon 实现,本文将主要介绍Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法。

Spring Cloud Stream 优化

问题

Spring Cloud Stream(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:


  1. StreamListener用来做事件路由分发并不是很理想,SPEL 可能会写的很长(我尝试过用自定义注解代替原生的注解,从而达到简化的目的,但是会出现一些莫名其妙的事件混乱)。

  2. 如果配合之前的模型使用,我们需要保证消息的顺序消费,每个方法都需要去 check 事件的当前 seq,很不方便。

  3. 在没有 handler 处理某个 type 的事件时,框架会给出一个 warn,然而这个事件可能在 consumer 这里根本不关心。

解决方案

为了解决上面的问题,我们可以这么处理,先统一一个入口将 SCS 的消息接收,然后我们自己构建一个路由系统,将请求分发到我们自己定义的注解方法上,并且在这个过程中将 seq 的检查也给做了,大体的流程是这个样子的:



这样以上几点问题都会得到解决,下面我们来看看具体如何实现:


  • 首先定义一个注解用于接受自己分发的事件:



@Target( {ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface StreamEventHandler {
String[] payloadTypes() default {""};
String[] types();}

复制代码


types 对应 Stream 本身 Inuput 的类型, payloadTypes 对应事件类型,比如 ContractCreated,我们要做的效果是这个 payloadTypes 可以不写,直接从方法的第一个参数读取 class 的 simapleName。


  • 定义用于记录 aggregate sequenceNumber 的 entity 和 repository :


@Entity@Table(indexes = @Index(columnList = "aggregateIdentifier,type", unique = true))@Getter@Setter@NoArgsConstructorpublic class DomainAggregateSequence {
@Id @GeneratedValue private Long id;
private Long sequenceNumber;
private Long aggregateIdentifier;
private String type;}
@Repositorypublic interface DomainAggregateSequenceRepository extends JpaRepository<DomainAggregateSequence, Long> {
/** * 根据 aggregate id 和 type 找到对应的记录 * * @param identifier * @param type * * @return */ DomainAggregateSequence findByAggregateIdentifierAndType(Long identifier, String type);
}
复制代码


  • 由于暂时没有找到监听所有已绑定 channel 的事件的方法,这里实现一个类提供一个 dispatch 的方法用于分发:


@Slf4j@Component@AllArgsConstructorpublic class StreamDomainEventDispatcher implements BeanPostProcessor {
private final ObjectMapper mapper;
private final DomainAggregateSequenceRepository domainAggregateSequenceRepository;
private HashMap<Object, List<Method>> beanHandlerMap = new HashMap<>();
@Autowired public StreamDomainEventDispatcher(ObjectMapper mapper, DomainAggregateSequenceRepository domainAggregateSequenceRepository) { this.mapper = mapper; this.domainAggregateSequenceRepository = domainAggregateSequenceRepository; }
@Transactional public void dispatchEvent(DomainEvent event, String type) {
log.info(MessageFormat.format("message [{0}] received", event.getEventIdentifier()));
// 1. 检查是否是乱序事件或者重复事件 Long aggregateIdentifier = Long.parseLong(event.getAggregateIdentifier()); String eventType = event.getType(); Long eventSequence = event.getSequenceNumber();
DomainAggregateSequence sequenceObject = domainAggregateSequenceRepository.findByAggregateIdentifierAndType(aggregateIdentifier, eventType);
if (sequenceObject == null) { sequenceObject = new DomainAggregateSequence(); sequenceObject.setSequenceNumber(eventSequence); sequenceObject.setAggregateIdentifier(aggregateIdentifier); sequenceObject.setType(eventType); } else if (sequenceObject.getSequenceNumber() + 1 != eventSequence) { // 重复事件,直接忽略 if (sequenceObject.getSequenceNumber().equals(eventSequence)) { log.warn(MessageFormat.format("repeat event ignored, type[{0}] aggregate[{1}] seq[{2}] , ignored", event.getType(), event.getAggregateIdentifier(), event.getSequenceNumber())); return; } throw new StreamEventSequenceException(MessageFormat.format("sequence error, db [{0}], current [{1}]", sequenceObject.getSequenceNumber(), eventSequence)); } else { sequenceObject.setSequenceNumber(eventSequence); }
domainAggregateSequenceRepository.save(sequenceObject);
// 2. 分发事件到各个 handler beanHandlerMap.forEach((key, value) -> { Optional<Method> matchedMethod = getMatchedMethods(value, type, event.getPayloadType());
matchedMethod.ifPresent(method -> { try { invoke(key, method, event); } catch (IllegalAccessException | InvocationTargetException e) {
throw new StreamHandlerException(MessageFormat.format("[{0}] invoke error", method.getName()), e); } });
if (!matchedMethod.isPresent()) { log.info(MessageFormat.format("message [{0}] has no listener", event.getEventIdentifier())); } });
log.info(MessageFormat.format("message [{0}] handled", event.getEventIdentifier())); }
@Transactional public Optional<Method> getMatchedMethods(List<Method> methods, String type, String payloadType) { // 这里应该只有一个方法,因为将 stream 的单个事件分成多个之后,无法保证一致性 List<Method> results = methods.stream().filter(m -> { StreamEventHandler handler = m.getAnnotation(StreamEventHandler.class); List<String> types = new ArrayList<>(Arrays.asList(handler.types())); List<String> payloadTypes = new ArrayList<>(Arrays.asList(handler.payloadTypes()));
types.removeIf(StringUtils::isBlank); payloadTypes.removeIf(StringUtils::isBlank);
if (CollectionUtils.isEmpty(payloadTypes) && m.getParameterTypes().length != 0) { payloadTypes = Collections.singletonList(m.getParameterTypes()[0].getSimpleName()); }
boolean isTypeMatch = types.contains(type);
String checkedPayloadType = payloadType; if (StringUtils.contains(checkedPayloadType, ".")) { checkedPayloadType = StringUtils.substringAfterLast(checkedPayloadType, "."); } boolean isPayloadTypeMatch = payloadTypes.contains(checkedPayloadType);
return isTypeMatch && isPayloadTypeMatch; }).collect(Collectors.toList());
if (results.size() > 1) { throw new StreamHandlerException(MessageFormat.format("type[{0}] event[{1}] has more than one handler", type, payloadType)); }
return results.size() == 1 ? Optional.of(results.get(0)) : Optional.empty(); }
@Transactional public void invoke(Object bean, Method method, DomainEvent event) throws IllegalAccessException, InvocationTargetException {
int count = method.getParameterCount();
if (count == 0) { method.invoke(bean); } else if (count == 1) { Class<?> payloadType = method.getParameterTypes()[0];
if (payloadType.equals(DomainEvent.class)) { method.invoke(bean, mapper.convertValue(event.getPayload(), DomainEvent.class)); } else { method.invoke(bean, mapper.convertValue(event.getPayload(), payloadType)); }
} else if (count == 2) { Class<?> payloadType0 = method.getParameterTypes()[0]; Class<?> payloadType1 = method.getParameterTypes()[1];
Object firstParameterValue = mapper.convertValue(event.getPayload(), payloadType0); Object secondParameterValue = event.getMetaData();
// 如果是 DomainEvent 类型则优先传递该类型,另外一个参数按照 payloadType > metaData 优先级传入 if (payloadType0.equals(DomainEvent.class)) { firstParameterValue = mapper.convertValue(event, payloadType0); secondParameterValue = mapper.convertValue(event.getPayload(), payloadType1); } if (payloadType1.equals(DomainEvent.class)) { secondParameterValue = mapper.convertValue(event, payloadType1); } method.invoke(bean, firstParameterValue, secondParameterValue); } }

@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; }
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);
List<Method> methods = new ArrayList<>(); for (Method method : uniqueDeclaredMethods) { StreamEventHandler streamListener = AnnotatedElementUtils.findMergedAnnotation(method, StreamEventHandler.class); if (streamListener != null) { methods.add(method); } } if (!CollectionUtils.isEmpty(methods)) { beanHandlerMap.put(bean, methods); } return bean; }
}
复制代码


这里参照了 SCS 本身手机 handler 的方式,会将有 @StreamEventHandler 注解的方法都找出来做一个记录。在 dispatchEvent 的时候会更新事件的 seq 并且按照 type 去调用各个标有注解的方法。


  • 实现一个比较简单的例子:


@Slf4j@Component@Transactional@AllArgsConstructorpublic class DomainEventDispatcher {
private final StreamDomainEventDispatcher streamDomainEventDispatcher;
@StreamListener(target = ChannelDefinition.CONTRACTS_INPUT, condition = "headers['messageType']=='eventSourcing'") public void handleBuilding(@Payload DomainEvent event) { streamDomainEventDispatcher.dispatchEvent(event, ChannelDefinition.CONTRACTS_INPUT); }}
@Component@AllArgsConstructor@Transactionalpublic class ContractEventHandler { @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT) public void handle(ContractCreatedEvent event) { // 实现你的 view 层更新业务 }}
复制代码


注意:


  • AbstractDomainEventDispatcher中监听所有 bean 加载完成不能用 InitializingBean 接口,否则@Transactional会失效,这个有兴趣的同学可以研究一下@Transactional的机制。


至此以上几点就优化完了。

其他优化

错误处理

基于 SCS 的默认配置,存在一个致命的问题,那就是当消息处理失败(重试三次)之后,消息直接没了,这个相当于就是消息丢失了。那么解决方案其实也是比较简单的,一般有两种解决方案:


  1. 拒绝这个消息,丢在 broker 原先的队列里。

  2. 将这个消息记录到一个错误的 queue 中等待修复,后续可能将消息转发回去,也可能直接就删除了消息(比如重复的消息)。


方案 1 这么做可能会出的问题就是,这个消息反复消费,反复失败,引起循环问题从而导致服务出现问题,这个就需要在 broker 做一些策略配置了,为了让 broker 尽可能的简单,我们这里采用方案 2,要实现的流程是这样的:



  • 首先让 SCS 为我们自动生成一个 DLQ


spring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect  rabbitmq:    host: localhost    port: 5672    username: creams_user    password: Souban701  cloud:    stream.bindings:      contract-events: # 这个名字对应代码中@input("value") 的 value        destination: contract-events # 这个对应 rabbit 中的 channel        contentType: application/json # 这个指定传输类型,其实可以默认指定,但是目前每个地方都写了,所以统一下      contract-events-input:        destination: contract-events        contentType: application/json        group: event-sourcing-service        durableSubscription: true    stream.rabbit.bindings.contract-events-input.consumer:      autoBindDlq: true      republishToDlq: true      deadLetterQueueName: contract-error.dlqlogging:  level.org:    springframework:      web: INFO      cloud.sleuth: INFO    apache.ibatis: DEBUG    java.sql: DEBUG    hibernate:      SQL: DEBUG      type.descriptor.sql: TRACE
axon: serializer: general: jackson

复制代码


加上这个配置之后,rabbit 会给这个队列创建一个 .dlq 后缀的队列,异常消息都会被塞到这个队列里面(消息中包含了异常信息以及来源),等待我们处理,deadLetterQueueName指定了 DLQ 的名称,这样所有的失败消息都会存放到同一个 queue 中。大部分的情况下,消息的异常都是由于 consumer 逻辑错误引起的,所以我们需要一个处理这些失败的消息的地方,比如在启动的时候自动拉取 DLQ 中的消息然后转发到原来的 queue 中去远程原有的业务逻辑,如果处理不了那么还是会继续进入到 DLQ 中。


  • 在启动的时候拉取 DLQ 中的消息转发到原来的 queue 中。


@Componentpublic class DLXHandler implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {
private final RabbitTemplate rabbitTemplate;
private ApplicationContext applicationContext;
private static final String DLQ = "contract-error.dlq";
@Autowired public DLXHandler(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }
@Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; }
@Override public void onApplicationEvent(ContextRefreshedEvent event) { // SCS 会创建一个 child context ,这里需要判断下正确的 context 初始化完成 if (event.getApplicationContext().equals(this.applicationContext)) { // 启动后获取 dlq 中所有的消息,进行消费 Message message = rabbitTemplate.receive(DLQ); while (message != null) { rabbitTemplate.send(message.getMessageProperties().getReceivedRoutingKey(), message); message = rabbitTemplate.receive(DLQ); } }
}}
复制代码


由于 SCS 没有提供给我们类似的接口,这里使用了 rabbitmq 的接口来获取消息。

完善之前的 CQRS 例子

经常上述这些基础操作之后,汇过来实现 CQRS 就比较清晰了,只需要监听相关的事件,然后更新视图层即可。


  1. 添加时间的监听


    @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT)    public void handle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent) {        QueryContractCommand command = new QueryContractCommand(event.getIdentifier(), domainEvent.getTimestamp());
ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join();
ContractView view = new ContractView(); view.setIndustryName(aggregate.getIndustryName()); view.setId(aggregate.getIdentifier()); view.setPartyB(aggregate.getPartyB()); view.setPartyA(aggregate.getPartyA()); view.setName(aggregate.getName()); view.setDeleted(aggregate.isDeleted());
contractViewRepository.save(view); }
复制代码


StreamDomainEventDispatcher 对传参做了一些处理,当有两个参数的时候会将 DomainEvent 传递,因为有些时候可能会用到一些字段,比如时间、附加信息等等。这里在消费事件的时候,可以根据时间去查询 aggregate 的状态,然后直接做一个映射,也可以根据事件直接对 view 层做 CUD ,个人觉得在性能和速度不存在大问题的时候直接去查询一下 aggregate 当时的状态做一个映射即可,毕竟比较简单。


  1. 删除原来的 ContractViewHandler 即可。完整的例子 - branch session6


作者介绍:


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


《Event Sourcing 和 CQRS 落地(三):CQRS 实现》


《Event Sourcing 和 CQRS 落地(四):深入使用 -Axon》


2019-07-05 09:027605

评论

发布
暂无评论
发现更多内容

数据增强(一):imgaug

AIWeker

人工智能 深度学习 数据增强 5月月更

【IT运维】运维告警方式有哪些?哪个工具好用?

行云管家

服务器 IT运维 服务器运维

java培训Redis数据结构面试分享

@零度

redis JAVA开发

OpenHarmony布道师招募正式启动,打造个人技术影响力的机会来了!

科技汇

黎明前的至暗时刻,旅企是该坚守还是放弃?

易观分析

旅游业

连续3年实力登榜!EMQ映云科技再度跻身“2022中国边缘计算企业20强”

EMQ映云科技

物联网 IoT 边缘计算 emq 5月月更

升级HarmonyOS 2最新版本,出门亮健康码快人一步!

科技汇

还有谁不知道CRM系统可以为企业做这些?

低代码小观

CRM 客户关系管理 CRM系统 客户关系管理系统 企业管理软件

十年磨一剑|沃趣数据库云产品战略首发

沃趣科技

云原生 公有云 私有云 数据库云 沃趣科技

OpenMLDB官网升级,神秘贡献者地图带你快速进阶!

第四范式开发者社区

人工智能 机器学习 数据库 开源 特征

AI驱动音乐创新,网易数帆X云音乐刷新MIREX世界纪录

网易数帆

人工智能 AI 语音识别 歌词识别 预练习

大咖说 X 智篆商业|未来五年:消费增长的“两大来源”与“四个方面”

大咖说

阿里云 存量时代 智篆商业

案例分享|一键式自动监测,跨境电商平台的业务转型模板

博睿数据

博睿数据 数据链DNA

RGB色彩空间

Loken

音视频 5月月更

如何写出GC更优的代码,以达到提升代码性能的目的

非凸科技

性能 编程语言 垃圾回收 GC 吞吐率

node爬虫爬取小说章节

空城机

爬虫 Node 5月月更

直播预告丨Hello HarmonyOS进阶课程第二课——计算机视觉

HarmonyOS开发者

HarmonyOS 图形图像 视觉开发

跨端跨框架 UI 自动化测试方案 Flybirds

liang chen

TASKCTL分布式任务调度平台-流程控制原理

敏捷调度TASKCTL

程序员 DevOps 分布式 大数据运维 TASKCTL

web前端培训React性能优化总结

@零度

前端开发 React

Connection reset

领创集团Advance Intelligence Group

Connection reset

【Linux深潜】详解Linux系统自身安全调优配置

沃趣科技

Linux

跟我学Python图像处理丨掌握4种图像平滑算法

华为云开发者联盟

Python OpenCV 图像属性 兴趣ROI区域 图像通道

Swoole 定时器能实现毫秒级任务调度,你敢相信吗?

CRMEB

火山引擎入选“2022 中国边缘计算20强”榜单

火山引擎边缘云

边缘计算

云图说|DDS读写两步走,带您领略只读节点的风采

华为云开发者联盟

数据库 华为云 读写分离 DDS

时序数据库为万物互联打下坚实的基石

华为云开发者联盟

数据仓库 物联网 时序数据库 GaussDB 时序数据

我是如何做到百万数据跑批半小时结束

skow

后端 java

【等保测评】2022年深圳等保测评公司排名看这里!

行云管家

等保 深圳 等保测评 等保2.0

CWE 4.7中的新视图:工业控制系统的安全漏洞类别

华为云开发者联盟

安全漏洞 cwe 软件安全 CWE 4.7 工业控制系统

OpenHarmony 3GPP协议开发深度剖析——一文读懂RIL

OpenHarmony开发者

OpenHarmony RIL

Event Sourcing 和 CQRS落地(五):Spring-Cloud-Stream 优化_文化 & 方法_周国勇_InfoQ精选文章