写点什么

蘑菇街千亿级消息 Kafka 上云实践

  • 2021-01-13
  • 本文字数:5820 字

    阅读完需:约 19 分钟

蘑菇街千亿级消息Kafka上云实践

Apache Kafka 凭借其高吞吐、高可靠等特性在实时数据或流式数据架构中扮演着重要角色,受到了众多企业用户的青睐。但是随着云时代来临,公有云厂商纷纷推出消息队列服务,很多用户也逐渐从自建消息集群过渡到使用云上消息队列服务。本文将以蘑菇街 Kafka 服务迁移上云为例,阐述腾讯云消息队列 CKafka 如何对用户产生价值。


Apache Kafka  简介


Apache Kafka 官网用这样一句话描述最新版本的 Kafka:A distributed streaming platform。即分布式流式计算平台,并对其做了如下阐述:


Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.


译文:Kafka®用于构建实时数据管道和流式应用程序。它具有水平可伸缩性、容错性、超快速性,可在数千家公司中投入生产。


升级到 2.0+的 Kafka 给自身加了一层定义,即流计算平台。但是在企业级使用场景下,Kafka 还是被经常当作数据管道来使用,履行消息队列的基本职能。其典型的使用场景如下:  


  • 数据管道和系统解耦。

  • 异步处理和事件驱动。

  • 流量削峰。

  • 事务消息和分布式事务的最终一致性。



自建 Kafka 集群的痛点


由于 Kafka 的搭建方式简单方便,且其性能高效稳定,很多企业用户选择自建 Kafka 集群。但这样看似完美的可行方案背后却有一个隐型风险:当业务的消息数据量到达一定程度后,自建的消息队列集群就会引发各种各样的问题,那么如何解决问题呢?


我们都知道 Kafka 入门简单,进阶却有一定的门槛。解决问题的研发人员需要具备扎实的计算机功底(熟悉计算机网络、IO 等),并且对 Kafka 的底层原理、各种配置参数项等具有深刻理解,可以进行 Kafka 集群参数调优,快速处理突发故障、恢复集群抖动和动态进行集群扩缩容等。正因如此,引发了一些问题:企业一方面需要投入更多的人力、物力成本,另一方面需要时刻监控集群的健康状况,及时排除问题以保障业务的稳定运行。所以自建 Kafka 集群虽然简单,但需要承担日益加重的研发和运维成本。


蘑菇街上云背景


蘑菇街的业务场景和软件架构决定了它对 Kafka 有着强大的依赖,作为电商领域的佼佼者,其消息总量达到了日均千亿条,生产峰值带宽达每秒 GB 级别。其主要的业务场景为分布式大数据处理场景,如广告、交易、安全、离线处理等。


在意识到自建 Kafka 集群的痛点后,为了保证数据的安全性和集群的稳定性,蘑菇街选择使用云上消息队列服务 CKafka。CKafka 不仅支持多可用区容灾,还可以帮助客户实现冷热数据分离,解决频繁读取磁盘 IO 瓶颈,为业务的稳定运行提供良好的保障。接下来我们来分析阐述 CKafka 是如何做到可用区容灾和高性能的集群服务器 IO。


集群跨可用区容灾方案



在 Kafka 消息系统中,客户端感知服务端最核心的操作就是生产和消费。跨可用区容灾的目标是:当一个可用区发生故障(如火灾,断电等)时,能够做到客户端无感知的进行生产和消费,保证业务的稳定运行。而满足可用区容灾需要在技术层面解决如下问题(以上面示意图为例):


  • 分区副本的跨可用区分布,即保证每个分区的副本分布在不同可用区。比如,当集群跨上海二区和四区两个可用区时,分区有四个副本,则需要保证每个可用区都分布两个副本;

  • Kafka 强依赖 Apache Zookeeper,当 Zookeeper 不能正常提供服务时,Kafka 集群也会受到影响,故实现 kafka 的跨区容灾,也要实现 zookeeper 的跨区容灾。Apache Zookeeper 和 Kafka 一样,具有跨区容灾的能力。

  • Broker 节点的 IP 对客户端需要透明化。即客户端不能感知 Broker 的地址。这样当后端 Borker 故障,切换机器 IP 发生改变时,客户端无感知,依然可以正常运行。


解决上述问题需要下面 4 个技术方案。


1. 透明可漂移的 Broker 节点 IP


为什么 Broker 的节点 IP 和端口需要对用户端透明呢?我们先来看如下一段代码:

Properties props = new Properties(); props.put("bootstrap.servers", "192.168.10.10:9092,192.168.10.11:9092,192.168.10.12:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++)     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
复制代码


这是一段最简单的 Kafka Produce 代码。192.168.10.10:9092、192.168.10.11:9092、192.168.10.12:9092 是三台真实 Kafka Broker 的 IP 和端口获取 Server 端 Metadata 信息,开始进行生产消息操作。我们来设想一下如下的情况:


当其中一台 192.168.10.10 机器故障无法恢复时,我们重新启动了另外一台 Borker,比如 192.168.10.13:9092 提供服务。此时就需要通知所有客户端,将 Kafka 地址从: "192.168.10.10:9092,192.168.10.11:9092,192.168.10.12:9092" 

修改为 "192.168.10.13:9092,192.168.10.11:9092,192.168.10.12:9092"。


若 IP 配置硬编码到客户端代码中,则需要修改代码,然后打包并发布。由于服务端调整而导致客户端修改配置、重启,这简直是灾难!那要怎么解决这个问题呢?


解决问题的思路就是: Virtual IP Address。如下图所示,我们会在每台提供的 Broker 之前挂载一个四层的 Virtual IP(VIP)和 Virtual Port(VPORT),用户通过访问 VIP 和 VPORT 来访问实际的 Broker 服务。如 10.0.0.1:9092 对应的是真正的 Broker 服务 192.169.10.10.9092。这样就达到了实际 Broker IP 对用户透明化的目的。



那什么是漂移呢?服务需要做到跨可用区容灾。即我们提供的 Virtual IP  Address 能够在可用区之间进行切换的,当该可用区故障,该 VIP 可以迅速切换漂移至另一个可用区,继续提供服务。那么该 VIP 应该是可以访问所有可用区的。如下图,当上海可用区 2 发生故障后,Virtual Ip Service 迅速自动切换到上海可用区 1 可用的 broker 实例,保证客户端的正常使用。



2. 分区副本的跨区分布


原生的 Kafka 按照同一个可用区的副本不能分配在同一台机器上的原则,进行副本随机分配。副本分布逻辑是无感知可用区。即当集群里面哪台 broker 有空闲的空间,就将副本分布在 Broker 上。则有可能将同一个 partiton 的分区分布在同一个分区。


如上面的跨可用区 Virual IP 切换示意图所示,当创建一个 3 个 Replication(副本)的 Partition 时,很有可能该 Partition 的 Replication 都落在了上海可用区 2。如果此时上海可用区 2 发生故障,那么该 Partition 就不能正常提供服务,直接影响业务。怎么解决这个问题呢?


CKafka 会在 broker 上添加可用区标记,当发现客户创建的主题是跨可用区主题时,会将同一个分区的副本分配在多个可用区,保证一个可用区故障时分区仍然有存活的副本。通过修改 Kafka 源码的分区分配逻辑,添加了可用区标记逻辑,根据需求将不同的 Replicatiton 分配到不同的 Broker 上。而这些 Broker 则属于不同的可用区。实现原理如下:


首先来看一下 Zookeeper 上的节点/broker/topics/test-topic 的内容,内容如下:


{"version":1,"partitions":{"0":[10840,10839],"1":[10838,10840],"2":[10839,10838]}}
复制代码


这段内容意思是:test-topic 这个主题有 0、1、2 三个分区,0 分区分布在 broker[10840,10839]上,1 分区分布在 broker[10838,10840],依次类推。所以,只需要修改该内容的生成逻辑就可以控制 Partiton 的分布,即可实现该逻辑。


3. Zookeeper 的跨区部署


被 Kafka 强依赖的 Zookeeper 组件,它也需要跨区部署保证其可用性。首先来看一下 Zookeeper 的选举策略:半数以上的节点都同意后才能当选 leader,如果是偶数节点可能导致票数相同的情况,会使 leader 选取失败,最终导致集群失效。另外当 Zookeeper 集群故障节点数超过半数时,Zookeeper 集群将无法正常工作。


由 Zookeeper 分布式一致性算法的特点,可以得出一个结论:假如每个 zone 部署一个 zk 节点,zk 要支持 n 区容灾(同时挂掉 n 个区的 zk 节点),需要部署 2n+1 个分区才能保证 Zookeeper 的分区可用。即在 n=1 的情况下,需要部署 3 个可用区,才能保证 zookeeper 集群的单可用区可用。


4. Broker 配置优化


根据设计方案,在不同的可用区部署 Broker 时,需要调整一些参数。这些参数保证了服务跨区容灾的最大可用性。需要修改如下三个配置:


unclean.leader.election.enable=truemin.insync.replicas=1offsets.topic.replication.factor=3
复制代码


这三个配置什么意思呢? 依次来看一下:


  • unclean.leader.election.enable


官方描述:Indicates whether to enable replicas not in the ISR set to be selected as leader as a last resort, even though doing so may result in data loss。


解释:该字段默认值为 False。默认情况下 leader 不能从非 ISR 的副本列表里选择;因为在非 ISR 副本列表里选择 leader,很有可能会导致部分数据丢失。既然这样,那为什么还要打开这个字段呢?因为在很异常情况下,比如 ISR 内的副本都不可用了,此时如果该字段设置为 False,服务会直接挂掉;如果该字段设为 True,即允许从非 ISR 列表中选择 leader,那么服务尽管有可能丢失数据,却依然可以继续使用。所以这个参数必须参考业务特性来决定是否打开。


  • min.insync.replicas


官方描述:When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful。


解释:该字段默认值为 1。上述英文翻译为:表示当在 acks=-1 时,最少有一个 Replica 进行确认回执,才确认数据写入成功。这个参数在集群搭建时,为了保证数据的完整性,经常会被改为 2。这里改为 1 的原因是:在只有一个副本在工作 、其他都挂掉的极端情况下,保证客户端能够正常提供服务。如果设置为 2,当只有一个副本在工作的时候,就会出现生产端一直生产失败的情况,会影响业务。


  • offsets.topic.replication.factor


官网描述:The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.


解释:该值默认为 1。表示 kafka 的内部 topic consumer_offsets 副本数。当该副本所在的 broker 宕机,consumer_offsets 只有一份副本,该分区宕机。使用该分区存储消费分组 offset 位置的消费者均会收到影响,offset 无法提交,从而导致生产者可以发送消息但消费者不可用。所以需要设置该字段的值大于 1。


集群 IO 压力优化方案


自建消息集群的用户常常会遇到一个问题:在流量峰值时,集群 IO 压力很大,用户只能通过扩容来暂时解决问题。但这毕竟是权宜之计,为了帮助用户真正解决该问题,腾讯云 CKafka 团队对客户服务器端的各项指标及业务场景进行了深入分析。我们发现集群的 IO 压力占比最大的是磁盘读压力。但是为什么磁盘读压力大呢?我们首先来看一下 Kafka 底层的磁盘存储设计原理。


1. Kafka 磁盘存储设计原理


Kafka 的磁盘存储设计可以用三个词来概括:磁盘顺序读写、Page Cache 和零拷贝。


  • 磁盘顺序读写:即 Kafka 数据的写入和读取是顺序的。而根据局部性原理,在实际测试中,磁盘顺序写入和随机写入的性能比相差最大可达 6000 倍。

  • Page Cache:它是 Kafka 能够实现顺序读写的关键技术。另外,它也是操作系统主要实现的一种磁盘缓存,用来减少磁盘的 I/O 操作。具体做法是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。Page Cache 中的数据会按照一定的策略更新到磁盘。

  • 零拷贝:将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。这样做大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux 操作系统而言,零拷贝技术依赖于底层的 sendfile() 方法实现。对于 Java 语言,FileChannal.transferTo() 方法的底层实现也是 sendfile() 方法。


2. 为什么服务器读压力大?



从上面的存储原理图来分析:理论上集群的读压力不应该这么大,因为大部分的读压力应该命中 Page Cache,不应该再从磁盘里面读取。然而实际情况中确实存在大量的磁盘读取行为。经过分析,客户存在多个业务消费同一份消息的业务场景,根据消费的实时性可以将消息消费者行为划分两类:实时消费者和离线消费者。


  • 实时消费者:对数据实时性要求较高,需要采用实时消费消息的方式。在实时消费的场景下,Kafka 会利用系统的 page cache 缓存,生产消息到 broker,然后直接从内存转发给实时消费者,磁盘压力为零。通常称上述操作为热读,常见的业务场景有广告、推荐等。

  • 离线消费者:又名定时周期性消费者,消费的消息通常是数分钟前或是数小时前的消息。而这类消息通常存储在磁盘中,消费时会触发磁盘的 IO 操作。通常称其为冷读,适合报表计算、批量计算等周期性执行的业务场景。


在消息量非常大的情况下,实时和离线消费者同时消费一个集群,会导致两个问题:


  • 实时消费者受到离线消费者影响:由于离线消费者消费,导致落盘数据和实时数据会频繁的换入换出内存,直接影响实时业务的实时性,增加实时业务的响应时延;

  • 离线数据会导致繁重的磁盘 IO 操作:当离线任务读取的数据量非常大时,会触发磁盘的高 IO,磁盘的 IO util 甚至达到 100%,影响集群的稳定性。


3. 优化之道:冷热数据分离方案


针对用户集群中存在的数据冷读和热读并存问题,我们认为将集群的数据进行冷热数据分离是当前较优的解决方案。而在不改变生产端行为的情况下,怎么对冷热数据进行分离呢?腾讯云 CKafka 推出了基于开源 Kafka Connector 的数据同步服务来解决上述问题。架构图如下图所示:



broker 集群被拆分为实时集群和离线集群。两个集群分别负责同时引导离线业务消费离线集群。CKafka 在两个集群中间添加了 connector 集群。connector 集群将离线业务订阅的消息(按照主题维度同步)从实时集群同步到离线集群中,connector 集群实时进行数据同步,和实时消费者保持一致。这样操作不仅对磁盘 IO 没有影响,也不会对其他的实时消费者造成影响。


CKafka 对业务的价值


CKafka 提供高吞吐性能、高可扩展性的消息队列服务。在性能、扩展性、业务安全保障、运维等方面具有超强优势,让用户在享受低成本、超强功能的同时,免除繁琐运维工作。



头图:Unsplash

作者:张晓宇, 许文强

原文:https://mp.weixin.qq.com/s/89zOy63MjDfyJnLhY8CkUA

原文:蘑菇街千亿级消息 Kafka 上云实践

来源:腾讯云中间件 - 微信公众号 [ID:gh_6ea1bc2dd5fd]

转载:著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

2021-01-13 22:102416

评论

发布
暂无评论
发现更多内容

jmeter 执行python脚本

陈磊@Criss

PIP的报错Could not fetch URL https://pypi.org/

陈磊@Criss

pipreqs:生成python项目的requirements

陈磊@Criss

Nginx的容器部署

陈磊@Criss

国内程序员最容易发音错误的单词集合

程序员生活志

程序员 经验总结

Kafka实战宝典:如何跨机房传输数据

数据社

大数据 kafka 跨机房

聊聊微前端的原理和实践

vivo互联网技术

大前端

一文道尽“表驱动法”

架构精进之路

编码 表驱动法

Kafka实战宝典:一文带解决Kafka常见故障处理

数据社

kafka 监控

企业微信群消息机器人发送开源项目

陈磊@Criss

如何选择一个性能测试工具(LoadRunner和Locust的一次对比)

陈磊@Criss

Docker的Image

陈磊@Criss

告别下载速度慢!Docker配置阿里云镜像仓库

程序员的时光

Docker 阿里云

Docker的Image

陈磊@Criss

DockerFile 详解

陈磊@Criss

人人都可以掌握的正交试验设计测试用例方法

陈磊@Criss

Python的Twisted事件驱动的网络引擎框架

陈磊@Criss

华章25周年活动——《迁移学习》限量5折!

华章IT

国家央行数字货币的优势与挑战

CECBC

数字货币 央行 商业银行

该了解一波了!零基础入门Nginx

程序员的时光

nginx Docker

Git删除仓库中的文件和文件夹

陈磊@Criss

最受欢迎的男友职业排行榜Top10

程序员生活志

程序员

快速掌握的测试用例优先级划分方法

陈磊@Criss

Java的Override和Overload

陈磊@Criss

好玩又好用,一款轻松就可以实现音视频的Demo

anyRTC开发者

音视频 移动互联网 RTC anyRTC Demo

微信小程序的自动化测试框架

陈磊@Criss

Docker 容器连接

陈磊@Criss

Git使用教程:最详细、最傻瓜、最浅显、真正手把手教!

程序员生活志

git

优质单元测试的十大标准,你有遵循吗?

禅道项目管理

项目管理 单元测试 自动化测试

你还应该知道的哈希冲突解决策略

vivo互联网技术

哈希冲突

分布式定时任务调度框架实践

vivo互联网技术

大数据 分布式 框架

蘑菇街千亿级消息Kafka上云实践_语言 & 开发_腾讯云中间件_InfoQ精选文章