2025 年技术指引:让真实案例和经验为开发者开路 了解详情
写点什么

使用 Strimzi 将 Kafka 和 Debezium 迁移到 Kubernetes

  • 2022-10-12
    北京
  • 本文字数:14536 字

    阅读完需:约 48 分钟

使用Strimzi将Kafka和Debezium迁移到Kubernetes

在本系列文章的第1部分第2部分中,我们学习了Apache Kafka、Apache Kafka Streams 和Quarkus之间的集成。我们开发了一个简单的应用程序,向 Kafka 主题生成事件,并使用Kafka Streams实时消费和处理它们。


在那个例子中,我们模拟了一家电影流媒体公司。我们将电影信息保存在一个 Kafka 主题中,并在另一个 Kafka 主题中保存用户停止观看电影时的事件,并捕获影片播放的时间。我们实时对这些事件进行后期处理,计算电影播放超过 10 分钟的次数。


下图是这个应用程序的架构。


然后,在第3部分中,我们介绍了发件箱模式和 Debezium,用于避免在不同系统需要同步相同数据时发生的双写问题。


在前面的三篇文章中,我们已经从开发人员的角度学习了所有这些技术,并最终在开发人员的本地机器上(以开发模式)部署应用程序。


在本文中,我们将探讨如何将所有东西部署到生产环境,更具体地说,部署到 Kubernetes 中。我们将学习:


  • 在 Kubernetes 中安装和管理 Apache Kafka 集群。

  • 容器化 Quarkus 应用程序。

  • 配置一个带有生产参数的 Quarkus 应用程序。

  • 将 Debezium Embedded 迁移成 Debezium Server。

Kubernetes


Kubernetes 是一个开源的容器编配器,是部署微服务的事实上的平台。这些服务既可以在裸金属环境中运行,也可以在云环境中运行。


本文使用minikube作为 Kubernetes 集群,但同样的步骤应该适用于任何其他实现。

启动集群

在终端窗口中执行以下命令,在配备了 8GB 内存和 2 个 vCPU 的 VirtualBox 机器上启动集群。


minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=8096
[strimzi] minikube v1.24.0 on Darwin 12.5 minikube 1.26.1 is available! Download it: https://github.com/kubernetes/minikube/releases/tag/v1.26.1 To disable this notice, run: 'minikube config set WantUpdateNotification false'
✨ Using the virtualbox driver based on user configuration Starting control plane node strimzi in cluster strimzi Creating virtualbox VM (CPUs=2, Memory=8096MB, Disk=20000MB) ... > kubelet.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s > kubeadm.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s > kubectl.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s > kubeadm: 43.74 MiB / 43.74 MiB [-------------] 100.00% 13.98 MiB p/s 3.3s > kubectl: 44.77 MiB / 44.77 MiB [-------------] 100.00% 11.11 MiB p/s 4.2s > kubelet: 115.30 MiB / 115.30 MiB [-----------] 100.00% 20.16 MiB p/s 5.9s
▪ Generating certificates and keys ... ▪ Booting up control plane ... ▪ Configuring RBAC rules ... ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5 Verifying Kubernetes components... Enabled addons: storage-provisioner, default-storageclass
❗ /usr/local/bin/kubectl is version 1.24.0, which may have incompatibilites with Kubernetes 1.22.12. ▪ Want kubectl v1.22.12? Try 'minikube kubectl -- get pods -A' Done! kubectl is now configured to use "strimzi" cluster and "default" namespace by default
复制代码


在终端窗口执行下面的命令检查 Kubernetes 集群是否正常运行。


kubectl get nodes
NAME STATUS ROLES AGE VERSIONstrimzi Ready control-plane,master 3m4s v1.22.12
kubectl get podsNo resources found in default namespace.
复制代码

Apache Kafka


在之前的文章中,我们通过 Quarkus 的开发模式来启动运行应用程序所需的外部依赖项(Kafka 集群和 MySQL 数据库)。从开发的角度来看,开发模式非常棒,但在部署到生产环境时,你会发现这些东西管理起来更加复杂。第一个障碍可能是在 Kubernetes 中安装和配置 Kafka 集群。


你可能想知道以下这些问题的答案:


  • Kafka 组件(Kafka、Zookeeper 等)需要使用哪个容器镜像?

  • 如何在 Kubernetes 中轻松部署所有这些组件?

  • 如何在 Kubernetes 中创建用户、主题或 HA?

  • 安全性如何?你可以尝试手动完成所有这些事情,例如编写很长的 YAML 文件和使用 Kafka CI 工具配置 Kafka 组件。然而,还有另一种 Kubernetes 原生的、完全自动化和可复制的(非常适合 CI/CD)方法,就是使用 Strimzi。

Strimzi

Strimzi是一个Kubernetes Operator,通过控制器来创建、配置和保护 Kafka 集群,就像其他 Kubernetes 资源(如 Pod、Deployment、ConfigMap 等)一样。


Strimzi 项目包含三个 Operator——一个用于管理 Kafka 集群,一个用于管理主题,一个用于用户管理。


在 Kubernetes 集群中安装了 Strimzi Operator 之后,你只需要使用下面的 YAML 文件就可以启动并运行一个 Kafka 集群,其中包含了一个 Kafka 副本和三个使用临时存储(没有挂载持久卷)的 ZooKeeper 副本。


apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-clusterspec: kafka:   version: 3.2.0   replicas: 1   listeners:     - name: plain       port: 9092       type: internal       tls: false     - name: tls       port: 9093       type: internal       tls: true   config:     offsets.topic.replication.factor: 1     transaction.state.log.replication.factor: 1     transaction.state.log.min.isr: 1     default.replication.factor: 1     min.insync.replicas: 1     inter.broker.protocol.version: "3.2"   storage:     type: ephemeral zookeeper:   replicas: 3   storage:     type: ephemeral entityOperator:   topicOperator: {}   userOperator: {}
复制代码


接下来,我们将在已经启动的集群中安装 Strimzi。

安装 Strimzi

首先是创建一个命名空间来安装 Strimzi Operator。在本例中,我们使用了命名空间 kafka。在终端窗口中执行如下命令:


kubectl create namespace kafkanamespace/kafka created
复制代码


接下来,我们应用 Strimzi 安装文件,其中包括用于声明式管理 Kafka 集群、Kafka 主题和用户的 CRD(CustomerResourceDefinition)。


kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
复制代码


运行下面的命令验证 Operator 是否安装正确。


kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGEstrimzi-cluster-operator-597d67c7d6-ms987 1/1 Running 0 4m27s
复制代码


现在,我们开始创建带有 movies 主题的 Kafka 集群。我们将在这个主题中保存所有电影的信息,稍后 Kafka Streams 将消费这个主题,正如我们在本系列文章的第2部分中所看到的那样。

创建 Kafka 集群

创建一个新的文件(即 kafka.yaml)来安装一个带有一个副本的Kafka集群,不启用 TLS,作为内部Kubernetes服务


apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-clusterspec: kafka:   version: 3.2.0   replicas: 1   listeners:     - name: plain       port: 9092       type: internal       tls: false   config:     offsets.topic.replication.factor: 1     transaction.state.log.replication.factor: 1     transaction.state.log.min.isr: 1     default.replication.factor: 1     min.insync.replicas: 1     inter.broker.protocol.version: "3.2"   storage:     type: ephemeral zookeeper:   replicas: 1   storage:     type: ephemeral entityOperator:   topicOperator: {}   userOperator: {}
复制代码


然后在终端窗口中使用 kubectl 命令创建这个资源:


kubectl create -f kafka.yaml -n kafkakafka.kafka.strimzi.io/my-cluster created
复制代码


此时,Strimzi 开始在默认命名空间中安装 Kafka 集群。


现在,我们通过获取默认的名称空间 Pod 来检查集群的创建情况。


kubectl get pods -n kafka
NAME READY STATUS my-cluster-entity-operator-755596449b-cw82g 3/3 Running my-cluster-kafka-0 1/1 Running my-cluster-zookeeper-0 1/1 Running
复制代码


Kafka 集群已启动并运行。我们除了可以将 Kafka 作为 Kubernetes 资源安装之外,还可以查询和描述它。例如,在终端窗口中执行如下命令:


kubectl get kafka -n kafka
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGSmy-cluster 1 1 True True


kubectl describe kafka my-cluster -n kafka
Name: my-clusterNamespace: defaultLabels: <none>Annotations: <none>API Version: kafka.strimzi.io/v1beta2Kind: KafkaMetadata: Creation Timestamp: 2022-08-09T10:57:39Z
复制代码


当然,你也可以像删除其他 Kubernetes 资源一样删除它。此外,系统还创建了 4 个 Kubernetes 服务来访问 Kafka 集群:


kubectl get services -n kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE 143mmy-cluster-kafka-bootstrap ClusterIP 172.30.77.150 <none> 9091/TCP,9092/TCP 21mmy-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP 21mmy-cluster-zookeeper-client ClusterIP 172.30.5.186 <none> 2181/TCP 21mmy-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 21m
复制代码


应用程序用于访问集群的服务是 my-cluster-kafka-bootstrap,它公开了 Kafka 的 9092 端口。


在进入到应用程序部分之前,我们需要使用另一个 YAML 文件来创建和配置 movies 主题。

创建 movies 主题

Strimzi 有一个用于创建和管理主题的 Operator。要创建一个新主题,我们需要创建一个KafkaTopic类型的 Kubernetes 资源文件,在 strimzi.io/cluster 中指定主题的名称和集群的名称(在我们的例子中是 my-cluster)。我们使用下面的内容创建一个名为 movies-topic.yaml 的新文件。


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaTopicmetadata: name: movies labels:   strimzi.io/cluster: my-clusterspec: partitions: 1 replicas: 1 config:   retention.ms: 7200000   segment.bytes: 1073741824
复制代码


并应用这个文件:


kubectl apply -f movies-topic.yaml -n kafkakafkatopic.kafka.strimzi.io/movies create
复制代码


和其他 Kubernetes 资源一样,我们也可以查询和描述它。


kubectl get kafkatopic -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READYconsumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 Truemovies my-cluster 1 1 Truestrimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster 1 1 Truestrimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1 1 True
复制代码


描述已创建的主题:


kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092
Forwarding from 127.0.0.1:9092 -> 9092Forwarding from [::1]:9092 -> 9092
复制代码


我们来检查一下创建的主题是否有端口转发


在终端窗口执行如下命令:


kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092
Forwarding from 127.0.0.1:9092 -> 9092Forwarding from [::1]:9092 -> 9092
复制代码


打开一个新的终端窗口,使用kcat工具列出 Kafka 集群的元素。我们可以使用 localhost 作为主机名,就像在上一步中使用端口转发技巧一样。


kcat -b localhost:9092 -L
Metadata for all topics (from broker -1: localhost:9092/bootstrap): 1 brokers: broker 0 at my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092 (controller) 4 topics: topic "movies" with 1 partitions: partition 0, leader 0, replicas: 0, isrs: 0
复制代码


最后,我们停止端口转发进程,对项目进行容器化,就像我们在本系列文章的第3部分中所做的那样,并进行一些相应的配置,以便连接到 Kafka 集群。

生产者 Debezium


我们在系列文章的第3部分解决了双写问题,使用 Debezium(具体来说是Debezium Embedded)修复了这个问题,具体方法是监听来自 MySQL 服务器的事务日志,并在每次插入新的电影播放信息时生成带有数据的 Kafka 事件。你可以在本地机器上运行这个示例,使用开发服务启动所需的服务(MySQL 和 Kafka),并自动配置应用程序来连接它们。


现在有点不一样了——服务必须运行在 Kubernetes 集群中,包括在前面步骤中创建的 Kafka 集群和 MySQL 数据库。要让它在 Kubernetes 中运行,需要做出三个改变。


  • 使用新的 Kafka 和 MySQL 参数(主机名、端口、用户名和密码)来配置服务。

  • 将应用程序装入容器,并推送到容器注册表。

  • 创建 Kubernetes 资源文件,用于部署服务。

配置服务


首先要配置的是 Kafka 的主机名和端口,它们指向 Strimzi 创建的 Kubernetes 服务。打开 src/main/resources/application.properties 文件并添加下面的内容:


%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
复制代码


%prod 前缀表示这个属性仅在应用程序以 prod 模式下运行时使用(而不是在 dev 或 test 模式下)。


其次时配置插入影片信息的数据库连接。在 application.properties 文件中添加下面的内容。


quarkus.hibernate-orm.database.generation=drop-and-create%prod.quarkus.datasource.username=alex%prod.quarkus.datasource.password=alex%prod.quarkus.datasource.jdbc.url=jdbc:mysql://mysql:3306/moviesdb
复制代码


稍后,我们将使用这些参数部署一个 MySQL 实例。现在,我们假设配置参数是正确的。

容器化

Quarkus 为创建容器提供了与Jib项目的集成,让容器镜像的构建和推送简单得只需要执行一个 Maven/Gradle 任务。


打开 pom.xml 文件,在 dependencies 部分添加以下依赖项:


<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-container-image-jib</artifactId></dependency>
复制代码


添加了Jib依赖项后,它将在打包时自动将应用程序装入容器。因为Jib的一些默认配置选项可能不适用于所有情况,所以你可以在 src/main/resources/application.properties 中覆盖它们。对于本例,我们将覆盖生成的容器镜像的 group 和容器注册中心的主机。


打开 application.properties 文件,并添加下面的内容:


# Substitue the value with your account namequarkus.container-image.group=lordofthejars # Defaults to Docker.io, overriden to Quay.quarkus.container-image.registry=quay.io
复制代码


你需要设置容器注册表的凭据,以便向注册表推送容器。你可以在执行构建之前运行 docker 的 login 命令。Maven 将从那里读取凭据,或者你可以使用 quarkus.container-image.username 和 quarkus.container-image.password 属性。


在项目的根目录下运行下面的命令来构建应用程序,它将构建出一个容器并将其推到指定的容器注册表中。


./mvnw clean package -DskipTests -Dquarkus.container-image.push=true
[INFO] Scanning for projects...[INFO][INFO] ---------------< org.acme:movie-plays-producer-debezium >---------------[INFO] Building movie-plays-producer-debezium 1.0.0-SNAPSHOT[INFO] --------------------------------[ jar ]---------------------------------[INFO][INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ movie-plays-producer-debezium ---[INFO] Deleting /Users/asotobu/git/quarkus-integrating-kafka/strimzi/movie-plays-producer-debezium/target[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Using base image with digest: sha256:1a2fddacdcda67494168749c7ab49243d06d8fbed34abab90566d81b94f5e1a5[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Container entrypoint set to [java, -Djava.util.logging.manager=org.jboss.logmanager.LogManager, -jar, quarkus-run.jar][INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Pushed container image quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT (sha256:73dfe42d53f8d7e3c268dbebc2e5f866596de33b8fcaf82c27bdd414d28bdb8a)
复制代码


从最后一行日志可以看到,容器被创建,并使用 application.properties 中指定的账号推送到注册中心。

Kubernetes

在将容器推送到注册表之后,我们准备将服务部署到 Kubernetes 中。我们可以手动创建 Kubernetes 资源文件,但没有必要这么做,因为 Quarkus 为我们提供了一个Kubernetes扩展


打开 pom.xml 文件,并在 dependencies 部分添加下面的依赖项。


<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-kubernetes</artifactId></dependency>
复制代码


每次 Maven 打包应用程序时都会注册 Kubernetes 扩展,并生成将应用程序部署到 Kubernetes 集群的 kubernetes.yml 文件。你可以通过 application.properties 来修改生成文件的内容。例如,我们将 Kubernetes Service 设置为 LoadBalancer 而不是 ClusterIP,并将命名空间设置为 kafka。


打开 application.properties 文件并添加下面的内容。


quarkus.kubernetes.service-type=load-balancerquarkus.kubernetes.namespace=kafka
复制代码


修改好以后运行 Maven package 生成部署文件。


./mvnw clean package -DskipTests
[INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------
复制代码


检查生成的文件 target/kubernetes/kubernetes.yml:


cat target/kubernetes/kubernetes.yml
复制代码


输出的内容应该类似于下面这样:


---apiVersion: v1kind: Servicemetadata: name: movie-plays-producer-debeziumspec: ports:   - name: http     port: 80     targetPort: 8080 selector:   app.kubernetes.io/name: movie-plays-producer-debezium   app.kubernetes.io/version: 1.0.0-SNAPSHOT # Type is LoadBalancer as set in the application.properties file  type: LoadBalancer---apiVersion: apps/v1kind: Deploymentmetadata: name: movie-plays-producer-debeziumspec: replicas: 1 selector:   matchLabels:     app.kubernetes.io/name: movie-plays-producer-debezium     app.kubernetes.io/version: 1.0.0-SNAPSHOT template:   metadata:   spec:     containers:       - env:           - name: KUBERNETES_NAMESPACE             valueFrom:               fieldRef:                 fieldPath: metadata.namespace         # The image is correctly set automatically         image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT         imagePullPolicy: Always         name: movie-plays-producer-debezium         ports:           - containerPort: 8080             name: http             protocol: TCP
复制代码


在本例中,配置参数是硬编码在 application.properties 中的,但你可以将它们作为环境变量传递进去。要在 Kubernetes Deployment 对象中设置环境变量,比如覆盖 Kafka 的配置,可以添加下面的行:


quarkus.kubernetes.env.vars.kafka-bootstrap-servers=my-new-cluster:9092 
复制代码


生成文件的 env 部分将包含这个新的环境变量:


containers:       - env:           - name: KAFKA_BOOTSTRAP_SERVERS             value: my-new-cluster:9092         image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
复制代码

全部放到一起

我们已经使用 Strimzi 在 Kubernetes 集群中部署了一个 Kafka 集群。我们将应用下面的文件(mysql-deployment.yaml)和 application.properties 中配置的参数部署 MySQL 实例。


apiVersion: v1kind: Servicemetadata: name: mysql labels:   app: mysqlspec: ports:   - port: 3306 selector:   app: mysql clusterIP: None---apiVersion: apps/v1kind: Deploymentmetadata: name: mysql labels:   app: mysqlspec: selector:   matchLabels:     app: mysql strategy:   type: Recreate template:   metadata:     labels:       app: mysql   spec:     containers:     - image: mysql:8.0.30       name: mysql       env:       - name: MYSQL_ROOT_PASSWORD         value: alex       - name: MYSQL_DATABASE         value: moviesdb       - name: MYSQL_USER         value: alex       - name: MYSQL_PASSWORD         value: alex       ports:       - containerPort: 3306         name: mysql
复制代码


将 MySQL 实例部署到 Kubernetes 集群:


kubectl apply -f mysql-deployment.yaml -n kafka
复制代码


最后要部署的是应用程序本身。我们有两个选择,第一个是直接应用资源:


kubectl apply -f target/kubernetes/kubernetes.yml -n kafka
复制代码


第二个是将 quarkus.kubernetes.deploy 标志设置为 true 来打包应用程序。当这个标志设置为 true 时,Maven 将:


  1. 创建应用程序 JAR 文件。

  2. 构建容器镜像。

  3. 将容器镜像推送到注册表中。

  4. 自动应用 kubernetes.yml 资源文件到已连接的 Kubernetes 集群。为了验证所有的东西都能正确地运行,我们将发送一个插入新电影信息的请求,并验证在 Kafka 主题中插入的新事件。


在终端窗口中执行以下命令获取访问服务的 IP 和端口。


获取访问服务的 IP:


minikube ip -p strimzi
192.168.59.104
复制代码


获取 movie-plays-producer-debezium 的公开端口,也就是第二个端口。


kubectl get services -n kafka
movie-plays-producer-debezium LoadBalancer 10.100.117.203 <pending> 80:30306/TCP 67m
复制代码


运行 curl 命令,插入一条新的电影信息记录。


curl -X 'POST' \  'http://192.168.59.104:30306/movie' \  -H 'accept: application/json' \  -H 'Content-Type: application/json' \  -d '{  "name": "Minions: The Rise of Gru",  "director": "Kyle Balda",  "genre": "Animation"}'
复制代码


检查 Quarkus 日志,查看数据库运行的 SQL 语句:


kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGEmovie-plays-producer-debezium-56f644cb87-5cchk 1/1 Running 0 6m5smy-cluster-entity-operator-755596449b-cw82g 3/3 Running 0 35hmy-cluster-kafka-0 1/1 Running 0 35hmy-cluster-zookeeper-0 1/1 Running 0 35h
复制代码


打印 movie-plays-producer-debezium 的日志:


kubectl logs movie-plays-producer-debezium-6b9b65bf4-9z524 -n kafka
2022-08-11 07:44:25,658 INFO [org.acm.MovieResource] (executor-thread-1) New Movie inserted Minions: The Rise of Gru:)Hibernate: select next_val as id_val from hibernate_sequence for update
Hibernate: update hibernate_sequence set next_val= ? where next_val=?Hibernate: insert into Movie (director, genre, name, id) values (?, ?, ?, ?)Hibernate: insert into OutboxEvent (aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id) values (?, ?, ?, ?, ?, ?, ?)
# Debezium reacts to the change2022-08-11 07:44:25,867 INFO [io.deb.con.com.BaseSourceTask] (executor-thread-0) 1 records sent during previous 00:20:44.297, last recorded offset: {transaction_id=null, ts_sec=1660203865, file=binlog.000002, pos=14795, row=1, server_id=1, event=4}Movie Created and Reacting
复制代码


你还可以使用 Kafka 容器里的 Kafka-console-consumer.sh 脚本来检查 Kafka 中的内容。进入容器并运行下面的命令:


kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
./bin/kafka-console-consumer.sh --topic movies --from-beginning --bootstrap-server localhost:9092{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
复制代码


要返回本地终端窗口,请按 Ctrl+C 停止 kafka-console-consumer 进程,然后执行 exit 命令。


到目前为止,一切都很顺利。我们已经得到了与本系列文章第 3 部分中相同的应用程序,只是现在它运行在 Kubernetes 集群中。


到目前为止,我们使用的是 Debezium Embedded,但其实我们可以使用 Debezium Server。

Debezium Server


Debezium Server是一个可配置的、使用就绪的应用程序,它将事件从源数据库流到消息传递系统中,如 Kafka。它可以被注册成一个Kafka Connect组件,作为源连接器。

虽然我们不能在所有的场景中都使用 Debezium Server,但在我看来,使用这种方法有两个大的优点:


  • 你可以获得 Kafka 连接器的所有好处(容错、可扩展、可重用等)。

  • 因为它是一个外部组件,所以不需要更改应用程序代码,也不需要 Debezium Embedded 相关的代码或依赖项。因此,任何应用程序都可以在不做出修改或重新部署的情况下开始使用 Debezium。接下来,我们来看看如何从 Debezium Embedded 迁移到 Debezium Server。

移除 Debezium Embedded

首先要做的是删除 Debezium Embedded 相关的依赖项。


打开 pom.xml 文件,删除以下依赖项:


<dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-ddl-parser</artifactId></dependency><dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-embedded</artifactId></dependency><dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-connector-mysql</artifactId></dependency>
复制代码


下一步是删除所有与 Debezium Embedded 配置和监听器相关的代码。删除这些类文件——DebeziumConfiguration.java、DebeziumListener.java 和 MySqlJdbcParser.java。


因为所有与 Kafka 的交互都是通过 Kafka Connect 组件进行的,不需要 Kafka 代码,所以最后一步是从 pom.xml 中移除以下依赖项:


<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId></dependency>
复制代码


application.properties 文件中的这一行不再需要:


%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
复制代码


项目中已经没有了 Kafka 或 Debezium Embedded 依赖项。创建一个包含这些最新变更的容器镜像。


在终端窗口执行以下命令,删除之前的部署:


kubectl delete deployment movie-plays-producer-debeziumkubectl delete service movie-plays-producer-debezium
复制代码


要保留带有 Debezium Debezium 的容器镜像,请将 artifactId 更改为 movie-plays-producer-debezium-server。


然后将不带 Debezium 代码的新版本部署到 Kubernetes 集群中,如下所示:


./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
复制代码


运行以下命令验证新部署的服务:


kubectl get pods -n kafkaa
NAME READY STATUS RESTARTS AGEmovie-plays-producer-debezium-server-59db564b74-vhdmf 1/1 Running 0 73m
复制代码

部署 Debezium Kafka Connect

首先,部署一个 Kafka Connect 组件与所需的 MySQL 连接器插件。你可以认为它跟我们在DebeziumListener类中实现的逻辑差不多,只是被作为一个 Kafka Connect 元素,可以在项目中重用。我们必须为 Kafka Connect 和连接器插件创建一个容器镜像,因为 Debezium 没有为各种可能的 Kafka 与数据的组合提供“官方”镜像。对于本例,我们使用 Kafka 3.2.0 的 MySQL 连接器创建一个容器镜像。


本文中 MySQL 连接器的容器镜像可以在quay.io/lordofthejars/debezium-connector-mysql:1.9.4找到,如果你对它的构建过程感到好奇,可以查看位于这个GitHub存储库中的 Dockerfile 文件。


为了部署 Debezium Kafka Connect,我们将使用 Strimzi 提供的KafkaConnect,因为它简化了整个过程。在这个 Kubernetes 资源文件中,我们指定了 Kafka 版本、Kafka 集群的位置(my-cluster-kafka-bootstrap:9092)、容器镜像(quay.io/lordofthejars/debezin-connector-mysql:1.9.4),以及一些特定的配置参数。


创建一个名为 debezium-kafka-connect.yaml 的文件,内容如下:


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaConnectmetadata: name: debezium-connect-cluster annotations:   strimzi.io/use-connector-resources: "true"spec: version: 3.2.0 image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4 replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9092 config:   group.id: connect-cluster   key.converter: org.apache.kafka.connect.json.JsonConverter   value.converter: org.apache.kafka.connect.json.JsonConverter   key.converter.schemas.enable: false   value.converter.schemas.enable: false   offset.storage.topic: connect-offsets   offset.storage.replication.factor: 1   config.storage.topic: connect-configs   config.storage.replication.factor: 1   status.storage.topic: connect-status   status.storage.replication.factor: 1
复制代码


然后在终端窗口中应用这个资源:


kubectl apply -f debezium-kafka-connect.yaml -n kafka
复制代码


并通过运行以下命令验证它是否被正确部署:


kubectl get pods -n kafka
debezium-connect-cluster-connect-546c8695c-lszn7 1/1 Running 0 91m
复制代码


请记住,这个过程可能需要几分钟的准备时间。


Kafka Connect 组件现在连接到了 Kafka 集群,最后一步是通过配置让它监听 MySQL 实例的数据变更。


为此,我们将使用 Strimzi 提供的 KafkaConnector。这有点类似于我们在 DebeziumConfiguration 类中所做的那样,提供 database.hostname 或 table.include.list 之类的参数。此外,我们还要将 strimzi.io/cluster 的值设置为上一个 YAML 文件中指定的 KafkaConnect 名称(debezum-connect-cluster)。


创建一个名为 debezium-kafka-connector.yaml 的文件,内容如下:


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaConnectormetadata: name: debezium-connector-mysql labels:   strimzi.io/cluster: debezium-connect-clusterspec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config:   tasks.max: 1   database.hostname: mysql   database.port: 3306   database.user: root   database.password: alex   database.server.id: 184054   database.server.name: mysql   database.include.list: moviesdb   database.allowPublicKeyRetrieval: true   table.include.list: moviesdb.OutboxEvent   database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092   database.history.kafka.topic: schema-changes.movies
复制代码


通过应用资源来配置 Debezium Connector:


kubectl apply -f debezium-kafka-connector.yaml -n kafka
复制代码


为了验证一切工作正常,我们添加一条新的电影数据记录,并验证将新记录插入数据库时 Kafka 主题中会产生一个新事件。


获取新服务的端口,IP 仍然是相同的:


kubectl get services -n kafka
movie-plays-producer-debezium-server LoadBalancer 10.100.117.203 <pending> 80:30307/TCP 67m
curl -X 'POST' \ 'http://192.168.59.104:30307/movie' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "name": "Minions: The Rise of Gru", "director": "Kyle Balda", "genre": "Animation"}'
复制代码


使用 kafka-console-consumer.sh 脚本验证插入的数据:


kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
复制代码


然后在容器中运行脚本。注意,Debezium 连接器将事件发送到一个 Kafka 主题,名称是这样的<database.server.name>.<database.inlude.list>.<table>,在这个示例中是 mysql.moviesdb.OutboxEvent。


./bin/kafka-console-consumer.sh --topic mysql.moviesdb.OutboxEvent --from-beginning --bootstrap-server localhost:9092
{"before":null,"after":{"id":"Yxk0o5WwTvi0+nwBr2Y36wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","aggregatetype":"Movie","aggregateid":"5","type":"MovieCreated","timestamp":1660253420864918,"payload":"{\"id\":5,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1660253420000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":8788,"row":0,"thread":41,"query":null},"op":"c","ts_ms":1660253420878,"transaction":null}
复制代码


before 字段是空的,因为是插入操作,所以没有前值,但是在 after 字段中有电影信息数据。

结论


我们已经将应用程序从本地迁移到了 Kubernetes 集群中。


Strimzi 为我们提供了在 Kubernetes 中部署和管理 Apache Kafka 集群的一个关键元素。我们可以使用 Kubernetes 资源文件安装和管理集群,采用 GitOps 的方式来管理 Kafka。


Debezium Embedded 适用于一些场景,比如在检测数据变更时使用的临时逻辑。不过,在其他项目中(特别是在遗留项目或需要高可伸缩性和容错性的项目),Debezium Server 可能更合适。


有了 Strimzi、Jib 和 Kubernetes Quarkus 扩展,从本地转移到 Kubernetes 集群应该并不难。


本文的源代码可以在GitHub上找到。

作者简介


Alex Soto 是 Red Hat 的开发者体验总监。他对 Java 和软件自动化领域充满热情,并相信开源软件模型。Soto 是《Testing Java Microservices》(Manning 出版)和《Quarkus Cookbook》(O'Reilly 出版)的合著者,也是多个开源项目的贡献者。自 2017 年以来,他获得 Java Champion 的称号,也是 Salle URL 大学的国际演讲师和教师。你可以在 Twitter 上关注他(Alex Soto),继续关注 Kubernetes 和 Java 世界的动态。


原文链接

https://www.infoq.com/articles/strimzi-the-gitops-way/


相关阅读:

本系列第一部分:使用 Apache Kafka 实现 Quarkus 的反应式消息

本系列第二部分:Kafka Streams 与 Quarkus:实时处理事件

本系列第三部分:Debezium 和 Quarkus:通过 CDC 模式来避免双重写入

2022-10-12 08:0010086

评论

发布
暂无评论
发现更多内容

架构师训练营 2 期 - 第 8 周命题作业

Geek_no_one

极客大学架构师训练营

【第十二周】课后作业

云龙

架构师训练营 第八周作业

文江

第十二周作业(作业一)

Geek_83908e

架构师一期

陪你手撕源码系列之 STL set 相关算法

herongwei

c++ 算法 set stl

推进工业互联网和区块链创新发展

CECBC

区块链 互联网

数字人民币红包迎战“双十二” 六大行钱包全接入

CECBC

数字人民币

架构师训练营 1 期第 12 周:数据应用(一)- 作业

piercebn

极客大学架构师训练营

命令行搜索神器fzf

Rayjun

Linux

架构师系列9: 找出单向链表合并节点

桃花原记

《社会中的数据可视化》PDF免费下载

计算机与AI

数据可视化

性能优化练习

Mars

第八周作业

孤星

2020的另一面:5G的斯普特尼克之年

脑极体

玛雅公约软件系统开发|玛雅公约APP开发

系统开发

架构师训练营第八周作业

丁乐洪

一只支持凡尔赛文学创作的摄影手机

脑极体

性能优化总结二

Mars

第十二周作业(作业二)

Geek_83908e

架构师一期

架构师训练营第 1 期 - 第 12 周课后练习

Anyou Liu

极客大学架构师训练营

宝马区块链负责人:我们是如何让区块链技术与汽车产业结合的?

CECBC

大数据

架构师训练营第三周”代码重构“作业

随秋

极客大学架构师训练营

架构之书:雄心与《C++语言的设计与演化》

lidaobing

c++ 架构

架构师训练营 2 期 - 第八周总结

Geek_no_one

极客大学架构师训练营

第八周总结

孤星

FFmpeg使用基础(音视频开发入门)

赖猫

架构师训练营第 1 期第 12 周学习总结

好吃不贵

极客大学架构师训练营

生产环境全链路压测建设历程之七: 淘宝网2012年双十一库存超卖问题的本质

数列科技杨德华

Codurance不太一样

sherlockq

LeetCode题解:22. 括号生成,BFS,JavaScript,详细注释

Lee Chen

算法 大前端 LeetCode

可能会重塑未来移动支付市场的格局

CECBC

货币

使用Strimzi将Kafka和Debezium迁移到Kubernetes_架构_Alex Soto_InfoQ精选文章