写点什么

EB 级计算平台调度系统伏羲 DAG 2.0 解析

  • 2020-05-22
  • 本文字数:14612 字

    阅读完需:约 48 分钟

EB级计算平台调度系统伏羲 DAG 2.0解析

前言

作为阿里巴巴核心大数据底座,伏羲调度和分布式执行系统,支撑着阿里集团内部以及阿里云上大数据平台绝大部分的大数据计算需求,在其上运行的 MaxCompute(ODPS) 以及 PAI 等多种计算引擎,每天为用户进行海量的数据运算。 在"阿里体量"的大数据生态中,伏羲系统管理着弹内外多个物理集群,超十万台物理机, 以及数百万的 CPU/GPU cores。每天运行在伏羲分布式平台上的作业数已经超过千万, 是业界少有的,单天处理 EB 级别数据分布式平台。其中单个作业规模已经高达数十万计算节点,管理着数百亿的边连接。在过去的十年中,阿里集团以及阿里云上这样的作业数目和规模,锤炼了伏羲分布式平台;与此同时,今天平台上作业的日益多样化,以及向前再发展的需求,对于伏羲系统架构的进一步演化,也都带来了巨大挑战与机遇。本文主要介绍一下在过去的两年多时间中,阿里巴巴伏羲团队对于整个核心调度与分布式执行系统的升级换代,code name DAG 2.0。

1. 背景

1.1 伏羲 DAG/AM 组件

从较高的层面来看整个分布式系统的体系架构,物理集群之上运行的分布式系统,大概可以分成资源管理,作业分布式调度执行,与多个计算节点的运行这三个层次,如同下图所示。通常所说的 DAG 组件,指的是每个分布式作业的中心管理点,也就是 application master (AM)。 AM 之所以经常被称为 DAG (Directional Acyclic Graph, 有向无环图) 组件,是因为 AM 最重要的责任,就是负责协调分布式作业的执行。而现代的分布式系统中的作业执行流程,通常可以通过 DAG 上面的调度以及数据流来描述[1]。相对于传统的 Map-Reduce[2]执行模式, DAG 的模型能对分布式作业做更精准的描述,也是当今各种主流大数据系统(Hadoop 2.0+, SPARK, FLINK, TENSORFLOW 等)的设计架构基础,区别只在于 DAG 的语义是透露给终端用户,还是计算引擎开发者。



与此同时,从整个分布式系统 stack 来看, AM 肩负着除了运行 DAG 以外更多的责任。作为作业的中心管控节点,向下其负责与 Resource Manager 之间的交互,为分布式作业申请计算资源;向上其负责与计算引擎进行交互,并将收集的信息反馈到 DAG 的执行过程中。作为唯一有能力对每一个分布式作业的执行大局有最精准的了解的组件,在全局上对 DAG 的运行做准确的管控和调整,也是 AM 的重要职责。从上图描述的分布式系统 stack 图中,我们也可以很直观的看出,AM 是系统中唯一需要和几乎所有分布式组件交互的组件,在作业的运行中起了重要的承上启下的作用。这一组件之前在伏羲系统中被称为 JobMaster(JM), 在本文中我们统一用 DAG 或者 AM 来指代。

1.2 逻辑图与物理图

分布式作业的 DAG,有两种层面上的表述:逻辑图与物理图。简单地来说(over-simplified),终端用户平时理解的 DAG 拓扑,大多数情况下描述的是逻辑图范畴:比如大家平时看到的 logview 图,虽然里面包含了一些物理信息(每个逻辑节点的并发度),但整体上可以认为描述的就是作业执行流程的逻辑图。



准确一点说:


  • 逻辑图描述了用户想要实现的数据处理流程,从数据库/SQL 的角度(其他类型引擎也都有类似之处,比如 TENSORFLOW) 来看,可以大体认为 DAG 的逻辑图,是对优化器执行计划的一个延续。

  • 物理图更多描述了执行计划映射到物理分布式集群的具体描述,体现的是执行计划被物化到分布式系统上,具备的一些特性:比如并发度,数据传输方式等等。



而每个逻辑图的"物理化",可以有很多等效方式。选择合适的方式来将逻辑图变成物理化执行,并进行灵活的调整,是 DAG 组件的重要职责之一。从上图的逻辑图到物理图的映射可以看到,一个图的物理化过程,实际上就是在回答一系列图节点以及各个连接边物理特性的问题,一旦这些问题得到确认,就能得到在分布式系统上实际执行物理图。

1.3. 为什么需要 DAG 2.0 架构升级?

作为从阿里云飞天系统创建伊始就开始研发的伏羲分布式作业执行框架,DAG 1.0 在在过去十年中支撑了阿里集团的大数据业务,在系统规模以及可靠性等方面都走在了业界领先。另外一方面,作为一个开发了十年的系统,虽然在这个期间不断的演进,DAG 1.0 在基本架构上秉承了比较明显的 Map-Reduce 执行框架的一些特点,逻辑图和物理图之间没有清晰的分层,这导致在这个基本架构上要继续向前走,支持更多 DAG 执行过程中的动态性,以及同时支持多种计算模式等方面,都比较困难。事实上今天在 MaxCompute SQL 线上,离线作业模式以及准实时作业模式(smode)两种执行模式,使用了两套完全分开的分布式执行框架,这也导致对于优化性能和优化系统资源使用之间的取舍,很多情况下只能走两个极端,而无法比较好的 tradeoff。


除此之外,随着 MaxCompute 以及 PAI 引擎的更新换代以及新功能演进,上层的分布式计算自身能力在不断的增强。对于 AM 组件在作业管理,DAG 执行等方面的动态性,灵活性等方面的需求也日益强烈。在这样的一个大的背景下,为了支撑计算平台下个 10 年的发展,伏羲团队启动了 DAG 2.0 的项目,将从代码和功能方面,完整替代 1.0 的 JobMaster 组件,实现完全的升级换代。在更好的支撑上层计算需求的同时,也同时对接伏羲团队在 shuffle 服务(shuffle service)上的升级,以及 fuxi master(Resource Manager) 的功能升级。与此同时,站在提供企业化服务的角度来看,一个好的分布式执行框架,除了支持阿里内部极致的大规模大吞吐作业之外,我们需要支持计算平台的向外走,支持云上各种规模和计算模式的需求。除了继续锤炼超大规模的系统扩展能力意外,我们需要降低大数据系统使用的门槛,通过系统本身的智能动态化能力,来提供自适应(各种数据规模以及处理模式)的大数据企业界服务,是 DAG 2.0 在设计架构中考虑的另一重要维度。


2. DAG 2.0 架构以及整体设计

DAG 2.0 项目,在调研了业界各个分布式系统(包括 SPARK/FLINK/Dryad/Tez/Tensorlow)DAG 组件之后,参考了 Dryad/Tez 的框架。新一代的架构上,通过逻辑图和物理图的清晰分层,可扩展的状态机管理,插件式的系统管理,以及基于事件驱动的调度策略等基座设计,实现了对计算平台上多种计算模式的统一管理,并更好的提供了作业执行过程中在不同层面上的动态调整能力。

2.1 作业执行的动态性

传统的分布式作业执行流程,作业的执行计划是在提交之前确定的。以 SQL 执行为例,一个 SQL 语句,在经过编译器和优化器后产生执行图,并被转换成分布式系统(伏羲)的执行计划。



这个作业流程在大数据系统中是比较标准的操作。然而在具体实现中,如果在 DAG 的执行缺乏自适应动态调整能力的话,整个执行计划都需要事先确定,会使得作业的运行没有太多动态调整的空间。放在 DAG 的逻辑图与物理图的背景中来说,这要求框架在运行作业前,必须事先了解作业逻辑和处理数据各种特性,并能够准确回答作业运行过程,各个节点和连接边的物理特性问题,来实现逻辑图往物理图的转换。


然而在现实情况中,许多物理特性相关的问题,在作业运行前是无法被感知的。以数据特性为例,一个分布式作业在运行前,能够获得的只有原始输入的一些特性(数据量等), 对于一个较深的 DAG 执行而言,这也就意味着只有根节点的物理计划(并发度选择等) 是相对合理的,而下游的节点和边的物理特性只能通过一些特定的规则来猜测。虽然在输入数据有丰富的 statistics 的前提下,优化器有可能可以将这些 statistics,与执行 plan 中的各个 operator 特性结合起来,进行一些适度的演算:从而推断在整个执行流程中,每一步产生的中间数据可能符合什么样的特性。但这种推断在实现上,尤其在面对阿里大体量的实际生产环境中,面临着巨大的挑战,例如:


实际输入数据的 statistics 的缺失:即便是 SQL 作业处理的结构化数据,也无法保证其源表数据特性拥有很好的统计。事实上今天因为数据落盘方式多样化,以及精细化统计方式的缺失,大部分的源表数据都是没有完整的 statistics 的。此外对于集群内部和外部需要处理的非结构化数据,数据的特性的统计更加困难。


分布式作业中存在的大量用户逻辑黑盒:作为一个通用的大数据处理系统,不可避免的需要支持用户逻辑在系统中的运行。比如 SQL 中常用的 UDF/UDTF/UDJ/Extractor/Outputer 等等,这些使用 Java/Python 实现的用户逻辑,计算引擎和分布式系统并无法理解,在整个作业流程中是类似黑盒的存在。以 MaxCompute 为例,线上有超过 20%的 SQL 作业,尤其是重点基线作业,都包含用户代码。这些大量用户代码的存在,也造成了优化器在很多情况下无法对中间产出数据的特性进行预判。


优化器预判错误代价昂贵:在优化器选择执行计划时,会有一些优化方法,在数据符合一定特殊特性的时候,被合理选中能带来性能优化。但是一旦选择的前提假设错误(比如数据特性不符合预期),会适得其反,甚至带来严重的性能回退或作业失败。在这种前提下,依据静态的信息实现进行过多的预测经常得不到理想的结果。


这种种原因造成的作业运行过程中的非确定性,要求一个好的分布式作业执行系统,需要能够根据中间运行结果的特点,来进行执行过程中的动态调整。因为只有在中间数据已经在执行过程中产生后,其数据特性才能被最准确的获得,动态性的缺失,可能带来一系列的线上问题,比如:


  • 物理资源的浪费:比如计算节点事先选择的资源类型的不合理,或者大量的计算被消耗用于处理后继会被丢弃的无效数据。

  • 作业的严重长尾:比如中间数据分布倾斜或不合理编排,导致一个 stage 上计算节点需要处理的数据量极端化。

  • 作业的不稳定:比如由于优化器静态计划的错判,导致不合理的执行计划无法完成

  • 而 DAG/AM 作为分布式作业唯一的中心节点和调度管控节点,是唯一有能力收集并聚合相关数据信息,并基于这些数据特性来做作业执行的动态调整,的分布式组件。这包括简单的物理执行图调整(比如动态的并发度调整),也包括复杂一点的调整比如对 shuffle 方式和数据编排方式重组。除此以外,数据的不同特点也会带来逻辑执行图调整的需求:对于逻辑图的动态调整,在分布式作业处理中是一个全新的方向,也是我们在 DAG 2.0 里面探索的新式解决方案。


点,边,图的清晰物理逻辑分层,和基于事件的数据收集和调度管理,以及插件式的功能实现,方便了 DAG 2.0 在运行期间的数据收集,以及使用这些数据来系统性地回答,逻辑图向物理图转化过程中需要确定的问题。从而在必要的时候实现物理图和逻辑图的双重动态性,对执行计划进行合理的调整。在下文中提到几个落地场景中,我们会进一步举例说明基于 2.0 的这种强动态性能力,实现更加自适应,更加高效的分布式作业的执行。

2.2 统一的 AM/DAG 执行框架

DAG 2.0 抽象分层的点,边,图架构上,也使其能通过对点和边上不同物理特性的描述,对接不同的计算模式。业界各种分布式数据处理引擎,包括 SPARK, FLINK, HIVE, SCOPE, TENSORFLOW 等等,其分布式执行框架的本源都可以归结于 Dryad[1]提出的 DAG 模型。我们认为对于图的抽象分层描述,将允许在同一个 DAG 系统中,对于离线/实时/流/渐进计算等多种模型都可以有一个好的描述。在 DAG 2.0 初步落地的过程中,首要目标是在同一套代码和架构系统上,统一当前伏羲平台上运行的几种计算模式,包括 MaxCompute 的离线作业,准实时作业,以及 PAI 平台上的 Tensorflow 作业和其他的非 SQL 类作业。对更多新颖计算模式的探索,也会有计划的分步骤进行。

2.2.1 统一的离线作业与准实时作业执行框架

首先我们来看平台上作业数占到绝大多数的 SQL 线离线作业(batch job)与准实时作业(smode)。前面提到过,由于种种历史原因,之前 MaxCompompute SQL 线的这两种模式的资源管理和作业执行,是搭建在两套完全分开的代码实现上的。这除了导致两套代码和功能无法复用以外,两种计算模式的非黑即白,使得彼此在资源利用率和执行性能之间无法 tradeoff。而在 2.0 的 DAG 模型上,我们实现了这两种计算模式比较自然的融合和统一,如下图所示:



在通过对逻辑节点和逻辑边上映射不同的物理特性,离线作业和准实时作业都能得到准确的描述:


  • 离线作业:每个节点按需去申请资源,一个逻辑节点代表一个调度单位;节点间连接边上传输的数据,通过落盘的方式来保证可靠性;

  • 准实时作业:整个作业的所有节点都统一在一个调度单位内进行 gang scheduling;节点间连接边上通过网络/内存直连传输数据,并利用数据 pipeline 来追求最优的性能。


今天在线上,离线模式因为其 on-demand 的资源申请以及中间数据落盘等特点,作业在资源利用率,规模性和稳定性方面都有明显的优势。而准实时模式则通过常驻的计算资源池以及 gang scheduling 这种 greedy 资源申请,降低了作业运行过程中的 overhead,并使得数据的 pipelined 传输处理成为可能,达到加速作业运行的效果,但其资源使用的特点,也使其无法在广泛范围内来支持大规模作业。DAG 2.0 的升级,不仅在同一套架构上统一了这两种计算模式,更重要的是这种统一的描述方式,使得探索离线作业高资源利用率,以及准实时作业的高性能之间的 tradeoff 成为可能:当调度单位可以自由调整,就可以实现一种全新的混合的计算模式,我们称之为 Bubble 执行模式。



这种混合 Bubble 模式,使得 DAG 的用户,也就是上层计算引擎的开发者(比如 MaxCompute 的优化器),能够结合执行计划的特点,以及引擎终端用户对资源使用和性能的敏感度,来灵活选择在执行计划中切出 Bubble 子图。在 Bubble 内部充分利用网络直连和计算节点预热等方式提升性能,没有切入 Bubble 的节点则依然通过传统离线作业模式运行。回过头来看,现有的离线作业模式和准实时作业模式,分别可以被描述成 Bubble 执行模式的两个极端特例,而在统一的新模型之上,计算引擎和执行框架可以在两个极端之间,根据具体需要,选择不同的平衡点,典型的几个应用场景包括:


  • Greedy Bubble:在可用的资源(集群规模,quota 等)受限,一个大规模作业无法实现 gang scheduling 时,如果用户对资源利用率不敏感,唯一的目标是尽快跑完一个大规模作业。这种情况下,可以实现基于可用计算节点数目,实施 greedy 的 bubble 切割的策略, 尽量切出大的 bubble。

  • Efficient Bubble:在作业的运行过程中,节点间的运算可能存在天然的 barrier (比如 sort 运算, 建 hash 表等等)。如果把两个通过 barrier 边连接的节点切到一个 bubble 中,虽然作业 e2e 性能上还是会有调度 overhead 降低等带来的提升,但是因为数据无法完全 pipeline 起来,资源的利用率达不到最高。 那么在对资源的利用率较为敏感时,可以避免 bubble 内部出现 barrier 边。这同样是计算引擎可以根据执行计划做出决定的。


这里只列举了两个简单的策略,其中还有更多可以细化以及针对性优化的地方。在不同的场景上,通过 DAG 层面提供的这种灵活按照 bubble 执行计算的能力,允许上层计算可以在不同场景上挑选合适的策略,更好的支持各种不同计算的需求。

2.2.2 支持新型计算模式的描述

1.0 的执行框架的底层设计受 Map-Reduce 模式的影响较深,节点之间的边连接,同时混合了调度顺序,运行顺序,以及数据流动的多种语义。通过一条边连接的两个节点,下游节点必须在上游节点运行结束,退出,并产生数据后才能被调度。这种描述对于新型的一些计算模式并不适用。比如对于 Parameter Server 计算模式,Parameter Server(PS)与 Worker 在运行过程中有如下特点:


  • PS 作为 parameter 的 serving entity, 可以独立运行

  • Worker 作为 parameter 的 consumer 和 updater, 需要 PS 在运行后才能有效的运行,并且在运行过程中需要和 PS 持续的进行数据交互


这种运行模式下,PS 和 worker 之间天然存在着调度上的前后依赖关系。但是因为 PS 与 worker 必须同时运行,不存在 PS 先退出 worker 才调度的逻辑。所以在 1.0 框架上, PS 与 worker 只能作为两个孤立无联系的 stage 来分开调度和运行。此外所有 PS 与 worker 之间,也只能完全通过计算节点间直连通讯,以及在外部 entity (比如 zookeeper 或 nuwa)协助来进行沟通与协调。这导致 AM/DAG 作为中心管理节点作用的缺失,作业的管理基本被下放计算引擎上,由计算节点之间自行试图协调来完成。这种无中心化的管理,对稍微复杂的情况下(failover 等)无法很好的处理。


在 DAG 2.0 的框架上,为了更准确的描述节点之间的调度和运行关系,引入并且实现了 concurrent edge 的概念:通过 concurrent edge 连接的上下游节点,在调度上存在先后,但是可以同时运行。而调度的时机也可以灵活配置:可以上下游同步调度,也可以在上游运行到一定程度后,通过事件来触发下游的调度。在这种灵活的描述能力上,PS 作业可以通过如下这种 DAG 来描述,这不仅使得作业节点间的关系描述更加准确,而且使得 AM 能够理解作业的拓扑,进行更加有效的作业管理,包括在不同计算节点发生 failover 时不同的处理策略等。



此外,DAG 2.0 新的描述模型,也允许 PAI 平台上的 Tensorflow/PS 作业实现更多的动态优化,并进行新的创新性工作。在上图的 dynamic PS DAG 中,就引进了一个额外的 control 节点,这一节点可以在作业运行过程中(包括 PS workload 运行之前和之后),对作业的资源申请,并发度等进行动态的调整,确保作业的优化执行。


事实上 concurrent edge 这个概念,描述的是上下游节点运行/调度时机的物理特性,也是我们在清晰的逻辑物理分层的架构上实现的一个重要扩展。不仅对于 PS 作业模式,在之前描述过的对于通过 bubble 来统一离线与准实时作业计算模式,这个概念也有重要的作用。

3. DAG 2.0 与上层计算引擎的集成

DAG 2.0 作为计算平台的分布式运行基座,它的升级换代,为上层的各种计算引擎提供了更多灵活高效的执行能力,而这些能力的落地,需要通过与具体计算场景的紧密结合来实现。接下来通过 2.0 与上层各个计算引擎(包括 MaxCompute 以及 PAI 平台等)的一些对接场景,具体举例说明 2.0 新的调度执行框架,如何赋能平台上层的计算与应用。

3.1 运行过程中的 DAG 动态调整

作为计算平台上的作业大户,MaxCompute 平台上多种多样的计算场景,尤其是离线作业中的各种复杂逻辑,为动态图能力的落地提供了丰富多样的场景,这里从动态物理图和逻辑图几个方面讨论几个例子。

3.1.1 动态并发度调整

基于作业运行期间中间数据大小进行动态并发度调整,是 DAG 动态调整中最基本的能力。以传统 MR 作业为例,对于一个静态 MR 作业而言,能根据读取数据量来比较准确判断 Mapper 的并发,但是对于 Reducer 的并发只能简单推测,比如下图中对于处理 1TB 的 MR 作业而言,提交作业时,只能根据 Mapper 1000 并发,来猜测给出 500 的 Reducer 并发度,而如果数据在 Mapper 经过大量过滤导致最终之产出 10MB 中间数据时,500 并发度 Redcuer 显然是非常浪费的,动态的 DAG 必须能够根据实际的 Mapper 产出来进行 Reducer 并发调整(500->1)。



而实际实现中,最简单的动态调整,会直接按照并发度调整比例来聚合上游输出的 partition 数据,如下图这个并发度从 10 调整到 5 的例子所示,在调整的过程中,可能产生不必要的数据倾斜。



DAG 2.0 基于中间数据的动态并发调整实现,充分考虑了数据 partition 可能存在倾斜的情况,对动态调整的策略进行了优化,使得动态调整的策略后数据的分布更加均匀,可以有效避免由于动态调整可能引入的数据倾斜。



这种最常见下游并发调整方式是 DAG 2.0 动态物理图能力的一个直观展示。在 2.0 中项目中,结合计算引擎的数据处理的特点,还探索了基于源数据的动态并发调整。例如对于最常见的两个原表数据的 join (M1 join M2 at J), 如果用节点大小来表示其处理数据的的多少,那对于下图这么一个例子,M1 处理的是中等的一个数据表(假设 M1 需要并发度为 10),M2 处理的是较大的数据表(并发度为 1000),naïve 的执行方式会将按照 10 + 1000 的并发度调度,同时因为 M2 输出需要全量 shuffle 到 J, J 需要的并发度也会较大 (~1000).



而实际上,对于这种计算 pattern 而言,M2 需要读取(并进行处理)的,应该只有能和 M1 的输出 join 得上的数据,也就是说在考虑了整体执行 cost 后,在这种 M1 期望的输出数据要比 M2 小的多的情况下,可以先行调度 M1 完成计算,将 M1 输出数据的 statistics 在 AM/DAG 端进行聚合,然后只挑选出 M2 的有效数据进行处理。这里"M2 的有效数据"的选择本质上是一个 predicate push down 的过程,可以由计算引擎的优化器和运行时联合进行判断。也就是说,这种情况下 M2 的并发度调整,是和上层计算紧密结合的。


一个最直观的例子是,如果 M2 是一个 1000 个分区的分区表,并且分区的 key 和 join 的 key 相同,那么可以只读取 M2 能和 M1 输出 join 上的有效数据的 partition 进行读取处理。假如 M1 的输出只包含了 M2 原表数据的 3 个 partition keys, 那么在 M2 就只需要调度 3 个计算节点来处理这 3 个分区的数据。也就是说 M2 的并发度从默认的 1000,可以降低到 3,这在保证同样的逻辑计算等效性与正确性的前提下,能大大降低计算资源的消耗,并数倍加速作业的运行。这里的优化来自与几个方面:


  • M2 的并发度(1000->3)以及处理的数据量大大降低

  • M2 需要 shuffle 到 J 的数据量以及 shuffle 需要的计算量大大降低

  • J 需要处理的数据量以及其并发度能大大降低


从上图这个例子中我们也可以看到,为了保证 M1->M2 的调度顺序上,DAG 中在 M1 和 M2 间引入了一条依赖边,而这条边上是没有数据流动的,是一条只表示执行先后的依赖边。这与传统 MR/DAG 执行框架里,边的连接与数据流动紧绑定的假设也有不同,是在 DAG 2.0 中对于边概念的一个拓展之一。


DAG 执行引擎作为底层分布式调度执行框架,其直接的对接"用户" 是上层计算引擎的开发团队,其升级对于终端用户除了性能上的提升,直接的体感可能会少一点。这里我们举一个终端用户体感较强的具体例子,来展示 DAG 更加动态的执行能力,能够给终端用户带来的直接好处。就是在 DAG 动态能力的基础上,实现的 LIMIT 的优化。


对于 SQL 用户来说,对数据进行一些基本的 at hoc 操作,了解数据表的特性,一个非常常见的操作是 LIMIT,比如:


SELECT * FROM tpch_lineitem WHERE l_orderkey > 0 LIMIT 5;
复制代码


在分布式执行框架上,这个操作对应的执行计划,是通过将源表做切分后,然后调度起所需数目的 mapper 去读取全部数据,再将 mapper 的输出汇总到 reducer 后去做最后的 LIMIT 截断操作。假设源表(这里的 tpch_lineitem)是一个很大的表,需要 1000 个 mapper 才能读取,那么在整个分布式执行过程中,涉及的调度代价就是要调度 1000 mapper + 1 reducer。这个过程中会有一些上层计算引擎可以优化的地方,比如每个 mapper 可以最多输出 LIMIT 需要的 record 数目(这里的 LIMIT 5)提前退出,而不必处理完所有分配给它的数据分片等等。但是在一个静态的执行框架上,为了获取这样简单的信息,整体 1001 个计算节点的调度无法避免。这给这种 ad hoc query 执行,带来了巨大的 overhead, 在集群资源紧张的时候尤其明显。


DAG 2.0 上, 针对这种 LIMIT 的场景,依托新执行框架的动态能力,实现了一些优化,这主要包括几方面:


  • 上游 Exponential start: 对于这种大概率下上游 mapper 计算节点不需要全部运行的情况,DAG 框架将对 mapper 进行指数型的分批调度,也就是调度按照 1, 10 … FULL 的分批执行

  • 下游的 Early scheduling: 上游产生的 record 数目作为执行过程中的统计数据上报给 AM, AM 在判断上游已经产生足够的 record 条数后,则提前调度下游 reducer 来消费上游的数据。

  • 上游的 Early termination: 下游 reducer 在判断最终输出的 LIMIT 条数已经满足条件后,直接退出。这时候 AM 可以触发上游 mapper 整个逻辑节点的提前退出(在这种情况下,大部分 mapper 可能都还没有调度起来),整个作业也能提前完成。


这种计算引擎和 DAG 在执行过程中的灵活动态交互,能够带来大量的资源节省,以及加速作业的执行。在线下测试和实际上线效果上,基本上绝大多数作业在 mapper 执行完 1 个计算节点后就能提前退出,而无需全量调起(1000 vs 1)。


下图是在线下测试中,当 mapper 并发为 4000 时,上述 query 优化前后的区别:



可以看到,执行时间优化后增速了 5X+, 计算资源的消耗更是减小了数百倍。


这个线下测试结果作为比较典型的例子,稍微有些理想化。为了评估真实的效果,在 DAG 2.0 上线后,选取了 LIMIT 优化生效的线上作业,统计了一星期结果如下:这个优化平均为每个作业节省了(254.5 cores x min CPU + 207.3 GB x min) 的计算资源,同时每个作业上,平均能节省 4349 个(无效)计算节点的调度。


LIMIT 执行上的改进,作为一个针对特殊场景上实现的优化,涉及了整个 DAG 执行不同策略的调整,这种细化的改进能力,能更直观的体现 DAG 2.0 架构升级诸多好处:灵活的架构使得 DAG 的执行中拥有了更多的动态调整能力,也能和计算引擎在一起进行更多有针对性的优化。


不同情况下的动态并发度调整,以及具体调度执行策略的动态调整,只是图的物理特性动态调整的几个例子。事实上对于物理特性运行时的调整,在 2.0 的框架之上有各种各样的应用,比如通过动态数据编排/shuffle 来解决各种运行期间的 skew 问题等,这里不再做进一步的展开。接下来我们再来看看 DAG 2.0 上对于逻辑图的动态调整做的一些探索。

3.1.2 动态逻辑图的调整

分布式 SQL 中,map join 是一个比较常见的优化,其实现原理是在 join 的两个表中,如果有一个超小的表(可以 fit 到单个计算节点的内存中),那对于这个超小表可以不做 shuffle,而是直接将其全量数据 broadcast 到每个处理大表的分布式计算节点上。通过在内存中直接建立 hash 表,完成 join 操作。map join 优化能大量减少 (大表) shuffle 和排序,非常明显的提升作业运行性能。但是其局限性也同样显著:如果"超小表"实际不小,无法 fit 进单机内存,那么在试图建立内存中的 hash 表时就会因为 OOM 而导致整个分布式作业的失败,而需要重跑。所以虽然 map join 在正确使用时,可以带来较大的性能提升,但实际上优化器在产生 map join 的 plan 时需要偏保守,很多情况下需要用户显式的提供 map join hint 来产生这种优化。此外不管是用户还是优化器的选择,对于非源表的输入都无法做很好的判断,因为中间数据的大小往往需要在作业运行过程中才能准确得知。



而 map join 与默认 join 方式(sorted merge join)对应的其实是两种不同优化器执行计划,在 DAG 层面,其对应的是两种不同的逻辑图。要支持这种运行过程中根据中间数据特性的动态优化,就需要 DAG 框架具备动态逻辑图的执行能力,这也是在 DAG 2.0 上开发的 conditional join 功能。如同下图展示,在对于 join 使用的算法无法被事先确定的时候,允许优化器提供一个 conditional DAG,这样的 DAG 同时包括使用两种不同 join 的方式对应的不同执行计划支路。在实际执行时,AM 根据上游产出数据量,动态选择一条支路执行(plan A or plan B)。这样子的动态逻辑图执行流程,能够保证每次作业运行时都能根据实际作业数据特性,选择最优的执行计划。



conditional join 是动态逻辑图的第一个落地场景,在线上选择一批适用性作业,动态的 conditional join 相比静态的执行计划,整体获得了将近 3X 的性能提升。


3.2 混合 Bubble 模式

Bubble 模式是我们在 DAG 2.0 架构上探索的一种全新的作业运行方式,通过对于 bubble 大小以及位置的调整,可以获取性能和资源利用率的不同 tradeoff 点。这里通过一些更加直观的例子,来帮助大家理解 Bubble 执行在分布式作业中的实际应用。



在上图的 TPCH Q21 上。比如在 Q21 上,我们看到了通过将作业被切分为三个"bubble",数据能够有效的在节点之间 pipeline 起来,并且通过热点节点实现调度的加速。最终消耗的资源数(cpu * time) 是准实时作业的 35%, 而性能则与一体化调度的准实时作业非常相近(96%), 比离线作业性能提升 70%左右。


在标准 TPCH 1TB 全量测试中,混合 bubble 模式体现出了相比离线和准实时的一体化模式(gang scheduling)更好的资源/性能 tradeoff。选用 Greedy Bubble(size = 500)的策略,bubble 相比离线作业性能提升了 2X(资源消耗仅增加 17%,具体数值略)。同时与一体化调度的准实时作业比较,bubble 执行在只消耗了 40%不到的资源(cpu * time) 的前提下,其性能达到了准实时作业的 85%(具体数值略)。可以看到,这种新型的 bubble 执行模式,允许我们在实际应用中获取很好的性能与资源的平衡,达到系统资源有效的利用。Bubble 执行模式目前正在阿里集团内部全量上线中,我们在实际线上的作业也看到了与 TPCH 测试非常相似的效果。



如同之前所述,混合 bubble 模式支持了不同切分策略,这里提供的只是一种切分策略上的效果。在与上层计算引擎(e.g., MaxCompute 优化器)紧密结合时,这种 DAG 分布式调度 bubble 执行的能力,能够允许我们根据可用资源和作业计算特点,来寻找性能与资源利用率的最佳平衡点。

4. 资源的动态配置和动态管理

传统分布式作业对于每个计算节点需要的资源类型(CPU/GPU/Memory)和大小都是预先确定下来的。然而在分布式作业,在作业运行之前,对计算节点资源类型和大小的合理选择,是比较困难的。即便对于计算引擎的开发者,也需要通过一些比较复杂的规则,才能预估出大概合理的配置。而对于需要将这些配置透明给终端用户的计算模式,终端用户要做出选择就更加困难。


在这里以 PAI 的 Tensorflow(TF)作业为例,描述 DAG 2.0 的资源动态配置能力,怎样帮助平台的 TF 作业选择合理的 GPU 类型资源以及提高 GPU 资源的利用率。相比 CPU 而言,GPU 作为一种较新的计算资源,硬件的更新换代较快,同时普通终端用户对于其计算特点也相对不了解。因此终端用户在指定 GPU 资源类型时,经常存在着不合理的情况。与此同时,GPU 在线上又是相对稀缺资源。今天在线上,GPU 申请量经常超过集群 GPU 总数,导致用户需要花很长时间排队等待资源。而另外一方面,集群中 GPU 的实际利用率却偏低,平均只有 20%左右。这种申请和实际使用之间存在的 Gap,往往是由于用户作业配置中,事先指定的 GPU 资源配置不合理造成。


在 DAG2.0 的框架上,PAI TF GPU 作业(见 session 2.2.2 的 dynamic PS DAG)引入了一个额外的"计算控制节点",可以通过运行 PAI 平台的资源预测算法,来判断当前作业实际需要的 GPU 资源类型,并在必要的时候,通过向 AM 发送动态事件,来请求修改下游 worker 实际申请的 GPU 类型。这其中资源预测算法,可以是根据算法的类型,数据的特点,以及历史作业信息来做 HBO (history based optimization),也可以通过 dry-run 的方法来进行试运行,以此确定合理的资源类型。


具体实现上,这个场景中 control stage 与 Worker 之间通过 concurrent edge 连接,这条边上的调度触发条件是在 control stage 已经做出资源选择决定之后,通过其发出的事件来触发。这样的作业运行期间的动态资源配置,在线上功能测试中,带来了 40%以上的集群 GPU 利用率提升。


作为物理特性一个重要的维度,对计算节点的资源特性在运行时的动态调整能力,在 PAI 以及 MaxCompute 上都能找到广泛的应用。以 MaxCompute SQL 为例,对于下游节点的 CPU/Memory 的大小,可以根据上游数据的特点进行有效的预判;同时对于系统中发生的 OOM,可以尝试自动调高 OOM 后重试的计算节点的内存申请,避免作业的失败,等等,。这些都是在 DAG 2.0 上新的架构上实现的一些新功能,在这里不做具体的展开。

5. 工程化与上线

作为分布式系统的底座,DAG 本身的动态能力以及灵活度,在与上层计算引擎结合时,能够支持上层计算实现更加高效准确的执行计划,在特定场景上实现数倍的性能提升以及对资源利用率的提高。在上文中,也集中介绍了整个 DAG 2.0 项目工作中,开发实现的一些新功能与新的计算模式。除了对接计算引擎来实现更高效的执行计划,调度本身的敏捷性,是 AM/DAG 执行性能的基本素质。 DAG2.0 的调度决策均基于事件驱动框架以及灵活的状态机设计来实现,在这里也交出 DAG 2.0 在基本工程素养和性能方面的成绩单:



这里选用了最简单的 Map-Reduce(MR)作业为例,对于这种作业,调度执行上并无太多可以取巧的地方,考验的是调度系统本身的敏捷度和整个处理流程中的全面去阻塞能力。这个例子也凸显了 DAG 2.0 的调度性能优势,尤其作业规模越大,优势越发明显。此外,对于更接近线上的 work-load 的场景,在 TPCDS 标准 benchmark 中,当执行计划和运行逻辑完全相同时,2.0(未打开动态执行等功能)的高性能调度也给作业带来了显著的提升。



最后,对于一个从头到尾完整替代原有系统的新一代的全新框架,怎样无缝对接线上场景,实现大规模的上线,是一个同样重要(甚至更重要)的话题,也是对一个实际生产系统进行升级,与小范围的新系统 POC 之间最大的区别。今天的伏羲调度系统,每天支撑着阿里集团内外大数据计算平台千万的分布式作业。DAG/AM 这一核心分布式调度执行组件的更新换代,要完整替换线上已经支撑了大数据业务 10 年的分布式生产系统,而不造成现有场景的失败,这需要的不仅仅是架构和设计上的先进性。如何在"飞行中换引擎", 保质保量的实现系统升级,其挑战完全不亚于新的系统架构本身的设计。要实现这样的升级,拥有一个稳固的工程基座,以及测试/发布框架,都是不可或缺的。没有这样子的底座,上层的动态功能与新计算模式,都无从谈起。



目前 DAG 2.0 目前已全面覆盖了阿里集团 MaxCompute 所有线上的 SQL 离线作业和所有准实时作业,以及 PAI 平台的所有 Tensorflow 作业(CPU 和 GPU)+ PyTorch 作业。每天支撑数千万分布式作业的运行,并经受了 19 年双 11/双 12 的考验。在面对两次大促创历史记录的数据洪峰(相比 18 年增长 50%+)压力下,保障了集团重点基线在大促当天准时产出。与此同时,更多种类型的作业(例如跨集群复制作业等等)正在迁移到 DAG 2.0 的新架构,并且依托新的架构升级计算作业本身的能力。DAG 2.0 的框架基座的上线,为各条计算线上依托其实现新功能打下了坚实基础。

6. 展望

伏羲 DAG 2.0 核心架构的升级,旨在夯实阿里计算平台长期发展的基础,并支持上层计算引擎与分布式调度方面结合,实现各种创新和创建新计算生态。架构的升级本身是向前迈出的重要一步,但也只是第一步。要支撑企业级的,各种规模,各种模式的全频谱计算平台,需要将新架构的能力和上层计算引擎,以及伏羲系统其他组件进行深度整合。依托阿里的应用场景,DAG 2.0 除了在作业规模等方面继续在业界保持领先之外,架构和功能上也有许多创新, 比如前面我们已经介绍过的:


  • 在业界首次在分布式执行框架上,实现了执行过程中逻辑图和物理图的双重动态可调;

  • 通过 Bubble 机制实现了混合的计算模式,探索资源利用率和作业性能间的最佳平衡。


除此之外,2.0 更加清晰的系统封层架构带来的一个重要改变就是能有利于新功能更快速开发,提速平台和引擎向前创新。由于篇幅有限,本文只能由点击面对介绍了一部分新功能与新计算模式,还有许许多多已经实现,或正在开发中的功能,在业界都是全新的探索,暂时不做进一步展开,比如


  • 准实时作业体系架构的整体升级: 资源管理与多作业管理的解耦,支持准实时作业场景上的动态图功能

  • 常驻的单 container 多 slot 执行的 cache-aware 查询加速服务(MaxCompute 短查询)

  • 基于状态机的作业节点管理以及失败下的智能重跑机制

  • 动态可定义的 shuffle 方式:通过 recursive shuffle 等方式动态解决线上大规模作业中的 in-cast 问题

  • 基于 adaptive 的中间数据动态切分与聚合,解决实际分布式作业中各种数据倾斜问题

  • 支持 PAI TF GPU 作业的多执行计划选项

  • 通过 DAG 执行过程中与优化器的交互,实现渐进式的交互式动态优化

  • 支持 Imperative 语言特性,通过 DAG 的动态自增长等能力,对接 IF/ELSE/LOOP 等语义


核心调度底座能力的提升,能够为上层的各种分布式计算引擎提供真正企业级的服务能力,提供必须的弹药。而这些计算调度能力提升带来的红利,最终会通过 MaxCompute 和 PAI 等引擎,透传到终端的阿里云计算服务的各个企业。在过去的十年,阿里业务由内向外的驱动,锻造了业界规模最大的云上分布式平台。而通过更好服务集团内部以及云上的企业用户,我们希望能够平台的企业级服务能力,可以完成由内向外,到由外至内的整个正向循环过程,推动计算系统螺旋式上升的不断创新,并通过性能/规模,以及智能化自适应能力两个维度方面的推进,降低分布式计算服务的使用门槛,真正实现大数据的普惠。


作者介绍:


CHEN, Yingda 阿里云智能高级技术专家


2020-05-22 14:432629

评论

发布
暂无评论
发现更多内容

OpenHarmony标准设备应用开发(三)——分布式数据管理

OpenHarmony开发者

OpenHarmony 分布式数据

下拉推荐在 Shopee Chatbot 中的探索和实践

Shopee技术团队

算法 chatbot 推荐算法

如何快速实现持续交付

阿里云云效

云计算 阿里云 软件开发 CI/CD 持续交付

VuePress 博客搭建系列 33 篇正式完结!

冴羽

JavaScript Vue 前端 vuepress 博客搭建

教你VUE中的filters过滤器2种用法

华为云开发者联盟

Vue 过滤器 filters过滤器 组件过滤器 全局过滤器

《LeetCode 刷题报告》题解内容Ⅱ

謓泽

3月月更

从二十年开源经历出发,70 后大龄程序员谈成长、困境与突围

TDengine

数据库 tdengine 开源

后端开发—一文详解网络IO模型

Linux服务器开发

reactor 后端开发 Linux服务器开发 网络io 网络模型

深入垂直业务场景,SaaS版供应商业务协同管理系统促进企业与供应商高效协同

数商云

数字化转型 供应链系统

Facebook 开源 Golang 实体框架 Ent 现已支持 TiDB

Geek_2d6073

Tapdata 肖贝贝:实时数据引擎系列(六)-从 PostgreSQL 实时数据集成看增量数据缓存层的必要性

tapdata

数据库 实时数据

适合 Kubernetes 初学者的一些实战练习 (三)

汪子熙

云原生 集群 Kubernetes 集群 Kubernetes, 云原生, eBPF 3月月更

小程序开发入门教程

CRMEB

Git教程-帮助开发人员更好的运用Git | 云效

阿里云云效

git 云计算 阿里云 DevOps 开发者

软件定义存储厂商大道云行加入龙蜥社区

OpenAnolis小助手

生态 存储技术 龙蜥社区 大道云行 CLA

云时代,租电脑还是初创型企业最好的选择吗?

阿里云弹性计算

远程办公 无影云电脑 初创型企业

明天直播:如何测试硬件设备与龙蜥操作系统的兼容性?

OpenAnolis小助手

硬件 直播 开源社区 sig 兼容性

向工程腐化开炮 | 治理思路全解

阿里巴巴终端技术

Java android 腐化治理 工程腐化

利用 IoTDB 替换 OpenTSDB,服务大唐集团60家电厂,减少95%运维成本

Apache IoTDB

Apache IoTDB

外部数据的合规引入助力银行用户营销系统冷启动

易观分析

隐私计算

叮咚!参与微服务免费试用,有机会获得腾讯内推资格!

InfoQ写作社区官方

腾讯云 微服务 热门活动

汉化版postman

Liam

Jmeter Postman 接口测试 API swagger

一文带你了解 Python 中的迭代器

踏雪痕

Python 3月程序媛福利 3月月更

异构注册中心机制在中国工商银行的探索实践

SOFAStack

GitHub 开源 分布式架构 注册中心 工商银行

Rust Cell 与RefCell,有啥区别?

非凸科技

昇思MindSpore行至2022,开源社区成就生态共赢

这不科技

华为 昇思MindSpore

华为云GaussDB专家走进课堂,跟莘莘学子聊聊数据库

华为云数据库小助手

GaussDB GaussDB(for openGauss) GaussDB(for MySQL)

春暖花开,等你而来!4月月更挑战开始啦!

InfoQ写作社区官方

热门活动 4月月更

week4作业

Asha

墨天轮访谈 | 华为云温云博:从客户视角出发,GaussDB(for Redis)究竟“香”在哪里?

墨天轮

数据库 redis 华为云 国产数据库 键值数据库

产品FAQ(常见问题)文档模版

小炮

产品 FAQ

EB级计算平台调度系统伏羲 DAG 2.0解析_大数据_CHEN, Yingda_InfoQ精选文章