写点什么

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)

  • 2020-01-13
  • 本文字数:6409 字

    阅读完需:约 21 分钟

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)

为不同工作负载优化数据结构

由于 S3 的成本相对便宜且我们只需为每个查询扫描的数据付款,我们认为,对不同的工作负载和不同的分析引擎,以不同的格式保存数据是有意义的。需要注意的是,我们可以有任意数量的表指向 S3 上的相同数据。这完全取决于我们如何对数据分区及如何更新表分区。

数据排列

例如,我们有一个一分钟运行一次且为最后一分钟收集的数据生成统计数据的进程。利用 Amazon Redshift,将如下所示通过在表上运行查询来完成此操作:


SQL


SELECT   user,  COUNT(*) FROM   events_table WHERE   ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’ GROUP BY   user;
复制代码


(假设 ‘ts’ 是为每个事件存储时间戳的列。)


利用 Redshift Spectrum 时,我们为每个查询中扫描的数据付费。如果数据按分钟而不是小时分区,则查看一分钟数据的查询费用为成本的 1/60。如果我们使用仅指向最后一分钟数据的临时表,我们将节省不必要的费用。

高效创建 Parquet 数据

平均而言,我们有 800 个实例来处理流量。每个实例发送最终加载到 Amazon Redshift 中的事件。从三年前开始,我们可以将数据从每个服务器卸载到 S3 中,然后再执行从 S3 到 Amazon Redshift 的定期复制命令。


最近,Amazon Kinesis Firehose 将卸载数据的功能直接添加到了 Amazon Redshift 中。虽然现在这是个可行选项,但我们保留了相同的收集过程,这个过程已完美、高效地工作了三年。


但当我们合并 Redshift Spectrum 时,情况发生了变化。利用 Redshift Spectrum,我们需要找到一种方法来:


  • 从实例中收集事件数据。

  • 以 Parquet 格式保存数据。

  • 对数据进行有效分区。


为完成此操作,我们将数据另存为 CSV 格式,然后将其转换为 Parquet。生成 Parquet 文件的最有效方法是:


  1. 将 S3 临时存储桶用作目标,以一分钟的间隔将数据从实例发送到 Kinesis Firehose 中。

  2. 聚合每小时的数据,并使用 AWS LambdaAWS Glue 将其转换为 Parquet。

  3. 通过更新表分区将 Parquet 数据添加到 S3。


在此新过程中,我们必须在将数据发送到 Kinesis Firehose 之前更加注意验证数据,因为分区中的单个损坏记录无法对该分区进行查询。

数据验证

为了将我们的点击数据存储在表中,我们考虑了以下 SQL 创建表命令:


SQL


create external TABLE spectrum.blog_clicks (    user_id varchar(50),    campaign_id varchar(50),    os varchar(50),    ua varchar(255),    ts bigint,    billing float)按 (date date, hour smallint) 分区  存储为 parquet位置 's3://nuviad-temp/blog/clicks/';
复制代码


上面的语句定义了包括几个属性的新外部表(所有的 Redshift Spectrum 表为外部表)。我们将 ‘ts’ 存储为 Unix 时间戳,并非时间戳,计费数据存储为浮点数,而不是小数(稍后进行更多讨论)。我们还说过,数据按日期和小时进行分区,然后在 S3 上存储为 Parquet。


首先,我们需要获取表定义。此操作可以通过运行以下查询来实现:


SQL


SELECT   * FROM   svv_external_columns WHERE   tablename = 'blog_clicks';
复制代码


此查询列出表中的所有列及其各自的定义:


col 1col 2col 3col 4col 5col 6
schemanametablenamecolumnnameexternal_typecolumnnumpart_key
spectrumblog_clicksuser_idvarchar(50)10
spectrumblog_clickscampaign_idvarchar(50)20
spectrumblog_clicksosvarchar(50)30
spectrumblog_clicksuavarchar(255)40
spectrumblog_clickstsbigint50
spectrumblog_clicksbillingdouble60
spectrumblog_clicksdatedate71
spectrumblog_clickshoursmallint82


现在,我们可以使用此数据为我们的数据创建验证模式:


Json


const rtb_request_schema = {    "name": "clicks",    "items": {        "user_id": {            "type": "string",            "max_length": 100        },        "campaign_id": {            "type": "string",            "max_length": 50        },        "os": {            "type": "string",            "max_length": 50                    },        "ua": {            "type": "string",            "max_length": 255                    },        "ts": {            "type": "integer",            "min_value": 0,            "max_value": 9999999999999        },        "billing": {            "type": "float",            "min_value": 0,            "max_value": 9999999999999        }    }};
复制代码


接下来,我们创建一个函数,以使用此模式验证数据:


JavaScript


function valueIsValid(value, item_schema) {    if (schema.type == 'string') {        return (typeof value == 'string' && value.length <= schema.max_length);    }    else if (schema.type == 'integer') {        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);    }    else if (schema.type == 'float' || schema.type == 'double') {        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);    }    else if (schema.type == 'boolean') {        return typeof value == 'boolean';    }    else if (schema.type == 'timestamp') {        return (new Date(value)).getTime() > 0;    }    else {        return true;    }}
复制代码

使用 Kinesis Firehose 进行近实时的数据加载

在 Kinesis Firehose 上,我们创建了一个新的传输流以按如下所示处理事件:


传输流名称:事件来源:Direct PUTS3 存储桶:nuviad-eventsS3 前缀:rtb/IAM 角色:firehose_delivery_role_1数据转换:已禁用源记录备份:已禁用S3 缓冲区大小 (MB):100S3 缓冲区间隔(秒):60S3 压缩:GZIPS3 加密:未加密状态:活动错误记录:已启用
复制代码


此传输流每分钟聚合一次事件数据,或最多聚合 100 MB,并将数据作为 CSV/GZIP 压缩文件写入 S3 存储桶中。接下来,在我们验证数据后,我们可以将其安全发送至我们的 Kinesis Firehose API:


JavaScript


if (validated) {    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line
let params = { DeliveryStreamName: 'events', Record: { Data: itemString } };
firehose.putRecord(params, function(err, data) { if (err) { console.error(err, err.stack); } else { // 继续您的下一步 } });}
复制代码


现在,我们有一个 CSV 文件,该文件表示 S3 中存储的一分钟事件数据。在向 S3 写入对象之前,Kinesis Firehose 通过添加一个格式为 YYYY/MM/DD/HH 的 UTC 时间前缀对文件进行自动命名。由于我们将日期和小时用作分区,我们需要更换文件的名称和位置,以适应我们的 Redshift Spectrum 模式。

使用 AWS Lambda 自动化数据分布

我们创建了一个由 S3 put 事件触发的简单 Lambda 函数,该事件将文件复制到另一个位置(或多个位置),同时对文件进行重新命名以适应我们的数据结构和处理流程。如前所述,Kinesis Firehose 生成的文件按照预先定义的层次构造结构,如:


S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
复制代码


我们需要做的就是解析对象名称,并按照我们认为合适的方式重新构造其结构。在我们的例子中,我们执行了以下操作(事件是 Lambda 函数中接收的对象,该对象的所有数据都写入了 S3):


JavaScript


/*  事件对象中的对象密钥结构:your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz  */
let key_parts = event.Records[0].s3.object.key.split('/');
let event_type = key_parts[0];let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];let hour = key_parts[4];if (hour.indexOf('0') == 0) { hour = parseInt(hour, 10) + '';}
let parts1 = key_parts[5].split('-');let minute = parts1[7];if (minute.indexOf('0') == 0) { minute = parseInt(minute, 10) + '';}
复制代码


现在,我们可以将文件重新分布到我们需要的两个目标——一个用于分钟处理任务,另一个用于小时聚合:


JavaScript


copyObjectToHourlyFolder(event, date, hour, minute)        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))        .then(deleteOldMinuteObjects.bind(null, event))        .then(deleteStreamObject.bind(null, event))                .then(result => {            callback(null, { message: 'done' });                    })        .catch(err => {            console.error(err);            callback(null, { message: err });                    });
复制代码


Kinesis Firehose 将数据存储在临时文件夹中。我们将对象复制到保存最后一分钟处理数据的另一个文件夹。该文件夹连接到一个小的 Redshift Spectrum 表,数据在该表中进行处理,不需要扫描更大的数据集。我们还将数据复制到保存一整个小时数据的文件夹中,稍后再进行聚合并转换为 Parquet 格式。


由于我们按日期和小时对数据进行分区,如果处理的分钟是一小时中的第一分钟(即分钟 0),我们在 Redshift Spectrum 表上创建了一个新分区。我们运行了以下各项:


SQL


ALTER TABLE   spectrum.events ADD 分区  (date='2017-08-01', hour=0)   LOCATION 's3://nuviad-temp/events/2017-08-01/0/';
复制代码


处理过数据并将其添加到表中之后,我们从 Kinesis Firehose 临时存储和分钟存储文件夹中删除处理后的数据。

使用 AWS Glue 和 Amazon EMR 将 CSV 迁移到 Parquet

我们发现 CSV 数据至 Parquet 转换的小时作业的最简单运行方法是使用 Lambda 和 AWS Glue(感谢强大的 AWS 大数据团队在这方面的帮助)。

创建 AWS Glue 作业

此简单 AWS Glue 脚本执行以下操作:


  • 获取待处理的作业、日期和小时参数

  • 创建 Spark EMR 上下文,以便我们运行 Spark 代码

  • 将 CSV 数据读取到 DataFrame 中

  • 将数据以 Parquet 格式写入目标 S3 存储桶

  • 为该表添加或修改 Redshift Spectrum / Amazon Athena 表分区


Python


import sysimport sysfrom awsglue.transforms import *from awsglue.utils import getResolvedOptionsfrom pyspark.context import SparkContextfrom awsglue.context import GlueContextfrom awsglue.job import Jobimport boto3
## @params: [JOB_NAME]args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])
#day_partition_key = "partition_0"#hour_partition_key = "partition_1"#day_partition_value = "2017-08-01"#hour_partition_value = "0"
day_partition_key = args['day_partition_key']hour_partition_key = args['hour_partition_key']day_partition_value = args['day_partition_value']hour_partition_value = args['hour_partition_value']
print("Running for " + day_partition_value + "/" + hour_partition_value)
sc = SparkContext()glueContext = GlueContext(sc)spark = glueContext.spark_sessionjob = Job(glueContext)job.init(args['JOB_NAME'], args)
df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)df.registerTempTable("data")
df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")
df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)
client = boto3.client('athena', region_name='us-east-1')
response = client.start_query_execution( QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' , QueryExecutionContext={ 'Database': 'spectrumdb' }, ResultConfiguration={ 'OutputLocation': 's3://nuviad-temp/convertresults' })
response = client.start_query_execution( QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' , QueryExecutionContext={ 'Database': 'spectrumdb' }, ResultConfiguration={ 'OutputLocation': 's3://nuviad-temp/convertresults' })
job.commit()
复制代码


注:由于 Redshift Spectrum 和 Athena 都使用 AWS Glue 数据目录,我们可以使用 Athena 客户端将分区添加到表中。


下面是关于浮点类型、小数类型和双精度型的一些单词。经证明,使用小数类型比我们预期的更具有挑战性,因为 Redshift Spectrum 和 Spark 似乎以不同的方式使用它们。当我们在 Redshift Spectrum 和 Spark 中使用小数类型时,我们不断的收到错误,例如:


S3 查询异常(获取)。由于内部错误,任务失败。文件 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column ‘s3://nuviad-events/events.lat’.列类型:DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq


我们必须对几个浮点格式进行试验,直到我们发现唯一有效的组合是在 Spark 代码中将列定义为双精度型在 Spectrum 中将其定义为浮点类型。这是账单在 Spectrum 中被定义为浮点类型在 Spark 代码中被定义为双精度型的原因。

创建一个 Lambda 函数以触发转换

接下来,我们创建了一个简单的 Lambda 函数,以使用简单的 Python 代码每小时触发 AWS Glue 脚本:


Python


import boto3import jsonfrom datetime import datetime, timedelta
client = boto3.client('glue')
def lambda_handler(event, context): last_hour_date_time = datetime.now() - timedelta(hours = 1) day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") hour_partition_value = last_hour_date_time.strftime("%-H") response = client.start_job_run( JobName='convertEventsParquetHourly', Arguments={ '--day_partition_key': 'date', '--hour_partition_key': 'hour', '--day_partition_value': day_partition_value, '--hour_partition_value': hour_partition_value } )
复制代码


使用 Amazon CloudWatch Events,我们可以按小时触发此函数。此函数将触发名为 ‘convertEventsParquetHourly’ 的 AWS Glue 作业,并运行前一小时的作业,从而将要处理的作业名称和分区值传递到 AWS Glue 中。


本文转载自 AWS 技术博客。


原文链接:https://amazonaws-china.com/cn/blogs/china/big-data-using-amazon-redshift-spectrum-amazon-athena-and-aws-glue-with-node-js-in-production/


2020-01-13 14:53937

评论

发布
暂无评论
发现更多内容

一文整理区块链技术为企业带来的九大好处

CECBC

作业4

施正威

科技驱动经济发展的时代全面到来

CECBC

「自我检验」熬夜总结50个Vue知识点,全都会你就是神!!!

Sunshine_Lin

面试 Vue 前端 进阶 ES6

SphereEx 完成近千万美元 Pre-A 轮融资,加速构建新一代数据库生态引擎

SphereEx

开源 融资 ShardingSphere SphereEx 嘉御资本

今天你的静态变量和静态代码块执行了吗?

华为云开发者联盟

Java 类加载 静态 静态变量 静态代码块

10个问题让你快速避开java中的jdbc常见坑

华为云开发者联盟

Java 数据库 JDBC fetchSize Prepared Statement

Hive on Spark和Spark sql on Hive,你能分的清楚么

华为云开发者联盟

sql 分布式计算 Sparksql hive on spark 数据源

潘娟:Keep open,Stay tuned 开源为我打开的全新世界 | TiDB Hackathon 2021 评委访谈

PingCAP

Kafka原理——Kafka为何如此之快?

Kafka中文社区

网络安全好学吗?基础入门篇,NMAP高级使用技巧和漏洞扫描发现

学神来啦

网络安全 渗透测试 kali基础 nmap kali Linux

测试阻碍交付,如何破解这一难题?

飞算JavaAI开发助手

带你认识7种云化测试武器

华为云开发者联盟

测试 接口测试 华为云DevCloud 云化测试 Mock 服务

面试官:为什么不同返回类型不算方法重载?

王磊

【Golang】浅谈协程并发竞争资源问题

恒生LIGHT云社区

golang 后端 协程 并发 Go 语言

架构营模块八作业

GTiger

架构实战营

大数据开发之Hive SQL的优化分享

@零度

大数据 Hive SQL

编写Spring MVC控制器的技巧

编程江湖

Spring MVC

Java开发之测试框架知识分享

@零度

Java

EMQ 映云科技入围 Venture50 行业榜单,数字科技企业风向标!

EMQ映云科技

物联网 Venture50

Flink类型系统的根及相关接口

编程江湖

flink

数据库批量插入这么讲究的么?

秦怀杂货店

Java 数据库 批量插入

书单 | 学习数据可视化?看这些书就够了!

博文视点Broadview

【量化】量化交易入门系列3:经典的量化交易策略(中)

恒生LIGHT云社区

量化投资 量化交易 量化

前端开发Vue中的v-指令的使用

@零度

Vue 前端开发

nodejs 异步I/O和事件驱动

编程江湖

nodejs

一文了解区块链如何帮助打击虚假信息

CECBC

Go 通过 Map/Filter/ForEach 等流式 API 高效处理数据

万俊峰Kevin

微服务 stream go-zero Go 语言

ReactNative进阶(一):ReactNative 学习资料汇总

No Silver Bullet

React Native 1月月更

netty系列之:选byte还是选message?这是一个问题

程序那些事

Java Netty 程序那些事 UDT 1月月更

什么是Log4Shell?Log4j漏洞解读

龙智—DevSecOps解决方案

log4j Log4j 2 Log4Shell

在生产中结合使用 Amazon Redshift Spectrum、Amazon Athena 和 AWS Glue 与 Node.js(三)_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章