随着 Uber 业务规模不断增长,我们的系统也在持续不断地产生更多的事件、服务间的消息和日志。这些数据在得到处理之前需要经过 Kafka。那么我们的平台是如何实时地对这些数据进行审计的呢?
为了监控 Kafka 数据管道的健康状况并对流经 Kafka 的每个消息进行审计,我们完全依赖我们的审计系统 Chaperone。Chaperone 自 2016 年 1 月成为 Uber 的跨数据中心基础设施以来,每天处理万亿的消息量。下面我们会介绍它的工作原理,并说明我们为什么会构建 Chaperone。
Uber 的 Kafka 数据管道概览
Uber 的服务以双活的模式运行在多个数据中心。Apache Kafka 和 uReplicator 是连接 Uber 生态系统各个部分的消息总线。
截止 2016 年 11 月份,Uber 的 Kafka 数据管道概览。数据从两个数据中心聚合到一个 Kafka 集群上。
要让 Uber 的 Kafka 对下游的消费者做出即时响应是很困难的。为了保证吞吐量,我们尽可能地使用批次,并严重依赖异步处理。服务使用自家的客户端把消息发布到 Kafka 代理,代理把这些消息分批转发到本地的 Kafka 集群上。有些 Kafka 的主题会被本地集群直接消费,而剩下的大部分会跟来自其他数据中心的数据一起被组合到一个聚合 Kafka 集群上,我们使用 uReplicator 来完成这种面向大规模流或批处理的工作。
Uber 的 Kafka 数据管道可以分为四层,它们跨越了多个数据中心。Kafka 代理和它的客户端分别是第二层和第一层。它们被作为消息进入第三层的网关,也就是每个数据中心的本地 Kafka 集群。本地集群的部分数据会被复制到聚合集群,也就是数据管道的最后一层。
Kafka 数据管道的数据都会经过分批和确认(发送确认):
Kafka 数据管道的数据流经的路径概览。
Uber 的数据从代理客户端流向 Kafka 需要经过几个阶段:
- 应用程序通过调用代理客户端的 produce 方法向代理客户端发送消息。
- 代理客户端把收到的消息放到客户端的缓冲区中,并让方法调用返回。
- 代理客户端把缓冲区里的消息进行分批并发送到代理服务器端。
- 代理服务器把消息放到生产者缓冲区并对代理客户端进行确认。这时,消息批次已经被分好区,并根据不同的主题名称放在了相应的缓冲区里。
- 代理服务器对缓冲区里的消息进行分批并发送到本地 Kafka 服务器上。
- 本地 Kafka 服务器把消息追加到本地日志并对代理服务器进行确认(acks=1)。
- uReplicator 从本地 Kafka 服务器获取消息并发送到聚合服务器上。
- 聚合服务器把消息追加到本地日志并对 uReplicator 进行确认(acks=1)。
我们为了让 Kafka 支持高吞吐量,做出了一些权衡。数以千计的微服务使用 Kafka 来处理成百上千的并发业务流量(而且还在持续增长)会带来潜在的问题。Chaperone 的目标是在数据流经数据管道的每个阶段,能够抓住每个消息,统计一定时间段内的数据量,并尽早准确地检测出数据的丢失、延迟和重复情况。
Chaperone 概览
Chaperone 由四个组件组成:AuditLibrary、ChaperoneService、ChaperoneCollector 和 WebService。
Chaperone 架构:AuditLibrary、ChaperoneService、ChaperoneCollector 和 WebService,它们会收集数据,并进行相关计算,自动检测出丢失和延迟的数据,并展示审计结果。
AuditLibrary实现了审计算法,它会定时收集并打印统计时间窗。这个库被其它三个组件所依赖。它的输出模块是可插拔的(可以使用 Kafka、HTTP 等)。在代理客户端,审计度量指标被发送到 Kafka 代理。而在其它层,度量指标直接被发送到专门的 Kafka 主题上。
审计算法是 AuditLibrary 的核心,Chaperone 使用 10 分钟的滚动时间窗来持续不断地从每个主题收集消息。消息里的事件时间戳被用来决定该消息应该被放到哪个时间窗里。对于同一个时间窗内的消息,Chaperone 会计算它们的数量和 p99 延迟。Chaperone 会定时把每个时间窗的统计信息包装成审计消息发送到可插拔的后端,它们可能是 Kafka 代理或者之前提到的 Kafka 服务器。
Chaperone 根据消息的事件时间戳把消息聚合到滚动时间窗内。
审计消息里的 tier 字段很重要,通过它可以知道审计是在哪里发生的,也可以知道消息是否到达了某一个地方。通过比较一定时间段内不同层之间的消息数量,我们可以知道这段时间内所生成的消息是否被成功送达。
ChaperoneService是工作负载最高的一个组件,而且总是处在饥饿的状态。它消费 Kafka 的每一个消息并记录时间戳。ChaperoneService 是基于 uReplicator 的 HelixKafkaConsumer 构建的,这个消费者组件已经被证明比 Kafka 自带的消费者组件(Kafka 0.8.2)更可靠,也更好用。ChaperoneService 通过定时向特定的 Kafka 主题生成审计消息来记录状态。
ChaperoneCollector监听特定的 Kafka 主题,并获取所有的审计消息,然后把它们存到数据库。同时,它还会生产多个仪表盘:
Chaperone 创建的仪表盘,从上面我们看出数据的丢失情况。
从上图可以看出每个层的主题消息总量,它们是通过聚合所有数据中心的消息得出的。如果没有数据丢失,所有的线会完美地重合起来。如果层之间有数据丢失,那么线与线之间会出现裂缝。例如,从下图可以看出,Kafka 代理丢掉了一些消息,不过在之后的层里没有消息丢失。从仪表盘可以很容易地看出数据丢失的时间窗,从而可以采取相应的行动。
从仪表盘上还能看出消息的延迟情况,借此我们就能够知道消息的及时性以及它们是否在某些层发生了传输延迟。用户可以直接从这一个仪表盘上看出主题的健康状况,而无需去查看 Kafka 服务器或 uReplicator 的仪表盘:
Chaperone 提供一站式的仪表盘来查看每个数据中心的主题状态。
最后,WebService提供了 REST 接口来查询 Chaperone 收集到的度量指标。通过这些接口,我们可以准确地计算出数据丢失的数量。在知道了数据丢失的时间窗后,我们可以从 Chaperone 查到确切的数量:
Chaperone 的 Web 界面。
Chaperone 的两个设计目标
在设计 Chaperone 时,为了能够做到准确的审计,我们把注意力集中在两个必须完成的任务上:
1)每个消息只被审计一次
为了确保每个消息只被审计一次,ChaperoneService 使用了预写式日志(WAL)。ChaperoneService 每次在触发 Kafka 审计消息时,会往审计消息里添加一个 UUID。这个带有相关偏移量的消息在发送到 Kafka 之前被保存在 WAL 里。在得到 Kafka 的确认之后,WAL 里的消息被标记为已完成。如果 ChaperoneService 崩溃,在重启后它可以重新发送 WAL 里未被标记的审计消息,并定位到最近一次的审计偏移量,然后继续消费。WAL 确保了每个 Kafka 消息只被审计一次,而且每个审计消息至少会被发送一次。
接下来,ChaperoneCollector 使用 ChaperoneService 之前添加过的 UUID 来移除重复消息。有了 UUID 和 WAL,我们可以确保审计的一次性。在代理客户端和服务器端难以实现一次性保证,因为这样会给它们带来额外的开销。我们依赖它们的优雅关闭操作,这样它们的状态才会被冲刷出去。
2)在层间使用一致性的时间戳
因为 Chaperone 可以在多个层里看到相同的 Kafka 消息,所以为消息内嵌时间戳是很有必要的。如果没有这些时间戳,在计数时会发生时间错位。在 Uber,大部分发送到 Kafka 的数据要么使用 avro 风格的 schema 编码,要么使用 JSON 格式。对于使用 schema 编码的消息,可以直接获取时间戳。而对于 JSON 格式的消息,需要对 JSON 数据进行解码才能拿到时间戳。为了加快这个过程,我们实现了一个基于流的 JSON 消息解析器,这个解析器无需预先解码整个消息就可以扫描到时间戳。这个解析器用在 ChaperoneService 里是很高效的,不过对代理客户端和服务器来说仍然需要付出很高代价。所以在这两个层里,我们使用的是消息的处理时间戳。因为时间戳的不一致造成的层间计数差异可能会触发错误的数据丢失警告。我们正在着手解决时间戳不一致问题,之后也会把解决方案公布出来。
Chaperone 在 Uber 的两大用途
1. 检测数据丢失
在 Chaperone 之前,数据丢失的第一个征兆来自数据消费者,他们会出来抱怨数据的丢失情况。但是等他们出来抱怨已经为时已晚,而且我们无法知道是数据管道的哪一部分出现了问题。有了 Chaperone 之后,我们创建了一个用于检测丢失数据的作业,它会定时地从 Chaperone 拉取度量指标,并在层间的消息数量出现不一致时发出告警。告警包含了 Kafka 数据管道端到端的信息,从中可以看出那些管道组件的度量指标无法告诉我们的问题。检测作业会自动地发现新主题,并且你可以根据数据的重要性配置不同的告警规则和阈值。数据丢失的通知会通过多种通道发送出去,比如页式调度系统、企业聊天系统或者邮件系统,总之会很快地通知到你。
2. 在 Kafka 里通过偏移量之外的方式读取数据
我们生产环境的大部分集群仍然在使用 Kafka 0.8.x,这一版本的 Kafka 对从时间戳到偏移量的索引没有提供原生支持。于是我们在 Chaperone 里自己构建了这样的索引。这种索引可以用来做基于时间区间的查询,所以我们不仅限于使用 Kafka 的偏移量来读取数据,我们可以使用 Chaperone 提供的时间戳来读取数据。
Kafka 对数据的保留是有期限的,不过我们对消息进行了备份,并把消息的偏移量也原封不动地保存起来。借助 Chaperone 提供的索引,用户可以基于时间区间读取这些备份数据,而不是仅仅局限于 Kafka 现存的数据,而且使用的访问接口跟 Kafka 是一样的。有了这个特性,Kafka 用户可以通过检查任意时间段里的消息来对他们的服务进行问题诊断,在必要时可以回填消息。当下游系统的审计结果跟 Chaperone 出现不一致,我们可以把一些特定的消息导出来进行比较,以便定位问题的根源。
总结
我们构建了 Chaperone 来解决以下问题:
- 是否有数据丢失?如果是,那么丢失了多少数据?它们是在数据管道的哪个地方丢失的?
- 端到端的延迟是多少?如果有消息延迟,是从哪里开始的?
- 是否有数据重复?
Chaperone 不仅仅告诉我们系统的健康情况,它还告诉我们是否有数据丢失。例如,在 Kafka 服务器返回非预期的错误时,uReplicator 会出现死循环,而此时 uReplicator 和 Kafka 都不会触发任何告警,不过我们的检测作业会很快地把问题暴露出来。
如果你想更多地了解 Chaperone,可以自己去探究。我们已经把 Chaperone 开源,它的源代码放在 Github 上。
查看英文原文: Introducing Chaperone:How Uber Engineering Audits Kafka End-To-End
感谢陈兴璐对本文的审校。
给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ , @丁晓昀),微信(微信号: InfoQChina )关注我们。
评论