1. 核心調度組件
-
DAGScheduler:負責將Job拆分為Stage,處理Stage間的依賴關系。
-
TaskScheduler:將Task分配到Executor,監控任務執行。
-
SchedulerBackend:與集群管理器(如YARN、K8s)通信,管理Executor資源。
2. 調度流程分步拆解
步驟1:用戶提交代碼
val rdd = sc.textFile("hdfs://data.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd.collect() // 觸發Job提交
步驟2:生成DAG(有向無環圖)
-
RDD血緣(Lineage):記錄RDD的轉換過程(
textFile
?→?flatMap
?→?map
?→?reduceByKey
)。 -
寬依賴(Shuffle):
reduceByKey
導致Stage劃分。
步驟3:劃分Stage
-
Stage 0:
textFile
?→?flatMap
?→?map
(窄依賴,合并為一個Stage)。 -
Stage 1:
reduceByKey
(寬依賴,單獨一個Stage)。
步驟4:提交Task
-
Stage 0生成多個
MapTask
,Stage 1生成多個ReduceTask
。 -
TaskScheduler根據數據本地性(Data Locality)分配Task到Executor。
步驟5:執行與監控
-
Executor執行Task,向Driver匯報狀態。
-
失敗Task自動重試(默認重試3次)。
3. 關鍵概念詳解
概念 | 說明 | 示例 |
---|---|---|
Job | 由行動操作(如collect )觸發的完整計算任務 | 一次collect() 生成一個Job |
Stage | 由一組無Shuffle依賴的Task組成(分為ResultStage 和ShuffleMapStage ) | reduceByKey 前為一個Stage |
Task | Stage中每個分區的計算單元(ShuffleMapTask 或ResultTask ) | 處理一個分區的數據 |
Shuffle | 跨Stage數據重分布(如groupByKey 、join ) | reduceByKey 觸發Shuffle |
數據本地性 | 優先將Task調度到數據所在節點(PROCESS_LOCAL ?>?NODE_LOCAL ?>?ANY ) | 讀取HDFS塊時優先分配到數據所在節點 |
4. 調度流程示意圖
5. 性能優化點
-
減少Shuffle:
-
用
reduceByKey
替代groupByKey
(提前局部聚合)。 -
使用
Broadcast Join
代替Shuffle Join
。
-
-
調整并行度:
-
通過
spark.default.parallelism
或repartition()
控制分區數。
-
-
數據本地性:
-
確保輸入數據與Executor在同一節點(如HDFS副本策略)。
-
-
資源分配:
-
合理設置Executor內存(
spark.executor.memory
)和CPU核心數(spark.executor.cores
)。
-
6. 容錯機制
-
Stage重試:若某個Stage失敗,重新提交該Stage的所有Task。
-
Task重試:單個Task失敗后,TaskScheduler會重新調度(默認最多3次)。
-
血緣恢復:若Executor丟失數據,根據RDD血緣重新計算。
總結
Spark的調度機制通過DAG優化、本地性優先和容錯設計,實現了高效的大數據處理。理解其原理后,可通過調整分區策略、優化Shuffle操作等手段顯著提升性能。