写点什么

GPkafka-Kafka 数据导入 GreenPlum 实践

  • 2019-08-30
  • 本文字数:5432 字

    阅读完需:约 18 分钟

GPkafka-Kafka 数据导入 GreenPlum 实践

本文经授权转载自 PostgreSQL 中文社区。


Kafka 是分布式消息订阅系统,有非常好的横向扩展性,可实时存储海量数据,是流数据处理中间件的事实标准。当通过 Kafka 和 greenplum 搭建流处理管道时,如何高速可靠的完成流数据加载,成为用户最关心的问题。从 5.10 开始,Greenplum 发布了新的工具 GPKafka,为 Greenplum 提供了流数据加载的能力。


GPkafka 工具:kafka —> Greenplum

一、安装准备

kafka 安装:版本为 kafka_2.11-2.1.0。


greenplum 安装:版本为 5.16

二、Kafka 数据导入 GreenPlum

  1. 启动 kafka


# 启动zookeeper
$ /opt/zookeeper-3.4.12/bin/zkServer.sh start
# 启动kafka
$/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon ../config/server.properties
复制代码


2.创建 gpss 扩展


在将 Kafka 消息数据加载到 Greenplum 数据库之前,必须在将 Kafka 数据写入 Greenplum 表的每个数据库中注册 Greenplum-Kafka 集成格式化程序函数;示例在 lottu 数据库


[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.
lottu=# CREATE EXTENSION gpss;
复制代码


3.创建示例表


kafka 的数据格式 json 形式;样式:


{  "time": 1550198435941,  "type": "type_mobileinfo",  "phone_imei": "861738033581011",  "phone_imsi": "",  "phone_mac": "00:27:1c:95:47:09",  "appkey": "307A5C626E6C2F6472636E6E6A2F736460656473",  "phone_udid": "8F137BFFB2289784A5EA2DCADCE519C2",  "phone_udid2": "744DD04CE29652F4F1D2DFFC8D3204A9",  "appUdid": "D21C76419E54B18DDBB94BF2E6990183",  "phone_resolution": "1280*720",  "phone_apn": "",  "phone_model": "BF T26",  "phone_firmware_version": "5.1",  "phone_softversion": "3.19.0",  "phone_softname": "com.esbook.reader",  "sdk_version": "3.1.8",  "cpid": "blp1375_13621_001",  "currentnetworktype": "wifi",  "phone_city": "",  "os": "android",  "install_path": "\/data\/app\/com.esbook.reader-1\/base.apk",  "last_cpid": "",  "package_name": "com.esbook.reader",  "src_code": "WIFIMAC:00:27:1c:95:47:09"} 
复制代码


我需要其中的 package_name,appkey ,time, phone_udid,os, idfa,phone_imei,cpid,last_cpid,phone_number 字段;所以我创建的表语句


CREATE TABLE tbl_novel_mobile_log (
package_name text,
appkey text,
ts bigint,
phone_udid text,
os character varying(20),
idfa character varying(64),
phone_imei character varying(20),
cpid text,
last_cpid text,
phone_number character varying(20)
) ;
复制代码


4.创建 gpkafka.yaml 配置文件


gpkafka_mobile_yaml文件内容:
DATABASE: lottu
USER: gpadmin
HOST: oracle166
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: kafkaip:9092
TOPIC: mobile_info
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10
OUTPUT:
TABLE: tbl_novel_mobile_log
MAPPING:
- NAME: package_name
EXPRESSION: (jdata->>'package_name')::text
- NAME: appkey
EXPRESSION: (jdata->>'appkey')::text
- NAME: ts
EXPRESSION: (jdata->>'time')::bigint
- NAME: phone_udid
EXPRESSION: (jdata->>'phone_udid')::text
- NAME: os
EXPRESSION: (jdata->>'os')::text
- NAME: idfa
EXPRESSION: (jdata->>'idfa')::text
- NAME: phone_imei
EXPRESSION: (jdata->>'phone_imei')::text
- NAME: cpid
EXPRESSION: (jdata->>'cpid')::text
- NAME: last_cpid
EXPRESSION: (jdata->>'last_cpid')::text
- NAME: phone_number
EXPRESSION: (jdata->>'phone_number')::text
COMMIT:
MAX_ROW: 1000
复制代码


5.创建 mobile_info topic


/opt/kafka/kafka_2.11-2.1.0/bin/kafka-topics.sh --create --zookeeper kafkaIp:2181 --replication-factor 1 --partitions 1  --topic mobile_info
复制代码


6.创建 kafka 的发布者


执行下列命令;并添加 kafka 记录


[root@oracle166 ~]# /opt/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh  --broker-list kafkaIP:9092 --topic mobile_info
复制代码


>{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BFT26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
{"time":1550198437885,"type":"type_mobileinfo","phone_imei":"862245038046551","phone_imsi":"","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626F2F76646B74606F2F736460656473","phone_udid":"A3BB70A0218AEFC7908B1D79C0C02D77","phone_udid2":"E3976E0453010FC7F32B6143AA3A164E","appUdid":"4FBEF77BC076254ED0407CAD653E6954","phone_resolution":"1920*1080","phone_apn":"","phone_model":"LeX620","phone_firmware_version":"6.0","phone_softversion":"1.9.0","phone_softname":"cn.wejuan.reader","sdk_version":"3.1.8","cpid":"blf1298_14411_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/cn.wejuan.reader1\/base.apk","last_cpid":"","package_name":"cn.wejuan.reader","src_code":"ffffffff-9063-8e34-0000-00007efffeff"}
{"time":1550198438311,"type":"type_mobileinfo","phone_number":"","phone_imei":"867520045576831","phone_imsi":"460001122544742","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"A00407EF9D6EBCC207A514CDA452EB76","phone_udid2":"A00407EF9D6EBCC207A514CDA452EB76","appUdid":"1C35633F4EB8218789EFD8666C763485","phone_resolution":"2086*1080","phone_apn":"CMCC","phone_model":"ONEPLUSA6000","phone_firmware_version":"9","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.readerTlgFCk6ANgEDRnXDCem8uQ==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460001122544742"}
{"time":1550198433102,"type":"type_mobileinfo","phone_number":"15077113477","phone_imei":"860364049874919","phone_imsi":"460023771256711","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"EEF566CB5253AA62B653347A203815C3","phone_udid2":"0845931539AE39B3B0D4EB42B85D98EC","appUdid":"9570DCA2D574E6B69B24137035209D42","phone_resolution":"2340*1080","phone_apn":"CHINAMOBILE","phone_model":"PBEM00","phone_firmware_version":"8.1.0","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.readerNBToXQo14TOeNuPxo_aA4w==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"13598c2d-efc4-4957-8d4d-22eb145d15fd"}
{"time":1550198440577,"type":"type_mobileinfo","phone_imei":"869800021106037","phone_imsi":"","phone_mac":"2c:5b:b8:fb:79:af","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"2BC16C4AC07070BA9608BBD0EE2EE320","phone_udid2":"A7F9FA4772D31FADEECFDB445BA3BEBB","appUdid":"DC6BEE2F6E5D6A133E26131887AE788A","phone_resolution":"960*540","phone_apn":"","phone_model":"OPPOA33","phone_firmware_version":"5.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_14526_003","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:2c:5b:b8:fb:79:af"}
{"time":1506944701166,"type":"type_mobileinfo","phone_number":"+8618602699126","phone_imei":"865902038154143","phone_imsi":"460012690618403","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"388015DA70C0AEA6D59D3CE37B0C4BA2","phone_udid2":"388015DA70C0AEA6D59D3CE37B0C4BA2","appUdid":"EC0A105297D55075526018078A4A1B84","phone_resolution":"1920*1080","phone_apn":"中国联通","phone_model":"MIMAX2","phone_firmware_version":"7.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_10928_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460012690618403"}

复制代码


7.执行 gpkafka 加载数据


[gpadmin@oracle166 ~]$ gpkafka load --quit-at-eof ./gpkafka_mobile_yamlPartitionID StartTime EndTime BeginOffset EndOffset0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5Job dcd0d159282c0ef39f182cabeef23ee6 stopped normally at 2019-02-27 09:26:29.442874281 +0000 UTC 
复制代码


8.检查加载操作的进度(非必要)


[gpadmin@oracle166 ~]$ gpkafka check ./gpkafka_mobile_yamlPartitionID StartTime EndTime BeginOffset EndOffset0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5
复制代码


9.查看表数据


[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.


lottu=# select * from tbl_novel_mobile_log ;
package_name | appkey | ts | phone_udid | os | idfa | phone_imei | cpid | last_cpid | p
hone_number
-------------------+------------------------------------------+---------------+----------------------------------+---------+------+-----------------+--
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198435941 | 8F137BFFB2289784A5EA2DCADCE519C2 | android | | 861738033581011 | blp1375_13621_001 | |
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198438311 | A00407EF9D6EBCC207A514CDA452EB76 | android | | 867520045576831 | blf1298_12242_001 | |
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198433102 | EEF566CB5253AA62B653347A203815C3 | android | | 860364049874919 | blf1298_12242_001 | | 1
5077113477
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198440577 | 2BC16C4AC07070BA9608BBD0EE2EE320 | android | | 869800021106037 | blp1375_14526_003 | |
cn.wejuan.reader | 307A5C626F2F76646B74606F2F736460656473 | 1550198437885 | A3BB70A0218AEFC7908B1D79C0C02D77 | android | | 862245038046551 | blf1298_14411_001 | |
(5 rows)

复制代码

三、后记

编辑本文初衷是:公司计划为北京 ES 小说作投放计划,需要类似热云数据平台作为投放数据支持,使投放更加精准可靠。北京小说部门数据存放于 kafka 中,需要将 kafka 中的数据导入深圳后台数据库中,虽然最后平台未采用 gpkafka 方式,但不失一种方案,由于种种原因后台数据库选 PG9.6 版本,采用 java 代码实现 kafka 数据实时导入 PG。最后祝 PG,GP 越来越好,也期待 pgkafka 工具诞生。

四、参考文献

1、gpkafka 更多用法


https://gpdb.docs.pivotal.io/5120/greenplum-kafka/intro.html


2、BottledWater-PG:PostgreSQL 集成 Kafka 的实时数据交换平台


https://www.jianshu.com/p/c3659f49bf94


作者介绍:


Lottu,就职于深圳宜搜科技有限公司,担任数据库 DBA,主要承担 PostgreSQL、Oracle 数据库维护工作以及数据库去 O 工作。


原文链接:


https://mp.weixin.qq.com/s/HuYYvKtV8RfNxtrrJwR6SQ


2019-08-30 09:426934

评论

发布
暂无评论
发现更多内容

使用 Tye 辅助开发 k8s 应用竟如此简单(三)

newbe36524

Docker 微服务 k8s dotnet

Java架构大牛之路必备“微服务架构笔记”

Java架构之路

Java 程序员 架构 面试 编程语言

探究Python源码,终于弄懂了字符串驻留技术

华为云开发者联盟

Python 字符串 Python解释器 字符串驻留 字符

01 | Mysql基础架构

zach

MySQL

【LeetCode】K 连续位的最小翻转次数Java题解

Albert

算法 LeetCode 2月春节不断更

元旦立下的Flag,春节后该如何实现?

脑极体

技术需求文档,应当这么写!

穿甲兵

需求 文档

大厂必问算法!查漏补缺LeetCode必考“1024道技术点面试题”

Java架构之路

Java 程序员 架构 面试 编程语言

腾讯云大神用这份“redis深度笔记”把Redis入门到精通全部精髓全部展现出来了

redis 架构 计算机

温故而知新!腾讯Android开发面试记录,薪资翻倍

欢喜学安卓

android 程序员 面试 移动开发

诊所数字化:诊所医护人员绩效指标评估方式

boshi

绩效 数字化转型 医疗 七日更

话题讨论 | 今年,你回家过年了吗?

xcbeyond

话题讨论 春节 就地过年

28天写作再次开启,你准备好来挑战了吗?

TGO鲲鹏会

28天写作 热门活动

offer稳了!四面阿里面经分享,定级P6之路。

Java架构之路

Java 程序员 架构 面试 编程语言

区块链难在落地,亟需补人才缺口迎爬升期

CECBC

区块链

电信的标准化组织

Geek_古藤模根

标准化 电信

最新Hadoop的面试题总结

大数据老哥

真牛皮!2021最新Android大厂面试真题大全,BAT大厂面试总结

欢喜学安卓

android 程序员 面试 移动开发

6.render阶段(厉害了,我有创建Fiber的技能)

全栈潇晨

React React Hooks react源码

如何提升网页核心指标

Vincent

linux内核协议栈 邻居协议之ARP协议处理初始化

赖猫

Linux 协议栈 Linux内核

资本之外,区块链破圈正当时

CECBC

区块链

十四五期间我国区块链技术趋势特征分析

CECBC

区块链 大数据

百度Hydra工具在移动端UI兼容性测试上的高效应用

百度Geek说

测试 UI

DPDK大页内存原理

赖猫

Linux DPDK

阿里面试这样问:redis 为什么把简单的字符串设计成 SDS?

程序员小富

Java redis 面试

5.state更新流程(setState里到底发生了什么)

全栈潇晨

React React Hooks react源码

翻译:《实用的Python编程》01_06_Files

codists

人工智能 后端 python 爬虫 数据结构与算法 文件操作

让虞书欣、李诞拍到停不下来!AR+AI双引擎的互动小游戏,如何打开IP新玩法?

爱奇艺技术产品团队

如何读懂CNN、BBC、经济学人、卫报、纽约时报?看完这本经典即可事半功倍!

wbliu85

学习 英语

GitHub上爆火的Java性能优化100+小技巧!(干货建议收藏)

Java架构师迁哥

GPkafka-Kafka 数据导入 GreenPlum 实践_大数据_Lottu_InfoQ精选文章