1、sparkSession配置checkpoint的方法
# step1: 在conf中添加checkpoint的保存地址
val spark = SparkSession.builder.appName(JobRegister.getJobName("xxx", s"xxxx")).config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").config("spark.sql.broadcastTimeout", 3000).config("spark.sql.sources.partitionOverwriteMode", "dynamic").config("spark.checkpoint.dir", "/user/vc/projects_prod/checkpoint/data") # 配置路徑.enableHiveSupport().getOrCreate()# step2: 保險期間,在引用spark時,再設置下:
spark.sparkContext.setCheckpointDir("/user/vc/projects_prod/checkpoint/data")# step3: 在代碼中添加checkpoint函數
val risk_msg = spark.sql(s"select * from temp.dwd_im_basic_info ").cache().checkpoint()
2、checkpoint()跟cache()的原理
- checkpoint() 方法和 cache() 方法都是 Spark中的緩存機制,用于提高計算效率的操作,都可以在迭代計算或長時間計算中使用,以減少計算時間和提高數據可靠性,但是它們的實現方式不同。
- cache() 是將數據緩存在內存中,優勢在于速度快,但缺點在于內存有限,數據可能會被清除;
- 而 checkpoint()則是將數據寫入磁盤中,優勢在于數據可靠性高,但缺點在于速度慢,需要寫入磁盤。
- cache() 是一個轉換操作,不會立即執行計算,只有在行動操作需要使用數據時才會觸發計算。
- checkpoint()會觸發一次完整的計算,并將結果寫入到磁盤中,因此它是一個行動操作。
- 但是,checkpoint() 方法需要將數據寫入磁盤或分布式文件系統中,這會導致額外的 I/O操作,影響性能。因此,為了避免頻繁地進行 I/O 操作,通常將 checkpoint() 方法和 cache() 方法結合使用。
- 具體來說,可以先使用 cache() 方法將數據緩存到內存中,然后再使用 checkpoint()方法將數據寫入磁盤或分布式文件系統中。這樣可以避免頻繁地進行 I/O 操作,同時又能保證數據的可靠性和可恢復性。
- 需要注意的是,使用 checkpoint()會將數據寫入磁盤或分布式文件系統中,這會占用一定的存儲空間。因此,需要根據實際情況來決定何時使用 checkpoint()方法,以避免浪費存儲資源。
3、checkpoint()和cache()結合時,誰前誰后呢?
在 PyTorch 中,checkpoint() 和 cache() 都是模型優化中經常使用的函數,但它們的使用方式和作用不同。
checkpoint() 函數是一種優化方法,可以將模型的一部分計算推遲到后面執行,從而減少顯存的占用,提高模型的訓練速度。
而 cache() 函數是一種緩存方法,可以將模型的某些計算結果緩存下來,以便下次使用時可以直接調用,避免重復計算,提高模型的訓練速度。
這兩個函數的使用順序取決于具體的場景。
如果你希望先緩存模型的某些計算結果,再對模型進行優化,那么就應該先使用 cache() 函數,再使用 checkpoint() 函數。
如果你希望先對模型進行優化保存,再將優化后的結果緩存下來,那么就應該先使用 checkpoint() 函數,再使用 cache() 函數。