上一篇文章《 Kafka 设计解析(五)- Kafka 性能测试方法及 Benchmark 报告》从测试角度说明了 Kafka 的性能。本文从宏观架构层面和具体实现层面分析了 Kafka 如何实现高性能。
宏观架构层面
利用 Partition 实现并行处理
Partition 提供并行处理的能力
Kafka 是一个 Pub-Sub 的消息系统,无论是发布还是订阅,都须指定 Topic。如《 Kafka 设计解析(一)- Kafka 背景及架构介绍》一文所述,Topic 只是一个逻辑的概念。每个 Topic 都包含一个或多个 Partition,不同 Partition 可位于不同节点。同时 Partition 在物理上对应一个本地文件夹,每个 Partition 包含一个或多个 Segment,每个 Segment 包含一个数据文件和一个与之对应的索引文件。在逻辑上,可以把一个 Partition 当作一个非常长的数组,可通过这个“数组”的索引(offset)去访问其数据。
一方面,由于不同 Partition 可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于 Partition 在物理上对应一个文件夹,即使多个 Partition 位于同一个节点,也可通过配置让同一节点上的不同 Partition 置于不同的 disk drive 上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。
利用多磁盘的具体方法是,将不同磁盘 mount 到不同目录,然后在 server.properties 中,将 log.dirs 设置为多目录(用逗号分隔)。Kafka 会自动将所有 Partition 尽可能均匀分配到不同目录也即不同目录(也即不同 disk)上。
注:虽然物理上最小单位是 Segment,但 Kafka 并不提供同一 Partition 内不同 Segment 间的并行处理。因为对于写而言,每次只会写 Partition 内的一个 Segment,而对于读而言,也只会顺序读取同一 Partition 内的不同 Segment。
Partition 是最小并发粒度
如同《 Kafka 设计解析(四)- Kafka Consumer 设计解析》一文所述,多 Consumer 消费同一个 Topic 时,同一条消息只会被同一 Consumer Group 内的一个 Consumer 所消费。而数据并非按消息为单位分配,而是以 Partition 为单位分配,也即同一个 Partition 的数据只会被一个 Consumer 所消费(在不考虑 Rebalance 的前提下)。
如果 Consumer 的个数多于 Partition 的个数,那么会有部分 Consumer 无法消费该 Topic 的任何数据,也即当 Consumer 个数超过 Partition 后,增加 Consumer 并不能增加并行度。
简而言之,Partition 个数决定了可能的最大并行度。如下图所示,由于 Topic 2 只包含 3 个 Partition,故 group2 中的 Consumer 3、Consumer 4、Consumer 5 可分别消费 1 个 Partition 的数据,而 Consumer 6 消费不到 Topic 2 的任何数据。
(点击放大图像)
以Spark 消费Kafka 数据为例,如果所消费的Topic 的Partition 数为N,则有效的Spark 最大并行度也为N。即使将Spark 的Executor 数设置为N+M,最多也只有N 个Executor 可同时处理该Topic 的数据。
ISR 实现可用性与数据一致性的动态平衡
CAP 理论
CAP 理论是指,分布式系统中,一致性、可用性和分区容忍性最多只能同时满足两个。
一致性
- 通过某个节点的写操作结果对后面通过其它节点的读操作可见
- 如果更新数据后,并发访问情况下后续读操作可立即感知该更新,称为强一致性
- 如果允许之后部分或者全部感知不到该更新,称为弱一致性
- 若在之后的一段时间(通常该时间不固定)后,一定可以感知到该更新,称为最终一致性
可用性
- 任何一个没有发生故障的节点必须在有限的时间内返回合理的结果
分区容忍性
- 部分节点宕机或者无法与其它节点通信时,各分区间还可保持分布式系统的功能
一般而言,都要求保证分区容忍性。所以在 CAP 理论下,更多的是需要在可用性和一致性之间做权衡。
常用数据复制及一致性方案
Master-Slave
- RDBMS 的读写分离即为典型的 Master-Slave 方案
- 同步复制可保证强一致性但会影响可用性
- 异步复制可提供高可用性但会降低一致性
WNR
- 主要用于去中心化的分布式系统中。DynamoDB 与 Cassandra 即采用此方案或其变种
- N 代表总副本数,W 代表每次写操作要保证的最少写成功的副本数,R 代表每次读至少要读取的副本数
- 当 W+R>N 时,可保证每次读取的数据至少有一个副本拥有最新的数据
- 多个写操作的顺序难以保证,可能导致多副本间的写操作顺序不一致。Dynamo 通过向量时钟保证最终一致性
Paxos 及其变种
- Google 的 Chubby,Zookeeper 的原子广播协议(Zab),RAFT 等
基于 ISR 的数据复制方案
如《 Kafka High Availability(上)》一文所述,Kafka 的数据复制是以Partition 为单位的。而多个备份间的数据复制,通过Follower 向Leader 拉取数据完成。从一这点来讲,Kafka 的数据复制方案接近于上文所讲的Master-Slave 方案。不同的是,Kafka 既不是完全的同步复制,也不是完全的异步复制,而是基于ISR 的动态复制方案。
ISR,也即 In-sync Replica。每个 Partition 的 Leader 都会维护这样一个列表,该列表中,包含了所有与之同步的 Replica(包含 Leader 自己)。每次数据写入时,只有 ISR 中的所有 Replica 都复制完,Leader 才会将其置为 Commit,它才能被 Consumer 所消费。
这种方案,与同步复制非常接近。但不同的是,这个 ISR 是由 Leader 动态维护的。如果 Follower 不能紧“跟上”Leader,它将被 Leader 从 ISR 中移除,待它又重新“跟上”Leader 后,会被 Leader 再次加加 ISR 中。每次改变 ISR 后,Leader 都会将最新的 ISR 持久化到 Zookeeper 中。
至于如何判断某个 Follower 是否“跟上”Leader,不同版本的 Kafka 的策略稍微有些区别。
- 对于 0.8.* 版本,如果 Follower 在 replica.lag.time.max.ms 时间内未向 Leader 发送 Fetch 请求(也即数据复制请求),则 Leader 会将其从 ISR 中移除。如果某 Follower 持续向 Leader 发送 Fetch 请求,但是它与 Leader 的数据差距在 replica.lag.max.messages 以上,也会被 Leader 从 ISR 中移除。
- 从 0.9.0.0 版本开始,replica.lag.max.messages 被移除,故 Leader 不再考虑 Follower 落后的消息条数。另外,Leader 不仅会判断 Follower 是否在 replica.lag.time.max.ms 时间内向其发送 Fetch 请求,同时还会考虑 Follower 是否在该时间内与之保持同步。
- 0.10.* 版本的策略与 0.9.* 版一致
对于 0.8.* 版本的 replica.lag.max.messages 参数,很多读者曾留言提问,既然只有 ISR 中的所有 Replica 复制完后的消息才被认为 Commit,那为何会出现 Follower 与 Leader 差距过大的情况。原因在于,Leader 并不需要等到前一条消息被 Commit 才接收后一条消息。事实上,Leader 可以按顺序接收大量消息,最新的一条消息的 Offset 被记为 High Wartermark。而只有被 ISR 中所有 Follower 都复制过去的消息才会被 Commit,Consumer 只能消费被 Commit 的消息。由于 Follower 的复制是严格按顺序的,所以被 Commit 的消息之前的消息肯定也已经被 Commit 过。换句话说,High Watermark 标记的是 Leader 所保存的最新消息的 offset,而 Commit Offset 标记的是最新的可被消费的(已同步到 ISR 中的 Follower)消息。而 Leader 对数据的接收与 Follower 对数据的复制是异步进行的,因此会出现 Commit Offset 与 High Watermark 存在一定差距的情况。0.8.* 版本中 replica.lag.max.messages 限定了 Leader 允许的该差距的最大值。
Kafka 基于 ISR 的数据复制方案原理如下图所示。
(点击放大图像)
如上图所示,在第一步中,Leader A 总共收到 3 条消息,故其 high watermark 为 3,但由于 ISR 中的 Follower 只同步了第 1 条消息(m1),故只有 m1 被 Commit,也即只有 m1 可被 Consumer 消费。此时 Follower B 与 Leader A 的差距是 1,而 Follower C 与 Leader A 的差距是 2,均未超过默认的 replica.lag.max.messages,故得以保留在 ISR 中。在第二步中,由于旧的 Leader A 宕机,新的 Leader B 在 replica.lag.time.max.ms 时间内未收到来自 A 的 Fetch 请求,故将 A 从 ISR 中移除,此时 ISR={B,C}。同时,由于此时新的 Leader B 中只有 2 条消息,并未包含 m3(m3 从未被任何 Leader 所 Commit),所以 m3 无法被 Consumer 消费。第四步中,Follower A 恢复正常,它先将宕机前未 Commit 的所有消息全部删除,然后从最后 Commit 过的消息的下一条消息开始追赶新的 Leader B,直到它“赶上”新的 Leader,才被重新加入新的 ISR 中。
使用 ISR 方案的原因
- 由于 Leader 可移除不能及时与之同步的 Follower,故与同步复制相比可避免最慢的 Follower 拖慢整体速度,也即 ISR 提高了系统可用性。
- ISR 中的所有 Follower 都包含了所有 Commit 过的消息,而只有 Commit 过的消息才会被 Consumer 消费,故从 Consumer 的角度而言,ISR 中的所有 Replica 都始终处于同步状态,从而与异步复制方案相比提高了数据一致性。
- ISR 可动态调整,极限情况下,可以只包含 Leader,极大提高了可容忍的宕机的 Follower 的数量。与 Majority Quorum 方案相比,容忍相同个数的节点失败,所要求的总节点数少了近一半。
ISR 相关配置说明
- Broker 的 min.insync.replicas 参数指定了 Broker 所要求的 ISR 最小长度,默认值为 1。也即极限情况下 ISR 可以只包含 Leader。但此时如果 Leader 宕机,则该 Partition 不可用,可用性得不到保证。
- 只有被 ISR 中所有 Replica 同步的消息才被 Commit,但 Producer 发布数据时,Leader 并不需要 ISR 中的所有 Replica 同步该数据才确认收到数据。Producer 可以通过 acks 参数指定最少需要多少个 Replica 确认收到该消息才视为该消息发送成功。acks 的默认值是 1,即 Leader 收到该消息后立即告诉 Producer 收到该消息,此时如果在 ISR 中的消息复制完该消息前 Leader 宕机,那该条消息会丢失。而如果将该值设置为 0,则 Producer 发送完数据后,立即认为该数据发送成功,不作任何等待,而实际上该数据可能发送失败,并且 Producer 的 Retry 机制将不生效。更推荐的做法是,将 acks 设置为 all 或者 -1,此时只有 ISR 中的所有 Replica 都收到该数据(也即该消息被 Commit),Leader 才会告诉 Producer 该消息发送成功,从而保证不会有未知的数据丢失。
具体实现层面
高效使用磁盘
顺序写磁盘
根据《一些场景下顺序写磁盘快于随机写内存》所述,将写磁盘的过程变为顺序写,可极大提高对磁盘的利用率。
Kafka 的整个设计中,Partition 相当于一个非常长的数组,而 Broker 接收到的所有消息顺序写入这个大数组中。同时 Consumer 通过 Offset 顺序消费这些数据,并且不删除已经消费的数据,从而避免了随机写磁盘的过程。
由于磁盘有限,不可能保存所有数据,实际上作为消息系统 Kafka 也没必要保存所有数据,需要删除旧的数据。而这个删除过程,并非通过使用“读 - 写”模式去修改文件,而是将 Partition 分为多个 Segment,每个 Segment 对应一个物理文件,通过删除整个文件的方式去删除 Partition 内的数据。这种方式清除旧数据的方式,也避免了对文件的随机写操作。
通过如下代码可知,Kafka 删除 Segment 的方式,是直接删除 Segment 对应的整个 log 文件和整个 index 文件而非删除文件中的部分内容。
/** * Delete this log segment from the filesystem. * * @throws KafkaStorageException if the delete fails. */ def delete() { val deletedLog = log.delete() val deletedIndex = index.delete() val deletedTimeIndex = timeIndex.delete() if(!deletedLog && log.file.exists) throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.") if(!deletedIndex && index.file.exists) throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") if(!deletedTimeIndex && timeIndex.file.exists) throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.") }
充分利用 Page Cache
使用 Page Cache 的好处如下
- I/O Scheduler 会将连续的小块写组装成大块的物理写从而提高性能
- I/O Scheduler 会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
- 充分利用所有空闲内存(非 JVM 内存)。如果使用应用层 Cache(即 JVM 堆内存),会增加 GC 负担
- 读操作可直接在 Page Cache 内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过 Page Cache)交换数据
- 如果进程重启,JVM 内的 Cache 会失效,但 Page Cache 仍然可用
Broker 收到数据后,写磁盘时只是将数据写入 Page Cache,并不保证数据一定完全写入磁盘。从这一点看,可能会造成机器宕机时,Page Cache 内的数据未写入磁盘从而造成数据丢失。但是这种丢失只发生在机器断电等造成操作系统不工作的场景,而这种场景完全可以由 Kafka 层面的 Replication 机制去解决。如果为了保证这种情况下数据不丢失而强制将 Page Cache 中的数据 Flush 到磁盘,反而会降低性能。也正因如此,Kafka 虽然提供了 flush.messages 和 flush.ms 两个参数将 Page Cache 中的数据强制 Flush 到磁盘,但是 Kafka 并不建议使用。
如果数据消费速度与生产速度相当,甚至不需要通过物理磁盘交换数据,而是直接通过 Page Cache 交换数据。同时,Follower 从 Leader Fetch 数据时,也可通过 Page Cache 完成。下图为某 Partition 的 Leader 节点的网络 / 磁盘读写信息。
(点击放大图像)
从上图可以看到,该Broker 每秒通过网络从Producer 接收约35MB 数据,虽然有Follower 从该Broker Fetch 数据,但是该Broker 基本无读磁盘。这是因为该Broker 直接从Page Cache 中将数据取出返回给了Follower。
支持多Disk Drive
Broker 的 log.dirs 配置项,允许配置多个文件夹。如果机器上有多个 Disk Drive,可将不同的 Disk 挂载到不同的目录,然后将这些目录都配置到 log.dirs 里。Kafka 会尽可能将不同的 Partition 分配到不同的目录,也即不同的 Disk 上,从而充分利用了多 Disk 的优势。
零拷贝
Kafka 中存在大量的网络数据持久化到磁盘(Producer 到 Broker)和磁盘文件通过网络发送(Broker 到 Consumer)的过程。这一过程的性能直接影响 Kafka 的整体吞吐量。
传统模式下的四次拷贝与四次上下文切换
以将磁盘文件通过网络发送为例。传统模式下,一般使用如下伪代码所示的方法先将文件数据读入内存,然后通过 Socket 将内存中的数据发送出去。
buffer = File.read Socket.send(buffer)
这一过程实际上发生了四次数据拷贝。首先通过系统调用将文件数据读入到内核态 Buffer(DMA 拷贝),然后应用程序将内存态 Buffer 数据读入到用户态 Buffer(CPU 拷贝),接着用户程序通过 Socket 发送数据时将用户态 Buffer 数据拷贝到内核态 Buffer(CPU 拷贝),最后通过 DMA 拷贝将数据拷贝到 NIC Buffer。同时,还伴随着四次上下文切换,如下图所示。
(点击放大图像)
sendfile 和 transferTo 实现零拷贝
而 Linux 2.4+ 内核通过 sendfile 系统调用,提供了零拷贝。数据通过 DMA 拷贝到内核态 Buffer 后,直接通过 DMA 拷贝到 NIC Buffer,无需 CPU 拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件 - 网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能。零拷贝过程如下图所示。
(点击放大图像)
从具体实现来看,Kafka 的数据传输通过TransportLayer 来完成,其子类PlaintextTransportLayer 通过 Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法实现零拷贝,如下所示。
@Override public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException { return fileChannel.transferTo(position, count, socketChannel); }
注: transferTo 和 transferFrom 并不保证一定能使用零拷贝。实际上是否能使用零拷贝与操作系统相关,如果操作系统提供 sendfile 这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。
减少网络开销
批处理
批处理是一种常用的用于提高 I/O 性能的方式。对 Kafka 而言,批处理既减少了网络传输的 Overhead,又提高了写磁盘的效率。
Kafka 0.8.1 及以前的 Producer 区分同步 Producer 和异步 Producer。同步 Producer 的 send 方法主要分两种形式。一种是接受一个 KeyedMessage 作为参数,一次发送一条消息。另一种是接受一批 KeyedMessage 作为参数,一次性发送多条消息。而对于异步发送而言,无论是使用哪个 send 方法,实现上都不会立即将消息发送给 Broker,而是先存到内部的队列中,直到消息条数达到阈值或者达到指定的 Timeout 才真正的将消息发送出去,从而实现了消息的批量发送。
Kafka 0.8.2 开始支持新的 Producer API,将同步 Producer 和异步 Producer 结合。虽然从 send 接口来看,一次只能发送一个 ProducerRecord,而不能像之前版本的 send 方法一样接受消息列表,但是 send 方法并非立即将消息发送出去,而是通过 batch.size 和 linger.ms 控制实际发送频率,从而实现批量发送。
由于每次网络传输,除了传输消息本身以外,还要传输非常多的网络协议本身的一些内容(称为 Overhead),所以将多条消息合并到一起传输,可有效减少网络传输的 Overhead,进而提高了传输效率。
从零拷贝章节的图中可以看到,虽然Broker 持续从网络接收数据,但是写磁盘并非每秒都在发生,而是间隔一段时间写一次磁盘,并且每次写磁盘的数据量都非常大(最高达到718MB/S)。
数据压缩降低网络负载
Kafka 从 0.7 开始,即支持将数据压缩后再传输给 Broker。除了可以将每条消息单独压缩然后传输外,Kafka 还支持在批量发送时,将整个 Batch 的消息一起压缩后传输。数据压缩的一个基本原理是,重复数据越多压缩效果越好。因此将整个 Batch 的数据一起压缩能更大幅度减小数据量,从而更大程度提高网络传输效率。
Broker 接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘。Consumer Fetch 到数据后再解压缩。因此 Kafka 的压缩不仅减少了 Producer 到 Broker 的网络传输负载,同时也降低了 Broker 磁盘操作的负载,也降低了 Consumer 与 Broker 间的网络传输量,从而极大得提高了传输效率,提高了吞吐量。
高效的序列化方式
Kafka 消息的 Key 和 Payload(或者说 Value)的类型可自定义,只需同时提供相应的序列化器和反序列化器即可。因此用户可以通过使用快速且紧凑的序列化 - 反序列化方式(如 Avro,Protocal Buffer)来减少实际网络传输和磁盘存储的数据规模,从而提高吞吐率。这里要注意,如果使用的序列化方法太慢,即使压缩比非常高,最终的效率也不一定高。
作者简介
郭俊(Jason),从事大数据平台研发工作,精通 Kafka 等分布式消息系统,Hadoop/Storm/Spark 等大数据系统及数据仓库建模和性能调优。
感谢杜小芳对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ , @丁晓昀),微信(微信号: InfoQChina )关注我们。
评论