前言:
Debezium SQL Server連接器捕獲SQL Server數據庫模式中發生的行級更改。
官方2.0文檔:
Debezium connector for SQL Server :: Debezium Documentation
有關與此連接器兼容的SQL Server版本的信息,請參閱
SQL Server | Database: 2017, 2019
JDBC Driver: 10.2.1.jre8
| Database: 2017, 2019
JDBC Driver: 9.4.1.jre8 |
Debezium SQL Server連接器首次連接到SQL Server數據庫或集群時,它會對數據庫中的模式進行一致的快照。初始快照完成后,連接器會連續捕獲提交到CDC啟用的SQL Server數據庫的INSERT、UPDATE或DELETE操作的行級更改。該連接器為每個數據更改操作生成事件,并將其流式傳輸到Kafka主題。該連接器將表的所有事件流式傳輸到一個專門的Kafka主題。然后,應用程序和服務可以消耗該主題的數據更改事件記錄。
要使Debezium SQL Server連接器捕獲數據庫操作的更改事件記錄,您必須首先在SQL Server數據庫上啟用更改數據捕獲。必須在數據庫和要捕獲的每個表上啟用CDC。在源數據庫上設置CDC后,連接器可以捕獲數據庫中發生的行級INSERT、UPDATE和DELETE操作。該連接器將每個源表的事件記錄寫入專門用于該表的Kafka主題。每個捕獲的表都有一個主題。客戶端應用程序讀取他們遵循的數據庫表的Kafka主題,并可以響應他們從這些主題中消耗的行級事件。
連接器首次連接到SQL Server數據庫或集群時,它需要對其配置為捕獲更改的所有表的模式進行一致的快照,并將此狀態流式傳輸到Kafka。快照完成后,連接器會持續捕獲隨后發生的行級更改。通過首先建立所有數據的一致視圖,連接器可以繼續讀取,而不會丟失快照發生時所做的任何更改。
Debezium SQL Server連接器可以容忍故障。當連接器讀取更改并生成事件時,它會定期記錄事件在數據庫日志中的位置(LSN/日志序列號)。如果連接器因任何原因(包括通信故障、網絡問題或崩潰)而停止,則在重新啟動后,連接器將從讀取的最后一點恢復讀取SQL Server CDC表。
抵消定期提交。它們不是在更改事件發生時提交的。因此,在中斷后,可能會生成重復的事件。
容錯也適用于快照。也就是說,如果連接器在快照期間停止,則連接器在重新啟動時開始新的快照。
?
安裝sqlserver linux
1,環境centos7.2以上,2 GB+ 的內存
2,rpm安裝包下載:Index of /rhel/7/mssql-server-2017/
3,下載并上傳到linux環境下
wget https://packages.microsoft.com/rhel/7/mssql-server-2017/mssql-server-14.0.1000.169-2.x86_64.rpm
rpm -ivh mssql-server-14.0.1000.169-2.x86_64.rpm
提示缺少依賴包 把包補上
rpm -ivh繼續安裝,根據提示執行sudo /opt/mssql/bin/mssql-conf setup
[root@node01 opt]# sudo /opt/mssql/bin/mssql-conf setup
選擇 SQL Server 的一個版本: 1) Evaluation (免費,無生產許可,180 天限制) 2) Developer (免費,無生產許可) 3) Express (免費) 4) Web (付費版) 5) Standard (付費版) 6) Enterprise (付費版) 7) Enterprise Core (付費版) 8) 我通過零售渠道購買了許可證并具有要輸入的產品密鑰。 可在以下位置找到有關版本的詳細信息:
https://go.microsoft.com/fwlink/?LinkId=852748&clcid=0x804 使用此軟件的付費版本需要通過以下途徑獲取單獨授權
Microsoft 批量許可計劃。
選擇付費版本即表示你具有適用的
要安裝和運行此軟件的就地許可證數量。 輸入版本(1-8):1
可以在以下位置找到此產品的許可條款:
/usr/share/doc/mssql-server 或從以下位置下載:
https://go.microsoft.com/fwlink/?LinkId=855864&clcid=0x804 可以從以下位置查看隱私聲明:
https://go.microsoft.com/fwlink/?LinkId=853010&clcid=0x804 接受此許可條款嗎? [Yes/No]:yes 選擇 SQL Server 的語言:
(1) English
(2) Deutsch
(3) Espa?ol
(4) Fran?ais
(5) Italiano
(6) 日本語
(7) ???
(8) Português
(9) Русский
(10) 中文 – 簡體
(11) 中文 (繁體)
輸入選項 1-11:10
輸入 SQL Server 系統管理員密碼:DTstack@123
確認 SQL Server 系統管理員密碼:DTstack@123
正在配置 SQL Server…
Created symlink from /etc/systemd/system/multi-user.target.wants/mssql-server.service to /usr/lib/systemd/system/mssql-server.service.
安裝程序已成功完成。SQL Server 正在啟動。
[root@node01 opt]#
驗證是否啟動
systemctl status mssql-server
安裝 SQL Server 命令行工具
若要創建數據庫,需要使用一個能夠在 SQL Server 上運行 Transact-SQL 語句的工具進行連接。 以下步驟安裝 SQL Server 命令行工具: sqlcmd和bcp。
下載 Microsoft Red Hat 存儲庫配置文件。
sudo curl -o /etc/yum.repos.d/msprod.repo https://packages.microsoft.com/config/rhel/7/prod.repo
—————
運行以下命令以安裝 mssql-tools 和 unixODBC 開發人員包。
sudo yum install -y mssql-tools unixODBC-devel
為方便起見,請將 /opt/mssql-tools/bin/ 添加到 PATH 環境變量。 這樣就可以在運行工具時不指定完整路徑。 請運行以下命令,以便修改登錄會話和交互/非登錄會話的 PATH:
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bash_profile
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc
source ~/.bashrc
SQLServer基本命令
本地連接
以下步驟使用 sqlcmd 本地連接到新的 SQL Server 實例。
使用 SQL Server 名稱 (-S),用戶名 (-U) 和密碼 (-P) 的參數運行 sqlcmd。 在本教程中,用戶進行本地連接,因此服務器名稱為 localhost。 用戶名為 SA,密碼是在安裝過程中為 SA 帳戶提供的密碼。
sqlcmd -S localhost -U SA -P 密碼 # 用命令行連接
如果成功,應會顯示 sqlcmd 命令提示符:1>。
提示 可以在命令行上省略密碼,以收到密碼輸入提示。 如果以后決定進行遠程連接,請指定 -S 參數的計算機名稱或 IP 地址,并確保防火墻上的端口 1433 已打開。
SQLServer基本命令
(1) 建庫
> create database TestDB
> go
(2) 看當前數據庫列表
> select * from SysDatabases
> go
(3) 看當前數據表
> use 庫名
> select * from sysobjects where xtype='u'
> go
(4) 看表的內容
> select * from 表名;
> go
插入數據
接下來創建一個新表 mgmg,然后插入兩個新行。
在 sqlcmd 命令提示符中,將上下文切換到新的 TestDB 數據庫:
USE TestDB
創建名為 mgmg 的新表:
CREATE TABLE mgmg (id INT, name NVARCHAR(50), quantity INT)
將數據插入新表:
INSERT INTO mgmg VALUES (1, 'banana', 150); INSERT INTO mgmg VALUES (2, 'orange', 154);
要執行上述命令的類型 GO:
GO
在 SQL Server 數據庫上啟用 CDC
?
在為表啟用 CDC 之前,您必須為 SQL Server 數據庫啟用它。 SQL Server 管理員通過運行系統存儲過程來啟用 CDC。系統存儲過程可以使用 SQL Server Management Studio 或 Transact-SQL 運行。
先決條件
您是 SQL Server 的 sysadmin 固定服務器角色的成員。
您是數據庫的 db_owner。
SQL Server 代理正在運行。
注意:
SQL Server CDC 功能僅處理用戶創建的表中發生的更改。您不能在 SQL Server 主數據庫上啟用 CDC。
程序
1.從 SQL Server Management Studio 的“查看”菜單中,單擊“模板資源管理器”。
2.在模板瀏覽器中,展開 SQL Server 模板。
3.展開更改數據捕獲 > 配置,然后單擊為 CDC 啟用數據庫。
4.在模板中,將 USE 語句中的數據庫名稱替換為要為 CDC 啟用的數據庫名稱。
5.運行存儲過程 sys.sp_cdc_enable_db 為 CDC 啟用數據庫。
為 CDC 啟用數據庫后,將創建名為 cdc 的模式,以及 CDC 用戶、元數據表和其他系統對象。
以下示例顯示如何為
數據庫 TestDB 啟用 CDC:
USE TestDB
GO
EXEC sys.sp_cdc_enable_db
GO
在 SQL Server 表上啟用 CDC
SQL Server 管理員必須在您希望 Debezium 捕獲的源表上啟用更改數據捕獲。數據庫必須已為 CDC 啟用。要在表上啟用 CDC,SQL Server 管理員為該表運行存儲過程 sys.sp_cdc_enable_table。存儲過程可以使用 SQL Server Management Studio 或 Transact-SQL 運行。必須為要捕獲的每個表啟用 SQL Server CDC。
先決條件:
CDC 在 SQL Server 數據庫上啟用。
SQL Server 代理正在運行。
您是數據庫的 db_owner 固定數據庫角色的成員。
程序
從 SQL Server Management Studio 的“查看”菜單中,單擊“模板資源管理器”。
在模板瀏覽器中,展開 SQL Server 模板。
展開更改數據捕獲 > 配置,然后單擊啟用表指定文件組選項。
在模板中,將 USE 語句中的表名替換為您要捕獲的表名。
運行存儲過程 sys.sp_cdc_enable_table。
以下示例顯示如何為表 mgmg 啟用 CDC:
示例:為 SQL Server 表啟用 CDC
USE TestDB
GO EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'mgmg',
@role_name = N'NULL',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
?
示例
USE MyDB
GO EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
指定要捕獲的表的名稱。
指定角色 MyRole,您可以向該角色添加要授予對源表的捕獲列的 SELECT 權限的用戶。具有 sysadmin 或 db_owner 角色的用戶還可以訪問指定的更改表。將 @role_name 的值設置為 NULL,以僅允許 sysadmin 或 db_owner 中的成員對捕獲的信息具有完全訪問權限。
指定 SQL Server 為捕獲的表放置更改表的文件組。命名的文件組必須已經存在。最好不要將更改表放在用于源表的同一文件組中。
Ps文件組
1. 如何查看數據庫中所有的文件組。 語法:sp_helpfilegroup 步驟: use 數據庫 sp_helpfilegroup
2. 如何找到文件組和文件的對應情況. sp_helpdb love 創建文件組。
語法: alter database 數據庫名 add filegroup 文件組名
步驟: use 數據庫名 alter database 數據庫名 add filegroup 文件組名
范例:
use love
alter database TestDB add filegroup 財務部
?
SQL Server 管理員可以運行系統存儲過程來查詢數據庫或表以檢索其 CDC 配置信息。存儲過程可以使用 SQL Server Management Studio 或 Transact-SQL 運行。
先決條件
您對捕獲實例的所有捕獲列具有 SELECT 權限。 db_owner 數據庫角色的成員可以查看所有已定義捕獲實例的信息。
您擁有為查詢包括的表信息定義的任何門控角色的成員資格。
程序
從 SQL Server Management Studio 的“查看”菜單中,單擊“對象資源管理器”。
在對象資源管理器中,展開數據庫,然后展開您的數據庫對象,例如 MyDB。
展開可編程性 > 存儲過程 > 系統存儲過程。
運行 sys.sp_cdc_help_change_data_capture 存儲過程來查詢表。
查詢不應返回空結果。
以下示例在數據庫 TestDB 上運行存儲過程 sys.sp_cdc_help_change_data_capture:
示例:
查詢表以獲取 CDC 配置信息
USE TestDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
該查詢返回數據庫中每個表的配置信息,這些表為 CDC 啟用并且包含調用者有權訪問的更改數據。如果結果為空,請驗證用戶是否具有訪問捕獲實例和 CDC 表的權限。
部署Debezium SQL Server連接器
要部署Debezium SQL Server連接器,請安裝Debezium SQL Server連接器存檔,配置連接器,并通過將其配置添加到Kafka Connect來啟動連接器。
先決條件
安裝了Apache ZooKeeper、Apache Kafka和Kafka Connect。
SQL Server已安裝,已配置為CDC,并準備與Debezium連接器一起使用。
程序
下載Debezium SQL Server連接器插件存檔
Wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.7.Final/debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz
將文件提取到您的Kafka Connect環境中。
將帶有JAR文件的目錄添加到Kafka Connect的plugin.path中。
配置連接器并將配置添加到您的Kafka Connect集群中。
重新啟動您的Kafka Connect流程以提取新的JAR文件。
解壓插件包
tar -xvf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz
配置kafka,讓kafka知道這個插件.
[root@hhz02 config]# vi connect-distributed.properties
配置注意項:(單機模式啟動連接器 connect-standlone.properties)
bootstrap.servers=172.16.120.17:9092
offset.storage.topic=connect-sqlserver-status
offset.storage.replication.factor=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
status.storage.topic=connect-sqlserver-statu
status.storage.replication.factor=1
config.storage.topic=connect-sqlserver-config
config.storage.replication.factor=1
group.id=connect-sqlserver
offset.flush.interval.ms=10000
plugin.path=/opt/debezium-connector-sqlserver
rest.port=8083
Ps:
如果kafka開啟kerberos 該配置文件還需要添加:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka還需要再啟動腳本connect-distributed.sh中添加:
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/dtstack/Kafka/kafka/config/kafka_jaas.conf"
注意:集群模式kafka 需要注意配置分發。
重啟kafka集群使配置生效。
啟動Kafka connector
/opt/dtstack/DTBase/kafka/bin/connect-distributed.sh -daemon /opt/dtstack/DTBase/kafka/config/connect-sqlserver.properties
注意結尾這個文件 集群模式為:connect-distributed.properties,單機模式為:connect-standalone.properties
(如果啟動有問題 觀察/opt/app/kafka/logs/connectDistributed.out)
檢測Kafka connector是否正常工作
1. 檢測kafka連接器的服務狀態
[root@node01 logs]# curl -H "Accept:application/json" node01:8083/
{"version":"1.1.1","commit":"8e07427ffb493498","kafka_cluster_id":"svXhhAFkSt6xLRPTBIdE1Q"}
2. 檢查向 Kafka Connect 注冊的連接器列表
[root@hdp01 kafka]# curl -H "Accept:application/json" node01:8083/connectors/
[]
返回空列表, 表示目前還沒有注冊的連接器
附:刪除命令
curl -X DELETE node01:8083/connectors/db2-connector1
SQL Server 連接器配置示例
以下是連接器實例的配置示例,該實例在 172.16.120.17 的端口 1433 上從 SQL Server 服務器捕獲數據,我們在邏輯上將其命名為 fullfillment。通常,您通過設置可用于連接器的配置屬性,在 JSON 文件中配置 Debezium SQL Server 連接器。
注冊連接器
Kafka Connect 服務的 API 提交POST針對/connectors資源的請求,其中包含描述新連接器(稱為inventory-connector)的 JSON 文檔。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" node01:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "172.16.120.17", "database.port": "1433", "database.user": "SA", "database.password": "Dtstack@123", "database.dbname": "TestDB", "database.server.name": "fullfillment", "table.include.list": "dbo.mgmg", "database.history.kafka.bootstrap.servers": "node01:9092", "database.history.kafka.topic": "dbhistory.fullfillment" }
}'
(如果注冊有問題 觀察/opt/app/kafka/logs/connectDistributed.out)
多看下日志 日志中都可以 寫到 如果注冊成功,也沒有kafka數據 麻煩看下日志WARN中過濾問題。[sourceTableId=DB2INST1.TESTTB, changeTableId=ASNCDC.CDC_DB2INST1_TESTTB
]
?
?
查看kafka中的數據
/opt/dtstack/DTBase/kafka/bin/kafka-topics.sh -list -zookeeper localhost:2181/kafka
/opt/dtstack/DTBase/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic fullfillment.dbo.mgmg
?
?
?
?
?
?
?