使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

2020 年 1 月 02 日

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)

使用 AWS Glue Python Shell 构建


首先,在 AWS 管理控制台中导航到 AWS Glue


建立连接


Amazon Redshift 集群位于 VPC 中,因此您首先需要使用 AWS Glue 创建连接。连接包含访问数据存储所需的属性,包括 VPC 网络信息。您最终将此连接附加到 Glue Python Shell 作业,以便其可以到达 Amazon Redshift 集群。


从菜单栏中选择连接,然后选择添加连接。为您的连接指定一个名字(例如_blog_rs_connection_),选择 Amazon Redshift 作为 连接类型,然后选择下一步,如以下屏幕截图所示。



集群下,输入 AWS CloudFormation 模板启动的集群的名称,即 _blogstack-redshiftcluster-####__。_因为我为此博客提供的 Python 代码已处理凭证检索,所以您在此处输入的围绕数据库信息的其余值大部分为占位符。您与连接关联的关键信息与网络相关。


请注意,如果没有正确的集群信息,将无法测试连接如果您对此感兴趣,请注意,在选择了正确的集群之后,将自动填充数据库名称用户名,如以下屏幕截图所示。请遵循此处的说明 Secrets Manager 中检索密码信息,然后将其复制到密码字段中。



ETL 代码审查


了解此示例中使用的两个主要的 Python 脚本:


Pygresql_redshift_common.py 是一组功能,可以从 Secrets Manger 中检索群集连接信息和凭据,建立与集群的连接,并分别提交查询。通过传递的参数检索运行时集群信息,这些功能使作业可以连接到有权限的任何集群。您可以按照说明创建 python .egg 文件(已作为 AWS CloudFormation 模板启动的一部分完成)将这些函数打包到库中。请注意,AWS Glue Python Shell 本机原生支持多个 python 库


Python


import pgimport boto3import base64from botocore.exceptions import ClientErrorimport json
#uses session manager name to return connection and credential informationdef connection_info(db):
session = boto3.session.Session() client = session.client( service_name='secretsmanager' )
get_secret_value_response = client.get_secret_value(SecretId=db)
if 'SecretString' in get_secret_value_response: secret = json.loads(get_secret_value_response['SecretString']) else: secret = json.loads(base64.b64decode(get_secret_value_response['SecretBinary']))
return secret
#creates a connection to the clusterdef get_connection(db,db_creds):
con_params = connection_info(db_creds)
rs_conn_string = "host=%s port=%s dbname=%s user=%s password=%s" % (con_params['host'], con_params['port'], db, con_params['username'], con_params['password']) rs_conn = pg.connect(dbname=rs_conn_string) rs_conn.query("set statement_timeout = 1200000")
return rs_conn
#submits a query to the clusterdef query(con,statement): res = con.query(statement) return res
复制代码


调用时,AWS Glue Python Shell 作业运行 rs_query.py。它首先解析在调用时传递的作业参数。它使用其中一些参数从 S3 检索 .sql 文件,然后使用 pygresql_redshift_common.py 中的函数连接文件中的语句并将其提交给集群。因此,除了使用刚打包的 Python 库连接到任何集群之外,它还可以检索并运行任何 SQL 语句。这意味着您只需将参数传输到完成管道中每个任务应连接的位置和应提交的项目,即可为所有基于 Amazon Redshift 的 ETL 管理单个 AWS Glue Python Shell 作业。


Python


from redshift_module import pygresql_redshift_common as rs_commonimport sysfrom awsglue.utils import getResolvedOptionsimport boto3
#get job argsargs = getResolvedOptions(sys.argv,['db','db_creds','bucket','file'])db = args['db']db_creds = args['db_creds']bucket = args['bucket']file = args['file']
#get sql statementss3 = boto3.client('s3') sqls = s3.get_object(Bucket=bucket, Key=file)['Body'].read().decode('utf-8')sqls = sqls.split(';')
#get database connectionprint('connecting...')con = rs_common.get_connection(db,db_creds)
#run each sql statementprint("connected...running query...")results = []for sql in sqls[:-1]: sql = sql + ';' result = rs_common.query(con, sql) print(result) results.append(result)
print(results)
复制代码


创建 Glue Python Shell 作业


接下来,将该代码付诸实践:


  1. 导航到 AWS Glue 控制台页面左侧菜单上的作业,然后从那里选择添加作业

  2. 为作业指定名称,例如 blog_rs_query

  3. 对于 IAM 角色,选择您先前在 AWS CloudFormation 控制台的资源部分中记下的相同 GlueExecutionRole

  4. 对于类型,选择 Python shell,将 Python version 保留为 Python 3 的默认值,并为此作业运行选择一个您提供的现有脚本

  5. 对于存储脚本的 S3 路径,请导航到 AWS CloudFormation模板创建的脚本存储桶(在资源中查找 ScriptBucket),然后选择 python/py 文件。

  6. 展开安全性配置、脚本库和作业参数部分,将带有 Amazon Redshift 连接库的 Python .egg 文件添加到 Python 库路径。它也位于下 python /redshift_module-0.1-py3.6.egg 下的脚本存储区中。


完成上述所有操作后,所有内容应看上去与以下屏幕截图中的相同:



选择下一步。通过选择选择将其移动到必需连接下,添加您创建的连接。(从_建立连接_部分开始回想,这使作业能够与 VPC 进行交互。) 选择保存作业并编辑脚本完成操作,如以下屏幕截图所示。



测试驱动 Python Shell 作业


创建作业后,会转到 AWS Glue Python Shell IDE。如果一切顺利,您应该看到 rs_query.py 代码。现在,Amazon Redshift 集群进入空闲状态,因此请使用 Python 代码运行以下 SQL 语句并使用表格进行填充。


  1. 创建一个外部数据库amzreviews_)_。

  2. 创建一个外部表评论_)_,Amazon Redshift Spectrum 可以从该表中读取 S3 中的源数据(公共评论数据集)。该表按 product_category 进行分区,因为源文件是按类别进行组织的,但是通常您应该对经常筛选的列进行分区(请参阅 #4)。

  3. 将分区添加到外部表。

  4. 创建一个本地内部表(评论_)到 Amazon Redshift 集群。product_id 作为 DISTKEY 使用时效果很好,因为它具有高基数,分布均匀,并且很可能(尽管不是此博客场景的明确组成部分)有一列将用于与其他表联接。我选择 review_date 作为 SORTKEY,以有效过滤掉不属于我的目标查询的评论数据(2015 年之后)。通过阅读设计表_文档,了解有关如何最佳选择 DISTKEY/SORTKEY 以及其他表设计参数,进而优化性能的更多信息。

  5. SQL


   CREATE EXTERNAL SCHEMA amzreviews    from data catalog   database 'amzreviews'   iam_role 'rolearn'   CREATE EXTERNAL database IF NOT EXISTS;
CREATE EXTERNAL TABLE amzreviews.reviews( marketplace varchar(10), customer_id varchar(15), review_id varchar(15), product_id varchar(25), product_parent varchar(15), product_title varchar(50), star_rating int, helpful_votes int, total_votes int, vine varchar(5), verified_purchase varchar(5), review_headline varchar(25), review_body varchar(1024), review_date date, year int) PARTITIONED BY ( product_category varchar(25)) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://amazon-reviews-pds/parquet/';


ALTER TABLE amzreviews.reviews ADD partition(product_category='Apparel') location 's3://amazon-reviews-pds/parquet/product_category=Apparel/' partition(product_category='Automotive') location 's3://amazon-reviews-pds/parquet/product_category=Automotive' partition(product_category='Baby') location 's3://amazon-reviews-pds/parquet/product_category=Baby' partition(product_category='Beauty') location 's3://amazon-reviews-pds/parquet/product_category=Beauty' partition(product_category='Books') location 's3://amazon-reviews-pds/parquet/product_category=Books' partition(product_category='Camera') location 's3://amazon-reviews-pds/parquet/product_category=Camera' partition(product_category='Grocery') location 's3://amazon-reviews-pds/parquet/product_category=Grocery' partition(product_category='Furniture') location 's3://amazon-reviews-pds/parquet/product_category=Furniture' partition(product_category='Watches') location 's3://amazon-reviews-pds/parquet/product_category=Watches' partition(product_category='Lawn_and_Garden') location 's3://amazon-reviews-pds/parquet/product_category=Lawn_and_Garden';
CREATE TABLE reviews( marketplace varchar(10), customer_id varchar(15), review_id varchar(15), product_id varchar(25) DISTKEY, product_parent varchar(15), product_title varchar(50), star_rating int, helpful_votes int, total_votes int, vine varchar(5), verified_purchase varchar(5), review_date date, year int, product_category varchar(25))
SORTKEY ( review_date );
复制代码


手动执行此第一项作业,以便您可以看到我所讨论的所有元素在哪里发挥作用。选择 IDE 屏幕顶部的运行作业。展开安全性配置、脚本库和作业参数部分。您可以在此处将参数作为键值对添加,如以下屏幕截图所示。


col 1col 2
–dbreviews
–db_credsreviewssecret
–bucket
–filesql/reviewsschema.sql



选择运行作业以将其启动。完成该作业需要几秒钟。您可以在 IDE 中的代码下方查找日志输出,以观察作业进度。


作业完成后,请导航至 AWS Glue 控制台中的数据库,然后查找 amzreviews 数据库和 reviews 表,如以下屏幕截图所示。如果它们在该位置,则一切会按计划运行! 您还可以使用 Redshift 查询编辑器或您自己的SQL 客户端工具连接到 Amazon Redshift 集群,并查找本地_评论_表。



本文转载自 AWS 技术博客。


原文链接:https://amazonaws-china.com/cn/blogs/china/orchestrate-amazon-redshift-based-etl-workflows-with-aws-step-functions-and-aws-glue/


2020 年 1 月 02 日 14:36123

评论

发布
暂无评论
发现更多内容

阿里P8大牛的建议,工作1-5年的Java工程师如何让自己变得更值钱

Java成神之路

Java 程序员 架构 面试 编程语言

不为人知的网络编程(十):深入操作系统,从内核理解网络包的接收过程(Linux篇)

JackJiang

Linux 网络编程 TCP/IP

架构师训练营第一期 - 第十一周学习总结

卖猪肉的大叔

极客大学架构师训练营

六度空间软件系统开发|六度空间APP开发

开發I852946OIIO

系统开发

LeetCode题解:102. 二叉树的层序遍历,BFS,JavaScript,详细注释

Lee Chen

算法 LeetCode 前端进阶训练营

linux命令:查询占用端口文件所在位置

梁小航航

Linux

刷完这两份pdf轻松拿下了蚂蚁金服、头条、小米等大厂的offer。

Java架构之路

Java 程序员 架构 面试 编程语言

卧槽,牛皮了!某程序员苦刷这两份算法PDF47天,四面字节斩获心仪大厂offer!

Java架构之路

Java 程序员 架构 面试 编程语言

LeetCode题解:45. 跳跃游戏 II,贪心从后向前,JavaScript,详细注释

Lee Chen

算法 LeetCode 前端进阶训练营

测试右移之日志收集与监控

BY林子

敏捷 软件测试

安卓开发不得不会!啃下这些Framework技术笔记,成功入职阿里

欢喜学安卓

android 程序员 编程开发 Android进阶

GitHub标星17K,超火的SpringBoot +Vue实战项目,文档视频贼全

Java成神之路

Java 程序员 架构 面试 编程语言

在阿里巴巴内网找到的一份《Java核心宝典》简直太细了,如获至宝!

Java架构之路

Java 程序员 架构 面试 编程语言

Alibaba Java面试题大揭秘,把这些知识点吃透去面试成功率高达100%

Java架构之路

Java 程序员 架构 面试 编程语言

Flutter技术在会展云中大显身手

京东智联云开发者

flutter 跨平台 移动开发

Norns.Urd 中的一些设计

八苦-瞿昙

C# 随笔 随笔杂谈 aop

一个优秀的程序员,不仅要会编写程序,更要会编写高质量的程序

Java成神之路

Java 程序员 架构 面试 编程语言

可以秒杀全场的SpringCloud微服务电商实战项目,文档贼全

Java成神之路

Java 程序员 架构 面试 编程语言

某Javva程序员金秋9月靠这份文档涨薪10K,你把这份Java进阶文档吃透涨薪超简单!

Java架构之路

Java 程序员 架构 面试 编程语言

4项探索+4项实践,带你了解华为云视觉预训练研发技术

华为云开发者社区

AI 华为云 modelarts

华为工程师:扔掉你手里的其他Netty资料吧,有这份足以

小Q

Java 学习 面试 Netty 网络

话题讨论 | 聊聊那些年你重构过的代码?

xcbeyond

话题讨论

架构师训练营 1 期 -- 第十二周总结

曾彪彪

极客大学架构师训练营

中国SaaS的病与痛?

ToB行业头条

话题讨论 | 对于懂得编程的人来说,编程对你来说有什么乐趣?编程大概是什么感觉?

xcbeyond

话题讨论

金九银十Android热点知识!架构师花费近一年时间整理出来的安卓核心知识,送大厂面经一份!

欢喜学安卓

程序员 面试 编程开发 Android进阶 Android开发

性能之巅:定位和优化程序CPU、内存、IO瓶颈

华为云开发者社区

性能 cpu 优化

足不出户带你体验专业实验室,技术实现不在话下

华为云开发者社区

体验 平台 实验

MySQL最全整理(面试题+笔记+导图),面试大厂不再被MySql难倒

Java成神之路

Java 程序员 架构 面试 编程语言

二层交换机和三层交换机之间VLAN的区别

网络技术平台

小熊派开发实践丨小熊派+合宙Cat.1接入云服务器

华为云开发者社区

IoT 小熊派 实践

使用 AWS Step Functions 和 AWS Glue 编排基于 Amazon Redshift 的 ETL 工作流(二)-InfoQ