本系列的上一篇文章重点介绍了 Axon 实现,本文将主要介绍Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法。
Spring Cloud Stream 优化
问题
Spring Cloud Stream
(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:
StreamListener
用来做事件路由分发并不是很理想,SPEL 可能会写的很长(我尝试过用自定义注解代替原生的注解,从而达到简化的目的,但是会出现一些莫名其妙的事件混乱)。如果配合之前的模型使用,我们需要保证消息的顺序消费,每个方法都需要去 check 事件的当前 seq,很不方便。
在没有 handler 处理某个 type 的事件时,框架会给出一个 warn,然而这个事件可能在 consumer 这里根本不关心。
解决方案
为了解决上面的问题,我们可以这么处理,先统一一个入口将 SCS 的消息接收,然后我们自己构建一个路由系统,将请求分发到我们自己定义的注解方法上,并且在这个过程中将 seq 的检查也给做了,大体的流程是这个样子的:
这样以上几点问题都会得到解决,下面我们来看看具体如何实现:
首先定义一个注解用于接受自己分发的事件:
@Target( {ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface StreamEventHandler {
String[] payloadTypes() default {""};
String[] types();
}
types 对应 Stream 本身 Inuput 的类型, payloadTypes 对应事件类型,比如 ContractCreated
,我们要做的效果是这个 payloadTypes 可以不写,直接从方法的第一个参数读取 class 的 simapleName。
定义用于记录 aggregate sequenceNumber 的 entity 和 repository :
@Entity
@Table(indexes = @Index(columnList = "aggregateIdentifier,type", unique = true))
@Getter
@Setter
@NoArgsConstructor
public class DomainAggregateSequence {
@Id
@GeneratedValue
private Long id;
private Long sequenceNumber;
private Long aggregateIdentifier;
private String type;
}
@Repository
public interface DomainAggregateSequenceRepository extends JpaRepository<DomainAggregateSequence, Long> {
/**
* 根据 aggregate id 和 type 找到对应的记录
*
* @param identifier
* @param type
*
* @return
*/
DomainAggregateSequence findByAggregateIdentifierAndType(Long identifier, String type);
}
由于暂时没有找到监听所有已绑定 channel 的事件的方法,这里实现一个类提供一个 dispatch 的方法用于分发:
@Slf4j
@Component
@AllArgsConstructor
public class StreamDomainEventDispatcher implements BeanPostProcessor {
private final ObjectMapper mapper;
private final DomainAggregateSequenceRepository domainAggregateSequenceRepository;
private HashMap<Object, List<Method>> beanHandlerMap = new HashMap<>();
@Autowired
public StreamDomainEventDispatcher(ObjectMapper mapper, DomainAggregateSequenceRepository domainAggregateSequenceRepository) {
this.mapper = mapper;
this.domainAggregateSequenceRepository = domainAggregateSequenceRepository;
}
@Transactional
public void dispatchEvent(DomainEvent event, String type) {
log.info(MessageFormat.format("message [{0}] received", event.getEventIdentifier()));
// 1. 检查是否是乱序事件或者重复事件
Long aggregateIdentifier = Long.parseLong(event.getAggregateIdentifier());
String eventType = event.getType();
Long eventSequence = event.getSequenceNumber();
DomainAggregateSequence sequenceObject = domainAggregateSequenceRepository.findByAggregateIdentifierAndType(aggregateIdentifier, eventType);
if (sequenceObject == null) {
sequenceObject = new DomainAggregateSequence();
sequenceObject.setSequenceNumber(eventSequence);
sequenceObject.setAggregateIdentifier(aggregateIdentifier);
sequenceObject.setType(eventType);
} else if (sequenceObject.getSequenceNumber() + 1 != eventSequence) {
// 重复事件,直接忽略
if (sequenceObject.getSequenceNumber().equals(eventSequence)) {
log.warn(MessageFormat.format("repeat event ignored, type[{0}] aggregate[{1}] seq[{2}] , ignored", event.getType(), event.getAggregateIdentifier(), event.getSequenceNumber()));
return;
}
throw new StreamEventSequenceException(MessageFormat.format("sequence error, db [{0}], current [{1}]", sequenceObject.getSequenceNumber(), eventSequence));
} else {
sequenceObject.setSequenceNumber(eventSequence);
}
domainAggregateSequenceRepository.save(sequenceObject);
// 2. 分发事件到各个 handler
beanHandlerMap.forEach((key, value) -> {
Optional<Method> matchedMethod = getMatchedMethods(value, type, event.getPayloadType());
matchedMethod.ifPresent(method -> {
try {
invoke(key, method, event);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new StreamHandlerException(MessageFormat.format("[{0}] invoke error", method.getName()), e);
}
});
if (!matchedMethod.isPresent()) {
log.info(MessageFormat.format("message [{0}] has no listener", event.getEventIdentifier()));
}
});
log.info(MessageFormat.format("message [{0}] handled", event.getEventIdentifier()));
}
@Transactional
public Optional<Method> getMatchedMethods(List<Method> methods, String type, String payloadType) {
// 这里应该只有一个方法,因为将 stream 的单个事件分成多个之后,无法保证一致性
List<Method> results = methods.stream().filter(m -> {
StreamEventHandler handler = m.getAnnotation(StreamEventHandler.class);
List<String> types = new ArrayList<>(Arrays.asList(handler.types()));
List<String> payloadTypes = new ArrayList<>(Arrays.asList(handler.payloadTypes()));
types.removeIf(StringUtils::isBlank);
payloadTypes.removeIf(StringUtils::isBlank);
if (CollectionUtils.isEmpty(payloadTypes) && m.getParameterTypes().length != 0) {
payloadTypes = Collections.singletonList(m.getParameterTypes()[0].getSimpleName());
}
boolean isTypeMatch = types.contains(type);
String checkedPayloadType = payloadType;
if (StringUtils.contains(checkedPayloadType, ".")) {
checkedPayloadType = StringUtils.substringAfterLast(checkedPayloadType, ".");
}
boolean isPayloadTypeMatch = payloadTypes.contains(checkedPayloadType);
return isTypeMatch && isPayloadTypeMatch;
}).collect(Collectors.toList());
if (results.size() > 1) {
throw new StreamHandlerException(MessageFormat.format("type[{0}] event[{1}] has more than one handler", type, payloadType));
}
return results.size() == 1 ? Optional.of(results.get(0)) : Optional.empty();
}
@Transactional
public void invoke(Object bean, Method method, DomainEvent event) throws IllegalAccessException, InvocationTargetException {
int count = method.getParameterCount();
if (count == 0) {
method.invoke(bean);
} else if (count == 1) {
Class<?> payloadType = method.getParameterTypes()[0];
if (payloadType.equals(DomainEvent.class)) {
method.invoke(bean, mapper.convertValue(event.getPayload(), DomainEvent.class));
} else {
method.invoke(bean, mapper.convertValue(event.getPayload(), payloadType));
}
} else if (count == 2) {
Class<?> payloadType0 = method.getParameterTypes()[0];
Class<?> payloadType1 = method.getParameterTypes()[1];
Object firstParameterValue = mapper.convertValue(event.getPayload(), payloadType0);
Object secondParameterValue = event.getMetaData();
// 如果是 DomainEvent 类型则优先传递该类型,另外一个参数按照 payloadType > metaData 优先级传入
if (payloadType0.equals(DomainEvent.class)) {
firstParameterValue = mapper.convertValue(event, payloadType0);
secondParameterValue = mapper.convertValue(event.getPayload(), payloadType1);
}
if (payloadType1.equals(DomainEvent.class)) {
secondParameterValue = mapper.convertValue(event, payloadType1);
}
method.invoke(bean, firstParameterValue, secondParameterValue);
}
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);
List<Method> methods = new ArrayList<>();
for (Method method : uniqueDeclaredMethods) {
StreamEventHandler streamListener = AnnotatedElementUtils.findMergedAnnotation(method,
StreamEventHandler.class);
if (streamListener != null) {
methods.add(method);
}
}
if (!CollectionUtils.isEmpty(methods)) {
beanHandlerMap.put(bean, methods);
}
return bean;
}
}
这里参照了 SCS 本身手机 handler 的方式,会将有 @StreamEventHandler
注解的方法都找出来做一个记录。在 dispatchEvent 的时候会更新事件的 seq 并且按照 type 去调用各个标有注解的方法。
实现一个比较简单的例子:
@Slf4j
@Component
@Transactional
@AllArgsConstructor
public class DomainEventDispatcher {
private final StreamDomainEventDispatcher streamDomainEventDispatcher;
@StreamListener(target = ChannelDefinition.CONTRACTS_INPUT, condition = "headers['messageType']=='eventSourcing'")
public void handleBuilding(@Payload DomainEvent event) {
streamDomainEventDispatcher.dispatchEvent(event, ChannelDefinition.CONTRACTS_INPUT);
}
}
@Component
@AllArgsConstructor
@Transactional
public class ContractEventHandler {
@StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT)
public void handle(ContractCreatedEvent event) {
// 实现你的 view 层更新业务
}
}
注意:
AbstractDomainEventDispatcher
中监听所有 bean 加载完成不能用 InitializingBean 接口,否则@Transactional
会失效,这个有兴趣的同学可以研究一下@Transactional
的机制。
至此以上几点就优化完了。
其他优化
错误处理
基于 SCS 的默认配置,存在一个致命的问题,那就是当消息处理失败(重试三次)之后,消息直接没了,这个相当于就是消息丢失了。那么解决方案其实也是比较简单的,一般有两种解决方案:
拒绝这个消息,丢在 broker 原先的队列里。
将这个消息记录到一个错误的 queue 中等待修复,后续可能将消息转发回去,也可能直接就删除了消息(比如重复的消息)。
方案 1 这么做可能会出的问题就是,这个消息反复消费,反复失败,引起循环问题从而导致服务出现问题,这个就需要在 broker 做一些策略配置了,为了让 broker 尽可能的简单,我们这里采用方案 2,要实现的流程是这样的:
首先让 SCS 为我们自动生成一个 DLQ
spring:
application:
name: event-sourcing-service
datasource:
url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE
username: root
password: root
jpa:
hibernate:
ddl-auto: update
use-new-id-generator-mappings: false
show-sql: false
properties:
hibernate.dialect: org.hibernate.dialect.MySQL55Dialect
rabbitmq:
host: localhost
port: 5672
username: creams_user
password: Souban701
cloud:
stream.bindings:
contract-events: # 这个名字对应代码中@input("value") 的 value
destination: contract-events # 这个对应 rabbit 中的 channel
contentType: application/json # 这个指定传输类型,其实可以默认指定,但是目前每个地方都写了,所以统一下
contract-events-input:
destination: contract-events
contentType: application/json
group: event-sourcing-service
durableSubscription: true
stream.rabbit.bindings.contract-events-input.consumer:
autoBindDlq: true
republishToDlq: true
deadLetterQueueName: contract-error.dlq
logging:
level.org:
springframework:
web: INFO
cloud.sleuth: INFO
apache.ibatis: DEBUG
java.sql: DEBUG
hibernate:
SQL: DEBUG
type.descriptor.sql: TRACE
axon:
serializer:
general: jackson
加上这个配置之后,rabbit 会给这个队列创建一个 .dlq 后缀的队列,异常消息都会被塞到这个队列里面(消息中包含了异常信息以及来源),等待我们处理,deadLetterQueueName
指定了 DLQ 的名称,这样所有的失败消息都会存放到同一个 queue 中。大部分的情况下,消息的异常都是由于 consumer 逻辑错误引起的,所以我们需要一个处理这些失败的消息的地方,比如在启动的时候自动拉取 DLQ 中的消息然后转发到原来的 queue 中去远程原有的业务逻辑,如果处理不了那么还是会继续进入到 DLQ 中。
在启动的时候拉取 DLQ 中的消息转发到原来的 queue 中。
@Component
public class DLXHandler implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {
private final RabbitTemplate rabbitTemplate;
private ApplicationContext applicationContext;
private static final String DLQ = "contract-error.dlq";
@Autowired
public DLXHandler(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// SCS 会创建一个 child context ,这里需要判断下正确的 context 初始化完成
if (event.getApplicationContext().equals(this.applicationContext)) {
// 启动后获取 dlq 中所有的消息,进行消费
Message message = rabbitTemplate.receive(DLQ);
while (message != null) {
rabbitTemplate.send(message.getMessageProperties().getReceivedRoutingKey(), message);
message = rabbitTemplate.receive(DLQ);
}
}
}
}
由于 SCS 没有提供给我们类似的接口,这里使用了 rabbitmq 的接口来获取消息。
完善之前的 CQRS 例子
经常上述这些基础操作之后,汇过来实现 CQRS 就比较清晰了,只需要监听相关的事件,然后更新视图层即可。
添加时间的监听
@StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT)
public void handle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent) {
QueryContractCommand command = new QueryContractCommand(event.getIdentifier(), domainEvent.getTimestamp());
ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join();
ContractView view = new ContractView();
view.setIndustryName(aggregate.getIndustryName());
view.setId(aggregate.getIdentifier());
view.setPartyB(aggregate.getPartyB());
view.setPartyA(aggregate.getPartyA());
view.setName(aggregate.getName());
view.setDeleted(aggregate.isDeleted());
contractViewRepository.save(view);
}
StreamDomainEventDispatcher
对传参做了一些处理,当有两个参数的时候会将 DomainEvent
传递,因为有些时候可能会用到一些字段,比如时间、附加信息等等。这里在消费事件的时候,可以根据时间去查询 aggregate 的状态,然后直接做一个映射,也可以根据事件直接对 view 层做 CUD ,个人觉得在性能和速度不存在大问题的时候直接去查询一下 aggregate 当时的状态做一个映射即可,毕竟比较简单。
删除原来的
ContractViewHandler
即可。完整的例子 - branch session6
作者介绍:
周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。
相关文章:
《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》
《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》
《Event Sourcing 和 CQRS 落地(三):CQRS 实现》
《Event Sourcing 和 CQRS 落地(四):深入使用 -Axon》
更多内容推荐
05|实现完整的 IoC 容器:构建工厂体系并添加容器事件
建立BeanFactory体系,添加容器事件
2023-03-22
测试 RxJava2
你已经探索了如何在代码中使用响应式编程,但是还需要考虑如何在代码库中对响应式编程进行测试。在本文中,Java冠军程序员Andres Almiray介绍了RxJava2的测试技术和工具。本文是“测试RxJava”一文的修订,根据RxJava2规范做了全面更新。
dubbo 源码之启动过程分析
RPC框架。在平常业务开发过程中使用的越来越频繁,同时也会遇到更多的问题。这就需要我们更多的了解一下dubbo源码,以便更好的处理问题。
Spring 发布 1.1 版 Statemachine 框架
Spring发布了名为Statemachine的1.1版状态机框架,支持Spring Security、Redis,以及UI建模等功能。
Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现
本篇内容会实现一个将增删改操作使用 EventSoucing取代的例子,读取部分暂时不做涉及。
RxJava2 实例解析
在高并发编程范式的发展过程中,响应式编程最为抢眼。它是处理异步数据流的一种规范,为数据流的转换和聚合以及数据流的控制管理提供了工具支持,让考量程序整体设计的工作变得简单。本文中,我们将根据例子循序渐进地学习RxJava2。
19|Pointcut :如何批量匹配代理方法?
Pointcut :如何批量匹配代理方法?
2023-04-24
11|ModelAndView :如何将处理结果返回给前端?
将处理结果返回给前端
2023-04-05
Reactor 实例解析
Reactor是第四代响应式框架,跟RxJava 2有些相似。Reactor项目由Pivotal启动,以响应式流规范、Java8和ReactiveX术语表为基础。它的设计是Reactor 2(上一个主要版本)和RxJava核心贡献者共同努力的结果。
Event Sourcing 和 CQRS 落地(三):CQRS 实现
这篇文章就暂时以同一个数据库为例子,同样使用JPA去做View的ORM。
20|AutoProxyCreator:如何自动添加动态代理?
如何自动添加动态代理?
2023-04-26
Event Sourcing 和 CQRS 落地(四):深入使用 -Axon
在本系列的前一篇文章中,我们介绍了CQRS实现,以数据库为例使用JPA去做 View的ORM,本文将重点介绍Axon实现。
Event Sourcing 和 CQRS 落地(六):实现可靠消息
在本系列的上一篇文章中,作者介绍了Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法,本文将主要介绍如何实现可靠消息。
Spring 5.0 GA 版本发布,支持 JDK9 及反应式编程
Spring官方博客昨天(9月28日)撰文宣布了Spring Framework 5.0 GA版本的正式发布。该版本耗时2年,带来了一系列全新特性,包含对JDK 9和Java EE 8 API(如Servlet 4.0)的支持,全面集成Reactor 3.1、JUnit 5和Kotlin语言,还包括一个反应式Web框架Spring WebFlux。Spring 5.0最低要求JDK8。
17|动态代理:如何在运行时插入逻辑?
如何在运行时插入逻辑?
2023-04-19
Pivotal 发布包含反应式数据访问特性的新一代 Spring Data 的第一个里程碑版本
Pivotal最近发布了下一代Spring Data项目的第一个里程碑版本,它的特性包括完全支持Java 8和Spring 5,提供了针对MongoDB、Apache Cassandra和Redis的反应式(Reactive)数据库访问。
18|拦截器 :如何在方法前后进行拦截?
如何在方法前后进行拦截?
2023-04-21
Event Sourcing 和 CQRS 落地(七):服务优化
在本系列的上一篇文章中,主要介绍了如何实现可靠消息,本文主要介绍如何进行服务优化。
Kafka-Clients 源码学习:KafkaProducer 篇
本文基于 Kafka-clients:1.1.0 版本,分享了Kafka-Clients源码学习:KafkaProducer篇。
基于 Kafka 实现分布式事件驱动
事件驱动有好的一面也有不好的一面。本文是运满满基于Kfaka实现分布式事件驱动的实践,包含设计,实现和示例。
推荐阅读
电子书
大厂实战PPT下载
换一换 单家骏 | 腾讯 专家工程师
王胜 | 三维家 图灵实验室 AI 负责人
罗璇 | 元始智能 联合创始人兼COO
评论