写点什么

手把手教你使用 Amazon EMR 进行交互式数据查询

  • 2019-11-20
  • 本文字数:21434 字

    阅读完需:约 70 分钟

手把手教你使用Amazon EMR进行交互式数据查询
本文将带您一步步完成一个利用Amazon EMR进行交互式数据查询的实例,过程包括数据的注入、数据的分析、结果的转存、以及将**整个过程自动化**的方法。其中涉及的EMR组件主要包括: Hive, Hadoop, presto, sqoop。除EMR外,涉及到的其他服务包括:S3, RDS. 本文所使用的数据源是cloudfront产生的日志。


在按照本文档进行操作之前,读者需了解S3,RDS并能够进行基本的S3,RDS的操作,读者需了解EMR的基本概念。以下是参考资料:
什么是EMR:
Amazon Elastic MapReduce (Amazon EMR) 是一种托管数据分析服务的框架,提升企业、研究人员、数据分析师和开发人员轻松、经济高效掌控海量数据的能力。其当前版本中托管的服务包括:Hadoop, Zeppelin, Tez, Ganglia, HBase, Pig, Hive, Presto, ZooKeeper, Sqoop, Mahout, Hue, Phoenix, Oozie, Spark, Hcatalog. EMR让您专注于数据分析,无需担心费时的集群设置、管理或调整,也无需担心所需要的计算能力。
具体参考: [](https://amazonaws-china.com/cn/documentation/elastic-mapreduce/)
什么是S3:
Amazon Simple Storage Service (Amazon S3) 为开发人员和 IT 团队提供安全、耐用且高度可扩展的对象存储。S3 可为EMR提供文件存储服务。
具体参考:[](https://amazonaws-china.com/cn/documentation/s3/)
什么是RDS:
Amazon Relational Database Service (Amazon RDS) 是一种可让用户在云中轻松设置、操作和扩展关系数据库的 Web 服务。 它在承担耗时的数据库管理任务的同时,又可提供经济高效的可调容量,使您能够腾出时间专注于应用程序开发。Amazon RDS 让您能够访问非常熟悉的 MySQL、PostgreSQL、Oracle 或 Microsoft SQL Server 等数据库引擎的功能。
具体参考:[](https://amazonaws-china.com/cn/documentation/rds/)
**准备工作**
1. Cloudfront生成的日志已经存储在s3桶中,并在不断更新, 存储目录是:s3://testcloudfrontlog/log
其中testcloudfrontlog是s3存储桶的名字,在实际操作的时候,需要换一个名字,因为s3存储桶的名字是全局唯一的, 而其他人也有可能使用了这个名字。
可以从以下链接下载本例中使用的示例文件,并上传到s3://testcloudfrontlog/log目录下。
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
2. 创建一个目录来存储按照日期划分的日志数据。 目录是s3://testcloudfrontlog/logbydate
3. 创建一个目录用来做hive表的数据存储,目录是s3://testcloudfrontlog/logpart
4. 注意:直接copy本文的代码有可能会由于字符原因出现错误,建议先copy到纯文本编辑器中再执行。
**手动方式完成交互式数据查询**
第一步,创建一个EMR集群,方法如下:
1. 进入到AWS的控制台,选择EMR服务,点击创建集群。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-1.png)
2. 点击转到高级选项
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-2.png)
3. 软件配置
选择EMR发行版本以及所需要的软件, 在本例中,我们选择emr-5.0.0版本,所需要的工具选择hadoop, hive, presto, sqoop。 本步骤中的其余选项使用默认值,然后点击下一步。如果是使用北京Region,在输入配置中输入以下内容,使得presto可访问s3, 如果使用其他Region,不必输入。
`[{"classification":"presto-connector-hive", "properties":{"hive.s3.pin-client-to-current-region":"true"}, "configurations":[]}]`
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-3.png)
4. 硬件配置
进入到硬件配置界面,默认配置如下,直接使用默认配置。然后点击下一步。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-4.png)
5. 一般选项
在一般选项的集群名称后面输入一个名字,作为集群的名字。其余的可按照默认配置。然后点击下一步。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-5.png)
6. 安全选项
在EC2键对后面的框中选择一个已有的键对,该键对用来在集群创建成功后,从SSH客户端登录到集群中的任意一台服务器。如果选择“在没有EC2键对的情况下继续”,则后续不能登录到集群中的机器。其余选项均可默认。然后点击创建集群。![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-6.png)7. 修改安全组规则,并登录EMR的主节点
进入到刚刚创建的集群的信息界面,点击主节点安全组,进入到该安全组的配置界面,在入规则中增加SSH的访问规则,这样才可以通过SSH的方式从外部机器登录到主节点。然后通过任意一个SSH客户端登录到主节点,目标地址是图中所示的主节点共有DNS, 用户名是hadoop, 通过私钥登录,私钥与前面所提到的键对对应。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-7.png)
第二步,创建数据表并进行查询
1. SSH到主节点后,执行hive命令,进入到hive命令行界面
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-8.png)
2. 创建一个用日期作为分区的hive表,用来作为最终被查询的表
将以下脚本copy到hive>提示符下执行,注意LOCATION的参数需要改成你自己的目录。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`STORED AS PARQUET`
`LOCATION 's3://testcloudfrontlog/logpart';`
3. 输入quit命令,退出hive。用aws s3命令将s3://testcloudfrontlog/log中日志copy到s3://testcloudfrontlog/logbydate中按照时间划分的目录下
以2016-07-11这一天的文件为例,命令如下,注意,s3目录需要改成你自己的。
`aws s3 cp s3://testcloudfrontlog/log/ s3://testcloudfrontlog/logbydate/2016-07-11/ --exclude "*" --include "*.2016-07-11*" --recursive`
这里用到了aws s3命令行工具。你可以在EMR主节点中退出hive命令行程序,然后执行以上命令。或者在任意一个安装了aws cli工具并配置了s3访问权限的机器中执行。本例中直接在EMR的主节点中执行。aws s3 cp不支持通配符,所以用–exclude 和 –include 参数来代替。
4. 针对s3://testcloudfrontlog/logbydate/2016-07-11/ 中的数据,创建一个HIVE表。
假设表的名字是cloudfrontlog20160711, 输入hive, 重新进入到hive命令行工具,并输入以下语句,注意LOCATION的参数需要改成你自己的目录。
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog20160711 (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION 's3://testcloudfrontlog/logbydate/2016-07-11/'`
`tblproperties("skip.header.line.count"="2");`
5. 数据注入
至此,已经创建了两个hive表,通过在hive命令行工具中执行 show tables命令,可以查看到两个hive表。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-9.png)
向带分区的hive表中注入7月11日的数据,在hive命令行界面中分别执行以下命令:
`set hive.exec.dynamic.partition.mode=nonstrict;`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog20160711;`
以下是INSERT语句执行过程的显示信息。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-10.png)
完成后,如果进入到s3://testcloudfrontlog/logpart目录下,可以查看到已经生成了按日期划分的目录,目录下存储了文件。
第三步,数据查询
可以继续使用hive做查询,也可以进入presto做查询。这里,我们使用presto, 由于presto完全使用内存进行计算, 速度更快。进入presto的方式如下:
执行quit, 退出hive命令行程序。
然后执行以下语句进入presto命令行界面:
`presto-cli --catalog hive --schema default`
该语句表示presto使用hive数据源,并且使用hive数据源中的default数据库。细节请参考presto的社区文档。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-11.png)
执行一个简单的查询:
`SELECT time, scbytes, cip FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('2016-07-11' AS varchar) LIMIT 5;`
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-12.png)
然后退出presto命令行工具,输入 quit;
第四步,删除EMR集群
在集群列表中,选中刚刚创建的集群,点击终止,以终止该集群。如果开启的终止保护,需要变更一下终止保护的状态,然后再终止。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-13.png)
至此, 我们通过手动的方式完成了一个简单的数据查询。但在实际生产环境中,使用手动的方式会耗费很长时间做重复性的工作,并难免出错。EMR更大优势是能够通过程序的方式去控制集群的创建以及任务的执行,这使得EMR的使用者能够将集群创建以及数据分析的过程自动化。
接下来的部分将以相同的示例指导读者一步步的实现自动化的创建集群、分析数据、转存结果、关闭集群。
**自动方式完成交互式数据查询**
首先概括几点自动化执行数据分析的需求:
1. 集群在每天的固定时间被创建,然后对数据进行分析,然后集群被自动删除。这显然不能通过图形界面进行一步步的操作了。
2. 在手动操作中执行的每个步骤,需要按顺序自动化执行。这些步骤包括:
– 从/log目录向/logbydate目录中copy特定日期的文件。
– 创建对应特定日期的hive表,例如表名为cloudfrontlog20160711。
– 从cloudfrontlog20160711向cloudfrontlogpart表中注入数据。
– 使用presto从cloudfrontlogpart表中查询出需要的数据。
– 此外,在手动执行的最后一步,我们可以直接看到查询结果,但在自动化执行的过程中,我们需要将查询结果存储到一个长期运行的数据库中,供随时查询。
3. 将hive元数据放在集群的外部。
在手动执行的流程中,创建hive表后,hive的元数据存储在了主节点。而在自动化执行的过程中,当所有任务执行完毕后,集群被删除,存储在主节点的元数据也会被删除,因此要在外部数据库中存储hive的元数据。
针对以上的几个需求,在EMR中对应的解决方法如下.
1. 使用EMR的命令行进行各种集群的操作,例如集群的创建,参数的设置等。
2. 使用EMR的“step”来组织各个任务的执行。
3. 创建一个外部的数据库用来存储hive元数据,并在集群创建的时候指定元数据的存储位置。
接下来详细描述操作步骤和脚本
第一步,准备工作
1. 准备两个mysql数据库分别用来存储hive元数据和查询结果,可以使用AWS的RDS服务来创建。这两个数据库都需要能被EMR访问到, 这两个数据库也可以使用同一个物理服务器或虚拟机。
2. 假设存储查询结果的数据库名字是loganalydb,我们在该数据库中创建一个表,用来存储结果数据,表的名字是loganalytb。根据后面所进行的查询,使用如下语句创建数据库以及与查询结果匹配的表:
`CREATE DATABASE loganalydb;`
`USE loganalydb;`
``CREATE TABLE `loganalytb` ( `id` int(11) NOT NULL AUTO_INCREMENT, `filepath` varchar(300) DEFAULT NULL, `totalbyte` bigint(20) DEFAULT NULL, `tdate` varchar(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58147 DEFAULT CHARSET=utf8;``
对于存储hive元数据的数据库,暂时不必创建,从后面提到的emrconf.json文件可以看到,只需要给出数据库服务器的地址,以及用户名和密码即可,数据库不存在的话,会自动创建。
3. 由于自动化的流程与前面提到的手动流程使用相同的示例,因此,请事先删除手动流程示例中产生的数据。包括:
1) s3://testcloudfrontlog/logbydate目录下的数据
2) s3://testcloudfrontlog/logpart目录下的数据
第二步,编写脚本
下面给出各个脚本,并配以注释解释
1. 主程序脚本loganaly.sh
`#!/bin/bash`
`# 变量KEYNAMES是你的SSH key的名字, 用来通过SSH方式访问EC2。`
`KEYNAME=yourkeyname`
`# 变量CONFIGFILE是emrconf.json的url地址,emrconf.json这个文件中包含了存储hive元数据的外部数据库的信息。将该文件存在s3中并让这个文件能够从外部访问到。emrconf.json中的内容在下文中会给出。`
`CONFIGFILE=https://s3-us-west-2.amazonaws.com/testcloudfrontlog/conf/emrconf.json`
`# EMR集群产生的日志所存放的s3目录。`
`LOGURI=s3://testcloudfrontlog/emrlog/`
`# 脚本、配置文件、jar包所在的目录`
`CODEDIR=s3://testcloudfrontlog/conf`
`# cloudfront日志的存放位置, 结尾别加 /`
`LOGSOURCEDIR=s3://testcloudfrontlog/log`
`# 当天日志的中转目录, 结尾别加 /`
`STAGINGDIR=s3://testcloudfrontlog/logbydate`
`# 昨天的日期, 形如:20160711`
`DATE=$(date -d "yesterday" +%Y%m%d)`
`# 昨天的日期,另外一种格式, 形如:2016-07-11`
`DATEE=$(date -d "yesterday" +%Y-%m-%d)`
`# 被查询的hive表的文件存储位置, 结尾别加 /`
`PARTDIR=s3://testcloudfrontlog/logpart`
`# 用来存储结果文件的目录, 结果文件将被sqoop使用,数据会被传到mysql.`
`SQOOPFILE=s3://testcloudfrontlog/sqoopfile`
`#用来存储结果数据的mysql的信息。`
`DBHOST=rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com`
`JDBCURL=jdbc:mysql://rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com/loganalydb`
`DBUSER=username`
`DBPASS=password`
`# EMR集群的配置信息, 这里是以Oregan region为例。如果使用的是北京region,AWSREGION用cn-north-1。`
`AWSREGION=us-west-2`
`MASTERTYPE=m3.xlarge`
`CORETYPE=r3.xlarge`
`TASKTYPE=r3.xlarge`
`MASTERNUM=1`
`CORENUM=1`
`TASKNUM=1`
`# 删除当天的数据,目的是防止脚本在同一天被多次执行而造成数据冗余,`
`aws s3 rm $PARTDIR/datee=$DATEE --recursive`
`aws s3 rm $SQOOPFILE/$DATE --recursive`
`mysql -h$DBHOST -u$DBUSER -p$DBPASS --execute "DELETE FROM loganalydb.loganalytb WHERE tdate='$DATEE'"`


`# 创建emr集群,名字是loganaly, 其中:`
`# --auto-terminate参数表示该集群在执行完所有的任务后自动删除。`
`# --configurations参数的文件中的参数配置覆盖了该集群运行起来后的默认参数配置。在本例中用来修改Hive元数据的存储位置。`
`# --step参数规定了该集群在创建后要执行的几个任务,其中Type=Hive的step, 需要给出包含hive语句的文件作为参数。而Type=CUSTOM_JAR的step,需要给出一个JAR包,这里我们使用EMR提供scrip-runner.jar,它的作用是执行其第一个参数中指定的脚本文件,并将其余的参数作为脚本文件的输入参数。`
`# --instance groups参数规定了集群的中各节点的机型和数量`
`aws --region $AWSREGION emr create-cluster --name "loganaly" --release-label emr-5.0.0 \`
`--applications Name=Hadoop Name=Hive Name=Presto Name=Sqoop \`
`--use-default-roles \`
`--ec2-attributes KeyName=$KEYNAME \`
`--termination-protected \`
`--auto-terminate \`
`--configurations $CONFIGFILE \`
`--enable-debugging \`
`--log-uri $LOGURI \`
`--steps \`
`Type=CUSTOM_JAR,Name="cpjar",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/cpjar.sh"," $CODEDIR"] \`
`Type=CUSTOM_JAR,Name="log2staging",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/log2logbydate.sh","$LOGSOURCEDIR","$STAGINGDIR","$DATE","$DATEE"] \`
`Type=Hive,Name="HiveStep",Args=[-f,$CODEDIR/hivetables.q,-d,PARTDIRh=$PARTDIR,-d,STAGINGDIRh=$STAGINGDIR,-d,DATEh=$DATE] \`
`Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/presto2s3.sh","$SQOOPFILE","$DATE","$DATEE"] \`
`Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/s3tomysql.sh","$JDBCURL","$DBUSER","$DBPASS","$SQOOPFILE","$DATE"] \`
`--instance-groups \`
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,InstanceCount=$MASTERNUM \`
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,InstanceCount=$CORENUM \`
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,InstanceCount=$TASKNUM`
2. 准备cpjar.sh脚本
作用是将mysql的JDBC驱动包下载到本地的Sqoop目录下,sqoop在将文件转存到数据库的时候会用到JDBC驱动。
在国外region, 使用以下脚本:
`#!/bin/bash`
`CODEDIR=$1`
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/`
在北京Region, 使用以下脚本:
`#!/bin/bash`
`CODEDIR=$1`
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/ --region cn-north-1`
3. 准备log2logbydate.sh脚本
做用是将原始日志文件copy到按天划分的目录下。
`#!/bin/bash`
`LOGSOURCEDIR=$1`
`STAGINGDIR=$2`
`DATE=$3`
`DATEE=$4`
`aws s3 cp $LOGSOURCEDIR/ $STAGINGDIR/$DATE/ --exclude "*" --include "*.$DATEE*" --recursive`
4. 准备hivetables.q脚本
作用包括:创建待查询的表和按天命名的临时表,将临时表中数据注入到待查询的表中,并删除临时表。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`LOCATION '${PARTDIRh}';`


`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog${DATEh} (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION '${STAGINGDIRh}/${DATEh}/'`
`tblproperties("skip.header.line.count"="2");`


`--make dynamic insert available`
`set hive.exec.dynamic.partition.mode=nonstrict;`


`--insert data.`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog${DATEh};`


`--delete the staging table`
`DROP TABLE cloudfrontlogstaging${DATEh};`
5. 准备presto2s3.sh脚本
作用包括:查询hive表,并将结果存成.csv格式的文件,将该文件上传到S3中相应的目录下。
`#!/bin/bash`
`TIME=$(date +%H%M%S)`
`SQOOPFILE=$1`
`DATE=$2`
`DATEE=$3`
`sudo presto-cli --catalog hive --schema default --execute "SELECT NULL as id, csuristem, SUM(scbytes) as totalbyte, CAST('$DATEE' AS varchar) as date FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('$DATEE' AS varchar) GROUP BY csuristem order by totalbyte desc" --output-format CSV > ~/cdnfilestat.csv`
`aws s3 cp ~/cdnfilestat.csv $SQOOPFILE/$DATE/cdnfilestat$TIME.csv`
6. 准备s3tomysql.sh脚本
作用是利用sqoop将s3中的.csv文件中的内容转存到数据库中。注意,如果单次转存的数据量大,你可能需要调大数据库的max_allowed_packet参数。
`#!/bin/bash`
`JDBCURL=$1`
`DBUSER=$2`
`DBPASS=$3`
`SQOOPFILE=$4`
`DATE=$5`
`sqoop export --connect $JDBCURL --username $DBUSER --password $DBPASS --table loganalytb --fields-terminated-by ',' --enclosed-by '\"' --export-dir $SQOOPFILE/$DATE/`
7. 准备emrconf.json脚本
作用是使得创建起来的EMR集群的hive元数据存储在外部数据库中。注意:根据准备工作中创建的数据库来修改文件中的ConnectionUserName、ConnectionPassword、ConnectionURL三个参数。
`[`
`{"Classification":"hive-site",`
`"Properties":{`
`"javax.jdo.option.ConnectionUserName":"username",`
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
`"javax.jdo.option.ConnectionPassword":"password",`
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
`},`
`"Configurations":[]`
`}`
`]`
注意:如果是在北京Region, emrconf.json使用以下脚本
`[`
`{"Classification":"hive-site",`
`"Properties":{`
`"javax.jdo.option.ConnectionUserName":"username",`
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
`"javax.jdo.option.ConnectionPassword":"password",`
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
`},`
`"Configurations":[]`
`},`
`{"Classification":"presto-connector-hive",`
`"Properties":{`
`"hive.s3.pin-client-to-current-region":"true"`
`},`
`"Configurations":[]`
`}`
`]`
8. 准备jar包
需要两个jar,一个是script-runner.jar,下载地址: http://s3.amazonaws.com/elasticmapreduce/libs/script-runner/script-runner.jar
另一个是mysql的JDBC驱动,下载地址: https://s3-us-west-2.amazonaws.com/hxyshare/mysql-connector-java-5.1.38-bin.jar
9. 上传文件
在s3中创建s3://testcloudfrontlog/conf/目录,并将第8步中的两个jar包,以及cpjar.sh,log2staging.sh,hivetables.q,presto2s3.sh,s3tomysql.sh,emrconf.json,上传到该目录。
由于emrconf.json需要通过http的方式访问到,在s3中将emrconf.json的访问权限增加“所有人可下载”。
cloudfront日志文件copy到s3://testcloudfrontlog/log目录下, 两个示例文件下载地址:
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
如果在手动流程中已经上传了日志文件,则不必再上传。
第三步,执行程序
暂时将主程序loganaly.sh中的DATE和DATEE参数修改为示例数据的时间,例如,分别写成20160711和2016-07-11,在任意一个Linux系统中运行主程序脚本loganaly.sh(例如,可以使用EC2实例)。但需注意:
1) 该机器需要安装了AWS命令行工具,并具有s3和EMR的操作权限。
2) 该机器能访问到存储数据结果的数据库,因为loganaly.sh中有对该数据库的操作。
集群创建成功后,自动执行每个step中的任务,所有任务执行完成后自动关闭,从下图中可以看到每个Step的执行:
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-14.png)
所有任务执行完成后,进入到存储查询结果的数据库,查看输入的结果:
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-15.png)
如果要想每天执行loganaly.sh脚本并对前一天的数据进行处理和分析,将loganaly.sh中的DATE和DATEE分别赋值为 $(date -d “yesterday” +%Y%m%d) 和 $(date -d “yesterday” +%Y-%m-%d),然后创建一个crontab定时任务,每天定时执行loganaly.sh.
如果是在AWS中国以外的region执行,还可以利用竞价实例来大幅的降低成本,使用竞价实例的方法也非常简单,只需要在将loganaly.sh中对创建EMR集群的脚本稍做修改,增加BidPrice参数:
`--instance-groups \`
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,BidPrice=0.2,InstanceCount=$MASTERNUM \`
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,BidPrice=0.2,InstanceCount=$CORENUM \`
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,BidPrice=0.2,InstanceCount=$TASKNUM`
第四步,删除资源
为避免额外花费,删除本实验过程中(包括手动过程以及自动过程中)创建的资源,S3, EC2等资源。
#### 总结
本文中使用Cloudfront日志进行分析,但本文中使用的方法稍作修改便适用于其他类型的日志类文件的分析。本文中主要使用了EMR中的Hive,Presto,Sqoop工具,但EMR还有更多的工具(例如Spark)可供用户使用,用户在创建集群的时候增加相应的服务即可实现丰富的功能。
**作者介绍:**
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/HanXiaoyong-mini.jpg)
韩小勇
AWS解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,实施和推广,在加入AWS之前,从事电信核心网系统上云的方案设计及标准化推广 。
复制代码


TAGS:


Amazon EMR


,


分析


,


大咖专栏


本文将带您一步步完成一个利用Amazon EMR进行交互式数据查询的实例,过程包括数据的注入、数据的分析、结果的转存、以及将**整个过程自动化**的方法。其中涉及的EMR组件主要包括: Hive, Hadoop, presto, sqoop。除EMR外,涉及到的其他服务包括:S3, RDS. 本文所使用的数据源是cloudfront产生的日志。


在按照本文档进行操作之前,读者需了解S3,RDS并能够进行基本的S3,RDS的操作,读者需了解EMR的基本概念。以下是参考资料:
什么是EMR:
Amazon Elastic MapReduce (Amazon EMR) 是一种托管数据分析服务的框架,提升企业、研究人员、数据分析师和开发人员轻松、经济高效掌控海量数据的能力。其当前版本中托管的服务包括:Hadoop, Zeppelin, Tez, Ganglia, HBase, Pig, Hive, Presto, ZooKeeper, Sqoop, Mahout, Hue, Phoenix, Oozie, Spark, Hcatalog. EMR让您专注于数据分析,无需担心费时的集群设置、管理或调整,也无需担心所需要的计算能力。
具体参考: [](https://amazonaws-china.com/cn/documentation/elastic-mapreduce/)
什么是S3:
Amazon Simple Storage Service (Amazon S3) 为开发人员和 IT 团队提供安全、耐用且高度可扩展的对象存储。S3 可为EMR提供文件存储服务。
具体参考:[](https://amazonaws-china.com/cn/documentation/s3/)
什么是RDS:
Amazon Relational Database Service (Amazon RDS) 是一种可让用户在云中轻松设置、操作和扩展关系数据库的 Web 服务。 它在承担耗时的数据库管理任务的同时,又可提供经济高效的可调容量,使您能够腾出时间专注于应用程序开发。Amazon RDS 让您能够访问非常熟悉的 MySQL、PostgreSQL、Oracle 或 Microsoft SQL Server 等数据库引擎的功能。
具体参考:[](https://amazonaws-china.com/cn/documentation/rds/)
**准备工作**
1. Cloudfront生成的日志已经存储在s3桶中,并在不断更新, 存储目录是:s3://testcloudfrontlog/log
其中testcloudfrontlog是s3存储桶的名字,在实际操作的时候,需要换一个名字,因为s3存储桶的名字是全局唯一的, 而其他人也有可能使用了这个名字。
可以从以下链接下载本例中使用的示例文件,并上传到s3://testcloudfrontlog/log目录下。
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
2. 创建一个目录来存储按照日期划分的日志数据。 目录是s3://testcloudfrontlog/logbydate
3. 创建一个目录用来做hive表的数据存储,目录是s3://testcloudfrontlog/logpart
4. 注意:直接copy本文的代码有可能会由于字符原因出现错误,建议先copy到纯文本编辑器中再执行。
**手动方式完成交互式数据查询**
第一步,创建一个EMR集群,方法如下:
1. 进入到AWS的控制台,选择EMR服务,点击创建集群。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-1.png)
2. 点击转到高级选项
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-2.png)
3. 软件配置
选择EMR发行版本以及所需要的软件, 在本例中,我们选择emr-5.0.0版本,所需要的工具选择hadoop, hive, presto, sqoop。 本步骤中的其余选项使用默认值,然后点击下一步。如果是使用北京Region,在输入配置中输入以下内容,使得presto可访问s3, 如果使用其他Region,不必输入。
`[{"classification":"presto-connector-hive", "properties":{"hive.s3.pin-client-to-current-region":"true"}, "configurations":[]}]`
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-3.png)
4. 硬件配置
进入到硬件配置界面,默认配置如下,直接使用默认配置。然后点击下一步。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-4.png)
5. 一般选项
在一般选项的集群名称后面输入一个名字,作为集群的名字。其余的可按照默认配置。然后点击下一步。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-5.png)
6. 安全选项
在EC2键对后面的框中选择一个已有的键对,该键对用来在集群创建成功后,从SSH客户端登录到集群中的任意一台服务器。如果选择“在没有EC2键对的情况下继续”,则后续不能登录到集群中的机器。其余选项均可默认。然后点击创建集群。![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-6.png)7. 修改安全组规则,并登录EMR的主节点
进入到刚刚创建的集群的信息界面,点击主节点安全组,进入到该安全组的配置界面,在入规则中增加SSH的访问规则,这样才可以通过SSH的方式从外部机器登录到主节点。然后通过任意一个SSH客户端登录到主节点,目标地址是图中所示的主节点共有DNS, 用户名是hadoop, 通过私钥登录,私钥与前面所提到的键对对应。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-7.png)
第二步,创建数据表并进行查询
1. SSH到主节点后,执行hive命令,进入到hive命令行界面
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-8.png)
2. 创建一个用日期作为分区的hive表,用来作为最终被查询的表
将以下脚本copy到hive>提示符下执行,注意LOCATION的参数需要改成你自己的目录。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`STORED AS PARQUET`
`LOCATION 's3://testcloudfrontlog/logpart';`
3. 输入quit命令,退出hive。用aws s3命令将s3://testcloudfrontlog/log中日志copy到s3://testcloudfrontlog/logbydate中按照时间划分的目录下
以2016-07-11这一天的文件为例,命令如下,注意,s3目录需要改成你自己的。
`aws s3 cp s3://testcloudfrontlog/log/ s3://testcloudfrontlog/logbydate/2016-07-11/ --exclude "*" --include "*.2016-07-11*" --recursive`
这里用到了aws s3命令行工具。你可以在EMR主节点中退出hive命令行程序,然后执行以上命令。或者在任意一个安装了aws cli工具并配置了s3访问权限的机器中执行。本例中直接在EMR的主节点中执行。aws s3 cp不支持通配符,所以用–exclude 和 –include 参数来代替。
4. 针对s3://testcloudfrontlog/logbydate/2016-07-11/ 中的数据,创建一个HIVE表。
假设表的名字是cloudfrontlog20160711, 输入hive, 重新进入到hive命令行工具,并输入以下语句,注意LOCATION的参数需要改成你自己的目录。
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog20160711 (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION 's3://testcloudfrontlog/logbydate/2016-07-11/'`
`tblproperties("skip.header.line.count"="2");`
5. 数据注入
至此,已经创建了两个hive表,通过在hive命令行工具中执行 show tables命令,可以查看到两个hive表。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-9.png)
向带分区的hive表中注入7月11日的数据,在hive命令行界面中分别执行以下命令:
`set hive.exec.dynamic.partition.mode=nonstrict;`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog20160711;`
以下是INSERT语句执行过程的显示信息。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-10.png)
完成后,如果进入到s3://testcloudfrontlog/logpart目录下,可以查看到已经生成了按日期划分的目录,目录下存储了文件。
第三步,数据查询
可以继续使用hive做查询,也可以进入presto做查询。这里,我们使用presto, 由于presto完全使用内存进行计算, 速度更快。进入presto的方式如下:
执行quit, 退出hive命令行程序。
然后执行以下语句进入presto命令行界面:
`presto-cli --catalog hive --schema default`
该语句表示presto使用hive数据源,并且使用hive数据源中的default数据库。细节请参考presto的社区文档。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-11.png)
执行一个简单的查询:
`SELECT time, scbytes, cip FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('2016-07-11' AS varchar) LIMIT 5;`
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-12.png)
然后退出presto命令行工具,输入 quit;
第四步,删除EMR集群
在集群列表中,选中刚刚创建的集群,点击终止,以终止该集群。如果开启的终止保护,需要变更一下终止保护的状态,然后再终止。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-13.png)
至此, 我们通过手动的方式完成了一个简单的数据查询。但在实际生产环境中,使用手动的方式会耗费很长时间做重复性的工作,并难免出错。EMR更大优势是能够通过程序的方式去控制集群的创建以及任务的执行,这使得EMR的使用者能够将集群创建以及数据分析的过程自动化。
接下来的部分将以相同的示例指导读者一步步的实现自动化的创建集群、分析数据、转存结果、关闭集群。
**自动方式完成交互式数据查询**
首先概括几点自动化执行数据分析的需求:
1. 集群在每天的固定时间被创建,然后对数据进行分析,然后集群被自动删除。这显然不能通过图形界面进行一步步的操作了。
2. 在手动操作中执行的每个步骤,需要按顺序自动化执行。这些步骤包括:
– 从/log目录向/logbydate目录中copy特定日期的文件。
– 创建对应特定日期的hive表,例如表名为cloudfrontlog20160711。
– 从cloudfrontlog20160711向cloudfrontlogpart表中注入数据。
– 使用presto从cloudfrontlogpart表中查询出需要的数据。
– 此外,在手动执行的最后一步,我们可以直接看到查询结果,但在自动化执行的过程中,我们需要将查询结果存储到一个长期运行的数据库中,供随时查询。
3. 将hive元数据放在集群的外部。
在手动执行的流程中,创建hive表后,hive的元数据存储在了主节点。而在自动化执行的过程中,当所有任务执行完毕后,集群被删除,存储在主节点的元数据也会被删除,因此要在外部数据库中存储hive的元数据。
针对以上的几个需求,在EMR中对应的解决方法如下.
1. 使用EMR的命令行进行各种集群的操作,例如集群的创建,参数的设置等。
2. 使用EMR的“step”来组织各个任务的执行。
3. 创建一个外部的数据库用来存储hive元数据,并在集群创建的时候指定元数据的存储位置。
接下来详细描述操作步骤和脚本
第一步,准备工作
1. 准备两个mysql数据库分别用来存储hive元数据和查询结果,可以使用AWS的RDS服务来创建。这两个数据库都需要能被EMR访问到, 这两个数据库也可以使用同一个物理服务器或虚拟机。
2. 假设存储查询结果的数据库名字是loganalydb,我们在该数据库中创建一个表,用来存储结果数据,表的名字是loganalytb。根据后面所进行的查询,使用如下语句创建数据库以及与查询结果匹配的表:
`CREATE DATABASE loganalydb;`
`USE loganalydb;`
``CREATE TABLE `loganalytb` ( `id` int(11) NOT NULL AUTO_INCREMENT, `filepath` varchar(300) DEFAULT NULL, `totalbyte` bigint(20) DEFAULT NULL, `tdate` varchar(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58147 DEFAULT CHARSET=utf8;``
对于存储hive元数据的数据库,暂时不必创建,从后面提到的emrconf.json文件可以看到,只需要给出数据库服务器的地址,以及用户名和密码即可,数据库不存在的话,会自动创建。
3. 由于自动化的流程与前面提到的手动流程使用相同的示例,因此,请事先删除手动流程示例中产生的数据。包括:
1) s3://testcloudfrontlog/logbydate目录下的数据
2) s3://testcloudfrontlog/logpart目录下的数据
第二步,编写脚本
下面给出各个脚本,并配以注释解释
1. 主程序脚本loganaly.sh
`#!/bin/bash`
`# 变量KEYNAMES是你的SSH key的名字, 用来通过SSH方式访问EC2。`
`KEYNAME=yourkeyname`
`# 变量CONFIGFILE是emrconf.json的url地址,emrconf.json这个文件中包含了存储hive元数据的外部数据库的信息。将该文件存在s3中并让这个文件能够从外部访问到。emrconf.json中的内容在下文中会给出。`
`CONFIGFILE=https://s3-us-west-2.amazonaws.com/testcloudfrontlog/conf/emrconf.json`
`# EMR集群产生的日志所存放的s3目录。`
`LOGURI=s3://testcloudfrontlog/emrlog/`
`# 脚本、配置文件、jar包所在的目录`
`CODEDIR=s3://testcloudfrontlog/conf`
`# cloudfront日志的存放位置, 结尾别加 /`
`LOGSOURCEDIR=s3://testcloudfrontlog/log`
`# 当天日志的中转目录, 结尾别加 /`
`STAGINGDIR=s3://testcloudfrontlog/logbydate`
`# 昨天的日期, 形如:20160711`
`DATE=$(date -d "yesterday" +%Y%m%d)`
`# 昨天的日期,另外一种格式, 形如:2016-07-11`
`DATEE=$(date -d "yesterday" +%Y-%m-%d)`
`# 被查询的hive表的文件存储位置, 结尾别加 /`
`PARTDIR=s3://testcloudfrontlog/logpart`
`# 用来存储结果文件的目录, 结果文件将被sqoop使用,数据会被传到mysql.`
`SQOOPFILE=s3://testcloudfrontlog/sqoopfile`
`#用来存储结果数据的mysql的信息。`
`DBHOST=rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com`
`JDBCURL=jdbc:mysql://rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com/loganalydb`
`DBUSER=username`
`DBPASS=password`
`# EMR集群的配置信息, 这里是以Oregan region为例。如果使用的是北京region,AWSREGION用cn-north-1。`
`AWSREGION=us-west-2`
`MASTERTYPE=m3.xlarge`
`CORETYPE=r3.xlarge`
`TASKTYPE=r3.xlarge`
`MASTERNUM=1`
`CORENUM=1`
`TASKNUM=1`
`# 删除当天的数据,目的是防止脚本在同一天被多次执行而造成数据冗余,`
`aws s3 rm $PARTDIR/datee=$DATEE --recursive`
`aws s3 rm $SQOOPFILE/$DATE --recursive`
`mysql -h$DBHOST -u$DBUSER -p$DBPASS --execute "DELETE FROM loganalydb.loganalytb WHERE tdate='$DATEE'"`


`# 创建emr集群,名字是loganaly, 其中:`
`# --auto-terminate参数表示该集群在执行完所有的任务后自动删除。`
`# --configurations参数的文件中的参数配置覆盖了该集群运行起来后的默认参数配置。在本例中用来修改Hive元数据的存储位置。`
`# --step参数规定了该集群在创建后要执行的几个任务,其中Type=Hive的step, 需要给出包含hive语句的文件作为参数。而Type=CUSTOM_JAR的step,需要给出一个JAR包,这里我们使用EMR提供scrip-runner.jar,它的作用是执行其第一个参数中指定的脚本文件,并将其余的参数作为脚本文件的输入参数。`
`# --instance groups参数规定了集群的中各节点的机型和数量`
`aws --region $AWSREGION emr create-cluster --name "loganaly" --release-label emr-5.0.0 \`
`--applications Name=Hadoop Name=Hive Name=Presto Name=Sqoop \`
`--use-default-roles \`
`--ec2-attributes KeyName=$KEYNAME \`
`--termination-protected \`
`--auto-terminate \`
`--configurations $CONFIGFILE \`
`--enable-debugging \`
`--log-uri $LOGURI \`
`--steps \`
`Type=CUSTOM_JAR,Name="cpjar",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/cpjar.sh"," $CODEDIR"] \`
`Type=CUSTOM_JAR,Name="log2staging",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/log2logbydate.sh","$LOGSOURCEDIR","$STAGINGDIR","$DATE","$DATEE"] \`
`Type=Hive,Name="HiveStep",Args=[-f,$CODEDIR/hivetables.q,-d,PARTDIRh=$PARTDIR,-d,STAGINGDIRh=$STAGINGDIR,-d,DATEh=$DATE] \`
`Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/presto2s3.sh","$SQOOPFILE","$DATE","$DATEE"] \`
`Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/s3tomysql.sh","$JDBCURL","$DBUSER","$DBPASS","$SQOOPFILE","$DATE"] \`
`--instance-groups \`
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,InstanceCount=$MASTERNUM \`
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,InstanceCount=$CORENUM \`
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,InstanceCount=$TASKNUM`
2. 准备cpjar.sh脚本
作用是将mysql的JDBC驱动包下载到本地的Sqoop目录下,sqoop在将文件转存到数据库的时候会用到JDBC驱动。
在国外region, 使用以下脚本:
`#!/bin/bash`
`CODEDIR=$1`
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/`
在北京Region, 使用以下脚本:
`#!/bin/bash`
`CODEDIR=$1`
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/ --region cn-north-1`
3. 准备log2logbydate.sh脚本
做用是将原始日志文件copy到按天划分的目录下。
`#!/bin/bash`
`LOGSOURCEDIR=$1`
`STAGINGDIR=$2`
`DATE=$3`
`DATEE=$4`
`aws s3 cp $LOGSOURCEDIR/ $STAGINGDIR/$DATE/ --exclude "*" --include "*.$DATEE*" --recursive`
4. 准备hivetables.q脚本
作用包括:创建待查询的表和按天命名的临时表,将临时表中数据注入到待查询的表中,并删除临时表。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`LOCATION '${PARTDIRh}';`


`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog${DATEh} (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION '${STAGINGDIRh}/${DATEh}/'`
`tblproperties("skip.header.line.count"="2");`


`--make dynamic insert available`
`set hive.exec.dynamic.partition.mode=nonstrict;`


`--insert data.`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog${DATEh};`


`--delete the staging table`
`DROP TABLE cloudfrontlogstaging${DATEh};`
5. 准备presto2s3.sh脚本
作用包括:查询hive表,并将结果存成.csv格式的文件,将该文件上传到S3中相应的目录下。
`#!/bin/bash`
`TIME=$(date +%H%M%S)`
`SQOOPFILE=$1`
`DATE=$2`
`DATEE=$3`
`sudo presto-cli --catalog hive --schema default --execute "SELECT NULL as id, csuristem, SUM(scbytes) as totalbyte, CAST('$DATEE' AS varchar) as date FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('$DATEE' AS varchar) GROUP BY csuristem order by totalbyte desc" --output-format CSV > ~/cdnfilestat.csv`
`aws s3 cp ~/cdnfilestat.csv $SQOOPFILE/$DATE/cdnfilestat$TIME.csv`
6. 准备s3tomysql.sh脚本
作用是利用sqoop将s3中的.csv文件中的内容转存到数据库中。注意,如果单次转存的数据量大,你可能需要调大数据库的max_allowed_packet参数。
`#!/bin/bash`
`JDBCURL=$1`
`DBUSER=$2`
`DBPASS=$3`
`SQOOPFILE=$4`
`DATE=$5`
`sqoop export --connect $JDBCURL --username $DBUSER --password $DBPASS --table loganalytb --fields-terminated-by ',' --enclosed-by '\"' --export-dir $SQOOPFILE/$DATE/`
7. 准备emrconf.json脚本
作用是使得创建起来的EMR集群的hive元数据存储在外部数据库中。注意:根据准备工作中创建的数据库来修改文件中的ConnectionUserName、ConnectionPassword、ConnectionURL三个参数。
`[`
`{"Classification":"hive-site",`
`"Properties":{`
`"javax.jdo.option.ConnectionUserName":"username",`
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
`"javax.jdo.option.ConnectionPassword":"password",`
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
`},`
`"Configurations":[]`
`}`
`]`
注意:如果是在北京Region, emrconf.json使用以下脚本
`[`
`{"Classification":"hive-site",`
`"Properties":{`
`"javax.jdo.option.ConnectionUserName":"username",`
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
`"javax.jdo.option.ConnectionPassword":"password",`
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
`},`
`"Configurations":[]`
`},`
`{"Classification":"presto-connector-hive",`
`"Properties":{`
`"hive.s3.pin-client-to-current-region":"true"`
`},`
`"Configurations":[]`
`}`
`]`
8. 准备jar包
需要两个jar,一个是script-runner.jar,下载地址: http://s3.amazonaws.com/elasticmapreduce/libs/script-runner/script-runner.jar
另一个是mysql的JDBC驱动,下载地址: https://s3-us-west-2.amazonaws.com/hxyshare/mysql-connector-java-5.1.38-bin.jar
9. 上传文件
在s3中创建s3://testcloudfrontlog/conf/目录,并将第8步中的两个jar包,以及cpjar.sh,log2staging.sh,hivetables.q,presto2s3.sh,s3tomysql.sh,emrconf.json,上传到该目录。
由于emrconf.json需要通过http的方式访问到,在s3中将emrconf.json的访问权限增加“所有人可下载”。
cloudfront日志文件copy到s3://testcloudfrontlog/log目录下, 两个示例文件下载地址:
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
如果在手动流程中已经上传了日志文件,则不必再上传。
第三步,执行程序
暂时将主程序loganaly.sh中的DATE和DATEE参数修改为示例数据的时间,例如,分别写成20160711和2016-07-11,在任意一个Linux系统中运行主程序脚本loganaly.sh(例如,可以使用EC2实例)。但需注意:
1) 该机器需要安装了AWS命令行工具,并具有s3和EMR的操作权限。
2) 该机器能访问到存储数据结果的数据库,因为loganaly.sh中有对该数据库的操作。
集群创建成功后,自动执行每个step中的任务,所有任务执行完成后自动关闭,从下图中可以看到每个Step的执行:
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-14.png)
所有任务执行完成后,进入到存储查询结果的数据库,查看输入的结果:
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-15.png)
如果要想每天执行loganaly.sh脚本并对前一天的数据进行处理和分析,将loganaly.sh中的DATE和DATEE分别赋值为 $(date -d “yesterday” +%Y%m%d) 和 $(date -d “yesterday” +%Y-%m-%d),然后创建一个crontab定时任务,每天定时执行loganaly.sh.
如果是在AWS中国以外的region执行,还可以利用竞价实例来大幅的降低成本,使用竞价实例的方法也非常简单,只需要在将loganaly.sh中对创建EMR集群的脚本稍做修改,增加BidPrice参数:
`--instance-groups \`
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,BidPrice=0.2,InstanceCount=$MASTERNUM \`
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,BidPrice=0.2,InstanceCount=$CORENUM \`
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,BidPrice=0.2,InstanceCount=$TASKNUM`
第四步,删除资源
为避免额外花费,删除本实验过程中(包括手动过程以及自动过程中)创建的资源,S3, EC2等资源。
#### 总结
本文中使用Cloudfront日志进行分析,但本文中使用的方法稍作修改便适用于其他类型的日志类文件的分析。本文中主要使用了EMR中的Hive,Presto,Sqoop工具,但EMR还有更多的工具(例如Spark)可供用户使用,用户在创建集群的时候增加相应的服务即可实现丰富的功能。
**作者介绍:**
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/HanXiaoyong-mini.jpg)
韩小勇
AWS解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,实施和推广,在加入AWS之前,从事电信核心网系统上云的方案设计及标准化推广 。
复制代码


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/amazon-emr/


2019-11-20 08:00734

评论

发布
暂无评论
发现更多内容

大数据培训机构学习会不会有点难?

小谷哥

兆骑科创创新创业服务平台,投融资对接,线上直播路演

兆骑科创凤阁

阿里云 ACK One 多集群管理全面升级:多集群服务、多集群监控、两地三中心应用容灾

阿里巴巴云原生

阿里云 容器 分布式 云原生 集群

TDengine 3.0 三大创新详解

TDengine

数据库 tdengine 时序数据库

一箭双雕!刷完阿里P8架构师spring学习笔记+源码剖析,涨薪8K

Geek_Yin

编程 程序员 springboot #java Spring Java

腾讯云大神亲码“redis深度笔记”,不讲一句废话,全是精华

Geek_Yin

编程 程序员 架构师 #java redis 底层原理

数字藏品APP开发定制

开源直播系统源码

软件开发 数字藏品 数字藏品开发 数字藏品系统

2022秋招,Java岗最全面试攻略,吃透25个技术栈Offer拿到手软

Geek_Yin

程序人生 java面试 程序猿 #java Java面试八股文

MobTech MobLink功能说明及应用创建

MobTech袤博科技

开发 短链接 跳转访问网页

什么样的人适合参加Web前端培训

小谷哥

开源一夏 |log4j2漏洞复现及修复

六月的雨在InfoQ

开源 Log4j 2 Log4j2 漏洞 8月月更

泛谈传统运营商借鉴电商模式

鲸品堂

电商 运营商 通信运营商 电信运营商

我用开天平台做了一个字符串检查API,hin 简单~~

华为云开发者联盟

云计算 API 华为云

学习WEB前端去哪里比较好

小谷哥

发展场景金融需要重视生态能力建设,加深对场景的渗透程度

易观分析

金融 客户 场景生态建设

“智慧”有为!AntDB数据库助力某省高速率先完成自主可控建设

亚信AntDB数据库

AntDB 国产数据库 aisware antdb

数字无限 云领未来 | 华为云&赛意信息线上直播约定您

Geek_2d6073

死磕它七年“腾讯限量版”Java架构笔记,要个40k不过分吧?

Geek_Yin

编程 程序员 涨薪 架构师 #java

2021年中国智能驾驶行业洞察

易观分析

自动化 智能驾驶

低学历能通过Java培训学习吗?

小谷哥

狂刷《Java权威面试指南(阿里版)》,冲击“金九银十”有望了

Geek_Yin

阿里 Java 面试 架构师 #java 程序员面试、

【数据结构实践】简单实现Python自定义队列

迷彩

数据结构 算法 队列 8月月更

向量数据库公司 Zilliz 完成 6000 万美元 B+ 轮融资

Zilliz

融资 数据库·

华为云CDN同舟计划

科技云未来

一个月闭关直接面进阿里P7,这份Java面试指导手册是真的牛逼

收到请回复

Java 程序员 语言 & 开发

RadonDB MySQL Kubernetes 2.2.1 发布!

RadonDB

MySQL 数据库 Kubernetes RadonDB

行业分析| 调度行业未来趋势

anyRTC开发者

音视频 调度 快对讲 语音对讲 视频对讲

兆骑科创承办创业赛事活动,双创服务,创业服务平台

兆骑科创凤阁

太牛了!这份什么神仙级Spring Cloud Alibaba全套笔记,从入门到实战,全方位讲解微服务技术栈!

Geek_Yin

程序员 阿里 架构师 spring cloud alibaba #java

怎么选择合适自己的web编程培训机构?

小谷哥

牛客网论坛最具争议的Java面试成神笔记,GitHub已下载量已过百万

Geek_Yin

编程 程序员 牛客网 #java Java面试八股文

手把手教你使用Amazon EMR进行交互式数据查询_其他_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章