随着时间的推移,大数据分析一词已经慢慢从一个概念进入到了实际应用的阶段,近年来,越来越多的企业已经不满足于定期批量地分析任务,为了更好地利用不断产生的业务数据,做出更快更有效的业务决策,实时数据分析已经成为了诸如电商,网站,交易平台的标准需求。
另一方面,自建实时分析平台无论从成本还是对技术人员的要求都相当巨大,大量的技术人员更喜欢使用 SQL 而非代码的方式来挖掘数据,基于此,AWS 提供的 Kinesis Analytics 服务很好地满足了用户的需求,通过简单的 SQL 语句及内置的丰富函数,用户能够快速地搭建起适合自身业务场景的实时分析解决方案,所有的底层设施由 AWS 来维护,保证服务的高可用性,用户可以把更多地精力放在如何挖掘和分析数据的工作上。
本文以实际的应用场景出发,介绍如何通过使用 AWS 的服务,实时分析 CloudFront 的访问日志,同时,借助于 DynamoDB 的全局表功能,使得用户能够利用支持 Kinesis Analytics 的区域对日志进行分析,并把分析结果同步回用户现在使用的区域的 DynamoDB 表中,帮助那些使用尚未支持 Kinesis Analytics 区域的用户也能够使用并享受到该服务带来的便捷。
下面我们来看下方案的架构:
如上图所示:
· CloudFront 会定期将用户的访问日志保存到用户设置的 S3 存储桶中。
· 日志在 S3 上生成后会自动触发 Lambda 函数。
· Lambda 函数对日志进行解压缩,并将日志内容打入 Kinesis Stream 流中。
· Kinesis Analytics 从 Kinesis Stream 流中获取日志数据并进行实时分析。
· Kinesis Analytics 的分析结果会发送到另一个 Lambda 函数。
· 该 Lambda 函数将分析结果处理后存入本区域的 DynamoDB 表中。
· 由于启用了 DynamoDB 全局表,分析结果会自动同步到其他区域的 DynamoDB 表中。
· 在用户业务所在的区域中,通过使用 ALB,EC2 搭建简单的 web 应用就可以对 DynamoDB 表中的数据进行实时的分析展现。
本文假设用户业务所在的区域是 Ohio,由于 Ohio 尚未支持 Kinesis Analytics 服务,因而将所有分析组件部署在 Virginia 区域,仅仅将分析结果同步回 Ohio 的 DynamoDB 表中,另外,对于图中的 ALB,EC2 部分,本文不做详细的搭建说明,用户可以使用自己熟悉的开发语言进行展示页面的开发,也可以使用 API GW,Lambda 搭建无服务器的展示系统。
那下面我们就来看看如何快速地搭建这样一个实时分析平台吧。
第一步 准备工作
首先开启 CloudFront 的日志功能。
创建 Lambda 函数,用于解析 CloudFront 日志并将日志打入 Kinesis 流中,Lambda 代码可从如下地址下载:
https://github.com/iwasnobody/cflogs2analytics2ddb/blob/master/CFLogs2Kinesis.py
进入 CloudFront 日志保存的 S3 桶。
设置当有新日志文件产生的时候,触发之前创建 Lambda 函数。
创建 DynamoDB 表,用于保存 Kinesis Analytics 分析的结果。
将创建的表设置成 global table,与 Ohio 区域同步。
创建 Kinesis Stream,用于接收 Lambda 发送过来的日志。
第二步 向 Kinesis 流中打入数据
由于 CloudFront 需要有用户访问流量的情况下才会产生日志,并且产生日志的过程不是实时的,所以为了便于测试,我们通过一个工具模拟 Lambda 产生的消息打入 Kinesis 流中,大家可以从如下链接查看该工具的详情。https://github.com/awslabs/amazon-kinesis-data-generator
Kinesis Data Generator 通过 PC 端浏览器的简单配置,就能够快速地产生测试流量打入指定的 Kinesis 流中,并且支持产生一定随机的数据,下图中,我们将数据以每秒一条的速度打入 us-east-1 区域的 Kinesis 流,数据的格式模拟 Lambda 函数的输出,即:以 JSON 格式输出 CloudFront 日志的每个字段,并且 c_ip 客户端 ip 字段通过简单的设置,让工具以一定的比例生成 4 个 ip 地址中的一个,其他字段都为固定字段,测试数据可以通过让 CloudFront 产生日志后,在 Lambda 函数中输出获得,或者从如下链接获取:
https://github.com/iwasnobody/cflogs2analytics2ddb/blob/master/KinesisDataGenerator
点击发送数据,开始将数据打入 Kinesis 流中。
第三步 创建 Kinesis Analytics,对 Kinesis 流中的数据做实时分析
首先创建一个 Data Analytics 应用。
设置应用对接的数据源为之前创建的 Kinesis 流。
点击 Discover schema,让 Analytics 自动发现 Kinesis 流中已有数据的 schema。
可以看到,Analytics 可以自动帮我们识别 JSON 格式的 CloudFront 日志,但是由于我们打入的日志数据大部分数据为固定字段,所以自动识别的 schema 未必准确,比如:对于 x_edge_request_id 字段,varchar(64)可能过小,我们点击修改 schema。
将 schema 做如下调整:
点击 Save schema and update stream samples,查看各个字段类型及长度已经修改成功,点击 Exit 返回上一步。
点击 Save and Continue 继续。
下面,我们开始使用 SQL 语句对带有 schema 的流式数据做分析汇聚。
下面的 SQL 语句以 1 分钟为时间窗口,分析出 1 分钟内,访问 CloudFront 的 Top3 的用户 ip 地址,Kinesis Analytics 的窗口可以实时观察到每分钟分析的结果,非常利于调试。
再继续配置 Kinesis Analytics 将分析结果输出之前,我们先来配置 Lambda 函数,用于接收输出结果,并将结果保存到 DynamoDB 中,Lambda 代码可以从如下链接获取:
https://github.com/iwasnobody/cflogs2analytics2ddb/blob/master/Analytics2DDB.py
注意:根据输出结果的大小,需要调整 Lambda 函数的 timeout 时长。
接着我们回到 Kinesis Analytics 界面,配置应用对接的输出。
设置上面创建 Lambda 函数接收输出结果。
第四步 查看分析结果
我们可以进入到 DynamoDB 中,查看之前创建的表,不断地刷新,查看表中数据的变化。
同时进入到 Ohio 区域,查看数据已经被同步过来了。
作者介绍:
余骏
本文转载自 AWS 技术博客。
原文链接:
https://amazonaws-china.com/cn/blogs/china/use-kinesis-analytics-user-behaviour/
评论