Kafka 是一个非常流行的分布式消息队列,它可以实时地支持海量的数据,可以支持非常高的吞吐率和低延时,它在 Uber 得到了非常广泛的应用,很多的核心业务都会用到它,比如打车服务和送餐服务。
本文将介绍 Uber 如何搭建一个基于 Kafka 的跨数据中心复制平台,除了有关 Kafka 在 Uber 的应用,还有 Uber 的 Kafka Pipeline,以及在哪些应用场景下会用到跨数据中心的复制等等。
本文整理自徐宏亮在 ArchSummit 全球架构师峰会 2019 北京站的演讲。
今天我要讲的题目是:Uber 如何搭建一个基于 Kafka 的跨数据中心复制平台。
首先我会介绍一下 Kafka 在 Uber 的应用;然后是 Uber 的 Kafka Pipeline,以及在哪些应用场景下会用到跨数据中心的复制;之后会介绍我们开发的跨数据中心复制平台 uReplicator,最后是我们在复制过程中所做的数据丢失检测。
Apache Kafka at Uber
Kafka 是一个非常流行的分布式消息队列,它可以实时地支持海量的数据,可以支持非常高的吞吐率和低延时,它在 Uber 得到了非常广泛的应用,很多的核心业务都会用到它,比如打车服务和送餐服务。
在使用打车服务时,司机和用户的应用都会将自己当前的信息实时发送到 Kafka,我们在 Kafka 的下游有一个流处理的平台,它从 Kafka 实时读取数据进行计算,得出动态打车的价格。
而送餐服务有一个重要特性,需要计算出实时的预计送达时间,大家打开应用的时候,会看到每一个餐馆后面都会有一个预计的送达时间,我们的后台会有一个机器学习的平台,实时从 Kafka 读取商家、司机和用户三方的数据,用机器学习的方法去动态计算这个送达时间。当然还有很多这样的例子,包括我们对诈骗的实时检测、司机和用户的注册,这些都可以基于 Kafka。
总结来看,Kafka 在 Uber 的应用主要有以下五大类场景:
第一类是将 Kafka 作为最基本的消息队列。各种服务和应用都可以将数据发送到 Kafka,也可以从 Kafka 进行读取,不允许服务之间进行直接的通信,从而避免网状的通信;
第二类是将 Kafka 作为流处理平台数据的来源。在 Uber 我们有一个开源的流处理平台 AthenaX,在这个平台上我们可以运行 Samza 和 Flink 的 job,它们从 Kafka 读取数据进行实时计算,计算完成后也可以将数据发回给 Kafka,再传回到下游;
第三类是将 Kafka 作为数据库。我们的数据库会将自己的变更日志通过 Kafka 传输到下游,也会通过 Kafka 进行跨数据中心的传输;
第四类是将 Kafka 作为批量加载的服务。如果想把数据加载到 HDFS 和 S3,首先需要将数据发送到 Kafka,再由 Kafka 统一加载到 HDFS 和 S3;
第五类是将 Kafka 作为常见服务和应用的日志。所有的服务和应用都可以使用我们提供的库将日志发送到 Kafka,然后用户既可以直接从 Kafka 进行读取来进行调试,也可以通过 Kafka 将日志加载到 Elasticsearch 之类的检索平台进行搜索。
在这五类场景中,Kafka 在 Uber 扮演了一个数据中枢的角色,基本上所有的服务都会将数据发送到 Kafka,而后再进行读取和进一步的操作。
Apache Kafka pipeline & replication
因为对 Kafka 有着非常广泛的应用,所以我们的 Kafka 集群规模也非常大。我们的集群现在有数万个 Topic,每天都会处理超过万亿条的数据,每天的数据量也会达到 PB 的级别。下面我们来看一下 Uber 构建了怎样的 Kafka pipeline,以支持这些应用的场景。
在 Uber 我们有多个数据中心,每个数据中心都会有一个单独的 Kafka pipeline,在每个 pipeline 里会有多个 Regional Kafka 集群,每个集群处理不同类型的数据,比如有的集群处理服务日志相关的数据,有的集群处理核心业务相关的数据,从而做到较好地隔离不同类型的数据。
在每一个 Kafka 的集群前面,我们都会有一个 Kafka REST Proxy 集群,它其实就是一个 REST service,所有应用和服务都会使用所提供的 REST Client 将数据首先发送到 REST Proxy,再由 REST Proxy 统一发送到 Kafka,这样就做到了 Client 和 Kafka broker 之间的隔离。
每一个 Pipeline 最右边都有一个 Aggregate Kafka 集群,它的作用是收集所有数据中心里 Regional Kafka 的数据,相当于它会拥有全局的数据。如果一些服务处理需要全局数据的话,可以从 Aggregate Kafka 读取,再进行进一步的操作。在 Regional Kafka 和 Aggregate Kafka 集群之间,由 uReplicator 跨数据中心、数据平台进行实时复制。
下面,我们来看一下,我们在哪些场景下会需要用到跨数据中心的复制。
第一个应用场景是 Aggregation,很多服务都需要一个 Global view 来处理数据,需要一个复制平台将所有的数据从所有数据中心的 Regional 集群复制到 Aggregate 集群。
第二个应用场景,对于那些需要 Global view 的服务来说,他们通常只运行一个数据中心,当这个数据中心出现问题以后需要切换到另外的数据中心,在切换的过程中,他们需要知道切换之后需要从哪一个 offset 继续开始读取数据,所以复制平台在复制的过程中,还需要做一个额外操作,实时将复制数据的 source 和 Destination 的 offset 的对应关系发送到一个叫 offset Sync 的服务,这个服务可以实时收集所有 Partition offset 的对应关系,计算出从一个数据中心切换到另一个数据中心后需要从哪个 offset 进行读取。这样既可以保证没有数据丢失,还可以保证将重复数据降到最少。
第三个应用场景是批量加载服务,我们需要首先将所有的数据复制到 Aggregate 集群,再统一加载到 HDFS 或 S3,这样可以避免零星加载,也可以使加载过程做到更好的批量化。
最后一个应用场景是,我们可以利用这个复制平台来做 Topic 的迁移,既可以从一个数据中心迁移到另外一个数据中心,也可以在一个数据中心里将一个集群迁移到另外一个集群。Topic 迁移的难点在于通常 Producer 和 Consumer 都不在一个组,也有可能会有多个 Producer 和 Consumer,所以我们很难做到让它们在同一个时间进行切换。
我们的做法是,从新的数据中心往老的数据中心迁移时,先搭建一个复制的集群进行数据的复制。我们可以把 Producer 首先迁移到新的数据中心,因为有了这个复制集群,所以两个数据中心会有相同的数据,这样 Consumer 就可以在 Producer 迁移完之后,再慢慢迁移。如果我们有多个 Consumer,他们也可以按照自己的计划分别进行迁移,当所有的 Consumer 都迁移完以后,我们才可以把这个复制集群移除,至此迁移的过程就算完成了。这么做的好处是,在迁移的过程中可以把对 Consumer 的 down time 降到最低。
uReplicator
那么,我们为什么想要开发一个我们自己的跨数据中心复制平台呢?
2015 年,我们使用的是 Kafka 自带的 MirrorMaker,在使用的过程中碰到了很多的问题。
首先,它使用了 Kafka 的 High-Level Consumer,所以就不可避免地会发生昂贵的、非预期的 Consumer Rebalance,虽然 Rebalance 可以更好地将 Partition 分配到 Worker 上,但 High-Level Consumer Rebalance 有几个缺点。
第一, 不可以控制它什么时候发生,比如增加或减少 Topic 时可能会触发 Rebalance,增加或减少 Worker 时也可能会触发 Rebalance;
第二, Rebalance 的过程中所有复制都会停止,这段时间任何数据都无法流入到下流,这非常不好。另外,在 Rebalance 结束之后,会留下这段时间的 backlog,MirrorMaker 会有一个巨大的 traffic 去 Catch-up 这些 backlog,这样的 spark 对 Destination 的 broker 也有非常大的影响。
第三, 当时 MirrorMaker 使用了一个静态的 Topic List,需要在 MirrorMaker 集群启动的时候读取,想要增加或者减少一个 Topic 时,需要更新这个静态的文件列表,再重启整个 MirrorMaker 集群。每次都要维护静态文件很不方便,而且在重启集群的时候又会导致触发 Rebalance 的问题。
第四, 当时的 MirrorMaker 无法控制 Consumer Commit offset 的时间,经常出现把数据发送到了 Kafka 的 producer 里,但是还没有实际送到 Destination 的情况。这个时候,Consumer 就 Commit 了 offset,在这种情况下,如果这一个 MirrorMaker 的 Worker Crash 了,恢复以后就不会再去重复复制这些信息,那么这些信息就彻底丢失了。
最后,因为 MirrorMaker 使用了一些静态的配置文件,当这些文件没有被同步到所有 Worker 上的机器时,启动时就会发生各种问题。
所以基于以上这些问题,我们决定开发一个自己的复制平台。我们对这个平台提出了五个要求:
Stable replication
希望这个平台有稳定复制的能力,如果有 Topic 或者 Worker 在复制过程发生变动,复制行为不应该受到影响,还可以继续稳定复制。
Simple operations
希望可以方便地增加或者删除 Topic,而不需要维护静态文件,不需要在每次改变 Topic 的时候重启整个集群。
High throughput
希望这个平台有非常好的性能,以满足 Uber 数据量的增长。
No data loss
希望在复制的过程中可以保证没有数据丢失。
Auditing
和上一个相关,希望有机制可以检测是否真的发生了数据丢失。
基于这些要求,我们开发了第一代 uReplicator,引入了分布式的任务管理框架 Apache Helix。简单来说,它由 Controller、Participant、Resource 几部分构成。在我们 uReplicator 的具体应用中,Controller 就相当于我们的 uReplicator Controller,可以用来做 Partition 的分配;Participant 相当于实际用来进行复制操作的每个机器;而 Resource 就是每个 Kafka Topic。
Helix 的工作原理是在 Controller 里通过自带的算法或者用户自定义的算法进行一定的计算,把每个 Resource 以 Partition 为单位,分配到 Worker 上,Worker 在收到以后再进行进一步的操作。所以,在引入 Helix 以后,我们相当于取代了 High-Level Consumer 里面的 Consumer Group Coordinato 角色,完全自己控制 Partition 分配的行为。这样我们直接解决了两个问题,第一是稳定复制的能力,当 Topic 或者 Worker 发生变动的时候,可以控制这个变动所带来的影响;第二个好处是,我们在 Controller 里加了一个 End Point,这样我们就可以使用 curl 命令,在运行的时候动态地进行 Topic 的增加和删除,这样就弃掉了静态的 Topic List,不需要再重启整个集群。
我们还对 MirrorMaker 的 Worker 行了一系列的改进,把它演变成了我们的 uReplicator 的 Worker。我们没有改变模式,但将 MirrorMaker Worker 里面 High-Level Consumer 替换成了 SimpleConsumer,这样我们就有了更多底层的控制,可以控制更多 Consumer 的行为,并可以更好发控制 Simple Consumer 的数量,达到更高的并行度,增加吞吐量。另外我们修改了 Commit Offset 的逻辑,可以保证在运行的过程中,只有当 Worker 在 Producer 里的所有数据都被发送到 Destination broker,并且收到了发送成功的返回以后再去 Commit Offset,从而保证数据不会被丢失。如果 Worker crash 了,它就可以从上一个 Commit Offset 进行复制,虽然会有数据重复,但可以避免数据丢失。
Performance Issues
在大概 2017 年初的时候,我们发现随着 Uber 数据量的增长,性能已经不能满足当时 Uber 的吞吐量。当时我们做了很多研究,提出了很多解决方案。在此会介绍其中的 4 个解决方案。
Solution 1: Increase Batch Size
首先我们在改进的过程中发现,在 Catch-up 的过程中分为两个阶段,一个是全速(Full-speed)阶段,另外一个是 Long tail 阶段。全速阶段的定义是在这个过程中,所有的 Worker 上都含有非常高的 Backlog,每个 Worker 都在全速 Catch-up 这些 Backlog。我们在这个阶段观察到的第一个现象,Destination broker 上的 CPU 利用率非常高,从 Kafka 的指标来看,它上面 RequestHandler 的利用率也非常高,这说明每个 broker 都收到了太多发送的请求,由于 CPU 的限制,没办法把每一个发送的请求更快地处理掉,接收更多发送的请求。
对此问题一个最直观的解决方案是,增大每一个发送请求的 Batch Size,从而减少请求的频率或者数目,我们相应地增加了 uReplicator Worker 里一些 Kafka Producer 的参数,比如 batch size 和 linger time,增加这些参数之后效果非常明显,每个 request batch size 增大了将近四到五倍,同时因为 batch size 变大,压缩比例也变得更好了。
Solution 2: 1-1 Partition Mapping
我们发现 Kafka Producer 默认会用 Round robin 的方式,将数据发送到 Destination 的每个 Partition,但这样带来一个问题,假设我们有一个 Topic,它有 N 个 Partition,通常会被分配到 N 个 Worker 上,每一个 Worker 都会往 Destination 的 N 个 Partition 上发送数据,所以一个 Topic 就相当于产生了 N 方个连接。可以想象一下,我们有数万个 Topic,当所有连接都发生的时候,相当于我们的复制平台对我们自己的 Kafka 的集群进行了一个 DDOS 攻击。
我们对于这个问题的解决方案是引入了一个 1-1 Partition Mapping,如果一个 Message 在 Source 是 Partition I,我们把它发送到 Destination 时,也会继续把它发送到 Partition I,这样我们所有连接的数量就和 Partition 的数量成正比,很大地减轻了 Destination Broker 上的压力。在采用了这些改进措施以后,我们每个 Destination Broker 的吞吐量提升了将近 1 倍,并且这个吞吐量是压缩比更高的吞吐量。
Solution 3: Dynamic Workload Balance
下面我们来看一下,对 Long Tail Phase 做了哪些改进。 Long Tail Phase 的定义是一些 Worker 已经 Catch-up 了 backlog,变得比较空闲,但另外一些 Worker 它还在全速进行 Catch-up。
一个非常直观的解决方案是,我们希望将那些还在忙碌的 Worker 上的 Workload 分配到那些已经空闲的 Worker 上,使它们都可以全速地工作。所以我们提出的第一个解决方案是动态 Workload balance,一开始 uReplicator、MirrorMaker 进行 Partition 分配的逻辑,是希望每个 Worker 都有相同数量的 Partition,以求平均。这么做本身并没有问题,但是当数据量特别大的情况下,一些 Workload 比较大的 Partition 可能就会被分配到同一个 Worker 上,导致那个 Worker 被 overload,就不能 Catch-up 所有的 Backlog。
我们的做法不是用 Partition 的数量来进行分配,而是用 Workload 来进行分配,在我们的内部有一个叫 Chaperone 的服务,它可以实时提供每一个 Topic 在 Source Kafka 集群里面的 bytes-in-rate,我们的 Controller 就会从这个服务去实时获取 Workload,根据 Workload 进行 partition 的分配,最终做到每个 Worker 上都有相同数量的 Workload。
Solution 4: Lag-Feedback Rebalance
我们发现在数据中心切换的场景下,很多时候一些 Workload 并不大的 Partition 也可能有比较大的 backlog,如果只按照前面说的 Workload 去做 Balance,就没有办法为它们分配到更多的资源,但其实在 Lag 非常大的情况下,即使这些 Topic 的 Load 不大,每个 Worker 也都需要分配出相当一部分带宽去进行复制。
所以我们引入了一个 Lag-Feedback Rebalance 机制,我们认为在 Lag 非常大的情况下,它相对应的 Workload 也变大了。所以我们在计算 Workload 的时候引入了一个系数,这个系数会随着 Lag 变大,从而将 Lag 和整体的 Workload 统一。
我们采用了动态 Workload Rebalance 后,每隔十分钟会做一次 Rebalance,每做一次 Rebalance 之后,每个 Worker 上 Partition 的数量都会有一些细微的调整,Workload 或者说 Lag 特别大的一些 Worker,它上面有很大的 Backlog,在 Rebalance 之后它的 Backlog 就被挪到了一些空闲的 Worker 上。从而使所有的 Worker 都可以全速地 Catch-up。
至此,我们已经满足了一开始提出的前四项要求,该去实现最后一项要求了。但很不幸的是,在 2017 年年终的时候,我们发现前两个要求又得不到满足了,这是由于多方面的原因。
第一,随着 Uber 数据量的增长,我们每一个 Kafka 集群的规模也在不断增大,每个集群里 Partition 的数量也在增长。在当时,我们 uReplicator 的部署方式是,一个 Source 到 Destination 的 Kafka 的集群由一个 uReplicator 部署来负责,对于最大的那个 Kafka 集群来说,那个 uReplicator 里面需要负责处理非常多的 Partition。所以 Helix 在这个时候就碰到了一个计算上的瓶颈,我们发现在做 rolling restart 的时候,可能需要将近一个小时才能做一次 full Balance,在这一个小时里,很多 Partition 就停止了复制,于是稳定复制的要求就不能得到满足了。
第二,在当时 Uber Kafka 集群的数量也在不断的增长,Kafka 部署的数量通常会跟 Kafka 集群的数量成倍数关系,一旦我们又想要有新的 Kafka 集群的时候,需要去建立更多 uReplicator 的部署。当我们有这么大的部署量时,就很难去维护它们了,很难控制每个部署里面运行的版本号。
第三,因为有那么多的 uReplicator 部署,我们很容易就会引入一些人为产生的错误,比如有一次,我们 whitelist 一个 Topic,不小心在两个 Kafka 集群之间相互 whitelist,就变成了一个死循环。那个 Topic 本身也就 10QPS,最后因为相互的复制,变成了 100KQPS。另外一个可能出现的人为错误是,从一个 Source 到 Destination 可能有不同的路径,可能有时候会错误地将它们在两个不同的路径上对同一个 Topic 进行了 whitelist,这样就会重复复制同一份数据。
基于以上这些问题,我们开发了第二代 uReplicator,我们叫它 Federated uReplicator,它是在第一代 uReplicator 的基础上,加入了一个 Federation 层。
在 Federation 模式下,第一个解决的是 Partition 数量过多的问题。我们的做法是将 Partition 数量非常大的每一个 uReplicator 部署拆分成几个更小的部署,叫做 route。我们在每个 route 里规定了 Partition 数目的上限,有了这个上限,使每个 route 里面的 Helix 计算 Partition 分配时变得非常快。
第二个需要解决的是部署数量过多的问题。在第一代 uReplicator 里,每个部署都需要自己或者用 Script 人为完成,我们为此引入了一个叫做 Deployment Group 的概念,它的思想是把相同类型的部署聚合到一个 Group 里。打个比方,可能我们原来有几十个都是用来做跟 Aggregation 相关的部署,现在我们把它聚合到一个 Aggregation Deployment Group,原来我们可能需要人为地去直接面对几十个 Deployment,现在,我们就只需要面对一个 Deployment Group。这样不管有多少 Kafka 集群,我们只需要面对有限个类型的 Deployment Group。
在每一个 Deployment Group 里,都会有一个叫 uReplicator Manager 的模块来进行每个 route 的创建和维护工作。Manager 的第一个工作是收到一个 Topic whitelist 请求的时候,会决定是把这个 Topic 加入到已有的 route 里面,还是重新建一个新的 route。它创建新 route 的条件是需要保证现有的每一个 route 里 Partition 的数目不会超过我们规定的上限。Manager 的第二个工作是,它也会从 chaperone 服务获取 route 里所有 Topic 的 Workload,然后计算整个 route 有多少 Workload,据此计算出这个 route 需要多少个 Worker 进行复制可以达到一定的吞吐率。它同时可以阶段性的去获取数据,动态进行 Worker 的调整,当 Workload 变少以后就可以自动释放这个 route 里的一些 Worker,可以更好地利用硬件资源,不造成浪费。
在 uReplicator Manager 之前有一个叫做 uReplicator Front 的模块,使用户可以直接做 whitelist/blacklist Topic。在这个模块里有一个功能,会将 whitelist 的信息都存在数据库里,当它收到一个 whitelist 的请求时候,会从数据库或者内存里将这个 Topic 相关的 whitelist 信息都读取出来,我们可以把每一个 whitelist 想象成一个有向图,在收到 whitelist 请求以后,它可以判断在这个有向图里新加入一条边后,会不会造成 Loop 和 Double route 的情况。当所有检查都通过后,它才会将这个 whitelist 请求发送到 Manage,去做实际 whitelist 的操作。
Data loss detection
在我们开发了 Federated uReplicator 之后,终于满足了一开始提出的前四项要求。现在介绍一下,我们是怎么做数据丢失检测的。我们做数据丢失检测的服务,就是刚才提到的 chaperone 服务。它不光可以提供 Workload 信息,还可以做数据丢失的检测。
它的具体做法是,在整个 Kafka pipeline 的每个 tier 上分别 Kafka message 数量的进行统计,汇总到 Cassandra,然后比较 Regional Kafka 和 Aggregate Kafka 中 Message 的数量,从而判断是否有数据的丢失。
最终,我们满足了一开始在开发这个复制平台时提出的所有要求,uReplicator 和 Chaperone 都在我们的生产环境里运行了两年以上,这两个都是我们的开源项目,在 Github 上可以找到它们的源代码。
评论 1 条评论