写点什么

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(二)

  • 2020-01-13
  • 本文字数:6835 字

    阅读完需:约 22 分钟

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(二)

为不同工作负载优化数据结构

由于 S3 的成本相对便宜且我们只需为每个查询扫描的数据付款,我们认为,对不同的工作负载和不同的分析引擎,以不同的格式保存数据是有意义的。需要注意的是,我们可以有任意数量的表指向 S3 上的相同数据。这完全取决于我们如何对数据分区及如何更新表分区。

数据排列

例如,我们有一个一分钟运行一次且为最后一分钟收集的数据生成统计数据的进程。利用 Amazon Redshift,将如下所示通过在表上运行查询来完成此操作:


SQL


SELECT   user,  COUNT(*) FROM   events_table WHERE   ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’ GROUP BY   user;
复制代码


(假设 ‘ts’ 是为每个事件存储时间戳的列。)


利用 Redshift Spectrum 时,我们为每个查询中扫描的数据付费。如果数据按分钟而不是小时分区,则查看一分钟数据的查询费用为成本的 1/60。如果我们使用仅指向最后一分钟数据的临时表,我们将节省不必要的费用。

高效创建 Parquet 数据

平均而言,我们有 800 个实例来处理流量。每个实例发送最终加载到 Amazon Redshift 中的事件。从三年前开始,我们可以将数据从每个服务器卸载到 S3 中,然后再执行从 S3 到 Amazon Redshift 的定期复制命令。


最近,Amazon Kinesis Firehose 将卸载数据的功能直接添加到了 Amazon Redshift 中。虽然现在这是个可行选项,但我们保留了相同的收集过程,这个过程已完美、高效地工作了三年。


但当我们合并 Redshift Spectrum 时,情况发生了变化。利用 Redshift Spectrum,我们需要找到一种方法来:


  • 从实例中收集事件数据。

  • 以 Parquet 格式保存数据。

  • 对数据进行有效分区。


为完成此操作,我们将数据另存为 CSV 格式,然后将其转换为 Parquet。生成 Parquet 文件的最有效方法是:


  1. 将 S3 临时存储桶用作目标,以一分钟的间隔将数据从实例发送到 Kinesis Firehose 中。

  2. 聚合每小时的数据,并使用 AWS LambdaAWS Glue 将其转换为 Parquet。

  3. 通过更新表分区将 Parquet 数据添加到 S3。


在此新过程中,我们必须在将数据发送到 Kinesis Firehose 之前更加注意验证数据,因为分区中的单个损坏记录无法对该分区进行查询。

数据验证

为了将我们的点击数据存储在表中,我们考虑了以下 SQL 创建表命令:


SQL


create external TABLE spectrum.blog_clicks (    user_id varchar(50),    campaign_id varchar(50),    os varchar(50),    ua varchar(255),    ts bigint,    billing float)按 (date date, hour smallint) 分区  存储为 parquet位置 's3://nuviad-temp/blog/clicks/';
复制代码


上面的语句定义了包括几个属性的新外部表(所有的 Redshift Spectrum 表为外部表)。我们将 ‘ts’ 存储为 Unix 时间戳,并非时间戳,计费数据存储为浮点数,而不是小数(稍后进行更多讨论)。我们还说过,数据按日期和小时进行分区,然后在 S3 上存储为 Parquet。


首先,我们需要获取表定义。此操作可以通过运行以下查询来实现:


SQL


SELECT   * FROM   svv_external_columns WHERE   tablename = 'blog_clicks';
复制代码


此查询列出表中的所有列及其各自的定义:


col 1col 2col 3col 4col 5col 6
schemanametablenamecolumnnameexternal_typecolumnnumpart_key
spectrumblog_clicksuser_idvarchar(50)10
spectrumblog_clickscampaign_idvarchar(50)20
spectrumblog_clicksosvarchar(50)30
spectrumblog_clicksuavarchar(255)40
spectrumblog_clickstsbigint50
spectrumblog_clicksbillingdouble60
spectrumblog_clicksdatedate71
spectrumblog_clickshoursmallint82


现在,我们可以使用此数据为我们的数据创建验证模式:


Json


const rtb_request_schema = {    "name": "clicks",    "items": {        "user_id": {            "type": "string",            "max_length": 100        },        "campaign_id": {            "type": "string",            "max_length": 50        },        "os": {            "type": "string",            "max_length": 50                    },        "ua": {            "type": "string",            "max_length": 255                    },        "ts": {            "type": "integer",            "min_value": 0,            "max_value": 9999999999999        },        "billing": {            "type": "float",            "min_value": 0,            "max_value": 9999999999999        }    }};
复制代码


接下来,我们创建一个函数,以使用此模式验证数据:


JavaScript


function valueIsValid(value, item_schema) {    if (schema.type == 'string') {        return (typeof value == 'string' && value.length <= schema.max_length);    }    else if (schema.type == 'integer') {        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);    }    else if (schema.type == 'float' || schema.type == 'double') {        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);    }    else if (schema.type == 'boolean') {        return typeof value == 'boolean';    }    else if (schema.type == 'timestamp') {        return (new Date(value)).getTime() > 0;    }    else {        return true;    }}
复制代码

使用 Kinesis Firehose 进行近实时的数据加载

在 Kinesis Firehose 上,我们创建了一个新的传输流以按如下所示处理事件:


传输流名称:事件来源:Direct PUTS3 存储桶:nuviad-eventsS3 前缀:rtb/IAM 角色:firehose_delivery_role_1数据转换:已禁用源记录备份:已禁用S3 缓冲区大小 (MB):100S3 缓冲区间隔(秒):60S3 压缩:GZIPS3 加密:未加密状态:活动错误记录:已启用
复制代码


此传输流每分钟聚合一次事件数据,或最多聚合 100 MB,并将数据作为 CSV/GZIP 压缩文件写入 S3 存储桶中。接下来,在我们验证数据后,我们可以将其安全发送至我们的 Kinesis Firehose API:


JavaScript


if (validated) {    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line
let params = { DeliveryStreamName: 'events', Record: { Data: itemString } };
firehose.putRecord(params, function(err, data) { if (err) { console.error(err, err.stack); } else { // 继续您的下一步 } });}
复制代码


现在,我们有一个 CSV 文件,该文件表示 S3 中存储的一分钟事件数据。在向 S3 写入对象之前,Kinesis Firehose 通过添加一个格式为 YYYY/MM/DD/HH 的 UTC 时间前缀对文件进行自动命名。由于我们将日期和小时用作分区,我们需要更换文件的名称和位置,以适应我们的 Redshift Spectrum 模式。

使用 AWS Lambda 自动化数据分布

我们创建了一个由 S3 put 事件触发的简单 Lambda 函数,该事件将文件复制到另一个位置(或多个位置),同时对文件进行重新命名以适应我们的数据结构和处理流程。如前所述,Kinesis Firehose 生成的文件按照预先定义的层次构造结构,如:


S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
复制代码


我们需要做的就是解析对象名称,并按照我们认为合适的方式重新构造其结构。在我们的例子中,我们执行了以下操作(事件是 Lambda 函数中接收的对象,该对象的所有数据都写入了 S3):


JavaScript


/*  事件对象中的对象密钥结构:your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz  */
let key_parts = event.Records[0].s3.object.key.split('/');
let event_type = key_parts[0];let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];let hour = key_parts[4];if (hour.indexOf('0') == 0) { hour = parseInt(hour, 10) + '';}
let parts1 = key_parts[5].split('-');let minute = parts1[7];if (minute.indexOf('0') == 0) { minute = parseInt(minute, 10) + '';}
复制代码


现在,我们可以将文件重新分布到我们需要的两个目标——一个用于分钟处理任务,另一个用于小时聚合:


JavaScript


copyObjectToHourlyFolder(event, date, hour, minute)        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))        .then(deleteOldMinuteObjects.bind(null, event))        .then(deleteStreamObject.bind(null, event))                .then(result => {            callback(null, { message: 'done' });                    })        .catch(err => {            console.error(err);            callback(null, { message: err });                    });
复制代码


Kinesis Firehose 将数据存储在临时文件夹中。我们将对象复制到保存最后一分钟处理数据的另一个文件夹。该文件夹连接到一个小的 Redshift Spectrum 表,数据在该表中进行处理,不需要扫描更大的数据集。我们还将数据复制到保存一整个小时数据的文件夹中,稍后再进行聚合并转换为 Parquet 格式。


由于我们按日期和小时对数据进行分区,如果处理的分钟是一小时中的第一分钟(即分钟 0),我们在 Redshift Spectrum 表上创建了一个新分区。我们运行了以下各项:


SQL


ALTER TABLE   spectrum.events ADD 分区  (date='2017-08-01', hour=0)   LOCATION 's3://nuviad-temp/events/2017-08-01/0/';
复制代码


处理过数据并将其添加到表中之后,我们从 Kinesis Firehose 临时存储和分钟存储文件夹中删除处理后的数据。

使用 AWS Glue 和 Amazon EMR 将 CSV 迁移到 Parquet

我们发现 CSV 数据至 Parquet 转换的小时作业的最简单运行方法是使用 Lambda 和 AWS Glue(感谢强大的 AWS 大数据团队在这方面的帮助)。

创建 AWS Glue 作业

此简单 AWS Glue 脚本执行以下操作:


  • 获取待处理的作业、日期和小时参数

  • 创建 Spark EMR 上下文,以便我们运行 Spark 代码

  • 将 CSV 数据读取到 DataFrame 中

  • 将数据以 Parquet 格式写入目标 S3 存储桶

  • 为该表添加或修改 Redshift Spectrum / Amazon Athena 表分区


Python


import sysimport sysfrom awsglue.transforms import *from awsglue.utils import getResolvedOptionsfrom pyspark.context import SparkContextfrom awsglue.context import GlueContextfrom awsglue.job import Jobimport boto3
## @params: [JOB_NAME]args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])
#day_partition_key = "partition_0"#hour_partition_key = "partition_1"#day_partition_value = "2017-08-01"#hour_partition_value = "0"
day_partition_key = args['day_partition_key']hour_partition_key = args['hour_partition_key']day_partition_value = args['day_partition_value']hour_partition_value = args['hour_partition_value']
print("Running for " + day_partition_value + "/" + hour_partition_value)
sc = SparkContext()glueContext = GlueContext(sc)spark = glueContext.spark_sessionjob = Job(glueContext)job.init(args['JOB_NAME'], args)
df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)df.registerTempTable("data")
df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")
df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)
client = boto3.client('athena', region_name='us-east-1')
response = client.start_query_execution( QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' , QueryExecutionContext={ 'Database': 'spectrumdb' }, ResultConfiguration={ 'OutputLocation': 's3://nuviad-temp/convertresults' })
response = client.start_query_execution( QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' , QueryExecutionContext={ 'Database': 'spectrumdb' }, ResultConfiguration={ 'OutputLocation': 's3://nuviad-temp/convertresults' })
job.commit()
复制代码


注:由于 Redshift Spectrum 和 Athena 都使用 AWS Glue 数据目录,我们可以使用 Athena 客户端将分区添加到表中。


下面是关于浮点类型、小数类型和双精度型的一些单词。经证明,使用小数类型比我们预期的更具有挑战性,因为 Redshift Spectrum 和 Spark 似乎以不同的方式使用它们。当我们在 Redshift Spectrum 和 Spark 中使用小数类型时,我们不断的收到错误,例如:


S3 查询异常(获取)。由于内部错误,任务失败。文件 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column ‘s3://nuviad-events/events.lat’.列类型:DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq


我们必须对几个浮点格式进行试验,直到我们发现唯一有效的组合是在 Spark 代码中将列定义为双精度型在 Spectrum 中将其定义为浮点类型。这是账单在 Spectrum 中被定义为浮点类型在 Spark 代码中被定义为双精度型的原因。

创建一个 Lambda 函数以触发转换

接下来,我们创建了一个简单的 Lambda 函数,以使用简单的 Python 代码每小时触发 AWS Glue 脚本:


Python


import boto3import jsonfrom datetime import datetime, timedelta
client = boto3.client('glue')
def lambda_handler(event, context): last_hour_date_time = datetime.now() - timedelta(hours = 1) day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") hour_partition_value = last_hour_date_time.strftime("%-H") response = client.start_job_run( JobName='convertEventsParquetHourly', Arguments={ '--day_partition_key': 'date', '--hour_partition_key': 'hour', '--day_partition_value': day_partition_value, '--hour_partition_value': hour_partition_value } )
复制代码


使用 Amazon CloudWatch Events,我们可以按小时触发此函数。此函数将触发名为 ‘convertEventsParquetHourly’ 的 AWS Glue 作业,并运行前一小时的作业,从而将要处理的作业名称和分区值传递到 AWS Glue 中。

Redshift Spectrum 和 Node.js

我们的开发堆栈基于 Node.js,非常适合需要处理大量交易的高速轻服务器。然而,Node.js 环境有一些限制,要求我们创建变通方法并使用其他工具完成此过程。

Node.js 和 Parquet

由于 Node.js 缺乏 Parquet 模块,需要我们执行 AWS Glue/Amazon EMR 过程以将数据从 CSV 有效迁移到 Parquet。我们宁愿直接保存到 Parquet,但我们找不到有效的方法来执行此操作。


一个有趣的项目是通过称为 node-parquet 的 Marc Vertes 开发 Parquet NPM (https://www.npmjs.com/package/node-parquet)。它没有进入生产状态,但我们认为很值得跟进此生产包的进展。

时间戳数据类型

根据 Parquet 文档,时间戳数据以 64 位整数存储在 Parquet 中。然而,JavaScript 不支持 64 位整数,因为本机号码类型为 64 位双精度型,仅提供 53 位的整数范围。


因此,无法使用 Node.js 将时间戳正确存储在 Parquet 中。解决方法是将时间戳存储为字符串,并将类型转换为查询中的时间戳。使用这种方法,我们没有发现任何性能下降。


本文转载自 AWS 技术博客。


原文链接:https://amazonaws-china.com/cn/blogs/china/big-data-using-amazon-redshift-spectrum-amazon-athena-and-aws-glue-with-node-js-in-production/


2020-01-13 14:52549

评论

发布
暂无评论
发现更多内容

中国信通院正式启动视联网产业与技术研究工作

信通院IOMM数字化转型团队

中国信通院 视频行业 视联网

软件测试/测试开发|给你剖析闭包与装饰器的魔力

霍格沃兹测试开发学社

链路传播(Propagate)机制及使用场景

观测云

APM 链路

鸿蒙开发:于前端开发而言是福音还是挑战?

EquatorCoco

华为 前端开发 HarmonyOS 鸿蒙开发

低代码开发视角下的大模型时代:探索“新思维”技术管理

不在线第一只蜗牛

人工智能 低代码 大模型 AI模型

深度解析Android APP加固中的必备手段——代码混淆技术

雪奈椰子

Python多任务协程:编写高性能应用的秘密武器

霍格沃兹测试开发学社

毫巅之微---不同写法的性能差异 番外篇

fliter

测试开发高薪私教线下班手把手带你提升职业技能

测试人

软件测试

测试开发高薪私教线下班手把手带你提升职业技能

霍格沃兹测试开发学社

如何快速上手Visio?从入门到精通 | Visio替代软件,建议收藏!

彭宏豪95

效率工具 在线白板 架构图 绘图软件 Visio

中国信通院筹备启动“边缘算力网络推进计划”,招募首批成员单位

信通院IOMM数字化转型团队

边缘计算 边缘算力网络

Docker 魔法解密:探索 UnionFS 与 OverlayFS

EquatorCoco

Docker 运维 容器化

【第七在线】智能商品计划:重塑服装行业的供应链管理

第七在线

一站式解决对接问题,让企业管理更轻松!

聚道云软件连接器

案例分享

山寨币发展的崛起:随着比特币 ETF 重新定义市场动态,山寨币激增

区块链软件开发推广运营

dapp开发 区块链开发 链游开发 NFT开发 公链开发

一起畅玩!幻兽帕鲁服务器华为云搭建教程(Windows平台)

YG科技

云手机与实体手机的对比

Ogcloud

云手机 海外云手机 跨境电商云手机 tiktok云手机 云手机海外版

短程无线自组网协议之:发展现状与趋势?

Geek_ab1536

【教程】如何在苹果手机上查看系统文件?

雪奈椰子

OpenKruise :Kubernetes背后的托底

不在线第一只蜗牛

Kubernetes 容器 云原生

追求成长还是追求“高薪”?

老张

个人成长 能力提升

用Python实现高效数据记录!Web自动化技术助你告别重复劳动!

霍格沃兹测试开发学社

媒体声音|PolarDB再升级:欢迎来到云数据库 x AI新时代

阿里云瑶池数据库

人工智能 数据库 云计算 阿里云 云原生

IPQ9554 and IPQ9574 difference - the biggest difference point?

wifi6-yiyi

Go语言对象池实践

FunTester

幻兽帕鲁 Palworld 私有服务器一键部署教程

米开朗基杨

游戏 steam Sealos Palworld 幻兽帕鲁

SDWAN为什么性价比更高,价格更低廉?

Geek一起出海

SD-WAN

2024年值得推荐的 API 开发工具

快乐非自愿限量之名

推荐 测试 开发工具 API

运用ETLCloud快速实现数据清洗、转换

RestCloud

ETL 数据清洗 数据集成工具

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(二)_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章