壓縮怎么進行的
這里的操作都是KValue,內部有row kind,標記了刪除和插入
MergeTreeCompactManager
?是 Paimon 中 Merge-Tree 結構壓縮任務的總調度中心。它的核心職責就是監控文件的層級狀態(Levels
),并在合適的時機,根據預設的策略,挑選文件并發起壓縮任務。
triggerCompaction
?方法正是這個調度中心的核心入口。下面我們來詳細拆解它的邏輯和涉及的策略。
這個方法有兩個主要的執行路徑,由?fullCompaction
?參數控制:
fullCompaction = true
: 強制執行一次“完全合并”。通常由用戶手動觸發(比如通過?ALTER TABLE ... COMPACT
?命令)。它會嘗試將所有數據文件合并成盡可能少的文件。fullCompaction = false
: 執行一次“常規合并”。這是系統在日常寫入過程中自動觸發的,目的是維持 LSM-Tree 的健康結構,防止某一層級文件過多,影響讀取性能。
// ... existing code ...@Overridepublic void triggerCompaction(boolean fullCompaction) {Optional<CompactUnit> optionalUnit;// 1. 獲取當前所有層級的 SortedRun 視圖List<LevelSortedRun> runs = levels.levelSortedRuns();if (fullCompaction) {// ... 省略日志和檢查 ...// 2a. 如果是完全合并,調用 FullCompaction 策略optionalUnit =CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs, recordLevelExpire);} else {// ... 省略日志和檢查 ...// 2b. 如果是常規合并,調用配置的 strategy (如 UniversalCompaction)optionalUnit =strategy.pick(levels.numberOfLevels(), runs).filter(unit -> unit.files().size() > 0).filter(unit ->unit.files().size() > 1|| unit.files().get(0).level()!= unit.outputLevel());}// 3. 如果策略選出了需要合并的文件單元 (CompactUnit)optionalUnit.ifPresent(unit -> {// 4. 決定是否可以丟棄刪除標記 (dropDelete)boolean dropDelete =unit.outputLevel() != 0&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()|| dvMaintainer != null);// ... 省略日志 ...// 5. 提交壓縮任務submitCompaction(unit, dropDelete);});}
// ... existing code ...
完全合并策略 (CompactStrategy.pickFullCompaction
)
當?fullCompaction
?為?true
?時,會調用?CompactStrategy.pickFullCompaction
。這個策略非常直接:
- 目標:將所有層級的所有文件合并到最高層級(
maxLevel
)。 - 行為:它會收集?
levels
?中的所有?DataFileMeta
,將它們打包成一個巨大的?CompactUnit
,并將?outputLevel
?設置為?levels.maxLevel()
。 - 應用場景:
- 用戶希望整理數據碎片,減少文件總數,優化后續的查詢性能。
- 清理過期數據(如果配置了?
recordLevelExpire
)。因為只有在最高層級的合并才能確保數據不會再被舊版本覆蓋,從而安全地物理刪除。
常規合并策略 (strategy.pick
)
當?fullCompaction
?為?false
?時,會調用構造函數中傳入的?strategy
?實例的?pick
?方法。在 Paimon 中,最常用的常規策略是?UniversalCompaction
。
UniversalCompaction
?策略模擬了 RocksDB 的 Universal Compaction Style,其核心思想是:
- 目標:維持一個健康的、層級分明的文件結構,避免 Level 0 文件堆積過多,同時控制寫放大和空間放大。
- 觸發條件:
- Sorted Run 數量閾值:當總的?
SortedRun
?數量(Level 0 每個文件算一個 run,其他 level 每層算一個 run)超過?num-sorted-run.compaction-trigger
?配置時,會觸發合并。這是最主要的觸發條件。 - 空間放大閾值:當所有文件的總大小,遠大于最高層級文件的大小時(超過?
max-size-amplification-percent
?配置的比例),也會觸發合并,以回收空間。
- Sorted Run 數量閾值:當總的?
- 挑選文件的邏輯:
- 它會從 Level 0 開始,向上檢查,找到第一個滿足合并條件的層級。
- 通常,它會選擇將 Level 0 的所有文件合并到 Level 1,或者將 Level?
i
?的所有文件合并到 Level?i+1
。 - 它會盡量選擇相鄰的、大小相似的?
SortedRun
?進行合并,以達到最優效率。
filter
?的作用: 在?triggerCompaction
?方法中,strategy.pick
?的結果后面跟了兩個?.filter(...)
?調用。.filter(unit -> unit.files().size() > 0)
: 確保選出的壓縮單元不是空的。.filter(unit -> unit.files().size() > 1 || unit.files().get(0).level() != unit.outputLevel())
: 這是一個重要的優化。它排除了“只有一個文件,并且輸出層級和當前層級相同”的情況。這種情況意味著文件只是被“升級”(upgrade),而沒有實際的合并重寫,這通常在?MergeTreeCompactTask
?內部處理更高效,無需作為一個獨立的壓縮任務提交。
dropDelete
?策略
在確定了要合并的文件后,代碼會計算一個?dropDelete
?布爾值。
- 含義:
dropDelete
?為?true
?表示在合并重寫數據時,可以直接丟棄掉類型為?DELETE
?的記錄。 - 條件:
unit.outputLevel() != 0
: 輸出層級不能是 Level 0。因為 Level 0 可能還有其他更老的文件沒有參與本次合并,如果丟棄了刪除標記,可能會導致本該被刪除的數據重新“復活”。unit.outputLevel() >= levels.nonEmptyHighestLevel()
: 輸出層級必須是當前數據存在的最高層級或更高。這確保了合并產生的新文件是“最老”的,不存在比它更老的數據了,因此刪除標記可以被安全地物理移除。dvMaintainer != null
: 如果表開啟了刪除向量(Deletion Vectors),則邏輯會有所不同,通常也可以安全地處理刪除。
總結
MergeTreeCompactManager
?的?triggerCompaction
?方法是一個精密的調度器,它通過?levels
?對象感知整個表的物理文件狀態,并執行以下流程:
- 獲取狀態:從?
levels
?中獲取所有文件的層級和分布信息(levelSortedRuns
)。 - 策略決策:
- 如果是手動觸發的全量合并,則使用?
pickFullCompaction
?策略,將所有文件打包,目標是合并到最高層。 - 如果是自動觸發的常規合并,則使用?
UniversalCompaction
?等策略,根據?SortedRun
?數量、空間放大等指標,智能地選擇一部分文件進行層級推進式的合并。
- 如果是手動觸發的全量合并,則使用?
- 任務優化:通過?
filter
?過濾掉無需執行的或無效的壓縮單元。 - 參數計算:根據合并的目標層級和?
levels
?的整體狀態,計算出是否可以安全地在合并中物理刪除數據(dropDelete
)。 - 提交執行:最后將包含待合并文件、目標層級、
dropDelete
?標志的?CompactUnit
?封裝成?MergeTreeCompactTask
,提交到線程池執行。
整個過程完美地體現了 LSM-Tree 架構通過后臺合并來平衡寫入性能和查詢性能的核心思想。
壓縮策略
CompactStrategy.pickFullCompaction
?- 全量合并策略
這是一個定義在?CompactStrategy
?接口中的靜態方法,代表了一種最徹底的合并策略。它的目標非常明確:將表(或分區)中的所有數據文件合并成盡可能少的文件,并放置到最高層級(max level)。
a. 源碼分析
// ... existing code ...static Optional<CompactUnit> pickFullCompaction(int numLevels,List<LevelSortedRun> runs,@Nullable RecordLevelExpire recordLevelExpire) {int maxLevel = numLevels - 1;if (runs.isEmpty()) {// no sorted run, no need to compactreturn Optional.empty();} else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {if (recordLevelExpire == null) {// only 1 sorted run on the max level and don't check record-expire, no need to// compactreturn Optional.empty();}// pick the files which has expired recordsList<DataFileMeta> filesContainExpireRecords = new ArrayList<>();for (DataFileMeta file : runs.get(0).run().files()) {if (recordLevelExpire.isExpireFile(file)) {filesContainExpireRecords.add(file);}}return Optional.of(CompactUnit.fromFiles(maxLevel, filesContainExpireRecords));} else {return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));}}
// ... existing code ...
邏輯分解如下:
- 計算最高層級:
maxLevel = numLevels - 1
。 - 處理邊界情況:
- 如果?
runs
?為空(沒有任何文件),直接返回?Optional.empty()
,無需合并。 - 如果已經只剩下一個?
SortedRun
?并且它已經在最高層級?(runs.size() == 1 && runs.get(0).level() == maxLevel
),這說明數據已經是最優狀態了。- 此時,會檢查是否配置了記錄級過期 (
recordLevelExpire
)。 - 如果沒有配置過期,那么就無需做任何事,返回?
Optional.empty()
。 - 如果配置了過期,它會遍歷這個?
SortedRun
?中的所有文件,挑出那些可能包含過期數據的文件(通過?recordLevelExpire.isExpireFile(file)
?判斷),然后只對這些文件進行一次“自我合并”以清理過期數據。
- 此時,會檢查是否配置了記錄級過期 (
- 如果?
- 常規全量合并:
- 如果不滿足上述邊界情況(比如有多于一個?
SortedRun
,或者唯一的?SortedRun
?不在最高層),則執行標準的全量合并。 - 它會調用?
CompactUnit.fromLevelRuns(maxLevel, runs)
,將所有傳入的?runs
?中的文件都包含進來,創建一個?CompactUnit
,并指定輸出層級為?maxLevel
。
- 如果不滿足上述邊界情況(比如有多于一個?
b. 應用場景
- 用戶手動觸發:最常見的場景是用戶通過 Flink SQL?
CALL sys.compact(...)
?或 Spark Procedure?CALL sys.compact(...)
?并指定?full
?模式來執行。 - 目的:
- 減少小文件:將長時間積累的大量小文件合并成少量大文件,顯著提升后續的查詢性能。
- 物理刪除:全量合并到最高層級是安全地物理刪除帶有刪除標記(DELETE)的數據的唯一時機。
- 數據清理:配合 TTL,清理過期數據。
?ForceUpLevel0Compaction
?- 強制提升 Level 0 策略
這是一個實現了?CompactStrategy
?接口的類,它代表了一種更激進的常規合并策略。它的核心思想是:優先采用通用的?UniversalCompaction
?策略;如果通用策略認為無需合并,則強制檢查 Level 0 是否有文件,如果有,就將它們全部合并。
a. 源碼分析
/** A {@link CompactStrategy} to force compacting level 0 files. */
public class ForceUpLevel0Compaction implements CompactStrategy {private final UniversalCompaction universal;public ForceUpLevel0Compaction(UniversalCompaction universal) {this.universal = universal;}@Overridepublic Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {// 1. 首先嘗試通用的 UniversalCompaction 策略Optional<CompactUnit> pick = universal.pick(numLevels, runs);if (pick.isPresent()) {// 如果通用策略找到了需要合并的文件,就直接采納它的決定return pick;}// 2. 如果通用策略認為不需要合并,則執行強制邏輯// 調用 universal.forcePickL0,這個方法會專門檢查 L0return universal.forcePickL0(numLevels, runs);}
}
forcePickL0
?的邏輯在?UniversalCompaction.java
?中:
// ... existing code ...Optional<CompactUnit> forcePickL0(int numLevels, List<LevelSortedRun> runs) {// 收集所有 level 0 的文件int candidateCount = 0;for (int i = candidateCount; i < runs.size(); i++) {if (runs.get(i).level() > 0) {break;}candidateCount++;}// 如果 L0 沒有文件,返回空;否則,將所有 L0 文件打包成一個壓縮單元return candidateCount == 0? Optional.empty(): Optional.of(pickForSizeRatio(numLevels - 1, runs, candidateCount, true));}
// ... existing code ...
邏輯分解如下:
- 委托與回退(Delegate and Fallback):
ForceUpLevel0Compaction
?首先將決策權委托給內部的?UniversalCompaction
?實例。UniversalCompaction
?會根據文件數量、大小比例等常規指標判斷是否需要合并。 - 強制檢查 L0:如果?
UniversalCompaction
?的常規檢查沒有觸發合并(返回了?Optional.empty()
),ForceUpLevel0Compaction
?會執行它的“強制”邏輯:調用?universal.forcePickL0
。 forcePickL0
?的行為:這個方法很簡單,它只看 Level 0。只要 Level 0 存在任何文件,它就會把 Level 0 的所有文件都收集起來,創建一個?CompactUnit
,準備將它們向上合并。
b. 應用場景
- Lookup Join 優化:這是此策略最主要的應用場景。當 Paimon 表作為 Flink 的維表進行?
lookup join
?時,為了保證維表數據的近實時性,我們希望新寫入的數據(總是在 Level 0)能盡快地被合并到更高層級,從而對?lookup
?可見。 lookup.compact-mode = 'radical'
:當用戶配置此參數時,系統就會啟用?ForceUpLevel0Compaction
?策略。radical
(激進的)這個詞很形象地描述了它的行為:只要有新數據寫入(即 L0 有文件),就盡快地、激進地將它合并掉,以犧牲一些合并開銷為代價,換取數據可見性的延遲降低。
Paimon 的優化:Lookup 模式下跳過 L0
為了解決上述性能問題,Paimon 在開啟了特定優化后(比如配置了?lookup
?changelog-producer 或開啟了刪除向量),其?LocalTableQuery
?會采取一種特殊的讀取策略:
默認從 Level 1 開始讀取數據,直接忽略 Level 0!
這可以在?LocalTableQuery.java
?的構造函數中找到證據:
// ... existing code ...this.lookupStoreFactory =LookupStoreFactory.create(
// ... existing code ...new RowCompactedSerializer(keyType).createSliceComparator());if (options.needLookup()) {// 關鍵點:如果開啟了 lookup 優化,起始讀取層級設置為 1startLevel = 1;} else {
// ... existing code ...}
// ... existing code ...
options.needLookup()
?會在滿足某些條件時(如?changelog-producer = 'lookup'
)返回?true
。當它為?true
?時,startLevel
?被設為?1
,這意味著后續的所有查找操作都將從 Level 1 開始,從而完全避開了低效的 Level 0。
ForceUpLevel0Compaction
?的作用:讓 L0 數據盡快可見
現在,整個邏輯就閉環了:
- 新數據寫入 Paimon 表,生成 L0 文件。
- 高性能的 Lookup Join 讀取器為了性能,只看 L1 及以上層級的文件。
- 此時,新寫入的 L0 文件對于這個 Lookup Join 來說,就是“不可見”的。
- 為了解決這個問題,必須盡快地將 L0 的文件合并(Compaction)到 L1。
ForceUpLevel0Compaction
?策略應運而生。它的行為非常“激進” (radical
):只要發現 L0 有文件,不管滿足不滿足常規的合并觸發條件,都強制發起一次 Compaction,將它們推向 L1。
這樣一來,新數據停留在 L0 的時間窗口被大大縮短,從而保證了數據能夠近實時地對 Lookup Join 可見。
UniversalCompaction
?
它是 Paimon 中最核心、最常用的常規壓縮策略,其設計思想借鑒了 RocksDB 的 Universal Compaction,旨在平衡寫入放大、讀取放大和空間放大。
UniversalCompaction
?作為一個?CompactStrategy
?的實現,它的主要職責是在常規寫入流程中,根據一系列預設規則,判斷是否需要觸發一次合并(Compaction),并挑選出具體要合并哪些文件。
它的核心目標是:
- 控制寫入放大(Write Amplification):避免過于頻繁地重寫數據。
- 維持健康的 LSM-Tree 結構:防止 L0 文件過多,或者文件大小差異過大,從而保證讀取性能(Read Amplification)和空間占用(Space Amplification)在一個可接受的范圍內。
我們先從它的成員變量入手,這些變量定義了?UniversalCompaction
?策略的行為準則。
// ... existing code ...
public class UniversalCompaction implements CompactStrategy {private static final Logger LOG = LoggerFactory.getLogger(UniversalCompaction.class);// 對應 'compaction.max-size-amplification-percent'private final int maxSizeAmp; // 對應 'compaction.sorted-run.size-ratio'private final int sizeRatio;// 對應 'compaction.sorted-run.num-compaction-trigger'private final int numRunCompactionTrigger;// 對應 'compaction.optimization-interval'@Nullable private final Long opCompactionInterval;@Nullable private Long lastOptimizedCompaction;// 對應 'lookup.compact-max-interval'@Nullable private final Integer maxLookupCompactInterval;@Nullable private final AtomicInteger lookupCompactTriggerCount;public UniversalCompaction(int maxSizeAmp,int sizeRatio,int numRunCompactionTrigger,@Nullable Duration opCompactionInterval,@Nullable Integer maxLookupCompactInterval) {this.maxSizeAmp = maxSizeAmp;this.sizeRatio = sizeRatio;this.numRunCompactionTrigger = numRunCompactionTrigger;this.opCompactionInterval =opCompactionInterval == null ? null : opCompactionInterval.toMillis();this.maxLookupCompactInterval = maxLookupCompactInterval;this.lookupCompactTriggerCount =maxLookupCompactInterval == null ? null : new AtomicInteger(0);}
// ... existing code ...
maxSizeAmp
:?最大空間放大百分比。控制除最老的一個 Sorted Run 外,其他所有 Sorted Run 的總大小,不能超過最老的 Sorted Run 大小的?maxSizeAmp / 100
?倍。這是為了控制空間浪費。sizeRatio
:?大小比例。在挑選文件進行合并時,如果當前候選文件集合的總大小,與下一個 Sorted Run 的大小比例在?sizeRatio
?之內,就會把下一個 Sorted Run 也納入本次合并。這是為了傾向于合并大小相近的文件,提高效率。numRunCompactionTrigger
:?文件數量觸發器。當總的 Sorted Run 數量超過這個閾值時,會強制觸發一次合并。這是最主要的常規合并觸發條件。opCompactionInterval
:?優化合并時間間隔。一個可選的配置,用于周期性地觸發一次全量合并(將所有文件合并到最高層),以保證讀取性能。maxLookupCompactInterval
:?Lookup 場景的合并間隔。這是為?lookup.compact-mode = 'gentle'
?模式設計的。它不要求每次寫入都強制合并 L0,而是每隔 N 次?pick
?調用(即 N 次 compaction 檢查)后,如果 L0 還有文件,就強制合并一次。這是一種在性能和數據延遲之間的折中。
核心方法?pick
?- 決策中心
pick
?方法是整個策略的核心,它按照優先級從高到低的順序,檢查是否滿足各種觸發條件。只要有一個條件滿足,就會生成一個?CompactUnit
?并返回,后續的檢查就不再進行。
// ... existing code ...@Overridepublic Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {int maxLevel = numLevels - 1;// 優先級 0: 周期性優化合并 (如果配置了)if (opCompactionInterval != null) {if (lastOptimizedCompaction == null|| currentTimeMillis() - lastOptimizedCompaction > opCompactionInterval) {// ...return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));}}// 優先級 1: 檢查空間放大 (pickForSizeAmp)CompactUnit unit = pickForSizeAmp(maxLevel, runs);if (unit != null) {// ...return Optional.of(unit);}// 優先級 2: 檢查大小比例 (pickForSizeRatio)unit = pickForSizeRatio(maxLevel, runs);if (unit != null) {// ...return Optional.of(unit);}// 優先級 3: 檢查文件數量 (numRunCompactionTrigger)if (runs.size() > numRunCompactionTrigger) {// ...return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));}// 優先級 4: 檢查 Lookup 場景的周期性強制合并 (如果配置了)if (maxLookupCompactInterval != null && lookupCompactTriggerCount != null) {lookupCompactTriggerCount.getAndIncrement();if (lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {// ...return forcePickL0(numLevels, runs);} // ...}return Optional.empty();}
// ... existing code ...
觸發條件分析 (按優先級)
周期性優化合并 (
opCompactionInterval
): 最高優先級。如果配置了該參數,并且距離上次優化合并的時間已經超過了指定間隔,它會直接觸發一次全量合并,將所有?runs
?合并到?maxLevel
。這對于需要定期整理數據以保證查詢性能的場景非常有用。空間放大 (
pickForSizeAmp
): 檢查?(所有文件總大小 - 最老文件大小) / 最老文件大小
?是否超過了?maxSizeAmp
。如果超過,說明非最高層的數據(即“增量”數據)相對于“存量”數據來說過于龐大,占用了過多額外空間。此時也會觸發一次全量合并。大小比例 (
pickForSizeRatio
): 這是最常規的合并挑選邏輯。它會從最年輕的文件(runs
?列表的開頭)開始,逐個向后累加,只要當前累加的總大小與下一個文件的大小比例在?sizeRatio
?之內,就繼續向后吞并。這個過程會形成一個大小比較均勻的合并候選集。文件數量 (
numRunCompactionTrigger
): 如果總的?SortedRun
?數量超過了閾值,說明文件過于碎片化,會嚴重影響性能。此時會強制觸發一次合并。它會計算出需要合并掉多少個文件才能使總數降到閾值以下,然后調用?pickForSizeRatio
?來挑選這些文件。Lookup 周期性合并 (
maxLookupCompactInterval
): 這是最低優先級的檢查。它維護一個原子計數器?lookupCompactTriggerCount
。每次調用?pick
?都會使其加一。當計數器達到?maxLookupCompactInterval
?時,就會重置為 0 并調用?forcePickL0
,強制合并 L0 的所有文件。這為?lookup
?場景提供了一種“溫和”(gentle
)的合并模式。
pickForSizeRatio
?
public CompactUnit pickForSizeRatio(int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick) {long candidateSize = candidateSize(runs, candidateCount);for (int i = candidateCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {break;}candidateSize += next.run().totalSize();candidateCount++;}if (forcePick || candidateCount > 1) {return createUnit(runs, maxLevel, candidateCount);}return null;}
這個方法是 Paimon 中 Universal Compaction(通用合并)策略的核心部分之一,它根據文件大小比例來決定哪些文件(SortedRun
)應該被合并。
int maxLevel
: Merge-Tree 的最大層級數。合并后的文件通常會被放到更高的層級。List<LevelSortedRun> runs
: 當前所有層級的、已排序的?SortedRun
?列表。這個列表通常是按從新到舊(或從小到大)的順序排列的。int candidateCount
: 初始候選?SortedRun
?的數量。方法會從列表的前?candidateCount
?個?run
?開始考慮合并。boolean forcePick
: 是否強制選擇。如果為?true
,即使最終只選出了一個?run
,也會創建合并單元(CompactUnit
)。這在某些強制合并的場景下很有用。
這個方法的主要邏輯是:從一組初始的候選文件(runs
)開始,不斷地嘗試將后續的文件也加入到這次合并任務中,直到遇到一個尺寸“過大”的文件為止。
代碼執行流程如下:
計算初始候選文件總大小
long candidateSize = candidateSize(runs, candidateCount);
首先,它會計算由?
candidateCount
?指定的初始候選?run
?的總大小。迭代選擇更多文件
for (int i = candidateCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {break;}candidateSize += next.run().totalSize();candidateCount++; }
接著,它會遍歷剩余的?
run
。對于每一個?next
?run,它會檢查一個關鍵條件:?candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()
sizeRatio
?是一個配置項 (compaction.size-ratio
),表示尺寸比較時的靈活性百分比。- 這個條件判斷的是:當前已選中的文件總大小(
candidateSize
),即使加上?sizeRatio
?百分比的“寬容度”,是否仍然小于下一個文件(next
)的大小。 - 如果條件成立,意味著下一個文件?
next
?比當前已選中的文件總和要大得多,將它們合并的性價比不高。因此,循環中斷(break
),不再選擇更多的文件。 - 如果條件不成立,意味著?
next
?文件的大小和當前已選中的文件總大小在同一個量級,適合一起合并。于是,將?next
?文件加入候選集,更新?candidateSize
?和?candidateCount
。
創建合并單元
if (forcePick || candidateCount > 1) {return createUnit(runs, maxLevel, candidateCount); }return null;
循環結束后,方法會判斷是否需要創建合并任務 (
CompactUnit
)。- 如果?
forcePick
?為?true
,或者最終選出的文件數量?candidateCount
?大于1(合并至少需要兩個文件才有意義),就會調用?createUnit
?方法來創建一個包含所有選中文件的?CompactUnit
。 - 否則,說明沒有找到合適的合并機會,返回?
null
。
- 如果?
在?UniversalCompaction
?類中,這個方法主要在以下幾個場景被調用:
- 常規大小比例檢查 (
pickForSizeRatio(maxLevel, runs)
): 這是最常見的用法,從第一個?run
?開始(candidateCount=1
),嘗試尋找合適的合并機會。 - 文件數量觸發 (
pick
?方法中): 當總文件數超過閾值?numRunCompactionTrigger
?時,會觸發合并。此時,初始?candidateCount
?會被設置為?runs.size() - numRunCompactionTrigger + 1
,然后調用此方法來決定最終合并哪些文件。 - 強制L0層合并 (
forcePickL0
?方法中): 在某些情況下(例如,為了降低查找延遲),需要強制合并L0層的所有文件。這時會調用此方法,并將?forcePick
?設置為?true
,確保即使L0只有一個文件也會被打包成一個合并任務。
pickForSizeRatio
?方法實現了一種智能的合并文件選擇策略。它傾向于將大小相近的文件進行合并,避免了將一個很小的文件和一個巨大的文件進行合并所帶來的高I/O開銷,從而提高了合并效率。通過?sizeRatio
?參數提供了靈活性,并通過?forcePick
?參數支持了強制合并的場景。
createUnit()
?方法:目標層級的決定者
createUnit
?方法是真正計算?outputLevel
?的地方,這是理解整個機制的關鍵。
// ... existing code ...@VisibleForTestingCompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {int outputLevel;if (runCount == runs.size()) {// 如果所有 run 都參與合并,目標就是最高層outputLevel = maxLevel;} else {// 否則,目標層級是下一個未參與合并的 run 的層級減 1// 這是最核心的邏輯outputLevel = Math.max(0, runs.get(runCount).level() - 1);}if (outputLevel == 0) {// 為了避免產生新的 level 0 文件,這里做了特殊處理// 如果計算出的目標是 level 0,會繼續往后尋找,直到找到一個非 level 0 的 run// 并將它的層級作為 outputLevelfor (int i = runCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);runCount++;if (next.level() != 0) {outputLevel = next.level();break;}}}if (runCount == runs.size()) {// 如果經過上述邏輯后,所有 run 都被選中了,那么還是合并到最高層updateLastOptimizedCompaction();outputLevel = maxLevel;}return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));}
// ... existing code ...
核心邏輯解讀:
runCount
?是被選中參與本次合并的 run 的數量。outputLevel = Math.max(0, runs.get(runCount).level() - 1);
?這一行代碼是關鍵。它說明,當不是所有文件都參與合并時,輸出層級取決于第一個未被選中的 run?(runs.get(runCount)
)。目標層級是這個 run 的層級減 1。
舉個例子:?假設我們有以下?runs
:?[L0, L0, L0, L2, L4]
pickForSizeRatio
?方法可能決定合并前3個 L0 的文件。此時?runCount
?= 3。- 進入?
createUnit
?方法,runs.get(runCount)
?就是?runs.get(3)
,即那個 L2 的 run。 outputLevel
?計算為?L2.level() - 1
,也就是?2 - 1 = 1
。- 最終,這3個 L0 文件會被合并成一個新的?L1?文件,而不是?
maxLevel
。
輔助方法
pickForSizeRatio(...)
: 實現了上述的大小比例挑選邏輯。它會從最年輕的文件開始,向后“滾動”合并,直到下一個文件太大不適合合并為止。createUnit(...)
: 根據挑選出的文件(runCount
個),決定輸出層級(outputLevel
)。邏輯通常是:如果合并了所有文件,則輸出到最高層;否則,輸出到下一個未被合并的文件的層級減一。它還會保證輸出層級不為 0。forcePickL0(...)
: 一個特殊的方法,只收集 Level 0 的所有文件,并準備將它們合并。被?ForceUpLevel0Compaction
?和?lookupCompactMaxInterval
?邏輯所調用。
總結
UniversalCompaction
?是一個設計精巧且全面的合并策略,它通過多維度、分優先級的規則來維護 LSM-Tree 的健康狀態:
- 通過文件數量和空間放大來設定合并的“底線”,防止系統狀態惡化。
- 通過大小比例來智能地挑選合并單元,實現高效合并。
- 通過周期性全量合并和針對 Lookup 的周期性 L0 合并,為不同的應用場景提供了額外的優化選項。
它與?ForceUpLevel0Compaction
?的關系是:ForceUpLevel0Compaction
?是一種更激進的策略,它完全復用了?UniversalCompaction
?的所有邏輯,并在此基礎上增加了一個最終的回退(fallback)邏輯:如果?UniversalCompaction
?認為無需合并,它會強制檢查并合并 L0,以滿足 Lookup 場景對數據實時性的極致要求。
CompactUnit
Paimon 中執行一次具體壓縮操作的對象是?CompactUnit
,它并不完全等同于一個?SortedRun
,而是從一個或多個?SortedRun
?中選取出來的文件集合。定位哪些文件需要壓縮,則是由?CompactStrategy
(壓縮策略)來決encided。
一個壓縮任務的基本工作單元是?CompactUnit
?接口。從它的定義可以看出,一個?CompactUnit
?主要包含兩部分信息:
outputLevel()
: 壓縮后生成的新文件要歸屬的層級(Level)。files()
: 需要參與本次壓縮的所有數據文件(DataFileMeta
)的列表。
public interface CompactUnit {int outputLevel();List<DataFileMeta> files();static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {List<DataFileMeta> files = new ArrayList<>();for (LevelSortedRun run : runs) {files.addAll(run.run().files());}return fromFiles(outputLevel, files);}static CompactUnit fromFiles(int outputLevel, List<DataFileMeta> files) {return new CompactUnit() {@Overridepublic int outputLevel() {return outputLevel;}@Overridepublic List<DataFileMeta> files() {return files;}};}
}
所以,壓縮對象是一組文件的集合(List<DataFileMeta>
),而不是單個?SortedRun
。一個?CompactUnit
?可以包含來自不同?SortedRun
?的文件。
SortedRun
?是 LSM-Tree 里的一個邏輯概念,代表一個有序的數據文件集合。在 Paimon 中,Level 0 每個文件都是一個獨立的?SortedRun
,而在其他 Level,同一層的所有文件構成一個?SortedRun
。
SortedRun
?是挑選壓縮文件時的重要依據,但不是壓縮任務的直接執行對象。壓縮策略會分析各個?SortedRun
?的狀態(如數量、大小、層級)來決定是否要發起一次壓縮,以及挑選哪些?SortedRun
?里的文件來組成?CompactUnit
。
如何定位和組織壓縮對象?
這個過程主要分為兩步:策略選擇?和?任務執行。
第一步:CompactStrategy
?策略選擇
CompactStrategy
?接口負責從當前所有?LevelSortedRun
(帶有層級信息的?SortedRun
)中挑選出需要被壓縮的?CompactUnit
。
// ... existing code ...
public interface CompactStrategy {/*** Pick compaction unit from runs.** <ul>* <li>compaction is runs-based, not file-based.* <li>level 0 is special, one run per file; all other levels are one run per level.* <li>compaction is sequential from small level to large level.* </ul>*/Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);/** Pick a compaction unit consisting of all existing files. */
// ... existing code ...
例如,UniversalCompaction
?策略會檢查是否有太多?SortedRun
,如果超過閾值,就會選擇一些相鄰的?SortedRun
?合并。FullCompaction
?策略則會選擇所有的文件進行合并。
第二步:MergeTreeCompactTask
?任務執行
一旦?CompactStrategy
?挑選出了一個?CompactUnit
,就會創建一個?MergeTreeCompactTask
?來執行具體的壓縮邏輯。在你正在查看的?MergeTreeCompactTask.java
?文件中,doCompact()
?方法清晰地展示了如何處理這個?CompactUnit
?里的文件。
- 分區(Partitioning):任務首先會根據文件的 key 范圍將?
CompactUnit
?中的文件分成若干個互不重疊或連續重疊的組(List<List<SortedRun>> partitioned
)。 - 分類處理:
- 對于有多個?
SortedRun
?重疊的組,它們是必須被重寫(rewrite)合并的,因此被加入?candidate
?列表。 - 對于沒有重疊的組(只有一個?
SortedRun
),會進一步判斷:- 如果文件很小(小于?
minFileSize
),它也會被加入?candidate
?列表,以便和其它小文件一起合并,避免產生過多小文件。 - 如果文件很大,通常會直接“升級”(upgrade),即只修改文件的元數據,將其層級提升到?
outputLevel
,而無需重寫文件內容,這樣效率更高。但如果這個大文件包含需要過期的數據,或者需要被合并到最高層級但自身帶有刪除記錄,它還是會被強制重寫。
- 如果文件很小(小于?
- 對于有多個?
// ... existing code ...@Overrideprotected CompactResult doCompact() throws Exception {List<List<SortedRun>> candidate = new ArrayList<>();CompactResult result = new CompactResult();// Checking the order and compacting adjacent and contiguous files// Note: can't skip an intermediate file to compact, this will destroy the overall// orderlinessfor (List<SortedRun> section : partitioned) {if (section.size() > 1) {candidate.add(section);} else {SortedRun run = section.get(0);// No overlapping:// We can just upgrade the large file and just change the level instead of// rewriting it// But for small files, we will try to compact itfor (DataFileMeta file : run.files()) {if (file.fileSize() < minFileSize) {// Smaller files are rewritten along with the previous filescandidate.add(singletonList(SortedRun.fromSingle(file)));} else {// Large file appear, rewrite previous and upgrade itrewrite(candidate, result);upgrade(file, result);}}}}rewrite(candidate, result);result.setDeletionFile(compactDfSupplier.get());return result;}
// ... existing code ...
總結
- 壓縮對象:一個?
CompactUnit
?實例,它內部封裝了一組待壓縮的文件列表?(List<DataFileMeta>
) 和目標層級。 - 與?
SortedRun
?的關系:SortedRun
?是 LSM-Tree 的邏輯層,是壓縮策略(CompactStrategy
)制定計劃的輸入和依據。策略根據?SortedRun
?的情況來決定挑選哪些文件組成?CompactUnit
。 - 定位方式:通過?
CompactStrategy.pick()
?方法,根據預設的壓縮策略(如 Universal, Full 等)分析所有?SortedRun
,挑選出最需要被壓縮的文件,打包成?CompactUnit
,然后交由?MergeTreeCompactTask
?執行。
level層次數量的權衡
這是一個關于性能權衡(Trade-off)的經典問題,主要涉及?寫放大(Write Amplification)?和?讀放大(Read Amplification)。
降低寫放大(The Main Benefit)是分更多層級的最主要原因。
場景A:只有兩層(L0 和 L-max)?假設我們只有 L0 和 L1(作為最高層)。當 L0 的文件需要合并時,它們需要和 L1 中所有的數據進行合并,然后生成全新的 L1。如果 L1 非常大(比如幾百GB),那么即使 L0 只有很小的一點新數據(比如幾百MB),也需要重寫整個 L1。這個過程消耗的 I/O 非常巨大,這就是“寫放大”——為了寫入少量邏輯數據,卻需要進行大量的物理磁盤寫入。
場景B:有多層(L0, L1, L2, ...)?在這種設計下,合并是逐步進行的。L0 的文件合并成 L1 的文件;當 L1 積累到一定程度,再和 L2 合并,以此類推。每次合并操作所涉及的數據量都相對較小。例如,將幾個100MB的 L0 文件合并成一個 L1 文件,遠比將它們與一個100GB的 L-max 文件合并要快得多。這大大降低了單次合并的成本,使得寫入性能更平滑、更可預測。
平衡讀寫性能
更多層級:
- 優點:寫放大低,寫入平穩。
- 缺點:讀放大高。因為一條數據可能存在于任何一層,查詢時需要從 L0 到 L-max 逐層查找,直到找到最新的版本。層級越多,需要檢查的地方就越多。
更少層級:
- 優點:讀放大低。查詢時只需要檢查很少的幾個層級。
- 缺點:寫放大高。合并成本巨大,可能導致寫入性能的劇烈抖動。
Paimon 采用的 Universal Compaction 策略,以及其多層級的結構,正是在這兩種放大效應之間做權衡,目標是提供一個整體表現良好的方案,尤其是在高頻寫入的場景下,通過平滑合并操作來保證寫入的穩定性。
總而言之,分更多層次的核心好處是顯著降低寫放大,以平滑的、低成本的增量合并,替代高成本的全量合并,從而獲得更穩定、高效的寫入性能。這是以犧牲一部分讀取性能(增加讀放大)為代價的,但對于許多數據倉庫和數據湖場景來說,這是一個非常值得的權衡。
AbstractCompactRewriter
AbstractCompactRewriter
?是 Apache Paimon 項目中用于合并樹(Merge-Tree)壓縮(Compaction)操作的一個核心抽象基類。它定義了壓縮重寫器(Compact Rewriter)的通用行為和輔助功能,具體的重寫邏輯則由其子類實現。
該類位于?org.apache.paimon.mergetree.compact
?包下,從其命名和?abstract
?關鍵字可以看出,它是一個抽象類,不能被直接實例化。它實現了?CompactRewriter
?接口。
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.mergetree.compact;import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.SortedRun;import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;/** Common implementation of {@link CompactRewriter}. */
public abstract class AbstractCompactRewriter implements CompactRewriter {@Overridepublic CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception {return new CompactResult(file, file.upgrade(outputLevel));}protected static List<DataFileMeta> extractFilesFromSections(List<List<SortedRun>> sections) {return sections.stream().flatMap(Collection::stream).map(SortedRun::files).flatMap(Collection::stream).collect(Collectors.toList());}@Overridepublic void close() throws IOException {}
}
它實現了?CompactRewriter
?接口,我們先來看一下這個接口的定義:
// ... existing code ...
import java.io.Closeable;
import java.util.List;public interface CompactRewriter extends Closeable {/*** Rewrite sections to new level.* ...*/CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)throws Exception;/*** Upgrade file to new level, usually file data is not rewritten, only the metadata is updated.* ...*/CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
}
CompactRewriter
?接口定義了兩個核心方法?rewrite
?和?upgrade
,并且繼承了?Closeable
?接口,意味著實現類可能持有需要關閉的資源。
AbstractCompactRewriter
?提供了部分方法的默認實現,并留下一個抽象方法給子類。
upgrade(int outputLevel, DataFileMeta file)
這個方法提供了?upgrade
?操作的默認實現。在 Paimon 的合并樹中,upgrade
?通常指將一個文件從較低的 level "提升" 到較高的 level,這個過程大多數情況下不需要重寫文件內容,只需要更新文件的元數據(比如 level 信息)。 這里的實現正是如此:
- 它接收一個?
DataFileMeta
?對象(代表一個數據文件)和一個目標?outputLevel
。 - 它調用?
file.upgrade(outputLevel)
?創建一個新的?DataFileMeta
?對象,新對象包含了更新后的 level 信息。 - 最后,它將原始文件和升級后的文件包裝成一個?
CompactResult
?對象返回。CompactResult
?用來封裝壓縮的結果,包含壓縮前的文件列表和壓縮后的文件列表。
extractFilesFromSections(List<List<SortedRun>> sections)
這是一個?protected static
?的輔助方法,不屬于接口方法。它的作用是從一個嵌套的?List<List<SortedRun>>
?結構中提取出所有底層的?DataFileMeta
?文件。
sections
?代表了待壓縮的多個數據段。- 每個?
section
?是一個?List<SortedRun>
。 - 每個?
SortedRun
?包含一個或多個數據文件 (DataFileMeta
)。 這個方法通過 Java Stream API 將這個三層嵌套的結構展平,最終返回一個包含所有待壓縮文件的?List<DataFileMeta>
。這在?rewrite
?的實現中非常有用,可以方便地獲取所有輸入文件。
close()
該方法提供了?Closeable
?接口的默認空實現。這意味著如果子類沒有需要釋放的資源(如文件句柄、網絡連接等),就無需重寫此方法。如果子類持有了需要關閉的資源,則必須重寫此方法以確保資源被正確釋放。
rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
AbstractCompactRewriter
?沒有實現?CompactRewriter
?接口中的?rewrite
?方法。這正是它被聲明為?abstract
?的原因。?rewrite
?方法是壓縮操作的核心,它定義了如何讀取多個輸入文件(sections
),通過合并、去重等邏輯,然后寫入到一個或多個新的輸出文件(在?outputLevel
)。這個過程是復雜且多變的,取決于具體的合并策略(例如,是否保留 changelog、使用哪種合并引擎等)。 因此,AbstractCompactRewriter
?將這個最核心、最易變的邏輯交由具體的子類去實現。
實現與應用
在 Paimon 項目中,有多個?AbstractCompactRewriter
?的子類,它們根據不同的場景提供了具體的?rewrite
?實現。例如:
MergeTreeCompactRewriter
: 提供了通用的合并樹壓縮邏輯。ChangelogMergeTreeRewriter
: 在?MergeTreeCompactRewriter
?基礎上,增加了生成 changelog 的能力。FullChangelogMergeTreeCompactRewriter
: 一種生成完整 changelog 的策略。LookupMergeTreeCompactRewriter
: 適用于?lookup
?模式的合并策略。
這些子類會重寫?rewrite
?方法,并在內部使用?extractFilesFromSections
?輔助方法來獲取輸入文件列表。
例如,在一個測試類?MergeTreeTestBase
?中,就有一個內部類?TestRewriter
?繼承了?AbstractCompactRewriter
?并實現了?rewrite
?方法,清晰地展示了其用法:
MergeTreeTestBase.java
Apply
// ... existing code ...private class TestRewriter extends AbstractCompactRewriter {@Overridepublic CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)throws Exception {// 1. 創建用于寫入新文件的 writerRollingFileWriter<KeyValue, DataFileMeta> writer =writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);// 2. 創建用于讀取和合并舊文件的 readerRecordReader<KeyValue> reader =MergeTreeReaders.readerForMergeTree(sections,// ... reader and merge function details ...);if (dropDelete) {reader = new DropDeleteReader(reader);}// 3. 執行讀寫操作writer.write(new RecordReaderIterator<>(reader));writer.close();// 4. 使用父類的輔助方法獲取輸入文件,并返回結果return new CompactResult(extractFilesFromSections(sections), writer.result());}}
// ... existing code ...
AbstractCompactRewriter
?是一個典型的模板方法模式的應用。它定義了壓縮重寫算法的骨架:
- 提供了不變的部分:
upgrade
?方法的通用實現和?close
?的空實現。 - 提供了可復用的輔助功能:
extractFilesFromSections
?靜態方法。 - 定義了易變的部分為抽象方法:將核心的?
rewrite
?邏輯延遲到子類中實現。
這種設計使得 Paimon 可以靈活地擴展和實現不同場景下的文件壓縮策略,同時保證了代碼的復用性和結構的清晰性。
MergeTreeCompactRewriter
MergeTreeCompactRewriter
?是?AbstractCompactRewriter
?的一個核心具體實現。在 Paimon 的合并樹(Merge-Tree)結構中,當需要對數據文件進行合并(Compaction)時,這個類提供了默認的、最基礎的重寫(rewrite)邏輯。
// ... existing code ...
import java.util.Comparator;
import java.util.List;/** Default {@link CompactRewriter} for merge trees. */
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
// ... existing code ...
從定義可以看出:
- 它是一個公開類,位于?
org.apache.paimon.mergetree.compact
?包下。 - 它繼承自我們之前分析過的?
AbstractCompactRewriter
。這意味著它自動獲得了?upgrade
?方法的默認實現(僅更新元數據)和?extractFilesFromSections
?輔助方法。 - 它的核心職責是實現?
AbstractCompactRewriter
?中定義的抽象方法?rewrite
。
核心成員變量與構造函數
MergeTreeCompactRewriter
?的行為由其成員變量(在構造時注入)決定,這體現了依賴注入的設計思想,使得類更加靈活和可測試。
// ... existing code ...
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {protected final FileReaderFactory<KeyValue> readerFactory;protected final KeyValueFileWriterFactory writerFactory;protected final Comparator<InternalRow> keyComparator;@Nullable protected final FieldsComparator userDefinedSeqComparator;protected final MergeFunctionFactory<KeyValue> mfFactory;protected final MergeSorter mergeSorter;public MergeTreeCompactRewriter(FileReaderFactory<KeyValue> readerFactory,KeyValueFileWriterFactory writerFactory,Comparator<InternalRow> keyComparator,@Nullable FieldsComparator userDefinedSeqComparator,MergeFunctionFactory<KeyValue> mfFactory,MergeSorter mergeSorter) {this.readerFactory = readerFactory;this.writerFactory = writerFactory;this.keyComparator = keyComparator;this.userDefinedSeqComparator = userDefinedSeqComparator;this.mfFactory = mfFactory;this.mergeSorter = mergeSorter;}
// ... existing code ...
readerFactory
: 文件讀取器工廠,用于創建讀取輸入數據文件(SortedRun
中的文件)的?RecordReader
。writerFactory
: 文件寫入器工廠,用于創建?RollingFileWriter
,將合并后的數據寫入新的輸出文件。keyComparator
: 鍵比較器,用于在合并過程中對?KeyValue
?的鍵(key)進行排序和比較。userDefinedSeqComparator
: 用戶定義的序列號比較器(可選)。Paimon 支持用戶自定義字段作為合并時的排序依據,這個比較器就用于此目的。mfFactory
: 合并函數工廠(MergeFunctionFactory
),用于創建?MergeFunction
。MergeFunction
?定義了當遇到相同鍵(key)的多條記錄時,應如何合并它們。例如,可以是去重(Deduplicate)、部分更新(Partial Update)或聚合(Aggregation)等。mergeSorter
: 合并排序器,當待合并的數據量過大,無法完全在內存中進行時,MergeSorter
?會利用外部排序(External Sort)來處理。
rewrite
?方法
這是?CompactRewriter
?接口的核心方法。MergeTreeCompactRewriter
?的實現非常直接,它將調用轉發給了?rewriteCompaction
?方法。這種設計模式允許子類在不改變公共API的情況下,更容易地重寫或擴展核心壓縮邏輯。
// ... existing code ...@Overridepublic CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {return rewriteCompaction(outputLevel, dropDelete, sections);}
// ... existing code ...
rewriteCompaction
?方法
這是實際執行壓縮重寫邏輯的地方,是整個類的核心。
// ... existing code ...protected CompactResult rewriteCompaction(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {// 1. 創建一個滾動寫入器,用于寫入壓縮后的新文件RollingFileWriter<KeyValue, DataFileMeta> writer =writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);RecordReader<KeyValue> reader = null;Exception collectedExceptions = null;try {// 2. 創建一個合并讀取器,它會讀取所有輸入文件,并應用合并邏輯reader =readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create()));// 3. 如果需要,包裝一個 DropDeleteReader 來過濾掉刪除標記的記錄if (dropDelete) {reader = new DropDeleteReader(reader);}// 4. 執行讀寫:從 reader 讀取合并后的數據,寫入 writerwriter.write(new RecordReaderIterator<>(reader));} catch (Exception e) {collectedExceptions = e;} finally {// 5. 確保資源被關閉try {IOUtils.closeAll(reader, writer);} catch (Exception e) {collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);}}if (null != collectedExceptions) {// 6. 如果發生異常,終止寫入并拋出異常writer.abort();throw collectedExceptions;}// 7. 構造并返回 CompactResultList<DataFileMeta> before = extractFilesFromSections(sections);notifyRewriteCompactBefore(before);return new CompactResult(before, writer.result());}
// ... existing code ...
其執行流程可以分解為:
- 創建寫入器:通過?
writerFactory
?創建一個?RollingFileWriter
,準備將合并后的數據寫入到指定?outputLevel
?的新文件中。 - 創建讀取器:調用?
readerForMergeTree
?方法創建一個?RecordReader
。這個 reader 是一個復合的 reader,它能夠同時讀取多個?SortedRun
?中的所有文件,并使用?MergeFunction
?在內存中或通過外部排序進行多路歸并。 - 處理刪除標記:如果?
dropDelete
?參數為?true
(通常在對最高 level 進行 full compaction 時),則用?DropDeleteReader
?包裝原始 reader。這個裝飾器會過濾掉類型為?DELETE
?的記錄。 - 執行重寫:
writer.write(new RecordReaderIterator<>(reader))
?是核心步驟。它驅動?reader
?讀取、合并數據,并將結果逐條寫入?writer
。 - 資源管理與異常處理:使用?
try-finally
?確保?reader
?和?writer
?無論成功還是失敗都能被關閉。如果過程中發生異常,會收集起來。 - 失敗回滾:如果收集到異常,調用?
writer.abort()
?來刪除可能已產生的臨時文件,然后將異常拋出。 - 返回結果:如果成功,它會調用父類的?
extractFilesFromSections
?獲取所有輸入文件,并結合?writer.result()
(所有新生成的輸出文件),封裝成一個?CompactResult
?對象返回。在返回前,會調用一個空的?notifyRewriteCompactBefore
?方法,這是一個為子類提供的擴展點。
readerForMergeTree
?和?notifyRewriteCompactBefore
readerForMergeTree
: 這是一個輔助方法,它封裝了創建合并樹讀取器的復雜性,將所有需要的組件(readerFactory
,?keyComparator
?等)傳遞給?MergeTreeReaders.readerForMergeTree
?來構造最終的?RecordReader
。notifyRewriteCompactBefore
: 這是一個空的?protected
?方法,充當一個鉤子(Hook)。子類可以重寫此方法來在壓縮完成、返回結果之前執行一些額外的邏輯。例如,LookupMergeTreeCompactRewriter
?就重寫了此方法來清理與被壓縮文件相關的刪除向量(Deletion Vectors)。
在系統中的位置和擴展
MergeTreeCompactRewriter
?是 Paimon 壓縮機制的基石。MergeTreeCompactManager
?負責制定壓縮計劃,當它決定執行一個壓縮任務(MergeTreeCompactTask
)時,就會使用一個?CompactRewriter
?的實例來執行實際的文件讀寫和合并。- 這個類是可擴展的。Paimon 中更復雜的 Rewriter,如?
ChangelogMergeTreeRewriter
(用于生成 changelog)、LookupMergeTreeCompactRewriter
(用于 lookup 表)和?FullChangelogMergeTreeCompactRewriter
,都直接或間接地繼承自?MergeTreeCompactRewriter
,并在其基礎上增加或修改功能。
總結
MergeTreeCompactRewriter
?是 Paimon 合并樹 compaction 邏輯的一個標準、通用的實現。它清晰地展示了如何將多個已排序的文件(SortedRun
)通過多路歸并、應用合并函數,最終重寫為新的、更緊湊的文件。其設計利用了依賴注入和模板方法/鉤子模式,具有良好的靈活性和擴展性,為更高級的壓縮策略提供了堅實的基礎。
為了更清晰地理解,梳理從頂層到底層的核心調用鏈:
MergeTreeCompactRewriter.readerForMergeTree
MergeTreeReaders.readerForMergeTree
- 調用?->?
ConcatRecordReader.create
?將多個?section
?的 reader 串聯起來。 - 為每個 section 調用?->?
MergeTreeReaders.readerForSection
- 調用?->?
MergeTreeReaders.readerForSection
- 調用?->?
MergeSorter.mergeSort
?對一個?section
?內的所有?SortedRun
?進行合并排序。 - 為每個 SortedRun 調用?->?
MergeTreeReaders.readerForRun
- 調用?->?
MergeTreeReaders.readerForRun
- 調用?->?
ConcatRecordReader.create
?將一個?SortedRun
?內的所有文件串聯起來。
- 調用?->?
MergeSorter.mergeSort
- 調用?->?
SortMergeReader.createSortMergeReader
?(當數據無需溢出到磁盤時)
- 調用?->?
SortMergeReader.createSortMergeReader
- 創建?->?
SortMergeReaderWithMinHeap
?或?SortMergeReaderWithLoserTree
?(最終執行多路歸并的 Reader)
- 創建?->?
SortMergeReaderWithMinHeap
?和?SortMergeReaderWithLoserTree
?是兩種經典的多路歸并排序算法的實現。它們接收多個已經排好序的?RecordReader
?作為輸入,通過內部的數據結構(最小堆或敗者樹)高效地從所有輸入中找出當前最小的記錄。當遇到主鍵相同的記錄時,它們會調用?MergeFunctionWrapper
?來執行用戶定義的合并邏輯(如去重、更新等)。
IntervalPartition
IntervalPartition
?是 Paimon 在執行 Merge-Tree 壓縮(Compaction)時一個非常核心的工具類。它的主要目標是將一組給定的數據文件(DataFileMeta
)進行高效地分組,以便后續進行歸并排序。這個分組算法非常關鍵,因為它直接影響了壓縮任務的并行度和效率。
從類的注釋中我們可以看到,它的目的是:
Algorithm to partition several data files into the minimum number of {@link SortedRun}s.
即:將多個數據文件劃分為最少數目的?SortedRun
?的算法。
這里的?SortedRun
?代表一組按主鍵有序且鍵范圍互不重疊的文件集合。將文件劃分為最少的?SortedRun
?意味著我們可以用最少的歸并路數來完成排序,從而提高效率。
為了實現這個目標,IntervalPartition
?采用了兩層劃分的策略:
第一層:Section(分段)
- 它首先將所有輸入文件按照主鍵范圍(Key Interval)劃分為若干個?Section。
- 不同 Section 之間的主鍵范圍是完全不重疊的。
- 這樣做的好處是,不同 Section 之間的數據沒有交集,因此可以獨立、并行地進行處理,而不會相互影響。這極大地提高了壓縮的并行能力。
第二層:SortedRun(有序運行)
- 在每個 Section 內部,文件的主鍵范圍是可能相互重疊的。
- 算法的目標是在這個 Section 內,將這些可能重疊的文件,組合成最少數目的?
SortedRun
。 - 每個?
SortedRun
?內部的文件,按主鍵有序排列,并且它們的主鍵范圍不會重疊。
最終,IntervalPartition
?的輸出是一個二維列表?List<List<SortedRun>>
,外層列表代表 Section,內層列表代表該 Section 內劃分出的所有?SortedRun
。
構造函數與初始化
// ... existing code ...public IntervalPartition(List<DataFileMeta> inputFiles, Comparator<InternalRow> keyComparator) {this.files = new ArrayList<>(inputFiles);this.files.sort((o1, o2) -> {int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());return leftResult == 0? keyComparator.compare(o1.maxKey(), o2.maxKey()): leftResult;});this.keyComparator = keyComparator;}
// ... existing code ...
構造函數做了非常重要的一步預處理: 對所有輸入的?DataFileMeta
?文件,首先按照它們的?minKey
(最小主鍵)進行升序排序。如果?minKey
?相同,則再按照?maxKey
(最大主鍵)進行升序排序。
這個排序是后續所有劃分算法的基礎,保證了文件是按照主鍵范圍的起始位置被依次處理的。
partition()
?方法:第一層劃分(切分 Section)
這是該類的入口方法,負責將已排序的文件切分成多個 Section。
// ... existing code ...public List<List<SortedRun>> partition() {List<List<SortedRun>> result = new ArrayList<>();List<DataFileMeta> section = new ArrayList<>();BinaryRow bound = null;for (DataFileMeta meta : files) {if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {// larger than current right bound, conclude current section and create a new oneresult.add(partition(section));section.clear();bound = null;}section.add(meta);if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {// update right boundbound = meta.maxKey();}}if (!section.isEmpty()) {// conclude last sectionresult.add(partition(section));}return result;}
// ... existing code ...
其邏輯如下:
- 維護一個當前?
section
?的文件列表和一個當前?section
?的右邊界?bound
(即?section
?中所有文件的?maxKey
?的最大值)。 - 遍歷所有(已按?
minKey
?排序的)文件:- 將當前文件?
meta
?加入?section
。 - 更新?
bound
?為當前?section
?中所有文件?maxKey
?的最大值。 - 核心判斷:在添加下一個文件之前,檢查它的?
minKey
?是否大于當前?section
?的右邊界?bound
。 - 如果?
meta.minKey() > bound
,說明這個新文件與當前?section
?內的所有文件都沒有主鍵范圍的重疊。這意味著一個 Section 到此結束了。 - 此時,就將當前?
section
?里的文件列表交給?partition(section)
?方法(第二層劃分)處理,得到?List<SortedRun>
,然后加入最終結果?result
。 - 清空?
section
?和?bound
,開始一個新的 Section。
- 將當前文件?
- 循環結束后,處理最后一個?
section
。
通過這種方式,它有效地將所有文件切分成了若干個主鍵范圍互不重疊的 Section。
partition(List<DataFileMeta> metas)
?方法:第二層劃分(貪心算法生成 SortedRun)
這個私有方法是算法的精髓所在。它接收一個 Section 內的文件列表(這些文件的主鍵范圍可能重疊),目標是將它們劃分為最少的?SortedRun
。
這里使用了一個經典的貪心算法,并借助優先隊列(最小堆)?來實現。
// ... existing code ...private List<SortedRun> partition(List<DataFileMeta> metas) {PriorityQueue<List<DataFileMeta>> queue =new PriorityQueue<>((o1, o2) ->// sort by max key of the last data filekeyComparator.compare(o1.get(o1.size() - 1).maxKey(),o2.get(o2.size() - 1).maxKey()));// create the initial partitionList<DataFileMeta> firstRun = new ArrayList<>();firstRun.add(metas.get(0));queue.add(firstRun);for (int i = 1; i < metas.size(); i++) {DataFileMeta meta = metas.get(i);// any file list whose max key < meta.minKey() is sufficient,// for convenience we pick the smallestList<DataFileMeta> top = queue.poll();if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) {// append current file to an existing partitiontop.add(meta);} else {// create a new partitionList<DataFileMeta> newRun = new ArrayList<>();newRun.add(meta);queue.add(newRun);}queue.add(top);}// order between partitions does not matterreturn queue.stream().map(SortedRun::fromSorted).collect(Collectors.toList());}
}
邏輯分解如下:
- 創建一個優先隊列?
queue
。隊列中的元素是?List<DataFileMeta>
,代表一個正在構建中的?SortedRun
。 - 這個優先隊列的排序規則是:按照每個?
SortedRun
?中最后一個文件的?maxKey
?進行升序排序。這意味著,隊首的?SortedRun
?是當前所有?SortedRun
?中,右邊界最小的那個。 - 將 Section 中的第一個文件放入一個新的?
SortedRun
,并加入隊列。 - 遍歷 Section 中剩余的文件?
meta
:- 從優先隊列中取出隊首元素?
top
(即右邊界最小的那個?SortedRun
)。 - 核心判斷:比較當前文件?
meta
?的?minKey
?和?top
?的?maxKey
。 - 如果?
meta.minKey()
?大于?top.get(top.size() - 1).maxKey()
,說明?meta
?文件可以安全地追加到?top
?這個?SortedRun
?的末尾,而不會破壞其內部的有序性(因為?meta
?的范圍在?top
?之后)。于是,將?meta
?添加到?top
?中。 - 如果?
meta.minKey()
?不大于?top
?的?maxKey
,說明?meta
?與?top
?這個?SortedRun
?的范圍有重疊,不能追加。此時,必須為?meta
?創建一個新的?SortedRun
,并將這個新的?SortedRun
?也加入到優先隊列中。 - 最后,將被修改過(或者未修改)的?
top
?重新放回優先隊列。
- 從優先隊列中取出隊首元素?
- 遍歷結束后,優先隊列?
queue
?中剩下的所有?List<DataFileMeta>
?就是劃分好的、數量最少的?SortedRun
?集合。
這個貪心策略的正確性在于,對于每一個新來的文件,我們總是嘗試將它追加到最早可以結束的那個?SortedRun
?后面。這樣可以最大限度地復用現有的?SortedRun
,從而保證最終?SortedRun
?的數量最少。這在算法上被稱為“區間劃分問題”的一種經典解法。
總結
IntervalPartition
?通過一個兩階段的劃分過程,巧妙地解決了如何高效組織待壓縮文件的問題:
- 宏觀上,通過?
partition()
?方法按主鍵范圍切分出互不重疊的?Section
,為并行處理創造了條件。 - 微觀上,在每個?
Section
?內部,通過?partition(List<DataFileMeta> metas)
?方法中的貪心算法,將可能重疊的文件高效地組織成最少數目的?SortedRun
,為后續的歸并排序(Merge-Sort)提供了最優的輸入,減少了歸并的復雜度。
這個類的設計體現了在數據密集型系統中,通過精巧的算法設計來優化核心流程(如Compaction)的典型思路。
MergeTreeCompactManager
MergeTreeCompactManager
?是 Paimon 中?merge-tree
?核心寫流程的心臟,它負責管理和調度數據文件的合并(Compaction)任務。下面我將從幾個關鍵部分來解析這個類。
MergeTreeCompactManager
?繼承自?CompactFutureManager
,它的核心職責是:
- 管理數據文件:通過內部的?
Levels
?對象,維護所有數據文件(DataFileMeta
)在不同層級(Level)的分布情況。 - 觸發合并任務:根據預設的合并策略(
CompactStrategy
),決定何時以及哪些文件需要進行合并。 - 提交和管理合并任務:將選出的文件打包成一個合并單元(
CompactUnit
),并提交給一個異步執行的?MergeTreeCompactTask
?任務。 - 處理合并結果:獲取異步任務的執行結果(
CompactResult
),并用它來更新?Levels
?中文件的狀態。 - 控制寫入流速:通過判斷當前文件堆積情況,決定是否需要阻塞上游的寫入(
Write
)操作,防止因合并速度跟不上寫入速度而導致系統不穩定。
核心成員變量 (Key Fields)
我們來看一下這個類中最重要的幾個成員變量,它們定義了?MergeTreeCompactManager
?的行為:
// ... existing code ...
public class MergeTreeCompactManager extends CompactFutureManager {private static final Logger LOG = LoggerFactory.getLogger(MergeTreeCompactManager.class);private final ExecutorService executor;private final Levels levels;private final CompactStrategy strategy;private final Comparator<InternalRow> keyComparator;private final long compactionFileSize;private final int numSortedRunStopTrigger;private final CompactRewriter rewriter;@Nullable private final CompactionMetrics.Reporter metricsReporter;@Nullable private final DeletionVectorsMaintainer dvMaintainer;private final boolean lazyGenDeletionFile;private final boolean needLookup;@Nullable private final RecordLevelExpire recordLevelExpire;
// ... existing code ...
executor
: 一個?ExecutorService
?線程池,用于異步執行合并任務。levels
:?Levels
?對象,是 LSM-Tree(Log-Structured Merge-Tree)分層結構的核心體現。它管理著所有的數據文件,并根據 Level 組織它們。strategy
:?CompactStrategy
,合并策略。它定義了如何從?Levels
?中挑選文件進行合并。Paimon 提供了如?UniversalCompaction
?和?LevelCompaction
?等策略。keyComparator
: 主鍵的比較器,用于在合并過程中對數據進行排序。compactionFileSize
: 合并后生成的目標文件大小。numSortedRunStopTrigger
: 一個非常重要的閾值。當?Levels
?中的有序文件片段(Sorted Run)數量超過這個值時,會阻塞寫入操作,等待合并完成。這是控制寫入和合并速度平衡的關鍵。rewriter
:?CompactRewriter
,合并重寫器。它負責讀取待合并的舊文件,使用?MergeFunction
?對數據進行合并,然后寫入新文件。這是實際執行數據合并邏輯的組件。needLookup
: 一個布爾值,通常與?changelog-producer
?=?lookup
?配置相關。當為?true
?時,表示在合并時需要通過 lookup 方式生成 changelog。
triggerCompaction(boolean fullCompaction)
: 觸發合并
這是發起合并的入口。
// ... existing code ...@Overridepublic void triggerCompaction(boolean fullCompaction) {Optional<CompactUnit> optionalUnit;List<LevelSortedRun> runs = levels.levelSortedRuns();if (fullCompaction) {// ... 處理強制全量合并 ...optionalUnit =CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs, recordLevelExpire);} else {if (taskFuture != null) {return;}// ...optionalUnit =strategy.pick(levels.numberOfLevels(), runs).filter(unit -> unit.files().size() > 0).filter(unit ->unit.files().size() > 1|| unit.files().get(0).level()!= unit.outputLevel());}optionalUnit.ifPresent(unit -> {// ...boolean dropDelete =unit.outputLevel() != 0&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()|| dvMaintainer != null);// ...submitCompaction(unit, dropDelete);});}
// ... existing code ...
fullCompaction
?參數: 如果為?true
,會觸發一次全量合并,通常是將所有文件合并成一個或少數幾個大文件。這是一種比較重的操作,一般由用戶手動觸發。- 普通合并: 如果?
fullCompaction
?為?false
,則會調用?strategy.pick(...)
?方法,讓合并策略根據當前的?Levels
?狀態來決定是否需要合并以及合并哪些文件。 - 提交任務: 如果策略選出了需要合并的文件(
CompactUnit
),就會調用?submitCompaction
?方法將任務提交到線程池。
submitCompaction(CompactUnit unit, boolean dropDelete)
: 提交合并任務
這個方法負責創建并提交一個真正的合并任務。
// ... existing code ...private void submitCompaction(CompactUnit unit, boolean dropDelete) {// ...MergeTreeCompactTask task =new MergeTreeCompactTask(keyComparator,compactionFileSize,rewriter,unit,dropDelete,levels.maxLevel(),metricsReporter,compactDfSupplier,recordLevelExpire);// ...taskFuture = executor.submit(task);// ...}
// ... existing code ...
它將所有需要的組件(如?rewriter
、keyComparator
?等)和待合并的文件(unit
)打包成一個?MergeTreeCompactTask
?對象,然后通過?executor.submit(task)
?提交給線程池異步執行。
getCompactionResult(boolean blocking)
: 獲取合并結果
當外部(通常是?MergeTreeWriter
)需要獲取合并結果時,會調用此方法。
// ... existing code .../** Finish current task, and update result files to {@link Levels}. */@Overridepublic Optional<CompactResult> getCompactionResult(boolean blocking)throws ExecutionException, InterruptedException {Optional<CompactResult> result = innerGetCompactionResult(blocking);result.ifPresent(r -> {// ...levels.update(r.before(), r.after());MetricUtils.safeCall(this::reportMetrics, LOG);// ...});return result;}
// ... existing code ...
- 它會從?
taskFuture
?中獲取任務結果?CompactResult
。CompactResult
?中包含了合并前的文件列表(before
)和合并后生成的新文件列表(after
)。 - 最關鍵的一步是?
levels.update(r.before(), r.after())
,它用新生成的文件替換掉了舊的、已被合并的文件,從而更新了整個?Levels
?的狀態。
shouldWaitForLatestCompaction()
?和?compactNotCompleted()
: 控制寫入流速
這兩個方法是 Paimon 實現反壓(Back-Pressure)機制的關鍵。
// ... existing code ...@Overridepublic boolean shouldWaitForLatestCompaction() {return levels.numberOfSortedRuns() > numSortedRunStopTrigger;}// ... existing code ...@Overridepublic boolean compactNotCompleted() {// If it is a lookup compaction, we should ensure that all level 0 files are consumed, so// here we need to make the outside think that we still need to do unfinished compact// workingreturn super.compactNotCompleted() || (needLookup && !levels().level0().isEmpty());}
// ... existing code ...
shouldWaitForLatestCompaction()
: 當?Levels
?中?SortedRun
?的數量超過了?numSortedRunStopTrigger
?閾值時,返回?true
。MergeTreeWriter
?會根據這個返回值決定是否要阻塞寫入,等待合并完成,以釋放一些?SortedRun
。compactNotCompleted()
: 判斷是否還有未完成的合并工作。特別地,如果配置了?needLookup
,即使沒有正在運行的合并任務,只要 Level 0 還存在文件,它也會返回?true
,以確保 Level 0 的文件能被盡快合并掉,這對于?lookup
?生成 changelog 的正確性至關重要。
總結
MergeTreeCompactManager
?是一個典型的生產者-消費者模型實現。MergeTreeWriter
?是生產者,不斷產生新的 L0 層文件;MergeTreeCompactManager
?內部的?MergeTreeCompactTask
?是消費者,不斷地將多層級的文件合并成更少、更大的文件。
它通過?Levels
?來感知系統狀態,通過?CompactStrategy
?來制定決策,通過?CompactRewriter
?和?MergeTreeCompactTask
?來執行任務,并通過?numSortedRunStopTrigger
?等機制來協調與上游生產者的速度,最終保證了整個?merge-tree
?寫入模型的穩定和高效。
Paimon 進行 Compact 是不是就是保留最新的 Key?
Paimon 的 Compact 行為取決于在表上配置的?merge-engine
(合并引擎)。保留最新的 Key 只是其中一種最常見的默認行為。
Paimon 支持多種合并引擎,來滿足不同的業務場景:
deduplicate
?(默認值): 這是你提到的“保留最新的 Key”的行為。當多條記錄有相同的主鍵時,在合并(Compaction)過程中,Paimon 只會保留序列號(sequence number)最大的一條記錄,即最新的記錄。partial-update
: 部分更新引擎。它允許你只更新一行的某些字段。合并時,它會將同一主鍵的多條記錄的字段進行合并,而不是簡單地保留最新的一整行。對于未更新的字段,會保留其舊值。aggregation
: 聚合引擎。它可以在數據寫入過程中,根據預設的聚合函數(如?SUM
,?MAX
,?MIN
,?COUNT
?等)對同一主鍵的記錄進行實時聚合。first-row
: 保留第一行的引擎。當有相同主鍵的多條數據時,它會保留遇到的第一條數據,后續的數據會被忽略。這在需要按主鍵去重的流式數據入湖場景中很有用。
這些不同的合并邏輯由不同的?MergeFunction
?實現類來處理,例如?DeduplicateMergeFunction
、PartialUpdateMergeFunction
?和?AggregateMergeFunction
。
// ... existing code ...
public class AggregateMergeFunction implements MergeFunction<KeyValue> {private final InternalRow.FieldGetter[] getters;private final FieldAggregator[] aggregators;
// ... existing code ...@Overridepublic void add(KeyValue kv) {
// ... existing code ...for (int i = 0; i < getters.length; i++) {aggregators[i].agg(row, getters[i].getFieldOrNull(value));}}@Overridepublic KeyValue getResult() {
// ... existing code ...}
// ... existing code ...
}
從上面的?AggregateMergeFunction
?可以看到,它的?add
?方法是在對字段進行聚合(agg
),而不是簡單地替換。
MergeFunction
?定義了具體的數據?合并邏輯(比如是去重、聚合還是部分更新),而?MergeFunctionWrapper
?則是在?MergeFunction
?的基礎上增加了一層?“包裝”,用于處理更復雜的場景,主要是為了?優化性能?和?生成 Changelog。
不同的?MergeFunctionWrapper
?實現是為了支持 Paimon 不同的 Changelog 生成策略 (changelog-producer
) 和合并場景:
ReducerMergeFunctionWrapper
?是一個基礎的、帶有優化的包裝器。從它的文檔和實現可以看出,當一個 Key 只有一條記錄需要合并時,它會直接返回這條記錄,避免了調用內部?MergeFunction
?的開銷。它主要用于不需要生成 Changelog 或采用簡單合并策略的場景,只返回合并后的最終結果?KeyValue
。/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.mergetree.compact;import org.apache.paimon.KeyValue;/*** Wrapper for {@link MergeFunction}s which works like a reducer.** <p>A reducer is a type of function. If there is only one input the result is equal to that input;* Otherwise the result is calculated by merging all the inputs in some way.** <p>This wrapper optimize the wrapped {@link MergeFunction}. If there is only one input, the input* will be stored and the inner merge function will not be called, thus saving some computing time.*/ public class ReducerMergeFunctionWrapper implements MergeFunctionWrapper<KeyValue> {private final MergeFunction<KeyValue> mergeFunction;private KeyValue initialKv;private boolean isInitialized;public ReducerMergeFunctionWrapper(MergeFunction<KeyValue> mergeFunction) {this.mergeFunction = mergeFunction;}/** Resets the {@link MergeFunction} helper to its default state. */@Overridepublic void reset() {initialKv = null;mergeFunction.reset();isInitialized = false;}/** Adds the given {@link KeyValue} to the {@link MergeFunction} helper. */@Overridepublic void add(KeyValue kv) {if (initialKv == null) {initialKv = kv;} else {if (!isInitialized) {merge(initialKv);isInitialized = true;}merge(kv);}}private void merge(KeyValue kv) {mergeFunction.add(kv);}/** Get current value of the {@link MergeFunction} helper. */@Overridepublic KeyValue getResult() {return isInitialized ? mergeFunction.getResult() : initialKv;} }
FullChangelogMergeFunctionWrapper
: 用于?changelog-producer
?=?full-compaction
?模式。在這種模式下,Paimon 會在最高層(maxLevel)的文件中查找舊值,并與當前合并的結果進行比較,從而生成?INSERT
,?UPDATE_BEFORE
,?UPDATE_AFTER
,?DELETE
?這樣的完整 Changelog。這個 Wrapper 的職責就是協調這個過程。LookupChangelogMergeFunctionWrapper
: 用于?changelog-producer
?=?lookup
?模式。它會在合并前通過?lookup
?的方式去查找舊的記錄,從而生成 Changelog。相比?full-compaction
,它可能更高效,因為它不需要總是訪問最高層的文件。FirstRowMergeFunctionWrapper
: 這是一個專為?first-row
?合并引擎設計的包裝器,用于處理其獨特的 Changelog 生成邏輯。
總結一下:
MergeFunction
?決定了數據?如何合并(What)。MergeFunctionWrapper
?決定了合并過程?如何被執行和協調(How),特別是如何與不同的 Changelog 生成策略相結合,并進行性能優化。