在按照本文档进行操作之前,读者需了解S3,RDS并能够进行基本的S3,RDS的操作,读者需了解EMR的基本概念。以下是参考资料:
什么是EMR:
Amazon Elastic MapReduce (Amazon EMR) 是一种托管数据分析服务的框架,提升企业、研究人员、数据分析师和开发人员轻松、经济高效掌控海量数据的能力。其当前版本中托管的服务包括:Hadoop, Zeppelin, Tez, Ganglia, HBase, Pig, Hive, Presto, ZooKeeper, Sqoop, Mahout, Hue, Phoenix, Oozie, Spark, Hcatalog. EMR让您专注于数据分析,无需担心费时的集群设置、管理或调整,也无需担心所需要的计算能力。
具体参考: [](https://amazonaws-china.com/cn/documentation/elastic-mapreduce/)
什么是S3:
Amazon Simple Storage Service (Amazon S3) 为开发人员和 IT 团队提供安全、耐用且高度可扩展的对象存储。S3 可为EMR提供文件存储服务。
具体参考:[](https://amazonaws-china.com/cn/documentation/s3/)
什么是RDS:
Amazon Relational Database Service (Amazon RDS) 是一种可让用户在云中轻松设置、操作和扩展关系数据库的 Web 服务。 它在承担耗时的数据库管理任务的同时,又可提供经济高效的可调容量,使您能够腾出时间专注于应用程序开发。Amazon RDS 让您能够访问非常熟悉的 MySQL、PostgreSQL、Oracle 或 Microsoft SQL Server 等数据库引擎的功能。
具体参考:[](https://amazonaws-china.com/cn/documentation/rds/)
**准备工作**
1. Cloudfront生成的日志已经存储在s3桶中,并在不断更新, 存储目录是:s3://testcloudfrontlog/log
其中testcloudfrontlog是s3存储桶的名字,在实际操作的时候,需要换一个名字,因为s3存储桶的名字是全局唯一的, 而其他人也有可能使用了这个名字。
可以从以下链接下载本例中使用的示例文件,并上传到s3://testcloudfrontlog/log目录下。
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
2. 创建一个目录来存储按照日期划分的日志数据。 目录是s3://testcloudfrontlog/logbydate
3. 创建一个目录用来做hive表的数据存储,目录是s3://testcloudfrontlog/logpart
4. 注意:直接copy本文的代码有可能会由于字符原因出现错误,建议先copy到纯文本编辑器中再执行。
**手动方式完成交互式数据查询**
第一步,创建一个EMR集群,方法如下:
1. 进入到AWS的控制台,选择EMR服务,点击创建集群。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-1.png)
2. 点击转到高级选项
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-2.png)
3. 软件配置
选择EMR发行版本以及所需要的软件, 在本例中,我们选择emr-5.0.0版本,所需要的工具选择hadoop, hive, presto, sqoop。 本步骤中的其余选项使用默认值,然后点击下一步。如果是使用北京Region,在输入配置中输入以下内容,使得presto可访问s3, 如果使用其他Region,不必输入。
`[{"classification":"presto-connector-hive", "properties":{"hive.s3.pin-client-to-current-region":"true"}, "configurations":[]}]`
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-3.png)
4. 硬件配置
进入到硬件配置界面,默认配置如下,直接使用默认配置。然后点击下一步。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-4.png)
5. 一般选项
在一般选项的集群名称后面输入一个名字,作为集群的名字。其余的可按照默认配置。然后点击下一步。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-5.png)
6. 安全选项
在EC2键对后面的框中选择一个已有的键对,该键对用来在集群创建成功后,从SSH客户端登录到集群中的任意一台服务器。如果选择“在没有EC2键对的情况下继续”,则后续不能登录到集群中的机器。其余选项均可默认。然后点击创建集群。![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-6.png)7. 修改安全组规则,并登录EMR的主节点
进入到刚刚创建的集群的信息界面,点击主节点安全组,进入到该安全组的配置界面,在入规则中增加SSH的访问规则,这样才可以通过SSH的方式从外部机器登录到主节点。然后通过任意一个SSH客户端登录到主节点,目标地址是图中所示的主节点共有DNS, 用户名是hadoop, 通过私钥登录,私钥与前面所提到的键对对应。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-7.png)
第二步,创建数据表并进行查询
1. SSH到主节点后,执行hive命令,进入到hive命令行界面
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-8.png)
2. 创建一个用日期作为分区的hive表,用来作为最终被查询的表
将以下脚本copy到hive>提示符下执行,注意LOCATION的参数需要改成你自己的目录。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`STORED AS PARQUET`
`LOCATION 's3://testcloudfrontlog/logpart';`
3. 输入quit命令,退出hive。用aws s3命令将s3://testcloudfrontlog/log中日志copy到s3://testcloudfrontlog/logbydate中按照时间划分的目录下
以2016-07-11这一天的文件为例,命令如下,注意,s3目录需要改成你自己的。
`aws s3 cp s3://testcloudfrontlog/log/ s3://testcloudfrontlog/logbydate/2016-07-11/ --exclude "*" --include "*.2016-07-11*" --recursive`
这里用到了aws s3命令行工具。你可以在EMR主节点中退出hive命令行程序,然后执行以上命令。或者在任意一个安装了aws cli工具并配置了s3访问权限的机器中执行。本例中直接在EMR的主节点中执行。aws s3 cp不支持通配符,所以用–exclude 和 –include 参数来代替。
4. 针对s3://testcloudfrontlog/logbydate/2016-07-11/ 中的数据,创建一个HIVE表。
假设表的名字是cloudfrontlog20160711, 输入hive, 重新进入到hive命令行工具,并输入以下语句,注意LOCATION的参数需要改成你自己的目录。
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog20160711 (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION 's3://testcloudfrontlog/logbydate/2016-07-11/'`
`tblproperties("skip.header.line.count"="2");`
5. 数据注入
至此,已经创建了两个hive表,通过在hive命令行工具中执行 show tables命令,可以查看到两个hive表。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-9.png)
向带分区的hive表中注入7月11日的数据,在hive命令行界面中分别执行以下命令:
`set hive.exec.dynamic.partition.mode=nonstrict;`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog20160711;`
以下是INSERT语句执行过程的显示信息。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-10.png)
完成后,如果进入到s3://testcloudfrontlog/logpart目录下,可以查看到已经生成了按日期划分的目录,目录下存储了文件。
第三步,数据查询
可以继续使用hive做查询,也可以进入presto做查询。这里,我们使用presto, 由于presto完全使用内存进行计算, 速度更快。进入presto的方式如下:
执行quit, 退出hive命令行程序。
然后执行以下语句进入presto命令行界面:
`presto-cli --catalog hive --schema default`
该语句表示presto使用hive数据源,并且使用hive数据源中的default数据库。细节请参考presto的社区文档。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-11.png)
执行一个简单的查询:
`SELECT time, scbytes, cip FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('2016-07-11' AS varchar) LIMIT 5;`
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-12.png)
然后退出presto命令行工具,输入 quit;
第四步,删除EMR集群
在集群列表中,选中刚刚创建的集群,点击终止,以终止该集群。如果开启的终止保护,需要变更一下终止保护的状态,然后再终止。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-13.png)
至此, 我们通过手动的方式完成了一个简单的数据查询。但在实际生产环境中,使用手动的方式会耗费很长时间做重复性的工作,并难免出错。EMR更大优势是能够通过程序的方式去控制集群的创建以及任务的执行,这使得EMR的使用者能够将集群创建以及数据分析的过程自动化。
接下来的部分将以相同的示例指导读者一步步的实现自动化的创建集群、分析数据、转存结果、关闭集群。
**自动方式完成交互式数据查询**
首先概括几点自动化执行数据分析的需求:
1. 集群在每天的固定时间被创建,然后对数据进行分析,然后集群被自动删除。这显然不能通过图形界面进行一步步的操作了。
2. 在手动操作中执行的每个步骤,需要按顺序自动化执行。这些步骤包括:
– 从/log目录向/logbydate目录中copy特定日期的文件。
– 创建对应特定日期的hive表,例如表名为cloudfrontlog20160711。
– 从cloudfrontlog20160711向cloudfrontlogpart表中注入数据。
– 使用presto从cloudfrontlogpart表中查询出需要的数据。
– 此外,在手动执行的最后一步,我们可以直接看到查询结果,但在自动化执行的过程中,我们需要将查询结果存储到一个长期运行的数据库中,供随时查询。
3. 将hive元数据放在集群的外部。
在手动执行的流程中,创建hive表后,hive的元数据存储在了主节点。而在自动化执行的过程中,当所有任务执行完毕后,集群被删除,存储在主节点的元数据也会被删除,因此要在外部数据库中存储hive的元数据。
针对以上的几个需求,在EMR中对应的解决方法如下.
1. 使用EMR的命令行进行各种集群的操作,例如集群的创建,参数的设置等。
2. 使用EMR的“step”来组织各个任务的执行。
3. 创建一个外部的数据库用来存储hive元数据,并在集群创建的时候指定元数据的存储位置。
接下来详细描述操作步骤和脚本
第一步,准备工作
1. 准备两个mysql数据库分别用来存储hive元数据和查询结果,可以使用AWS的RDS服务来创建。这两个数据库都需要能被EMR访问到, 这两个数据库也可以使用同一个物理服务器或虚拟机。
2. 假设存储查询结果的数据库名字是loganalydb,我们在该数据库中创建一个表,用来存储结果数据,表的名字是loganalytb。根据后面所进行的查询,使用如下语句创建数据库以及与查询结果匹配的表:
`CREATE DATABASE loganalydb;`
`USE loganalydb;`
``CREATE TABLE `loganalytb` ( `id` int(11) NOT NULL AUTO_INCREMENT, `filepath` varchar(300) DEFAULT NULL, `totalbyte` bigint(20) DEFAULT NULL, `tdate` varchar(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58147 DEFAULT CHARSET=utf8;``
对于存储hive元数据的数据库,暂时不必创建,从后面提到的emrconf.json文件可以看到,只需要给出数据库服务器的地址,以及用户名和密码即可,数据库不存在的话,会自动创建。
3. 由于自动化的流程与前面提到的手动流程使用相同的示例,因此,请事先删除手动流程示例中产生的数据。包括:
1) s3://testcloudfrontlog/logbydate目录下的数据
2) s3://testcloudfrontlog/logpart目录下的数据
第二步,编写脚本
下面给出各个脚本,并配以注释解释
1. 主程序脚本loganaly.sh
`#!/bin/bash`
`# 变量KEYNAMES是你的SSH key的名字, 用来通过SSH方式访问EC2。`
`KEYNAME=yourkeyname`
`# 变量CONFIGFILE是emrconf.json的url地址,emrconf.json这个文件中包含了存储hive元数据的外部数据库的信息。将该文件存在s3中并让这个文件能够从外部访问到。emrconf.json中的内容在下文中会给出。`
`CONFIGFILE=https://s3-us-west-2.amazonaws.com/testcloudfrontlog/conf/emrconf.json`
`# EMR集群产生的日志所存放的s3目录。`
`LOGURI=s3://testcloudfrontlog/emrlog/`
`# 脚本、配置文件、jar包所在的目录`
`CODEDIR=s3://testcloudfrontlog/conf`
`# cloudfront日志的存放位置, 结尾别加 /`
`LOGSOURCEDIR=s3://testcloudfrontlog/log`
`# 当天日志的中转目录, 结尾别加 /`
`STAGINGDIR=s3://testcloudfrontlog/logbydate`
`# 昨天的日期, 形如:20160711`
`DATE=$(date -d "yesterday" +%Y%m%d)`
`# 昨天的日期,另外一种格式, 形如:2016-07-11`
`DATEE=$(date -d "yesterday" +%Y-%m-%d)`
`# 被查询的hive表的文件存储位置, 结尾别加 /`
`PARTDIR=s3://testcloudfrontlog/logpart`
`# 用来存储结果文件的目录, 结果文件将被sqoop使用,数据会被传到mysql.`
`SQOOPFILE=s3://testcloudfrontlog/sqoopfile`
`#用来存储结果数据的mysql的信息。`
`DBHOST=rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com`
`JDBCURL=jdbc:mysql://rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com/loganalydb`
`DBUSER=username`
`DBPASS=password`
`# EMR集群的配置信息, 这里是以Oregan region为例。如果使用的是北京region,AWSREGION用cn-north-1。`
`AWSREGION=us-west-2`
`MASTERTYPE=m3.xlarge`
`CORETYPE=r3.xlarge`
`TASKTYPE=r3.xlarge`
`MASTERNUM=1`
`CORENUM=1`
`TASKNUM=1`
`# 删除当天的数据,目的是防止脚本在同一天被多次执行而造成数据冗余,`
`aws s3 rm $PARTDIR/datee=$DATEE --recursive`
`aws s3 rm $SQOOPFILE/$DATE --recursive`
`mysql -h$DBHOST -u$DBUSER -p$DBPASS --execute "DELETE FROM loganalydb.loganalytb WHERE tdate='$DATEE'"`
`# 创建emr集群,名字是loganaly, 其中:`
`# --auto-terminate参数表示该集群在执行完所有的任务后自动删除。`
`# --configurations参数的文件中的参数配置覆盖了该集群运行起来后的默认参数配置。在本例中用来修改Hive元数据的存储位置。`
`# --step参数规定了该集群在创建后要执行的几个任务,其中Type=Hive的step, 需要给出包含hive语句的文件作为参数。而Type=CUSTOM_JAR的step,需要给出一个JAR包,这里我们使用EMR提供scrip-runner.jar,它的作用是执行其第一个参数中指定的脚本文件,并将其余的参数作为脚本文件的输入参数。`
`# --instance groups参数规定了集群的中各节点的机型和数量`
`aws --region $AWSREGION emr create-cluster --name "loganaly" --release-label emr-5.0.0 \`
`--applications Name=Hadoop Name=Hive Name=Presto Name=Sqoop \`
`--use-default-roles \`
`--ec2-attributes KeyName=$KEYNAME \`
`--termination-protected \`
`--auto-terminate \`
`--configurations $CONFIGFILE \`
`--enable-debugging \`
`--log-uri $LOGURI \`
`--steps \`
`Type=CUSTOM_JAR,Name="cpjar",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/cpjar.sh"," $CODEDIR"] \`
`Type=CUSTOM_JAR,Name="log2staging",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/log2logbydate.sh","$LOGSOURCEDIR","$STAGINGDIR","$DATE","$DATEE"] \`
`Type=Hive,Name="HiveStep",Args=[-f,$CODEDIR/hivetables.q,-d,PARTDIRh=$PARTDIR,-d,STAGINGDIRh=$STAGINGDIR,-d,DATEh=$DATE] \`
`Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/presto2s3.sh","$SQOOPFILE","$DATE","$DATEE"] \`
`Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/s3tomysql.sh","$JDBCURL","$DBUSER","$DBPASS","$SQOOPFILE","$DATE"] \`
`--instance-groups \`
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,InstanceCount=$MASTERNUM \`
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,InstanceCount=$CORENUM \`
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,InstanceCount=$TASKNUM`
2. 准备cpjar.sh脚本
作用是将mysql的JDBC驱动包下载到本地的Sqoop目录下,sqoop在将文件转存到数据库的时候会用到JDBC驱动。
在国外region, 使用以下脚本:
`#!/bin/bash`
`CODEDIR=$1`
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/`
在北京Region, 使用以下脚本:
`#!/bin/bash`
`CODEDIR=$1`
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/ --region cn-north-1`
3. 准备log2logbydate.sh脚本
做用是将原始日志文件copy到按天划分的目录下。
`#!/bin/bash`
`LOGSOURCEDIR=$1`
`STAGINGDIR=$2`
`DATE=$3`
`DATEE=$4`
`aws s3 cp $LOGSOURCEDIR/ $STAGINGDIR/$DATE/ --exclude "*" --include "*.$DATEE*" --recursive`
4. 准备hivetables.q脚本
作用包括:创建待查询的表和按天命名的临时表,将临时表中数据注入到待查询的表中,并删除临时表。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`LOCATION '${PARTDIRh}';`
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog${DATEh} (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION '${STAGINGDIRh}/${DATEh}/'`
`tblproperties("skip.header.line.count"="2");`
`--make dynamic insert available`
`set hive.exec.dynamic.partition.mode=nonstrict;`
`--insert data.`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog${DATEh};`
`--delete the staging table`
`DROP TABLE cloudfrontlogstaging${DATEh};`
5. 准备presto2s3.sh脚本
作用包括:查询hive表,并将结果存成.csv格式的文件,将该文件上传到S3中相应的目录下。
`#!/bin/bash`
`TIME=$(date +%H%M%S)`
`SQOOPFILE=$1`
`DATE=$2`
`DATEE=$3`
`sudo presto-cli --catalog hive --schema default --execute "SELECT NULL as id, csuristem, SUM(scbytes) as totalbyte, CAST('$DATEE' AS varchar) as date FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('$DATEE' AS varchar) GROUP BY csuristem order by totalbyte desc" --output-format CSV > ~/cdnfilestat.csv`
`aws s3 cp ~/cdnfilestat.csv $SQOOPFILE/$DATE/cdnfilestat$TIME.csv`
6. 准备s3tomysql.sh脚本
作用是利用sqoop将s3中的.csv文件中的内容转存到数据库中。注意,如果单次转存的数据量大,你可能需要调大数据库的max_allowed_packet参数。
`#!/bin/bash`
`JDBCURL=$1`
`DBUSER=$2`
`DBPASS=$3`
`SQOOPFILE=$4`
`DATE=$5`
`sqoop export --connect $JDBCURL --username $DBUSER --password $DBPASS --table loganalytb --fields-terminated-by ',' --enclosed-by '\"' --export-dir $SQOOPFILE/$DATE/`
7. 准备emrconf.json脚本
作用是使得创建起来的EMR集群的hive元数据存储在外部数据库中。注意:根据准备工作中创建的数据库来修改文件中的ConnectionUserName、ConnectionPassword、ConnectionURL三个参数。
`[`
`{"Classification":"hive-site",`
`"Properties":{`
`"javax.jdo.option.ConnectionUserName":"username",`
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
`"javax.jdo.option.ConnectionPassword":"password",`
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
`},`
`"Configurations":[]`
`}`
`]`
注意:如果是在北京Region, emrconf.json使用以下脚本
`[`
`{"Classification":"hive-site",`
`"Properties":{`
`"javax.jdo.option.ConnectionUserName":"username",`
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
`"javax.jdo.option.ConnectionPassword":"password",`
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
`},`
`"Configurations":[]`
`},`
`{"Classification":"presto-connector-hive",`
`"Properties":{`
`"hive.s3.pin-client-to-current-region":"true"`
`},`
`"Configurations":[]`
`}`
`]`
8. 准备jar包
需要两个jar,一个是script-runner.jar,下载地址: http://s3.amazonaws.com/elasticmapreduce/libs/script-runner/script-runner.jar
另一个是mysql的JDBC驱动,下载地址: https://s3-us-west-2.amazonaws.com/hxyshare/mysql-connector-java-5.1.38-bin.jar
9. 上传文件
在s3中创建s3://testcloudfrontlog/conf/目录,并将第8步中的两个jar包,以及cpjar.sh,log2staging.sh,hivetables.q,presto2s3.sh,s3tomysql.sh,emrconf.json,上传到该目录。
由于emrconf.json需要通过http的方式访问到,在s3中将emrconf.json的访问权限增加“所有人可下载”。
cloudfront日志文件copy到s3://testcloudfrontlog/log目录下, 两个示例文件下载地址:
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
如果在手动流程中已经上传了日志文件,则不必再上传。
第三步,执行程序
暂时将主程序loganaly.sh中的DATE和DATEE参数修改为示例数据的时间,例如,分别写成20160711和2016-07-11,在任意一个Linux系统中运行主程序脚本loganaly.sh(例如,可以使用EC2实例)。但需注意:
1) 该机器需要安装了AWS命令行工具,并具有s3和EMR的操作权限。
2) 该机器能访问到存储数据结果的数据库,因为loganaly.sh中有对该数据库的操作。
集群创建成功后,自动执行每个step中的任务,所有任务执行完成后自动关闭,从下图中可以看到每个Step的执行:
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-14.png)
所有任务执行完成后,进入到存储查询结果的数据库,查看输入的结果:
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-15.png)
如果要想每天执行loganaly.sh脚本并对前一天的数据进行处理和分析,将loganaly.sh中的DATE和DATEE分别赋值为 $(date -d “yesterday” +%Y%m%d) 和 $(date -d “yesterday” +%Y-%m-%d),然后创建一个crontab定时任务,每天定时执行loganaly.sh.
如果是在AWS中国以外的region执行,还可以利用竞价实例来大幅的降低成本,使用竞价实例的方法也非常简单,只需要在将loganaly.sh中对创建EMR集群的脚本稍做修改,增加BidPrice参数:
`--instance-groups \`
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,BidPrice=0.2,InstanceCount=$MASTERNUM \`
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,BidPrice=0.2,InstanceCount=$CORENUM \`
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,BidPrice=0.2,InstanceCount=$TASKNUM`
第四步,删除资源
为避免额外花费,删除本实验过程中(包括手动过程以及自动过程中)创建的资源,S3, EC2等资源。
#### 总结
本文中使用Cloudfront日志进行分析,但本文中使用的方法稍作修改便适用于其他类型的日志类文件的分析。本文中主要使用了EMR中的Hive,Presto,Sqoop工具,但EMR还有更多的工具(例如Spark)可供用户使用,用户在创建集群的时候增加相应的服务即可实现丰富的功能。
**作者介绍:**
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/HanXiaoyong-mini.jpg)
韩小勇
AWS解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,实施和推广,在加入AWS之前,从事电信核心网系统上云的方案设计及标准化推广 。
在按照本文档进行操作之前,读者需了解S3,RDS并能够进行基本的S3,RDS的操作,读者需了解EMR的基本概念。以下是参考资料:
什么是EMR:
Amazon Elastic MapReduce (Amazon EMR) 是一种托管数据分析服务的框架,提升企业、研究人员、数据分析师和开发人员轻松、经济高效掌控海量数据的能力。其当前版本中托管的服务包括:Hadoop, Zeppelin, Tez, Ganglia, HBase, Pig, Hive, Presto, ZooKeeper, Sqoop, Mahout, Hue, Phoenix, Oozie, Spark, Hcatalog. EMR让您专注于数据分析,无需担心费时的集群设置、管理或调整,也无需担心所需要的计算能力。
具体参考: [](https://amazonaws-china.com/cn/documentation/elastic-mapreduce/)
什么是S3:
Amazon Simple Storage Service (Amazon S3) 为开发人员和 IT 团队提供安全、耐用且高度可扩展的对象存储。S3 可为EMR提供文件存储服务。
具体参考:[](https://amazonaws-china.com/cn/documentation/s3/)
什么是RDS:
Amazon Relational Database Service (Amazon RDS) 是一种可让用户在云中轻松设置、操作和扩展关系数据库的 Web 服务。 它在承担耗时的数据库管理任务的同时,又可提供经济高效的可调容量,使您能够腾出时间专注于应用程序开发。Amazon RDS 让您能够访问非常熟悉的 MySQL、PostgreSQL、Oracle 或 Microsoft SQL Server 等数据库引擎的功能。
具体参考:[](https://amazonaws-china.com/cn/documentation/rds/)
**准备工作**
1. Cloudfront生成的日志已经存储在s3桶中,并在不断更新, 存储目录是:s3://testcloudfrontlog/log
其中testcloudfrontlog是s3存储桶的名字,在实际操作的时候,需要换一个名字,因为s3存储桶的名字是全局唯一的, 而其他人也有可能使用了这个名字。
可以从以下链接下载本例中使用的示例文件,并上传到s3://testcloudfrontlog/log目录下。
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
2. 创建一个目录来存储按照日期划分的日志数据。 目录是s3://testcloudfrontlog/logbydate
3. 创建一个目录用来做hive表的数据存储,目录是s3://testcloudfrontlog/logpart
4. 注意:直接copy本文的代码有可能会由于字符原因出现错误,建议先copy到纯文本编辑器中再执行。
**手动方式完成交互式数据查询**
第一步,创建一个EMR集群,方法如下:
1. 进入到AWS的控制台,选择EMR服务,点击创建集群。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-1.png)
2. 点击转到高级选项
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-2.png)
3. 软件配置
选择EMR发行版本以及所需要的软件, 在本例中,我们选择emr-5.0.0版本,所需要的工具选择hadoop, hive, presto, sqoop。 本步骤中的其余选项使用默认值,然后点击下一步。如果是使用北京Region,在输入配置中输入以下内容,使得presto可访问s3, 如果使用其他Region,不必输入。
`[{"classification":"presto-connector-hive", "properties":{"hive.s3.pin-client-to-current-region":"true"}, "configurations":[]}]`
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-3.png)
4. 硬件配置
进入到硬件配置界面,默认配置如下,直接使用默认配置。然后点击下一步。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-4.png)
5. 一般选项
在一般选项的集群名称后面输入一个名字,作为集群的名字。其余的可按照默认配置。然后点击下一步。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-5.png)
6. 安全选项
在EC2键对后面的框中选择一个已有的键对,该键对用来在集群创建成功后,从SSH客户端登录到集群中的任意一台服务器。如果选择“在没有EC2键对的情况下继续”,则后续不能登录到集群中的机器。其余选项均可默认。然后点击创建集群。![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-6.png)7. 修改安全组规则,并登录EMR的主节点
进入到刚刚创建的集群的信息界面,点击主节点安全组,进入到该安全组的配置界面,在入规则中增加SSH的访问规则,这样才可以通过SSH的方式从外部机器登录到主节点。然后通过任意一个SSH客户端登录到主节点,目标地址是图中所示的主节点共有DNS, 用户名是hadoop, 通过私钥登录,私钥与前面所提到的键对对应。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-7.png)
第二步,创建数据表并进行查询
1. SSH到主节点后,执行hive命令,进入到hive命令行界面
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-8.png)
2. 创建一个用日期作为分区的hive表,用来作为最终被查询的表
将以下脚本copy到hive>提示符下执行,注意LOCATION的参数需要改成你自己的目录。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`STORED AS PARQUET`
`LOCATION 's3://testcloudfrontlog/logpart';`
3. 输入quit命令,退出hive。用aws s3命令将s3://testcloudfrontlog/log中日志copy到s3://testcloudfrontlog/logbydate中按照时间划分的目录下
以2016-07-11这一天的文件为例,命令如下,注意,s3目录需要改成你自己的。
`aws s3 cp s3://testcloudfrontlog/log/ s3://testcloudfrontlog/logbydate/2016-07-11/ --exclude "*" --include "*.2016-07-11*" --recursive`
这里用到了aws s3命令行工具。你可以在EMR主节点中退出hive命令行程序,然后执行以上命令。或者在任意一个安装了aws cli工具并配置了s3访问权限的机器中执行。本例中直接在EMR的主节点中执行。aws s3 cp不支持通配符,所以用–exclude 和 –include 参数来代替。
4. 针对s3://testcloudfrontlog/logbydate/2016-07-11/ 中的数据,创建一个HIVE表。
假设表的名字是cloudfrontlog20160711, 输入hive, 重新进入到hive命令行工具,并输入以下语句,注意LOCATION的参数需要改成你自己的目录。
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog20160711 (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION 's3://testcloudfrontlog/logbydate/2016-07-11/'`
`tblproperties("skip.header.line.count"="2");`
5. 数据注入
至此,已经创建了两个hive表,通过在hive命令行工具中执行 show tables命令,可以查看到两个hive表。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-9.png)
向带分区的hive表中注入7月11日的数据,在hive命令行界面中分别执行以下命令:
`set hive.exec.dynamic.partition.mode=nonstrict;`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog20160711;`
以下是INSERT语句执行过程的显示信息。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-10.png)
完成后,如果进入到s3://testcloudfrontlog/logpart目录下,可以查看到已经生成了按日期划分的目录,目录下存储了文件。
第三步,数据查询
可以继续使用hive做查询,也可以进入presto做查询。这里,我们使用presto, 由于presto完全使用内存进行计算, 速度更快。进入presto的方式如下:
执行quit, 退出hive命令行程序。
然后执行以下语句进入presto命令行界面:
`presto-cli --catalog hive --schema default`
该语句表示presto使用hive数据源,并且使用hive数据源中的default数据库。细节请参考presto的社区文档。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-11.png)
执行一个简单的查询:
`SELECT time, scbytes, cip FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('2016-07-11' AS varchar) LIMIT 5;`
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-12.png)
然后退出presto命令行工具,输入 quit;
第四步,删除EMR集群
在集群列表中,选中刚刚创建的集群,点击终止,以终止该集群。如果开启的终止保护,需要变更一下终止保护的状态,然后再终止。
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-13.png)
至此, 我们通过手动的方式完成了一个简单的数据查询。但在实际生产环境中,使用手动的方式会耗费很长时间做重复性的工作,并难免出错。EMR更大优势是能够通过程序的方式去控制集群的创建以及任务的执行,这使得EMR的使用者能够将集群创建以及数据分析的过程自动化。
接下来的部分将以相同的示例指导读者一步步的实现自动化的创建集群、分析数据、转存结果、关闭集群。
**自动方式完成交互式数据查询**
首先概括几点自动化执行数据分析的需求:
1. 集群在每天的固定时间被创建,然后对数据进行分析,然后集群被自动删除。这显然不能通过图形界面进行一步步的操作了。
2. 在手动操作中执行的每个步骤,需要按顺序自动化执行。这些步骤包括:
– 从/log目录向/logbydate目录中copy特定日期的文件。
– 创建对应特定日期的hive表,例如表名为cloudfrontlog20160711。
– 从cloudfrontlog20160711向cloudfrontlogpart表中注入数据。
– 使用presto从cloudfrontlogpart表中查询出需要的数据。
– 此外,在手动执行的最后一步,我们可以直接看到查询结果,但在自动化执行的过程中,我们需要将查询结果存储到一个长期运行的数据库中,供随时查询。
3. 将hive元数据放在集群的外部。
在手动执行的流程中,创建hive表后,hive的元数据存储在了主节点。而在自动化执行的过程中,当所有任务执行完毕后,集群被删除,存储在主节点的元数据也会被删除,因此要在外部数据库中存储hive的元数据。
针对以上的几个需求,在EMR中对应的解决方法如下.
1. 使用EMR的命令行进行各种集群的操作,例如集群的创建,参数的设置等。
2. 使用EMR的“step”来组织各个任务的执行。
3. 创建一个外部的数据库用来存储hive元数据,并在集群创建的时候指定元数据的存储位置。
接下来详细描述操作步骤和脚本
第一步,准备工作
1. 准备两个mysql数据库分别用来存储hive元数据和查询结果,可以使用AWS的RDS服务来创建。这两个数据库都需要能被EMR访问到, 这两个数据库也可以使用同一个物理服务器或虚拟机。
2. 假设存储查询结果的数据库名字是loganalydb,我们在该数据库中创建一个表,用来存储结果数据,表的名字是loganalytb。根据后面所进行的查询,使用如下语句创建数据库以及与查询结果匹配的表:
`CREATE DATABASE loganalydb;`
`USE loganalydb;`
``CREATE TABLE `loganalytb` ( `id` int(11) NOT NULL AUTO_INCREMENT, `filepath` varchar(300) DEFAULT NULL, `totalbyte` bigint(20) DEFAULT NULL, `tdate` varchar(50) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=58147 DEFAULT CHARSET=utf8;``
对于存储hive元数据的数据库,暂时不必创建,从后面提到的emrconf.json文件可以看到,只需要给出数据库服务器的地址,以及用户名和密码即可,数据库不存在的话,会自动创建。
3. 由于自动化的流程与前面提到的手动流程使用相同的示例,因此,请事先删除手动流程示例中产生的数据。包括:
1) s3://testcloudfrontlog/logbydate目录下的数据
2) s3://testcloudfrontlog/logpart目录下的数据
第二步,编写脚本
下面给出各个脚本,并配以注释解释
1. 主程序脚本loganaly.sh
`#!/bin/bash`
`# 变量KEYNAMES是你的SSH key的名字, 用来通过SSH方式访问EC2。`
`KEYNAME=yourkeyname`
`# 变量CONFIGFILE是emrconf.json的url地址,emrconf.json这个文件中包含了存储hive元数据的外部数据库的信息。将该文件存在s3中并让这个文件能够从外部访问到。emrconf.json中的内容在下文中会给出。`
`CONFIGFILE=https://s3-us-west-2.amazonaws.com/testcloudfrontlog/conf/emrconf.json`
`# EMR集群产生的日志所存放的s3目录。`
`LOGURI=s3://testcloudfrontlog/emrlog/`
`# 脚本、配置文件、jar包所在的目录`
`CODEDIR=s3://testcloudfrontlog/conf`
`# cloudfront日志的存放位置, 结尾别加 /`
`LOGSOURCEDIR=s3://testcloudfrontlog/log`
`# 当天日志的中转目录, 结尾别加 /`
`STAGINGDIR=s3://testcloudfrontlog/logbydate`
`# 昨天的日期, 形如:20160711`
`DATE=$(date -d "yesterday" +%Y%m%d)`
`# 昨天的日期,另外一种格式, 形如:2016-07-11`
`DATEE=$(date -d "yesterday" +%Y-%m-%d)`
`# 被查询的hive表的文件存储位置, 结尾别加 /`
`PARTDIR=s3://testcloudfrontlog/logpart`
`# 用来存储结果文件的目录, 结果文件将被sqoop使用,数据会被传到mysql.`
`SQOOPFILE=s3://testcloudfrontlog/sqoopfile`
`#用来存储结果数据的mysql的信息。`
`DBHOST=rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com`
`JDBCURL=jdbc:mysql://rdsinstance.xxxxxxx.us-west-2.rds.amazonaws.com/loganalydb`
`DBUSER=username`
`DBPASS=password`
`# EMR集群的配置信息, 这里是以Oregan region为例。如果使用的是北京region,AWSREGION用cn-north-1。`
`AWSREGION=us-west-2`
`MASTERTYPE=m3.xlarge`
`CORETYPE=r3.xlarge`
`TASKTYPE=r3.xlarge`
`MASTERNUM=1`
`CORENUM=1`
`TASKNUM=1`
`# 删除当天的数据,目的是防止脚本在同一天被多次执行而造成数据冗余,`
`aws s3 rm $PARTDIR/datee=$DATEE --recursive`
`aws s3 rm $SQOOPFILE/$DATE --recursive`
`mysql -h$DBHOST -u$DBUSER -p$DBPASS --execute "DELETE FROM loganalydb.loganalytb WHERE tdate='$DATEE'"`
`# 创建emr集群,名字是loganaly, 其中:`
`# --auto-terminate参数表示该集群在执行完所有的任务后自动删除。`
`# --configurations参数的文件中的参数配置覆盖了该集群运行起来后的默认参数配置。在本例中用来修改Hive元数据的存储位置。`
`# --step参数规定了该集群在创建后要执行的几个任务,其中Type=Hive的step, 需要给出包含hive语句的文件作为参数。而Type=CUSTOM_JAR的step,需要给出一个JAR包,这里我们使用EMR提供scrip-runner.jar,它的作用是执行其第一个参数中指定的脚本文件,并将其余的参数作为脚本文件的输入参数。`
`# --instance groups参数规定了集群的中各节点的机型和数量`
`aws --region $AWSREGION emr create-cluster --name "loganaly" --release-label emr-5.0.0 \`
`--applications Name=Hadoop Name=Hive Name=Presto Name=Sqoop \`
`--use-default-roles \`
`--ec2-attributes KeyName=$KEYNAME \`
`--termination-protected \`
`--auto-terminate \`
`--configurations $CONFIGFILE \`
`--enable-debugging \`
`--log-uri $LOGURI \`
`--steps \`
`Type=CUSTOM_JAR,Name="cpjar",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/cpjar.sh"," $CODEDIR"] \`
`Type=CUSTOM_JAR,Name="log2staging",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/log2logbydate.sh","$LOGSOURCEDIR","$STAGINGDIR","$DATE","$DATEE"] \`
`Type=Hive,Name="HiveStep",Args=[-f,$CODEDIR/hivetables.q,-d,PARTDIRh=$PARTDIR,-d,STAGINGDIRh=$STAGINGDIR,-d,DATEh=$DATE] \`
`Type=CUSTOM_JAR,Name="Presto2s3",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/presto2s3.sh","$SQOOPFILE","$DATE","$DATEE"] \`
`Type=CUSTOM_JAR,Name="s3tomysql",Jar=$CODEDIR/script-runner.jar,Args=["$CODEDIR/s3tomysql.sh","$JDBCURL","$DBUSER","$DBPASS","$SQOOPFILE","$DATE"] \`
`--instance-groups \`
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,InstanceCount=$MASTERNUM \`
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,InstanceCount=$CORENUM \`
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,InstanceCount=$TASKNUM`
2. 准备cpjar.sh脚本
作用是将mysql的JDBC驱动包下载到本地的Sqoop目录下,sqoop在将文件转存到数据库的时候会用到JDBC驱动。
在国外region, 使用以下脚本:
`#!/bin/bash`
`CODEDIR=$1`
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/`
在北京Region, 使用以下脚本:
`#!/bin/bash`
`CODEDIR=$1`
`sudo aws s3 cp $CODEDIR/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/ --region cn-north-1`
3. 准备log2logbydate.sh脚本
做用是将原始日志文件copy到按天划分的目录下。
`#!/bin/bash`
`LOGSOURCEDIR=$1`
`STAGINGDIR=$2`
`DATE=$3`
`DATEE=$4`
`aws s3 cp $LOGSOURCEDIR/ $STAGINGDIR/$DATE/ --exclude "*" --include "*.$DATEE*" --recursive`
4. 准备hivetables.q脚本
作用包括:创建待查询的表和按天命名的临时表,将临时表中数据注入到待查询的表中,并删除临时表。
`CREATE TABLE IF NOT EXISTS cloudfrontlogpart (`
`time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`PARTITIONED BY (datee Date)`
`LOCATION '${PARTDIRh}';`
`CREATE EXTERNAL TABLE IF NOT EXISTS cloudfrontlog${DATEh} (`
`date1 Date, time STRING, xedgelocation STRING, scbytes INT, cip STRING, csmethod STRING, csHost STRING, csuristem STRING, scstatus INT, csReferer STRING, csUserAgent STRING, csuriquery STRING, csCookie STRING, xedgeresulttype STRING, xedgerequestid STRING, xhostheader STRING, csprotocol STRING, csbytes STRING, timetaken STRING, xforwardedfor STRING, sslprotocol STRING, sslcipher STRING, xedgeresponseresulttype STRING`
`)`
`ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'`
`LOCATION '${STAGINGDIRh}/${DATEh}/'`
`tblproperties("skip.header.line.count"="2");`
`--make dynamic insert available`
`set hive.exec.dynamic.partition.mode=nonstrict;`
`--insert data.`
`INSERT INTO TABLE cloudfrontlogpart PARTITION (datee)`
`SELECT time, xedgelocation, scbytes, cip, csmethod, csHost, csuristem, scstatus, csReferer, csUserAgent, csuriquery, csCookie, xedgeresulttype, xedgerequestid, xhostheader, csprotocol, csbytes, timetaken, xforwardedfor, sslprotocol, sslcipher, xedgeresponseresulttype, date1`
`FROM cloudfrontlog${DATEh};`
`--delete the staging table`
`DROP TABLE cloudfrontlogstaging${DATEh};`
5. 准备presto2s3.sh脚本
作用包括:查询hive表,并将结果存成.csv格式的文件,将该文件上传到S3中相应的目录下。
`#!/bin/bash`
`TIME=$(date +%H%M%S)`
`SQOOPFILE=$1`
`DATE=$2`
`DATEE=$3`
`sudo presto-cli --catalog hive --schema default --execute "SELECT NULL as id, csuristem, SUM(scbytes) as totalbyte, CAST('$DATEE' AS varchar) as date FROM cloudfrontlogpart WHERE CAST(datee AS varchar) = CAST('$DATEE' AS varchar) GROUP BY csuristem order by totalbyte desc" --output-format CSV > ~/cdnfilestat.csv`
`aws s3 cp ~/cdnfilestat.csv $SQOOPFILE/$DATE/cdnfilestat$TIME.csv`
6. 准备s3tomysql.sh脚本
作用是利用sqoop将s3中的.csv文件中的内容转存到数据库中。注意,如果单次转存的数据量大,你可能需要调大数据库的max_allowed_packet参数。
`#!/bin/bash`
`JDBCURL=$1`
`DBUSER=$2`
`DBPASS=$3`
`SQOOPFILE=$4`
`DATE=$5`
`sqoop export --connect $JDBCURL --username $DBUSER --password $DBPASS --table loganalytb --fields-terminated-by ',' --enclosed-by '\"' --export-dir $SQOOPFILE/$DATE/`
7. 准备emrconf.json脚本
作用是使得创建起来的EMR集群的hive元数据存储在外部数据库中。注意:根据准备工作中创建的数据库来修改文件中的ConnectionUserName、ConnectionPassword、ConnectionURL三个参数。
`[`
`{"Classification":"hive-site",`
`"Properties":{`
`"javax.jdo.option.ConnectionUserName":"username",`
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
`"javax.jdo.option.ConnectionPassword":"password",`
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
`},`
`"Configurations":[]`
`}`
`]`
注意:如果是在北京Region, emrconf.json使用以下脚本
`[`
`{"Classification":"hive-site",`
`"Properties":{`
`"javax.jdo.option.ConnectionUserName":"username",`
`"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver",`
`"javax.jdo.option.ConnectionPassword":"password",`
`"javax.jdo.option.ConnectionURL":"jdbc:mysql://rdsinstance.xxxxxxxx.us-west-2.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true"`
`},`
`"Configurations":[]`
`},`
`{"Classification":"presto-connector-hive",`
`"Properties":{`
`"hive.s3.pin-client-to-current-region":"true"`
`},`
`"Configurations":[]`
`}`
`]`
8. 准备jar包
需要两个jar,一个是script-runner.jar,下载地址: http://s3.amazonaws.com/elasticmapreduce/libs/script-runner/script-runner.jar
另一个是mysql的JDBC驱动,下载地址: https://s3-us-west-2.amazonaws.com/hxyshare/mysql-connector-java-5.1.38-bin.jar
9. 上传文件
在s3中创建s3://testcloudfrontlog/conf/目录,并将第8步中的两个jar包,以及cpjar.sh,log2staging.sh,hivetables.q,presto2s3.sh,s3tomysql.sh,emrconf.json,上传到该目录。
由于emrconf.json需要通过http的方式访问到,在s3中将emrconf.json的访问权限增加“所有人可下载”。
cloudfront日志文件copy到s3://testcloudfrontlog/log目录下, 两个示例文件下载地址:
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-11-02+.36b1a433.gz
https://s3-us-west-2.amazonaws.com/hxyshare/cloudfrontlog/E36NLFLFEN3X0H.2016-07-12-21.a92ddc55.gz
如果在手动流程中已经上传了日志文件,则不必再上传。
第三步,执行程序
暂时将主程序loganaly.sh中的DATE和DATEE参数修改为示例数据的时间,例如,分别写成20160711和2016-07-11,在任意一个Linux系统中运行主程序脚本loganaly.sh(例如,可以使用EC2实例)。但需注意:
1) 该机器需要安装了AWS命令行工具,并具有s3和EMR的操作权限。
2) 该机器能访问到存储数据结果的数据库,因为loganaly.sh中有对该数据库的操作。
集群创建成功后,自动执行每个step中的任务,所有任务执行完成后自动关闭,从下图中可以看到每个Step的执行:
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-14.png)
所有任务执行完成后,进入到存储查询结果的数据库,查看输入的结果:
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/0927-15.png)
如果要想每天执行loganaly.sh脚本并对前一天的数据进行处理和分析,将loganaly.sh中的DATE和DATEE分别赋值为 $(date -d “yesterday” +%Y%m%d) 和 $(date -d “yesterday” +%Y-%m-%d),然后创建一个crontab定时任务,每天定时执行loganaly.sh.
如果是在AWS中国以外的region执行,还可以利用竞价实例来大幅的降低成本,使用竞价实例的方法也非常简单,只需要在将loganaly.sh中对创建EMR集群的脚本稍做修改,增加BidPrice参数:
`--instance-groups \`
`Name=Master,InstanceGroupType=MASTER,InstanceType=$MASTERTYPE,BidPrice=0.2,InstanceCount=$MASTERNUM \`
`Name=Core,InstanceGroupType=CORE,InstanceType=$CORETYPE,BidPrice=0.2,InstanceCount=$CORENUM \`
`Name=Task,InstanceGroupType=TASK,InstanceType=$TASKTYPE,BidPrice=0.2,InstanceCount=$TASKNUM`
第四步,删除资源
为避免额外花费,删除本实验过程中(包括手动过程以及自动过程中)创建的资源,S3, EC2等资源。
#### 总结
本文中使用Cloudfront日志进行分析,但本文中使用的方法稍作修改便适用于其他类型的日志类文件的分析。本文中主要使用了EMR中的Hive,Presto,Sqoop工具,但EMR还有更多的工具(例如Spark)可供用户使用,用户在创建集群的时候增加相应的服务即可实现丰富的功能。
**作者介绍:**
![](https://s3.cn-north-1.amazonaws.com.cn/images-bjs/HanXiaoyong-mini.jpg)
韩小勇
AWS解决方案架构师,负责基于AWS的云计算方案架构咨询和设计,实施和推广,在加入AWS之前,从事电信核心网系统上云的方案设计及标准化推广 。
本文转载自 AWS 技术博客。
原文链接:
https://amazonaws-china.com/cn/blogs/china/amazon-emr/
更多内容推荐
软件测试 / 测试开发 / 人工智能丨 Python 算术运算符
在编写程序时,可以使用算术运算符来进行基本的数学计算。Python 中的算术运算符包括加法、减法、乘法、除法、取模和幂运算。
2023-11-22
图文详解丨 iOS App 上架全流程及审核避坑指南
图文详解丨iOS App上架全流程及审核避坑指南
2023-06-09
AJAX - 创建 XMLHttpRequest 对象
AJAX(异步JavaScript和XML)是一种在Web应用程序中创建快速动态更新的技术。使用AJAX,Web应用程序可以异步地向服务器发送和接收数据,而无需刷新整个页面。 AJAX广泛用于Web应用程序中,包括社交媒体,电子商务,在线游戏等等。
2023-07-28
全新 – Amazon EC2 R6a 实例由第三代 AMD EPYC 处理器提供支持,适用于内存密集型工作负载
我们在 Amazon re:Invent 2021 上推出了通用型 Amazon EC2 M6a 实例,并于今年 2 月推出了计算密集型 C6a 实例。这些实例由运行频率高达 3.6 GHz 的第三代 AMD EPYC 处理器提供支持,与上一代实例相比,性价比提高多达 35%。
2023-05-08
44|技术演进(下):软件架构和应用生命周期技术演进之路
今天,我会介绍下应用维度和应用生命周期管理维度的技术演进。
2021-09-04
Spark On YARN:Client 模式与 Cluster 模式
2021-01-07
交易所搭建,交易所源码是怎么开发的?
交易所搭建和交易所源码的开发涉及多个阶段,以下是对这些阶段的概述:
2023-09-06
可观测(下): 如何构建多维度视角下的 Serverless 监测体系?
这两节课,我们一直在讨论函数计算平台下可观测体系的解决方案。我们可以按照可观测中指标、日志和链路这三要素的架构去构建解决方案。
2022-09-28
Spark Structured Streaming 原理及实战:Structured Streaming 概念、特点、数据模型和应用实战
2020-12-24
使用 Sentieon 加速甲基化 WGBS 数据分析
全基因组甲基化测序(WGBS)是一种研究DNA甲基化的方法,以全面了解在基因组水平上的表观遗传变化。在进行WGBS数据分析时,通常需要使用专门的比对工具,因为这些工具需要能够处理亚硫酸盐转化后的数据。
2023-08-25
软件测试|华新学院在 2022 年全国大学生“火焰杯”软件测试高校就业选拔赛取得佳绩
近期,华新学院信工学院在全国大学生“火焰杯”软件测试高校就业选拔赛中取得了杰出的成绩。
2023-10-23
欢迎提报 | 2023 年龙蜥社区优秀贡献者评选正式启动
龙蜥社区为了表示对过去一年开发者们杰出贡献的认可,2023 年龙蜥社区优秀贡献者评选正式启动。奖项多种,截至日期至 11 月 30 日,欢迎大家提报。
2023-11-17
pandas 数据导入
2022-09-08
选型:不同阶段的数据应如何存储?
今天提到的MySQL、PostgreSQL、Redis、Doris、Etcd等,都是我们思考的一个具象化的表达而已。我们更关心的,应该是构建一个系统的思维。
2022-09-30
加速中产 “返贫” 的 4 个迹象
没有消息,就是好消息。这话放在现在的朋友圈子里,似乎很合适。最近接到两个朋友的电话,一个是朋友的诉苦电话,这位朋友曾是某大厂的高管,被裁后失业近1年,虽然当初赔了N+1,但架不住这位朋友“房贷近千万、配偶不工作、孩子送出国”,即传说中的“中产作
2023-08-07
苹果账号被禁用怎么办?
当我们使用苹果手机登录App Store时,有时会遇到账号被禁用的提示。总结下来,
2023-08-11
Spring 高手之路 3——揭秘 Spring 依赖注入和 SpEL 表达式
在本文中,我们深入探讨了Spring框架中的属性注入技术,包括setter注入、构造器注入、注解式属性注入,以及使用SpEL表达式进行属性注入。我们通过XML和注解两种方式,详细讲解了如何进行属性注入,并给出了完整的代码示例。
2023-07-21
面部表情识别的伦理问题
面部表情识别是一项涉及隐私和伦理的技术,其在应用过程中可能会引发一系列伦理问题。本文将探讨面部表情识别的伦理问题,包括隐私保护、种族和性别偏见、情绪识别准确率等方面。
2023-08-04
一文详解 SpEL 表达式注入漏洞
本文介绍了SpEL表达式以及常见的SpEL注入攻击,详细地介绍了部分漏洞攻击实例以及常用的漏洞检测与防御手段。
2023-02-21
推荐阅读
基于 RAG 构建生成式 AI 应用最佳实践与“避坑指南” | QCon
4. SparkSQL 整合其它外部数据源
2023-09-08
开篇词|从企业级项目开始,进阶推荐系统
2023-04-10
Ableton Live 12 Suite for mac(音乐制作工具)
2024-12-17
华为云耀云服务器 L 实例:数字化转型的得力伙伴
2023-11-26
7. 集群自动扩缩容:Cluster Autoscaler
2023-09-26
淘宝商品信息的 API 接口获取方式是什么?
2023-11-26
电子书
大厂实战PPT下载
换一换 赵晨阳 | 矩阵起源 研发副总裁
杨军 | 腾讯 IEG 技术运营部 SRE 总监
熊飞 | 经纬中国 合伙人
评论