Paimon LSM Tree Compaction 策略

壓縮怎么進行的

這里的操作都是KValue,內部有row kind,標記了刪除和插入

MergeTreeCompactManager?是 Paimon 中 Merge-Tree 結構壓縮任務的總調度中心。它的核心職責就是監控文件的層級狀態(Levels),并在合適的時機,根據預設的策略,挑選文件并發起壓縮任務

triggerCompaction?方法正是這個調度中心的核心入口。下面我們來詳細拆解它的邏輯和涉及的策略。

這個方法有兩個主要的執行路徑,由?fullCompaction?參數控制:

  • fullCompaction = true: 強制執行一次“完全合并”。通常由用戶手動觸發(比如通過?ALTER TABLE ... COMPACT?命令)。它會嘗試將所有數據文件合并成盡可能少的文件。
  • fullCompaction = false: 執行一次“常規合并”。這是系統在日常寫入過程中自動觸發的,目的是維持 LSM-Tree 的健康結構,防止某一層級文件過多,影響讀取性能。
// ... existing code ...@Overridepublic void triggerCompaction(boolean fullCompaction) {Optional<CompactUnit> optionalUnit;// 1. 獲取當前所有層級的 SortedRun 視圖List<LevelSortedRun> runs = levels.levelSortedRuns();if (fullCompaction) {// ... 省略日志和檢查 ...// 2a. 如果是完全合并,調用 FullCompaction 策略optionalUnit =CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs, recordLevelExpire);} else {// ... 省略日志和檢查 ...// 2b. 如果是常規合并,調用配置的 strategy (如 UniversalCompaction)optionalUnit =strategy.pick(levels.numberOfLevels(), runs).filter(unit -> unit.files().size() > 0).filter(unit ->unit.files().size() > 1|| unit.files().get(0).level()!= unit.outputLevel());}// 3. 如果策略選出了需要合并的文件單元 (CompactUnit)optionalUnit.ifPresent(unit -> {// 4. 決定是否可以丟棄刪除標記 (dropDelete)boolean dropDelete =unit.outputLevel() != 0&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()|| dvMaintainer != null);// ... 省略日志 ...// 5. 提交壓縮任務submitCompaction(unit, dropDelete);});}
// ... existing code ...

完全合并策略 (CompactStrategy.pickFullCompaction)

當?fullCompaction?為?true?時,會調用?CompactStrategy.pickFullCompaction。這個策略非常直接:

  • 目標:將所有層級的所有文件合并到最高層級(maxLevel)。
  • 行為:它會收集?levels?中的所有?DataFileMeta,將它們打包成一個巨大的?CompactUnit,并將?outputLevel?設置為?levels.maxLevel()
  • 應用場景
    • 用戶希望整理數據碎片,減少文件總數,優化后續的查詢性能。
    • 清理過期數據(如果配置了?recordLevelExpire)。因為只有在最高層級的合并才能確保數據不會再被舊版本覆蓋,從而安全地物理刪除。

常規合并策略 (strategy.pick)

當?fullCompaction?為?false?時,會調用構造函數中傳入的?strategy?實例的?pick?方法。在 Paimon 中,最常用的常規策略是?UniversalCompaction

UniversalCompaction?策略模擬了 RocksDB 的 Universal Compaction Style,其核心思想是:

  • 目標:維持一個健康的、層級分明的文件結構,避免 Level 0 文件堆積過多,同時控制寫放大和空間放大。
  • 觸發條件
    1. Sorted Run 數量閾值:當總的?SortedRun?數量(Level 0 每個文件算一個 run,其他 level 每層算一個 run)超過?num-sorted-run.compaction-trigger?配置時,會觸發合并。這是最主要的觸發條件。
    2. 空間放大閾值:當所有文件的總大小,遠大于最高層級文件的大小時(超過?max-size-amplification-percent?配置的比例),也會觸發合并,以回收空間。
  • 挑選文件的邏輯
    • 它會從 Level 0 開始,向上檢查,找到第一個滿足合并條件的層級。
    • 通常,它會選擇將 Level 0 的所有文件合并到 Level 1,或者將 Level?i?的所有文件合并到 Level?i+1
    • 它會盡量選擇相鄰的、大小相似的?SortedRun?進行合并,以達到最優效率。
  • filter?的作用: 在?triggerCompaction?方法中,strategy.pick?的結果后面跟了兩個?.filter(...)?調用。
    1. .filter(unit -> unit.files().size() > 0): 確保選出的壓縮單元不是空的。
    2. .filter(unit -> unit.files().size() > 1 || unit.files().get(0).level() != unit.outputLevel()): 這是一個重要的優化。它排除了“只有一個文件,并且輸出層級和當前層級相同”的情況。這種情況意味著文件只是被“升級”(upgrade),而沒有實際的合并重寫,這通常在?MergeTreeCompactTask?內部處理更高效,無需作為一個獨立的壓縮任務提交。

dropDelete?策略

在確定了要合并的文件后,代碼會計算一個?dropDelete?布爾值。

  • 含義dropDelete?為?true?表示在合并重寫數據時,可以直接丟棄掉類型為?DELETE?的記錄。
  • 條件
    1. unit.outputLevel() != 0: 輸出層級不能是 Level 0。因為 Level 0 可能還有其他更老的文件沒有參與本次合并,如果丟棄了刪除標記,可能會導致本該被刪除的數據重新“復活”。
    2. unit.outputLevel() >= levels.nonEmptyHighestLevel(): 輸出層級必須是當前數據存在的最高層級或更高。這確保了合并產生的新文件是“最老”的,不存在比它更老的數據了,因此刪除標記可以被安全地物理移除。
    3. dvMaintainer != null: 如果表開啟了刪除向量(Deletion Vectors),則邏輯會有所不同,通常也可以安全地處理刪除。

總結

MergeTreeCompactManager?的?triggerCompaction?方法是一個精密的調度器,它通過?levels?對象感知整個表的物理文件狀態,并執行以下流程:

  1. 獲取狀態:從?levels?中獲取所有文件的層級和分布信息(levelSortedRuns)。
  2. 策略決策
    • 如果是手動觸發的全量合并,則使用?pickFullCompaction?策略,將所有文件打包,目標是合并到最高層。
    • 如果是自動觸發的常規合并,則使用?UniversalCompaction?等策略,根據?SortedRun?數量、空間放大等指標,智能地選擇一部分文件進行層級推進式的合并。
  3. 任務優化:通過?filter?過濾掉無需執行的或無效的壓縮單元。
  4. 參數計算:根據合并的目標層級和?levels?的整體狀態,計算出是否可以安全地在合并中物理刪除數據(dropDelete)。
  5. 提交執行:最后將包含待合并文件、目標層級、dropDelete?標志的?CompactUnit?封裝成?MergeTreeCompactTask,提交到線程池執行。

整個過程完美地體現了 LSM-Tree 架構通過后臺合并來平衡寫入性能和查詢性能的核心思想。

壓縮策略

CompactStrategy.pickFullCompaction?- 全量合并策略

這是一個定義在?CompactStrategy?接口中的靜態方法,代表了一種最徹底的合并策略。它的目標非常明確:將表(或分區)中的所有數據文件合并成盡可能少的文件,并放置到最高層級(max level)

a. 源碼分析

// ... existing code ...static Optional<CompactUnit> pickFullCompaction(int numLevels,List<LevelSortedRun> runs,@Nullable RecordLevelExpire recordLevelExpire) {int maxLevel = numLevels - 1;if (runs.isEmpty()) {// no sorted run, no need to compactreturn Optional.empty();} else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {if (recordLevelExpire == null) {// only 1 sorted run on the max level and don't check record-expire, no need to// compactreturn Optional.empty();}// pick the files which has expired recordsList<DataFileMeta> filesContainExpireRecords = new ArrayList<>();for (DataFileMeta file : runs.get(0).run().files()) {if (recordLevelExpire.isExpireFile(file)) {filesContainExpireRecords.add(file);}}return Optional.of(CompactUnit.fromFiles(maxLevel, filesContainExpireRecords));} else {return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));}}
// ... existing code ...

邏輯分解如下:

  1. 計算最高層級maxLevel = numLevels - 1
  2. 處理邊界情況
    • 如果?runs?為空(沒有任何文件),直接返回?Optional.empty(),無需合并。
    • 如果已經只剩下一個?SortedRun?并且它已經在最高層級?(runs.size() == 1 && runs.get(0).level() == maxLevel),這說明數據已經是最優狀態了。
      • 此時,會檢查是否配置了記錄級過期 (recordLevelExpire)
      • 如果沒有配置過期,那么就無需做任何事,返回?Optional.empty()
      • 如果配置了過期,它會遍歷這個?SortedRun?中的所有文件,挑出那些可能包含過期數據的文件(通過?recordLevelExpire.isExpireFile(file)?判斷),然后只對這些文件進行一次“自我合并”以清理過期數據。
  3. 常規全量合并
    • 如果不滿足上述邊界情況(比如有多于一個?SortedRun,或者唯一的?SortedRun?不在最高層),則執行標準的全量合并。
    • 它會調用?CompactUnit.fromLevelRuns(maxLevel, runs),將所有傳入的?runs?中的文件都包含進來,創建一個?CompactUnit,并指定輸出層級為?maxLevel

b. 應用場景

  • 用戶手動觸發:最常見的場景是用戶通過 Flink SQL?CALL sys.compact(...)?或 Spark Procedure?CALL sys.compact(...)?并指定?full?模式來執行。
  • 目的
    • 減少小文件:將長時間積累的大量小文件合并成少量大文件,顯著提升后續的查詢性能。
    • 物理刪除:全量合并到最高層級是安全地物理刪除帶有刪除標記(DELETE)的數據的唯一時機。
    • 數據清理:配合 TTL,清理過期數據。

?ForceUpLevel0Compaction?- 強制提升 Level 0 策略

這是一個實現了?CompactStrategy?接口的類,它代表了一種更激進的常規合并策略。它的核心思想是:優先采用通用的?UniversalCompaction?策略;如果通用策略認為無需合并,則強制檢查 Level 0 是否有文件,如果有,就將它們全部合并

a. 源碼分析

/** A {@link CompactStrategy} to force compacting level 0 files. */
public class ForceUpLevel0Compaction implements CompactStrategy {private final UniversalCompaction universal;public ForceUpLevel0Compaction(UniversalCompaction universal) {this.universal = universal;}@Overridepublic Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {// 1. 首先嘗試通用的 UniversalCompaction 策略Optional<CompactUnit> pick = universal.pick(numLevels, runs);if (pick.isPresent()) {// 如果通用策略找到了需要合并的文件,就直接采納它的決定return pick;}// 2. 如果通用策略認為不需要合并,則執行強制邏輯//    調用 universal.forcePickL0,這個方法會專門檢查 L0return universal.forcePickL0(numLevels, runs);}
}

forcePickL0?的邏輯在?UniversalCompaction.java?中:

// ... existing code ...Optional<CompactUnit> forcePickL0(int numLevels, List<LevelSortedRun> runs) {// 收集所有 level 0 的文件int candidateCount = 0;for (int i = candidateCount; i < runs.size(); i++) {if (runs.get(i).level() > 0) {break;}candidateCount++;}// 如果 L0 沒有文件,返回空;否則,將所有 L0 文件打包成一個壓縮單元return candidateCount == 0? Optional.empty(): Optional.of(pickForSizeRatio(numLevels - 1, runs, candidateCount, true));}
// ... existing code ...

邏輯分解如下:

  1. 委托與回退(Delegate and Fallback)ForceUpLevel0Compaction?首先將決策權委托給內部的?UniversalCompaction?實例。UniversalCompaction?會根據文件數量、大小比例等常規指標判斷是否需要合并。
  2. 強制檢查 L0:如果?UniversalCompaction?的常規檢查沒有觸發合并(返回了?Optional.empty()),ForceUpLevel0Compaction?會執行它的“強制”邏輯:調用?universal.forcePickL0
  3. forcePickL0?的行為:這個方法很簡單,它只看 Level 0。只要 Level 0 存在任何文件,它就會把 Level 0 的所有文件都收集起來,創建一個?CompactUnit,準備將它們向上合并。

b. 應用場景

  • Lookup Join 優化:這是此策略最主要的應用場景。當 Paimon 表作為 Flink 的維表進行?lookup join?時,為了保證維表數據的近實時性,我們希望新寫入的數據(總是在 Level 0)能盡快地被合并到更高層級,從而對?lookup?可見。
  • lookup.compact-mode = 'radical':當用戶配置此參數時,系統就會啟用?ForceUpLevel0Compaction?策略。radical(激進的)這個詞很形象地描述了它的行為:只要有新數據寫入(即 L0 有文件),就盡快地、激進地將它合并掉,以犧牲一些合并開銷為代價,換取數據可見性的延遲降低。

Paimon 的優化:Lookup 模式下跳過 L0

為了解決上述性能問題,Paimon 在開啟了特定優化后(比如配置了?lookup?changelog-producer 或開啟了刪除向量),其?LocalTableQuery?會采取一種特殊的讀取策略:

默認從 Level 1 開始讀取數據,直接忽略 Level 0!

這可以在?LocalTableQuery.java?的構造函數中找到證據:

// ... existing code ...this.lookupStoreFactory =LookupStoreFactory.create(
// ... existing code ...new RowCompactedSerializer(keyType).createSliceComparator());if (options.needLookup()) {// 關鍵點:如果開啟了 lookup 優化,起始讀取層級設置為 1startLevel = 1;} else {
// ... existing code ...}
// ... existing code ...

options.needLookup()?會在滿足某些條件時(如?changelog-producer = 'lookup')返回?true。當它為?true?時,startLevel?被設為?1,這意味著后續的所有查找操作都將從 Level 1 開始,從而完全避開了低效的 Level 0。

ForceUpLevel0Compaction?的作用:讓 L0 數據盡快可見

現在,整個邏輯就閉環了:

  1. 新數據寫入 Paimon 表,生成 L0 文件。
  2. 高性能的 Lookup Join 讀取器為了性能,只看 L1 及以上層級的文件
  3. 此時,新寫入的 L0 文件對于這個 Lookup Join 來說,就是“不可見”的。
  4. 為了解決這個問題,必須盡快地將 L0 的文件合并(Compaction)到 L1。
  5. ForceUpLevel0Compaction?策略應運而生。它的行為非常“激進” (radical):只要發現 L0 有文件,不管滿足不滿足常規的合并觸發條件,都強制發起一次 Compaction,將它們推向 L1

這樣一來,新數據停留在 L0 的時間窗口被大大縮短,從而保證了數據能夠近實時地對 Lookup Join 可見。

UniversalCompaction?

它是 Paimon 中最核心、最常用的常規壓縮策略,其設計思想借鑒了 RocksDB 的 Universal Compaction,旨在平衡寫入放大、讀取放大和空間放大。

UniversalCompaction?作為一個?CompactStrategy?的實現,它的主要職責是在常規寫入流程中,根據一系列預設規則,判斷是否需要觸發一次合并(Compaction),并挑選出具體要合并哪些文件。

它的核心目標是:

  • 控制寫入放大(Write Amplification):避免過于頻繁地重寫數據。
  • 維持健康的 LSM-Tree 結構:防止 L0 文件過多,或者文件大小差異過大,從而保證讀取性能(Read Amplification)和空間占用(Space Amplification)在一個可接受的范圍內。

我們先從它的成員變量入手,這些變量定義了?UniversalCompaction?策略的行為準則。

// ... existing code ...
public class UniversalCompaction implements CompactStrategy {private static final Logger LOG = LoggerFactory.getLogger(UniversalCompaction.class);// 對應 'compaction.max-size-amplification-percent'private final int maxSizeAmp; // 對應 'compaction.sorted-run.size-ratio'private final int sizeRatio;// 對應 'compaction.sorted-run.num-compaction-trigger'private final int numRunCompactionTrigger;// 對應 'compaction.optimization-interval'@Nullable private final Long opCompactionInterval;@Nullable private Long lastOptimizedCompaction;// 對應 'lookup.compact-max-interval'@Nullable private final Integer maxLookupCompactInterval;@Nullable private final AtomicInteger lookupCompactTriggerCount;public UniversalCompaction(int maxSizeAmp,int sizeRatio,int numRunCompactionTrigger,@Nullable Duration opCompactionInterval,@Nullable Integer maxLookupCompactInterval) {this.maxSizeAmp = maxSizeAmp;this.sizeRatio = sizeRatio;this.numRunCompactionTrigger = numRunCompactionTrigger;this.opCompactionInterval =opCompactionInterval == null ? null : opCompactionInterval.toMillis();this.maxLookupCompactInterval = maxLookupCompactInterval;this.lookupCompactTriggerCount =maxLookupCompactInterval == null ? null : new AtomicInteger(0);}
// ... existing code ...
  • maxSizeAmp:?最大空間放大百分比。控制除最老的一個 Sorted Run 外,其他所有 Sorted Run 的總大小,不能超過最老的 Sorted Run 大小的?maxSizeAmp / 100?倍。這是為了控制空間浪費。
  • sizeRatio:?大小比例。在挑選文件進行合并時,如果當前候選文件集合的總大小,與下一個 Sorted Run 的大小比例在?sizeRatio?之內,就會把下一個 Sorted Run 也納入本次合并。這是為了傾向于合并大小相近的文件,提高效率。
  • numRunCompactionTrigger:?文件數量觸發器。當總的 Sorted Run 數量超過這個閾值時,會強制觸發一次合并。這是最主要的常規合并觸發條件。
  • opCompactionInterval:?優化合并時間間隔。一個可選的配置,用于周期性地觸發一次全量合并(將所有文件合并到最高層),以保證讀取性能。
  • maxLookupCompactInterval:?Lookup 場景的合并間隔。這是為?lookup.compact-mode = 'gentle'?模式設計的。它不要求每次寫入都強制合并 L0,而是每隔 N 次?pick?調用(即 N 次 compaction 檢查)后,如果 L0 還有文件,就強制合并一次。這是一種在性能和數據延遲之間的折中。

核心方法?pick?- 決策中心

pick?方法是整個策略的核心,它按照優先級從高到低的順序,檢查是否滿足各種觸發條件。只要有一個條件滿足,就會生成一個?CompactUnit?并返回,后續的檢查就不再進行。

// ... existing code ...@Overridepublic Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {int maxLevel = numLevels - 1;// 優先級 0: 周期性優化合并 (如果配置了)if (opCompactionInterval != null) {if (lastOptimizedCompaction == null|| currentTimeMillis() - lastOptimizedCompaction > opCompactionInterval) {// ...return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));}}// 優先級 1: 檢查空間放大 (pickForSizeAmp)CompactUnit unit = pickForSizeAmp(maxLevel, runs);if (unit != null) {// ...return Optional.of(unit);}// 優先級 2: 檢查大小比例 (pickForSizeRatio)unit = pickForSizeRatio(maxLevel, runs);if (unit != null) {// ...return Optional.of(unit);}// 優先級 3: 檢查文件數量 (numRunCompactionTrigger)if (runs.size() > numRunCompactionTrigger) {// ...return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));}// 優先級 4: 檢查 Lookup 場景的周期性強制合并 (如果配置了)if (maxLookupCompactInterval != null && lookupCompactTriggerCount != null) {lookupCompactTriggerCount.getAndIncrement();if (lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {// ...return forcePickL0(numLevels, runs);} // ...}return Optional.empty();}
// ... existing code ...

觸發條件分析 (按優先級)

  1. 周期性優化合并 (opCompactionInterval): 最高優先級。如果配置了該參數,并且距離上次優化合并的時間已經超過了指定間隔,它會直接觸發一次全量合并,將所有?runs?合并到?maxLevel。這對于需要定期整理數據以保證查詢性能的場景非常有用。

  2. 空間放大 (pickForSizeAmp): 檢查?(所有文件總大小 - 最老文件大小) / 最老文件大小?是否超過了?maxSizeAmp。如果超過,說明非最高層的數據(即“增量”數據)相對于“存量”數據來說過于龐大,占用了過多額外空間。此時也會觸發一次全量合并

  3. 大小比例 (pickForSizeRatio): 這是最常規的合并挑選邏輯。它會從最年輕的文件(runs?列表的開頭)開始,逐個向后累加,只要當前累加的總大小與下一個文件的大小比例在?sizeRatio?之內,就繼續向后吞并。這個過程會形成一個大小比較均勻的合并候選集。

  4. 文件數量 (numRunCompactionTrigger): 如果總的?SortedRun?數量超過了閾值,說明文件過于碎片化,會嚴重影響性能。此時會強制觸發一次合并。它會計算出需要合并掉多少個文件才能使總數降到閾值以下,然后調用?pickForSizeRatio?來挑選這些文件。

  5. Lookup 周期性合并 (maxLookupCompactInterval): 這是最低優先級的檢查。它維護一個原子計數器?lookupCompactTriggerCount。每次調用?pick?都會使其加一。當計數器達到?maxLookupCompactInterval?時,就會重置為 0 并調用?forcePickL0,強制合并 L0 的所有文件。這為?lookup?場景提供了一種“溫和”(gentle)的合并模式。

pickForSizeRatio?

    public CompactUnit pickForSizeRatio(int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick) {long candidateSize = candidateSize(runs, candidateCount);for (int i = candidateCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {break;}candidateSize += next.run().totalSize();candidateCount++;}if (forcePick || candidateCount > 1) {return createUnit(runs, maxLevel, candidateCount);}return null;}

這個方法是 Paimon 中 Universal Compaction(通用合并)策略的核心部分之一,它根據文件大小比例來決定哪些文件(SortedRun)應該被合并。

  • int maxLevel: Merge-Tree 的最大層級數。合并后的文件通常會被放到更高的層級。
  • List<LevelSortedRun> runs: 當前所有層級的、已排序的?SortedRun?列表。這個列表通常是按從新到舊(或從小到大)的順序排列的。
  • int candidateCount: 初始候選?SortedRun?的數量。方法會從列表的前?candidateCount?個?run?開始考慮合并。
  • boolean forcePick: 是否強制選擇。如果為?true,即使最終只選出了一個?run,也會創建合并單元(CompactUnit)。這在某些強制合并的場景下很有用。

這個方法的主要邏輯是:從一組初始的候選文件(runs)開始,不斷地嘗試將后續的文件也加入到這次合并任務中,直到遇到一個尺寸“過大”的文件為止。

代碼執行流程如下:

  1. 計算初始候選文件總大小

    long candidateSize = candidateSize(runs, candidateCount);
    

    首先,它會計算由?candidateCount?指定的初始候選?run?的總大小。

  2. 迭代選擇更多文件

    for (int i = candidateCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {break;}candidateSize += next.run().totalSize();candidateCount++;
    }
    

    接著,它會遍歷剩余的?run。對于每一個?next?run,它會檢查一個關鍵條件:?candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()

    • sizeRatio?是一個配置項 (compaction.size-ratio),表示尺寸比較時的靈活性百分比。
    • 這個條件判斷的是:當前已選中的文件總大小(candidateSize),即使加上?sizeRatio?百分比的“寬容度”,是否仍然小于下一個文件(next)的大小。
    • 如果條件成立,意味著下一個文件?next?比當前已選中的文件總和要大得多,將它們合并的性價比不高。因此,循環中斷(break),不再選擇更多的文件。
    • 如果條件不成立,意味著?next?文件的大小和當前已選中的文件總大小在同一個量級,適合一起合并。于是,將?next?文件加入候選集,更新?candidateSize?和?candidateCount
  3. 創建合并單元

    if (forcePick || candidateCount > 1) {return createUnit(runs, maxLevel, candidateCount);
    }return null;
    

    循環結束后,方法會判斷是否需要創建合并任務 (CompactUnit)。

    • 如果?forcePick?為?true,或者最終選出的文件數量?candidateCount?大于1(合并至少需要兩個文件才有意義),就會調用?createUnit?方法來創建一個包含所有選中文件的?CompactUnit
    • 否則,說明沒有找到合適的合并機會,返回?null

在?UniversalCompaction?類中,這個方法主要在以下幾個場景被調用:

  1. 常規大小比例檢查 (pickForSizeRatio(maxLevel, runs)): 這是最常見的用法,從第一個?run?開始(candidateCount=1),嘗試尋找合適的合并機會。
  2. 文件數量觸發 (pick?方法中): 當總文件數超過閾值?numRunCompactionTrigger?時,會觸發合并。此時,初始?candidateCount?會被設置為?runs.size() - numRunCompactionTrigger + 1,然后調用此方法來決定最終合并哪些文件。
  3. 強制L0層合并 (forcePickL0?方法中): 在某些情況下(例如,為了降低查找延遲),需要強制合并L0層的所有文件。這時會調用此方法,并將?forcePick?設置為?true,確保即使L0只有一個文件也會被打包成一個合并任務。

pickForSizeRatio?方法實現了一種智能的合并文件選擇策略。它傾向于將大小相近的文件進行合并,避免了將一個很小的文件和一個巨大的文件進行合并所帶來的高I/O開銷,從而提高了合并效率。通過?sizeRatio?參數提供了靈活性,并通過?forcePick?參數支持了強制合并的場景。

createUnit()?方法:目標層級的決定者

createUnit?方法是真正計算?outputLevel?的地方,這是理解整個機制的關鍵

// ... existing code ...@VisibleForTestingCompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {int outputLevel;if (runCount == runs.size()) {// 如果所有 run 都參與合并,目標就是最高層outputLevel = maxLevel;} else {// 否則,目標層級是下一個未參與合并的 run 的層級減 1// 這是最核心的邏輯outputLevel = Math.max(0, runs.get(runCount).level() - 1);}if (outputLevel == 0) {// 為了避免產生新的 level 0 文件,這里做了特殊處理// 如果計算出的目標是 level 0,會繼續往后尋找,直到找到一個非 level 0 的 run// 并將它的層級作為 outputLevelfor (int i = runCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);runCount++;if (next.level() != 0) {outputLevel = next.level();break;}}}if (runCount == runs.size()) {// 如果經過上述邏輯后,所有 run 都被選中了,那么還是合并到最高層updateLastOptimizedCompaction();outputLevel = maxLevel;}return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));}
// ... existing code ...

核心邏輯解讀:

  • runCount?是被選中參與本次合并的 run 的數量。
  • outputLevel = Math.max(0, runs.get(runCount).level() - 1);?這一行代碼是關鍵。它說明,當不是所有文件都參與合并時,輸出層級取決于第一個未被選中的 run?(runs.get(runCount))。目標層級是這個 run 的層級減 1。

舉個例子:?假設我們有以下?runs:?[L0, L0, L0, L2, L4]

  1. pickForSizeRatio?方法可能決定合并前3個 L0 的文件。此時?runCount?= 3。
  2. 進入?createUnit?方法,runs.get(runCount)?就是?runs.get(3),即那個 L2 的 run。
  3. outputLevel?計算為?L2.level() - 1,也就是?2 - 1 = 1
  4. 最終,這3個 L0 文件會被合并成一個新的?L1?文件,而不是?maxLevel

輔助方法

  • pickForSizeRatio(...): 實現了上述的大小比例挑選邏輯。它會從最年輕的文件開始,向后“滾動”合并,直到下一個文件太大不適合合并為止。
  • createUnit(...): 根據挑選出的文件(runCount個),決定輸出層級(outputLevel)。邏輯通常是:如果合并了所有文件,則輸出到最高層;否則,輸出到下一個未被合并的文件的層級減一。它還會保證輸出層級不為 0。
  • forcePickL0(...): 一個特殊的方法,只收集 Level 0 的所有文件,并準備將它們合并。被?ForceUpLevel0Compaction?和?lookupCompactMaxInterval?邏輯所調用。

總結

UniversalCompaction?是一個設計精巧且全面的合并策略,它通過多維度、分優先級的規則來維護 LSM-Tree 的健康狀態:

  • 通過文件數量空間放大來設定合并的“底線”,防止系統狀態惡化。
  • 通過大小比例來智能地挑選合并單元,實現高效合并。
  • 通過周期性全量合并針對 Lookup 的周期性 L0 合并,為不同的應用場景提供了額外的優化選項。

它與?ForceUpLevel0Compaction?的關系是:ForceUpLevel0Compaction?是一種更激進的策略,它完全復用了?UniversalCompaction?的所有邏輯,并在此基礎上增加了一個最終的回退(fallback)邏輯:如果?UniversalCompaction?認為無需合并,它會強制檢查并合并 L0,以滿足 Lookup 場景對數據實時性的極致要求。

CompactUnit

Paimon 中執行一次具體壓縮操作的對象是?CompactUnit,它并不完全等同于一個?SortedRun,而是從一個或多個?SortedRun?中選取出來的文件集合。定位哪些文件需要壓縮,則是由?CompactStrategy(壓縮策略)來決encided。

一個壓縮任務的基本工作單元是?CompactUnit?接口。從它的定義可以看出,一個?CompactUnit?主要包含兩部分信息:

  1. outputLevel(): 壓縮后生成的新文件要歸屬的層級(Level)。
  2. files(): 需要參與本次壓縮的所有數據文件(DataFileMeta)的列表。
public interface CompactUnit {int outputLevel();List<DataFileMeta> files();static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {List<DataFileMeta> files = new ArrayList<>();for (LevelSortedRun run : runs) {files.addAll(run.run().files());}return fromFiles(outputLevel, files);}static CompactUnit fromFiles(int outputLevel, List<DataFileMeta> files) {return new CompactUnit() {@Overridepublic int outputLevel() {return outputLevel;}@Overridepublic List<DataFileMeta> files() {return files;}};}
}

所以,壓縮對象是一組文件的集合(List<DataFileMeta>),而不是單個?SortedRun。一個?CompactUnit?可以包含來自不同?SortedRun?的文件。

SortedRun?是 LSM-Tree 里的一個邏輯概念,代表一個有序的數據文件集合。在 Paimon 中,Level 0 每個文件都是一個獨立的?SortedRun,而在其他 Level,同一層的所有文件構成一個?SortedRun

SortedRun?是挑選壓縮文件時的重要依據,但不是壓縮任務的直接執行對象。壓縮策略會分析各個?SortedRun?的狀態(如數量、大小、層級)來決定是否要發起一次壓縮,以及挑選哪些?SortedRun?里的文件來組成?CompactUnit

如何定位和組織壓縮對象?

這個過程主要分為兩步:策略選擇?和?任務執行

第一步:CompactStrategy?策略選擇

CompactStrategy?接口負責從當前所有?LevelSortedRun(帶有層級信息的?SortedRun)中挑選出需要被壓縮的?CompactUnit

// ... existing code ...
public interface CompactStrategy {/*** Pick compaction unit from runs.** <ul>*   <li>compaction is runs-based, not file-based.*   <li>level 0 is special, one run per file; all other levels are one run per level.*   <li>compaction is sequential from small level to large level.* </ul>*/Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);/** Pick a compaction unit consisting of all existing files. */
// ... existing code ...

例如,UniversalCompaction?策略會檢查是否有太多?SortedRun,如果超過閾值,就會選擇一些相鄰的?SortedRun?合并。FullCompaction?策略則會選擇所有的文件進行合并。

第二步:MergeTreeCompactTask?任務執行

一旦?CompactStrategy?挑選出了一個?CompactUnit,就會創建一個?MergeTreeCompactTask?來執行具體的壓縮邏輯。在你正在查看的?MergeTreeCompactTask.java?文件中,doCompact()?方法清晰地展示了如何處理這個?CompactUnit?里的文件。

  1. 分區(Partitioning):任務首先會根據文件的 key 范圍將?CompactUnit?中的文件分成若干個互不重疊或連續重疊的組(List<List<SortedRun>> partitioned)。
  2. 分類處理
    • 對于有多個?SortedRun?重疊的組,它們是必須被重寫(rewrite)合并的,因此被加入?candidate?列表。
    • 對于沒有重疊的組(只有一個?SortedRun),會進一步判斷:
      • 如果文件很小(小于?minFileSize),它也會被加入?candidate?列表,以便和其它小文件一起合并,避免產生過多小文件。
      • 如果文件很大,通常會直接“升級”(upgrade),即只修改文件的元數據,將其層級提升到?outputLevel,而無需重寫文件內容,這樣效率更高。但如果這個大文件包含需要過期的數據,或者需要被合并到最高層級但自身帶有刪除記錄,它還是會被強制重寫。
// ... existing code ...@Overrideprotected CompactResult doCompact() throws Exception {List<List<SortedRun>> candidate = new ArrayList<>();CompactResult result = new CompactResult();// Checking the order and compacting adjacent and contiguous files// Note: can't skip an intermediate file to compact, this will destroy the overall// orderlinessfor (List<SortedRun> section : partitioned) {if (section.size() > 1) {candidate.add(section);} else {SortedRun run = section.get(0);// No overlapping:// We can just upgrade the large file and just change the level instead of// rewriting it// But for small files, we will try to compact itfor (DataFileMeta file : run.files()) {if (file.fileSize() < minFileSize) {// Smaller files are rewritten along with the previous filescandidate.add(singletonList(SortedRun.fromSingle(file)));} else {// Large file appear, rewrite previous and upgrade itrewrite(candidate, result);upgrade(file, result);}}}}rewrite(candidate, result);result.setDeletionFile(compactDfSupplier.get());return result;}
// ... existing code ...

總結

  • 壓縮對象:一個?CompactUnit?實例,它內部封裝了一組待壓縮的文件列表?(List<DataFileMeta>) 和目標層級。
  • 與?SortedRun?的關系SortedRun?是 LSM-Tree 的邏輯層,是壓縮策略(CompactStrategy)制定計劃的輸入和依據。策略根據?SortedRun?的情況來決定挑選哪些文件組成?CompactUnit
  • 定位方式:通過?CompactStrategy.pick()?方法,根據預設的壓縮策略(如 Universal, Full 等)分析所有?SortedRun,挑選出最需要被壓縮的文件,打包成?CompactUnit,然后交由?MergeTreeCompactTask?執行。

level層次數量的權衡

這是一個關于性能權衡(Trade-off)的經典問題,主要涉及?寫放大(Write Amplification)?和?讀放大(Read Amplification)

降低寫放大(The Main Benefit)是分更多層級的最主要原因。

  • 場景A:只有兩層(L0 和 L-max)?假設我們只有 L0 和 L1(作為最高層)。當 L0 的文件需要合并時,它們需要和 L1 中所有的數據進行合并,然后生成全新的 L1。如果 L1 非常大(比如幾百GB),那么即使 L0 只有很小的一點新數據(比如幾百MB),也需要重寫整個 L1。這個過程消耗的 I/O 非常巨大,這就是“寫放大”——為了寫入少量邏輯數據,卻需要進行大量的物理磁盤寫入。

  • 場景B:有多層(L0, L1, L2, ...)?在這種設計下,合并是逐步進行的。L0 的文件合并成 L1 的文件;當 L1 積累到一定程度,再和 L2 合并,以此類推。每次合并操作所涉及的數據量都相對較小。例如,將幾個100MB的 L0 文件合并成一個 L1 文件,遠比將它們與一個100GB的 L-max 文件合并要快得多。這大大降低了單次合并的成本,使得寫入性能更平滑、更可預測。

平衡讀寫性能

  • 更多層級

    • 優點:寫放大低,寫入平穩。
    • 缺點:讀放大高。因為一條數據可能存在于任何一層,查詢時需要從 L0 到 L-max 逐層查找,直到找到最新的版本。層級越多,需要檢查的地方就越多。
  • 更少層級

    • 優點:讀放大低。查詢時只需要檢查很少的幾個層級。
    • 缺點:寫放大高。合并成本巨大,可能導致寫入性能的劇烈抖動。

Paimon 采用的 Universal Compaction 策略,以及其多層級的結構,正是在這兩種放大效應之間做權衡,目標是提供一個整體表現良好的方案,尤其是在高頻寫入的場景下,通過平滑合并操作來保證寫入的穩定性。

總而言之,分更多層次的核心好處是顯著降低寫放大,以平滑的、低成本的增量合并,替代高成本的全量合并,從而獲得更穩定、高效的寫入性能。這是以犧牲一部分讀取性能(增加讀放大)為代價的,但對于許多數據倉庫和數據湖場景來說,這是一個非常值得的權衡。

AbstractCompactRewriter

AbstractCompactRewriter?是 Apache Paimon 項目中用于合并樹(Merge-Tree)壓縮(Compaction)操作的一個核心抽象基類。它定義了壓縮重寫器(Compact Rewriter)的通用行為和輔助功能,具體的重寫邏輯則由其子類實現。

該類位于?org.apache.paimon.mergetree.compact?包下,從其命名和?abstract?關鍵字可以看出,它是一個抽象類,不能被直接實例化。它實現了?CompactRewriter?接口。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.mergetree.compact;import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.SortedRun;import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;/** Common implementation of {@link CompactRewriter}. */
public abstract class AbstractCompactRewriter implements CompactRewriter {@Overridepublic CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception {return new CompactResult(file, file.upgrade(outputLevel));}protected static List<DataFileMeta> extractFilesFromSections(List<List<SortedRun>> sections) {return sections.stream().flatMap(Collection::stream).map(SortedRun::files).flatMap(Collection::stream).collect(Collectors.toList());}@Overridepublic void close() throws IOException {}
}

它實現了?CompactRewriter?接口,我們先來看一下這個接口的定義:

// ... existing code ...
import java.io.Closeable;
import java.util.List;public interface CompactRewriter extends Closeable {/*** Rewrite sections to new level.* ...*/CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)throws Exception;/*** Upgrade file to new level, usually file data is not rewritten, only the metadata is updated.* ...*/CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
}

CompactRewriter?接口定義了兩個核心方法?rewrite?和?upgrade,并且繼承了?Closeable?接口,意味著實現類可能持有需要關閉的資源。

AbstractCompactRewriter?提供了部分方法的默認實現,并留下一個抽象方法給子類。

upgrade(int outputLevel, DataFileMeta file)

這個方法提供了?upgrade?操作的默認實現。在 Paimon 的合并樹中,upgrade?通常指將一個文件從較低的 level "提升" 到較高的 level,這個過程大多數情況下不需要重寫文件內容,只需要更新文件的元數據(比如 level 信息)。 這里的實現正是如此:

  1. 它接收一個?DataFileMeta?對象(代表一個數據文件)和一個目標?outputLevel
  2. 它調用?file.upgrade(outputLevel)?創建一個新的?DataFileMeta?對象,新對象包含了更新后的 level 信息。
  3. 最后,它將原始文件和升級后的文件包裝成一個?CompactResult?對象返回。CompactResult?用來封裝壓縮的結果,包含壓縮前的文件列表和壓縮后的文件列表。

extractFilesFromSections(List<List<SortedRun>> sections)

這是一個?protected static?的輔助方法,不屬于接口方法。它的作用是從一個嵌套的?List<List<SortedRun>>?結構中提取出所有底層的?DataFileMeta?文件。

  • sections?代表了待壓縮的多個數據段。
  • 每個?section?是一個?List<SortedRun>
  • 每個?SortedRun?包含一個或多個數據文件 (DataFileMeta)。 這個方法通過 Java Stream API 將這個三層嵌套的結構展平,最終返回一個包含所有待壓縮文件的?List<DataFileMeta>。這在?rewrite?的實現中非常有用,可以方便地獲取所有輸入文件。

close()

該方法提供了?Closeable?接口的默認空實現。這意味著如果子類沒有需要釋放的資源(如文件句柄、網絡連接等),就無需重寫此方法。如果子類持有了需要關閉的資源,則必須重寫此方法以確保資源被正確釋放。

rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)

AbstractCompactRewriter?沒有實現?CompactRewriter?接口中的?rewrite?方法。這正是它被聲明為?abstract?的原因。?rewrite?方法是壓縮操作的核心,它定義了如何讀取多個輸入文件(sections),通過合并、去重等邏輯,然后寫入到一個或多個新的輸出文件(在?outputLevel)。這個過程是復雜且多變的,取決于具體的合并策略(例如,是否保留 changelog、使用哪種合并引擎等)。 因此,AbstractCompactRewriter?將這個最核心、最易變的邏輯交由具體的子類去實現。

實現與應用

在 Paimon 項目中,有多個?AbstractCompactRewriter?的子類,它們根據不同的場景提供了具體的?rewrite?實現。例如:

  • MergeTreeCompactRewriter: 提供了通用的合并樹壓縮邏輯。
  • ChangelogMergeTreeRewriter: 在?MergeTreeCompactRewriter?基礎上,增加了生成 changelog 的能力。
  • FullChangelogMergeTreeCompactRewriter: 一種生成完整 changelog 的策略。
  • LookupMergeTreeCompactRewriter: 適用于?lookup?模式的合并策略。

這些子類會重寫?rewrite?方法,并在內部使用?extractFilesFromSections?輔助方法來獲取輸入文件列表。

例如,在一個測試類?MergeTreeTestBase?中,就有一個內部類?TestRewriter?繼承了?AbstractCompactRewriter?并實現了?rewrite?方法,清晰地展示了其用法:

MergeTreeTestBase.java

Apply

// ... existing code ...private class TestRewriter extends AbstractCompactRewriter {@Overridepublic CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)throws Exception {// 1. 創建用于寫入新文件的 writerRollingFileWriter<KeyValue, DataFileMeta> writer =writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);// 2. 創建用于讀取和合并舊文件的 readerRecordReader<KeyValue> reader =MergeTreeReaders.readerForMergeTree(sections,// ... reader and merge function details ...);if (dropDelete) {reader = new DropDeleteReader(reader);}// 3. 執行讀寫操作writer.write(new RecordReaderIterator<>(reader));writer.close();// 4. 使用父類的輔助方法獲取輸入文件,并返回結果return new CompactResult(extractFilesFromSections(sections), writer.result());}}
// ... existing code ...

AbstractCompactRewriter?是一個典型的模板方法模式的應用。它定義了壓縮重寫算法的骨架:

  1. 提供了不變的部分upgrade?方法的通用實現和?close?的空實現。
  2. 提供了可復用的輔助功能extractFilesFromSections?靜態方法。
  3. 定義了易變的部分為抽象方法:將核心的?rewrite?邏輯延遲到子類中實現。

這種設計使得 Paimon 可以靈活地擴展和實現不同場景下的文件壓縮策略,同時保證了代碼的復用性和結構的清晰性。

MergeTreeCompactRewriter

MergeTreeCompactRewriter?是?AbstractCompactRewriter?的一個核心具體實現。在 Paimon 的合并樹(Merge-Tree)結構中,當需要對數據文件進行合并(Compaction)時,這個類提供了默認的、最基礎的重寫(rewrite)邏輯。

// ... existing code ...
import java.util.Comparator;
import java.util.List;/** Default {@link CompactRewriter} for merge trees. */
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
// ... existing code ...

從定義可以看出:

  • 它是一個公開類,位于?org.apache.paimon.mergetree.compact?包下。
  • 它繼承自我們之前分析過的?AbstractCompactRewriter。這意味著它自動獲得了?upgrade?方法的默認實現(僅更新元數據)和?extractFilesFromSections?輔助方法。
  • 它的核心職責是實現?AbstractCompactRewriter?中定義的抽象方法?rewrite

核心成員變量與構造函數

MergeTreeCompactRewriter?的行為由其成員變量(在構造時注入)決定,這體現了依賴注入的設計思想,使得類更加靈活和可測試。

// ... existing code ...
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {protected final FileReaderFactory<KeyValue> readerFactory;protected final KeyValueFileWriterFactory writerFactory;protected final Comparator<InternalRow> keyComparator;@Nullable protected final FieldsComparator userDefinedSeqComparator;protected final MergeFunctionFactory<KeyValue> mfFactory;protected final MergeSorter mergeSorter;public MergeTreeCompactRewriter(FileReaderFactory<KeyValue> readerFactory,KeyValueFileWriterFactory writerFactory,Comparator<InternalRow> keyComparator,@Nullable FieldsComparator userDefinedSeqComparator,MergeFunctionFactory<KeyValue> mfFactory,MergeSorter mergeSorter) {this.readerFactory = readerFactory;this.writerFactory = writerFactory;this.keyComparator = keyComparator;this.userDefinedSeqComparator = userDefinedSeqComparator;this.mfFactory = mfFactory;this.mergeSorter = mergeSorter;}
// ... existing code ...
  • readerFactory: 文件讀取器工廠,用于創建讀取輸入數據文件(SortedRun中的文件)的?RecordReader
  • writerFactory: 文件寫入器工廠,用于創建?RollingFileWriter,將合并后的數據寫入新的輸出文件。
  • keyComparator: 鍵比較器,用于在合并過程中對?KeyValue?的鍵(key)進行排序和比較。
  • userDefinedSeqComparator: 用戶定義的序列號比較器(可選)。Paimon 支持用戶自定義字段作為合并時的排序依據,這個比較器就用于此目的。
  • mfFactory: 合并函數工廠(MergeFunctionFactory),用于創建?MergeFunctionMergeFunction?定義了當遇到相同鍵(key)的多條記錄時,應如何合并它們。例如,可以是去重(Deduplicate)、部分更新(Partial Update)或聚合(Aggregation)等。
  • mergeSorter: 合并排序器,當待合并的數據量過大,無法完全在內存中進行時,MergeSorter?會利用外部排序(External Sort)來處理。

rewrite?方法

這是?CompactRewriter?接口的核心方法。MergeTreeCompactRewriter?的實現非常直接,它將調用轉發給了?rewriteCompaction?方法。這種設計模式允許子類在不改變公共API的情況下,更容易地重寫或擴展核心壓縮邏輯。

// ... existing code ...@Overridepublic CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {return rewriteCompaction(outputLevel, dropDelete, sections);}
// ... existing code ...

rewriteCompaction?方法

這是實際執行壓縮重寫邏輯的地方,是整個類的核心。

// ... existing code ...protected CompactResult rewriteCompaction(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {// 1. 創建一個滾動寫入器,用于寫入壓縮后的新文件RollingFileWriter<KeyValue, DataFileMeta> writer =writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);RecordReader<KeyValue> reader = null;Exception collectedExceptions = null;try {// 2. 創建一個合并讀取器,它會讀取所有輸入文件,并應用合并邏輯reader =readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create()));// 3. 如果需要,包裝一個 DropDeleteReader 來過濾掉刪除標記的記錄if (dropDelete) {reader = new DropDeleteReader(reader);}// 4. 執行讀寫:從 reader 讀取合并后的數據,寫入 writerwriter.write(new RecordReaderIterator<>(reader));} catch (Exception e) {collectedExceptions = e;} finally {// 5. 確保資源被關閉try {IOUtils.closeAll(reader, writer);} catch (Exception e) {collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);}}if (null != collectedExceptions) {// 6. 如果發生異常,終止寫入并拋出異常writer.abort();throw collectedExceptions;}// 7. 構造并返回 CompactResultList<DataFileMeta> before = extractFilesFromSections(sections);notifyRewriteCompactBefore(before);return new CompactResult(before, writer.result());}
// ... existing code ...

其執行流程可以分解為:

  1. 創建寫入器:通過?writerFactory?創建一個?RollingFileWriter,準備將合并后的數據寫入到指定?outputLevel?的新文件中。
  2. 創建讀取器:調用?readerForMergeTree?方法創建一個?RecordReader。這個 reader 是一個復合的 reader,它能夠同時讀取多個?SortedRun?中的所有文件,并使用?MergeFunction?在內存中或通過外部排序進行多路歸并。
  3. 處理刪除標記:如果?dropDelete?參數為?true(通常在對最高 level 進行 full compaction 時),則用?DropDeleteReader?包裝原始 reader。這個裝飾器會過濾掉類型為?DELETE?的記錄。
  4. 執行重寫writer.write(new RecordReaderIterator<>(reader))?是核心步驟。它驅動?reader?讀取、合并數據,并將結果逐條寫入?writer
  5. 資源管理與異常處理:使用?try-finally?確保?reader?和?writer?無論成功還是失敗都能被關閉。如果過程中發生異常,會收集起來。
  6. 失敗回滾:如果收集到異常,調用?writer.abort()?來刪除可能已產生的臨時文件,然后將異常拋出。
  7. 返回結果:如果成功,它會調用父類的?extractFilesFromSections?獲取所有輸入文件,并結合?writer.result()(所有新生成的輸出文件),封裝成一個?CompactResult?對象返回。在返回前,會調用一個空的?notifyRewriteCompactBefore?方法,這是一個為子類提供的擴展點。

readerForMergeTree?和?notifyRewriteCompactBefore

  • readerForMergeTree: 這是一個輔助方法,它封裝了創建合并樹讀取器的復雜性,將所有需要的組件(readerFactory,?keyComparator?等)傳遞給?MergeTreeReaders.readerForMergeTree?來構造最終的?RecordReader
  • notifyRewriteCompactBefore: 這是一個空的?protected?方法,充當一個鉤子(Hook)。子類可以重寫此方法來在壓縮完成、返回結果之前執行一些額外的邏輯。例如,LookupMergeTreeCompactRewriter?就重寫了此方法來清理與被壓縮文件相關的刪除向量(Deletion Vectors)。

在系統中的位置和擴展

  • MergeTreeCompactRewriter?是 Paimon 壓縮機制的基石。MergeTreeCompactManager?負責制定壓縮計劃,當它決定執行一個壓縮任務(MergeTreeCompactTask)時,就會使用一個?CompactRewriter?的實例來執行實際的文件讀寫和合并。
  • 這個類是可擴展的。Paimon 中更復雜的 Rewriter,如?ChangelogMergeTreeRewriter(用于生成 changelog)、LookupMergeTreeCompactRewriter(用于 lookup 表)和?FullChangelogMergeTreeCompactRewriter,都直接或間接地繼承自?MergeTreeCompactRewriter,并在其基礎上增加或修改功能。

總結

MergeTreeCompactRewriter?是 Paimon 合并樹 compaction 邏輯的一個標準、通用的實現。它清晰地展示了如何將多個已排序的文件(SortedRun)通過多路歸并、應用合并函數,最終重寫為新的、更緊湊的文件。其設計利用了依賴注入和模板方法/鉤子模式,具有良好的靈活性和擴展性,為更高級的壓縮策略提供了堅實的基礎。

為了更清晰地理解,梳理從頂層到底層的核心調用鏈:

  1. MergeTreeCompactRewriter.readerForMergeTree
  2. MergeTreeReaders.readerForMergeTree
    • 調用?->?ConcatRecordReader.create?將多個?section?的 reader 串聯起來。
    • 為每個 section 調用?->?MergeTreeReaders.readerForSection
  3. MergeTreeReaders.readerForSection
    • 調用?->?MergeSorter.mergeSort?對一個?section?內的所有?SortedRun?進行合并排序。
    • 為每個 SortedRun 調用?->?MergeTreeReaders.readerForRun
  4. MergeTreeReaders.readerForRun
    • 調用?->?ConcatRecordReader.create?將一個?SortedRun?內的所有文件串聯起來。
  5. MergeSorter.mergeSort
    • 調用?->?SortMergeReader.createSortMergeReader?(當數據無需溢出到磁盤時)
  6. SortMergeReader.createSortMergeReader
    • 創建?->?SortMergeReaderWithMinHeap?或?SortMergeReaderWithLoserTree?(最終執行多路歸并的 Reader)

SortMergeReaderWithMinHeap?和?SortMergeReaderWithLoserTree?是兩種經典的多路歸并排序算法的實現。它們接收多個已經排好序的?RecordReader?作為輸入,通過內部的數據結構(最小堆或敗者樹)高效地從所有輸入中找出當前最小的記錄。當遇到主鍵相同的記錄時,它們會調用?MergeFunctionWrapper?來執行用戶定義的合并邏輯(如去重、更新等)。

    IntervalPartition

    IntervalPartition?是 Paimon 在執行 Merge-Tree 壓縮(Compaction)時一個非常核心的工具類。它的主要目標是將一組給定的數據文件(DataFileMeta)進行高效地分組,以便后續進行歸并排序。這個分組算法非常關鍵,因為它直接影響了壓縮任務的并行度和效率。

    從類的注釋中我們可以看到,它的目的是:

    Algorithm to partition several data files into the minimum number of {@link SortedRun}s.

    即:將多個數據文件劃分為最少數目的?SortedRun?的算法

    這里的?SortedRun?代表一組按主鍵有序且鍵范圍互不重疊的文件集合。將文件劃分為最少的?SortedRun?意味著我們可以用最少的歸并路數來完成排序,從而提高效率。

    為了實現這個目標,IntervalPartition?采用了兩層劃分的策略:

    • 第一層:Section(分段)

      • 它首先將所有輸入文件按照主鍵范圍(Key Interval)劃分為若干個?Section
      • 不同 Section 之間的主鍵范圍是完全不重疊的
      • 這樣做的好處是,不同 Section 之間的數據沒有交集,因此可以獨立、并行地進行處理,而不會相互影響。這極大地提高了壓縮的并行能力。
    • 第二層:SortedRun(有序運行)

      • 在每個 Section 內部,文件的主鍵范圍是可能相互重疊的。
      • 算法的目標是在這個 Section 內,將這些可能重疊的文件,組合成最少數目的?SortedRun
      • 每個?SortedRun?內部的文件,按主鍵有序排列,并且它們的主鍵范圍不會重疊。

    最終,IntervalPartition?的輸出是一個二維列表?List<List<SortedRun>>,外層列表代表 Section,內層列表代表該 Section 內劃分出的所有?SortedRun

    構造函數與初始化

    // ... existing code ...public IntervalPartition(List<DataFileMeta> inputFiles, Comparator<InternalRow> keyComparator) {this.files = new ArrayList<>(inputFiles);this.files.sort((o1, o2) -> {int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());return leftResult == 0? keyComparator.compare(o1.maxKey(), o2.maxKey()): leftResult;});this.keyComparator = keyComparator;}
    // ... existing code ...
    

    構造函數做了非常重要的一步預處理: 對所有輸入的?DataFileMeta?文件,首先按照它們的?minKey(最小主鍵)進行升序排序。如果?minKey?相同,則再按照?maxKey(最大主鍵)進行升序排序。

    這個排序是后續所有劃分算法的基礎,保證了文件是按照主鍵范圍的起始位置被依次處理的。

    partition()?方法:第一層劃分(切分 Section)

    這是該類的入口方法,負責將已排序的文件切分成多個 Section。

    // ... existing code ...public List<List<SortedRun>> partition() {List<List<SortedRun>> result = new ArrayList<>();List<DataFileMeta> section = new ArrayList<>();BinaryRow bound = null;for (DataFileMeta meta : files) {if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {// larger than current right bound, conclude current section and create a new oneresult.add(partition(section));section.clear();bound = null;}section.add(meta);if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {// update right boundbound = meta.maxKey();}}if (!section.isEmpty()) {// conclude last sectionresult.add(partition(section));}return result;}
    // ... existing code ...
    

    其邏輯如下:

    1. 維護一個當前?section?的文件列表和一個當前?section?的右邊界?bound(即?section?中所有文件的?maxKey?的最大值)。
    2. 遍歷所有(已按?minKey?排序的)文件:
      • 將當前文件?meta?加入?section
      • 更新?bound?為當前?section?中所有文件?maxKey?的最大值。
      • 核心判斷:在添加下一個文件之前,檢查它的?minKey?是否大于當前?section?的右邊界?bound
      • 如果?meta.minKey() > bound,說明這個新文件與當前?section?內的所有文件都沒有主鍵范圍的重疊。這意味著一個 Section 到此結束了。
      • 此時,就將當前?section?里的文件列表交給?partition(section)?方法(第二層劃分)處理,得到?List<SortedRun>,然后加入最終結果?result
      • 清空?section?和?bound,開始一個新的 Section。
    3. 循環結束后,處理最后一個?section

    通過這種方式,它有效地將所有文件切分成了若干個主鍵范圍互不重疊的 Section。

    partition(List<DataFileMeta> metas)?方法:第二層劃分(貪心算法生成 SortedRun)

    這個私有方法是算法的精髓所在。它接收一個 Section 內的文件列表(這些文件的主鍵范圍可能重疊),目標是將它們劃分為最少的?SortedRun

    這里使用了一個經典的貪心算法,并借助優先隊列(最小堆)?來實現。

    // ... existing code ...private List<SortedRun> partition(List<DataFileMeta> metas) {PriorityQueue<List<DataFileMeta>> queue =new PriorityQueue<>((o1, o2) ->// sort by max key of the last data filekeyComparator.compare(o1.get(o1.size() - 1).maxKey(),o2.get(o2.size() - 1).maxKey()));// create the initial partitionList<DataFileMeta> firstRun = new ArrayList<>();firstRun.add(metas.get(0));queue.add(firstRun);for (int i = 1; i < metas.size(); i++) {DataFileMeta meta = metas.get(i);// any file list whose max key < meta.minKey() is sufficient,// for convenience we pick the smallestList<DataFileMeta> top = queue.poll();if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) {// append current file to an existing partitiontop.add(meta);} else {// create a new partitionList<DataFileMeta> newRun = new ArrayList<>();newRun.add(meta);queue.add(newRun);}queue.add(top);}// order between partitions does not matterreturn queue.stream().map(SortedRun::fromSorted).collect(Collectors.toList());}
    }
    

    邏輯分解如下:

    1. 創建一個優先隊列?queue。隊列中的元素是?List<DataFileMeta>,代表一個正在構建中的?SortedRun
    2. 這個優先隊列的排序規則是:按照每個?SortedRun?中最后一個文件的?maxKey?進行升序排序。這意味著,隊首的?SortedRun?是當前所有?SortedRun?中,右邊界最小的那個。
    3. 將 Section 中的第一個文件放入一個新的?SortedRun,并加入隊列。
    4. 遍歷 Section 中剩余的文件?meta
      • 從優先隊列中取出隊首元素?top(即右邊界最小的那個?SortedRun)。
      • 核心判斷:比較當前文件?meta?的?minKey?和?top?的?maxKey
      • 如果?meta.minKey()?大于?top.get(top.size() - 1).maxKey(),說明?meta?文件可以安全地追加到?top?這個?SortedRun?的末尾,而不會破壞其內部的有序性(因為?meta?的范圍在?top?之后)。于是,將?meta?添加到?top?中。
      • 如果?meta.minKey()?不大于?top?的?maxKey,說明?meta?與?top?這個?SortedRun?的范圍有重疊,不能追加。此時,必須為?meta?創建一個新的?SortedRun,并將這個新的?SortedRun?也加入到優先隊列中。
      • 最后,將被修改過(或者未修改)的?top?重新放回優先隊列。
    5. 遍歷結束后,優先隊列?queue?中剩下的所有?List<DataFileMeta>?就是劃分好的、數量最少的?SortedRun?集合。

    這個貪心策略的正確性在于,對于每一個新來的文件,我們總是嘗試將它追加到最早可以結束的那個?SortedRun?后面。這樣可以最大限度地復用現有的?SortedRun,從而保證最終?SortedRun?的數量最少。這在算法上被稱為“區間劃分問題”的一種經典解法。

    總結

    IntervalPartition?通過一個兩階段的劃分過程,巧妙地解決了如何高效組織待壓縮文件的問題:

    1. 宏觀上,通過?partition()?方法按主鍵范圍切分出互不重疊的?Section,為并行處理創造了條件。
    2. 微觀上,在每個?Section?內部,通過?partition(List<DataFileMeta> metas)?方法中的貪心算法,將可能重疊的文件高效地組織成最少數目的?SortedRun,為后續的歸并排序(Merge-Sort)提供了最優的輸入,減少了歸并的復雜度。

    這個類的設計體現了在數據密集型系統中,通過精巧的算法設計來優化核心流程(如Compaction)的典型思路。

    MergeTreeCompactManager

    MergeTreeCompactManager?是 Paimon 中?merge-tree?核心寫流程的心臟,它負責管理和調度數據文件的合并(Compaction)任務。下面我將從幾個關鍵部分來解析這個類。

    MergeTreeCompactManager?繼承自?CompactFutureManager,它的核心職責是:

    • 管理數據文件:通過內部的?Levels?對象,維護所有數據文件(DataFileMeta)在不同層級(Level)的分布情況。
    • 觸發合并任務:根據預設的合并策略(CompactStrategy),決定何時以及哪些文件需要進行合并。
    • 提交和管理合并任務:將選出的文件打包成一個合并單元(CompactUnit),并提交給一個異步執行的?MergeTreeCompactTask?任務。
    • 處理合并結果:獲取異步任務的執行結果(CompactResult),并用它來更新?Levels?中文件的狀態。
    • 控制寫入流速:通過判斷當前文件堆積情況,決定是否需要阻塞上游的寫入(Write)操作,防止因合并速度跟不上寫入速度而導致系統不穩定。

    核心成員變量 (Key Fields)

    我們來看一下這個類中最重要的幾個成員變量,它們定義了?MergeTreeCompactManager?的行為:

    // ... existing code ...
    public class MergeTreeCompactManager extends CompactFutureManager {private static final Logger LOG = LoggerFactory.getLogger(MergeTreeCompactManager.class);private final ExecutorService executor;private final Levels levels;private final CompactStrategy strategy;private final Comparator<InternalRow> keyComparator;private final long compactionFileSize;private final int numSortedRunStopTrigger;private final CompactRewriter rewriter;@Nullable private final CompactionMetrics.Reporter metricsReporter;@Nullable private final DeletionVectorsMaintainer dvMaintainer;private final boolean lazyGenDeletionFile;private final boolean needLookup;@Nullable private final RecordLevelExpire recordLevelExpire;
    // ... existing code ...
    
    • executor: 一個?ExecutorService?線程池,用于異步執行合并任務。
    • levels:?Levels?對象,是 LSM-Tree(Log-Structured Merge-Tree)分層結構的核心體現。它管理著所有的數據文件,并根據 Level 組織它們。
    • strategy:?CompactStrategy,合并策略。它定義了如何從?Levels?中挑選文件進行合并。Paimon 提供了如?UniversalCompaction?和?LevelCompaction?等策略。
    • keyComparator: 主鍵的比較器,用于在合并過程中對數據進行排序。
    • compactionFileSize: 合并后生成的目標文件大小。
    • numSortedRunStopTrigger: 一個非常重要的閾值。當?Levels?中的有序文件片段(Sorted Run)數量超過這個值時,會阻塞寫入操作,等待合并完成。這是控制寫入和合并速度平衡的關鍵。
    • rewriter:?CompactRewriter,合并重寫器。它負責讀取待合并的舊文件,使用?MergeFunction?對數據進行合并,然后寫入新文件。這是實際執行數據合并邏輯的組件。
    • needLookup: 一個布爾值,通常與?changelog-producer?=?lookup?配置相關。當為?true?時,表示在合并時需要通過 lookup 方式生成 changelog。

    triggerCompaction(boolean fullCompaction): 觸發合并

    這是發起合并的入口。

    // ... existing code ...@Overridepublic void triggerCompaction(boolean fullCompaction) {Optional<CompactUnit> optionalUnit;List<LevelSortedRun> runs = levels.levelSortedRuns();if (fullCompaction) {// ... 處理強制全量合并 ...optionalUnit =CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs, recordLevelExpire);} else {if (taskFuture != null) {return;}// ...optionalUnit =strategy.pick(levels.numberOfLevels(), runs).filter(unit -> unit.files().size() > 0).filter(unit ->unit.files().size() > 1|| unit.files().get(0).level()!= unit.outputLevel());}optionalUnit.ifPresent(unit -> {// ...boolean dropDelete =unit.outputLevel() != 0&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()|| dvMaintainer != null);// ...submitCompaction(unit, dropDelete);});}
    // ... existing code ...
    
    • fullCompaction?參數: 如果為?true,會觸發一次全量合并,通常是將所有文件合并成一個或少數幾個大文件。這是一種比較重的操作,一般由用戶手動觸發。
    • 普通合并: 如果?fullCompaction?為?false,則會調用?strategy.pick(...)?方法,讓合并策略根據當前的?Levels?狀態來決定是否需要合并以及合并哪些文件。
    • 提交任務: 如果策略選出了需要合并的文件(CompactUnit),就會調用?submitCompaction?方法將任務提交到線程池。

    submitCompaction(CompactUnit unit, boolean dropDelete): 提交合并任務

    這個方法負責創建并提交一個真正的合并任務。

    // ... existing code ...private void submitCompaction(CompactUnit unit, boolean dropDelete) {// ...MergeTreeCompactTask task =new MergeTreeCompactTask(keyComparator,compactionFileSize,rewriter,unit,dropDelete,levels.maxLevel(),metricsReporter,compactDfSupplier,recordLevelExpire);// ...taskFuture = executor.submit(task);// ...}
    // ... existing code ...
    

    它將所有需要的組件(如?rewriterkeyComparator?等)和待合并的文件(unit)打包成一個?MergeTreeCompactTask?對象,然后通過?executor.submit(task)?提交給線程池異步執行。

    getCompactionResult(boolean blocking): 獲取合并結果

    當外部(通常是?MergeTreeWriter)需要獲取合并結果時,會調用此方法。

    // ... existing code .../** Finish current task, and update result files to {@link Levels}. */@Overridepublic Optional<CompactResult> getCompactionResult(boolean blocking)throws ExecutionException, InterruptedException {Optional<CompactResult> result = innerGetCompactionResult(blocking);result.ifPresent(r -> {// ...levels.update(r.before(), r.after());MetricUtils.safeCall(this::reportMetrics, LOG);// ...});return result;}
    // ... existing code ...
    
    • 它會從?taskFuture?中獲取任務結果?CompactResultCompactResult?中包含了合并前的文件列表(before)和合并后生成的新文件列表(after)。
    • 最關鍵的一步是?levels.update(r.before(), r.after()),它用新生成的文件替換掉了舊的、已被合并的文件,從而更新了整個?Levels?的狀態。

    shouldWaitForLatestCompaction()?和?compactNotCompleted(): 控制寫入流速

    這兩個方法是 Paimon 實現反壓(Back-Pressure)機制的關鍵。

    // ... existing code ...@Overridepublic boolean shouldWaitForLatestCompaction() {return levels.numberOfSortedRuns() > numSortedRunStopTrigger;}// ... existing code ...@Overridepublic boolean compactNotCompleted() {// If it is a lookup compaction, we should ensure that all level 0 files are consumed, so// here we need to make the outside think that we still need to do unfinished compact// workingreturn super.compactNotCompleted() || (needLookup && !levels().level0().isEmpty());}
    // ... existing code ...
    
    • shouldWaitForLatestCompaction(): 當?Levels?中?SortedRun?的數量超過了?numSortedRunStopTrigger?閾值時,返回?trueMergeTreeWriter?會根據這個返回值決定是否要阻塞寫入,等待合并完成,以釋放一些?SortedRun
    • compactNotCompleted(): 判斷是否還有未完成的合并工作。特別地,如果配置了?needLookup,即使沒有正在運行的合并任務,只要 Level 0 還存在文件,它也會返回?true,以確保 Level 0 的文件能被盡快合并掉,這對于?lookup?生成 changelog 的正確性至關重要。

    總結

    MergeTreeCompactManager?是一個典型的生產者-消費者模型實現。MergeTreeWriter?是生產者,不斷產生新的 L0 層文件;MergeTreeCompactManager?內部的?MergeTreeCompactTask?是消費者,不斷地將多層級的文件合并成更少、更大的文件。

    它通過?Levels?來感知系統狀態,通過?CompactStrategy?來制定決策,通過?CompactRewriter?和?MergeTreeCompactTask?來執行任務,并通過?numSortedRunStopTrigger?等機制來協調與上游生產者的速度,最終保證了整個?merge-tree?寫入模型的穩定和高效。

      Paimon 進行 Compact 是不是就是保留最新的 Key?

      Paimon 的 Compact 行為取決于在表上配置的?merge-engine(合并引擎)。保留最新的 Key 只是其中一種最常見的默認行為。

      Paimon 支持多種合并引擎,來滿足不同的業務場景:

      • deduplicate?(默認值): 這是你提到的“保留最新的 Key”的行為。當多條記錄有相同的主鍵時,在合并(Compaction)過程中,Paimon 只會保留序列號(sequence number)最大的一條記錄,即最新的記錄。
      • partial-update: 部分更新引擎。它允許你只更新一行的某些字段。合并時,它會將同一主鍵的多條記錄的字段進行合并,而不是簡單地保留最新的一整行。對于未更新的字段,會保留其舊值。
      • aggregation: 聚合引擎。它可以在數據寫入過程中,根據預設的聚合函數(如?SUM,?MAX,?MIN,?COUNT?等)對同一主鍵的記錄進行實時聚合。
      • first-row: 保留第一行的引擎。當有相同主鍵的多條數據時,它會保留遇到的第一條數據,后續的數據會被忽略。這在需要按主鍵去重的流式數據入湖場景中很有用。

      這些不同的合并邏輯由不同的?MergeFunction?實現類來處理,例如?DeduplicateMergeFunctionPartialUpdateMergeFunction?和?AggregateMergeFunction

      // ... existing code ...
      public class AggregateMergeFunction implements MergeFunction<KeyValue> {private final InternalRow.FieldGetter[] getters;private final FieldAggregator[] aggregators;
      // ... existing code ...@Overridepublic void add(KeyValue kv) {
      // ... existing code ...for (int i = 0; i < getters.length; i++) {aggregators[i].agg(row, getters[i].getFieldOrNull(value));}}@Overridepublic KeyValue getResult() {
      // ... existing code ...}
      // ... existing code ...
      }
      

      從上面的?AggregateMergeFunction?可以看到,它的?add?方法是在對字段進行聚合(agg),而不是簡單地替換。

      MergeFunction?定義了具體的數據?合并邏輯(比如是去重、聚合還是部分更新),而?MergeFunctionWrapper?則是在?MergeFunction?的基礎上增加了一層?“包裝”,用于處理更復雜的場景,主要是為了?優化性能?和?生成 Changelog

      不同的?MergeFunctionWrapper?實現是為了支持 Paimon 不同的 Changelog 生成策略 (changelog-producer) 和合并場景:

      1. ReducerMergeFunctionWrapper?是一個基礎的、帶有優化的包裝器。從它的文檔和實現可以看出,當一個 Key 只有一條記錄需要合并時,它會直接返回這條記錄,避免了調用內部?MergeFunction?的開銷。它主要用于不需要生成 Changelog 或采用簡單合并策略的場景,只返回合并后的最終結果?KeyValue

        /** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.mergetree.compact;import org.apache.paimon.KeyValue;/*** Wrapper for {@link MergeFunction}s which works like a reducer.** <p>A reducer is a type of function. If there is only one input the result is equal to that input;* Otherwise the result is calculated by merging all the inputs in some way.** <p>This wrapper optimize the wrapped {@link MergeFunction}. If there is only one input, the input* will be stored and the inner merge function will not be called, thus saving some computing time.*/
        public class ReducerMergeFunctionWrapper implements MergeFunctionWrapper<KeyValue> {private final MergeFunction<KeyValue> mergeFunction;private KeyValue initialKv;private boolean isInitialized;public ReducerMergeFunctionWrapper(MergeFunction<KeyValue> mergeFunction) {this.mergeFunction = mergeFunction;}/** Resets the {@link MergeFunction} helper to its default state. */@Overridepublic void reset() {initialKv = null;mergeFunction.reset();isInitialized = false;}/** Adds the given {@link KeyValue} to the {@link MergeFunction} helper. */@Overridepublic void add(KeyValue kv) {if (initialKv == null) {initialKv = kv;} else {if (!isInitialized) {merge(initialKv);isInitialized = true;}merge(kv);}}private void merge(KeyValue kv) {mergeFunction.add(kv);}/** Get current value of the {@link MergeFunction} helper. */@Overridepublic KeyValue getResult() {return isInitialized ? mergeFunction.getResult() : initialKv;}
        }
        
      2. FullChangelogMergeFunctionWrapper: 用于?changelog-producer?=?full-compaction?模式。在這種模式下,Paimon 會在最高層(maxLevel)的文件中查找舊值,并與當前合并的結果進行比較,從而生成?INSERT,?UPDATE_BEFORE,?UPDATE_AFTER,?DELETE?這樣的完整 Changelog。這個 Wrapper 的職責就是協調這個過程。

      3. LookupChangelogMergeFunctionWrapper: 用于?changelog-producer?=?lookup?模式。它會在合并前通過?lookup?的方式去查找舊的記錄,從而生成 Changelog。相比?full-compaction,它可能更高效,因為它不需要總是訪問最高層的文件。

      4. FirstRowMergeFunctionWrapper: 這是一個專為?first-row?合并引擎設計的包裝器,用于處理其獨特的 Changelog 生成邏輯。

      總結一下:

      • MergeFunction?決定了數據?如何合并(What)。
      • MergeFunctionWrapper?決定了合并過程?如何被執行和協調(How),特別是如何與不同的 Changelog 生成策略相結合,并進行性能優化。

      本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
      如若轉載,請注明出處:http://www.pswp.cn/bicheng/88317.shtml
      繁體地址,請注明出處:http://hk.pswp.cn/bicheng/88317.shtml
      英文地址,請注明出處:http://en.pswp.cn/bicheng/88317.shtml

      如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

      相關文章

      小米路由器3C刷OpenWrt,更換系統/變磚恢復 指南

      基礎篇看這里&#xff1a; 小米路由器3C如何安裝OpenWrt官方編譯的ROM - 嗶哩嗶哩 小米路由器 3C 刷入 Breed 和 OpenWrt - Snoopy1866 - 博客園 一、路由器注入 如果按照上面的文章&#xff0c; telnet、ftp一直連接失敗,那么可以嘗試看 這里&#xff1a; 獲取路由器root權…

      Spring Boot 項目啟動時按需初始化加載數據

      1、新建類&#xff0c;類上添加注解 Component &#xff0c;該類用于在項目啟動時處理數據加載任務&#xff1b; 2、該類實現 ApplicationRunner 接口&#xff0c;并重寫 run 方法&#xff1b; 3、在重寫的 run 方法里處理數據加載任務&#xff1b; 注意&#xff1a; 有定時加載…

      MCP快速入門—快速構建自己的服務器

      引言 隨著大語言模型(LLM)技術的快速發展&#xff0c;如何擴展其能力邊界成為開發者關注的重點。MCP(Model Capability Protocol)作為一種協議標準&#xff0c;允許開發者構建自定義服務器來增強LLM的功能。 正文內容 1. MCP核心概念與技術背景 MCP服務器主要提供三種能力類…

      Vue 事件總線深度解析:從實現原理到工程實踐

      在 Vue 組件通信體系中&#xff0c;事件總線&#xff08;Event Bus&#xff09;是處理非父子組件通信的輕量解決方案。本文將從技術實現細節、工程化實踐、內存管理等維度展開&#xff0c;結合源碼級分析與典型場景&#xff0c;帶你全面掌握這一核心技術點。?一、事件總線的技…

      CMake Qt靜態庫中配置qrc并使用

      CMake Qt序言環境代碼序言 看網上這資料較少&#xff0c;且我理解起來有歧義&#xff0c;特地補充 環境 CMake&#xff1a;3.29.2 Qt&#xff1a;5.15.2 MSVC&#xff1a;2022 IDE&#xff1a;QtCreator 代碼 方式一&#xff1a; 在CMakeLists.txt里&#xff0c;add_libr…

      記錄一下:成功部署k8s集群(部分)

      前提條件&#xff1a;安裝了containerd、docker 關閉了firewalld、selinux 配置了時間同步服務 chronyd 關閉swap分區等1、在控制節點、工作節點&#xff0c;安裝kubelet、kubeadm、kubectlyum install -y kubelet-1.26.0 kubeadm-1.26.0 kubectl-1.26.0 …

      Idea如何解決包沖突

      Idea如何解決包沖突1.Error信息&#xff1a;JAR列表。 在掃描期間跳過不需要的JAR可以縮短啟動時間和JSP編譯時間。SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/E:/javapojects/stww-v4-gjtwt-seal/target/stww--v4-platform-proj…

      python 協程學習筆記

      目錄 python 協程 通俗理解 Python 的 asyncio 協程&#xff0c;最擅長的是&#xff1a; 批量下載文件的例子&#xff1a; 協程的優勢&#xff1a; python 協程 通俗理解 def my_coroutine():print("開始")x yield 1print("拿到了&#xff1a;", x)yi…

      【學習筆記】蒙特卡洛仿真與matlab實現

      概述 20 世紀 40 年代&#xff0c;由于電子計算機的出現&#xff0c; 借助計算機可以實現大量的隨機抽樣試驗&#xff0c;為利用隨機試驗方法解決實際問題提供了便捷。 非常具代表性的例子是&#xff0c; 美國在第二次世界大戰期間研制原子彈的“曼哈頓計劃”中&#xff0c;為了…

      HTTP/3.x協議詳解:基于QUIC的下一代Web傳輸協議

      一、HTTP/3協議概述 HTTP/3是超文本傳輸協議&#xff08;HTTP&#xff09;的第三個正式版本&#xff0c;由IETF&#xff08;互聯網工程任務組&#xff09;于2022年正式標準化&#xff08;RFC 9114&#xff09;。其核心創新在于完全基于QUIC協議替代傳統TCP&#xff0c;結合UDP…

      【SQL】使用UPDATE修改表字段的時候,遇到1054 或者1064的問題怎么辦?

      我在使用python連接sql修改表格的時間字段的時候&#xff0c;遇到這樣一個問題&#xff1a;ProgrammingError: (pymysql.err.ProgrammingError) (1064, “You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the ri…

      【字節跳動】數據挖掘面試題0013:怎么做男女二分類問題, 從抖音 app 提供的內容中。

      文章大綱 ?? 一、問題定義與數據基礎數據源及預處理:?? 二、特征工程方案1. 文本特征2. 視覺特征3. 音頻與行為特征4. 上下文特征?? 三、模型選型與訓練1. 基礎模型對比2. 多模態融合模型3. 訓練技巧?? 四、評估與優化策略1. 評估指標2. 典型問題優化3. 算法偏差控制?…

      HTTP請求走私漏洞

      一、漏洞定義與核心原理HTTP請求走私&#xff08;HTTP Request Smuggling&#xff09;是一種利用前端服務器&#xff08;如代理、負載均衡器&#xff09;與后端服務器在解析HTTP請求時的不一致性&#xff0c;繞過安全機制并執行惡意操作的攻擊技術。其核心在于混淆請求邊界&…

      Javaweb - 10.1 Servlet

      目錄 Servlet 簡介 動態資源和靜態資源 Servlet 簡介 Servlet 開發流程 目標 開發過程 開發一個 web 類型的 module 開發一個 form 表單 開發一個 UserServlet 在 web..xml 為 userServlet 配置請求路徑 Edit Configurations 啟動項目 完&#xff01; Servlet 簡介…

      手機能用酒精擦嗎?

      對于電視、電腦屏幕來說&#xff0c;為了避免反光、改善顯示效果&#xff0c;會在屏幕表面覆上一層“抗反射涂層”。不同廠商設計的涂層材料并不相同&#xff0c;酒精作為良好的溶劑&#xff0c;確實會損壞可溶的涂層。手機作為觸控產品&#xff0c;通常會在屏幕表面增加“疏水…

      【圖像處理基石】圖像超分辨率有哪些研究進展值得關注?

      近年來&#xff0c;圖像超分辨率&#xff08;SR&#xff09;領域在深度學習技術的推動下取得了顯著進展&#xff0c;尤其在模型架構優化、計算效率提升和真實場景適應性等方面涌現出諸多創新。以下是基于最新研究的核心進展梳理&#xff1a; 一、高效大圖像處理&#xff1a;像素…

      Windows系統下WSL從C盤遷移方案

      原因&#xff1a;一開始裝WSL的時候放在了C盤&#xff0c;這下好了&#xff0c;跑了幾個深度學習模型訓練后&#xff0c;C盤快滿了&#xff0c;這可怎么辦&#xff1f;可愁壞了。沒關系&#xff0c;山人自有妙計。我們將WSL遷移到D盤或者E盤呀。一.遷移操作步驟前期準備&#x…

      金融時間序列機器學習訓練前的數據格式驗證系統設計與實現

      金融時間序列機器學習訓練前的數據格式驗證系統設計與實現 前言 在機器學習項目中&#xff0c;數據質量是決定模型成功的關鍵因素。特別是在金融時間序列分析領域&#xff0c;原始數據往往需要經過復雜的預處理才能用于模型訓練。本文將詳細介紹一個完整的數據格式驗證系統&…

      cocos2dx3.x項目升級到xcode15以上的iconv與duplicate symbols報錯問題

      cocos2dx3.x項目升級xcode15以上后會有幾處報錯。1. CCFontAtlas.cpp文件下的iconv與iconv_close的報錯。修改如下&#xff1a;// iconv_close(_iconv);iconv_close((iconv_t)_iconv);iconv((iconv_t)_iconv, (char**)&pin, &inLen, &pout, &outLen); /…

      HTTP/3.0的連接遷移使用連接ID來標識連接為什么可以做到連接不會中斷

      一定要結合圖文一起理解&#xff01;&#xff01; 文章目錄文字描述傳統方式&#xff1a;HTTP/2 基于 TCP 的連接&#xff08;就像打固定電話&#xff09;HTTP/3 基于 QUIC 的連接遷移&#xff08;就像用帶“通話ID”的手機&#xff09;總結一下圖文詳解HTTP2.0傳統方式&#x…