
GPKafka in Practice: Loading Kafka Data into Greenplum

  • 2019-08-30


Reprinted with permission from the PostgreSQL Chinese Community.


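Kafka is a distributed publish-subscribe messaging system. It scales out horizontally, can store massive volumes of data in real time, and is the de facto standard middleware for stream processing. When building a streaming pipeline with Kafka and Greenplum, the main concern for users is how to load the streaming data quickly and reliably. Starting with version 5.10, Greenplum ships a new tool, GPKafka, which gives Greenplum the ability to load streaming data.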


The GPKafka tool: Kafka -> Greenplum

I. Installation Prerequisites

Kafka: version kafka_2.11-2.1.0 (the Scala 2.11 build of Kafka 2.1.0).


Greenplum: version 5.16.

II. Loading Kafka Data into Greenplum

1. Start Kafka


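# Start ZooKeeper
$ /opt/zookeeper-3.4.12/bin/zkServer.sh start
# Start Kafka in the background (absolute config path, so it works from any directory)
$ /opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.11-2.1.0/config/server.properties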
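Both start scripts return immediately, so the topic and producer steps that follow can fail if they run while the broker is still starting. A minimal Python sketch of a readiness check, assuming ZooKeeper listens on port 2181 and the Kafka broker on 9092 (the ports used in the later commands). wait_for_port is a hypothetical helper, not a Kafka tool, and a successful TCP connection only shows that the port is open, not that the broker is fully initialized.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds.

    Hypothetical helper: it proves only that something accepts connections on
    the port, not that ZooKeeper or the Kafka broker has finished starting.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. ConnectionRefusedError) if nothing listens.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, call wait_for_port("kafkaIP", 2181) and wait_for_port("kafkaIP", 9092) before running kafka-topics.sh, where kafkaIP is the same placeholder host used throughout this article.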


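2. Create the gpss extension


Before loading Kafka message data into Greenplum Database, you must register the Greenplum-Kafka Integration formatter functions in every database whose tables will receive Kafka data. The example uses the lottu database: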


[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.
lottu=# CREATE EXTENSION gpss;


3. Create the sample table


The Kafka messages are in JSON format, for example:


{
  "time": 1550198435941,
  "type": "type_mobileinfo",
  "phone_imei": "861738033581011",
  "phone_imsi": "",
  "phone_mac": "00:27:1c:95:47:09",
  "appkey": "307A5C626E6C2F6472636E6E6A2F736460656473",
  "phone_udid": "8F137BFFB2289784A5EA2DCADCE519C2",
  "phone_udid2": "744DD04CE29652F4F1D2DFFC8D3204A9",
  "appUdid": "D21C76419E54B18DDBB94BF2E6990183",
  "phone_resolution": "1280*720",
  "phone_apn": "",
  "phone_model": "BF T26",
  "phone_firmware_version": "5.1",
  "phone_softversion": "3.19.0",
  "phone_softname": "com.esbook.reader",
  "sdk_version": "3.1.8",
  "cpid": "blp1375_13621_001",
  "currentnetworktype": "wifi",
  "phone_city": "",
  "os": "android",
  "install_path": "\/data\/app\/com.esbook.reader-1\/base.apk",
  "last_cpid": "",
  "package_name": "com.esbook.reader",
  "src_code": "WIFIMAC:00:27:1c:95:47:09"
}
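The time field appears to be a Unix timestamp in milliseconds (13 digits, decoding to February 2019), which is why it is loaded into a bigint column rather than a timestamp. A minimal Python sketch for turning it into a readable UTC datetime when inspecting messages. ms_to_utc is a hypothetical helper name, and the millisecond interpretation is an assumption based on the sample values.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_utc(epoch_ms: int) -> datetime:
    """Convert a Unix timestamp in milliseconds to a timezone-aware UTC datetime.

    Integer timedelta arithmetic avoids float rounding of the milliseconds.
    """
    return EPOCH + timedelta(milliseconds=epoch_ms)


record = json.loads('{"time": 1550198435941, "os": "android"}')
print(ms_to_utc(record["time"]).isoformat())  # 2019-02-15T02:40:35.941000+00:00
```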


I need the package_name, appkey, time, phone_udid, os, idfa, phone_imei, cpid, last_cpid and phone_number fields, so I created the following table (the JSON field time is stored in column ts):


CREATE TABLE tbl_novel_mobile_log (
package_name text,
appkey text,
ts bigint,
phone_udid text,
os character varying(20),
idfa character varying(64),
phone_imei character varying(20),
cpid text,
last_cpid text,
phone_number character varying(20)
) ;
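Several columns are declared as character varying(n). Greenplum, like PostgreSQL, raises an error instead of silently truncating when a longer value is assigned to such a column. A minimal pre-check sketch, assuming you want to screen sample messages before loading them. VARCHAR_LIMITS simply mirrors the table definition above (keyed by JSON field name), and oversized_fields is a hypothetical helper, not part of gpkafka.

```python
import json

# Mirrors the character varying(n) columns of tbl_novel_mobile_log, keyed by JSON field.
VARCHAR_LIMITS = {
    "os": 20,
    "idfa": 64,
    "phone_imei": 20,
    "phone_number": 20,
}


def oversized_fields(message: str) -> dict:
    """Return {field: length} for every field whose text is longer than its column allows.

    Missing fields are skipped because they would load as NULL. Non-string
    scalars are measured by their JSON text, which approximates what ->> returns.
    """
    record = json.loads(message)
    problems = {}
    for field, limit in VARCHAR_LIMITS.items():
        value = record.get(field)
        if value is None:
            continue
        text = value if isinstance(value, str) else json.dumps(value)
        if len(text) > limit:  # character count, as used by character varying(n)
            problems[field] = len(text)
    return problems
```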


4. Create the gpkafka.yaml configuration file


Contents of the gpkafka_mobile_yaml file:
DATABASE: lottu
USER: gpadmin
HOST: oracle166
PORT: 5432
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: kafkaip:9092
      TOPIC: mobile_info
    COLUMNS:
      - NAME: jdata
        TYPE: json
    FORMAT: json
    ERROR_LIMIT: 10
  OUTPUT:
    TABLE: tbl_novel_mobile_log
    MAPPING:
      - NAME: package_name
        EXPRESSION: (jdata->>'package_name')::text
      - NAME: appkey
        EXPRESSION: (jdata->>'appkey')::text
      - NAME: ts
        EXPRESSION: (jdata->>'time')::bigint
      - NAME: phone_udid
        EXPRESSION: (jdata->>'phone_udid')::text
      - NAME: os
        EXPRESSION: (jdata->>'os')::text
      - NAME: idfa
        EXPRESSION: (jdata->>'idfa')::text
      - NAME: phone_imei
        EXPRESSION: (jdata->>'phone_imei')::text
      - NAME: cpid
        EXPRESSION: (jdata->>'cpid')::text
      - NAME: last_cpid
        EXPRESSION: (jdata->>'last_cpid')::text
      - NAME: phone_number
        EXPRESSION: (jdata->>'phone_number')::text
  COMMIT:
    MAX_ROW: 1000
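Each EXPRESSION in the MAPPING section uses the JSON operator ->>, which returns the field as text and yields NULL when the key is absent. That is why idfa, which none of the sample messages carries, ends up NULL, while an empty string such as "last_cpid": "" stays an empty string. A rough Python emulation of this mapping, useful for previewing rows offline. It is a hypothetical illustration only: gpkafka does not run Python, and the real expressions are SQL evaluated by Greenplum.

```python
import json
from typing import Optional

# (column name, JSON key) pairs in the same order as the MAPPING section.
MAPPING = [
    ("package_name", "package_name"),
    ("appkey", "appkey"),
    ("ts", "time"),
    ("phone_udid", "phone_udid"),
    ("os", "os"),
    ("idfa", "idfa"),
    ("phone_imei", "phone_imei"),
    ("cpid", "cpid"),
    ("last_cpid", "last_cpid"),
    ("phone_number", "phone_number"),
]


def json_text(record: dict, key: str) -> Optional[str]:
    """Approximate jdata->>'key' for scalars: None if missing or JSON null, else text."""
    value = record.get(key)
    if value is None:
        return None
    if isinstance(value, str):
        return value
    return json.dumps(value)  # numbers and booleans come back as their JSON text


def to_row(message: str) -> dict:
    """Build one output row from a Kafka message; ts is cast to int like ::bigint."""
    record = json.loads(message)
    row = {column: json_text(record, key) for column, key in MAPPING}
    if row["ts"] is not None:
        row["ts"] = int(row["ts"])
    return row
```

Keep in mind that psql prints NULL and the empty string identically, so blank idfa and last_cpid cells in query output look the same even though one is NULL and the other is ''.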


5. Create the mobile_info topic


/opt/kafka/kafka_2.11-2.1.0/bin/kafka-topics.sh --create --zookeeper kafkaIp:2181 --replication-factor 1 --partitions 1  --topic mobile_info


6. Start a Kafka console producer


Run the following command, then enter the Kafka records, one JSON object per line:


[root@oracle166 ~]# /opt/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh  --broker-list kafkaIP:9092 --topic mobile_info


>{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BFT26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
{"time":1550198437885,"type":"type_mobileinfo","phone_imei":"862245038046551","phone_imsi":"","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626F2F76646B74606F2F736460656473","phone_udid":"A3BB70A0218AEFC7908B1D79C0C02D77","phone_udid2":"E3976E0453010FC7F32B6143AA3A164E","appUdid":"4FBEF77BC076254ED0407CAD653E6954","phone_resolution":"1920*1080","phone_apn":"","phone_model":"LeX620","phone_firmware_version":"6.0","phone_softversion":"1.9.0","phone_softname":"cn.wejuan.reader","sdk_version":"3.1.8","cpid":"blf1298_14411_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/cn.wejuan.reader1\/base.apk","last_cpid":"","package_name":"cn.wejuan.reader","src_code":"ffffffff-9063-8e34-0000-00007efffeff"}
{"time":1550198438311,"type":"type_mobileinfo","phone_number":"","phone_imei":"867520045576831","phone_imsi":"460001122544742","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"A00407EF9D6EBCC207A514CDA452EB76","phone_udid2":"A00407EF9D6EBCC207A514CDA452EB76","appUdid":"1C35633F4EB8218789EFD8666C763485","phone_resolution":"2086*1080","phone_apn":"CMCC","phone_model":"ONEPLUSA6000","phone_firmware_version":"9","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.readerTlgFCk6ANgEDRnXDCem8uQ==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460001122544742"}
{"time":1550198433102,"type":"type_mobileinfo","phone_number":"15077113477","phone_imei":"860364049874919","phone_imsi":"460023771256711","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"EEF566CB5253AA62B653347A203815C3","phone_udid2":"0845931539AE39B3B0D4EB42B85D98EC","appUdid":"9570DCA2D574E6B69B24137035209D42","phone_resolution":"2340*1080","phone_apn":"CHINAMOBILE","phone_model":"PBEM00","phone_firmware_version":"8.1.0","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.readerNBToXQo14TOeNuPxo_aA4w==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"13598c2d-efc4-4957-8d4d-22eb145d15fd"}
{"time":1550198440577,"type":"type_mobileinfo","phone_imei":"869800021106037","phone_imsi":"","phone_mac":"2c:5b:b8:fb:79:af","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"2BC16C4AC07070BA9608BBD0EE2EE320","phone_udid2":"A7F9FA4772D31FADEECFDB445BA3BEBB","appUdid":"DC6BEE2F6E5D6A133E26131887AE788A","phone_resolution":"960*540","phone_apn":"","phone_model":"OPPOA33","phone_firmware_version":"5.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_14526_003","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:2c:5b:b8:fb:79:af"}
{"time":1506944701166,"type":"type_mobileinfo","phone_number":"+8618602699126","phone_imei":"865902038154143","phone_imsi":"460012690618403","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"388015DA70C0AEA6D59D3CE37B0C4BA2","phone_udid2":"388015DA70C0AEA6D59D3CE37B0C4BA2","appUdid":"EC0A105297D55075526018078A4A1B84","phone_resolution":"1920*1080","phone_apn":"中国联通","phone_model":"MIMAX2","phone_firmware_version":"7.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_10928_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460012690618403"}

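kafka-console-producer.sh sends each input line as one message, so a record split across lines or a stray character becomes a message the JSON format cannot parse. A minimal sketch that checks candidate messages, one JSON object per line, before they are sent. invalid_lines is a hypothetical helper, and mobile_info.jsonl below is a hypothetical file name.

```python
import json
from typing import Iterable, List, Tuple


def invalid_lines(lines: Iterable[str]) -> List[Tuple[int, str]]:
    """Return (line_number, reason) for each line that is not a single JSON object.

    One line corresponds to one Kafka message when fed to the console
    producer, so every line is validated on its own.
    """
    problems: List[Tuple[int, str]] = []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            problems.append((number, "empty line"))
            continue
        try:
            value = json.loads(text)
        except json.JSONDecodeError as exc:
            problems.append((number, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(value, dict):
            problems.append((number, "not a JSON object"))
    return problems
```

For example, open mobile_info.jsonl, pass the file object to invalid_lines, and only when it returns an empty list send the file with kafka-console-producer.sh --broker-list kafkaIP:9092 --topic mobile_info < mobile_info.jsonl.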


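7. Run gpkafka to load the data

The --quit-at-eof option makes gpkafka load exit once it has read to the end of the topic, instead of continuing to wait for new messages.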


[gpadmin@oracle166 ~]$ gpkafka load --quit-at-eof ./gpkafka_mobile_yaml
PartitionID  StartTime                    EndTime                     BeginOffset  EndOffset
0            2019-02-27T09:26:27.989312Z  2019-02-27T09:26:27.99517Z  0            5
Job dcd0d159282c0ef39f182cabeef23ee6 stopped normally at 2019-02-27 09:26:29.442874281 +0000 UTC


8. Check the progress of the load operation (optional)


[gpadmin@oracle166 ~]$ gpkafka check ./gpkafka_mobile_yaml
PartitionID  StartTime                    EndTime                     BeginOffset  EndOffset
0            2019-02-27T09:26:27.989312Z  2019-02-27T09:26:27.99517Z  0            5
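gpkafka check prints, per Kafka partition, the start and end time and the begin and end offsets, as in the output above. If you want to track these numbers from a script, here is a minimal Python sketch that parses that table. It assumes the whitespace-separated layout shown above (PartitionID, StartTime, EndTime, BeginOffset, EndOffset), which may differ between gpkafka versions. parse_progress and PartitionProgress are hypothetical names, and the sketch makes no claim about whether EndOffset is inclusive.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class PartitionProgress:
    """One data row of the gpkafka load/check progress table."""
    partition_id: int
    start: datetime
    end: datetime
    begin_offset: int
    end_offset: int

    @property
    def duration(self) -> timedelta:
        return self.end - self.start


def _parse_ts(value: str) -> datetime:
    """Parse timestamps such as 2019-02-27T09:26:27.99517Z (UTC, returned as naive)."""
    text = value.rstrip("Z")
    # %f accepts one to six fractional digits, so "99517" becomes 995170 microseconds.
    fmt = "%Y-%m-%dT%H:%M:%S.%f" if "." in text else "%Y-%m-%dT%H:%M:%S"
    return datetime.strptime(text, fmt)


def parse_progress(output: str) -> List[PartitionProgress]:
    """Extract per-partition rows, skipping the header and the "Job ..." line."""
    rows: List[PartitionProgress] = []
    for line in output.splitlines():
        fields = line.split()
        # Data rows have five whitespace-separated fields and a numeric partition id.
        if len(fields) != 5 or not fields[0].isdigit():
            continue
        rows.append(
            PartitionProgress(
                partition_id=int(fields[0]),
                start=_parse_ts(fields[1]),
                end=_parse_ts(fields[2]),
                begin_offset=int(fields[3]),
                end_offset=int(fields[4]),
            )
        )
    return rows
```

One way to feed it is subprocess.run(["gpkafka", "check", "./gpkafka_mobile_yaml"], capture_output=True, text=True).stdout.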


9. Query the table data


[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.


lottu=# select * from tbl_novel_mobile_log ;
   package_name    |                  appkey                  |      ts       |            phone_udid            |   os    | idfa |   phone_imei    |       cpid        | last_cpid | phone_number
-------------------+------------------------------------------+---------------+----------------------------------+---------+------+-----------------+-------------------+-----------+--------------
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198435941 | 8F137BFFB2289784A5EA2DCADCE519C2 | android |      | 861738033581011 | blp1375_13621_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198438311 | A00407EF9D6EBCC207A514CDA452EB76 | android |      | 867520045576831 | blf1298_12242_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198433102 | EEF566CB5253AA62B653347A203815C3 | android |      | 860364049874919 | blf1298_12242_001 |           | 15077113477
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198440577 | 2BC16C4AC07070BA9608BBD0EE2EE320 | android |      | 869800021106037 | blp1375_14526_003 |           |
 cn.wejuan.reader  | 307A5C626F2F76646B74606F2F736460656473   | 1550198437885 | A3BB70A0218AEFC7908B1D79C0C02D77 | android |      | 862245038046551 | blf1298_14411_001 |           |
(5 rows)

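The console producer session sends six JSON lines, while the query returns five rows. A minimal sketch for reconciling the two by phone_udid, assuming phone_udid identifies a message in this sample and that you have copied the produced messages and the loaded phone_udid values (for example from select phone_udid from tbl_novel_mobile_log) into Python. missing_udids is a hypothetical helper. It only tells you which messages are absent, not why.

```python
import json
from typing import Iterable, List, Set


def missing_udids(produced: Iterable[str], loaded: Set[str]) -> List[str]:
    """Return phone_udid values of produced messages that are not among the loaded rows.

    Messages without a phone_udid are ignored.
    """
    missing: List[str] = []
    for message in produced:
        udid = json.loads(message).get("phone_udid")
        if udid is not None and udid not in loaded:
            missing.append(udid)
    return missing
```

Applied to the messages and rows above, it would point to the record with phone_udid 388015DA70C0AEA6D59D3CE37B0C4BA2 (time 1506944701166).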

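III. Afterword

I originally wrote this article because the company was planning an ad campaign for the Beijing ES Novel business and needed a data platform similar to Reyun (热云数据) to back its ad-placement decisions and make them more precise and reliable. The Beijing novel department keeps its data in Kafka, and that data had to be imported into the backend database in Shenzhen. In the end the platform did not adopt gpkafka, but it remains a viable approach. For various reasons, PostgreSQL 9.6 was chosen as the backend database, and Kafka data is imported into it in real time by custom Java code. Finally, best wishes to PG and GP as they keep improving, and I look forward to a pgkafka tool someday.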



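About the author:


Lottu works as a DBA at Shenzhen Easou Technology Co., Ltd. (深圳宜搜科技有限公司), mainly responsible for PostgreSQL and Oracle database maintenance and for migrating databases off Oracle.


Original article: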


https://mp.weixin.qq.com/s/HuYYvKtV8RfNxtrrJwR6SQ

