写点什么

使用 GLUE 构建无服务器架构的 ETL Pipeline

  • 2019-10-24
  • 本文字数:5286 字

    阅读完需:约 17 分钟

使用 GLUE 构建无服务器架构的 ETL Pipeline

ETL 是将业务系统的数据经过抽取、清洗转换之后加载到数据处理平台的过程,其目的是将企业中的分散、凌乱、标准不统一的数据整合到一起,为企业的决策提供分析依据。众所周知,ETL 是 BI 项目重要的一个环节,通常在 BI 项目中 ETL 会花掉整个项目中至少 1/3 的时间。ETL 设计的好坏直接关系整个 BI 项目的成败。目前市场上主流的 ETL 工具譬如 Oracle 的 OWB、SQL Server 的 DTS 和 SSIS 服务、Informatic 等,它们要么通过单纯的 SQL 方式实现,屏蔽编码的复杂性,但是缺少灵活性,要么与后端数据源集成度不够高,缺少友好开发环境支持,对触发器,脚本化方面支持不够好。基于上面的 ETL 工具的不足,AWS 推出 Glue 服务可以很方便的解决上面的问题。AWS Glue 易于使用,内置常用的 Classifiers,轻松识别常用的数据格式,同时支持自定义 Classifiers,自动抽取多种数据源的元数据;完全托管的无服务器架构的服务,用户完全不用担心底层基础架构的故障,软件的更新,以及并发能力等,只需要将精力聚焦在如何提高 ETL 的效率方面;另外在开发编程环境,触发器,自动化脚本作业方面都提供很好的支持。


默认情况下,AWS Glue 提供的内置 Classifiers 如果不能满足数据抽取的需求我们需要创建自定义的 Classifiers,本文将演示如何通过 AWS Glue 构建无服务器架构的 ETL Pipeline 实现自定义文本识别器和将多个 CSV 文件在同一 Job 中完成数据的清洗,并将目标格式转换为 Parquet。

准备数据

选择数据源 Web 服务器的日志文件,演示如何自定义文本识别器。Web 服务器的日志样例格式如下:



该日志文件直接使用 AWS Glue 默认的文本识别器是无法识别的,类型显示是 unkonwn 的。


另外一个数据源是选择从公共数据集 NYTaxi 中下载的 3 个 Excel 文 件,演示如何在一个 Job 中实现批量转换多个表。该 Excel 文件样例格式如下:


创建自定义文本识别器

Step 1 创建 Metadata 数据库

打开 AWS Glue 服务的 Console,在左面的导航面板中选择 database,点击 Add Database,输入数据库名:testdb,如下图所示:



选择 create,数据库创建完成。

Step 2 创建 S3 bucket 用于存放源数据

打开 AWS S3 Console,点击 Create Bucket, 输入 Bucket Name: teetttt,如下图所示:


Step 3 创建自定义的 Classifiers

在 AWS Glue Console 左面的导航面板,选择 Classifier,点击 Add Classifier,输入 Classifier Name 为 errlog,选择 Classifier Type 为 Grok,输入 Grok pattern 为:


%{TESTTIME:timestamp} %{IPORHOST:login_host} \S+ %{USER:login_user}/%{NUMBER:pid} as %{USER:sudouser}/%{NUMBER:sudouser_pid} on %{WORD:tty}/%{NUMBER:tty_id}/%{IPORHOST:host_ip}:%{NUMBER:source_port}->%{IPORHOST:local_ip}:%{NUMBER:dest_port} (%{UNIXPATH:current_path} %{GREEDYDATA:command}|%{GREEDYDATA:detail})


Custom Pattern 为:


TESTTIME ([A-z][A-z][A-z] [A-z][A-z][A-z] [0-9] [0-9][0-9][:][0-9][0-9][:][0-9][0-9] [0-9][0-9][0-9][0-9])


如下图所示:



点击 Apply,配置完成。


注意在自定义 classifier 时,Grok 中支持的模式变量和 AWS Glue 中内置的正则变量可以直接在 Grok pattern 使用,譬如 GREEDDATA,UNIXPATH,USER 等。但是如果使用自己定义的正则模式变量,则需要在 Custom pattern 中定义,在本例中 TESTTIME 变量,由于默认的时间模式不能识别 Sun Mar 7 16:05:29 2004 日期格式,因而需要自己定义。

Step 4 创建 Crawlers,使用自定义的 classifier(errlog) 进行元数据爬取

在 AWS Glue Console 的左面导航面板中选择 Crawler,点击 Add Crawler,输入名称 customerrlog,自定义的 classifier 显示在页面下,如下图所示:



点击 Add,添加 errlog 到该 Crawler 中。


点击 Next,选择 Datastore 为 S3,指定 Step 2 创建的 bucket,如下图所示:



继续点击 Next,指定角色,允许该角色访问 S3:



指定调度的频率为 Run On Demand,如下:



设置 Crawler 的元数据输出为 Step 1 中创建的数据库 testdb,如下:



继续选择 Next,review 信息无误,点击 Finish 创建完成。

Step 5 运行 Crawler(customerrlog),爬取 S3 bucket(teetttt)的数据

点击 Run Crawler,运行完毕,生成一个元数据表,如下图所示:


Step 6 调用 AWS Athena 浏览元数据表

选择 errlogbucket 表,点击 view data,AWS Glue 会自动调用 AWS Athena,显示元数据表,如下图所示:



日志文件通过自定义的 Classifier 抓取结果如下:



至此整个自定义 Classifier 对存储在 S3 上的数据进行爬取完成。这样用户可以根据元数据表的信息进行任意的数据转换和加载工作。


下面演示利用在一个 Job 中实现对多个元数据表数据进行清洗,由于 CSV 格式在大数据处理过程中效率相对于 parquet 稍差,需要将源数据(默认格式是 CSV 文件),转换为 parquet 格式,并再次写会到 S3 存储桶中。

创建一个 Job 实现对多个数据源文件进行清洗及格式转换

Step 7 创建 Crawler 命名为 crawlertaxidata,并创建 S3 数据输出存储桶

其具体操作方式参照上面的步骤介绍。

Step 8 运行该 Crawler,自动创建三张元数据表

分别为 flv, green 和 vendorinfo,点击 View data 可以查看具体的元数据信息,如下图所示:


Step 9 创建 Job,实现单元数据表到单元数据表的数据抽取和转换

选择创建 Job,输入 Job 名称,指定 IAM Role,脚本的存储路径和临时路径,如下图所示:


Step 10 选择元数据表

此处只允许选择一个元数据表,选择 green,如下图所示:


Step 11 选择目标数据输出的存储位置 S3 及格式 Parquet

如下图:



AWS Glue 支持 5 种输出格式,分别为 CSV,Parquet,ORC,Json,Avro。输出输出支持 S3 和 JDBC 两种方式。

Step 12 数据清洗及数据类型转换选择

根据需求自动进行数据的清洗及类型选择如下图所示:


Step 13 生成 Job 脚本,点击保存和编辑脚本

如下所示:



由于在实际项目中进行数据抽取的数据源会非常多,因为不能真对单个数据创建一个 Job,需要实现同一个 Job 进行多个数据源的转换操作,需要编辑脚本。

Step 14 编辑 Job 脚本,点击保存脚本

具体编辑完的脚本如下:


C


def map_function(dynamicRecord):  return dynamicRecord
复制代码


C


import sysfrom awsglue.transforms import *from awsglue.utils import getResolvedOptionsfrom pyspark.context import SparkContextfrom awsglue.context import GlueContextfrom awsglue.job import Job
复制代码


C


## @params: [JOB_NAME]args = getResolvedOptions(sys.argv, ['JOB_NAME'])
复制代码


C


sc = SparkContext()glueContext = GlueContext(sc)spark = glueContext.spark_sessionjob = Job(glueContext)job.init(args['JOB_NAME'], args)
复制代码


C


datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "testdb", table_name = "green", transformation_ctx = "datasource0")
复制代码


C


datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "testdb", table_name = "flv",  transformation_ctx = "datasource1")datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "testdb", table_name = "vendorinfo", transformation_ctx = "datasource2")
复制代码


C


applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1")
复制代码


C


applymapping2 = ApplyMapping.apply(frame = datasource1, mappings = [("vendorid", "bigint", "vendorid", "long"), ("tpep_pickup_datetime", "string", "tpep_pickup_datetime", "string"), ("tpep_dropoff_datetime", "string", "tpep_dropoff_datetime", "string"), ("passenger_count","bigint","passenger_count","long"),("trip_distance", "double", "trip_distance", "double"),("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude","double"), ("ratecodeid", "bigint", "ratecodeid", "long"),("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"),("payment_type", "bigint", "payment_type", "long"),  ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"),  ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping2")applymapping3 = ApplyMapping.apply(frame = datasource2, mappings = [("vendorid", "string", "vendorid", "bigint"), ("comname", "string", "comname", "string")], transformation_ctx = "applymapping3")
复制代码


C


resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")resolvechoice3 = ResolveChoice.apply(frame = applymapping2, choice = "make_struct", transformation_ctx = "resolvechoice3")resolvechoice4 = ResolveChoice.apply(frame = applymapping3, choice = "make_struct", transformation_ctx = "resolvechoice4")
复制代码


C


dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")dropnullfields4 = DropNullFields.apply(frame = resolvechoice3, transformation_ctx = "dropnullfields4")dropnullfields5 = DropNullFields.apply(frame = resolvechoice3, transformation_ctx = "dropnullfields5")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://nytaxisdata"}, format = "parquet", transformation_ctx = "datasink4")datasink5 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields4, connection_type = "s3", connection_options = {"path": "s3://nytaxisdata"}, format = "parquet", transformation_ctx = "datasink5")datasink6 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields5, connection_type = "s3", connection_options = {"path": "s3://nytaxisdata"}, format = "parquet", transformation_ctx = "datasink6")
job.commit()
复制代码


保存该脚本,点击 Run,完成数据的 ETL 工作。

结论

使用 AWS Glue 可以非常方便的帮助用户构建无服务器架构的 ETL Pipeline,用户不需要担心基础架构的可靠性以及后台的数据处理能力,只需专注于 Job 作业的编写。通过内置的 Classifier 和支持自定义创建 Classifier 两种方式为用户在数据爬取方面提供更大的灵活性,友好的脚本开发环境为用户的脚本开发和数据清洗提供更大便利。


作者介绍:


王友升


王友升拥有超过 13 年的 IT 从业经验,负责基于 AWS 的云计算方案架构咨询和设计,推广 AWS 云平台技术和各种解决方案。在加入 AWS 之前,王友升曾在中地数码,浪潮,惠普等公司担任软件开发工程师、DBA 和解决方案架构师。他在服务器、存储、数据库优化方面拥有多年的经验,同时对大数据、Openstack 及人工智能和机器学习方面也进行一定的研究和积累。


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/etl-pipeline-for-serverless-architecture-using-glue/


2019-10-24 08:00526

评论

发布
暂无评论
发现更多内容

干货 | Authing 产品总监佟野:Authing 的产品打磨之路

Authing

身份认证 用户思维 2B 产品 用户旅程 产品功能设计

TiDB 6.0 新特性解读 | Collation 规则

TiDB 社区干货传送门

人工智能超大规模预训练模型浅谈

百度Geek说

2021年Java春招高级面试指南(1到5年Java面试者必备)

爱好编程进阶

Java 程序员 后端开发

直播预告丨OpenHarmony标准系统多媒体子系统之音频解读

OpenHarmony开发者

OpenHarmony 多媒体

在虚拟机上搭建单机k8s环境

红莲疾风

DDD领域驱动设计实战-分层架构及代码目录结构

爱好编程进阶

Java 程序员 后端开发

易仓跨境Saas全球租户,如何做到数据秒级响应?

阿里云大数据AI技术

数据库 flink SaaS

实力印证!青藤入选第一批“网络安全能力评价工作组”成员单位

青藤云安全

深度学习|AI芯片:上游产业率先爆发

Finovy Cloud

深度学习 gpu GPU服务器

一场会带来啥改变?三翼鸟引领行业进入有脑时代

脑极体

Hibernate多对多的关系映射,详解(代码

爱好编程进阶

Java 程序员 后端开发

GitHub 和 Gitee 开源免费 10 个超赞后台管理面板,看完惊呆了!

爱好编程进阶

Java 程序员 后端开发

TiDB Cloud GA,助力全球企业在云上构建新一代云原生应用

PingCAP

未来以体验为中心的数字化战略前景 已经变得愈发明朗

易观分析

精细运营 渠道融合

架构实战营之毕业总结

IT屠狗辈

架构实战营

10个经典又容易被人疏忽的JVM面试题

爱好编程进阶

Java 程序员 后端开发

BIO,NIO,AIO的区别

爱好编程进阶

Java 程序员

使用 GLUE 构建无服务器架构的 ETL Pipeline_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章