时常会思考消息队列的价值是什么?新人加入团队后该如何理解消息队列?又该如何理解小米的自研产品 Talos 和 EMQ?
鉴于这些考虑,我把对消息队列的理解做一个简单总结,希望能帮助感兴趣的同学了解 Talos/EMQ 的价值和定位,以及它在企业架构中扮演着什么样的角色。
作者批注:思考手上的工作,找到它的价值和定位,就找到了工作的目标 — 努力将价值最大化
一、日志与消息队列
说到消息队列,就不得不说一下日志 。
作者批注:本质上日志与消息队列都可以抽象成 Pub/Sub 的模式
Jay Kreps (Confluent CEO,Kafka 核心作者) 在《The Log: What every software engineer should know about real-time data’s unifying abstraction》[1] 中系统性描述了日志的价值和重要性,指出了日志特定的应用目标:它记录了什么时间发生了什么事情(they record what happened and when)。而这,正是分布式系统许多问题的核心。
作者批注:《日志:每个软件工程师都应该知道的有关实时数据的统一抽象》据说是史诗般必读文章
这个“按时间天生有序”的特性让日志解决了分布式系统中的两个重要问题:修改操作的排序和数据分发(ordering changes and distributing data),这为并发更新的一致性和副本复制提供了基础。分布式系统中为了保证各副本的一致性,协商出一致的修改操作顺序是非常关键且核心的问题,利用日志天生有序的特性可以将这个复杂的问题变得简单。
我们来看一个不太严谨的例子:假设系统有三个副本,都存储着 A=1 的初始值,某一时刻,要执行一个加法乘法的操作序列对 A 的值进行修改:"+1"、"*3"
假设 Primary 收到两条指令后,对其他副本依次广播了 “+1”、"*3",由于网络的不确定因素,第一个副本收到的指令为 “*3”、"+1",第二个副本收到的指令为 “+1”、"*3",这就会带来副本的一致性问题;
如何解决这个问题呢?答案是日志,利用日志将并发更新进行排序,所有副本从日志中按顺序读取更新操作,应用到本地,就可以将这个复杂的问题简单化。
如图,Primary 依次进行 “+1”、"*3" 的操作,并写入日志,利用日志做修改操作的“数据分发”,使得各副本能够在本地应用完全相同的操作序列,从而保证各副本数据的一致;
作者批注:本质上是是把多台机器一起执行同一件事情的问题简化为实现分布式一致性日志,通过日志的 Pub/Sub 保证多台机器对数据处理的最终一致
上面的例子能很好的说明为什么顺序是保证副本间一致性的关键,而日志为此提供了基础和载体。让我们进一步思考和联想:
Primary 将各种操作通过日志序列化,各 Replica 从日志中读取并应用到本地状态,这种解决问题的模式也叫 Pub/Sub,即抽象成通用的数据订阅机制,而将这种抽象产品化,就是消息队列。
二、消息队列的应用价值
消息队列作为大型分布式系统的关键组件,在实时数据或流式数据架构中扮演着重要角色,它通常被应用在系统解耦、异步处理、流量削峰,分布式事务/金融电商等场景中,接下来我们分别从这几个场景浅谈消息队列的应用价值。
1. 数据集成与系统解耦
如果说日志为解决分布式一致性问题提供了基础,那么同样是 Pub/Sub 模式的消息队列,则为琳琅满目的数据系统之间协作提供了一件利器,大大简化了数据集成的复杂度(O(N^2) 变为 O(N)),提升了数据流转的效率,加速了数据价值展现;
什么是数据集成?引用 Jay Kreps 的文章:
Data integration is making all the data an organization has available in all its services and systems.
数据集成即将一个组织所拥有的数据,使其在所有的服务和系统中都可用。
那么数据集成是解决什么问题?使用消息队列又是如何加速数据集成的?我们看一个案例。
不少业务都有这样的场景:随着业务量的爆发,为理解用户行为,需要收集日志保存到 Hadoop 上做离线分析;为了能方便定位问题,同时把日志导一份到 ElasticSearch 做全文检索;为了给老板查看业务状况,需要将数据汇总到数仓中提供统计报表;此外还需要进行一些实时的流式计算…一个业务系统需要同时与多个大数据系统交互,一段时间后,团队有了新的业务,新业务系统又重复上面的事情,对接各种系统,如下图,最终的结果是系统架构盘根交错,快乐与痛苦齐飞。
可以看到,上面案例本质上是一个数据集成的需求,但数据集成的难处就在于需要面对:1)越来越多的数据;2)越来越多的数据系统;
为什么会有这种现象,只能说大数据时代下,很难有一个系统能够解决所有的事情,这就导致了业务数据因为不同用途而需要存入不同的系统,比如上面说的检索,分析,计算等;于是,数据集成的挑战就变成了不同系统间繁杂的数据同步,有多复杂?M 个业务,N 个数据系统:
1)所有业务都需要关心到每一个系统的数据导入,复杂度 M*N ;
2)架构复杂交错,各个链路容易互相影响。比如一个业务数据写入 A 系统失败,一般会影响 B 系统的写入;
3)出现问题后很难定位,且容易丢失数据,数据恢复也变得困难。
如何解决上面这些问题,从而提高集成的效率呢?还是 Pub/Sub 的思路,引入消息队列做数据分发,架构如下:
这样的架构业务只需要关心如何将数据导入消息队列即可,好处不仅如此:
1)由于消息队列的解耦,架构复杂度大大降低,各系统间的数据同步不会互相影响(各系统无需知道彼此的存在);
2)由于消息队列的持久化,不易丢失数据,也可以在必要时进行数据重放;
3)由于所有数据都汇总到消息队列中,数据集成非常高效,可以快速接入新的数据系统而不影响存量架构。
作者批注:解耦是消息队列解决的最本质的问题,价值所在。
如此,我们看到消息队列在提高数据集成效率上起到了关键作用,实际上,基于消息队列构建一个稳定可靠的、连接所有数据系统的数据流是非常关键的,小米数据流系统便是在做这样的事情;借用 Jay Kreps 的话来说明数据集成平台的重要性:很多组织没有完整可靠的数据流基础平台,却在过多的关注上层数据模型,这是本末倒置的表现.
作者批注:做好消息队列和数据集成平台的重要性不言而喻,价值所在。
2. 异步处理与事件驱动
异步处理本质上是解耦系统间的 RPC 调用,将那些依赖其他系统,同时对时效性要求不是很高的逻辑剥离出来,提高系统吞吐和响应时间,降低系统耦合的复杂度;异步处理注重的是“通知”,事件驱动;
作者批注:能够异步处理的逻辑一般是不必在当时那个时间点拿到结果,有个通知就行
举一个例子来说明,用户注册为了验证真实性,会向注册邮箱发送验证邮件,只有验证过的用户才能行使注册用户的权利。同步的处理/调用过程一般如下:
采用同步调用的问题是:一方面用户看到注册成功需要等待发送邮件成功,延长了整个流程的耗时;另一方面如果发送邮件失败会影响整个注册流程;
对于注册流程来说,系统不需要等待发送邮件成功才告知用户注册成功,即发送邮件是否成功这个结果在注册的时间点并不是必须要拿到的,注册逻辑只需要“通知”下游产生了一个注册事件即可;
引入消息队列异步处理的流程如下,发送验证邮件的动作由订阅消息队列的下游系统/逻辑来处理:
使用消息队列异步处理的好处是:
1)简化业务逻辑,更快的返回结果,提升响应效率和系统吞吐,改善用户体验;
2)即使异步逻辑的邮件系统出问题,也不会影响注册流程,降低系统耦合度。
上述案例是一个异步处理非常典型的问题,同时也足够简单,在一些复杂的场景中比如电商,引入消息队列的作用就更加明显了,比如用户下单后,需要将订单信息推给配送系统安排配货和物流,需要短信通知用户下单成功,需要发送给大数据系统做统计分析,需要给推荐系统做计算 … 如果采用同步处理:
1)用户的下单流程会非常长,且任何一个系统有问题都会导致下单失败;
2)随着业务复杂度提高,要适配和接入的系统会越来越多,这几乎是反人类的;异步处理可以让问题变得简单,如下图,既能让下游系统实时的处理订单信息,也能简化架构复杂度,提升下单效率。
异步处理更多的是事件驱动,引入消息队列做事件的转述与投递,从架构上也会达到解耦的功效;这种应用处处可见,除去上面说的准实时的场景,还有类似保险投保有一个犹豫期,保单的延时生效(延时队列)这种延迟处理场景;事实上,随着云原生技术的发展,Serverless 日益流行,Serverless 的核心是事件驱动,而事件驱动的核心是一套稳定、靠谱的 Event-Streaming 系统:消息队列。
作者批注:大型分布式架构的数据总线、微服务中隔离直连的异步调用,Serverless 的事件驱动,处处都有消息中间件的身影,可谓价值所在
3. 流量削峰
电商大促、秒杀、节假日抢票等场景都是对系统服务的巨大挑战,这类场景的特点就是瞬时并发的流量 / QPS 非常高(海量用户同一时间访问),很容易将后端服务打挂。
为什么要削峰?归根结底主要是两个原因
1)后端服务的处理能力往往是有限的,无法跟前端对等,比如电商后台的数据库,比如短信系统的网关,同样的机器能够支撑的并发量无法跟前端处在同一个数量级,从成本考虑也无法无限制扩展;
2)后端服务对到达自身的流量是不可控的,只要上游调用不限流,后端就随时暴露在流量洪峰的压力下。
消息队列之所以能承担削峰的角色,也是因为能解决上面的问题:
1)消息队列高吞吐的特性可以在不高的成本下承接住巨大的流量,起到蓄水池的作用,拦截上游洪水;
2)后端采用拉取的模式消费,可以按照自身的处理能力进行流控,平稳地从队列中消费,从而达到保护自身,避免被压垮的目的 。
4. 事务消息与分布式事务的最终一致
消息队列中的事务是什么?解决什么问题?
消息队列中的事务是指 “业务执行本地事务” 与 “消息发送到消息队列” 的原子性。事务消息跟普通消息的区别是,在事务提交前,这个消息对消费者来说是不可见的;基于事务消息,消息队列最终解决的是生产者与消费者在分布式事务场景中的一致性问题。
作者批注:消费端对数据的处理逻辑一般是幂等的
例如在电商系统中,为了提高效率,降低系统耦合,通常会对订单流做异步拆分,但有时异步处理的某个事件需要上下游系统保持一致,这就带来了分布式事务的问题。
我们以小米有品的一个真实场景“优惠券核销”来说明,双十一和米粉节有品发放了一堆优惠券,使用优惠券购物下单成功后,被使用的优惠券就要删掉,如果下单没成功,优惠券就不应被删掉,即下单成功与优惠券核销要保持一致。
如上,引入消息队列做异步处理,但使用非事务消息可能会有如下情况:
1)创建订单成功,写消息失败导致使用的优惠券没有被删掉;
2)订单因支付失败没有创建成功,优惠券却被删除了;
作者批注:以后用券购物,可以洞察背后系统的事务逻辑是不是靠谱了 :-)
这就是事务消息要解决的问题。即保证“业务执行本地事务”(创建订单)和“发送消息”这两件事情的原子性,然后依赖消息队列的可靠投递,实现订单系统与优惠券系统对数据处理的最终一致;事务语义的流程如下图:
服务端为了实现事务机制(保障下游系统能消费到客户端认为已经提交的事务消息),需要做以下工作:
1)引入事务状态来“记录”客户端事务是否提交;
2)由于网络原因或订单系统重启,导致有些客户端已提交或撤回的消息在服务端的状态是未知的,需要辅助“补偿”机制(对不确定状态的消息进行回查或客户端询问)来解决。
Talos 系统在实现事务机制时有两点需要说明:
1)关于“补偿”机制的实现:RocketMQ 为业务提供了一种反查本地事务状态接口的方案,而 Talos 则使用了一种成本更为低廉的方法,具体可期待后续文章《Talos 事务消息设计》;
2)优惠券系统消费消息失败,由于 Talos 对消息的持久化和可重放,即使优惠券核销失败,也可以通过重试来解决;
简单总结下分布式事务场景中消息队列的价值:通过两阶段提交语义(事务消息),事务状态反查的补偿机制,消息队列的可靠投递,以及业务消费逻辑的幂等,实现了如上所述分布式事务的最终一致。
三、从历史看消息队列的价值演化
消息中间件作为企业基础架构的关键组件由来已久,跟 Google 的三驾马车一样,同处于大数据时代的开启;十几年来,消息系统的演化历程,大致可以分为三个阶段[2]。从这三个阶段的历史来看,我们基本可以得到消息队列在大数据架构变迁过程中的角色和价值演化。
最早的消息队列开源产品 ActiveMQ,要追溯到 2003 年,这一时代的 MQ 产品,主要是围绕 JMS、AMQP 等标准化消息规范和协议来设计的,其目标主要是用来减轻经典 RPC 调用的紧耦合,剥离异步处理逻辑,解耦系统复杂度;
价值关键词:调用松耦合、异步处理
消息系统第二个阶段便是以 Kafka 为经典代表的实时数据管道时代,这一时代的产品在吞吐和数据量上都有质的提升,其主要的应用场景是解决数据集成的痛点,提高数据在不同系统间的流转效率。同时,Lambda 架构的引入使得 Kafka + Storm 成为那个时代实时计算的标配。
价值关键词:数据管道、数据集成、实时计算
消息系统的第三个阶段,流数据平台,不仅仅是一个简单的数据管道,更多的强调平台化。平台化会带来更多的挑战与困难,如业务多样性,数据海量化等,这就使得消息系统必须配备完善的多租户管理、I/O 隔离、流控与配额管理等。随着容器化的发展,消息队列更需要一个灵活的架构来适应这种变革,存储计算分离很可能是中间件产品未来架构的发展方向。仔细想来,在云计算方面,无论是早期的 MapReduce 还是最近的 Spark-Streaming、Flink,都是存储计算分离的架构,计算层只负责“计算”,而数据的存储都依赖 HDFS 等存储系统进行“读取与写入”。
价值关键词:平台化、容器化、存储计算分离架构
四、小米的消息队列产品 Talos 与 EMQ
聊了这么多消息队列的“价值”,显然,小米消息队列产品的使命便是把这些“价值”落地到业务。
作者批注:这就是工作的目标啊,工作的目标啊,工作的目标啊!重要的事情说三遍。
小米有两款自研的消息中间件产品:Talos 与 EMQ;篇幅限制,这里简单回答两个经常被小伙伴问到的问题:
1.Talos/EMQ 与开源产品 Kafka/ActiveMQ/RocketMQ 等有什么区别?
2.Talos 和 EMQ 之间又有什么区别?
1. Talos/EMQ 与开源产品的区别
Talos 和 EMQ 均立项于 2015 年,前者基于 HDFS,后者基于 HBase,得益于小米云平台在分布式存储方面的深耕与积累,这两款产品都得到了存储团队的大力支持,伴随业务的信任与锤炼而来,具体可参考后续文章《万亿级消息背后:小米在消息队列的实践》。
要说与开源产品的联系,一方面在系统设计上会针对开源产品的一些痛点进行优化,另一方面也会不断学习/借鉴开源产品的优点落地内部业务;主要的区别主要有两方面:
1)企业级特性与平台化
Talos 与 EMQ 立项的目标便是服务于小米内部业务和生态链公司,对内打通 LADP,服务各部门业务,对外打通小米账号,向生态链公司输出中间件价值。针对企业深度定制的多租户管理以及一些平台化特性是它们与开源产品在落地点上的主要不同。
2)架构设计的不同
Talos 和 EMQ 架构上皆是存储计算分离的设计,从架构理念上相对领先于业界同时期的开源产品,存储计算分离的好处有很多,主要两个方面:
计算层无状态:管理、调度都变得简单;天然支持扩展,扩容无感知;具有高弹性的容错机制(相比计算层无状态的灵活,存储计算耦合的架构,Failover 非常麻烦,一般需要复杂的选举算法,正常服务前需保证数据同步);
存储层更加专注:只需要做好数据存储功能即可,而存储系统相对比较成熟,比如 HDFS / HBase,借助它们的力量,间接降低了系统复杂度;
当然,这种架构也不是完美无瑕,系统设计从来都是实际情况和业务特点综合衡量取舍的结果,不存在完美的架构和设计,只有合适的架构与设计。
2. Talos 与 EMQ 的区别
Talos 和 EMQ 的区别,更像是 “从历史看消息队列价值演化” 小节中的 “数据管道” 与 “异步调用解耦” 的区别。
从消息队列的使用需求或消费模型来说,一般分为两种类型的设计:一种是面向流而设计的,如 Kafka;一种是面向队列去设计的,即传统意义的 Queue 数据结构,如 RabbitMQ;Talos 是前者,EMQ 偏向于后者;
简单列举下这两种设计的特点:
流模型(Stream)
强调消息的有序,消费下游一般关注消息顺序
强调消费的独占,Consumer 是跟 Partition 强绑定的
消息的读写一般是 Batch 进行,batch ack,偏重吞吐的优化
消息持久化保存,支持消息回溯重放
典型场景:数据集成下的系统解耦;数据收集 + 后端计算,这也是 Talos 主要的应用场景
队列模型(Queue)
采用无序的方式消费,消费下游一般不关心消息排序
采用共享的方式消费,可以通过增加 Consumer 数量来增加并发度
对于一条消息,多个 Consumer 中只有一个(可能是任何一个)能消费到这条消息
能够跟踪单条消息状态,可以随机 ACK 一条消息,偏重延迟优化
一般情况下,消息消费后就会被标记删除
典型场景:异步调用,事件通知,比如上面提到的注册时异步的发送邮件、下单时异步的发送短信,这些都是 EMQ 的应用场景。
到这里,本文一开始提出的问题,大家得到了吗?欢迎讨论和提出建议,给耐心看完的你点个赞!
五、参考文献
[1]《The Log: What every software engineer should know about real-time data’s unifying abstraction》
[2]《杠上 Spark、Flink?Kafka 为何转型流数据平台》
评论 6 条评论