导引
KafkaBridge 封装了对 Kafka 集群的读写操作,接口极少,简单易用,稳定可靠,支持 c++/c、php、python、golang 等多种语言,并特别针对 php-fpm 场景中作了长连接复用的优化,已在 360 公司内部广泛使用。
前言
- 众所周知,Kafka 是近几年来大数据领域最流行的分布式流处理平台。它最初由 LinkedIn 公司开发, 已于 2010 年贡献给了 Apache 基金会并成为顶级开源项目, 本质上是一种低延时的、可扩展的、设计内在就是分布式的,分区的和可复制的消息系统 ;
- Kafka 在 360 公司内部也有相当广泛的使用,业务覆盖搜索,商业广告,IOT, 视频,安全, 游戏等几乎所有核心业务,每天的写入流量近 1.2PB, 读取流量近 2.4PB;
- Kafka 官方提供了 Java 版本的客户端 SDK, 但因 360 公司内部产品线众多,语言几乎囊括目前所有主流语言,所以我们研发了 Kafka 客户端 SDK —— KafkaBridge;
简介
- KafkaBridge 底层基于 librdkafka , 与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的 Kafka 系统细节,只需调用极少量的接口,就可完成消息的生产和消费 ;
- 针对使用者比较关心的消息生产的可靠性,作了近一步的提升;
- 开源地址:[ https://github.com/Qihoo360/kafkabridge ]
特点
- 支持多种语言:c++/c、php、python、golang, 且各语言接口完全统一 ;
- 接口少,简单易用 ;
- 针对高级用户,支持通过配置文件调整所有的 librdkafka 的配置 ;
- 在非按 key 写入数据的情况下,尽最大努力将消息成功写入 ;
- 支持同步和异步两种数据写入方式 ;
- 在消费时,除默认自动提交 offset 外,允许用户通过配置手动提交 offset;
- 在 php-fpm 场景中,复用长连接生产消息,避免频繁创建断开连接的开销 ;
编译
- 编译依赖于 librdkafka , liblog4cplus , boost(仅依赖于若干个头文件) ;
- 对于 C++/C 使用 CMake 编译 ;
- 对于 Python, Php, Golang 使用 swig 编译 ;
- 每种语言都提供了自动编译脚本,方便使用者自行编译。
使用
1、数据写入
- 在非按 key 写入的情况下,sdk 尽最大努力提交每一条消息,只要 Kafka 集群存有一台 broker 正常,就会重试发送 ;
- 每次写入数据只需要调用 _produce_ 接口,在异步发送的场景下,通过返回值可以判断发送队列是否填满,发送队列可通过配置文件调整 ;
- 在同步发送的场景中,_produce_ 接口返回当前消息是否写入成功,但是写入性能会有所下降,CPU 使用率会有所上升, 推荐还是使用异步写入方式 ;
- 我们来简单看一下写入 kafka 所涉及到的所有接口:
// 初始化接口 bool QbusProducer::init(const string& broker_list, const string& log_path, const string& config_path, const string& topic) // 写入数据接口 bool QbusProducer::produce(const char* data, size_t data_len, const std::string& key) // 不再需要写入数据时,需要调用的清理接口,必须调用 void QbusProducer::uninit()
- 具体使用可以参考源码中的实例 ;
2、数据消费
- 消费只需调用 subscribeOne 订阅 topic(也支持同时订阅多个 topic),然后执行 start 就开始消费,当前进程非阻塞,每条消息通过 callback 接口回调给使用者 ;
- sdk 还支持用户手动提交 offset 方式,用户可以通过 callback 中返回的消息体,在代码其他逻辑中进行提交。
- 下面是消费接口,以 c++ 为例:
// 初始化接口 bool QbusConsumer::init(const string& string broker_list, const string& string log_path, const string& string config_path, QbusConsumerCallback& callback) // 订阅需要消费的消息 bool QbusConsumer::subscribeOne(const string& string group, const string& string topic) // 开始消费 bool QbusConsumer::start() // 停止消费 void QbusConsumer::stop()
性能测试
- kafka 集群三台 broker, 除测试用 topic 外,无其他 topic 的读写操作 ;
- 测试用 topic 有 3 个 partition;
- Producer 单实例,单线程 ;
- Topic 无复本下测试:
- 单条消息 100 byte, 发送 1 百万 条消息,耗时 1.7 秒 ;
- 单条消息 1024 byte, 发送 1 百万 条消息,耗时 13 秒 ;
- Topic 有 2 复本下测试:
- 单条消息 100 byte, 发送 1 百万 条消息,耗时 1.7 秒 ;
- 单条消息 1024 byte, 发送 1 百万 条消息,耗时 14 秒 ;
写在最后
- KafkaBridge 一直在 360 公司内部使用,现在已经开源,有疏漏之处,欢迎广大使用者批评指正,也欢迎更多的使用者加入到 KafkaBridge 的持续改进中。
- 开源地址: KafkaBridge
活动推荐:
2023年9月3-5日,「QCon全球软件开发大会·北京站」 将在北京•富力万丽酒店举办。此次大会以「启航·AIGC软件工程变革」为主题,策划了大前端融合提效、大模型应用落地、面向 AI 的存储、AIGC 浪潮下的研发效能提升、LLMOps、异构算力、微服务架构治理、业务安全技术、构建未来软件的编程语言、FinOps 等近30个精彩专题。咨询购票可联系票务经理 18514549229(微信同手机号)。
评论 1 条评论