在去年 11 月的QCon Plus 2021上,Datazoom 的首席架构师暨Apache Airflow的 PMC 成员Sid Anand做出了在精益团队中构建高保真、近数据流处理为服务的演讲,在演讲中,Sid 给出了一个从底层构建高保真数据流处理的大师级教程。
介绍
在当今世界中,机械的智能化和个性化不断推进着用户的线上体验。无论是你最爱的搜索引擎的智能优化排序系统、音乐或视频推荐、“你可能感兴趣的人”,还是任何社交平台上根据喜好排序内容的系统,各种各样的数据不断被整合,并借此做出预测让人们保持参与感。虽然有些看起来很神奇的 SQL 的确可以做到数据整合,但随着现实中数据量的不断增长,将全部数据都存储在同一个数据库变得非常不切实际了。在十年之前我们还会用一个单体数据库来保存数据,但如今下面这张图或许更能代表我们所见到的现代数据架构:
图中所示的是由一或多个数据流动的基础设施服务打包在一起的、针对性解决方案集合,其重点在于数据的流动,而公司对数据复杂性管理通常有两种形式:批量处理或流处理。
是什么阻碍了流处理?
在前面我们所展示的图中,有很多可移动部分,而其宽广的表面积也意味着更高的出错概率。在流处理架构中,任何非功能性需求间的差距都是致命的。数据工程师们但凡是在构建系统时没有将“可 X 性”作为头等大事的,事后都要花费大把的时间在救急和系统维持上;这里的“可 X 性”指的是非功能性需求,如可扩展性、可靠性、可观察性、可操作性等等。更具体地讲,如果团队不以“可 X 性”为数据系统构建首要考量,那么数据管道的运维将会饱受中断和干扰的困扰,并通常会因此导致客户不满、团队成员焦头烂额并最终离开团队。
让我们深入研究,构建一个流处理系统吧。
从简单的开始做起
这是一段充满野心的征途,而就和任何大型项目或努力一样,我喜欢从简单的开始,比如定义一个目标:
我们想要建立一个系统,将信息从源头 S 传递到目标 D。
第一步是通过在 S 和 D 之间加一个消息中介来解耦,这一步骤很常见也很传统,是全球范围内通用的。这个消息中介系统可以是任何技术,我在这里选择的是Kafka。在系统之中创建一个 Topic(主题)E,代表流经这个主题的事件。S 和 D 通过事件主题 E 解耦,也就是说,如果 D 失败了,那么 S 可以继续向 E 发布消息;如果 S 失败了,D 也可以继续从 E 消费消息。
下一步是确定系统的具体实施部分。假设系统在云平台上运行,多数人第一反应是 AWS 或 GCP 等公开可用的云平台,但其实 Kubernetes 也可以,甚至两者混合也没问题。另外,作为项目一开始,我们可以从小规模入手,在主题 E 的一个分区中运行 Kafka 的同时,在三个可用区域分别运行一个代理以确保稳定性,将 RF 或复制因子设为 3。此外,S 和 D 需要在一个独立的EC2实例运行以提升系统可用性。
将流处理以服务的形式提供,意味着我们可以在流程 S 所托管的 API 端点接收输入消息,并在处理后发送向事件主题 E。流程 D 将会消费主题 E 中的消息,将其发布到互联网上的第三方端点上。
可靠性
我要问的第一个问题是,“这个系统可靠吗?”让我们重新编辑一下目标:
我们想要建立一个系统,将信息从源头 S 可靠地传递到目标 D。
更确切地说,应该加上这句话:
我想要零信息损失。
我们的设计需要根据这个需求变动什么吗?我们需要确保在流程 S 确认到远程发送来的消息后,流程 D 必须将该消息传递回发送者。那要如何建立系统可靠性?让我们先将系统一般化,假设我们有三个流程,而不是笼统的 S 和 D,这三个流程都通过 Kafka 主题线性链接。
同所有链式连接一样,这种连接的强度取决于最薄弱处;如果链中每一个流程或环节都是事务性的,那么这个链条本身就会是事务性的。我对“事务性”的定义是:至少有一次交付,这也是现在 Kafka 最常见的使用方式。那要如何让链中每一个环节都是事务性的呢?可以先将链中的处理环节分解出来,流程 A 是摄取节点;流程 B 是内部节点,负责从 Kafka 中读取并写入信息数据;流程 C 是输出或驱逐节点,负责从 Kafka 中读取信息并将其发送至互联网。
如何让流程 A 可靠?流程 A 会通过 REST 端点接收到一个处理消息 m1 的请求,在将数据以 Kafka 事件的形式可靠地发送至 Kafka 后,再给服务调用者发送一个 HTTP 响应。至于要如何可靠地将数据传输至 Kafka,A 需要调用 kProducer.send,入参有两个:主题和信息。之后立刻调用刷新(flush),清空 Kafka 内部的缓存并强制将 m1 发送向所有三个代理。因为配置了 acks=all,A 需要等三个代理都发回成功确认,才能响应服务的调用者。
那流程 C 要怎么才能变得可靠呢?流程 C 是用于读取数据的,通常是从 Kafka 中批量读取数据,一番处理之后再可靠地发送出去。在我们的示例中,可靠性意味着要接收到外部服务的 200 OK 响应代码。在流程 C 接收到 200 响应后会向下一个 Kafka 检查点驱动,一旦出现问题,流程 C 会负向 ACK(如 NACK)Kafka,强制流程 B 重新读取数据。
最后,流程 B 要如何可靠?流程 B 是流程 A 和 C 的组合体,流程 B 需要同流程 A 一样充当可靠的 Kafka 生产者,同时也需要像流程 C 一样充当一个可靠的 Kafka 消费者。
现在我们系统的可靠性是什么情况?万一进程崩溃,会发生什么?如果流程 A 崩溃,那么系统的摄取将完全中断,无法接收任何新消息。如果流程 C 崩溃,服务将停止向外部消费者传递消息,但从流程 A 中传来的消息仍会保存在 Kafka 中,流程 B 也可以继续处理,但直到流程 C 恢复之前,这些消息都不会被交付。
这类问题的常见解决方案是将所有服务都分别打包在一个大小为 T 的自动扩展组(autoscaling group)中,每个组都可以处理(T-1)个并发故障。自动扩展组这个词最初由亚马逊提出,但目前任何云平台,包括 Kubernetes 中都能适用。
滞后
就目前而言,我们的数据流处理系统似乎非常可靠了,但这种可靠性要如何衡量呢?根据可观察性。流处理系统有两个重要的关键质量指标:滞后和损失。如果你对流很熟悉,那么这两个指标应该都不陌生,但如果你是流处理方面的新手,那么请容我介绍下。
首先从滞后开始。
滞后只是对系统中信息延迟的一种衡量。
信息在系统之中的传输时间越长,其滞后性就越大,对企业的影响也就约打,依赖低滞后性见解的企业更会深受其害。我们的目标是尽量缩减滞后,以更快地提供见解。滞后性要如何计算?我们需要一个“事件时间”的概念,也就是一个事件或消息的创建时间。事件时间通常会存放在消息文件中,并随消息一同在系统中传递。对系统中任何节点 N 的任何消息 m1,都可以用以下公式计算滞后性。
那么在实际情况中会怎么样?
假设我们有一条在中午(T0)创建的消息,消息到达系统中节点 A 的时间是下午 12.01(T1)。节点 A 处理信息后传递到节点 B。消息到达节点 B 的时间是下午 12.04(T3).节点 B 处理后传递到节点 C,到达时间为下午 12.10(T5)。最后,节点 C 在处理信息后会将其发送到外部。
根据前面的公式,我们可以计算出消息 m1 和节点 C 之间的滞后时间是 10 分钟。但实际情况中,系统的滞后时间往往是以毫秒为单位,而不是例子中的分钟级。
我们在计算时不断提到的信息到达节点时间也被称作是到达滞后或滞后(lag-in)。另外需要注意,滞后是会累积的。在节点 C 计算出的滞后时间包含了其上游节点 A 和 B 上的滞后;在节点 B 计算出的滞后时间会包含其上游节点 A 的滞后。
处理到达滞后外,还有一种滞后叫离开滞后。离开滞后是从信息离开节点开始计算,计算方式同到达滞后的类似。参照下图,我们在 A、B、C 三个节点分别计算了离开滞后时间 T2、T4 和 T6。
在所有流处理系统中最为重要的滞后指标是端到端滞后(又称 E2E 滞后)。E2E 滞后是指消息在系统中的全部时间,直接计算系统最后一个节点的离开滞后时间,也就是我们系统中节点 C 的 10ms。
虽然计算出了某条消息(m1)的滞后时间非常有意思,但在我们要处理数十亿,甚至数万亿的消息量时,这个计算结果就没什么用了。因此,我们需要用统计数据来收集群体行为。我个人偏好用第 95 或第 99 个百分数(又称P95、P99),但别人更偏好 P99 或 MAX。
我们可以构建的统计数据有很多,可以计算在 P95 时整体的端到端滞后,或者任何节点上的到达滞后或离开滞后。另外还可以计算带有到达滞后和离开滞后的流程持续时间,也就是信息在链上任何节点内的时间。但这组数据有什么用呢?
让我们考虑一个更实际的例子。假设在一个上图中的线性结构里,有一条消息一路从红色、绿色、蓝色节点一路流转,最终到达橙色节点。这个结构其实是我们在生产环境运行的系统,上图是从我们在生产上的服务CloudWatch中截取的。如你所见,我们计算了每个节点的流程持续时间,并以饼状图展示。这让我们对系统中滞后性的分布情况有了大致的概念。这个系统非常完善,每个节点的滞后时间大致相同,没有特别突出的。把饼状图按时间摊开会得到右侧的图,可以看出性能在某段时间内基本相同。因此,我们会认为这个系统是很合适的,性能表现在任何时间范围内都保持一致。
损失
在分析完滞后后,我们再来看看损失。损失是一种衡量信息在通过系统时损耗的计量方式。导致信息损失的原因有很多,其中大多数都是可避免的。数据损失得越多,数据质量就越差。因此,我们需要尽量降低数据损耗,以提供更高质量的见解。那么要怎么计算流处理系统的损失呢?损失可以看作是系统中任意两点间的信息集差。还是以先前的线性结构为例,不过在系统其中流转的是十条信息而非一条。
我们可以通过损失表格来计算损失。表格中的每一行都代表一条信息,列则代表链中的节点。消息一成功通过系统,因此在每一列都是“1”;消息二同理。但消息三在成功通过红色节点后没能到达绿色节点,蓝色和橙色也没能接收到消息三。在表格最底部计算了系统每个节点中的损失情况,右下角则是计算了系统整体的端到端损失。在我们的例子中是 50%。
在流数据系统中,消息会不断流动。那我们该从哪里计算呢?我们需要利用消息事件时间,将消息分配到一分钟长度的时间桶内。比如在某天的 12.34 时,我们计算所有落入 12.34 这一分钟事件时间的损失表格,这个方案也可以应用到一天中的任何时间。
假设现在是下午 12.40,系统中的消息通常会延迟到达。我们现在可以看到下面这四张表都有统计数据更新。然而,到下午 12.35 表格中数据便不再变化,因此我们可以断定能到达的消息均已到达,并可以开始计算损失。时间早于这个时间点的损失表格都可以弃之不用,这样也可以裁剪掉不需要再计算的表格来缩小损失计算规模。
总的来说,我们等待几分钟的信息传输时间后再计算损失,如果损失超过阈值(如 1%),就触发损失警告。
在梳理清楚滞后和损失的定义后,我们就可以衡量系统的可靠性和滞后性了。
性能
那么我们系统的性能已经调整完毕了吗?让我们再回顾下目标。我们想要建立一个系统,将信息从源头 S 可靠且低延迟地传递到目标 D。要想更好地了解流处理系统的性能,我们需要先认识端到端延迟的组成部分。首先是摄取时间,这是指从接收请求的第一个字节开始,到发送响应的最后一个字节。这个时间段包含了所有我们在可靠地将消息发送至 Kafka 时产生的任何开销。在管道尽头还有“驱逐时间(expel time)”和“排放时间(egest time)”,这是指在 D 段处理和整合消息所需要时间,其余时间被统称为传输时间(transit time)。这三者共同构成了端到端的滞后时间
性能惩罚
在讨论性能时,我们也不能忽视为建造无损耗系统所牺牲的性能,我们需要用滞后换取可靠性。其中部分性能惩罚项如下。
首先是摄取惩罚(ingest penalty)。为了保障可靠性,S 需要对每个接入的请求调用 kProducer.flush,等待 Kafka 传来三个 ACK 才会发送 API 响应给客户端。虽然我们并不能干掉这些摄取惩罚,但却可以利用批处理来平摊成本。也就是说,我们可以通过支持推广批处理 API,在 API 调用滞后时间不变的情况下最大限度地提高吞吐量,在每个网络请求中获取多个消息。
同样,还有驱逐惩罚(expel penalty)。有一个需要注意的点是,Kafka 的速度很快,远比公共互联网上常见 HTTP 的往返时间(RTTs)还要快很多个数量级。事实上,大部分的驱逐时间就是 HTTP 的往返时间。同理,这里我们也采用分摊开销的方法:在每个 D 节点中,都增加批处理和并行性。我们从 Kafka 中读取批量数据后重新批处理为更小的批次,然后通过并行线程将这些批处理发送到各自的目的地。这样,我们就可以最大限度地提高吞吐量,减少单个消息的驱逐成本或惩罚
最后但同样重要的是重试惩罚(retry penalty)。为保证管道数据的零损失,我们需要在 D 端不断重试消息,只要次数够多就会成功。我们对 D 端调用的远程端点没有任何控制,这些端点可能会遭受瞬时故障,或者时不时对 D 进行限制,以及其他我们无法控制的事情都有可能发生,我们必须通过不断重试才能确保成功。这些情况被称作是可恢复故障,但仍有一些故障时不可恢复的。例如,4 开头的 HTTP 响应代码,除了常见的节流响应代码 429 之外,我们都不应该继续重试,因为即使重试也是不会成功的。
也就是说,为了处理重试惩罚,我们必须以滞后性为代价,并慎重考虑需要重试的内容。这一点前文已经提过了,我们不应该重试任何不可恢复故障,并巧妙采用重试的方式。
性能-分级重试
我有一个叫做“分层重试”的想法,可以参考下面的架构图:我们将架构分为两层,本地重试层和全局重试层。在本地层中,我们尝试低频在 D 端发送一个可配置次数的消息,这一层是为尝试在远程目标短暂或瞬时中断期间重试消息传递。
如果耗尽本地重试次数,D 端会将消息转移到全局重试服务(gr)中,全局重试层会在长时间内不断重试,以期望能超过目标的中断时长并成功传递消息。通过将消息重试任务转移到全局层,D 服务可以将资源用于传递正常消息。但需要注意的是,D 服务可能会需要向不同的远程服务或端点发送消息,也就是说,如果其中之一的远程目标中断,而其他目标完全正常。流处理系统中全局重试和服务 D 都会由 Kafka 主题分隔为 RI(Retry_In)和 RO(Retry_Out)。
这种方式的好处在于,实践中我们所遇到的全局重试概率远低于 1%,甚至常常连 0.1%都不到。因此,即使这些重试会需要更长时间,它们也不会影响我们的 P95 端到端延迟。
可扩展性
目前为止,我们已经有了一个小规模的、运行良好的系统。那么这个系统要如何随流量的增加而扩容呢?
首先是辟谣,可以处理无限制规模的系统并不存在。很多人会以为,通过转移 AWS 或其他托管平台可以实现无限制规模系统,但现实情况是,每个账户都会有限制,因此流量和吞吐量都是有上限的。从根本来说,每个系统都有自己的流量等级,无论这个系统是在哪里运行,流量等级都是通过对系统的负载测试来衡量的。我们只有通过迭代运行负载测试并移除瓶颈才能到达更高的规模。
在对数据流自动扩缩容时,我们一般会有两个目标。目标一是自动扩容以保持低延迟(如减少端到端滞后性),目标二是缩容以减少成本。我们在此先关注目标一。在自动扩容时,有不少是需要考虑在内的,首先,什么可以自动扩容?至少在过去十年间,亚马逊已经可以做到计算的自动扩容了,所有的计算节点都是可自动扩容的。那 Kafka 呢?Kafka 目前还不支持自动扩容,但未来或许可以。
如何正确挑选合适的指标来触发自动扩缩容操作是非常关键的。我们所选择的指标必须要有低延迟,要能够随流量的增加而增加,随微服务的扩容而下降。根据我的经验,平均 CPU 是很好的衡量标准。但还有一点需要注意,如果你的代码中有锁、代码同步,或者 IO 等待,那么你可能会没办法触发系统 CPU 的过饱和。随着流量的增加,CPU 用量也会到达高峰并导致自动扩缩容操作停止,系统延迟增加。如果你看到系统中出现这种情况是,最简单的解决方法是将阈值降低到 CPU 峰值之下。
结论
现在,我们的系统拥有了期望中的所有非功能性需求。虽然这篇文章中覆盖了很多关键要素,但还有很多没有提及的:隔离、容器的多级自动扩展、流处理操作者,以及缓存的架构。
原文链接:
评论