LocalTableQuery
LocalTableQuery
?是 Paimon 中實現本地化、帶緩存的表查詢的核心引擎。它的主要應用場景是 Flink 中的 Lookup Join。當 Flink 作業需要根據一個流中的 Key 去關聯一個 Paimon 維表時,LocalTableQuery
?可以在 Flink 的 TaskManager 節點上,通過將 Paimon 表的遠程數據文件拉取到本地并建立索引,來實現高性能的低延遲點查。
LocalTableQuery
?實現了?TableQuery
?接口,其核心職責可以概括為:
- 管理表數據視圖:在內存中維護一個按分區(Partition)和桶(Bucket)組織的表數據文件視圖。
- 提供點查(Lookup)能力:對外提供?
lookup(partition, bucket, key)
?方法,用于根據主鍵查詢單條數據。 - 動態更新數據:提供?
refreshFiles
?方法,可以增量地更新內存中的文件視圖,以響應 Paimon 表的數據變更(新的 Commit)。 - 整合本地緩存機制:它是?
LookupLevels
?和?LookupFile
?的頂層封裝和協調者,負責創建和管理這些底層的緩存查詢組件。 - 配置與初始化:負責初始化所有進行本地查詢所需要的組件,如文件讀取器、本地 IO 管理器、緩存策略等。
關鍵成員變量
LocalTableQuery
?的成員變量揭示了它的內部結構和依賴。
// ... existing code ...
public class LocalTableQuery implements TableQuery {private final Map<BinaryRow, Map<Integer, LookupLevels<KeyValue>>> tableView;private final CoreOptions options;private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;private final LookupStoreFactory lookupStoreFactory;private final int startLevel;private IOManager ioManager;@Nullable private Cache<String, LookupFile> lookupFileCache;private final RowType rowType;private final RowType partitionType;@Nullable private Filter<InternalRow> cacheRowFilter;
// ... existing code ...
tableView
:?核心數據結構。這是一個嵌套的 Map (Map<Partition, Map<Bucket, LookupLevels>>
),它在內存中構建了整個表的邏輯視圖。通過它,可以快速定位到任意分區任意桶的數據查詢處理器?LookupLevels
。options
: 表的配置項,決定了緩存策略、Merge-Engine 行為等。keyComparatorSupplier
: 主鍵比較器的提供者。readerFactoryBuilder
: 用于構建讀取遠程數據文件(如 Parquet)的?KeyValueFileReaderFactory
。它封裝了文件格式、數據類型等信息。lookupStoreFactory
: 用于創建本地查找文件(LookupFile
)的工廠。startLevel
: 查詢起始的 Level。對于?partial-update
?或?aggregation
?等需要合并歷史數據的場景,需要從 Level 1 開始查(needLookup()
?為 true);對于?deduplicate
?這種新數據直接覆蓋舊數據的場景,可以從 Level 0 開始查,性能更高。ioManager
: 本地磁盤 I/O 管理器,負責創建本地緩存文件所需的臨時目錄和文件。lookupFileCache
:?共享的緩存實例。這是一個?Caffeine
?緩存,用于存儲?LookupFile
?對象。這個緩存可以在同一個 TaskManager 的多個?LocalTableQuery
?實例間共享,避免對同一個遠程文件重復創建本地緩存。cacheRowFilter
: 一個可選的過濾器,可以在創建本地緩存文件時,預先過濾掉不需要的數據,從而減小本地緩存文件的大小。
構造函數?LocalTableQuery(FileStoreTable table)
負責從?FileStoreTable
?中提取所有必要信息,并初始化各個工廠和配置。它會檢查表的類型(必須是帶主鍵的?KeyValueFileStore
),并根據表的配置(merge-engine
,?sequence.field
?等)來決定?startLevel
,這是對查詢行為的一個重要優化。
refreshFiles(...)
// ... existing code ...public void refreshFiles(BinaryRow partition,int bucket,List<DataFileMeta> beforeFiles,List<DataFileMeta> dataFiles) {LookupLevels<KeyValue> lookupLevels =tableView.computeIfAbsent(partition, k -> new HashMap<>()).get(bucket);if (lookupLevels == null) {Preconditions.checkArgument(beforeFiles.isEmpty(),"The before file should be empty for the initial phase.");newLookupLevels(partition, bucket, dataFiles);} else {lookupLevels.getLevels().update(beforeFiles, dataFiles);}}
// ... existing code ...
這是實現數據動態更新的關鍵。當 Paimon 表有新的數據提交時,外部調用者(如?PrimaryKeyPartialLookupTable
)會通過?StreamTableScan
?感知到文件的變化(beforeFiles
?是被刪除的文件,dataFiles
?是新增的文件),然后調用此方法。
- 如果?
lookupLevels
?不存在(即第一次加載該分區/桶的數據),則調用?newLookupLevels
?創建一個全新的查詢處理器。 - 如果?
lookupLevels
?已存在,則調用其?update
?方法,增量地更新內部的文件列表,同時清理掉被刪除文件(beforeFiles
)對應的本地緩存。
newLookupLevels(...)
這個私有方法是真正創建查詢處理器的地方。當需要為一個新的分區/桶建立查詢能力時,此方法被調用。
// ... existing code ...private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles, options.numLevels());// ...KeyValueFileReaderFactory factory =readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory());// ...if (lookupFileCache == null) {lookupFileCache =LookupFile.createCache(options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));}LookupLevels<KeyValue> lookupLevels =new LookupLevels<>(levels,// ... 傳入各種工廠和配置 ...lookupFileCache);tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);}
// ... existing code ...
它執行了以下關鍵步驟:
- 初始化?
Levels
: 將文件列表組織成 LSM-Tree 的分層結構。 - 構建文件讀取工廠?
factory
: 準備好讀取遠程文件的能力。 - 創建共享緩存?
lookupFileCache
: 如果是第一次創建,則根據配置初始化?Caffeine
?緩存。 - 實例化?
LookupLevels
: 將上面準備好的所有組件(Levels
、比較器、各種工廠、共享緩存等)組裝起來,創建一個?LookupLevels
?實例。 - 存入?
tableView
: 將新創建的?lookupLevels
?實例放入?tableView
?中,以備后續查詢使用。
lookup(...)
// ... existing code ...@Nullable@Overridepublic synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)throws IOException {Map<Integer, LookupLevels<KeyValue>> buckets = tableView.get(partition);if (buckets == null || buckets.isEmpty()) {return null;}LookupLevels<KeyValue> lookupLevels = buckets.get(bucket);if (lookupLevels == null) {return null;}KeyValue kv = lookupLevels.lookup(key, startLevel);if (kv == null || kv.valueKind().isRetract()) {return null;} else {return kv.value();}}
// ... existing code ...
這是對外提供查詢服務的入口。
- 從?
tableView
?中根據?partition
?和?bucket
?找到對應的?LookupLevels
?實例。 - 調用?
lookupLevels.lookup(key, startLevel)
,執行真正的、帶緩存的查找邏輯。 - 對返回的?
KeyValue
?進行處理,如果數據不存在,或者是一條刪除標記(Retract
),則返回?null
,否則返回其?value
。 synchronized
?關鍵字表明當前實現是線程安全的,但注釋也指出了未來的優化方向是移除它以支持多線程并發查詢。
close()
close()
?方法負責釋放資源。它會遍歷?tableView
?中所有的?LookupLevels
?實例并調用它們的?close()
?方法。這會觸發?LookupLevels
?清理它自己創建的緩存項。最后,它會清空整個共享緩存?lookupFileCache
,確保所有本地文件都被刪除,所有資源都被釋放。
總結
LocalTableQuery
?是一個上層的協調和管理類。它本身不執行具體的 I/O 或查找算法,而是通過巧妙地組織和管理?LookupLevels
、LookupFile
、KeyValueFileReaderFactory
?等底層組件,構建了一個完整的、動態的、高性能的本地化查詢服務。它將 Paimon 表的靜態文件視圖,轉化為了一個可以響應數據更新、并提供低延遲點查能力的“活”的查詢引擎,是支撐 Flink Lookup Join 功能的基石。
誰主動監控元數據變化?
LocalTableQuery.refreshFiles(...)
?本身是一個被動的方法,它只負責接收文件更新信息并應用到內存視圖中。監控文件變化的職責由更上層的、與計算引擎(如 Flink)緊密集成的類來承擔。
在 Flink Lookup Join 的場景下,主要的監控和調用者是?PrimaryKeyPartialLookupTable
。
我們來梳理一下這個調用鏈:
監控者:PrimaryKeyPartialLookupTable
這個類內部有一個?LookupTable
?實現,其中包含一個?refresh()
?方法。這個方法是監控和刷新邏輯的核心。
// ... existing code ...@Overridepublic void refresh() {while (true) {// 1. 規劃一次增量掃描,獲取文件的變更List<Split> splits = scan.plan().splits();log(splits);// 2. 如果沒有新的變更,說明數據已是最新,退出循環if (splits.isEmpty()) {return;}// 3. 遍歷所有的變更(每個 split 代表一個 bucket 的變更)for (Split split : splits) {BinaryRow partition = ((DataSplit) split).partition();int bucket = ((DataSplit) split).bucket();List<DataFileMeta> before = ((DataSplit) split).beforeFiles();List<DataFileMeta> after = ((DataSplit) split).dataFiles();// 4. 將文件變更信息傳遞給 LocalTableQuerytableQuery.refreshFiles(partition, bucket, before, after);}}}
// ... existing code ...
分析:
- 誰在監控??核心是?
scan.plan().splits()
?這行代碼。這里的?scan
?是一個?StreamTableScan
?實例。它被配置為增量模式,每次調用?plan()
?時,它會從上一次結束的位置開始,掃描 Paimon 的 Manifest 文件,找出從上次掃描到現在發生變化的文件。 - 監控到了什么??
scan.plan()
?的結果是一系列的?DataSplit
。每個?DataSplit
?都封裝了一個 Bucket 內的數據文件變化,其中:dataFiles()
?(即?after
): 本次掃描發現的新增文件。beforeFiles()
?(即?before
): 因為 Compaction 操作而被替換掉的舊文件。
- 如何觸發更新??
refresh()
?方法拿到這些?DataSplit
?后,從中解析出分區、Bucket、新增文件和過期文件列表,然后調用我們關注的?tableQuery.refreshFiles(...)
?方法,將這些元數據變更信息傳遞下去。
觸發時機:FileStoreLookupFunction
那么,PrimaryKeyPartialLookupTable.refresh()
?方法又是在什么時候被調用的呢?這通常由 Flink 的?RichAsyncFunction
?實現,即?FileStoreLookupFunction
?來控制。
FileStoreLookupFunction.java
// ... existing code ...void tryRefresh() throws Exception {// ... (省略黑名單和動態分區檢查) ...// 3. refresh lookup table// 檢查是否到達了刷新間隔if (shouldRefreshLookupTable()) {// 調用上一節的 refresh 方法lookupTable.refresh();// 更新下一次刷新時間nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();}}
// ... existing code ...
分析:
FileStoreLookupFunction
?會在處理數據(asyncInvoke
)的間隙或者通過定時器,周期性地調用?tryRefresh()
?方法。這個方法會根據用戶配置的刷新間隔(lookup.refresh-interval
)來決定是否需要調用?lookupTable.refresh()
,從而啟動一輪新的文件變更監控。
總結
整個監控和刷新流程如下:
- 定時觸發 (The "When"):?
FileStoreLookupFunction
?作為 Flink 的一個算子的核心邏輯,根據配置的刷新間隔,定時觸發刷新操作。 - 調用刷新 (The "Trigger"):?
FileStoreLookupFunction
?調用?PrimaryKeyPartialLookupTable.refresh()
?方法。 - 監控變更 (The "Monitor"):?
refresh()
?方法內部通過?StreamTableScan.plan()
?掃描 Paimon 的 Manifest 文件,獲取到自上次掃描以來的文件增量變化(新增和被替換的文件)。 - 應用更新 (The "Action"):?
refresh()
?方法將獲取到的文件變更信息,最終調用?LocalTableQuery.refreshFiles()
,將更新應用到本地查詢引擎的內存視圖中。
所以,LocalTableQuery
?只是一個“執行者”,真正的“監控者”是?StreamTableScan
,而“調度者”則是?PrimaryKeyPartialLookupTable
?和?FileStoreLookupFunction
。這是一個典型的分層設計,各司其職,非常清晰。
RemoteTableQuery 和 LocalTableQuery 的區別
這兩個類都實現了 TableQuery
接口,提供了對 Paimon 表進行 Key-Value 方式點查(Lookup)的能力,但它們的實現機制和適用場景截然不同,代表了 Paimon 在 Lookup Join 場景下的兩種核心策略:本地緩存模式和遠程服務模式。
核心區別:架構與數據流
1. LocalTableQuery (本地緩存模式)
去中心化。查詢邏輯和數據緩存完全發生在 Flink 的 TaskManager 進程內部。每個需要進行 Lookup Join 的 Flink Task 都會在自己的本地磁盤上創建和管理一份數據文件的緩存。
??數據流??:
- ??拉取 (Pull)??:
Flink Task 啟動時,LocalTableQuery
會根據需要查詢的數據,主動從遠程存儲(如 HDFS、S3)將 Paimon 的 SST 文件(如 Parquet)整個拉取到 TaskManager 的本地磁盤。 - ??轉換與索引??:
拉取到本地后,它會將 SST 文件轉換為一種更適合點查的格式(如哈希索引文件),并建立布隆過濾器等索引結構。這個過程由LookupStoreFactory
和LookupFile
負責。 - ??查詢??:
后續的lookup
請求直接在本地磁盤上的索引文件進行,速度極快,不涉及網絡 IO。 - ??刷新??:
通過refreshFiles
機制,定期從遠程存儲拉取最新的增量文件,并更新本地緩存。
??代碼體現??:
持有 LookupLevels
、LookupFileCache
等大量與本地緩存、文件管理相關的組件。構造函數和 newLookupLevels
方法中充滿了創建本地文件、管理本地 IO (IOManager
)、配置緩存策略的邏輯。
// LocalTableQuery.java
public class LocalTableQuery implements TableQuery {private final Map<BinaryRow, Map<Integer, LookupLevels<KeyValue>>> tableView;// ...private IOManager ioManager;@Nullable private Cache<String, LookupFile> lookupFileCache;// ... existing code ...
}
?RemoteTableQuery (遠程服務模式)
客戶端-服務器 (Client-Server)。它依賴一個獨立部署的、專門用于提供查詢服務的集群(Query Service)。RemoteTableQuery
本身只是一個輕量級的客戶端。
??數據流??:
- ??服務發現??:
RemoteTableQuery
啟動時,通過ServiceManager
去發現和定位遠程 Query Service 的地址。 - ??RPC 調用??:
當lookup
請求發生時,RemoteTableQuery
將要查詢的partition
、bucket
、key
等信息打包,通過網絡 RPC 調用發送給遠程的 Query Service。 - ??遠程處理??:
Query Service 接收到請求后,在其內部執行真正的查詢邏輯(其內部可能也使用了類似LocalTableQuery
的機制來加速查詢),然后將結果返回給客戶端。 - ??接收結果??:
RemoteTableQuery
接收到網絡返回的結果并將其返回給上層調用者。
??代碼體現??:
核心成員是 KvQueryClient
,一個專門用于和服務端進行 RPC 通信的客戶端。lookup
方法的核心邏輯是調用 client.getValues(...)
,這是一個異步的網絡請求。完全沒有本地文件、緩存、IO 管理相關的邏輯。它不關心數據存儲在哪里,只關心服務端的地址。
// RemoteTableQuery.java
public class RemoteTableQuery implements TableQuery {private final FileStoreTable table;private final KvQueryClient client;private final InternalRowSerializer keySerializer;// ...public RemoteTableQuery(Table table) {this.table = (FileStoreTable) table;ServiceManager manager = this.table.store().newServiceManager();this.client = new KvQueryClient(new QueryLocationImpl(manager), 1);// ...}@Nullable@Overridepublic InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {BinaryRow row;try {row =client.getValues(partition,bucket,new BinaryRow[] {keySerializer.toBinaryRow(key)}).get()[0];// ...}// ... existing code ...
}
優缺點與適用場景對比
特性 | LocalTableQuery (本地緩存模式) | RemoteTableQuery (遠程服務模式) |
---|---|---|
??性能?? | 極高。一旦緩存預熱完成,所有查詢都是本地磁盤/內存操作,無網絡延遲。 | 較高,但有網絡開銷。每次查詢都涉及一次 RPC 網絡往返,延遲相對較高。 |
??資源消耗?? | 高。每個 Flink TaskManager 都需要消耗額外的磁盤空間來存儲緩存文件,并消耗 CPU 和內存來構建和維護索引。 | 低。Flink TaskManager 端非常輕量,只作為客戶端,不消耗額外磁盤,內存和 CPU 占用極少。資源消耗集中在獨立的 Query Service 集群。 |
??部署復雜度?? | 低。無需額外部署,邏輯內嵌在 Flink 作業中。 | 高。需要獨立部署和維護一套 Paimon Query Service 集群,并確保其高可用。 |
??數據一致性?? | 最終一致性。依賴 Flink 作業的刷新間隔(lookup.refresh-interval )來同步數據,存在一定的延遲。 | 強一致性(取決于 Query Service 的實現)。Query Service 可以更及時地感知數據更新,提供更實時的數據視圖。 |
??擴展性?? | 受限于 Flink TaskManager。如果維表數據量巨大,可能會撐爆 TaskManager 的本地磁盤。 | 良好。可以通過水平擴展 Query Service 集群來應對高并發和大數據量的查詢需求,與 Flink 作業本身解耦。 |
??適用場景?? | 1. 維表數據量可控,能夠完全放入 TaskManager 本地磁盤。 2. 對查詢延遲要求極高的場景。 3. 部署架構簡單,不希望引入額外組件。 | 1. 超大規模維表,無法在每個 TaskManager 中緩存全量數據。 2. 多作業共享同一張維表,通過服務化避免重復緩存。 3. 希望將查詢服務與計算任務解耦,獨立管理和擴展。 |
總結
LocalTableQuery
和 RemoteTableQuery
是 Paimon 為滿足不同業務需求而設計的兩種 Lookup Join 方案。
- ??LocalTableQuery?? 是一個性能優先、簡單直接的方案。它用資源換性能,通過在計算節點本地緩存數據,將網絡 IO 轉化為本地 IO,以達到最低的查詢延遲。
- ??RemoteTableQuery?? 是一個架構優先、靈活可擴展的方案。它通過引入獨立的查詢服務,實現了計算與存儲的分離,讓 Flink 作業本身變得輕量,同時為超大規模數據和多租戶共享場景提供了可能。
在實際選擇時,需要根據維表的大小、查詢的 QPS、對延遲的容忍度以及運維復雜度等因素進行綜合權衡。
Query Service?
啟動 Query Service 通常通過 Flink SQL?CALL
?一個系統過程,或者執行一個 Flink Action。
QueryServiceProcedure.java
: 這個類實現了?CALL sys.query_service(...)
?的邏輯。它會獲取 Flink 的?StreamExecutionEnvironment
,然后調用?QueryService.build
。// ... existing code ... public String[] call(ProcedureContext procedureContext, String tableId, Integer parallelism)throws Exception {Table table = catalog.getTable(Identifier.fromString(tableId));StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();QueryService.build(env, table, parallelism);return execute(env, IDENTIFIER); } // ... existing code ...
QueryService.java
: 這個類負責構建 Flink 作業的數據流(DataStream)。它會創建一個?QueryExecutorOperator
。// ... existing code ... public static void build(StreamExecutionEnvironment env, Table table, int parallelism) {// ...QueryExecutorOperator executorOperator = new QueryExecutorOperator(table);DataStreamSink<?> sink =stream.transform("Executor",InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),executorOperator).setParallelism(parallelism);// ... } // ... existing code ...
Query Service 的核心?QueryExecutorOperator
?在做什么?
這正是揭示謎底的關鍵。QueryExecutorOperator
?是一個 Flink 的算子,在它的?open()
?方法中,它做了兩件核心事情:
- 創建?
LocalTableQuery
: 它實例化了一個?LocalTableQuery
,這和普通的 Lookup Join 作業中的本地緩存模式完全一樣。 - 啟動?
KvQueryServer
: 它啟動了一個 RPC 服務器,這個服務器持有上面創建的?LocalTableQuery
?實例,并監聽一個網絡端口,等待客戶端的查詢請求。
// ... existing code ...
public class QueryExecutorOperator extends AbstractStreamOperator<InternalRow>implements OneInputStreamOperator<InternalRow, InternalRow> {
// ...private transient LocalTableQuery tableQuery;private transient KvQueryServer server;
// ...@Overridepublic void open() throws Exception {super.open();// 1. 創建一個 LocalTableQuery 實例,用于本地緩存和查詢this.tableQuery =table.newLocalTableQuery().withIOManager(new IOManagerImpl(ioManager.getSpillingDirectories()));// 2. 啟動一個 RPC 服務,該服務使用上面的 tableQuery 來響應查詢this.server =new KvQueryServer(// ...tableQuery,// ...);server.start();}
// ...
}
當一個?RemoteTableQuery
?客戶端發起請求時,KvQueryServer
?接收到請求,然后調用其內部持有的?tableQuery.lookup(...)
?方法,執行查詢并將結果返回給客戶端。
為什么不直接用?LocalTableQuery
,而要啟動一個服務?
既然底層都是?LocalTableQuery
,為什么還要引入?RemoteTableQuery
?和 Query Service 這種 C/S 架構呢?這正是 Paimon 設計的精妙之處,它解決了?LocalTableQuery
?模式的一些固有痛點:
資源隔離與解耦:
- Local 模式: Lookup 的緩存(磁盤、內存)和計算(CPU)都發生在業務 Flink 作業的 TaskManager 中。如果維表很大,或者更新頻繁,會嚴重搶占業務作業的資源,可能導致作業不穩定。
- Service 模式: 查詢負載被完全剝離到一個獨立的 Flink 作業中。業務作業和查詢服務作業可以獨立部署、獨立配置資源,互不干擾。
緩存共享與資源復用 (最核心的優勢):
- Local 模式: 如果有 10 個不同的 Flink 作業都需要關聯同一張大的維表,那么每個作業都會在自己的 TaskManager 上維護一套完整且獨立的本地緩存。這造成了巨大的資源浪費(10 倍的磁盤空間、10 倍的網絡拉取帶寬、10 倍的索引構建 CPU 消耗)。
- Service 模式: 只需要部署一套?Query Service。所有 10 個業務作業都通過輕量級的?
RemoteTableQuery
?客戶端去請求這一套共享的服務。緩存只存在一份,被所有作業共享,極大地節約了資源。
獨立的生命周期與彈性伸縮:
- Query Service 可以作為基礎設施長期運行,其生命周期與任何一個業務作業都無關。
- 可以根據查詢的 QPS 壓力,獨立地對 Query Service 作業進行擴縮容(調整其?
parallelism
),而無需觸碰任何業務作業。
總結
用一個形象的比喻來總結:
LocalTableQuery
?就像一個內嵌式數據庫引擎(比如 SQLite),它直接在您的應用程序(Flink 作業)內部運行,簡單直接,但資源耦合,無法共享。- Paimon Query Service 則更像是一個獨立的數據庫服務器(比如 MySQL 或 PostgreSQL)。雖然它底層可能也用了相似的存儲和索引技術(即?
LocalTableQuery
),但通過 C/S 架構,它提供了資源隔離、緩存共享和獨立管理的強大能力。
Paimon Query Service 正是通過將?LocalTableQuery
?的能力服務化,從而解決了大規模、多租戶場景下的維表關聯痛點。