High Level Consumer
很多时候,客户程序只是希望从 Kafka 读取数据,不太关心消息 offset 的处理。同时也希望提供一些语义,例如同一条消息只被某一个 Consumer 消费(单播)或被所有 Consumer 消费(广播)。因此,Kafka High Level Consumer 提供了一个从 Kafka 消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义。
Consumer Group
High Level Consumer 将从某个 Partition 读取的最后一条消息的 offset 存于 ZooKeeper 中( Kafka 从 0.8.2 版本开始同时支持将 offset 存于 Zookeeper 中与将offset 存于专用的Kafka Topic 中)。这个offset 基于客户程序提供给Kafka 的名字来保存,这个名字被称为Consumer Group。Consumer Group 是整个Kafka 集群全局的,而非某个Topic 的。每一个High Level Consumer 实例都属于一个Consumer Group,若不指定则属于默认的Group。ZooKeeper 中Consumer 相关节点如下图所示:
很多传统的Message Queue 都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证Queue 的长度比较短,提高效率。而如上文所述,Kafka 并不删除已消费的消息,为了实现传统Message Queue 消息只被消费一次的语义,Kafka 保证每条消息在同一个Consumer Group 里只会被某一个Consumer 消费。与传统Message Queue 不同的是,Kafka 还允许不同Consumer Group 同时消费同一条消息,这一特性可以为消息的多元化处理提供支持。
实际上,Kafka 的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm 这种实时流处理系统对消息进行实时在线处理,同时使用 Hadoop 这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer 在不同的 Consumer Group 即可。下图展示了Kafka 在LinkedIn 的一种简化部署模型。
为了更清晰展示Kafka Consumer Group 的特性,笔者进行了一项测试。创建一个Topic (名为topic1),再创建一个属于group1 的Consumer 实例,并创建三个属于group2 的Consumer 实例,然后通过 Producer 向topic1 发送Key 分别为1,2,3 的消息。结果发现属于group1 的Consumer 收到了所有的这三条消息,同时 group2 中的3 个Consumer 分别收到了Key 为1,2,3 的消息,如下图所示。
(点击放大图像)
注:上图中每个黑色区域代表一个Consumer 实例,每个实例只创建一个MessageStream。实际上,本实验将Consumer 应用程序打成jar 包,并在4 个不同的命令行终端中传入不同的参数运行。
High Level Consumer Rebalance
注:本节所讲述 Rebalance 相关内容均基于 Kafka High Level Consumer。
Kafka 保证同一 Consumer Group 中只有一个 Consumer 会消费某条消息,实际上,Kafka 保证的是稳定状态下每一个 Consumer 实例只会消费某一个或多个特定 Partition 的数据,而某个 Partition 的数据只会被某一个特定的 Consumer 实例所消费。也就是说 Kafka 对消息的分配是以 Partition 为单位分配的,而非以每一条消息作为分配单元。这样设计的劣势是无法保证同一个 Consumer Group 里的 Consumer 均匀消费数据,优势是每个 Consumer 不用都跟大量的 Broker 通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个 Partition 里的数据是有序的,这种设计可以保证每个 Partition 里的数据可以被有序消费。
如果某 Consumer Group 中 Consumer(每个 Consumer 只创建 1 个 MessageStream)数量少于 Partition 数量,则至少有一个 Consumer 会消费多个 Partition 的数据,如果 Consumer 的数量与 Partition 数量相同,则正好一个 Consumer 消费一个 Partition 的数据。而如果 Consumer 的数量多于 Partition 的数量时,会有部分 Consumer 无法消费该 Topic 下任何一条消息。
如下例所示,如果 topic1 有 0,1,2 共三个 Partition,当 group1 只有一个 Consumer(名为 consumer1) 时,该 Consumer 可消费这 3 个 Partition 的所有数据。
增加一个Consumer(consumer2)后,其中一个Consumer(consumer1)可消费2 个Partition 的数据(Partition 0 和Partition 1),另外一个Consumer(consumer2) 可消费另外一个Partition(Partition 2)的数据。
再增加一个Consumer(consumer3)后,每个Consumer 可消费一个Partition 的数据。consumer1 消费partition0,consumer2 消费partition1,consumer3 消费partition2。
再增加一个Consumer(consumer4)后,其中3 个Consumer 可分别消费一个Partition 的数据,另外一个Consumer(consumer4)不能消费topic1 的任何数据。
此时关闭consumer1,其余3 个Consumer 可分别消费一个Partition 的数据。
接着关闭consumer2,consumer3 可消费2 个Partition,consumer4 可消费1 个Partition。
再关闭consumer3,仅存的consumer4 可同时消费topic1 的3 个Partition。
Consumer Rebalance 的算法如下:
- 将目标 Topic 下的所有 Partirtion 排序,存于 PT
- 对某 Consumer Group 下所有 Consumer 排序,存于 CG,第 i 个 Consumer 记为 Ci
- N=size(PT)/size(CG),向上取整
- 解除 Ci 对原来分配的 Partition 的消费权(i 从 0 开始)
- 将第 i∗N 到(i+1)∗N−1 个 Partition 分配给 Ci
目前,最新版(0.8.2.1)Kafka 的 Consumer Rebalance 的控制策略是由每一个 Consumer 通过在 Zookeeper 上注册 Watch 完成的。每个 Consumer 被创建时会触发 Consumer Group 的 Rebalance,具体启动流程如下:
- High Level Consumer 启动时将其 ID 注册到其 Consumer Group 下,在 Zookeeper 上的路径为
/consumers/[consumer group]/ids/[consumer id]
- 在
/consumers/[consumer group]/ids
上注册 Watch - 在
/brokers/ids
上注册 Watch - 如果 Consumer 通过 Topic Filter 创建消息流,则它会同时在
/brokers/topics
上也创建 Watch - 强制自己在其 Consumer Group 内启动 Rebalance 流程
在这种策略下,每一个 Consumer 或者 Broker 的增加或者减少都会触发 Consumer Rebalance。因为每个 Consumer 只负责调整自己所消费的 Partition,为了保证整个 Consumer Group 的一致性,当一个 Consumer 触发了 Rebalance 时,该 Consumer Group 内的其它所有其它 Consumer 也应该同时触发 Rebalance。
该方式有如下缺陷:
- Herd effect任何 Broker 或者 Consumer 的增减都会触发所有的 Consumer 的 Rebalance
- Split Brain每个 Consumer 分别单独通过 Zookeeper 判断哪些 Broker 和 Consumer 宕机了,那么不同 Consumer在同一时刻从 Zookeeper“看”到的 View 就可能不一样,这是由 Zookeeper 的特性决定的,这就会造成不正确的 Reblance 尝试。
- 调整结果不可控所有的 Consumer 都并不知道其它 Consumer 的 Rebalance 是否成功,这可能会导致 Kafka工作在一个不正确的状态。
根据 Kafka 社区 wiki,Kafka 作者正在考虑在还未发布的 0.9.x 版本中使用中心协调器 (Coordinator) 。大体思想是为所有 Consumer Group 的子集选举出一个 Broker 作为 Coordinator,由它 Watch Zookeeper,从而判断是否有 Partition 或者 Consumer 的增减,然后生成 Rebalance 命令,并检查是否这些 Rebalance 在所有相关的 Consumer 中被执行成功,如果不成功则重试,若成功则认为此次 Rebalance 成功(这个过程跟 Replication Controller 非常类似)。具体方案将在后文中详细阐述。
Low Level Consumer
使用 Low Level Consumer (Simple Consumer) 的主要原因是,用户希望比 Consumer Group 更好的控制数据的消费。比如:
- 同一条消息读多次
- 只读取某个 Topic 的部分 Partition
- 管理事务,从而确保每条消息被处理一次,且仅被处理一次
与 Consumer Group 相比,Low Level Consumer 要求用户做大量的额外工作。
- 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
- 应用程序需要通过程序获知每个 Partition 的 Leader 是谁
- 必须处理 Leader 的变化
使用 Low Level Consumer 的一般流程如下
- 查找到一个“活着”的 Broker,并且找出每个 Partition 的 Leader
- 找出每个 Partition 的 Follower
- 定义好请求,该请求应该能描述应用程序需要哪些数据
- Fetch 数据
- 识别 Leader 的变化,并对之作出必要的响应
Consumer 重新设计
根据社区社区 wiki,Kafka 在 0.9.* 版本中,重新设计 Consumer 可能是最重要的 Feature 之一。本节会根据社区 wiki 介绍 Kafka 0.9.* 中对 Consumer 可能的设计方向及思路。
设计方向
简化消费者客户端
部分用户希望开发和使用 non-java 的客户端。现阶段使用 non-java 发 SimpleConsumer 比较方便,但想开发 High Level Consumer 并不容易。因为 High Level Consumer 需要实现一些复杂但必不可少的失败探测和 Rebalance。如果能将消费者客户端更精简,使依赖最小化,将会极大的方便 non- java 用户实现自己的 Consumer。
中心 Coordinator
如上文所述,当前版本的 High Level Consumer 存在 Herd Effect 和 Split Brain 的问题。如果将失败探测和 Rebalance 的逻辑放到一个高可用的中心 Coordinator,那么这两个问题即可解决。同时还可大大减少 Zookeeper 的负载,有利于 Kafka Broker 的 Scale Out。
允许手工管理 offset
一些系统希望以特定的时间间隔在自定义的数据库中管理 Offset。这就要求 Consumer 能获取到每条消息的 metadata,例如 Topic,Partition,Offset,同时还需要在 Consumer 启动时得到每个 Partition 的 Offset。实现这些,需要提供新的 Consumer API。同时有个问题不得不考虑,即是否允许 Consumer 手工管理部分 Topic 的 Offset,而让 Kafka 自动通过 Zookeeper 管理其它 Topic 的 Offset。一个可能的选项是让每个 Consumer 只能选取 1 种 Offset 管理机制,这可极大的简化 Consumer API 的设计和实现。
Rebalance 后触发用户指定的回调
一些应用可能会在内存中为每个 Partition 维护一些状态,Rebalance 时,它们可能需要将该状态持久化。因此该需求希望支持用户实现并指定一些可插拔的并在 Rebalance 时触发的回调。如果用户使用手动的 Offset 管理,那该需求可方便得由用户实现,而如果用户希望使用 Kafka 提供的自动 Offset 管理,则需要 Kafka 提供该回调机制。
非阻塞式 Consumer API
该需求源于那些实现高层流处理操作,如 filter by, group by, join 等,的系统。现阶段的阻塞式 Consumer 几乎不可能实现 Join 操作。
如何通过中心 Coordinator 实现 Rebalance
成功 Rebalance 的结果是,被订阅的所有 Topic 的每一个 Partition 将会被 Consumer Group 内的一个(有且仅有一个)Consumer 拥有。每一个 Broker 将被选举为某些 Consumer Group 的 Coordinator。某个 Cosnumer Group 的 Coordinator 负责在该 Consumer Group 的成员变化或者所订阅的 Topic 的 Partititon 变化时协调 Rebalance 操作。
Consumer
1) Consumer 启动时,先向 Broker 列表中的任意一个 Broker 发送 ConsumerMetadataRequest,并通过 ConsumerMetadataResponse 获取它所在 Group 的 Coordinator 信息。ConsumerMetadataRequest 和 ConsumerMetadataResponse 的结构如下
ConsumerMetadataRequest { GroupId => String } ConsumerMetadataResponse { ErrorCode => int16 Coordinator => Broker }
2)Consumer 连接到 Coordinator 并发送 HeartbeatRequest,如果返回的 HeartbeatResponse 没有任何错误码,Consumer 继续 fetch 数据。若其中包含 IllegalGeneration 错误码,即说明 Coordinator 已经发起了 Rebalance 操作,此时 Consumer 停止 fetch 数据,commit offset,并发送 JoinGroupRequest 给它的 Coordinator,并在 JoinGroupResponse 中获得它应该拥有的所有 Partition 列表和它所属的 Group 的新的 Generation ID。此时 Rebalance 完成,Consumer 开始 fetch 数据。相应 Request 和 Response 结构如下
HeartbeatRequest { GroupId => String GroupGenerationId => int32 ConsumerId => String } HeartbeatResponse { ErrorCode => int16 } JoinGroupRequest { GroupId => String SessionTimeout => int32 Topics => [String] ConsumerId => String PartitionAssignmentStrategy => String } JoinGroupResponse { ErrorCode => int16 GroupGenerationId => int32 ConsumerId => String PartitionsToOwn => [TopicName [Partition]] } TopicName => String Partition => int32
Consumer 状态机
Down:Consumer 停止工作
Start up & discover coordinator:Consumer 检测其所在 Group 的 Coordinator。一旦它检测到 Coordinator,即向其发送 JoinGroupRequest。
Part of a group:该状态下,Consumer 已经是该 Group 的成员,并周期性发送 HeartbeatRequest。如 HeartbeatResponse 包含 IllegalGeneration 错误码,则转换到 Stopped Consumption 状态。若连接丢失,HeartbeatResponse 包含 NotCoordinatorForGroup 错误码,则转换到 Rediscover coordinator 状态。
Rediscover coordinator:该状态下,Consumer 不停止消费而是尝试通过发送 ConsumerMetadataRequest 来探测新的 Coordinator,并且等待直到获得无错误码的响应。
Stopped consumption:该状态下,Consumer 停止消费并提交 offset,直到它再次加入 Group。
故障检测机制
Consumer 成功加入 Group 后,Consumer 和相应的 Coordinator 同时开始故障探测程序。Consumer 向 Coordinator 发起周期性的 Heartbeat(HeartbeatRequest)并等待响应,该周期为 session.timeout.ms/heartbeat.frequency。若 Consumer 在 session.timeout.ms 内未收到 HeartbeatResponse,或者发现相应的 Socket channel 断开,它即认为 Coordinator 已宕机并启动 Coordinator 探测程序。若 Coordinator 在 session.timeout.ms 内没有收到一次 HeartbeatRequest,则它将该 Consumer 标记为宕机状态并为其所在 Group 触发一次 Rebalance 操作。
Coordinator Failover 过程中,Consumer 可能会在新的 Coordinator 完成 Failover 过程之前或之后发现新的 Coordinator 并向其发送 HeatbeatRequest。对于后者,新的 Cooodinator 可能拒绝该请求,致使该 Consumer 重新探测 Coordinator 并发起新的连接请求。如果该 Consumer 向新的 Coordinator 发送连接请求太晚,新的 Coordinator 可能已经在此之前将其标记为宕机状态而将之视为新加入的 Consumer 并触发一次 Rebalance 操作。
Coordinator
1)稳定状态下,Coordinator 通过上述故障探测机制跟踪其所管理的每个 Group 下的每个 Consumer 的健康状态。
2)刚启动时或选举完成后,Coordinator 从 Zookeeper 读取它所管理的 Group 列表及这些 Group 的成员列表。如果没有获取到 Group 成员信息,它不会做任何事情直到某个 Group 中有成员注册进来。
3)在 Coordinator 完成加载其管理的 Group 列表及其相应的成员信息之前,它将为 HeartbeatRequest,OffsetCommitRequest 和 JoinGroupRequests 返回 CoordinatorStartupNotComplete 错误码。此时,Consumer 会重新发送请求。
4)Coordinator 会跟踪被其所管理的任何 Consumer Group 注册的 Topic 的 Partition 的变化,并为该变化触发 Rebalance 操作。创建新的 Topic 也可能触发 Rebalance,因为 Consumer 可以在 Topic 被创建之前就已经订阅它了。
Coordinator 发起 Rebalance 操作流程如下所示。
Coordinator 状态机
Down:Coordinator 不再担任之前负责的 Consumer Group 的 Coordinator
Catch up:该状态下,Coordinator 竞选成功,但还未能做好服务相应请求的准备。
Ready:该状态下,新竞选出来的 Coordinator 已经完成从 Zookeeper 中加载它所负责管理的所有 Group 的 metadata,并可开始接收相应的请求。
Prepare for rebalance:该状态下,Coordinator 在所有 HeartbeatResponse 中返回 IllegalGeneration 错误码,并等待所有 Consumer 向其发送 JoinGroupRequest 后转到 Rebalancing 状态。
Rebalancing:该状态下,Coordinator 已经收到了 JoinGroupRequest 请求,并增加其 Group Generation ID,分配 Consumer ID,分配 Partition。Rebalance 成功后,它会等待接收包含新的 Consumer Generation ID 的 HeartbeatRequest,并转至 Ready 状态。
Coordinator Failover
如前文所述,Rebalance 操作需要经历如下几个阶段
1)Topic/Partition 的改变或者新 Consumer 的加入或者已有 Consumer 停止,触发 Coordinator 注册在 Zookeeper 上的 watch,Coordinator 收到通知准备发起 Rebalance 操作。
2)Coordinator 通过在 HeartbeatResponse 中返回 IllegalGeneration 错误码发起 Rebalance 操作。
3)Consumer 发送 JoinGroupRequest
4)Coordinator 在 Zookeeper 中增加 Group 的 Generation ID 并将新的 Partition 分配情况写入 Zookeeper
5)Coordinator 发送 JoinGroupResponse
在这个过程中的每个阶段,Coordinator 都可能出现故障。下面给出 Rebalance 不同阶段中 Coordinator 的 Failover 处理方式。
1)如果 Coordinator 的故障发生在第一阶段,即它收到 Notification 并未来得及作出响应,则新的 Coordinator 将从 Zookeeper 读取 Group 的 metadata,包含这些 Group 订阅的 Topic 列表和之前的 Partition 分配。如果某个 Group 所订阅的 Topic 数或者某个 Topic 的 Partition 数与之前的 Partition 分配不一致,亦或者某个 Group 连接到新的 Coordinator 的 Consumer 数与之前 Partition 分配中的不一致,新的 Coordinator 会发起 Rebalance 操作。
2)如果失败发生在阶段 2,它可能对部分而非全部 Consumer 发出带错误码的 HeartbeatResponse。与第上面第一种情况一样,新的 Coordinator 会检测到 Rebalance 的必要性并发起一次 Rebalance 操作。如果 Rebalance 是由 Consumer 的失败所触发并且 Cosnumer 在 Coordinator 的 Failover 完成前恢复,新的 Coordinator 不会为此发起新的 Rebalance 操作。
3)如果 Failure 发生在阶段 3,新的 Coordinator 可能只收到部分而非全部 Consumer 的 JoinGroupRequest。 Failover 完成后,它可能收到部分 Consumer 的 HeartRequest 及另外部分 Consumer 的 JoinGroupRequest。与第 1 种情况类似,它将发起新一轮的 Rebalance 操作。
4)如果 Failure 发生在阶段 4,即它将新的 Group Generation ID 和 Group 成员信息写入 Zookeeper 后。新的 Generation ID 和 Group 成员信息以一个原子操作一次性写入 Zookeeper。Failover 完成后,Consumer 会发送 HeartbeatRequest 给新的 Coordinator,并包含旧的 Generation ID。此时新的 Coordinator 通过在 HeartbeatResponse 中返回 IllegalGeneration 错误码发起新的一轮 Rebalance。这也解释了为什么每次 HeartbeatRequest 中都需要包含 Generation ID 和 Consumer ID。
5)如果 Failure 发生在阶段 5,旧的 Coordinator 可能只向 Group 中的部分 Consumer 发送了 JoinGroupResponse。收到 JoinGroupResponse 的 Consumer 在下次向已经失效的 Coordinator 发送 HeartbeatRequest 或者提交 Offset 时会检测到它已经失败。此时,它将检测新的 Coordinator 并向其发送带有新的 Generation ID 的 HeartbeatRequest。而未收到 JoinGroupResponse 的 Consumer 将检测新的 Coordinator 并向其发送 JoinGroupRequest,这将促使新的 Coordinator 发起新一轮的 Rebalance。
作者简介
郭俊(Jason),硕士,从事大数据平台研发工作,精通 Kafka 等分布式消息系统,Storm 等流式处理系统及数据库性能调优。
新浪微博:郭俊_Jason
微信: habren
公众号:大数据架构
个人博客: http://www.jasongj.com
下篇预告
下篇文章将详细介绍 Kafka 性能测试方法及相应的性能测试报告。
感谢郭蕾对本文的策划和审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ , @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。
评论