在要求严格顺序消息的场景下,消息的发送者,BROKER 端(BROKER 端和消息存储放在一起),消息的消费者都要求按照顺序进行,三者任何一个环节的乱序都会导致消息最终的消费顺序被打乱。
如果为每一个消息维护一个有序的 ID,发送和存储消息无序,消费逻辑会变得非常复杂,消费端要对消息进行重新编排,会影响消费的性能。
为了保证消息发送、保存、消费三个环节都有顺序,就要求在同一个时刻只能有一个同步发送消息的线程,消息必须按照接收到的顺序进行保存,消息的消费也只能由一个线程处理。
发送端,消费端为了高可用需要部署多个实例,然后再通过一个协调者,比如 ZOOKEEPER 等,控制单个实例工作,其他实例处于待命状态。当工作实例发生了故障,协调者就会唤醒待命的实例进行工作。由于发送端、消费端实例是无状态的,切换工作实例不会产生乱序的问题。消息保存的 BROKER 端是一个有状态的应用,如果部署多个实例,当发生故障时,由于故障实例上可能还有未消费的消息就不能进行切换。
在一些要求数据不丢失、必须有序、BROKER 高可用的场景下(比如跨数据中心数据库表的同步,需要按照数据库 LOG 顺序回放到另一个数据中心,数据乱序或者丢失信息都可能导致两个数据中心的数据不一致),BROKER 往往采用 MASTER-SLAVE 同步双写,或者同一个消息被同步写到多台机器上,为了保证服务宕机等情况下消息不丢失,有的业务要求每条消息都落到磁盘上。如果采用同步写多份会严重影响性能,如果采用单组 MASTER-SLAVE 的结构,当 MASTER 宕机后,SLAVE 成为新的 MASTER 可以接受发送者的消息,但是无法满足数据任一时刻都有两份的要求。
我们现在需要一种设计方案,在保证数据可靠性的条件下性能尽可能的高,同时满足任一时刻数据至少写入 2 份。
下面提供一种 BROKER 高可用,又能满足数据任一时刻都有两份的方案 :
- 采用 MASTER-SLAVE 结构方式,同步写入消息(消息允许重复),MASTER-SLAVE 上的消息在逻辑上保持一致;
- SLAVE 在 MASTER 宕机后不接受发送请求,但可以进行消费;
- 一个消息队列分配两组以上的 BROKER 组(一个 BROKER 组由 MASTER-SLAVE 组成),BROKER 组的集群信息在协调者上保存为一个单向的链表,消费者和发送者各有一份独立的链表数据。有消息的 BROKER 组一定会按受理发送请求的先后顺序保存在消费者对应的链表上,消费者只能从链表表头的 BROKER 组上消费,当 BROKER 组上的消息消费完且不为当前受理发送请求的 BROKER 组则从消息链表中移除;
- 没有积压消息的 BROKER 组才能被添加到发送链表的表尾,当有 BROKER 组发生故障时会从 BROKER 组中移除,移除的 BROKER 组必须保证没有积压消息后才能被添加回链表;
- 只有发送链表表头的 BROKER 组才能接受发送请求,同时新切换为受理发送请求的 BROKER 组会添加到消费链表的表尾。
异常处理流程:
- BROKER 组有机器宕机则从发送链表中移除;
- 当新 BROKER 组被挑选为当前发送者,则把该组 BROKER 添加到消费链表的表尾;
- 当异常 BROKER 组的消息消费完成则从消费链表表头移除;
- 当 BROKER 组机器都恢复正常,且没有可以消费的消息则添加到发送链表的表尾。
(点击放大图像)
具体的处理流程描述如下所述。
发送者处理流程
正常情况下,我们可以采用单组MASTER-SLAVE 结构的集群方案,MASTER 接收到发送者的消息后同步转发给SLAVE。发送者只有接收到MASTER,SLAVE 都写入成功的信息才算成功,否则这条消息需要发送者再次进行发送。但是当有一台机器发生故障时这个集群无法满足MASTER,SLAVE 都写入成功的条件。这个时候我们需要把发送者的发送请求FAILOVER 到其他的集群上。如果只是简单地进行发送请求的切换,如果切换到的BROKER 集群上有未消费的消息就可能破坏数据的顺序要求。同时消费者还必须知道发送者切换的过程,否则消费者无法知道自己应该先从哪个BROKER 集群上消费,一旦获取消费的BROKER 集群顺序与发送时的顺不一致,顺序性就会被破坏。我们需要记录好发送到不同BROKER 集群的先后顺序,消费者按照记录的顺序进行消费。
如果BROKER 集群发生过切换,当前接受请求的BROKER 集群可能和消费者当前应该消费的集群不同,需要对发送者和消费者单独维护当前应该使用的集群信息。
BROKER 集群发生故障后怎么通知发送者,可以有多种方式,比如由 ZOOKEEPER 协调,或者由客户端处理。我们可以采用发送者来处理 BROKER 集群故障的问题,当发送者感知到发送失败或者连接失败时向协调者发起请求,由协调者返回当前可用的 BROKER 集群。
协调者判断 BROKER 集群是否可以接收新的消息,除了要判断 BROKER 是否存活外,还需要查询其是否有未消费的消息,只有集群上没有可消费的消息时才能接收新的发送请求。因此协调者需要知道每个 BROKER 集群上存放的消息情况。我们可以在 BROKER 集群被选中为可以接收发送请求时,标识其为有未消费消息的状态,当消费者把上面的消息都消费完成后,由该 BROKER 集群向协调者汇报自己已经消费完成。如果该集群服务都不可用时,无法汇报自己的消息积压情况,协调者会一直标记其为有未消费的消息,直到该集群服务恢复后,汇报完是否存在有未消费的消息。
(点击放大图像)
消费者处理流程
消费者需要消费消息时,先从协调者上获取当前应该获取消息的BROKER 集群,当消费完成时,BROKER 集群会向协调者汇报自己已经没有积压消息了。协调者接收到汇报后就把当前BROKER 集群从需要消费的列表中移除。消费者从一个集群上获取不到消息后会再次请求协调者,获取下一个可以消费的集群信息,从新的集群上继续消费消息。
协调者处理流程
当协调者接收到发送者的请求时,先查看发送列表中是否存在可用的集群,如果没有就会检查消息分配的所有集群,把满足条件(消息无积压,MASTER-SLAVE 都工作正常)的集群加入到可发送集群列表中。如果也没有找到可用集群,那么发送者会被阻塞,直到找到可以使用的集群。
当集群被选为当前可用集群时,需要在未返回给发送者之前把该集群信息同步添加到消费集群列表中,防止协调者出现故障时,消费者获取不到这个集群的信息,被跳过导致消费乱序。
当协调者接收到消费者的请求时,协调者只需要把消费集群列表表头第一个集群返回给消费者就可以了。消费者消费完消息会通知相应的BROKER 集群,该集群感知到消息都已经被消费后马上汇报给协调者,协调者收到汇报信息就会把该集群从消费集群列表的表头移除。
(点击放大图像)
如何控制单个实例发送
上面主要描述了对BROKER 集群的控制,防止消息由于BROKER 集群调度顺序不对导致消息乱序。
顺序消息还需要满足发送者顺序发送,消费者顺序消费,通常为了保证应用的高可用。我们会对发送者和消费者部署多个实例,当一个实例发生异常宕机时,其他的实例可以继续工作,防止单点故障。对于顺序消息同一个时间点只能有一个线程在工作,单个实例只启动一个线程进行发送和消费,只需要编写代码的时候控制就可以做到,但是当我们把应用部署为多个实例时,实例之间就需要一个协调者,保证每次都只有一个工作实例。
发送者启动时先注册一个ZOOKEEPER 的监听事件,通过ZOOKEEPER 选举出来一个LEADER,只有拿到LEADER 权限的发送者实例才能够发送消息,没有取到LEADER 权限的发送者需要马上中断发送消息的线程。消费者应用可以按照上述方案进行相同的处理。
注意事项
MASTER-SLAVE 集群中单台机器接收到消息,发送者视为发送失败,可能存在消息重复发送,SLAVE 成为 MASTER 后继续接受消费请求,消费者可能取到已经消费过的消息,因此需要业务逻辑做可以重复消费的处理。
如果有积压的消息,MASTER 和 SLAVE 同时宕机,由于顺序的要求,消费者会被阻塞,不能继续进行消费,虽然这种情况极少发生,还是需要注意。消费者被阻塞,但是不会影响发送者,只要有可以接收消息的 BROKER 集群,发送者可以继续进行工作。
主从之间同步复制消息也需要保证顺序处理,避免 SLAVE 上消息的顺序与 MASTER 上的顺序不一致。
单个线程发送和消费,在一些业务场景下可能不能满足性能需求,用户可以根据自己的业务逻辑,把没有顺序要求的业务进行拆分,分成不同的消息类型进行发送,单个消息类型保证顺序。
作者简介
丁俊, 有 9 年工作经验,目前就职于京东商城云平台,为消息中间件研发小组 leader,主要负责公司内部高性能、高可用消息中间件的架构。
感谢丁晓昀对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ , @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。
评论