FileOutputCommitter中提交mapreduce.fileoutputcommitter.algorithm.version有v1和v2兩個版本。
v1版本Spark寫入文件的流程:
1.當task完成的時候,會將task的結果文件先寫入到臨時目錄下面。
2.所有的task完成后,將所有的結果文件寫入到結果目錄。
3.刪除臨時目錄,生成標記文件。
v1的弊端:第二步寫入操作是在driver端進行的,而且是單線程進行。當結果文件數量很多的時候,耗時就會線性增加。
v2版本Spark寫入文件的流程:
1.當task完成的時候,將task的結果文件直接寫入結果目錄。
2.所有的task完成后,刪除臨時目錄,生成標記文件。
可以看的v2比v1少了一個rename的過程。寫入結果目錄是發生在Executor上,task是同時進行的,相當于多線程,速度更快。但是存在一致性問題,在所有task寫入結果目錄過程,用戶可以看到部分數據。
FileOutputCommitter源碼解析
對應的類是org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
commit task
Executor端完成task會執行commitTask,可以看到當algorithmVersion == 1的時候會執行rename操作,將task結果文件到committedTaskPath(臨時文件)。
algorithmVersion不是1的時候,使用mergePaths將結果文件直接寫入到outputPath
commit job
在driver執行,當job完成(所有的task完成),執行commitJob
algorithmVersion == 1,執行mergePaths將結果文件直接寫入到outputPath,這里就是性能瓶頸所在的地方,這里是單線程進行的,要是單個文件耗時20ms,1000個文件就是20000ms(20s),它是線性增加的。
algorithmVersion == 2的話,在commitTask中已經寫入到結果目錄了。
最后刪除臨時目錄,生成標記文件。
和hive結合使用
上面提到使用v2的時候,很有可能出現結果目錄中出現部分文件的情況,這要是hive的表目錄就能在這個表讀取到部分數據,這是不對的。應該保證原子性,要么讀取的數據是舊的數據,要么是新的數據,不能讀取部分數據。
入口類:org.apache.spark.sql.hive.execution.InsertIntoHiveTable
hive的臨時目錄生成
類似這種:{hive表數據目錄}/.hive-staging_hive_2025-06-26_17-33-07_088_701862708150638596-1/-ext-10000/
processInsert
這里就一個簡單的場景,就是寫入到非分區表。
首先是先將數據寫入到hive的臨時目錄中,在load到hive表中。
寫入hive臨時目錄
可以看到commiter變量,這個就是上面的FileOutputCommitter,有兩種提交方式。
runJob就是spark提交了任務,開始執行。這里執行task,task完成回調commitTask方法,runJob是阻塞的,會等到所有task完成。然后回調commitJob方法。根據上面FileOutputCommitter,此時數據已經全部寫入了hive的臨時目錄下面了。
loadTable
loadTable是使用反射的方式執行_loadTableMethod_方法,對應就是Hive的loadTable方法。
hive的loadTable中如果是替換就是replaceFiles,否則是copyFiles。
replaceFiles
對應sparksql的 insert overwrite
首先將原有的目錄下面的文件都放到回收站,即刪除,再將結果文件從臨時目錄移動到正式目錄。
moveFiles 移動結果文件。可以看到使用線程池(默認是25個線程)進行結果文件的重命名來移動文件的。不同于FileOutputCommitter的v1中單線程進行rename,Hive是多線程來rename的,效率更高。
copyFiles
對應sparksql的 insert into
可以看到它也是使用線程池進行并行操作的。
整體流程
v2流程,建議使用v2,效率更高。