Debezium日常分享系列之:Debezium3.1版本之增量快照
- 按需快照
- 觸發一次臨時增量快照
- 觸發臨時阻塞快照
- 增量快照
- 增量快照過程
- 如何 Debezium 解決具有相同主鍵的記錄之間的沖突
- 快照窗口
- 觸發增量快照
- 使用附加條件運行臨時增量快照
- 使用 Kafka 信號通道觸發增量快照
- 臨時增量快照與附加條件
- 停止增量快照
- 附加資源
- 使用 Kafka 信令通道停止增量快照
按需快照
默認情況下,連接器僅在首次啟動后執行初始快照操作。在此初始快照之后,在正常情況下,連接器不會重復快照過程。連接器捕獲的任何未來的更改事件數據都僅通過流處理過程進入。
然而,在某些情況下,連接器在初始快照期間獲得的數據可能會變得過時、丟失或不完整。為了提供一種重新捕獲表數據的機制,Debezium 包含了一個執行按需快照的選項。你可能希望在以下任何更改發生在你的 Debezium 環境中后執行按需快照:
- 連接器配置被修改以捕獲不同的表集。
- Kafka 主題被刪除且必須重建。
- 由于配置錯誤或其他問題導致數據損壞。
你可以通過發起所謂的按需快照,重新運行之前已捕獲快照的表的快照。按需快照需要使用 。你通過向 Debezium 信號表發送信號請求來啟動按需快照。
當你對現有表啟動按需快照時,連接器會將內容附加到該表已存在的主題上。如果先前存在的主題已被移除,當 啟用時,Debezium 可以自動創建一個主題。
按需快照信號指定了要包含在快照中的表。快照可以捕獲數據庫的全部內容,或者只捕獲數據庫中的一組表。此外,快照還可以捕獲數據庫中表(們)的一部分內容。
你通過向信號表發送執行快照消息來指定要捕獲的表。設置執行快照信號的類型為增量或阻塞,并提供要包含在快照中的表的名稱,如下表所述:
表 2. 臨時執行快照信號記錄的示例
字段 | 默認值 | 描述 |
---|---|---|
type | incremental | 指定要運行的快照類型。目前,您可以請求增量快照或阻塞快照。 |
data-collections | N/A | 包含正則表達式數組,匹配要包含在快照中的表的全限定名稱。<?br?>對于 SQL Server 連接器,使用以下格式指定表的全限定名稱:database.schema.table |
additional-conditions | N/A | 一個可選數組,指定連接器評估以確定要包含在快照中的記錄子集的一組附加條件。<?br?>每個附加條件是一個對象,指定用于過濾臨時快照捕獲的數據的標準。您可以為每個附加條件設置以下參數:data-collection:應用過濾器的表的完全限定名稱。您可以為每個表應用不同的過濾器。filter:指定數據庫記錄中必須存在的列值,以便快照包含該記錄,例如,“color=‘blue’”。您分配給過濾器參數的值與在設置阻塞快照的 snapshot.select.statement.overrides 屬性時,在 SELECT 語句的 WHERE 子句中可能指定的值類型相同。 |
surrogate-key | N/A | 一個可選字符串,指定連接器在快照過程中用作表主鍵的列名。 |
觸發一次臨時增量快照
您可以通過在信號表中添加一個帶有 execute-snapshot 信號類型的條目,或通過向 Kafka 信號主題發送信號消息來啟動一次臨時增量快照。連接器處理該消息后,將開始快照操作。快照過程會讀取每個表的第一個和最后一個主鍵值,并使用這些值作為每個表的起始點和結束點。根據表中的條目數量和配置的塊大小,Debezium 將表分成多個塊,并依次逐個進行快照。
觸發臨時阻塞快照
您可以通過在信號表或信號主題中添加帶有 execute-snapshot
信號類型的消息來啟動臨時阻塞快照。連接器處理消息后,將開始快照操作。連接器會暫時停止流式傳輸,然后對指定的表進行快照,其過程與初始快照相同。快照完成后,連接器將恢復流式傳輸。
增量快照
SQL Server 排序規則
每個 SQL Server 服務器或數據庫都配置為使用特定的排序規則,這決定了字符數據如何存儲、排序、比較和顯示。某些排序規則集(如SQL Server 排序規則 (SQL_*))的排序規則與 Unicode 排序算法不兼容。在某些情況下,不兼容的排序規則可能會導致連接器在執行即時快照時丟失數據。例如,如果 SQL Server 配置為以 Unicode 格式發送字符串(即連接屬性 sendStringParametersAsUnicode 設置為 true),連接器在快照期間可能會跳過記錄。為了防止在即時快照期間丟失數據,可以將連接字符串屬性 driver.sendStringParametersAsUnicode 的值設置為 false。
為了提供管理快照的靈活性,Debezium 包含了一種補充的快照機制,稱為增量快照。增量快照依賴于 Debezium 的向 Debezium 連接器發送信號的機制。增量快照基于 DDD-3 設計文檔。
在增量快照中,與初始快照一次性捕獲數據庫的完整狀態不同,Debezium 會分階段、以一系列可配置的塊來捕獲每個表。你可以指定希望快照捕獲的表以及每塊的大小。塊大小決定了每次從數據庫獲取數據時快照收集的行數。增量快照的默認塊大小為 1024 行。
隨著增量快照的進行,Debezium 使用水印來跟蹤其進度,記錄它捕獲的每一行表數據。這種分階段的數據捕獲方法相比標準的初始快照過程具有以下優勢:
- 您可以在流式數據捕獲的同時并行運行增量快照,而無需等到快照完成后再開始流式捕獲。連接器在整個快照過程中繼續從變更日志中捕獲近乎實時的事件,兩項操作互不干擾。
- 如果增量快照的進度被中斷,您可以從中斷的地方恢復,而不會丟失任何數據。恢復后,快照將從停止的地方開始,而不是重新從頭開始捕獲整個表。
- 您可以隨時按需運行增量快照,并根據需要重復該過程以適應數據庫更新。例如,您可能在修改連接器配置以添加一個表到其屬性后重新運行快照。
增量快照過程
當你運行一個增量快照時,Debezium會根據主鍵對每個表進行排序,然后根據配置的塊大小將表分成若干塊。按塊處理,它會捕獲每塊中的每一行記錄。對于捕獲的每一行,快照會發出一個READ事件。該事件表示在快照開始時該行的值。
隨著快照的進行,其他進程可能會繼續訪問數據庫,從而可能修改表記錄。為了反映這些變化,INSERT、UPDATE或DELETE操作會像往常一樣提交到事務日志。同樣,持續的Debezium流處理過程會繼續檢測這些變更事件,并將相應的變更事件記錄發送到Kafka。
如何 Debezium 解決具有相同主鍵的記錄之間的沖突
在某些情況下,流處理過程中發出的 UPDATE 或 DELETE 事件會亂序接收。也就是說,流處理過程可能會在一個包含該行的 READ 事件的快照捕獲之前,先發出一個修改表行的事件。當快照最終發出該行對應的 READ 事件時,其值已經被覆蓋。為了確保亂序到達的增量快照事件能夠按正確的邏輯順序處理,Debezium 采用了緩沖方案來解決沖突。只有在解決了快照事件和流事件之間的沖突后,Debezium 才會將事件記錄發送到 Kafka。
快照窗口
為了協助解決晚到的讀取(READ)事件與修改同一表行的流式事件之間的沖突,Debezium 采用了所謂的快照窗口。快照窗口定義了增量快照捕獲指定表分塊數據的時間間隔。在某個分塊的快照窗口開啟之前,Debezium 按照其常規行為,直接將事務日志中的事件發送到目標 Kafka 主題。但從該特定分塊的快照開啟時刻起,直到它關閉為止,Debezium 會執行一個去重步驟,以解決具有相同主鍵的事件之間的沖突。
對于每個數據集合,Debezium 發出兩種類型的事件,并將這些事件的記錄存儲在一個目標 Kafka 主題中。從表中直接捕獲的快照記錄作為 READ 操作發出。同時,隨著用戶繼續更新數據集合中的記錄,每次提交都會反映在事務日志中,Debezium 為每次更改發出 UPDATE 或 DELETE 操作。
當快照窗口打開,Debezium 開始處理快照分塊時,它會將快照記錄交付到內存緩沖區。在快照窗口期間,緩沖區中的 READ 事件的主鍵會被與傳入的流式事件的主鍵進行比較。如果沒有找到匹配項,流式事件記錄將直接發送到 Kafka。如果 Debezium 檢測到匹配項,它會丟棄緩沖的 READ 事件,并將流式記錄寫入目標主題,因為流式事件邏輯上覆蓋了靜態快照事件。在該分塊的快照窗口關閉后,緩沖區中僅剩下沒有相關事務日志事件的 READ 事件。Debezium 將這些剩余的 READ 事件發送到表的 Kafka 主題。
連接器對每個快照分塊重復這一過程。
注意:
要使 Debezium 能夠執行增量快照,您必須授予連接器寫入信號表的權限。
只有可以配置為只讀增量快照的連接器(MariaDB, MySQL, or PostgreSQL )才不需要寫入權限。目前,您可以使用以下任一方法來啟動增量快照:
- 向源數據庫上的信令表發送臨時快照信號。
- 向配置的 Kafka 信令主題發送消息。
SQL Server 的 Debezium 連接器在增量快照運行期間不支持模式更改。
觸發增量快照
要啟動增量快照,您可以向源數據庫的信號表發送一個即時快照信號。您可以通過SQL INSERT查詢提交快照信號。
當Debezium檢測到信號表中的變化后,它會讀取該信號,并執行請求的快照操作。
您提交的查詢指定了要包含在快照中的表,并且可以選擇性地指定快照操作的類型。目前,Debezium支持增量和阻塞兩種類型的快照。
要指定要包含在快照中的表,請提供一個列出這些表的數據集合數組,或者用于匹配表的正則表達式數組,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信號的數據集合數組沒有默認值。如果數據集合數組為空,Debezium會將其解釋為無需執行任何操作,因此不會進行快照。
注意:
如果您要包含在快照中的表名包含點(.)、空格或某些其他非字母數字字符,您必須用雙引號將表名轉義。
例如,要在db1數據庫的public模式中包含名為My.Table的表,使用以下格式:“db1.public.“My.Table””。
先決條件
- 已啟用信號。
- 源數據庫上存在信號數據集合。
- 信號數據集合在 signal.data.collection 屬性中指定。
使用源信令通道觸發增量快照
發送 SQL 查詢,將臨時增量快照請求添加到信令表中:
INSERT INTO <signalTable> (id, type, data)
VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
例如:
INSERT INTO db1.myschema.debezium_signal (id, type, data)
VALUES ('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.table1" ,"filter":"color=''blue''"}]}');
命令中的 id
、type
和 data
參數的值對應于 信令表的字段。
下表描述了示例中的參數:
表 3. 發送增量快照信號到信令表的 SQL 命令中的字段說明
Item | Value | Description |
---|---|---|
1 | database.schema.debezium_signal | 指定源數據庫上信令表的完全限定名稱。 |
2 | ad-hoc-1 | id 參數指定一個任意字符串,該字符串被指定為信號請求的 id 標識符。使用此字符串將日志消息標識到信令表中的條目。Debezium 不使用此字符串。相反,在快照期間,Debezium 會生成自己的 id 字符串作為水印信號。 |
3 | execute-snapshot | 類型參數指定信號想要觸發的操作。 |
4 | data-collections | 信號數據字段的必需組件,用于指定表名或正則表達式的數組,以匹配要包含在快照中的表名。該數組列出了使用 database.schema.table 格式的正則表達式,以匹配表的完全限定名稱。此格式與您用于指定連接器信號表名稱的格式相同。 |
5 | incremental | 信號數據字段的可選類型組件,用于指定要運行的快照操作的類型。有效值為增量和阻塞。如果您未指定值,則連接器默認執行增量快照。 |
6 | additional-conditions | 可選數組,用于指定一組附加條件,連接器將評估這些條件以確定要包含在快照中的記錄子集。每個附加條件都是一個具有數據收集和過濾器屬性的對象。您可以為每個數據收集指定不同的過濾器。* 數據收集屬性是過濾器適用的數據收集的完全限定名稱。 |
使用附加條件運行臨時增量快照
如果您希望快照僅包含表中內容的子集,則可以通過將附加條件參數附加到快照信號來修改信號請求。
典型快照的 SQL 查詢采用以下形式:
SELECT * FROM <tableName> ....
通過添加附加條件參數,您可以將 WHERE 條件附加到 SQL 查詢,如下例所示:
SELECT * FROM <data-collection> WHERE <filter> ....
以下示例顯示了一個 SQL 查詢,用于將帶有附加條件的臨時增量快照請求發送到信令表:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
例如,假設您有一個包含以下列的產品表:
- id(主鍵)
- color
- quantity
如果希望產品表的增量快照僅包含 color=blue 的數據項,則可以使用以下 SQL 語句來觸發快照:
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');
附加條件參數還允許您傳遞基于多個列的條件。例如,使用上例中的產品表,您可以提交一個查詢來觸發增量快照,該快照僅包含顏色=藍色且數量>10 的商品的數據:
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');
以下示例顯示了連接器捕獲的增量快照事件的 JSON。
示例 1. 增量快照事件消息
{"before":null,"after": {"pk":"1","value":"New data"},"source": {..."snapshot":"incremental" 1},"op":"r", 2"ts_ms":"1620393591654","ts_us":"1620393591654547","ts_ns":"1620393591654547920","transaction":null
}
表 4. 增量快照事件消息中字段的描述
Item | Field name | Description |
---|---|---|
1 | snapshot | 指定要運行的快照操作的類型。目前,唯一有效的選項是阻止和增量。在提交給信令表的 SQL 查詢中指定類型值是可選的。如果您未指定值,則連接器將運行增量快照。 |
2 | op | 指定事件類型。快照事件的值為 r,表示讀取操作。 |
使用 Kafka 信號通道觸發增量快照
您可以向配置的 Kafka 主題發送消息,請求連接器運行臨時增量快照。
Kafka 消息的鍵必須與 topic.prefix 連接器配置選項的值匹配。
消息的值是一個帶有類型和數據字段的 JSON 對象。
信號類型為 execute-snapshot,數據字段必須具有以下字段:
表 5. 執行快照數據字段
Field | Default | Value |
---|---|---|
type | incremental | 要執行的快照類型。目前 Debezium 支持增量和阻塞類型。 |
data-collections | N/A | 與要包含在快照中的表的完全限定名稱匹配的逗號分隔正則表達式數組。使用與 signal.data.collection 配置選項所需的相同格式指定名稱。 |
additional-conditions | N/A | 可選的附加條件數組,用于指定連接器評估的條件,以指定要包含在快照中的記錄子集。每個附加條件都是一個對象,用于指定用于篩選臨時快照捕獲的數據的條件。您可以為每個附加條件設置以下參數:data-collection:: 過濾器適用的表的完全限定名稱。您可以對每個表應用不同的過濾器。filter:: 指定數據庫記錄中必須存在的列值,快照才能將其包含,例如“color=‘blue’”。您分配給 filter 參數的值與您在為阻塞快照設置 snapper.select.statement.overrides 屬性時在 SELECT 語句的 WHERE 子句中指定的值類型相同。 |
示例 2. 執行快照 Kafka 消息
Key = `test_connector`Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
臨時增量快照與附加條件
Debezium 使用 additional-conditions
字段來選擇表內容的子集。
通常,當 Debezium 運行快照時,它會執行類似以下的 SQL 查詢:
SELECT * FROM <tableName> ...
當快照請求包含 additional-conditions
屬性時,該屬性的數據收集和過濾參數將被附加到 SQL 查詢中,例如:
SELECT * FROM <data-collection> WHERE <filter> ...
例如,假設有一個 products
表,包含 id
(主鍵)、color
和 brand
列,如果希望快照只包含 color='blue'
的內容,在請求快照時,可以添加 additional-conditions
屬性來過濾內容:
Key = `test_connector`Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`
您還可以使用 additional-conditions 屬性傳遞基于多列的條件。例如,使用與上例相同的產品表,如果您希望快照僅包含 color=‘blue’ 和 brand=‘MyBrand’ 的產品表中的內容,則可以發送以下請求:
Key = `test_connector`Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
停止增量快照
在某些情況下,可能需要停止增量快照。例如,你可能會發現快照配置不正確,或者你希望確保資源可用于其他數據庫操作。你可以通過向源數據庫的信號表發送信號來停止正在進行的快照。
你可以通過發送SQL INSERT
查詢將停止快照的信號提交到信號表。停止快照的信號指定了快照操作的類型為增量,并且可以選擇性地指定你想從當前正在運行的快照中排除的表。當 Debezium 檢測到信號表中的變化后,它會讀取該信號,并在快照操作進行時停止增量快照。
附加資源
還可以通過向發送 JSON 消息來停止增量快照。
先決條件:
- 源數據庫上存在一個信令數據集合。
- 信令數據集合在屬性中指定。
使用源信令通道停止增量快照
向信令表發送 SQL 查詢以停止臨時增量快照:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');
例如,
INSERT INTO db1.myschema.debezium_signal (id, type, data)
values ('ad-hoc-1', 'stop-snapshot', '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type":"incremental"}');
signal命令中的id、type、data參數的值與信令表的字段對應。
示例中的參數說明如下表:
表 6. 向信令表發送停止增量快照信號的 SQL 命令中字段說明
Item | Value | Description |
---|---|---|
1 | database.schema.debezium_signal | 指定源數據庫上信令表的完全限定名稱。 |
2 | ad-hoc-1 | id 參數指定一個任意字符串,該字符串被指定為信號請求的 id 標識符。使用此字符串將日志消息標識到信令表中的條目。Debezium 不使用此字符串。 |
3 | stop-snapshot | 指定類型參數指定信號想要觸發的操作。 |
4 | data-collections | 信號數據字段的可選組件,用于指定要從快照中刪除的表名或正則表達式的數組。該數組列出了與表的完全限定名稱匹配的正則表達式,格式為 database.schema.table如果從數據字段中省略此組件,信號將停止正在進行的整個增量快照。 |
5 | incremental | 信號數據字段的必需組件,用于指定要停止的快照操作的類型。目前,唯一有效的選項是增量。如果您未指定類型值,則信號無法停止增量快照。 |
使用 Kafka 信令通道停止增量快照
您可以向配置的 Kafka 信令主題發送信號消息以停止臨時增量快照。
Kafka 消息的鍵必須與 topic.prefix 連接器配置選項的值匹配。
消息的值是一個具有類型和數據字段的 JSON 對象。
信號類型為 stop-snapshot,數據字段必須具有以下字段:
表 7. 執行快照數據字段
Field | Default | Value |
---|---|---|
type | incremental | 要執行的快照類型。目前 Debezium 僅支持增量類型。 |
data-collections | N/A | 一個可選的逗號分隔正則表達式數組,用于匹配要從快照中刪除的表的完全限定名稱,該數組包含表名或正則表達式,用于匹配要從快照中刪除的表名。使用 database.schema.table 格式指定表名。 |
以下示例顯示了典型的停止快照 Kafka 消息:
Key = `test_connector`Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`