在什么情況下適合使用 Cache
我建議你在做決策的時候遵循以下 2 條基本原則:
- 如果 RDD/DataFrame/Dataset 在應用中的引用次數為 1,就堅決不使用 Cache
- 如果引用次數大于 1,且運行成本占比超過 30%,應當考慮啟用
Cache第一條很好理解,我們詳細說說第二條。這里咱們定義了一個新概念:運行成本占比。它指的是計算某個分布式數據集所消耗的總時間與作業執行時間的比值。我們來舉個例子,假設我們有個數據分析的應用,端到端的執行時間為 1 小時。應用中有個 DataFrame 被引用了 2 次,從讀取數據源,經過一系列計算,到生成這個 DataFrame 需要花費 12 分鐘,那么這個 DataFrame 的運行成本占比應該算作:12 * 2 / 60 = 40%。
你可能會說:“作業執行時間好算,直接查看 Spark UI 就好了,DataFrame 的運行時間怎么算呢?”這里涉及一個小技巧,我們可以從現有應用中 把 DataFrame 的計算邏輯單拎出來,然后利用 Spark 3.0 提供的 Noop 來精確地得到 DataFrame 的運行時間。假設 df 是那個被引用 2 次的 DataFrame,我們就可以把 df 依賴的所有代碼拷貝成一個新的作業,然后在 df 上調用 Noop 去觸發計算。
Noop 的作用很巧妙,它只觸發計算,而不涉及落盤與數據存儲,因此,新作業的執行時間剛好就是 DataFrame 的運行時間。
//利用noop精確計算DataFrame運行時間
df.write.format("noop").save()
你可能會覺得每次計算占比會很麻煩,但只要你對數據源足夠了解、對計算 DataFrame 的中間過程心中有數了之后,其實不必每次都去精確地計算運行成本占比,嘗試幾次,你就能對分布式數據集的運行成本占比估摸得八九不離十了。
##Cache 的注意事項
弄清楚了應該什么時候使用 Cache 之后,我們再來說說 Cache 的注意事項。首先,我們都知道,.cache是惰性操作,因此在調用.cache之后,需要先用 Action 算子觸發緩存的物化過程。
但是,我發現很多同學在選擇 Action 算子的時候很隨意,first、take、show、count 中哪個順手就用哪個。這肯定是不對的,這 4 個算子中只有 count 才會觸發緩存的完全物化,而 first、take 和 show 這 3 個算子只會把涉及的數據物化。舉個例子,show 默認只產生 20 條結果,如果我們在.cache 之后調用 show 算子,它只會緩存數據集中這 20 條記錄。
選擇好了算子之后,我們再來討論一下怎么 Cache 這個問題。你可能會說:“這還用說嗎?在 RDD、DataFrame 后面調用.cache不就得了”。還真沒這么簡單,我出一道選擇題來考考你,如果給定包含數十列的 DataFrame df 和后續的數據分析,你應該采用下表中的哪種 Cache 方式?
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)//Cache方式一
val cachedDF = df.cache
//數據分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)//Cache方式二
df.select(col1, col2).filter(col2 > 0).cache
//數據分析
df.filter(col2 > 0).select(col1, col2)
df.select(col1, col2).filter(col2 > 100)//Cache方式三
val cachedDF = df.select(col1, col2).cache
//數據分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)
我們都知道,由于 Storage Memory 內存空間受限,因此 Cache 應該遵循最小公共子集原則,也就是說,開發者應該僅僅緩存后續操作必需的那些數據列。按照這個原則,實現方式 1 應當排除在外,畢竟 df 是一張包含數十列的寬表。
我們再來看第二種 Cache 方式,方式 2 緩存的數據列是col1和col2,且col2數值大于 0。第一條分析語句只是把filter和select調換了順序;第二條語句filter條件限制col2數值要大于 100,那么,這個語句的結果就是緩存數據的子集。因此,乍看上去,兩條數據分析語句在邏輯上剛好都能利用緩存的數據內容。但遺憾的是,這兩條分析語句都會跳過緩存數據,分別去磁盤上讀取 Parquet 源文件,然后從頭計算投影和過濾的邏輯。這是為什么呢?究其緣由是,Cache Manager 要求兩個查詢的 Analyzed Logical Plan 必須完全一致,才能對 DataFrame 的緩存進行復用。
Analyzed Logical Plan 是比較初級的邏輯計劃,主要負責 AST 查詢語法樹的語義檢查,確保查詢中引用的表、列等元信息的有效性。像謂詞下推、列剪枝這些比較智能的推理,要等到制定 Optimized Logical Plan 才會生效。因此,即使是同一個查詢語句,僅僅是調換了select和filter的順序,在 Analyzed Logical Plan 階段也會被判定為不同的邏輯計劃。因此,為了避免因為 Analyzed Logical Plan 不一致造成的 Cache miss,我們應該采用第三種實現方式,把我們想要緩存的數據賦值給一個變量,凡是在這個變量之上的分析操作,都會完全復用緩存數據。
你看,緩存的使用可不僅僅是調用.cache那么簡單。除此之外,我們也應當及時清理用過的 Cache,盡早騰出內存空間供其他數據集消費,從而盡量避免 Eviction 的發生。一般來說,我們會用.unpersist 來清理棄用的緩存數據,它是.cache 的逆操作。unpersist 操作支持同步、異步兩種模式:
異步模式:調用 unpersist() 或是 unpersist(False)
同步模式:調用 unpersist(True)
在異步模式下,Driver 把清理緩存的請求發送給各個 Executors 之后,會立即返回,并且繼續執行用戶代碼,比如后續的任務調度、廣播變量創建等等。在同步模式下,Driver 發送完請求之后,會一直等待所有 Executors 給出明確的結果(緩存清除成功還是失敗)。各個 Executors 清除緩存的效率、進度各不相同,Driver 要等到最后一個 Executor 返回結果,才會繼續執行 Driver 側的代碼。
顯然,同步模式會影響 Driver 的工作效率。因此,通常來說,在需要主動清除 Cache 的時候,我們往往采用異步的調用方式,也就是調用 unpersist() 或是unpersist(False)。
總結
我們要掌握使用 Cache 的一般性原則和注意事項,我把它們總結為 3 條:
- 如果 RDD/DataFrame/Dataset 在應用中的引用次數為 1,我們就堅決不使用 Cache
- 如果引用次數大于 1,且運行成本占比超過 30%,我們就考慮啟用 Cache(其中,運行成本占比的計算,可以利用 Spark 3.0 推出的 noop 功能)
- Action 算子要選擇 count 才能完全物化緩存數據,以及在調用 Cache 的時候,我們要把待緩存數據賦值給一個變量。這樣一來,只要是在這個變量之上的分析操作都會完全復用緩存數據。