目前在做 Spark 升級(3.1.1升級到3.5.0)的時候,遇到了cache table
導致的數據重復問題,這種情況一般來說是很少見的,因為一般很少用cache table
語句。
當然該問題已經在Spark3.5.1已經解決了,可以查看對應的 SPARK-46995和SPARK-45592
從以上的分析知道:是在做join的一方(包含了AQEshuffleRead-coalesced) 影響了join的另一方,導致EnsureRequirements
規則在做執行前檢查的時候,會把join的另一個方的shuffle 數據調整為 包含了AQEshuffleRead-coalesced)的一方:
Scan|Shuffle(200)|Scan AQEShuffleRead(10)| |
Shuffle(10) InMemoryTableScanExec\ /Join
這會導致shuffle后的數據進行了錯位(因為之前是shuffle(200),現在變成了shuffle(10)),具體原因筆者還是沒有分析清楚,但是其中涉及到的點跟規則EnsureRequirements
以及 CoalesceShufflePartitions
有關,
EnsureRequirements
會做一些執行前的判斷,主要是做任務shuffle的協調,
CoalesceShufflePartitions
中 collectCoalesceGroups 會收集 QueryStageExec ,如果是 join 的話,則會join的子節點會收集到一個組里去,這樣就能看到 join中會有 AQEShuffleRead coalesced 成對出現