AI 年度盘点与2025发展趋势展望,50+案例解析亮相AICon 了解详情
写点什么

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler

  • 2023-02-25
    北京
  • 本文字数:5709 字

    阅读完需:约 19 分钟

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler

近日,调度系统迁移工具 Air2phin 宣布开源。借助 Air2phin,用户可 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为有调度系统迁移需要的用户带来极大便利。

Air2phin 是什么?


Air2phin 是一个最近宣布开源的调度系统迁移工具,旨在将 Apache Airflow DAGs 文件转换成 Apache DolphinScheduler Python SDK 定义文件,从而实现用户将调度系统(Workflow orchestration)从 Airflow 迁移到 DolphinScheduler 的目的。它是一个基于多规则的 AST 转换器,使用 LibCST 来解析和转换 Airflow 的 DAG 代码,其全部规则使用 Yaml 文件定义,并提供了一定的自定义规则扩展能力。


近期,Air2phin 已经发布了 0.0.12 版本,提供了丰富的功能,可以更好地帮助用户完成 Airflow 到 Apache DolphinScheduler 的迁移。


注释:AST 是 Abstract Syntax Tree(抽象语法树)的缩写,它是一种以树状结构表示代码语法结构的数据结构。在编译器中,AST 是由词法分析器和语法分析器生成的。词法分析器将源代码转换成标记流(token stream),语法分析器将标记流转换成抽象语法树。AST 是一种树状结构,它由一系列节点组成,每个节点表示代码中的一个语法结构(如表达式、语句、函数、类等),节点之间的关系表示语法结构之间的嵌套关系。

为什么开源 Air2phin?


可能有人会问,为什么我需要一个迁移工具?这是因为随着业务的发展,企业或组织原来使用的工作流编排系统已经无法满足当前的需求,需要将工作流编排系统迁移到新的平台或者更新到新的版本。经过调研,很多用户有了将调度系统从开源工作流编排系统 Airflow 迁移到 Apache DolphinScheduler 上来的需求。


在迁移过程中,由于数据处理任务可能涉及多个系统之间的依赖关系,迁移过程需要确保在不影响业务运行的前提下完成。此时,调度系统迁移工具就可以发挥重要作用,它能减少人工干预,尽量自动化地完成两个调度系统间的迁移工作,并且能兼容多个系统间的多个版本,几乎可以做到用户无干预完成迁移。

为此,白鲸开源专门研发了开源迁移工具 Air2phin,可以让用户 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为用户带来极大的便利。


为了让大家更好地理解 Air2phin 的重要性,我们先从调度系统的相关背景知识开始,了解将调度系统从 Airflow 迁移至 Apache DolphinScheduler 的好处。


为什么要从 Airflow 迁移至 DolphinScheduler?

什么是工作流编排系统?


工作流编排系统,是以尊重编排规则和业务逻辑的方式管理数据流。工作流编排工具让用户可以将多个有关联的任务转换为可以安排、运行和观测的工作流,帮助企业更好地管理和控制业务流程,从而提高业务效率。工作流编排是数据处理流程中不可或缺的组件之一,负责根据预先定义的规则和逻辑执行数据处理任务,确保数据处理流程按照预期顺利执行,常见工作流编排系统包括 Apache DolphinScheduler、Apache Airflow、Apache Oozie, Azkaban 等。

Airflow 是什么?


其中,Apache Airflow 是一个开源的工作流编排系统,它可以帮助用户创建、调度和监控复杂的工作流程。Airflow 最初由 Airbnb 开发,并于 2016 年开源,现在由 Apache 软件基金会维护。Airflow 使用 Python 语言编写,具有高度的可扩展性和灵活性,支持多种任务类型,如计算、数据处理、通知、交互等。Airflow 的工作流程是通过编写 Python 脚本来定义的,可以使用 Airflow 提供的操作符和钩子,以及自定义操作符和钩子来扩展其功能。但其有着不可忽视的缺陷,比如需要需要深度二次开发,脱离社区版本,升级成本高;Python 技术栈维护迭代成本高;scheduler loop 扫描 Dag folder 延迟降低性能的问题;以及在生产环境中使用稳定性差等。



在新数据时代业务需求下诞生的 Apache DolphinScheduler 是一个开源的分布式工作流调度系统,弥补了以往调度系统的弱势,旨在为企业用户提供一种可靠、高效、易于使用的工作流调度平台,支持多种任务类型,如计算、数据处理、ETL 等。


与 Airflow 相比,DolphinScheduler 采用了分布式架构,提供了多种任务类型,用户可以定义任务之间的依赖关系,设置任务的优先级和调度策略等,其使用可视化的界面来创建和管理工作流程的特性更是与 Airflow 形成鲜明对比,变得更加易于操作,对非编程人员来说更加友好。



经过调研对比,对于很多用户来说,将调度系统迁移至 Apache DolphinScheduler 是一个降本增效的更优选择。

Air2phin 如何安装和使用

Air2phin 是一个 python 的包,可以通过 Python 的包安装工具 pip 完成安装,详见 air2phin getting start。


python -m pip install --upgrade air2phin
复制代码


一个简单的例子

我们通过一个简单的例子,来说明如何使用 Air2phin 的。我们截取了 airflow tutorial.py 中的部分代码作为 Air2phin 转化的例子,来说明 Air2phin 如何逐步完成转化成 dolphinscheduler python sdk。

图 1:airflow tutorial.py 中的部分代码

图 2:Air2phin 如何逐步完成转化成 dolphinscheduler python sdk


假设将 airflow tutorial.py 部分内容保存至文件 tutorial_part.py,想要将其转化成 dolphinscheduler python sdk 定义,只需要一行命令就能完成。结果如图 2 所示,因为命令增加了 --inplace 参数,所以 Air2phin 会直接将原文件覆盖,如果不需要覆盖原问题,可以不使用 --inplace 参数,Air2phin 会新增一个 tutorial_part-air2phin.py 文件来保存转化后的内容。


air2phin migrate --inplace tutorial_part.py
复制代码

通过观察,我们发现这次转化分别触发了多条转化规则,包括

  • 将 airflow.DAG 转换成 pydolphinscheduler.core.process_definition.ProcessDefinition,这个规则在第三行(import 语句)以及第六行 DAG context

  • 将 airflow.operators.bash.BashOperator 转换成 pydolphinscheduler.tasks.shell.Shell,这个规则在任务 t1,t2 中都被使用

  • 除了对应的类转化之外,我们需要将类的属性进行转化,如将 airflow.DAG.schedule_interval 转换成了 ProcessDefinition.schedule,同时修改了部分值的内容,如将 timedelta(days=1) 转成 '0 0 0 * * ? *'

最后,我们只需要安装 pydolphinscheduler ,并且将转化后的文件通过 python 运行,就能完成工作流的迁移了,详见 pydolphinscheduler 使用(https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler)

# 安装 apache-dolphinschedulerpython -m pip install apache-dolphinscheduler# 将工作流提交到 dolphinschedulerpython tutorial_part.py
复制代码


在运行 python tutorial_part.py 时,需要保证 dolphinscheduler API 和 python gateway 服务已经启动,并且开放了对应的端口,详见启动 python gateway service。


至此,我们通过一个简单的例子,说明了 Air2phin 是如何完成迁移的。


工作原理

Airflow 和 dolphinscheduler python sdk 如何工作?

在了解 Air2phin 如果工作之前,先了解 Airflow 和 dolphinscheduler python sdk 如何工作是非常重要的前置条件,帮助我们更好地了解 Air2phin 的迁移步骤,当遇到问题的时候也能更加从容地应对。


  • Airflow 如何工作:Airflow 工作流相关的信息都保存在 DAG 文件中,之后将 DAG 文件放置到 Airflow 的指定目录,Airflow 的 Scheduler 会间隔一定时间去扫描和解析 Airflow 的 DAG 文件,所以 DAG 文件是被动被扫描和更新的。

  • dolphinscheduler python sdk: 同 Airflow 类似,将全部工作流相关的信息都通过 Python 文件定义,但是 dolphinscheduler python sdk 是通过人为主动触发的方式,将工作流信息提交,运行命令 python 工作流文件名 即可完成主动任务提交。

Air2phin 工作流程


了解完两者是如何使用,如何提交/发现工作流的,将更加利于我们对 Air2phin 的工作原理的理解。因为 Airflow 的 DAG 文件以及 DolphinScheduler 的 Python sdk 定义文件都是 Python 编写的,所以 Air2phin 的大部分代码都是处理两者间的差异,最后将 Airflow 的代码转化成 dolphinscheduler python sdk 和定义。


Air2phin 使用了 LibCST(https://libcst.readthedocs.io/en/latest/) 来实现 airflow python DAG 代码的抽象语法树解析,然后通过 LibCST 的 Transformer(https://libcst.readthedocs.io/en/latest/tutorial.html#Build-Visitor-or-Transformer)结合转化规则最后转化成 dolphinscheduler python sdk 的定义。


Air2phin 整体工作流程如下:

  • 从标准输入或者文件中获取原本的 Airflow DAG 内容

  • 从 Yaml 文件加载所有转换规则

  • 将 Airflow DAG 内容通过 LibCST 解析成 CST 树

  • 通过 LibCST Transformer 转换 dolphinscheduler python sdk 定义内容

Air2phin 最佳实践

迁移整个文件夹而不是单个文件

当用户想要迁移 Airflow 到 DolphinScheduler 的时候,都是想要整体做迁移而不是单个文件迁移的,Air2phin 提供整体文件夹迁移的能力,只需要将路径从文件路径改成文件夹即可。


# 迁移整个 ~/airflow/dags 文件夹air2phin migrate --inplace ~/airflow/dags
复制代码


增加自定义的规则

部分使用 Airflow 的用户自定义 Hook 或者 Operator,用户自定义的 Operator 无法通过 Air2phin 内置的转化规则完成转化,需要用户增加自定义的规则,并告诉 Air2phin 规则的位置。例如我们有一个叫 MyCustomOperator 的算子是继承 PostgresOperator 的大部分功能, 只是命名不一样,其定义如下:


from airflow.providers.postgres.operators.postgres import PostgresOperatorclass MyCustomOperator(PostgresOperator):    def __init__(        self,        *,        sql: str | Iterable[str],        my_custom_conn_id: str = 'postgres_default',        autocommit: bool = False,        parameters: Iterable | Mapping | None = None,        database: str | None = None,        runtime_parameters: Mapping | None = None,        **kwargs,    ) -> None:        super().__init__(            sql=sql,            postgres_conn_id=my_custom_conn_id,            autocommit=autocommit,            parameters=parameters,            database=database,            runtime_parameters=runtime_parameters,            **kwargs,        )
复制代码

它在 Airflow 的多个 DAG 中被使用,使用的方式如下:

from custom.my_custom_operator import MyCustomOperatorwith DAG(    dag_id='my_custom_dag',    default_args=default_args,    schedule_interval='@once',    start_date=days_ago(2),    tags=['example'],) as dag:    t1 = MyCustomOperator(        task_id='my_custom_task',        sql='select * from table',        my_custom_conn_id='my_custom_conn_id',    )
复制代码

现在需要对这个 Operator 进行转化,我们可以自定义一个转化规则,并将其命名为 MyCustomOperator.yaml,内容如下,最主要的内容是 migration.module 和 migration.parameter 的定义,其确定了转化规则:

name: MyCustomOperatordescription: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.migration:  module:    - action: replace      src: custom.my_custom_operator.MyCustomOperator      dest: pydolphinscheduler.tasks.sql.Sql  parameter:    - action: replace      src: task_id      dest: name    - action: replace      src: my_custom_conn_id      dest: datasource_name
复制代码

再使用 --custom-rules 参数指定转化自定义参数,就能应用自定义规则的转化:


# 指定自定义规则路径为 /path/to/MyCustomOperator.yamlair2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags
复制代码


让 Air2phin 运行地更快

Air2phin 默认是一个进程运行 DAG 文件的转化的,当你有许多 DAG 文件时,Air2phin 转化非常耗时,我们提供了一个启动多进程运行 Air2phin 转化的参数 --multiprocess,可以将其指定为用户机器的 CPU 数量来缩短转化时间:


# 指定 air2phin 启动 12 个进程同时进行转化air2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml --multiprocess 12 ~/airflow/dags
复制代码


存在的问题


目前,作为一个转化工具,Air2phin 的使用方式已经算比较完善了,能够满足用户迁移调度系统的基本需求,但还有一些地方有待完善。


内置规则还不够多

转化规则还不够多,目前只有五个,分别是:

  • airflow.DAG

  • airflow.operators.bash.BashOperator

  • airflow.operators.dummy_operator.DummyOperator

  • airflow.operators.python_operator.PythonOperator

  • airflow.operators.spark_sql_operator.SparkSqlOperator


如果有更多的规则,Air2phin 将成为一个更加好用的转化工具,这里欢迎各位随时提交转化规则的 PR(https://github.com/WhaleOps/air2phin/pulls)


部分 Airflow 的用法不能被迁移过来


部分概念仅仅在 Airflow 中有,在 DolphinScheduler 中还没有,如任务的成功、失败、重试、触发 callback,任务的 owner,variable,工作流并发数,tag 等,这部分 Airflow DAG 可以被迁移,但兼容的属性将会丢失,无法迁移到 DolphinScheduler。

Air2phin 常见问题解答


Q:为什么选择解析 Airflow DAG 文件而不是数据库?

A:因为 Airflow DAG 文件中才有完成的工作流信息,Airflow 的数据库中只有工作流基本信息,没有任务定义的信息,也没有任务的关系,我们选择通过解析 Airflow 的 DAG 文件而不是数据库来完成转化。


Q:为什么要通过 dolphinscheduler python sdk 做中转不自己提交到 DolphinScheduler?

A:因为 Airflow DAG 就是 Python 定义的,在 Airflow DAG 中有很多 Python 的特性,我们不想将这部分特性转化成结构化的数据(转化可能存在信息丢失),恰好 DolphinScheduler 已经有了 Python 的 sdk,所以直接通过 LibCST 转化是成本更加低的做法。


Q:为什么使用 LibCST 而不是 python 内置的 AST?

A:因为 LibCST 更加符合我们,Python 内置的 AST 库解析成 AST 的时候会丢失掉 comment 的信息,但是我们呢希望保留着部分信息。且 LibCST 提供更加多 visitor 保证我们更加方便的实现替换。

参考链接:air2phin(https://github.com/WhaleOps/air2phin)

2023-02-25 12:303970

评论

发布
暂无评论
发现更多内容

用友BIP重磅升级,发布新品:用友BIP|商业网络

用友BIP

2023全球商业创新大会

蓝易云:python使用HTTP教程。

百度搜索:蓝易云

Python Linux HTTP requests urllib

“产业应用创新奖2023”启动征集

飞桨PaddlePaddle

人工智能 百度飞桨 文心大模型

多款国产服务器、操作系统与摩斯隐私计算完成兼容性互认

科技热闻

轻松玩转70亿参数大模型!借助Walrus在AWS上部署Llama2

SEAL安全

Seal软件 AI大语言模型 企业号 8 月 PK 榜 Walrus llama-2

GC面临的困境,JVM是如何解决跨代引用的?

Java随想录

Java JVM

库存预占架构升级方案设计-交易库存中心 | 京东物流技术团队

京东科技开发者

架构设计 库存系统 架构升级 企业号 8 月 PK 榜

基于开源IM即时通讯框架MobileIMSDK:RainbowChat-iOS端v7.0版已发布

JackJiang

网络编程 即时通讯 即时通讯IM

大规模块存储 EC 系统构建

Baidu AICLOUD

分布式存储 块存储 纠删码

带你读论文丨S&P2019 HOLMES Real-time APT Detection

华为云开发者联盟

人工智能 华为云 华为云开发者联盟 企业号 8 月 PK 榜

使用秘籍|如何实现图数据库 NebulaGraph 的高效建模、快速导入、性能优化

NebulaGraph

图数据库 NebulaGraph

小灯塔系列-中小企业数字化转型系列研究——BI测评报告

向量智库

途牛科技与火山引擎数智平台合作 打造企业大数据系统“降本”新范式

字节跳动数据平台

大数据 云服务 企业号 8 月 PK 榜 数据支持

科技新秀巅峰决战,百度商业AI技术创新大赛圆满收官

百度Geek说

人工智能 企业号 8 月 PK 榜

Mac软件推荐:ZBrush v2023.2.2中文激活版+可用补丁

mac大玩家j

数字雕刻软件 绘画工具 Mac软件推荐

阿里云大语言模型(LLM)实战训练营,火热开营中!

阿里云大数据AI技术

LLM模型

2024CITE中国电子信息博览会(电博会)

AIOTE智博会

电子展 深圳电子展 电子信息展 电博会

昨晚做梦面试官问我三色标记算法

Java随想录

Java JVM

当小白遇到FullGC | 京东云技术团队

京东科技开发者

企业号 8 月 PK 榜 Full GC TP99

千万级数据深分页查询SQL性能优化实践 | 京东云技术团队

京东科技开发者

MySQL 性能优化 sql 分页查询 企业号 8 月 PK 榜

一次性搞清楚,Java并发编程在各主流框架中的应用,保证看懂

java易二三

Java spring 程序员 计算机

pycharm pro v2023.2最新中文+激活码安装

胖墩儿不胖y

代码编辑器 代码编辑 编辑代码 代码编辑工具

字节跳动基于DataLeap的DataOps实践

字节跳动数据平台

大数据 数据中台 数据研发 企业号 8 月 PK 榜

从头到尾说一次 Spring 事务管理(器) | 京东云技术团队

京东科技开发者

spring spring事务管理 事务管理 企业号 8 月 PK 榜

企业数字化转型,财务规划与分析(FP&A)团队应该如何应对

智达方通

数字化转型 智达方通EPM 财务规划与分析

优化重复冗余代码的8种方式

java易二三

Java 编程 程序员 计算机

蓝易云:云服务器和专用服务器之间的区别?

百度搜索:蓝易云

云计算 服务器 云服务器 ECS 专用服务器

我的心血全在这了,这种方式讲@Async原理,你别再不懂Spring了

java易二三

Java spring 程序员 计算机

龙蜥白皮书精选:云原生混部资源隔离技术

OpenAnolis小助手

开源 云原生 白皮书 内核 龙蜥社区

ECMAScript 2023新增特性

数新网络官方账号

一文搞懂MySQL 数据库 MongoDB

java易二三

Java MySQL 数据库 程序员 计算机

迁移工具 Air2phin 宣布开源,2 步迁移 Airflow 至 Dolphinscheduler_大数据_钟嘉杰_InfoQ精选文章