【AICon】 如何构建高效的 RAG 系统?RAG 技术在实际应用中遇到的挑战及应对策略?>>> 了解详情
写点什么

使用 Apache Kafka 和 KSQL 实现流处理普及化——第二部分

  • 2018-09-20
  • 本文字数:12858 字

    阅读完需:约 42 分钟

本文要点

  • 针对客户操作、操作仪表板、在线分析等应用场景,使用 Apache Kafka 和 KSQL 构建数据集成和处理应用程序。
  • 流处理的主要好处包括:数据增强一次性完成、低延迟处理、向客户 Ops 团队实时发送通知。
  • 你可以使用变化数据捕获(CDC)工具把数据库的数据以及任何后续的变化镜像到 Kafka 主题。
  • 使用 KSQL,很容易把业务数据流和源自数据库、在 Kafka 主题中维护的相关信息合流。
  • 扩展应用程序使其可以处理更多的通知,而不必修改过滤逻辑。

这是文章“使用 Apache Kafka 和 KSQL 实现流处理普及化”的第二部分。第一部分在这里

在本文中,我们将介绍如何使用 Apache Kafka®和 KSQL 构建数据集成和处理应用程序。这是一个来自电商领域的简单示例:在一个网站上,通过一系列事件跟踪用户评论。关于这些用户的信息,如姓名、联系方式、尊贵客户俱乐部资深会员,保存在数据库的某个地方。对于这类评论数据,至少有三种用途:

  • 客户操作——如果一个尊贵客户俱乐部资深会员留下了差评,我们希望可以马上做些事情,降低流失这类客户的风险。我们希望应用程序在出现满足此条件的评论时立即通知我们。这样,我们就可以马上为客户提供服务,这远远好于我们等待一段时间后才运行的批处理为我们标记出需要联系的用户。
  • 操作仪表板实时展示评论输入,滚动聚合,计算数量和平均分数,等等,并按用户区域划分。
  • 结合其他数据(不论在数据湖中,还是在数据仓库中)在线分析分析评论数据。这可以扩展到更广泛的数据科学实践和机器学习应用。所有这些都需要访问评论信息及用户的详细信息。

我们将介绍下如何使用一种更为现代化的模式基于流平台实现上述功能。我们将使用开源项目 Apache Kafka 和 KSQL 来实现。KSQL 是一个面向 Apache Kafka 的流 SQL 引擎,基于 Kafka Streams API 实现,后者是 Apache Kafka 的一个组成部分。

下图展示了流应用程序示例的工作原理。



图1. 流数据应用程序

事件是用户提交到网站的评论,它们被以流的方式直接传递给 Kafka。从这里,它们可以实时和用户信息联系起来,经过充实的结果数据会写回到 Kafka。转换完成后,这些数据就可以用于驱动上述应用和目标了。转换逻辑只需要执行一次。数据一次性从源系统提取。转换后的数据可以供不相关的应用程序多次使用。不用对现有组件做任何修改,就可以添加新的源和目标。所有这些操作的延迟都非常低。

因此,高层设计是这样的:

  • Web 应用直接向 Kafka 发送评论;
  • Kafka Connect 把数据库用户数据快照以流的方式发送给 Kafka,并且直接与 CDC 保持同步;
  • 流处理把用户数据添加到评论事件,并写回到一个新的 Kafka 主题;
  • 流处理会针对 VIP 用户的差评筛选出充实后的 Kafka 主题,并写入一个新的 Kafka 主题;
  • 事件驱动应用会监听 Kafka 主题,在 VIP 用户留下差评后立即推送通知;
  • Kafka Connect 把数据以流的方式传入 Elasticsearch,供操作仪表板使用;
  • Kafka Connect 把数据以流的方式传入 S3,供长期在线分析使用以及和其他数据集一起使用。

其中,主要好处包括:

  • 数据增强一次性完成,可供任何应用程序消费;
  • 数据处理延迟低;
  • 可以在 VIP 客户留下差评后立即通知客户 Ops 团队——提供更好的客户体验,增加业务保留机会;
  • 容易扩展,可以按需增加新节点,实现更大的吞吐量。

实现

让我们看一下构建这个应用程序的详细过程。 GitHub 上提供了所有示例的代码以及 docker-compose 文件。

把数据写入 Kafka

Web 应用程序有多种方式可以使事件流入 Kafka。

  • 许多客户端库都提供了Producer API,面向的语言包括 Java、.NET、Python、C/C++、Go、node.js 等;
  • 开源REST 代理,可以发起 HTTP 调用把数据发送到 Kafka。

在我们的例子中,应用程序使用了 Producer API。

Web 应用程序发送给 Kafka 主题“评级(ratings)”的消息格式如下:

复制代码
{
"rating_id": 604087,
"user_id": 7,
"stars": 1,
"route_id": 2777,
"rating_time": 1528800546808,
"channel": "android",
"message": "thank you for the most friendly, helpful experience today at your new lounge"
}

使 Kafka 可以访问数据库中的数据

在构建应用程序的时候,经常需要使用存储在数据库中的数据。在我们的例子中,用户数据保存在 MySQL 中,不过,设计模式都是一样的,与采用哪种具体的 RDBMS 技术无关。

在使用 Kafka 编写流处理应用程序时,集成保存在数据库中的数据的标准方法是,确保数据本身在 Kafka 中保存和维护。这比听上去简单——我们只需要使用数据变化捕获(CDC)工具把数据库中的数据和任何后续的变化镜像到一个 Kafka 主题。

这样做的好处是隔离了数据库和流处理。这主要有两个好处:数据库不会因为我们的请求增加开销,我们可以自由使用我们选取的数据,而又不会使我们的开发和部署流程和数据库所有者的相耦合。

CDC 技术和工具不止一种,我们这里就不介绍了。由于数据在 MySQL 中,我们使用 Debezium 项目作为我们的 CDC 工具。它会把用户表的内容快照到 Kafka,并使用 MySQL 的 binlog 即时检测后续 MySQL 中数据的变化并复制到 Kafka。

图 2 详细展示了数据变化捕获过程的数据流动。



图2. 流应用程序变化数据捕获

从数据库流出、流入 Kafka 主题 asgard.demo.CUSTOMERS 的消息格式如下:

复制代码
{
"id": 1,
"first_name": "Rica",
"last_name": "Blaisdell",
"email": "rblaisdell0@rambler.ru",
"gender": "Female",
"club_status": "bronze",
"comments": "Universal optimal hierarchy",
"create_ts": "2018-06-12T11:47:30Z",
"update_ts": "2018-06-12T11:47:30Z",
"messagetopic": "asgard.demo.CUSTOMERS",
"messagesource": "Debezium CDC from MySQL on asgard"
}

使用数据库信息充实事件流

使用 KSQL,很容易就可以把源于数据库、在 Kafka 主题中维护的相关信息合并到评级中。

合并细节如图 3 所示:

第一步是确保客户主题中的消息以关联列为键,在这个例子中是客户 ID。我们实际上可以使用 KSQL 进行重新分区。KSQL CREATE STREAM的输出被写入一个 Kafka 主题,在默认情况下,会以流本身的名称命名:

复制代码
-- 处理流中所有现有的数据以及将来的数据
SET 'auto.offset.reset' = 'earliest';
-- 声明源流
CREATE STREAM CUSTOMERS_SRC WITH \
(KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
-- 在 ID 列上重新分区,设置目标主题,使其分区数量与作为源的评级主题一致:
CREATE STREAM CUSTOMERS_SRC_REKEY WITH (PARTITIONS=1) AS \
SELECT * FROM CUSTOMERS_SRC PARTITION BY ID;

现在,到达asgard.demo.CUSTOMERS主题的每条信息都将写入正确设置了消息键的 Kafka 主题CUSTOMERS_SRC_REKEY。注意,我们不一定要声明任何模式,因为我们在使用 Avro。KSQL 和 Kafka Connect 都无缝集成了开源的 Confluent Schema Registry,序列化 / 反序列化 Avro 数据,并在 Schema Registry 中保存 / 检索模式。

为了进行合并,我们使用标准的 SQL 联合查询语法:

复制代码
-- 把 CUSTOMER 注册为一张 KSQL 表,
-- 源自重新分区后的主题
CREATE TABLE CUSTOMERS WITH \
(KAFKA_TOPIC='CUSTOMERS_SRC_REKEY', VALUE_FORMAT ='AVRO', KEY='ID');
-- 把 RATINGS 数据注册到一个 KSQL 流,源自 ratings 主题
CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');
-- 执行联合查询,写入新主题——注意,主题名称是显式设置的。
-- 如果移除 KAFKA_TOPIC 参数,那么目标主题将使用所创建的流或者表的名称
CREATE STREAM RATINGS_ENRICHED WITH \
(KAFKA_TOPIC='ratings-with-customer-data', PARTITIONS=1) AS \
SELECT R.RATING_ID, R.CHANNEL, R.STARS, R.MESSAGE, \
C.ID, C.CLUB_STATUS, C.EMAIL, \
C.FIRST_NAME, C.LAST_NAME \
FROM RATINGS R \
LEFT JOIN CUSTOMERS C \
ON R.USER_ID = C.ID \
WHERE C.FIRST_NAME IS NOT NULL ;

我们可以查看这条查询处理的消息数量:

复制代码
ksql> DESCRIBE EXTENDED RATINGS_ENRICHED;
Name : RATINGS_ENRICHED
Type : STREAM
Key field : R.USER_ID
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : AVRO
Kafka topic : ratings-with-customer-data (partitions: 4, replication: 1)
[...]
Local runtime statistics
------------------------
messages-per-sec: 3.61 total-messages: 2824 last-message: 6/12/18 11:58:27 AM UTC
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(本地 KSQL 服务器与 Kafka 主题 ratings-with-customer-data 的交互统计)

实际上,这条 SQL 语句本身就是一个应用程序,就像我们在 Java、Python、C……中编写的代码一样。它不断地执行,接收输入数据、处理数据、输出数据。我们在上面看到的输出是该应用程序的运行时指标。

使用 KSQL 过滤数据流

我们前面创建的 JOIN 查询其输出是一个 Kafka 主题,在源自源主题 ratings 的事件的驱动下实时填充,如下图 4 所示:

我们可以构建第二个 KSQL 应用程序,由这个派生主题所驱动,并对数据做进一步地处理。这里,我们将简单地过滤所有评级流,识别那些同时满足如下两个条件的评级:

  • 差评(评级范围 1 到 5,小于 3 即为差评)
  • “铂金”客户留下的评级

SQL 给出的语义几乎可以从字面上表达上述需求。我们可以首先使用 KSQL CLI 验证该查询:

复制代码
SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE \
FROM RATINGS_ENRICHED \
WHERE STARS < 3 \
AND CLUB_STATUS = 'platinum';
platinum | ltoopinc@icio.us | 1 | worst. flight. ever. #neveragain
platinum | italboyd@imageshack.us | 2 | (expletive deleted)

然后,和以前一样,这个持续查询的结果可以持久化到一个 Kafka 主题,只需为语句加上CREATE STREAM ... AS(通常使用缩写 CSAS)前缀。注意,我们可以选择所有的源列(SELECT *),或者创建一个可用字段的子集(SELECT COL1, COL2),使用哪一个取决于创建流的目的。此外,我们将把目标消息写成 JSON 格式:

复制代码
CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS \
WITH (VALUE_FORMAT='JSON', PARTITIONS=1) AS \
SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE \
FROM RATINGS_ENRICHED \
WHERE STARS < 3 \
AND CLUB_STATUS = 'platinum';

查看生成的 Kafka 主题,我们可以看到,它只包含我们感兴趣的事件。再次强调一下,这是一个 Kafka 主题——我们可以使用 KSQL 查询它——这里,我将跳过 KSQL,使用流行的 kafkacat 工具查看它:

复制代码
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic UNHAPPY_PLATINUM_CUSTOMERS | jq '.'
{
"CLUB_STATUS": {
"string": "platinum"
},
"EMAIL": {
"string": "italboyd@imageshack.us"
},
"STARS": {
"int": 1
},
"MESSAGE": {
"string": "Surprisingly good, maybe you are getting your mojo back at long last!"
}
}

在离开 KSQL 之前,我们给自己提个醒,我们实际上仅写了三个流应用程序:

复制代码
ksql> SHOW QUERIES;
Query ID | Kafka Topic | Query String
------------------------------------------------------------------------------------------------------------
CSAS_CUSTOMERS_SRC_REKEY_0 | CUSTOMERS_SRC_REKEY | CREATE STREAM CUSTOMERS_SRC_REKEY […]
CSAS_RATINGS_ENRICHED_1 | RATINGS_ENRICHED | CREATE STREAM RATINGS_ENRICHED […]
CSAS_UNHAPPY_PLATINUM_CUSTOMERS_2 | UNHAPPY_PLATINUM_CUSTOMERS | CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS […]

由 Kafka 主题驱动的推送通知

我们在上面创建的主题UNHAPPY_PLATINUM_CUSTOMERS可以用于驱动一个应用程序,如果有重要客户留下了差评,它就会给客户运营团队发送警报。这里的关键是,我们基于一个刚刚发生的事件驱动了一个实时的动作。基于批处理的分析下周才告诉我们,上周我们让一位客户失望了,这就没用了。我们希望现在就知道,以便我们现在就可以采取行动,向那位客户提供更好的体验。

Kafka 客户端库有面向各种语言的——你几乎可以选择任何语言。这里,我们使用面向Python 的开源Confluent Kafka 库。这是一个构建事件驱动应用程序的简单例子。它在一个Kafka 主题上监听事件,然后生成一个推送通知。我们将使用Slack 作为我们的通知发送平台。为了简化说明,下面的代码片段删除了所有的错误处理代码。我们可以把一个 API(如 Slack 的 API)和一个 Kafka 主题集成,在这个主题上监听事件,从而触发一个动作。

复制代码
from slackclient import SlackClient
from confluent_kafka import Consumer, KafkaError
sc = SlackClient('api-token-xxxxxxx')
settings = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'python_kafka_notify.py',
'default.topic.config': {'auto.offset.reset': 'largest'}
}
c = Consumer(settings)
c.subscribe(['UNHAPPY_PLATINUM_CUSTOMERS'])
while True:
msg = c.poll(0.1)
if msg is None:
continue
else:
email=app_msg['EMAIL']
message=app_msg['MESSAGE']
channel='unhappy-customers'
text=('`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_' % (email, message))
sc.api_call('chat.postMessage', channel=channel,
text=text, username='KSQL Notifications',
icon_emoji=':rocket:')
finally:
c.close()

下图 5 展示了使用 Slack API 发送用户通知。

[点击查看大图]

这里有必要重申一下,我们正在构建的应用程序(如果你愿意,可以把它称为微服务)是事件驱动的。就是说,该应用程序会等待一个事件,然后执行动作。它不是尝试处理所有数据并查找特定的条件,也不是一个响应某个命令的同步请求 - 响应服务。我们已经分离出了这些职责:

  • 根据确定的条件过滤实时事件流是由 KSQL 完成的(使用我们前面介绍的CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS语句),匹配的事件被写入一个 Kafka 主题;
  • 通知服务有一个唯一的职责,它负责从 Kafka 主题获得事件,并基于它生成一个推送通知。这是异步完成的。

这样做的好处很明显:

  • 我们可以横向扩展应用程序,使其处理更多通知,而不必修改过滤逻辑;
  • 我们可以使用可选的其他应用程序替换这个应用程序,而不必修改过滤逻辑;
  • 我们可以替换或修改过滤逻辑,而不必触及通知应用程序。

Kafka 和请求 / 响应模式

对于基于 Kafka 平台编写应用程序,有一种常见的质疑,就是事件驱动模式不适用于应用程序的流程,并由此推论,Kafka 也不适合。这种观点是错误的,有两个关键点需要记住:

  • 事件驱动模式和请求 / 响应模式都完全可以使用——它们不是互斥的,有些需求需要使用请求 / 响应模式;
  • 决定因素应该是需求;应该挑战现有方法的惯性。在部分或全部应用程序的消息传递中使用事件驱动架构,你可以从它带来的异步性、可扩展性以及与 Kafka 的集成中受益,其他所有使用 Kafka 的系统和应用程序也是如此。

要了解有关这个问题的进一步讨论,可以查阅 Ben Stopford 的系列文章及其最新著作《事件驱动系统设计》。

使数据从 Kafka 流入 Elasticsearch,用于操作分析

使用 Kafka Connect 很容易就可以使数据从 Kafka 流入 Elasticsearch。它提供了一个由配置文件控制的可扩展的流集成。有一个开源的 Elasticsearch 连接器,既可以单独存在,也可以作为 Confluent 平台的一部分。这里,我们将使原始评级及警告信息流入 Elasticsearch:

复制代码
"name": "es_sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "ratings-with-customer-data,UNHAPPY_PLATINUM_CUSTOMERS",
"connection.url": "http://elasticsearch:9200"
[...]
}
}

在从 Kafka Connect 到 Elasticsearch 的数据流上,使用 Kibana 很容易在经过充实、过滤的数据上构建一个实时仪表板,如图 6 所示。

[点击查看大图]

使数据从 Kafka 流入数据湖

最后,我们将使充实后的评级流入数据湖。在这里,它可以用于在线分析、训练机器学习模型和数据科学项目,等等。

Kafka 中的数据可以流入使用Kafka Connect 的各种类型的目标。这里,我们将看下S3 和BigQuery,但是,使用HDFS、GCS、Redshift、Snowflake DB 等也同样简单。

就像前面介绍的使数据从 Kafka 流入 Elasticsearch 一样,针对每项目标技术的设置只是一个简单的配置文件设置:

复制代码
"name": "s3-sink-ratings",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "ratings-with-customer-data",
"s3.region": "us-west-2",
"s3.bucket.name": "rmoff-demo-ratings",

数据流入 S3 后,我们可以在桶里查看,如图 7 所示。

[点击查看大图]

我们还可以使同样的数据流入谷歌的 BigQuery:

复制代码
"name": "gbq-sink-ratings",
"config": {
"connector.class":"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"topics": "ratings-with-customer-data",
"project":"rmoff",
"datasets":".*=ratings",

[点击查看大图]

谷歌的 Data Studio 是众多可以用于分析这些来自云对象存储的数据的应用程序之一:

[点击查看大图]

这里的重点不是上面介绍的具体技术,不管你选择使用什么样的数据湖技术,使用 Kafka Connect 都很容易使数据流入它。

和 KSQL 及流平台一起走向未来

在这篇文章中,我们已经看了把流平台作为数据架构核心组成部分的其中多个有力的论据。它提供了一个可扩展的基础,由于其解耦特性,使系统可以灵活地集成和演进。分析工作可以从流平台的强大集成能力中获益。它是流平台,因此,实时不是其主要动因。应用程序可以从流平台获益,因为它是实时的,而且也因为它的集成能力。

借助 KSQL,可以使用许多开发人员都熟悉的语言编写流处理应用程序。这些应用程序可以是简单的 Kafka 事件流过滤器,也可以是复杂的充实模式,从包括数据库在内的其他系统获取数据。

要了解更多有关 KSQL 的信息,你可以观看教程并自己试一下。文档中介绍了调整和部署实践。在 Confluent Community Slack 群组中,有一个与此相关的活跃社区。 GitHub 上提供了本文的示例。

关于作者

Robin Moffatt 是 Confluent 的一名开发大使,该公司由 Apache Kafka 的创建者发起成立。他还是 Oracle ACE 总监和开发冠军。职业生涯至今,他一直在跟数据打交道,从以前的 COBOL 和 DB2 到 Oracle 和 Hadoop,再到如今的 Kafka。他的主要研究领域是分析、系统架构、性能测试和优化。你可以在这里这里阅读他的博文(之前在这里)。他的Twitter 账号是 @rmoff 。在业余时间里,他喜欢喝啤酒,吃煎炸早餐,不过一般不是同时。

这是文章“使用 Apache Kafka 和 KSQL 实现流处理普及化”的第二部分。第一部分在这里

查看英文原文: Democratizing Stream Processing with Apache Kafka® and KSQL - Part 2

2018-09-20 18:293790
用户头像

发布了 1008 篇内容, 共 370.5 次阅读, 收获喜欢 340 次。

关注

评论

发布
暂无评论
发现更多内容

医院预约管理系统开发

ALVIS

主流分布式文件系统选型,写得太好了!

编程菌

Java 编程 程序员 计算机 技术宅

爱奇艺搜索排序算法实践(内附福利)

爱奇艺技术产品团队

排序算法 nlp 搜索

这次不编故事了,阿里Spring Cloud Alibabab笔记,自己领吧

Java架构师迁哥

新思科技解读金融服务业的应用安全误区与现实

InfoQ_434670063458

新思科技 金融服务安全

万能小哥系统开发是什么?

ALVIS

正式发布!中国信通院联合腾讯安全等起草单位,共同发布研发运营安全工具系列标准

腾讯安全云鼎实验室

云计算 中国信通院 安全工具系列标准

智慧农业陷转型困局,区块链如何“对症下药”?

CECBC

8月日更挑战正式开启,新人大奖等你来领!

InfoQ写作社区官方

8月日更 热门活动

手撕环形队列系列二:无锁实现高并发

实力程序员

程序员 数据结构 并发 无锁 环形队列

品牌轮:用MOT引导的品牌体验模型

石云升

用户体验 关键时刻 7月日更 体验设计

好慷在家系统开发前景

ALVIS

企业管云就用行云管家!省时省力省心!

行云管家

云计算 企业上云 行云管家 企业管云

新工具上线!sdkmgr命令行助力流水线构建

科技汇

完备的娱乐行业知识图谱库如何建成?爱奇艺知识图谱落地实践

爱奇艺技术产品团队

nlp 搜索 知识图谱

下一个颠覆的领域:区块链如何影响审计行业?(中)

CECBC

RocketMQ事物消息调研

crazylle

RocketMQ 事物消息

你以为你懂redis?等看完某宝付费的资源你就知道了

Java架构师迁哥

阿里巴巴Java岗面试题库更新(第8版)

Java架构师迁哥

绿地回收系统开发|现成小程序

ALVIS

Fil收益怎么看?Fil一天收益如何?

区块链 IPFS fil收益 filecoin生态

解读区块链技术对量子攻击的脆弱性以及量子安全区块链的解决方案

CECBC

打造“云边一体化”,时序时空数据库TSDB技术原理深度揭秘

数据库 大数据 时序数据库 tsdb 数据智能

Apache ShardingSphere:由开源驱动的分布式数据库中间件生态

亚马逊云科技 (Amazon Web Services)

人工智能 开源数据库

ISC网络安全大会关于“新型网络犯罪打击与治理”的分析

郑州埃文科技

网络安全 isc

微服务架构设计模式-进程间通信

以吻封笺

微服务 设计模式

绿色篮子系统开发是什么模式?

ALVIS

行云管家荣获CFS第十届财经峰会2021科技创新引领奖!

行云管家

行云管家 财经峰会

希望体验更好的开发流程

escray

学习 极客时间 朱赟的技术管理课 7月日更

ONES 当选深圳信创联盟副理事长单位,助力国产软件工业发展

万事ONES

信创 ONES

如何识别并解决复杂的dcache问题

安第斯智能云

后端

使用Apache Kafka和KSQL实现流处理普及化——第二部分_大数据_Robin Moffatt_InfoQ精选文章