写点什么

快手 Druid 精确去重的设计和实现

  • 2019-05-27
  • 本文字数:6869 字

    阅读完需:约 23 分钟

快手 Druid 精确去重的设计和实现

本次分享内容提纲:


  • 快手 Druid 平台概览

  • Druid 精确去重功能设计

  • Druid 其他改进

  • 快手 Druid Roadmap

★ 快手 Druid 平台概览

1. 从业务需求角度考虑,快手为什么选择 Druid ?

快手的业务特点包括超大数据规模、毫秒级查询时延、高数据实时性要求、高并发查询、高稳定性以及较高的 Schema 灵活性要求;因此快手选择 Druid 平台作为底层架构。由于 Druid 原生不支持数据精确去重功能,而快手业务中会涉及到例如计费等场景,有精确去重的需求。因此,本文重点讲述如何在 Druid 平台中实现精确去重。另一方面,Druid 对外的接口是 json 形式 ( Druid 0.9 版本之后逐步支持 SQL ) ,对 SQL 并不友好,本文最后部分会简述 Druid 平台与 MySQL 交互方面做的一些改进。

2. 下面重点介绍快手 Druid 平台的架构:


  • 和大多数数据平台一样,快手的数据源主要有两种:基于 kafka 的实时数据源和建立在 hadoop 内的离线数据源。将这两种数据源分别通过 kafka index 和 hadoop index 导入到平台核心—— Druid 模型中。

  • 在 Druid 中,将数据进行业务隔离、冷热分层 ( 例如:冷数据放在硬盘,热数据放在 SSD )

  • 外部业务接口方面,支持通过 API 自行开发业务模块,支持通过 Kwai BI 做可视化;同时针对小部分人需求,现已支持 tableau ( tableau 通过 hive 连接 Druid ) 。

  • 数据平台辅助系统包括 metric 监控 ( 实时监控 CPU 占用率情况,进程情况,lag 信息等 ) 、探针系统 ( 用来查询,定义查询热度、维度等信息,有助于优化 ) 、管理系统 ( 在原生态 Druid 的 json 接口基础上定制管理系统,实现任务的可视化导入等管理操作 )

★ Druid 精确去重概述

1. 原生 Druid 去重功能支持情况

a. 维度列

  • cardinality agg,非精确,基于 hll 。查询时 hash 函数较耗费 CPU

  • 嵌套 group by,精确,耗费资源

  • 社区 DistinctCount 插件,精确,但是局限很大:

  • ① 仅支持单维度,构建时需要基于该维度做 hash partition

  • ② 不能跨 interval 进行计算

b. 指标列

  • HyperUniques/Sketch,非精确,基于 hll,摄入时做计算,相比 cardinality agg 性能更高


结论:Druid 缺乏一种支持预聚合、资源占用低、通用性强的精确去重支持。

2. 精确去重方案

a. 精确去重方案:hashset

使用 hashset 的方式存储,具备简单通用等优点,但是资源占用非常大。以下图所示为例,将左上角的原始数据使用 hashset 方法,Year 作为维度列,City 作为指标列;按照维度列分成 2 个 Segment,转换为右上角的数据格式;接下来将指标列聚合到 broker 中,计算出 size,即得到最终结果。整个过程如下图所示:



但是使用 hashset 的去重方式,其资源占用非常大;具体来说,假设有 5000W 条平均长度为 10B 的 string ( 共 500MB ) ,中间生成的 Hashset 会产生一系列链表结构,导致内存可达到 5G,即内存扩展可达 10 倍。因此,直接使用 hashset 的方法可以是完全不可行的。


在 hashset 方法基础上可以考虑一些优化的思路:例如,通过增加节点,将数据分散到不同机器上,查询时在每台机器上分别聚合进而求和;另一种方式类似于 MapReduce,计算前利用 shuffle 模型将数据打散。然而这两种优化思路和 Druid 平台属性有冲突,因此都不适用于 Druid 这个平台。

b. 精确去重方案:字典编码+Bitmap

Bitmap 方法,即用一个位来表示一个数据 ( 这是理论上的最小值 ) 。这种方案的思路是:对要去重的数据列做编码,将 string 或其他类型的数据转化成 int 型的编码,摄入到 Druid 后通过 Bitmap 的形式存储;查询时将多个 Bitmap 做交集,获取结果。Bitmap 的范围是 42 亿,最大占用空间 500M 左右;还可以使用压缩算法,使空间占用大大减少。


因此,字典编码+Bitmap 的方式,优点是存储和查询资源占用少,可以将构建和查询分开 ( 即字典不参与查询 ) ;缺点是:字典需要做全局编码,其构建过程较复杂。


考虑到 Kylin 曾使用过这种方案,因此快手的后续优化方案选择在这种方案的基础上进行优化。

3. 字典编码方案

a. 字典编码方案的使用

① Redis 方案


Redis 可以进行实时编码,可以同时支持离线任务和实时任务,因此,实时处理数据和离线处理数据可以使用相同的方案。


但是 Redis 的 ID 生成速度有瓶颈,最大 5W/s;查询上压力也较大,对交互会造成一定的麻烦;同时对 Redis 的稳定性要求也比较高。


② 离线 MR 方案


离线 MR 方案,利用 MR 的分布式存储,编码和查询的吞吐量更高,还用用更高的容错性。但是 MR 仅支持离线导入任务。


综合这两种字典编码方案的优劣,得到以下结论:


  • 离线任务使用离线 MR 编码,通过小时级任务解决实现准实时

  • 实时任务仅支持原始 int 去重 ( 无需编码 )

b. 字典编码方案模型

快手的字典编码方案参照的是 Kylin 的 AppendTrie 树模型,模型详见下图。



Trie 树模型主要是使用字符串做编码;快手支持各种类型数据,可将不同类型数据统一转换成字符串类型,再使用 Trie 树模型做编码。Trie 树模型可以实现 Append,有两种方式:


  • 按照节点 ( 而不是位置 ) 保存 ID,从而保持已有 ID 不变。

  • 记录当前模型最大 ID,便于给新增节点分配 ID 。


但是这种模型会带来一个问题:假如基数特别大,Trie 树在内存中就会无限扩张。为了控制内存占用率,选择单颗子树设定阈值,超过即分裂,进而控制单颗子树内存消耗。分裂后,每棵树负责一个范围 ( 类似于 HBase 中的 region 分区 ) ;查询时候只需要查询一颗子树即可。



为节省 CPU 资源,基于 guava LoadingCache,按需加载子树 ( 即懒加载 ) ,并使用 LRU 方法换出子树。


LRU ( Least Recently Used ) 是内存管理的一种置换算法,即内存不够时,将最近不常用的子树从内存清除,加载到磁盘上。LRU 算法根据数据的历史访问记录来进行淘汰数据,其核心思想是“如果数据最近被访问过,那么将来被访问的几率也更高”。

c. 字典并发构建:

字典会存在并发构建的问题,分别使用 MVCC 以及 Zookeeper 分布式锁的方案。


① 使用 MVCC ( 持久化在 hdfs 上 ) ,在读取的时候会使用最新版本,拷贝到 working 中进行构建,构建完成生成新的版本号;如历史数据过多,会根据版本个数及 TTL 进行清理。


② 构建字典的过程中会操作临时目录,如果存在多个进程同时去写临时目录,会存在冲突问题;因此引入 Zookeeper 分布式锁,基于 DataSource 和列来做唯一的定位,从而保证同一个字典同时只能有一个进程。

d. 精确去重的实现

① 新增 unique 指标存储


  • 使用 ComplexMetricSerde 定义一个指标如何 Serde,即序列化与反序列化;

  • 使用 Aggregator,让用户定义一个指标如何执行聚合,即上卷;

  • 使用 Aggregator 的 Buffer 版本 BufferAggregator 作为 Aggregator 的替代品;

  • 使用 AggregatorFractory 定义指标的名称,获取具体的 Aggregator 。


② 精确去重的整体流程介绍



  • 使用 DetermineConfigurationJob 计算最终 Segment 分片情况。

  • 使用 BuildDictJob,Map 将同一去重列发送到一个 reducer 中 ( Map 端可先 combine ) ;每个 reducer 构建一列的全局字典;每列字典构建申请 ZK 锁。

  • IndexGeneratorJob,在 Map 中加载字典,将去重列编码成 int;Reducer 将 int 聚合成 Bitmap 存储;每一个 reducer 生成一个 Segment。


此套精确去重的使用方法:


  • 导入时,定义 unique 类型指标


  • 查询时,使用 unique 类型查询


e. 性能优化与测试

① 优化字典查询


· 存在一个超高基数列 ( UHC ) 的去重指标:在 IndexGenerator 之前增加 ClusterBy UHC 任务,保证每个 map 处理的 uhc 列数据有序,避免多次换入换出。


· 存在多个超高基数列的去重指标:拆成多个 DataSource,调大 IndexGenerator 任务的 map 内存,保证字典能加载到内存


② Bitmap 查询优化


减少 targetPartitionSize 或调大 numShards,增加 Segment 个数,提升并行度。


使用 Bitmap 做 or 计算时候,建议使用 batchOr 方法,而不是逐行 or,避免 I/O 与 or 计算交替,缓存持续被刷新导致性能降低。关于 batchOr 的选择,rolling_bitmap 提供一些接口,默认选择 naive_or ( 尽量延迟计算模型 ) ,通常情况下其性能较好;priorityqueue_or 使用堆排序,每次合并最小两个 bitmap,消耗更多内存,官方建议 benchmark 后使用;如果结果是较长的连续序列,可以选择手动按顺序依次 inplace or。


另一个思路是 Croaring-high performance low-level implementation


Rolling bitmap 用 C 语言实现了 Simd,相对于 java 版,Smid 版更多地利用了向量指令 ( avx2 ) ,性能提升了 80%。


快手将 Smid 以 JNI 的方式引入,对 100W 的 Bitmap 做 Or 操作,使用 Java 版本单线程操作,计算用时 13s;用 JNI 调用 Croaring,数据进行反序列化(需要 memcpy 用时 6.5s),计算消耗 7.5s,总用时 14s(慢于 java 版本)。Croaring 暂时不支持原地 inplace 反序列化 ( ImmutableRoaringBitmap ) ,导致其实际运行效率与官方称的 80%提升不一致。


③ 传输层编码


Broker 与 Historical 之间通过 http 协议交换数据,默认打开 gzip 压缩 ( 也可以选择不压缩,即 plain json ) 。测试发现 gzip 压缩的过程中会耗用大量 CPU,因此在万兆的网络下建议不压缩。



④ 对上述查询优化步骤依次性能测试:


8 个维度,10 亿基数的数据,选择列 author_id 作为其去重指标,摄入后达 150W 行;10 台 historical 机器:



未做优化去重,查询时间 50s;


增加 Segment 数量至 10,查询时间为 7s,提升约 7 倍性能;


将 Or 的策略设置为 BatchOr,查询时间为 4s;


关闭 gzip 压缩,查询时间为 2s。


以上是对 10 亿基数数据作去重的查询时间;如果数据基数在 1 亿以下,查询时间为毫秒级。

★ Druid 其他方面做的改进

1. 资源隔离部署方案


该方案为 Druid 官方推荐部署方案,充分利用 Druid 的分层特质性,根据业务,根据数据冷热,分到不同 proxy 上,保证各个业务相互不受影响。

2. 物化视图

物化视图是包括一个查询结果的数据库对象,可以将其理解为远程数据的的本地副本,或者用来生成基于数据表求和的汇总表,这些副本是只读的。


如果想修改本地副本,必须用高级复制的功能;如果想从一个表或视图中抽取数据时,可以用从物化视图中抽取。物化视图可以通过数据库的内部机制可以定期更新,将一些大的耗时的表连接用物化视图实现,会提高查询的效率。

a. 维度物化

Druid 社区 0.13 开始具备物化视图功能,社区实现了维度的物化。应用场景如下:原始数据有很高维度,而实际查询使用到的维度是原始维度的子集;因此不妨对原始维度做小的聚合 ( 类似 Kylin 中的 cube 和 cuboid ) ;对经常查询的部分维度做物化。


b. 时序物化

除了维度上的物化,还包括时序上的物化,例如将原始数据按照小时聚合、按照分钟聚合。


c. 物化效果


这里,主要关心物化膨胀率和物化命中率,两项指标会影响最终的查询效率。物化膨胀率,表示物化后的数据存储耗用资源情况;物化命中率,可以描述物化后与实际应用场景的匹配度。表格中,ds4 和 ds5 都做到了物化膨胀率很低,物化命中率很高,因此,查询效率可获得 7~9 倍的提升。

3. Historical 快速重启

快手数据平台硬件资源,大约是 12 块容量 2T 的 SATA 硬盘,数据量平均为 10W segments,数据占用 10T 空间,重启一次大约 40min。


Druid 中,默认重启的时候会加载全部数据;考虑到超过 10T 的元信息在启动时并不需要,可以将加载数据推迟到查询时候。因此,使用 Guava Suppliers.memoize 命令,延迟数据加载信息,只有在查询的时候才作列的加载。此过程通过一个参数 ( LazyLoad ) 控制。


( 此代码已提交 Druid 社区:druid.segmentCache.lazyLoadOnStart ( pr: 6988 ) )


优化后,重启过程只需要 2min,提升 20 倍。

4. Kafka Index 方面的改进

a. TaskCount 自动伸缩

Kafka index 任务数即 TaskCount 为固定,需要按照峰值任务数设定,这样导致在非高峰时刻会存在资源浪费。


这里实施的一个优化策略是:


  • KafkaSupervisor 增加 DynamicTaskCountNotice

  • 基于 task 的 cpu use & kafka lag 阈值,进行 25%增减 task count

  • 无需重启 supervisor


b. 精细化调度

Middle Manager 的 indexing task 资源分配从 slot 改成按照内存大小分配 ( 类似于 MapReduce 从 1.0 到 2.0 的改进 ) ,具体优化方法如下:


  • 区分 Kafka indexing task 和 Hadoop indexing task ( 一般 Hadoop indexing task 不需要占用内存 )

  • 允许在提交 task 时指定 task 内存大小。



使用这种方式优化,实时数据任务处理可节省超过 65%的内存占用,离线数据任务处理可节省超过 87%的内存占用。

5. 元数据交互

元数据 ( Metadata ),又称中介数据、中继数据,主要是描述数据属性 ( property ) 的信息,用来支持如指示存储位置、历史数据、资源查找、文件记录等功能,是关于数据的组织、数据域及其关系的信息,可以看作是一种电子式目录。简言之,元数据就是关于数据的数据。

a. Overlord 与 MySQL 交互优化

Overlord 节点负责接收任务,协调和分配任务,为任务创建锁,并返回任务状态给任务发送方,Overlord 有两种运行模式:本地模式或者远程模式 ( 默认本地模式 ) 。


Overlord 控制台可以查看等待的任务、运行的任务、可用的 worker,最近创建和结束的 worker 等。


Segment 是 Druid 中最基本的数据存储单元,采用列式的方式存储某一个时间间隔 ( interval ) 内某一个数据源 ( dataSource ) 的部分数据所对应的所有维度值、度量值、时间维度以及索引。


Segment 的逻辑名称结构为:


<dataSource><intervalStart><intervalEnd><version><partitionNum>


<dataSource> 表示数据源 ( 或表 ) ;<intervalStart> 和 <intervalEnd> 分别表示时间段的起止时间;<version> 表示版本号,用于区分多次加载同一数据对应的 Segment;<partitionNum> 表示分区编号 ( 在每个 interval 内,可能会有多个 partition )


Segments 在 HDFS 上的物理存储路径下包括两个文件:descriptor.json 和 index.zip 。前者记录的是 Segment 的描述文件(样例见下表),其内容也保存在 Druid 集群的元数据的 druid_segments 表中;index.zip 则是数据文件。



描述文件descriptor.json样例:{"dataSource": "AD_active_user","interval": "2018-04-01T00:00:00.000+08:00/2018-04-02T00:00:00.000+08:00","version": "2018-04-01T00:04:07.022+08:00","loadSpec": {"type": "hdfs","path": "/druid/segments/AD_active_user/20180401T000000.000+0800_20180402T000000.000+0800/2018-04-01T00_04_07.022+08_00/1/index.zip"},"dimensions": "appkey,spreadid,pkgid","metrics": "myMetrics,count,offsetHyperLogLog","shardSpec": {"type": "numbered","partitionNum": 1,"partitions": 0},"binaryVersion": 9,"size": 168627,"identifier": "AD_active_user_2018-04-01T00:00:00.000+08:00_2018-04-02T00:00:00.000+08:00_2018-04-01T00:04:07.022+08:00_1"}

复制代码


将 druid_segments 增加索引 ( dataSource,used,end ) ,查询时间从 10s 优化到 1s 。

b. Coordinator 与 MySQL 交互优化

Druid 的 Coordinator 节点主要负责 Segment 的管理和分发。具体的就是,Coordinator 会基于配置与历史节点通信加载或丢弃 Segment 。Druid Coordinator 负责加载新的 Segment,丢弃过期的 Segment,管理 Segment 的副本以及 Segment 的负载均衡。


Coordinator 会根据配置参数里的设置的事件定期的执行,每次执行具体的操作之前都会估算集群的状态。与 broker 节点和 historycal 节点类似的,Coordinator 节点也会和 Zookeeper 集群保持连接用于交互集群信息。同时,Coordinator 也会和保存着 Segment 和 rule 信息的数据源保持通信。可用的 Segment 会被存储在 Segment 的表中,并且列出了所有应该被集群加载的 Segment 。Rule 保存在 rule 的表中,指出了应该如何处理 Segment 。


① Segment 发现


将 Coordinator 中默认的全量扫描 druid_segments 表改成增加读取,druid_segments 添加索引 ( used,created_date ) ,查询时间从 1.7min 优化到 30ms 。


② 整个协调周期


  • Segment 之前使用 TreeSet 按时间排序确保优先 load 最近的 segment 现在优化成 ConcurrentHashSet,并分成最近一天和一天前的两个 set,先操作最近一天的;

  • Apply rules 的时候对 LoadRule 先判断集群副本和配置的是否一致,是的话就跳过,提升并发能力;

  • Cleanup 和 overshadow 两个操作合并;

  • 查询时间从 3min 优化到 30s 。

★ 快手 Druid Roadmap

1. 在线化

今年会实现多集群、高可用,着重保证在线业务的 SLA,同时线上业务也要着重权限的管理。

2. 易用性

实现对 SQL 的支持,同时快手的 BI 产品也会做相应的升级和优化。

3. 性能及优化 ( Druid 内核方面,与社区合作 )

  • 自适应的物化视图,智能化

  • 数值型索引

  • 向量化引擎


最后附上我们提交社区的代码:


https://github.com/apache/incubator-druid/pull/7594( 精确去重功能 )


https://github.com/apache/incubator-druid/pull/6988 ( historical 快速重启 )

作者介绍:

邓钫元,快手大数据架构团队研发工程师,毕业于浙江大学,曾就职于百度、贝壳,目前负责快手 Druid 平台研发工作,多年底层集群以及 OLAP 引擎研发、分布式系统的优化经验,热衷开源,为 hadoop / kylin / druid 等社区贡献代码。


本文来自 DataFun 社区


原文链接


https://mp.weixin.qq.com/s/jDW1sordtki-O5-tsVE94g


2019-05-27 08:0024309

评论

发布
暂无评论
发现更多内容

2个动作,让研发效率提升120%,代码减少50%

云智慧AIOps社区

敏捷开发 代码优化 开发规范 研发提效 研发效率

大咖说|制造业发展趋势:“专精特新”与数字化转型

大咖说

阿里巴巴 阿里云 数字化 中制智库

教你Mac下终端配置iterm2+oh-my-zsh+powerlevel10k

锋享前端

Mac iterm2

服务器被入侵了?反手溯源出入侵者画像【网络安全】

H

黑客 网络安全

Apache SeaTunnel & Kyuubi 联合 Meetup | 见证中国大数据崛起!

Apache SeaTunnel

大数据 开源 大数据平台 apache 社区 Apache SeaTunnel

改进DevSecOps框架的 5 大关键技术

禅道项目管理

DevOps 敏捷 自动化

FinClip 黑客马拉松正式开赛,码力集结,等你来战!

Speedoooo

小程序生态 hackathon APP开发 黑客马拉松 黑客松

LigaAI完成A轮融资,加速打造全新的智能研发协作平台

LigaAI

行业资讯 智能 LigaAI A轮融资 研发协作平台

数据孤岛下的新破局 Real Time DaaS:面向 AP+TP 业务的数据平台架构

tapdata

数据中台 数据仓库 异构数据 Real Time DaaS DaaS

如何编写有效的常见问题解答(内附 5 个最佳示例)

小炮

4种常见分支模式解析及优劣对比 | 研发效能提升36计

阿里云云效

阿里云 云原生 研发团队 研发 分支管理

全网渗透率达80%!“耳朵经济”将成为当下市场的流行趋势

易观分析

耳朵经济 在线音频

企业如何快速地制作出电子产品宣传册?

小炮

2022 年了,还不了解 PWA ? 教你 VuePress 博客如何快速兼容 PWA

冴羽

JavaScript Vue 前端 vuepress PWA

OAuthApp H5 应用开发/云托管平台

unclewang

微服务 前端 .net core H5制作 SaaS平台

视频渲染靠cpu还是显卡 视频渲染的作用是什么

懒得勤快

【专访蓝景科技】5G+实时云渲染赋能数字孪生,共建元宇宙

3DCAT实时渲染

5G 数字孪生 实时云渲染

分库分表中间件的高可用实践讲解

Linux服务器开发

高可用 高并发 中间件 Linux服务器开发 Linux后台开发

项目启动 | 德荣医疗携手用友iuap共谱数字化转型新篇章

用友BIP

用友 用友iuap

基于 XuperChain 的区块链项目从 0 到 N(二)

刘旭东

区块链 XuperChain

天翼云基于 KubeEdge 的大规模 CDN 场景落地实践

华为云原生团队

开源 云原生 边缘计算 边缘技术 边缘云

Amazon Graviton2上数据压缩算法性能比较

亚马逊云科技 (Amazon Web Services)

数据 应用性能

ModStartCMS 模块化建站系统 Laravel 9.0 版 v3.3.0

ModStart开源

你可以不知道KFC疯狂星期四,但不能不知道InfoQ会员周!七天限时福利冲冲冲!

InfoQ写作社区官方

热门活动 InfoQ会员周

在线TOML转YAML工具

入门小站

工具

CRM系统帮助降低业务成本的方式

低代码小观

企业管理 CRM 企业管理系统 CRM系统 客户关系管理系统

三步教企业搭建产品帮助中心

小炮

Web 键盘输入法应用开发指南 (6) —— 开发实战(一)

天择

JavaScript 键盘 实战 输入法 3月月更

大画 Spark :: 网络(5)-Spark中的server端和client端

dclar

大数据 hadoop spark Spark 源码 大数据开发

“东数西算”超级工程上马,利好云计算但暗藏汹涌

行云管家

云计算 混合云 多云 东数西算

Linux之traceroute命令

入门小站

Linux

快手 Druid 精确去重的设计和实现_架构_DataFunTalk_InfoQ精选文章