近日,调度系统迁移工具 Air2phin 宣布开源。借助 Air2phin,用户可 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为有调度系统迁移需要的用户带来极大便利。
Air2phin 是什么?
Air2phin 是一个最近宣布开源的调度系统迁移工具,旨在将 Apache Airflow DAGs 文件转换成 Apache DolphinScheduler Python SDK 定义文件,从而实现用户将调度系统(Workflow orchestration)从 Airflow 迁移到 DolphinScheduler 的目的。它是一个基于多规则的 AST 转换器,使用 LibCST 来解析和转换 Airflow 的 DAG 代码,其全部规则使用 Yaml 文件定义,并提供了一定的自定义规则扩展能力。
近期,Air2phin 已经发布了 0.0.12 版本,提供了丰富的功能,可以更好地帮助用户完成 Airflow 到 Apache DolphinScheduler 的迁移。
注释:AST 是 Abstract Syntax Tree(抽象语法树)的缩写,它是一种以树状结构表示代码语法结构的数据结构。在编译器中,AST 是由词法分析器和语法分析器生成的。词法分析器将源代码转换成标记流(token stream),语法分析器将标记流转换成抽象语法树。AST 是一种树状结构,它由一系列节点组成,每个节点表示代码中的一个语法结构(如表达式、语句、函数、类等),节点之间的关系表示语法结构之间的嵌套关系。
为什么开源 Air2phin?
可能有人会问,为什么我需要一个迁移工具?这是因为随着业务的发展,企业或组织原来使用的工作流编排系统已经无法满足当前的需求,需要将工作流编排系统迁移到新的平台或者更新到新的版本。经过调研,很多用户有了将调度系统从开源工作流编排系统 Airflow 迁移到 Apache DolphinScheduler 上来的需求。
在迁移过程中,由于数据处理任务可能涉及多个系统之间的依赖关系,迁移过程需要确保在不影响业务运行的前提下完成。此时,调度系统迁移工具就可以发挥重要作用,它能减少人工干预,尽量自动化地完成两个调度系统间的迁移工作,并且能兼容多个系统间的多个版本,几乎可以做到用户无干预完成迁移。
为此,白鲸开源专门研发了开源迁移工具 Air2phin,可以让用户 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为用户带来极大的便利。
为了让大家更好地理解 Air2phin 的重要性,我们先从调度系统的相关背景知识开始,了解将调度系统从 Airflow 迁移至 Apache DolphinScheduler 的好处。
为什么要从 Airflow 迁移至 DolphinScheduler?
什么是工作流编排系统?
工作流编排系统,是以尊重编排规则和业务逻辑的方式管理数据流。工作流编排工具让用户可以将多个有关联的任务转换为可以安排、运行和观测的工作流,帮助企业更好地管理和控制业务流程,从而提高业务效率。工作流编排是数据处理流程中不可或缺的组件之一,负责根据预先定义的规则和逻辑执行数据处理任务,确保数据处理流程按照预期顺利执行,常见工作流编排系统包括 Apache DolphinScheduler、Apache Airflow、Apache Oozie, Azkaban 等。
Airflow 是什么?
其中,Apache Airflow 是一个开源的工作流编排系统,它可以帮助用户创建、调度和监控复杂的工作流程。Airflow 最初由 Airbnb 开发,并于 2016 年开源,现在由 Apache 软件基金会维护。Airflow 使用 Python 语言编写,具有高度的可扩展性和灵活性,支持多种任务类型,如计算、数据处理、通知、交互等。Airflow 的工作流程是通过编写 Python 脚本来定义的,可以使用 Airflow 提供的操作符和钩子,以及自定义操作符和钩子来扩展其功能。但其有着不可忽视的缺陷,比如需要需要深度二次开发,脱离社区版本,升级成本高;Python 技术栈维护迭代成本高;scheduler loop 扫描 Dag folder 延迟降低性能的问题;以及在生产环境中使用稳定性差等。
在新数据时代业务需求下诞生的 Apache DolphinScheduler 是一个开源的分布式工作流调度系统,弥补了以往调度系统的弱势,旨在为企业用户提供一种可靠、高效、易于使用的工作流调度平台,支持多种任务类型,如计算、数据处理、ETL 等。
与 Airflow 相比,DolphinScheduler 采用了分布式架构,提供了多种任务类型,用户可以定义任务之间的依赖关系,设置任务的优先级和调度策略等,其使用可视化的界面来创建和管理工作流程的特性更是与 Airflow 形成鲜明对比,变得更加易于操作,对非编程人员来说更加友好。
经过调研对比,对于很多用户来说,将调度系统迁移至 Apache DolphinScheduler 是一个降本增效的更优选择。
Air2phin 如何安装和使用
Air2phin 是一个 python 的包,可以通过 Python 的包安装工具 pip 完成安装,详见 air2phin getting start。
一个简单的例子
我们通过一个简单的例子,来说明如何使用 Air2phin 的。我们截取了 airflow tutorial.py 中的部分代码作为 Air2phin 转化的例子,来说明 Air2phin 如何逐步完成转化成 dolphinscheduler python sdk。
图 1:airflow tutorial.py 中的部分代码
图 2:Air2phin 如何逐步完成转化成 dolphinscheduler python sdk
假设将 airflow tutorial.py 部分内容保存至文件 tutorial_part.py,想要将其转化成 dolphinscheduler python sdk 定义,只需要一行命令就能完成。结果如图 2 所示,因为命令增加了 --inplace 参数,所以 Air2phin 会直接将原文件覆盖,如果不需要覆盖原问题,可以不使用 --inplace 参数,Air2phin 会新增一个 tutorial_part-air2phin.py 文件来保存转化后的内容。
通过观察,我们发现这次转化分别触发了多条转化规则,包括
将 airflow.DAG 转换成 pydolphinscheduler.core.process_definition.ProcessDefinition,这个规则在第三行(import 语句)以及第六行 DAG context
将 airflow.operators.bash.BashOperator 转换成 pydolphinscheduler.tasks.shell.Shell,这个规则在任务 t1,t2 中都被使用
除了对应的类转化之外,我们需要将类的属性进行转化,如将 airflow.DAG.schedule_interval 转换成了 ProcessDefinition.schedule,同时修改了部分值的内容,如将 timedelta(days=1) 转成 '0 0 0 * * ? *'
最后,我们只需要安装 pydolphinscheduler ,并且将转化后的文件通过 python 运行,就能完成工作流的迁移了,详见 pydolphinscheduler 使用(https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler)。
在运行 python tutorial_part.py 时,需要保证 dolphinscheduler API 和 python gateway 服务已经启动,并且开放了对应的端口,详见启动 python gateway service。
至此,我们通过一个简单的例子,说明了 Air2phin 是如何完成迁移的。
工作原理
Airflow 和 dolphinscheduler python sdk 如何工作?
在了解 Air2phin 如果工作之前,先了解 Airflow 和 dolphinscheduler python sdk 如何工作是非常重要的前置条件,帮助我们更好地了解 Air2phin 的迁移步骤,当遇到问题的时候也能更加从容地应对。
Airflow 如何工作:Airflow 工作流相关的信息都保存在 DAG 文件中,之后将 DAG 文件放置到 Airflow 的指定目录,Airflow 的 Scheduler 会间隔一定时间去扫描和解析 Airflow 的 DAG 文件,所以 DAG 文件是被动被扫描和更新的。
dolphinscheduler python sdk: 同 Airflow 类似,将全部工作流相关的信息都通过 Python 文件定义,但是 dolphinscheduler python sdk 是通过人为主动触发的方式,将工作流信息提交,运行命令 python 工作流文件名 即可完成主动任务提交。
Air2phin 工作流程
了解完两者是如何使用,如何提交/发现工作流的,将更加利于我们对 Air2phin 的工作原理的理解。因为 Airflow 的 DAG 文件以及 DolphinScheduler 的 Python sdk 定义文件都是 Python 编写的,所以 Air2phin 的大部分代码都是处理两者间的差异,最后将 Airflow 的代码转化成 dolphinscheduler python sdk 和定义。
Air2phin 使用了 LibCST(https://libcst.readthedocs.io/en/latest/) 来实现 airflow python DAG 代码的抽象语法树解析,然后通过 LibCST 的 Transformer(https://libcst.readthedocs.io/en/latest/tutorial.html#Build-Visitor-or-Transformer)结合转化规则最后转化成 dolphinscheduler python sdk 的定义。
Air2phin 整体工作流程如下:
从标准输入或者文件中获取原本的 Airflow DAG 内容
从 Yaml 文件加载所有转换规则
将 Airflow DAG 内容通过 LibCST 解析成 CST 树
通过 LibCST Transformer 转换 dolphinscheduler python sdk 定义内容
Air2phin 最佳实践
迁移整个文件夹而不是单个文件
当用户想要迁移 Airflow 到 DolphinScheduler 的时候,都是想要整体做迁移而不是单个文件迁移的,Air2phin 提供整体文件夹迁移的能力,只需要将路径从文件路径改成文件夹即可。
增加自定义的规则
部分使用 Airflow 的用户自定义 Hook 或者 Operator,用户自定义的 Operator 无法通过 Air2phin 内置的转化规则完成转化,需要用户增加自定义的规则,并告诉 Air2phin 规则的位置。例如我们有一个叫 MyCustomOperator 的算子是继承 PostgresOperator 的大部分功能, 只是命名不一样,其定义如下:
它在 Airflow 的多个 DAG 中被使用,使用的方式如下:
现在需要对这个 Operator 进行转化,我们可以自定义一个转化规则,并将其命名为 MyCustomOperator.yaml,内容如下,最主要的内容是 migration.module 和 migration.parameter 的定义,其确定了转化规则:
再使用 --custom-rules 参数指定转化自定义参数,就能应用自定义规则的转化:
让 Air2phin 运行地更快
Air2phin 默认是一个进程运行 DAG 文件的转化的,当你有许多 DAG 文件时,Air2phin 转化非常耗时,我们提供了一个启动多进程运行 Air2phin 转化的参数 --multiprocess,可以将其指定为用户机器的 CPU 数量来缩短转化时间:
存在的问题
目前,作为一个转化工具,Air2phin 的使用方式已经算比较完善了,能够满足用户迁移调度系统的基本需求,但还有一些地方有待完善。
内置规则还不够多
转化规则还不够多,目前只有五个,分别是:
airflow.DAG
airflow.operators.bash.BashOperator
airflow.operators.dummy_operator.DummyOperator
airflow.operators.python_operator.PythonOperator
airflow.operators.spark_sql_operator.SparkSqlOperator
如果有更多的规则,Air2phin 将成为一个更加好用的转化工具,这里欢迎各位随时提交转化规则的 PR(https://github.com/WhaleOps/air2phin/pulls)。
部分 Airflow 的用法不能被迁移过来
部分概念仅仅在 Airflow 中有,在 DolphinScheduler 中还没有,如任务的成功、失败、重试、触发 callback,任务的 owner,variable,工作流并发数,tag 等,这部分 Airflow DAG 可以被迁移,但兼容的属性将会丢失,无法迁移到 DolphinScheduler。
Air2phin 常见问题解答
Q:为什么选择解析 Airflow DAG 文件而不是数据库?
A:因为 Airflow DAG 文件中才有完成的工作流信息,Airflow 的数据库中只有工作流基本信息,没有任务定义的信息,也没有任务的关系,我们选择通过解析 Airflow 的 DAG 文件而不是数据库来完成转化。
Q:为什么要通过 dolphinscheduler python sdk 做中转不自己提交到 DolphinScheduler?
A:因为 Airflow DAG 就是 Python 定义的,在 Airflow DAG 中有很多 Python 的特性,我们不想将这部分特性转化成结构化的数据(转化可能存在信息丢失),恰好 DolphinScheduler 已经有了 Python 的 sdk,所以直接通过 LibCST 转化是成本更加低的做法。
Q:为什么使用 LibCST 而不是 python 内置的 AST?
A:因为 LibCST 更加符合我们,Python 内置的 AST 库解析成 AST 的时候会丢失掉 comment 的信息,但是我们呢希望保留着部分信息。且 LibCST 提供更加多 visitor 保证我们更加方便的实现替换。
参考链接:air2phin(https://github.com/WhaleOps/air2phin)
评论