1、Job啟動流程
1、Client觸發 SparkContext
初始化
2、SparkContext
向 Master
注冊應用
3、Master
調度 Worker
啟動 Executor
4、Worker
進程啟動 Executor
5、DAGScheduler
將作業分解為 Stage
:
6、TaskScheduler
分配 Task
到 Executor
2、核心組件
組件 | 職責 |
---|---|
SparkContext | 應用入口,協調各組件,管理應用生命周期。 |
DAGScheduler | 將 Job 拆分為 Stage,構建 DAG,提交 TaskSet 給 TaskScheduler。 |
TaskScheduler | 調度 Task 到 Executor,處理故障重試。 |
CoarseGrainedSchedulerBackend | 與集群管理器交互,申請資源,管理 Executor。 |
ExternalClusterManager | 抽象層,適配不同集群(Standalone/YARN/Mesos)。 |
Master & Worker | Standalone 模式下管理集群資源(Master 分配資源,Worker 啟動 Executor)。 |
Executor | 在 Worker 上運行,執行 Task,管理內存/磁盤。 |
CoarseGrainedExecutorBackend | Executor 的通信代理,接收 Task,返回狀態/結果。 |
Task | 計算單元(ShuffleMapTask / ResultTask)。 |
ShuffleManager | 管理 Shuffle 數據讀寫(如 SortShuffleManager)。 |
3、工作流程
1、SparkContext
負責資源申請、任務提交、與集群管理器通信。
調用runJob
方法,將RDD操作傳遞給DAGScheduler
2、DAGScheduler
將Job拆分為Stage(DAG),處理Shuffle依賴,提交TaskSet給TaskScheduler。
1、DAGSchedulerEvent
/* 作業生命周期事件 */
JobSubmitted //新作業提交時觸發
JobCancelled //單個作業被取消
JobGroupCancelled //作業組整體取消
JobTagCancelled //按標簽批量取消作業
AllJobsCancelled //取消所有運行中的作業/* 階段執行事件 */
MapStageSubmitted //Shuffle Map階段提交
StageCancelled //單個階段取消
StageFailed //階段執行失敗
ResubmitFailedStages //自動重試失敗階段 ,默認4次/* 任務調度事件 */
TaskSetFailed //整個任務集失敗,默認4次
SpeculativeTaskSubmitted //啟動推測執行任務
UnschedulableTaskSetAdded //任務集進入待調度隊列
UnschedulableTaskSetRemoved //任務集離開待調度隊列/* Shuffle 優化事件 */
RegisterMergeStatuses //注冊Shuffle合并狀態
ShuffleMergeFinalized //Shuffle合并完成
ShufflePushCompleted //Shuffle數據推送完成/* 資源管理事件 */
ExecutorAdded //新Executor注冊成功
ExecutorLost //Executor異常丟失
WorkerRemoved //工作節點移除/* 執行過程事件 */
BeginEvent //任務集開始執行
GettingResultEvent //驅動程序主動獲取任務結果
CompletionEvent //作業/階段完成
2、stage拆分流程
*ResultStage
(執行作的最后一個階段)、ShuffleMapStage
(shuffle映射輸出文件)*
- 用戶行動操作觸發
submitJob
,發送JobSubmitted
事件。 handleJobSubmitted
處理事件,調用createResultStage
創建ResultStage。createResultStage
調用getOrCreateParentStages
獲取父Stage,父Stage的創建會遞歸進行。- 在創建父Stage的過程中,遇到寬依賴則創建ShuffleMapStage,并遞歸創建其父Stage。
- 當所有父Stage都創建完成后,回到
handleJobSubmitted
,調用submitStage
提交ResultStage。 submitStage
檢查父Stage是否完成,如果有未完成的父Stage,則遞歸提交父Stage;否則,提交當前Stage(調用submitMissingTasks
)。submitMissingTasks
為Stage創建任務(ShuffleMapTask或ResultTask),并提交給TaskScheduler執行。
3、寬窄依賴切分
private def stageDependsOn(stage: Stage, target: Stage): Boolean = {if (stage == target) {return true}// DFS遍歷RDD依賴樹val visitedRdds = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new ListBuffer[RDD[_]]waitingForVisit += stage.rdddef visit(rdd: RDD[_]): Unit = {if (!visitedRdds(rdd)) {visitedRdds += rddfor (dep <- rdd.dependencies) {dep match {// 寬依賴:創建新的ShuffleMapStagecase shufDep: ShuffleDependency[_, _, _] =>val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)if (!mapStage.isAvailable) {waitingForVisit.prepend(mapStage.rdd)} // Otherwise there's no need to follow the dependency back// 窄依賴:繼續回溯case narrowDep: NarrowDependency[_] =>waitingForVisit.prepend(narrowDep.rdd)}}}}while (waitingForVisit.nonEmpty) {visit(waitingForVisit.remove(0))}visitedRdds.contains(target.rdd)}
3、TaskScheduler
接收TaskSet,按調度策略(FIFO/FAIR)將Task分配給Executor。
1、執行流程
1、DAGScheduler 調用 taskScheduler.submitTasks()
后,任務進入 TaskScheduler 調度階段
2、任務提交submitTasks
// TaskSetManager管理任務集
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 添加到調度池
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 觸發資源分配
backend.reviveOffers()
3、資源分配 (Driver)
// CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {driverEndpoint.send(ReviveOffers) // 向DriverEndpoint發送消息
}// DriverEndpoint處理
case ReviveOffers =>makeOffers() // 觸發資源分配
4、資源分配核心
private def makeOffers(): Unit = {// Make sure no executor is killed while some task is launching on itval taskDescs = withLock {// 1. 獲取所有可用Executor資源val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }val workOffers = activeExecutors.map {case (id, executorData) => buildWorkerOffer(id, executorData)}.toIndexedSeq// 2. 調用任務調度器分配任務scheduler.resourceOffers(workOffers, true)}// 3. 啟動分配的任務if (taskDescs.nonEmpty) {launchTasks(taskDescs)}
}
5、任務啟動
// CoarseGrainedSchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {for (task <- tasks.flatten) {// 1. 序列化任務val serializedTask = TaskDescription.encode(task)// 2. 檢查任務大小if (serializedTask.limit() >= maxRpcMessageSize) {Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {val executorData = executorDataMap(task.executorId)// Do resources allocation here. The allocated resources will get released after the task// finishes.executorData.freeCores -= task.cpustask.resources.foreach { case (rName, addressAmounts) =>executorData.resourcesInfo(rName).acquire(addressAmounts)}logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")// 發送任務到ExecutorexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}
}