Background
This article is based on Paimon 0.9.
It is the result of a source-code reading of Paimon's internal DeleteVector implementation and its partial-update implementation.
For an introduction to DeleteVector, see here.
Notes
For Paimon, the underlying logic is the same whether the table is written from Spark or from Flink, so we use Spark as the example; the relevant entry class is org.apache.paimon.spark.SparkSource. For Flink, refer to org.apache.paimon.flink.FlinkTableFactory.
Unless otherwise stated, everything below refers to primary key tables.
Partial field update in Paimon
The main scenario here is multiple streams (or multiple batches) writing to the same table with the same primary key, where each stream or batch only updates a few of the fields. For the configuration and details, see Partial Update. A toy sketch of the semantics follows.
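To make the semantics concrete, here is a toy sketch in plain Java (not Paimon code; all names are hypothetical): two upstream writers fill different columns of the same primary key, and null fields never overwrite values that are already present.

```java
import java.util.Arrays;

public class PartialUpdateDemo {

    // Merge two rows with the same primary key: only non-null fields
    // of the newer row overwrite the older row's fields.
    static Object[] merge(Object[] oldRow, Object[] newRow) {
        Object[] merged = oldRow.clone();
        for (int i = 0; i < newRow.length; i++) {
            if (newRow[i] != null) {
                merged[i] = newRow[i];
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Object[] fromStreamA = {1, "alice", null}; // pk = 1, updates column b
        Object[] fromStreamB = {1, null, 42};      // pk = 1, updates column c
        System.out.println(Arrays.toString(merge(fromStreamA, fromStreamB)));
        // -> [1, alice, 42]
    }
}
```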
The method involved is SparkTableWrite.write, which eventually reaches MergeTreeWriter.write:
```java
@Override
public void write(KeyValue kv) throws Exception {
    long sequenceNumber = newSequenceNumber();
    boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
    if (!success) {
        flushWriteBuffer(false, false);
        success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
        if (!success) {
            throw new RuntimeException("Mem table is too small to hold a single element.");
        }
    }
}
```
- writeBuffer.put mainly writes data into the buffer. The writeBuffer here is a SortBufferWriteBuffer instance, and records are written in the form primary key + sequenceNumber + valueKind + value.
- flushWriteBuffer is where flushing to disk and the partial-update logic come in:

```java
writeBuffer.forEach(
        keyComparator,
        mergeFunction,
        changelogWriter == null ? null : changelogWriter::write,
        dataWriter::write);
```
- mergeFunction: this function is initialized when the MergeTreeWriter is created; for a partial-update table it is initialized to a PartialUpdateMergeFunction.
- The forEach implementation builds a MergeIterator, which internally calls the PartialUpdateMergeFunction.add method. This is where the partial-update logic happens: records already sorted by primary key + sequenceNumber are handed to the PartialUpdateMergeFunction, so it only needs to check whether two consecutive records have the same primary key to decide whether to merge them.
For the concrete update logic, see Partial Update; a toy illustration of this adjacent-key merge is sketched below.
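As a toy illustration (plain Java, hypothetical names) of the point above: once records are sorted by (primary key, sequenceNumber), all versions of a key sit next to each other, so a single pass that compares each record's key with the previous one is enough to merge. The "merge" here simply keeps the latest version; PartialUpdateMergeFunction would instead combine non-null fields as shown earlier.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortedMergeDemo {
    record Kv(int key, long seq, String value) {}

    public static void main(String[] args) {
        List<Kv> records = new ArrayList<>(List.of(
                new Kv(1, 2, "b"), new Kv(2, 1, "x"), new Kv(1, 1, "a")));
        // sort by (primary key, sequenceNumber), like the write buffer does
        records.sort(Comparator.comparingInt(Kv::key).thenComparingLong(Kv::seq));

        Kv current = null;
        List<Kv> results = new ArrayList<>();
        for (Kv kv : records) {
            if (current != null && current.key() != kv.key()) {
                results.add(current); // key changed: emit the merged result
            }
            current = kv;             // same key: later sequence number wins
        }
        if (current != null) {
            results.add(current);
        }
        System.out.println(results);  // [Kv[key=1, seq=2, value=b], Kv[key=2, seq=1, value=x]]
    }
}
```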
The MergeIterator here is created via new MergeIterator(rawConsumer, buffer.sortedIterator(), keyComparator, mergeFunction);. For buffer.sortedIterator, the key is the SortBufferWriteBuffer constructor, which explains why the data comes out sorted by primary key + sequenceNumber:
```java
public SortBufferWriteBuffer(
        RowType keyType,
        RowType valueType,
        @Nullable FieldsComparator userDefinedSeqComparator,
        MemorySegmentPool memoryPool,
        boolean spillable,
        MemorySize maxDiskSize,
        int sortMaxFan,
        CompressOptions compression,
        IOManager ioManager) {
    ...
    // key fields
    IntStream sortFields = IntStream.range(0, keyType.getFieldCount());

    // user define sequence fields
    if (userDefinedSeqComparator != null) {
        IntStream udsFields =
                IntStream.of(userDefinedSeqComparator.compareFields())
                        .map(operand -> operand + keyType.getFieldCount() + 2);
        sortFields = IntStream.concat(sortFields, udsFields);
    }

    // sequence field
    sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));

    int[] sortFieldArray = sortFields.toArray();

    // row type
    List<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());
    fieldTypes.add(new BigIntType(false));
    fieldTypes.add(new TinyIntType(false));
    fieldTypes.addAll(valueType.getFieldTypes());

    NormalizedKeyComputer normalizedKeyComputer =
            CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);
    RecordComparator keyComparator =
            CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray);
    ...
    InternalRowSerializer serializer =
            InternalSerializers.create(KeyValue.schema(keyType, valueType));
    BinaryInMemorySortBuffer inMemorySortBuffer =
            BinaryInMemorySortBuffer.createBuffer(
                    normalizedKeyComputer, serializer, keyComparator, memoryPool);
    ...
}
```
IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount())) is what pulls the sequenceNumber field into the sort key; the sorting itself takes effect when buffer.sortedIterator is called.
If sequence.field is defined, those fields also take part in the sort; see the udsFields part above. A worked example of the resulting sort-field indices follows.
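A small worked example of the index arithmetic above (plain Java, values assumed): with 2 key fields, the sort row layout is [key fields..., sequenceNumber, valueKind, value fields...], so a sequence.field at value index 1 is shifted by keyFieldCount + 2.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class SortFieldsDemo {
    public static void main(String[] args) {
        int keyFieldCount = 2;
        int[] userDefinedSeqFields = {1}; // index within the value row type

        // key fields come first in the sort order
        IntStream sortFields = IntStream.range(0, keyFieldCount);
        // user-defined sequence fields, shifted past sequenceNumber and valueKind
        IntStream udsFields = IntStream.of(userDefinedSeqFields)
                .map(operand -> operand + keyFieldCount + 2);
        sortFields = IntStream.concat(sortFields, udsFields);
        // finally the internal sequenceNumber field, right after the key fields
        sortFields = IntStream.concat(sortFields, IntStream.of(keyFieldCount));

        System.out.println(Arrays.toString(sortFields.toArray())); // [0, 1, 5, 2]
    }
}
```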
DeleteVector implementation
For the implementation of DeleteVector, refer to Introduce deletion vectors for primary key table.
The rough idea is that DeleteVectors are produced through the Compaction + lookup mechanism:
- If a record does not belong to level 0, no DeleteVector is produced.
- If a record only belongs to the levels taking part in the compaction, no DeleteVector is produced.
- If a record belongs only to level 0, the file data of the levels not involved in the compaction has to be looked up, and that is what produces a DeleteVector.
Note: DeleteVector only supports primary key tables, and it is bucket-level: one DeleteVector per bucket. A minimal sketch of the underlying idea follows.
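At its core a deletion vector is just a bitmap of deleted row positions. Paimon's actual implementation is also bitmap-based (RoaringBitmap under the hood); the sketch below uses java.util.BitSet purely for brevity and is not Paimon code.

```java
import java.util.BitSet;

public class DeletionVectorSketch {
    private final BitSet deleted = new BitSet();

    // Mark the row at the given position in the data file as deleted.
    public void delete(long position) {
        deleted.set((int) position);
    }

    // Readers call this for every row position to filter deleted rows out.
    public boolean isDeleted(long position) {
        return deleted.get((int) position);
    }

    public static void main(String[] args) {
        DeletionVectorSketch dv = new DeletionVectorSketch();
        dv.delete(3);
        System.out.println(dv.isDeleted(3)); // true
        System.out.println(dv.isDeleted(4)); // false
    }
}
```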
Writing DeleteVectors
As stated above, DeleteVectors are only produced during compaction, so we go straight to MergeTreeWriter.flushWriteBuffer. The DeleteVector-related data flow is as follows:
```
compactManager.triggerCompaction(forcedFullCompaction)
        ||
        \/
submitCompaction
        ||
        \/
MergeTreeCompactTask.doCompact
        ||
        \/
rewrite
        ||
        \/
rewriteImpl
        ||
        \/
LookupMergeTreeCompactRewriter.rewrite
        ||
        \/
rewriteOrProduceChangelog
        ||
        \/
createMergeWrapper
        ||
        \/
iterator.next()
        ||
        \/
RecordReaderIterator.next()
        ||
        \/
advanceIfNeeded
        ||
        \/
currentIterator.next()
        ||
        \/
SortMergeIterator.next()
        ||
        \/
LookupChangelogMergeFunctionWrapper.add(winner)
        ||
        \/
LookupChangelogMergeFunctionWrapper.getResult()
```
- After MergeTreeCompactTask.doCompact finishes writing, result.setDeletionFile(compactDfSupplier.get()) is called. The compactDfSupplier here comes from its construction in the submitCompaction method:

```java
if (dvMaintainer != null) {
    compactDfSupplier =
            lazyGenDeletionFile
                    ? () -> CompactDeletionFile.lazyGeneration(dvMaintainer)
                    : () -> CompactDeletionFile.generateFiles(dvMaintainer);
}
```

  The DeleteVectors themselves are produced in LookupChangelogMergeFunctionWrapper.getResult(); see the explanation below.
- The LookupMergeTreeCompactRewriter instance behind LookupMergeTreeCompactRewriter.rewrite is created together with the MergeTreeWriter, via CompactManager compactManager = createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer). This calls the createRewriter method to create the LookupMergeTreeCompactRewriter instance, and the choice of rewriter is driven by lookupStrategy:

```java
public LookupStrategy lookupStrategy() {
    return LookupStrategy.from(
            mergeEngine().equals(MergeEngine.FIRST_ROW),
            changelogProducer().equals(ChangelogProducer.LOOKUP),
            deletionVectorsEnabled(),
            options.get(FORCE_LOOKUP));
}
```
- The currentIterator in currentIterator.next() is obtained by calling currentIterator = SortMergeReaderWithLoserTree.readBatch, and the SortMergeReaderWithLoserTree itself is obtained from the readerForMergeTree method.
- LookupChangelogMergeFunctionWrapper.getResult() is the real key point. Step 2 below is where deletion vectors originate: when the latest record for a key exists only in level 0, the higher levels are looked up, and if a match is found (with deletion vectors enabled) the old row's file name and row position are reported as a new deletion:

```java
@Override
public ChangelogResult getResult() {
    // 1. Compute the latest high level record and containLevel0 of candidates
    LinkedList<KeyValue> candidates = mergeFunction.candidates();
    Iterator<KeyValue> descending = candidates.descendingIterator();
    KeyValue highLevel = null;
    boolean containLevel0 = false;
    while (descending.hasNext()) {
        KeyValue kv = descending.next();
        if (kv.level() > 0) {
            descending.remove();
            if (highLevel == null) {
                highLevel = kv;
            }
        } else {
            containLevel0 = true;
        }
    }

    // 2. Lookup if latest high level record is absent
    if (highLevel == null) {
        InternalRow lookupKey = candidates.get(0).key();
        T lookupResult = lookup.apply(lookupKey);
        if (lookupResult != null) {
            if (lookupStrategy.deletionVector) {
                PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;
                highLevel = positionedKeyValue.keyValue();
                deletionVectorsMaintainer.notifyNewDeletion(
                        positionedKeyValue.fileName(), positionedKeyValue.rowPosition());
            } else {
                highLevel = (KeyValue) lookupResult;
            }
        }
    }

    // 3. Calculate result
    KeyValue result = calculateResult(candidates, highLevel);

    // 4. Set changelog when there's level-0 records
    reusedResult.reset();
    if (containLevel0 && lookupStrategy.produceChangelog) {
        setChangelog(highLevel, result);
    }
    return reusedResult.setResult(result);
}
```
- The main thing to explain here is the lookup.apply function. lookup is constructed in the create method that builds the LookupChangelogMergeFunctionWrapper:

```java
@Override
public MergeFunctionWrapper<ChangelogResult> create(
        MergeFunctionFactory<KeyValue> mfFactory,
        int outputLevel,
        LookupLevels<T> lookupLevels,
        @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
    return new LookupChangelogMergeFunctionWrapper<>(
            mfFactory,
            key -> {
                try {
                    return lookupLevels.lookup(key, outputLevel + 1);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            },
            valueEqualiser,
            changelogRowDeduplicate,
            lookupStrategy,
            deletionVectorsMaintainer,
            userDefinedSeqComparator);
}
```

  The lookupLevels.lookup call eventually reaches the createLookupFile method, which builds a LookupFile instance; while building it, valueProcessor.persistToDisk(kv, batch.returnedPosition()) is called to persist the row position alongside each key, which is how the row position can be retrieved later.
- After the lookupResult is obtained, deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition()) is called to record the deletion that ends up in the DeletionVector, roughly as sketched below.
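A hedged sketch of what notifyNewDeletion(fileName, rowPosition) boils down to: keep one bitmap per data file and set the bit for the looked-up row position. The class and method names mirror the Paimon calls above, but the body is illustrative only.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

public class DeletionVectorsMaintainerSketch {
    // one deletion bitmap per data file name (illustrative, not Paimon's type)
    private final Map<String, BitSet> deletionVectors = new HashMap<>();

    // Record that the row at rowPosition in fileName is now deleted.
    public void notifyNewDeletion(String fileName, long rowPosition) {
        deletionVectors
                .computeIfAbsent(fileName, f -> new BitSet())
                .set((int) rowPosition);
    }

    public boolean isDeleted(String fileName, long rowPosition) {
        BitSet dv = deletionVectors.get(fileName);
        return dv != null && dv.get((int) rowPosition);
    }
}
```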
The result.setDeletionFile(compactDfSupplier.get()) mentioned above calls the CompactDeletionFile.generateFiles(dvMaintainer) method, which in turn calls maintainer.writeDeletionVectorsIndex, writing the deletion vectors out to the DeleteVector index file.
Reading DeleteVectors
The DeleteVector read path is mainly set up in PrimaryKeyFileStoreTable.newRead:
it eventually calls RawFileSplitRead.createReader, which constructs an ApplyDeletionVectorReader instance via new ApplyDeletionVectorReader(fileRecordReader, deletionVector):
```java
public RecordIterator<InternalRow> readBatch() throws IOException {
    RecordIterator<InternalRow> batch = reader.readBatch();

    if (batch == null) {
        return null;
    }

    checkArgument(
            batch instanceof FileRecordIterator,
            "There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");

    return new ApplyDeletionFileRecordIterator(
            (FileRecordIterator<InternalRow>) batch, deletionVector);
}
```
This readBatch method constructs an ApplyDeletionFileRecordIterator; in its next() method, deletionVector.isDeleted is called for every record to check whether it has been deleted:
```java
@Override
public InternalRow next() throws IOException {
    while (true) {
        InternalRow next = iterator.next();
        if (next == null) {
            return null;
        }
        if (!deletionVector.isDeleted(returnedPosition())) {
            return next;
        }
    }
}
```
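A usage-style sketch (plain Java, hypothetical data) of the read path above: iterate rows together with their position in the file and skip the positions flagged in the deletion vector.

```java
import java.util.BitSet;
import java.util.List;

public class ApplyDeletionDemo {
    public static void main(String[] args) {
        List<String> rows = List.of("row0", "row1", "row2", "row3");
        BitSet deletionVector = new BitSet();
        deletionVector.set(1); // positions 1 and 3 were deleted
        deletionVector.set(3);

        // the row position doubles as the index into the deletion vector
        for (int position = 0; position < rows.size(); position++) {
            if (!deletionVector.get(position)) {
                System.out.println(rows.get(position)); // prints row0, row2
            }
        }
    }
}
```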
FAQ
When writing a data file, how is the relationship between row position and primary key recorded?
It is not recorded at write time. Instead, when createLookupFile builds the LookupFile (during initialization), the records are read back from the parquet file, and the row positions are obtained during that read, as sketched below.