一、前述
Spark中控制算子也是懶執行的,需要Action算子觸發才能執行,主要是為了對數據進行緩存。
控制算子有三種,cache,persist,checkpoint,以上算子都可以將RDD持久化,持久化的單位是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系。
二、具體算子
1、 cache
默認將RDD的數據持久化到內存中。cache是懶執行。
chche () = persist()=persist(StorageLevel.Memory_Only)
2、persist(可以指定持久化的級別)

解釋:
1、MEMORY_AND_DISK 意思是先往內存中放數據,內存不夠再放磁盤
2、最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本數。
3、選擇的原則是:首先考慮內存,然后考慮序列化之后再放入內存,最后考慮內存加磁盤。
4、盡量避免使用“_2”和DISK_ONLY級別。
5、deserialized是不序列化的意思。
注意事項:
1、cache和persist都是懶執行,必須有一個action類算子觸發執行。
2、cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數據了。持久化的單位是partition。
3、cache和persist算子后不能立即緊跟action算子。
錯誤:
rdd.cache().count() 返回的不是持久化的RDD,而是一個數值了
3、 Checkpoint(對Lineage非常長時使用)
1、概念和特征:
不僅可以將數據持久化到磁盤,還可以切斷RDD之間的依賴關系,checkpoint也是懶執行。
Checkpoin不僅存儲結果,還會存儲邏輯,還可以存儲元數據。
Persisit切斷不了RDD的依賴關系。
2、checkpoint 的執行原理:
(1).Spark job執行完之后,spark會從finalRDD從后往前回溯。
(2)當回溯到對某個RDD進行了checkpoint,會對這個RDD標記。
(3)回溯完成之后,Spark會重新計算標記RDD的結果,然后將結果保存到Checkpint目錄中。
3、優化checekpoint
- 因為最后是要觸發當前application的action算子,所以在觸發之前加一層cache操作,一樣會往前執行cache操作,實現對數據的cache ,所以考慮將cache優化到checkpoin的優化流程里。
- 對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job(回溯完成之后重新開的job)只需要將內存中的數據(cache緩存好的checkpoint那個點的數據)拷貝到HDFS上就可以。
- 省去了重新計算這一步,不需要重頭開始來走到checkpoint這個點了。
總結:
持久化的最小單位是partition!!!