使用 Strimzi 将 Kafka 和 Debezium 迁移到 Kubernetes

  • 2022-10-12
    北京
  • 本文字数:14536 字

    阅读完需:约 48 分钟

在本系列文章的第1部分第2部分中,我们学习了Apache Kafka、Apache Kafka Streams 和Quarkus之间的集成。我们开发了一个简单的应用程序,向 Kafka 主题生成事件,并使用Kafka Streams实时消费和处理它们。

在那个例子中,我们模拟了一家电影流媒体公司。我们将电影信息保存在一个 Kafka 主题中,并在另一个 Kafka 主题中保存用户停止观看电影时的事件,并捕获影片播放的时间。我们实时对这些事件进行后期处理,计算电影播放超过 10 分钟的次数。

下图是这个应用程序的架构。

然后,在第3部分中,我们介绍了发件箱模式和 Debezium,用于避免在不同系统需要同步相同数据时发生的双写问题。

在前面的三篇文章中,我们已经从开发人员的角度学习了所有这些技术,并最终在开发人员的本地机器上(以开发模式)部署应用程序。

在本文中,我们将探讨如何将所有东西部署到生产环境,更具体地说,部署到 Kubernetes 中。我们将学习:

  • 在 Kubernetes 中安装和管理 Apache Kafka 集群。

  • 容器化 Quarkus 应用程序。

  • 配置一个带有生产参数的 Quarkus 应用程序。

  • 将 Debezium Embedded 迁移成 Debezium Server。

Kubernetes

Kubernetes 是一个开源的容器编配器,是部署微服务的事实上的平台。这些服务既可以在裸金属环境中运行,也可以在云环境中运行。

本文使用minikube作为 Kubernetes 集群,但同样的步骤应该适用于任何其他实现。

启动集群

在终端窗口中执行以下命令,在配备了 8GB 内存和 2 个 vCPU 的 VirtualBox 机器上启动集群。

minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=8096  [strimzi] minikube v1.24.0 on Darwin 12.5  minikube 1.26.1 is available! Download it: https://github.com/kubernetes/minikube/releases/tag/v1.26.1  To disable this notice, run: 'minikube config set WantUpdateNotification false'Using the virtualbox driver based on user configuration  Starting control plane node strimzi in cluster strimzi  Creating virtualbox VM (CPUs=2, Memory=8096MB, Disk=20000MB) ...    > kubelet.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s    > kubeadm.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s    > kubectl.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s    > kubeadm: 43.74 MiB / 43.74 MiB [-------------] 100.00% 13.98 MiB p/s 3.3s    > kubectl: 44.77 MiB / 44.77 MiB [-------------] 100.00% 11.11 MiB p/s 4.2s    > kubelet: 115.30 MiB / 115.30 MiB [-----------] 100.00% 20.16 MiB p/s 5.9s    ▪ Generating certificates and keys ...    ▪ Booting up control plane ...    ▪ Configuring RBAC rules ...Using image gcr.io/k8s-minikube/storage-provisioner:v5  Verifying Kubernetes components...  Enabled addons: storage-provisioner, default-storageclass❗  /usr/local/bin/kubectl is version 1.24.0, which may have incompatibilites with Kubernetes 1.22.12.    ▪ Want kubectl v1.22.12? Try 'minikube kubectl -- get pods -A'  Done! kubectl is now configured to use "strimzi" cluster and "default" namespace by default
复制代码

在终端窗口执行下面的命令检查 Kubernetes 集群是否正常运行。

kubectl get nodesNAME      STATUS   ROLES                  AGE    VERSIONstrimzi   Ready    control-plane,master   3m4s   v1.22.12kubectl get podsNo resources found in default namespace.
复制代码

Apache Kafka

在之前的文章中,我们通过 Quarkus 的开发模式来启动运行应用程序所需的外部依赖项(Kafka 集群和 MySQL 数据库)。从开发的角度来看,开发模式非常棒,但在部署到生产环境时,你会发现这些东西管理起来更加复杂。第一个障碍可能是在 Kubernetes 中安装和配置 Kafka 集群。

你可能想知道以下这些问题的答案:

  • Kafka 组件(Kafka、Zookeeper 等)需要使用哪个容器镜像?

  • 如何在 Kubernetes 中轻松部署所有这些组件?

  • 如何在 Kubernetes 中创建用户、主题或 HA?

  • 安全性如何?你可以尝试手动完成所有这些事情,例如编写很长的 YAML 文件和使用 Kafka CI 工具配置 Kafka 组件。然而,还有另一种 Kubernetes 原生的、完全自动化和可复制的(非常适合 CI/CD)方法,就是使用 Strimzi。

Strimzi

Strimzi是一个Kubernetes Operator,通过控制器来创建、配置和保护 Kafka 集群,就像其他 Kubernetes 资源(如 Pod、Deployment、ConfigMap 等)一样。

Strimzi 项目包含三个 Operator——一个用于管理 Kafka 集群,一个用于管理主题,一个用于用户管理。

在 Kubernetes 集群中安装了 Strimzi Operator 之后,你只需要使用下面的 YAML 文件就可以启动并运行一个 Kafka 集群,其中包含了一个 Kafka 副本和三个使用临时存储(没有挂载持久卷)的 ZooKeeper 副本。

apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-clusterspec: kafka:   version: 3.2.0   replicas: 1   listeners:     - name: plain       port: 9092       type: internal       tls: false     - name: tls       port: 9093       type: internal       tls: true   config:     offsets.topic.replication.factor: 1     transaction.state.log.replication.factor: 1     transaction.state.log.min.isr: 1     default.replication.factor: 1     min.insync.replicas: 1     inter.broker.protocol.version: "3.2"   storage:     type: ephemeral zookeeper:   replicas: 3   storage:     type: ephemeral entityOperator:   topicOperator: {}   userOperator: {}
复制代码

接下来,我们将在已经启动的集群中安装 Strimzi。

安装 Strimzi

首先是创建一个命名空间来安装 Strimzi Operator。在本例中,我们使用了命名空间 kafka。在终端窗口中执行如下命令:

kubectl create namespace kafkanamespace/kafka created
复制代码

接下来,我们应用 Strimzi 安装文件,其中包括用于声明式管理 Kafka 集群、Kafka 主题和用户的 CRD(CustomerResourceDefinition)。

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
复制代码

运行下面的命令验证 Operator 是否安装正确。

kubectl get pods -n kafkaNAME                                        READY   STATUS    RESTARTS   AGEstrimzi-cluster-operator-597d67c7d6-ms987   1/1     Running   0          4m27s
复制代码

现在,我们开始创建带有 movies 主题的 Kafka 集群。我们将在这个主题中保存所有电影的信息,稍后 Kafka Streams 将消费这个主题,正如我们在本系列文章的第2部分中所看到的那样。

创建 Kafka 集群

创建一个新的文件(即 kafka.yaml)来安装一个带有一个副本的Kafka集群,不启用 TLS,作为内部Kubernetes服务

apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-clusterspec: kafka:   version: 3.2.0   replicas: 1   listeners:     - name: plain       port: 9092       type: internal       tls: false   config:     offsets.topic.replication.factor: 1     transaction.state.log.replication.factor: 1     transaction.state.log.min.isr: 1     default.replication.factor: 1     min.insync.replicas: 1     inter.broker.protocol.version: "3.2"   storage:     type: ephemeral zookeeper:   replicas: 1   storage:     type: ephemeral entityOperator:   topicOperator: {}   userOperator: {}
复制代码

然后在终端窗口中使用 kubectl 命令创建这个资源:

kubectl create -f kafka.yaml -n kafkakafka.kafka.strimzi.io/my-cluster created
复制代码

此时,Strimzi 开始在默认命名空间中安装 Kafka 集群。

现在,我们通过获取默认的名称空间 Pod 来检查集群的创建情况。

kubectl get pods -n kafkaNAME                                          READY   STATUS    my-cluster-entity-operator-755596449b-cw82g   3/3     Running   my-cluster-kafka-0                            1/1     Running my-cluster-zookeeper-0                        1/1     Running
复制代码

Kafka 集群已启动并运行。我们除了可以将 Kafka 作为 Kubernetes 资源安装之外,还可以查询和描述它。例如,在终端窗口中执行如下命令:

kubectl get kafka -n kafkaNAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGSmy-cluster   1                        1                     True    Truekubectl describe kafka my-cluster -n kafkaName:         my-clusterNamespace:    defaultLabels:       <none>Annotations:  <none>API Version:  kafka.strimzi.io/v1beta2Kind:         KafkaMetadata:  Creation Timestamp:  2022-08-09T10:57:39Z
复制代码

当然,你也可以像删除其他 Kubernetes 资源一样删除它。此外,系统还创建了 4 个 Kubernetes 服务来访问 Kafka 集群:

kubectl get services -n kafkaNAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE               143mmy-cluster-kafka-bootstrap    ClusterIP   172.30.77.150   <none>        9091/TCP,9092/TCP            21mmy-cluster-kafka-brokers      ClusterIP   None            <none>        9090/TCP,9091/TCP,9092/TCP   21mmy-cluster-zookeeper-client   ClusterIP   172.30.5.186    <none>        2181/TCP                     21mmy-cluster-zookeeper-nodes    ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   21m
复制代码

应用程序用于访问集群的服务是 my-cluster-kafka-bootstrap,它公开了 Kafka 的 9092 端口。

在进入到应用程序部分之前,我们需要使用另一个 YAML 文件来创建和配置 movies 主题。

创建 movies 主题

Strimzi 有一个用于创建和管理主题的 Operator。要创建一个新主题,我们需要创建一个KafkaTopic类型的 Kubernetes 资源文件,在 strimzi.io/cluster 中指定主题的名称和集群的名称(在我们的例子中是 my-cluster)。我们使用下面的内容创建一个名为 movies-topic.yaml 的新文件。

apiVersion: kafka.strimzi.io/v1beta2kind: KafkaTopicmetadata: name: movies labels:   strimzi.io/cluster: my-clusterspec: partitions: 1 replicas: 1 config:   retention.ms: 7200000   segment.bytes: 1073741824
复制代码

并应用这个文件:

kubectl apply -f movies-topic.yaml -n kafkakafkatopic.kafka.strimzi.io/movies create
复制代码

和其他 Kubernetes 资源一样,我们也可以查询和描述它。

kubectl get kafkatopic -n kafkaNAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READYconsumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           1                    Truemovies                                                                                             my-cluster   1            1                    Truestrimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            1                    Truestrimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            1                    True
复制代码

描述已创建的主题:

kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092Forwarding from 127.0.0.1:9092 -> 9092Forwarding from [::1]:9092 -> 9092
复制代码

我们来检查一下创建的主题是否有端口转发

在终端窗口执行如下命令:

kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092Forwarding from 127.0.0.1:9092 -> 9092Forwarding from [::1]:9092 -> 9092
复制代码

打开一个新的终端窗口,使用kcat工具列出 Kafka 集群的元素。我们可以使用 localhost 作为主机名,就像在上一步中使用端口转发技巧一样。

kcat -b localhost:9092 -LMetadata for all topics (from broker -1: localhost:9092/bootstrap): 1 brokers:  broker 0 at my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092 (controller) 4 topics:  topic "movies" with 1 partitions:    partition 0, leader 0, replicas: 0, isrs: 0
复制代码

最后,我们停止端口转发进程,对项目进行容器化,就像我们在本系列文章的第3部分中所做的那样,并进行一些相应的配置,以便连接到 Kafka 集群。

生产者 Debezium

我们在系列文章的第3部分解决了双写问题,使用 Debezium(具体来说是Debezium Embedded)修复了这个问题,具体方法是监听来自 MySQL 服务器的事务日志,并在每次插入新的电影播放信息时生成带有数据的 Kafka 事件。你可以在本地机器上运行这个示例,使用开发服务启动所需的服务(MySQL 和 Kafka),并自动配置应用程序来连接它们。

现在有点不一样了——服务必须运行在 Kubernetes 集群中,包括在前面步骤中创建的 Kafka 集群和 MySQL 数据库。要让它在 Kubernetes 中运行,需要做出三个改变。

  • 使用新的 Kafka 和 MySQL 参数(主机名、端口、用户名和密码)来配置服务。

  • 将应用程序装入容器,并推送到容器注册表。

  • 创建 Kubernetes 资源文件,用于部署服务。

配置服务

首先要配置的是 Kafka 的主机名和端口,它们指向 Strimzi 创建的 Kubernetes 服务。打开 src/main/resources/application.properties 文件并添加下面的内容:

%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
复制代码

%prod 前缀表示这个属性仅在应用程序以 prod 模式下运行时使用(而不是在 dev 或 test 模式下)。

其次时配置插入影片信息的数据库连接。在 application.properties 文件中添加下面的内容。

quarkus.hibernate-orm.database.generation=drop-and-create%prod.quarkus.datasource.username=alex%prod.quarkus.datasource.password=alex%prod.quarkus.datasource.jdbc.url=jdbc:mysql://mysql:3306/moviesdb
复制代码

稍后,我们将使用这些参数部署一个 MySQL 实例。现在,我们假设配置参数是正确的。

容器化

Quarkus 为创建容器提供了与Jib项目的集成,让容器镜像的构建和推送简单得只需要执行一个 Maven/Gradle 任务。

打开 pom.xml 文件,在 dependencies 部分添加以下依赖项:

<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-container-image-jib</artifactId></dependency>
复制代码

添加了Jib依赖项后,它将在打包时自动将应用程序装入容器。因为Jib的一些默认配置选项可能不适用于所有情况,所以你可以在 src/main/resources/application.properties 中覆盖它们。对于本例,我们将覆盖生成的容器镜像的 group 和容器注册中心的主机。

打开 application.properties 文件,并添加下面的内容:

# Substitue the value with your account namequarkus.container-image.group=lordofthejars # Defaults to Docker.io, overriden to Quay.quarkus.container-image.registry=quay.io
复制代码

你需要设置容器注册表的凭据,以便向注册表推送容器。你可以在执行构建之前运行 docker 的 login 命令。Maven 将从那里读取凭据,或者你可以使用 quarkus.container-image.username 和 quarkus.container-image.password 属性。

在项目的根目录下运行下面的命令来构建应用程序,它将构建出一个容器并将其推到指定的容器注册表中。

./mvnw clean package -DskipTests -Dquarkus.container-image.push=true[INFO] Scanning for projects...[INFO][INFO] ---------------< org.acme:movie-plays-producer-debezium >---------------[INFO] Building movie-plays-producer-debezium 1.0.0-SNAPSHOT[INFO] --------------------------------[ jar ]---------------------------------[INFO][INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ movie-plays-producer-debezium ---[INFO] Deleting /Users/asotobu/git/quarkus-integrating-kafka/strimzi/movie-plays-producer-debezium/target[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Using base image with digest: sha256:1a2fddacdcda67494168749c7ab49243d06d8fbed34abab90566d81b94f5e1a5[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Container entrypoint set to [java, -Djava.util.logging.manager=org.jboss.logmanager.LogManager, -jar, quarkus-run.jar][INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Pushed container image quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT (sha256:73dfe42d53f8d7e3c268dbebc2e5f866596de33b8fcaf82c27bdd414d28bdb8a)
复制代码

从最后一行日志可以看到,容器被创建,并使用 application.properties 中指定的账号推送到注册中心。

Kubernetes

在将容器推送到注册表之后,我们准备将服务部署到 Kubernetes 中。我们可以手动创建 Kubernetes 资源文件,但没有必要这么做,因为 Quarkus 为我们提供了一个Kubernetes扩展

打开 pom.xml 文件,并在 dependencies 部分添加下面的依赖项。

<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-kubernetes</artifactId></dependency>
复制代码

每次 Maven 打包应用程序时都会注册 Kubernetes 扩展,并生成将应用程序部署到 Kubernetes 集群的 kubernetes.yml 文件。你可以通过 application.properties 来修改生成文件的内容。例如,我们将 Kubernetes Service 设置为 LoadBalancer 而不是 ClusterIP,并将命名空间设置为 kafka。

打开 application.properties 文件并添加下面的内容。

quarkus.kubernetes.service-type=load-balancerquarkus.kubernetes.namespace=kafka
复制代码

修改好以后运行 Maven package 生成部署文件。

./mvnw clean package -DskipTests[INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------
复制代码

检查生成的文件 target/kubernetes/kubernetes.yml:

cat target/kubernetes/kubernetes.yml
复制代码

输出的内容应该类似于下面这样:

---apiVersion: v1kind: Servicemetadata: name: movie-plays-producer-debeziumspec: ports:   - name: http     port: 80     targetPort: 8080 selector:   app.kubernetes.io/name: movie-plays-producer-debezium   app.kubernetes.io/version: 1.0.0-SNAPSHOT # Type is LoadBalancer as set in the application.properties file  type: LoadBalancer---apiVersion: apps/v1kind: Deploymentmetadata: name: movie-plays-producer-debeziumspec: replicas: 1 selector:   matchLabels:     app.kubernetes.io/name: movie-plays-producer-debezium     app.kubernetes.io/version: 1.0.0-SNAPSHOT template:   metadata:   spec:     containers:       - env:           - name: KUBERNETES_NAMESPACE             valueFrom:               fieldRef:                 fieldPath: metadata.namespace         # The image is correctly set automatically         image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT         imagePullPolicy: Always         name: movie-plays-producer-debezium         ports:           - containerPort: 8080             name: http             protocol: TCP
复制代码

在本例中,配置参数是硬编码在 application.properties 中的,但你可以将它们作为环境变量传递进去。要在 Kubernetes Deployment 对象中设置环境变量,比如覆盖 Kafka 的配置,可以添加下面的行:

quarkus.kubernetes.env.vars.kafka-bootstrap-servers=my-new-cluster:9092 
复制代码

生成文件的 env 部分将包含这个新的环境变量:

containers:       - env:           - name: KAFKA_BOOTSTRAP_SERVERS             value: my-new-cluster:9092         image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
复制代码

全部放到一起

我们已经使用 Strimzi 在 Kubernetes 集群中部署了一个 Kafka 集群。我们将应用下面的文件(mysql-deployment.yaml)和 application.properties 中配置的参数部署 MySQL 实例。

apiVersion: v1kind: Servicemetadata: name: mysql labels:   app: mysqlspec: ports:   - port: 3306 selector:   app: mysql clusterIP: None---apiVersion: apps/v1kind: Deploymentmetadata: name: mysql labels:   app: mysqlspec: selector:   matchLabels:     app: mysql strategy:   type: Recreate template:   metadata:     labels:       app: mysql   spec:     containers:     - image: mysql:8.0.30       name: mysql       env:       - name: MYSQL_ROOT_PASSWORD         value: alex       - name: MYSQL_DATABASE         value: moviesdb       - name: MYSQL_USER         value: alex       - name: MYSQL_PASSWORD         value: alex       ports:       - containerPort: 3306         name: mysql
复制代码

将 MySQL 实例部署到 Kubernetes 集群:

kubectl apply -f mysql-deployment.yaml -n kafka
复制代码

最后要部署的是应用程序本身。我们有两个选择,第一个是直接应用资源:

kubectl apply -f target/kubernetes/kubernetes.yml -n kafka
复制代码

第二个是将 quarkus.kubernetes.deploy 标志设置为 true 来打包应用程序。当这个标志设置为 true 时,Maven 将:

  1. 创建应用程序 JAR 文件。

  2. 构建容器镜像。

  3. 将容器镜像推送到注册表中。

  4. 自动应用 kubernetes.yml 资源文件到已连接的 Kubernetes 集群。为了验证所有的东西都能正确地运行,我们将发送一个插入新电影信息的请求,并验证在 Kafka 主题中插入的新事件。

在终端窗口中执行以下命令获取访问服务的 IP 和端口。

获取访问服务的 IP:

minikube ip -p strimzi192.168.59.104
复制代码

获取 movie-plays-producer-debezium 的公开端口,也就是第二个端口。

kubectl get services -n kafkamovie-plays-producer-debezium   LoadBalancer   10.100.117.203   <pending>     80:30306/TCP                 67m
复制代码

运行 curl 命令,插入一条新的电影信息记录。

curl -X 'POST' \  'http://192.168.59.104:30306/movie' \  -H 'accept: application/json' \  -H 'Content-Type: application/json' \  -d '{  "name": "Minions: The Rise of Gru",  "director": "Kyle Balda",  "genre": "Animation"}'
复制代码

检查 Quarkus 日志,查看数据库运行的 SQL 语句:

kubectl get pods -n kafkaNAME                                             READY   STATUS      RESTARTS   AGEmovie-plays-producer-debezium-56f644cb87-5cchk   1/1     Running     0          6m5smy-cluster-entity-operator-755596449b-cw82g      3/3     Running     0          35hmy-cluster-kafka-0                               1/1     Running     0          35hmy-cluster-zookeeper-0                           1/1     Running     0          35h
复制代码

打印 movie-plays-producer-debezium 的日志:

kubectl logs movie-plays-producer-debezium-6b9b65bf4-9z524 -n kafka2022-08-11 07:44:25,658 INFO  [org.acm.MovieResource] (executor-thread-1) New Movie inserted Minions: The Rise of Gru:)Hibernate:    select        next_val as id_val    from        hibernate_sequence for updateHibernate:    update        hibernate_sequence    set        next_val= ?    where        next_val=?Hibernate:    insert    into        Movie        (director, genre, name, id)    values        (?, ?, ?, ?)Hibernate:    insert    into        OutboxEvent        (aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id)    values        (?, ?, ?, ?, ?, ?, ?)# Debezium reacts to the change2022-08-11 07:44:25,867 INFO  [io.deb.con.com.BaseSourceTask] (executor-thread-0) 1 records sent during previous 00:20:44.297, last recorded offset: {transaction_id=null, ts_sec=1660203865, file=binlog.000002, pos=14795, row=1, server_id=1, event=4}Movie Created and Reacting
复制代码

你还可以使用 Kafka 容器里的 Kafka-console-consumer.sh 脚本来检查 Kafka 中的内容。进入容器并运行下面的命令:

kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash./bin/kafka-console-consumer.sh --topic movies --from-beginning --bootstrap-server localhost:9092{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
复制代码

要返回本地终端窗口,请按 Ctrl+C 停止 kafka-console-consumer 进程,然后执行 exit 命令。

到目前为止,一切都很顺利。我们已经得到了与本系列文章第 3 部分中相同的应用程序,只是现在它运行在 Kubernetes 集群中。

到目前为止,我们使用的是 Debezium Embedded,但其实我们可以使用 Debezium Server。

Debezium Server

Debezium Server是一个可配置的、使用就绪的应用程序,它将事件从源数据库流到消息传递系统中,如 Kafka。它可以被注册成一个Kafka Connect组件,作为源连接器。

虽然我们不能在所有的场景中都使用 Debezium Server,但在我看来,使用这种方法有两个大的优点:

  • 你可以获得 Kafka 连接器的所有好处(容错、可扩展、可重用等)。

  • 因为它是一个外部组件,所以不需要更改应用程序代码,也不需要 Debezium Embedded 相关的代码或依赖项。因此,任何应用程序都可以在不做出修改或重新部署的情况下开始使用 Debezium。接下来,我们来看看如何从 Debezium Embedded 迁移到 Debezium Server。

移除 Debezium Embedded

首先要做的是删除 Debezium Embedded 相关的依赖项。

打开 pom.xml 文件,删除以下依赖项:

<dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-ddl-parser</artifactId></dependency><dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-embedded</artifactId></dependency><dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-connector-mysql</artifactId></dependency>
复制代码

下一步是删除所有与 Debezium Embedded 配置和监听器相关的代码。删除这些类文件——DebeziumConfiguration.java、DebeziumListener.java 和 MySqlJdbcParser.java。

因为所有与 Kafka 的交互都是通过 Kafka Connect 组件进行的,不需要 Kafka 代码,所以最后一步是从 pom.xml 中移除以下依赖项:

<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId></dependency>
复制代码

application.properties 文件中的这一行不再需要:

%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
复制代码

项目中已经没有了 Kafka 或 Debezium Embedded 依赖项。创建一个包含这些最新变更的容器镜像。

在终端窗口执行以下命令,删除之前的部署:

kubectl delete deployment movie-plays-producer-debeziumkubectl delete service movie-plays-producer-debezium
复制代码

要保留带有 Debezium Debezium 的容器镜像,请将 artifactId 更改为 movie-plays-producer-debezium-server。

然后将不带 Debezium 代码的新版本部署到 Kubernetes 集群中,如下所示:

./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
复制代码

运行以下命令验证新部署的服务:

kubectl get pods -n kafkaaNAME                                                    READY   STATUS    RESTARTS   AGEmovie-plays-producer-debezium-server-59db564b74-vhdmf   1/1     Running   0          73m
复制代码

部署 Debezium Kafka Connect

首先,部署一个 Kafka Connect 组件与所需的 MySQL 连接器插件。你可以认为它跟我们在DebeziumListener类中实现的逻辑差不多,只是被作为一个 Kafka Connect 元素,可以在项目中重用。我们必须为 Kafka Connect 和连接器插件创建一个容器镜像,因为 Debezium 没有为各种可能的 Kafka 与数据的组合提供“官方”镜像。对于本例,我们使用 Kafka 3.2.0 的 MySQL 连接器创建一个容器镜像。

本文中 MySQL 连接器的容器镜像可以在quay.io/lordofthejars/debezium-connector-mysql:1.9.4找到,如果你对它的构建过程感到好奇,可以查看位于这个GitHub存储库中的 Dockerfile 文件。

为了部署 Debezium Kafka Connect,我们将使用 Strimzi 提供的KafkaConnect,因为它简化了整个过程。在这个 Kubernetes 资源文件中,我们指定了 Kafka 版本、Kafka 集群的位置(my-cluster-kafka-bootstrap:9092)、容器镜像(quay.io/lordofthejars/debezin-connector-mysql:1.9.4),以及一些特定的配置参数。

创建一个名为 debezium-kafka-connect.yaml 的文件,内容如下:

apiVersion: kafka.strimzi.io/v1beta2kind: KafkaConnectmetadata: name: debezium-connect-cluster annotations:   strimzi.io/use-connector-resources: "true"spec: version: 3.2.0 image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4 replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9092 config:   group.id: connect-cluster   key.converter: org.apache.kafka.connect.json.JsonConverter   value.converter: org.apache.kafka.connect.json.JsonConverter   key.converter.schemas.enable: false   value.converter.schemas.enable: false   offset.storage.topic: connect-offsets   offset.storage.replication.factor: 1   config.storage.topic: connect-configs   config.storage.replication.factor: 1   status.storage.topic: connect-status   status.storage.replication.factor: 1
复制代码

然后在终端窗口中应用这个资源:

kubectl apply -f debezium-kafka-connect.yaml -n kafka
复制代码

并通过运行以下命令验证它是否被正确部署:

kubectl get pods -n kafkadebezium-connect-cluster-connect-546c8695c-lszn7        1/1     Running   0          91m
复制代码

请记住,这个过程可能需要几分钟的准备时间。

Kafka Connect 组件现在连接到了 Kafka 集群,最后一步是通过配置让它监听 MySQL 实例的数据变更。

为此,我们将使用 Strimzi 提供的 KafkaConnector。这有点类似于我们在 DebeziumConfiguration 类中所做的那样,提供 database.hostname 或 table.include.list 之类的参数。此外,我们还要将 strimzi.io/cluster 的值设置为上一个 YAML 文件中指定的 KafkaConnect 名称(debezum-connect-cluster)。

创建一个名为 debezium-kafka-connector.yaml 的文件,内容如下:

apiVersion: kafka.strimzi.io/v1beta2kind: KafkaConnectormetadata: name: debezium-connector-mysql labels:   strimzi.io/cluster: debezium-connect-clusterspec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config:   tasks.max: 1   database.hostname: mysql   database.port: 3306   database.user: root   database.password: alex   database.server.id: 184054   database.server.name: mysql   database.include.list: moviesdb   database.allowPublicKeyRetrieval: true   table.include.list: moviesdb.OutboxEvent   database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092   database.history.kafka.topic: schema-changes.movies
复制代码

通过应用资源来配置 Debezium Connector:

kubectl apply -f debezium-kafka-connector.yaml -n kafka
复制代码

为了验证一切工作正常,我们添加一条新的电影数据记录,并验证将新记录插入数据库时 Kafka 主题中会产生一个新事件。

获取新服务的端口,IP 仍然是相同的:

kubectl get services -n kafkamovie-plays-producer-debezium-server   LoadBalancer   10.100.117.203   <pending>     80:30307/TCP                 67mcurl -X 'POST' \  'http://192.168.59.104:30307/movie' \  -H 'accept: application/json' \  -H 'Content-Type: application/json' \  -d '{  "name": "Minions: The Rise of Gru",  "director": "Kyle Balda",  "genre": "Animation"}'
复制代码

使用 kafka-console-consumer.sh 脚本验证插入的数据:

kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
复制代码

然后在容器中运行脚本。注意,Debezium 连接器将事件发送到一个 Kafka 主题,名称是这样的..

,在这个示例中是 mysql.moviesdb.OutboxEvent。

./bin/kafka-console-consumer.sh --topic mysql.moviesdb.OutboxEvent --from-beginning --bootstrap-server localhost:9092{"before":null,"after":{"id":"Yxk0o5WwTvi0+nwBr2Y36wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","aggregatetype":"Movie","aggregateid":"5","type":"MovieCreated","timestamp":1660253420864918,"payload":"{\"id\":5,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1660253420000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":8788,"row":0,"thread":41,"query":null},"op":"c","ts_ms":1660253420878,"transaction":null}
复制代码

before 字段是空的,因为是插入操作,所以没有前值,但是在 after 字段中有电影信息数据。

结论

我们已经将应用程序从本地迁移到了 Kubernetes 集群中。

Strimzi 为我们提供了在 Kubernetes 中部署和管理 Apache Kafka 集群的一个关键元素。我们可以使用 Kubernetes 资源文件安装和管理集群,采用 GitOps 的方式来管理 Kafka。

Debezium Embedded 适用于一些场景,比如在检测数据变更时使用的临时逻辑。不过,在其他项目中(特别是在遗留项目或需要高可伸缩性和容错性的项目),Debezium Server 可能更合适。

有了 Strimzi、Jib 和 Kubernetes Quarkus 扩展,从本地转移到 Kubernetes 集群应该并不难。

本文的源代码可以在GitHub上找到。

作者简介

Alex Soto 是 Red Hat 的开发者体验总监。他对 Java 和软件自动化领域充满热情,并相信开源软件模型。Soto 是《Testing Java Microservices》(Manning 出版)和《Quarkus Cookbook》(O'Reilly 出版)的合著者,也是多个开源项目的贡献者。自 2017 年以来,他获得 Java Champion 的称号,也是 Salle URL 大学的国际演讲师和教师。你可以在 Twitter 上关注他(Alex Soto),继续关注 Kubernetes 和 Java 世界的动态。

原文链接

https://www.infoq.com/articles/strimzi-the-gitops-way/

相关阅读:

本系列第一部分:使用 Apache Kafka 实现 Quarkus 的反应式消息

本系列第二部分:Kafka Streams 与 Quarkus:实时处理事件

本系列第三部分:Debezium 和 Quarkus:通过 CDC 模式来避免双重写入