写点什么

使用 Apache Kafka 和 KSQL 实现普及化流处理

  • 2018-06-30
  • 本文字数:7309 字

    阅读完需:约 24 分钟

本文要点

  • 大多数的流处理技术,需要开发人员使用 Java 或 Scala 等编程语言编写代码。
  • KSQL 是 Apache Kafka 的数据流 SQL 引擎,它使用 SQL 语句替代编写大量代码去实现流处理任务。
  • KSQL 基于 Kafka 的 Stream API 构建,它支持过滤、转换、聚合、连接、加窗操作和 Sessionization(即捕获单一会话期间的所有的流事件)等流处理操作。
  • KSQL 的用例涉及实现实时报表和仪表盘、基础设施和物联网设备监控、异常检测和欺骗行为报警等。

你会根据一分钟前的交通信号灯过马路吗?当然不会!当前,现代企业或者出于竞争上的压力,或者因为企业的客户对产品或服务的交互方式有着更高的期望,它们也面对着同样的需求。

如果人们在 iPad 上轻点按钮就可以租赁和观看最新的影片,那么为什么还要因为银行账户吃紧而必须等待数小时?

数据在现代企业中处于核心地位,数据的量也在不断增加中,并且持续快速变化。流处理技术正是支持企业实时利用这些洪流信息的一种技术。目前为重新塑造自身的业务,Netflix、奥迪、PayPal、Airbnb、Uber 和纽约时报等上万家企业已经选择了 Apache Kafka ®作为流处理平台的事实标准。

人们的很多日常活动,例如阅读报纸、在线购物、预订酒店或航班、搭乘出租车、玩电子游戏或是拨打电话,其后台都已由 Kafka 提供支持。

为什么需要流处理?

为了说明流处理技术的作用,我在此给出一个适用于多个不同行业的很好例子。假设我们需要去实时创建并维护客户的全面档案。这样做出于很多的原因,包括:

  1. 为创造更好的客户体验。例如,“这位高级客户在过去五分钟内尝试多次结账购物车,但由于我们最近的网站更新错误而产生失败。因此,我们需要立即向该客户提供折扣,并对所造成的不良用户体验致歉。”
  2. 为尽量降低风险。例如,“这笔新的付款操作似乎存在欺诈。因为该付款是在美国境外发起的,但客户的手机应用报告她身处纽约市。我们应立即阻止这笔付款,并第一时间联系该客户。”

该用例需要实时汇集来自各种内部渠道的以及一些可能外部渠道的数据,然后将这些信息整合到全面客户档案(也称为客户的“360 度档案”)中。而且一旦任何渠道有新的信息可用,档案都会得到立即更新。

下图描绘了我们如何使用 Kafka 实现该用例的高层设置。其中,客户数据从各种来源的数据流中持续收集。全面客户档案保持在表中,表根据这些数据来源构建并持续更新。所有这些操作都是实时的,并具有一定的规模。

图1 从内部和外部客户数据流实时构建全面客户档案

上图的概念非常简单,它与人们对人体神经系统工作方式的理解几乎匹配。神经系统将来自眼睛、耳朵、四肢等传感器的数据传输到大脑,以便人们能够快速做出明智的决定,例如过马路是否安全。这就是为什么Kafka 常被认为是数字原生公司的“中枢神经系统”。

然而,从目前的情况看,流处理领域入门的门槛还是相当高的。当前最广为使用的一些流处理技术,包括Apache Kafka 的Streams API,仍然需要用户使用Java 或Scala 等编程语言编写代码,即使是实现最简单的任务也是如此。对编程技巧的这种苛刻要求,已经阻碍了许多企业充分利用流处理所能提供的优势。但值得庆幸的是,现在我们有了一种更简单的方法。

KSQL 简介,Apache Kafka 的数据流 SQL 引擎

KSQL 于 2017 年推出,是 Apache Kafka 的数据流 SQL 引擎。KSQL 降低了人们进入流处理的门槛。用户不必编写大量的代码,只需使用简单的 SQL 语句就可以开始处理流处理。例如:

复制代码
CREATE STREAM fraudulent_payments AS
SELECT * FROM payments-kafka-stream
WHERE fraud_probability > 0.8

就这么简单!虽然我们可能无法一眼看出,上面给出的 KSQL 流数据查询在实现上是分布式的、容错的、弹性的、可扩展的和实时的,这些特性可以满足现代企业对数据的需求。KSQL 实现了这一目标,它是建立在 Kafka 的 Streams API 之上的,充分地利用了 Kafka 在分布式流处理方面的强大技术基础。

如果我们想使用 Java 或 Scala 直接调用 Kafka 的 Stream API 实现上述 KSQL 查询,那么我们的应用代码段可能需要做如下编写。当然,这一代码段还需要编译、打包并应用部署。

复制代码
// Using Kafka’s Streams API
object FraudFilteringApplication extends App {
val builder: StreamsBuilder = new StreamsBuilder()
val fraudulentPayments: KStream[String, Payment] = builder
.stream[String, Payment]("payments-kafka-topic")
.filter((_ ,payment) => payment.fraudProbability > 0.8)
fraudulentPayments.to("fraudulent-payments-topic")
val config = new java.util.Properties
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-filtering-app")
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.start()
}

对于 Java 或 Scala 开发人员而言,Kakfa Streams API 是一个强大的软件库,它实现了将流数据处理集成到应用中。但是 KSQL 为开发人员提供了更宽广的基础,即一种仅使用 SQL 即可表达流数据处理需求的方式。

当然,读者还可以使用 KSQL 实现更多功能,不必局限于上面所展示的简单例子。KSQL 是采用 Apache 2.0 许可开源的,构建于 Kakfa 的 Streams API 之上。这意味着,KSQL 支持很大范围的流数据处理操作,包括过滤、转换、聚合、连接、加窗操作和 Sessionization(即捕获单一会话期间的所有的流事件)等。使用 KSQL,可轻松实现:

  • 驱动实时报告和仪表盘;
  • 监控基础设施和物联网设备;
  • 检测异常,并对欺诈活动报警;
  • 分析基于会话的用户活动;
  • 执行实时 ETL;
  • 以及更多操作……

下面给出几个使用 KSQL 的例子。

例子一:使用 KSQL 实现在线数据集成和扩充

企业开展的大部分数据处理,都可归为数据扩充(Data Enrichment)或数据整理(Data Wrangling),即如何从多个系统中提取、转换和连接数据,并存储到键值存储、RDBMS、搜索索引、缓存等数据服务系统中。KSQL 可与 Kafka Connect 的连接器一起使用,操作 Oracle、MySQL、Elasticsearch、HDFS 或 S3 等存储系统,实现将批量数据集成转换为实时数据集成。

下面的 KSQL 查询使用了流数据表连接,将存储在数据表中的元数据扩充到数据流中:

复制代码
CREATE STREAM vip_users_clickstream AS
SELECT user_id, user_country, web_page, action
FROM website_clickstream c
LEFT JOIN users u ON u.user_id = c.user_id
WHERE u.level = 'Platinum';

为符合 GDPR 规范,,我们需要在加载数据流到其它系统之前将其中的 PII(个人验证信息,personally identifiable information)数据过滤掉。在此,我们需要移除上例中建立的vip_users数据流中的usr_id域。具体做法是,将不将该域添加到结果数据流中,在结果数据流中只保留了user_countryweb_pageaction域:

复制代码
CREATE STREAM anonymized_vip_clickstream AS
SELECT user_country, web_page, action
FROM vip_users_clickstream;

例子二:使用 KSQL 实现实时监控和分析

尽管实时监控和实时分析是两种完全不同的用例,但是它们所需要实现的流数据处理功能是非常类似的。KSQL 可以直接对原始事件流定义一些适当的度量,无论数据流是生成自数据库更新、应用、移动设备、车辆等来源。

下例给出的查询基于在五分钟窗口内观察到车辆遥测数据中的错误数,实时计算可能出故障的车辆。该例是一类特殊的聚合操作,即窗口聚合。数据首先被分组为窗口数据(在本例的查询中,分组和加窗操作是基于输入数据中的时间戳信息),然后每个窗口做单独聚合。

复制代码
CREATE TABLE possibly_failing_vehicles AS
SELECT vehicle, COUNT(*)
FROM vehicle_monitoring_stream
WINDOW TUMBLING (SIZE 5 MINUTES)
WHERE event_type = 'ERROR'
GROUP BY vehicle
HAVING COUNT(*) >= 3;

KSQL 的另一个用法是自定义业务层面的度量,这些度量是从监控和报警中实时计算得到的。例如,展示一个 AAA 电子游戏特许经营商(“最近的游戏扩展是否增加了游戏时间?”)的并发在线玩家数量,报告电子商务网站中放弃购买的购物车数量(“我们最新的在线商店更新是否更加方便了客户结账?”)。类似,也可以使用 KSQL 为用户的业务应用定义一个用于表示是否正确的概念,进而检查该概念是否符合生产中的要求。

上面的查询例子正好也是一个有状态查询的例子。有状态的流处理可以说是流处理中最常用的功能,同时在实现与正确处理上非常具有挑战性。下面我将做详细介绍。

实现流数据处理中的记忆:有状态流处理

例子二中的查询对输入流数据执行聚合操作。聚合操作是一种有状态的操作,即在操作中需要维护和更新状态。例子二的查询在观测到新的错误前,需要记住每个时间窗口和每辆车的上一次错误计数情况,否则就无法确定查询结果是否会超出五分钟窗口期内的车辆错误阈值。分布式流处理的一个主要挑战,就是要在保证这种有状态操作可以高效且正确工作的同时,考虑到诸如机器崩溃、网络错误和大规模运行等因素。

相比之下,无状态操作更为简单。计算可以在机器间自由迁移,这样操作的代价很低,易于实现。而有状态操作要实现计算的迁移,还需要执行诸如将历史状态从故障机器移动到活动机器,并且要有效地完成,期间还可能会涉及以 GB 为单位的数据迁移。其中最重要的是,数据迁移必须正确地完成。例如,在例子二给出的 KSQL 查询中,没有人会希望仅仅因为相同的错误信息已经得到多次处理,因此就向汽车司机发出引擎即将故障的虚假警报!

为实现更快的处理和更好的容错能力,KSQL 通常会运行在多台机器、虚拟机或容器上。那么 KSQL 如何解决有状态的挑战?答案是,KSQL 建立是在 Kafka 的 Streams API 上的,这使得所有的 KSQL 查询(包括有状态查询)具有如下特征:

  • 容错:在机器出现故障时,状态和计算需要从故障机器自动迁移到活动的机器上。实现容错,一方面需要持续地对从 KSQL 到 Kafka 的状态做“流备份”,另一方面应在需要时自动地从 Kakfa 将状态恢复回 KSQL。
  • 弹性:用户可以在操作现场中随时添加并移除新机器,扩展或缩小处理规模,而不会造成数据丢失,依然给出正确的处理结果。
  • 扩展性:将处理负载和状态自动地扩展到各台机器,实现对数据的协作处理。扩展性是通过使用 Kafka 的处理协议和分区数据存储实现的。其中,处理任务根据数据的分区情况扩展到各台机器做并行处理。

由于这些属性在 KSQL 中是开箱即可用的,因此用户只需要专注于为自己的流处理需求,编写所需的 SQL 语句。出于同一原因,KSQL 非常适合构建现代部署环境,例如基于 Docker、Kubernetes 或云原生的环境。

流 - 表二元性(Stream-Table Duality)

对数据流和表提供头等支持,这是 Kafka 的一个独有特性。读者是否注意到,我们在前面的的例子中同时给出了数据流和表?例如,虽然例子二的输入是一个数据流,但是该有状态查询的结果是一个表:

复制代码
CREATE TABLE possibly_failing_vehicles AS
SELECT vehicle, COUNT(*)
FROM vehicle_monitoring_stream
WINDOW TUMBLING (SIZE 5 MINUTES)
WHERE event_type = 'ERROR'
GROUP BY vehicle
HAVING COUNT(*) >= 3;

读者可能会思考,“数据流和表两者间有何差别?”,并且更为重要的是,“这种特性如何可用于我的日常工作中?”。简而言之,该特性非常有用。表和数据流为用户提供了必要的原语,可用于对数据建立推理和建模,回答对数据的业务问题。下面给出我能想到的一些最直观的英文类比:

  • Kafka 中的数据流是世界(或业务)从一开始至今的完整历史。它表示了过去和当前。当我们从当前走向未来时,新的事件会不断地添加到世界历史中。在 Kafka 中,事件写入、存储并读取自 Kafka 主题(Topic)。由于我们无法更改过去,因此 Kafka 是一种对事件不可变的、只添加的日志记录。从分析 RDBMS 角度看,我们可以认为数据流是对“事实”(Fact)的建模。
  • Kafka 中的是世界的当前状态(更通用的表述是某一时刻的状态)。它表示现在或过去的某个时刻,是世界事件历史的一个聚合,该聚合在我们从当前走向未来时会持续改变。表通过对数据流的处理而从流中获取,更准确地说是通过聚合这些数据流。在处理中使用了 Kafka 的 Streams API 和 KSQL 等工具。从分析 RDBMS 的角度看,我们可以认为表是对“维度”(Dimension)的建模,保持了一个键的当前值。

我们将这种内在关系称为“流 - 表二元性”(Stream-Table Duality)。如果读者希望更深入了解这种数据流和表间的有意思关系,推荐大家阅读我的一篇文章“ Kafka 和流数据处理中的数据流和表”。

稍等,那么表的概念出自何处?答案是,表来自于我们数十年在构建应用和服务中成功使用的数据库。在数据库中,表是首先需要构建的结构,它是各项工作的基础。数据流实际上也存在于数据库中,表现为构建数据库的交易日志(例如, MySQL 的 binlog ,或者 Oracle 的 Redo Log )。但这对用户而言是不可见的,用户并不直接操作这些数据流。我继续使用前面的类比,一个数据库知道现在,但它不知道过去。如果用户需要过去,那么请取出备份磁带。磁带实际上可以看成是一种硬件流……

这样,Kafka 和流数据处理是数据库的完全反转。正如上文所说,我们首先要构建数据流。而表是从数据流生成的。Pat Helland 将此归纳为“所有变化均源自于不可变性”(“ Immutability Changes Everything ”),“真相是日志(数据流),数据库是日志子集的一个缓存”。Kafka 知道当前,但也知道过去。这就是为什么纽约时报将其所有已发表的文章(可回溯至19 世纪50 年代的160 年间的新闻报道)存储在Kafka 中,作为事实来源(Source of Truth)。

简而言之,数据库认为表是最重要的,数据流次之;而Kafka 认为数据流最重要,表次之。在Kafka Streams 和KSQL 中,通过提供对数据流和表的原生支持,帮助用户构建了流数据处理和数据库之间的桥梁。为使该特性更为强大,用户可以使用Kafka Connnect 将现有数据库和表实时挂接到Kafka 中。根据上面的陈述,我们完全可以给出这样一个结论,即Kafka 是一种“数据流关系”系统,而非“仅是数据流”的系统。

数据流和表的进一步阐述

出于下述两个重要原因,流- 表二元性在实践中是至关重要的。首先,企业现有数据库中可能已经存在了大量的数据,并且企业希望能将这些数据应用于一些由流数据处理驱动的用例。其次,用户一旦着手实现自己的流处理应用,他们很快就会意识到,即使并不存在一个“真实”的数据库,大多数用例实际上还是需要将数据建模为流和表。这是因为表代表“状态”。无论何时要实现任何有状态处理,包括执行聚合(例如,计算某个关键业务度量的五分钟平均值)或连接(例如,通过维度表连接事实“流”实现实时数据扩充),表都会涉及其中。

下面给出一个流和表的例子。该例子使用KSQL 实时计算用户地理位置的变更次数。例如,Strava 这样的移动应用允许用户手动签到某个位置,并自动定期发送地理位置更新。查询的输入是一个地理位置更新数据流,输出结果是一个不断更新的表。由于 COUNT()是一种聚合操作,因此查询是一个有状态操作,即为了累加当前计数,首先必须记住当前的计数值!下面给出 KSQL 查询,它每秒执行会数次地理位置更新。对于每秒数十万次乃至更多此更新,操作也是同样的。

复制代码
CREATE TABLE geo_location_checkins_per_user AS
SELECT username, COUNT(*)
FROM geo_location_updates
GROUP BY username;

在下一个例子中,我们根据订单状态计算“订单”流的每小时汇总情况。这也是一个实践中常见的用例。同样,计算的结果是一个表(‘orders_hourly_aggregates’)。一旦有新订单到达,该表就会持续更新。该查询还展示了一些可在 KSQL 中使用的标量函数。

复制代码
CREATE TABLE orders_hourly_aggregates AS
SELECT
order_status,
COUNT(*) AS order_count,
MAX(ORDER_TOTAL) AS max_order_total,
MIN(ORDER_TOTAL) AS min_order_total,
SUM(ORDER_TOTAL) AS sum_order_total,
FROM orders
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY order_status

功能齐备(Batteries Included)的流数据处理

Kafka 提供了一个功能齐备的流媒体平台,可用于构建应用和系统。无论实施简单的流数据扩充,还是实现类似于欺诈检测或 360 度用户配置文件等更为复杂的操作,我们都需要一个易于使用的流处理解决方案,这正是所有功能和核心数据结构齐备 Kafka,特别是 Kafka 包括对流和表的头等支持。如果缺乏这种支持,用户最终需要构建一些不必要的复杂架构,将流(或仅支持流的)处理技术与 Cassandra 或 MySQL 等远程数据存储结合在一起,才能启用有状态处理,并且可能还必须添加 Hadoop / HDFS 才能启用支持容错的处理。那么用户需要同时抛接多少个科技球?

总结

本文是一次对使用 KSQL(Apache Kafka 的流 SQL 引擎)进行流处理的旋风之旅。文中给出了多个具体的例子,从更高层面介绍了 KSQL 是如何解决有状态流处理的挑战,以及 Kafka 和 KSQL 是如何通过对数据流和表提供很好的支持,为搭建数据流和数据库世界之间的桥梁提供帮助。KSQL 更易于读者端到端地实现自己的用例。

如果读者对 KSQL 产生了浓厚的兴趣,我推荐如下资源:

本文作者简介

Michael Noll 是 Confluent 公司的产品经理,该公司由 Apache Kafka 的创始人创立。此前,Michael 曾任 DNS 运营商 Verisign 大数据平台的技术主管。在 Verisign,他从零开始将基于 Hadoop,Kafka 和 Storm 的基础架构发展为一个处理数 PB 数据、跨多个数据中心的生产群集,并成为当时欧洲最大的大数据基础架构之一。Michael 也是大数据社区中一位知名科技博主 。在业余时间,Michael 担任Manning 等出版公司的技术审阅者,并常常在一些国际会议上发表演讲,例如Strata、ApacheCon 和ACM SIGIR 等。Michael 具有计算机科学博士学位。

查看英文原文: Democratizing Stream Processing with Apache Kafka and KSQL

2018-06-30 14:419269
用户头像

发布了 391 篇内容, 共 136.7 次阅读, 收获喜欢 256 次。

关注

评论

发布
暂无评论
发现更多内容

6倍性能一直加速一直快,云耀X实例值得中小企业拥有

轶天下事

OPPO举办OTalk 开发者交流专场,提供Android 15多元化适配服务

科技热闻

百度发布Comate代码知识增强2.0,国内首个支持实时检索智能代码助手

不叫猫先生

人工智能 百度 AI 百度Comate

丰富企业 AI 存储选择丨焱融科技与安擎完成兼容性互认证

焱融科技

焱融科技 高性能存储 存力 AI存储 安擎

继Bakkt之后的又一全新力作,ICE推出AI高频交易平台

科技汇

欧特克工程建设峰会在京召开

E科讯

柔性算力随心配,企业一键上云更智能

平平无奇爱好科技

继Bakkt之后的又一全新力作,ICE推出AI高频交易平台

科技热闻

和鲸科技联合中软国际教育,发布 AI 数智科研联合解决方案

ModelWhale

人工智能 科学数据

Apache Doris 2.1.3 版本正式发布

SelectDB

数据库 大数据 数据湖 数据分析 物化视图

高并发UE4/UE5像素流送云推流解决方案

点量实时云渲染

ue 像素流送 像素流 像素流送技术 UE4

Koupleless 内核系列|模块化隔离与共享带来的收益与挑战

SOFAStack

开源 模块 架构治理 蚂蚁集团 单体应用架构

性能测试中常用的性能指标有哪些?请解释每个指标的含义

测试人

软件测试 性能测试

鸿蒙OS NEXT的推出不仅仅面向App端

Geek_2305a8

京东商家智能助手:Multi-Agents 在电商垂域的探索与创新

京东零售技术

人工智能 agent LLM 企业号 5 月 PK 榜

什么是网络钓鱼攻击

德迅云安全杨德俊

全新品牌升级的 Pencils Protocl,构建 LRT 赛道新范式

大瞿科技

博思白板可以画思维导图吗?boardmix常见问题解答!

彭宏豪95

效率工具 在线白板 办公软件 在线协同 在线协作

Koupleless 单进程多应用如何解决兼容问题

SOFAStack

开源 应用架构 蚂蚁集团 兼容

MYSQL造数据占用临时表空间

不在线第一只蜗牛

MySQL 数据库

渣打国际商业银行与环旭电子完成签署3.2亿美元可持续金融绩效连结贷款

财见

加速企业上云数智化创新,云耀X实例有妙招

平平无奇爱好科技

鸿蒙快速开发该如何下手?

Geek_2305a8

软件测试学习笔记丨App性能测试方案-霍格沃兹

测试人

软件测试 性能测试 测试开发

阿布扎比:自 2023 年第一季度以来,资本之都 ADGM 资产管理规模创历史新高的 211%

财见

MES系统适用于哪些行业?MES系统具体功能有哪些?

万界星空科技

工业互联网 制造业 生产管理系统 mes 万界星空科技

湖南省气象信息中心:部署运行省人工智能气象应用支撑平台

ModelWhale

大数据 气象

和鲸携手中国石油大学,助力首届青岛市公共数据创新应用大赛璀璨启程

ModelWhale

公共数据

使用Apache Kafka和KSQL实现普及化流处理_大数据_Michael Noll_InfoQ精选文章