Background
This article is based on Spark 3.5.0.
The purpose of this article is to work with the spark.sql.maxConcurrentOutputFileWriters parameter to speed up writing parquet files. To do that, we look into how much memory Spark uses while writing parquet, which makes it easier to choose a value for spark.sql.maxConcurrentOutputFileWriters that keeps the job stable.
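For context, here is a minimal sketch of how this parameter might be enabled for a dynamic-partition write; the value 5, the column name dt, the output path and the spark/df variables are illustrative assumptions, not recommendations:

// Illustrative only: with a value > 0, the dynamic-partition write path can keep
// up to this many parquet writers open per task instead of sorting by partition first.
spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "5")

df.write
  .mode("overwrite")
  .partitionBy("dt")               // hypothetical partition column
  .parquet("/tmp/parquet_output")  // hypothetical output path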
Conclusion
A single Spark parquet writer may occupy up to 128MB of memory (that is, the value of parquet.block.size). So when tuning spark.sql.maxConcurrentOutputFileWriters, take care not to set it too high, otherwise it can cause OOM. However, if small-file merging is added at the final write stage (the AQE + Rebalance approach), the value can be raised somewhat: the writing task then sits after a shuffle, so unlike a task without a shuffle it no longer carries memory-hungry operations such as sort or aggregate, and it is purely writing parquet files.
You can also refer to the article "Parquet文件是怎么被寫入的-Row Groups,Pages,需要的內存,以及flush操作" (how Parquet files are written: row groups, pages, the memory required, and the flush operation).
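As a rough back-of-the-envelope estimate (assuming the worst case, where every concurrent writer buffers a full row group at the same time):

// Worst-case extra write-buffer memory per task, illustrative numbers:
val maxConcurrentWriters = 10                 // spark.sql.maxConcurrentOutputFileWriters
val parquetBlockSize     = 128L * 1024 * 1024 // parquet.block.size, default 128MB
val perTaskWriteBuffer   = maxConcurrentWriters * parquetBlockSize
// ≈ 1.25 GB per task here, on top of whatever else the task already uses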
Analysis
As usual, we start from the InsertIntoHadoopFsRelationCommand class. The data flow involved in writing parquet is as follows:
InsertIntoHadoopFsRelationCommand.run
   ||
   \/
FileFormatWriter.write
   ||
   \/
fileFormat.prepareWrite
   ||
   \/
executeWrite => planForWrites.executeWrite
   ||
   \/
WriteFilesExec.doExecuteWrite
   ||
   \/
FileFormatWriter.executeTask
   ||
   \/
dataWriter.writeWithIterator
   ||
   \/
dataWriter.writeWithMetrics
   ||
   \/
DynamicPartitionDataConcurrentWriter.write
   ||
   \/
writeRecord
   ||
   \/
ParquetOutputWriter.write
   ||
   \/
recordWriter.write
Among these steps:

- fileFormat.prepareWrite
  This covers the parquet-related settings at the Spark level and returns an OutputWriterFactory, a factory that produces ParquetOutputWriter instances. The main settings are:
  - parquet.compression: the compression format, usually zstd; it can also be set via spark.sql.parquet.compression.codec
  - parquet.write.support.class: set to ParquetWriteSupport, the class that converts Spark's internal InternalRow into a parquet message
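As a small illustration (the codec name and path are examples, and spark/df are assumed to be an active SparkSession and DataFrame), these are the two usual ways to choose the codec that prepareWrite then translates into the Hadoop key parquet.compression:

// Illustrative: session-wide setting
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

// Illustrative: per-write override with the same effect
df.write
  .option("compression", "zstd")
  .parquet("/tmp/parquet_output")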
- DynamicPartitionDataConcurrentWriter.write
  This involves the code-generated conversion from InternalRow to UnsafeRow. We will not go into the details of that code generation here; we only look at two parts: getPartitionValues, and the getPartitionPath used in the renewCurrentWriter method.

  - getPartitionValues
    This is the InternalRow => UnsafeRow conversion. It is done because the UnsafeRow data structure manages memory well and avoids GC problems:

    val proj = UnsafeProjection.create(description.partitionColumns, description.allColumns)
    row => proj(row)
Let's look at InterpretedUnsafeProjection, a subclass of UnsafeProjection that is not code-generated (which makes it easier to analyze):
override def apply(row: InternalRow): UnsafeRow = {
  if (subExprEliminationEnabled) {
    runtime.setInput(row)
  }

  // Put the expression results in the intermediate row.
  var i = 0
  while (i < numFields) {
    values(i) = exprs(i).eval(row)
    i += 1
  }

  // Write the intermediate row to an unsafe row.
  rowWriter.reset()
  writer(intermediate)
  rowWriter.getRow()
}
- First, common subexpressions are eliminated (if enabled).
- The values array holds the result of evaluating each expression.
- rowWriter.reset() is used to align the cursor, which matters when writing variable-length values such as String; see the article "UnsafeRow內存布局和代碼優化" (UnsafeRow memory layout and code optimization). The unsafeWriter then writes each value, according to its type, to the appropriate position in the UnsafeRow; the offset here lies inside the region tracked by the cursor, i.e. the cursor value is always larger than the offset.
- Finally an UnsafeRow is returned.

This is how the InternalRow => UnsafeRow conversion is carried out.
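As a rough standalone sketch of what getPartitionValues does (these are Spark catalyst internals rather than a public API, and the schema (dt: String, id: Int) is made up for illustration):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Project a row with schema (dt: String, id: Int) down to the partition column dt,
// producing an UnsafeRow that manages its own memory region.
val dt = AttributeReference("dt", StringType)()
val id = AttributeReference("id", IntegerType)()

val proj = UnsafeProjection.create(Seq(dt), Seq(dt, id))
val partitionValues = proj(InternalRow(UTF8String.fromString("2024-01-01"), 1))
// partitionValues.getString(0) == "2024-01-01"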
  - getPartitionPath
    This obtains the partition path through expressions, completing the InternalRow => String conversion. The code involved is:

private lazy val partitionPathExpression: Expression = Concat(
  description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
    val partitionName = ScalaUDF(
      ExternalCatalogUtils.getPartitionPathString _,
      StringType,
      Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))
    if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
  })

private lazy val getPartitionPath: InternalRow => String = {
  val proj = UnsafeProjection.create(Seq(partitionPathExpression), description.partitionColumns)
  row => proj(row).getString(0)
}
How UnsafeProjection.create works has already been explained above. The key part here is partitionPathExpression, the expression that generates the partition path. It is built mainly around the UDF getPartitionPathString, and the crucial point is the arguments passed to it: Literal(c.name) and Cast(c, StringType, Option(description.timeZoneId)). Literal(c.name) is a constant holding the partition column's name, while Cast(c, StringType, Option(description.timeZoneId)) stands for the value referenced by the attribute c.
Why can we say that? Because the internal evaluation method of ScalaUDF contains:

override def eval(input: InternalRow): Any = {
  val result = try {
    f(input)
  } catch {
    case e: Exception =>
      throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
        functionName, inputTypesString, outputType, e)
  }
  resultConverter(result)
}

Inside f, eval(InternalRow) is called on each of the arguments that were passed in: for a Literal this yields the constant itself, while for Cast(Attribute) it yields the attribute's value (resolved through the BindReferences.bindReference method).
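To make this concrete, the UDF referenced above can be called directly; a small illustration (the column names and values are made up):

import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Builds one Hive-style path segment per partition column, escaping unsafe characters.
ExternalCatalogUtils.getPartitionPathString("dt", "2024-01-01")  // "dt=2024-01-01"
ExternalCatalogUtils.getPartitionPathString("dt", "2024/01/01")  // "dt=2024%2F01%2F01"
// For several partition columns the Concat expression joins the segments with
// Path.SEPARATOR, e.g. "dt=2024-01-01/city=beijing".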
- recordWriter.write
  This involves the ParquetOutputFormat.getRecordWriter method, which handles some of parquet's own native parameter settings:
public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec, Mode mode)
    throws IOException, InterruptedException {
  final WriteSupport<T> writeSupport = getWriteSupport(conf);

  ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
      .withPageSize(getPageSize(conf))
      .withDictionaryPageSize(getDictionaryPageSize(conf))
      .withDictionaryEncoding(getEnableDictionary(conf))
      .withWriterVersion(getWriterVersion(conf))
      .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
      .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
      .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
      .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
      .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
      .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
      .withBloomFilterEnabled(getBloomFilterEnabled(conf))
      .withPageRowCountLimit(getPageRowCountLimit(conf))
      .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));

  new ColumnConfigParser()
      .withColumnConfig(ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding)
      .withColumnConfig(BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, false),
          propsBuilder::withBloomFilterEnabled)
      .withColumnConfig(BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), propsBuilder::withBloomFilterNDV)
      .withColumnConfig(BLOOM_FILTER_FPP, key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
          propsBuilder::withBloomFilterFPP)
      .parseConfig(conf);

  ParquetProperties props = propsBuilder.build();

  long blockSize = getLongBlockSize(conf);
  int maxPaddingSize = getMaxPaddingSize(conf);
  boolean validating = getValidation(conf);
  ...
  WriteContext fileWriteContext = writeSupport.init(conf);

  FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext);

  ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
      fileWriteContext.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
      props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled(), encryptionProperties);
  w.start();
  ...
  return new ParquetRecordWriter<T>(
      w,
      writeSupport,
      fileWriteContext.getSchema(),
      fileWriteContext.getExtraMetaData(),
      blockSize,
      codec,
      validating,
      props,
      memoryManager,
      conf);
}
The key parameters involved here are:

parquet.page.size                  1*1024*1024    -- page size, default 1MB
parquet.block.size                 128*1024*1024  -- row group size, default 128MB
parquet.page.size.row.check.min    100            -- minimum row count before checking whether the page size has been reached
parquet.page.size.row.check.max    10000          -- maximum row count before checking whether the page size has been reached
parquet.page.row.count.limit       20_000         -- hard limit on the number of rows per page
parquet.page.size.row.check.min, parquet.page.size.row.check.max and parquet.page.row.count.limit constrain one another. The overall goal is that, once the row count reaches a certain threshold, a check is made to decide whether the buffered values can be flushed into an in-memory page; for the details, see the methods of the ColumnWriteStoreBase class. A small tuning sketch follows below.
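Since these are plain Hadoop configuration keys rather than Spark SQL configs, one way to tune them is through the Hadoop configuration of the job; a minimal sketch with illustrative values (spark is assumed to be an active SparkSession):

// Illustrative: shrinking the row group also lowers the per-writer memory ceiling.
// These are the Hadoop keys read by parquet-mr on the write path.
spark.sparkContext.hadoopConfiguration.setLong("parquet.block.size", 64L * 1024 * 1024)
spark.sparkContext.hadoopConfiguration.setInt("parquet.page.size", 1024 * 1024)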
Next comes the actual write, which goes through the InternalParquetRecordWriter.write method, shown below:
private void initStore() {
  ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,
      schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled(),
      fileEncryptor, rowGroupOrdinal);
  pageStore = columnChunkPageWriteStore;
  bloomFilterWriteStore = columnChunkPageWriteStore;

  columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);
  MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
  this.recordConsumer = columnIO.getRecordWriter(columnStore);
  writeSupport.prepareForWrite(recordConsumer);
}

public void write(T value) throws IOException, InterruptedException {
  writeSupport.write(value);
  ++ recordCount;
  checkBlockSizeReached();
}
initStore mainly initializes pageStore and columnStore.
How a Spark InternalRow is actually converted into a parquet message happens mainly in the rootFieldWriters used by writeSupport.write.
Next comes checkBlockSizeReached, which is where a row group gets flushed to disk. Readers can look at the code for the details:
For flushing to pages, see columnStore.flush() inside checkBlockSizeReached.
For flushing the row group to disk, see pageStore.flushToFileWriter(parquetFileWriter) inside checkBlockSizeReached.
A simplified sketch of the threshold check follows below.
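To make the memory claim concrete, here is a much-simplified sketch (in Scala, not Parquet's actual Java code) of the kind of threshold check checkBlockSizeReached performs after each record; the names are illustrative:

// Simplified illustration only: buffered column data is flushed to disk only once it
// approaches the row-group size, so a writer can hold close to parquet.block.size
// (128MB by default) in memory right before the flush.
val rowGroupSize = 128L * 1024 * 1024  // parquet.block.size

def shouldFlushRowGroup(bufferedBytes: Long, recordCount: Long): Boolean = {
  val avgRecordSize = bufferedBytes / math.max(recordCount, 1L)
  // flush when the next couple of records would likely overshoot the row-group size
  bufferedBytes > rowGroupSize - 2 * avgRecordSize
}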
To sum up: a single Spark parquet writer may occupy up to 128MB of memory (the value of parquet.block.size),
because data is only actually flushed to disk once the row group size has been reached.