Debezium日常分享系列之:在 Kubernetes 中使用 Debezium 的 CDC
- 架構
- 源數據庫
- 創建數據庫憑證密鑰
- Debezium 自定義鏡像
- 構建并推送鏡像
- Kafka Connect 集群
- Debezium Postgres 連接器
- Debezium 創建的 Kafka 主題
Debezium 是一個開源的分布式變更數據捕獲 (CDC) 平臺。Debezium 充當實時 CDC 引擎,實時捕獲插入、更新和刪除操作。
Debezium 提供多種部署選項,但使用 Kafka Connect 尤其有利,尤其是在數據復制方面。本文介紹此方法,因為它可以將現有數據和變更數據捕獲 (CDC) 從源數據庫無縫傳輸到 Kafka 主題中。從那里,數據可以高效地復制到目標數據庫。通過將 Kafka Connect 與 Debezium 結合使用,組織可以實現強大且可擴展的數據復制解決方案,從而有效地跨不同系統同步數據。
本演示探討了如何在 Kubernetes 環境中部署 Debezium,以從本地 Postgres 數據庫捕獲 CDC 數據,并將更改傳輸到 Kafka 主題。
使用 Strimzi 在 Kubernetes 中部署 Kafka。此 Kafka 集群將用于部署 Kafka Connect 并使用 Debezium。
架構
Kafka Connect 是一個框架和運行時環境,用于促進 Kafka 生態系統內的數據移動。它支持兩項關鍵功能:
- 源連接器:這些連接器(例如 Debezium)支持將來自各種源的記錄傳輸到 Kafka 主題中。
- 接收器連接器:相反,接收器連接器將記錄從 Kafka 主題傳輸到其他目標系統。
Kafka Connect 作為 Kafka Broker 的獨立服務運行。
默認情況下,數據庫表的更改會被發送到以表本身命名的 Kafka 主題。但是,Debezium 允許通過配置調整實現靈活的主題路由。例如,用戶可以:
- 將記錄路由到名稱與其來源表不同的主題。
- 將來自多個表的更改事件記錄合并到一個主題中。
一旦更改事件記錄駐留在 Apache Kafka 中,Kafka Connect 生態系統中的各種連接器就可以將它們流式傳輸到各種系統和數據庫,包括 Elasticsearch、數據倉庫、分析平臺或像 Infinispan 這樣的緩存解決方案。根據所選的接收器連接器,可能需要配置 Debezium 新的記錄狀態提取轉換功能。
源數據庫
有關所有支持的 Debezium 連接器,請參閱此官方文檔 - https://debezium.io/documentation/reference/stable/connectors/index.html
本指南展示了如何使用托管在本地計算機上的 Postgres 數據庫進行數據捕獲。雖然 Postgres 也可以部署在 Kubernetes 內部,但您可以參考《Kubernetes (K8s) 上的 Postgres》指南。要訪問 Kubernetes 集群外部(由 Docker Desktop 托管)的數據庫,您可以將 localhost 替換為數據庫主機名 host.docker.internal。通常,要從 Kubernetes 集群外部訪問數據庫,可以使用 ExternalName 等服務發現機制。
創建數據庫憑證密鑰
db-secret.yaml
apiVersion: v1
kind: Secret
metadata:name: debezium-secretnamespace: kafka
type: Opaque
stringData:username: dbzuserpassword: dbzpass
apiVersion: v1
: 這指定了使用的 API 版本。對于Secret
對象,v1
是一個常用版本。kind: Secret
: 指明這是一個Secret
類型的資源。metadata:
: 這個部分包含了描述Secret
的元數據。name: debezium-secret
: 指定Secret
的名稱為debezium-secret
。namespace: kafka
: 指定Secret
所屬的命名空間為kafka
。Kubernetes 中的資源可以通過命名空間進行隔離,以實現多租戶管理。type: Opaque
: 指定Secret
的類型為Opaque
。Opaque
類型的Secret
通常用于存儲任意的數據,如文本或二進制數據。stringData:
: 這是一個特殊的字段,允許你直接以字符串的形式提供數據,而不需要先對其進行 base64 編碼。Kubernetes 會自動處理編碼過程。username: dbzuser
: 指定用戶名為dbzuser
。password: dbzpass
: 指定密碼為dbzpass
。
創建一個 Role 來引用上述 secret,并創建一個 RoleBinding 將此角色綁定到 Kafka Connect 集群服務帳戶,以便 Kafka Connect 可以訪問該 secret。
db-rbac.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:name: connector-configuration-rolenamespace: kafka
rules:- apiGroups: [""]resources: ["secrets"]resourceNames: ["debezium-secret"]verbs: ["get"]
在Kubernetes中,Role
是一種資源,用于定義特定命名空間內的權限。你提供的 YAML 文件定義了一個名為 connector-configuration-role
的角色,該角色具有對特定資源的訪問權限。下面是對這個 YAML 文件的逐行解釋:
- apiVersion 指定了 Kubernetes API 的版本。
rbac.authorization.k8s.io/v1
表示這是 RBAC(Role-Based Access Control,基于角色的訪問控制)API 的 v1 版本。 -
kind: Role
kind 指定了資源的類型。在這里,Role
表示這是一個角色資源。
-
metadata:
metadata 是一個包含元數據的字段,通常包括名稱、標簽等信息。name: connector-configuration-role
: 角色的名稱為connector-configuration-role
。
namespace: kafka
: 角色所屬的命名空間為kafka
。
-
rules:
rules 是一個列表,定義了該角色可以執行的操作和可以訪問的資源。
-
- apiGroups: [""]resources: ["secrets"]resourceNames: ["debezium-secret"]verbs: ["get"]
這是一個規則條目,定義了具體的權限:
- apiGroups: 指定了 API 組。這里使用空字符串
""
表示核心 API 組。 - resources: 指定了資源類型。這里指定的是
secrets
,即 Kubernetes 中的秘密資源。 - resourceNames: 指定了具體資源的名稱。這里指定的是
debezium-secret
,表示該角色只能訪問名為debezium-secret
的秘密資源。 - verbs: 指定了允許的操作。這里指定的是
get
,表示該角色可以獲取指定的秘密資源。
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:name: connector-configuration-role-bindingnamespace: kafka
subjects:- kind: ServiceAccountname: debezium-connect-cluster-connectnamespace: kafka
roleRef:kind: Rolename: connector-configuration-roleapiGroup: rbac.authorization.k8s.io
在Kubernetes中,RoleBinding
是一個資源對象,用于將特定的 Role
(角色)與一個或多個 Subject
(主體,如用戶、組或服務賬戶)綁定在一起。這使得主體能夠執行由該角色定義的操作。
- kind: RoleBinding這指定了資源的類型為
RoleBinding
,即角色綁定。 - 主體(Subjects)
-
- kind: ServiceAccount 指定了主體的類型為
ServiceAccount
(服務賬戶)。
- kind: ServiceAccount 指定了主體的類型為
- name: debezium-connect-cluster-connect這是服務賬戶的名稱。
-
- 角色引用(RoleRef)
- kind: Role指定了要綁定的角色類型為
Role
- name: connector-configuration-role這是要綁定的角色的名稱。
- apiGroup: rbac.authorization.k8s.io:這指定了角色所屬的API組。
rbac.authorization.k8s.io
是RBAC API的組。
總結:
這個 RoleBinding
配置文件的作用是將名為 connector-configuration-role
的角色綁定到名為 debezium-connect-cluster-connect
的服務賬戶上,綁定操作發生在 kafka
命名空間中。這意味著 debezium-connect-cluster-connect
服務賬戶將具有 connector-configuration-role
角色所定義的所有權限。
Strimzi 在 Kafka Connect 部署期間會自動創建一個服務帳戶。此服務帳戶名稱遵循特定格式:$KafkaConnectName-connect。由于我們將部署一個名為 debezium-connect-cluster 的 Kafka Connect 集群,因此相應的服務帳戶名稱為 debezium-connect-cluster-connect。
kubectl apply -f debezium/db-secret.yaml -n kafkakubectl apply -f debezium/db-rbac.yaml -n kafka
Debezium 自定義鏡像
要部署 Debezium 連接器,首先必須設置一個包含所需連接器插件的 Kafka Connect 集群。此過程包括為 Kafka Connect 創建包含所需插件的 Strimzi 容器鏡像,然后再實例化連接器本身。
請參閱官方文檔下載連接器 - https://debezium.io/documentation/reference/stable/install.html
有關預安裝的連接器鏡像,請參閱 Debezium 官方鏡像 - https://quay.io/organization/debezium
在本演示中,我將 Postgres 連接器下載到 debezium/plugins
注意:使用 TimestampConverter jar(來自 https://github.com/oryanmoshe/debezium-timestamp-converter)將所有時間數據類型(所有數據庫中的)轉換為您選擇的指定格式。
默認值:
"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "YYYY-MM-dd",
"timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"
當使用多個連接器時,必須將此 jar 單獨添加到所有 Debezium 連接器中。
DockerfileDebezium
#DockerfileFROM quay.io/strimzi/kafka:0.40.0-kafka-3.7.0
USER root:root
COPY debezium/plugins /opt/kafka/plugins/
USER 1001
Dockerfile 是一個用于構建 Docker 鏡像的腳本文件,其中包含了構建鏡像所需的一系列命令。每個命令都會在鏡像中創建一個新的層,使得最終的鏡像可以被構建、分發和運行。
FROM
指令指定了基礎鏡像,這里是使用了 Strimzi 提供的 Kafka 鏡像,版本號為0.40.0-kafka-3.7.0
。這意味著新的鏡像是基于這個現有的 Kafka 鏡像構建的,繼承了它所有的配置和內容。USER
指令用來設置執行后續指令的用戶。這里將用戶設置為root:root
,意味著接下來的操作將以超級用戶的權限進行。這通常是為了確保有足夠權限來執行某些特定的任務,比如安裝軟件或修改系統文件。COPY
指令從本地文件系統復制文件到鏡像中的指定路徑。這里的命令表示將本地目錄debezium/plugins
下的所有內容復制到鏡像內的/opt/kafka/plugins/
目錄下。這通常是用于添加應用程序所需的額外插件或依賴項。- USER 1001最后一條
USER
指令將用戶切換回1001
。這通常是一個非特權用戶,用于提高安全性,防止鏡像以 root 用戶運行時可能帶來的安全風險。在生產環境中,盡量避免以 root 用戶運行容器是一個好的實踐。
總結來說,這段 Dockerfile 的主要目的是在基于 Strimzi Kafka 鏡像的基礎上,添加 Debezium 插件,并且確保這些操作是以 root 權限完成的,但在最終運行時切換到一個較低權限的用戶以增加安全性。
構建并推送鏡像
docker build -t osds-debezium -f debezium/DockerfileDebezium .
docker login
docker tag osds-debezium howdytech01/osds:osds-debezium
docker push howdytech01/osds:osds-debezium
docker build
: 這是用于構建 Docker 鏡像的命令。-t osds-debezium
:-t
參數用于給構建的鏡像打標簽。這里的標簽是osds-debezium
,這意味著構建完成后,鏡像的名稱將為osds-debezium
。-f debezium/DockerfileDebezium
:-f
參數指定要使用的Dockerfile
文件。在這個例子中,Dockerfile
文件位于debezium/
目錄下,并且文件名為DockerfileDebezium
。.
: 這個點表示構建上下文目錄,即 Docker 構建過程中會訪問的文件和目錄。在這里,當前目錄 (.
) 是構建上下文。docker push
: 這個命令用于將鏡像推送到 Docker 注冊表(通常是 Docker Hub)。howdytech01/osds:osds-debezium
: 這是你希望推送的鏡像的完整名稱。推送成功后,其他用戶可以通過這個名稱從 Docker Hub 拉取該鏡像。
Kafka Connect 集群
使用 Strimzi Operator 創建 Kafka Connect 集群。
debezium-connect-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:name: debezium-connect-clusterannotations:strimzi.io/use-connector-resources: "true"
spec:version: 3.7.0image: howdytech01/osds:osds-debeziumreplicas: 1bootstrapServers: osds-cluster-kafka-bootstrap:9092config:config.providers: secretsconfig.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvidergroup.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-status# -1 means it will use the default replication factor configured in the brokerconfig.storage.replication.factor: -1offset.storage.replication.factor: -1status.storage.replication.factor: -1
- kind 指定了資源的類型。在這里,
KafkaConnect
表示這是一個Kafka Connect集群的資源。 - image:
howdytech01/osds:osds-debezium
指定Kafka Connect使用的Docker鏡像。 - replicas:
1
指定Kafka Connect集群的副本數。這里設置為1,表示只有一個Kafka Connect實例。 - config.providers:
secrets
配置提供者,這里使用了secrets
。 - config.providers.secrets.class:
io.strimzi.kafka.KubernetesSecretConfigProvider
配置提供者的類,這里指定了一個Kubernetes Secret配置提供者,用于從Kubernetes Secret中讀取配置信息。 - group.id:
connect-cluster
指定Kafka Connect集群的組ID。
此配置設置了一個名為 debezium-connect-cluster 的 Kafka Connect 集群,具有特定的配置,并指向 Kafka 引導服務器進行連接。
kubectl apply -f debezium/debezium-connect-cluster.yaml -n kafka
注意:
配置指定將 bootstrapServers 設置為 osds-cluster-kafka-bootstrap:9092。這表明此處使用的是本演示中在 Kubernetes 中創建的 Strimzi Kafka 集群。
需要注意的是,我們已經配置了 Strimzi Secret 提供程序。該提供程序會自動為 Kafka Connect 集群生成一個服務帳號,該帳號已與必要的角色關聯。此設置使 Kafka Connect 能夠安全地訪問包含敏感信息(例如數據庫憑據)的 Secret 對象。
Debezium Postgres 連接器
請參閱此頁面以在 Postgres 上設置并啟用 CDC 日志記錄 - https://debezium.io/documentation/reference/stable/connectors/postgresql.html
使用以下配置創建一個 KafkaConnector
postgres-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:name: debezium-connector-postgreslabels:strimzi.io/cluster: debezium-connect-cluster
spec:class: io.debezium.connector.postgresql.PostgresConnectortasksMax: 1config:tasks.max: 1database.hostname: host.docker.internaldatabase.port: 5432database.user: ${secrets:kafka/debezium-secret:username}database.password: ${secrets:kafka/debezium-secret:password}database.dbname: movie_rental_dbtopic.prefix: movie_rental_dbplugin.name: pgoutputpublication.autocreate.mode: filteredtable.include.list: public.actorkey.converter.schemas.enable: falsevalue.converter.schemas.enable: falsesnapshot.mode: alwaysmessage.key.columns: public.actor:actor_idtransforms: unwraptransforms.unwrap.type: io.debezium.transforms.ExtractNewRecordStatetransforms.unwrap.add.fields: op:_meta_op,table:_meta_table,lsn:_meta_lsn,source.ts_ms:_meta_event_ts,schema:_meta_schematransforms.unwrap.add.headers: dbtransforms.unwrap.delete.handling.mode: rewritedelete.tombstone.handling.mode: rewriteconverters: timestampConverter,timestampConverter.type: oryanmoshe.kafka.connect.util.TimestampConverterkey.converter: org.apache.kafka.connect.json.JsonConvertervalue.converter: org.apache.kafka.connect.json.JsonConverter
kind: KafkaConnector
:kind 指定了資源的類型,這里是KafkaConnector
,表示這是一個Kafka連接器。- class: 連接器的類名,這里是
io.debezium.connector.postgresql.PostgresConnector
,表示這是一個用于PostgreSQL的Debezium連接器。 - tasksMax: 連接器的最大任務數,這里設置為
1
。 - tasks.max: 連接器的任務數,與
tasksMax
相同,設置為1
。 - database.hostname: PostgreSQL數據庫的主機名,這里是
host.docker.internal
,通常表示Docker內部網絡中的主機。 - database.port: PostgreSQL數據庫的端口,這里是
5432
。 - database.user: 數據庫用戶名,使用了環境變量
${secrets:kafka/debezium-secret:username}
,表示從Kubernetes Secrets中獲取用戶名。 - database.password: 數據庫密碼,使用了環境變量
${secrets:kafka/debezium-secret:password}
,表示從Kubernetes Secrets中獲取密碼。 - database.dbname: 要連接的數據庫名稱,這里是
movie_rental_db
。 - topic.prefix: 生成的Kafka主題的前綴,這里是
movie_rental_db
。 - plugin.name: PostgreSQL的復制插件名稱,這里是
pgoutput
。 - publication.autocreate.mode: 自動創建發布表的模式,這里是
filtered
,表示只包含指定的表。 - table.include.list: 需要捕獲變化的表列表,這里是
public.actor
。 - key.converter.schemas.enable: 是否啟用鍵的Schema轉換,這里是
false
。 - value.converter.schemas.enable: 是否啟用值的Schema轉換,這里是
false
。 - snapshot.mode: 快照模式,這里是
always
,表示每次啟動連接器時都進行全量快照。 - message.key.columns: 消息鍵的列,這里是
public.actor:actor_id
。 - transforms: 定義了一系列的轉換操作。
- unwrap: 使用
io.debezium.transforms.ExtractNewRecordState
變換器來提取新記錄的狀態。 - add.fields: 添加額外的字段,例如
_meta_op
,_meta_table
,_meta_lsn
,_meta_event_ts
,_meta_schema
。 - add.headers: 添加額外的頭信息,例如
db
。 - delete.handling.mode: 刪除處理模式,這里是
rewrite
。 - delete.tombstone.handling.mode: 刪除墓碑的處理模式,這里是
rewrite
。 - converters: 定義了自定義的轉換器,這里是
timestampConverter
。 - timestampConverter.type: 自定義轉換器的類名,這里是
oryanmoshe.kafka.connect.util.TimestampConverter
。 - key.converter: 鍵的轉換器,這里是
org.apache.kafka.connect.json.JsonConverter
。 - value.converter: 值的轉換器,這里是
org.apache.kafka.connect.json.JsonConverter
。
此配置設置了一個名為 debezium-connector-postgres 的 KafkaConnector,并具有用于連接 PostgreSQL 數據庫和捕獲 public.actor 表的更改的特定配置。
Debezium 創建的 Kafka 主題
從 Kafka 集群終端驗證
kubectl exec -n kafka -i -t osds-cluster-kafka-0 -- /bin/bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092