速来报名!AICon北京站鸿蒙专场~ 了解详情
写点什么

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)

  • 2020-01-13
  • 本文字数:6409 字

    阅读完需:约 21 分钟

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)

为不同工作负载优化数据结构

由于 S3 的成本相对便宜且我们只需为每个查询扫描的数据付款,我们认为,对不同的工作负载和不同的分析引擎,以不同的格式保存数据是有意义的。需要注意的是,我们可以有任意数量的表指向 S3 上的相同数据。这完全取决于我们如何对数据分区及如何更新表分区。

数据排列

例如,我们有一个一分钟运行一次且为最后一分钟收集的数据生成统计数据的进程。利用 Amazon Redshift,将如下所示通过在表上运行查询来完成此操作:


SQL


SELECT   user,  COUNT(*) FROM   events_table WHERE   ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’ GROUP BY   user;
复制代码


(假设 ‘ts’ 是为每个事件存储时间戳的列。)


利用 Redshift Spectrum 时,我们为每个查询中扫描的数据付费。如果数据按分钟而不是小时分区,则查看一分钟数据的查询费用为成本的 1/60。如果我们使用仅指向最后一分钟数据的临时表,我们将节省不必要的费用。

高效创建 Parquet 数据

平均而言,我们有 800 个实例来处理流量。每个实例发送最终加载到 Amazon Redshift 中的事件。从三年前开始,我们可以将数据从每个服务器卸载到 S3 中,然后再执行从 S3 到 Amazon Redshift 的定期复制命令。


最近,Amazon Kinesis Firehose 将卸载数据的功能直接添加到了 Amazon Redshift 中。虽然现在这是个可行选项,但我们保留了相同的收集过程,这个过程已完美、高效地工作了三年。


但当我们合并 Redshift Spectrum 时,情况发生了变化。利用 Redshift Spectrum,我们需要找到一种方法来:


  • 从实例中收集事件数据。

  • 以 Parquet 格式保存数据。

  • 对数据进行有效分区。


为完成此操作,我们将数据另存为 CSV 格式,然后将其转换为 Parquet。生成 Parquet 文件的最有效方法是:


  1. 将 S3 临时存储桶用作目标,以一分钟的间隔将数据从实例发送到 Kinesis Firehose 中。

  2. 聚合每小时的数据,并使用 AWS LambdaAWS Glue 将其转换为 Parquet。

  3. 通过更新表分区将 Parquet 数据添加到 S3。


在此新过程中,我们必须在将数据发送到 Kinesis Firehose 之前更加注意验证数据,因为分区中的单个损坏记录无法对该分区进行查询。

数据验证

为了将我们的点击数据存储在表中,我们考虑了以下 SQL 创建表命令:


SQL


create external TABLE spectrum.blog_clicks (    user_id varchar(50),    campaign_id varchar(50),    os varchar(50),    ua varchar(255),    ts bigint,    billing float)按 (date date, hour smallint) 分区  存储为 parquet位置 's3://nuviad-temp/blog/clicks/';
复制代码


上面的语句定义了包括几个属性的新外部表(所有的 Redshift Spectrum 表为外部表)。我们将 ‘ts’ 存储为 Unix 时间戳,并非时间戳,计费数据存储为浮点数,而不是小数(稍后进行更多讨论)。我们还说过,数据按日期和小时进行分区,然后在 S3 上存储为 Parquet。


首先,我们需要获取表定义。此操作可以通过运行以下查询来实现:


SQL


SELECT   * FROM   svv_external_columns WHERE   tablename = 'blog_clicks';
复制代码


此查询列出表中的所有列及其各自的定义:


col 1col 2col 3col 4col 5col 6
schemanametablenamecolumnnameexternal_typecolumnnumpart_key
spectrumblog_clicksuser_idvarchar(50)10
spectrumblog_clickscampaign_idvarchar(50)20
spectrumblog_clicksosvarchar(50)30
spectrumblog_clicksuavarchar(255)40
spectrumblog_clickstsbigint50
spectrumblog_clicksbillingdouble60
spectrumblog_clicksdatedate71
spectrumblog_clickshoursmallint82


现在,我们可以使用此数据为我们的数据创建验证模式:


Json


const rtb_request_schema = {    "name": "clicks",    "items": {        "user_id": {            "type": "string",            "max_length": 100        },        "campaign_id": {            "type": "string",            "max_length": 50        },        "os": {            "type": "string",            "max_length": 50                    },        "ua": {            "type": "string",            "max_length": 255                    },        "ts": {            "type": "integer",            "min_value": 0,            "max_value": 9999999999999        },        "billing": {            "type": "float",            "min_value": 0,            "max_value": 9999999999999        }    }};
复制代码


接下来,我们创建一个函数,以使用此模式验证数据:


JavaScript


function valueIsValid(value, item_schema) {    if (schema.type == 'string') {        return (typeof value == 'string' && value.length <= schema.max_length);    }    else if (schema.type == 'integer') {        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);    }    else if (schema.type == 'float' || schema.type == 'double') {        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);    }    else if (schema.type == 'boolean') {        return typeof value == 'boolean';    }    else if (schema.type == 'timestamp') {        return (new Date(value)).getTime() > 0;    }    else {        return true;    }}
复制代码

使用 Kinesis Firehose 进行近实时的数据加载

在 Kinesis Firehose 上,我们创建了一个新的传输流以按如下所示处理事件:


传输流名称:事件来源:Direct PUTS3 存储桶:nuviad-eventsS3 前缀:rtb/IAM 角色:firehose_delivery_role_1数据转换:已禁用源记录备份:已禁用S3 缓冲区大小 (MB):100S3 缓冲区间隔(秒):60S3 压缩:GZIPS3 加密:未加密状态:活动错误记录:已启用
复制代码


此传输流每分钟聚合一次事件数据,或最多聚合 100 MB,并将数据作为 CSV/GZIP 压缩文件写入 S3 存储桶中。接下来,在我们验证数据后,我们可以将其安全发送至我们的 Kinesis Firehose API:


JavaScript


if (validated) {    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line
let params = { DeliveryStreamName: 'events', Record: { Data: itemString } };
firehose.putRecord(params, function(err, data) { if (err) { console.error(err, err.stack); } else { // 继续您的下一步 } });}
复制代码


现在,我们有一个 CSV 文件,该文件表示 S3 中存储的一分钟事件数据。在向 S3 写入对象之前,Kinesis Firehose 通过添加一个格式为 YYYY/MM/DD/HH 的 UTC 时间前缀对文件进行自动命名。由于我们将日期和小时用作分区,我们需要更换文件的名称和位置,以适应我们的 Redshift Spectrum 模式。

使用 AWS Lambda 自动化数据分布

我们创建了一个由 S3 put 事件触发的简单 Lambda 函数,该事件将文件复制到另一个位置(或多个位置),同时对文件进行重新命名以适应我们的数据结构和处理流程。如前所述,Kinesis Firehose 生成的文件按照预先定义的层次构造结构,如:


S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
复制代码


我们需要做的就是解析对象名称,并按照我们认为合适的方式重新构造其结构。在我们的例子中,我们执行了以下操作(事件是 Lambda 函数中接收的对象,该对象的所有数据都写入了 S3):


JavaScript


/*  事件对象中的对象密钥结构:your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz  */
let key_parts = event.Records[0].s3.object.key.split('/');
let event_type = key_parts[0];let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];let hour = key_parts[4];if (hour.indexOf('0') == 0) { hour = parseInt(hour, 10) + '';}
let parts1 = key_parts[5].split('-');let minute = parts1[7];if (minute.indexOf('0') == 0) { minute = parseInt(minute, 10) + '';}
复制代码


现在,我们可以将文件重新分布到我们需要的两个目标——一个用于分钟处理任务,另一个用于小时聚合:


JavaScript


copyObjectToHourlyFolder(event, date, hour, minute)        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))        .then(deleteOldMinuteObjects.bind(null, event))        .then(deleteStreamObject.bind(null, event))                .then(result => {            callback(null, { message: 'done' });                    })        .catch(err => {            console.error(err);            callback(null, { message: err });                    });
复制代码


Kinesis Firehose 将数据存储在临时文件夹中。我们将对象复制到保存最后一分钟处理数据的另一个文件夹。该文件夹连接到一个小的 Redshift Spectrum 表,数据在该表中进行处理,不需要扫描更大的数据集。我们还将数据复制到保存一整个小时数据的文件夹中,稍后再进行聚合并转换为 Parquet 格式。


由于我们按日期和小时对数据进行分区,如果处理的分钟是一小时中的第一分钟(即分钟 0),我们在 Redshift Spectrum 表上创建了一个新分区。我们运行了以下各项:


SQL


ALTER TABLE   spectrum.events ADD 分区  (date='2017-08-01', hour=0)   LOCATION 's3://nuviad-temp/events/2017-08-01/0/';
复制代码


处理过数据并将其添加到表中之后,我们从 Kinesis Firehose 临时存储和分钟存储文件夹中删除处理后的数据。

使用 AWS Glue 和 Amazon EMR 将 CSV 迁移到 Parquet

我们发现 CSV 数据至 Parquet 转换的小时作业的最简单运行方法是使用 Lambda 和 AWS Glue(感谢强大的 AWS 大数据团队在这方面的帮助)。

创建 AWS Glue 作业

此简单 AWS Glue 脚本执行以下操作:


  • 获取待处理的作业、日期和小时参数

  • 创建 Spark EMR 上下文,以便我们运行 Spark 代码

  • 将 CSV 数据读取到 DataFrame 中

  • 将数据以 Parquet 格式写入目标 S3 存储桶

  • 为该表添加或修改 Redshift Spectrum / Amazon Athena 表分区


Python


import sysimport sysfrom awsglue.transforms import *from awsglue.utils import getResolvedOptionsfrom pyspark.context import SparkContextfrom awsglue.context import GlueContextfrom awsglue.job import Jobimport boto3
## @params: [JOB_NAME]args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])
#day_partition_key = "partition_0"#hour_partition_key = "partition_1"#day_partition_value = "2017-08-01"#hour_partition_value = "0"
day_partition_key = args['day_partition_key']hour_partition_key = args['hour_partition_key']day_partition_value = args['day_partition_value']hour_partition_value = args['hour_partition_value']
print("Running for " + day_partition_value + "/" + hour_partition_value)
sc = SparkContext()glueContext = GlueContext(sc)spark = glueContext.spark_sessionjob = Job(glueContext)job.init(args['JOB_NAME'], args)
df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)df.registerTempTable("data")
df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")
df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)
client = boto3.client('athena', region_name='us-east-1')
response = client.start_query_execution( QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' , QueryExecutionContext={ 'Database': 'spectrumdb' }, ResultConfiguration={ 'OutputLocation': 's3://nuviad-temp/convertresults' })
response = client.start_query_execution( QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' , QueryExecutionContext={ 'Database': 'spectrumdb' }, ResultConfiguration={ 'OutputLocation': 's3://nuviad-temp/convertresults' })
job.commit()
复制代码


注:由于 Redshift Spectrum 和 Athena 都使用 AWS Glue 数据目录,我们可以使用 Athena 客户端将分区添加到表中。


下面是关于浮点类型、小数类型和双精度型的一些单词。经证明,使用小数类型比我们预期的更具有挑战性,因为 Redshift Spectrum 和 Spark 似乎以不同的方式使用它们。当我们在 Redshift Spectrum 和 Spark 中使用小数类型时,我们不断的收到错误,例如:


S3 查询异常(获取)。由于内部错误,任务失败。文件 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column ‘s3://nuviad-events/events.lat’.列类型:DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq


我们必须对几个浮点格式进行试验,直到我们发现唯一有效的组合是在 Spark 代码中将列定义为双精度型在 Spectrum 中将其定义为浮点类型。这是账单在 Spectrum 中被定义为浮点类型在 Spark 代码中被定义为双精度型的原因。

创建一个 Lambda 函数以触发转换

接下来,我们创建了一个简单的 Lambda 函数,以使用简单的 Python 代码每小时触发 AWS Glue 脚本:


Python


import boto3import jsonfrom datetime import datetime, timedelta
client = boto3.client('glue')
def lambda_handler(event, context): last_hour_date_time = datetime.now() - timedelta(hours = 1) day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") hour_partition_value = last_hour_date_time.strftime("%-H") response = client.start_job_run( JobName='convertEventsParquetHourly', Arguments={ '--day_partition_key': 'date', '--hour_partition_key': 'hour', '--day_partition_value': day_partition_value, '--hour_partition_value': hour_partition_value } )
复制代码


使用 Amazon CloudWatch Events,我们可以按小时触发此函数。此函数将触发名为 ‘convertEventsParquetHourly’ 的 AWS Glue 作业,并运行前一小时的作业,从而将要处理的作业名称和分区值传递到 AWS Glue 中。


本文转载自 AWS 技术博客。


原文链接:https://amazonaws-china.com/cn/blogs/china/big-data-using-amazon-redshift-spectrum-amazon-athena-and-aws-glue-with-node-js-in-production/


2020-01-13 14:53746

评论

发布
暂无评论
发现更多内容

低代码开发平台推动生产性服务业与制造业融合

EquatorCoco

低代码 生产 制造业

Word文档内容自动生成PPT!这5款堪称办公神器的AI工具不要错过!

彭宏豪95

markdown PPT 在线白板 办公软件 AI生成PPT

低代码开发平台助力企业数据分析走向高阶段

不在线第一只蜗牛

数据挖掘 数据分析 低代码

怎样解决TikTok直播网络问题、顺利直播带货?

Ogcloud

TikTok tiktok运营 tiktok直播 tiktok直播网络 tiktok直播带货

深度解析观测云智能监控的核心设计原理

观测云

智能监控

TikTok直播专线实现流畅海外直播体验

Ogcloud

TikTok Tik Tok tiktok直播 tiktok直播带货

Paimon 在汽车之家的业务实践

Apache Flink

大数据 flink 实时计算 paimon

公平发售平台开发:重塑代币发行的未来

开发丨飞机丨 @aivenli

【HDC.2024】“零代码”智能组装,华为云新一代iPaaS超联接能力让集成更智能

华为云PaaS服务小智

云计算 软件开发 零代码 华为云 大模型

浅谈Java Profiling

乘云数字DataBuff

Java Profiling 实时监控

2024年,C++正在失去人气吗?

伤感汤姆布利柏

京东商品详情数据接口(JD.item_get)丨京东API接口指南

tbapi

京东 京东商品详情数据接口 京东API接口

国科云浅谈IPv6改造的技术挑战与解决方案

国科云

ES 慢上游响应问题优化在用户体验场景中的实践

字节跳动云原生计算

搜索引擎 elasticsearch

Moment of Inspiration for Mac(MoI 3D建模工具)

Mac相关知识分享

3d建模

大数据平台也“云化”?这份改造指南收藏了!

天翼云开发者社区

云计算 大数据 云平台

云行| 天翼云中国行走进甘肃,智绘数字陇原新图景!

天翼云开发者社区

人工智能 云计算

科改考核最高评级,拿下!

天翼云开发者社区

云计算 云服务 云平台

python提取特定格式的数据

快乐非自愿限量之名

Python

华为的成功,你也可以复制

IPD产品研发管理

华为 产品 产品设计 项目 项目经理

首次引入GPT-4o!图像自动评估新基准来啦

Openlab_cosmoplat

图像 #人工智能 ChatGPT

克服指标管理痛点,实现数据价值最大化

袋鼠云数栈

指标体系 指标监控 指标 指标平台 指标建设

KubeCon 香港:移动云与云猿生联合议题《在没有专用 Operator 的情况下管理数据库集群》

小猿姐

数据库 Kubernetes operator

《黑神话:悟空》2024年全球上线,多平台登陆,多版本售价公布!

青椒云云电脑

云电脑 黑神话悟空

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章