腾讯游戏分析 iData 是国内领先的智能化数据服务平台,致力于为游戏提供一站式数据分析,上线至今覆盖了 567+款业务,涵盖 15 亿以上的游戏用户数据。精炼游戏多年经验核心指标,60 万个以上的可视化图表服务加专业化游戏多维分析,基于用户生命周期提供完整的数据化方案,实现数据驱动的精细化运营,对公司游戏运营提供了完善的数据化运营工具,帮助游戏更全面,准确,实时的分析数据。
腾讯 iData 分析中心是 iData 产品的重要组成部分之一,负责号码包提取、画像分析、工作分析等围绕用户号码包的数据分析功能。在长达几年的运营之后,针对运营中产生的一些问题和用户的新需求,我们意识到了旧系统的不足,开始打造新的分析中心后台。我们将以系列文章,围绕新分析中心后台 TGMars 的计算平台的方方面面,来探讨、介绍我们是如何思考、研发新分析中心的。
上篇文章《为什么要用 Spark?》中,我们探讨了在计算平台上的选型,为什么选择基于 Spark 构建计算平台,我们是如何使用 Spark 的,以及基于此又做了什么更多的工作。本文将探讨 TGSpark 如何通过 Spark 提供的扩展接口使得 Spark 可以加载 TGMars 上的数据,这将作为 TGMars 的计算引擎基础,我们会基于这套机制深入定制我们的需求,形成独有的 TGMars 计算能力。
一、为什么需要自定义的数据源?
很多计算引擎都有自定义数据源的能力;显然,即使各种大数据计算平台都已经默认支持 HDFS/S3/JDBC 等,但是不能排除这样一种情况:用户有自定义的数据需要和计算平台对接,或是旧系统的兼容,又可能是为了特定场景的设计,TGMars 正是后一种情况——我们需要为一种特定的场景优化存储。
TGMars 存储有什么特点?
具体的可见文章《新一代 tGmars 存储架构浅析及场景测试》,TGMars 存储是延续之前 iData 分析中心中位图存储进化而来的,除了该文章提及的,TGMars 存储解决了各种之前系统中缺失的数据容灾和数据迁移功能以外,还延续了之前设计的优点:
维护同业务下相同的数据表分片规则
维护数据文件的上传与分发
分片数据均匀分发到存储节点上
保障相同分片数据位于同一组机器
同一业务分片规则固定,且相同分片数据在同一机器,这使得计算节点加载数据时,相同业务(数据库)处理同一 id 的分片时,都可在同一个任务节点上计算完成,尽量不让数据出现 shuffle。
这也是延续之前我们在位图分析系统中系统的优势时提到的,是值得保留的特性,既然如此,在 TGSpark 中,我们必须也要让 Spark 能够适配这种特性,使得计算节点跟随存储分发,保证数据在本地读取与计算。
Spark 配套的默认存储则是 HDFS,它的存储逻辑则很不一样:文件按等长分块后分布到存储节点中,并不是数据预分片的。它也有自己的好处:计算部分不必过于关心数据节点本身,可以选择从网络直接加载;但同时又可以提供分块的节点信息,适配某些分布式计算系统的数据并行加载,是一种兼容性很高的设计。正是这样 HDFS 是很多大数据方案的首选存储,比如 CarbonData 虽然是自定义数据,也同样适配 Spark 计算,但是存储仍然是 HDFS,这样数据仍然是分块并行加载的。
不过既然我们确定了数据固定分片的原则,HDFS 自然是不可能采用的,这也是为什么要自定义数据源的缘故。
二、Spark 的 DataSource API V2
既然是用 DataSourceAPIV2 来自定义数据源,那么就有 DataSourceAPIV1 咯?
其实对于 Spark 来说,更早的是 CustomRDD 机制,比如 Spark 自带的 HadoopRDD 和 JdbcRDD,这是基于 RDD 的做法,在 1.x 的时代,还在大规模使用 RDD 的时候的首选。
不过在 2.x 时代,Spark 实际上渐渐雪藏了 RDD,而仅仅是作为底层的一种存储结构,DataFrame 进化成了 Dataset,并大规模重用:MLlib 已经被废弃,新的 ML 库基于 Dataset,取代了旧 MLlib;Dataset 支持以前的类 RDD 操作,照旧可以操作更低层的算子;Dataset 成为了新的 DataFrame,支持 SQL 操作,还有 Catalyst 和 Tungsten 的提携,性能提升十倍;相比旧 RDD 官方寥寥仅支持的 JDBC(难用还受不推荐)和 Hadoop 载入方式,Dataset 还支持多种数据源,使得数据的读取更加灵活。
官方爱 Dataset,甚至将 DataSourceAPI 赐给 Dataset,叫用它的人不仅仅能加载 HDFS 文件或 Hive 表,反能通过 DataSourceAPI 加载更多的数据!
不过 DataSourceAPI 也有自己的缺陷:
依赖 SQLContext,作为一个数据源 API 却依赖了高级接口,设计不合理;
不支持列存常见的列选取,如果是列式存储则需要侵入性的使用内部接口;
分片数据加载时候 Executor 和对应数据存放的位置不能很好的保证一致;
写数据不支持事务,不能通知被写入的数据源清理失败的任务;
数据源的扩展性较差。
因此现在官方推荐了 DataSourceAPIV2,虽然还是个不稳定的演进接口,但是整体的设计原则却较为固定,仍然可以一试。
三、SparkDataSourceAPIV2 的一些特性
DataSourceAPIV2 扩展性很好,其几个接口保证了它可以按照我们预想的方式加载到数据:
1. SupportsPushDownRequiredColumns
下推选列到数据源,使得数据源只提供必要的数据
2. SupportsPushDownFilters
下推过滤条件到数据源,使得数据源提前过滤部分数据
3. SupportsScanColumnarBatch
提供按列方式的数据加载,无需再把列式存储的数据转置为行
4. SupportsReportPartitioning
报告数据分片规则,让 Spark 进一步取消不必要的 Shuffle
5. InputPartition.preferredLocations
报告分片所在 ip,使得下发数据可以尽可能本地加载。
这样看下来,我们设立的原则,Spark 已经都考虑到了,1、2 保证选列和下推的实现,虽然旧 API 也有,但是 V2 的 API 更好用;3 则考虑到了流行的列存方式;4/5 则是其中的重点,保证本地计算的关键。
这其实得益于 Spark 一开始就存在的 Catalyst 优化器机制。SparkSQL 会根据数据的性质优化查询计划和物理计划。
可以简要的了解下 SparkSQL 是如何执行 SQL 语句的:
1.SQL 语句初步翻译为一个未绑定到数据源的逻辑计划,此时查询计划还没有绑定数据源
2.通过 Catalog 机制,查询计划被绑定到特定的库表和列上,进一步检查查询是否能够执行,形成逻辑计划
3.逻辑计划进过优化器优化,形成优化的查询计划
4.查询计划转变为一系列物理计划
5.物理计划通过对应的执行策略,转为实际动作,加载为 RDD(Dataset)
选列、下推和分片在生成物理计划前的执行策略中得以优化,支持选列、下推操作、分片报告时,扫表操作将只有选中的列输出,支持的下推则取消,分片规则符合条件的将去掉一层重分区/Shuffle 操作。Spark 具体是怎么实现的?那么等我介绍扩展点注入的时候再说。
本地加载机制则在物理计划执行时生效,它绑定到 RDD 的 preferredLocatin,在 Spark 内部的 TaskScheduler 生成任务时尽可能的去绑定到指定的机器上。
四、来做一些实验吧
写这篇文章的时候,基于 DataSourceAPIV2 的 TGMarsDataSource 已经开发的差不多了,因此我们可以来做一些实验验证下,我们提出的几个关键点是否已经都能实现。
我们这里会基于示例的表来看特定 SQL 下的查询计划来验证。
基础表描述如下:
首先看看列选择:
在这个例子里可以看到最终调用 ScanV2 任务扫表时,虽然优化的逻辑计划里面还有全表扫描操作,但是到最后的物理计划时,ScanV2 任务变成对应 Project 任务需要的,只对应列信息。
然后是过滤下推:
在这个例子里,where 条件包含两个条件 platid=0 和 level=10,这触发了过滤下推的支持,到最后的物理计划时,ScanV2 任务会带上两个 Filter,同时不再上层做多余的 Filter 操作。
最后再看一下分片规则的应用:
因为我们会根据主键列分片,因此输出包含主键的时候,distinct/group by 时可以跳过一层多余的 Exchange 操作,这个时候计算是数据和计算都在本地进行的,没有 Shuffle,而如果只有其他列,则不能实现本地计算,必须 Shuffle。这种方式使得我们可以同时兼容以前的计算方式,还能藉由 Spark 的能力支持更灵活的计算。
五、留个伏笔
我们借由 Spark 的 DataSource API V2 的实现我们的诸多目标。不过我们在这里再讨论下本地计算的问题:
如果我们支持本地计算,那么我们必须保证一个原则,有多少存储节点,对应下发多少 Executor,而且我们必须要保证 Executor 均匀的分布在各个节点上。这里我们是否可以确保这点呢?
实际上考虑这样的一个情况,我们有 8 台机器,但是由于任务资源的紧张,我们只有 4 个 Executor 可用,那么此时就有 4 个计算节点对应 8 个存储节点,这样必然有些数据不是本地加载的。
还有一种情况,如果我们有 8 台机器,但是只有 4 台机器有数据,那么又怎么保证计算节点准确的分发到了存储节点呢?
这个需要深入研究 Spark 的 Executor 分发机制,再考虑我们如何应对这个问题。
本文转载自公众号云加社区(ID:QcloudCommunity)。
原文链接:
https://mp.weixin.qq.com/s/caEOJ4V7Rs1Nc1MUggdPFg
评论