Spark RDD通過persist方法或cache方法可以將計算結果的緩存,但是并不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD才會被緩存在計算節點的內存中并供后面重用。下面是persist方法或cache方法的函數定義:
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
視頻講解如下 |
---|
【趙渝強老師】Spark RDD的緩存機制 |
通過函數的定義發現,cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark在object StorageLevel中定義了緩存的存儲級別。下面是在StorageLevel中的定義的緩存級別。
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2=new StorageLevel(true, true, false, false, 2)
valOFF_HEAP = new StorageLevel(true, true, true, false, 1)
需要說明的是,使用RDD的緩存機制,數據可能丟失;或者會由于內存的不足而造成數據被刪除。可以通過使用RDD的檢查點機制了保證緩存的容錯,即使緩存丟失了也能保證計算的正確執行。
下面是使用RDD緩存機制的一個示例。這里使用RDD讀取一個大的文件,該文件中包含918843條記錄。通過Spark Web Console可以對比出在不使用緩存和使用緩存時,執行效率的差別。
(1)讀取一個大文件。
scala> val rdd1 = sc.textFile("/root/temp/sales")
(2)觸發一個計算,這里沒有使用緩存。
scala> rdd1.count
(3)調用cache方法標識該RDD可以被緩存。
scala> rdd1.cache
(4)第二次觸發計算,計算完成后會將結果緩存。
scala> rdd1.count
(5)第三次觸發計算,這里會直接從之前的緩存中獲取結果。
scala> rdd1.count
(6)訪問Spark的Web Console觀察這三次count計算的執行時間,可以看成最后一次count計算只耗費了98ms,如下圖所示。