最近,StreamSQL 在 Kafka 上构建了平台,但几经实践,研发团队认为 Kafka 并非最合适的工具。本文将阐述该团队用 Kafka 构建的解决方案,以及决定迁移到 Pulsar 的原因。
在今天,Apache Kafka 几乎是事件流的同义词。事件流是我们平台的核心部分,最近,我们将 Kafka 替换成了 Pulsar。我们已经与客户当面谈过这个问题,也在会议上讨论过。前不久,Apache Pulsar 社区的一个朋友建议我写一篇文章,分享我们的经验和转换的原因。
我们在 Kafka 上构建了平台,并且发现自己已经写了大量的代码来使系统按照我们的意愿来运行。我们认为,Kafka 并非适合这项工作的工具。这一结论显然对于很多用例来说,并不都正确,但即使这样,使用 Kafka 而不是 Pulsar 仍然是有意义的。我将在本文阐述我们用 Kafka 构建的解决方案,以及讲述决定迁移到 Pulsar 的原因。
问题陈述
StreamSQL 是什么?
StreamSQL 是一个围绕事件源构建的数据存储系统。StreamSQL 由三个组件组成:事件存储、转换和物化状态。事件存储是发送到我们系统的每个域事件不可篡改的账本。我们使用类似于 Cassandra、Redis 和 CockroachDB 的 API 为物化状态提供服务。Transformation (转换)是将事件映射到状态的纯函数。根据 Transformation,我们接收到的每个事件都被处理并应用到物化状态。
StreamSQL 在所有数据中追溯性地运行新的 Transformation。最终状态是整个事件流的真实物化。此外,你还可以通过回滚和回放事件来生成一个“虚拟”状态。虚拟状态可用于训练和验证机器学习模型,也可以用于调试目的(如用于前端开发的 Redux)。
要求
系统需要能够执行以下操作:
将每个域事件永久存储在系统中;
通过保证对每个传入的事件只处理一次,以保持物化状态的一致性;
能够按照我们接收到的相同顺序对所有历史事件进行 Transformation;
回滚并回放事件账本,并在该点物化视图。
最初的基于 Kafka 解决方案
最初的基于 Kafka 解决方案,由一组拼接在一起的大数据工具组成。系统将过去的事件存储在 S3 中,并用 Spark 对它们进行处理。对于流数据,它使用 Kafka 和 Flink。要保持事件和物化视图的一致性,需要在每个系统之间进行复杂的协调。
无限存储每个域事件
每个域事件都将通过 Kafka 进入系统,然后 Kafka 会将其保存到 S3 中。这就使得我们能够存储大量很少使用的数据,并且具有高持久性和低成本。
我们曾尝试在流上使用 Kafka 的无限保留,但是发现它不仅成本昂贵,而且难以维护。在更大的主题(Topic)上,我们开始看到性能下降和不稳定的延迟的现象。由于此时我们已经几乎完全迁移到 Pulsar 上,因此没有进一步研究原因。
从批数据引导物化视图
我们通过按顺序处理每个事件来物化视图。我们使用 Spark 来处理存储在 S3 的大部分历史数据。如果我们可以在这种情况发生时将事件暂停,事情就会变得简单了。在这种情况下,我们可以读取所有 S3 数据,然后切换到处理主题开头的 Kafka。实际上,从 Kafka 持久化到 S3 的事件之间有一个延迟,在将大型批集群交换为较小的流处理集群之间还存在另一个延迟。由于我们不能错过任何事件的处理,因此,我们使用 Spark 在 S3 中处理尽可能多的事件,然后让它返回最后一个事件的 ID。由于我们已经将 Kafka 配置为保留最近几周的数据,所以我们可以将 Kafka 的其余事件回填。
从 Kafka 回填
Spark 能够处理过去的大多数事件,但它并不能让我们了解最新的状态。为了处理最后一组过去的事件,我们已经配置了 Kafka 集群,以保留最后两周确认的事件。我们运行一个 Flink 作业来继续 Spark 启动的 SQL Transformation。我们将 Flink 指向 Kafka 中的第一个事件,并让它通读一遍,什么也不做,直到它到达 Spark 停止的 messageID 为止。从那时起,它将继续更新物化视图,直到它到达流的头部。最后,它通知 Transformermation API,物化视图是最新的,可以使用。
更新传入事件
一旦启动物化视图,StreamSQL 就必须保持物化视图是最新的。在这一点上,这个问题是微不足道的。Kafka 将每个传入事件直接传递给 Flink,然后 Flink 执行必要的更新。此时,Transformermation API 和 Spark 处于空闲状态。但是,我们仍然将每个传入的事件保存在 S3 中,以防用户更新或创建 Transformation。
多租户、回滚和回放、错误处理等
我们协调 Flink 和 Kafka 一起工作,保存物化视图的快照。通过适当的协调,我们可以实现无缝的回滚和回放功能。要对这一过程进行阐述,需要专门写一篇博文(我们希望在不久的将来会撰写)。
在本文中,我们也不会讨论如何扩展 Flink 和 Kafka 集群、如何处理服务故障,或者如何在所有这些不同的服务之间实现安全的多租户(提示:每个解决方案都有不同的答案)。
为什么是 Pulsar?
Pulsar 的构建是为了永久存储事件,而不是在系统之间传输事件。此外,Pulsar 是在 Yahoo! 为在全球范围内开发各种产品的团队服务之上构建的。它本身就支持地理分布和多租户。Pulsar 执行复杂的部署变得很容易,如为某些租户保留专用服务器。我们尽可能利用这些特性。这使得我们可以将大部分的自定义逻辑交给 Pulsar 去处理。
S3 中的分层存储
StreamSQL 用户可以随时创建新的物化视图。这些视图必须是所有事件的投影。因此每个 Transformation 都按顺序处理每个历史事件。在基于 Kafka 的解决方案中,我们将所有已确认的事件流传输到 S3 或 GCS。然后,Spark 中的批处理管道处理这些事件。整个系统需要我们协调事件流、批处理存储、批处理计算、流计算和状态存储。在现实世界中,协调这些系统很容易出错,而且成本高昂,且难以自动化。
如果我们可以将事件存储配置为永久保存事件,那么就可以将批处理和流管道合并在一起。Pulsar 和 Kafka 都可以这么做,但是,Kafka 没有分层存储。这意味着所有事件都必须保存在 Kafka 节点的磁盘上。事件账本单调地增加,因此我们必须不断地添加存储空间。我们很少读取大多数历史事件,因此,我们昂贵的磁盘存储大部分都处于休眠状态。
另一方面,Apache Pulsar 具有内置的分层存储。Pulsar 将每个事件日志分写成段,并将不活动的段卸载(offloads)到 S3。这意味着只需对 Kafka 进行简单的配置更改,即可获得无限的、廉价的存储空间。我们无需不断地扩大集群的规模,就可以合并批处理和流管道。
我们可以将 Pulsar 配置为在主题达到特定大小时卸载事件,也可以手动运行。这使我们能够灵活地设置正确的卸载策略来平衡成本和速度。我们正在构建机器学习模型,以使我们的卸载策略适合每个主题的特定需求。
独立的计算和存储扩展
我们的事件数量和使用模式在一天之中和不同的用户之间变化很大。每个用户的不同使用模式会导致存储量或计算使用量的增加。幸运的是,Pulsar 将它的 Broker 从存储层分开了。
Pulsar 可以执行三种不同的操作:Tail write(尾部写入)、Tail read(尾部读取)和 Historical read(历史读取)。和 Kafka 一样,Pulsar 的写操作总是走到最后。对于 Pulsar 来说,一次写操作有三个步骤:首先,Broker 接收请求,然后 Broker 将其写入到 Bookkeeper,最后,它将请求缓存以供后续 Tail read。这意味着 Tail read 非常快,根本不会触及存储层。相比之下,Historical read 对存储层的影响就非常大。
对于 Kafka 和 Pulsar 来说,增加存储节点相对容易,但这是一项非常昂贵的操作。必须对数据进行洗牌(Shuffle)和复制,才能正确地平衡存储节点。在 Kafka 的情况下,Broker 和存储位于同一个节点上,因此任何扩展操作都很昂贵。与之相比,在 Pulsar,Broker 是无状态的,而且很容易进行扩展,成本也低廉。这意味着 Tail read 并不会带来严重的规模问题。我们可以根据当前 Historical read 和 Tail read 的使用模式来调整集群。
内置多租户
Pulsar 在构建的时候,就内置了多租户。在 Yahoo!,许多在不同地域分布的、从事不同产品工作的团队共享同一个 Pulsar 集群。该系统必须跟踪各种预算和各种服务级别协议。它有一个特性及,允许我们在同一个 Pulsar 集群上运行所有用户,同时保持性能、可靠性和安全性。
每个 Pulsar 主题都属于一个命名空间(Namespace),而每个命名空间都属于一个租户。每个 StreamSQL 账户都映射到一个租户。租户彼此安全隔离。用户不可能接触到其他用户的数据流。
从性能的角度来看,命名空间提供了关于隔离的其他有趣的动态。我们可以将用户的命名空间隔离到一组特定的 Broker 和存储节点。这限制了单个用户对整个系统的影响。同时,我们可以在 Broker 上设置自动减载,这样单个客户机中的峰值可以被更大的系统吸收。
积极响应的社区
Pulsar 社区的 Slack 频道真的太棒了!我的大多数问题几乎都能立即得到解答,还有一些聚会和 Pulsar 峰会,以及面对面的学习和网络课程。我们知道,哪怕在最糟糕的情况下,我们也可以联系相关人员,即使是最小众的问题也能得到帮助。社区给了我们继续前进的信心。
基于 Pulsar 的解决方案
无限存储每个域事件
Pulsar 允许我们将整个不可篡改的账本存储在 Pulsar 主题中。我们将其视为全部都在 Pulsar 中,但是,事情的幕后是 Pulsar 将事件卸载到 S3 中了。我们从使用事件账本中得到了简单性的好处,以及将事件放入 S3 的成本和维护优势。这一切都比我们的 Kafka 系统表现得更好,而不需要我们保持任何复杂性。
从批数据引导物化视图
Pulsar 架构融合了我们的流和批处理能力。这使我们能够删除 Spark 以及 Spark 和 Flink 之间的所有协调代码。Pulsar->Flink 连接器在批处理模式和流处理模式之间进行无缝切换。该架构的简单性消除了基于 Kafka 的版本中存在的大量边缘情况、错误处理和维护成本。
更新传入事件
我们编写一个作业来同时处理批数据和流数据。在我们没有任何协调的情况下,Flink 只维护一次处理,并在批处理模式和流模式之间进行切换。
Pulsar 的缺点
集成方式
Pulsar 的历史几乎与 Kafka 一样长,并在 Yahoo!的生产环境中得到了验证。我们认为,Pulsar 的核心是稳定可靠的。集成是另外一个问题。有关集成问题的列表是永远写不完的。在大多数情况下,Pulsar 社区构建并维护其集成。例如,我们希望将 S3 设置为接收器,并了解到不存在任何开源版本的连接器。我们构建了自己的开源解决方案,以推动社区向前发展,但我们希望在未来能够找到缺失的集成。
鉴于到目前为止,Pulsar 远没有 Kafka 那么受欢迎,因此大部分 Pulsar 集成都是在 Pulsar 厂里中构建并维护的。例如,我们使用的 Flink 连接器在 Pulsar 仓库中,但也有一个开放的 Apache Flink 票证,可以在它们那边构建一个。除非 Pulsar 能够成为主流,否则缺失的集成还会继续存在。
缺乏公共案例研究
几乎所有的 Pulsar 内容都是由托管的 Pulsar 提供商发布的,比如 Streamlio(由 Splunk 提供)、Stream Native 和 Kafkaesque 等。在大规模生产中使用 Pulsar,却与 Pulsar 没有商业联系的公司进行 Pulsar 案例研究少之又少。有很多大公司在生产中使用 Pulsar,但他们却很少分享经验。如果有公共案例研究的话,我们就可以找到窍门,避开陷阱,而不需要重新发明轮子。
相比之下,有关 Kafka 的案例研究比比皆是。Kafka 是最著名的事件流平台,而且还在继续积攒人气,所以大多数撰写数据平台的公司都会深入讲述他们是如何使用 Kafka 的。
基础设施的责任
我们的 Pulsar 部署需要一个用于元数据的 Zookeeper 集群、一个用于存储的 Bookkeeper 集群、一个 Broker 集群和一个 Proxy 集群。即使使用 AWS 和 Google Cloud 服务,这也是一个很大的维护责任。单是 Pulsar 就有大量的配置可能性,但是,当你查看底层时,它可能需要多个专业工程师来维护和优化。
下一步是什么
Pulsar 函数
目前,我们使用 Flink 处理流事件并更新物化视图。Flink 不允许向集群添加新节点。相反,我们必须保存一个检查点并以更大的规模重新启动集群。通常,Pulsar 函数是在一个单独的计算集群中运行的,我们可以动态调整它的规模。
Flink 的处理引擎更具表现力,更强大,但扩展起来要复杂得多。Pulsar 很容易扩展,但限制更多。我们将很快就能够对 Transformation 进行分类,并决定在哪里运行,倾向于使用 Pulsar 函数。
流的有向无环图
StreamSQL 目前并不允许 Transformation 使用物化视图作为状态。我们正在把这个系统建模成一个有向无环图(Directed Acyclic Graph,DAG),就像 Airflow 一样。与 Airflow 不同的是,依赖关系不能一步一步地运行,每个事件都必须经过整个有向无环图。当每个事件通过有向无环图,Pulsar 将使维护这一保证变得容易得多。
StreamSQL 正处于测试阶段
StreamSQL 是一个基于事件源数据存储。它基于我们的数据基础设施,我们用它为我们的机器学习模型提供支持,该模型的最终用户超过一亿。我们已经限制了功能集,并开放了测试版,现在可供用户测试。
原文链接:
https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar
评论 1 条评论