知乎为实现精细化运营,提高运营效率,依赖 Apache Doris 构建了内部统一的运营分析平台——舰桥平台,主要应用于事实接入层、事实建模层和事实运算层等架构核心层的建设,并持续对导入、查询等方面进行性能调优,最终实现上千亿行数据分钟级导入,千亿级数据秒级查询响应。该平台当前已经广泛应用于知乎不同事业部的社区、商广、教育 &会员、技术中台等领域,得到各部门广泛认可。
作者|知乎舰桥平台 Leader 侯容
在长期的业务运营中,知乎团队发现在内容运营、创作者运营、热点运营等许多场景中,运营团队需要依赖 SQL 或自行编写 SQL 代码来对用户信息、业务数据进行查询分析。这往往需要投入大量的精力,造成人力投入大、工作效率低等问题,无法实现精细化运营,无法高效完成业务目标。
为了解决上述问题,知乎舰桥平台应运而生。舰桥平台是知乎内部统一的运营分析平台(即一站式内容 &用户管理平台),主要应用于知乎的六大核心运营场景,包括找人、找内容、盯人、盯内容、找机会、查问题场景。该平台当前已经广泛应用于知乎不同事业部的社区、商广、教育 &会员、技术中台等领域。
知乎舰桥平台的基础能力包括筛选、分析、打包和监控,这些能力都不同程度地依赖 Apache Doris 提供的计算、存储和分析能力。在本文中,我们将主要介绍 Doris 在舰桥平台中的应用,以及在 Doris 的优化实践。
业务架构
如业务架构图所示,知乎舰桥是一个数据密集型的一个应用,架构共由五层组成,这里对较为重要的层级进行介绍:
数据层和事实层: 数据层主要由内容数据、用户数据和流量数据组成,考虑到原始数据不具备可展示性和可描述性,因此我们将原始数据抽象出内容事实、用户事实和流量事实,并存储在事实层以供上层应用。
基础能力层: 在基础能力层我们搭建筛选、打包、分析和监控四大基础能力。一般来说先按照业务要求筛选出目标用户数据,接着对这些用户进行下载并打包,打包后形成内容池、人群包或领域(可别用于投放、推荐、Push、推送等场景);同时我们提供了基于筛选、打包、全栈等的多维度面向分析的能力,具体体现为榜单、分布、趋势、明细等;除此之外还提供了监控的能力,包括实时/定时监控、监控模板和监控协作。
业务工具层: 我们将基础能力层的四大基础能力搭建成不同的业务工具,分别为榜单 &列表、业务分析、异动发现和问题诊断,用于支持业务侧的不同行动。
基于业务架构,我们思考虑应该通过一个怎样的技术架构可以低成本、高效率的实现我们的需求,因此我们先对技术架构进行了模块职责的划分,并希望各模块可具备以下能力:
人机(UI )界面:以用户体验为中心,构建高效易用、简单易懂的 UI 界面,帮助运营同学快速理解并上手。
协作能力:针对多场景、多部门的业务需求,构建统一完备的协作平台,最大程度地降低业务交互成本。
核心业务能力:需要将数据进行业务抽象,确保所有需求都在已知的概念中被定义,方便使用、降低使用成本。
事实运算:需要支持大规模数据的高效低延迟复杂运算,以满足各业务线的运营需求。
事实建模:隔绝接入层与业务层,提高迭代效率,以便更快地满足业务需求。
事实接入层:支持海量数据导入,并能够实现大量数据快速导入,同时支持敏捷接入,低成本扩展,以适应业务的快速发展和变化。
技术架构
为了建设符合要求和目标的技术架构,我们对多个大数据组件进行了调研选型,在调研中发现,Apache Doris 各方面能力都比较优秀,可以提供多种数据导入方案、拥有便捷易用的建表能力、更灵活的的物化视图以及对向量化的全面支持,基于这些优异性能,最终我们决定引入 Apache Doris 建设舰桥平台技术架构,并被主要应用在舰桥平台的三个核心层,即事实接入层、事实建模层和事实运算层。
事实接入层: 事实接入层采用了 Segment 文件预处理技术和大规模导入技术,极大程度地加速数据从 HDFS 导入到 Doris 的速度,在此过程中,广泛应用了 Spark 技术,此外我们还通过 Flink 直接将另一部分数据流式写入,数据流式写入有两个步骤:一部分通过 Flink Connector 直接写入的,另一部分先通过 Flink 完成 ETL 处理,再通过 Routine Load 完成写入。该层借助于 Doris 丰富的 Load 协议,实现了多种数据的规模化快速导入。
事实建模层: 事实建模层我们对业务进行了梳理和拆分,搭建了合适的业务模型,包括用户模型、内容模型、流量模型等等,同时还包括业务场景化的模型,例如主题模型和分层模型等等。因为 Doris 具有数据结构管理简单的特性,可以帮助我们快速试错和优化数据模型,极大程度的提升了数据模型迭代的效率。
事实运算层: 事实运算层我们采用了数据和机器预绑定的技术,并应用了 Doris 的向量化技术和物化视图。此外,我们还进行了大量的调优工作,例如,查询计划的调优、数据结构优化、算子合并技术等,从而实现性能的优化。
在基于 Doris 的事实接入、事实建模和事实运算层的支持下,我们高效地搭建了核心业务能力、人机界面和协作能力,最大程度地满足业务需求,充分达成了业务架构提出的目标。因本文以介绍 Doris 的应用为主,其他层的将不做具体描述。
优化实践
大量数据快速查询
在人群圈选和筛选场景中,我们需要处理大规模的数据,包括 240 万个标签、千亿级别的对象和标签量的关联数据,同时,我们需要在极短时间内完成查询操作,通常要求在 1s 内返回查询结果,10s 内完成数据打包,时效要求非常高。那么怎样可以实现大量数据的快速查询呢?
步骤 1:分而治之
分而治之的核心思想是将整体数据的与或非,转化为分组与或非后的合集。如果将它变成了一种倒排的 Bitmap,就能变成绘图的交并差。
我们发现整体数据的交并差等价于先对某一个分组数据交并差、再进行合并操作。在这个基础上如果先将整个 Bitmap 取出完成交并差,实际上可以理解为只有一个线程在运算(实际不是),基于该发现我们可以先将每一个数据进行交并差,这样就可以将其拆分成与分组相同数量的线程或队列进行计算,计算完再由一个队列来进行数据合并。
优化前一般是在一个存储区中存储所有的特征,每个特征分布在不同的机器上,而在上述思路的驱动下,我们修改了分组策略,先将人群特征分为许多小的分组,并将特征随机分布在不同机器上进行计算,通过该操作最终实现了速度的明显提升。
以用户筛选为例:
通过将用户 id 分组,如每 100 万 id 为一组,设置一个 group_id。
将该分组下不同用户特征、标签统一指向分组 group_id。
先在每一个分组中计算特征、标签计算的与或非(即并差)。
当分组数据完成计算后,最后进行数据汇总。
同时开启多线程模式,提升每组的计算效率
然而,在这个过程中,我们又遇到了第二个问题,即特征计算带来了非常大的网络开销。这是因为各个特征随机分布在不同的机器上,这就导致在一个机器上完成了一部分特征运算,然后执行 Shuffle 进行数据交换,再进行第二次运算,再交换进行第三次运算,以此类推,假设条件非常多,网络开销就会非常大。
步骤 2:数据机器预绑定
我们探索并发现 Doris 的 Colocate 原理可以有效解决该问题,利用 Colocate 可以减少数据 Shuffle 的次数,从而减少运算的次数。因此我们我们尝试使用对数据分布和机器进行预绑定,数据机器预绑定应用了 Doris 底层的 Colocate 原理。
我们将某一个分组 Key 和机器进行绑定,当数据与该分组 Key 相对应,该数据将存在某一台机器上面,从而完成数据和机器的预绑定。通过该方式可以避免在特征计算中出现频繁网络交互和数据混洗操作,从而大幅降低网络开销。
如下图所示为优化前的流程,数据进行不停的交换,查询计划非常高,网络开销非常大。
下图为利用 Doris 的 Colocate 原理进行优化的结果,可以发现查询计划相比较之前少了很多,简单数据处理后即可完成,同时速度也非常快,主要归功于查询计划的降低占用了比较少的网络开销。
步骤 3:算子合并
在解决网络开销问题之后,我们开始思考如何加速执行的效率,因此我们引出了算子合并(非官方命名)这一概念。其原理是使用更复杂的函数代替原先简单的函数组合,在这个过程中,我们与 SelectDB 团队和 Apache Doris 社区与进行了多次沟通及配合,将日常使用的函数组合进行开发和落地,将合并组合好的函数进行上线使用。以下为拼接函数组成介绍:
bitmap_and_count == bitmap_count(bitmap_and(bitmap1, bitmap2))
bitmap_and_not_count == bitmap_count(bitmap_not(bitmap1, bitmap_and(bitmap1, bitmap2))
orthogonal_bitmap_union_count==bitmap_and(bitmap1,bitmap_and(bitmap2,bitmap3)
比如我们需要进行一个数据查询,用简单的函数和复杂的函数处理流程如下图所示:
简单函数:先查出数据,再执行
bitmap_and
,中间存储,执行bitmap_not
,再进行中间存储,最后执行bitmap_count
,输出结果。可以看出处理流程很长、速度很慢。复杂函数:如果使用合并后的函数
bitmap_and_not_count
,当我们直接将数据输入到这个函数里,就可以输出结果。输出速度相比之前大幅提升,从而提升了查询效率。
大量数据快速导入
在离线导入场景中,由 Hive 完成大量数据计算,这些数据文件写入到 HDFS 中,我们将定期通过 Broker Load 将 HDFS 中的的数据拉取到 Doris 里。在这个过程我们发现,在限定的集群资源下,当遇到大数据量导入操作,Broker Load 则会出现超时。
经排查发现 Doris 从 HDFS 拿到 Parquet 之后,需要先进行解压缩,再进行分桶数据传输,最后经过排序、聚合、再压缩等一系列操作生成 Segment 文件,而这些过程都会在 Doris BE 上进行,同时我们还会在此基础上进行 Bitmap 操作,从而导致 CPU 压力增大。
经过探索,我们发现 Spark Load 可以很好解决该问题,Spark Load 可以将导入拆分为计算和存储两部分,将分桶、排序、聚合、压缩等计算逻辑放到 Spark 集群,产出结果写到 HDFS,Doris 再直接从 HDFS 中拉取结果文件写到本地盘。
Broker Load :BE 节点负责计算,算力取决 BE 节点个数及配置。
Spark Load:Spark 集群负责计算,算力取决于集群配置,且弹性强。
我们将 Segment 文件预处理移至 Spark 后,速度有了明显的提升。当前 1.2 TB、1100 亿+ 行数据,导入时间从 9 小时缩短为 55 分钟,速度大幅提升,其中 Doris 的使用时间缩短到了 20 分钟,另外 35 分钟在 Spark 集群上,有效降低了 Doris 集群负载。
在探索海量数据快速导入的过程中,我们遇到了一些问题,并成功地解决了它们。在这个过程中,我们积累了许多宝贵的经验和解决方案,现在将这些经验和方案分享给大家,希望能为大家提供帮助。
HDFS 权限认证问题:
知乎当前的 HDFS 是使用 Symbol 方式认证的,这与很多其他公司不同。我们发现,Spark Load 命令处理完后,将转发到 Spark Launcher,再由 Spark Launcher 执行 Spark Submit 命令。在这个过程中,不会传递环境变量,因此我们无法将用户名和密码传递给 Spark Submit 再执行,并且也无法将它们配置到环境变量中。而在实际场景中,我们需要使用不同的用户名和密码来读取不同的数据进行导入,因此,我们增加了动态设置和环境变量等功能来解决这个问题,目前相关 PR 合并到了社区中。相关 PR:https://github.com/apache/doris/pull/12276
Doris 拉取 Spark 产物速度慢
在 Spark 完成计算之后,我们发现 Doris 拉取产物的速度比较慢的问题,经过进一步跟踪发现当在处理小规模数据时,能够在一分钟内处理完一个文件,但当数据规模变大时,则需要花费五分钟才能处理一个文件。那么是否可以通过调高任务数来提高速度呢?于是我们根据线上实际的超时情况和导入速度要求,最终决定将下方参数从 3 增加到 9,结果发现速度立即得到了明显的提升。
push_worker_count_high_priority
:改为 9。push_worker_count_normal_priority
:改成 9。
参数调整后不仅大幅提升了拉取速度,单个 BE 写入速度达到 120MB/s, IO 和 CPU 资源也得到了更充分的利用。
通过这次调参我们发现,大家可以根据实际需求来调整以下三个参数,以解决拉取产物速度较慢的问题:
push_write_mbytes_per_sec
:BE 磁盘写入限速。push_worker_count_high_priority
: 同时执行的 Push 任务个数。push_worker_count_normal_priority
: 同时执行的 Push 任务个数。
隐式转换改为显示转换
在使用 Doris 向量化版本的过程中,由于我们有很多基于 Bitmap 表的计算,在使用隐式转换时会出现无法导入 Bitmap 表的问题。为了解决这个问题,我们禁止隐式转换并开启了显式转换,并将相关的 PR 合并到了社区中。相关 PR:https://github.com/apache/doris/pull/12394/files
Spark 聚合速度 慢
由于数据存在倾斜,导致在 Spark 数据聚合速度比较慢,基于此,我们重新按照离线计算的一个 Key 来进行分组,新增一个 Bucket 列,以解决数据倾斜导致计算速度慢的问题。
并发 数量限制
我们在 Spark Load 的 Spark DPP 代码中发现:在 stage 2 的过程中,任务的并行上限为 200,这导致在面对数据量非常大的任务时,写入速度非常慢。为解决这个问题,我们增加了自适应的并发数,并将相关的 PR 合并到了社区中。相关 PR:https://github.com/apache/doris/pull/12186
性能提升
Apach Doris 1.1 版本实现了计算层和存储层的全面向量化、正式将向量化执行引擎作为稳定功能进行全面启用,性能较之前版本有 3-5 倍的巨大提升;并在 1.2 版本所有模块都实现了向量化,包括数据导入、Schema Change、Compaction、数据导出、UDF 等,查询性能较非向量化版本大幅提升。因此在 1.1 向量化版本推出后,我们针对某些重要场景进行向量化迁移,并主要逐步在所有场景中应用。
当我们从 0.15.3 迁移到 1.1 版本之后,给业务带来非常明显的收益,大多数场景均能达到 5 倍以上的响应速度提升,个别场景响应速度甚至可以达到非向量化版本的 10+ 倍,我们分别对以下 7 个场景的查询耗时进行了对比。
场景 1:简单(数百)圈人条件,百万级别 Bitmap 人群打包
场景 2:复杂(数千)圈人条件,上亿级别 Bitmap 人群打包
场景 3:多维度(6 种)筛选、单表查询、单日期指标宽表、数据聚合 SUM,单日数据量 1.8 亿+
场景 4:(6 种)筛选、单表查询、多日期指标宽表(周期:15 天)、数据聚合 SUM,单日数据量 1.8 亿+
场景 5:单表查询、COUNT 计数,单日数据量为 1.8 亿+
场景 6:多表查询,A、B 各表数据量为 1.8 亿+、1507 万+。A 表涉及每天数据 SUM 聚合、COUNT 聚合,B 表涉及 Bitmap 聚合,A、B 先聚合再与 C 表 Join,子表再依次 Join,Join 次数共为 6 次。
场景 7:5 亿+ 数据明细分析及单表查询
未来展望
在技术上我们将在查询和写入方面进行优化,在查询方面将实现图引擎,现阶段的业务场景主要通过 Doris OLAP 和 Doris On ES 实现了多维分析和全文检索,未来随着业务发展,关系场景将越来越多,基于此我们将尝试通过 Doris 扩充图引擎,最终在多维分析和全文检索的基础上实现与图引擎的结合。根据近期社区动态得知, Doris 对图数据库 Nebula Graph 支持的 PR 已经就绪,将在未来版本中正式发布(相关 PR:https://github.com/apache/doris/pull/19209) 。 在写入方面我们将实现 Spark Load 底层解耦,Spark Load 底层实现时,目前 Doris 和 Spark 是耦合的,导致在使用时有诸多不便、无法大规模使用。未来我们计划将 Spark 和 Doris 解耦,不需要 Doris 来提交任务就可直接在 Spark 提交生成产物 Doris Segment 数据文件,完成后通知 Doris 下载 Segment。
在业务上, 我们计划与实验平台展开合作,将目标制定及完成的判断从人工把控转变为自动配置实验和验证。同时我们也将进行业务插件化能力建设:
插件化架构落地,联合业务提供相对完善的产销联动工具链。
将原有通过人工维护的流程,以工具链的形式配置,充分发挥运营同学的核心竞争力,整体降低业务成本。
Apache Doris 在 2.0 Alpha 版本中已经实现了单节点数万 QPS 的高并发点查询能力、高性能的倒排索引、基于对象存储的冷热数据分离、基于代价模型的全新查询优化器以及 Pipeline 执行引擎等,欢迎大家下载体验。为了让用户可以体验社区开发的最新特性,同时保证最新功能可以收获到更广范围的使用反馈,我们建立了 2.0 Alpha 版本的专项支持群,请大家填写申请,欢迎广大社区用户在使用最新版本过程中多多反馈使用意见,帮助 Apache Doris 持续改进。
评论