免费下载案例集|20+数字化领先企业人才培养实践经验 了解详情
写点什么

Flink Slot 详解与 Job Execution Graph 优化

  • 2019-08-01
  • 本文字数:5919 字

    阅读完需:约 19 分钟

Flink Slot详解与Job Execution Graph优化

前言

近期公司内部将 Flink Job 从 Standalone 迁移至了 OnYarn,随后发现 Job 性能较之前有所降低:迁移前有 8.3W+/S 的数据消费速度,迁移到 Yarn 后分配同样的资源但消费速度降为 7.8W+/S,且较之前的消费速度有轻微的抖动。经过原因分析和测试验证,最终采用了在保持分配给 Job 的资源不变的情况下将总 Container 数量减半、每个 Container 持有的资源从 1C2G 1Slot 变更为 2C4G 2Slot 的方式,使该问题得以解决。


经历该问题后,发现深入理解 Slot 和 Flink Runtime Graph 是十分必要的,于是撰写了这篇文章。本文内容分为两大部分,第一部分详细的分析 Flink Slot 与 Job 运行的关系,第二部详细的介绍遇到的问题和解决方案。

Flink Slot

Flink 集群是由 JobManager(JM)、TaskManager(TM)两大组件组成的,每个 JM/TM 都是运行在一个独立的 JVM 进程中。JM 相当于 Master,是集群的管理节点,TM 相当于 Worker,是集群的工作节点,每个 TM 最少持有 1 个 Slot,Slot 是 Flink 执行 Job 时的最小资源分配单位,在 Slot 中运行着具体的 Task 任务。


对 TM 而言:它占用着一定数量的 CPU 和 Memory 资源,具体可通过 taskmanager.numberOfTaskSlots, taskmanager.heap.size 来配置,实际上 taskmanager.numberOfTaskSlots 只是指定 TM 的 Slot 数量,并不能隔离指定数量的 CPU 给 TM 使用。在不考虑 Slot Sharing(下文详述)的情况下,一个 Slot 内运行着一个 SubTask(Task 实现 Runable,SubTask 是一个执行 Task 的具体实例),所以官方建议 taskmanager.numberOfTaskSlots 配置的 Slot 数量和 CPU 相等或成比例。


当然,我们可以借助 Yarn 等调度系统,用 Flink On Yarn 的模式来为 Yarn Container 分配指定数量的 CPU 资源,以达到较严格的 CPU 隔离(Yarn 采用 Cgroup 做基于时间片的资源调度,每个 Container 内运行着一个 JM/TM 实例)。而 taskmanager.heap.size 用来配置 TM 的 Memory,如果一个 TM 有 N 个 Slot,则每个 Slot 分配到的 Memory 大小为整个 TM Memory 的 1/N,同一个 TM 内的 Slots 只有 Memory 隔离,CPU 是共享的。


对 Job 而言:一个 Job 所需的 Slot 数量大于等于 Operator 配置的最大 Parallelism 数,在保持所有 Operator 的 slotSharingGroup 一致的前提下 Job 所需的 Slot 数量与 Job 中 Operator 配置的最大 Parallelism 相等。


关于 TM/Slot 之间的关系可以参考如下从官方文档截取到的三张图:


图一: Flink On Yarn 的 Job 提交过程,从图中我们可以了解到每个 JM/TM 实例都分属于不同的 Yarn Container,且每个 Container 内只会有一个 JM 或 TM 实例;通过对 Yarn 的学习我们可以了解到,每个 Container 都是一个独立的进程,一台物理机可以有多个 Container 存在(多个进程),每个 Container 都持有一定数量的 CPU 和 Memory 资源,而且是资源隔离的,进程间不共享,这就可以保证同一台机器上的多个 TM 之间是资源隔离的(Standalone 模式下,同一台机器下若有多个 TM,是做不到 TM 之间的 CPU 资源隔离的)。



图二: Flink Job 运行图,图中有两个 TM,各自有 3 个 Slot,2 个 Slot 内有 Task 在执行,1 个 Slot 空闲。若这两个 TM 在不同 Container 或容器上,则其占用的资源是互相隔离的。在 TM 内多个 Slot 间是各自拥有 1/3 TM 的 Memory,共享 TM 的 CPU、网络(Tcp:ZK、 Akka、Netty 服务等)、心跳信息、Flink 结构化的数据集等。



图三: Task Slot 的内部结构图,Slot 内运行着具体的 Task,它是在线程中执行的 Runable 对象(每个虚线框代表一个线程),这些 Task 实例在源码中对应的类是 org.apache.flink.runtime.taskmanager.Task。每个 Task 都是由一组 Operators Chaining 在一起的工作集合,Flink Job 的执行过程可看作一张 DAG 图,Task 是 DAG 图上的顶点(Vertex),顶点之间通过数据传递方式相互链接构成整个 Job 的 Execution Graph。


Operator Chain

Operator Chain 是指将 Job 中的 Operators 按照一定策略(例如: single output operator 可以 chain 在一起)链接起来并放置在一个 Task 线程中执行。Operator Chain 默认开启,可通过 StreamExecutionEnvironment.disableOperatorChaining()关闭,Flink Operator 类似 Storm 中的 Bolt,在 Strom 中上游 Bolt 到下游会经过网络上的数据传递,而 Flink 的 Operator Chain 将多个 Operator 链接到一起执行,减少了数据传递/线程切换等环节,降低系统开销的同时增加了资源利用率和 Job 性能。实际开发过程中需要开发者了解这些原理,并能合理分配 Memory 和 CPU 给到每个 Task 线程。


注: 【一个需要注意的地方】Chained 的 Operators 之间的数据传递默认需要经过数据的拷贝(例如:kryo.copy(…)),将上游 Operator 的输出序列化出一个新对象并传递给下游 Operator,可以通过 ExecutionConfig.enableObjectReuse()开启对象重用,这样就关闭了这层 copy 操作,可以减少对象序列化开销和 GC 压力等,具体源码可阅读 org.apache.flink.streaming.runtime.tasks.OperatorChain 与 org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput。官方建议开发人员在完全了解 reuse 内部机制后才使用该功能,冒然使用可能会给程序带来 bug。


Operator Chain 效果可参考如下官方文档截图:


图四: 图的上半部分是 StreamGraph 视角,有 Task 类别无并行度,如图:Job Runtime 时有三种类型的 Task,分别是 Source->Map、keyBy/window/apply、Sink,其中 Source->Map 是 Source()和 Map()chaining 在一起的 Task;图的下半部分是一个 Job Runtime 期的实际状态,Job 最大的并行度为 2,有 5 个 SubTask(即 5 个执行线程)。若没有 Operator Chain,则 Source()和 Map()分属不同的 Thread,Task 线程数会增加到 7,线程切换和数据传递开销等较之前有所增加,处理延迟和性能会较之前差。补充:在 slotSharingGroup 用默认或相同组名时,当前 Job 运行需 2 个 Slot(与 Job 最大 Parallelism 相等)。


Slot Sharing

Slot Sharing 是指,来自同一个 Job 且拥有相同 slotSharingGroup(默认:default)名称的不同 Task 的 SubTask 之间可以共享一个 Slot,这使得一个 Slot 有机会持有 Job 的一整条 Pipeline,这也是上文提到的在默认 slotSharing 的条件下 Job 启动所需的 Slot 数和 Job 中 Operator 的最大 parallelism 相等的原因。通过 Slot Sharing 机制可以更进一步提高 Job 运行性能,在 Slot 数不变的情况下增加了 Operator 可设置的最大的并行度,让类似 window 这种消耗资源的 Task 以最大的并行度分布在不同 TM 上,同时像 map、filter 这种较简单的操作也不会独占 Slot 资源,降低资源浪费的可能性。


具体 Slot Sharing 效果可参考如下官方文档截图:


图五: 图的左下角是一个 soure-map-reduce 模型的 Job,source 和 map 是 4 parallelism,reduce 是 3 parallelism,总计 11 个 SubTask;这个 Job 最大 Parallelism 是 4,所以将这个 Job 发布到左侧上面的两个 TM 上时得到图右侧的运行图,一共占用四个 Slot,有三个 Slot 拥有完整的 source-map-reduce 模型的 Pipeline,如右侧图所示;注:map 的结果会 shuffle 到 reduce 端,右侧图的箭头只是说 Slot 内数据 Pipline,没画出 Job 的数据 shuffle 过程。



图六: 图中包含 source-map[6 parallelism]、keyBy/window/apply[6 parallelism]、sink[1 parallelism]三种 Task,总计占用了 6 个 Slot;由左向右开始第一个 slot 内部运行着 3 个 SubTask[3 Thread],持有 Job 的一条完整 pipeline;剩下 5 个 Slot 内分别运行着 2 个 SubTask[2 Thread],数据最终通过网络传递给 Sink 完成数据处理。


Operator Chain & Slot Sharing API

Flink 在默认情况下有策略对 Job 进行 Operator Chain 和 Slot Sharing 的控制,比如:将并行度相同且连续的 SingleOutputStreamOperator 操作 chain 在一起(chain 的条件较苛刻,不止单一输出这一条,具体可阅读 org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable(…)),Job 的所有 Task 都采用名为 default 的 slotSharingGroup 做 Slot Sharing。但在实际的需求场景中,我们可能会遇到需人为干预 Job 的 Operator Chain 或 Slot Sharing 策略的情况,本段就重点关注下用于改变默认 Chain 和 Sharing 策略的 API。


StreamExecutionEnvironment.disableOperatorChaining(): 关闭整个 Job 的 Operator Chain,每个 Operator 独自占有一个 Task,如上图四所描述的 Job,如果 disableOperatorChaining 则 source->map 会拆开为 source(), map()两种 Task,Job 实际的 Task 数会增加到 7。这个设置会降低 Job 性能,在非生产环境的测试或 profiling 时可以借助以更好分析问题,实际生产过程中不建议使用。


someStream.filter(…).map(…).startNewChain().map(): startNewChain()是指从当前 Operator[map]开始一个新的 chain,即:两个 map 会 chaining 在一起而 filter 不会(因为 startNewChain 的存在使得第一次 map 与 filter 断开了 chain)。


someStream.map(…).disableChaining(): disableChaining()是指当前 Operator[map]禁用 Operator Chain,即:Operator[map]会独自占用一个 Task。


someStream.map(…).slotSharingGroup(“name”): 默认情况下所有 Operator 的 slotGroup 都为 default,可以通过 slotSharingGroup()进行自定义,Flink 会将拥有相同 slotGroup 名称的 Operators 运行在相同 Slot 内,不同 slotGroup 名称的 Operators 运行在其他 Slot 内。


Operator Chain 有三种策略 ALWAYS、NEVER、HEAD,详细可查看 org.apache.flink.streaming.api.operators.ChainingStrategy。startNewChain()对应的策略是 ChainingStrategy.HEAD(StreamOperator 的默认策略),disableChaining()对应的策略是 ChainingStrategy.NEVER,ALWAYS 是尽可能的将 Operators chaining 在一起;在通常情况下 ALWAYS 是效率最高,很多 Operator 会将默认策略覆盖为 ALWAYS,如 filter、map、flatMap 等函数。

迁移 OnYarn 后 Job 性能下降的问题

JOB 说明:


类似 StreamETL,100 parallelism,即:一个流式的 ETL Job,不包含 window 等操作,Job 的并行度为 100;


环境说明:


  1. Standalone 下的 Job Execution Graph:10TMs * 10Slots-per-TM ,即:Job 的 Task 运行在 10 个 TM 节点上,每个 TM 上占用 10 个 Slot,每个 Slot 可用 1C2G 资源,GCConf:-XX:+UseG1GC -XX:MaxGCPauseMillis=100;

  2. OnYarn 下初始状态的 Job Execution Graph:100TMs * 1Slot-per-TM,即:Job 的 Task 运行在 100 个 Container 上,每个 Container 上的 TM 持有 1 个 Slot,每个 Container 分配 1C2G 资源,GCConf:-XX:+UseG1GC -XX:MaxGCPauseMillis=100;

  3. OnYarn 下调整后的 Job Execution Graph:50TMs * 2Slot-per-TM,即:Job 的 Task 运行在 50 个 Container 上,每个 Container 上的 TM 持有 2 个 Slot,每个 Container 分配 2C4G 资源,GCConfig:-XX:+UseG1GC -XX:MaxGCPauseMillis=100;


注: OnYarn 下使用了与 Standalone 一致的 GC 配置,当前 Job 在 Standalone 或 OnYarn 环境中运行时,YGC、FGC 频率基本相同,OnYarn 下单个 Container 的堆内存较小使得单次 GC 耗时减少。生产环境中大家最好对比下 CMS 和 G1,选择更好的 GC 策略,当前上下文中暂时认为 GC 对 Job 性能影响可忽略不计。


问题分析:


引起 Job 性能降低的原因不难定位,从这张 Container 的线程图(VisualVM 中的截图)可见:


图 7:在一个 1C2G 的 Container 内有 126 个活跃线程,守护线程 78 个。首先,在一个 1C2G 的 Container 中运行着 126 个活跃线程,频繁的线程切换是会经常出现的,这让本来就不充裕的 CPU 显得更加的匮乏。其次,真正与数据处理相关的线程是红色画笔圈出的 14 条线程(2 条 Kafka Partition Consumer、Consumers 和 Operators 包含在这个两个线程内;12 条 Kafka Producer 线程,将处理好的数据 sink 到 Kafka Topic),这 14 条线程之外的大多数线程在相同 TM、不同 Slot 间可以共用,比如:ZK-Curator、Dubbo-Client、GC-Thread、Flink-Akka、Flink-Netty、Flink-Metrics 等线程,完全可以通过增加 TM 下 Slot 数量达到多个 SubTask 共享的目的。


此时我们会很自然的得出一个解决办法:在 Job 使用资源不变的情况下,在减少 Container 数量的同时增加单个 Container 持有的 CPU、Memory、Slot 数量,比如上文环境说明中从方案 2 调整到方案 3,实际调整后的 Job 运行稳定了许多且消费速度与 Standalone 基本持平。



注: 当前问题是内部迁移类似 StreamETL 的 Job 时遇到的,解决方案简单但不具有普适性,对于带有 window 算子的 Job 需要更仔细缜密的问题分析。目前 Deploy 到 Yarn 集群的 Job 都配置了 JMX/Prometheus 两种监控,单个 Container 下 Slot 数量越多、每次 scrape 的数据越多,实际生成环境中需观测是否会影响 Job 正常运行,在测试时将 Container 配置为 3C6G 3Slot 时发现一次 java.lang.OutOfMemoryError: Direct buffer memory 的异常,初步判断与 Prometheus Client 相关,可适当调整 JVM 的 MaxDirectMemorySize 来解决。所出现异常如图 8


总结

Operator Chain 是将多个 Operator 链接在一起放置在一个 Task 中,只针对 Operator;Slot Sharing 是在一个 Slot 中执行多个 Task,针对的是 Operator Chain 之后的 Task。这两种优化都充分利用了计算资源,减少了不必要的开销,提升了 Job 的运行性能。此外,Operator Chain 的源码在 streaming 包下,只在流处理任务中有这个机制;Slot Sharing 在 flink-runtime 包下,似乎应用更广泛一些(具体还有待考究)。


最后,只有充分的了解 Slot、Operator Chain、Slot Sharing 是什么,以及各自的作用和相互间的关系,才能编写出优秀的代码并高效的运行在集群上。


参考资料:



作者简介


王成龙,TalkingData 数据工程师。


2019-08-01 16:107892

评论

发布
暂无评论
发现更多内容

C 语言文件读取全指南:打开、读取、逐行输出

小万哥

程序人生 编程语言 软件工程 C/C++ 后端开发

PreparedStatement实践和批处理实践

FunTester

数据库内核那些事|PolarDB查询优化:好好的谓词,为什么要做下推?

阿里云瑶池数据库

数据库 阿里云 云原生

EVE-NG初次启动及WEB客户端访问

小齐写代码

文心大模型融入荣耀MagicOS!打造大模型“端云协同”创新样板

爱编程的喵喵

IPQ8074: the leader in high-performance router motherboard chips

wallysSK

异步编程利器:CompletableFuture深度解析

Java随想录

Java 多线程 并发

软件测试开发/全日制/测试管理丨Web应用框架Flask优势与特点

测试人

软件测试

传统 VC 机构,是否还能在 Fair launch 的散户牛市中胜出?

BlockChain先知

线索系统性能优化实践

京东科技开发者

苹果应用上架是否需要软件著作权?

雪奈椰子

从 AutoMQ Kafka 导出数据到 Databend

Databend

Kafk

京东方精电亮相CES2024,携手合作伙伴共绘智慧出行未来图景

科技热闻

XSKY 与 英特尔联手,释放星飞分布式全闪存储潜能

XSKY星辰天合

sds intel 软件定义存储 QAT XSKY

企业数字化转型的七大误区

天津汇柏科技有限公司

数字化转型

Programming Abstractions in C阅读笔记:p242-p245

codists

Go与C语言的互操作,import “C“的实例

百度搜索:蓝易云

Go 云计算 Linux C语言 云服务器

vivo 海量微服务架构最新实践

vivo互联网技术

开源 微服务 中间件

帕特·基辛格:AI PC是个人电脑历史的“寒武纪”时刻

E科讯

可编程线性霍尔传感器 IC

芯动大师

使用JMeter安装RabbitMQ测试插件的步骤

百度搜索:蓝易云

云计算 Linux 运维 RabbitMQ Jmeter

极狐 GitLab 冷知识:使用 Email 也可以创建 Issue?

极狐GitLab

听6位专家畅谈AI大模型落地实践:场景和人才是关键

华为云开发者联盟

人工智能 华为云 大模型 华为云开发者联盟 DTSE Tech Talk

在线文档软件哪个好?5个好用的协同文档app推荐!

彭宏豪95

团队协作 在线文档 在线白板 在线协同文档 效率软件

docker rabbitmq-清空queue队列数据

百度搜索:蓝易云

云计算 Linux 运维 RabbitMQ 云服务器

舞台婚庆租赁屏选哪个型号更好?

Dylan

节日 LED显示屏 户外LED显示屏 led显示屏厂家 户内led显示屏

传统 VC 机构,是否还能在 Fair launch 的散户牛市中胜出?

加密眼界

2024-01-10:用go语言,给你一个下标从 0 开始的二维整数数组 pairs 其中 pairs[i] = [starti, endi] 如果 pairs 的一个重新排列 满足对每一个下标 i

福大大架构师每日一题

福大大架构师每日一题

【深入浅出JVM原理及调优】「搭建理论知识框架」全方位带你探索和分析JMM并发模型之(重排序机制)

洛神灬殇

Java 重排序 并发编程 JVM as-if-serial

Flink Slot详解与Job Execution Graph优化_大数据_王成龙_InfoQ精选文章