金山云创立于 2012 年,是中国前三的互联网云服务商,2020 年 5 月在美国纳斯达克上市,业务范围遍及全球多个国家和地区。成立 8 年以来,金山云始终坚持以客户为中心的服务理念,提供安全、可靠、稳定、高品质的云计算服务。
金山云日志服务是针对日志类数据处理的一站式服务系统,提供从日志采集、日志存储到日志检索分析、实时消费、日志投递等多项服务,支撑着多个业务线的日志查询和监控业务,提升金山云各个产品线的运维、运营效率,目前每天数据量级在 200 TB。
作为针对日志类数据处理的一站式服务系统,金山云日志服务需要具备以下特性:
数据采集:基于 Logstash 和 Filebeat 进行定制开发,支持更多数据采集形态。
数据查询:支持 SQL 和 ElasticSearch Query String 语法。
数据消费:基于 Pulsar 对外部 socket 进行封装,有些产品线(想在控制台展示日志滚动的场景)可以通过整个日志服务的 websocket 协议实现;也可以通过暴露的 REST API 查询全部日志数据(即作为队列来使用)。
异常告警:在控制台检索数据后,把数据以及检索语法保存为告警项,支持配置整体的告警策略及告警方式。检索到异常后,后台会通过启动相应的任务实现实时告警。
图表展示:将在控制台检索的语句和查询结果保存为图表(柱状图、折线图等),再次进入控制台时,点击仪表盘即可看到当前或之前保存过的所有查询语句和结果数据。
数据异构:可以自定义是否把日志投递到其他云端产品线中,比如将某几个日志的数据投递到对象存储中,从而实现一些其他操作(如把数据投递到 Hive 数仓,再进行分析)。
为什么选择 Pulsar
在调研过程中,我们从基础功能和可靠性两方面对比了 RocketMQ、Kafka 和 Pulsar,并总结了三者的优缺点(对比结果见下表)。
我们发现 Pulsar 非常适合应用于日志流处理。从 BookKeeper 层面来讲,Pulsar 就是日志流存储的组件。Pulsar 采用云原生架构,日志流服务同样采用云原生、无服务模式,所有服务都在云上实现。Pulsar 拥有诸多灵活的企业级特性,比如支持多租户、支持租户存储配额、数据 ETL、整体数据负载均衡策略等;支持传输大量数据;针对消息队列的监控比较完善等。下面我来详细介绍下我们选择 Pulsar 的一些特性。
计算与存储分离
Pulsar 的 producer 和 consumer 都与 broker 相连接,broker 作为无状态服务,可以横向扩缩容,扩缩容时不会影响数据的整体生产和消费;broker 不存储数据,数据存储在 broker 的下一层(即 bookie )中,实现了计算与存储的分离。
弹性水平扩缩容
对于云端产品而言,Pulsar 无需重平衡即可实现 broker 扩缩容。相比之下,Kafka 扩缩容前需要先进行重平衡操作,可能会导致集群负载较高,也会对整体服务产生影响。
其次,Pulsar topic 分区也可以实现无限扩容,扩容之后,通过负载均衡策略自动平衡整个分片和流量。
Pulsar 多租户
Pulsar 原生支持多租户。在日志服务中也有租户的概念,每一条产品线(即每一个项目)属于一个租户,实现了产品线之间的数据隔离。Pulsar 集群支持数百万个 topic(在雅虎已有实践),整个 topic 也通过租户实现了隔离,在租户级别,Pulsar 实现了存储配额、消息过期删除、隔离策略等优秀特性,且支持单独的认证和授权机制。
负载均衡策略
Pulsar 在命名空间级别有 bundle 的概念,如果当前 broker 负载较高,bundle 会通过管理 topic 分区策略进行 bundle split 操作,自动将子分区均衡到其他负载较低的 broker 上。在创建 topic 时,Pulsar 也会自动把 topic 优先分配到当前负载较低的 broker 上。
Pulsar IO 模型
写入操作中,broker 并发向 BookKeeper 写入数据;当 bookie 向 broker 反馈数据写入成功时,在 broker 层面,内部只维护一个队列。如果当前的消费模式是实时消费,则可以直接从 broker 获取数据,无需经过 bookie 查询,从而提升消息吞吐量。在追赶读场景中,查询历史数据才需要查询 bookie;追赶读还支持数据卸载功能,即将数据卸载到其他存储介质中(比如对象存储或 HDFS ),实现冷存历史数据。
Topic 创建、生产与消费
在控制台创建 topic 后,将 topic 信息和租户信息记录到 etcd 和 MySQL 中,然后图示右侧的两类服务会监听 etcd,一类是 producer 类服务,监听创建或删除 topic 后的内部操作。另一类是 consumer 类服务,当监听到创建新 topic 操作后,对应的服务会连接到 Pulsar topic,实时消费 topic 上的数据。然后 producer 开始接收数据,并判断应该向哪个 topic 写入数据,consumer 消费数据并在判断后写入,或转存再写入到其他 ES 或其他存储中。
图 1. Topic 创建、生产、消费流程
Topic 逻辑抽象
Pulsar 中有三个级别:topic、命名空间和租户。因为 Pulsar 目前不支持命名空间级别的正则消费模式,所以我们需要把整体概念往上提一层,减少后台 Flink 的任务量,实现整个项目级别的消费。也就是说,在日志服务中,topic 对应 Pulsar 逻辑上的分片,命名空间对应 Pulsar 逻辑上的 topic。通过这一改动,我们实现了两个功能,一是动态增加和减少分片数量,二是在后台启动的 Flink 任务可以消费单个项目级别的数据。
图 2. Pulsar topic 逻辑抽象图
消息订阅模型
Pulsar 提供四种消息订阅模型:
独占模式(Exclusive):当有多个 consumer 使用同一个订阅名称订阅 topic 时,只有一个 consumer 可以消费数据。
故障转移模式(Failover):当多个 consumer 通过同一个订阅名称订阅 Pulsar topic 时,如果某一个 consumer 出现故障或连接中断,Pulsar 会自动切换到下一个 consumer,实现单点消费。
共享模式(Shared):应用比较广泛的一个模型,如果启动多个 consumer,但只通过一个订阅者订阅 topic 信息,Pulsar 会通过轮询方式依次向 consumer 发送数据;如果某一个 consumer 宕机或连接中断,则消息会被轮询到下一个 consumer 中。LogHub 使用的就是共享订阅模型,整个 Hub 运行在容器中,可以根据整体负载或其他指标动态扩缩容消费端。
Key_Shared 消息订阅模式:通过 Key 哈希方式保持数据消费的一致性。
Broker 故障恢复
由于 broker 无状态,所以某一个 broker 宕机对整体的生产和消费没有任何影响,同时会有一个新 broker 担任 owner 角色,从 ZooKeeper 中获取 topic 元数据,并自动演进到新 owner 中,数据的存储层也不会发生变化。此外,无需拷贝 topic 内的数据,避免数据冗余。
Bookie 故障恢复
Bookie 层使用分片存储信息。由于 bookie 本身有多副本机制,当某个 bookie 出现故障时,系统会从其他 bookie 读取对应分片的信息,并进行重平衡,因此整个 bookie 的写入不会受到影响,保证整个 topic 的可用性。
Pulsar 在日志服务中的应用
日志服务系统的最底层是数据采集工具,我们基于开源的数据采集工具(如 Logstash、Flume、Beats)进行了定制化开发。数据存储中日志池是一个逻辑概念,对应于 Pulsar 中的 topic。日志服务系统的上层为查询分析和业务应用,查询分析指在日志服务的工作台进行检索分析,或通过 SQL 语法进行查询;业务应用指在控制台定制仪表盘和图表,实现实时告警等。查询分析和业务应用都支持数据转存,即把日志数据转存到存储介质或价格较低的存储设备中,如基于 KS3 的对象存储、ElasticSearch 集群或 Kafka。日志服务的产品功能概况如下图。
图 3. 日志服务的产品功能概况图
日志服务架构设计
我们根据日志服务的产品功能设计了日志服务的分层架构(如下图)。最下层为数据采集端,负责采集日志类文本数据、TP/TCP 协议数据、MySQL 中的日志数据等,我们自研的采集端开发工作仍在进行中。采集到的数据通过日志服务数据入口发送到对应的 Pulsar topic 中。我们将 Pulsar 集群应用于三大板块,一是通过 Flink-on-Pulsar 框架实现多维统计分析场景,因为有些业务线需要通过日志数据做多维度的聚合统计,产生指标结果类数据,再转存给业务线。二是将 Pulsar 集群应用于 LogHub(微服务化服务),主要消费 Pulsar topic 的数据,将数据直接写入到 ES,通过控制台即可查询整个日志流的数据,也可以做检索分析。三是在控制台上使用 Pulsar Functions 设置一些算子或 ETL 逻辑,后台通过 Pulsar Functions 模块做数据 ETL。我们采用 EalsticSearch 集群存储数据检索分析结果,KS3、KMR、KES 对应于我们内部的一些云端产品线,用于存储和计算。上层的数据输出部分可以分为两大模块,一是 Search API 模块,负责对外提供 API,通过调用 API 在控制台进行一些和日志紧密耦合的动作;二是 Control API 模块,支持在控制台进行管理类操作,比如创建 topic、调整 topic 的分区数量、检索告警等。
图 4. 日志服务的分层架构设计图
日志服务的通信设计
从日志服务的产品架构来讲,整个服务采用无状态运行模式,所以各类服务(特别是 producer 和 consumer 服务)通过 etcd 方式实现数据共享。也就是说,在控制台执行创建、更新、删除操作后,producer 和 consumer 就会感知到这些动作,从而进行相应的变化。此外,由于 producer 和 consumer 完全在容器中运行,且服务本身无状态,因此可以进行动态扩缩容。日志服务的通信设计图如下。
图 5. 日志服务的通信设计图
日志流处理架构
根据日志流处理的需求,我们设计了如下架构图。左边为数据采集端,采集端把数据发送给数据接收端(PutDatas),接收端再把数据发送给对应的 Pulsar topic。我们将 Pulsar 主要应用于三个场景。
在 Pulsar 上添加 Flink 程序,实现定制化的 ETL 多维分析、统计、聚合等操作。
在 LogHub 使用 Pulsar 消费和存储数据。从 Pulsar 消费数据后,把采集到的日志数据写入到 ElasticSearch 集群。
在 WebSocket 和 REST API 上使用 Pulsar。WebSocket 实现了在控制台查看实时滚动的日志,REST API 支持查询特定队列中的数据。同时我们通过 Pulsar Functions 实现了一些简单的 ETL 处理,将处理后的数据转存到业务线的存储介质中(比如转存到数仓、Kafka 或 ElasticSearch 集群)。
图 6. 日志流处理架构图
未来规划
在 Pulsar 的支撑下,金山云日志服务一直运行良好。我们期待日志服务可以支持更多功能,实现更多需求。在日志服务方面,我们的规划如下:
新增顺序消费能力(账单、审计场景可能需要顺序消费能力)。
合并和分裂分区。
实现完全容器化部署。目前日志服务的内部服务已经完成了容器化操作,下一步我们会集中实现所有 Pulsar 集群的容器化部署。
目前,日志服务支撑金山云内部产品线约 15 条(如下图),单条线上数据传输大概为 200 TB/天,topic 数量已超过 3 万个。当 AI 业务接入 Pulsar 以后,整体数据量和 topic 数量都会有大幅度提升。
图 7. 使用日志服务的产品线
在测试和使用过程中,我们对 Pulsar 有了更全面的了解,期待 Pulsar 可以支持更多特性,比如:
去除对 ZooKeeper 的依赖。目前由 ZooKeeper 维护 Pulsar 的所有元数据,压力较大;且 Pulsar 的 topic 数量强制依赖 ZooKeeper 集群。如果将 Pulsar 元数据信息存储在 bookie 中,即可实现 topic 数量的无限增加。
自动扩缩容分区。日志类数据有高峰和低谷,在高峰时,把当前 topic 的分区数量自动进行扩容,提升整体并发量;在低谷时,将分区数量进行缩容,减轻集群资源的压力。
提供命名空间级别的正则匹配。在后台的 Flink 任务中,不再监听命名空间级别的数据,降低 Flink 的后台任务量。
结语
作为下一代云原生分布式消息流平台,Apache Pulsar 有其独特的优势,非常适合我们的日志流处理场景。Pulsar 社区非常活跃,有问必答。我们在前期调研、后续测试和正式上线过程中,StreamNative 的小伙伴们给予了极大支持,帮助我们快速推进业务上线。
目前,金山云日志服务中有 3 万多个 Pulsar topic,每天处理约 200 TB 数据,支撑 15 条产品线。自上线以来,Pulsar 运行状态稳定,大大节省了我们的开发和运维成本。我们期待尽快实现 Pulsar 集群的容器化部署,也期待 Pulsar 可以去除对 ZooKeeper 的依赖,支持自动扩缩容分区。我们愿意和 Pulsar 社区的小伙伴们一起开发新功能,进一步加快 Pulsar 的发展。
作者简介
刘彬,金山云大数据高级开发工程师。
评论 1 条评论