
本文经授权转载自 PostgreSQL 中文社区。
Kafka 是分布式消息订阅系统,有非常好的横向扩展性,可实时存储海量数据,是流数据处理中间件的事实标准。当通过 Kafka 和 greenplum 搭建流处理管道时,如何高速可靠的完成流数据加载,成为用户最关心的问题。从 5.10 开始,Greenplum 发布了新的工具 GPKafka,为 Greenplum 提供了流数据加载的能力。
GPkafka 工具:kafka —> Greenplum
一、安装准备
kafka 安装:版本为 kafka_2.11-2.1.0。
greenplum 安装:版本为 5.16
二、Kafka 数据导入 GreenPlum
启动 kafka
# 启动zookeeper
$ /opt/zookeeper-3.4.12/bin/zkServer.sh start
# 启动kafka
$/opt/kafka/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon ../config/server.properties
2.创建 gpss 扩展
在将 Kafka 消息数据加载到 Greenplum 数据库之前,必须在将 Kafka 数据写入 Greenplum 表的每个数据库中注册 Greenplum-Kafka 集成格式化程序函数;示例在 lottu 数据库
[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.
lottu=# CREATE EXTENSION gpss;
3.创建示例表
kafka 的数据格式 json 形式;样式:
{
"time": 1550198435941,
"type": "type_mobileinfo",
"phone_imei": "861738033581011",
"phone_imsi": "",
"phone_mac": "00:27:1c:95:47:09",
"appkey": "307A5C626E6C2F6472636E6E6A2F736460656473",
"phone_udid": "8F137BFFB2289784A5EA2DCADCE519C2",
"phone_udid2": "744DD04CE29652F4F1D2DFFC8D3204A9",
"appUdid": "D21C76419E54B18DDBB94BF2E6990183",
"phone_resolution": "1280*720",
"phone_apn": "",
"phone_model": "BF T26",
"phone_firmware_version": "5.1",
"phone_softversion": "3.19.0",
"phone_softname": "com.esbook.reader",
"sdk_version": "3.1.8",
"cpid": "blp1375_13621_001",
"currentnetworktype": "wifi",
"phone_city": "",
"os": "android",
"install_path": "\/data\/app\/com.esbook.reader-1\/base.apk",
"last_cpid": "",
"package_name": "com.esbook.reader",
"src_code": "WIFIMAC:00:27:1c:95:47:09"
}
我需要其中的 package_name,appkey ,time, phone_udid,os, idfa,phone_imei,cpid,last_cpid,phone_number 字段;所以我创建的表语句
CREATE TABLE tbl_novel_mobile_log (
package_name text,
appkey text,
ts bigint,
phone_udid text,
os character varying(20),
idfa character varying(64),
phone_imei character varying(20),
cpid text,
last_cpid text,
phone_number character varying(20)
) ;
4.创建 gpkafka.yaml 配置文件
gpkafka_mobile_yaml文件内容:
DATABASE: lottu
USER: gpadmin
HOST: oracle166
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: kafkaip:9092
TOPIC: mobile_info
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10
OUTPUT:
TABLE: tbl_novel_mobile_log
MAPPING:
- NAME: package_name
EXPRESSION: (jdata->>'package_name')::text
- NAME: appkey
EXPRESSION: (jdata->>'appkey')::text
- NAME: ts
EXPRESSION: (jdata->>'time')::bigint
- NAME: phone_udid
EXPRESSION: (jdata->>'phone_udid')::text
- NAME: os
EXPRESSION: (jdata->>'os')::text
- NAME: idfa
EXPRESSION: (jdata->>'idfa')::text
- NAME: phone_imei
EXPRESSION: (jdata->>'phone_imei')::text
- NAME: cpid
EXPRESSION: (jdata->>'cpid')::text
- NAME: last_cpid
EXPRESSION: (jdata->>'last_cpid')::text
- NAME: phone_number
EXPRESSION: (jdata->>'phone_number')::text
COMMIT:
MAX_ROW: 1000
5.创建 mobile_info topic
/opt/kafka/kafka_2.11-2.1.0/bin/kafka-topics.sh --create --zookeeper kafkaIp:2181 --replication-factor 1 --partitions 1 --topic mobile_info
6.创建 kafka 的发布者
执行下列命令;并添加 kafka 记录
[root@oracle166 ~]# /opt/kafka/kafka_2.11-2.1.0/bin/kafka-console-producer.sh --broker-list kafkaIP:9092 --topic mobile_info
>{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BFT26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
{"time":1550198437885,"type":"type_mobileinfo","phone_imei":"862245038046551","phone_imsi":"","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626F2F76646B74606F2F736460656473","phone_udid":"A3BB70A0218AEFC7908B1D79C0C02D77","phone_udid2":"E3976E0453010FC7F32B6143AA3A164E","appUdid":"4FBEF77BC076254ED0407CAD653E6954","phone_resolution":"1920*1080","phone_apn":"","phone_model":"LeX620","phone_firmware_version":"6.0","phone_softversion":"1.9.0","phone_softname":"cn.wejuan.reader","sdk_version":"3.1.8","cpid":"blf1298_14411_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/cn.wejuan.reader1\/base.apk","last_cpid":"","package_name":"cn.wejuan.reader","src_code":"ffffffff-9063-8e34-0000-00007efffeff"}
{"time":1550198438311,"type":"type_mobileinfo","phone_number":"","phone_imei":"867520045576831","phone_imsi":"460001122544742","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"A00407EF9D6EBCC207A514CDA452EB76","phone_udid2":"A00407EF9D6EBCC207A514CDA452EB76","appUdid":"1C35633F4EB8218789EFD8666C763485","phone_resolution":"2086*1080","phone_apn":"CMCC","phone_model":"ONEPLUSA6000","phone_firmware_version":"9","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.readerTlgFCk6ANgEDRnXDCem8uQ==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460001122544742"}
{"time":1550198433102,"type":"type_mobileinfo","phone_number":"15077113477","phone_imei":"860364049874919","phone_imsi":"460023771256711","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"EEF566CB5253AA62B653347A203815C3","phone_udid2":"0845931539AE39B3B0D4EB42B85D98EC","appUdid":"9570DCA2D574E6B69B24137035209D42","phone_resolution":"2340*1080","phone_apn":"CHINAMOBILE","phone_model":"PBEM00","phone_firmware_version":"8.1.0","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.readerNBToXQo14TOeNuPxo_aA4w==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"13598c2d-efc4-4957-8d4d-22eb145d15fd"}
{"time":1550198440577,"type":"type_mobileinfo","phone_imei":"869800021106037","phone_imsi":"","phone_mac":"2c:5b:b8:fb:79:af","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"2BC16C4AC07070BA9608BBD0EE2EE320","phone_udid2":"A7F9FA4772D31FADEECFDB445BA3BEBB","appUdid":"DC6BEE2F6E5D6A133E26131887AE788A","phone_resolution":"960*540","phone_apn":"","phone_model":"OPPOA33","phone_firmware_version":"5.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_14526_003","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:2c:5b:b8:fb:79:af"}
{"time":1506944701166,"type":"type_mobileinfo","phone_number":"+8618602699126","phone_imei":"865902038154143","phone_imsi":"460012690618403","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"388015DA70C0AEA6D59D3CE37B0C4BA2","phone_udid2":"388015DA70C0AEA6D59D3CE37B0C4BA2","appUdid":"EC0A105297D55075526018078A4A1B84","phone_resolution":"1920*1080","phone_apn":"中国联通","phone_model":"MIMAX2","phone_firmware_version":"7.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_10928_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460012690618403"}
7.执行 gpkafka 加载数据
[gpadmin@oracle166 ~]$ gpkafka load --quit-at-eof ./gpkafka_mobile_yaml
PartitionID StartTime EndTime BeginOffset EndOffset
0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5
Job dcd0d159282c0ef39f182cabeef23ee6 stopped normally at 2019-02-27 09:26:29.442874281 +0000 UTC
8.检查加载操作的进度(非必要)
[gpadmin@oracle166 ~]$ gpkafka check ./gpkafka_mobile_yaml
PartitionID StartTime EndTime BeginOffset EndOffset
0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5
9.查看表数据
[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.
lottu=# select * from tbl_novel_mobile_log ;
package_name | appkey | ts | phone_udid | os | idfa | phone_imei | cpid | last_cpid | p
hone_number
-------------------+------------------------------------------+---------------+----------------------------------+---------+------+-----------------+--
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198435941 | 8F137BFFB2289784A5EA2DCADCE519C2 | android | | 861738033581011 | blp1375_13621_001 | |
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198438311 | A00407EF9D6EBCC207A514CDA452EB76 | android | | 867520045576831 | blf1298_12242_001 | |
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198433102 | EEF566CB5253AA62B653347A203815C3 | android | | 860364049874919 | blf1298_12242_001 | | 1
5077113477
com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198440577 | 2BC16C4AC07070BA9608BBD0EE2EE320 | android | | 869800021106037 | blp1375_14526_003 | |
cn.wejuan.reader | 307A5C626F2F76646B74606F2F736460656473 | 1550198437885 | A3BB70A0218AEFC7908B1D79C0C02D77 | android | | 862245038046551 | blf1298_14411_001 | |
(5 rows)
三、后记
编辑本文初衷是:公司计划为北京 ES 小说作投放计划,需要类似热云数据平台作为投放数据支持,使投放更加精准可靠。北京小说部门数据存放于 kafka 中,需要将 kafka 中的数据导入深圳后台数据库中,虽然最后平台未采用 gpkafka 方式,但不失一种方案,由于种种原因后台数据库选 PG9.6 版本,采用 java 代码实现 kafka 数据实时导入 PG。最后祝 PG,GP 越来越好,也期待 pgkafka 工具诞生。
四、参考文献
1、gpkafka 更多用法
https://gpdb.docs.pivotal.io/5120/greenplum-kafka/intro.html
2、BottledWater-PG:PostgreSQL 集成 Kafka 的实时数据交换平台
https://www.jianshu.com/p/c3659f49bf94
作者介绍:
Lottu,就职于深圳宜搜科技有限公司,担任数据库 DBA,主要承担 PostgreSQL、Oracle 数据库维护工作以及数据库去 O 工作。
原文链接:
https://mp.weixin.qq.com/s/HuYYvKtV8RfNxtrrJwR6SQ
更多内容推荐
03|架构概述:一个监控系统的典型架构是什么样的?
监控系统由哪些模块组成,各个模块是如何相互协同的
2023-01-13
Spring 源码解析 (四)Spring 是怎么处理 BeanDefinition 的?
作者:石臻臻,CSDN博客之星Top5、Kafka Contributor、nacos Contributor、华为云 MVP,腾讯云TVP,滴滴Kafka技术专家、 KnowStreaming。
2022-08-30
一文了解 MySQL 的 Buffer Pool
Innodb 存储引擎设计了一个缓冲池(Buffer Pool),来提高数据库的读写性能。
2022-03-28
调查报告解读之国外数据库篇:MySQL 国内使用率第一,多少企业有意替换国外产品?
《2022年墨天轮数据库大调查报告》数据显示,国内有96.1%的企业正在使用国外数据库产品,其中 MySQL 的国内使用率排名第一。那么其他数据库使用情况如何?有多少企业有替换国外数据库的计划?一起来看看吧!
2023-02-22
Flink 集群运行模式
2020-08-26
4 月 22 日,云数据库技术沙龙【杭州站】
4月22日,云数据库技术主办的「MySQL x ClickHouse」技术沙龙将在杭州举办。本次沙龙以“技术进化,让数据更智能”为主题,汇聚字节跳动、阿里云、玖章算术、华为云、腾讯云等众多数据库厂商,围绕MySQL x ClickHouse的实践经验,与广大技术爱好者交流分享。
2023-04-13
选型:不同阶段的数据应如何存储?
今天提到的MySQL、PostgreSQL、Redis、Doris、Etcd等,都是我们思考的一个具象化的表达而已。我们更关心的,应该是构建一个系统的思维。
2022-09-30
Redis--Redis 集群、缓存穿透、缓存击穿、缓存雪崩
Redis集群实现了对 Redis 的水平扩容,即启动 N 个 Redis 节点,将整个数据库分布存储在这N个节点中,每个节点存储总数数据的 1/N。
2022-10-07
亚信科技 AntDB 数据库专家出席数据库标准研讨会并参与研讨
2023年7月12日,全国信息技术标准化技术委员会数据库标准工作组(SAC/TC28/WG31)秘书处组织召开数据库标准研讨会,会议围绕数据库标准工作组2023年上半年开展的标准编制情况进行交流。
2023-08-03
【kafka 运维】TopicCommand 运维脚本 (1)
TopicCommand 1.Topic创建 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
2022-10-23
62|Apache Hive 集成
2020-11-12
GreatSQL 社区月报 | 2023.04
GreatSQL 是一个开源的 MySQL 技术路线数据库社区,社区致力于通过开放的社区合作,构建国内自主 MySQL 版本及开源数据库技术,推动中国开源数据库及应用生态繁荣发展。
2023-05-12
精彩回顾|【ACDU 中国行·成都站】数据库主题交流活动成功举办!
【ACDU 中国行·成都站】中六位数据库资深专家从数据库性能管理、数据库中间件以及监控工具等热点话题进行分享,点击查看。
2023-10-18
Spark Connecter:MySQL 及 Kafka
2020-12-17
Apache 安全专题 - CVE-2017-15715
啊
2020-10-08
2022 年 2 月国产数据库排行榜: OceanBase“三连增”重夺探花,GaussDB 实现本月最大涨幅引期待
寒辞去冬雪,暖带入春风。2022年2月,虎年开年的国产数据库流行度排行榜已在墨天轮社区发布,本月共有195个数据库参与排名。排名前十位的数据库分数增减幅度较大,整体排名略有波动。
2022-02-15
Day433
每一个要使用分布式事务的数据库都需要一个 UNDO_LOG 表。
2022-05-08
GreatSQL 社区月报 | 2023.03
GreatSQL 是一个开源的 MySQL 技术路线数据库社区,社区致力于通过开放的社区合作,构建国内自主 MySQL 版本及开源数据库技术,推动中国开源数据库及应用生态繁荣发展。
2023-04-10
MatrixOne 入选艾瑞数据库研究报告啦~
近期,艾瑞咨询正式发布《2022中国数据库研究报告》。据艾瑞统计,2021年中国数据库市场总规模达286.8亿元,同比增长16.1%。
2023-01-17
PostgreSQL-HA 高可用集群在 Rainbond 上的部署方案
PostgreSQL 是一种流行的开源关系型数据库管理系统。它提供了标准的SQL语言接口用于操作数据库。
2023-05-09
推荐阅读
钱大妈生鲜如何利用 CCR 实现 Apache Doris 集群读写分离
数据湖仓3. HBase 的 shell 操作命令
2023-09-08
ClickHouse 收购 PeerDB,通过 Postgres CDC 集成增强实时分析
数据库打造次世代分析型数据库(八):高效数据导入导出方案
2023-10-30
39.Kafka 该如何实现顺序消费
2023-09-30
钱大妈生鲜如何利用 CCR 实现 Apache Doris 集群读写分离
数据库2. Kafka 的基本操作
2023-09-08
电子书

大厂实战PPT下载
换一换 
姚兴 | 京东集团 信息安全部 总监
席永青 | 阿里巴巴 资深网络架构师
黄文勇 | Intel 资深软件工程师、WebAssembly Micro Runtime 开源项目主要创始人
评论