延迟队列,简单来说就是在普通队列之上增加延迟的属性,使得消息在指定的延迟时间后被投递到目标队列。其适用场景如下:
超时处理:订单支付超时关闭,在下单时投递一条延迟消息,并在指定时间后消费消息并关闭订单;
异常重试:业务处理逻辑出现异常,需要在一段时间后进行重试,可投递一条包含重试内容的延迟消息。
在未实现 Kafka 延迟队列之前,公司内部实现延迟队列的方式不一,其实现方式主要有以下几种:
定时任务扫表,每分钟扫表一次,查询一小时内超过 10 分钟未支付的订单;
RabbitMQ 自带的延迟消息特性;
基于 Redis 的 zset 实现延迟消息。
但随着业务的快速增长,以上方式逐渐暴露出了不同问题:
定时任务扫表:延迟时间长,有接近 1 分钟的延迟,而且频繁扫表增加数据库压力;
RabbitMQ 自带的延迟消息:在日益增长的业务背景下,RabbitMQ 的吞吐量劣势愈发明显,无法满足业务需求;
基于 Redis 实现的延迟消息:消息未经持久化,不支持消息回溯,且存在丢失风险,对于可靠性要求较高的业务不适用。
基于以上问题,也出于技术架构的统一,我们决定调研或自研一款通用的高吞吐、低延迟的延迟队列,来满足业务不同场景的需求。
解决思路
当前比较常规的低延迟的延迟队列实现主要有两种:通过 消息队列(MQ) 实现 以及 通过 Redis 来实现。由于 Redis 的持久化机制导致其对消息的可靠性无法保证,不予考虑,因此主要考虑使用 MQ 来实现延迟队列。常用的几款 MQ 的对比如表 2-1 所示:
表 2-1 消息中间件对比
在前文已经提到,由于 RabbitMQ 的吞吐量太低,无法满足日益增长的业务需求,所以不予考虑。从表 2-1 可以看出,RocketMQ 和 Kafka 的吞吐和消息延迟都很低,而且 RocketMQ 还已经支持延迟队列,最高版本还支持任意延迟等级的延迟队列,能够满足业务高吞吐、低延迟的要求,理应是最佳选择。但最新版本的 RocketMQ 如果选择支持任意延迟等级的功能,其吞吐量会相较于之前下降一倍以上,另外目前公司内部未使用 RocketMQ,引入新的中间件会引入额外的风险以及增加运维成本,所以这个选项也先被搁置。
目前公司内部主要用的消息队列是 Kafka,Kafka 在吞吐和延迟上能够满足业务对延迟队列的需求,但是它不支持延迟队列,因此我们考虑是否可以基于 Kafka 来自研一套延迟队列。基于 Kafka 实现延迟队列虽然要自己实现延迟消息的管理与投递,但它的优势有:
(1) 延迟队列服务可以独立于 Kafka broker,对 broker 不会引入额外的压力;
(2) 延迟队列服务是可以横向扩容的,理论上可以支持任意级别的延迟时间,而且还能保持高吞吐特性;
(3) 基于 Kafka 实现延迟队列,无需引入新的中间件,无额外的运维压力;
(4) 自研的延迟队列更易于维护和更新迭代。
基于以上考虑,我们决定基于 Kafka 自行实现一套延迟队列。
实现方案
核心原理
延迟队列的核心实现原理如图 3-1 所示,其原理与 RabbitMQ 以及 RocketMQ 旧版实现相似,都是针对不同的延迟等级创建队列,增设调度服务用于定时消费带有延迟属性的队列,并将消费到的数据投递到目标队列。这里每个队列都是对应一个个 Kafka topic,业务服务将消息投递到对应的队列中(也就是不同延迟等级对应的 topic 上),由延迟调度服务将各个队列中的数据准时地投递到业务实际目标的 topic 中,之后业务服务再消费目标 topic 中的数据进行下一步处理。
图 3-1 延迟队列实现原理
整体架构
延迟队列实现的整体架构图如图 3-2 所示。其中核心部分主要有 3 个模块:Manager 服务、延迟调度服务 以及 SDK(延迟队列客户端)。它们的各自负责的功能介绍如下:
Manager 服务
延迟队列的管理平台,负责对整个延迟队列进行监控和管理,如业务接入、队列健康情况监控等等。
延迟调度服务
延迟调度服务是延迟队列实现的核心,负责将延迟等级队列中的数据准时地转发至业务实际 topic 中。为了让业务之间不互相影响,在队列之上引入了分组的概念,每个业务域可以独立配置一个组,组与组之间的延迟队列是相互独立的。延迟队列的每个延迟等级都与 Kafka 的 topic 相对应,是 1:N 的关系,确保每个延迟等级的吞吐可以无限扩展。延迟调度服务在运行过程中会上报关键指标(转发延时、各节点处理耗时等),助于问题排查和稳定性监控。
SDK(延迟队列客户端)
延迟队列客户端是用于协助业务将需要延迟处理的数据写入到延迟队列中,这里将延迟队列客户端封装为 SDK,方便业务接入。
图 3-2 整体架构图
核心设计点
消息高效转发与低延迟
延迟队列实现的核心点是要将业务写入到不同延迟等级的队列中的数据准时地投递到目标 topic。这里「准时」的定义是在消息延迟时间未到时不能进行消息投递,当延迟时间到达时,要快速将数据投递到目标 topic,从而降低延迟时间。
最初我们实现数据转发的方案是通过 KafkaConsumer 的 pause、resume 来对分区消费的暂停、恢复。kafka-clients 提供了两个 API:KafkaConsumer#pause、KafkaConsumer#resume,通过这两个 api 能够对未到达消息发送的分区进行暂停,并在发送时刻唤醒转发。后来经过测试发现,进行 pause 后 resume 的分区并不能及时消费到消息,而是在 resume 后的 100~500ms 才能获取到消息,这大大增加了延时时间,不满足「低延迟」的要求。
通过对 Kafka-clients 的源码和版本分析发现:在 kafka-clients 2.3 及以下版本中,对分区进行 pause 后,本地缓冲区中的分区消息失效,再进行 resume 时,需要等待缓冲区消息消费完成后重新 fetch 消息,增加了网络消耗,耗时远大于本地内存处理。在 kafka-clients 2.4 及以上版本中,调整了这部分逻辑,pause 的分区在过滤后重新放回本地缓冲区,在未触发重平衡的情况下,resume 后仍可以从内存中消费。图 3-3 列出了 kafka-clients 2.3.1 版本和 2.4.0 版本代码的差异(其中左图为 2.3.1 版本,右图为 2.4.0 版本)。
图 3-3 kafka-clients 差异代码对比
发现问题后,我们采用了另一种方式:线程内休眠等待。这种实现方式有两个注意点:
休眠时间的长短设置。休眠时间过长,会导致延迟上升。休眠时长过短,会导致频繁无效拉取,导致资源浪费;
消息投递的准确性。正常情况下一个消费组会消费同一个队列的多个分片,这里如果消费线程出现休眠等待,会导致所有分片的数据无法被投递。
针对休眠时间的问题,为了能够降低延迟时间,我们采用根据最近一条待投递的消息的投递时间距离当前时间的长短来确定休眠时间。另外为了避免过长的休眠时间导致消费组发生重平衡,所以也加入了固定时长休眠机制。通过这两种机制结合确保低延迟,同时也不会造成重平衡。
针对消息投递准确性问题的实现方案是让每个 topic 都只有一个分片,每个 topic 对应一个消费组。这样如果第一条数据都不需要被投递,那么这个队列就没有消息需要被投递,也不存在该被投递的消息因为休眠而无法被准时投递出去的问题。另外在配置关系上将延迟队列等级与 topic 的对应关系改为 1:N,可以解决由于每个 topic 只有一个分片而引起的写入性能瓶颈。这样做的好处不仅仅是确保了消息的准确投递,理论上通过增加延迟调度服务和扩展队列对应的 topic,能够让延迟队列支持任意时间的延迟等级,以及支持每个延迟等级的吞吐无上限。
支持消费任务的负载均衡
上文提到可以通过增加延迟调度服务的实力以及扩展延迟队列对应的 topic 将对应的延迟级别以及吞吐提升上去。但当延迟调度服务的实例和 topic 数增加到一定程度后就会出现一个问题,就是各个延迟调度服务分配到的 topic 会出现不均衡的情况,如图 3-4 所示。这样会导致即使增加延迟调度服务的实例,整个系统也会出现性能瓶颈。
图 3-4 topic 分配不均
因此需要由一个支持消费任务负载均衡的方案,将多个 topic 的消费任务均匀地分配给各个延迟调度服务实例中,确保各个延迟调度服务之间压力近似。具体方案的实现逻辑如图 3-5 所示:
图 3-5 负载均衡实现逻辑
流程说明:
延迟调度服务实例启动后,需要向 ZK 注册临时节点;
延迟分配策略由其中一个延迟调度实例完成,各个延迟调度服务实例通过抢占锁的方式来确定由谁执行;
负责执行调度策略的实例根据当前需要消费的总 topic 数以及延迟调度实例数,在结合队列类型,将 topic 分配给各个实例,并将结果写入 zk 中;
各延迟调度服务实例监听到分配结果变更后就会将结果拉取下来,并启动对应的消费组消费对应 topic 的消息;
当有新的延迟调度实例加入或者老的宕机后,会重新开启新的一轮分配。新的分配逻辑与之前近似,只是会结合当前已有分配情况进行重新调整,避免大面积的消费组重启。
消息可靠性的保证
作为一款通用的延迟队列,我们需要提供消息可靠性的保障,确保数据不丢失。为此,基于 Kafka 特性,延迟队列对外提供两种语义的配置:
至少一次:确保消息不丢且保证有序,但有可能重复投递;
有且仅有一次:确保消息不丢且不重复投递,并保证有序。这种方案在吞吐上有较多的损耗,且该语义是基于 Kafka 事务机制来实现,包含了 Kafka 事务缺陷。
除此之外,为了保证数据不丢失,还对于延迟调度服务进行了以下优化:
生产者 acks 设置为 all ;
消费具有延迟属性的延迟消息,offset 在消息成功转发回调中处理提交。这样即使在断电等极端情况下,仍能在系统恢复后继续从上次成功发送的记录后开始消费;
当出现异常时,回退 offset 并等待后重试。打印日志,触发告警,人为介入。若为网络原因,则自行恢复;
优雅关闭,关闭前处理完当前内存中的消息再关闭。
线上实践效果
(1) 单队列可支撑的吞吐量
目前单队列(队列对应的是一个单分片的 topic)在延迟转发服务实例配置为 2 核 4G 的情况下进行压测,结果如表 4-1 所示(数据仅供参考,未进行参数调优):
表 4-1 吞吐量压测结果
(2) 目前业务接入量
目前该延迟队列服务上线稳定运行接近 5 个月,有近 40 个服务正在使用,已配置 16 个延迟时间(可动态增加),如图 4-2 所示:
图 4-2 业务接入情况
(3) 毫秒级别的转发延时
这里转发延时表示队列中的消息应当被投递的时间与真实被投递的时间之间的差异。目前通过监控可以看到,各个队列里消息的转发投递延迟都在 1 ms 以内,如图 4-3 所示。
图 4-3 消息延迟指标
展望
目前延迟队列已在线上稳定运行了一段时间,一些基本功能特性都已经具备并能稳定提供服务。但当前我们实现的延迟队列包括目前市面上较多的延迟队列都使用“给单个队列赋予固定延迟时间”的方案,这种方式的缺点是想要支持任意延迟等级的延迟队列需要不停地创建 topic 去支持,最终会导致 topic 无限膨胀。
在日益复杂的业务背景下,任意延迟等级的队列也会被提上议程。因此后续我们将优化任意延迟等级的特性的实现。此外我们还会在易用性以及高可用性上进行相应建设,并不断丰富产品特性,为业务稳定发展提供有力支持。
评论 5 条评论