写点什么

Kafka 设计解析(三):Kafka High Availability (下)

  • 2015-06-11
  • 本文字数:9712 字

    阅读完需:约 32 分钟

Kafka 是由 LinkedIn 开发的一个分布式的消息系统,使用 Scala 编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Apache Storm、Spark 都支持与 Kafka 集成。InfoQ 一直在紧密关注Kafka 的应用以及发展,“Kafka 剖析”专栏将会从架构设计、实现、应用场景、性能等方面深度解析Kafka。

本文在上篇文章基础上,更加深入讲解了Kafka 的HA 机制,主要阐述了HA 相关各种场景,如Broker failover、Controller failover、Topic 创建/ 删除、Broker 启动、Follower 从Leader fetch 数据等详细处理过程。同时介绍了Kafka 提供的与Replication 相关的工具,如重新分配Partition 等。

Broker Failover 过程

Controller 对 Broker failure 的处理过程

  1. Controller 在 ZooKeeper 的/brokers/ids节点上注册 Watch。一旦有 Broker 宕机(本文用宕机代表任何让 Kafka 认为其 Broker die 的情景,包括但不限于机器断电,网络不可用,GC 导致的 Stop The World,进程 crash 等),其在 ZooKeeper 对应的 Znode 会自动被删除,ZooKeeper 会 fire Controller 注册的 Watch,Controller 即可获取最新的幸存的 Broker 列表。
  2. Controller 决定 set_p,该集合包含了宕机的所有 Broker 上的所有 Partition。
  3. 对 set_p 中的每一个 Partition: 3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该 Partition 当前的 ISR。

3.2 决定该 Partition 的新 Leader。如果当前 ISR 中有至少一个 Replica 还幸存,则选择其中一个作为新 Leader,新的 ISR 则包含当前 ISR 中所有幸存的 Replica。否则选择该 Partition 中任意一个幸存的 Replica 作为新的 Leader 以及 ISR(该场景下可能会有潜在的数据丢失)。如果该 Partition 的所有 Replica 都宕机了,则将新的 Leader 设置为 -1。

3.3 将新的 Leader,ISR 和新的leader_epochcontroller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有 Controller 版本在 3.1 至 3.3 的过程中无变化时才会执行,否则跳转到 3.1。
4. 直接通过 RPC 向 set_p 相关的 Broker 发送 LeaderAndISRRequest 命令。Controller 可以在一个 RPC 操作中发送多个命令从而提高效率。 Broker failover 顺序图如下所示。

LeaderAndIsrRequest 结构如下

LeaderAndIsrResponse 结构如下

创建 / 删除 Topic

  1. Controller 在 ZooKeeper 的/brokers/topics节点上注册 Watch,一旦某个 Topic 被创建或删除,则 Controller 会通过 Watch 得到新创建 / 删除的 Topic 的 Partition/Replica 分配。
  2. 对于删除 Topic 操作,Topic 工具会将该 Topic 名字存于/admin/delete_topics。若delete.topic.enable为 true,则 Controller 注册在/admin/delete_topics上的 Watch 被 fire,Controller 通过回调向对应的 Broker 发送 StopReplicaRequest,若为 false 则 Controller 不会在/admin/delete_topics上注册 Watch,也就不会对该事件作出反应。
  3. 对于创建 Topic 操作,Controller 从/brokers/ids读取当前所有可用的 Broker 列表,对于 set_p 中的每一个 Partition: 3.1 从分配给该 Partition 的所有 Replica(称为 AR)中任选一个可用的 Broker 作为新的 Leader,并将 AR 设置为新的 ISR(因为该 Topic 是新创建的,所以 AR 中所有的 Replica 都没有数据,可认为它们都是同步的,也即都在 ISR 中,任意一个 Replica 都可作为 Leader)

3.2 将新的 Leader 和 ISR 写入/brokers/topics/[topic]/partitions/[partition]
4. 直接通过 RPC 向相关的 Broker 发送 LeaderAndISRRequest。 创建 Topic 顺序图如下所示。

Broker 响应请求流程

Broker 通过kafka.network.SocketServer及相关模块接受各种请求并作出响应。整个网络通信模块基于 Java NIO 开发,并采用 Reactor 模式,其中包含 1 个 Acceptor 负责接受客户请求,N 个 Processor 负责读写数据,M 个 Handler 处理业务逻辑。

Acceptor 的主要职责是监听并接受客户端(请求发起方,包括但不限于 Producer,Consumer,Controller,Admin Tool)的连接请求,并建立和客户端的数据传输通道,然后为该客户端指定一个 Processor,至此它对该客户端该次请求的任务就结束了,它可以去响应下一个客户端的连接请求了。其核心代码如下。

Processor 主要负责从客户端读取数据并将响应返回给客户端,它本身并不处理具体的业务逻辑,并且其内部维护了一个队列来保存分配给它的所有 SocketChannel。Processor 的 run 方法会循环从队列中取出新的 SocketChannel 并将其SelectionKey.OP_READ注册到 selector 上,然后循环处理已就绪的读(请求)和写(响应)。Processor 读取完数据后,将其封装成 Request 对象并将其交给 RequestChannel。

RequestChannel 是 Processor 和 KafkaRequestHandler 交换数据的地方,它包含一个队列 requestQueue 用来存放 Processor 加入的 Request,KafkaRequestHandler 会从里面取出 Request 来处理;同时它还包含一个 respondQueue,用来存放 KafkaRequestHandler 处理完 Request 后返还给客户端的 Response。

Processor 会通过 processNewResponses 方法依次将 requestChannel 中 responseQueue 保存的 Response 取出,并将对应的SelectionKey.OP_WRITE事件注册到 selector 上。当 selector 的 select 方法返回时,对检测到的可写通道,调用 write 方法将 Response 返回给客户端。

KafkaRequestHandler 循环从 RequestChannel 中取 Request 并交给kafka.server.KafkaApis处理具体的业务逻辑。

LeaderAndIsrRequest 响应过程

对于收到的 LeaderAndIsrRequest,Broker 主要通过 ReplicaManager 的 becomeLeaderOrFollower 处理,流程如下:

  1. 若请求中 controllerEpoch 小于当前最新的 controllerEpoch,则直接返回 ErrorMapping.StaleControllerEpochCode。
  2. 对于请求中 partitionStateInfos 中的每一个元素,即((topic, partitionId), partitionStateInfo): 2.1 若 partitionStateInfo 中的 leader epoch 大于当前 ReplicManager 中存储的 (topic, partitionId) 对应的 partition 的 leader epoch,则:

2.1.1 若当前 brokerid(或者说 replica id)在 partitionStateInfo 中,则将该 partition 及 partitionStateInfo 存入一个名为 partitionState 的 HashMap 中

2.1.2 否则说明该 Broker 不在该 Partition 分配的 Replica list 中,将该信息记录于 log 中

2.2 否则将相应的 Error code(ErrorMapping.StaleLeaderEpochCode)存入 Response 中
3. 筛选出 partitionState 中 Leader 与当前 Broker ID 相等的所有记录存入 partitionsTobeLeader 中,其它记录存入 partitionsToBeFollower 中。
4. 若 partitionsTobeLeader 不为空,则对其执行 makeLeaders 方。
5. 若 partitionsToBeFollower 不为空,则对其执行 makeFollowers 方法。
6. 若 highwatermak 线程还未启动,则将其启动,并将 hwThreadInitialized 设为 true。
7. 关闭所有 Idle 状态的 Fetcher。

LeaderAndIsrRequest 处理过程如下图所示

Broker 启动过程

Broker 启动后首先根据其 ID 在 ZooKeeper 的/brokers/idszonde 下创建临时子节点( Ephemeral node ),创建成功后 Controller 的 ReplicaStateMachine 注册其上的 Broker Change Watch 会被 fire,从而通过回调 KafkaController.onBrokerStartup 方法完成以下步骤:

  1. 向所有新启动的 Broker 发送 UpdateMetadataRequest,其定义如下。
  2. 将新启动的 Broker 上的所有 Replica 设置为 OnlineReplica 状态,同时这些 Broker 会为这些 Partition 启动 high watermark 线程。
  3. 通过 partitionStateMachine 触发 OnlinePartitionStateChange。

Controller Failover

Controller 也需要 Failover。每个 Broker 都会在 Controller Path (/controller) 上注册一个 Watch。当前 Controller 失败时,对应的 Controller Path 会自动消失(因为它是 Ephemeral Node),此时该 Watch 被 fire,所有“活”着的 Broker 都会去竞选成为新的 Controller(创建新的 Controller Path),但是只会有一个竞选成功(这点由 ZooKeeper 保证)。竞选成功者即为新的 Leader,竞选失败者则重新在新的 Controller Path 上注册 Watch。因为 ZooKeeper 的 Watch 是一次性的,被 fire 一次之后即失效,所以需要重新注册。

Broker 成功竞选为新 Controller 后会触发 KafkaController.onControllerFailover 方法,并在该方法中完成如下操作:

  1. 读取并增加 Controller Epoch。
  2. 在 ReassignedPartitions Patch(/admin/reassign_partitions) 上注册 Watch。
  3. 在 PreferredReplicaElection Path(/admin/preferred_replica_election) 上注册 Watch。
  4. 通过 partitionStateMachine 在 Broker Topics Patch(/brokers/topics) 上注册 Watch。
  5. delete.topic.enable设置为 true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 Watch。
  6. 通过 replicaStateMachine 在 Broker Ids Patch(/brokers/ids) 上注册 Watch。
  7. 初始化 ControllerContext 对象,设置当前所有 Topic,“活”着的 Broker 列表,所有 Partition 的 Leader 及 ISR 等。
  8. 启动 replicaStateMachine 和 partitionStateMachine。
  9. 将 brokerState 状态设置为 RunningAsController。
  10. 将每个 Partition 的 Leadership 信息发送给所有“活”着的 Broker。
  11. auto.leader.rebalance.enable配置为 true(默认值是 true),则启动 partition-rebalance 线程。
  12. delete.topic.enable设置为 true 且 Delete Topic Patch(/admin/delete_topics) 中有值,则删除相应的 Topic。

Partition 重新分配

管理工具发出重新分配 Partition 请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发 ReassignedPartitionsIsrChangeListener,从而通过执行回调函数 KafkaController.onPartitionReassignment 来完成以下操作:

  1. 将 ZooKeeper 中的 AR(Current Assigned Replicas)更新为 OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  2. 强制更新 ZooKeeper 中的 leader epoch,向 AR 中的每个 Replica 发送 LeaderAndIsrRequest。
  3. 将 RAR - OAR 中的 Replica 设置为 NewReplica 状态。
  4. 等待直到 RAR 中所有的 Replica 都与其 Leader 同步。
  5. 将 RAR 中所有的 Replica 都设置为 OnlineReplica 状态。
  6. 将 Cache 中的 AR 设置为 RAR。
  7. 若 Leader 不在 RAR 中,则从 RAR 中重新选举出一个新的 Leader 并发送 LeaderAndIsrRequest。若新的 Leader 不是从 RAR 中选举而出,则还要增加 ZooKeeper 中的 leader epoch。
  8. 将 OAR - RAR 中的所有 Replica 设置为 OfflineReplica 状态,该过程包含两部分。第一,将 ZooKeeper 上 ISR 中的 OAR - RAR 移除并向 Leader 发送 LeaderAndIsrRequest 从而通知这些 Replica 已经从 ISR 中移除;第二,向 OAR - RAR 中的 Replica 发送 StopReplicaRequest 从而停止不再分配给该 Partition 的 Replica。
  9. 将 OAR - RAR 中的所有 Replica 设置为 NonExistentReplica 状态从而将其从磁盘上删除。
  10. 将 ZooKeeper 中的 AR 设置为 RAR。
  11. 删除/admin/reassign_partition

注意:最后一步才将 ZooKeeper 中的 AR 更新,因为这是唯一一个持久存储 AR 的地方,如果 Controller 在这一步之前 crash,新的 Controller 仍然能够继续完成该过程。

以下是 Partition 重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition 重新分配过程中 ZooKeeper 中的 AR 和 Leader/ISR 路径如下

AR leader/isr Sttep {1,2,3} 1/{1,2,3} (initial state) {1,2,3,4,5,6} 1/{1,2,3} (step 2) {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4) {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7) {1,2,3,4,5,6} 4/{4,5,6} (step 8) {4,5,6} 4/{4,5,6} (step 10)### Follower 从 Leader Fetch 数据

Follower 通过向 Leader 发送 FetchRequest 获取消息,FetchRequest 结构如下

从 FetchRequest 的结构可以看出,每个 Fetch 请求都要指定最大等待时间和最小获取字节数,以及由 TopicAndPartition 和 PartitionFetchInfo 构成的 Map。实际上,Follower 从 Leader 数据和 Consumer 从 Broker Fetch 数据,都是通过 FetchRequest 请求完成,所以在 FetchRequest 结构中,其中一个字段是 clientID,并且其默认值是 ConsumerConfig.DefaultClientId。

Leader 收到 Fetch 请求后,Kafka 通过 KafkaApis.handleFetchRequest 响应该请求,响应过程如下:

  1. replicaManager 根据请求读出数据存入 dataRead 中。
  2. 如果该请求来自 Follower 则更新其相应的 LEO(log end offset)以及相应 Partition 的 High Watermark
  3. 根据 dataRead 算出可读消息长度(单位为字节)并存入 bytesReadable 中。
  4. 满足下面 4 个条件中的 1 个,则立即将相应的数据返回
  • Fetch 请求不希望等待,即 fetchRequest.macWait <= 0
  • Fetch 请求不要求一定能取到消息,即 fetchRequest.numPartitions <= 0,也即 requestInfo 为空
  • 有足够的数据可供返回,即 bytesReadable >= fetchRequest.minBytes
  • 读取数据时发生异常
  1. 若不满足以上 4 个条件,FetchRequest 将不会立即返回,并将该请求封装成 DelayedFetch。检查该 DeplayedFetch 是否满足,若满足则返回请求,否则将该请求加入 Watch 列表

Leader 通过以 FetchResponse 的形式将消息返回给 Follower,FetchResponse 结构如下

Replication 工具

Topic Tool

$KAFKA_HOME/bin/kafka-topics.sh,该工具可用于创建、删除、修改、查看某个 Topic,也可用于列出所有 Topic。另外,该工具还可修改某个 Topic 的以下配置。

复制代码
unclean.leader.election.enable
delete.retention.ms
segment.jitter.ms
retention.ms
flush.ms
segment.bytes
flush.messages
segment.ms
retention.bytes
cleanup.policy
segment.index.bytes
min.cleanable.dirty.ratio
max.message.bytes
file.delete.delay.ms
min.insync.replicas
index.interval.bytes

Replica Verification Tool

$KAFKA_HOME/bin/kafka-replica-verification.sh,该工具用来验证所指定的一个或多个 Topic 下每个 Partition 对应的所有 Replica 是否都同步。可通过topic-white-list这一参数指定所需要验证的所有 Topic,支持正则表达式。

Preferred Replica Leader Election Tool

用途

有了 Replication 机制后,每个 Partition 可能有多个备份。某个 Partition 的 Replica 列表叫作 AR(Assigned Replicas),AR 中的第一个 Replica 即为“Preferred Replica”。创建一个新的 Topic 或者给已有 Topic 增加 Partition 时,Kafka 保证 Preferred Replica 被均匀分布到集群中的所有 Broker 上。理想情况下,Preferred Replica 会被选为 Leader。以上两点保证了所有 Partition 的 Leader 被均匀分布到了集群当中,这一点非常重要,因为所有的读写操作都由 Leader 完成,若 Leader 分布过于集中,会造成集群负载不均衡。但是,随着集群的运行,该平衡可能会因为 Broker 的宕机而被打破,该工具就是用来帮助恢复 Leader 分配的平衡。

事实上,每个 Topic 从失败中恢复过来后,它默认会被设置为 Follower 角色,除非某个 Partition 的 Replica 全部宕机,而当前 Broker 是该 Partition 的 AR 中第一个恢复回来的 Replica。因此,某个 Partition 的 Leader(Preferred Replica)宕机并恢复后,它很可能不再是该 Partition 的 Leader,但仍然是 Preferred Replica。

原理

1. 在 ZooKeeper 上创建/admin/preferred_replica_election节点,并存入需要调整 Preferred Replica 的 Partition 信息。

2. Controller 一直 Watch 该节点,一旦该节点被创建,Controller 会收到通知,并获取该内容。

3. Controller 读取 Preferred Replica,如果发现该 Replica 当前并非是 Leader 并且它在该 Partition 的 ISR 中,Controller 向该 Replica 发送 LeaderAndIsrRequest,使该 Replica 成为 Leader。如果该 Replica 当前并非是 Leader,且不在 ISR 中,Controller 为了保证没有数据丢失,并不会将其设置为 Leader。

用法

复制代码
$KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

在包含 8 个 Broker 的 Kafka 集群上,创建 1 个名为 topic1,replication-factor 为 3,Partition 数为 8 的 Topic,使用如下命令查看其 Partition/Replica 分布。

复制代码
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181

查询结果如下图所示,从图中可以看到,Kafka 将所有 Replica 均匀分布到了整个集群,并且 Leader 也均匀分布。

手动停止部分 Broker,topic1 的 Partition/Replica 分布如下图所示。从图中可以看到,由于 Broker 1/2/4 都被停止,Partition 0 的 Leader 由原来的 1 变为 3,Partition 1 的 Leader 由原来的 2 变为 5,Partition 2 的 Leader 由原来的 3 变为 6,Partition 3 的 Leader 由原来的 4 变为 7。

再重新启动 ID 为 1 的 Broker,topic1 的 Partition/Replica 分布如下。可以看到,虽然 Broker 1 已经启动(Partition 0 和 Partition5 的 ISR 中有 1),但是 1 并不是任何一个 Parititon 的 Leader,而 Broker 5/6/7 都是 2 个 Partition 的 Leader,即 Leader 的分布不均衡——一个 Broker 最多是 2 个 Partition 的 Leader,而最少是 0 个 Partition 的 Leader。

运行该工具后,topic1 的 Partition/Replica 分布如下图所示。由图可见,除了 Partition 1 和 Partition 3 由于 Broker 2 和 Broker 4 还未启动,所以其 Leader 不是其 Preferred Repliac 外,其它所有 Partition 的 Leader 都是其 Preferred Replica。同时,与运行该工具前相比,Leader 的分配更均匀——一个 Broker 最多是 2 个 Parittion 的 Leader,最少是 1 个 Partition 的 Leader。

启动 Broker 2 和 Broker 4,Leader 分布与上一步相比并未变化,如下图所示。

再次运行该工具,所有 Partition 的 Leader 都由其 Preferred Replica 承担,Leader 分布更均匀——每个 Broker 承担 1 个 Partition 的 Leader 角色。

除了手动运行该工具使 Leader 分配均匀外,Kafka 还提供了自动平衡 Leader 分配的功能,该功能可通过将auto.leader.rebalance.enable设置为 true 开启,它将周期性检查 Leader 分配是否平衡,若不平衡度超过一定阈值则自动由 Controller 尝试将各 Partition 的 Leader 设置为其 Preferred Replica。检查周期由leader.imbalance.check.interval.seconds指定,不平衡度阈值由leader.imbalance.per.broker.percentage指定。

Kafka Reassign Partitions Tool

用途

该工具的设计目标与 Preferred Replica Leader Election Tool 有些类似,都旨在促进 Kafka 集群的负载均衡。不同的是,Preferred Replica Leader Election 只能在 Partition 的 AR 范围内调整其 Leader,使 Leader 分布均匀,而该工具还可以调整 Partition 的 AR。

Follower 需要从 Leader Fetch 数据以保持与 Leader 同步,所以仅仅保持 Leader 分布的平衡对整个集群的负载均衡来说是不够的。另外,生产环境下,随着负载的增大,可能需要给 Kafka 集群扩容。向 Kafka 集群中增加 Broker 非常简单方便,但是对于已有的 Topic,并不会自动将其 Partition 迁移到新加入的 Broker 上,此时可用该工具达到此目的。某些场景下,实际负载可能远小于最初预期负载,此时可用该工具将分布在整个集群上的 Partition 重装分配到某些机器上,然后可以停止不需要的 Broker 从而实现节约资源的目的。

需要说明的是,该工具不仅可以调整 Partition 的 AR 位置,还可调整其 AR 数量,即改变该 Topic 的 replication factor。

原理

该工具只负责将所需信息存入 ZooKeeper 中相应节点,然后退出,不负责相关的具体操作,所有调整都由 Controller 完成。

1. 在 ZooKeeper 上创建/admin/reassign_partitions节点,并存入目标 Partition 列表及其对应的目标 AR 列表。

2. Controller 注册在/admin/reassign_partitions上的 Watch 被 fire,Controller 获取该列表。

3. 对列表中的所有 Partition,Controller 会做如下操作:

  • 启动RAR - AR中的 Replica,即新分配的 Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
  • 等待新的 Replica 与 Leader 同步
  • 如果 Leader 不在 RAR 中,从 RAR 中选出新的 Leader
  • 停止并删除AR - RAR中的 Replica,即不再需要的 Replica
  • 删除/admin/reassign_partitions节点

用法

该工具有三种使用模式

  • generate 模式,给定需要重新分配的 Topic,自动生成 reassign plan(并不执行)
  • execute 模式,根据指定的 reassign plan 重新分配 Partition
  • verify 模式,验证重新分配 Partition 是否成功

下面这个例子将使用该工具将 Topic 的所有 Partition 重新分配到 Broker 4/5/6/7 上,步骤如下:

1. 使用 generate 模式,生成 reassign plan

指定需要重新分配的 Topic ({“topics”:[{“topic”:“topic1”}],“version”:1}),并存入/tmp/topics-to-move.json文件中,然后执行如下命令

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--topics-to-move-json-file /tmp/topics-to-move.json
--broker-list "4,5,6,7" --generate

结果如下图所示

2. 使用 execute 模式,执行 reassign plan

将上一步生成的 reassignment plan 存入/tmp/reassign-plan.json文件中,并执行

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --execute

此时,ZooKeeper 上/admin/reassign_partitions节点被创建,且其值与/tmp/reassign-plan.json文件的内容一致。

3. 使用 verify 模式,验证 reassign 是否完成

执行 verify 命令

复制代码
$KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--reassignment-json-file /tmp/reassign-plan.json --verify

结果如下所示,从图中可看出 topic1 的所有 Partititon 都根据 reassign plan 重新分配成功。

接下来用 Topic Tool 再次验证。

复制代码
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1

结果如下图所示,从图中可看出 topic1 的所有 Partition 都被重新分配到 Broker 4/5/6/7,且每个 Partition 的 AR 与 reassign plan 一致。

需要说明的是,在使用 execute 之前,并不一定要使用 generate 模式自动生成 reassign plan,使用 generate 模式只是为了方便。事实上,某些场景下,generate 模式生成的 reassign plan 并不一定能满足需求,此时用户可以自己设置 reassign plan。

State Change Log Merge Tool

用途

该工具旨在从整个集群的 Broker 上收集状态改变日志,并生成一个集中的格式化的日志以帮助诊断状态改变相关的故障。每个 Broker 都会将其收到的状态改变相关的的指令存于名为state-change.log的日志文件中。某些情况下,Partition 的 Leader election 可能会出现问题,此时我们需要对整个集群的状态改变有个全局的了解从而诊断故障并解决问题。该工具将集群中相关的state-change.log日志按时间顺序合并,同时支持用户输入时间范围和目标 Topic 及 Partition 作为过滤条件,最终将格式化的结果输出。

用法

复制代码
bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
--logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log
--topic topic1 --partitions 0,1,2,3,4,5,6,7

作者简介

郭俊(Jason),硕士,从事大数据平台研发工作,精通 Kafka 等分布式消息系统,Storm 等流式处理系统及数据库性能调优。

新浪微博:郭俊_Jason

微信: habren

公众号:大数据架构

个人博客: http://www.jasongj.com

下篇预告

下篇文章将详细介绍 Kafka High Level Consumer,Consumer Group,Consumer Group Rebalance 和 Simple Consumer,以及未来新版本中对 Kafka High Level Consumer 的重新设计——使用 Consumer Controller 解决 Split Brain 和 Herd 等问题。


感谢郭蕾对本文的策划和审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入 InfoQ 读者交流群)。

2015-06-11 00:0326116

评论

发布
暂无评论
发现更多内容

【报告回顾】精、稳、敏、融,步入人民金融时代

易观分析

金融

极客公园对话 Zilliz 星爵:大模型时代,需要新的「存储基建」

Zilliz

Milvus Zilliz AIGC 向量数据库 LLMs

疯狂 SQL 转换系列-SQL for Milvus

数由科技

sql Milvus moql

感受国产BI工具的理论,瓴羊Quick BI、Smartbi对比

对不起该用户已成仙‖

基于Sovit2D智慧养鸡组态大屏管理系统

2D3D前端可视化开发

物联网 组态软件 智慧农业 web组态 智慧养鸡

不要焦虑,要不断超越自己 | 社区征文

于仔学技术

认识自己 驱动力量 职场发展 学会思考 三周年征文

盘点漏洞种类和代码审计工具

北桥苏

代码审计 漏洞挖掘

轻松网站下载:SiteSucker mac汉化激活版

真大的脸盆

Mac Mac 软件 网站下载 下载网站工具 网站下载工具

DApp泰山众筹系统开发合约搭建

薇電13242772558

智能合约 dapp

TPM — 系统安全的基石

鼎道智联

安全

九步排查Gateway-Worker启动失败问题

北桥苏

php Gateway thinkphp workerman

Selenium 自动化测试如何优雅的解决图片验证码问题

QE_LAB

自动化测试 图片验证码 selenium 登录验证 测试技术

聊点技术 | 自适应AI,让Bonree ONE更智能

博睿数据

可观测性 智能运维 博睿数据 Bonree ONE ONE有引力

AI 时代的到来,普通前端如何生存下去? | 社区征文

程序员海军

三周年征文

ThinkPHP5.1无法记录SQL日志解决思路

北桥苏

php 日志级别 thinkphp

Thinkphp5.1允许uni-app的H5跨域请求接口解决方法

北桥苏

php uni-app 跨域 thinkphp

疯狂SQL转换系列- SQL for MongoDB

数由科技

sql mongodb dsl moql

ThinkPHP5中如何实现模板完全静态化

北桥苏

thinkphp 模板静态

如何使用Fiddler抓取APP接口和微信授权网页源代码

北桥苏

fiddler 网络抓包 抓包分析 抓包工具

ChatGPT 再遭禁用 | 人工智能时代下数据安全如何保障

BinTools图尔兹

人工智能 数据库 ChatGPT CloudQuery

高性能存储SIG月度动态:EROFS支持直接索引容器镜像tar包,io_uring将支持并优化NVMe直通

OpenAnolis小助手

操作系统 容器镜像 高性能存储 anck 龙蜥sig

蚂蚁实时低代码研发和流批一体的应用实践

Apache Flink

大数据 flink 实时计算

uni-app结合PHP实现单用户登陆

北桥苏

php uni-app 单点登录 thinkphp

【技术干货】PCB焊盘设计之问题详解

华秋PCB

工具 PCB PCB设计 焊盘 可焊性

3 步集成 Terraform + 极狐GitLab CI ,实现基础设施自动化管理

极狐GitLab

ci DevOps 基础设施 Terraform 极狐GitLab

MobTech MobPush|TCP通道和共享链路通道

MobTech袤博科技

TP5.0使用助手函数model出现\common\Model\类不存在

北桥苏

php thinkphp

ios打包ipa的四种实用方法(.app转.ipa)

雪奈椰子

高性能网络 SIG 月度动态:长期投入得到业界认可,新增一位 virtio reviewer

OpenAnolis小助手

高性能网络 龙蜥社区 virtio anck SIG动态

图片文字识别:揭开数字世界的神秘面纱

来自四九城儿

Kafka设计解析(三):Kafka High Availability (下)_语言 & 开发_郭俊_InfoQ精选文章