消息队列作为智联招聘非常重要的平台级服务负责全业务线的消息投递。有很多非常典型的业务场景,我们用一个业务场景简历投递来说明消息队列为业务提供的支持
图 1.简历投递业务
一条消息会被多个业务方消费
多个业务方之间广播方式消费(每个业务方消费完整的一份数据)
单个业务方采用集群消费模式(每个 consumer 消费部分数据)
每条消息都需要确保送达,消息队列会采用重试的机制来保证这一点
重要业务消息需要提供跟踪机制可以查询整个消息的生命周期
还有一些业务的需求会使用到延时消息,定时消息等。
基于 RabbitMQ 的自研 MQService
RabbitMQ 作为一款非常成熟的消息队列产品可以很好的应对工作队列的场景,当然也有一些不足比如单队列的扩展能力、延时消息支持的不够好等。我们在 RabbitMQ 基础上又做了一层抽象(MQService),将 RabbitMQ 看做一个消息服务的存储节点来用,在 Zookeeper 中会记录 Topic 的数据在 RabbitMQ 节点的分布并增加了容错的特性来保证存储节点失败的情况下可以持续提供消息写入能力。在招聘旺季消息队列每天约承载数 10 亿的消息投递。
MQService 整体结构如下:
图 2.MQService 架构
接下来我们看一下通过 zookeeper 维护的 Topic 与 RabbitMQ 分组以及 RabbitMQ 节点的关系。
每个 Topic 都对对应到一个 Group,每个 Group 下会挂一些 RabbitMQ 节点。当 producer 发送一条消息时 MQService 会从缓存中拿到对应这个 Topic 可用的 RabbitMQ 节点的列表,MQService 会通过负载均衡策略选择其中的一个 RabbitMQ 节点进行写入,写入失败会重试下一个节点直到写入成功。单个节点如果写入失败次数在一定时间内达到一个特定值会触发熔断机制,单个 RabbitMQ 节点在熔断期间不对外提供写入及查询服务。
通过上面的介绍,大家应该可以对 MQService 有一个直观的印象,这里不再详细展开来介绍实现的细节。
以 Kafka 为中心的流批处理
首先 Kafka 在智联有招聘有大规模的应用,每天的数据传输量大约在数十 TB 量级,覆盖的范围包括 ELK、实时计算等。我们还是以计算每天不同时间端的投递量为例子来介绍 Kakfa 在这个场景下的使用。
图 3.基于 Kafka 的 Streaming 模型
这是一个非常经典的流式计算的架构,通过采集业务日志入 Kafka,再通过 Spark/Flink 之类的计算框架做计算工作。在计算投递量的场景中,我们通过将计算结果按小时以及职位为维度将计算结果保存在 MySQL 中,也会将明细数据存储在 Hive 中供离线计算使用。
那么很容易发现同一个业务场景但是数据来源是不一样的,一个是业务方发送至 MQService 的一个是通过 Logstash 采集业务端日志来做的,那么我们如何来保证这两份数据的一致性?即使不通过采集日志,业务方去双写又如何来保证一致性呢?这样也给用户带来额外的负担。
矛盾点在于 MQService 很难融入到流行的实时计算框架或者批处理框架,而 Kafka 也应用在工作队列模式下显得有点力不从心。主要体现在 Kakfa 数据消费的支持,Partition 数量与 Consumer 数量的绑定造成的 Partition 数量要跟着 consumer 的消费能力决定,业务方处理数据很难保证一批数据都能够成功处理,做 offset commit 的时候也是无法达到用户的预期。这是基于产品在业务场景的匹配上而讨论论的,就像 Kafka 介绍的那样(A distributed streaming platform)。
因此在 2018 年初时我们提出了通过一套方案来解决“工作队列 + Streaming”的想法,也就是事件中心,期望事件中心可以承载智联全业务线用户行为、中台以及后台的业务事件传递。产品和业务系统在服务过程中会产生事件,事件是在先前定义好的,对于事件的生产方无需关心事件的消费,只需要关心事件的格式定义以及事件的生产。事件的消费方可以在事件中心去查阅自己想要的订阅的事件来申请订阅,甚至是数据产品也可以在事件中心去找找灵感。
这样我们不需要关心两个消息中间件数据的一致性问题,一份数据就可以匹配“工作队列 + Streaming”场景,对资源消耗、系统运维都有很好的改善。
工作队列 + Streaming 场景全新诉求
有了这个想法之后我们开始总结我们需要的是一个什么样的产品,然后围绕我们的需求去做设计工作和技术调研工作。我们总结了我们一些诉求如下:
容灾能力及一致性
数据做分布式存储并且在分布式环境中要保证一致性。有一些重要的业务是依赖消息可靠性以及数据一致性的,所以在技术选型的时候如果在一个支持一致性的模型下去弱化一致性提升可用性是比较容易的,但是如果在一个没有一致性模型的方案上去做一致性这将会需要一个很大的改动。
单 Topic 扩展能力
就像在 MQService 描述的那样,同一个 Topic 可以利用多个节点来做横向扩展。Kafka 在这一点做了很好的抽象(Partition)。同一个 Partition 的事件可以提供顺序性消费。
累计签收与单条签收
累计签收主要应用在 Streaming 场景下,而单条签收可以很好的匹配工作队列的场景,就像一次简历投递的业务处理,业务本身没有顺序性的要求,单条签收可以很好的支持消费者的消费能力扩展。而在累计签收模式下单分区是要保证顺序性的。
事件回溯能力
我们需要根据不同的事件来决定保留的时长或大小,可以为一些想要拿到历史事件的业务提供支持,我们也可以看到这也是 MQService(上文提到的)薄弱的地方,MQService 是无法给用户提供回溯能力的。
基于上面的一些主要的特性我们开始了技术选型的调研工作。经过了一段时间的调研工作后,我们发现开源的消息中间件产品兼顾容灾能力和一致性的产品几乎没有,因此我们产生了一个想法,那就是基于一个强一致性的分布式日志存储系统来做队列功能的开发,期间也考虑过使用 Raft 协议+Log 存储,但是最终还是 Bookkeeper 吸引了我们的关注。Bookeeper 提供了开箱即用的 API,同时它在 Twitter、Hadoop 已经有大规模应用的场景,其稳定性以及成熟度等都是可以保证的。因此我们在大约 5 月份的时候已经开始做基于 Bookkeeper 事件中心的一些设计工作,在接触 Bookkeeper 社区的的时候才了解到 Apache Puslar,通过了一段时间对 Apache Pulsar 的了解以及对社区活跃度的观察,在 Apache Pulsar 社区小伙伴们的大力支持下,我们决定基于 Apache Pulsar 来搭建我们的事件中心。
为什么选择 Apache Pulsar ?
Apache Pulsar 有很多特性在满足事件中心需求的前提了还给了我们更多的惊喜,为更多的场景提供非常好的解决方案。
灵活的可用性和一致性选择
在每个 Topic 中由一系列的 Ledger 构成,每个 Ledger 有三个关键配置:
Ensemble Size (E)
Write Quorum Size (Qw)
Ack Quorum Size (Qa)
Ensemble Size (E) 决定了 Pulsar 写入 Ledger 可用的 Bookies 池的大小。
Write Quorum (Qw) 是 Pulsar 将要写入的实际的 Bookies 数量。可以等于或者小于 E。
图 4.E = 3 Qw = 3
当 Qw 小于 E 时,以条带化的方式分配读/写即每个 Bookie 只提供读写请求的子集。因此可以提升吞吐量,降低延迟。这也是提升单个 Partition 吞吐能力的一个很好的方案,这也得益于基于 Segment 为物理单元的存储设计。消息通过 Robin 的方式写入指定的 Bookie,在查询消息是可以根据 MessageId 取模即能获得所在的 Bookie 列表。
图 5.E = 5 Qw = 3
Ack Quorum (Qa) 是确认写入 Bookies 的数量,Pulsar Broker 将确认发送给客户端。为了一致性,Qa 应该是:(Qw + 1) / 2 或者更大。
这个特性可以很好的让我们在可用性和一致性上去做选择。
订阅的抽象
单队列的扩展 Kafka 为我们做了很好的抽象,Apache Pulsar 也基本采用相同的思路。而对于订阅的抽象,我们认为 Apache Pulsar 再一次为我们做了很好的抽象,通过 3 种不同的订阅方式来匹配不同的使用场景。
图 6.Apache Pulsar 对订阅的抽象
消息存储在 Topic 中。逻辑上一个 Topic 是日志结构,每个消息都在这个日志结构中有一个偏移量。Apache Pulsar 使用游标来跟踪偏移量。生产者将消息发送到一个指定的 Topic,Apache Pulsar 保证消息一旦被确认就不会丢失(正确的配置和非整个集群故障的情况下)。
消费者通过订阅来消费 Topic 中的消息。订阅是游标(跟踪偏移量)的逻辑实体,并且还根据不同的订阅类型提供一些额外的保证
Exclusive(独享) - 一个订阅只能有一个消息者消费消息
Shared(共享) - 一个订阅中同时可以有多个消费者,多个消费者共享 Topic 中的消息
Fail-Over(灾备) - 一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。
一个 Topic 可以添加多个订阅。订阅不包含消息的数据,只包含元数据和游标。
Apache Pulsar 通过允许消费者将 Topic 看做在消费者消费确认后删除消息的队列,或者消费者可以根据游标的回放来提供队列和日志的语义。在底层都使用日志作为存储模型。
这为我们通过一套系统支持工作队列和 Streaming 的诉求提供了很好的支持,在工作队列场景我们使用 share 模式,在 Streaming 模式我们使用 Failover 或者 Exclusive。我们只需要一份数据就可以同时支持两种场景。
更好的 IO 和存储设计
当在 Bookie 上写入数据时,首先将该消息写入日志文件,这是一个预写日志(WAL),它可以帮助 Bookkeeper 在发生故障时避免数据丢失。它与关系型数据库持久化保证的机制相同。
写入预写日志的操作完成后会将数据放入缓存。写入的缓存会在内存中做积累并定期进行排序和刷盘。对写入进行排序以便将同一 Ledger 的条目放在一起,从而提高读取性能。如果条目以严格的时间顺序写入,在读取时无法利用磁盘的高效顺序操作
Bookkeeper 容许将磁盘 IO 做读写分离。写入都按顺序写入日志文件可以存储在专用的磁盘上,并且可以批量刷盘以获得搞得吞吐量。除此之外从写入操作来看没有其他的同步磁盘 IO 操作,数据都是写入到内存的缓存区。
写缓存通过异步的方式批量将条目写入到日志文件和 RocksDB,因此,一个磁盘用于同步写入日志文件,另一个磁盘用于异步写入数据和读取操作,
图 7.Apache Pulsar 的 IO 及存储设计
在存储设计上 Bookkeeper 以 Segment 为中心设计对系统扩容、冷热数据分离提供了很好的支持。在扩容方面通过增加 Bookie 节点就可以分担整个集群的存储压力,在冷热数据分离方面通过将 Segment 搬迁至二级存储如 S3、OSS 等更廉价的存储设备中,支持在线业务往往使用 SSD 来做存储。因此我们可以兼顾热数据的高性能与冷数据的大空间存储。
图 8.Bookie 扩容
图 9.冷数据搬迁
在 IO 和存储设计上以及 Offload 的特性给了我们更多的惊喜,可以更好的为我们在不影响在线业务的支持上兼顾大量事件存储需求的痛点,大大的降低了冷数据的存储成本。我们计划将冷数据存储至 OSS。
上面挑选了 Apache Pulsar 非常核心的 3 个 messaging 特性来做介绍,这与事件中心的初衷是非常匹配的,然而 Apache Pulsar 远不止这些,有完善的多租户特性提供 Topic 的分层次管理,多种 Schema 的支持为数据校验、序列化提供更便捷的方式,轻量级的 Pulsar Function 以及 Pulsar SQL 都是非常值得去探索的特性,这里就不一一展开介绍了。
Apache Pulsar 在智联招聘的落地实践
下面将介绍 Apache Pulsar 在智联招聘落地过程中的一些实践
为 Namespace 设置合理的 Backlog Quota
Pulsar 为我们提供了 Backlog 的机制能够记录每个 Subscription 的消费状况。也提供了 Backlog Quota 的设置,主要可以设置 Backlog 大小以及达到阈值时的控制策略。
Backlog Quota 的控制策略有 3 种:
producer_request_hold
producer_exception
consumer_backlog_eviction
producer_request_hold 作为默认配置,在达到 Backlog 设置的大小阈值后会 block producer 发消息操作,这个配置不适合用于消息发送方是在线业务的使用场景。
producer_exception 在达到 Backlog 设置的大小阈值后,producer 会快速失败。
consumer_backlog_eviction 在达到 Backlog 设置的大小阈值后会将 subscription 未签收的头部数据逐出,可以理解为自动签收。其实这个和 producer_exception 的区别在于 producer_exception 对于订阅方将会丢失尾部数据,而 consumer_backlog_eviction 是丢失头部数据。
我们大部分使用 consumer_backlog_eviction 策略。目前 Pulsar 支持在 namespace 级别设置这个策略,在 2.2.0 版本可以在 broker.conf 文件修改全局策略。将 Backlog 作为一个重点的监控项监控起来也是非常有必要的,后面会说到这部分。
增加 MaxProducersPerTopic 限制
防止错误或者恶意的 Client 使用造成 Broker 维持大量的 Producer 对象。Broker 默认的配置是不限制的,增加限制可以提升 Pulsar Cluster 的安全性。
Pulsar 提供两种方式来设定 MaxProducerPerTopic
broker.conf 中设置 maxProducersPerTopic
通过./pulsar-admin namespaces set-max-producers-per-topic -p
目前我们在 broker.conf 中的 maxProducersPerTopic = 10000,如果 namespace 有个性需求的话通过 ./pulsar-admin namespaces set-max-producers-per-topic -p 设置。
Apache Pulsar 监控与报警
Pulsar 提供丰富的 Prometheus 指标信息输出,我们可以这些指标信息来做好 Pulsar 的监控报警。Pulsar 的客户端也记录了丰富的指标,我们做了一个 client 的扩展包将 client 的节点信息记录在 Zookeeper 中,由 Prometheus 自动发现,这样 Client 端的指标信息由 Prometheus 采集。
配合 Grafana 的监控展示,实时了解集群的状态
图 10.集群状态看板-1
图 11.集群状态看板-2
图 12.分 Namespace 状态展示
Client 发送失败次数
Backlog 超阈值
Rates/Network 超阈值
Client > 50ms 延迟
Broker > 50ms 延迟
Storage Size 超阈值
Pulsar 多集群流量切换
为了避免集群整体不可用,我们通过 Zookeeper 控制 Client 的连接串。基于 Pulsar Client 的 Service URL Provider 基础上做的二次开发。在 Zookeeper 中存储的 Pulsar 连接串改变的时候,Client 会自动断掉当前的连接并重新与新的 Pulsar 地址进行连接。
为重要业务提供消息链路追踪
我们基于 Pulsar Client 的 Interceptor 接口以及 Zipkin 进行二次开发,为了实现消息的链路跟踪。消息链路跟踪的方案是通过日志采集统一入 Hbase。每条消息都具备消息链路跟踪成本是昂贵的,并不适用于所有的场景,更适应与一些比较重要切消息量不太大的场景,当然这个根据不同的业务而定。
总结
我们通过介绍之前的消息中间件在智联招聘的应用情况来说明我们的痛点所在,我们计划打造一个可以解决当下痛点的产品来支撑智联招聘的业务。我们通过一段时间的技术选型工作后最终选择了 Apache Pulsar 作为我们的搭建企业级事件中心的基础。
截止目前事件中心接入的事件种类约 100 个,每天产生 5 亿事件量。部分业务通过灰度的方式接入,计划在 11 月底能够接入 20 亿事件量/日。
智联招聘也在持续为 Apache Pulsar 社区贡献新的特性比如 Dead Letter Topic, Client Interceptors 等,智联招聘有很多业务场景也非常依赖延时消息特性,后面我们也会在 Pulsar 上贡献此特性。
感谢 Apache Pulsar 社区小伙伴们在项目落地过程中的技术支持。
评论 5 条评论