雙流join 、 Paimon Partial Update 和 動態schema

背景

Paimon 通過其獨特的?partial-update?合并引擎和底層的 LSM 存儲結構,巧妙地將傳統雙流 Join 中對 Flink State 的高頻隨機讀/寫,轉換為了對 Paimon 表的順序寫和后臺的高效合并,從而一站式地解決了 Flink 作業狀態過大、依賴外部 KV 系統等一系列痛點。

傳統方案中,Flink 作業需要維護一個巨大的 State(可能達到 TB 級)來存儲其中一個流的數據。當另一個流的數據到達時,需要去這個巨大的 State 中查找(Join)對應的記錄。這個“查找”操作,在數據量巨大、內存無法完全容納時,就會頻繁觸發對磁盤的隨機讀。機械硬盤和固態硬盤的隨機讀性能遠低于順序讀,這成為了整個作業的性能瓶瓶頸,并導致了高昂的資源開銷和不穩定性。

使用 Paimon 的?partial-update?模式后,整個數據處理的范式發生了改變:

  • 不再需要 Flink State 來做 Join:兩個數據流不再需要在 Flink 算子內部進行 Join。它們各自獨立地、源源不斷地將自己的數據寫入(INSERT INTO)到同一個 Paimon 表中。
  • 寫入是高效的順序操作:Paimon 底層采用 LSM-Tree 結構。新寫入的數據會先進入內存緩沖區,然后刷寫成新的、有序的小文件。這個過程主要是順序寫,效率非常高。

這樣一來,原來 Flink 作業中最消耗性能的“狀態查找”(隨機讀)環節,被徹底消除了。

現在,兩個流的數據都以部分列的形式寫入了 Paimon 表。那么,數據是在哪里“打寬”合并的呢?答案是在 Paimon 的Compaction過程中。

  • Partial-Update 合并引擎:當將表的合并引擎設置為?partial-update?時,Paimon 就知道了它的合并策略。 正如文檔?/docs/content/primary-key-table/merge-engine/partial-update.md?中描述的,對于相同主鍵的多條記錄,它會取每個字段最新的非空值,合并成一條完整的記錄。

    假設 Paimon 收到三條記錄:

    • <1, 23.0, 10, NULL>
    • <1, NULL, NULL, 'This is a book'>
    • <1, 25.2, NULL, NULL>

    假設第一列是主鍵,最終合并的結果將是?<1, 25.2, 10, 'This is a book'>

  • LSM-Tree 與順序讀合并:Paimon 的 Compaction 任務會定期將小的、分層的文件合并成更大的文件。這個合并過程是讀取多個有序的文件,然后進行多路歸并排序,這基本上是順序讀操作,效率遠高于隨機讀。PartialUpdateMergeFunction?這個類就是實現該合并邏輯的核心。

  • Paimon Compaction策略見:Paimon LSM Tree Compaction 策略

    // ... existing code ...
    public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {// ... existing code ...private InternalRow currentKey;private long latestSequenceNumber;private GenericRow row;private KeyValue reused;private boolean currentDeleteRow;// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false;
    // ... existing code ...
    
  • 專用 Compaction 作業:為了不影響數據寫入的實時性,最佳實踐是啟動一個獨立的、專用的 Compaction 作業。這樣,數據寫入和數據合并就可以完全解耦,互不干擾。

    如文檔?/docs/content/maintenance/dedicated-compaction.md?所述,當有多個流式作業寫入一個?partial-update?表時,推薦使用專用的 Compaction 作業。

    <FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-{{< version >}}.jar \compact \--warehouse <warehouse-path> \--database <database-name> \ --table <table-name> \...
    

總結:Paimon 的核心優勢

通過上述分析,我們可以清晰地看到 Paimon 在這個場景下的優勢:

  1. 性能革命:將 Flink State 的隨機讀瓶頸,轉變為 Paimon 的順序寫 + 后臺順序讀合并,大幅提升了整體吞吐量和性能。
  2. 架構簡化與成本降低:不再需要維護外部的 HBase/Pegasus 等 KV 系統,所有數據統一存儲在 Paimon 中,降低了系統復雜度和運維、存儲成本。
  3. 穩定性提升:Flink 作業本身變成了無狀態或輕狀態的寫入任務,徹底告別了 TB 級的 State,使得作業的穩定性和恢復速度大大增強。
  4. 開發簡化:原來需要手寫復雜?DataStream?API 和?Timer?才能實現的邏輯,現在只需要兩個簡單的?INSERT INTO?SQL 語句即可完成,開發效率和代碼可維護性顯著提高。

PartialUpdateMergeFunction

這是在 Paimon 中實現?partial-update?(部分列更新) 合并引擎的核心類。它的主要職責是在 Compaction 過程中,將具有相同主鍵的多條記錄(KeyValue)合并成最終的一條記錄。

PartialUpdateMergeFunction?實現了?MergeFunction<KeyValue>?接口。在 Paimon 的 LSM-Tree 存儲模型中,當執行 Compaction 操作需要合并多個數據文件時,Paimon 會讀取具有相同主鍵的一組?KeyValue?數據,然后交由一個?MergeFunction?實例來處理,計算出最終的結果。

PartialUpdateMergeFunction?的合并邏輯是:對于相同主鍵的記錄,不斷地用新的非空字段值去覆蓋舊的字段值,最終得到一個“打寬”后的完整記錄。?它還支持更復雜的場景,如基于序列號的更新、字段聚合和多種刪除策略。

// ... existing code ...
import org.apache.paimon.mergetree.compact.MergeFunction;
// ... existing code ...
/*** A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update* non-null fields on merge.*/
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
// ... existing code ...

核心成員變量

這些變量定義了?PartialUpdateMergeFunction?的狀態和配置,決定了其合并行為。

// ... existing code ...
public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {public static final String SEQUENCE_GROUP = "sequence-group";private final InternalRow.FieldGetter[] getters; // 用于從 InternalRow 中獲取字段值private final boolean ignoreDelete; // 是否忽略刪除記錄private final Map<Integer, FieldsComparator> fieldSeqComparators; // 字段序列號比較器,用于 sequence-groupprivate final boolean fieldSequenceEnabled; // 是否啟用了 sequence-groupprivate final Map<Integer, FieldAggregator> fieldAggregators; // 字段聚合器private final boolean removeRecordOnDelete; // 收到 DELETE 記錄時是否刪除整行private final Set<Integer> sequenceGroupPartialDelete; // 收到 DELETE 記錄時,根據 sequence-group 刪除部分列private final boolean[] nullables; // 記錄每個字段是否可為 nullprivate InternalRow currentKey; // 當前處理的主鍵private long latestSequenceNumber; // 見過的最新序列號private GenericRow row; // 合并過程中的結果行private KeyValue reused; // 用于復用的 KeyValue 對象,避免重復創建private boolean currentDeleteRow; // 標記當前行最終是否應被刪除private boolean notNullColumnFilled;/*** If the first value is retract, and no insert record is received, the row kind should be* RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no* RowKind.INSERT value is received)*/private boolean meetInsert; // 是否遇到過 INSERT 類型的記錄// ... existing code ...
  • 配置類變量?(ignoreDelete,?fieldSeqComparators,?fieldAggregators?等) 通常在?Factory?中被初始化,它們在整個合并過程中保持不變。
  • 狀態類變量?(currentKey,?row,?latestSequenceNumber?等) 會在每次?reset()?時被重置,用于處理新的一組具有相同主鍵的記錄。

add(KeyValue kv)?:合并邏輯的核心

這是最重要的方法,定義了單條?KeyValue?是如何被合并到當前結果?row?中的。

// ... existing code ...@Overridepublic void add(KeyValue kv) {// refresh key object to avoid reference overwrittencurrentKey = kv.key();currentDeleteRow = false;if (kv.valueKind().isRetract()) {if (!notNullColumnFilled) {initRow(row, kv.value());notNullColumnFilled = true;}// ... 刪除邏輯處理 ...// ... existing code ...String msg =String.join("\n","By default, Partial update can not accept delete records,"+ " you can choose one of the following solutions:","1. Configure 'ignore-delete' to ignore delete records.","2. Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.","3. Configure 'sequence-group's to retract partial columns.");throw new IllegalArgumentException(msg);}latestSequenceNumber = kv.sequenceNumber();if (fieldSeqComparators.isEmpty()) {updateNonNullFields(kv);} else {updateWithSequenceGroup(kv);}meetInsert = true;notNullColumnFilled = true;}
// ... existing code ...

它的邏輯可以分為兩大塊:

A. 處理?retract?消息 (RowKind 為?DELETE?或?UPDATE_BEFORE)

partial-update?默認不接受刪除記錄。如果收到了,行為由配置決定:

  1. ignoreDelete = true: 直接忽略這條刪除記錄,返回。
  2. removeRecordOnDelete = true: 當收到?DELETE?類型的記錄時,將?currentDeleteRow?標記為?true,并清空當前?row。這意味著最終這條主鍵對應的記錄將被刪除。
  3. fieldSequenceEnabled = true: 啟用了?sequence-group。這是最復雜的邏輯,它會調用?retractWithSequenceGroup(kv)。這個方法會根據序列號比較結果,來決定是否要“撤銷”某些字段的更新(通常是將其設置為?null?或調用聚合器的?retract?方法)。
  4. 默認行為: 如果以上配置都沒有,則直接拋出?IllegalArgumentException?異常,提示用戶如何正確配置。

B. 處理?add?消息 (RowKind 為?INSERT?或?UPDATE_AFTER)

這是主要的更新邏輯:

  1. 簡單更新 (updateNonNullFields): 如果沒有配置?sequence-group?(fieldSeqComparators?為空),則執行最簡單的部分列更新。遍歷新紀錄?kv?的所有字段,只要字段值不為?null,就用它來更新?row?中對應位置的值。

    // ... existing code ...
    private void updateNonNullFields(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());if (field != null) {row.setField(i, field);} else {
    // ... existing code ...
    
  2. 帶序列號的更新 (updateWithSequenceGroup): 如果配置了?sequence-group,邏輯會更復雜。對于每個字段:

    • 如果該字段不屬于任何?sequence-group,則行為和簡單更新類似(但會考慮聚合)。
    • 如果該字段屬于某個?sequence-group,則會使用?FieldsComparator?比較新記錄?kv?和當前結果?row?的序列號字段。只有當新記錄的序列號?大于或等于?當前結果的序列號時,才會用新記錄的字段值去更新?row?中由該?sequence-group?控制的所有字段。這保證了數據的更新順序。

updateWithSequenceGroup

這個方法是?partial-update?合并引擎處理帶有?sequence-group?配置時的核心邏輯。當用戶在表屬性中定義了?fields.<seq_field>.sequence-group = <data_field1>,<data_field2>?這樣的規則時,數據合并就不再是簡單的“非空值覆蓋”,而是需要根據?seq_field?的值來判斷是否應該更新?data_field1?和?data_field2。這解決了多流更新時可能出現的數據亂序覆蓋問題。

updateWithSequenceGroup?方法通過引入?FieldsComparator,將簡單的字段更新升級為基于序列號的條件更新。它精確地控制了哪些字段在何時可以被更新,從而保證了在多流并發寫入場景下,即使數據存在一定程度的亂序,最終也能合并成正確的結果。這是 Paimon?partial-update?模式能夠處理復雜更新場景的關鍵所在。

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {
// ... existing code ...
  • 輸入:?KeyValue kv,代表一條新到達的、具有相同主鍵的記錄。
  • 目標: 遍歷這條新記錄?kv?的所有字段,并根據?sequence-group?的規則,決定是否用?kv?中的字段值來更新當前正在合并的結果行?this.row

該方法的核心是一個?for?循環,它遍歷了表中的每一個字段。

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {
// ... existing code ...

在循環內部,對每個字段的處理邏輯可以分為兩種情況:

  1. 該字段不屬于任何?sequence-group
  2. 該字段屬于某個?sequence-group

讓我們來詳細看這兩種情況。

1. 字段不屬于任何?sequence-group

// ... existing code ...private void updateWithSequenceGroup(KeyValue kv) {for (int i = 0; i < getters.length; i++) {Object field = getters[i].getFieldOrNull(kv.value());FieldsComparator seqComparator = fieldSeqComparators.get(i);FieldAggregator aggregator = fieldAggregators.get(i);Object accumulator = getters[i].getFieldOrNull(row);if (seqComparator == null) {if (aggregator != null) {row.setField(i, aggregator.agg(accumulator, field));} else if (field != null) {row.setField(i, field);}} else {
// ... existing code ...
  • 判斷條件:?seqComparator == nullfieldSeqComparators?是一個?Map<Integer, FieldsComparator>,如果在里面找不到當前字段索引?i,就說明這個字段不受任何?sequence-group?控制。
  • 處理邏輯:
    • 帶聚合函數: 如果為該字段配置了聚合函數(aggregator != null),例如?summax?等,則調用?aggregator.agg()?方法,將當前累加值?accumulator?和新值?field?進行聚合,并將結果寫回?row
    • 不帶聚合函數: 這是最簡單的情況。如果新來的字段值?field?不為?null,就直接用它覆蓋?row?中的舊值。這和?updateNonNullFields?的行為是一致的。

2. 字段屬于某個?sequence-group

這是該方法最核心和復雜的部分。

// ... existing code ...} else {if (isEmptySequenceGroup(kv, seqComparator)) {// skip null sequence groupcontinue;}if (seqComparator.compare(kv.value(), row) >= 0) {int index = i;// Multiple sequence fields should be updated at once.if (Arrays.stream(seqComparator.compareFields()).anyMatch(seqIndex -> seqIndex == index)) {for (int fieldIndex : seqComparator.compareFields()) {row.setField(fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));}continue;}row.setField(i, aggregator == null ? field : aggregator.agg(accumulator, field));} else if (aggregator != null) {row.setField(i, aggregator.aggReversed(accumulator, field));}}}}
// ... existing code ...
  • 判斷條件:?seqComparator != null
  • 處理邏輯:
    1. 空序列組檢查:?isEmptySequenceGroup(kv, seqComparator)?會檢查這條新紀錄?kv?中,其對應的序列號字段是否都為?null。如果是,意味著這條記錄無法判斷新舊,因此直接跳過,不進行任何更新。
    2. 序列號比較:?seqComparator.compare(kv.value(), row) >= 0?是關鍵。它會比較新記錄?kv?和當前結果?row?中,由?seqComparator?定義的序列號字段。
      • 如果新記錄的序列號 >= 當前結果的序列號: 這意味著新記錄?kv?是“更新”的或者“同樣新”的,此時應該用?kv?的值去更新?row
        • 更新序列號字段本身: 如果當前字段?i?就是序列號字段之一,那么需要把這個?sequence-group?定義的所有序列號字段都一次性更新掉,然后用?continue?跳出本次循環。這是為了保證序列號字段之間的一致性。
        • 更新數據字段: 如果當前字段?i?是被序列號控制的數據字段,則執行更新。如果有聚合器,則調用?aggregator.agg();如果沒有,則直接用新值?field?覆蓋。
      • 如果新記錄的序列號 < 當前結果的序列號: 這意味著?kv?是一條“舊”數據。在大部分情況下,這條舊數據會被忽略。但有一個例外:如果為該字段配置了支持亂序聚合的聚合器(例如?sum),則會調用?aggregator.aggReversed()。這個方法通常和?agg()?的邏輯是一樣的,它允許舊數據也能被正確地聚合進來。對于不支持亂序的聚合器(如?max),aggReversed?可能就是一個空操作。

getResult()?方法:產出最終結果

當處理完具有相同主鍵的所有?KeyValue?后,調用此方法來獲取最終的合并結果。

// ... existing code ...@Overridepublic KeyValue getResult() {if (reused == null) {reused = new KeyValue();}RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT;return reused.replace(currentKey, latestSequenceNumber, rowKind, row);}
// ... existing code ...

它會根據?currentDeleteRow?和?meetInsert?標志位來決定最終的?RowKind。如果?currentDeleteRow?為?true,或者整個合并過程從未見過?INSERT?類型的記錄,那么最終結果就是一條?DELETE?記錄。否則,就是一條?INSERT?記錄。然后將主鍵、最新的序列號、最終的?RowKind?和合并后的?row?數據打包成一個?KeyValue?返回。

Factory?內部類:配置的入口

PartialUpdateMergeFunction.Factory?是一個非常重要的內部類,它負責解析用戶在表上設置的?OPTIONS,并據此創建出一個配置好的?PartialUpdateMergeFunction?實例。

// ... existing code ...public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {return new Factory(options, rowType, primaryKeys);}private static class Factory implements MergeFunctionFactory<KeyValue> {// ... 成員變量,用于存儲從 Options 解析出的配置 ...private Factory(Options options, RowType rowType, List<String> primaryKeys) {this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);// ... existing code ...this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);// ... 解析 sequence-group 配置 ...for (Map.Entry<String, String> entry : options.toMap().entrySet()) {String k = entry.getKey();String v = entry.getValue();if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {// ... 解析出序列號字段和被控制的字段,構建 fieldSeqComparators ...}}// ... 解析聚合函數配置,構建 fieldAggregators ...this.fieldAggregators =createFieldAggregators(rowType, primaryKeys, allSequenceFields, new CoreOptions(options));// ... 配置校驗,確保沖突的配置不會同時開啟 ...Preconditions.checkState(!(removeRecordOnDelete && ignoreDelete),// ...);// ...}
// ... existing code ...

在構造函數中,它會:

  1. 讀取?ignore-delete,?partial-update.remove-record-on-delete?等簡單配置。
  2. 遍歷所有?OPTIONS,查找以?fields.?開頭、以?.sequence-group?結尾的配置項,例如?fields.order_time.sequence-group=order_id,price。它會解析這些配置,構建出?fieldSeqComparators?這個 Map,其中 key 是被控制字段的索引,value 是一個能夠比較?order_time?字段的比較器。
  3. 調用?createFieldAggregators?方法,解析?fields.*.aggregate-function?等配置,構建出?fieldAggregators?這個 Map。
  4. 執行一系列?Preconditions.checkState,對用戶的配置進行合法性校驗,防止出現邏輯沖突。

總結

PartialUpdateMergeFunction?是 Paimon 實現高性能數據打寬(部分列更新)能力的技術基石。它通過一個設計精巧的合并流程,將簡單的非空字段覆蓋、基于序列號的有序更新、字段聚合以及多種刪除策略融為一體。其?Factory?類則充當了連接用戶配置和底層實現的橋梁。理解了這個類的工作原理,就能深刻地理解 Paimon?partial-update?模式的強大之處。

雙流拼接 怎么處理schema

Paimon 允許在寫入數據時自動合并和演進表結構。這對于像雙流 Join 結果寫入等 schema 可能變化的場景至關重要。這個功能主要通過?write.merge-schema?選項來開啟。

當將數據寫入 Paimon 表時:

  1. 如果?write.merge-schema?設置為?true,Paimon 會比較寫入數據(Source)的 schema 和目標表(Sink)當前的 schema。
  2. 如果發現寫入數據中包含了表中不存在的新列,Paimon 會自動將這些新列添加到表結構中,生成一個新的、版本更高的 schema。
  3. 對于數據中缺失但在表 schema 中存在的列,Paimon 會自動填充?null?值。

這個過程是原子性的,并記錄在表的元數據中。Paimon 會為每一次 schema 變更創建一個新的版本化的 schema 文件。

代碼參考:

在 Spark 中,寫入邏輯由?WriteIntoPaimonTable.scala?處理。可以看到,當?mergeSchema?為?true?時,它會調用?mergeAndCommitSchema?來合并 schema,并處理列不匹配的情況。

WriteIntoPaimonTable.scala

// ... existing code ...override def run(sparkSession: SparkSession): Seq[Row] = {var data = _dataif (mergeSchema) {val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)mergeAndCommitSchema(dataSchema, allowExplicitCast)// For case that some columns is absent in data, we still allow to write once write.merge-schema is true.val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {val resolve = sparkSession.sessionState.conf.resolverval cols = newTableSchema.map {field =>dataSchema.find(f => resolve(f.name, field.name)) match {case Some(f) => col(f.name)case _ => lit(null).as(field.name)}}data = data.select(cols: _*)}}
// ... existing code ...

一個具體的測試用例也展示了這一點,一個原先只有?a?和?b?列的表,成功寫入了包含?c?和?d?列的新數據。

DataFrameWriteTest.scala

// ... existing code ...// Case 1: two additional fields: DoubleType and TimestampTypeval ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts)).toDF("a", "b", "c", "d")df2.write.format("paimon").mode("append").option("write.merge-schema", "true").save(location)
// ... existing code ...

在 Flink 或 Spark 中進行雙流 Join 時,Paimon 通常作為 Sink 端。Join 操作本身由計算引擎完成。應用需要做的就是:

  1. 執行雙流 Join。
  2. 將 Join 后的?DataStream?或?DataFrame?寫入 Paimon 表。
  3. 在寫入時,設置?write.merge-schema?為?true

這樣,無論 Join 結果的 schema 如何(比如因為上游流增加了字段導致 Join 結果也增加了字段),Paimon 表都可以自動適應,動態地添加新列。

SchemaMergingUtils

SchemaMergingUtils?是 Paimon schema 演進(Schema Evolution)功能的核心工具類。它的主要職責是比較兩個 schema(通常是數據表的現有 schema 和新寫入數據的 schema),并根據預設的規則將它們合并成一個新的、統一的 schema。這個過程支持添加新列、安全地轉換現有列的數據類型,從而實現動態 schema 的能力。

當配置 Paimon 表允許 schema 合并(例如通過?write.merge-schema=true)時,寫入流程就會調用這個工具類。它會:

  1. 比較字段:找出新舊 schema 中同名和新增的字段。
  2. 合并類型:對于同名字段,嘗試合并其數據類型(例如,INT?可以演進為?BIGINT)。
  3. 添加字段:將新 schema 中獨有的字段添加到最終的 schema 中,并為其分配新的唯一 ID。
  4. 生成新版 Schema:如果發生了任何變更,它會創建一個版本號加一的新的?TableSchema?對象。

下面我們結合代碼,從頂層方法到底層實現,一步步進行分析。

mergeSchemas

這是最頂層的入口方法,用于合并一個完整的表 schema 和一個新的行類型(通常來自要寫入的數據)。

  • 參數:
    • currentTableSchema: Paimon 表當前的?TableSchema?對象。它包含了字段、分區鍵、主鍵、表配置等所有元數據。
    • targetType: 目標?RowType,即新數據的 schema。
    • allowExplicitCast: 一個布爾標志,決定是否允許顯式(可能存在精度損失)的類型轉換,比如?STRING?轉?INT
  • 邏輯:
    1. 首先,它會檢查?targetType?和?currentTableSchema?的?RowType?是否完全相同。如果相同,則無需合并,直接返回當前的?TableSchema
    2. 如果不同,它會初始化一個?AtomicInteger?類型的?highestFieldId,記錄當前 schema 中所有字段(包括嵌套字段)的最大 ID。這個 ID 對于為新字段分配唯一標識至關重要。
    3. 調用重載的?mergeSchemas?方法(最終調用核心的?merge?方法)來遞歸地合并兩個?RowType
    4. 如果合并后的?newRowType?與原始的?currentType?相同(例如,只是可空性變化,而合并邏輯會保留原始的可空性),則也認為沒有發生實質性變化,返回原始的?TableSchema
    5. 如果 schema 確實發生了變化,它會創建一個新的?TableSchema?實例。這個新 schema 的 ID 會在舊 ID 的基礎上加 1,字段列表和?highestFieldId?會更新,而分區鍵、主鍵、表配置和注釋等信息則會從舊 schema 中繼承。
 
// ... existing code ...public static TableSchema mergeSchemas(TableSchema currentTableSchema, RowType targetType, boolean allowExplicitCast) {RowType currentType = currentTableSchema.logicalRowType();if (currentType.equals(targetType)) {return currentTableSchema;}AtomicInteger highestFieldId = new AtomicInteger(currentTableSchema.highestFieldId());RowType newRowType =mergeSchemas(currentType, targetType, highestFieldId, allowExplicitCast);if (newRowType.equals(currentType)) {// It happens if the `targetType` only changes `nullability` but we always respect the// current's.return currentTableSchema;}return new TableSchema(currentTableSchema.id() + 1,newRowType.getFields(),highestFieldId.get(),currentTableSchema.partitionKeys(),currentTableSchema.primaryKeys(),currentTableSchema.options(),currentTableSchema.comment());}
// ... existing code ...

merge

這是所有合并邏輯的核心。它被遞歸調用以處理各種數據類型。

可空性處理 (Nullability Handling)

在方法的一開始,它將?base0?和?update0?的可空性都設置為?true?來進行比較。最終返回的類型的可空性將以?base0(原始表 schema 中的類型)為準。這意味著 schema 合并不會改變現有列的可空性。

// ... existing code ...public static DataType merge(DataType base0,DataType update0,AtomicInteger highestFieldId,boolean allowExplicitCast) {// Here we try to merge the base0 and update0 without regard to the nullability,// and set the base0's nullability to the return's.DataType base = base0.copy(true);DataType update = update0.copy(true);if (base.equals(update)) {return base0;} else if (base instanceof RowType && update instanceof RowType) {
// ... existing code ...

遞歸合并復雜類型

  • RowType?(行類型): 這是最復雜的部分。
    1. 合并現有字段: 遍歷?base?(舊 schema) 的所有字段。對于每個字段,檢查?update?(新 schema) 中是否存在同名字段。如果存在,就遞歸調用?merge?方法來合并這兩個字段的類型。如果不存在,則保留?base?中的原始字段。
    2. 添加新字段: 遍歷?update?的所有字段,找出在?base?中不存在的字段。這些就是需要新增的列。對于每個新字段,調用?assignIdForNewField?為其分配一個新的、唯一的字段 ID,然后將其添加到最終的字段列表中。
    3. 最后,用更新后的字段列表創建一個新的?RowType
 
// ... existing code ...} else if (base instanceof RowType && update instanceof RowType) {List<DataField> baseFields = ((RowType) base).getFields();List<DataField> updateFields = ((RowType) update).getFields();Map<String, DataField> updateFieldMap =updateFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));List<DataField> updatedFields =baseFields.stream().map(baseField -> {if (updateFieldMap.containsKey(baseField.name())) {DataField updateField =updateFieldMap.get(baseField.name());DataType updatedDataType =merge(baseField.type(),updateField.type(),highestFieldId,allowExplicitCast);return new DataField(baseField.id(),baseField.name(),updatedDataType,baseField.description());} else {return baseField;}}).collect(Collectors.toList());Map<String, DataField> baseFieldMap =baseFields.stream().collect(Collectors.toMap(DataField::name, Function.identity()));List<DataField> newFields =updateFields.stream().filter(field -> !baseFieldMap.containsKey(field.name())).map(field -> assignIdForNewField(field, highestFieldId)).map(field -> field.copy(true)).collect(Collectors.toList());updatedFields.addAll(newFields);return new RowType(base0.isNullable(), updatedFields);} else if (base instanceof MapType && update instanceof MapType) {
// ... existing code ...
  • MapType,?ArrayType,?MultisetType: 對于這些集合類型,合并邏輯很簡單:遞歸地調用?merge?方法來合并它們的內部元素類型(MapType?的鍵和值類型,ArrayType?和?MultisetType?的元素類型)。

合并基礎類型

  • DecimalType: 這是一個特例。只有當兩個?DecimalType?的?scale?(小數位數) 相同時,才能合并。合并后的?precision?(總位數) 取兩者中的最大值。如果?scale?不同,會直接拋出?UnsupportedOperationException

  • 其他可轉換類型: 對于其他基礎類型,通過?supportsDataTypesCast?方法判斷是否可以轉換。

    • 隱式轉換 (Implicit Cast): 當?allowExplicitCast?為?false?時,只允許安全的類型提升,例如?INT?->?BIGINTFLOAT?->?DOUBLE
    • 顯式轉換 (Explicit Cast): 當?allowExplicitCast?為?true?時,允許更多可能損失精度的轉換。
    • 對于帶有長度(如?VARCHAR)或精度(如?TIMESTAMP)的類型,通常要求新類型的長度/精度不能小于舊類型,除非開啟了顯式轉換。
    • 如果可以轉換,則直接采用?update?的類型,但保留?base0?的可空性。
// ... existing code ...} else if (supportsDataTypesCast(base, update, allowExplicitCast)) {if (DataTypes.getLength(base).isPresent() && DataTypes.getLength(update).isPresent()) {// this will check and merge types which has a `length` attribute, like BinaryType,// CharType, VarBinaryType, VarCharType.if (allowExplicitCast|| DataTypes.getLength(base).getAsInt()<= DataTypes.getLength(update).getAsInt()) {return update.copy(base0.isNullable());} else {throw new UnsupportedOperationException(String.format("Failed to merge the target type that has a smaller length: %s and %s",base, update));}} else if (DataTypes.getPrecision(base).isPresent()&& DataTypes.getPrecision(update).isPresent()) {// this will check and merge types which has a `precision` attribute, like// LocalZonedTimestampType, TimeType, TimestampType.if (allowExplicitCast|| DataTypes.getPrecision(base).getAsInt()<= DataTypes.getPrecision(update).getAsInt()) {return update.copy(base0.isNullable());} else {throw new UnsupportedOperationException(String.format("Failed to merge the target type that has a lower precision: %s and %s",base, update));}} else {return update.copy(base0.isNullable());}} else {throw new UnsupportedOperationException(String.format("Failed to merge data types %s and %s", base, update));}}
// ... existing code ...

assignIdForNewField

這個方法非常重要。當向?RowType?中添加一個新字段時,它負責為這個新字段及其所有嵌套字段(如果是復雜類型)分配唯一的 ID。它通過傳入的?AtomicInteger highestFieldId?來實現 ID 的原子性遞增,確保了在并發場景下 ID 的唯一性,這對于 Paimon 正確地按 ID 映射和讀取列數據至關重要。

// ... existing code ...private static DataField assignIdForNewField(DataField field, AtomicInteger highestFieldId) {DataType dataType = ReassignFieldId.reassign(field.type(), highestFieldId);return new DataField(highestFieldId.incrementAndGet(), field.name(), dataType, field.description());}
}

總結

SchemaMergingUtils?通過一套定義明確且可遞歸的規則,實現了 Paimon 強大而靈活的 Schema 演進能力。它能夠智能地處理字段的增加和類型變化,同時通過嚴格的 ID 分配和管理,保證了數據讀寫的正確性。這個類是 Paimon 能夠適應動態數據源、支持平滑表結構變更的關鍵所在。

SchemaManager

SchemaManager?是 Paimon 中負責管理表 schema(模式)的核心組件。它處理所有與 schema 相關的持久化操作,包括創建、讀取、更新和版本管理。可以把它看作是 Paimon 表 schema 在文件系統中的“數據庫管理員”。

SchemaManager?的主要職責可以歸納為以下幾點:

  1. Schema 持久化:將?TableSchema?對象序列化為 JSON 文件,并存儲在表的?schema?目錄下。每個 schema 文件代表一個版本。
  2. 版本管理:每個 schema 文件名都以?schema-?開頭,后跟一個從 0 開始遞增的版本號(ID),例如?schema-0,?schema-1?等。這使得 Paimon 可以追蹤 schema 的所有歷史變更。
  3. Schema 讀取:提供方法來讀取最新版本的 schema、特定版本的 schema 或所有版本的 schema。
  4. Schema 創建:在創建新表時,負責初始化并提交第一個 schema 版本(schema-0)。
  5. Schema 變更:通過應用一系列?SchemaChange(如添加列、刪除列、修改表選項等)來原子性地更新 schema,并生成一個新的、版本號加一的 schema 文件。
  6. 多分支支持:能夠為不同的數據分支(branch)管理各自獨立的 schema 演進路徑。

結構和關鍵屬性

// ... existing code ...
@ThreadSafe
public class SchemaManager implements Serializable {private static final String SCHEMA_PREFIX = "schema-";private final FileIO fileIO;private final Path tableRoot;private final String branch;public SchemaManager(FileIO fileIO, Path tableRoot) {
// ... existing code ...
  • @ThreadSafe: 這個注解表明該類的設計是線程安全的,允許多個線程同時訪問一個?SchemaManager?實例。
  • SCHEMA_PREFIX: 常量?"schema-",定義了 schema 文件名的前綴。
  • fileIO:?FileIO?接口的實例,用于與底層文件系統(如 HDFS, S3, 本地文件系統)進行交互。
  • tableRoot:?Path?對象,指向表的根目錄。SchemaManager?會在這個目錄下的?schema?子目錄中工作。
  • branch: 字符串,表示當前?SchemaManager?實例操作的數據分支名稱。Paimon 支持類似 Git 的分支功能,main?是默認的主分支。不同的分支可以有獨立的快照和 schema 演進。

構造函數和分支管理

 
// ... existing code ...public SchemaManager(FileIO fileIO, Path tableRoot) {this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH);}/** Specify the default branch for data writing. */public SchemaManager(FileIO fileIO, Path tableRoot, String branch) {this.fileIO = fileIO;this.tableRoot = tableRoot;this.branch = BranchManager.normalizeBranch(branch);}public SchemaManager copyWithBranch(String branchName) {return new SchemaManager(fileIO, tableRoot, branchName);}
// ... existing code ...
  • 構造函數初始化了?fileIOtableRoot?和?branch。默認使用主分支?DEFAULT_MAIN_BRANCH
  • copyWithBranch(String branchName): 這是一個工廠方法,用于創建一個新的?SchemaManager?實例來操作指定的分支。這體現了 Paimon 對多分支的支持。

Schema 讀取方法

 
// ... existing code ...public Optional<TableSchema> latest() {try {return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX).reduce(Math::max).map(this::schema);} catch (IOException e) {throw new UncheckedIOException(e);}}
// ... existing code ...public List<TableSchema> listAll() {return listAllIds().stream().map(this::schema).collect(Collectors.toList());}public List<Long> listAllIds() {try {return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX).collect(Collectors.toList());} catch (IOException e) {throw new UncheckedIOException(e);}}
// ... existing code ...
  • latest(): 獲取最新版本的?TableSchema。它通過?listVersionedFiles?工具方法列出?schema?目錄下所有符合?schema-*?格式的文件,提取出版本號,找到最大的版本號,然后調用?schema(long id)?方法讀取并反序列化對應的 schema 文件。
  • listAll(): 獲取所有版本的?TableSchema?列表。
  • listAllIds(): 僅獲取所有 schema 版本的 ID 列表。
  • schema(long id)?(未在片段中完全展示,但被?latest()?調用): 這是一個內部方法,根據給定的 ID 構建 schema 文件路徑(如?.../schema/schema-5),然后使用?fileIO?讀取文件內容,并通過?TableSchema.fromJSON(String json)?將其反序列化為?TableSchema?對象。

表創建?createTable(...)

// ... existing code ...public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {while (true) {Optional<TableSchema> latest = latest();if (latest.isPresent()) {TableSchema latestSchema = latest.get();if (externalTable) {checkSchemaForExternalTable(latestSchema.toSchema(), schema);return latestSchema;} else {throw new IllegalStateException("Schema in filesystem exists, creation is not allowed.");}}TableSchema newSchema = TableSchema.create(0, schema);// validate table from creating tableFileStoreTableFactory.create(fileIO, tableRoot, newSchema).store();boolean success = commit(newSchema);if (success) {return newSchema;}}}
// ... existing code ...
  • 這是一個原子性操作,通過?while(true)?循環和文件系統的原子性創建來保證。
  • 檢查存在性: 首先調用?latest()?檢查是否已有 schema 文件存在。如果存在且不是創建外部表,則拋出異常,防止覆蓋現有表。
  • 創建新 Schema: 如果不存在,則使用?TableSchema.create(0, schema)?創建一個 ID 為 0 的新?TableSchema
  • 驗證: 調用?FileStoreTableFactory.create(...)?來驗證 schema 的有效性(例如,檢查主鍵、分區鍵等配置是否合法)。
  • 提交: 調用?commit(newSchema)?方法,該方法會嘗試原子性地創建?schema-0?文件。如果創建成功,循環結束并返回新的?TableSchema。如果因為并發沖突導致創建失敗,循環會繼續,重新嘗試整個過程。

Schema 變更?commitChanges(...)

這是執行?ALTER TABLE?操作的核心邏輯。

// ... existing code ...public TableSchema commitChanges(List<SchemaChange> changes)throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,Catalog.ColumnNotExistException {SnapshotManager snapshotManager =new SnapshotManager(fileIO, tableRoot, branch, null, null);LazyField<Boolean> hasSnapshots =new LazyField<>(() -> snapshotManager.latestSnapshot() != null);while (true) {TableSchema oldTableSchema =latest().orElseThrow(() ->new Catalog.TableNotExistException(identifierFromPath(tableRoot.toString(), true, branch)));TableSchema newTableSchema = generateTableSchema(oldTableSchema, changes, hasSnapshots);try {boolean success = commit(newTableSchema);if (success) {return newTableSchema;}} catch (Exception e) {throw new RuntimeException(e);}}}public boolean commit(TableSchema newSchema) throws Exception {SchemaValidation.validateTableSchema(newSchema);SchemaValidation.validateFallbackBranch(this, newSchema);Path schemaPath = toSchemaPath(newSchema.id());return fileIO.tryToWriteAtomic(schemaPath, newSchema.toString());}
// ... existing code ...
  • 同樣使用?while(true)?循環來保證原子性。
  • 獲取舊 Schema: 首先獲取當前的最新 schema (oldTableSchema)。
  • 生成新 Schema: 調用?generateTableSchema?方法,該方法是變更邏輯的核心。它接收舊 schema 和一個?SchemaChange?列表,然后逐個應用這些變更(如?AddColumn,?DropColumn,?SetOption?等),生成一個新的?TableSchema?對象。這個新對象的 ID 是舊 ID 加 1。
  • 提交新 Schema: 調用?commit(newTableSchema)?嘗試原子性地創建新的 schema 文件(如?schema-5?->?schema-6)。如果成功,則返回新 schema。如果失敗,則重試。

generateTableSchema(...)

這個方法是應用?SchemaChange?的具體實現。它像一個狀態機,基于?oldTableSchema,根據?changes?列表中的每個變更項,逐步構建出?newTableSchema?的各個部分。

// ... existing code ...public TableSchema generateTableSchema(TableSchema oldTableSchema, List<SchemaChange> changes, LazyField<Boolean> hasSnapshots)throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {Map<String, String> oldOptions = new HashMap<>(oldTableSchema.options());Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());String newComment = oldTableSchema.comment();for (SchemaChange change : changes) {if (change instanceof SetOption) {
// ... existing code ...} else if (change instanceof RemoveOption) {
// ... existing code ...} else if (change instanceof AddColumn) {
// ... existing code ...} else if (change instanceof RenameColumn) {
// ... existing code ...} else if (change instanceof DropColumn) {
// ... existing code ...} else if (change instanceof UpdateColumnType) {
// ... existing code ...} else if (change instanceof UpdateColumnNullability) {
// ... existing code ...} else if (change instanceof UpdateColumnPosition) {
// ... existing code ...} else if (change instanceof UpdateColumnComment) {
// ... existing code ...}}
// ... existing code ...

它通過?instanceof?判斷?SchemaChange?的具體類型,并執行相應的邏輯:

  • SetOption/RemoveOption: 修改?newOptions?這個 Map。
  • AddColumn: 向?newFields?列表中添加新字段,并使用?highestFieldId?分配新 ID。
  • RenameColumn: 修改?newFields?中某個字段的名稱。
  • DropColumn: 從?newFields?中移除字段。
  • UpdateColumnType/UpdateColumnNullability: 更新字段的類型或可空性。
  • ...等等。

總結

SchemaManager?是 Paimon 表結構管理的基石。它通過將 schema 版本化并持久化到文件系統中,實現了 schema 的可靠追蹤和演進。其原子性的提交操作(無論是創建還是變更)確保了在并發環境下的元數據一致性。它與?SchemaMergingUtils(負責邏輯合并)和?SchemaChange(負責定義變更操作)等類緊密協作,共同構成了 Paimon 強大而靈活的 Schema Evolution 機制。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/90124.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/90124.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/90124.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

7.3.1 進程調度機制那些事兒

一&#xff1a;task_struct結構體分析 1、進程有兩種特殊形式&#xff1a;沒有用戶虛擬地址空間的進程叫內核線程&#xff0c;共享用戶虛擬地址空間的進程叫作用戶線程。共享同一個用戶虛擬地址空間的所有用戶線程叫線程組。 C語言標準庫進程 Linux內核進程 …

基于多種機器學習的水質污染及安全預測分析系統的設計與實現【隨機森林、XGBoost、LightGBM、SMOTE、貝葉斯優化】

文章目錄有需要本項目的代碼或文檔以及全部資源&#xff0c;或者部署調試可以私信博主項目介紹總結每文一語有需要本項目的代碼或文檔以及全部資源&#xff0c;或者部署調試可以私信博主 項目介紹 隨著工業化和城市化的不斷推進&#xff0c;水質污染問題逐漸成為影響生態環境…

Linux第三天Linux基礎命令(二)

1.grep命令可以通過grep命令&#xff0c;從文件中通過關鍵字過濾文件行。grep [-n] 關鍵字 文件路徑選項-n&#xff0c;可選&#xff0c;表示在結果中顯示匹配的行的行號。參數&#xff0c;關鍵字&#xff0c;必填&#xff0c;表示過濾的關鍵字&#xff0c;帶有空格或其它特殊符…

Linux Debian操作系統、Deepin深度操作系統手動分區方案參考

以下是Linux Debian操作系統、Deepin深度操作系統安裝過程中手動分區的建議&#xff0c;按UEFI、swap、boot、根分區、home分區劃分&#xff0c;以下是詳細的分區配置參考建議&#xff1a; 一、手動分區方案&#xff08;UEFI模式&#xff09;分區名稱分區類型大小建議掛載點文件…

jmeter如何做自動化接口測試?

全網最全流程&#xff01;JmeterAntAllureJenkins搭建屬于你的接口自動化流水線&#xff0c;CI/CD直接起飛&#xff01;1.什么是jmeter&#xff1f; JMeter是100%完全由Java語言編寫的&#xff0c;免費的開源軟件&#xff0c;是非常優秀的性能測試和接口測試工具&#xff0c;支…

MyBatis整合SpringBoot終極指南

以下是一份系統化的 ?MyBatis 整合 Spring Boot 學習筆記&#xff0c;結合官方文檔與最佳實踐整理&#xff0c;涵蓋配置、核心功能、實戰示例及常見問題解決。 一、整合基礎與依賴配置 1. ?核心依賴? 在 pom.xml 中添加&#xff1a; <dependency><groupId>or…

企業微信ipad協議接口解決方案最新功能概覽

支持最新版本企業微信&#xff0c;安全穩定0封號免費試用&#xff0c;技術支持&#xff1a;string wechat"Mrzhu0107"企微ipad協議接口最新功能升級如下&#xff1a;【初始化】初始化企業微信&#xff0c;設置消息回調地址&#xff0c;獲取運行中的實例&#xff0c;根…

ansible 批量 scp 和 load 鏡像

1、save 鏡像腳本 在本地保存鏡像到 ansible 代碼目錄的腳本。 1.1、使用說明: 保存單個鏡像 save -i gcr.io/cadvisor/cadvisor:v0.52.1保存某個 namespace 下的所有鏡像 save1.2、腳本內容 cat /usr/local/bin/save #!/bin/bash #set -e # 分隔符 str="-"# …

【C# in .NET】20. 探秘靜態類:抽象與密封的結合體

探秘靜態類:抽象與密封的結合體 一、靜態類的底層本質:抽象與密封的結合體 靜態類作為 C# 中特殊的類型形式,其底層實現融合了抽象類與密封類的特性,形成了不可實例化、不可繼承的類型約束。 1. IL 層面的靜態類標識 定義一個簡單的靜態類: public static class Stri…

【Vue3】ECharts圖表案例

官方參考&#xff1a;Examples - Apache ECharts 1、創建工程 npm create vitelatest 或 npm init vuelatest 設置如下 2、下載依賴集運行項目 cd vue-echarts-demo npm install npm install echarts npm run dev 3、編寫核心代碼 創建src\components\BarView.vue文件…

二分查找----2.搜索二維矩陣

題目鏈接 /** 方案一: 每行都是遞增的,對每行進行二分,逐行查找;效率不高,每次搜索只能控制列無法兼顧到行,行被固定存在不必要的搜索 方案二: 從右上或左下頂點出發,以右上為例,向左迭代列減小,向下迭代行增大;效率更高避免重復搜索 */ class Solution {/**方案一: 每行都是…

2025.7.23

flen&#xff08;&#xff09;這個函數計算到的文件大小為0&#xff0c;明天解決 原因是路徑錯誤&#xff0c;寫成了CONFIG_ROOT_PATH"/music/test2.mp3,但是也沒報錯&#xff0c;打開文件也成功&#xff0c;所以就沒有懷疑到路徑方面來

大致自定義文件I/O庫函數的實現詳解(了解即可)

目錄 一、mystdio.h 代碼思路分析 二、mystdio.c 1. 輔助函數 BuyFile 2. 文件打開函數 MyFopen 3. 文件關閉函數 MyFclose 4. 數據寫入函數 MyFwrite 1、memcpy(file->outbuffer file->bufferlen, str, len); 2、按位與&#xff08;&&#xff09;運算的作…

Zipformer

Zipformer首先&#xff0c;Conv-Embed 將輸入的 100Hz 的聲學特征下采樣為 50 Hz 的特征序列&#xff1b;然后&#xff0c;由 6 個連續的 encoder stack 分別在 50Hz、25Hz、12.5Hz、6.25Hz、12.5Hz 和 25Hz 的采樣率下進行時域建模。除了第一個 stack 外&#xff0c;其他的 st…

SpringMVC快速入門之請求與響應

SpringMVC快速入門之請求與響應一、請求處理&#xff1a;獲取請求參數1.1 普通參數獲取&#xff08;RequestParam&#xff09;1.1.1 基礎用法1.1.2 可選參數與默認值1.2 路徑變量&#xff08;PathVariable&#xff09;1.3 表單數據綁定到對象1.3.1 定義實體類1.3.2 綁定對象參數…

【Mysql】 Mysql zip解壓版 Win11 安裝備忘

1. 官網 MySQL :: MySQL Community Downloads 選擇 MySQL Community Server 選擇Archives 選擇 8.0版本 MySQL :: Download MySQL Community Server (Archived Versions) 1. 普通版本&#xff08;推薦&#xff09; 名稱&#xff1a;Windows (x86, 64-bit), ZIP Archive 文件…

Web3面試題

1.在使用 Ethers.js 對接 MetaMask 錢包時&#xff0c;如何檢測用戶賬戶切換的情況&#xff1f;請簡述實現思路。 答案&#xff1a;可通過監聽accountsChanged事件來檢測。當用戶切換賬戶時&#xff0c;MetaMask 會觸發該事件&#xff0c;在事件回調函數中可獲取新的賬戶地址&…

uni-app動態獲取屏幕邊界到安全區域距離的完整教程

目錄 一、什么是安全區域&#xff1f; 二、獲取安全區域距離的核心方法 三、JavaScript動態獲取安全區域距離 1. 核心API 2. 完整代碼示例 3. 關鍵點說明 四、CSS環境變量適配安全區域 1. 使用 env() 和 constant() 3. 注意事項 五、不同平臺的適配策略 1. H5 端 2…

ZKmall開源商城微服務架構實戰:Java 商城系統的模塊化拆分與通信之道

在電商業務高速增長的今天&#xff0c;傳統單體商城系統越來越力不從心 —— 代碼堆成一團、改一點牽一片、想加功能得大動干戈&#xff0c;根本扛不住高并發、多場景的業務需求。微服務架構卻能破這個局&#xff1a;把系統拆成一個個能獨立部署的小服務&#xff0c;每個服務專…

ROS 與 Ubuntu 版本的對應關系

ROS 作為一套用于構建機器人應用的開源框架&#xff0c;其開發和運行高度依賴 Ubuntu 等 Linux 發行版&#xff0c;尤其是 Ubuntu 因其廣泛的兼容性和社區支持&#xff0c;成為了 ROS 最主流的運行平臺。 一、ROS 與 Ubuntu 版本的對應關系&#xff08;截至 2025 年&#xff0c…