Installing CDC with Docker
- Pull the images
- Offline installation
- Upload the files and create docker-compose.yml
- Load the images into Docker
- Start the containers
- Connect to the database
- Create an account and grant it privileges
- Set wal_level
- Confirm the value of wal_level
- Create the connector
- Check the connector status
- Using Kafdrop
- What if the messages do not show the pre-update data?
- Supplement
Pull the images
docker pull confluentinc/cp-zookeeper:7.5.0
docker pull confluentinc/cp-kafka:7.5.0
docker pull debezium/connect:2.7.0.Final
docker pull obsidiandynamics/kafdrop:latest
Pulling these images may require a VPN. If you do not have one, you can download them from my netdisk share instead.
Shared via netdisk: Kafka, CDC (Debezium Connect), and Kafdrop
Link: https://pan.baidu.com/s/10OU_4cy7mWtaKAijGpakfQ?pwd=asn5  Extraction code: asn5
Offline installation
Upload the files and create docker-compose.yml
Upload the downloaded tar files to a directory on the server, and create a docker-compose.yml file in that same directory.
For a Linux environment, the file content is as follows:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ["CMD-SHELL", "nc -z zookeeper 2181 || exit 1"]
      interval: 5s
      timeout: 3s
      retries: 5
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      zookeeper:
        condition: service_healthy # wait until zookeeper is fully ready
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 # service name, resolvable inside the compose network
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092 || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 5
  connect:
    image: debezium/connect:2.7.0.Final
    depends_on:
      kafka:
        condition: service_healthy # wait until kafka is fully ready
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
  kafdrop:
    image: obsidiandynamics/kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: kafka:9092 # use the service name; it resolves automatically on the same network
    depends_on:
      - kafka
For a Windows environment, the file content is as follows:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ["CMD-SHELL", "nc -z zookeeper 2181 || exit 1"]
      interval: 5s
      timeout: 3s
      retries: 5
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      zookeeper:
        condition: service_healthy # wait until zookeeper is fully ready
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://host.docker.internal:9092 # Docker Desktop / Windows
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092 || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 5
  connect:
    image: debezium/connect:2.7.0.Final
    depends_on:
      kafka:
        condition: service_healthy # wait until kafka is fully ready
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: host.docker.internal:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
  kafdrop:
    image: obsidiandynamics/kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: host.docker.internal:9092 # host.docker.internal works with Docker Desktop
    depends_on:
      - kafka
Load the images into Docker
In a terminal, change into the directory containing the tar files and run the following commands to load the images into Docker:
docker load < zookeeper.tar
docker load < kafka.tar
docker load < connect.tar
docker load < kafdrop.tar
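An optional quick check that all four images were loaded; the image names below match the docker pull commands at the top of this article:
# List the loaded images and confirm the four repositories show up
docker images | grep -E 'cp-zookeeper|cp-kafka|debezium/connect|kafdrop'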
Start the containers
docker compose up -d
# For older versions of Docker the command is: docker-compose up -d
Once startup completes, run docker ps to check that all four containers are up. If a container did not start, run docker logs <container id> to view the error log and paste it into DeepSeek to find the cause.
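A minimal troubleshooting sketch; <container id> is a placeholder for the id or name shown by docker ps:
# Show the compose services together with their health state
docker compose ps
# Follow the logs of a container that did not come up
docker logs -f <container id>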
Connect to the database
I am connecting to PostgreSQL 10 here; for other versions or other databases, ask DeepSeek, the steps are much the same.
Create an account and grant it privileges
CREATE USER debezium WITH PASSWORD 'dbz_pass' REPLICATION;
ALTER USER debezium SUPERUSER;
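One way to run these two statements, assuming PostgreSQL itself runs in a Docker container named postgres (adjust the container name, superuser, and database to your environment):
# Execute the SQL inside the (assumed) postgres container as a superuser
docker exec -i postgres psql -U postgres -d postgres <<'SQL'
CREATE USER debezium WITH PASSWORD 'dbz_pass' REPLICATION;
ALTER USER debezium SUPERUSER;
SQL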
Set wal_level
ALTER SYSTEM SET wal_level = logical;
SELECT pg_reload_conf();
Confirm the value of wal_level
- Restart PostgreSQL; if it was installed with Docker, you can run docker restart <container id>.
- Run SHOW wal_level and check that the result is "logical". If it is not, ask DeepSeek. (A combined restart-and-verify sketch follows this list.)
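A combined restart-and-verify sketch, again assuming a container named postgres and the default postgres superuser:
# Restart PostgreSQL so the new wal_level takes effect
docker restart postgres
# The expected output is "logical"
docker exec -i postgres psql -U postgres -d postgres -c "SHOW wal_level;"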
Create the connector
POST http://<IP of the server running connect>:8083/connectors
Body (read the body carefully and replace the placeholders with your actual values; a curl example follows the JSON):
{"name": "pg-connector","config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector","tasks.max": "1","database.hostname": "數據庫ip","database.port": "數據庫端口","database.user": "debezium","database.password": "dbz_pass","database.dbname": "數據庫名","topic.prefix": "dbserver1","plugin.name": "pgoutput","table.include.list": "public.table_a,public.table_b","snapshot.mode": "initial"}
}
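The same request as a curl command; <connect-host> is a placeholder for the IP of the server running connect, and the database placeholders must be replaced as above:
curl -X POST http://<connect-host>:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "pg-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "<database IP>",
      "database.port": "<database port>",
      "database.user": "debezium",
      "database.password": "dbz_pass",
      "database.dbname": "<database name>",
      "topic.prefix": "dbserver1",
      "plugin.name": "pgoutput",
      "table.include.list": "public.table_a,public.table_b",
      "snapshot.mode": "initial"
    }
  }'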
Check the connector status
GET http://<IP of the server running connect>:8083/connectors/pg-connector/status
A successful result looks like this:
{"name": "pg-connector","connector": {"state": "RUNNING","worker_id": "172.20.0.5:8083"},"tasks": [{"id": 0,"state": "RUNNING","worker_id": "172.20.0.5:8083"}],"type": "source"
}
If both state fields in the result are RUNNING, the connector was created successfully; otherwise, take the content of the trace field and ask DeepSeek.
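The same status check with curl; piping through jq is optional and only pretty-prints the JSON:
curl -s http://<connect-host>:8083/connectors/pg-connector/status | jq .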
Using Kafdrop
· The Kafka GUI used here is Kafdrop; feel free to use another tool if you prefer.
· Access it at http://<IP of the server it is installed on>:9000
· After all of the steps above succeed, modify one row in one of the specified tables in the database.
· "Specified tables" means the tables listed in "table.include.list".
· Then refresh Kafdrop and you will see topics whose names start with dbserver1 (an example update statement follows this list).
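An example update to trigger a change event; the table, column, and filter below are hypothetical, so substitute a real table from "table.include.list" and real values (the container name postgres and the credentials are assumptions as well):
docker exec -i postgres psql -U postgres -d <database name> -c \
  "UPDATE public.table_a SET some_column = 'new value' WHERE id = 1;"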
What if the messages do not show the pre-update data?
Run the following statement in the database (replace 'tb_A' and 'tb_B' with the tables you actually listed in "table.include.list"):
DO $$
DECLARE
  r RECORD;
BEGIN
  FOR r IN
    SELECT schemaname, tablename
    FROM pg_tables
    WHERE schemaname = 'public'
      AND tablename IN ('tb_A', 'tb_B')
  LOOP
    EXECUTE format('ALTER TABLE %I.%I REPLICA IDENTITY FULL;', r.schemaname, r.tablename);
  END LOOP;
END$$;
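To double-check that the change took effect, pg_class.relreplident should now be 'f' (FULL) for those tables; the container name and credentials are assumptions:
docker exec -i postgres psql -U postgres -d <database name> -c \
  "SELECT relname, relreplident FROM pg_class WHERE relname IN ('tb_A', 'tb_B');"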
Supplement
Connect exposes endpoints for creating a connector, checking its status, deleting it, viewing its configuration, and updating its configuration. The API documentation is here: https://docs.apipost.net/docs/detail/4de10f8eac12000?locale=zh-cn&target_id=1dd669507cb037
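For reference, these operations map to the standard Kafka Connect REST API; <connect-host> is a placeholder, and config.json is a hypothetical file holding the flat "config" object:
# List all connectors
curl -s http://<connect-host>:8083/connectors
# View the configuration of pg-connector
curl -s http://<connect-host>:8083/connectors/pg-connector/config
# Update the configuration of pg-connector
curl -X PUT http://<connect-host>:8083/connectors/pg-connector/config \
  -H "Content-Type: application/json" -d @config.json
# Delete pg-connector
curl -X DELETE http://<connect-host>:8083/connectors/pg-connector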