LookupLevels
?
LookupLevels
?在 Paimon 中扮演著**“帶緩存的、基于 Key 的數據查找引擎”**的角色。它的核心使命是:當需要根據主鍵(Key)查找某條數據時,能夠高效地在 LSM-Tree 的多層(Levels)數據文件中定位到這條數據,并盡可能地利用本地緩存來避免昂貴的遠程 I/O。
LookupLevels
?封裝了對一個?Levels
?對象(代表了 LSM-Tree 的一個分區的所有數據文件分層信息)的查找操作。它本身不存儲數據文件的元數據,而是依賴傳入的?Levels
?對象。它的主要職責可以概括為:
- 提供統一的查找入口:通過?
lookup(InternalRow key, int startLevel)
?方法,屏蔽底層多層文件和緩存的復雜性。 - 實現惰性本地緩存:當第一次需要在一個遠程數據文件(SST 文件)中查找時,它會下載該文件,并在本地構建一個優化的、基于 Key 的查找索引文件(Lookup File)。
- 管理緩存生命周期:通過?
Caffeine
?緩存管理這些本地查找文件,并能在遠程文件被刪除時(如 Compaction 后),自動清理對應的本地緩存。 - 提供靈活的查找策略:通過?
ValueProcessor
?接口,支持不同場景的查找需求,例如:只需判斷 Key 是否存在、需要獲取完整的 Value、或需要獲取 Value 在文件中的物理位置等。
關鍵成員變量(構造函數解析)
LookupLevels
?的所有核心依賴都通過構造函數注入,這體現了良好的設計模式。
// ... existing code ...private final Levels levels;private final Comparator<InternalRow> keyComparator;private final RowCompactedSerializer keySerializer;private final ValueProcessor<T> valueProcessor;private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;private final Function<String, File> localFileFactory;private final LookupStoreFactory lookupStoreFactory;private final Function<Long, BloomFilter.Builder> bfGenerator;private final Cache<String, LookupFile> lookupFileCache;private final Set<String> ownCachedFiles;public LookupLevels(Levels levels,Comparator<InternalRow> keyComparator,RowType keyType,ValueProcessor<T> valueProcessor,IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,Function<String, File> localFileFactory,LookupStoreFactory lookupStoreFactory,Function<Long, BloomFilter.Builder> bfGenerator,Cache<String, LookupFile> lookupFileCache) {
// ... existing code ...
levels
: 持有 LSM-Tree 的文件分層元數據,是查找的基礎。keyComparator
?&?keySerializer
: 用于主鍵的比較和序列化。valueProcessor
:?策略模式的核心。它決定了從遠程文件讀取數據后,在本地查找文件中存儲什么,以及最終返回給調用者什么。fileReaderFactory
: 一個工廠,用于根據?DataFileMeta
?創建能讀取遠程數據文件(如 Parquet)的?RecordReader
。localFileFactory
: 一個工廠,用于在本地磁盤上創建臨時文件,作為查找文件的載體。lookupStoreFactory
: 一個工廠,用于創建真正的本地 KV 存儲。Paimon 默認使用基于哈希的實現 (HashLookupStoreFactory
)。bfGenerator
: 布隆過濾器(Bloom Filter)生成器,用于在創建本地查找文件時一并生成布隆過濾器,可以快速過濾掉不存在的 Key,避免磁盤 I/O。lookupFileCache
:?核心緩存,一個?Caffeine
?緩存實例,Key 是遠程數據文件的文件名,Value 是封裝了本地查找文件的?LookupFile
?對象。這個緩存可以在多個?LookupLevels
?實例間共享。ownCachedFiles
: 一個?HashSet
,用于追蹤由當前?LookupLevels
?實例創建并放入緩存的文件。這主要用于在當前實例關閉時,能準確地清理自己創建的緩存。
lookup(InternalRow key, int startLevel)
// ... existing code ...@Nullablepublic T lookup(InternalRow key, int startLevel) throws IOException {return LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0);}
// ... existing code ...
這個方法將實際的查找邏輯委托給了?LookupUtils
?工具類。LookupUtils.lookup
?的邏輯遵循 LSM-Tree 的基本原則:
- 從?
startLevel
?開始,逐層向上查找(Level 0, Level 1, Level 2...)。 - 因為低層(Level 號碼小)的數據比高層的數據更新,所以一旦在某一層找到了數據,就立刻返回,不再繼續查找更高層。
- 對于 Level 0,由于文件之間 key range 可能重疊,需要遍歷所有文件。
- 對于 Level 1 及以上,文件間的 key range 不重疊,因此可以通過二分查找快速定位到可能包含目標 key 的那個文件(這里的二分是根據DataFileMeta記錄的最大和最小key)。
最終,無論是哪一層,定位到具體的?DataFileMeta
?后,都會調用?LookupLevels
?自己的?lookup(InternalRow key, DataFileMeta file)
?方法。
lookup(InternalRow key, DataFileMeta file)
?
// ... existing code ...@Nullableprivate T lookup(InternalRow key, DataFileMeta file) throws IOException {// 1. 嘗試從緩存獲取 LookupFileLookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());boolean newCreatedLookupFile = false;if (lookupFile == null) {// 2. 緩存未命中,調用 createLookupFile 創建一個新的lookupFile = createLookupFile(file);newCreatedLookupFile = true;}byte[] valueBytes;try {byte[] keyBytes = keySerializer.serializeToBytes(key);// 3. 使用 LookupFile 在本地進行查找valueBytes = lookupFile.get(keyBytes);} finally {if (newCreatedLookupFile) {// 4. 如果是新創建的,放入緩存供后續使用lookupFileCache.put(file.fileName(), lookupFile);}}if (valueBytes == null) {return null;}// 5. 使用 ValueProcessor 將從本地文件讀出的字節數組,轉換成最終結果return valueProcessor.readFromDisk(key, lookupFile.remoteFile().level(), valueBytes, file.fileName());}
// ... existing code ...
這個過程清晰地展示了 “按需加載、惰性創建” 的緩存模式。而?createLookupFile
?方法則是將遠程數據轉化為本地索引的核心:
- 創建本地空文件:
localFileFactory.apply(...)
。 - 創建本地寫入器:
lookupStoreFactory.createWriter(...)
。 - 創建遠程讀取器:
fileReaderFactory.apply(file)
。 - 數據轉換與寫入:循環讀取遠程文件中的每條?
KeyValue
,通過?valueProcessor.persistToDisk(...)
?處理后,寫入本地文件。 - 封裝返回:將創建好的本地文件及其讀取器封裝成?
LookupFile
?對象返回。
createLookupFile(DataFileMeta file)
?
這個函數是?LookupLevels
?實現惰性本地緩存機制的關鍵。它的主要作用是:當需要在一個遠程數據文件(DataFileMeta
)中進行查找,但本地緩存又不存在時,由該函數負責將遠程數據文件轉換成本地優化的、可快速查找的索引文件(LookupFile
)。
下面我們分步驟、層層遞進地解析它的實現邏輯。
// ... existing code ...private LookupFile createLookupFile(DataFileMeta file) throws IOException {// 1. 創建本地臨時文件File localFile = localFileFactory.apply(file.fileName());if (!localFile.createNewFile()) {throw new IOException("Can not create new file: " + localFile);}// 2. 創建本地 KV 存儲的寫入器LookupStoreWriter kvWriter =lookupStoreFactory.createWriter(localFile, bfGenerator.apply(file.rowCount()));LookupStoreFactory.Context context;try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {// 3. 數據從遠程讀取并寫入本地KeyValue kv;if (valueProcessor.withPosition()) {// ... (處理帶位置信息的場景)} else {// ... (處理不帶位置信息的場景)}} catch (IOException e) {// 4. 異常處理:刪除不完整的本地文件FileIOUtils.deleteFileOrDirectory(localFile);throw e;} finally {// 5. 關閉寫入器并獲取上下文context = kvWriter.close();}// 6. 注冊到 ownCachedFilesownCachedFiles.add(file.fileName());// 7. 創建并返回 LookupFile 實例return new LookupFile(localFile,file,lookupStoreFactory.createReader(localFile, context),() -> ownCachedFiles.remove(file.fileName()));}
// ... existing code ...
創建本地臨時文件 (localFileFactory.apply
)
File localFile = localFileFactory.apply(file.fileName());
- 作用: 為即將創建的本地查找文件在磁盤上預留一個位置。
- 遞歸分析:
localFileFactory
?是一個?Function<String, File>
?類型的函數,在?LookupLevels
?實例化時由外部傳入。- 在典型的?
KeyValueFileStoreWrite
?中,它的實現是調用?ioManager.createChannel(prefix).getPathFile()
。 ioManager
?是 Paimon 的 I/O 管理器,它負責在配置的臨時目錄(java.io.tmpdir
?或用戶指定的目錄)下創建文件,并保證文件名唯一,通常會添加隨機后綴。- 文件名的前綴由?
LookupFile.localFilePrefix(...)
?生成,格式為?分區信息_bucket號_遠程文件名
,這保證了不同來源的文件生成的本地文件不會沖突。
創建本地 KV 存儲的寫入器 (lookupStoreFactory.createWriter
)
LookupStoreWriter kvWriter =lookupStoreFactory.createWriter(localFile, bfGenerator.apply(file.rowCount()));
- 作用: 獲取一個能向第一步創建的?
localFile
?中寫入鍵值對的?LookupStoreWriter
?對象。 - 遞歸分析:
lookupStoreFactory
: 這是本地查找文件格式的工廠。Paimon 支持多種格式,由?CoreOptions.LOOKUP_LOCAL_FILE_TYPE
?配置決定。HASH
?: 返回?HashLookupStoreFactory
。它創建的?HashLookupStoreWriter
?會構建一個基于哈希表的本地文件,提供 O(1) 的平均查找復雜度。SORT
: 返回?SortLookupStoreFactory
。它創建的?SortLookupStoreWriter
?會構建一個基于排序鍵的本地文件,通過二分查找進行定位。
bfGenerator.apply(file.rowCount())
: 這是一個布隆過濾器(Bloom Filter)生成器。createWriter
?會接收這個?BloomFilter.Builder
,并在寫入數據的同時,將所有的 Key 添加到布隆過濾器中。這個布隆過濾器最終也會被序列化到本地文件中,用于快速過濾掉不存在的 Key。
數據從遠程讀取并寫入本地
try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {// ... 循環讀取和寫入 ...
}
這是數據轉換的核心步驟。
- 作用: 從遠程的 SST 文件(如 Parquet)中逐條讀取?
KeyValue
,然后通過?kvWriter
?寫入本地查找文件。 - 遞歸分析:
fileReaderFactory.apply(file)
: 創建一個?RecordReader
?來讀取遠程的?DataFileMeta
。這個讀取器知道如何解析 Parquet/ORC/Avro 文件格式。reader.readBatch()
: 為了效率,數據是按批次讀取的。valueProcessor.withPosition()
: 這是一個判斷,詢問當前的?ValueProcessor
?策略是否需要原始數據在文件中的物理位置(Position)。- 如果為?
true
?(如?PositionedKeyValueProcessor
),則會調用?valueProcessor.persistToDisk(kv, batch.returnedPosition())
。 - 如果為?
false
?(如?KeyValueProcessor
?或?ContainsValueProcessor
),則調用?valueProcessor.persistToDisk(kv)
。
- 如果為?
valueProcessor.persistToDisk(...)
: 這是策略模式的應用。ValueProcessor
?決定了最終寫入本地查找文件的?value
?是什么。KeyValueProcessor
: 寫入完整的?value
、sequenceNumber
?和?RowKind
。ContainsValueProcessor
: 寫入一個空字節數組,極大地節省空間。PositionedKeyValueProcessor
: 寫入?rowPosition
?以及可選的?value
?等信息。
kvWriter.put(keyBytes, valueBytes)
: 將序列化后的?key
?和?value
?寫入本地查找文件。kvWriter
?內部會處理哈希沖突(HASH 模式)或排序(SORT 模式),并同步更新布隆過濾器。
異常處理
catch (IOException e) {FileIOUtils.deleteFileOrDirectory(localFile);throw e;
}
- 作用: 這是一個健壯性保證。如果在數據轉換過程中發生任何 I/O 異常(例如網絡中斷、磁盤寫滿),這個?
catch
?塊會確保被創建出來但不完整的本地臨時文件被刪除,避免留下垃圾文件。
關閉寫入器并獲取上下文
- 作用:?
kvWriter.close()
?是一個至關重要的步驟。它會完成所有收尾工作,例如:- 將內存中的緩沖區(buffer)刷寫到磁盤。
- 寫入文件元數據(metadata),比如布隆過濾器的序列化數據、哈希表的元信息、索引塊等。
- 返回一個?
LookupStoreFactory.Context
?對象,這個對象包含了讀取該文件所必需的元信息(比如文件總大小、布隆過濾器在文件中的偏移量等)。這個?context
?會在后續創建?LookupStoreReader
?時被傳入。
注冊到?ownCachedFiles
ownCachedFiles.add(file.fileName());
- 作用: 將這個新創建的本地文件的遠程文件名記錄在?
ownCachedFiles
?這個?Set
?中。這用于追蹤當前?LookupLevels
?實例創建了哪些緩存。當這個實例被?close()
?時,它只會清理自己創建的緩存,而不會影響其他實例創建的緩存。
創建并返回?LookupFile
?實例
return new LookupFile(localFile,file,lookupStoreFactory.createReader(localFile, context),() -> ownCachedFiles.remove(file.fileName()));
- 作用: 將所有資源封裝成一個?
LookupFile
?對象返回。 - 遞歸分析:
new LookupFile(...)
:?LookupFile
?是對一個本地查找文件的完整封裝。lookupStoreFactory.createReader(localFile, context)
: 使用與寫入時相同的工廠,并傳入之前獲取的?context
,來創建一個?LookupStoreReader
。這個?reader
?知道如何解析這個本地文件并執行快速查找。() -> ownCachedFiles.remove(file.fileName())
: 傳入一個?Runnable
?回調。這個回調會在?LookupFile
?從?Caffeine
?緩存中被移除時調用,從而將文件名從?ownCachedFiles
?中也移除,保持狀態同步。
ValueProcessor
?策略接口
LookupLevels
?提供了三種?ValueProcessor
?實現,以應對不同場景:
KeyValueProcessor
: 用于需要獲取完整?KeyValue
?的場景。它會將?value
、sequenceNumber
?和?valueKind
?都序列化后存入本地文件。ContainsValueProcessor
: 用于僅需判斷 Key 是否存在的場景(例如 Lookup Join)。它的?persistToDisk
?直接返回一個空字節數組,極大地減小了本地索引文件的大小。readFromDisk
?則直接返回?true
。PositionedKeyValueProcessor
: 用于需要知道數據在原文件中物理位置的場景,這對于實現 Deletion Vector(刪除向量)至關重要。它可以選擇是否將?value
?也一并存入本地文件。
生命周期與緩存清理
LookupLevels
?實現了兩個接口來管理生命周期:
Closeable
: 當?LookupLevels
?對象關閉時(例如一個 Flink Task 結束),它的?close()
?方法會被調用,從而將它自己創建的那些緩存項(記錄在?ownCachedFiles
?中)從共享的?lookupFileCache
?中移除。Levels.DropFileCallback
: 它把自己注冊為?Levels
?的一個回調。當 Compaction 等操作導致某個遠程數據文件被刪除時,Levels
?會通過?notifyDropFile
?方法通知?LookupLevels
,后者會立即將該文件對應的本地緩存項從?lookupFileCache
?中移除,保證了緩存與實際數據的一致性。
總結
LookupLevels
?是 Paimon 高性能查找能力的關鍵。它通過惰性本地緩存、可插拔的 KV 存儲和靈活的?ValueProcessor
?策略,巧妙地解決了在存算分離架構下,如何高效地對遠程大規模數據進行點查的問題。其設計兼顧了性能、靈活性和健壯的緩存管理,是理解 Paimon 讀取鏈路的一個絕佳范例。
lookup
?方法的調用鏈
LookupLevels.lookup
?方法本身非常簡潔,它將所有復雜的邏輯都委托給了?LookupUtils.lookup
。
// ... existing code ...@Nullablepublic T lookup(InternalRow key, int startLevel) throws IOException {return LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0);}
// ... existing code ...
這里的關鍵是理解兩個方法引用:this::lookup
?和?this::lookupLevel0
。它們分別指向?LookupLevels
?類中其他同名但參數不同的?lookup
?和?lookupLevel0
?私有方法。
this::lookup
?對應的是?private T lookup(InternalRow key, SortedRun level)
。this::lookupLevel0
?對應的是?private T lookupLevel0(InternalRow key, TreeSet<DataFileMeta> level0)
。
現在,我們來看?LookupUtils.lookup
?的源碼,看看它是如何使用這兩個方法引用的。
第一層:LookupUtils.lookup(levels, ...)
?- 頂層循環
LookupUtils.java
// ... existing code ...public static <T> T lookup(Levels levels,InternalRow key,int startLevel,BiFunctionWithIOE<InternalRow, SortedRun, T> lookup, // 對應 this::lookupBiFunctionWithIOE<InternalRow, TreeSet<DataFileMeta>, T> level0Lookup) // 對應 this::lookupLevel0throws IOException {T result = null;// 1. 從 startLevel 開始,逐層向上查找for (int i = startLevel; i < levels.numberOfLevels(); i++) {if (i == 0) {// 2. 如果是 Level 0,調用傳入的 level0Lookup 函數result = level0Lookup.apply(key, levels.level0());} else {// 3. 如果是 Level 1+,調用傳入的 lookup 函數SortedRun level = levels.runOfLevel(i);result = lookup.apply(key, level);}// 4. 一旦找到結果,立即跳出循環if (result != null) {break;}}return result;}
// ... existing code ...
調用過程分析:
LookupUtils.lookup
?接收到?LookupLevels
?傳來的兩個方法引用,分別命名為?lookup
?和?level0Lookup
。- 它開始一個?
for
?循環,從?startLevel
?開始遍歷 LSM-Tree 的每一層。 - 當?
i == 0
?時:它調用?level0Lookup.apply(...)
。這實際上就是調用了?LookupLevels.lookupLevel0(key, levels.level0())
。 - 當?
i > 0
?時:它調用?lookup.apply(...)
。這實際上就是調用了?LookupLevels.lookup(key, levels.runOfLevel(i))
。 - 只要任何一次調用返回了非?
null
?的結果,循環就會終止,并返回該結果。這符合 LSM-Tree 從新數據(低 Level)向舊數據(高 Level)查找的原則。
第二層:深入?LookupLevels
?的私有方法
現在我們來看被調用的那兩個私有方法內部又做了什么。
LookupLevels.lookupLevel0
// ... existing code ...@Nullableprivate T lookupLevel0(InternalRow key, TreeSet<DataFileMeta> level0) throws IOException {return LookupUtils.lookupLevel0(keyComparator, key, level0, this::lookup);}
// ... existing code ...
這里又出現了一次委托!它調用了?LookupUtils.lookupLevel0
,并且又傳遞了一個方法引用?this::lookup
。這次的?this::lookup
?對應的是?private T lookup(InternalRow key, DataFileMeta file)
。
LookupLevels.lookup(key, level)
// ... existing code ...@Nullableprivate T lookup(InternalRow key, SortedRun level) throws IOException {return LookupUtils.lookup(keyComparator, key, level, this::lookup);}
// ... existing code ...
同樣,這里也委托給了?LookupUtils.lookup
?的另一個重載版本,并再次傳遞了方法引用?this::lookup
,同樣對應?private T lookup(InternalRow key, DataFileMeta file)
。
第三層:LookupUtils
?的具體查找邏輯
現在我們進入?LookupUtils
?的具體實現,看看它們如何使用第三層傳遞進來的?this::lookup
?方法引用。
LookupUtils.lookupLevel0
// ... existing code ...public static <T> T lookupLevel0(Comparator<InternalRow> keyComparator,InternalRow target,TreeSet<DataFileMeta> level0,BiFunctionWithIOE<InternalRow, DataFileMeta, T> lookup) // 對應 this::lookup(key, file)throws IOException {T result = null;// 遍歷 Level 0 的所有文件for (DataFileMeta file : level0) {// 檢查 key 是否在文件的 [minKey, maxKey] 范圍內if (keyComparator.compare(file.maxKey(), target) >= 0&& keyComparator.compare(file.minKey(), target) <= 0) {// 如果在范圍內,就調用傳入的 lookup 函數result = lookup.apply(target, file);if (result != null) {// 找到就返回(Level 0 內部文件按新舊排序,所以第一個找到的就是最新的)return result;}}}return null;}
// ... existing code ...
LookupUtils.lookup(keyComparator, ...)
LookupUtils.java
// ... existing code ...public static <T> T lookup(Comparator<InternalRow> keyComparator,InternalRow target,SortedRun level,BiFunctionWithIOE<InternalRow, DataFileMeta, T> lookup) // 對應 this::lookup(key, file)throws IOException {// ...// 對 Level 1+ 的文件列表進行二分查找,找到可能包含 key 的那個文件// ... (binary search logic) ...List<DataFileMeta> files = level.files();int left = 0;int right = files.size() - 1;// binary search restart positions to find the restart position immediately before the// targetKeywhile (left < right) {int mid = (left + right) / 2;if (keyComparator.compare(files.get(mid).maxKey(), target) < 0) {// Key at "mid.max" is < "target". Therefore all// files at or before "mid" are uninteresting.left = mid + 1;} else {// Key at "mid.max" is >= "target". Therefore all files// after "mid" are uninteresting.right = mid;}}int index = right;// ...// 如果找到了對應的文件if (index < files.size()&& keyComparator.compare(files.get(index).minKey(), target) <= 0) {// 調用傳入的 lookup 函數return lookup.apply(target, files.get(index));}return null;}
// ... existing code ...
第四層:最終的執行體
經過層層傳遞,最終所有邏輯都匯聚到了?LookupLevels.lookup(InternalRow key, DataFileMeta file)
?這個方法。
// ... existing code ...@Nullableprivate T lookup(InternalRow key, DataFileMeta file) throws IOException {// 1. 檢查本地緩存LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());// 2. 如果緩存沒有,創建本地查找文件if (lookupFile == null) {lookupFile = createLookupFile(file);// ...}// 3. 在本地查找文件中執行 get 操作valueBytes = lookupFile.get(keyBytes);// ...// 4. 處理結果并返回return valueProcessor.readFromDisk(...);}
// ... existing code ...
這個方法是真正干活的地方:檢查和使用緩存、創建本地查找文件、從本地文件中讀取數據。
總結調用鏈
整個過程是一個精巧的委托鏈條:
LookupLevels.lookup(key, startLevel)
?(入口)- 委托給?
LookupUtils.lookup(levels, ...)
,并告訴它:“當你需要處理 Level 0 時,請調用我的?lookupLevel0
?方法;當你需要處理 Level 1+ 時,請調用我的?lookup(key, level)
?方法。”
- 委托給?
LookupUtils.lookup(levels, ...)
?(頂層循環)- 根據當前層級,調用?
LookupLevels
?對應的私有方法。
- 根據當前層級,調用?
LookupLevels.lookupLevel0
?或?LookupLevels.lookup(key, level)
?(中間層)- 這兩個方法再次委托給?
LookupUtils
?中更具體的實現 (lookupLevel0
?或?lookup(keyComparator, ...)
?),并告訴它們:“當你定位到具體要查哪個?DataFileMeta
?文件時,請調用我的?lookup(key, file)
?方法。”
- 這兩個方法再次委托給?
LookupUtils.lookupLevel0
?或?LookupUtils.lookup(keyComparator, ...)
?(定位層)- 它們負責遍歷(Level 0)或二分查找(Level 1+)來定位到具體的文件。
- 一旦定位到?
DataFileMeta
,就調用最終的方法引用,即?LookupLevels.lookup(key, file)
。
LookupLevels.lookup(key, file)
?(執行層)- 這是所有邏輯的終點,負責與緩存和本地文件系統交互,完成最終的查找。
這種設計將**“遍歷/查找策略”(在?LookupUtils
?中)和“具體執行邏輯”**(在?LookupLevels
?中)完美地解耦開來,使得代碼結構清晰,復用性強。
LookupFile
LookupFile
?是 Paimon 本地查找緩存機制的物理載體和邏輯封裝。當?LookupLevels
?決定將一個遠程數據文件(SST 文件)緩存到本地時,最終產物就是一個?LookupFile
?對象。這個對象不僅代表了本地磁盤上的一個物理文件,還封裝了對該文件的所有操作和生命周期管理。
LookupFile
?的職責非常清晰和集中:
- 封裝本地查找文件:它持有一個本地?
File
?對象和一個?LookupStoreReader
,提供了對這個本地優化文件的統一訪問入口。 - 關聯遠程文件:它內部保存了原始遠程文件?
DataFileMeta
?的引用,明確了這個本地緩存的來源。 - 提供查找功能:通過?
get(byte[] key)
?方法,利用內部的?LookupStoreReader
?在本地文件上執行快速的 Key-Value 查找。 - 管理生命周期:通過?
close()
?方法,負責關閉文件讀取器、刪除本地物理文件,并執行必要的回調。 - 作為緩存的 Value:
LookupFile
?對象本身就是?Caffeine
?緩存中的?Value
,與遠程文件名(Key
)一一對應。
關鍵成員變量(構造函數解析)
LookupFile
?的所有核心組件都在構造時傳入,清晰地定義了它的構成。
// ... existing code ...private final File localFile;private final DataFileMeta remoteFile;private final LookupStoreReader reader;private final Runnable callback;private long requestCount;private long hitCount;private boolean isClosed = false;public LookupFile(File localFile, DataFileMeta remoteFile, LookupStoreReader reader, Runnable callback) {this.localFile = localFile;this.remoteFile = remoteFile;this.reader = reader;this.callback = callback;}
// ... existing code ...
localFile
: 指向本地磁盤上物理文件的?java.io.File
?對象。這是查找操作的物理基礎。remoteFile
: 對應的遠程數據文件的元數據(DataFileMeta
)。用于追溯來源和調試。reader
:?核心查找器。這是一個?LookupStoreReader
?接口的實例,它知道如何解析?localFile
?的二進制格式并高效地查找 Key。具體的實現可能是?HashLookupStoreReader
?或?SortLookupStoreReader
,取決于創建時使用的?LookupStoreFactory
。callback
: 一個?Runnable
?對象。這是一個非常重要的回調函數。當?LookupFile
?被關閉或從緩存中移除時,這個回調會被執行。在?LookupLevels
?中,這個回調的作用是?ownCachedFiles.remove(file.fileName())
,用于維護?LookupLevels
?內部的狀態一致性。requestCount
,?hitCount
: 用于統計這個本地文件的訪問情況,便于監控緩存效率。isClosed
: 狀態標記,防止對一個已經關閉的?LookupFile
?進行操作。
get(byte[] key)
// ... existing code ...@Nullablepublic byte[] get(byte[] key) throws IOException {checkArgument(!isClosed);requestCount++;byte[] res = reader.lookup(key);if (res != null) {hitCount++;}return res;}
// ... existing code ...
這是?LookupFile
?最主要的功能方法。它將查找請求直接委托給內部的?reader.lookup(key)
。LookupStoreReader
?會利用布隆過濾器、哈希索引或二分查找等技術,在?localFile
?中快速定位并返回與?key
?對應的?value
?字節數組。
close(RemovalCause cause)
// ... existing code ...public void close(RemovalCause cause) throws IOException {reader.close();isClosed = true;callback.run();LOG.info("Delete Lookup file {} due to {}. Access stats: requestCount={}, hitCount={}, size={}KB",
// ... existing code ...FileIOUtils.deleteFileOrDirectory(localFile);}
// ... existing code ...
這是?LookupFile
?的生命周期終點。它執行一系列清理操作:
reader.close()
: 關閉底層的?LookupStoreReader
,釋放文件句柄等資源。isClosed = true
: 更新狀態。callback.run()
: 執行回調,通知其創建者(LookupLevels
)自己已被銷毀。LOG.info(...)
: 打印一條詳細的日志,說明文件被刪除的原因(RemovalCause
)、訪問統計和文件大小,這對于問題排查和性能調優非常有價值。FileIOUtils.deleteFileOrDirectory(localFile)
:?從本地磁盤上刪除物理文件,釋放磁盤空間。
靜態工廠與緩存集成 (createCache
)
LookupFile
?類還包含一組非常重要的靜態方法,用于創建和配置?Caffeine
?緩存。這使得緩存的創建邏輯與?LookupFile
?本身緊密耦合,體現了其作為“可緩存對象”的設計意圖。
// ... existing code ...public static Cache<String, LookupFile> createCache(Duration fileRetention, MemorySize maxDiskSize) {return Caffeine.newBuilder().expireAfterAccess(fileRetention).maximumWeight(maxDiskSize.getKibiBytes()).weigher(LookupFile::fileWeigh).removalListener(LookupFile::removalCallback).executor(Runnable::run).build();}
// ... existing code ...
這里配置了?Caffeine
?緩存的幾個關鍵策略:
.expireAfterAccess(fileRetention)
:?基于訪問時間的淘汰策略。如果一個?LookupFile
?在?fileRetention
?時間內沒有被訪問過,它就會被標記為過期,并可能被緩存淘汰。這是避免冷數據永久占用磁盤空間的關鍵。.maximumWeight(maxDiskSize.getKibiBytes())
:?基于容量的淘汰策略。限制了所有本地查找文件占用的總磁盤空間。.weigher(LookupFile::fileWeigh)
: 指定了如何計算每個緩存項的“權重”。fileWeigh
?方法返回本地文件的大小(以 KB 為單位)。當總權重超過?maximumWeight
?時,Caffeine
?會開始淘汰緩存項。.removalListener(LookupFile::removalCallback)
:?核心清理機制。當一個?LookupFile
?因為任何原因(過期、容量超限、手動刪除)被緩存移除時,removalCallback
?方法會被調用。這個回調方法會調用?lookupFile.close(cause)
,從而觸發前面分析的清理流程,確保本地物理文件被刪除。
?Caffeine 內部有自己的維護線程,用于處理過期檢查、淘汰計算等任務。默認情況下,為了不阻塞這些核心維護線程,removalListener
?會被提交到一個默認的?Executor
?(通常是?ForkJoinPool.commonPool()
) 中異步執行。
然而,Paimon 在這里通過?.executor(Runnable::run)
?改變了默認行為。
Runnable::run
?是一個方法引用,它等價于一個?Executor
?的實現:
// 偽代碼
class SameThreadExecutor implements Executor {@Overridepublic void execute(Runnable command) {command.run(); // 直接在當前線程執行,而不是提交到線程池}
}
所以,.executor(Runnable::run)
?的確切含義是:
“請不要將?removalListener
?的任務異步地提交到線程池,而是在觸發這個移除事件的那個線程中,立即、同步地執行它。”
為什么 Paimon 要這么做?
選擇同步執行而不是異步執行,通常是出于以下考慮:
及時性和確定性:?
LookupFile
?的關閉操作涉及到文件 I/O(刪除本地文件)。通過同步執行,可以確保一旦 Caffeine 決定移除一個文件,這個文件會立即被刪除。這避免了在異步執行模型下可能出現的延遲——即緩存已經認為條目被移除了,但對應的磁盤文件由于線程池繁忙等原因,可能過了一小段時間才被真正刪除。對于需要精確控制磁盤空間的場景,這種確定性很重要。簡化資源管理: 同步執行可以簡化并發控制。由于回調是在 Caffeine 的內部維護線程中同步執行的,可以減少多線程并發訪問和修改?
LookupFile
?狀態的復雜性。任務輕量:?
lookupFile.close()
?操作雖然是 I/O,但刪除單個文件通常是一個非常快速的操作。Paimon 的設計者可能認為這個操作足夠輕量,不會對 Caffeine 的內部線程造成明顯的阻塞,因此同步執行的開銷是可以接受的。
本地文件名生成 (localFilePrefix
)
// ... existing code ...public static String localFilePrefix(RowType partitionType, BinaryRow partition, int bucket, String remoteFileName) {if (partition.getFieldCount() == 0) {return String.format("%s-%s", bucket, remoteFileName);} else {String partitionString = partToSimpleString(partitionType, partition, "-", 20);return String.format("%s-%s-%s", partitionString, bucket, remoteFileName);}}
// ... existing code ...
這個靜態方法定義了本地查找文件的命名規則,格式為?分區信息-bucket號-遠程文件名
。這樣做的好處是:
- 唯一性:保證了不同分區、不同 Bucket、不同遠程文件生成的本地緩存文件不會重名。
- 可讀性:通過文件名就可以大致了解這個緩存文件對應的數據來源,方便人工排查問題。
DataFileMeta
?vs.?LookupFile
這兩者代表了不同層面的東西,它們的存儲位置也不同:
DataFileMeta
:?對象本身在內存中,它描述的數據在遠程磁盤上。DataFileMeta
?是一個輕量級的 Java 對象,它本身存在于 JVM 內存中。- 它并不包含實際的數據,而是數據的“元信息”,比如數據文件的路徑(通常在 HDFS、S3 等遠程存儲上)、文件大小、行數、主鍵的最小/最大值等。
- 你可以把它理解為一個指向遠程數據文件的“指針”或“描述符”。
LookupFile
:?對象本身在內存中,它管理的數據在本地磁盤上。LookupFile
?也是一個 Java 對象,存在于 JVM 內存中(具體來說,是作為?lookupFileCache
?的?value
)。- 它的核心職責是管理一個為了加速查詢而創建在本地磁盤上的臨時文件。這個本地文件是遠程數據文件(由?
DataFileMeta
?指向)的一個經過優化的副本。 - 當需要進行 lookup 時,Paimon 會將遠程文件拉取到本地,并可能轉換成更適合快速隨機查找的格式(如哈希索引文件),這個本地文件由?
LookupFile
?對象來管理。
Caffeine 自帶寫磁盤功能?
這是一個非常關鍵的問題。答案是:不,Caffeine 本身是一個純粹的、高性能的?內存?緩存庫。?它不提供原生的將緩存項寫入磁盤的功能。
那么 Paimon 是如何實現用它來管理磁盤文件的呢?
Paimon 在這里使用了一種非常巧妙的設計模式,利用 Caffeine 的能力來管理本地磁盤文件的?生命周期,而不是用它來存儲文件內容本身。
具體實現機制如下:
緩存的不是文件內容,而是文件句柄:Paimon 存入 Caffeine 緩存的?
value
?不是文件的字節內容,而是?LookupFile
?這個 Java 對象。這個對象內部持有了本地磁盤文件的路徑。權重計算基于磁盤大小:Caffeine 的容量限制是通過?
weigher
?來計算每個緩存項的“權重”的。Paimon 在配置?lookupFileCache
?時,提供的?weigher
?(LookupFile::fileWeigh
) 計算的不是?LookupFile
?對象的內存大小,而是它所指向的本地磁盤文件的實際大小。// ... existing code ... public static Cache<String, LookupFile> createCache(Duration fileRetention, MemorySize maxDiskSize) {return Caffeine.newBuilder().expireAfterAccess(fileRetention)// 這里的 maximumWeight 是基于磁盤大小的.maximumWeight(maxDiskSize.getKibiBytes())// weigher 計算的是本地文件在磁盤上的大小.weigher(LookupFile::fileWeigh).removalListener(LookupFile::removalCallback).executor(Runnable::run).build(); }private static int fileWeigh(String file, LookupFile lookupFile) {// 返回本地文件的大小(以 KB 為單位)return fileKibiBytes(lookupFile.localFile); } // ... existing code ...
這樣一來,Caffeine 的容量管理就從“內存大小”變成了“磁盤空間大小”。
利用淘汰監聽器刪除文件:當緩存占用的總“權重”(即總磁盤空間)超過閾值時,Caffeine 會根據其淘汰策略(如 LRU)移除一個?
LookupFile
?對象。此時,會觸發注冊的?removalListener
?(LookupFile::removalCallback
)。在這個監聽器中,Paimon 的代碼會主動地去刪除該?LookupFile
?對象所對應的本地磁盤文件,從而釋放磁盤空間。// ... existing code ... private static void removalCallback(String file, LookupFile lookupFile, RemovalCause cause) {if (lookupFile != null) {try {// 當緩存項被淘汰時,關閉并刪除對應的本地磁盤文件lookupFile.close(cause);} catch (IOException e) {throw new UncheckedIOException(e);}} }public void close(RemovalCause cause) throws IOException {reader.close();isClosed = true;callback.run();LOG.info("Delete Lookup file {} due to {}. Access stats: requestCount={}, hitCount={}, size={}KB",localFile.getName(),cause,requestCount,hitCount,localFile.length() >> 10);FileIOUtils.deleteFileOrDirectory(localFile);} // ... existing code ...
綜上所述,Paimon 并不是在使用 Caffeine 的某個隱藏的磁盤功能,而是將 Caffeine 作為了一個非常高效的本地磁盤文件生命周期管理器。Caffeine 負責決定“何時”淘汰哪個文件,而 Paimon 的代碼則負責執行實際的“文件刪除”操作。這是一個非常優雅的組合設計。