最近,淘宝开源了分布式消息中间件 Memorphosis 项目,它是 Linkedin 开源 MQ——Kafka 的 Java 版本,针对淘宝内部应用做了定制和优化。
据了解,Metamorphosis(以下简称 Meta)的设计原则包括:
- 消息都是持久的,保存在磁盘。
- 吞吐量第一。
- 消费状态保存在客户端。
- 分布式,生产者、服务器和消费者都可分布。
Metamorphosis 的总体架构图如下:
除了完整实现 Kafka 的功能之外,淘宝开发团队还为 Meta 加入了额外的功能,使得 Meta 成为一个更为强大的通用消息中间件,包括:
- 彻底用 Java 重写的实现,高效的协议和通讯框架。
- 发送端的负载均衡。
- Master/Slave 异步和同步复制的高可用方案。
- 专门用于广播消息的客户端实现。
- 与 diamond 结合使用的顺序发送消息功能。
- 支持事务,包括本地事务和分布式事务,实现 JTA 规范。
消息中间件中有两个角色:消息生产者和消息消费者。Meta 里同样有这两个概念,消息生产者负责创建消息并发送到 Meta 服务器,Meta 服务器会将消息持久化到磁盘,消息消费者从 Meta 服务器拉取消息并提交给应用消费。
从 Meta 的技术手册中,我们可以更加深入的看到 Meta 的实现细节和应用指南。
在使用消息生产者和消费者之前,我们需要创建它们,这就要用到消息会话工厂类——MessageSessionFactory,由这个工厂帮你创建生产者或者消费者。除了这些,MessageSessionFactory 还在后面帮你做很多事情,包括:
- 服务的查找和发现,通过 Diamond 和 Zookeeper 帮你查找日常的 Meta 服务器地址列表。
- 连接的创建和销毁,自动创建和销毁到 meta 服务器的连接,并做连接复用,也就是到同一台 Meta 的服务器在一个工厂内只维持一个连接。
- 消息消费者的消息存储和恢复。
- 协调和管理各种资源,包括创建的生产者和消费者的。
消息生产者的接口是 MessageProducer,你可以通过它来发送消息。创建生产者很简单,通过 MessageSessionFactory 的 createProducer 方法即可以创建一个生产者。请注意,MessageProducer 是线程安全的,完全可重复使用,因此最好在应用中作为单例来使用,一次创建,到处使用,配置为 Spring 里的 singleton bean。MessageProducer 创建的代价昂贵,每次都需要通过 zk 查找服务器并创建 TCP 长连接。
发送消息后,消费者可以接收消息了。通过 createConsumer 方法来创建 MessageConsumer,传入一个 ConsumerConfig 参数,这是消费者的配置对象。每个消息者都必须有一个 ConsumerConfig 配置对象,这里只设置了 group 属性,这是消费者的分组名称。Meta 的 Producer、Consumer 和 Broker 都可以为集群。消费者可以组成一个集群共同消费同一个 Topic,发往这个 Topic 的消息将按照一定的负载均衡规则发送给集群里的一台机器。同一个消费者集群必须拥有同一个分组名称,也就是同一个 Group,这个概念跟 Notify 里的订阅者组名是一样的。消息的消费过程可以是一个并发处理的过程,getExecutor 返回你想设置的线程池,每次消费都会在这个线程池里进行。recieveMessage 方法用于实际的消息消费处理,message 参数即为消费者收到的消息,它必不为空。
如果在消费过程中抛出任何异常,该条消息将会在一定间隔后重新尝试提交给 MessageListener 消费。在多次消费失败的情况下,该消息将会存储到消费者应用的本次磁盘,并在后台自动恢复重试消费。
Metamorphosis 1.2 开始支持事务,包括发送端和消费端事务。发送端同时支持本地事务和分布式事务,可以在一个事务内发送多条消息,要么同时成功,要么同时失败;可以使用 XA 事务,在事务内操作其他 XA 资源,例如操作数据库,与此同时发送 meta 消息,可以保证这些操作和发送消息要么一起成功,要么一起失败。
如果你要在发送消息的同时操作数据库,比如同时将消息插入某张表,例如下订单的时候同时发送消息通知卖家并将订单插入数据库,这时候因为涉及到两个 Resource(数据库和 Meta),就需要使用分布式事务来保证 ACID。分布式事务一般采用两阶段提交协议,在 java 里就是使用 JTA 规范 API 的 XA 部分。
关于服务器的可靠性保证,消息生产者发送的消息在 Meta 服务器收到后在做必要的校验和检查之后的第一件事就是写入磁盘,写入成功之后返回应答给生产者。因此,可以确认每条发送结果为成功的消息服务器都是写入磁盘的。
写入磁盘,不意味着数据落到磁盘设备上,毕竟我们还隔着一层 OS,OS 对写有缓冲。Meta 有两个特性来保证数据落到磁盘上:
- 每 1000 条(可配置),即强制调用一次 force 来写入磁盘设备。
- 每隔 10 秒(可配置),强制调用一次 force 来写入磁盘设备。
因此,Meta 通过配置可保证在异常情况下(如磁盘掉电)10 秒内最多丢失 1000 条消息。
服务器通常组织为一个集群,一条从生产者过来的消息可能按照路由规则存储到集群中的某台机器。Meta 还正在实现高可用的 HA 方案,类似 MySQL 的异步复制,将一台 Meta 服务器的数据完整复制到另一台 Slave 服务器,并且 Slave 服务器还提供消费功能,本方案正在实现中。
对于消费者的可靠性保证 ,消息的消费者是一条接着一条地消费消息,只有在成功消费一条消息后才会接着消费下一条。如果在消费某条消息失败(如异常),则会尝试重试消费这条消息(默认最大 5 次),超过最大次数后仍然无法消费,则将消息存储在消费者的本地磁盘,由后台线程继续做重试。而主线程继续往后走,消费后续的消息。因此,只有在 MessageListener 确认成功消费一条消息后,meta 的消费者才会继续消费另一条消息。由此来保证消息的可靠消费。
消费者的另一个可靠性的关键点是 Offset 的存储,也就是拉取数据的偏移量。我们目前提供了以下几种存储方案:
- Zookeeper,默认存储在 Zoopkeeper 上,Zookeeper 通过集群来保证数据的安全性。
- MySQL,可以连接到您使用的 MySQL 数据库,只要建立一张特定的表来存储。完全由数据库来保证数据的可靠性。
- File,文件存储,将 Offset 信息存储在消费者的本地文件中。
Offset 会定期保存,并且在每次重新负载均衡前都会强制保存一次。
Meta 的 HA(High Availability) 提供了在某些 Broker 出现故障时继续工作而不影响消息服务的可用性;跟 HA 关系紧密的就是 Failover,当故障 Server 恢复时能重新加入 Cluster 处理请求,这个过程对消息服务的使用者是透明的。Meta 基于 Master/Slave 实现 HA,Slave 以作为 Master 的订阅者(consumer)来跟踪消息记录,当消息发送到 Master 时候,Slave 会定时的获取此消息记录,并存储在自己的 Store 实现上;当 Master 出现故障无法继续使用了,消息还会在 Slave 上 Backup 的记录。这种方式不影响原有的消息的记录,一旦 Master 记录成功,就返回成功,不用等待在 slave 上是否记录;正因如此,Slave 和 Master 还有稍微一点的时间差异,在 Master 出故障那一瞬间,或许有最新产生的消息,就无法同步到 Slave;另外 Slave 可以作为 Consumer 的服务提供者,意思就是如果要写入必须通过 Master,消费时候可以从 Slave 上直接获取。
Failover 机制采用 Client 端方式,Master 和 Slave 都需要注册到 ZK 上,一旦 Master 无法使用,客户端可使用与之对应的 Slave;当 Master 的故障恢复时候,处理的方式:原来的 Master 变成 Slave,Slave 变成 Master;恢复故障的 broker 作为 Slave 去之前的 Slave 同步消息。优点简单,但是需要 Slave 和 Master 有一样的配置和处理能力,这样就能取代 Master 的位置,目前 Meta 采用此方式。
对 Metamorphosis 感兴趣的开发人员可以访问其官方网站,了解更多信息。
作者的微信公众号“老崔瞎编”,关注 IT 趋势,承载前沿、深入、有温度的内容。感兴趣的读者可以搜索 ID:laocuixiabian,或者扫描下方二维码加关注。
评论