2025 年技术指引:让真实案例和经验为开发者开路 了解详情
写点什么

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

  • 2019-11-28
  • 本文字数:4459 字

    阅读完需:约 15 分钟

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据

将数据存储在 Amazon S3 中会在扩展、可靠性和成本效益方面提供很多优势。 除此之外,您可以使用 Apache SparkHivePresto 等开源工具来利用 Amazon EMR 处理和分析您的数据。 尽管这些工具很强大,但若要处理需要您进行增量数据处理和记录级插入、更新和删除的使用案例,仍然具有挑战性。


我们与客户交谈发现,有些使用案例需要处理对个别记录的增量更改,例如:


  • 遵守数据隐私规定,在这些情况下,其用户选择行使他们被遗忘的权限或就如何使用他们的数据更改同意。

  • 在您必须处理指定数据插入和更新事件时,处理流数据。

  • 使用变更数据捕获 (CDC) 架构从企业数据仓库或运营数据存储跟踪和提取数据库更改日志。

  • 恢复延迟到达的数据,或分析截至特定时间点的数据。


从今天开始,EMR 版本 5.28.0 包含 Apache Hudi(孵化),因此,您不再需要构建自定义解决方案来执行记录级插入、更新和删除操作。Hudi 开发于 2016 年开始于 Uber,用于解决提取和 ETL 管道间的效率低下。 近几个月来,EMR 团队与 Apache Hudi 社区密切合作,贡献了很多修补程序,包括将 Hudi 更新为 Spark 2.4.4 (HUDI-12)、支持 Spark Avro (HUDI-91)、增加对 AWS Glue Data Catalog (HUDI-306) 的支持以及多个漏洞修复。


使用 Hudi,您可以在 S3 上执行记录级插入、更新和删除,以便能够符合数据隐私法、使用实时流和变更数据捕获、恢复延迟到达的数据及以开放的、与供应商无关的格式追踪历史记录和回滚。您将创建数据集和表,Hudi 则管理基础数据格式。 Hudi 使用 Apache ParquetApache Avro 进行数据存储,并且包括与 Spark、Hive 和 Presto 的内置集成,以便您能够使用与当前使用相同的工具来查询 Hudi 数据集,并能近乎实时地访问全新数据。


启动 EMR 集群时,适用于 Hudi 的库和工具会在至少选择了以下组件之一:Hive、Spark 或 Presto 的任何时候自动安装和配置。 您可以使用 Spark 创建新的 Hudi 数据集,并插入、更新和删除数据。每个 Hudi 数据集均在您的集群的配置元存储(包括 AWS Glue Data Catalog)中进行注册,并显示为可使用 Spark、Hive 和 Presto 进行查询的表。


Hudi 支持两种存储类型,这两种类型定义写入、索引和从 S3 中读取数据的方式:


  • 写入时复制 – 数据按分列格式 (Parquet) 存储,且更新会在写入期间创建文件的新版本。此存储类型最适用于读取密集型工作负载,因为最新版的数据集始终以有效的分列文件提供。

  • 读取时合并 – 数据以分列 (Parquet) 和分行 (Avro) 格式的组合形式存储;更新记录在分行的“delta 文件”中,随后再被压缩,以创建新版本的分列文件。 此存储类型最适用于写入密集型工作负载,因为新提交内容像 delta 文件一样被快速写入,但读取数据集需要将压缩的分列文件与 delta 文件合并。


我们来快速概览如何在 EMR 集群中设置和使用 Hudi 数据集。


**将 Apache Hudi 与 Amazon EMR 结合使用


**我开始从 EMR 控制台中创建集群。在高级选项中,我选择 EMR 版本 5.28.0(包含 Hudi 的第一个版本)和以下应用程序:Spark、Hive 和 Tez。在硬件选项中,我添加了 3 个任务节点,以确保拥有足够的容量来同时运行 Spark 和 Hive。


当集群准备就绪时,我使用我在安全选项中选择的密钥对来 SSH 到主节点并访问 Spark Shell。我使用下面的命令来启动 Spark Shell 以将其用于 Hudi:


Bash


$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"              --conf "spark.sql.hive.convertMetastoreParquet=false"              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
复制代码


在这里,我使用下面的 Scala 代码使用“写入时复制”存储类型将一些示例 ELB 日志导入 Hudi 数据集中:


Scala


import org.apache.spark.sql.SaveModeimport org.apache.spark.sql.functions._import org.apache.hudi.DataSourceWriteOptionsimport org.apache.hudi.config.HoodieWriteConfigimport org.apache.hudi.hive.MultiPartKeysValueExtractor
//将各种输入值设置为变量val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"val hudiTableName = "elb_logs_hudi_cow"val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName
//设置我们的 Hudi 数据源选项val hudiOptions = Map[String,String]( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)
//从 S3 中读取数据并使用分区和记录密钥创建 DataFrameval inputDF = spark.read.format("parquet").load(inputDataPath)
//将数据写入 Hudi 数据集中inputDF.write .format("org.apache.hudi") .options(hudiOptions) .mode(SaveMode.Overwrite) .save(hudiTablePath)
复制代码


在 Spark Shell 中,我现在可以对 Hudi 数据集中的记录进行计数:


scala> inputDF2.count()


res1: Long = 10491958


在选项中,我使用了与为集群配置的 Hive 元存储的集成,以便在默认数据库中创建表。用此方式,我可以使用 Hive 查询 Hudi 数据集中的数据:


hive> use default;


hive> select count(*) from elb_logs_hudi_cow;


...


OK


10491958


...


现在,我可以更新或删除数据集中的单个记录。在 Spark Shell 中,我准备了一些变量来查找我想要更新的记录,并准备了一个 SQL 语句来选择我想要更改的列值:


Scala


val requestIpToUpdate = "243.80.62.181"val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"
复制代码


我执行 SQL 语句来查看列的当前值:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_003|


+------------+


然后,我选择并更新记录:


Scala


//使用单个记录创建 DataFrame 并更新列值val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)                      .withColumn("elb_name", lit("elb_demo_001"))
复制代码


现在,我使用与我用于创建数据集的语法相似的语法来更新 Hudi 数据集。但此时,我编写的 DataFrame 仅包含一个记录:


Scala


//将 DataFrame 编写为现有 Hudi 数据集的更新updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我检查了更新的结果:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


现在,我想要删除相同的记录。要删除记录,我将 EmptyHoodieRecordPayload 有效负载传递到写入选项中:


Scala


//使用 EmptyHoodieRecordPayload 编写 DataFrame 以删除记录updateDF.write        .format("org.apache.hudi")        .options(hudiOptions)        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,                "org.apache.hudi.EmptyHoodieRecordPayload")        .mode(SaveMode.Append)        .save(hudiTablePath)
复制代码


在 Spark Shell 中,我发现记录不再可用:


scala> spark.sql(sqlStatement).show()


+--------+


|elb_name|


+--------+


+--------+


Hudi 如何管理所有这些更新和删除操作? 我们来使用 Hudi 命令行界面 (CLI) 连接到数据集,然后发现这些更改被解读为提交内容:



此数据集为“写入时复制”数据集,这意味着,每当对记录进行更新时,包含该记录的文件将被重新编写,以包含更新的值。您可以看到每个提交编写了多少记录。表格的底行描述了数据集的初始创建,其上面是单个记录的更新,顶行是单个记录的删除。


使用 Hudi,您可以回滚到每个提交。例如,我可以使用以下命令回滚删除操作:


hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031


在 Spark Shell 中,记录现在已返回原来的位置,刚好在更新之后:


scala> spark.sql(sqlStatement).show()


+------------+


| elb_name|


+------------+


|elb_demo_001|


+------------+


“写入时复制”为默认的存储类型。我可以重复上述步骤,通过将“读取时合并”数据集类型添加到我们的 hudiOptions 中来创建和更新该数据集类型:


DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"


如果您更新“读取时合并”数据集并使用 Hudi CLI 查看提交内容,您可以看到“读取时合并”与“写入时复制”相比有何不同。使用“读取时合并”,您只会写入更新的行,而不是像“写入时复制”一样写入整个文件。这就是“读取时合并”有助于需要更多写入或更新/删除密集型工作负载、具有较少读取次数的使用案例的原因。Delta 提交内容以 Avro 记录(基于行的存储)形式写入磁盘中,压缩的数据以 Parquet 文件(分列存储)的形式写入。为了避免创建太多 delta 文件,Hudi 将自动压缩您的数据集,以使您的读取尽可能具有高性能。


创建“读取时合并”数据集时,会创建两个 Hive 表:


  • 第一个表与数据集的名称匹配。

  • 第二个表的名称附加有字符 _rt;后缀 _rt 代表 real-time


查询时,第一个表会返回已压缩的数据,并且将不会显示最新的 delta 提交内容。使用此表会提供最佳性能,但会忽略最新的数据。查询实时表会将压缩的数据与读取时的 delta 提交内容合并,因此,此数据集被称为“读取时合并”。此操作将产生可用的最新数据,但会产生性能开销,且不会像查询压缩数据一样具有高性能。用此方式,数据工程师和分析师可以在性能与数据新鲜度之间灵活地进行选择。


**现已推出


**此新功能现已在具有 EMR 5.28.0 的所有区域推出。 将 Hudi 与 EMR 结合使用不会产生额外费用。您可以在 EMR 文档中了解有关 Hudi 的更多信息。这款新工具可以简化您处理、更新和删除 S3 中的数据的方式。请让我知道您打算将它用于哪些使用案例!


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/


2019-11-28 08:001184

评论

发布
暂无评论
发现更多内容

Go-Excelize API源码阅读(七)—— CopySheet(from, to int)

Regan Yue

开源 源码阅读 8月日更 8月月更

“纯C”实现——扫雷游戏(递归实现展开功能)

一介凡夫

c 开源 8月月更

有关分库分表ShardingSphere-JDBC,这是我见过整理的最全的笔记了

Java全栈架构师

Java 数据库 程序员 面试 JDBC

开源一夏 |卷王必备学习的MyBatis-Plus用法~

叶秋学长

开源 mybaits 8月月更

AS北京站如约而至!发布参会感想有机会获得官方周边奖励

InfoQ写作社区官方

热门活动 ArchSummit

MySQL之JDBC编程增删改查

了不起的程序猿

Java MySQL JAVA开发 java程序员

拿捏了!火爆GitHub的字节内部1213页“数据结构与算法”面试手册

冉然学Java

Java 数据结构 面试 算法 构架

Redis为什么这么快?

京东科技开发者

数据库 消息队列 redis'

CCF大会腾源会专场即将召开,聚焦基础软件与开发语言未来发展

腾源会

开源 腾源会

TiSpark 原理之下推丨TiDB 工具分享

PingCAP

TiDB

直播预告 | Authing 如何打造云原生 SaaS 产品架构?

Authing

Netty进阶 -- WebSocket长连接开发

Bug终结者

8月月更

基于 TLS 1.3的百度安全通信协议 bdtls 介绍

百度Geek说

安全

如何设计一组会出现死锁(Deadlock)的ABAP程序

汪子熙

操作系统 SAP abap 8月月更 ABAP死锁

兴盛优选:时序数据如何高效处理?

TDengine

数据库 tdengine 时序数据库

从滴滴被罚款事件思考企业数据治理问题

墨天轮

大数据 滴滴 数据治理 数据安全

【设计模式-前端】单例模式深刻理解和实现

归子莫

前端 设计模式 js 8月月更

案例复现,带你分析Priority Blocking Queue比较器异常导致的NPE问题

华为云开发者联盟

后端 开发

技术分享| 视频传输Simulcast与Svc

anyRTC开发者

音视频 Simulcast Svc 视频传输

SAP ABAP 里存在 Java List 这种集合工具类么?CL_OBJECT_COLLECTION 了解一下

汪子熙

设计模式 迭代器模式 SAP abap 8月月更

阿里云 Hologres助力好未来网校实时数仓降本增效

阿里云大数据AI技术

数据分析 数据治理 数据安全

使用脚手架 快速开发 React组件 npm包 (基于TSDX)

HullQin

CSS JavaScript html 前端 8月月更

7月月更开奖啦!快来看看你中奖了吗?

InfoQ写作社区官方

热门活动 7月月更

python工程化配置方式

芥末拌个饭吧

8月月更

面试官:Redis Zset的实现为什么用跳表,而不用平衡树?

程序员小毕

Java redis 程序员 面试 后端

Spring-boot项目练习笔记(一)JS处理Long型数据精度丢失问题

赵四司机

Java web spring-boot 8月月更

易观分析:银行零售业务实现智能化营销还需突破七大关键点

易观分析

零售 银行 智能化营销

有了阿里这5份Java架构师手册,学习起来轻松多了!

冉然学Java

Java 算法 java面试 性能调优实战 并发架构设计思想

知乎杀疯了,疯传2022Java面试八股文解析+大厂面经

程序知音

Java 程序员 java面试 后端技术 Java面试八股文

和鲸科技创始人范向伟:大部分数据智能项目都面临着两个挑战

ModelWhale

工作流 数字化转型 数据智能 协同效应 8月月更

Spring-boot项目练习笔记(二)MybatisPlus实现公共字段自动填充

赵四司机

Java web MyBatisPlus 8月月更

使用 Amazon EMR 和 Apache Hudi 插入、更新、删除 S3 上的数据_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章