Debezium日常分享系列之:在 Kubernetes 上部署 Debezium
- 先決條件
- 步驟
- 部署數據源 (MySQL)
- 登錄 MySQL db
- 將數據插入其中
- 部署 Kafka
- 部署 kafdrop
- 部署 Debezium 連接器
- 創建 Debezium 連接器
Debezium 可以無縫部署在 Kubernetes(一個用于容器編排的開源平臺)上。此部署利用了 Strimzi 項目,該項目通過自定義資源簡化了 Kubernetes 上 Kafka Connect 和連接器的部署。
先決條件
- 一個正在運行的 Kubernetes 集群(本演示將使用 minikube)
- kubectl
- helm
步驟
使用 registry 啟動 minikube 集群
minikube start --insecure-registry "10.0.0.0/24"
啟用注冊表插件
minikube addons enable registry
部署 OLM(操作員生命周期管理器)
git clone https://github.com/operator-framework/operator-lifecycle-manager.gitcd operator-lifecycle-manager/deploy/charthelm install olm .
檢查operator-lifecycle-manager命名空間中的所有pod是否處于運行狀態。
應用以下清單來部署 strimzi kafka 操作員
cat > strimzi-kafka-operator.yaml <<EOF
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:name: strimzi-kafka-operatornamespace: operators
spec:channel: stablename: strimzi-kafka-operatorsource: operatorhubio-catalogsourceNamespace: operator-lifecycle-manager # this should be same as the namespace in which olm is deployed
EOF
- kind: 指定資源類型,這里是
Subscription
。Subscription
資源用于訂閱一個 Operator,以便在集群中自動更新和管理該 Operator。 - spec:
- channel: 訂閱的頻道,這里是
stable
。頻道通常用于指定不同版本的 Operator。 - name: 要訂閱的 Operator 的名稱,這里是
strimzi-kafka-operator
。 - source: Operator Catalog 的名稱,這里是
operatorhubio-catalog
。Catalog 是一個包含多個 Operator 的集合。 - sourceNamespace: Catalog 所在的命名空間,這里是
operator-lifecycle-manager
。這個命名空間通常是 Operator Lifecycle Manager (OLM) 安裝的命名空間。
- channel: 訂閱的頻道,這里是
kubectl apply -f strimzi-kafka-operator.yaml
檢查 Operator Pod 在 Operator 命名空間中是否處于運行狀態。啟動并運行 Pod 可能需要幾分鐘時間。
部署數據源 (MySQL)
helm repo add bitnami https://charts.bitnami.com/bitnamihelm repo update # required if above repo is already addedkubectl create ns dbcat > mysql-values.yaml <<EOF
auth:rootPassword: "root"database: "debezium_db"username: "mysql_usr"password: "mysql_pwd"
EOFhelm install -n db mysql bitnami/mysql --version 12.2.2 -f mysql-values.yaml
helm repo add bitnami https://charts.bitnami.com/bitnami
命令解釋:這個命令將Bitnami Helm倉庫添加到你的Helm客戶端中。Bitnami是一個提供高質量、預構建的Kubernetes應用包的公司
注意:此圖表中已啟用 bin 日志,這是 Debezium 所必需的。
檢查 mysql pod 是否在 db 命名空間中啟動并運行。
登錄 MySQL db
MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace db mysql -o jsonpath="{.data.mysql-root-password}" | base64 -d)kubectl run mysql-client --rm --tty -i --restart='Never' --image docker.io/bitnami/mysql:8.4.4-debian-12-r0 --namespace db --env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD --command -- bashmysql -h mysql.db.svc.cluster.local -uroot -p"$MYSQL_ROOT_PASSWORD"# giving permissions to mysql_usr to allow reading the bin_logs properly
GRANT RELOAD, FLUSH_TABLES ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;
kubectl get secret --namespace db mysql
:從Kubernetes的db
命名空間中獲取名為mysql
的Secret對象。-o jsonpath="{.data.mysql-root-password}"
:使用JSONPath表達式從Secret對象中提取mysql-root-password
字段的數據。| base64 -d
:將提取出的Base64編碼的字符串解碼為明文。kubectl run mysql-client
:創建一個名為mysql-client
的Pod。--rm
:Pod運行結束后自動刪除。--tty -i
:分配一個偽終端并保持輸入交互。--restart='Never'
:設置Pod的重啟策略為不重啟。--image docker.io/bitnami/mysql:8.4.4-debian-12-r0
:使用指定的Docker鏡像。--namespace db
:在db
命名空間中創建Pod。--env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD
:將環境變量MYSQL_ROOT_PASSWORD
設置為之前獲取的root密碼。--command -- bash
:在Pod中啟動一個Bash shell。
將數據插入其中
-- Create the database if it doesn't exist
CREATE DATABASE IF NOT EXISTS debezium_db;-- Use the database
USE debezium_db;-- Create a sample table
CREATE TABLE employees (id INT AUTO_INCREMENT PRIMARY KEY,first_name VARCHAR(50) NOT NULL,last_name VARCHAR(50) NOT NULL,email VARCHAR(100) UNIQUE NOT NULL,salary DECIMAL(10,2) NOT NULL,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- Insert sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('John', 'Doe', 'john.doe@example.com', 60000.00),
('Jane', 'Smith', 'jane.smith@example.com', 75000.00),
('Alice', 'Johnson', 'alice.johnson@example.com', 82000.00),
('Bob', 'Williams', 'bob.williams@example.com', 50000.00);
部署 Kafka
kubectl create ns kafkacat > kafka-values.yaml <<EOF
listeners:client:protocol: PLAINTEXTcontroller:protocol: PLAINTEXTinterbroker:protocol: PLAINTEXTexternal:protocol: PLAINTEXT
sasl:enabledMechanisms: PLAIN
broker:replicaCount: 1
controller:replicaCount: 1
EOFhelm install kafka -n kafka bitnami/kafka --version 31.3.1 -f kafka-values.yaml
注意:上述 Kafka 配置是為了簡化演示,禁用了 Kafka Broker 和 Controller 的所有身份驗證機制。不建議在生產環境中禁用身份驗證運行 Kafka。
請檢查 Kafka Broker 和 Controller Pod 是否在 Kafka 命名空間中啟動并運行。
部署 kafdrop
安裝 Kafka UI,用于查看 Kafka 主題和瀏覽消費者組。
helm repo add lsst-sqre https://lsst-sqre.github.io/charts/helm repo update # required if above repo is already addedcat > kafdrop-values.yaml <<EOF
kafka:brokerConnect: kafka:9092
EOFhelm install -n kafka kafdrop lsst-sqre/kafdrop --version 0.1.3 -f kafdrop-values.yaml# port forward kafdrop to localhost:9000
kubectl port-forward -n kafka svc/kafdrop 9000:9000
部署 Debezium 連接器
要部署 Debezium 連接器,您需要先部署一個包含所需連接器插件的 Kafka Connect 集群,然后再實例化實際的連接器本身。第一步,需要創建一個包含該插件的 Kafka Connect 容器鏡像。
創建 Kafka Connect 集群
運行以下命令獲取 minikube 注冊表的 IP:kubectl -n kube-system get svc registry -o jsonpath=‘{.spec.clusterIP}’,并將其替換到下面的 kafkaconnect 清單中。
get svc registry
: 這部分命令用來獲取名為registry
的服務的信息。svc
是service
的縮寫,代表服務資源。-o jsonpath='{.spec.clusterIP}'
: 這個選項指定輸出格式為 JSONPath 表達式的結果。JSONPath 是一種查詢 JSON 數據的語法,類似于 XPath 用于 XML。這里的{.spec.clusterIP}
表示從返回的服務對象中提取spec.clusterIP
字段的值,即該服務的 ClusterIP 地址。
kubectl create ns debeziumcat > kafka-connect.yaml <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:name: debezium-connect-clusterannotations:strimzi.io/use-connector-resources: "true"namespace: debezium
spec:version: 3.8.0replicas: 1bootstrapServers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpointconfig:config.providers: secretsconfig.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvidergroup.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-status# -1 means it will use the default replication factor configured in the brokerconfig.storage.replication.factor: -1offset.storage.replication.factor: -1status.storage.replication.factor: -1build:output:type: dockerimage: <TO_BE_REPLACED_BY_ABOVE_CLUSTER_IP>/debezium-connect-mysql:latestplugins:- name: debezium-mysql-connectorartifacts:- type: tgzurl: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.7.Final/debezium-connector-mysql-3.0.7.Final-plugin.tar.gz
EOFkubectl apply -f kafka-connect.yaml
build
: 配置了構建過程:output.type
和output.image
: 指定了 Docker 鏡像的輸出類型和名稱。plugins
: 定義了要安裝的插件,這里是 Debezium MySQL 連接器,指定了插件的下載 URL。
注意:在上述配置中,如果您希望將鏡像推送到 ECR/GCR 或任何其他鏡像倉庫,請將鏡像端點替換為相應的端點,并且集群應具有將鏡像推送到該倉庫的權限。
檢查 Kafka 連接是否已就緒。可能需要幾分鐘(4-5 分鐘)才能進入就緒狀態。
kubectl get kafkaconnect -n debezium
它應該返回狀態 Ready: True
NAME DESIRED REPLICAS READY
debezium-connect-cluster 1 True
在 Kafdrop 中,您應該能夠看到 3 個主題:
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
創建 Debezium 連接器
在創建連接器之前,我們需要創建一個 k8s secret(用于存儲數據庫憑據)和 rbac。
cat > mysql-creds.yaml <<EOF
apiVersion: v1
kind: Secret
metadata:name: mysql-credsnamespace: debezium
type: Opaque
stringData:username: mysql_usrpassword: mysql_pwd
EOFkubectl apply -f mysql-creds.yamlcat > role.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:name: connector-configuration-rolenamespace: debezium
rules:- apiGroups: [""]resources: ["secrets"]resourceNames: ["mysql-creds"]verbs: ["get"]
EOFkubectl apply -f role.yamlcat > role-binding.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:name: connector-configuration-role-bindingnamespace: debezium
subjects:- kind: ServiceAccountname: debezium-connect-cluster-connectnamespace: debezium
roleRef:kind: Rolename: connector-configuration-roleapiGroup: rbac.authorization.k8s.io
EOFkubectl apply -f role-binding.yaml
通過登錄 MySQL 獲取 MySQL server_id(按照與上述相同的步驟)
SELECT @@server_id; # it is expected to be 1
部署 kafka 連接器
cat > kafka-connector.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:name: debezium-connector-mysqlnamespace: debeziumlabels:strimzi.io/cluster: debezium-connect-cluster
spec:class: io.debezium.connector.mysql.MySqlConnectortasksMax: 1config:tasks.max: 1database.hostname: mysql.db.svc.cluster.local # mysql db hostnamedatabase.port: 3306database.user: ${secrets:debezium/mysql-creds:username}database.password: ${secrets:debezium/mysql-creds:password}database.server.id: 1 # SELECT @@server_id;topic.prefix: mysqldatabase.include.list: debezium_dbschema.history.internal.kafka.bootstrap.servers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpointschema.history.internal.kafka.topic: schema-changes.debezium_db
EOFkubectl apply -f kafka-connector.yaml
檢查 kafka 連接是否已就緒。可能需要幾分鐘才能進入就緒狀態。
kubectl get kafkaconnector -n debezium
它應該返回狀態 Ready: True
NAME CLUSTER CONNECTOR CLASS MAX TASKS READY
debezium-connector-mysql debezium-connect-cluster io.debezium.connector.mysql.MySqlConnector 1 True
現在,您應該能夠在 kafdrop 中看到更多主題,例如 mysql.debezium_db.employees。
此主題將包含上面創建員工表時插入的所有數據。
為了測試 Debezium 連接器,請向表中添加更多數據。
-- Insert more sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('Charlie', 'Brown', 'charlie.brown@example.com', 72000.00),
('David', 'Miller', 'david.miller@example.com', 68000.00),
('Emma', 'Wilson', 'emma.wilson@example.com', 79000.00),
('Frank', 'Anderson', 'frank.anderson@example.com', 55000.00),
('Grace', 'Thomas', 'grace.thomas@example.com', 87000.00),
('Henry', 'Taylor', 'henry.taylor@example.com', 62000.00);
它應該反映在主題mysql.debezium_db.employees中。
這表明 MySQL 表員工中所做的更改已反映在 Kafka 中。