本文原作者为 Optimizely 的分布式系统工程师 David Yu。
我们在 Optimizely 公司的使命就是帮助决策者们把数据转变为行动。这需要我们能够快速并可靠地移动数据。我们每天要处理几十亿个用户事件,包括浏览网页、点击和定制事件等。能以最快的速度将与用户有关的关键业务信息提供给我们的客户,这一直都是我们最高优先级的任务。正因如此,我们一直都在寻求创新的方式来改进我们的数据处理流水线。
在本文中会介绍为了给客户提供最实时的指标数据,我们是怎样把我们的数据处理流水线由批处理的方式转变为流处理的。
动机
统一。在过去,我们为不同的用途而使用了两种数据库:用 HBase 存储计算实验指标,而用 Druid 来计算个性化的结果。对这两套系统的需求是大相径庭的:
实验
个性化
快速导入数据
导入数据可延迟
秒级查询延迟
亚秒级查询延迟
访问者级别指标
会话级别指标
可是,随着业务需求的不断演进,扩展开始变得非常困难。为了满足业务需求而维护一套 Druid+HBase 的 Lambda 架构已经成了技术团队不能承受的负担。我们需要一套新的解决方案,来减少后台的复杂度,并提高开发效率。更重要的是,一套统一的计算基础设施可以成为通用的平台,满足我们未来的许多产品需求。
一致性:如上文所述,这两种计算基础设施提供了不同的指标和计算能力。比如说,实验结果可以让你知道有多少用户访问了你的登陆页面,而个性化结果却告诉你是有多少个会话。我们希望能为客户提供一致的指标,并为所有的产品都提供这两种类型的统计数据。
实时结果:基于会话的结果是通过 MR 任务算出来的,有可能在收到事件之后延迟几个小时才能得到。实时的解决方案则会为客户提供他们的数据的最新视图。
Druid + HBase
在之前的文章中,我们介绍过后台的数据输入流水线,和我们是怎样使用 Druid 和 MR 来基于用户会话存储事务统计数据的。我们从 Druid 得到的最大好处就是查询时低延迟。可是它本身也有许多固有缺点。比如,因为分段文件是不能更改的,所以不可能增量地更新索引。结果必要时我们只好每隔一段时间就不得不把用户事件重新处理一遍,这样才能解决乱序事件之类的数据问题。另外,我们也不能过于增加分片的数量,因为耗时过长的查询会变得代价巨大。
另一方面,我们也用 HBase 做用户访问计算。我们把每个事件写入一个 HBase Cell,这样就为支持我们运行的查询带来了最大的灵活性。比如,当一位客户想要知道“有多少个用户曾经做过添加购物车的动作”时,我们只需要把范围内的数据全遍历一遍就好了。因为事件都是经过 Kafka 准实时地推送到 HBase 里的,数据基本上可以反映最新的情况。可是,我们现在的表的模式没办法保留与每个事件相关的所有元数据。这些元数据又包含着许多通用的信息,比如浏览器类型和地理位置,还有用于定制数据分区的特定用户标签等等。这些数据的冗余让我们没办法支持非常多的用户分区,分区越多,我们存储的代价就越大,查询遍历时间也越长。
SessionDB
因为优化 Druid 索引越来越难,所以我们决定抛弃 Druid 这个方案,专心改进我们的 HBase 数据表现。提前累积事件,并将冗余信息压缩掉就成了最明显的改进方案。因此我们决定求助于 Samza。
由于 Samza 可以无缝地与我们的分布式消息队列 Kafka 整合在一起,所以对于我们的需求是非常合适的。在第二部分我们会详细讲述这个实时累积的过程。但是从比较高层的角度,Samza 可以不断地把事件按照会话来打包,并且周期性地将未写出的数据片段以流的方式写入 HBase。有了这个方法,每个 HBase Cell 就都成了一组事件的统一视图。
这带来了许多好处。首先要提到的就是我们计算各种不同统计数据的核心逻辑在最大程度上保持了不变。既然我们做的量最大的基础运算就是求和(当然这是极度简化了的),把一堆数字加起来,这和把一串累积值加起来是等价的。
我们得到的第二个好处是,会话级的信息是可以马上得到的,所以我们就可以从 HBase 中查询得到会话指标,实时地回答诸如“每个用户会话产生的平均收益是多少”这样的问题!毫不意外,新创建的 HBase 模式名就叫 SessionDB,这成了我们后台程序标准化的基础。
最后但同样重要的,HBase 的存储需求急剧下降,所以查询也可以运行得更快了。通过累积会话级别的元数据,我们不必再在各个 Cell 之间同步浏览器类型、位置和用户规模之类的信息。下图显示了平均查询延迟(X 轴)相对于不同的用户规模(Y 轴)之间的对应关系。比如每个会话上平均产生 10 个事件,中等的查询延迟降低到了 5 毫秒,以前的值是 40+ 毫秒(黄色线)。
背景
会话聚合(众所周知也叫会话化)并不是我们在 Optimizely 做的第一个流处理用例。我们曾经将流处理应用到各种 ETL 任务上,比如:
- 数据增强:为事件回填缺失的元数据;
- 事件流重新分区:针对各种下游用例;
- 实时实验指标:计算每次实验中得到的事件计数值;
现在已经有了许多可以用于生产的流处理框架,比如 Samza、Spark、Storm 和 Flink 等。我们选择 Samza 的原因有好几点。首先,Samza 允许你把若干个 Kafka Topic 串连起来,形成一个处理拓扑,这就有了很高的隔离性。因为我们鼓励工程师们提出不同的想法,这种可插入性给我们的数据处理流水线带来的连锁反应最小。其次,Samza 可以很容易地与 Kafka 和 YARN 整合起来,而我们的系统中后两者用得非常多。另外,Samza 的延迟非常低。它提供了一个简单的编程模型,以及非常易用的状态管理框架,所有这些都非常适合我们的需求。
Apache Samza 是什么?
从一个很高的层次来说,一个 Samza 任务可以从一个或多个 Kafka Topic 中消费流式的数据。然后它对这些事件做某种计算处理,一次一种,然后再把输出结果写到一个或多个下游 Kafka Topic 中。
关于 Samza 任务的一个值得注意的特征是,它们具有很高的可组合性。为了帮助你将 Samza 任务可以做的事情可视化,你可以把它们当成一批实时的 MR 任务,由 Kafka Topic 串连在一起。上图中显示了一个任务可以有多个输入流,以及任务之间如何连接起来,形成一个复杂的工作流。
Samza 是可扩展的、容错的和有状态的。我们会简单地涉及这些方面内容,因为我们的会话化任务会在某种程度上利用这些功能。
扩展性:就像 MR 任务一样,Samza 任务也是高度并发和可扩展的。不同点在于,与基于文件不同,Samza 的并行是基于分区的。一个任务被分成多个 Task。拿 MR 来做类比,一个 Task 大概可以相当于一对 Mapper+Reducer。Task 执行“Map”时,就是把事件输出到相应的指定 Kafka Topic 分区中。同时,从输入流分区中出来的事件全都会汇总给一个 Task 进行处理,这个行为和 Reducer 很像。
有个很重要的方面要牢记在心,一个输入 Topic 的分区总是静态地被分配给某个 Task 实例。那么,Task 实例的数量也就直接与任意指定输入 Topic 的最大分区数量有关(如下图中的例子所示)。当你第一次提交一个任务时,这种映射关系就建立起来了,并且出于容错的考虑被持久化到某个 Kafka Topic 中。这种耦合会有助于简化 Samza 的状态管理模型(下文中会讲到),因为每个 Task 只要管理好自己的状态就可以了。
如果 Task 的数量是固定的,那我们还怎么横向扩展?一个 Samza 任务可以通过增加运行着的容器的数量来横向扩展。当 Samza 运行在 YARN 集群(也是目前为止唯一一个支持运行 Samza 的分布式环境)之上时,每个 Task 实例都是在一个容器中运行的,容器也是计算资源的唯一单位。依据一个 Task 需要的计算能力的多少,你可以把所有东西都运行在一个容器中,也可以按 Task 的数量去创建容器,来为每个 Task 提供最强的处理能力。上图就表示出了这种弹性。
容错:Samza 承诺,假如一个容器崩溃了,不管出于什么原因,在容器重新启动后,那个容器里面的所有 Task 都可以从之前崩溃的点恢复。为了支持这个功能,Samza 会定期地记录检查点,把各个 Task 消费完的偏移量记下来。它也会为它内部的状态数据库上发生的变更记录检查点。在把所有这样的信息都记入 Kafka 之后,任务恢复功能就可以通过消费和重放变更操作的流来重建各种各样的内部任务状态。
无状态:大多数流处理系统都要记录某些状态,你在生成用户指标时可能会是某个指定用户的事件数量,而对于我们的案例,则是某个指定的会话里面触发的全部事件。为了实现真正意义上的实时流处理,切合实际的作法必然是将状态数据保存在 Task 的本地,而不是别的地方。Samza 自带地提供了 RocksDB 来做为一种有效的 KV 存储(就像一个嵌入式 HBase),来保存那些数据量过大而无法保存在本地内存中的状态数据。如上面所述,因为在变更日志的 Topic 中已经将相应的改动持久化了,在 Task 出故障时这个数据库也是可以恢复的。
在了解了 Samza 的各种基本概念之后,现在就可以继续介绍我们是如何利用这个工具来不断地将各种事件归入会话的了。
实现
简单来说,一个会话就是相对比较快速地连续触发的一系列用户事件。为了可以方便地将会话划分成不同的类型,我们要求上游的客户端为每个事件都指定一个会话 ID。举个有代表性的例子,如果某些事件它们发生的时间间隔少于 30 分钟,那就为它们指定相同的会话 ID。
这样的话,Samza 用于聚合事件的事件处理逻辑就变得非常清晰了。每个任务维护一个基于 Task 的 KV 数据库,使用会话 ID 做为主键。在收到一个事件时,任务就会在 KV 数据库中进行查找。如果会话 ID 不存在,它就创建一个新的会话,否则就更新相应的会话。更新操作常常会带来数据合并。一般的元数据(比如 IP 地址、位置信息、浏览器版本等)都是在顶层聚合的,而事件的具体信息(比如事件类型、添加到购物车的价格等)就保存成了一个列表。
如之前的博客中所述,每个 HBase Cell 现在都存储了一个会话。为了可以不断地用会话的最新版本覆盖掉 Cell 的内容,我们要不断地将 KV 数据库里的镜像写入某个 Kafka Topic。
我们之前提过的一个关于我们设计的很重要的问题是,为了让 HBase 中的数据保持最新,我们该以怎样的频率来将当前会话的信息保存到 HBase 中?我们可以在一个会话处理完了一个事件之后立刻就做这个操作。这么做的好处是它将会话更新延迟最小化了,也产生了最实时的结果。不过弊端是,这样做也会产生大量的会话更新操作,也就极大地增大了我们 Kafka 集群的负载。所以,我们利用了 Samza 的窗口功能,每分钟将会话数据成批地写入 HBase。如果某个用户在 60 秒内触发了 10 次点击操作,那么在这一分钟结束时,我们就会生成一份会话数据,其中包含 10 个事件,而不是生成 10 份会话数据。这极大地减少了通过网络发送的数据量。
挑战与教训
和会话化的逻辑听起来一样明显,我们在进行会话化开发的过程中也遇到了各种各样的挑战。
遍历 RocksDB 的操作会影响任务的吞吐率:如上文所述,Samza 窗口在导出所有会话的快照时,会将所有的会话挂起。因为每个 Samza 进程都是单线程的(在将来 Samza 会引入多线程处理的版本),所以事件处理实际上会被阻塞起来,直到窗口结束。这意味着在极端情况下,如果窗口持续的时间超过了配置的窗口之间的间隔,那事件处理过程就压根没机会执行了。为避免这样的事情发生,你应该让窗口内运行的操作尽可能的高效,所以也该避免频繁地对 RocksDB 进行全量扫描。就我们而言,我们的方法是在内存中保存了一个简化版的 KV 数据库内容,就是为了避免这个问题。遍历一张简单哈希表的操作,毕竟还是会比对 KV 数据库进行磁盘扫描快得多。
要小心 Kafka 的日志压缩功能:日志压缩功能让 Kafka 根据特定的策略删除一些日志记录,而不是简单地在每个分区的末尾将内容整块删除。启用了日志压缩功能的 Topic 将只包含每个主键的最后一条记录。如果相应的值是 NULL,那么 Kafka 就会在配置的时间超时之后,将那个主键删除。首先,如果你使用的 Kafka 版本比较旧(就是 0.8.2),请保证配置项 log.cleaner.enable 被设置为 true 了。其次,某些 Samza Topic,比如检查点或者变更日志之类的,就不会被压缩。在某次重新部署的过程中我们发现了这一点,当时那个任务经过了几小时才恢复处理。最后它要消费几 TB 的未压缩的变更日志,才能完成每个 KV 数据库的物化。
关于日志压缩功能我们得到的另一个教训是,如果没有删除操作的话,你就不该把这些内容写入一个有日志压缩功能的 Topic。关于这一点的收获是,配置项 log.retention.bytes 压根不适用于有日志压缩功能的 Topic。当我们的输出 Topic 不断地增长并且最终耗尽了我们的硬盘时,我们才最终明白了这一点。
为 RocksDB 配置 SSD 硬盘:在开发过程中我们注意到,有的时候有一些 Task 的处理速度会跟不上它们接收消息的速度。进一步的调查表明,这些 Task 的处理时间和窗口时间都相对比较长。最终我们发现根本原因在于它们是慢在与底层的 RocksDB 数据库的交互过程,是配给 YARN 实例的磁盘的缘故。这些 Task 的容器是部署在节点管理服务器上的,配置的是 EBS 磁盘。因为 RocksDB 针对在 SSD 或内存这样的快速存储上进行的快速压缩操作做了高度优化,所以在为它配置 EBS 硬盘时,就极大地削弱了 RocksDB 的 IO 性能。
感谢陈兴璐对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ , @丁晓昀),微信(微信号: InfoQChina )关注我们。
评论