写点什么

Event Sourcing 和 CQRS 落地(七):服务优化

  • 2019-07-08
  • 本文字数:4221 字

    阅读完需:约 14 分钟

Event Sourcing 和 CQRS落地(七):服务优化

在本系列的上一篇文章中,主要介绍了如何实现可靠消息,本文主要介绍如何进行服务优化。

服务优化

失败消息的补偿机制

由于消息存在发送失败的情况,比如 broker 临时下线或者不可用了,尽管这种情况很少,我们最好做一个机制可以定期或者手动检查,并且尝试自己发送,这里我们就来实现这个机制。

提供未发送的 event 查询

  1. CustomDomainEventEntryRepository中加入:


    /**     * 查找未发送的事件     *     * @param pageable     *     * @return     */    Page<CustomDomainEventEntry> findBySentFalse(Pageable pageable);        /**     * 查询未发送事件的数量     * @return     */    Long countBySentFalse();
复制代码


  1. 将未发送事件的数量集成到 actuator,让我们可以事实看到失败消息的数量:


@Component@AllArgsConstructorpublic class EventHealthContributor implements InfoContributor {
private final CustomDomainEventEntryRepository customDomainEventEntryRepository;
@Override public void contribute(Info.Builder builder) { Long count = customDomainEventEntryRepository.countBySentFalse();
builder.withDetail("failedMessage", count); }}
复制代码


打开 http://localhost:8080/actuator/info 应该就可以看到我们的失败消息数量。

定期检查并且自动发送

建立对应的 service 和 controller:


@Service@Slf4j@AllArgsConstructorpublic class ScheduleService {
private final CustomDomainEventEntryRepository customDomainEventEntryRepository; private final ContractEventPublisher contractEventPublisher;
@Scheduled(cron = "0 0 12 * * ?") @SchedulerLock(name = "failedMessageDiscoveryTask") public void failedMessageDiscovery() {
Integer page = 0; PageRequest request = PageRequest.of(page, 1000);
Page<CustomDomainEventEntry> results = customDomainEventEntryRepository.findBySentFalse(request); log.warn(MessageFormat.format("发现 [{0}] 条失败消息,尝试重新发送", results.getTotalElements())); sendFailedMessage(results.getContent()); while (results.hasNext()) { request = PageRequest.of(page + 1, 1000); results = customDomainEventEntryRepository.findBySentFalse(request); sendFailedMessage(results.getContent()); } log.info("所有失败消息尝试发送完毕"); } private void sendFailedMessage(Collection<CustomDomainEventEntry> failedEvents) {
failedEvents.forEach(e -> { contractEventPublisher.sendEvent(e); e.setSent(true); customDomainEventEntryRepository.save(e); }); }}
复制代码


有些时候我们可能需要自己触发一个修复操作,可以把这个写成 API:


/** * 用于修复 view 视图和 aggregate 的不一致性,以及未发送消息的重试 */@Slf4j@AllArgsConstructor@RestController@RequestMapping("/repair")public class DataRepairController {
private final ScheduleService scheduleService; private static final String SECRET = "e248b98418db4cdcb069e8a1c08f6bb7";
@GetMapping("/message") @Async public void repairMessage(@RequestParam("secret") String secret) { if (!StringUtils.equals(secret, SECRET)) { return; }
scheduleService.failedMessageDiscovery(); }}

复制代码


pom 中加入:



<dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>shedlock-spring</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>shedlock-provider-jdbc-template</artifactId> <version>1.0.0</version> </dependency>
复制代码


数据库中执行脚本:


CREATE TABLE shedlock(    name VARCHAR(64),     lock_until TIMESTAMP(3) NULL,     locked_at TIMESTAMP(3) NULL,     locked_by  VARCHAR(255),     PRIMARY KEY (name)) 
复制代码


使用SchedulerLock定义了每晚 12 点开始检查并自动发送,由于 SchedulerLock 在集群下的问题,这里使用了 shedlock 加锁,使得只有一个实例会执行该代码。这样就完成了自动修复与手动修复的接口暴露。

view 和 aggregate 之间的不一致补偿

在并发比较高的时候,可能会出现同时 update 一条记录的情况,这个时候需要上锁,JPA 可以自动创建并管理乐观锁,乐观锁会在 update 同一条记录的时候直接返回一个错误,我们只需要在 entity 上加上 version 字段即可:


@Entity@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class ContractView implements ContractInterface {
@Id @Column(length = 64) private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;
private String industryName;
private long sequenceNumber;
@Version private Long version;}
复制代码


某些情况下我们可能还需要手动干预 aggregateview 之间的一致性问题,比如在线上运行中莫名其妙有一个 id 为 xxx 的数据出现了不一致的情况,可能是在某些极端情况下消费端出现了逻辑错误导致了数据错误,但是又很难找到这批数据的特征。这个时候其实可以写一个 DataRepair 的 API,专门重新生成 view,或者说范围性的重新生成 view(因为真正在生产环境全部重新生成 view,必定是很耗时的一件事情),所以如果之前的 view 层代码是通过读取 aggregate 状态之后做映射的话,这里就会方便很多。比如:


@Service@AllArgsConstructorpublic class ContractViewService {
private final QueryGateway queryGateway; private final ContractViewRepository contractViewRepository;
public void updateViewFromAggregateById(Long aggregateIdentifier, Instant time) {
QueryContractCommand command = new QueryContractCommand(aggregateIdentifier, time); ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join(); ContractView view = contractViewRepository.findById(aggregateIdentifier).orElse(new ContractView());
ContractAggregateViewMapper.mapAggregateToView(aggregate, view); contractViewRepository.save(view); }}
@Component@AllArgsConstructor@Transactionalpublic class ContractEventHandler {
private final ContractViewService contractViewService;
@StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT) public void handle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent) { contractViewService.updateViewFromAggregateById(event.getIdentifier(), domainEvent.getTimestamp()); }}
@Slf4j@AllArgsConstructor@RestController@RequestMapping("/repair")public class DataRepairController {
private final ScheduleService scheduleService;
private final ContractViewService contractViewService;
private static final String SECRET = "e248b98418db4cdcb069e8a1c08f6bb7";
@GetMapping("/message") @Async public void repairMessage(@RequestParam("secret") String secret) { if (!StringUtils.equals(secret, SECRET)) { return; }
scheduleService.failedMessageDiscovery(); }
@PostMapping("/aggregate") @Async public void repairAggregate(@RequestParam("secret") String secret, Long aggregateIdentifier) { if (!StringUtils.equals(secret, SECRET)) { return; } contractViewService.updateViewFromAggregateById(aggregateIdentifier, Instant.now()); }}
复制代码

事件失败之后的补偿

updateContractView的时候可能会出现各种异常,由于 view 是消费消息处理的,所以重试机制就在 SCSrabbitmq 这里了,默认是重试三次。

实现分布式 CommandBus

为什么需要分布式

前面我们也提到过,为了防止资源争夺等问题的出现,最好尽可能的保证同一 aggregate 的内容让同一个 service 去处理。这个时候就需要分布式 command bus 了,我们知道 Spring Cloud 都是以 http 通讯的,这种一般请求的分布不受我们控制,好在 Axon 框架为我们提供了和 Spring Cloud 融合的功能,下面就看看具体怎么实现。

Spring Cloud Connector 实现

Spring Cloud Connector 实际上就是在每个节点用 ServiceInstance.Metadata 记录了自己的 routing 规则来让别的节点知道如何去做 routing ,但是在某些服务发现的实现下 ServiceInstance.Metadata 是不可编辑的,这个时候就会在生成一个 API 来返回策略(理论上效率应该低很多,毕竟要走下请求)。Axon 对配置支持的也比较好:


    <dependency>      <groupId>org.axonframework.extensions.springcloud</groupId>      <artifactId>axon-springcloud</artifactId>      <version>4.1</version>    </dependency>
复制代码


然后配置文件中将分布式 command 打开就好了:


axon:  serializer:    general: jackson  distributed:    enabled: true    spring-cloud:      fallback-to-http-get: true      fallback-url: /axon-routing
复制代码


完整的例子 - branch session8


作者介绍:


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


《Event Sourcing 和 CQRS 落地(三):CQRS 实现》


《Event Sourcing 和 CQRS 落地(四):深入使用 -Axon》


《Event Sourcing 和 CQRS 落地(五):Spring-Cloud-Stream 优化》


《Event Sourcing 和 CQRS 落地(六):实现可靠消息》


2019-07-08 10:307978

评论

发布
暂无评论
发现更多内容

MatrixOne 助力开启分布式计算格局新征程

MatrixOrigin

分布式数据库 HTAP MatrixOrigin MatrixOne 矩阵起源

辅助测试和研发人员的一款小插件【数据安全】 | 京东云技术团队

京东科技开发者

浏览器 数据安全 插件开发 企业号 5 月 PK 榜

电商行业实践专栏上线|阿里巴巴风控实战如何解决大规模风控的技术难点?

Apache Flink

大数据 flink 实时计算

大语言模型技术原理

NineData

AIGC ChatGPT AI大语言模型 大语言模型 技术原理

从7天到1天,Kyligence 和亚马逊云科技助力欣和提高数据应用价值

Kyligence

数字化转型 指标平台

房地产行业IT运维安全就用行云管家堡垒机!

行云管家

运维 房地产 IT运维

Spring Boot实现第一次启动时自动初始化数据库

做梦都在改BUG

Java spring Spring Boot

阿里大神级Elasticsearch学习笔记,还学不会就埋了

做梦都在改BUG

Java elasticsearch 分布式搜索引擎 ES

Flutter三棵树系列之BuildOwner | 京东云技术团队

京东科技开发者

flutter 移动开发 源码解读 企业号 5 月 PK 榜 BuildOwner

医疗领域实体抽取:UIE Slim最新升级版含数据标注、serving部署、模型蒸馏等教学,助力工业应用场景快速落地

汀丶人工智能

人工智能 自然语言处理 知识图谱 关系抽取 命名实体识别

太赞了,京东研发一哥力荐的高可用网站构建技术

做梦都在改BUG

Java 架构 京东

Mysql DDL执行方式-pt-osc介绍 | 京东云技术团队

京东科技开发者

MySQL 数据库 企业号 5 月 PK 榜 DDL执行方式 pt-soc

Solaris Network:BSC上首个链上合成资产解决方案

鳄鱼视界

Git入门指南:从新手到高手的完全指南

小万哥

git Linux 程序员 后端 C/C++

莉莉丝游戏与火山引擎ByteHouse达成合作,为实时数仓建设提速

字节跳动数据平台

数据仓库 云原生 实时

Kafka集群是如何选择leader,你知道吗?

做梦都在改BUG

Java kafka 集群

小程序容器与PWA的完美结合:提升应用性能与用户体验

FinFish

私有小程序技术 小程序容器 PWA 小程序化 小程序技术

双非渣硕,开发两年,苦刷算法47天,四面字节斩获offer

做梦都在改BUG

Java 数据结构 算法 LeetCode

内核调试环境搭建

郑州埃文科技

网络安全 网络环境

500行代码手写docker-实现硬件资源限制cgroups

蓝胖子的编程梦

容器 k8s ,docker Cgroups #k8s

全国流体力学盛会召开,飞桨AI4S携最新科研进展亮相西湖大学

飞桨PaddlePaddle

人工智能 百度飞桨 科学计算

国内好用的堡垒机推荐-行云管家堡垒机

行云管家

网络安全 堡垒机

精准快速搜索文件:Find Any File 激活版

真大的脸盆

Mac 办公效率 文件搜索 搜索工具 搜索文件

如何将千亿文件放进一个文件系统,EuroSys'23 CFS 论文背后的故事

百度Geek说

数据库 云计算 百度 企业号 5 月 PK 榜

如何避免写重复代码:善用抽象和组合

阿里技术

Java 代码实战

kafka集群是如何选择leader,你知道吗?

JAVA旭阳

kafka

软件测试/测试开发丨学习笔记之Web自动化测试

测试人

程序员 软件测试 自动化测试 测试开发

ByConity与主流开源OLAP引擎(Clickhouse、Doris、Presto)性能对比分析

墨天轮

数据库 字节跳动 OLAP Clickhouse Doris

Event Sourcing 和 CQRS落地(七):服务优化_文化 & 方法_周国勇_InfoQ精选文章