背景
Paimon 通過其獨特的?partial-update
?合并引擎和底層的 LSM 存儲結構,巧妙地將傳統雙流 Join 中對 Flink State 的高頻隨機讀/寫,轉換為了對 Paimon 表的順序寫和后臺的高效合并,從而一站式地解決了 Flink 作業狀態過大、依賴外部 KV 系統等一系列痛點。
傳統方案中,Flink 作業需要維護一個巨大的 State(可能達到 TB 級)來存儲其中一個流的數據。當另一個流的數據到達時,需要去這個巨大的 State 中查找(Join)對應的記錄。這個“查找”操作,在數據量巨大、內存無法完全容納時,就會頻繁觸發對磁盤的隨機讀。機械硬盤和固態硬盤的隨機讀性能遠低于順序讀,這成為了整個作業的性能瓶瓶頸,并導致了高昂的資源開銷和不穩定性。
使用 Paimon 的?partial-update
?模式后,整個數據處理的范式發生了改變:
- 不再需要 Flink State 來做 Join:兩個數據流不再需要在 Flink 算子內部進行 Join。它們各自獨立地、源源不斷地將自己的數據寫入(
INSERT INTO
)到同一個 Paimon 表中。 - 寫入是高效的順序操作:Paimon 底層采用 LSM-Tree 結構。新寫入的數據會先進入內存緩沖區,然后刷寫成新的、有序的小文件。這個過程主要是順序寫,效率非常高。
這樣一來,原來 Flink 作業中最消耗性能的“狀態查找”(隨機讀)環節,被徹底消除了。
現在,兩個流的數據都以部分列的形式寫入了 Paimon 表。那么,數據是在哪里“打寬”合并的呢?答案是在 Paimon 的Compaction過程中。
-
Partial-Update 合并引擎:當將表的合并引擎設置為?
partial-update
?時,Paimon 就知道了它的合并策略。 正如文檔?/docs/content/primary-key-table/merge-engine/partial-update.md
?中描述的,對于相同主鍵的多條記錄,它會取每個字段最新的非空值,合并成一條完整的記錄。假設 Paimon 收到三條記錄:
<1, 23.0, 10, NULL>
<1, NULL, NULL, 'This is a book'>
<1, 25.2, NULL, NULL>
假設第一列是主鍵,最終合并的結果將是?
<1, 25.2, 10, 'This is a book'>
。 -
LSM-Tree 與順序讀合并:Paimon 的 Compaction 任務會定期將小的、分層的文件合并成更大的文件。這個合并過程是讀取多個有序的文件,然后進行多路歸并排序,這基本上是順序讀操作,效率遠高于隨機讀。
PartialUpdateMergeFunction
?這個類就是實現該合并邏輯的核心。 -
Paimon Compaction策略見:Paimon LSM Tree Compaction 策略
// ... existing code ... public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {// ... existing code ...private InternalRow currentKey;private long latestSequenceNumber;private GenericRow row;private KeyValue reused;private boolean currentDeleteRow;// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false; // ... existing code ...
-
專用 Compaction 作業:為了不影響數據寫入的實時性,最佳實踐是啟動一個獨立的、專用的 Compaction 作業。這樣,數據寫入和數據合并就可以完全解耦,互不干擾。
如文檔?
/docs/content/maintenance/dedicated-compaction.md
?所述,當有多個流式作業寫入一個?partial-update
?表時,推薦使用專用的 Compaction 作業。<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-{{< version >}}.jar \compact \--warehouse <warehouse-path> \--database <database-name> \ --table <table-name> \...
總結:Paimon 的核心優勢
通過上述分析,我們可以清晰地看到 Paimon 在這個場景下的優勢:
- 性能革命:將 Flink State 的隨機讀瓶頸,轉變為 Paimon 的順序寫 + 后臺順序讀合并,大幅提升了整體吞吐量和性能。
- 架構簡化與成本降低:不再需要維護外部的 HBase/Pegasus 等 KV 系統,所有數據統一存儲在 Paimon 中,降低了系統復雜度和運維、存儲成本。
- 穩定性提升:Flink 作業本身變成了無狀態或輕狀態的寫入任務,徹底告別了 TB 級的 State,使得作業的穩定性和恢復速度大大增強。
- 開發簡化:原來需要手寫復雜?
DataStream
?API 和?Timer
?才能實現的邏輯,現在只需要兩個簡單的?INSERT INTO
?SQL 語句即可完成,開發效率和代碼可維護性顯著提高。
PartialUpdateMergeFunction
這是在 Paimon 中實現?partial-update
?(部分列更新) 合并引擎的核心類。它的主要職責是在 Compaction 過程中,將具有相同主鍵的多條記錄(KeyValue
)合并成最終的一條記錄。
PartialUpdateMergeFunction
?實現了?MergeFunction<KeyValue>
?接口。在 Paimon 的 LSM-Tree 存儲模型中,當執行 Compaction 操作需要合并多個數據文件時,Paimon 會讀取具有相同主鍵的一組?KeyValue
?數據,然后交由一個?MergeFunction
?實例來處理,計算出最終的結果。
PartialUpdateMergeFunction
?的合并邏輯是:對于相同主鍵的記錄,不斷地用新的非空字段值去覆蓋舊的字段值,最終得到一個“打寬”后的完整記錄。?它還支持更復雜的場景,如基于序列號的更新、字段聚合和多種刪除策略。
// ... existing code ...
import org.apache.paimon.mergetree.compact.MergeFunction;
// ... existing code ...
/*** A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update* non-null fields on merge.*/
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
// ... existing code ...
核心成員變量
這些變量定義了?PartialUpdateMergeFunction
?的狀態和配置,決定了其合并行為。
// ... existing code ...
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {public static final String SEQUENCE_GROUP = "sequence-group";private final InternalRow.FieldGetter[] getters; // 用于從 InternalRow 中獲取字段值private final boolean ignoreDelete; // 是否忽略刪除記錄private final Map<Integer, FieldsComparator> fieldSeqComparators; // 字段序列號比較器,用于 sequence-groupprivate final boolean fieldSequenceEnabled; // 是否啟用了 sequence-groupprivate final Map<Integer, FieldAggregator> fieldAggregators; // 字段聚合器private final boolean removeRecordOnDelete; // 收到 DELETE 記錄時是否刪除整行private final Set<Integer> sequenceGroupPartialDelete; // 收到 DELETE 記錄時,根據 sequence-group 刪除部分列private final boolean[] nullables; // 記錄每個字段是否可為 nullprivate InternalRow currentKey; // 當前處理的主鍵private long latestSequenceNumber; // 見過的最新序列號private GenericRow row; // 合并過程中的結果行private KeyValue reused; // 用于復用的 KeyValue 對象,避免重復創建private boolean currentDeleteRow; // 標記當前行最終是否應被刪除private boolean notNullColumnFilled;/*** If the first value is retract, and no insert record is received, the row kind should be* RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no* RowKind.INSERT value is received)*/private boolean meetInsert; // 是否遇到過 INSERT 類型的記錄// ... existing code ...
- 配置類變量?(
ignoreDelete
,?fieldSeqComparators
,?fieldAggregators
?等) 通常在?Factory
?中被初始化,它們在整個合并過程中保持不變。 - 狀態類變量?(
currentKey
,?row
,?latestSequenceNumber
?等) 會在每次?reset()
?時被重置,用于處理新的一組具有相同主鍵的記錄。
add(KeyValue kv)
?:合并邏輯的核心
這是最重要的方法,定義了單條?KeyValue
?是如何被合并到當前結果?row
?中的。
// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false;if (kv.valueKind().isRetract()) {if (!notNullColumnFilled) {initRow(row, kv.value());notNullColumnFilled = true;}// ... 刪除邏輯處理 ...// ... existing code ...String msg =String.join("\n","By default, Partial update can not accept delete records,"+ " you can choose one of the following solutions:","1. Configure 'ignore-delete' to ignore delete records.","2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.","3. Configure 'sequence-group's to retract partial columns.");throw new IllegalArgumentException(msg);}latestSequenceNumber = kv.sequenceNumber();if (fieldSeqComparators.isEmpty()) {updateNonNullFields(kv);} else {updateWithSequenceGroup(kv);}meetInsert = true;notNullColumnFilled = true;}
// ... existing code ...
它的邏輯可以分為兩大塊:
A. 處理?retract
?消息 (RowKind 為?DELETE
?或?UPDATE_BEFORE
)
partial-update
?默認不接受刪除記錄。如果收到了,行為由配置決定:
ignoreDelete = true
: 直接忽略這條刪除記錄,返回。removeRecordOnDelete = true
: 當收到?DELETE
?類型的記錄時,將?currentDeleteRow
?標記為?true
,并清空當前?row
。這意味著最終這條主鍵對應的記錄將被刪除。fieldSequenceEnabled = true
: 啟用了?sequence-group
。這是最復雜的邏輯,它會調用?retractWithSequenceGroup(kv)
。這個方法會根據序列號比較結果,來決定是否要“撤銷”某些字段的更新(通常是將其設置為?null
?或調用聚合器的?retract
?方法)。- 默認行為: 如果以上配置都沒有,則直接拋出?
IllegalArgumentException
?異常,提示用戶如何正確配置。
B. 處理?add
?消息 (RowKind 為?INSERT
?或?UPDATE_AFTER
)
這是主要的更新邏輯:
-
簡單更新 (
updateNonNullFields
): 如果沒有配置?sequence-group
?(fieldSeqComparators
?為空),則執行最簡單的部分列更新。遍歷新紀錄?kv
?的所有字段,只要字段值不為?null
,就用它來更新?row
?中對應位置的值。// ... existing code ... private void updateNonNullFields(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());if (field != null) {row.setField(i, field);} else { // ... existing code ...
-
帶序列號的更新 (
updateWithSequenceGroup
): 如果配置了?sequence-group
,邏輯會更復雜。對于每個字段:- 如果該字段不屬于任何?
sequence-group
,則行為和簡單更新類似(但會考慮聚合)。 - 如果該字段屬于某個?
sequence-group
,則會使用?FieldsComparator
?比較新記錄?kv
?和當前結果?row
?的序列號字段。只有當新記錄的序列號?大于或等于?當前結果的序列號時,才會用新記錄的字段值去更新?row
?中由該?sequence-group
?控制的所有字段。這保證了數據的更新順序。
- 如果該字段不屬于任何?
updateWithSequenceGroup
這個方法是?partial-update
?合并引擎處理帶有?sequence-group
?配置時的核心邏輯。當用戶在表屬性中定義了?fields.<seq_field>.sequence-group = <data_field1>,<data_field2>
?這樣的規則時,數據合并就不再是簡單的“非空值覆蓋”,而是需要根據?seq_field
?的值來判斷是否應該更新?data_field1
?和?data_field2
。這解決了多流更新時可能出現的數據亂序覆蓋問題。
updateWithSequenceGroup
?方法通過引入?FieldsComparator
,將簡單的字段更新升級為基于序列號的條件更新。它精確地控制了哪些字段在何時可以被更新,從而保證了在多流并發寫入場景下,即使數據存在一定程度的亂序,最終也能合并成正確的結果。這是 Paimon?partial-update
?模式能夠處理復雜更新場景的關鍵所在。
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {
// ... existing code ...
- 輸入:?
KeyValue kv
,代表一條新到達的、具有相同主鍵的記錄。 - 目標: 遍歷這條新記錄?
kv
?的所有字段,并根據?sequence-group
?的規則,決定是否用?kv
?中的字段值來更新當前正在合并的結果行?this.row
。
該方法的核心是一個?for
?循環,它遍歷了表中的每一個字段。
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {
// ... existing code ...
在循環內部,對每個字段的處理邏輯可以分為兩種情況:
- 該字段不屬于任何?
sequence-group
。 - 該字段屬于某個?
sequence-group
。
讓我們來詳細看這兩種情況。
1. 字段不屬于任何?sequence-group
// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());FieldsComparator seqComparator = fieldSeqComparators.get(i);FieldAggregator aggregator = fieldAggregators.get(i);Object accumulator = getters[i].getFieldOrNull(row);if (seqComparator == null) {if (aggregator != null) {row.setField(i, aggregator.agg(accumulator, field));} else if (field != null) {row.setField(i, field);}} else {
// ... existing code ...
- 判斷條件:?
seqComparator == null
。fieldSeqComparators
?是一個?Map<Integer, FieldsComparator>
,如果在里面找不到當前字段索引?i
,就說明這個字段不受任何?sequence-group
?控制。 - 處理邏輯:
- 帶聚合函數: 如果為該字段配置了聚合函數(
aggregator != null
),例如?sum
、max
?等,則調用?aggregator.agg()
?方法,將當前累加值?accumulator
?和新值?field
?進行聚合,并將結果寫回?row
。 - 不帶聚合函數: 這是最簡單的情況。如果新來的字段值?
field
?不為?null
,就直接用它覆蓋?row
?中的舊值。這和?updateNonNullFields
?的行為是一致的。
- 帶聚合函數: 如果為該字段配置了聚合函數(
2. 字段屬于某個?sequence-group
這是該方法最核心和復雜的部分。
// ... existing code ...} else {if (isEmptySequenceGroup(kv, seqComparator)) {// skip null sequence groupcontinue;}if (seqComparator.compare(kv.value(), row) >= 0) {int index = i;// Multiple sequence fields should be updated at once.if (Arrays.stream(seqComparator.compareFields()).anyMatch(seqIndex -> seqIndex == index)) {for (int fieldIndex : seqComparator.compareFields()) {row.setField(fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));}continue;}row.setField(i, aggregator == null ? field : aggregator.agg(accumulator, field));} else if (aggregator != null) {row.setField(i, aggregator.aggReversed(accumulator, field));}}}}
// ... existing code ...
- 判斷條件:?
seqComparator != null
。 - 處理邏輯:
- 空序列組檢查:?
isEmptySequenceGroup(kv, seqComparator)
?會檢查這條新紀錄?kv
?中,其對應的序列號字段是否都為?null
。如果是,意味著這條記錄無法判斷新舊,因此直接跳過,不進行任何更新。 - 序列號比較:?
seqComparator.compare(kv.value(), row) >= 0
?是關鍵。它會比較新記錄?kv
?和當前結果?row
?中,由?seqComparator
?定義的序列號字段。- 如果新記錄的序列號 >= 當前結果的序列號: 這意味著新記錄?
kv
?是“更新”的或者“同樣新”的,此時應該用?kv
?的值去更新?row
。- 更新序列號字段本身: 如果當前字段?
i
?就是序列號字段之一,那么需要把這個?sequence-group
?定義的所有序列號字段都一次性更新掉,然后用?continue
?跳出本次循環。這是為了保證序列號字段之間的一致性。 - 更新數據字段: 如果當前字段?
i
?是被序列號控制的數據字段,則執行更新。如果有聚合器,則調用?aggregator.agg()
;如果沒有,則直接用新值?field
?覆蓋。
- 更新序列號字段本身: 如果當前字段?
- 如果新記錄的序列號 < 當前結果的序列號: 這意味著?
kv
?是一條“舊”數據。在大部分情況下,這條舊數據會被忽略。但有一個例外:如果為該字段配置了支持亂序聚合的聚合器(例如?sum
),則會調用?aggregator.aggReversed()
。這個方法通常和?agg()
?的邏輯是一樣的,它允許舊數據也能被正確地聚合進來。對于不支持亂序的聚合器(如?max
),aggReversed
?可能就是一個空操作。
- 如果新記錄的序列號 >= 當前結果的序列號: 這意味著新記錄?
- 空序列組檢查:?
getResult()
?方法:產出最終結果
當處理完具有相同主鍵的所有?KeyValue
?后,調用此方法來獲取最終的合并結果。
// ... existing code ...@Overridepublic KeyValue getResult() {if (reused == null) {reused = new KeyValue();}RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT;return reused.replace(currentKey, latestSequenceNumber, rowKind, row);}
// ... existing code ...
它會根據?currentDeleteRow
?和?meetInsert
?標志位來決定最終的?RowKind
。如果?currentDeleteRow
?為?true
,或者整個合并過程從未見過?INSERT
?類型的記錄,那么最終結果就是一條?DELETE
?記錄。否則,就是一條?INSERT
?記錄。然后將主鍵、最新的序列號、最終的?RowKind
?和合并后的?row
?數據打包成一個?KeyValue
?返回。
Factory
?內部類:配置的入口
PartialUpdateMergeFunction.Factory
?是一個非常重要的內部類,它負責解析用戶在表上設置的?OPTIONS
,并據此創建出一個配置好的?PartialUpdateMergeFunction
?實例。
// ... existing code ...public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {return new Factory(options, rowType, primaryKeys);}private static class Factory implements MergeFunctionFactory<KeyValue> {// ... 成員變量,用于存儲從 Options 解析出的配置 ...private Factory(Options options, RowType rowType, List<String> primaryKeys) {this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);// ... existing code ...this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);// ... 解析 sequence-group 配置 ...for (Map.Entry<String, String> entry : options.toMap().entrySet()) {String k = entry.getKey();String v = entry.getValue();if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {// ... 解析出序列號字段和被控制的字段,構建 fieldSeqComparators ...}}// ... 解析聚合函數配置,構建 fieldAggregators ...this.fieldAggregators =createFieldAggregators(rowType, primaryKeys, allSequenceFields, new CoreOptions(options));// ... 配置校驗,確保沖突的配置不會同時開啟 ...Preconditions.checkState(!(removeRecordOnDelete && ignoreDelete),// ...);// ...}
// ... existing code ...
在構造函數中,它會:
- 讀取?
ignore-delete
,?partial-update.remove-record-on-delete
?等簡單配置。 - 遍歷所有?
OPTIONS
,查找以?fields.
?開頭、以?.sequence-group
?結尾的配置項,例如?fields.order_time.sequence-group=order_id,price
。它會解析這些配置,構建出?fieldSeqComparators
?這個 Map,其中 key 是被控制字段的索引,value 是一個能夠比較?order_time
?字段的比較器。 - 調用?
createFieldAggregators
?方法,解析?fields.*.aggregate-function
?等配置,構建出?fieldAggregators
?這個 Map。 - 執行一系列?
Preconditions.checkState
,對用戶的配置進行合法性校驗,防止出現邏輯沖突。
總結
PartialUpdateMergeFunction
?是 Paimon 實現高性能數據打寬(部分列更新)能力的技術基石。它通過一個設計精巧的合并流程,將簡單的非空字段覆蓋、基于序列號的有序更新、字段聚合以及多種刪除策略融為一體。其?Factory
?類則充當了連接用戶配置和底層實現的橋梁。理解了這個類的工作原理,就能深刻地理解 Paimon?partial-update
?模式的強大之處。
雙流拼接 怎么處理schema
Paimon 允許在寫入數據時自動合并和演進表結構。這對于像雙流 Join 結果寫入等 schema 可能變化的場景至關重要。這個功能主要通過?write.merge-schema
?選項來開啟。
當將數據寫入 Paimon 表時:
- 如果?
write.merge-schema
?設置為?true
,Paimon 會比較寫入數據(Source)的 schema 和目標表(Sink)當前的 schema。 - 如果發現寫入數據中包含了表中不存在的新列,Paimon 會自動將這些新列添加到表結構中,生成一個新的、版本更高的 schema。
- 對于數據中缺失但在表 schema 中存在的列,Paimon 會自動填充?
null
?值。
這個過程是原子性的,并記錄在表的元數據中。Paimon 會為每一次 schema 變更創建一個新的版本化的 schema 文件。
代碼參考:
在 Spark 中,寫入邏輯由?WriteIntoPaimonTable.scala
?處理。可以看到,當?mergeSchema
?為?true
?時,它會調用?mergeAndCommitSchema
?來合并 schema,并處理列不匹配的情況。
WriteIntoPaimonTable.scala
// ... existing code ...override def run(sparkSession: SparkSession): Seq[Row] = {var data = _dataif (mergeSchema) {val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)mergeAndCommitSchema(dataSchema, allowExplicitCast)// For case that some columns is absent in data, we still allow to write once write.merge-schema is true.val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {val resolve = sparkSession.sessionState.conf.resolverval cols = newTableSchema.map {field =>dataSchema.find(f => resolve(f.name, field.name)) match {case Some(f) => col(f.name)case _ => lit(null).as(field.name)}}data = data.select(cols: _*)}}
// ... existing code ...
一個具體的測試用例也展示了這一點,一個原先只有?a
?和?b
?列的表,成功寫入了包含?c
?和?d
?列的新數據。
DataFrameWriteTest.scala
// ... existing code ...// Case 1: two additional fields: DoubleType and TimestampTypeval ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts)).toDF("a", "b", "c", "d")df2.write.format("paimon").mode("append").option("write.merge-schema", "true").save(location)
// ... existing code ...
在 Flink 或 Spark 中進行雙流 Join 時,Paimon 通常作為 Sink 端。Join 操作本身由計算引擎完成。應用需要做的就是:
- 執行雙流 Join。
- 將 Join 后的?
DataStream
?或?DataFrame
?寫入 Paimon 表。 - 在寫入時,設置?
write.merge-schema
?為?true
。
這樣,無論 Join 結果的 schema 如何(比如因為上游流增加了字段導致 Join 結果也增加了字段),Paimon 表都可以自動適應,動態地添加新列。
SchemaMergingUtils
SchemaMergingUtils
?是 Paimon schema 演進(Schema Evolution)功能的核心工具類。它的主要職責是比較兩個 schema(通常是數據表的現有 schema 和新寫入數據的 schema),并根據預設的規則將它們合并成一個新的、統一的 schema。這個過程支持添加新列、安全地轉換現有列的數據類型,從而實現動態 schema 的能力。
當配置 Paimon 表允許 schema 合并(例如通過?write.merge-schema=true
)時,寫入流程就會調用這個工具類。它會:
- 比較字段:找出新舊 schema 中同名和新增的字段。
- 合并類型:對于同名字段,嘗試合并其數據類型(例如,
INT
?可以演進為?BIGINT
)。 - 添加字段:將新 schema 中獨有的字段添加到最終的 schema 中,并為其分配新的唯一 ID。
- 生成新版 Schema:如果發生了任何變更,它會創建一個版本號加一的新的?
TableSchema
?對象。
下面我們結合代碼,從頂層方法到底層實現,一步步進行分析。
mergeSchemas
這是最頂層的入口方法,用于合并一個完整的表 schema 和一個新的行類型(通常來自要寫入的數據)。
- 參數:
currentTableSchema
: Paimon 表當前的?TableSchema
?對象。它包含了字段、分區鍵、主鍵、表配置等所有元數據。targetType
: 目標?RowType
,即新數據的 schema。allowExplicitCast
: 一個布爾標志,決定是否允許顯式(可能存在精度損失)的類型轉換,比如?STRING
?轉?INT
。
- 邏輯:
- 首先,它會檢查?
targetType
?和?currentTableSchema
?的?RowType
?是否完全相同。如果相同,則無需合并,直接返回當前的?TableSchema
。 - 如果不同,它會初始化一個?
AtomicInteger
?類型的?highestFieldId
,記錄當前 schema 中所有字段(包括嵌套字段)的最大 ID。這個 ID 對于為新字段分配唯一標識至關重要。 - 調用重載的?
mergeSchemas
?方法(最終調用核心的?merge
?方法)來遞歸地合并兩個?RowType
。 - 如果合并后的?
newRowType
?與原始的?currentType
?相同(例如,只是可空性變化,而合并邏輯會保留原始的可空性),則也認為沒有發生實質性變化,返回原始的?TableSchema
。 - 如果 schema 確實發生了變化,它會創建一個新的?
TableSchema
?實例。這個新 schema 的 ID 會在舊 ID 的基礎上加 1,字段列表和?highestFieldId
?會更新,而分區鍵、主鍵、表配置和注釋等信息則會從舊 schema 中繼承。
- 首先,它會檢查?
// ... existing code ...public static TableSchema mergeSchemas(TableSchema currentTableSchema, RowType targetType, boolean allowExplicitCast) {RowType currentType = currentTableSchema.logicalRowType();if (currentType.equals(targetType)) {return currentTableSchema;}AtomicInteger highestFieldId = new AtomicInteger(currentTableSchema.highestFieldId());RowType newRowType =mergeSchemas(currentType, targetType, highestFieldId, allowExplicitCast);if (newRowType.equals(currentType)) {// It happens if the `targetType` only changes `nullability` but we always respect the// current's.return currentTableSchema;}return new TableSchema(currentTableSchema.id() + 1,newRowType.getFields(),highestFieldId.get(),currentTableSchema.partitionKeys(),currentTableSchema.primaryKeys(),currentTableSchema.options(),currentTableSchema.comment());}
// ... existing code ...
merge
這是所有合并邏輯的核心。它被遞歸調用以處理各種數據類型。
可空性處理 (Nullability Handling)
在方法的一開始,它將?base0
?和?update0
?的可空性都設置為?true
?來進行比較。最終返回的類型的可空性將以?base0
(原始表 schema 中的類型)為準。這意味著 schema 合并不會改變現有列的可空性。
// ... existing code ...public static DataType merge(DataType base0,DataType update0,AtomicInteger highestFieldId,boolean allowExplicitCast) {// Here we try to merge the base0 and update0 without regard to the nullability,// and set the base0's nullability to the return's.DataType base = base0.copy(true);DataType update = update0.copy(true);if (base.equals(update)) {return base0;} else if (base instanceof RowType && update instanceof RowType) {
// ... existing code ...
遞歸合并復雜類型
RowType
?(行類型): 這是最復雜的部分。- 合并現有字段: 遍歷?
base
?(舊 schema) 的所有字段。對于每個字段,檢查?update
?(新 schema) 中是否存在同名字段。如果存在,就遞歸調用?merge
?方法來合并這兩個字段的類型。如果不存在,則保留?base
?中的原始字段。 - 添加新字段: 遍歷?
update
?的所有字段,找出在?base
?中不存在的字段。這些就是需要新增的列。對于每個新字段,調用?assignIdForNewField
?為其分配一個新的、唯一的字段 ID,然后將其添加到最終的字段列表中。 - 最后,用更新后的字段列表創建一個新的?
RowType
。
- 合并現有字段: 遍歷?
// ... existing code ...} else if (base instanceof RowType && update instanceof RowType) {List<DataField> baseFields = ((RowType) base).getFields();List<DataField> updateFields = ((RowType) update).getFields();Map<String, DataField> updateFieldMap =updateFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));List<DataField> updatedFields =baseFields.stream().map(baseField -> {if (updateFieldMap.containsKey(baseField.name())) {DataField updateField =updateFieldMap.get(baseField.name());DataType updatedDataType =merge(baseField.type(),updateField.type(),highestFieldId,allowExplicitCast);return new DataField(baseField.id(),baseField.name(),updatedDataType,baseField.description());} else {return baseField;}}).collect(Collectors.toList());Map<String, DataField> baseFieldMap =baseFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));List<DataField> newFields =updateFields.stream().filter(field -> !baseFieldMap.containsKey(field.name())).map(field -> assignIdForNewField(field, highestFieldId)).map(field -> field.copy(true)).collect(Collectors.toList());updatedFields.addAll(newFields);return new RowType(base0.isNullable(), updatedFields);} else if (base instanceof MapType && update instanceof MapType) {
// ... existing code ...
MapType
,?ArrayType
,?MultisetType
: 對于這些集合類型,合并邏輯很簡單:遞歸地調用?merge
?方法來合并它們的內部元素類型(MapType
?的鍵和值類型,ArrayType
?和?MultisetType
?的元素類型)。
合并基礎類型
-
DecimalType
: 這是一個特例。只有當兩個?DecimalType
?的?scale
?(小數位數) 相同時,才能合并。合并后的?precision
?(總位數) 取兩者中的最大值。如果?scale
?不同,會直接拋出?UnsupportedOperationException
。 -
其他可轉換類型: 對于其他基礎類型,通過?
supportsDataTypesCast
?方法判斷是否可以轉換。- 隱式轉換 (Implicit Cast): 當?
allowExplicitCast
?為?false
?時,只允許安全的類型提升,例如?INT
?->?BIGINT
,FLOAT
?->?DOUBLE
。 - 顯式轉換 (Explicit Cast): 當?
allowExplicitCast
?為?true
?時,允許更多可能損失精度的轉換。 - 對于帶有長度(如?
VARCHAR
)或精度(如?TIMESTAMP
)的類型,通常要求新類型的長度/精度不能小于舊類型,除非開啟了顯式轉換。 - 如果可以轉換,則直接采用?
update
?的類型,但保留?base0
?的可空性。
- 隱式轉換 (Implicit Cast): 當?
// ... existing code ...} else if (supportsDataTypesCast(base, update, allowExplicitCast)) {if (DataTypes.getLength(base).isPresent() && DataTypes.getLength(update).isPresent()) {// this will check and merge types which has a `length` attribute, like BinaryType,// CharType, VarBinaryType, VarCharType.if (allowExplicitCast|| DataTypes.getLength(base).getAsInt()<= DataTypes.getLength(update).getAsInt()) {return update.copy(base0.isNullable());} else {throw new UnsupportedOperationException(String.format("Failed to merge the target type that has a smaller length: %s and %s",base, update));}} else if (DataTypes.getPrecision(base).isPresent()&& DataTypes.getPrecision(update).isPresent()) {// this will check and merge types which has a `precision` attribute, like// LocalZonedTimestampType, TimeType, TimestampType.if (allowExplicitCast|| DataTypes.getPrecision(base).getAsInt()<= DataTypes.getPrecision(update).getAsInt()) {return update.copy(base0.isNullable());} else {throw new UnsupportedOperationException(String.format("Failed to merge the target type that has a lower precision: %s and %s",base, update));}} else {return update.copy(base0.isNullable());}} else {throw new UnsupportedOperationException(String.format("Failed to merge data types %s and %s", base, update));}}
// ... existing code ...
assignIdForNewField
這個方法非常重要。當向?RowType
?中添加一個新字段時,它負責為這個新字段及其所有嵌套字段(如果是復雜類型)分配唯一的 ID。它通過傳入的?AtomicInteger highestFieldId
?來實現 ID 的原子性遞增,確保了在并發場景下 ID 的唯一性,這對于 Paimon 正確地按 ID 映射和讀取列數據至關重要。
// ... existing code ...private static DataField assignIdForNewField(DataField field, AtomicInteger highestFieldId) {DataType dataType = ReassignFieldId.reassign(field.type(), highestFieldId);return new DataField(highestFieldId.incrementAndGet(), field.name(), dataType, field.description());}
}
總結
SchemaMergingUtils
?通過一套定義明確且可遞歸的規則,實現了 Paimon 強大而靈活的 Schema 演進能力。它能夠智能地處理字段的增加和類型變化,同時通過嚴格的 ID 分配和管理,保證了數據讀寫的正確性。這個類是 Paimon 能夠適應動態數據源、支持平滑表結構變更的關鍵所在。
SchemaManager
SchemaManager
?是 Paimon 中負責管理表 schema(模式)的核心組件。它處理所有與 schema 相關的持久化操作,包括創建、讀取、更新和版本管理。可以把它看作是 Paimon 表 schema 在文件系統中的“數據庫管理員”。
SchemaManager
?的主要職責可以歸納為以下幾點:
- Schema 持久化:將?
TableSchema
?對象序列化為 JSON 文件,并存儲在表的?schema
?目錄下。每個 schema 文件代表一個版本。 - 版本管理:每個 schema 文件名都以?
schema-
?開頭,后跟一個從 0 開始遞增的版本號(ID),例如?schema-0
,?schema-1
?等。這使得 Paimon 可以追蹤 schema 的所有歷史變更。 - Schema 讀取:提供方法來讀取最新版本的 schema、特定版本的 schema 或所有版本的 schema。
- Schema 創建:在創建新表時,負責初始化并提交第一個 schema 版本(
schema-0
)。 - Schema 變更:通過應用一系列?
SchemaChange
(如添加列、刪除列、修改表選項等)來原子性地更新 schema,并生成一個新的、版本號加一的 schema 文件。 - 多分支支持:能夠為不同的數據分支(branch)管理各自獨立的 schema 演進路徑。
結構和關鍵屬性
// ... existing code ...
@ThreadSafe
public class SchemaManager implements Serializable {private static final String SCHEMA_PREFIX = "schema-";private final FileIO fileIO;private final Path tableRoot;private final String branch;public SchemaManager(FileIO fileIO, Path tableRoot) {
// ... existing code ...
@ThreadSafe
: 這個注解表明該類的設計是線程安全的,允許多個線程同時訪問一個?SchemaManager
?實例。SCHEMA_PREFIX
: 常量?"schema-"
,定義了 schema 文件名的前綴。fileIO
:?FileIO
?接口的實例,用于與底層文件系統(如 HDFS, S3, 本地文件系統)進行交互。tableRoot
:?Path
?對象,指向表的根目錄。SchemaManager
?會在這個目錄下的?schema
?子目錄中工作。branch
: 字符串,表示當前?SchemaManager
?實例操作的數據分支名稱。Paimon 支持類似 Git 的分支功能,main
?是默認的主分支。不同的分支可以有獨立的快照和 schema 演進。
構造函數和分支管理
// ... existing code ...public SchemaManager(FileIO fileIO, Path tableRoot) {this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH);}/** Specify the default branch for data writing. */public SchemaManager(FileIO fileIO, Path tableRoot, String branch) {this.fileIO = fileIO;this.tableRoot = tableRoot;this.branch = BranchManager.normalizeBranch(branch);}public SchemaManager copyWithBranch(String branchName) {return new SchemaManager(fileIO, tableRoot, branchName);}
// ... existing code ...
- 構造函數初始化了?
fileIO
、tableRoot
?和?branch
。默認使用主分支?DEFAULT_MAIN_BRANCH
。 copyWithBranch(String branchName)
: 這是一個工廠方法,用于創建一個新的?SchemaManager
?實例來操作指定的分支。這體現了 Paimon 對多分支的支持。
Schema 讀取方法
// ... existing code ...public Optional<TableSchema> latest() {try {return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX).reduce(Math::max).map(this::schema);} catch (IOException e) {throw new UncheckedIOException(e);}}
// ... existing code ...public List<TableSchema> listAll() {return listAllIds().stream().map(this::schema).collect(Collectors.toList());}public List<Long> listAllIds() {try {return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX).collect(Collectors.toList());} catch (IOException e) {throw new UncheckedIOException(e);}}
// ... existing code ...
latest()
: 獲取最新版本的?TableSchema
。它通過?listVersionedFiles
?工具方法列出?schema
?目錄下所有符合?schema-*
?格式的文件,提取出版本號,找到最大的版本號,然后調用?schema(long id)
?方法讀取并反序列化對應的 schema 文件。listAll()
: 獲取所有版本的?TableSchema
?列表。listAllIds()
: 僅獲取所有 schema 版本的 ID 列表。schema(long id)
?(未在片段中完全展示,但被?latest()
?調用): 這是一個內部方法,根據給定的 ID 構建 schema 文件路徑(如?.../schema/schema-5
),然后使用?fileIO
?讀取文件內容,并通過?TableSchema.fromJSON(String json)
?將其反序列化為?TableSchema
?對象。
表創建?createTable(...)
// ... existing code ...public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {while (true) {Optional<TableSchema> latest = latest();if (latest.isPresent()) {TableSchema latestSchema = latest.get();if (externalTable) {checkSchemaForExternalTable(latestSchema.toSchema(), schema);return latestSchema;} else {throw new IllegalStateException("Schema in filesystem exists, creation is not allowed.");}}TableSchema newSchema = TableSchema.create(0, schema);// validate table from creating tableFileStoreTableFactory.create(fileIO, tableRoot, newSchema).store();boolean success = commit(newSchema);if (success) {return newSchema;}}}
// ... existing code ...
- 這是一個原子性操作,通過?
while(true)
?循環和文件系統的原子性創建來保證。 - 檢查存在性: 首先調用?
latest()
?檢查是否已有 schema 文件存在。如果存在且不是創建外部表,則拋出異常,防止覆蓋現有表。 - 創建新 Schema: 如果不存在,則使用?
TableSchema.create(0, schema)
?創建一個 ID 為 0 的新?TableSchema
。 - 驗證: 調用?
FileStoreTableFactory.create(...)
?來驗證 schema 的有效性(例如,檢查主鍵、分區鍵等配置是否合法)。 - 提交: 調用?
commit(newSchema)
?方法,該方法會嘗試原子性地創建?schema-0
?文件。如果創建成功,循環結束并返回新的?TableSchema
。如果因為并發沖突導致創建失敗,循環會繼續,重新嘗試整個過程。
Schema 變更?commitChanges(...)
這是執行?ALTER TABLE
?操作的核心邏輯。
// ... existing code ...public TableSchema commitChanges(List<SchemaChange> changes)throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,Catalog.ColumnNotExistException {SnapshotManager snapshotManager =new SnapshotManager(fileIO, tableRoot, branch, null, null);LazyField<Boolean> hasSnapshots =new LazyField<>(() -> snapshotManager.latestSnapshot() != null);while (true) {TableSchema oldTableSchema =latest().orElseThrow(() ->new Catalog.TableNotExistException(identifierFromPath(tableRoot.toString(), true, branch)));TableSchema newTableSchema = generateTableSchema(oldTableSchema, changes, hasSnapshots);try {boolean success = commit(newTableSchema);if (success) {return newTableSchema;}} catch (Exception e) {throw new RuntimeException(e);}}}public boolean commit(TableSchema newSchema) throws Exception {SchemaValidation.validateTableSchema(newSchema);SchemaValidation.validateFallbackBranch(this, newSchema);Path schemaPath = toSchemaPath(newSchema.id());return fileIO.tryToWriteAtomic(schemaPath, newSchema.toString());}
// ... existing code ...
- 同樣使用?
while(true)
?循環來保證原子性。 - 獲取舊 Schema: 首先獲取當前的最新 schema (
oldTableSchema
)。 - 生成新 Schema: 調用?
generateTableSchema
?方法,該方法是變更邏輯的核心。它接收舊 schema 和一個?SchemaChange
?列表,然后逐個應用這些變更(如?AddColumn
,?DropColumn
,?SetOption
?等),生成一個新的?TableSchema
?對象。這個新對象的 ID 是舊 ID 加 1。 - 提交新 Schema: 調用?
commit(newTableSchema)
?嘗試原子性地創建新的 schema 文件(如?schema-5
?->?schema-6
)。如果成功,則返回新 schema。如果失敗,則重試。
generateTableSchema(...)
這個方法是應用?SchemaChange
?的具體實現。它像一個狀態機,基于?oldTableSchema
,根據?changes
?列表中的每個變更項,逐步構建出?newTableSchema
?的各個部分。
// ... existing code ...public TableSchema generateTableSchema(TableSchema oldTableSchema, List<SchemaChange> changes, LazyField<Boolean> hasSnapshots)throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {Map<String, String> oldOptions = new HashMap<>(oldTableSchema.options());Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());String newComment = oldTableSchema.comment();for (SchemaChange change : changes) {if (change instanceof SetOption) {
// ... existing code ...} else if (change instanceof RemoveOption) {
// ... existing code ...} else if (change instanceof AddColumn) {
// ... existing code ...} else if (change instanceof RenameColumn) {
// ... existing code ...} else if (change instanceof DropColumn) {
// ... existing code ...} else if (change instanceof UpdateColumnType) {
// ... existing code ...} else if (change instanceof UpdateColumnNullability) {
// ... existing code ...} else if (change instanceof UpdateColumnPosition) {
// ... existing code ...} else if (change instanceof UpdateColumnComment) {
// ... existing code ...}}
// ... existing code ...
它通過?instanceof
?判斷?SchemaChange
?的具體類型,并執行相應的邏輯:
SetOption
/RemoveOption
: 修改?newOptions
?這個 Map。AddColumn
: 向?newFields
?列表中添加新字段,并使用?highestFieldId
?分配新 ID。RenameColumn
: 修改?newFields
?中某個字段的名稱。DropColumn
: 從?newFields
?中移除字段。UpdateColumnType
/UpdateColumnNullability
: 更新字段的類型或可空性。- ...等等。
總結
SchemaManager
?是 Paimon 表結構管理的基石。它通過將 schema 版本化并持久化到文件系統中,實現了 schema 的可靠追蹤和演進。其原子性的提交操作(無論是創建還是變更)確保了在并發環境下的元數據一致性。它與?SchemaMergingUtils
(負責邏輯合并)和?SchemaChange
(負責定義變更操作)等類緊密協作,共同構成了 Paimon 強大而靈活的 Schema Evolution 機制。