希腊有一个著名的谷堆悖论。“如果1粒谷子落地不能形成谷堆,2粒谷子落地不能形成谷堆,3粒谷子落地也不能形成谷堆,依此类推,无论多少粒谷子落地都不能形成谷堆。但是,事实并非如此。”
这个悖论说的,就是告诉我们量变产生质变,需要一个明显的分割线。如果说,量是一个量化的数据,质是一个结论的话。那么,数据分析做的,就是要分析量,从而引向“定性”、”定质”。定量的了解历史的规律(“质”),从而预测未来。
近几年,大数据风靡全球,越来越多的企业利用 MapReduce,Hive,Spark 等计算框架和工具来为自身的业务提供帮助,在 AWS 上,我们也提供了诸多的服务,帮助用户能够快速地构建起适合自身需求的大数据分析架构,其中,Amazon Redshift 是性能优异并且完全托管的 PB 级别数据仓库服务,提供了标准 SQL 数据库访问接口,并且可以十分方便地与现有的主流商业智能数据分析工具整合,构建企业级数据仓库。
然而,大部分企业的核心数据都存储在关系型数据库中,如何能够有效地将这部分存量数据以及后续的增量数据导入 Redshift 中呢?本文介绍一种使用开源的 Apache Sqoop 工具,帮助我们轻松实现这一过程。
配置步骤:
第一步 准备工作
1.1 修改 MySQL 中的表结构
为了能够实现增量同步,需要在 MySQL 表中增加一列时间戳,该列能够自动记录行被插入更新的时间
为了能够实现同步删除操作,需要在 MySQL 表中增加一列删除记号列,应用对数据库的删除通过标记该列完成,而不是通过传统的 delete 语句,因为通常对于曾经存在过的数据,也有分析的意义
本例需要同步的表为 country,orders,user,其中 country 表为 Mycat 中的全局表,在两台 RDS mysql1 和 mysql2 中都有全部信息,orders 和 user 表为 Mycat 中的分片表,信息分布在 RDS mysql1 和 mysql2 中
mycat_sequence 表是用于记录其他表自增字段信息的功能表,无需同步到 Redshift 中分析
执行如下语句添加两列
alter table country add ifdelete boolean NOT NULL default 0;
alter table country add lastmodified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMEST AMP;
1.2 创建 EMR 集群
注意勾选上 Hive 和 Sqoop,同时目前 AWS EMR 最新的版本为 5.4.0,其中对一些组件的版本进行了更新,不过 Hive 和 Sqoop 的版本与本文一致
注意选择相应的 VPC 和子网,子网需要有 internet 的路由方便之后 ssh 登入
选择登入的密钥对,Master 安全组使用默认的 ElasticMapReduce-master,不用修改
启动 EMR 集群后,修改 Master 节点的安全组,添加允许公网 ssh 访问
在 EMR 界面获取 master 节点 ssh 登入的信息
1.3 创建 Redshift 数据仓库
首先创建 Redshift 使用的安全组,放行所有源访问 5439 端口的权限
分别在 cn-north-1a 和 cn-north-1b 两个可用区中创建两个子网给 Redshift 使 用,由于之后会通过公网连接 Redshift,这两个子网需要有到 internet 的路由
在 Redshift 中创建子网组,选上之前创建的两个子网组
创建 Redshift 参数组
创建 Redshift 集群实例
选择之前创建的参数组,VPC,子网组和安全组,开启公网访问
获取连接 Redshift 的 JDBC 驱动及连接的 URL 信息
驱动如果无法下载,也可以从如下连接下载
https://s3.cn-north-1.amazonaws.com.cn/junyublog/RedshiftJDBC41-1.1.17.1017.jar
1.4 创建并保存 access key 和 secret access key
之后从 S3 中同步数据到 Redshift 时需要提供 access key 和 secret access key 信息,这边测试时可以全部放开权限
在 IAM 中增加一个用户并赋予权限
下载存有 access key 和 secret access key 的 CSV 文件
1.5 创建 S3 的 bucket 桶
S3 会作为 Hive 表的底层存储
第二步 创建 Hive 表
Hive 表为 RDS 到 Redshift 数据同步的中间表,底层使用 S3 作为存储,另外由于 Hive 的表名不能是 user,这里使用 users
exit; 退出 hive
第三步 安装 MySQL JDBC 驱动(可选)
下载安装 JDBC 驱动,最新版的 EMR 不需要,如果在运行 Sqoop 的时候报找不到驱动时需要手动安装
ssh 登入 EMR 的 master 节点
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.40.tar.gz
tar xzvf mysql-connector-java-5.1.40.tar.gz
cp mysql-connector-java-5.1.40/ mysql-connector-java-5.1.40-bin.jar /usr/bin/sqoop/lib/
第四步 修改 java 权限,否则执行 Sqoop job 会有 warning 和 error
vim /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.29.amzn1.86_64/jre/lib/security/java.policy
在 grant{}中添加如下语句
permission javax.management.MBeanTrustPermission “register”;
第五步 配置 Sqoop
5.1 创建 Sqoop 访问数据库的密码,XXXXXX 为创建 RDS mysql1 和 mysql2 时赋予的账号密码
echo –n “XXXXXX” > /home/hadoop/sqoop.password
5.2 创建并执行 Sqoop 任务
其中由于 country 表是全局表,所以这里只是从 mysql1 的 read replica 读副本中同步,而 user 和 orders 表由于是分片表,所以需要分别从 mysql1 和 mysql2 各自的读副本中同步数据
需要注意修改如下指令中的 URL 为自己 RDS 读副本的 URL,同时,对于 user 和 orders,两条 sqoop job 是不同的,第一条 job 中通过 hive-overwrite 参数覆盖上一次 job 执行后遗留在 Hive 表中的数据,第二条 job 是没有 hive-overwrite 参数的,否则会把上一条 job 从 mysql1 中同步的数据错误地删除
下面进行第一次同步,分别执行如下命令将 RDS 中的数据同步到 Hive 表中,第一次执行是全备,根据表中数据量,时间可能较长
sqoop job –exec mysql1_country
sqoop job –exec mysql1_user
sqoop job –exec mysql2_user
sqoop job –exec mysql1_orders
sqoop job –exec mysql2_orders
进入 Hive,查看表是否同步成功
第六步 将 Hive 表中的数据同步到 Redshift 中
使用 JDBC 客户端连接 Redshift,这里使用 SQL Workbench
分别创建 country,user,orders 表及各自的中间表,同时将 Hive 存在 S3 中的数据同步到中间表中,其中 aws_access_key_id 和 aws_secret_access_key 为准备工作中在 IAM 下载的 CSV 中的值
查看 stage 表中的信息是否同步正确
通过如下事务插入更新 country,users,orders 表,并删除中间表
查看数据是否正确插入更新到 country,users,orders 表中
第七步 执行增量同步
人为对 MySQL 中的表进行适当的增删改操作,本例对 country 表执行插入操作, 对 user 表执行插入和更新操作,对 orders 表执行删除操作,注意到时间戳为操作执行时的时间
ssh 登入 EMR 的 master 节点,再次运行 sqoop job 将 MySQL 中插入更新的数据同步到 Hive 表中
sqoop job –exec mysql1_country
sqoop job –exec mysql1_user
sqoop job –exec mysql2_user
sqoop job –exec mysql1_orders
sqoop job –exec mysql2_orders
在 Sqoop 执行输出中可以看到,sqoop job 会记录之前执行任务的时间,并调整 where 语句来实现增量同步数据,所以如果需要多次测试,需要删除 job(sqoop job –delete XXX)并重新创建,这样会再次全量同步
进入 Hive,查看增量数据是否同步成功
使用 SQL Workbench 通过 JDBC 连接 Redshift,执行如下命令将增量数据同步到中间表
执行如下事务将中间表的数据插入更新到 country,users,orders 表中
查看数据是否正确插入更新到 country,users,orders 表中
之后在 Redshift 中的分析语句都可以通过添加 where ifdelete=false 排除删除的记录,同时可以定期删除 ifdelete 标记为 false 的记录,释放存储空间
作者介绍:
余骏
AWS 解决方案架构师,负责基于 AWS 的云计算方案架构的咨询和设计,同时致力于 AWS 云服务在国内的应用和推广。在加入 AWS 之前,他在思科中国担任系统工程师,负责方案咨询和架构设计,在企业私有云和基础网络方面有丰富经验。
评论