Debezium日常分享系列之:MongoDB 新文檔狀態提取
- 變更事件結構
- 行為
- 配置
- 數組編碼
- 嵌套結構展平
- MongoDB $unset 處理
- 確定原始操作
- 添加元數據字段
- 選擇性應用轉換的選項
- 配置選項
- 已知限制
Debezium MongoDB 連接器會發出數據變更消息,以表示 MongoDB 集合中發生的每個操作。這些事件消息的復雜結構忠實地反映了原始數據庫事件的詳細信息。然而,某些下游消費者可能無法以原始格式處理這些消息。例如,為了表示數據集合中的嵌套文檔,連接器會以包含嵌套字段的格式發出事件消息。為了支持接收器連接器或其他無法處理原始消息層級格式的消費者,您可以使用 Debezium MongoDB 事件扁平化 (ExtractNewDocumentState) 單消息轉換 (SMT)。SMT 簡化了原始消息的結構,并可以通過其他方式修改消息,使數據更易于處理。
事件扁平化轉換是一種 Kafka Connect SMT。
變更事件結構
Debezium MongoDB 連接器會生成具有復雜結構的變更事件。每條事件消息包含以下部分:
源元數據
包括但不限于以下字段:
- 更改集合中數據的操作類型(創建/插入、更新或刪除)。
- 發生變更的數據庫和集合的名稱。
- 標識變更時間的時間戳。
- 可選的事務信息。
文檔數據
before data
- 在運行 MongoDB 6.0 及更高版本的環境中,當 Debezium 連接器的 capture.mode 設置為以下值之一時,此字段會出現:
- change_streams_with_pre_image
- change_streams_update_full_with_pre_image
after data
表示當前操作后文檔中存在的值的 JSON 字符串。事件消息中是否存在 after 字段取決于事件類型和連接器配置。MongoDB 插入操作的創建事件始終包含 after 字段,無論 capture.mode 的設置如何。對于更新事件,僅當 capture.mode 設置為以下值之一時,才會顯示 after 字段:
- change_streams_update_full
- change_streams_update_full_with_pre_image。
注意:
變更事件消息中的后續值不一定代表事件發生后文檔的狀態。該值并非動態計算;相反,連接器捕獲變更事件后,會查詢集合以獲取文檔的當前值。
例如,假設多個操作 a、b 和 c 快速連續地修改了文檔。當連接器處理變更 a 時,它會查詢集合以獲取完整文檔。與此同時,發生了變更 b 和 c。當連接器收到對變更 a 的完整文檔查詢的響應時,它可能會收到基于變更 b 或 c 的后續變更的文檔版本。
以下片段顯示了連接器在 MongoDB 插入操作后發出的創建更改事件的基本結構:
{"op": "c","after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}","source": { ... }
}
上例中 after 字段的復雜格式提供了有關源數據庫中發生的更改的詳細信息。但是,某些使用者無法處理包含嵌套值的消息。要將原始消息的復雜嵌套字段轉換為更簡單、更通用的結構,請使用 MongoDB 的事件展平 SMT。SMT 會展平消息中嵌套字段的結構,如下例所示:
{"field1" : "newvalue1","field2" : "newvalue2"
}
行為
MongoDB 的事件展平 SMT 會從 Debezium MongoDB 連接器發出的創建或更新變更事件消息中提取 after 字段。SMT 處理原始變更事件消息后,會生成一個僅包含 after 字段內容的簡化版本。
根據您的用例,您可以將 ExtractNewDocumentState SMT 應用于 Debezium MongoDB 連接器,或應用于接收 Debezium 連接器所生成消息的接收器連接器。如果將 SMT 應用于 Debezium MongoDB 連接器,SMT 會在連接器發出的消息發送到 Apache Kafka 之前對其進行修改。為了確保 Kafka 保留完整的 Debezium 變更事件消息的原始格式,請將 SMT 應用于接收器連接器。
當您使用事件展平 SMT 處理 MongoDB 連接器發出的消息時,SMT 會將原始消息中記錄的結構轉換為正確類型的 Kafka Connect 記錄,以便典型的接收器連接器可以使用。例如,SMT 會將表示原始消息中后續信息的 JSON 字符串轉換為任何消費者都可以處理的架構結構。
您也可以選擇為 MongoDB 配置事件展平 SMT,以便在處理過程中以其他方式修改消息。更多信息,請參閱配置主題。
配置
為接收連接器配置 MongoDB 的事件扁平化 (ExtractNewDocumentState) SMT,用于消費 Debezium MongoDB 連接器發出的消息。
基本配置
要獲取 SMT 的默認行為,請將 SMT 添加到接收連接器的配置中,無需指定任何選項,如下例所示:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
與任何 Kafka Connect 連接器配置一樣,您可以將 transforms= 設置為多個以逗號分隔的 SMT 別名。Kafka Connect 會按照您指定的轉換的列出順序應用它們。
您可以為使用 MongoDB 事件展平 SMT 的連接器設置多個選項。以下示例展示了為連接器設置 delete.tombstone.handling.mode 和 add.headers 選項的配置:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.delete.tombstone.handling.mode=drop
transforms.unwrap.add.headers=op
數組編碼
默認情況下,事件展平 SMT 會將 MongoDB 數組轉換為與 Apache Kafka Connect 或 Apache Avro 模式兼容的數組。雖然 MongoDB 數組可以包含多種類型的元素,但 Kafka 數組中的所有元素必須屬于同一類型。
為了確保 SMT 以符合您環境需求的方式對數組進行編碼,您可以指定 array.encoding 配置選項。以下示例展示了設置數組編碼的配置:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>
根據配置,SMT 會使用以下編碼方法之一處理源消息中每個數組實例:
數組編碼
- 如果將 array.encoding 設置為 array(默認值),SMT 編碼會使用數組數據類型對原始消息中的數組進行編碼。為確保正確處理,數組實例中的所有元素必須屬于同一類型。此選項有一定的限制,但它使下游客戶端能夠輕松處理數組。
文檔編碼
- 如果將 array.encoding 設置為 document,SMT 會將源消息中的每個數組轉換為結構體,其方式類似于 BSON 序列化。主結構體包含名為 _0、_1、_2 等的字段,其中每個字段名稱代表原始數組中元素的索引。SMT 會用其在源數組中檢索到的等效元素的值填充每個索引字段。索引名稱以下劃線為前綴,因為 Avro 編碼禁止字段名稱以數字字符開頭。
以下示例顯示了 Debezium MongoDB 連接器如何表示包含異構數據類型的數組的數據庫文檔:
例 1. 示例:包含多種數據類型的數組的文檔編碼
{"_id": 1,"a1": [{"a": 1,"b": "none"},{"a": "c","d": "something"}]
}
如果array.encoding設置為document,SMT會將上述文檔轉換為以下格式:
{"_id": 1,"a1": {"_0": {"a": 1,"b": "none"},"_1": {"a": "c","d": "something"}}
}
文檔編碼選項使 SMT 能夠處理由異構元素組成的任意數組。但是,在使用此選項之前,請務必驗證接收器連接器和其他下游消費者是否能夠處理包含多種數據類型的數組。
嵌套結構展平
當數據庫操作涉及嵌入式文檔時,Debezium MongoDB 連接器會發出 Kafka 事件記錄,該記錄的結構反映了原始文檔的層級結構。也就是說,事件消息將嵌套文檔表示為一組嵌套字段結構。在下游連接器無法處理包含嵌套結構的消息的環境中,您可以配置事件展平 SMT,以展平消息中的層級結構。扁平消息結構更適合表式存儲。
要配置 SMT 展平嵌套結構,請將 flatten.struct 配置選項設置為 true。在轉換后的消息中,字段名稱的構造與文檔源一致。SMT 通過將父文檔字段的名稱與嵌套文檔字段的名稱連接起來,重命名每個展平字段。flatten.struct.delimiter 選項定義的分隔符用于分隔名稱的各個組成部分。struct.delimiter 的默認值為下劃線 (_)。
以下示例顯示了指定 SMT 是否展平嵌套結構的配置:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>
以下示例展示了 MongoDB 連接器發出的事件消息。該消息包含文檔 a 的字段,該文檔包含兩個嵌套文檔 b 和 c 的字段:
{"_id": 1,"a": {"b": 1,"c": "none"},"d": 100
}
以下示例中的消息顯示了 MongoDB 的 SMT 將上一條消息中的嵌套結構展平后的輸出:
{"_id": 1,"a_b": 1,"a_c": "none","d": 100
}
在生成的消息中,原始消息中嵌套的字段 b 和 c 被展平并重命名。重命名的字段由父文檔 a 的名稱與嵌套文檔的名稱 a_b 和 a_c 連接而成。新字段名稱的各個部分由下劃線分隔,具體定義在 struct.delimiter 配置屬性的設置中。
MongoDB $unset 處理
在 MongoDB 中,$unset 運算符和 $rename 運算符均會從文檔中刪除字段。由于 MongoDB 集合是無模式的,因此在更新操作從文檔中刪除字段后,無法從更新后的文檔中推斷出缺失字段的名稱。為了支持接收器連接器或其他可能需要已刪除字段信息的消費者,Debezium 會發出包含 removedFields 元素的更新消息,該元素列出了已刪除字段的名稱。
以下示例展示了導致字段 a 被刪除的操作的更新消息的一部分:
"payload": {"op": "u","ts_ms": "...","ts_us" : "...","ts_ns" : "...","before": "{ ... }","after": "{ ... }","updateDescription": {"removedFields": ["a"],"updatedFields": null,"truncatedArrays": null}
}
在上面的示例中,before 和 after 分別表示源文檔更新前后的狀態。只有當連接器的 capture.mode 設置為以下列表所述時,這些字段才會出現在連接器發出的事件消息中:
before 字段
提供文檔更改前的狀態。只有當 capture.mode 設置為以下值之一時,此字段才會出現:
- change_streams_with_pre_image
- change_streams_update_full_with_pre_image。
after 字段
提供文檔更改后的完整狀態。只有當 capture.mode 設置為以下值之一時,此字段才會出現:
- change_streams_update_full
- change_streams_update_full_with_pre_image。
假設一個連接器配置為捕獲完整文檔,當 ExtractNewDocumentState SMT 收到 $unset 事件的更新消息時,SMT 會通過表示已刪除的字段具有空值來重新編碼該消息,如下例所示:
{"id": 1,"a": null
}
對于未配置為捕獲完整文檔的連接器,當 SMT 收到 $unset 操作的更新事件時,它會生成以下輸出消息:
{"a": null
}
確定原始操作
SMT 展平事件消息后,生成的消息將不再指示生成該事件的操作類型是創建、更新還是初始快照讀取。通常,您可以通過配置連接器以公開伴隨刪除的邏輯刪除或重寫事件的信息來識別刪除操作。有關配置連接器以在事件消息中公開邏輯刪除和重寫信息的更多信息,請參閱 delete.tombstone.handling.mode 屬性。
要在事件消息中報告數據庫操作的類型,SMT 可以將 op 字段添加到以下元素之一:
- 事件消息正文。
- 消息頭。
例如,要添加顯示原始操作類型的頭屬性,請添加轉換,然后將 add.headers 屬性添加到連接器配置中,如下例所示:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op
基于上述配置,SMT 會在消息中添加 op 頭,并為其分配一個字符串值來標識操作類型,從而報告事件類型。分配的字符串值基于原始 MongoDB 變更事件消息中的 op 字段值。
添加元數據字段
MongoDB 的事件展平 SMT 可以將原始變更事件消息中的元數據字段添加到簡化消息中。添加的元數據字段以雙下劃線(“__”)為前綴。向事件記錄添加元數據可以包含諸如發生變更事件的集合名稱之類的內容,也可以包含特定于連接器的字段,例如副本集名稱。目前,SMT 僅支持從以下變更事件子結構中添加字段:source、transaction 和 updateDescription。
例如,您可以指定以下配置,將變更事件的副本集名稱 (rs) 和集合名稱添加到最終的展平事件記錄中:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection
上述配置會導致以下內容被添加到扁平記錄中:
{ "__rs" : "rs0", "__collection" : "my-collection", ... }
如果希望 SMT 為刪除事件添加元數據字段,請將 delete.tombstone.handling.mode 選項的值設置為 rewrite。
選擇性應用轉換的選項
除了數據庫更改發生時 Debezium 連接器發出的更改事件消息外,該連接器還會發出其他類型的消息,包括心跳消息以及有關架構更改和事務的元數據消息。由于這些其他消息的結構與 SMT 設計用于處理的更改事件消息的結構不同,因此最好將連接器配置為選擇性應用 SMT,以便它僅處理預期的數據更改消息。
配置選項
下表描述了 MongoDB 事件展平 SMT 的配置選項。
屬性 | 默認值 | 描述 |
---|---|---|
array.encoding | array | 指定 SMT 對從原始事件消息中讀取的數組進行編碼時使用的格式。請設置以下選項之一:array SMT 使用數組數據類型將 MongoDB 數組編碼為與 Apache Kafka Connect 或 Apache Avro 模式兼容的格式。如果設置此選項,請驗證每個數組實例中的元素是否屬于同一類型。雖然 MongoDB 允許數組包含多種數據類型,但某些下游客戶端無法處理數組。document SMT 將每個 MongoDB 數組轉換為結構體的結構體,其方式類似于 BSON 序列化。主結構體包含名為 _0、_1、_2 等的字段。為了符合 Avro 命名標準,SMT 在每個索引字段的數字名稱前添加下劃線。每個數字字段名稱代表原始數組中元素的索引。 SMT 用從源文檔中檢索到的指定數組元素的值填充每個索引字段。 |
flatten.struct | false | SMT 通過連接消息中嵌套屬性的名稱(以可配置的分隔符分隔)來展平原始事件消息中的結構(struct),以形成一個簡單的字段名稱。 |
flatten.struct.delimiter | 當 flatten.struct 設置為 true 時,指定轉換在從輸入記錄連接的字段名稱之間插入的分隔符,以在輸出記錄中生成字段名稱。 | |
delete.tombstone.handling.mode | tombstone | Debezium 會為每個 DELETE 操作生成一條變更事件記錄。此設置決定 MongoDB 事件展平 SMT 如何處理流中的 DELETE 事件。請設置以下選項之一:drop SMT 會從流中移除 DELETE 事件和“TOMBSTONE”記錄。tombstone(默認)SMT 會在流中保留TOMBSTONE 記錄。TOMBSTONE 記錄僅包含以下值:“value”: “null”。rewrite SMT 會在流中保留變更事件記錄并進行以下更改:向記錄添加一個 value 字段,該字段包含原始記錄 before 字段中的鍵/值對。向記錄的值添加 __deleted: true。刪除 TOMBSTONE 記錄。此設置提供了另一種指示記錄已被刪除的方式。rewrite-with-tombstoneSMT 的行為與選擇重寫選項時的行為相同,只是它還會保留 TOMBSTONE 記錄。 |
delete.tombstone.handling.mode.rewrite-with-id | false | 當設置為 true 并且 delete.tombstone.handling.mode 被重寫時,從鍵中復制 id 字段并將其作為 _id 包含在刪除事件的有效負載中。 |
add.headers.prefix | __ (double-underscore) | 設置此可選字符串作為標題的前綴。 |
add.headers | No default | 指定您希望 SMT 添加到簡化消息頭的元數據字段列表(不帶空格),這些字段以逗號分隔。當原始消息包含重復的字段名稱時,您可以通過提供結構體名稱和字段名稱來標識要修改的特定字段,例如 source.ts_ms。您也可以選擇覆蓋字段的原始名稱,并通過在列表中添加以下格式的條目來為其分配新名稱:<field_name>:<new_field_name>。例如:version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP 您指定的新名稱值區分大小寫。當 SMT 將元數據字段添加到簡化消息的標頭時,它會在每個元數據字段名稱前添加雙下劃線。對于結構體規范,SMT 還會在結構體名稱和字段名稱之間插入下劃線。如果您指定的字段不在變更事件原始消息中,SMT 不會將該字段添加到標頭。 |
add.fields.prefix | __ (double-underscore) | 指定作為字段名稱前綴的可選字符串。 |
add.fields | No default | 將此選項設置為以逗號分隔的列表,其中包含要添加到簡化 Kafka 消息的 value 元素的元數據字段,不帶空格。當原始消息包含重復的字段名稱時,您可以通過提供結構體名稱和字段名稱來標識要修改的特定字段,例如 source.ts_ms。您也可以選擇覆蓋字段的原始名稱,并通過向列表中添加以下格式的條目來為其分配新名稱:<field_name>:<new_field_name>。例如:version:VERSION, connector:CONNECTOR,source.ts_ms:EVENT_TIMESTAMP 您指定的新名稱值區分大小寫。當 SMT 將元數據字段添加到簡化消息的 value 元素時,它會在每個元數據字段名稱前添加雙下劃線。對于結構體規范,SMT 還會在結構體名稱和字段名稱之間插入下劃線。如果您指定的字段在原始變更事件消息中不存在,SMT 仍會將指定的字段添加到修改后消息的 value 元素中。 |
已知限制
由于 MongoDB 是無模式數據庫,為了確保在使用 Debezium 將更改流式傳輸到基于模式的數據關系數據庫時列定義一致,集合中同名字段必須存儲相同類型的數據。
請配置 SMT,使其生成與接收器連接器兼容格式的消息。如果接收器連接器需要“扁平”消息結構,但它收到一條將源 MongoDB 文檔中的數組編碼為結構體結構的消息,則接收器連接器無法處理該消息。