Spark中寫parquet文件是怎么實現的

背景

本文基于 Spark 3.5.0
寫本篇文章的目的是在于能夠配合spark.sql.maxConcurrentOutputFileWriters參數來加速寫parquet文件的速度,為此研究一下Spark寫parquet的時候會占用內存的大小,便于配置spark.sql.maxConcurrentOutputFileWriters的值,從而保證任務的穩定性

結論

一個spark parquet writer可能會占用128MB的內存(也就是parquet.block.size的大小)。 所有在調整spark.sql.maxConcurrentOutputFileWriters的時候得注意不能調整過大,否則會導致OOM,但是如果在最后寫文件的時候加入合并小文件的功能(AQE+Rebalance的方式),也可以適當的調整大一點,因為這個時候的Task 不像沒有shuffle一樣,可能還會涉及到sort以及aggregate等消耗內存的操作,(這個時候就是一個task純寫parquet文件)
大家也可以參考Parquet文件是怎么被寫入的-Row Groups,Pages,需要的內存,以及flush操作

分析

還是得從InsertIntoHadoopFsRelationCommand類中說起,涉及到寫parquet的數據流如下:

InsertIntoHadoopFsRelationCommand.run||\/
FileFormatWriter.write||\/
fileFormat.prepareWrite||\/
executeWrite => planForWrites.executeWrite ||\/WriteFilesExec.doExecuteWrite||\/FileFormatWriter.executeTask||\/dataWriter.writeWithIterator||\/dataWriter.writeWithMetrics||\/DynamicPartitionDataConcurrentWriter.write||\/writeRecord||\/ParquetOutputWriter.write||\/recordWriter.write
  • 其中fileFormat.prepareWrite 涉及到 spark這一層級有關parquet的設置,并返回一個生成ParquetOutputWriter實例的工廠類實例OutputWriterFactory
    主要設置如 parquet.compression 壓縮格式,一般是 zstd ,也可以通過 spark.sql.parquet.compression.codec設置
    parquet.write.support.classParquetWriteSupport,該類的作用為Spark把內部IternalRow轉為parquet message

  • DynamicPartitionDataConcurrentWriter.write 涉及到了InternalRowUnsafeRow代碼生成
    這里不討論這部分的細節,只說一下getPartitionValuesrenewCurrentWriter 方法中的 getPartitionPath這兩部分

    • getPartitionValues
      這個是InternalRow => UnsafeRow轉換,為什么這么做,是因為對于UnsafeRow這種數據結構來說,能夠很好管理內存和避免GC問題

          val proj = UnsafeProjection.create(description.partitionColumns, description.allColumns)row => proj(row)
      

      我們以UnsafeProjection的子類InterpretedUnsafeProjection,該類不是代碼生成的類(這樣便于分析),

        override def apply(row: InternalRow): UnsafeRow = {if (subExprEliminationEnabled) {runtime.setInput(row)}// Put the expression results in the intermediate row.var i = 0while (i < numFields) {values(i) = exprs(i).eval(row)i += 1}// Write the intermediate row to an unsafe row.rowWriter.reset()writer(intermediate)rowWriter.getRow()}
      
      • 首先是消除公共子表達式
      • 用values數組保存每個表達式計算出來的結果
      • rowWriter.reset() 用來對齊cursor,便于對于String類型的寫入,這可以參考UnsafeRow內存布局和代碼優化
      • unsafeWriter按照不同的類型寫入到unsaferow不同的位置上去,這里的offset在cursor的內部的,也就是說cursor的值要大于offset
      • 返回UnsafeRow類型
        通過這種方式完成了InternalRow => UnsafeRow轉換
    • getPartitionPath
      這個是通過表達式的方式獲取partition的函數,從而完成InternalRow => String的轉換,涉及的代碼如下:

        private lazy val partitionPathExpression: Expression = Concat(description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>val partitionName = ScalaUDF(ExternalCatalogUtils.getPartitionPathString _,StringType,Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)})private lazy val getPartitionPath: InternalRow => String = {val proj = UnsafeProjection.create(Seq(partitionPathExpression), description.partitionColumns)row => proj(row).getString(0)}
      

      UnsafeProjection.create 上面已經說了怎么實現的了,重點說partitionPathExpression 生成partition的表達式,
      該表達式主要通過UDF中getPartitionPathString來生成,關鍵的一點是,傳入的參數:Literal(c.name)和Cast(c, StringType, Option(description.timeZoneId))))
      這里的Literal(c.name)表示的是partition名字的常量
      Cast(c, StringType, Option(description.timeZoneId)))表示的是c這個變量所代表的值,
      為什么這么說,因為在ScalaUDF的內部計算方法中有:

        override def eval(input: InternalRow): Any = {val result = try {f(input)} catch {case e: Exception =>throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(functionName, inputTypesString, outputType, e)}resultConverter(result)}

      這里的f中會對傳入的每個參數都會調用eval(InernalRow),對于Literal來說就是常亮,而對于Cast(Attribute)來說就是屬性的值(通過BindReferences.bindReference方法)。

  • recordWriter.write涉及到 ParquetOutputFormat.getRecordWriter方法,該方法中涉及到parquet中的一些原生參數設置:

public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec, Mode mode)throws IOException, InterruptedException {final WriteSupport<T> writeSupport = getWriteSupport(conf);ParquetProperties.Builder propsBuilder = ParquetProperties.builder().withPageSize(getPageSize(conf)).withDictionaryPageSize(getDictionaryPageSize(conf)).withDictionaryEncoding(getEnableDictionary(conf)).withWriterVersion(getWriterVersion(conf)).estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf)).withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf)).withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)).withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf)).withStatisticsTruncateLength(getStatisticsTruncateLength(conf)).withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf)).withBloomFilterEnabled(getBloomFilterEnabled(conf)).withPageRowCountLimit(getPageRowCountLimit(conf)).withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));new ColumnConfigParser().withColumnConfig(ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding).withColumnConfig(BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, false),propsBuilder::withBloomFilterEnabled).withColumnConfig(BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), propsBuilder::withBloomFilterNDV).withColumnConfig(BLOOM_FILTER_FPP, key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),propsBuilder::withBloomFilterFPP).parseConfig(conf);ParquetProperties props = propsBuilder.build();long blockSize = getLongBlockSize(conf);int maxPaddingSize = getMaxPaddingSize(conf);boolean validating = getValidation(conf);...WriteContext fileWriteContext = writeSupport.init(conf);FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext);ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),fileWriteContext.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled(), encryptionProperties);w.start();...return new ParquetRecordWriter<T>(w,writeSupport,fileWriteContext.getSchema(),fileWriteContext.getExtraMetaData(),blockSize,codec,validating,props,memoryManager,conf);}

這里涉及到的關鍵的幾個參數是:

   parquet.page.size                   1*1024*1024         -- page的大小 默認是 1MBparquet.block.size                  128*1024*1024       -- rowgroup的大小 默認是 128MBparquet.page.size.row.check.min     100                 -- page檢查是否達到page size的最小行數parquet.page.size.row.check.max     10000               -- page檢查是否達到page size的最大行數parquet.page.row.count.limit        20_000              -- page檢查是否達到page size的行數極限行數

parquet.page.size.row.check.min parquet.page.size.row.check.max parquet.page.row.count.limit 這三個配置項存在著相互制約的關系,總的目標就是檢查當行數達到了一定的閾值以后,來檢查是否能夠flush到內存page中,具體的可以查看ColumnWriteStoreBase類中的方法

接下來就是真正寫操作了,調用的是InternalParquetRecordWriter.write方法,如下:

 private void initStore() {ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled(),fileEncryptor, rowGroupOrdinal);pageStore = columnChunkPageWriteStore;bloomFilterWriteStore = columnChunkPageWriteStore;columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);this.recordConsumer = columnIO.getRecordWriter(columnStore);writeSupport.prepareForWrite(recordConsumer);}public void write(T value) throws IOException, InterruptedException {writeSupport.write(value);++ recordCount;checkBlockSizeReached();}

initStore主要是初始化 pageStorecolumnStore
具體的spark interalRow怎么轉換為parquet message,主要在writeSupport.write中的rootFieldWriters
接下來就是checkBlockSizeReached,這里主要就是flush rowgroup到磁盤了,
具體的讀者可以看代碼:
對于flush到page可以看checkBlockSizeReached中columnStore.flush()
對于flush rowroup到磁盤可以看checkBlockSizeReached中pageStore.flushToFileWriter(parquetFileWriter)
總結出來就是 一個spark parquet writer可能會占用128MB的內存(也就是parquet.block.size的大小),
因為只有在滿足了rowgroup的大小以后,才會真正的flush到磁盤。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/696350.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/696350.shtml
英文地址,請注明出處:http://en.pswp.cn/news/696350.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Javascript怎么輸出內容?兩種常見方式以及控制臺介紹

javascript是一種非常重要的編程語言&#xff0c;在許多網頁中它被廣泛使用&#xff0c;可以實現許多交互效果和動態效果。輸出是javascript中最基本的操作之一&#xff0c;下面將介紹兩種常見的輸出方式。 一、使用console.log()函數輸出 console.log()函數是常用的輸出函數…

Jmeter實現階梯式線程增加的壓測

安裝相應jmeter 插件 1&#xff1a;安裝jmeter 管理插件&#xff1a; 下載地址&#xff1a;https://jmeter-plugins.org/install/Install/&#xff0c;將下載下來的jar包放到jmeter文件夾下的lib/ext路徑下&#xff0c;然后重啟jmeter。 2&#xff1a;接著打開 選項-Plugins Ma…

在Linux上安裝Docker: 一站式指南

Docker 是一款強大的容器化平臺&#xff0c;為開發者提供了一種輕松打包、發布和運行應用的方式。在本文中&#xff0c;我們將探討如何在Linux操作系統上安裝Docker&#xff0c;為你提供一站式指南。 步驟1: 卸載舊版本 在安裝新版Docker之前&#xff0c;建議先卸載舊版本&am…

三十年一個大輪回!日股突破“泡沫時期”歷史高點

2月22日周四&#xff0c;英偉達四季報業績超預期&#xff0c;而且本季度業績指引非常樂觀&#xff0c;提振美股股指期貨并成為芯片股和AI概念股情緒的重要催化劑。今日亞洲芯片股和AI股起飛&#xff0c;日本在芯片股的帶動下突破1989年泡沫時期以來的歷史最高收盤價。 美股方面…

我之前炒股虧麻了,找百融云AI Agent談了談心

春節之前&#xff0c;A股和H股都跌麻了&#xff0c;但是機構的路演和調研反而多了。因為&#xff1a;寫不完的安撫、說不完的陪伴、聽不完的客戶指責、以及撿不完的AH股便宜貨。 有一位血液里流淌著美式咖啡的職場白領&#xff0c;雖然這些年在股市過得很不如意&#xff0c;但…

C語言---鏈表

一.定義 鏈表是由一系列節點組成&#xff0c;每個結點包含兩個域&#xff0c;一個是數據域&#xff0c;數據域用來保存用戶數據&#xff0c;另一個是指針域&#xff0c;保存下一個節點的地址。鏈表在內存中是非連續的。 二.分類 靜態鏈表 動態鏈表 單向鏈表 雙向鏈表 循環鏈…

maven使用問題及解決辦法匯總

文章目錄 1、maven clean后打包出現Cannot create resource output directory2、把已有jar包打包進本地maven倉庫 1、maven clean后打包出現Cannot create resource output directory 主要原因是target目錄被別的程序占用了&#xff0c;最笨的辦法是重啟電腦&#xff0c;當然也…

C++跨模塊釋放內存

linux一個進程只有一個堆&#xff0c;不要考慮這些問題&#xff0c;但是windows一個進程可能有多個堆&#xff0c;要在對應的堆上釋放。 一&#xff0c; MT改MD 一個進程的地址空間是由一個可執行模塊和多個DLL模塊構成的&#xff0c;這些模塊中&#xff0c;有些可能會鏈接到…

代碼隨想錄訓練營第29天| 491.遞增子序列、46.全排列、47.全排列 II

491.遞增子序列 題目鏈接&#xff1a;491. 非遞減子序列 - 力扣&#xff08;LeetCode&#xff09; class Solution {List<List<Integer>> ans new ArrayList<>();public List<List<Integer>> findSubsequences(int[] nums) {backtrack(nums, …

(十三)【Jmeter】線程(Threads(Users))之tearDown 線程組

簡述 操作路徑如下: 作用:在正式測試結束后執行清理操作,如關閉連接、釋放資源等。配置:設置清理操作的采樣器、執行順序等參數。使用場景:確保在測試結束后應用程序恢復到正常狀態,避免資源泄漏或對其他測試的影響。優點:提供清理操作,確保測試環境的整潔和可重復性…

租用海外服務器,自己部署ChatGPT-Next-Web,實現ChatGPT聊天自由,還可以分享給朋友用

前言 如果有好幾個人需要使用ChatGPT&#xff0c;又沒有魔法上網環境&#xff0c;最好就是自己搭建一個海外的服務器環境&#xff0c;然后很多人就可以同時直接用了。 大概是情況是要花80元租一個一年的海外服務器&#xff0c;花15元租一個一年的域名&#xff0c;然后openai 的…

centos安裝擴展

centos下安裝php擴展時遇到的問題php 1.imapgit cd /root/php-5.6.27/ext/imap /usr/local/php/bin/phpize ./configure --prefix/usr/local/imap 錯誤1github configure: error: utf8_mime2text() has new signature, but U8T_CANONICAL is missing. This should not happe…

一 些有代表性的相位解包裹算法

Itoh首先給出了傳統解包裹算法的數學描述!。傳統的相位解包裹操作是通過對空間相鄰點相位值的比較來完成的。根據抽樣定理&#xff0c;如果相鄰采樣點的相位差不超過z&#xff0c;則對應的相位解包裹處理是非常簡單的&#xff0c;理論上以某點為起始點沿某一路徑對包裹相位的差…

中科院計算所:什么情況下,大模型才需要檢索增強?

ChatGPT等大型語言模型在自然語言處理領域表現出色。但有時候會表現得過于自信&#xff0c;對于無法回答的事實問題&#xff0c;也能編出一個像樣的答案來。 這類胡說亂說的答案對于醫療等安全關鍵的領域來說&#xff0c;是致命的。 為了彌補這一缺陷&#xff0c;研究者們提出…

ios抓包Tunnel to......443

fiddler官網下載“CertMaker for iOS and Android”插件&#xff0c;官網插件&#xff1a;https://www.telerik.com/fiddler/add-ons 雙擊運行插件后&#xff0c;重啟fiddler&#xff0c;ios重新安裝證書即可

貓頭虎分享已解決Bug || 系統更新失敗(System Update Failure):UpdateError, UpgradeFailure

博主貓頭虎的技術世界 &#x1f31f; 歡迎來到貓頭虎的博客 — 探索技術的無限可能&#xff01; 專欄鏈接&#xff1a; &#x1f517; 精選專欄&#xff1a; 《面試題大全》 — 面試準備的寶典&#xff01;《IDEA開發秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鴻蒙》 …

Java并發編程面試題53道-JUC

Java中的JUC是"Java Concurrency Utilities"的縮寫&#xff0c;它是指Java平臺從Java 5版本開始引入的一系列用于處理多線程并發編程的工具類和框架。這個包(java.util.concurrent)極大地增強了Java在并發編程領域的支持&#xff0c;提供了一系列高級抽象如線程池&am…

Sora:視頻生成模型作為世界模擬器

我們探索了視頻數據上生成模型的大規模訓練。具體來說&#xff0c;我們在可變持續時間、分辨率和長寬比的視頻和圖像上聯合訓練文本條件擴散模型。我們利用了一個在視頻和圖像潛在碼的時空塊上操作的變壓器架構。我們規模最大的模型 Sora 能夠生成一分鐘的高保真視頻。我們的結…

一周學會Django5 Python Web開發-Django5路由重定向

鋒哥原創的Python Web開發 Django5視頻教程&#xff1a; 2024版 Django5 Python web開發 視頻教程(無廢話版) 玩命更新中~_嗶哩嗶哩_bilibili2024版 Django5 Python web開發 視頻教程(無廢話版) 玩命更新中~共計25條視頻&#xff0c;包括&#xff1a;2024版 Django5 Python we…

代碼隨想錄算法訓練營第21天—回溯算法01 | ● 理論基礎 ● *77. 組合

理論基礎 回溯是一種純暴力搜索的方法&#xff0c;它和遞歸相輔相成&#xff0c;通常是執行完遞歸之后緊接著執行回溯相較于以往使用的for循環暴力搜索&#xff0c;回溯能解決更為復雜的問題&#xff0c;如以下的應用場景應用場景 組合問題 如一個集合{1,2,3,4}&#xff0c;找…