在本系列文章的第1部分和第2部分中,我们学习了Apache Kafka、Apache Kafka Streams 和Quarkus之间的集成。我们开发了一个简单的应用程序,向 Kafka 主题生成事件,并使用Kafka Streams实时消费和处理它们。
在那个例子中,我们模拟了一家电影流媒体公司。我们将电影信息保存在一个 Kafka 主题中,并在另一个 Kafka 主题中保存用户停止观看电影时的事件,并捕获影片播放的时间。我们实时对这些事件进行后期处理,计算电影播放超过 10 分钟的次数。
下图是这个应用程序的架构。
然后,在第3部分中,我们介绍了发件箱模式和 Debezium,用于避免在不同系统需要同步相同数据时发生的双写问题。
在前面的三篇文章中,我们已经从开发人员的角度学习了所有这些技术,并最终在开发人员的本地机器上(以开发模式)部署应用程序。
在本文中,我们将探讨如何将所有东西部署到生产环境,更具体地说,部署到 Kubernetes 中。我们将学习:
在 Kubernetes 中安装和管理 Apache Kafka 集群。
容器化 Quarkus 应用程序。
配置一个带有生产参数的 Quarkus 应用程序。
将 Debezium Embedded 迁移成 Debezium Server。
Kubernetes
Kubernetes 是一个开源的容器编配器,是部署微服务的事实上的平台。这些服务既可以在裸金属环境中运行,也可以在云环境中运行。
本文使用minikube作为 Kubernetes 集群,但同样的步骤应该适用于任何其他实现。
启动集群
在终端窗口中执行以下命令,在配备了 8GB 内存和 2 个 vCPU 的 VirtualBox 机器上启动集群。
minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=8096
[strimzi] minikube v1.24.0 on Darwin 12.5
minikube 1.26.1 is available! Download it: https://github.com/kubernetes/minikube/releases/tag/v1.26.1
To disable this notice, run: 'minikube config set WantUpdateNotification false'
✨ Using the virtualbox driver based on user configuration
Starting control plane node strimzi in cluster strimzi
Creating virtualbox VM (CPUs=2, Memory=8096MB, Disk=20000MB) ...
> kubelet.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
> kubeadm.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
> kubectl.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s
> kubeadm: 43.74 MiB / 43.74 MiB [-------------] 100.00% 13.98 MiB p/s 3.3s
> kubectl: 44.77 MiB / 44.77 MiB [-------------] 100.00% 11.11 MiB p/s 4.2s
> kubelet: 115.30 MiB / 115.30 MiB [-----------] 100.00% 20.16 MiB p/s 5.9s
▪ Generating certificates and keys ...
▪ Booting up control plane ...
▪ Configuring RBAC rules ...
▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
Verifying Kubernetes components...
Enabled addons: storage-provisioner, default-storageclass
❗ /usr/local/bin/kubectl is version 1.24.0, which may have incompatibilites with Kubernetes 1.22.12.
▪ Want kubectl v1.22.12? Try 'minikube kubectl -- get pods -A'
Done! kubectl is now configured to use "strimzi" cluster and "default" namespace by default
复制代码
在终端窗口执行下面的命令检查 Kubernetes 集群是否正常运行。
kubectl get nodes
NAME STATUS ROLES AGE VERSION
strimzi Ready control-plane,master 3m4s v1.22.12
kubectl get pods
No resources found in default namespace.
复制代码
Apache Kafka
在之前的文章中,我们通过 Quarkus 的开发模式来启动运行应用程序所需的外部依赖项(Kafka 集群和 MySQL 数据库)。从开发的角度来看,开发模式非常棒,但在部署到生产环境时,你会发现这些东西管理起来更加复杂。第一个障碍可能是在 Kubernetes 中安装和配置 Kafka 集群。
你可能想知道以下这些问题的答案:
Kafka 组件(Kafka、Zookeeper 等)需要使用哪个容器镜像?
如何在 Kubernetes 中轻松部署所有这些组件?
如何在 Kubernetes 中创建用户、主题或 HA?
安全性如何?你可以尝试手动完成所有这些事情,例如编写很长的 YAML 文件和使用 Kafka CI 工具配置 Kafka 组件。然而,还有另一种 Kubernetes 原生的、完全自动化和可复制的(非常适合 CI/CD)方法,就是使用 Strimzi。
Strimzi
Strimzi是一个Kubernetes Operator,通过控制器来创建、配置和保护 Kafka 集群,就像其他 Kubernetes 资源(如 Pod、Deployment、ConfigMap 等)一样。
Strimzi 项目包含三个 Operator——一个用于管理 Kafka 集群,一个用于管理主题,一个用于用户管理。
在 Kubernetes 集群中安装了 Strimzi Operator 之后,你只需要使用下面的 YAML 文件就可以启动并运行一个 Kafka 集群,其中包含了一个 Kafka 副本和三个使用临时存储(没有挂载持久卷)的 ZooKeeper 副本。
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 3.2.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.2"
storage:
type: ephemeral
zookeeper:
replicas: 3
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
复制代码
接下来,我们将在已经启动的集群中安装 Strimzi。
安装 Strimzi
首先是创建一个命名空间来安装 Strimzi Operator。在本例中,我们使用了命名空间 kafka。在终端窗口中执行如下命令:
kubectl create namespace kafka
namespace/kafka created
复制代码
接下来,我们应用 Strimzi 安装文件,其中包括用于声明式管理 Kafka 集群、Kafka 主题和用户的 CRD(CustomerResourceDefinition)。
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
复制代码
运行下面的命令验证 Operator 是否安装正确。
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-597d67c7d6-ms987 1/1 Running 0 4m27s
复制代码
现在,我们开始创建带有 movies 主题的 Kafka 集群。我们将在这个主题中保存所有电影的信息,稍后 Kafka Streams 将消费这个主题,正如我们在本系列文章的第2部分中所看到的那样。
创建 Kafka 集群
创建一个新的文件(即 kafka.yaml)来安装一个带有一个副本的Kafka集群,不启用 TLS,作为内部Kubernetes服务。
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 3.2.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.2"
storage:
type: ephemeral
zookeeper:
replicas: 1
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
复制代码
然后在终端窗口中使用 kubectl 命令创建这个资源:
kubectl create -f kafka.yaml -n kafka
kafka.kafka.strimzi.io/my-cluster created
复制代码
此时,Strimzi 开始在默认命名空间中安装 Kafka 集群。
现在,我们通过获取默认的名称空间 Pod 来检查集群的创建情况。
kubectl get pods -n kafka
NAME READY STATUS
my-cluster-entity-operator-755596449b-cw82g 3/3 Running
my-cluster-kafka-0 1/1 Running
my-cluster-zookeeper-0 1/1 Running
复制代码
Kafka 集群已启动并运行。我们除了可以将 Kafka 作为 Kubernetes 资源安装之外,还可以查询和描述它。例如,在终端窗口中执行如下命令:
kubectl get kafka -n kafka
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGS
my-cluster 1 1 True True
kubectl describe kafka my-cluster -n kafka
Name: my-cluster
Namespace: default
Labels: <none>
Annotations: <none>
API Version: kafka.strimzi.io/v1beta2
Kind: Kafka
Metadata:
Creation Timestamp: 2022-08-09T10:57:39Z
…
复制代码
当然,你也可以像删除其他 Kubernetes 资源一样删除它。此外,系统还创建了 4 个 Kubernetes 服务来访问 Kafka 集群:
kubectl get services -n kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
143m
my-cluster-kafka-bootstrap ClusterIP 172.30.77.150 <none> 9091/TCP,9092/TCP 21m
my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP 21m
my-cluster-zookeeper-client ClusterIP 172.30.5.186 <none> 2181/TCP 21m
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 21m
复制代码
应用程序用于访问集群的服务是 my-cluster-kafka-bootstrap,它公开了 Kafka 的 9092 端口。
在进入到应用程序部分之前,我们需要使用另一个 YAML 文件来创建和配置 movies 主题。
创建 movies 主题
Strimzi 有一个用于创建和管理主题的 Operator。要创建一个新主题,我们需要创建一个KafkaTopic类型的 Kubernetes 资源文件,在 strimzi.io/cluster 中指定主题的名称和集群的名称(在我们的例子中是 my-cluster)。我们使用下面的内容创建一个名为 movies-topic.yaml 的新文件。
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: movies
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 1
replicas: 1
config:
retention.ms: 7200000
segment.bytes: 1073741824
复制代码
并应用这个文件:
kubectl apply -f movies-topic.yaml -n kafka
kafkatopic.kafka.strimzi.io/movies create
复制代码
和其他 Kubernetes 资源一样,我们也可以查询和描述它。
kubectl get kafkatopic -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 True
movies my-cluster 1 1 True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster 1 1 True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1 1 True
复制代码
描述已创建的主题:
kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092
Forwarding from 127.0.0.1:9092 -> 9092
Forwarding from [::1]:9092 -> 9092
复制代码
我们来检查一下创建的主题是否有端口转发。
在终端窗口执行如下命令:
kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092
Forwarding from 127.0.0.1:9092 -> 9092
Forwarding from [::1]:9092 -> 9092
复制代码
打开一个新的终端窗口,使用kcat工具列出 Kafka 集群的元素。我们可以使用 localhost 作为主机名,就像在上一步中使用端口转发技巧一样。
kcat -b localhost:9092 -L
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
1 brokers:
broker 0 at my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092 (controller)
4 topics:
topic "movies" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
复制代码
最后,我们停止端口转发进程,对项目进行容器化,就像我们在本系列文章的第3部分中所做的那样,并进行一些相应的配置,以便连接到 Kafka 集群。
生产者 Debezium
我们在系列文章的第3部分解决了双写问题,使用 Debezium(具体来说是Debezium Embedded)修复了这个问题,具体方法是监听来自 MySQL 服务器的事务日志,并在每次插入新的电影播放信息时生成带有数据的 Kafka 事件。你可以在本地机器上运行这个示例,使用开发服务启动所需的服务(MySQL 和 Kafka),并自动配置应用程序来连接它们。
现在有点不一样了——服务必须运行在 Kubernetes 集群中,包括在前面步骤中创建的 Kafka 集群和 MySQL 数据库。要让它在 Kubernetes 中运行,需要做出三个改变。
配置服务
首先要配置的是 Kafka 的主机名和端口,它们指向 Strimzi 创建的 Kubernetes 服务。打开 src/main/resources/application.properties 文件并添加下面的内容:
%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
复制代码
%prod 前缀表示这个属性仅在应用程序以 prod 模式下运行时使用(而不是在 dev 或 test 模式下)。
其次时配置插入影片信息的数据库连接。在 application.properties 文件中添加下面的内容。
quarkus.hibernate-orm.database.generation=drop-and-create
%prod.quarkus.datasource.username=alex
%prod.quarkus.datasource.password=alex
%prod.quarkus.datasource.jdbc.url=jdbc:mysql://mysql:3306/moviesdb
复制代码
稍后,我们将使用这些参数部署一个 MySQL 实例。现在,我们假设配置参数是正确的。
容器化
Quarkus 为创建容器提供了与Jib项目的集成,让容器镜像的构建和推送简单得只需要执行一个 Maven/Gradle 任务。
打开 pom.xml 文件,在 dependencies 部分添加以下依赖项:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image-jib</artifactId>
</dependency>
复制代码
添加了Jib依赖项后,它将在打包时自动将应用程序装入容器。因为Jib的一些默认配置选项可能不适用于所有情况,所以你可以在 src/main/resources/application.properties 中覆盖它们。对于本例,我们将覆盖生成的容器镜像的 group 和容器注册中心的主机。
打开 application.properties 文件,并添加下面的内容:
# Substitue the value with your account name
quarkus.container-image.group=lordofthejars
# Defaults to Docker.io, overriden to Quay.
quarkus.container-image.registry=quay.io
复制代码
你需要设置容器注册表的凭据,以便向注册表推送容器。你可以在执行构建之前运行 docker 的 login 命令。Maven 将从那里读取凭据,或者你可以使用 quarkus.container-image.username 和 quarkus.container-image.password 属性。
在项目的根目录下运行下面的命令来构建应用程序,它将构建出一个容器并将其推到指定的容器注册表中。
./mvnw clean package -DskipTests -Dquarkus.container-image.push=true
[INFO] Scanning for projects...
[INFO]
[INFO] ---------------< org.acme:movie-plays-producer-debezium >---------------
[INFO] Building movie-plays-producer-debezium 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ movie-plays-producer-debezium ---
[INFO] Deleting /Users/asotobu/git/quarkus-integrating-kafka/strimzi/movie-plays-producer-debezium/target
…
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Using base image with digest: sha256:1a2fddacdcda67494168749c7ab49243d06d8fbed34abab90566d81b94f5e1a5
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Container entrypoint set to [java, -Djava.util.logging.manager=org.jboss.logmanager.LogManager, -jar, quarkus-run.jar]
[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Pushed container image quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT (sha256:73dfe42d53f8d7e3c268dbebc2e5f866596de33b8fcaf82c27bdd414d28bdb8a)
复制代码
从最后一行日志可以看到,容器被创建,并使用 application.properties 中指定的账号推送到注册中心。
Kubernetes
在将容器推送到注册表之后,我们准备将服务部署到 Kubernetes 中。我们可以手动创建 Kubernetes 资源文件,但没有必要这么做,因为 Quarkus 为我们提供了一个Kubernetes扩展。
打开 pom.xml 文件,并在 dependencies 部分添加下面的依赖项。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes</artifactId>
</dependency>
复制代码
每次 Maven 打包应用程序时都会注册 Kubernetes 扩展,并生成将应用程序部署到 Kubernetes 集群的 kubernetes.yml 文件。你可以通过 application.properties 来修改生成文件的内容。例如,我们将 Kubernetes Service 设置为 LoadBalancer 而不是 ClusterIP,并将命名空间设置为 kafka。
打开 application.properties 文件并添加下面的内容。
quarkus.kubernetes.service-type=load-balancer
quarkus.kubernetes.namespace=kafka
复制代码
修改好以后运行 Maven package 生成部署文件。
./mvnw clean package -DskipTests
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
复制代码
检查生成的文件 target/kubernetes/kubernetes.yml:
cat target/kubernetes/kubernetes.yml
复制代码
输出的内容应该类似于下面这样:
---
apiVersion: v1
kind: Service
metadata:
…
name: movie-plays-producer-debezium
spec:
ports:
- name: http
port: 80
targetPort: 8080
selector:
app.kubernetes.io/name: movie-plays-producer-debezium
app.kubernetes.io/version: 1.0.0-SNAPSHOT
# Type is LoadBalancer as set in the application.properties file
type: LoadBalancer
---
apiVersion: apps/v1
kind: Deployment
metadata:
…
name: movie-plays-producer-debezium
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: movie-plays-producer-debezium
app.kubernetes.io/version: 1.0.0-SNAPSHOT
template:
metadata:
…
spec:
containers:
- env:
- name: KUBERNETES_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
# The image is correctly set automatically
image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
imagePullPolicy: Always
name: movie-plays-producer-debezium
ports:
- containerPort: 8080
name: http
protocol: TCP
复制代码
在本例中,配置参数是硬编码在 application.properties 中的,但你可以将它们作为环境变量传递进去。要在 Kubernetes Deployment 对象中设置环境变量,比如覆盖 Kafka 的配置,可以添加下面的行:
quarkus.kubernetes.env.vars.kafka-bootstrap-servers=my-new-cluster:9092
复制代码
生成文件的 env 部分将包含这个新的环境变量:
containers:
- env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: my-new-cluster:9092
image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
复制代码
全部放到一起
我们已经使用 Strimzi 在 Kubernetes 集群中部署了一个 Kafka 集群。我们将应用下面的文件(mysql-deployment.yaml)和 application.properties 中配置的参数部署 MySQL 实例。
apiVersion: v1
kind: Service
metadata:
name: mysql
labels:
app: mysql
spec:
ports:
- port: 3306
selector:
app: mysql
clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
labels:
app: mysql
spec:
selector:
matchLabels:
app: mysql
strategy:
type: Recreate
template:
metadata:
labels:
app: mysql
spec:
containers:
- image: mysql:8.0.30
name: mysql
env:
- name: MYSQL_ROOT_PASSWORD
value: alex
- name: MYSQL_DATABASE
value: moviesdb
- name: MYSQL_USER
value: alex
- name: MYSQL_PASSWORD
value: alex
ports:
- containerPort: 3306
name: mysql
复制代码
将 MySQL 实例部署到 Kubernetes 集群:
kubectl apply -f mysql-deployment.yaml -n kafka
复制代码
最后要部署的是应用程序本身。我们有两个选择,第一个是直接应用资源:
kubectl apply -f target/kubernetes/kubernetes.yml -n kafka
复制代码
第二个是将 quarkus.kubernetes.deploy 标志设置为 true 来打包应用程序。当这个标志设置为 true 时,Maven 将:
创建应用程序 JAR 文件。
构建容器镜像。
将容器镜像推送到注册表中。
自动应用 kubernetes.yml 资源文件到已连接的 Kubernetes 集群。为了验证所有的东西都能正确地运行,我们将发送一个插入新电影信息的请求,并验证在 Kafka 主题中插入的新事件。
在终端窗口中执行以下命令获取访问服务的 IP 和端口。
获取访问服务的 IP:
minikube ip -p strimzi
192.168.59.104
复制代码
获取 movie-plays-producer-debezium 的公开端口,也就是第二个端口。
kubectl get services -n kafka
movie-plays-producer-debezium LoadBalancer 10.100.117.203 <pending> 80:30306/TCP 67m
复制代码
运行 curl 命令,插入一条新的电影信息记录。
curl -X 'POST' \
'http://192.168.59.104:30306/movie' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "Minions: The Rise of Gru",
"director": "Kyle Balda",
"genre": "Animation"
}'
复制代码
检查 Quarkus 日志,查看数据库运行的 SQL 语句:
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
movie-plays-producer-debezium-56f644cb87-5cchk 1/1 Running 0 6m5s
my-cluster-entity-operator-755596449b-cw82g 3/3 Running 0 35h
my-cluster-kafka-0 1/1 Running 0 35h
my-cluster-zookeeper-0 1/1 Running 0 35h
复制代码
打印 movie-plays-producer-debezium 的日志:
kubectl logs movie-plays-producer-debezium-6b9b65bf4-9z524 -n kafka
2022-08-11 07:44:25,658 INFO [org.acm.MovieResource] (executor-thread-1) New Movie inserted Minions: The Rise of Gru
:)
Hibernate:
select
next_val as id_val
from
hibernate_sequence for update
Hibernate:
update
hibernate_sequence
set
next_val= ?
where
next_val=?
Hibernate:
insert
into
Movie
(director, genre, name, id)
values
(?, ?, ?, ?)
Hibernate:
insert
into
OutboxEvent
(aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id)
values
(?, ?, ?, ?, ?, ?, ?)
# Debezium reacts to the change
2022-08-11 07:44:25,867 INFO [io.deb.con.com.BaseSourceTask] (executor-thread-0) 1 records sent during previous 00:20:44.297, last recorded offset: {transaction_id=null, ts_sec=1660203865, file=binlog.000002, pos=14795, row=1, server_id=1, event=4}
Movie Created and Reacting
复制代码
你还可以使用 Kafka 容器里的 Kafka-console-consumer.sh 脚本来检查 Kafka 中的内容。进入容器并运行下面的命令:
kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
./bin/kafka-console-consumer.sh --topic movies --from-beginning --bootstrap-server localhost:9092
{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
复制代码
要返回本地终端窗口,请按 Ctrl+C 停止 kafka-console-consumer 进程,然后执行 exit 命令。
到目前为止,一切都很顺利。我们已经得到了与本系列文章第 3 部分中相同的应用程序,只是现在它运行在 Kubernetes 集群中。
到目前为止,我们使用的是 Debezium Embedded,但其实我们可以使用 Debezium Server。
Debezium Server
Debezium Server是一个可配置的、使用就绪的应用程序,它将事件从源数据库流到消息传递系统中,如 Kafka。它可以被注册成一个Kafka Connect组件,作为源连接器。
虽然我们不能在所有的场景中都使用 Debezium Server,但在我看来,使用这种方法有两个大的优点:
移除 Debezium Embedded
首先要做的是删除 Debezium Embedded 相关的依赖项。
打开 pom.xml 文件,删除以下依赖项:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-ddl-parser</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
复制代码
下一步是删除所有与 Debezium Embedded 配置和监听器相关的代码。删除这些类文件——DebeziumConfiguration.java、DebeziumListener.java 和 MySqlJdbcParser.java。
因为所有与 Kafka 的交互都是通过 Kafka Connect 组件进行的,不需要 Kafka 代码,所以最后一步是从 pom.xml 中移除以下依赖项:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
复制代码
application.properties 文件中的这一行不再需要:
%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
复制代码
项目中已经没有了 Kafka 或 Debezium Embedded 依赖项。创建一个包含这些最新变更的容器镜像。
在终端窗口执行以下命令,删除之前的部署:
kubectl delete deployment movie-plays-producer-debezium
kubectl delete service movie-plays-producer-debezium
复制代码
要保留带有 Debezium Debezium 的容器镜像,请将 artifactId 更改为 movie-plays-producer-debezium-server。
然后将不带 Debezium 代码的新版本部署到 Kubernetes 集群中,如下所示:
./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
复制代码
运行以下命令验证新部署的服务:
kubectl get pods -n kafkaa
NAME READY STATUS RESTARTS AGE
movie-plays-producer-debezium-server-59db564b74-vhdmf 1/1 Running 0 73m
复制代码
部署 Debezium Kafka Connect
首先,部署一个 Kafka Connect 组件与所需的 MySQL 连接器插件。你可以认为它跟我们在DebeziumListener类中实现的逻辑差不多,只是被作为一个 Kafka Connect 元素,可以在项目中重用。我们必须为 Kafka Connect 和连接器插件创建一个容器镜像,因为 Debezium 没有为各种可能的 Kafka 与数据的组合提供“官方”镜像。对于本例,我们使用 Kafka 3.2.0 的 MySQL 连接器创建一个容器镜像。
本文中 MySQL 连接器的容器镜像可以在quay.io/lordofthejars/debezium-connector-mysql:1.9.4找到,如果你对它的构建过程感到好奇,可以查看位于这个GitHub存储库中的 Dockerfile 文件。
为了部署 Debezium Kafka Connect,我们将使用 Strimzi 提供的KafkaConnect,因为它简化了整个过程。在这个 Kubernetes 资源文件中,我们指定了 Kafka 版本、Kafka 集群的位置(my-cluster-kafka-bootstrap:9092)、容器镜像(quay.io/lordofthejars/debezin-connector-mysql:1.9.4),以及一些特定的配置参数。
创建一个名为 debezium-kafka-connect.yaml 的文件,内容如下:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.2.0
image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
group.id: connect-cluster
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
offset.storage.topic: connect-offsets
offset.storage.replication.factor: 1
config.storage.topic: connect-configs
config.storage.replication.factor: 1
status.storage.topic: connect-status
status.storage.replication.factor: 1
复制代码
然后在终端窗口中应用这个资源:
kubectl apply -f debezium-kafka-connect.yaml -n kafka
复制代码
并通过运行以下命令验证它是否被正确部署:
kubectl get pods -n kafka
debezium-connect-cluster-connect-546c8695c-lszn7 1/1 Running 0 91m
复制代码
请记住,这个过程可能需要几分钟的准备时间。
Kafka Connect 组件现在连接到了 Kafka 集群,最后一步是通过配置让它监听 MySQL 实例的数据变更。
为此,我们将使用 Strimzi 提供的 KafkaConnector。这有点类似于我们在 DebeziumConfiguration 类中所做的那样,提供 database.hostname 或 table.include.list 之类的参数。此外,我们还要将 strimzi.io/cluster 的值设置为上一个 YAML 文件中指定的 KafkaConnect 名称(debezum-connect-cluster)。
创建一个名为 debezium-kafka-connector.yaml 的文件,内容如下:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: debezium-connector-mysql
labels:
strimzi.io/cluster: debezium-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
tasks.max: 1
database.hostname: mysql
database.port: 3306
database.user: root
database.password: alex
database.server.id: 184054
database.server.name: mysql
database.include.list: moviesdb
database.allowPublicKeyRetrieval: true
table.include.list: moviesdb.OutboxEvent
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: schema-changes.movies
复制代码
通过应用资源来配置 Debezium Connector:
kubectl apply -f debezium-kafka-connector.yaml -n kafka
复制代码
为了验证一切工作正常,我们添加一条新的电影数据记录,并验证将新记录插入数据库时 Kafka 主题中会产生一个新事件。
获取新服务的端口,IP 仍然是相同的:
kubectl get services -n kafka
movie-plays-producer-debezium-server LoadBalancer 10.100.117.203 <pending> 80:30307/TCP 67m
curl -X 'POST' \
'http://192.168.59.104:30307/movie' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "Minions: The Rise of Gru",
"director": "Kyle Balda",
"genre": "Animation"
}'
复制代码
使用 kafka-console-consumer.sh 脚本验证插入的数据:
kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
复制代码
然后在容器中运行脚本。注意,Debezium 连接器将事件发送到一个 Kafka 主题,名称是这样的<database.server.name>.<database.inlude.list>.<table>,在这个示例中是 mysql.moviesdb.OutboxEvent。
./bin/kafka-console-consumer.sh --topic mysql.moviesdb.OutboxEvent --from-beginning --bootstrap-server localhost:9092
{"before":null,"after":{"id":"Yxk0o5WwTvi0+nwBr2Y36wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","aggregatetype":"Movie","aggregateid":"5","type":"MovieCreated","timestamp":1660253420864918,"payload":"{\"id\":5,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1660253420000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":8788,"row":0,"thread":41,"query":null},"op":"c","ts_ms":1660253420878,"transaction":null}
复制代码
before 字段是空的,因为是插入操作,所以没有前值,但是在 after 字段中有电影信息数据。
结论
我们已经将应用程序从本地迁移到了 Kubernetes 集群中。
Strimzi 为我们提供了在 Kubernetes 中部署和管理 Apache Kafka 集群的一个关键元素。我们可以使用 Kubernetes 资源文件安装和管理集群,采用 GitOps 的方式来管理 Kafka。
Debezium Embedded 适用于一些场景,比如在检测数据变更时使用的临时逻辑。不过,在其他项目中(特别是在遗留项目或需要高可伸缩性和容错性的项目),Debezium Server 可能更合适。
有了 Strimzi、Jib 和 Kubernetes Quarkus 扩展,从本地转移到 Kubernetes 集群应该并不难。
本文的源代码可以在GitHub上找到。
作者简介
Alex Soto 是 Red Hat 的开发者体验总监。他对 Java 和软件自动化领域充满热情,并相信开源软件模型。Soto 是《Testing Java Microservices》(Manning 出版)和《Quarkus Cookbook》(O'Reilly 出版)的合著者,也是多个开源项目的贡献者。自 2017 年以来,他获得 Java Champion 的称号,也是 Salle URL 大学的国际演讲师和教师。你可以在 Twitter 上关注他(Alex Soto),继续关注 Kubernetes 和 Java 世界的动态。
原文链接:
https://www.infoq.com/articles/strimzi-the-gitops-way/
相关阅读:
本系列第一部分:使用 Apache Kafka 实现 Quarkus 的反应式消息
本系列第二部分:Kafka Streams 与 Quarkus:实时处理事件
本系列第三部分:Debezium 和 Quarkus:通过 CDC 模式来避免双重写入
评论