Debezium日常分享系列之:在 Kubernetes 上部署 Debezium

Debezium日常分享系列之:在 Kubernetes 上部署 Debezium

  • 先決條件
  • 步驟
  • 部署數據源 (MySQL)
  • 登錄 MySQL db
  • 將數據插入其中
  • 部署 Kafka
  • 部署 kafdrop
  • 部署 Debezium 連接器
  • 創建 Debezium 連接器

Debezium 可以無縫部署在 Kubernetes(一個用于容器編排的開源平臺)上。此部署利用了 Strimzi 項目,該項目通過自定義資源簡化了 Kubernetes 上 Kafka Connect 和連接器的部署。

先決條件

  • 一個正在運行的 Kubernetes 集群(本演示將使用 minikube)
  • kubectl
  • helm

步驟

使用 registry 啟動 minikube 集群

minikube start --insecure-registry "10.0.0.0/24"

啟用注冊表插件

minikube addons enable registry

部署 OLM(操作員生命周期管理器)

git clone https://github.com/operator-framework/operator-lifecycle-manager.gitcd operator-lifecycle-manager/deploy/charthelm install olm .

檢查operator-lifecycle-manager命名空間中的所有pod是否處于運行狀態。
在這里插入圖片描述
應用以下清單來部署 strimzi kafka 操作員

cat > strimzi-kafka-operator.yaml <<EOF
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:name: strimzi-kafka-operatornamespace: operators
spec:channel: stablename: strimzi-kafka-operatorsource: operatorhubio-catalogsourceNamespace: operator-lifecycle-manager # this should be same as the namespace in which olm is deployed
EOF
  • kind: 指定資源類型,這里是 SubscriptionSubscription 資源用于訂閱一個 Operator,以便在集群中自動更新和管理該 Operator。
  • spec:
    • channel: 訂閱的頻道,這里是 stable。頻道通常用于指定不同版本的 Operator。
    • name: 要訂閱的 Operator 的名稱,這里是 strimzi-kafka-operator
    • source: Operator Catalog 的名稱,這里是 operatorhubio-catalog。Catalog 是一個包含多個 Operator 的集合。
    • sourceNamespace: Catalog 所在的命名空間,這里是 operator-lifecycle-manager。這個命名空間通常是 Operator Lifecycle Manager (OLM) 安裝的命名空間。
kubectl apply -f strimzi-kafka-operator.yaml

檢查 Operator Pod 在 Operator 命名空間中是否處于運行狀態。啟動并運行 Pod 可能需要幾分鐘時間。

部署數據源 (MySQL)

helm repo add bitnami https://charts.bitnami.com/bitnamihelm repo update # required if above repo is already addedkubectl create ns dbcat > mysql-values.yaml <<EOF
auth:rootPassword: "root"database: "debezium_db"username: "mysql_usr"password: "mysql_pwd"
EOFhelm install -n db mysql bitnami/mysql --version 12.2.2 -f mysql-values.yaml
  • helm repo add bitnami https://charts.bitnami.com/bitnami
    命令解釋:這個命令將Bitnami Helm倉庫添加到你的Helm客戶端中。Bitnami是一個提供高質量、預構建的Kubernetes應用包的公司

注意:此圖表中已啟用 bin 日志,這是 Debezium 所必需的。

檢查 mysql pod 是否在 db 命名空間中啟動并運行。

登錄 MySQL db

MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace db mysql -o jsonpath="{.data.mysql-root-password}" | base64 -d)kubectl run mysql-client --rm --tty -i --restart='Never' --image  docker.io/bitnami/mysql:8.4.4-debian-12-r0 --namespace db --env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD --command -- bashmysql -h mysql.db.svc.cluster.local -uroot -p"$MYSQL_ROOT_PASSWORD"# giving permissions to mysql_usr to allow reading the bin_logs properly
GRANT RELOAD, FLUSH_TABLES ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;
  • kubectl get secret --namespace db mysql:從Kubernetes的db命名空間中獲取名為mysql的Secret對象。
  • -o jsonpath="{.data.mysql-root-password}":使用JSONPath表達式從Secret對象中提取mysql-root-password字段的數據。
  • | base64 -d:將提取出的Base64編碼的字符串解碼為明文。
  • kubectl run mysql-client:創建一個名為mysql-client的Pod。
  • --rm:Pod運行結束后自動刪除。
  • --tty -i:分配一個偽終端并保持輸入交互。
  • --restart='Never':設置Pod的重啟策略為不重啟。
  • --image docker.io/bitnami/mysql:8.4.4-debian-12-r0:使用指定的Docker鏡像。
  • --namespace db:在db命名空間中創建Pod。
  • --env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD:將環境變量MYSQL_ROOT_PASSWORD設置為之前獲取的root密碼。
  • --command -- bash:在Pod中啟動一個Bash shell。

將數據插入其中

-- Create the database if it doesn't exist
CREATE DATABASE IF NOT EXISTS debezium_db;-- Use the database
USE debezium_db;-- Create a sample table
CREATE TABLE employees (id INT AUTO_INCREMENT PRIMARY KEY,first_name VARCHAR(50) NOT NULL,last_name VARCHAR(50) NOT NULL,email VARCHAR(100) UNIQUE NOT NULL,salary DECIMAL(10,2) NOT NULL,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- Insert sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('John', 'Doe', 'john.doe@example.com', 60000.00),
('Jane', 'Smith', 'jane.smith@example.com', 75000.00),
('Alice', 'Johnson', 'alice.johnson@example.com', 82000.00),
('Bob', 'Williams', 'bob.williams@example.com', 50000.00);

部署 Kafka

kubectl create ns kafkacat > kafka-values.yaml <<EOF
listeners:client:protocol: PLAINTEXTcontroller:protocol: PLAINTEXTinterbroker:protocol: PLAINTEXTexternal:protocol: PLAINTEXT
sasl:enabledMechanisms: PLAIN
broker:replicaCount: 1
controller:replicaCount: 1
EOFhelm install kafka -n kafka bitnami/kafka --version 31.3.1 -f kafka-values.yaml

注意:上述 Kafka 配置是為了簡化演示,禁用了 Kafka Broker 和 Controller 的所有身份驗證機制。不建議在生產環境中禁用身份驗證運行 Kafka。

請檢查 Kafka Broker 和 Controller Pod 是否在 Kafka 命名空間中啟動并運行。

部署 kafdrop

安裝 Kafka UI,用于查看 Kafka 主題和瀏覽消費者組。

helm repo add lsst-sqre https://lsst-sqre.github.io/charts/helm repo update # required if above repo is already addedcat > kafdrop-values.yaml <<EOF
kafka:brokerConnect: kafka:9092
EOFhelm install -n kafka kafdrop lsst-sqre/kafdrop --version 0.1.3 -f kafdrop-values.yaml# port forward kafdrop to localhost:9000
kubectl port-forward -n kafka svc/kafdrop 9000:9000

部署 Debezium 連接器

要部署 Debezium 連接器,您需要先部署一個包含所需連接器插件的 Kafka Connect 集群,然后再實例化實際的連接器本身。第一步,需要創建一個包含該插件的 Kafka Connect 容器鏡像。

創建 Kafka Connect 集群

運行以下命令獲取 minikube 注冊表的 IP:kubectl -n kube-system get svc registry -o jsonpath=‘{.spec.clusterIP}’,并將其替換到下面的 kafkaconnect 清單中。

  • get svc registry: 這部分命令用來獲取名為 registry 的服務的信息。svcservice 的縮寫,代表服務資源。
  • -o jsonpath='{.spec.clusterIP}': 這個選項指定輸出格式為 JSONPath 表達式的結果。JSONPath 是一種查詢 JSON 數據的語法,類似于 XPath 用于 XML。這里的 {.spec.clusterIP} 表示從返回的服務對象中提取 spec.clusterIP 字段的值,即該服務的 ClusterIP 地址。
kubectl create ns debeziumcat > kafka-connect.yaml <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:name: debezium-connect-clusterannotations:strimzi.io/use-connector-resources: "true"namespace: debezium
spec:version: 3.8.0replicas: 1bootstrapServers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpointconfig:config.providers: secretsconfig.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvidergroup.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-status# -1 means it will use the default replication factor configured in the brokerconfig.storage.replication.factor: -1offset.storage.replication.factor: -1status.storage.replication.factor: -1build:output:type: dockerimage: <TO_BE_REPLACED_BY_ABOVE_CLUSTER_IP>/debezium-connect-mysql:latestplugins:- name: debezium-mysql-connectorartifacts:- type: tgzurl: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.7.Final/debezium-connector-mysql-3.0.7.Final-plugin.tar.gz
EOFkubectl apply -f kafka-connect.yaml
  • build: 配置了構建過程:
  • output.typeoutput.image: 指定了 Docker 鏡像的輸出類型和名稱。
  • plugins: 定義了要安裝的插件,這里是 Debezium MySQL 連接器,指定了插件的下載 URL。

注意:在上述配置中,如果您希望將鏡像推送到 ECR/GCR 或任何其他鏡像倉庫,請將鏡像端點替換為相應的端點,并且集群應具有將鏡像推送到該倉庫的權限。

檢查 Kafka 連接是否已就緒。可能需要幾分鐘(4-5 分鐘)才能進入就緒狀態。

kubectl get kafkaconnect -n debezium

它應該返回狀態 Ready: True

NAME                       DESIRED REPLICAS   READY
debezium-connect-cluster   1                  True

在 Kafdrop 中,您應該能夠看到 3 個主題:

connect-cluster-configs
connect-cluster-offsets
connect-cluster-status

創建 Debezium 連接器

在創建連接器之前,我們需要創建一個 k8s secret(用于存儲數據庫憑據)和 rbac。

cat > mysql-creds.yaml <<EOF
apiVersion: v1
kind: Secret
metadata:name: mysql-credsnamespace: debezium
type: Opaque
stringData:username: mysql_usrpassword: mysql_pwd
EOFkubectl apply -f mysql-creds.yamlcat > role.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:name: connector-configuration-rolenamespace: debezium
rules:- apiGroups: [""]resources: ["secrets"]resourceNames: ["mysql-creds"]verbs: ["get"]
EOFkubectl apply -f role.yamlcat > role-binding.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:name: connector-configuration-role-bindingnamespace: debezium
subjects:- kind: ServiceAccountname: debezium-connect-cluster-connectnamespace: debezium
roleRef:kind: Rolename: connector-configuration-roleapiGroup: rbac.authorization.k8s.io
EOFkubectl apply -f role-binding.yaml

通過登錄 MySQL 獲取 MySQL server_id(按照與上述相同的步驟)

SELECT @@server_id; # it is expected to be 1

部署 kafka 連接器

cat > kafka-connector.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:name: debezium-connector-mysqlnamespace: debeziumlabels:strimzi.io/cluster: debezium-connect-cluster
spec:class: io.debezium.connector.mysql.MySqlConnectortasksMax: 1config:tasks.max: 1database.hostname: mysql.db.svc.cluster.local # mysql db hostnamedatabase.port: 3306database.user: ${secrets:debezium/mysql-creds:username}database.password: ${secrets:debezium/mysql-creds:password}database.server.id: 1 # SELECT @@server_id;topic.prefix: mysqldatabase.include.list: debezium_dbschema.history.internal.kafka.bootstrap.servers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpointschema.history.internal.kafka.topic: schema-changes.debezium_db
EOFkubectl apply -f kafka-connector.yaml

檢查 kafka 連接是否已就緒。可能需要幾分鐘才能進入就緒狀態。

kubectl get kafkaconnector -n debezium

它應該返回狀態 Ready: True

NAME                       CLUSTER                    CONNECTOR CLASS                              MAX TASKS   READY
debezium-connector-mysql   debezium-connect-cluster   io.debezium.connector.mysql.MySqlConnector   1           True

現在,您應該能夠在 kafdrop 中看到更多主題,例如 mysql.debezium_db.employees。

此主題將包含上面創建員工表時插入的所有數據。

為了測試 Debezium 連接器,請向表中添加更多數據。

-- Insert more sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('Charlie', 'Brown', 'charlie.brown@example.com', 72000.00),
('David', 'Miller', 'david.miller@example.com', 68000.00),
('Emma', 'Wilson', 'emma.wilson@example.com', 79000.00),
('Frank', 'Anderson', 'frank.anderson@example.com', 55000.00),
('Grace', 'Thomas', 'grace.thomas@example.com', 87000.00),
('Henry', 'Taylor', 'henry.taylor@example.com', 62000.00);

它應該反映在主題mysql.debezium_db.employees中。
在這里插入圖片描述
這表明 MySQL 表員工中所做的更改已反映在 Kafka 中。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/86997.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/86997.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/86997.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

利潤才是機器視覺企業的的“穩定器”,機器視覺企業的利潤 = (規模經濟 + 技術差異化 × 場景價值) - 競爭強度

影響機器視覺企業盈利能力的關鍵因素。這個公式本質上反映了行業的核心動態:利潤來自成本控制(規模化效應)和差異化優勢(技術壁壘與場景稀缺性的協同),但被市場競爭(內卷程度)所侵蝕。下面我將一步步拆解這個公式,結合機器視覺行業的特點(如工業自動化、質檢、安防、…

EPLAN 中定制 自己的- A3 圖框的詳細指南(一)

EPLAN 中定制 BIEM - A3 圖框的詳細指南 在智能電氣設計領域&#xff0c;圖框作為圖紙的重要組成部分&#xff0c;其定制的規范性和準確性至關重要。本文將以北京經濟管理職業學院人工智能學院的相關任務為例&#xff0c;詳細介紹在 EPLAN 軟件中定制 BIEM - A3 圖框的全過程…

macbook開發環境的配置記錄

前言&#xff1a;好多東西不記錄就會忘記 git ssh配置 當我們的沒有配置git ssh的時候&#xff0c;使用ssh下載的時候會顯示報錯“make sure you have the correct access rights and respository exits" 如何解決&#xff0c;我們先在命令行檢查檢查一下用戶名和郵箱是…

GitLab 18.1 高級 SAST 已支持 PHP,可升級體驗!

GitLab 是一個全球知名的一體化 DevOps 平臺&#xff0c;很多人都通過私有化部署 GitLab 來進行源代碼托管。極狐GitLab 是 GitLab 在中國的發行版&#xff0c;專門為中國程序員服務。可以一鍵式部署極狐GitLab。 學習極狐GitLab 的相關資料&#xff1a; 極狐GitLab 官網極狐…

[學習]M-QAM的數學原理與調制解調原理詳解(仿真示例)

M-QAM的數學原理與調制解調原理詳解 QAM&#xff08;正交幅度調制&#xff09;作為現代數字通信的核心技術&#xff0c;其數學原理和實現方法值得深入探討。本文將分為數學原理、調制解調原理和實現要點三個部分進行系統闡述。 文章目錄 M-QAM的數學原理與調制解調原理詳解一、…

圖書管理系統練習項目源碼-前后端分離-使用node.js來做后端開發

前端學習了這么久了&#xff0c;node.js 也有了一定的了解&#xff0c;知道使用node也可以來開發后端&#xff0c;今天給大家分享 使用node 來做后端&#xff0c;vue來寫前端&#xff0c;做一個簡單的圖書管理系統。我們在剛開始學習編程的時候&#xff0c;需要自己寫大量的項目…

【甲方安全視角】企業建設下的安全運營

文章目錄 一、安全運營的概念與起源二、安全運營的職責與定位三、安全運營工程師的核心能力要求四、安全運營的典型場景與應對技巧1. 明確責任劃分,避免“醫生做保姆”2. 推動機制:自下而上 vs. 自上而下3. 宣傳與內部影響力建設五、安全運營的戰略意義六、為何需要安全原因在…

03認證原理自定義認證添加認證驗證碼

目錄 大綱 一、自定義資源權限規則 二、自定義登錄界面 三、自定義登錄成功處理 四、顯示登錄失敗信息 五、自定義登錄失敗處理 六、注銷登錄 七、登錄用戶數據獲取 1. SecurityContextHolder 2. SecurityContextHolderStrategy 3. 代碼中獲取認證之后用戶數據 4. 多…

IPLOOK 2025上半年足跡回顧:連接全球,步履不停

2025年上半年&#xff0c;IPLOOK積極活躍于全球通信舞臺&#xff0c;足跡橫跨亞洲、歐洲、非洲與北美洲&#xff0c;我們圍繞5G核心網、私有網絡、云化架構等方向&#xff0c;向來自不同地區的客戶與合作伙伴展示了領先的端到端解決方案&#xff0c;深入了解各地市場需求與技術…

【Kafka】docker 中配置帶 Kerberos 認證的 Kafka 環境(全過程)

1. 準備 docker 下載鏡像 docker pull centos/systemd&#xff0c;該鏡像是基于 centos7 增加了 systemd 的功能&#xff0c;可以更方便啟動后臺服務 啟動鏡像 使用 systemd 功能需要權限&#xff0c;如果是模擬 gitlab services 就不要使用 systemd 的方式啟動 如果不使用 s…

用Python構建一個可擴展的多網盤聚合管理工具 (以阿里云盤為例)

摘要 本文旨在從開發者視角&#xff0c;探討并實踐如何構建一個命令行界面的、支持多網盤聚合管理的工具。我們將以阿里云盤為例&#xff0c;深入解析其API認證與核心操作&#xff0c;并用Python從零開始實現文件列表、重命名、分享等功能。更重要的是&#xff0c;本文將重點討…

筑牢網絡安全屏障

在數字化浪潮席卷全球的今天&#xff0c;網絡空間已成為繼陸、海、空、天之后的 “第五疆域”&#xff0c;深刻影響著國家政治、經濟、軍事等各個領域。“沒有網絡安全就沒有國家安全”&#xff0c;這句論斷精準道出了網絡安全與國家安全之間密不可分的關系。? 網絡安全關乎國…

計算機網絡(一)層

一、分層 分層的意義&#xff1a;簡化復雜性、提高靈活性、促進標準化 &#xff08;1&#xff09;法律上國際標準——OSI體系結構 &#xff08;2&#xff09;事實上的網絡標準——TCP/IP體系結構 TCP&#xff1a;運輸層的協議 IP&#xff1a;網際層的一個協議 網絡接口層&…

STM32 rs485實現中斷DMA模式收發不定長數據

在STM32F103上使用TD301D485H模塊通過USB轉485/422串口線與電腦通信 TXD (TD301D485H) -> PA2 (STM32F103)RXD (TD301D485H) -> PA3 (STM32F103)CON (TD301D485H) -> PA1 (STM32F103) 由于485是半雙工通信&#xff0c;需要在發送和接收時控制方向引腳&#xff08;CO…

DDL-8-小結

DDL 小結 DDL 小結 DDL 小結DDL - 數據庫操作DDL - 表操作 DDL - 數據庫操作 查看當前有哪些數據庫 SHOW DATABASES;新建數據庫 CREATE DATABASE 數據庫名;使用數據庫 USE 數據庫名;查詢當前數據庫 SELECT DATABASE();刪除數據庫 DROP DATABASE 數據庫名;DDL - 表操作 查看當前…

Redis 安裝使用教程

一、Redis 簡介 Redis 是一個開源&#xff08;BSD 許可&#xff09;、內存數據結構存儲系統&#xff0c;可以用作數據庫、緩存和消息中間件。支持字符串、哈希、列表、集合、有序集合等數據類型&#xff0c;廣泛應用于分布式緩存、排行榜、實時數據分析等場景。 二、下載安裝…

Go語言測試與調試:單元測試與基準測試

以下是《Go語言實戰指南》中關于 測試與調試&#xff1a;單元測試與基準測試 的詳細內容&#xff0c;涵蓋測試編寫、運行、覆蓋率分析與性能測試&#xff0c;適用于實際項目開發與性能優化階段。 一、Go 的測試體系概覽 Go 提供原生的測試工具包 testing&#xff0c;無需第三方…

數字FIR-I型濾波器設計(窗函數法)

目錄 一、實驗目的 二、實驗原理 2.1 概念辨析 2.2 代碼實現邏輯與工具函數 三、實驗內容 3.1 設計帶通濾波器&#xff08;電平組合法&#xff0c;&#xff08;理想寬帶低通-理想窄帶低通&#xff09;x窗函數&#xff09; 3.2 高通濾波器&#xff08;…

RHCSA認證題目練習一(配置網絡設置)

一. 題目 配置網絡設置 解題過程&#xff1a; 注意&#xff1a;不可以在xshell中完成&#xff0c;否則會直接斷聯 這里用圖形化解題&#xff0c;更加簡單防止命令記錯 1. 打開圖形化視圖 命令&#xff1a;nmtui 按回車確認 按回車確認 2.首先把IPv4配置 <自動> 改成 …

STL簡介+string模擬實現

STL簡介string模擬實現 1. 什么是STL2. STL的版本3. STL的六大組件4.STL的缺陷5. string5.1 C語言中的字符串5.2 1個OJ題 6.標準庫中的string類6.1 string類(了解)6.2 string類的常用接口說明1.string類對象的常見構造函數2.析構函數(~string())3.賦值函數 (operator) 6.3 stri…