写点什么

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

  • 2019-11-28
  • 本文字数:4459 字

    阅读完需:约 15 分钟

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

将数据存储在 Amazon S3 中会在扩展、可靠性和成本效益方面提供很多优势。 除此之外,您可以使用 Apache SparkHivePresto 等开源工具来利用 Amazon EMR 处理和分析您的数据。 尽管这些工具很强大,但若要处理需要您进行增量数据处理和记录级插入、更新和删除的使用案例,仍然具有挑战性。


我们与客户交谈发现,有些使用案例需要处理对个别记录的增量更改,例如:


  • 遵守数据隐私规定,在这些情况下,其用户选择行使他们被遗忘的权限或就如何使用他们的数据更改同意。

  • 在您必须处理指定数据插入和更新事件时,处理流数据。

  • 使用变更数据捕获 (CDC) 架构从企业数据仓库或运营数据存储跟踪和提取数据库更改日志。

  • 恢复延迟到达的数据,或分析截至特定时间点的数据。


从今天开始,EMR 版本 5.28.0 包含 Apache Hudi(孵化),因此,您不再需要构建自定义解决方案来执行记录级插入、更新和删除操作。Hudi 开发于 2016 年开始于 Uber,用于解决提取和 ETL 管道间的效率低下。 近几个月来,EMR 团队与 Apache Hudi 社区密切合作,贡献了很多修补程序,包括将 Hudi 更新为 Spark 2.4.4 (HUDI-12)、支持 Spark Avro (HUDI-91)、增加对 AWS Glue Data Catalog (HUDI-306) 的支持以及多个漏洞修复。


使用 Hudi,您可以在 S3 上执行记录级插入、更新和删除,以便能够符合数据隐私法、使用实时流和变更数据捕获、恢复延迟到达的数据及以开放的、与供应商无关的格式追踪历史记录和回滚。您将创建数据集和表,Hudi 则管理基础数据格式。 Hudi 使用 Apache ParquetApache Avro 进行数据存储,并且包括与 Spark、Hive 和 Presto 的内置集成,以便您能够使用与当前使用相同的工具来查询 Hudi 数据集,并能近乎实时地访问全新数据。


启动 EMR 集群时,适用于 Hudi 的库和工具会在至少选择了以下组件之一:Hive、Spark 或 Presto 的任何时候自动安装和配置。 您可以使用 Spark 创建新的 Hudi 数据集,并插入、更新和删除数据。每个 Hudi 数据集均在您的集群的配置元存储(包括 AWS Glue Data Catalog)中进行注册,并显示为可使用 Spark、Hive 和 Presto 进行查询的表。


Hudi 支持两种存储类型,这两种类型定义写入、索引和从 S3 中读取数据的方式:


  • 写入时复制 – 数据按分列格式 (Parquet) 存储,且更新会在写入期间创建文件的新版本。此存储类型最适用于读取密集型工作负载,因为最新版的数据集始终以有效的分列文件提供。

  • 读取时合并 – 数据以分列 (Parquet) 和分行 (Avro) 格式的组合形式存储;更新记录在分行的“delta 文件”中,随后再被压缩,以创建新版本的分列文件。 此存储类型最适用于写入密集型工作负载,因为新提交内容像 delta 文件一样被快速写入,但读取数据集需要将压缩的分列文件与 delta 文件合并。


我们来快速概览如何在 EMR 集群中设置和使用 Hudi 数据集。


**将 Apache Hudi 与 Amazon EMR 结合使用


**我开始从 EMR 控制台中创建集群。在高级选项中,我选择 EMR 版本 5.28.0(包含 Hudi 的第一个版本)和以下应用程序:Spark、Hive 和 Tez。在硬件选项中,我添加了 3 个任务节点,以确保拥有足够的容量来同时运行 Spark 和 Hive。


当集群准备就绪时,我使用我在安全选项中选择的密钥对来 SSH 到主节点并访问 Spark Shell。我使用下面的命令来启动 Spark Shell 以将其用于 Hudi:


Bash


$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"              --conf "spark.sql.hive.convertMetastoreParquet=false"              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
复制代码


在这里,我使用下面的 Scala 代码使用“写入时复制”存储类型将一些示例 ELB 日志导入 Hudi 数据集中:


Scala


import org.apache.spark.sql.SaveModeimport org.apache.spark.sql.functions._import org.apache.hudi.DataSourceWriteOptionsimport org.apache.hudi.config.HoodieWriteConfigimport org.apache.hudi.hive.MultiPartKeysValueExtractor
//将各种输入值设置为变量val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"val hudiTableName = "elb_logs_hudi_cow"val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName
//设置我们的 Hudi 数据源选项val hudiOptions = Map[String,String]( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)
//从 S3 中读取数据并使用分区和记录密钥创建 DataFrameval inputDF = spark.read.format("parquet").load(inputDataPath)
//将数据写入 Hudi 数据集中inputDF.write .format("org.apache.hudi") .options(hudiOptions) .mode(SaveMode.Overwrite) .save(hudiTablePath)
复制代码


在 Spark Shell 中,我现在可以对 Hudi 数据集中的记录进行计数:


scala> inputDF2.count()


res1: Long = 10491958


在选项中,我使用了与为集群配置的 Hive 元存储的集成,以便在默认数据库中创建表。用此方式,我可以使用 Hive 查询 Hudi 数据集中的数据:


hive> use default;


hive> select count(*) from elb_logs_hudi_cow;


...


OK


10491958


...


现在,我可以更新或删除数据集中的单个记录。在 Spark Shell 中,我准备了一些变量来查找我想要更新的记录,并准备了一个 SQL 语句来选择我想要更改的列值:


Scala


val requestIpToUpdate = "243.80.62.181"val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"
复制代码


我执行 SQL 语句来查看列的当前值:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_003|


+------------+


然后,我选择并更新记录:


Scala


//使用单个记录创建 DataFrame 并更新列值val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)                      .withColumn("elb_name", lit("elb_demo_001"))
复制代码


现在,我使用与我用于创建数据集的语法相似的语法来更新 Hudi 数据集。但此时,我编写的 DataFrame 仅包含一个记录:


Scala


//将 DataFrame 编写为现有 Hudi 数据集的更新updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我检查了更新的结果:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


现在,我想要删除相同的记录。要删除记录,我将 EmptyHoodieRecordPayload 有效负载传递到写入选项中:


Scala


//使用 EmptyHoodieRecordPayload 编写 DataFrame 以删除记录updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,                "org.apache.hudi.EmptyHoodieRecordPayload")        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我发现记录不再可用:


scala> spark.sql(sqlStatement).show()


+--------+


|elb_name|


+--------+


+--------+


Hudi 如何管理所有这些更新和删除操作? 我们来使用 Hudi 命令行界面 (CLI) 连接到数据集,然后发现这些更改被解读为提交内容:



此数据集为“写入时复制”数据集,这意味着,每当对记录进行更新时,包含该记录的文件将被重新编写,以包含更新的值。您可以看到每个提交编写了多少记录。表格的底行描述了数据集的初始创建,其上面是单个记录的更新,顶行是单个记录的删除。


使用 Hudi,您可以回滚到每个提交。例如,我可以使用以下命令回滚删除操作:


hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031


在 Spark Shell 中,记录现在已返回原来的位置,刚好在更新之后:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


“写入时复制”为默认的存储类型。我可以重复上述步骤,通过将“读取时合并”数据集类型添加到我们的 hudiOptions 中来创建和更新该数据集类型:


DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"


如果您更新“读取时合并”数据集并使用 Hudi CLI 查看提交内容,您可以看到“读取时合并”与“写入时复制”相比有何不同。使用“读取时合并”,您只会写入更新的行,而不是像“写入时复制”一样写入整个文件。这就是“读取时合并”有助于需要更多写入或更新/删除密集型工作负载、具有较少读取次数的使用案例的原因。Delta 提交内容以 Avro 记录(基于行的存储)形式写入磁盘中,压缩的数据以 Parquet 文件(分列存储)的形式写入。为了避免创建太多 delta 文件,Hudi 将自动压缩您的数据集,以使您的读取尽可能具有高性能。


创建“读取时合并”数据集时,会创建两个 Hive 表:


  • 第一个表与数据集的名称匹配。

  • 第二个表的名称附加有字符 _rt;后缀 _rt 代表 real-time


查询时,第一个表会返回已压缩的数据,并且将不会显示最新的 delta 提交内容。使用此表会提供最佳性能,但会忽略最新的数据。查询实时表会将压缩的数据与读取时的 delta 提交内容合并,因此,此数据集被称为“读取时合并”。此操作将产生可用的最新数据,但会产生性能开销,且不会像查询压缩数据一样具有高性能。用此方式,数据工程师和分析师可以在性能与数据新鲜度之间灵活地进行选择。


**现已推出


**此新功能现已在具有 EMR 5.28.0 的所有区域推出。 将 Hudi 与 EMR 结合使用不会产生额外费用。您可以在 EMR 文档中了解有关 Hudi 的更多信息。这款新工具可以简化您处理、更新和删除 S3 中的数据的方式。请让我知道您打算将它用于哪些使用案例!


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/


2019-11-28 08:001154

评论

发布
暂无评论
发现更多内容

Baklib|为什么说企业需要重视客户体验?

Baklib

怎样搭建企业内部维基百科

Baklib

深开鸿:万物智联的大江上,升起一轮开源鸿蒙月

脑极体

SpringBoot基于异常处理exception发送邮件消息提醒

宁在春

springboot 7月月更

第二轮1000个Okaleido Tiger,再次登录Binance NFT 1小时售罄

BlockChain先知

【周周有奖】云原生编程挑战赛“边缘容器”赛道邀你来战!

阿里巴巴云原生

阿里云 边缘容器 云原生编程挑战赛

leetcode 763. Partition Labels 划分字母区间(中等)

okokabcd

LeetCode 数据结构与算法 贪心算法

React Refs 笔记📝

程序员海军

React 7月月更

requestVideoFrameCallback() 简单实例

devpoint

3D 视频处理 7月月更

基于对象的实时空间音频渲染丨Dev for Dev 专栏

声网

Dev for Dev 空间音频 实时互动

研发效能的道法术器

laofo

DevOps cicd 研发效能 基础设施 持续交付

编码用这16个命名规则能让你少写一半以上的注释!

岛上码农

flutter ios 前端 安卓开发 签约计划第三季

一文读懂Okaleido Tiger近期动态,挖掘背后价值与潜力

西柚子

PLATO上线LAAS协议Elephant Swap,用户可借此获得溢价收益

鳄鱼视界

年中总结 | 与自己对话,活在当下,每走一步都算数

宇宙之一粟

年中总结 7月月更

一文读懂Okaleido Tiger近期动态,挖掘背后价值与潜力

鳄鱼视界

学习Typescript(二)

bo

前端 ts 7月月更

DDD领域驱动设计如何进行工程化落地

慕枫技术笔记

DDD 架构设计 7月月更

Bootstrap Affix和过渡效果插件的详细使用【前端Bootstrap框架】

恒山其若陋兮

7月月更

PlatoFarm社区生态福音,用户可借助Elephant Swap获得溢价收益

股市老人

一文读懂Okaleido Tiger近期动态,挖掘背后价值与潜力

股市老人

openEuler Embedded SIG | 分布式软总线

openEuler

开源 分布式 操作系统 嵌入式 openEuler

基于java springboot失物招领微信小程序源码

清风

计算机毕业设计 失物招领小程序

什么是低代码?哪些平台适合业务人员?用来开发系统靠不靠谱?

优秀

低代码 低代码平台

智能电视与小程序的结合

Geek_99967b

物联网

快手重点整治搬运、洗稿等方式的养号行为,自媒体平台如何净化内容生态

石头IT视角

英特尔数据中心GPU正式发货,以开放灵活提供强劲算力

科技新消息

被忽视的智能电视小程序领域

Geek_99967b

物联网

Prometheus 的 API 稳定性保障

耳东@Erdong

Prometheus API 7月月更

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章