云积互动,全称深圳市云积分科技有限公司,成立于 2014 年,是国内领先的 AI 驱动的消费者运营服务提供商,致力于发展消费者运营相关理论、技术、算法、模型及软件工具,为全球消费性企业提供基于 AI 的消费者运营系统及运营策略服务,打造消费者运营领域最佳服务和实践标准,帮助企业构建消费者运营核心能力,以应对当前及未来的场景化运营挑战。
目前已成为天猫、京东、抖音、腾讯等主流电商和社交平台深度合作伙伴,服务客户 2300+,其中世界 500 强客户超过 18 家,包括全球第一的美妆、日化、医药集团,均深度服务超过 7 年。
业务需求
云积互动的主要业务是以消费者运营为核心,包含了会员通,CRM,策略营销,数据资产等一系列业务(如下图所示)。
早期云积互动的大数据需求较少,起步也比较晚,2019 年才开始搭建基于 CDH 的大数据平台,因此大数据平台的主要目的是为了满足早期较为单一的 BI 数据看板及报表功能。近年来,随着业务量快速增长,数据量的增长,业务对数据的实时性及灵活性提出更高的要求,大数据平台也从早期的只需要满足单一的 BI 服务需求,扩展到需要支持各业务线,包含圈人服务,人群分析,AI 智能数据等多种业务需求。早期基于 CDH 的大数据平台已无法满足当前难度以及复杂度较高的的业务需求。
大数据平台的迭代
早期数仓架构
早期公司业务量较少,基于 Hive+Spark 构建的离线数仓即可满足早期大数据的需求。早期架构主要用于支持 BI 相关功能,数据大屏,自助报表等应用,大部分的指标仅要求 T+1 的指标,早期架构完全可以满足。
下图为云积互动早期数仓架构,早期的数据源主要为业务数据库 MySQL 以及日志,数据通过 Streamsets 实时采集数据并经 ETL 后传入 ODS 层,存储到 Kudu 中,通过 Impala 对 ODS 层的数据进行处理,实现实时查询业务的需求。通过 Hive 构建了离线数仓的 DWD、DWS 以及 DIM 层,使用 Spark 进行离线任务的计算与调度,最终处理并计算完成的数据输出到 MySQL 和 Kylin 中,应用于上层业务应用及分析。
存在的问题
查询效率低:使用 Impala 多表查询速度太慢,亿级别表 Join 时,查询时间基本上在 3 分钟以上,部分复杂查询会在超过 3 分钟左右判定超时;同时使用 Impala 并行查询对内存消耗较大,影响其他任务运行。
存储成本太高:使用多个系统存储数据( Hive,Hbase,Kudu),存储成本较高,随着业务量的增长,数据量指数级的增多,存储成本更是成倍数增加。
开发难度大:数仓开发基于代码,不能满足灵活的指标需求,当分析需求越来越多时,存在多个场景组合查询、自定义等查询场景,面对这样的场景,必须进行再开发,开发和时间成本都很高。
数据链路长:较长的链路使得数据的一致性很难保证,数据在某一环节出现问题,排查难度高,运维成本也会增加。
技术选型
基于业务对数据实时性及灵活性更高的要求,我们在 2021 年初对当前市面上较为流行的分析引擎 ClickHouse 和 Apache Doris 进行了调研,调研中我们发现, Apache Doris 具有高性能、简单易用、实现成本低等诸多优势。基于此,我们决定在部分业务上开始使用 Apache Doris,在使用的过程中逐渐发掘出 Apache Doris 更多强大之处以及优势,Apache Doris 在很多方面十分贴合我们的诉求,因此,我们决定在 2022 年全面应用 Apache Doris 在数据仓库中,基于 Apache Doris 构建云积互动企业级数仓,选择 Apache Doris 的主要原因如下:
该架构开发效率高,查询性能远高于 Hive
数仓 ETL 由原来的 Spark 任务改为 Doris SQL 任务,使用 SQL 开发模式可进行快速迭代,开发效率提升了近一倍
Doris 查询支持物化视图索引加速,Doris 在 1.0 版本开始引入了向量化引擎,性能提升 2~3 倍,平均查询耗时降低了 60%。
该架构对 OLAP 支持更好,支持更为灵活的查询。
Doris 支持 Cube 函数,实现了 Kylin 的多维计算功能,极大的减少了 SQL 的开发量。
Doris 支持 Bitmap 类型,可实现人群之间的快速交并差计算并落地新人群。
支持主键唯一和聚合模型的表,极大的减少了开发难度。
使用主键唯一模型可做到依据主键数据覆盖,自动实现数据更新功能。
使用聚合模型的表,减少了 ETL 过程中的 Join 操作,同时解决了上层数据到达时间不一致而导致的数据关联不上的问题。
新数仓架构
最开始使用新的数仓架构主要是要解决圈人速度慢的问题,圈人服务的核心在于人群圈选,通过 SQL 代码或标签取值组合等多种方式,实现人群查找,帮客户找到符合画像的人群,现在各行各业都会设计广告营销场景,其中也包括云积互动,而如何快速准确找到对的人推送广告就成了大数据场景需要解决的问题。当时我们只是在部分功能上使用了 Apache Doris,用 Apache Doris 替代了 Spark+Impala 来实现实时圈人功能,出乎意料的是,Apache Doris 投入使用之后效果极佳。新版架构的实时圈人业务平均每个任务时间由 3~5 分钟提升至 10 秒左右,并且在人群落地方面,使用存储更小的 Bitmap 代替原来的人群落地为表,不仅数据管理方便,而且磁盘空间占用减少了 80% 左右。
除此之外,在一段时间的使用和学习中我们发现 Apache Doris 丰富的功能和核心优势,综上原因,我们产生了用 Apache Doris 数仓替代 Hive 数仓的想法,并迅速的付诸于实践。
当确定数仓使用 Apache Doris 之后,结合当前的业务需求以及早期架构需要解决的问题,需要将多平台数据打通,构建统一数据口径和数据指标,我们将数据仓库构建分为:ODS 层,DWD 层,DWS 层和 ADS 层,下图为各个分层主要负责的数据类型。
新数仓的分层逻辑如下:
ODS 层:从业务侧、日志系统和埋点系统等拉取过来的数据,按照原字段名存入 ODS 层。
DWD 层:数据按照维度进行拆分,轻度聚合,将多个平台数据按照同一标准定义进行处理。
DWS 层:主要负责数据的聚合、数据宽表、基于维度的一些计算指标等等。值得注意的是,DWS 层中的部分表使用了 AGGREGATE KEY 模型, AGGREGATE KEY 模型可以提前聚合数据,适合报表和多维度业务,可以有效避免数据汇总时的 Join 操作,部分指标可以使用该表特性实现,无需敲代码,降低了开发成本。
ADS 层: 各业务模块根据各自的需求,将数据从 DWS 层汇聚数据指标到 ADS 层。
架构设计
数据接入
业务数据 MySQL 存储在多台 RDS 中,因 Binlog 的留存时间较短,且数据存放于多服务器,同时还进行了分库存储,因此如何接入历史数据以及如何同时接入多个库的数据成了棘手的问题。在调研过程中我们发现 FlinkCDC 可以完美解决上述问题:FlinkCDC 可以在接入历史数据之后自动切换为读取 Binlog,且 2.x 版本已经支持断点续传,支持水平扩展,支持动态添加表。
日志和埋点数据我们采用 Kafka + Doris Routine Load 导入方式,Routine Load 支持支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中,支持 Json 解析,并且可以做一些简单的 ETL,极大的减少了代码开发的工作。
数据加工
数据加工采用了 Doris SQL、Insert into 的方式将增量计算完的结果导入到数仓分层中(ODS\DWD\DWS\ADS)。因业务需求对数据的实时性允许存在一点延迟性,因此将 Dolphinscheduler 设置为每 5 分钟调度一次增量 SQL;同时设置数仓每一层错峰执行任务,避免任务堵塞。
对于数据量比较小的表可以用一个 SQL 完成导入,而对于数据量大的表,为避免 union,需要分成多个 insert into 来执行。但有些大表的逻辑是多个大表的 Join 结果,对于这种场景,我们应用 AGGREGATE KEY 模型的表来解决,利用表的聚合特性来代替 SQL 的 Join 操作。
任务调度
任务调度一直沿用 Dolphinscheduler,页面化的操作简单方便,且对 SQL 的支持友好,整个大数据平台的任务都是通过该调度器完成。目前使用了 2.x 以上的版本,支持使用钉钉报警和邮件报警的功能来监控任务,任务失败将通过钉钉或者邮件发送。
监控
使用 Prometheus+Grafana 对 Doris 集群和 Flink 任务进行监控管理,页面化的监控,极大的减少运维成本。
优化方案:
使用 Flink CDC 启动多个同步任务后,磁盘 IO 飙升导致查询延迟变高
对接多个数据源 CDC 任务同步数据时,中间数据写入 Kafka 进行合并和数据消峰,减少写入 Doris 的任务数。
调整 DorisSink 写入频率,控制每批次数据量。
优化部分表的分区分桶,降低数据分片数量。
Doris 1.1.0-rc05 版本偶发后台合并线程持续合并已删除的 Tablet, 合并持续失败且数据版本最多的那个 Tablet 的版本数量升高至 150 左右
使用元数据管理工具 meta_tool 删除已失效的 Tablet 元数据,版本数量显著下降,稳定在 30 左右
对大表的超过两年的数据做冷备份,减少大表的 Tablet 数,降低整个 Doris 集群对 Tablet 的管理压力。
Bitmap 存储散列用户 ID,使用 Bitmap 相关函数计算时性能较差
用户唯一 ID 通过字符串转换生成,基数大且非常稀疏;使用全局字典生成紧凑的 ID 代替,优化后性能提高近五倍
总结和收益:
Apache Doris 构建的离线+实时数仓一体化,采用 SQL 开发,并用 Dolphinscheduler 一键部署调度,极大的降低开发难度和开发工作量,可进行快速迭代以满足目前行业日益增长的数据需求。
新架构采用 Flink+Doris 的架构体系,FlinkCDC+StreamLoad 可以做到流批一体化数据接入,减少了组件的使用,解决了数据的冗余存储,服务器资源节省了 30%,数据存储磁盘占用减少 40%,同时组件的运维成本大大减少。
Doris 的易用性极高,支持 MySQL 协议和标准 SQL,各业务线均可通过查询 MySQL 的方式进行数据查询,极大的减少了学习成本。
从 2021 年 Apache Doris 上线云积互动的第一个业务至今,Apache Doris 在云积互动内部已成为大数据服务的基础,承担了包括人群分析、报表查询、指标计算等场景下的在线/离线需求,在较小的集群规模下支持了每天近 2 万次的用户在线分析查询。
未来规划
目前新的数据仓库已经建设完成,基于 Apache Doris 较多优异特性以及与业务需求较高吻合性,当前团队已经在着手搭建基于 Apache Doris 的数据质量管理和数据血缘,后续我们计划基于 Apache Doris 搭建数据指标体系。下面简单分享一下我们在数据质量管理和数据管理的实现想法。
数据质量管理
在 Apache Doris 的 ODS 层建表时设定一些非空主键,这些字段都是业务逻辑上的必须字段,当数据接入会给定一些默认值,这样就可以清晰的分类出这些数据,在质量分析中进行输出;在 ETL 中也会存在一些逻辑错误数据,这类数据会通过定时的 Doris SQL 脚本进行输出,同时也可以反馈到业务侧进行数据修复。
数据血缘
依托 Doris 提供的 SQL 审计功能,使用采集工具 Filebeat/Logstash 持续采集审计日志发送到 Kafka,使用开源的 SQL 解析工具或者抽取 Doris 的 SQL 解析模块针对 DDL 或者 DML 进行解析,解析后的数据存入图数据库或者关系型数据库供业务端展示;该功能的实现对于数据问题排查、数据资产管理均有意义。
围绕着 Apache Doris 为核心的数据平台建设目前也在一直迭代发展,当然在使用中也发现了该产品的一些需要优化的地方,但不可否认它优秀的性能和丰富的功能,后续我们也将持续不断地进行优化,将优化方案贡献给 Apache Doris 社区。
作者介绍:
王杰:云积互动大数据团队 leader,负责数据平台研发及数据治理。
蒙磊:云积互动大数据高级开发,负责数据平台研发和数仓开发。
评论