写点什么

快手 Druid 精确去重的设计和实现

  • 2019-05-27
  • 本文字数:6869 字

    阅读完需:约 23 分钟

快手 Druid 精确去重的设计和实现

本次分享内容提纲:


  • 快手 Druid 平台概览

  • Druid 精确去重功能设计

  • Druid 其他改进

  • 快手 Druid Roadmap

★ 快手 Druid 平台概览

1. 从业务需求角度考虑,快手为什么选择 Druid ?

快手的业务特点包括超大数据规模、毫秒级查询时延、高数据实时性要求、高并发查询、高稳定性以及较高的 Schema 灵活性要求;因此快手选择 Druid 平台作为底层架构。由于 Druid 原生不支持数据精确去重功能,而快手业务中会涉及到例如计费等场景,有精确去重的需求。因此,本文重点讲述如何在 Druid 平台中实现精确去重。另一方面,Druid 对外的接口是 json 形式 ( Druid 0.9 版本之后逐步支持 SQL ) ,对 SQL 并不友好,本文最后部分会简述 Druid 平台与 MySQL 交互方面做的一些改进。

2. 下面重点介绍快手 Druid 平台的架构:


  • 和大多数数据平台一样,快手的数据源主要有两种:基于 kafka 的实时数据源和建立在 hadoop 内的离线数据源。将这两种数据源分别通过 kafka index 和 hadoop index 导入到平台核心—— Druid 模型中。

  • 在 Druid 中,将数据进行业务隔离、冷热分层 ( 例如:冷数据放在硬盘,热数据放在 SSD )

  • 外部业务接口方面,支持通过 API 自行开发业务模块,支持通过 Kwai BI 做可视化;同时针对小部分人需求,现已支持 tableau ( tableau 通过 hive 连接 Druid ) 。

  • 数据平台辅助系统包括 metric 监控 ( 实时监控 CPU 占用率情况,进程情况,lag 信息等 ) 、探针系统 ( 用来查询,定义查询热度、维度等信息,有助于优化 ) 、管理系统 ( 在原生态 Druid 的 json 接口基础上定制管理系统,实现任务的可视化导入等管理操作 )

★ Druid 精确去重概述

1. 原生 Druid 去重功能支持情况

a. 维度列

  • cardinality agg,非精确,基于 hll 。查询时 hash 函数较耗费 CPU

  • 嵌套 group by,精确,耗费资源

  • 社区 DistinctCount 插件,精确,但是局限很大:

  • ① 仅支持单维度,构建时需要基于该维度做 hash partition

  • ② 不能跨 interval 进行计算

b. 指标列

  • HyperUniques/Sketch,非精确,基于 hll,摄入时做计算,相比 cardinality agg 性能更高


结论:Druid 缺乏一种支持预聚合、资源占用低、通用性强的精确去重支持。

2. 精确去重方案

a. 精确去重方案:hashset

使用 hashset 的方式存储,具备简单通用等优点,但是资源占用非常大。以下图所示为例,将左上角的原始数据使用 hashset 方法,Year 作为维度列,City 作为指标列;按照维度列分成 2 个 Segment,转换为右上角的数据格式;接下来将指标列聚合到 broker 中,计算出 size,即得到最终结果。整个过程如下图所示:



但是使用 hashset 的去重方式,其资源占用非常大;具体来说,假设有 5000W 条平均长度为 10B 的 string ( 共 500MB ) ,中间生成的 Hashset 会产生一系列链表结构,导致内存可达到 5G,即内存扩展可达 10 倍。因此,直接使用 hashset 的方法可以是完全不可行的。


在 hashset 方法基础上可以考虑一些优化的思路:例如,通过增加节点,将数据分散到不同机器上,查询时在每台机器上分别聚合进而求和;另一种方式类似于 MapReduce,计算前利用 shuffle 模型将数据打散。然而这两种优化思路和 Druid 平台属性有冲突,因此都不适用于 Druid 这个平台。

b. 精确去重方案:字典编码+Bitmap

Bitmap 方法,即用一个位来表示一个数据 ( 这是理论上的最小值 ) 。这种方案的思路是:对要去重的数据列做编码,将 string 或其他类型的数据转化成 int 型的编码,摄入到 Druid 后通过 Bitmap 的形式存储;查询时将多个 Bitmap 做交集,获取结果。Bitmap 的范围是 42 亿,最大占用空间 500M 左右;还可以使用压缩算法,使空间占用大大减少。


因此,字典编码+Bitmap 的方式,优点是存储和查询资源占用少,可以将构建和查询分开 ( 即字典不参与查询 ) ;缺点是:字典需要做全局编码,其构建过程较复杂。


考虑到 Kylin 曾使用过这种方案,因此快手的后续优化方案选择在这种方案的基础上进行优化。

3. 字典编码方案

a. 字典编码方案的使用

① Redis 方案


Redis 可以进行实时编码,可以同时支持离线任务和实时任务,因此,实时处理数据和离线处理数据可以使用相同的方案。


但是 Redis 的 ID 生成速度有瓶颈,最大 5W/s;查询上压力也较大,对交互会造成一定的麻烦;同时对 Redis 的稳定性要求也比较高。


② 离线 MR 方案


离线 MR 方案,利用 MR 的分布式存储,编码和查询的吞吐量更高,还用用更高的容错性。但是 MR 仅支持离线导入任务。


综合这两种字典编码方案的优劣,得到以下结论:


  • 离线任务使用离线 MR 编码,通过小时级任务解决实现准实时

  • 实时任务仅支持原始 int 去重 ( 无需编码 )

b. 字典编码方案模型

快手的字典编码方案参照的是 Kylin 的 AppendTrie 树模型,模型详见下图。



Trie 树模型主要是使用字符串做编码;快手支持各种类型数据,可将不同类型数据统一转换成字符串类型,再使用 Trie 树模型做编码。Trie 树模型可以实现 Append,有两种方式:


  • 按照节点 ( 而不是位置 ) 保存 ID,从而保持已有 ID 不变。

  • 记录当前模型最大 ID,便于给新增节点分配 ID 。


但是这种模型会带来一个问题:假如基数特别大,Trie 树在内存中就会无限扩张。为了控制内存占用率,选择单颗子树设定阈值,超过即分裂,进而控制单颗子树内存消耗。分裂后,每棵树负责一个范围 ( 类似于 HBase 中的 region 分区 ) ;查询时候只需要查询一颗子树即可。



为节省 CPU 资源,基于 guava LoadingCache,按需加载子树 ( 即懒加载 ) ,并使用 LRU 方法换出子树。


LRU ( Least Recently Used ) 是内存管理的一种置换算法,即内存不够时,将最近不常用的子树从内存清除,加载到磁盘上。LRU 算法根据数据的历史访问记录来进行淘汰数据,其核心思想是“如果数据最近被访问过,那么将来被访问的几率也更高”。

c. 字典并发构建:

字典会存在并发构建的问题,分别使用 MVCC 以及 Zookeeper 分布式锁的方案。


① 使用 MVCC ( 持久化在 hdfs 上 ) ,在读取的时候会使用最新版本,拷贝到 working 中进行构建,构建完成生成新的版本号;如历史数据过多,会根据版本个数及 TTL 进行清理。


② 构建字典的过程中会操作临时目录,如果存在多个进程同时去写临时目录,会存在冲突问题;因此引入 Zookeeper 分布式锁,基于 DataSource 和列来做唯一的定位,从而保证同一个字典同时只能有一个进程。

d. 精确去重的实现

① 新增 unique 指标存储


  • 使用 ComplexMetricSerde 定义一个指标如何 Serde,即序列化与反序列化;

  • 使用 Aggregator,让用户定义一个指标如何执行聚合,即上卷;

  • 使用 Aggregator 的 Buffer 版本 BufferAggregator 作为 Aggregator 的替代品;

  • 使用 AggregatorFractory 定义指标的名称,获取具体的 Aggregator 。


② 精确去重的整体流程介绍



  • 使用 DetermineConfigurationJob 计算最终 Segment 分片情况。

  • 使用 BuildDictJob,Map 将同一去重列发送到一个 reducer 中 ( Map 端可先 combine ) ;每个 reducer 构建一列的全局字典;每列字典构建申请 ZK 锁。

  • IndexGeneratorJob,在 Map 中加载字典,将去重列编码成 int;Reducer 将 int 聚合成 Bitmap 存储;每一个 reducer 生成一个 Segment。


此套精确去重的使用方法:


  • 导入时,定义 unique 类型指标


  • 查询时,使用 unique 类型查询


e. 性能优化与测试

① 优化字典查询


· 存在一个超高基数列 ( UHC ) 的去重指标:在 IndexGenerator 之前增加 ClusterBy UHC 任务,保证每个 map 处理的 uhc 列数据有序,避免多次换入换出。


· 存在多个超高基数列的去重指标:拆成多个 DataSource,调大 IndexGenerator 任务的 map 内存,保证字典能加载到内存


② Bitmap 查询优化


减少 targetPartitionSize 或调大 numShards,增加 Segment 个数,提升并行度。


使用 Bitmap 做 or 计算时候,建议使用 batchOr 方法,而不是逐行 or,避免 I/O 与 or 计算交替,缓存持续被刷新导致性能降低。关于 batchOr 的选择,rolling_bitmap 提供一些接口,默认选择 naive_or ( 尽量延迟计算模型 ) ,通常情况下其性能较好;priorityqueue_or 使用堆排序,每次合并最小两个 bitmap,消耗更多内存,官方建议 benchmark 后使用;如果结果是较长的连续序列,可以选择手动按顺序依次 inplace or。


另一个思路是 Croaring-high performance low-level implementation


Rolling bitmap 用 C 语言实现了 Simd,相对于 java 版,Smid 版更多地利用了向量指令 ( avx2 ) ,性能提升了 80%。


快手将 Smid 以 JNI 的方式引入,对 100W 的 Bitmap 做 Or 操作,使用 Java 版本单线程操作,计算用时 13s;用 JNI 调用 Croaring,数据进行反序列化(需要 memcpy 用时 6.5s),计算消耗 7.5s,总用时 14s(慢于 java 版本)。Croaring 暂时不支持原地 inplace 反序列化 ( ImmutableRoaringBitmap ) ,导致其实际运行效率与官方称的 80%提升不一致。


③ 传输层编码


Broker 与 Historical 之间通过 http 协议交换数据,默认打开 gzip 压缩 ( 也可以选择不压缩,即 plain json ) 。测试发现 gzip 压缩的过程中会耗用大量 CPU,因此在万兆的网络下建议不压缩。



④ 对上述查询优化步骤依次性能测试:


8 个维度,10 亿基数的数据,选择列 author_id 作为其去重指标,摄入后达 150W 行;10 台 historical 机器:



未做优化去重,查询时间 50s;


增加 Segment 数量至 10,查询时间为 7s,提升约 7 倍性能;


将 Or 的策略设置为 BatchOr,查询时间为 4s;


关闭 gzip 压缩,查询时间为 2s。


以上是对 10 亿基数数据作去重的查询时间;如果数据基数在 1 亿以下,查询时间为毫秒级。

★ Druid 其他方面做的改进

1. 资源隔离部署方案


该方案为 Druid 官方推荐部署方案,充分利用 Druid 的分层特质性,根据业务,根据数据冷热,分到不同 proxy 上,保证各个业务相互不受影响。

2. 物化视图

物化视图是包括一个查询结果的数据库对象,可以将其理解为远程数据的的本地副本,或者用来生成基于数据表求和的汇总表,这些副本是只读的。


如果想修改本地副本,必须用高级复制的功能;如果想从一个表或视图中抽取数据时,可以用从物化视图中抽取。物化视图可以通过数据库的内部机制可以定期更新,将一些大的耗时的表连接用物化视图实现,会提高查询的效率。

a. 维度物化

Druid 社区 0.13 开始具备物化视图功能,社区实现了维度的物化。应用场景如下:原始数据有很高维度,而实际查询使用到的维度是原始维度的子集;因此不妨对原始维度做小的聚合 ( 类似 Kylin 中的 cube 和 cuboid ) ;对经常查询的部分维度做物化。


b. 时序物化

除了维度上的物化,还包括时序上的物化,例如将原始数据按照小时聚合、按照分钟聚合。


c. 物化效果


这里,主要关心物化膨胀率和物化命中率,两项指标会影响最终的查询效率。物化膨胀率,表示物化后的数据存储耗用资源情况;物化命中率,可以描述物化后与实际应用场景的匹配度。表格中,ds4 和 ds5 都做到了物化膨胀率很低,物化命中率很高,因此,查询效率可获得 7~9 倍的提升。

3. Historical 快速重启

快手数据平台硬件资源,大约是 12 块容量 2T 的 SATA 硬盘,数据量平均为 10W segments,数据占用 10T 空间,重启一次大约 40min。


Druid 中,默认重启的时候会加载全部数据;考虑到超过 10T 的元信息在启动时并不需要,可以将加载数据推迟到查询时候。因此,使用 Guava Suppliers.memoize 命令,延迟数据加载信息,只有在查询的时候才作列的加载。此过程通过一个参数 ( LazyLoad ) 控制。


( 此代码已提交 Druid 社区:druid.segmentCache.lazyLoadOnStart ( pr: 6988 ) )


优化后,重启过程只需要 2min,提升 20 倍。

4. Kafka Index 方面的改进

a. TaskCount 自动伸缩

Kafka index 任务数即 TaskCount 为固定,需要按照峰值任务数设定,这样导致在非高峰时刻会存在资源浪费。


这里实施的一个优化策略是:


  • KafkaSupervisor 增加 DynamicTaskCountNotice

  • 基于 task 的 cpu use & kafka lag 阈值,进行 25%增减 task count

  • 无需重启 supervisor


b. 精细化调度

Middle Manager 的 indexing task 资源分配从 slot 改成按照内存大小分配 ( 类似于 MapReduce 从 1.0 到 2.0 的改进 ) ,具体优化方法如下:


  • 区分 Kafka indexing task 和 Hadoop indexing task ( 一般 Hadoop indexing task 不需要占用内存 )

  • 允许在提交 task 时指定 task 内存大小。



使用这种方式优化,实时数据任务处理可节省超过 65%的内存占用,离线数据任务处理可节省超过 87%的内存占用。

5. 元数据交互

元数据 ( Metadata ),又称中介数据、中继数据,主要是描述数据属性 ( property ) 的信息,用来支持如指示存储位置、历史数据、资源查找、文件记录等功能,是关于数据的组织、数据域及其关系的信息,可以看作是一种电子式目录。简言之,元数据就是关于数据的数据。

a. Overlord 与 MySQL 交互优化

Overlord 节点负责接收任务,协调和分配任务,为任务创建锁,并返回任务状态给任务发送方,Overlord 有两种运行模式:本地模式或者远程模式 ( 默认本地模式 ) 。


Overlord 控制台可以查看等待的任务、运行的任务、可用的 worker,最近创建和结束的 worker 等。


Segment 是 Druid 中最基本的数据存储单元,采用列式的方式存储某一个时间间隔 ( interval ) 内某一个数据源 ( dataSource ) 的部分数据所对应的所有维度值、度量值、时间维度以及索引。


Segment 的逻辑名称结构为:


<dataSource><intervalStart><intervalEnd><version><partitionNum>


<dataSource> 表示数据源 ( 或表 ) ;<intervalStart> 和 <intervalEnd> 分别表示时间段的起止时间;<version> 表示版本号,用于区分多次加载同一数据对应的 Segment;<partitionNum> 表示分区编号 ( 在每个 interval 内,可能会有多个 partition )


Segments 在 HDFS 上的物理存储路径下包括两个文件:descriptor.json 和 index.zip 。前者记录的是 Segment 的描述文件(样例见下表),其内容也保存在 Druid 集群的元数据的 druid_segments 表中;index.zip 则是数据文件。



描述文件descriptor.json样例:{"dataSource": "AD_active_user","interval": "2018-04-01T00:00:00.000+08:00/2018-04-02T00:00:00.000+08:00","version": "2018-04-01T00:04:07.022+08:00","loadSpec": {"type": "hdfs","path": "/druid/segments/AD_active_user/20180401T000000.000+0800_20180402T000000.000+0800/2018-04-01T00_04_07.022+08_00/1/index.zip"},"dimensions": "appkey,spreadid,pkgid","metrics": "myMetrics,count,offsetHyperLogLog","shardSpec": {"type": "numbered","partitionNum": 1,"partitions": 0},"binaryVersion": 9,"size": 168627,"identifier": "AD_active_user_2018-04-01T00:00:00.000+08:00_2018-04-02T00:00:00.000+08:00_2018-04-01T00:04:07.022+08:00_1"}

复制代码


将 druid_segments 增加索引 ( dataSource,used,end ) ,查询时间从 10s 优化到 1s 。

b. Coordinator 与 MySQL 交互优化

Druid 的 Coordinator 节点主要负责 Segment 的管理和分发。具体的就是,Coordinator 会基于配置与历史节点通信加载或丢弃 Segment 。Druid Coordinator 负责加载新的 Segment,丢弃过期的 Segment,管理 Segment 的副本以及 Segment 的负载均衡。


Coordinator 会根据配置参数里的设置的事件定期的执行,每次执行具体的操作之前都会估算集群的状态。与 broker 节点和 historycal 节点类似的,Coordinator 节点也会和 Zookeeper 集群保持连接用于交互集群信息。同时,Coordinator 也会和保存着 Segment 和 rule 信息的数据源保持通信。可用的 Segment 会被存储在 Segment 的表中,并且列出了所有应该被集群加载的 Segment 。Rule 保存在 rule 的表中,指出了应该如何处理 Segment 。


① Segment 发现


将 Coordinator 中默认的全量扫描 druid_segments 表改成增加读取,druid_segments 添加索引 ( used,created_date ) ,查询时间从 1.7min 优化到 30ms 。


② 整个协调周期


  • Segment 之前使用 TreeSet 按时间排序确保优先 load 最近的 segment 现在优化成 ConcurrentHashSet,并分成最近一天和一天前的两个 set,先操作最近一天的;

  • Apply rules 的时候对 LoadRule 先判断集群副本和配置的是否一致,是的话就跳过,提升并发能力;

  • Cleanup 和 overshadow 两个操作合并;

  • 查询时间从 3min 优化到 30s 。

★ 快手 Druid Roadmap

1. 在线化

今年会实现多集群、高可用,着重保证在线业务的 SLA,同时线上业务也要着重权限的管理。

2. 易用性

实现对 SQL 的支持,同时快手的 BI 产品也会做相应的升级和优化。

3. 性能及优化 ( Druid 内核方面,与社区合作 )

  • 自适应的物化视图,智能化

  • 数值型索引

  • 向量化引擎


最后附上我们提交社区的代码:


https://github.com/apache/incubator-druid/pull/7594( 精确去重功能 )


https://github.com/apache/incubator-druid/pull/6988 ( historical 快速重启 )

作者介绍:

邓钫元,快手大数据架构团队研发工程师,毕业于浙江大学,曾就职于百度、贝壳,目前负责快手 Druid 平台研发工作,多年底层集群以及 OLAP 引擎研发、分布式系统的优化经验,热衷开源,为 hadoop / kylin / druid 等社区贡献代码。


本文来自 DataFun 社区


原文链接


https://mp.weixin.qq.com/s/jDW1sordtki-O5-tsVE94g


2019-05-27 08:0024238

评论

发布
暂无评论
发现更多内容

从草根到百万年薪C/C++程序员的二十年风雨之路

C语言技术网-码农有道

c++ 编程语言 C语言

早起实操手册

超超不会飞

效率 生活 自律

聊聊我对开源的理解

zygfengyuwuzu

开源

Netty 源码解析(六): Channel 的 register 操作

猿灯塔

Linux初学-01

Flychen

如何消除写作过程中的痛苦,让写作变成一种享受

董一凡

写作

DataGrip常用快捷键

fliter

LeetCode 153. Find Minimum in Rotated Sorted Array

隔壁小王

算法

LeetCode 565: Array Nesting

隔壁小王

算法

职场发展的思考

子不语

生涯规划 职业规划

中年危机,我们如何面对?

石云升

死磕Java并发编程(8):CurrentHashMap如何实现高效地线程安全?在Java8中有哪些设计实现的演进?

Seven七哥

Java Java并发 ConcurrentHashMap

部署Hexo博客到VPS

ini

零基础、非计算机相关专业的如何转型程序员

C语言技术网-码农有道

程序员 转型

从2009到2020,世界编程语言排行榜分析

C语言技术网-码农有道

编程语言

面试考试可用,十大排序算法

我不自豪谁志豪

学习 面试 算法

生活不奖赏心血来潮

池建强

个人成长 写作

NIO 看破也说破(二)—— Java 中的两种BIO

小眼睛聊技术

Java 学习 程序员 架构 编程语言

IT培训机构那些不得不说的事儿

C语言技术网-码农有道

IT培训机构

原创 | 类应该是匀称和均匀的

编程道与术

断章取义,不一样的C/C++语言的学习策略

C语言技术网-码农有道

C/C++

新人怎么寻求解决问题的方法

波波

编程 职场 新人

我们迫切需要块状时间

Neco.W

效率 时间分配 时间管理 工作效率 提升效率

Python 中怎样合并数据

张利东

Python

Centos的初步配置

玉龙BB

Docker Linux Docker-compose Centos 7

产品周刊 | 第 13 期(20200503)

八味阁

产品 设计 产品经理 产品设计

企业招聘的需求决定了C/C++程序员的学习方向

C语言技术网-码农有道

C/C++

原创 | 使用JUnit、AssertJ和Mockito编写单元测试和实践TDD (一)什么是单元测试

编程道与术

SpringIOC源码篇-Bean实例化-Spring如何选择类构造器(1)

申屠鹏会

Java Spring Boot

终端Terminal:程序员是如何查询天气预报的?

lmymirror

GitHub 工具 命令行 terminal 终端工具

1分钟理解M2M和IoT概念

老任物联网杂谈

物联网 M2M IoT

快手 Druid 精确去重的设计和实现_架构_DataFunTalk_InfoQ精选文章