什么是 Apache Kafka 以及为何要使用它?
现在,数据处理和消费的方式与以往的实践是不一样的。在过去,数据会存储到一个数据库中,并且会进行批处理以获取分析结果。
尽管这种方式依然占有一席之地,但是更现代化的平台能够让我们在数据进入系统的时候对其进行实时处理。
Apache Kafka(简称为 Kafka)是一个分布式的事件存储和流处理平台,能够存储、消费和处理数据流。
要理解 Kafka 是如何运行的,我们需要掌握五个基本概念:
事件(Event,或者叫做消息):事件是一个具有时间戳的键-值对,代表了存储在系统中需要进行处理的数据。从 Kafka 的角度来看,它就是一堆字节。
分区(Partition):分区是生产和消费事件的地方。在分区中,能够保证事件的顺序。
主题(Topic):一个主题是由一个或多个分区组成的。主题是开发人员使用的工作单元,能够消费或生产事件。
消费者(Consumer):消费者订阅某个主题,这样每当事件发布到该主题时,消费者都会得到通知。
生产者(Producer):生产者发布事件到主题(实际上是属于该主题的某个分区)中。
Apache Kafka 的一个重要特点是它在创建时充分考虑到了可扩展性和容错性,使其非常适合高性能的应用。我们认为,Kafka 可以取代一些传统的消息系统,比如 Java Message Service(JMS)和 Advanced Message Queuing Protocol(AMQP)。
Apache Kafka 能够与当前使用的大多数语言进行集成,但是在本文中,我们将会讨论它与 Java 的集成,具体来讲是与Quarkus Java 栈的集成。
Quarkus 是什么?
Quarkus 是一个全栈、Kubernetes 原生的 Java 框架,适用于 Java 虚拟机(JVM)和原生编译环境,专门为容器中运行的 Java 进行了优化,使其成为 Serverless、云和 Kubernetes 环境下的高效平台。
Quarkus 没有重复发明轮子,而是使用了由标准/规范支撑的知名企业级框架,并且使它们能够借助 GraalVM 编译为二进制文件。
如何在 Quarkus 中集成 Kafka?
Quarkus 使用SmallRye Reactive Messaging项目实现与 Apache Kafka 的交互。
Quarkus 入门
要开始使用 Quarkus,最快捷的方式就是通过其初始化页面添加所需的依赖。每个服务可能会有不同的依赖,我们可以在 Java 11 或 Java 17 之间选择。要实现 Quarkus 与 Kafka 的集成,我们需要添加_SmallRye Reactive Messaging - Kafka Connector_扩展。
要开发的应用
假设我们是一家影视流媒体公司,其中有个用例就是保存电影。这的确可以通过传统数据库来实现,但是考虑到要实现良好的用户体验需要实时互动,因此我们决定将它们存储在 Kafka 中。
所以,我们会有两个服务,其中一个服务会在用户停止播放电影的时候生成一个事件,另外一个服务则会消费这些事件,并以服务器事件的方式对其进行展示和流式处理。
下图展示了应用的架构:
接下来,我们使用 Quarkus 实现这些服务并阐述一些内部的细节。
电影播放的生产者(Movie Plays Producer)
每当用户停止播放电影的时候,该服务会向 Kafka _PlaytimeMovies_主题发送一个事件。该事件包含了电影的 ID 以及观看的总时间。为了便于显示,我们将会使用一个定时器自动触发模拟用户观看电影的逻辑。
当该服务启动的时候,它将会生成一些电影到 Kafka _Movies_主题中。
创建项目
导航至 Quarkus 的初始页面并选择_smallrye-reactive-messaging-kafka_以便于集成 Kafka。然后,选择_Jackson_扩展,用于实现事件在 JSON 和 Java 的对象-字节数组之间进行编排/解排。同时,取消选中生成_Started Code_的选项。
如下面的截图所示:
你也可以跳过这个手动的步骤并导航至Kafka Quarkus Generator链接,在这里,所有的内容都已经选择好了。然后,点击_Generate your application_按钮,以下载应用骨架的压缩文件。
解压文件,并在你最喜欢的 IDE 中打开项目。
开发
我们创建两个 POJO,其中一个代表Movie
,另外一个代表PlayedMovie
。
public class Movie {
public int id;
public String name;
public String director;
public String genre;
public Movie(int id, String name, String director, String genre) {
this.id = id;
this.name = name;
this.director = director;
this.genre = genre;
}
}
复制代码
Movie
包含了电影的id
、name
、director
和genre
。
public class PlayedMovie {
public int id;
public long duration;
public MoviePlayed(int id, long duration) {
this.id = id;
this.duration = duration;
}
}
复制代码
PlayedMovie
包含了id
和duration
字段,分别代表了已播放电影的标识符以及用户观看的时长。
我们还需要一个新的类MovieKafkaGenerator
,它负责将电影存储到 Kafka 主题中并模拟播放电影。
想要向主题上发布事件,我们首先需要两个类,分别是@Outgoing注解和Record类,@Outgoing
用来以通道(channel)的形式指定要将事件发送至何处,在这里我们将其配置为指向_Movies_主题,Record
代表了事件的包装器,以键/值的方式进行声明。
现在,我们创建MovieKafkaGenerator
类,它会生成电影到 Kafka _Movies_主题中。
package org.acme.movieplays;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
@ApplicationScoped
public class MovieKafkaGenerator {
private List<Movie> movies = List.of(
new Movie(1, "The Hobbit", "Peter Jackson", "Fantasy"),
new Movie(2, "Star Trek: First Contact", "Jonathan Frakes", "Space"),
new Movie(3, "Encanto", "Jared Bush", "Animation"),
new Movie(4, "Cruella", "Craig Gillespie", "Crime Comedy"),
new Movie(5, "Sing 2", "Garth Jennings", "Jukebox Musical Comedy")
);
// 填充电影到Kafka主题中
@Outgoing("movies")
public Multi<Record<Integer, Movie>> movies() {
return Multi.createFrom().items(movies.stream()
.map(m -> Record.of(m.id, m))
);
}
}
复制代码
在这个类中,有几件重要的事情需要注意:
这个类是 CDI 作用域的类(@ApplicationScoped
)
简单起见,我们将电影定义在了一个列表中
@Outgoing
注解用来设置将事件发往何处(即 movies 通道)。movies()
方法返回的元素将会自动发往定义的通道中。返回的类型可能是反应式/异步类型(在 Quarkus 中,通常会是io.smallrye.mutiny.Multi
),以包装事件的内容。在后面,我们会将该通道配置为指向一个主题。
Record
(即事件/消息)使用电影的 ID 作为 key,并使用电影对象作为值。
最后一步就是配置连接至 Kafka 实例的 Quarkus 参数。Quarkus 应用可以通过src/main/resources/
目录下的application.properties
文件进行配置。
通过如下的通用属性,我们可以很容易地配置通道和主题之间的关系:
mp.messaging.outgoing.<channel_name>.topic=<topic>
复制代码
在我们的样例中,定义如下所示:
mp.messaging.outgoing.movies.topic=movies
复制代码
你可能想知道 Kafka broker 的位置是在哪里配置的呢?对于本地开发来说,我们不需要进行配置,因为 Quarkus 为 Kafka 提供了Dev Services特性。Dev Services 会提供所需的外部依赖的实例(如数据库实例、Kafka broker、Keycloak 服务等),这些实例会在容器运行时中提供,比如 Podman 或者其他兼容 OCI 的工具。从开发者的角度来说,如果我们包括了某个扩展但是并没有对其进行配置的话,Quarkus 会自动启动服务,并配置应用来使用它。
基于此,我们在开发过程中,并不需要其他的配置参数。Quarkus 就会帮助我们实现这一点。
重要提示:要在运行样例,我们需要在本地机器上有一个正在运行的 Docker 主机。如果没有本地没有 Docker 的话,那就需要有一个部署好的 Kafka broker,我们将会在后文介绍如何在 Quarkus 中配置这个“远程”实例。
Docker 主机启动并运行起来之后,在终端中以 Quarkus dev 模式下启动应用:
./mvnw compile quarkus:dev
复制代码
终端输出如下所示:
[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-producer >--------------------
[INFO] Building movie-plays-producer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
….
2022-03-21 11:37:24,474 INFO [io.qua.sma.dep.processor] (build-8) Configuring the channel 'movies' to be managed by the connector 'smallrye-kafka'
2022-03-21 11:37:24,483 INFO [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-30) Generating Jackson serializer for type org.acme.movieplays.Movie
--
--
Checking Docker Environment 2022-03-21 11:37:25,018 INFO [org.tes.uti.ImageNameSubstitut
--
2022-03-21 11:37:28,500 INFO [io.qua.kaf.cli.dep.DevServicesKafkaProcessor] (build-22) Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=OUTSIDE://localhost:32769
2022-03-21 11:37:29,581 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 6.666s.
2022-03-21 11:37:29,582 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-21 11:37:29,582 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
复制代码
应用首先会被编译,然后自动配置 Jackson 序列化器(还记得我们在开始的时候就添加了该扩展),从而将Movie
对象序列化为字节数组以存储在 Kafka 主题中。随后,Kafka broker 会自动在localhost:32769
启动,应用会自动配置连接到此处。最后,应用启动并运行,所有的电影会被插入到 Kafka _Movies_主题中。
我们可以使用kcat工具来探查主题的内容。在终端窗口运行如下的命令,记得替换成你的 Kafka broker 地址:
kcat -b localhost:32769 -t movies -C -K:
:{"id":1,"name":"The Hobbit","director":"Peter Jackson","genre":"Fantasy"}
:{"id":2,"name":"Star Trek: First Contact","director":"Jonathan Frakes","genre":"Space"}
:{"id":3,"name":"Encanto","director":"Jared Bush","genre":"Animation"}
:{"id":4,"name":"Cruella","director":"Craig Gillespie","genre":"Crime Comedy"}
:{"id":5,"name":"Sing 2","director":"Garth Jennings","genre":"Jukebox Musical Comedy"}
% Reached end of topic movies [0] at offset 5
复制代码
停掉应用,我们接下来添加生成电影播放的内容。
打开MovieKafkaGenerator
类并添加如下代码:
private Random random = new Random();
@Inject
Logger logger;
@Outgoing("play-time-movies")
public Multi<Record<String, PlayedMovie>> generate() {
return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
.onOverflow().drop()
.map(tick -> {
Movie movie = movies.get(random.nextInt(movies.size()));
int time = random.nextInt(300);
logger.info("movie {0} played for {1} minutes", movie.name, time);
// Region作为key
return Record.of("eu", new PlayedMovie(movie.id, time));
});
}
复制代码
在这个新方法中,有几件重要的事情需要注意:
生成的事件会被发送至 play-time-movies 通道。
每秒钟会触发一个新的事件。
这里会在列表中随机选择一部电影,并分配一个随机的播放时间。
在本例中,会创建一个Record
(事件/消息),其中 key 代表了用户的区域,value 则是PlayedMovie
对象。
最后,打开application.properties
文件并配置新的通道:
mp.messaging.outgoing.play-time-movies.topic=playtimemovies
复制代码
再次启动应用,现在应用每秒钟都会生成一个新的事件。
./mvnw compile quarkus:dev
复制代码
2022-03-21 12:36:01,297 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18258: Kafka producer kafka-producer-play-time-movies, connected to Kafka brokers 'OUTSIDE://localhost:32771', is configured to write records to 'playtimemovies'
2022-03-21 12:36:01,835 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 148 minutes
2022-03-21 12:36:02,336 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 288 minutes
2022-03-21 12:36:02,836 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 176 minutes
复制代码
每当有新事件发布时,控制台都会展示日志。现在,我们使用 kcat 来探查主题的内容。
kcat -b localhost:32773 -t playtimemovies -C -K:
eu:{"id":4,"duration":88}
eu:{"id":3,"duration":291}
eu:{"id":1,"duration":228}
eu:{"id":2,"duration":165}
eu:{"id":1,"duration":170}
eu:{"id":4,"duration":75}
复制代码
电影播放的消费者(Movie Plays Consumer)
该服务负责消费来自 Kafka 主题的事件。被消费的事件是以 HTTP 服务器端事件的形式流向调用者的。事件是 playedmovie 数据,其中包含了播放的电影 ID 以及已观看的总时长。
创建项目
导航至 Quarkus 的初始页面,选择 resteasy-reactive-jackson 来实现 JAX-RS 反应式端点,借助 Jackson 实现对 Java 对象和 JSON 之间的编排/解排,并选择 smallrye-reactive-messaging-kafka 扩展实现与 Kafka 的集成。同时,取消选中 Started Code 生成选项。
同样,你可以跳过这个手动的步骤并导航至Kafka Quarkus Generator链接,在这里,所有的内容都已经选择好了。然后,点击_Generate your application_按钮,以下载应用骨架的压缩文件。
解压文件,并在你最喜欢的 IDE 中打开项目。
开发
该服务会处理PlayedMovie
事件,所以我们为该元素创建一个简单的 POJO:
public class PlayedMovie {
public int id;
public long duration;
public MoviePlayed(int id, long duration) {
this.id = id;
this.duration = duration;
}
}
复制代码
然后,创建名为 PlayedMovieResource 的新类,并创建一个 JAX-RS 反应式端点,以便于流入来自 Kafka 主题的事件。
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.mutiny.Multi;
@Path("/movies")
public class PlayedMovieResource {
@Channel("played-movies")
Multi<MoviePlayed> playedMovies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<PlayedMovie> stream() {
return playedMovives;
}
}
复制代码
这是一个很小的类,但是做了很多的事情:
使用@Path
注解,暴露 HTTP 端点至/movies
路径。
处理名为 played-movies 的通道上的事件。每当有新的事件发送至该通道(即事件发送至 Kafka 主题)时,它会自动发布到Multi
实例中。
当使用 HTTP GET 方法调用/movies
端点时,应用会在该通道中以流的方式传输接收到的事件。
最后,在application.properties
文件中对通道进行配置,在这里我们配置了通道(包括主题和偏移策略)并且将监听端口修改为 9090,这样的话就不会与监听 8080 端口的生产者服务冲突。
mp.messaging.incoming.movies-played.topic=playtimemovies
mp.messaging.incoming.movies-played.auto.offset.reset=earliest
%dev.quarkus.http.port=9090
复制代码
在终端窗口中启动并运行_movie-player-producer_服务后,我们启动_movie-player-consumer_。在一个新的终端窗口中,以 dev 模式运行服务。
./mvnw compile quarkus:dev`
复制代码
[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-consumer >--------------------
[INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
….
2022-03-21 17:59:08,079 INFO [io.qua.sma.dep.processor] (build-13) Configuring the channel 'movies-played' to be managed by the connector 'smallrye-kafka'
2022-03-21 17:59:08,092 INFO [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-33) Generating Jackson deserializer for type org.acme.movieplays.MoviePlayed
….
2022-03-21 17:59:10,617 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-movies, connected to Kafka brokers 'localhost:32771, belongs to the 'movie-plays-consumer'
….
2022-03-21 17:59:10,693 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-consumer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.114s. Listening on: http://localhost:9090
2022-03-21 17:59:10,693 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-03-21 17:59:10,694 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
复制代码
应用首先会被编译,然后自动配置 Jackson 序列化器(还记得我们在开始的时候就添加了该扩展),从而能够将存储在 Kafka 主题中的字节数组对象反序列化为 Java 对象。运行中的应用会探测到已经启动的 Kafka 集群并自动进行连接。最后,应用会在 9090 端口启动。
在新的窗口中,运行如下的命令,获取流式数据:
curl -N localhost:9090/movies
复制代码
data:{"id":4,"duration":213}
data:{"id":4,"duration":3}
data:{"id":3,"duration":96}
data:{"id":5,"duration":200}
data:{"id":2,"duration":234}
data:{"id":1,"duration":36}
data:{"id":1,"duration":162}
data:{"id":3,"duration":88}
复制代码
我们可以观察到来自 Kafka 主题的数据是如何自动进行流式处理并以 HTTP 请求的形式发出的。
上述的样例达成了两个目的,首先以Multi
的形式注入一个通道,以便于接收事件,然后将这些事件发送至带有@Incoming
注解的方法中。
停止消费者服务并添加如下的代码片段至PlayedMovieResource
类中,以消费来自 Kafka _Movie_主题的事件:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
@Inject
Logger logger;
@Incoming("movies")
public void newMovie(Movie movie) {
logger.infov("New movie: {0}", movie);
}
复制代码
在本例中,每当有新的电影发布至 movies 通道(_Movies_主题)时,newMovie()
方法就会被调用。方法的参数就是 Kafka 主题的负载。
在application.properties
文件中配置通道,使其指向_Movies_主题。
mp.messaging.incoming.movies.topic=movies
mp.messaging.incoming.movies.auto.offset.reset=earliest
复制代码
现在,再次启动_movie-plays-consumer_服务,我们会发现有些日志行打印出了电影的列表:./mvnw compile quarkus:dev
[INFO] Scanning for projects...
[INFO]
[INFO] -------------------< org.acme:movie-plays-consumer >---------------------
[INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
…
2022-03-21 17:59:12,146 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-13) SRMSG18256: Initialize record store for topic-partition 'movies-0' at position -1.
2022-03-21 17:59:12,160 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
2022-03-21 17:59:12,164 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
2022-03-21 17:59:12,165 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
2022-03-21 17:59:12,166 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
2022-03-21 17:59:12,167 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]
复制代码
外部的 Kafka Broker
你也可以使用自己的 Kafka broker,只需要在application.properties
文件中配置kafka.bootstrap.servers
属性即可。
kafka-bootstrap.servers=kafka:9092
复制代码
结论
到目前为止,连接 Quarkus 应用到 Apache Kafka 并通过主题生产和消费消息/事件是非常容易的。消费 Kafka 消息很简单,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(比如过滤或操作事件)的话,又该怎么办呢?如果我们需要对事件做一些关联,又该怎么办呢(比如,_playedmovie_事件中包含电影的id
,但是我们该如何与_Movie_主题联合起来获取电影的名字呢)?
当然,我们可以编写专门的代码来操作所有发送的数据。不过,Kafka Streams 项目能够帮助我们在事件生成的时候来消费事件流,从而进行任意的转换、流连接等操作,并且有选择性地将新的数据写回到主题中。
Kafka Streams 是一个很大的话题,它在解决实时处理问题上的各种功能令人印象深刻。我们将会用一篇专门的文章介绍 Kafka Streams 和 Quarkus,敬请关注。
作者简介:
Alex Soto 是红帽公司的开发者体验总监。他对 Java 领域、软件自动化充满热情,他相信开源软件模式。Soto 是 Manning 的《Testing Java Microservices》和 O’Reilly 的《Quarkus Cookbook》两本书的共同作者,他还是多个开源项目的贡献者。自 2017 年以来,他一直是 Java Champion,是国际演讲者和 Salle URL 大学的教师。你可以在 Twitter 上关注他(Alex Soto ⚛️),随时了解 Kubernetes 和 Java 领域的动态。
原文链接:
Getting Started to Quarkus Reactive Messaging with Apache Kafka
评论