在本系列的第3部分,我们学习了双写问题以及如何使用变更数据捕获模式解决这些问题,特别是使用Debezium读取数据库中所做的变更(通过事务日志)并将它们填充到 Kafka 主题中。
在本系列的第4部分,我们将示例又向前推进了一步,将应用程序从本地开发环境部署到 Kubernetes(生产环境)中。我们使用Strimzi来部署和配置 Kafka 和 Debezium。
但总的来说,我们忽略了一个重要的东西——当时我们没有把它简化,但它却非常重要——安全性问题。
如何在不直接将用户名/密码硬编码在部署文件中的情况下保护 MySQL 实例。
如何使用 Strimzi 在 Kafka 集群中添加 authn。
如何配置 Debezium,以便对 Kafka 和 MySQL 实例进行安全身份验证。在本文中,我们将通过保护在上一篇文章中开发的应用程序(使用 Debezium Server 方法)来回答所有这些问题。
Kubernetes
我们需要一个安装了 Strimzi 的 Kubernetes 集群。我们在本系列的第 4 部分中对此进行了介绍,如果你要重用它,需要删除应用程序、MySQL 数据库、Kafka 集群和 Debezium 实例。
重要提示:如果第 4 部分中使用的集群还在,需要执行下面的步骤。如果集群已经被删除,请从介绍如何删除集群的部分之后继续阅读。
在终端窗口执行如下命令来删除它们:
kubectl delete deployment movie-plays-producer-debezium-server -n kafka
kubectl delete service movie-plays-producer-debezium-server -n kafka
kubectl delete -f mysql-deployment.yaml -n kafka
kubectl delete -f debezium-kafka-connector.yaml -n kafka
kubectl delete -f debezium-kafka-connect.yaml -n kafka
kubectl delete -f kafka.yaml -n kafka
复制代码
重要提示:如果你还没有 Kuberntes 集群,则只需要执行下面的步骤。
如果集群已经被销毁,请按照指示创建一个新的集群。在终端窗口中运行以下命令:
minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=12096 --cpus=3
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
复制代码
执行下面的命令验证 Operator 是否安装正确:
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-597d67c7d6-ms987 1/1 Running 0 4m27s
复制代码
等待 Operator 运行并准备就绪。
此时,我们可以开始使用身份验证和授权(而不是匿名访问)来安装所有组件。
MySQL
在前一篇文章中,我们部署了 MySQL 实例,将用户名/密码作为环境变量硬编码在部署文件中:
env:
- name: MYSQL_ROOT_PASSWORD
value: alex
- name: MYSQL_DATABASE
value: moviesdb
- name: MYSQL_USER
value: alex
- name: MYSQL_PASSWORD
value: alex
复制代码
我们创建一个 Kubernetes Secret 来存储这些敏感数据。Kubernetes 密钥文件中的数据必须采用 base64 格式编码。alex 的 base64 编码为 YWxleA==。
要生成这个值,执行下面的命令:
echo -n 'alex' | base64
YWxleA==
复制代码
在 mysql-secret.yaml 文件中填入编码的密钥:
apiVersion: v1
kind: Secret
metadata:
name: mysqlsecret
type: Opaque
data:
mysqlrootpassword: YWxleA==
mysqluser: YWxleA==
mysqlpassword: YWxleA==
复制代码
将其应用到集群:
kubectl apply -f mysql-secret.yaml -n kafka
复制代码
然后更新 MySQL 部署文件,使用 value 中的 secretKeyRef 字段读取在上一步中创建的密钥:
apiVersion: v1
kind: Service
metadata:
name: mysql
labels:
app: mysql
spec:
ports:
- port: 3306
selector:
app: mysql
clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
labels:
app: mysql
spec:
selector:
matchLabels:
app: mysql
strategy:
type: Recreate
template:
metadata:
labels:
app: mysql
spec:
containers:
- image: mysql:8.0.30
name: mysql
env:
- name: MYSQL_ROOT_PASSWORD
valueFrom:
secretKeyRef:
key: mysqlrootpassword
name: mysqlsecret
- name: MYSQL_DATABASE
value: moviesdb
- name: MYSQL_USER
valueFrom:
secretKeyRef:
key: mysqluser
name: mysqlsecret
- name: MYSQL_PASSWORD
valueFrom:
secretKeyRef:
key: mysqlpassword
name: mysqlsecret
ports:
- containerPort: 3306
name: mysql
复制代码
在 secretKeyRef 中,我们指定了密钥名称。在本例中,我们在 mysql-secret.yaml 文件中指定的是 mysqlsecret。
将 MySQL 实例部署到 Kubernetes 集群:
kubectl apply -f mysql-deployment.yaml -n kafka
复制代码
我们可以通过导出环境变量来验证注入的密钥是否正确。首先,我们获取 Pod 的名称:
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
mysql-7888f99967-4cj47 1/1 Running 0 90s
复制代码
然后在终端窗口中运行下面的命令:
kubectl exec -n kafka -ti mysql-7888f99967-4cj47 /bin/bash
bash-4.4# export
declare -x GOSU_VERSION="1.14"
declare -x HOME="/root"
declare -x HOSTNAME="mysql-7888f99967-4cj47"
declare -x KUBERNETES_PORT="tcp://10.96.0.1:443"
declare -x KUBERNETES_PORT_443_TCP="tcp://10.96.0.1:443"
declare -x KUBERNETES_PORT_443_TCP_ADDR="10.96.0.1"
declare -x KUBERNETES_PORT_443_TCP_PORT="443"
declare -x KUBERNETES_PORT_443_TCP_PROTO="tcp"
declare -x KUBERNETES_SERVICE_HOST="10.96.0.1"
declare -x KUBERNETES_SERVICE_PORT="443"
declare -x KUBERNETES_SERVICE_PORT_HTTPS="443"
declare -x MYSQL_DATABASE="moviesdb"
declare -x MYSQL_MAJOR="8.0"
declare -x MYSQL_PASSWORD="alex"
declare -x MYSQL_ROOT_PASSWORD="alex"
declare -x MYSQL_SHELL_VERSION="8.0.30-1.el8"
declare -x MYSQL_USER="alex"
declare -x MYSQL_VERSION="8.0.30-1.el8"
declare -x OLDPWD
declare -x PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
declare -x PWD="/"
declare -x SHLVL="1"
declare -x TERM="xterm"
复制代码
现在可以退出容器:
现在,MySQL 数据库的凭证使用的是 Kubernetes Secret 配置,这比在部署文件中硬编码要好得多。应用程序也需要修改,因为它现在需要从 Secret 读取凭证,而不是读取配置文件中的静态凭证。
Move Play Producer Debezium
数据库用户名和密码硬编码在 application.properties 文件中,如果应用程序能够在部署到 Kubernetes 时自动配置用户名和密码,那就更好了。
一种方法是将密钥作为环境变量注入到应用程序 Pod 中,就像部署 MySQL 那样。例如,对于密码,部署文件的 env 部分可能是这样的:
- name: MYSQL_PASSWORD
valueFrom:
secretKeyRef:
key: mysqlpassword
name: mysqlsecret
复制代码
现在更新 application.properties 文件,从环境变量中获取密码:
%prod.quarkus.datasource.password=${mysql-password}
复制代码
虽然这样做可以奏效,但将密钥作为环境变量并不是最安全的做法,因为任何一个可以列出环境变量的人都可以很容易地窃取它们。
虽然这样做可以奏效,但将密钥作为环境变量并不是最安全的做法,因为任何一个可以列出环境变量的人都可以很容易地窃取它们。
Quarkus 有一个kubernetes-config扩展,应用程序可以用它直接从 Kubernetes API 服务器读取 Kubernetes ConfigMaps 和 Secrets。通过这种方式,密钥可以安全地从 Kubernetes 集群传到应用程序中,而不需要任何中间步骤,如将它们作为环境变量传入或作为卷挂载。
Kubernetes 配置扩展
首先要做的是注册 kubernetes-config 扩展。打开 pom.xml 文件,并添加以下依赖项:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-config</artifactId>
</dependency>
复制代码
然后,让应用程序直接从 Kubernetes API 读取 Kubernetes Secrets(在我们的例子中,Secret 的名字是 mysqlsecret)。
打开 src/main/resources/application.properties,加入下面的内容:
%prod.quarkus.kubernetes-config.secrets.enabled=true
quarkus.kubernetes-config.secrets=mysqlsecret
复制代码
然后更新 quarku.datasource.username 和 quarku.datasource.password 属性,读取 mysqlsecret Secret 中的 mysqluser 和 mysqlpassword。
在 application. properties 文件中更新这些属性:
%prod.quarkus.datasource.username=${mysqluser}
%prod.quarkus.datasource.password=${mysqlpassword}
复制代码
这两个属性分别使用 mysqlsecret Secret 中的值进行了赋值。
由于读取 Kubernetes Secrets 需要与 Kubernetes API Server 发生交互,因此,当集群启用了 RBAC(基于角色的访问控制)时,用于运行应用程序的 ServiceAccount 必须具有适当的访问权限。
这两个属性分别使用 mysqlsecret Secret 中的值进行了赋值。
由于读取 Kubernetes Secrets 需要与 Kubernetes API Server 发生交互,因此,当集群启用了 RBAC(基于角色的访问控制)时,用于运行应用程序的 ServiceAccount 必须具有适当的访问权限。
因为我们在前一篇文章中注册了Kubernetes扩展,所以自动生成了所有必要的 Kubernetes 资源,所以现在不需要做任何事情。
现在在终端窗口运行下面的命令部署应用程序:
./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
…
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Deploying to kubernetes server: https://192.168.59.104:8443/ in namespace: kafka.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Service movie-plays-producer-debezium-server.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Deployment movie-plays-producer-debezium-server.
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 9537ms
复制代码
为了验证部署是否正确,我们检查 Pod 的日志,确保没有出现错误,并且 SQL 语句执行是正确的:
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
movie-plays-producer-debezium-server-auth-7cc69fb56c-nc8tx 1/1 Running 0 44s
kubectl logs movie-plays-producer-debezium-server-auth-7cc69fb56c-nc8tx -n kafka
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-08-21 21:00:41,277 INFO [io.deb.out.qua.int.AdditionalJaxbMappingProducerImpl] (main) Contributed XML mapping for entity: io.debezium.outbox.quarkus.internal.OutboxEvent
…
Hibernate:
create table Movie (
id bigint not null,
director varchar(255),
genre varchar(255),
name varchar(255),
primary key (id)
) engine=InnoDB
Hibernate:
create table OutboxEvent (
id binary(255) not null,
aggregatetype varchar(255) not null,
aggregateid varchar(255) not null,
type varchar(255) not null,
timestamp datetime(6) not null,
payload varchar(8000),
tracingspancontext varchar(256),
primary key (id)
) engine=InnoDB
复制代码
在下图中可以看到我们做了安全性保护的部分。
现在,应用程序正在运行中,MySQL 凭证也被保护起来了,下面我们继续为 Kafka 和 Debezium 提供保护。
Kafka
到目前为止,我们已经部署了一个开放的 Kafka 集群,没有启用身份验证或授权逻辑。
Strimzi 支持使用以下认证机制来部署 Kafka 集群:
Strimzi 可以在 listeners 中设置监听器,使用 mTLS 作为通信协议(tls=true)和认证方法类型(authentication 字段)。
创建一个叫作 kafka.yaml 的新文件,使用下面的内容来配置一个安全的 Kafka:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: kafka
spec:
kafka:
version: 3.2.0
replicas: 1
listeners:
- name: demo
port: 9092
type: internal
tls: false
- name: secure
port: 9093
type: internal
tls: true
authentication:
type: tls
authorization:
type: simple
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.2"
storage:
type: ephemeral
zookeeper:
replicas: 1
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
复制代码
将其应用到 Kubernetes 集群:
kubectl apply -f kafka.yaml -n kafka
kafka.kafka.strimzi.io/my-cluster created
复制代码
现在我们来验证 Kafka 集群已启动并在运行当中:
kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-d4db5ff58-rt96n 3/3 Running 0 2m26s
my-cluster-kafka-0 1/1 Running 0 2m58s
my-cluster-zookeeper-0 1/1 Running 0 3m31s
复制代码
由于我们将监听器设置为使用 TLS,所以 Strimzi 已经自动创建了一个 Kubernetes Secret,其中包含集群证书、pkcs12 信任存储和相关的密码。
kubectl get secrets -n kafka
my-cluster-clients-ca Opaque 1 9m14s
my-cluster-clients-ca-cert Opaque 3 9m14s
my-cluster-cluster-ca Opaque 1 9m14s
my-cluster-cluster-ca-cert Opaque 3 9m14s
my-cluster-cluster-operator-certs Opaque 4 9m14s
my-cluster-entity-operator-dockercfg-5wwb5 kubernetes.io/dockercfg 1 8m9s
my-cluster-entity-operator-token-h9xkq kubernetes.io/service-account-token 4 8m9s
my-cluster-entity-operator-token-npvfc kubernetes.io/service-account-token 4 8m9s
my-cluster-entity-topic-operator-certs Opaque 4 8m9s
my-cluster-entity-user-operator-certs Opaque 4 8m8s
my-cluster-kafka-brokers Opaque 4 8m41s
my-cluster-kafka-dockercfg-fgpx2 kubernetes.io/dockercfg 1 8m41s
my-cluster-kafka-token-2x7s8 kubernetes.io/service-account-token 4 8m41s
my-cluster-kafka-token-6qdgk kubernetes.io/service-account-token 4 8m41s
my-cluster-zookeeper-dockercfg-p296g kubernetes.io/dockercfg 1 9m13s
my-cluster-zookeeper-nodes Opaque 4 9m13s
my-cluster-zookeeper-token-dp9sc kubernetes.io/service-account-token 4 9m13s
my-cluster-zookeeper-token-gbrxg kubernetes.io/service-account-token 4 9m13s
复制代码
这里最为重要的是<clustername>-cluster-ca-cert(在本例中是 my-cluster-cluster-ca-cert)这个密钥。
在终端窗口中运行下面的命令列出密钥的内容:
kubectl get secret my-cluster-cluster-ca-cert -o yaml -n kafka
apiVersion: v1
data:
ca.crt: LS0tLS1CRUdJTiBDRVJU
ca.p12: MIIGkwIBAzCCBk==
ca.password: azJjY2tIMEs1c091
kind: Secret
metadata:
annotations:
strimzi.io/ca-cert-generation: "0"
creationTimestamp: "2022-08-21T19:32:55Z"
labels:
app.kubernetes.io/instance: my-cluster
app.kubernetes.io/managed-by: strimzi-cluster-operator
app.kubernetes.io/name: strimzi
app.kubernetes.io/part-of: strimzi-my-cluster
strimzi.io/cluster: my-cluster
strimzi.io/kind: Kafka
strimzi.io/name: strimzi
name: my-cluster-cluster-ca-cert
namespace: kafka
ownerReferences:
- apiVersion: kafka.strimzi.io/v1beta2
blockOwnerDeletion: false
controller: false
kind: Kafka
name: my-cluster
uid: 23c84dfb-bb33-47ed-bd41-b4e87e0a4c3a
resourceVersion: "49424"
uid: 6c2679a8-216f-421b-880a-de0e6a0879fa
type: Opaque
复制代码
我们来创建一个 mTLS 授权的用户。
安全和 Debezium
Kafka 已经被保护起来了,现在我们来创建一个KafkaUser资源,将授权角色赋给使用 mTLS 模式为用户进行身份验证的组和主题。
创建一个叫作 kafka-user-connect-all-topics.yaml 的文件,包含以下内容:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: my-connect
namespace: kafka
labels:
# Cluster name set previously
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
# Kafka Connects internal topics used to store configuration, offsets or status
- resource:
type: group
name: outbox-viewer
operation: Read
- resource:
type: group
name: outbox-viewer
operation: Describe
- resource:
type: group
name: mysql-dbhistory
operation: Read
- resource:
type: group
name: mysql-dbhistory
operation: Describe
- resource:
type: group
name: connect-cluster
operation: Read
- resource:
type: group
name: connect-cluster
operation: Describe
- resource:
type: topic
name: connect-cluster-configs
operation: Read
- resource:
type: topic
name: connect-cluster-configs
operation: Describe
- resource:
type: topic
name: connect-cluster-configs
operation: Write
- resource:
type: topic
name: connect-cluster-configs
operation: Create
- resource:
type: topic
name: connect-cluster-status
operation: Read
- resource:
type: topic
name: connect-cluster-status
operation: Describe
- resource:
type: topic
name: connect-cluster-status
operation: Write
- resource:
type: topic
name: connect-cluster-status
operation: Create
- resource:
type: topic
name: connect-cluster-offsets
operation: Read
- resource:
type: topic
name: connect-cluster-offsets
operation: Write
- resource:
type: topic
name: connect-cluster-offsets
operation: Describe
- resource:
type: topic
name: connect-cluster-offsets
operation: Create
- resource:
type: group
name: connect-cluster
operation: Read
# Debezium topics
- resource:
type: topic
name: "*"
operation: Read
- resource:
type: topic
name: "*"
operation: Describe
- resource:
type: topic
name: "*"
operation: Write
- resource:
type: topic
name: "*"
operation: Create
复制代码
在终端窗口中应用这个资源:
kubectl apply -f kafka-user-connect-all-topics.yaml -n kafka
kafkauser.kafka.strimzi.io/my-connect created
复制代码
在注册了这个 Kafka 用户后,Strimzi 创建了一个与 KafkaUser 资源(my-connect)同名的新密钥,并使用 pkcs12 密钥存储库保存客户端的私钥和访问它的密码。
kubectl get secret my-connect -n kafka -o yaml
apiVersion: v1
data:
ca.crt: LS0tLS1CK
user.crt: LS0tLS1CRUdJTiB==
user.key: LS0tLS1CRUdJTiBQUklWQVRK
user.p12: MIILNAIBAzCAA==
user.password: UUR4Nk5NemsxUVFF
kind: Secret
metadata:
creationTimestamp: "2022-08-21T20:12:44Z"
labels:
app.kubernetes.io/instance: my-connect
app.kubernetes.io/managed-by: strimzi-user-operator
app.kubernetes.io/name: strimzi-user-operator
app.kubernetes.io/part-of: strimzi-my-connect
strimzi.io/cluster: my-cluster
strimzi.io/kind: KafkaUser
name: my-connect
namespace: kafka
ownerReferences:
- apiVersion: kafka.strimzi.io/v1beta2
blockOwnerDeletion: false
controller: false
kind: KafkaUser
name: my-connect
uid: 882447cc-7759-4884-9d2f-f57f8be92711
resourceVersion: "60439"
uid: 9313676f-3417-42d8-b3fb-a1b1fe1b3a39
type: Opaque
复制代码
现在,我们有了一个新的 Kafka 用户,拥有访问 Kafka 主题所需的权限。
在部署 Debezium Kafka Connector 之前,我们需要允许 Kafka Connector 对象使用 Kubernetes API 直接从 mysqlsecret Secret 对象中读取 MySQL 密钥(就像我们在应用程序中所做的那样),这样 Connector 就可以通过数据库身份验证并读取事务日志。
创建 kafka-role-binding.yaml 文件,内容如下:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: connector-configuration-role
namespace: kafka
rules:
- apiGroups: [""]
resources: ["secrets"]
resourceNames: ["mysqlsecret", "my-connect", "my-cluster-cluster-ca-cert"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: connector-configuration-role-binding
namespace: kafka
subjects:
- kind: ServiceAccount
name: debezium-connect-cluster-connect
namespace: kafka
roleRef:
kind: Role
name: connector-configuration-role
apiGroup: rbac.authorization.k8s.io
复制代码
注意,subjects 下面的 name 是运行 Debezium Kafka Connect Pod 所需的服务帐户。我们还没有部署 Pod,不过在部署 KafkaConnect 组件时,创建的服务帐户需要遵循 $KafkaConnectName-connect 的格式。由于 Debezium Kafka Connect 的名称是 debezium-connect-cluster-connect,因此创建的服务帐户就是 my-connect-connect,并且我们授予这个帐户直接读取 Kubernetes Secrets 的权限。
在部署 Debezium Kafka Connect 之前应用 kafka-role-binding.yaml 文件:
kubectl apply -f kafka-role-binding.yaml -n kafka
role.rbac.authorization.k8s.io/connector-configuration-role created
rolebinding.rbac.authorization.k8s.io/connector-configuration-role-binding created
复制代码
下图总结了目前的安全通信:
为了部署 Debezium Kafka Connect,我们需要再次使用 Strimzi 提供的KafkaConnect对象,但需要做一些修改,以便通过 Kafka 集群的身份验证,并允许从 Kubernetes Secrets 读取配置参数(主要目的是读取 MySQL 凭证进行身份验证)。
配置如下字段:
端口现在是 9093。
设置用于与集群通信的 mTLS 证书(tls 字段)。
设置证书和密钥用户(authentication 字段),以便进行身份验证。
设置 config.providers,让 MySQL Connector 从 Kubernetes Secrets 读取配置。
externalConfiguration 用于将信任存储库和密钥存储库物化到文件中。它们被物化在/opt/kafka/external-configuration/目录下。MySQL Connector 会访问这些文件。创建 kafka-connect.yaml 文件,内容如下所示:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect-cluster
namespace: kafka
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.2.0
image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
logging:
type: inline
loggers:
connect.root.logger.level: "INFO"
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: my-connect
certificate: user.crt
key: user.key
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
offset.storage.replication.factor: 1
config.storage.topic: connect-cluster-configs
config.storage.replication.factor: 1
status.storage.topic: connect-cluster-status
status.storage.replication.factor: 1
externalConfiguration:
volumes:
- name: cluster-ca
secret:
secretName: my-cluster-cluster-ca-cert
- name: my-user
secret:
secretName: my-connect
复制代码
trustedCertificates 设置为使用 Kafka 对象部署 Kafka 集群时创建的密钥。
authentication 下面的 certificateAndKey 设置为注册 KafkaUser 时创建的密钥。
部署资源并验证其正确性:
kubectl apply -f kafka-connect.yaml -n kafka
kafkaconnect.kafka.strimzi.io/debezium-connect-cluster created
复制代码
创建一个叫作 debezium-kafka-connector.yaml 的新文件,用于配置 Debezium,允许 MySQL Connector 访问 MySQL 实例的事务日志。在本例中,我们在连接器配置中没有使用明文的用户名和密码,而是引用前面用 MySQL 凭证创建的 Secret 对象。Secret 的访问格式为 secrets:<namespace>/<secretname>:<key>。此外,在应用了 KafkaConnect 定义后,它会读取物化的信任存储库和密钥库。
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: debezium-connector-mysql
namespace: kafka
labels:
strimzi.io/cluster: debezium-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
group.id: connect-cluster
tasks.max: 1
database.hostname: mysql
database.port: 3306
database.user: root
database.password: ${secrets:kafka/mysqlsecret:mysqlpassword}
database.server.id: 184054
database.server.name: mysql
database.include.list: moviesdb
database.allowPublicKeyRetrieval: true
table.include.list: moviesdb.OutboxEvent
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9093
database.history.kafka.topic: schema-changes.movies
database.history.producer.security.protocol: SSL
database.history.producer.ssl.keystore.type: PKCS12
database.history.producer.ssl.keystore.location: /opt/kafka/external-configuration/my-user/user.p12
database.history.producer.ssl.keystore.password: ${secrets:kafka/my-connect:user.password}
database.history.producer.ssl.truststore.type: PKCS12
database.history.producer.ssl.truststore.location: /opt/kafka/external-configuration/cluster-ca/ca.p12
database.history.producer.ssl.truststore.password: ${secrets:kafka/my-cluster-cluster-ca-cert:ca.password}
database.history.consumer.security.protocol: SSL
database.history.consumer.ssl.keystore.type: PKCS12
database.history.consumer.ssl.keystore.location: /opt/kafka/external-configuration/my-user/user.p12
database.history.consumer.ssl.keystore.password: ${secrets:kafka/my-connect:user.password}
database.history.consumer.ssl.truststore.type: PKCS12
database.history.consumer.ssl.truststore.location: /opt/kafka/external-configuration/cluster-ca/ca.p12
database.history.consumer.ssl.truststore.password: ${secrets:kafka/my-cluster-cluster-ca-cert:ca.password}
复制代码
在终端窗口中执行下面的命令应用这个文件注册 MySQL Connector:
kubectl apply -f kafka-connector.yaml -n kafka
kafkaconnector.kafka.strimzi.io/debezium-connector-mysql created
复制代码
最后,所有的通信通道都被保护起来了。
演示
现在,我们有了一个与上一篇文章中介绍的同样的示例,但现在它更安全了。
我们用一个叫作 outbox-viewer 的 Quarkus 应用程序来测试它,它将 OutboxEvent 主题的所有内容打印到控制台。部署下面的 YAML 文件:
---
apiVersion: v1
kind: ServiceAccount
metadata:
annotations:
app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
labels:
app.kubernetes.io/name: outbox-viewer
app.kubernetes.io/version: 1.0.0-SNAPSHOT
name: outbox-viewer
namespace: kafka
---
apiVersion: v1
kind: Service
metadata:
annotations:
app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
labels:
app.kubernetes.io/name: outbox-viewer
app.kubernetes.io/version: 1.0.0-SNAPSHOT
name: outbox-viewer
namespace: kafka
spec:
ports:
- name: http
port: 80
targetPort: 8080
selector:
app.kubernetes.io/name: outbox-viewer
app.kubernetes.io/version: 1.0.0-SNAPSHOT
type: ClusterIP
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: view-secrets
namespace: kafka
rules:
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: outbox-viewer-view
namespace: kafka
roleRef:
kind: ClusterRole
apiGroup: rbac.authorization.k8s.io
name: view
subjects:
- kind: ServiceAccount
name: outbox-viewer
namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: outbox-viewer-view-secrets
namespace: kafka
roleRef:
kind: Role
apiGroup: rbac.authorization.k8s.io
name: view-secrets
subjects:
- kind: ServiceAccount
name: outbox-viewer
namespace: kafka
---
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
labels:
app.kubernetes.io/name: outbox-viewer
app.kubernetes.io/version: 1.0.0-SNAPSHOT
name: outbox-viewer
namespace: kafka
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: outbox-viewer
app.kubernetes.io/version: 1.0.0-SNAPSHOT
template:
metadata:
annotations:
app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
labels:
app.kubernetes.io/name: outbox-viewer
app.kubernetes.io/version: 1.0.0-SNAPSHOT
namespace: kafka
spec:
containers:
- env:
- name: KUBERNETES_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: quay.io/lordofthejars/outbox-viewer:1.0.0-SNAPSHOT
imagePullPolicy: Always
name: outbox-viewer
ports:
- containerPort: 8080
name: http
protocol: TCP
volumeMounts:
- mountPath: /home/jboss/cluster
name: cluster-volume
readOnly: false
- mountPath: /home/jboss/user
name: user-volume
readOnly: false
serviceAccountName: outbox-viewer
volumes:
- name: cluster-volume
secret:
optional: false
secretName: my-cluster-cluster-ca-cert
- name: user-volume
secret:
optional: false
secretName: my-connect
复制代码
然后在终端窗口中可以看到应用程序 Pod 的日志。
kubectl logs outbox-viewer-684969f9f6-7snng -f
复制代码
将 Pod 名称替换为你的 Pod 的名称。
在终端中运行下面的命令查找 Movie Player Producer 应用程序的 IP 和端口:
minikube ip -p strimzi
192.168.59.106
复制代码
获取 movie-plays-producer-debezium 暴露的端口(第二个端口)。
kubectl get services -n kafka
movie-plays-producer-debezium LoadBalancer 10.100.117.203 <pending> 80:32460/TCP 67m
复制代码
向 Movie Play Producer 应用程序发送 curl 请求:
curl -X 'POST' \
'http://192.168.59.106:32460/movie' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "Minions: The Rise of Gru",
"director": "Kyle Balda",
"genre": "Animation"
}'
复制代码
根据你的示例调整 IP 和端口。
最后,检查 outbox-viewer Pod 的输出,可以看到数据从数据库传输到 Kafka。
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"bytes","optional":false,"field”
…
,"aggregatetype":"Movie","aggregateid":"1","type":"MovieCreated","timestamp":1661339188708005,"payload":"{\"id\":1,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1661339188000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":2967,"row":0,"thread":15,"query":null},"op":"c","ts_ms":1661339188768,"transaction":null}}
复制代码
Debezium Embedded
到目前为止,我们已经保护了应用程序和 MySQL 数据库、Debezium 服务器和 MySQL、Debezium 服务器和 Kafka 之间的通信。
你可能会想,如果使用部署在 Quarkus 应用程序中的 Debezium Embedded 而不是 Debezium Server 该怎么办?我们该如何配置 Kafka 连接使用 mTLS?
Quarkus 提供了两种连接 Kafka 的方式——Kafka客户端或响应式消息客户端。我们来看一下在使用这两种方式时通过 mTLS 认证方法验证 Kafka 集群所需的属性。
KeyStore 和 TrustStore
要在客户端配置 mTLS,需要四样东西:
建立 mTLS 连接所需的集群 TrustStore;
TrustStore 的密码;
用于身份验证的 Kafka User KeyStore;
KeyStore 的密码。前两个元素保存在之前应用 Strimzi 资源时创建的 my-cluster-cluster-ca-cert Kubernetes Secret 中。要获取它们,在终端窗口中运行下面的命令:
kubectl get secret my-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.p12}' | base64 -d > mtls-cluster-ca.p12
复制代码
获取密码:
kubectl get secret my-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.password}' | base64 -d
k2cckH0K5sOu
复制代码
后面的元素保存在 my-connect Kubernetes Secret 中。要获取它们,在终端窗口中运行下面的命令:
kubectl get secret my-connect -n kafka -o jsonpath='{.data.user\.p12}' | base64 -d > mtls-user.p12
复制代码
获取密码:
kubectl get secret my-connect -n kafka -o jsonpath='{.data.user\.password}' | base64 -d
QDx6NMzk1QQE
复制代码
现在,设置 Quarkus Kafka 配置属性,使用前面的凭证进行 Kafka 集群身份认证:
%prod.kafka.ssl.truststore.location=mtls-cluster-ca.p12
%prod.kafka.ssl.truststore.password=k2cckH0K5sOu
%prod.kafka.ssl.truststore.type=PKCS12
%prod.kafka.ssl.keystore.location=mtls-user.p12
%prod.kafka.ssl.keystore.password=QDx6NMzk1QQE
%prod.kafka.ssl.keystore.type=PKCS12
%prod.kafka.security.protocol=SSL
%prod.mp.messaging.incoming.movies.ssl.truststore.location=mtls-cluster-ca.p12
%prod.mp.messaging.incoming.movies.ssl.truststore.password=k2cckH0K5sOu
%prod.mp.messaging.incoming.movies.ssl.truststore.type=PKCS12
%prod.mp.messaging.incoming.movies.ssl.keystore.location=mtls-user.p12
%prod.mp.messaging.incoming.movies.ssl.keystore.password=QDx6NMzk1QQE
%prod.mp.messaging.incoming.movies.ssl.keystore.type=PKCS12
%prod.mp.messaging.incoming.movies.security.protocol=SSL
复制代码
我们可以像使用 MySQL 凭证一样,用 Quarkus Kubernetes Config 扩展来直接注入凭证,但为了简化起见,我们没有这么做。
不过,在安全性方面,仍然有一个重要的缺失点:如何正确地在 YAML 文件中存储密钥,以及如何在 Kubernetes 集群中安全地保存密钥?
加密密钥
在本文开始时,我们使用 MySQL 凭证创建了一个 Kubernetes Secret 对象,但它是一个包含 Base64 编码的敏感信息的 YAML 文件,所以并不安全。这个 YAML 文件可能最终会保存在 Git 存储库中,任何有权访问存储库的人都可以使用这些密钥。在下一节中,我们将解决这个问题。
Sealed Secrets
Sealed Secrets是一个 Kubernetes 控制器,允许在客户端(本地机器)加密 Kubernetes Secrets 资源,并在应用后在 Kubernetes 集群内解密它们。
Sealed Secrets 需要用到两个组件,第一个是用于加密密钥的 kubeseal CLI 工具。
要安装 kubeseal,请根据你的操作系统从这个链接下载软件包。
第二个是 kubeseal Kubernetes 控制器。在命令行中执行下面的命令来安装它:
kubectl apply -f https://github.com/bitnami-labs/sealed-secrets/releases/download/v0.18.1/controller.yaml -n kube-system
role.rbac.authorization.k8s.io/sealed-secrets-service-proxier created
clusterrole.rbac.authorization.k8s.io/secrets-unsealer created
deployment.apps/sealed-secrets-controller created
customresourcedefinition.apiextensions.k8s.io/sealedsecrets.bitnami.com created
service/sealed-secrets-controller created
role.rbac.authorization.k8s.io/sealed-secrets-key-admin created
clusterrolebinding.rbac.authorization.k8s.io/sealed-secrets-controller created
serviceaccount/sealed-secrets-controller created
rolebinding.rbac.authorization.k8s.io/sealed-secrets-service-proxier created
rolebinding.rbac.authorization.k8s.io/sealed-secrets-controller created
复制代码
运行下面的命令检查控制器是否正确部署并运行:
kubectl get pods -n kube-system
sealed-secrets-controller-554d94cb68-xr6mw 1/1 Running 0 8m46s
复制代码
在那之后,我们可以基于 mysql-secret.yaml 文件使用 kubeseal 工具自动创建一个新的 Kubernetes 资源 SealedSecret,其中数据字段是加密的。
kubeseal -n kube -o yaml <mysql-secret.yaml > mysql-secret-encrypted.yaml
复制代码
生成的新文件叫作 mysql-secret-encrypted.yaml,其中每个密钥的值都经过加密:
apiVersion: bitnami.com/v1alpha1
kind: SealedSecret
metadata:
creationTimestamp: null
name: mysqlsecret
namespace: kube
spec:
encryptedData:
mysqlpassword: AgBl721mnowwPlC35FfO26zP0
mysqlrootpassword: AgAKl1tWV8hahn00yGS4ucs
mysqluser: AgCWrWFl1/LcS
template:
data: null
metadata:
creationTimestamp: null
name: mysqlsecret
namespace: kafka
type: Opaque
复制代码
现在,你可以安全地删除 mysql-secret.yaml 文件,因为我们不再需要它了。
像应用其他 Kubernetes 资源文件一样应用加密的资源,Sealed Secrets Kubernetes 控制器将解密并将其作为正常的密钥保存在 Kubernetes 中。
你可以通过下面的命令验证 Secret:
kubectl get secret mysqlsecret -n kafka -o yaml
apiVersion: v1
data:
mysqlpassword: YWxleA==
mysqlrootpassword: YWxleA==
mysqluser: YWxleA==
kind: Secret
metadata:
creationTimestamp: "2022-08-21T19:05:21Z"
name: mysqlsecret
namespace: kafka
ownerReferences:
- apiVersion: bitnami.com/v1alpha1
controller: true
kind: SealedSecret
name: mysqlsecret
uid: 2a5ee74b-c2b2-49b3-9a9f-877e7a77b163
resourceVersion: "41514"
uid: 494cbe8b-7480-4ebd-9cc5-6fe396795eaa
type: Opaque
复制代码
需要注意的是,这是一个解密的 Kubernetes Secret,引用了负责创建它的 SealedSecret。因此,SealedSecret 的生命周期也与 Secret 紧密相关。
我们已经解决了正确存储 YAML 文件而不泄露敏感数据的问题,但是当 Secret 被应用到 Kubernetes 集群,它是以 Base64 编码格式存储的,所以它不是密钥。
静态密钥
默认情况下,Kubernetes 不会在 etcd 数据库中存储加密的密钥。静态加密密钥数据是一个很大的话题,值得专门为其写一篇文章(事实上,“Kubernetes Secret Management”一书专门讨论了这个话题)。每一种 Kubernetes 实现都有可能使用不同的方式来启用静态密钥加密,尽管最后都是一个被复制到每个 kube-apiserver 节点中的配置文件(EncryptionConfiguration)。
该文件的格式为:
apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
- resources:
- secrets
providers:
- identity: {}
- aesgcm:
keys:
- name: key1
secret: c2VjcmV0IGlzIHNlY3VyZQ==
- name: key2
secret: dGhpcyBpcyBwYXNzd29yZA==
- aescbc:
keys:
- name: key1
secret: c2VjcmV0IGlzIHNlY3VyZQ==
- name: key2
secret: dGhpcyBpcyBwYXNzd29yZA==
- secretbox:
keys:
- name: key1
secret: YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=
复制代码
在下图中,我们可以看到在 kube-apiserver 中注册 EncryptionConfiguration 文件的流程。
现在,我们已经使用 SealedSecrets 对象来加密 YAML 文件中的密钥,还使用 EncryptionConfiguration 文件来保护静态密钥。
结论
保护好所有的基础设施是一件很重要的事情,我们已经在本文中学习了如何使用 Kubernetes Secrets 来保护对数据库和 Kafka 的访问。
我们不仅可以使用 Strimzi 定义身份验证,还可以定义授权,提供一些规则,规定谁可以对 Kafka 主题做什么。
访问这些密钥也是一个重要的部分,Quarkus 和 Debezium 允许你以一种高效而安全的方式访问这些密钥,而不需要将密钥持久化在文件系统中(或作为环境变量),而是直接将它们注入内存。
安全性是一个重要的话题,当需要在 Kafka 集群中管理安全性时,Strimzi 是一个完美的选择。
源代码可以在GitHub上找到。
作者简介:
Alex Soto 是 Red Hat 的开发者体验总监。他对 Java、软件自动化充满了热情,并且深信开源软件模式。Soto 是“Testing Java Microservices”(Manning)和“Quarkus Cookbook”(O'Reilly)的合著者,也是几个开源项目的贡献者。自 2017 年以来,他获得了 Java Champion 称号,也是萨尔 Universidad Ramon Llull 大学的国际演讲者和教师。如果你想继续关注 Kubernetes 和 Java 的动态,可以在 Twitter 上关注他(https://twitter.com/alexsotob)。
原文链接:
https://www.infoq.com/articles/secure-kafka-cluster-strimzi/
评论