Flink SQLServer CDC 環境配置與驗證

一、SQL Server 數據庫核心配置
1. 啟用 CDC 功能(Change Data Capture)

SQL Server CDC 依賴數據庫級別的 CDC 功能及表級別的捕獲配置,需按以下步驟啟用:

啟用數據庫 CDC

-- 以管理員身份連接數據庫
USE master;
GO-- 檢查數據庫是否已啟用CDC
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'MyDB' AND is_cdc_enabled = 1)
BEGINEXEC sys.sp_cdc_enable_db;PRINT 'CDC已啟用';
END
ELSEPRINT 'CDC已啟用';
GO

啟用表級 CDC(以dbo.Orders表為例)

USE MyDB;
GO-- 確保SQL Agent服務已啟動(CDC依賴Agent作業)
EXEC sys.sp_cdc_enable_table@source_schema = N'dbo',          -- 表所屬模式@source_name = N'Orders',         -- 表名@role_name = N'cdc_reader',       -- 授權角色(可設為NULL使用默認權限)@filegroup_name = N'MyDB_CT',     -- 存儲變更表的文件組(需提前創建)@supports_net_changes = 0;        -- 是否支持凈變更(0為不支持)
GO-- 驗證CDC配置
EXEC sys.sp_cdc_help_change_data_capture;
GO

創建文件組(若不存在)

USE MyDB;
GO
IF NOT EXISTS (SELECT 1 FROM sys.filegroups WHERE name = N'MyDB_CT')
BEGINALTER DATABASE MyDB ADD FILEGROUP MyDB_CT;ALTER DATABASE MyDB ADD FILE (NAME = N'MyDB_CT', FILENAME = N'C:\Data\MyDB_CT.ndf') TO FILEGROUP MyDB_CT;
END
GO
2. 創建專用用戶并授權
-- 創建用戶
CREATE LOGIN flinkuser WITH PASSWORD = 'Flink@123';
CREATE USER flinkuser FOR LOGIN flinkuser;-- 授予數據庫訪問權限
ALTER ROLE db_owner ADD MEMBER flinkuser;  -- 生產環境建議細化權限
GRANT SELECT ON ALL TABLES IN SCHEMA dbo TO flinkuser;-- 授予CDC相關權限
GRANT VIEW SERVER STATE TO flinkuser;
GRANT SELECT ON sys.change_tables TO flinkuser;
GO
二、Flink 環境集成配置
1. 添加Maven依賴
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-sqlserver-cdc</artifactId><version>3.0.1</version><scope>provided</scope>
</dependency>
2. SQL Client部署
  1. 下載JAR包:flink-sql-connector-sqlserver-cdc-3.0.1.jar
  2. 將JAR包放入$FLINK_HOME/lib/目錄后重啟Flink集群。
三、Flink SQL 表定義與參數詳解
1. 完整建表示例(含元數據列)
-- 配置checkpoint(可選)
SET 'execution.checkpointing.interval' = '5s';-- 創建SQL Server CDC表
CREATE TABLE sqlserver_orders (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,-- 元數據列:捕獲變更信息db_name STRING METADATA FROM 'database_name' VIRTUAL,schema_name STRING METADATA FROM 'schema_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY(id) NOT ENFORCED
) WITH ('connector' = 'sqlserver-cdc','hostname' = '192.168.1.100','port' = '1433','username' = 'flinkuser','password' = 'Flink@123','database-name' = 'MyDB','table-name' = 'dbo.orders','server-time-zone' = 'Asia/Shanghai','scan.incremental.snapshot.enabled' = 'true'
);
2. 核心參數詳解
參數名必選默認值類型說明
connectorString固定為sqlserver-cdc
hostnameStringSQL Server服務器IP或域名
usernameString連接數據庫的用戶名(需具備CDC讀取權限)
passwordString連接數據庫的密碼
database-nameString數據庫名稱(如MyDB
table-nameString表名(格式:schema.table,如dbo.orders
port1433Integer數據庫端口號
server-time-zoneUTCString數據庫時區(如Asia/Shanghai),影響TIMESTAMP轉換
scan.incremental.snapshot.enabledtrueBoolean啟用增量快照(并行讀取,需主鍵),默認開啟
debezium.snapshot.modeinitialString快照模式:initial(結構+數據)、initial-only(僅快照)、latest-offset(僅結構)
四、環境驗證與測試
1. 準備測試數據
-- 創建測試表(已啟用CDC)
USE MyDB;
GO
CREATE TABLE dbo.orders (id INT PRIMARY KEY,order_date DATE,purchaser INT,quantity INT,product_id INT,update_time DATETIME
);-- 插入測試數據
INSERT INTO dbo.orders VALUES 
(1, '2023-01-01', 101, 5, 1001, GETDATE()),
(2, '2023-01-02', 102, 3, 1002, GETDATE());
GO
2. Flink SQL 驗證
-- 查詢CDC表(首次觸發快照讀取)
SELECT * FROM sqlserver_orders;-- 在SQL Server中更新數據
UPDATE dbo.orders SET quantity = 10 WHERE id = 1;
GO-- 觀察Flink輸出:應顯示變更記錄,op_ts為變更時間
3. DataStream API 驗證(增量模式)
import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SqlServerCdcExample {public static void main(String[] args) throws Exception {// 配置SQL Server Source(增量快照模式)SqlServerSourceBuilder.SqlServerIncrementalSource<String> sourceBuilder = SqlServerSourceBuilder.sqlserverIncrementalSource().hostname("192.168.1.100").port(1433).databaseList("MyDB").tableList("dbo.orders").username("flinkuser").password("Flink@123").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).splitSize(1000) // 快照分片大小.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.fromSource(sourceBuilder,WatermarkStrategy.noWatermarks(),"SQL Server CDC Source").setParallelism(4) // 設置4并行度.print();env.execute("SQL Server CDC Test");}
}
五、常見問題與解決方案
  1. SQL Agent未運行

    ERROR: CDC作業無法啟動,SQL Agent服務未運行
    
    • 解決方案:啟動SQL Server Agent服務(可通過SQL Server配置管理器或命令行啟動)。
  2. 權限不足

    ERROR: 用戶無權訪問CDC表
    
    • 解決方案:確認用戶屬于db_owner角色,或手動授予SELECT權限至sys.change_tables
  3. 增量快照失敗(無主鍵表)

    ERROR: 表缺少主鍵,無法進行增量快照
    
    • 解決方案:為表添加主鍵,或手動指定分片鍵:
      'scan.incremental.snapshot.chunk.key-column' = 'id'
      
  4. 時區轉換異常

    • 解決方案:顯式設置server-time-zone參數:
      'server-time-zone' = 'Asia/Shanghai'
      
六、生產環境優化建議
  1. CDC清理策略

    • 配置CDC清理作業(定期刪除舊變更數據):
      USE MyDB;
      GO
      EXEC sys.sp_cdc_cleanup_change_data; -- 清理舊變更記錄
      
  2. 作業高可用

    • 使用SQL Server Always On Availability Groups時,Flink作業需連接主副本,并確保CDC配置在主庫。
  3. 性能調優

    • 調整scan.incremental.snapshot.chunk.size(如設為10000)以平衡并行度和內存占用;
    • 對于大表,啟用debezium.snapshot.fetch.size(如設為2048)優化快照讀取性能。

通過以上步驟,可完成Flink SQL Server CDC的全流程配置與驗證。生產環境中需特別注意SQL Agent的運行狀態、CDC數據清理策略及增量快照的并行參數調優,以確保數據一致性和系統穩定性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/88003.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/88003.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/88003.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

軟考(軟件設計師)存儲管理—設備管理,磁盤調度

I/O軟件的核心目標是管理硬件差異、提供統一接口、實現高效可靠的數據傳輸。 核心目標&#xff1a; 設備無關性&#xff1a; 應用程序無需關心具體硬件細節。錯誤處理&#xff1a; 處理硬件錯誤和傳輸異常。同步/異步傳輸&#xff1a; 支持阻塞&#xff08;等待完成&#xff09…

[C語言] C語言數學函數庫概覽

C語言數學函數庫概覽 文章目錄 C語言數學函數庫概覽一、概述二、基本數學函數詳解1. 平方根函數 sqrt(x)2. 冪函數 pow(x, y)3. 絕對值函數 fabs(x)4. 向上取整函數 ceil(x)5. 向下取整函數 floor(x) 三、三角函數與雙曲函數詳解1. 正弦函數 double sin(double x)2. 余弦函數 d…

【簡單三步】Stable diffusion Webai本地部署無法加載模型并報openai/clip-vit-large-patch14錯誤的解決方法

問題描述 Stable diffusion Webai本地部署成功后&#xff0c;手動加載本地模型checkpoint時&#xff0c;始終無法加載進去&#xff0c;確定模型存放位置無誤&#xff08;位于models\Stable-diffusion&#xff09;查看cmd窗口時&#xff0c;發現一個報錯提示&#xff1a;Can’t …

Java 命令行參數詳解:系統屬性、JVM 選項與應用配置

Java 命令行參數詳解&#xff1a;系統屬性、JVM 選項與應用配置 在 Java 應用啟動命令中&#xff0c;如&#xff1a; java -jar -Dserver.port8088 xdr-demo-1.0-SNAPSHOT-assembly.jar &-Dserver.port8088是一個 系統屬性&#xff08;System Property&#xff09; 設置。…

【論文筆記】World Models for Autonomous Driving: An Initial Survey

原文鏈接&#xff1a;https://ieeexplore.ieee.org/abstract/document/10522953 1. 世界模型的發展 A. 世界模型的結構基礎 世界模型包含4個關鍵組件&#xff0c;以模擬人類連貫的思考和決策過程。 a&#xff09;感知模塊使用如變分自編碼器&#xff08;VAE&#xff09;、掩…

Spring Cloud Config(微服務配置中心詳解)

關鍵詞&#xff1a;Spring Cloud Config、配置中心、遠程倉庫、動態刷新、加密解密 ? 摘要 在微服務架構中&#xff0c;隨著服務數量的增加&#xff0c;統一管理各服務的配置信息變得尤為重要。傳統的本地配置文件方式難以滿足多環境、多實例、集中化的需求。 Spring Cloud …

【Note】《深入理解Linux內核》 第二十章:深入理解 Linux 程序執行機制

《深入理解Linux內核》 第二十章&#xff1a;深入理解 Linux 程序執行機制&#xff08;Program Execution&#xff09;關鍵詞&#xff1a;exec 系列系統調用、可執行文件格式&#xff08;ELF&#xff09;、用戶地址空間、內存映射、動態鏈接、棧初始化、入口點、共享庫、內核態…

服務器如何配置防火墻規則以阻止惡意流量和DDoS攻擊?

防火墻是保護服務器免受惡意流量和 DDoS 攻擊的第一道防線。通過合理配置防火墻規則&#xff0c;可以有效阻止惡意訪問、限制不必要的流量&#xff0c;并減少攻擊對服務器的影響。以下是配置防火墻規則的全面指南&#xff0c;包括基礎規則設置、防御 DDoS 攻擊的高級策略和最佳…

持續性投入是成就自我價值的關鍵一環

概述 時間&#xff0c;的唯一公平之處就是給你我的長度是相同的&#xff0c;這也是它唯一公平&#xff0c;也是不公平的地方。 所謂的公平&#xff0c;就是不患寡而患不均中所說的平均。 所謂的不公平就是&#xff0c;相同時間內我們彼此對應的標價不同&#xff0c;延伸到后…

使用allegro在BoardGeometry的Silkscreen_Top層畫出圖案

目錄 1. 圖形及圖形放置顯示2. 繪制 1. 圖形及圖形放置顯示 繪制完成圖案&#xff1a; 導出后圖案&#xff1a; 2. 繪制 圖層選中&#xff1b; 畫圓型&#xff1b; 半徑3.5mm&#xff0c;原點生成&#xff1b; 在圖案中挖空&#xff1b; 用指令走線&#xff1a; …

Kotlin 協程:Channel 與 Flow 深度對比及 Channel 使用指南

前言 在 Kotlin 協程的異步編程世界里&#xff0c;Channel 和 Flow 是處理數據流的重要工具&#xff0c;它們有著不同的設計理念與適用場景。本文將對比二者功能與應用場景&#xff0c;詳細講解 Channel 的使用步驟及注意事項 。 一、Channel 與 Flow 的特性對比 Channel 是協程…

MYsql主從復制部署

MySQL 主從復制是將主數據庫的變更自動同步到從數據庫的過程&#xff0c;常用語讀寫分離、高可用性和數據備份。 1.環境準備 確保主從服務器已安裝相同版本的 MySQL&#xff0c;并能通過網絡互相訪問。 # 檢查 MySQL 版本 mysql -V 2.配置主服務器 &#xff08;1&#xff0…

安燈呼叫看板如何實現汽車生產異常秒級響應

在汽車零部件工廠的靜置車間&#xff0c;傳統生產管理依賴人工巡檢與紙質記錄&#xff0c;存在效率低、信息滯后、異常響應慢等問題。某汽車廠曾因物料靜置時間未及時監控&#xff0c;導致批次混料&#xff0c;損失超10萬元。而安燈呼叫看板系統的引入&#xff0c;通過實時狀態…

構造函數注入在spring boot 中怎么使用詳解

我們來詳細講解一下在 Spring Boot 中如何使用構造函數注入&#xff0c;并通過一個完整的、可運行的例子來演示。 構造函數注入是 Spring 官方最推薦的依賴注入方式&#xff0c;因為它能保證對象的不可變性和依賴的完整性。 核心理念 在 Spring Boot 中使用構造函數注入非常簡單…

2025.6.30-2025.7.06第26周:第一次參加頭馬演講俱樂部

現在是周一早上6:23&#xff0c;我開始寫上周的周總結。 3件超出預期的事 參加頭馬俱樂部絕對是最超出預期的&#xff0c;使得這個周末格外的快樂簡歷的第一版終于改完了&#xff0c;花了好長的時間&#xff0c;其中有一天心情還很蕩&#xff0c;因為&#xff0c;我想&#x…

2025使用VM虛擬機安裝配置Macos蘋果系統下Flutter開發環境保姆級教程--下篇

其實如何安裝VM,如何安裝MACOS網上的教程很多,我只是結合我的體驗重新整理了一次,接下來才進入本教程最核心的部分,Flutter開發環境的配置部分。、一.配置前準備 主要是準備相應的工具包,以及其他虛擬機設置1.工具包 工具包的版本也可以自行配置,我這主要是我使用的是F…

QSPI、OSPI與FSMC的區別與內存映射分析

QSPI、OSPI與FSMC的區別與內存映射分析 基本概念與區別 1. FSMC (靈活靜態存儲控制器) 接口類型&#xff1a;并行接口&#xff0c;通常8/16位數據總線總線標準&#xff1a;傳統并行總線協議速度&#xff1a;相對較低&#xff0c;通常最高約100MHz應用場景&#xff1a;SRAM、NOR…

系統思考與心智模式探索

成長的真正障礙&#xff0c;不是能力的不足&#xff0c;而是看待問題的局限。 在復雜多變的商業環境中&#xff0c;我們往往習慣于解決“眼前”的問題&#xff0c;卻忽視了深藏背后的系統性障礙。我們看到的只是表面的“癥狀”&#xff0c;而真正的根源&#xff0c;卻往往隱藏…

物聯網技術的關鍵技術與區塊鏈發展趨勢的深度融合分析

一、物聯網技術的核心架構與關鍵技術 物聯網技術體系由感知層、網絡層、平臺層、應用層和安全層構成&#xff0c;各層技術協同工作&#xff0c;實現物理世界與數字世界的深度融合。 感知層&#xff1a;物聯網的“感官” 傳感器技術&#xff1a;包括環境傳感器&#xff08;溫度…

針對Exhcnage Server的攻擊防范措施

一、背景介紹最近&#xff0c;安全研究人員揭露了一個名為 NightEagle&#xff08;又名 APT-Q-95&#xff09; 的高級持續性威脅&#xff08;APT&#xff09;組織。這個組織被觀察到利用 Microsoft Exchange 服務器中的零日漏洞鏈 進行攻擊&#xff0c;其主要目標是中國政府、國…