写点什么

Druid Segment Balance 及其代价计算函数分析

  • 2020-03-18
  • 本文字数:5890 字

    阅读完需:约 19 分钟

Druid Segment Balance 及其代价计算函数分析

一、引言

Druid 的查询需要有实时和历史部分的 Segment,历史部分的 Segment 由 Historical 节点加载,所以加载的效率直接影响了查询的 RT(不考虑缓存)。查询通常需要指定一个时间范围[StartTime, EndTime],该时间范围的内所有 Segment 需要由 Historical 加载,最差的情况是所有 Segment 不幸都储存在一个节点上,加载无疑会很慢;最好的情况是 Segment 均匀分布在所有的节点上,并行加载提高效率。所以 Segment 在 Historical 集群中分布就变得极为重要,Druid 通过 Coordinator 的 Balance 策略协调 Segment 在集群中的分布。


本文将分析 Druid 的 Balance 策略、源码及其代价计算函数,本文使用 Druid 的版本是 0.12.0。

二、Balance 方法解析

2.1 Balance 相关的配置

Druid 目前有三种 Balance 算法: cachingCost, diskNormalized, Cost, 其中 cachingCost 是基于缓存的,diskNormalized 则是基于磁盘的 Balance 策略,本文不对前两种展开篇幅分析, Druid Coordinator 中开启 cost balance 的配置如下:


druid.coordinator.startDelay=PT30Sdruid.coordinator.period=PT30S 调度的时间druid.coordinator.balancer.strategy=cost 默认
动态配置:maxSegmentsToMove = 5 ##每次Balance最多移动多少个Segment
复制代码

2.2 Cost 算法概述

Cost 是 Druid 在 0.9.1 开始引入的,在 0.9.1 之前使用的 Balance 算法会存在 Segment 不能快速均衡,分布不均匀的情况,Cost 算法的核心思想是:当在做均衡的时候,随机选择一个 Segment(假设 Segment A ), 依次计算 Segment A 和 Historical 节点上的所有 Segment 的 Cost,选取 Cost 值最小的节点,然后到该节点上重新加载 Segment。


2.3 源码和流程图分析

以下会省略一些不必要的代码


DruidCoordinatorBalancer 类


@Overridepublic DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params){  final CoordinatorStats stats = new CoordinatorStats();  // 不同tier层的分开Balance  params.getDruidCluster().getHistoricals().forEach((String tier, NavigableSet<ServerHolder> servers) -> {   balanceTier(params, tier, servers, stats);  });  return params.buildFromExisting().withCoordinatorStats(stats).build();}
复制代码


DruidCoordinatorBalancer 类的 balanceTier 方法,主要是均衡入口函数


private void balanceTier(DruidCoordinatorRuntimeParams params, String tier, SortedSet<ServerHolder> servers,CoordinatorStats stats){  final BalancerStrategy strategy = params.getBalancerStrategy();  final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
//集群中只有一个 Historical 节点时不进行Balance if (serverHolderList.size() <= 1) { log.info("[%s]: One or fewer servers found. Cannot balance.", tier); return; }
int numSegments = 0; for (ServerHolder server : serverHolderList) { numSegments += server.getServer().getSegments().size(); }
if (numSegments == 0) { log.info("No segments found. Cannot balance."); return; } long unmoved = 0L; for (int iter = 0; iter < maxSegmentsToMove; iter++) { //通过随机算法选择一个候选Segment,该Segment会参与后面的Cost计算 final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { //找Cost最小的节点,Cost计算入口 final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList); //找到候选节点,发起一次Move Segment的任务 if (holder != null) { moveSegment(segmentToMove, holder.getServer(), params); } else { ++unmoved; } } } ......}
复制代码


Reservoir 随机算法,随机选择一个 Segment 进行 Balance。Segment 被选中的概率:


public class ReservoirSegmentSampler{
public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders) { final Random rand = new Random(); ServerHolder fromServerHolder = null; DataSegment proposalSegment = null; int numSoFar = 0;
//遍历所有List上的Historical节点 for (ServerHolder server : serverHolders) { //遍历一个Historical节点上所有的Segment for (DataSegment segment : server.getServer().getSegments().values()) { int randNum = rand.nextInt(numSoFar + 1); // w.p. 1 / (numSoFar+1), swap out the server and segment // 随机选出一个Segment,后面的会覆盖前面选中的,以最后一个被选中为止。 if (randNum == numSoFar) { fromServerHolder = server; proposalSegment = segment; } numSoFar++; } } if (fromServerHolder != null) { return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment); } else { return null; } }}
复制代码


继续调用到 CostBalancerStrategy 类的 findNewSegmentHomeBalancer 方法,其实就是找最合适的 Historical 节点:


@Overridepublic ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders){  return chooseBestServer(proposalSegment, serverHolders, true).rhs;}
protected Pair<Double, ServerHolder> chooseBestServer( final DataSegment proposalSegment, final Iterable<ServerHolder> serverHolders, final boolean includeCurrentServer){ Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();
for (final ServerHolder server : serverHolders) { futures.add( exec.submit( new Callable<Pair<Double, ServerHolder>>() { @Override public Pair<Double, ServerHolder> call() throws Exception { //计算Cost:候选Segment和Historical节点上所有Segment的cost和 return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server); } } ) ); }
final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures); final List<Pair<Double, ServerHolder>> bestServers = new ArrayList<>(); bestServers.add(bestServer); try { for (Pair<Double, ServerHolder> server : resultsFuture.get()) { if (server.lhs <= bestServers.get(0).lhs) { if (server.lhs < bestServers.get(0).lhs) { bestServers.clear(); } bestServers.add(server); } }
//Cost最小的如果有多个,随机选择一个 bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size())); } catch (Exception e) { log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); } return bestServer;}
protected double computeCost(final DataSegment proposalSegment, final ServerHolder server,final boolean includeCurrentServer){ final long proposalSegmentSize = proposalSegment.getSize();
// (optional) Don't include server if it is already serving segment if (!includeCurrentServer && server.isServingSegment(proposalSegment)) { return Double.POSITIVE_INFINITY; }
// Don't calculate cost if the server doesn't have enough space or is loading the segment if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { return Double.POSITIVE_INFINITY; }
// 初始cost为0 double cost = 0d;
//计算Cost:候选Segment和Historical节点上所有Segment的totalCost cost += computeJointSegmentsCost( proposalSegment, Iterables.filter( server.getServer().getSegments().values(), Predicates.not(Predicates.equalTo(proposalSegment)) ) );
// 需要加上和即将被加载的Segment之间的cost cost += computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsToLoad());
// 需要减掉和即将被加载的 Segment 之间的 cost cost -= computeJointSegmentsCost (proposalSegment, server.getPeon().getSegmentsMarkedToDrop());
return cost;}
复制代码


开始计算:


static double computeJointSegmentsCost(final DataSegment segment, final Iterable<DataSegment> segmentSet){  double totalCost = 0;  // 此处需要注意,当新增的Historical节点第一次上线的时候,segmentSet应该是空,所以totalCost=0最小  // 新增节点总会很快的被均衡  for (DataSegment s : segmentSet) {   totalCost += computeJointSegmentsCost(segment, s);  }  return totalCost;}
复制代码


进行一些处理:1)Segment 的 Interval 毫秒转换成 hour;2)先计算了带 lambda 的 x1, y0, y1 的值。


public static double computeJointSegmentsCost(final DataSegment segmentA, final DataSegment segmentB){  final Interval intervalA = segmentA.getInterval();  final Interval intervalB = segmentB.getInterval();
final double t0 = intervalA.getStartMillis(); final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR; //x1 final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR; //y0 final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR; //y1
// constant cost-multiplier for segments of the same datsource final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;
return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier;}
复制代码


真正计算 cost 函数的值


public static double intervalCost(double x1, double y0, double y1){  if (x1 == 0 || y1 == y0) {   return 0;  }
// 保证Segment A开始时间小于B的开始时间 if (y0 < 0) { // swap X and Y double tmp = x1; x1 = y1 - y0; y1 = tmp - y0; y0 = -y0; }
if (y0 < x1) { // Segment A和B 时间有重叠的情况,这个分支暂时不分析 ....... } else { // 此处就是计算A和B两个Segment之间的cost,代价计算函数:See https://github.com/druid-io/druid/pull/2972 final double exy0 = FastMath.exp(x1 - y0); final double exy1 = FastMath.exp(x1 - y1); final double ey0 = FastMath.exp(0f - y0); final double ey1 = FastMath.exp(0f - y1);
return (ey1 - ey0) - (exy1 - exy0); }}
复制代码

2.4 代价计算函数分析

现在我们有 2 个 Segment, A 和 B,需要计算他们之间的代价,假设 A 的 start 和 end 时间都是小于 B 的。


2.4.1 Cost 函数介绍

Cost 函数的提出请参考 Druid PR2972(https://github.com/druid-io/druid/pull/2972):


其中 是 Cost 函数的半衰期


为了弄清楚这个 Cost 函数以及影响 Cost 值的因素?我们先使用一些常用的参数配置:


假设 1:Segment A 的 Interval 是 1 小时,即 , 得到:


$x_1 = \frac{(A_{end}-A_{start})log_e2}{24Hour} = \frac{log_e2}{24}$


假设 2:Segment B 的 Interval 也是 1 小时, 得到:


假设 3:Segment B 和 A start 时间相差了 t 个小时,得到:


$y_0 = \frac{tHourlog_e2}{24*Hour} = \frac{t}{24}*log_e2$


在实际的代码中, 的计算已经放到了

2.4.2 计算 Cost 函数


根据假设 2,得到:


继续简化,得到:


根据假设 1,得到:


根据假设 3,得到:


继续简化,得到:


2.4.5 小结

根据上诉 cost 函数化简的结果,当 Segment A 和 B 的 Interval 都是 1 小时的情况下:Segment A 和 B 时间相距越大 Cost 越小,它们就越可能共存在同一个 Historical 节点。这也和本文开始时候提出的时间相邻的 Segment 存储在不同的节点上让查询更快相呼应。

三、总结

Druid 的 balance 机制,主要解决 segments 数据在 history 节点的分布问题,这里的优化主要针对于查询做优化,一般情况下,用户的某一次查询针对的是一个时间范围内的多个 Segment 数据, cost 算法的核心思想是,尽可能打散 Segment 数据分布,这样在一次查询设计多个连续时间 Segment 数据的时候能够利用多台 history server 的并行处理能力,分散系统开销,缩短查询 RT.


2020-03-18 19:54632

评论

发布
暂无评论
发现更多内容

Jmeter高手进阶-脚本增强

伤心的辣条

Python 程序人生 软件测试 IT 自动化测试

为什么越来越多人选择自助式洗车

共享电单车厂家

自助洗车加盟 车白兔自助洗车 自助式洗车

重磅发布 | Serverless 应用中心:Serverless 应用全生命周期管理平台

阿里巴巴云原生

阿里云 Serverless 云原生 应用中心

妙!JMeter/Pytest/Ginkgo 和自建测试平台这样接入 Zadig

Zadig

DevOps 云原生 自动化测试 CI/CD

Apache Knox SSO 及在移动云 EMR 中的实践

移动云大数据

MapReduce服务

自建Gitlab迁移工具使用指南

阿里云云效

云计算 阿里云 gitlab 代码迁移 代码库

LSM树读写放大问题及KV分离技术解析

移动云大数据

HBase LSM树

Java中观察者模式与委托,还在傻傻分不清

华为云开发者联盟

Java 观察者模式 委托 事件执行者

2022年第1季度中国网络零售B2C市场交易规模达16988.5亿元

易观分析

网络零售

共享自助洗车多少钱一次?怎么收费

共享电单车厂家

自助洗车加盟 自助洗车多少钱一次 共享自助洗车多少钱 自助洗车怎么收费

天翼云电脑和企业安全“锁”了

天翼云开发者社区

“双碳”背后的硬核存储(上)

天翼云开发者社区

DevOps 向业务进阶,BizDevOps 要如何实现?

SoFlu软件机器人

模块一作业

joak

6元自助洗车既能省钱还能赚钱?

共享电单车厂家

自助洗车加盟 6元自助洗车 车白兔自助洗车

Java中的线程到底有哪些安全策略

华为云开发者联盟

Java 线程 高并发 线程安全 并发容器

模块一

Geek_2ce415

开源之夏IoTDB项目宣讲会落幕,你关心的问题这里都有

Apache IoTDB

时序数据库 Apache IoTDB 开源之夏

一图详解java-class类文件原理

华为云开发者联盟

Java JVM class 类文件

大前端技术的边界在哪里?

博文视点Broadview

安全感何止“亿”点 看云电脑如何保障企业云网安全

天翼云开发者社区

宜搭小技巧|海量数据管理难?这招帮你事半功倍

一只大光圈

钉钉宜搭

弱网优化,GCC 动态带宽评估算法(内附详细公式)

融云 RongCloud

通信系统 链路 网络管理

英特尔On产业创新峰会:脚踏实地挖掘每一分性能潜能,着眼未来保证PC产业可持续发展

科技新消息

Spring Cloud Alibaba 开源之夏,最后 7 天倒计时

阿里巴巴云原生

阿里云 云原生 spring cloud alibaba 开源之夏

GaussDB(DWS) NOT IN优化技术解密:排他分析场景400倍性能提升

华为云开发者联盟

数据库 GaussDB(DWS) 排他分析 NOT IN

AliAGC 自动增益控制算法:解决复杂场景下的音量问题

阿里云视频云

算法 3A 音频

加盟共享洗车多少钱?投入大吗?

共享电单车厂家

加盟共享洗车 自助洗车加盟费用

开放报名 | Serverless 技术进阶研读班,碎片时间提升技术新方式

阿里巴巴云原生

阿里云 Serverless 云原生 研读版 活动报名

最佳实践 | 用腾讯云AI人脸融合实现云毕业照推广活动小程序

牵着蜗牛去散步

腾讯 技术实践 腾讯云AI 人脸融合 云毕业照

“双碳”背后的硬核存储(下)

天翼云开发者社区

Druid Segment Balance 及其代价计算函数分析_文化 & 方法_zhaojiandong_InfoQ精选文章