Paimon的部分更新以及DeleteVector實現

背景

本文基于 Paimon 0.9
出于對與Paimon內部的DeleteVctor的實現以及部分更新的實現進行的源碼閱讀。
關于 DeleteVector的介紹可以看這里

說明

對于Paimon來說無論是Spark中使用還是Flink使用,后面的邏輯都是一樣的,所以我們以Spark為例來說。所以我們會參考類 org.apache.paimon.spark.SparkSource,
對于Flink可以參考org.apache.paimon.flink.FlinkTableFactory
如沒特別說明,這里都是以主鍵表來進行說明。

paimon的部分字段更新

這里主要的場景更多的是多流或者多批寫同一個表字段的場景,且每個流或批只更新某幾個字段(同樣的主鍵),具體的配置或說明參考Partial Update
這里涉及到的方法為 SparkTableWrite.write,最終會到MergeTreeWriter.write:

 @Overridepublic void write(KeyValue kv) throws Exception {long sequenceNumber = newSequenceNumber();boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {flushWriteBuffer(false, false);success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {throw new RuntimeException("Mem table is too small to hold a single element.");}}}
  • writeBuffer.put 主要是往buffer中寫數據
    這里的writeBufferSortBufferWriteBuffer類實例。
    這里會 主鍵+sequenceNumber+valueKind + value 的形式寫入數據
  • flushWriteBuffer 這里就會涉及到數據落盤以及部分更新的邏輯:
      writeBuffer.forEach(keyComparator,mergeFunction,changelogWriter == null ? null : changelogWriter::write,dataWriter::write);
    
    • mergeFunction 這里的函數就是會在MergeTreeWriter初始化,也就是會初始化為PartialUpdateMergeFunction
    • 對于forEach的實現會構建一個 MergeIterator,在這里面會調用 PartialUpdateMergeFunction.add方法
      這里就會涉及到部分更新的邏輯,主要就是:把按照 主鍵+sequenceNumber 排序好的數據傳給PartialUpdateMergeFunction
      這樣PartialUpdateMergeFunction只需要判斷前后兩個的數據的主鍵是否一致來進行更新。
      具體的更新邏輯見: Partial Update
      new MergeIterator(awConsumer, buffer.sortedIterator(), keyComparator, mergeFunction);
      
      這里的buffer.sortedIterator主要看SortBufferWriteBuffer構造方法(也就是為什么會按照主鍵+sequenceNumber排序):
       public SortBufferWriteBuffer(RowType keyType,RowType valueType,@Nullable FieldsComparator userDefinedSeqComparator,MemorySegmentPool memoryPool,boolean spillable,MemorySize maxDiskSize,int sortMaxFan,CompressOptions compression,IOManager ioManager) {...// key fieldsIntStream sortFields = IntStream.range(0, keyType.getFieldCount());// user define sequence fieldsif (userDefinedSeqComparator != null) {IntStream udsFields =IntStream.of(userDefinedSeqComparator.compareFields()).map(operand -> operand + keyType.getFieldCount() + 2);sortFields = IntStream.concat(sortFields, udsFields);}// sequence fieldsortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));int[] sortFieldArray = sortFields.toArray();// row typeList<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());fieldTypes.add(new BigIntType(false));fieldTypes.add(new TinyIntType(false));fieldTypes.addAll(valueType.getFieldTypes());NormalizedKeyComputer normalizedKeyComputer =CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);RecordComparator keyComparator =CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray);...InternalRowSerializer serializer =InternalSerializers.create(KeyValue.schema(keyType, valueType));BinaryInMemorySortBuffer inMemorySortBuffer =BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, serializer, keyComparator, memoryPool);
      
      其中IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount())) 就會會把sequenceNumber這個字段帶入到排序中去,
      也就是在buffer.sortedIterato方法中調用。
      如果有定義sequence.field,那這里面的字段也會參與排序,見:udsFields 字段

DeleteVector的實現

關于deleteVector的實現,可以參考Introduce deletion vectors for primary key table
大概的思想是: 基于Compaction + lookup的機制產生 DeleteVector:

  • 當一個記錄不屬于 level0層的話,就不會產生DelectVector
  • 當一個記錄只屬于需要進行compaction的level的話,就不會產生DeleteVector
  • 當一個記錄只屬于 level0層的話,就要去查詢不包含 Compaction的層的文件數據,從而產生DeleteVector
    注意: deleteVector只支持主鍵表, 是屬于bucket級別的,一個bucket一個DeleteVector。

DeleteVector的寫

按照以上的說法,只有在Compaction的時候,才會產生DeleteVector,所以 我們直接到達 MergeTreeWriter.flushWriteBuffer,這里涉及到DeleteVector的數據流如下:

compactManager.triggerCompaction(forcedFullCompaction)||\/
submitCompaction||\/
MergeTreeCompactTask.doCompact||\/rewrite  ||\/
rewriteImpl ||\/
LookupMergeTreeCompactRewriter.rewrite ||\/
rewriteOrProduceChangelog||\/
createMergeWrapper||\/
iterator.next()||\/
RecordReaderIterator.next()||\/
advanceIfNeeded||\/
currentIterator.next() ||\/
SortMergeIterator.next()||\/
LookupChangelogMergeFunctionWrapper.add(winner)||\/
LookupChangelogMergeFunctionWrapper.getResult()
  • 這里MergeTreeCompactTask.doCompact寫完之后,會有result.setDeletionFile(compactDfSupplier.get())
    compactDfSupplier 這里的源自submitCompaction方法中的compactDfSupplier構造:

     if (dvMaintainer != null) {compactDfSupplier =lazyGenDeletionFile? () -> CompactDeletionFile.lazyGeneration(dvMaintainer): () -> CompactDeletionFile.generateFiles(dvMaintainer);}
    

    而這里的deleteVector的產生來自LookupChangelogMergeFunctionWrapper.getResult(),見以下說明

  • 這里的LookupMergeTreeCompactRewriter.rewriteLookupMergeTreeCompactRewriter實例是在創建MergeTreeWriter

     CompactManager compactManager =createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer)
    

    這里會調用createRewriter方法創建LookupMergeTreeCompactRewriter實例,
    其中會根據lookupStrategy來創建該實例:

     public LookupStrategy lookupStrategy() {return LookupStrategy.from(mergeEngine().equals(MergeEngine.FIRST_ROW),changelogProducer().equals(ChangelogProducer.LOOKUP),deletionVectorsEnabled(),options.get(FORCE_LOOKUP));
    
  • 這里 currentIterator.next() 是 通過調用currentIterator = SortMergeReaderWithLoserTree.readBatch獲取的,而SortMergeReaderWithLoserTree 是通過readerForMergeTree方法獲取的

  • 這里LookupChangelogMergeFunctionWrapper.getResult()才是重點

     @Overridepublic ChangelogResult getResult() {// 1. Compute the latest high level record and containLevel0 of candidatesLinkedList<KeyValue> candidates = mergeFunction.candidates();Iterator<KeyValue> descending = candidates.descendingIterator();KeyValue highLevel = null;boolean containLevel0 = false;while (descending.hasNext()) {KeyValue kv = descending.next();if (kv.level() > 0) {descending.remove();if (highLevel == null) {highLevel = kv;}} else {containLevel0 = true;}}// 2. Lookup if latest high level record is absentif (highLevel == null) {InternalRow lookupKey = candidates.get(0).key();T lookupResult = lookup.apply(lookupKey);if (lookupResult != null) {if (lookupStrategy.deletionVector) {PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;highLevel = positionedKeyValue.keyValue();deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition());} else {highLevel = (KeyValue) lookupResult;}}}// 3. Calculate resultKeyValue result = calculateResult(candidates, highLevel);// 4. Set changelog when there's level-0 recordsreusedResult.reset();if (containLevel0 && lookupStrategy.produceChangelog) {setChangelog(highLevel, result);}return reusedResult.setResult(result);} 
    
  • 這里主要說明 lookup.apply的方法,其中 lookup的 構造是在createLookupChangelogMergeFunctionWrapper構造中:

       @Overridepublic MergeFunctionWrapper<ChangelogResult> create(MergeFunctionFactory<KeyValue> mfFactory,int outputLevel,LookupLevels<T> lookupLevels,@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {return new LookupChangelogMergeFunctionWrapper<>(mfFactory,key -> {try {return lookupLevels.lookup(key, outputLevel + 1);} catch (IOException e) {throw new UncheckedIOException(e);}},valueEqualiser,changelogRowDeduplicate,lookupStrategy,deletionVectorsMaintainer,userDefinedSeqComparator);}}
    

    這里的lookupLevels.lookup 會最終調用createLookupFile 方法構造LookupFile 實例,
    其中會調用 valueProcessor.persistToDisk(kv, batch.returnedPosition()方法,持久化 行號到對應的文件,
    這樣就能獲取到對應的行號。

  • 獲取到對應的結果 lookupResult 后
    調用 deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition()方法去構造
    DeletionVector.
    上面提到的result.setDeletionFile(compactDfSupplier.get())會調用 CompactDeletionFile.generateFiles(dvMaintainer) 方法
    從而調用maintainer.writeDeletionVectorsIndex方法,從而寫如到DeleteVector文件中。

DeleteVector的讀

DeleteVector的讀取主要在以下方法中構造:PrimaryKeyFileStoreTable.newRead:
最終會調用RawFileSplitRead.createReader從而調用 ApplyDeletionVectorReader(fileRecordReader, deletionVector)方法構造ApplyDeletionVectorReader實例:

 public RecordIterator<InternalRow> readBatch() throws IOException {RecordIterator<InternalRow> batch = reader.readBatch();if (batch == null) {return null;}checkArgument(batch instanceof FileRecordIterator,"There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");return new ApplyDeletionFileRecordIterator((FileRecordIterator<InternalRow>) batch, deletionVector);}

該處的readBatch方法會構造一個ApplyDeletionFileRecordIterator迭代器,可見在next()方法會對每一個記錄調用deletionVector.isDeleted是否刪除的判斷.

 @Overridepublic InternalRow next() throws IOException {while (true) {InternalRow next = iterator.next();if (next == null) {return null;}if (!deletionVector.isDeleted(returnedPosition())) {return next;}}}

FAQ

寫入文件的時候,怎么記錄行號和主鍵的關系?

這里不會寫入的時候記錄行號,會在調用createLookupFile 在構建 LookupFile這個文件的時候(初始化),從parquet文件讀取過來的時候,就會獲取行號。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/90130.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/90130.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/90130.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Redis 的事務機制是怎樣的?

Redis 的事務機制 Redis支持事務機制,其主要目的是確保多個命令執行的原子性,即這些命令會作為一個不可分割的操作單元執行。 需要注意的是,Redis事務不支持回滾操作。從Redis 2.6.5版本開始,服務器會在命令累積階段檢測錯誤。在執行EXEC命令時,若發現錯誤則會拒絕執行事…

網安學習NO.17

1. VPN 概述定義&#xff1a;在公用網絡&#xff08;如 Internet、幀中繼、ATM 等&#xff09;中&#xff0c;通過技術手段虛擬出的一條企業內部專線&#xff0c;能像私有網絡一樣提供安全性、可靠性和可管理性。核心特征&#xff1a;利用公共網絡構建&#xff0c;具備 “虛擬性…

MCU芯片AS32S601在衛星光纖放大器(EDFA)中的應用探索

摘要&#xff1a;本文聚焦于國科安芯推出的AS32S601型MCU芯片在衛星光纖放大器&#xff08;EDFA&#xff09;中的潛在應用&#xff0c;探討其技術特性、抗輻射性能及適用性。通過分析其在單粒子效應脈沖激光試驗中的表現&#xff0c;結合EDFA系統對控制芯片的要求&#xff0c;評…

Hexo - 免費搭建個人博客02 - 創建個人博客

導言我的博客&#xff1a;https://q164129345.github.io/ 開始一步一步地完成博客的創建。 一、初始化Hexo博客以上所示&#xff0c;運行以下指令在myCode文件夾里初始化一個hexo博客。 hexo init myblog二、安裝依賴如上所示&#xff0c;完成依賴的安裝。 cd myblog npm insta…

單片機-----基礎知識整合

一、基礎知識1&#xff09;單片機的組成&#xff1a;中央處理器CPU、隨機存儲器RAM、只讀存儲器ROM、定時器、多種I/O接口、中斷系統等2&#xff09;STM32U575RIT6采用ARM Cortex-M33內核架構ARM是什么&#xff1f;①ARM是一家公司&#xff0c;ARM公司是一家芯片知識產權&#…

雙流join 、 Paimon Partial Update 和 動態schema

背景 Paimon 通過其獨特的 partial-update 合并引擎和底層的 LSM 存儲結構&#xff0c;巧妙地將傳統雙流 Join 中對 Flink State 的高頻隨機讀/寫&#xff0c;轉換為了對 Paimon 表的順序寫和后臺的高效合并&#xff0c;從而一站式地解決了 Flink 作業狀態過大、依賴外部 KV 系…

7.3.1 進程調度機制那些事兒

一&#xff1a;task_struct結構體分析 1、進程有兩種特殊形式&#xff1a;沒有用戶虛擬地址空間的進程叫內核線程&#xff0c;共享用戶虛擬地址空間的進程叫作用戶線程。共享同一個用戶虛擬地址空間的所有用戶線程叫線程組。 C語言標準庫進程 Linux內核進程 …

基于多種機器學習的水質污染及安全預測分析系統的設計與實現【隨機森林、XGBoost、LightGBM、SMOTE、貝葉斯優化】

文章目錄有需要本項目的代碼或文檔以及全部資源&#xff0c;或者部署調試可以私信博主項目介紹總結每文一語有需要本項目的代碼或文檔以及全部資源&#xff0c;或者部署調試可以私信博主 項目介紹 隨著工業化和城市化的不斷推進&#xff0c;水質污染問題逐漸成為影響生態環境…

Linux第三天Linux基礎命令(二)

1.grep命令可以通過grep命令&#xff0c;從文件中通過關鍵字過濾文件行。grep [-n] 關鍵字 文件路徑選項-n&#xff0c;可選&#xff0c;表示在結果中顯示匹配的行的行號。參數&#xff0c;關鍵字&#xff0c;必填&#xff0c;表示過濾的關鍵字&#xff0c;帶有空格或其它特殊符…

Linux Debian操作系統、Deepin深度操作系統手動分區方案參考

以下是Linux Debian操作系統、Deepin深度操作系統安裝過程中手動分區的建議&#xff0c;按UEFI、swap、boot、根分區、home分區劃分&#xff0c;以下是詳細的分區配置參考建議&#xff1a; 一、手動分區方案&#xff08;UEFI模式&#xff09;分區名稱分區類型大小建議掛載點文件…

jmeter如何做自動化接口測試?

全網最全流程&#xff01;JmeterAntAllureJenkins搭建屬于你的接口自動化流水線&#xff0c;CI/CD直接起飛&#xff01;1.什么是jmeter&#xff1f; JMeter是100%完全由Java語言編寫的&#xff0c;免費的開源軟件&#xff0c;是非常優秀的性能測試和接口測試工具&#xff0c;支…

MyBatis整合SpringBoot終極指南

以下是一份系統化的 ?MyBatis 整合 Spring Boot 學習筆記&#xff0c;結合官方文檔與最佳實踐整理&#xff0c;涵蓋配置、核心功能、實戰示例及常見問題解決。 一、整合基礎與依賴配置 1. ?核心依賴? 在 pom.xml 中添加&#xff1a; <dependency><groupId>or…

企業微信ipad協議接口解決方案最新功能概覽

支持最新版本企業微信&#xff0c;安全穩定0封號免費試用&#xff0c;技術支持&#xff1a;string wechat"Mrzhu0107"企微ipad協議接口最新功能升級如下&#xff1a;【初始化】初始化企業微信&#xff0c;設置消息回調地址&#xff0c;獲取運行中的實例&#xff0c;根…

ansible 批量 scp 和 load 鏡像

1、save 鏡像腳本 在本地保存鏡像到 ansible 代碼目錄的腳本。 1.1、使用說明: 保存單個鏡像 save -i gcr.io/cadvisor/cadvisor:v0.52.1保存某個 namespace 下的所有鏡像 save1.2、腳本內容 cat /usr/local/bin/save #!/bin/bash #set -e # 分隔符 str="-"# …

【C# in .NET】20. 探秘靜態類:抽象與密封的結合體

探秘靜態類:抽象與密封的結合體 一、靜態類的底層本質:抽象與密封的結合體 靜態類作為 C# 中特殊的類型形式,其底層實現融合了抽象類與密封類的特性,形成了不可實例化、不可繼承的類型約束。 1. IL 層面的靜態類標識 定義一個簡單的靜態類: public static class Stri…

【Vue3】ECharts圖表案例

官方參考&#xff1a;Examples - Apache ECharts 1、創建工程 npm create vitelatest 或 npm init vuelatest 設置如下 2、下載依賴集運行項目 cd vue-echarts-demo npm install npm install echarts npm run dev 3、編寫核心代碼 創建src\components\BarView.vue文件…

二分查找----2.搜索二維矩陣

題目鏈接 /** 方案一: 每行都是遞增的,對每行進行二分,逐行查找;效率不高,每次搜索只能控制列無法兼顧到行,行被固定存在不必要的搜索 方案二: 從右上或左下頂點出發,以右上為例,向左迭代列減小,向下迭代行增大;效率更高避免重復搜索 */ class Solution {/**方案一: 每行都是…

2025.7.23

flen&#xff08;&#xff09;這個函數計算到的文件大小為0&#xff0c;明天解決 原因是路徑錯誤&#xff0c;寫成了CONFIG_ROOT_PATH"/music/test2.mp3,但是也沒報錯&#xff0c;打開文件也成功&#xff0c;所以就沒有懷疑到路徑方面來

大致自定義文件I/O庫函數的實現詳解(了解即可)

目錄 一、mystdio.h 代碼思路分析 二、mystdio.c 1. 輔助函數 BuyFile 2. 文件打開函數 MyFopen 3. 文件關閉函數 MyFclose 4. 數據寫入函數 MyFwrite 1、memcpy(file->outbuffer file->bufferlen, str, len); 2、按位與&#xff08;&&#xff09;運算的作…

Zipformer

Zipformer首先&#xff0c;Conv-Embed 將輸入的 100Hz 的聲學特征下采樣為 50 Hz 的特征序列&#xff1b;然后&#xff0c;由 6 個連續的 encoder stack 分別在 50Hz、25Hz、12.5Hz、6.25Hz、12.5Hz 和 25Hz 的采樣率下進行時域建模。除了第一個 stack 外&#xff0c;其他的 st…