Spark 運行流程核心組件(一)作業提交

1、Job啟動流程

在這里插入圖片描述

1、Client觸發 SparkContext 初始化

2、SparkContextMaster 注冊應用

3、Master 調度 Worker 啟動 Executor

4、Worker 進程啟動 Executor

5、DAGScheduler 將作業分解為 Stage

6、TaskScheduler 分配 TaskExecutor

2、核心組件

組件職責
SparkContext應用入口,協調各組件,管理應用生命周期。
DAGScheduler將 Job 拆分為 Stage,構建 DAG,提交 TaskSet 給 TaskScheduler。
TaskScheduler調度 Task 到 Executor,處理故障重試。
CoarseGrainedSchedulerBackend與集群管理器交互,申請資源,管理 Executor。
ExternalClusterManager抽象層,適配不同集群(Standalone/YARN/Mesos)。
Master & WorkerStandalone 模式下管理集群資源(Master 分配資源,Worker 啟動 Executor)。
Executor在 Worker 上運行,執行 Task,管理內存/磁盤。
CoarseGrainedExecutorBackendExecutor 的通信代理,接收 Task,返回狀態/結果。
Task計算單元(ShuffleMapTask / ResultTask)。
ShuffleManager管理 Shuffle 數據讀寫(如 SortShuffleManager)。

3、工作流程

1、SparkContext

負責資源申請、任務提交、與集群管理器通信。

調用runJob方法,將RDD操作傳遞給DAGScheduler

2、DAGScheduler

將Job拆分為Stage(DAG),處理Shuffle依賴,提交TaskSet給TaskScheduler。

1、DAGSchedulerEvent

/* 作業生命周期事件 */
JobSubmitted //新作業提交時觸發
JobCancelled //單個作業被取消
JobGroupCancelled //作業組整體取消
JobTagCancelled //按標簽批量取消作業
AllJobsCancelled //取消所有運行中的作業/* 階段執行事件 */
MapStageSubmitted //Shuffle Map階段提交
StageCancelled //單個階段取消
StageFailed //階段執行失敗
ResubmitFailedStages //自動重試失敗階段 ,默認4次/* 任務調度事件 */
TaskSetFailed //整個任務集失敗,默認4次
SpeculativeTaskSubmitted //啟動推測執行任務
UnschedulableTaskSetAdded //任務集進入待調度隊列
UnschedulableTaskSetRemoved //任務集離開待調度隊列/* Shuffle 優化事件 */
RegisterMergeStatuses //注冊Shuffle合并狀態
ShuffleMergeFinalized //Shuffle合并完成
ShufflePushCompleted //Shuffle數據推送完成/* 資源管理事件 */
ExecutorAdded //新Executor注冊成功
ExecutorLost //Executor異常丟失
WorkerRemoved //工作節點移除/* 執行過程事件 */
BeginEvent //任務集開始執行 
GettingResultEvent //驅動程序主動獲取任務結果
CompletionEvent //作業/階段完成

2、stage拆分流程

*ResultStage (執行作的最后一個階段)、ShuffleMapStage (shuffle映射輸出文件)*

  1. 用戶行動操作觸發submitJob,發送JobSubmitted事件。
  2. handleJobSubmitted處理事件,調用createResultStage創建ResultStage。
  3. createResultStage調用getOrCreateParentStages獲取父Stage,父Stage的創建會遞歸進行。
  4. 在創建父Stage的過程中,遇到寬依賴則創建ShuffleMapStage,并遞歸創建其父Stage。
  5. 當所有父Stage都創建完成后,回到handleJobSubmitted,調用submitStage提交ResultStage。
  6. submitStage檢查父Stage是否完成,如果有未完成的父Stage,則遞歸提交父Stage;否則,提交當前Stage(調用submitMissingTasks)。
  7. submitMissingTasks為Stage創建任務(ShuffleMapTask或ResultTask),并提交給TaskScheduler執行。

3、寬窄依賴切分

private def stageDependsOn(stage: Stage, target: Stage): Boolean = {if (stage == target) {return true}// DFS遍歷RDD依賴樹val visitedRdds = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new ListBuffer[RDD[_]]waitingForVisit += stage.rdddef visit(rdd: RDD[_]): Unit = {if (!visitedRdds(rdd)) {visitedRdds += rddfor (dep <- rdd.dependencies) {dep match {// 寬依賴:創建新的ShuffleMapStagecase shufDep: ShuffleDependency[_, _, _] =>val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)if (!mapStage.isAvailable) {waitingForVisit.prepend(mapStage.rdd)}  // Otherwise there's no need to follow the dependency back// 窄依賴:繼續回溯case narrowDep: NarrowDependency[_] =>waitingForVisit.prepend(narrowDep.rdd)}}}}while (waitingForVisit.nonEmpty) {visit(waitingForVisit.remove(0))}visitedRdds.contains(target.rdd)}

3、TaskScheduler

接收TaskSet,按調度策略(FIFO/FAIR)將Task分配給Executor。

1、執行流程

1、DAGScheduler 調用 taskScheduler.submitTasks() 后,任務進入 TaskScheduler 調度階段

2、任務提交submitTasks

// TaskSetManager管理任務集
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 添加到調度池
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 觸發資源分配
backend.reviveOffers()

3、資源分配 (Driver)

// CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {driverEndpoint.send(ReviveOffers)  // 向DriverEndpoint發送消息
}// DriverEndpoint處理
case ReviveOffers =>makeOffers()  // 觸發資源分配

4、資源分配核心

private def makeOffers(): Unit = {// Make sure no executor is killed while some task is launching on itval taskDescs = withLock {// 1. 獲取所有可用Executor資源val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }val workOffers = activeExecutors.map {case (id, executorData) => buildWorkerOffer(id, executorData)}.toIndexedSeq// 2. 調用任務調度器分配任務scheduler.resourceOffers(workOffers, true)}// 3. 啟動分配的任務if (taskDescs.nonEmpty) {launchTasks(taskDescs)}
}

5、任務啟動

// CoarseGrainedSchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {for (task <- tasks.flatten) {// 1. 序列化任務val serializedTask = TaskDescription.encode(task)// 2. 檢查任務大小if (serializedTask.limit() >= maxRpcMessageSize) {Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {val executorData = executorDataMap(task.executorId)// Do resources allocation here. The allocated resources will get released after the task// finishes.executorData.freeCores -= task.cpustask.resources.foreach { case (rName, addressAmounts) =>executorData.resourcesInfo(rName).acquire(addressAmounts)}logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")// 發送任務到ExecutorexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/93196.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/93196.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/93196.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL 臨時表與復制表

一、MySQL 臨時表臨時表是會話級別的臨時數據載體&#xff0c;其設計初衷是為了滿足短期數據處理需求&#xff0c;以下從技術細節展開說明。&#xff08;一&#xff09;核心特性拓展1.生命周期與會話綁定會話結束的判定&#xff1a;包括正常斷開連接&#xff08;exit/quit&…

從配置到調試:WinCC與S7-1200/200SMT無線Modbus TCP通訊方案

測試設備與參數l 西門子PLC型號&#xff1a;S7-1200 1臺l 西門子PLC型號&#xff1a;S7-200Smart 1臺l 上位機&#xff1a;WinCC7.4 1臺l 無線通訊終端——DTD418MB 3塊l 主從關系&#xff1a;1主2從l 通訊接口&#xff1a;RJ45接口l 供電&#xff1a;12-24VDCl 通訊協議&a…

Android沉浸式全屏顯示與隱藏導航欄的實現

1. 總體流程以下是實現沉浸式全屏顯示和隱藏導航欄的流程&#xff1a;步驟描述步驟1創建一個新的Android項目步驟2在布局文件中定義需要展示的界面步驟3在Activity中設置沉浸式全屏顯示步驟4處理系統UI的顯示與隱藏步驟5運行應用并測試效果2. 詳細步驟步驟1&#xff1a;創建一個…

EN 62368消費電子、信息技術設備和辦公設備安全要求標準

EN 62368認證標準是一項全球性的電子產品安全標準&#xff0c;用于評估和認證消費電子、信息技術設備和辦公設備的安全性。該標準由國際電工委員會(IEC)制定&#xff0c;取代了傳統的EN60065和EN 60950兩個標準&#xff0c;成為國際電子產品安全領域的新指導。IEC /EN 62368-1是…

【unity實戰】使用Splines+DOTween制作彎曲手牌和抽牌動畫效果

最終效果 文章目錄最終效果前言實戰1、Splines的使用2、繪制樣條線3、DOTween安裝和使用4、基于樣條曲線&#xff08;Spline&#xff09;的手牌管理系統4.1 代碼實現4.2 解釋&#xff1a;&#xff08;1&#xff09;計算第一張卡牌的位置&#xff08;居中排列&#xff09;&#…

Flask模板注入梳理

從模板開始介紹&#xff1a;Flask中有許多不同功能的模板&#xff0c;他們之間是相互隔離的地帶&#xff0c;可供引入和使用。Flask中的模塊&#xff1a;flask 主模塊&#xff1a;包含框架的核心類和函數&#xff0c;如 Flask&#xff08;應用實例&#xff09;、request&#x…

企業級的即時通訊平臺怎么保護敏感行業通訊安全?

聊天記錄存在第三方服務器、敏感文件被誤發至外部群組、離職員工仍能查看歷史消息.對于金融、醫療、政務等對數據安全高度敏感的行業而言&#xff0c;“溝通效率與”信息安全”的矛盾&#xff0c;從未像今天這樣尖銳。企業即時通訊怎么保護敏感行業通訊安全&#xff1f;這個問題…

Java Spring框架最新版本及發展史詳解(截至2025年8月)-優雅草卓伊凡

Java Spring框架最新版本及發展史詳解&#xff08;截至2025年8月&#xff09;-優雅草卓伊凡引言今天有個新項目 客戶問我為什么不用spring 4版本&#xff0c;卓伊凡我今天剛做完項目方案&#xff0c;我被客戶這一句問了有點愣住&#xff0c;Java Spring框架最新版本及發展史詳解…

Android實現Glide/Coil樣式圖/視頻加載框架,Kotlin

Android實現Glide/Coil樣式圖/視頻加載框架&#xff0c;Kotlin <uses-permission android:name"android.permission.WRITE_EXTERNAL_STORAGE" /><uses-permission android:name"android.permission.READ_EXTERNAL_STORAGE" /><uses-permiss…

【k8s】pvc 配置的兩種方式volumeClaimTemplates 和 PersistentVolumeClaim

pvc配置實例 實例1在Deployment中配置 template:xxxxxxvolumeClaimTemplates:- metadata:name: dataspec:accessModes:- ReadWriteOnceresources:requests:storage: 1GistorageClassName: nfsdev-storageclass (創建好的storageClassName)實例2#先創建一個pvc 然后在 Deploym…

Logistic Loss Function|邏輯回歸代價函數

----------------------------------------------------------------------------------------------- 這是我在我的網站中截取的文章&#xff0c;有更多的文章歡迎來訪問我自己的博客網站rn.berlinlian.cn&#xff0c;這里還有很多有關計算機的知識&#xff0c;歡迎進行留言或…

計算機網絡技術-知識篇(Day.1)

一、網絡概述 1、網絡的概念 兩個不在同一地理位置的主機&#xff0c;通過傳輸介質和通信協議&#xff0c;實現通信和資源共享。 2、網絡發展史 第一階段&#xff08;20世紀60年代&#xff09; 標志性事件&#xff1a;ARPANET的誕生關鍵技術&#xff1a;分組交換技術 第二…

工業元宇宙:邁向星辰大海的“玄奘之路”

一、從認知革命到工業革命&#xff1a;文明躍遷的底層邏輯1.1 認知革命&#xff1a;人類協作的基石時間線&#xff1a;約7萬年前&#xff0c;智人通過語言和想象力構建共同虛擬現實&#xff0c;形成部落協作模式。核心突破&#xff1a;虛構能力&#xff1a;創造神、國家、法律等…

9. React組件生命周期

2. React組件生命周期 2.1. 認識生命周期 2.1.1. 很多事物都有從創建到銷毀的整個過程&#xff0c;這個過程稱之為生命周期&#xff1b;2.1.2. React組件也有自己的生命周期&#xff0c;了解生命周期可以讓我們在最合適的地方完成想要的功能2.1.3. 生命周期和生命周期函數的關系…

【單板硬件開發】關于復位電路的理解

閱讀紫光同創供應商提供的FPGA單板硬件開發手冊&#xff0c;發現復位電路他們家解釋的很通俗易懂&#xff0c;所以分享一下。如下圖&#xff0c;RST_N 是低有效的異步全芯片復位信號&#xff0c;一般外部連接電路有 3 種形式如圖 3–2&#xff0c;可根據實際需要選擇合適的電路…

《Unity Shader入門精要》學習筆記一

1、本書的源代碼 https://github.com/candycat1992/Unity_Shaders_Book 2、第1章 Shader是面向GPU的工作方式 3、第2章 渲染流水線 Shader&#xff1a;著色器 渲染流水線&#xff1a;目標是渲染一張二維紋理&#xff0c;輸入是一個虛擬攝像機、一些光源、一些Shader以及紋…

從零到一:TCP 回聲服務器與客戶端的完整實現與原理詳解

目錄 一、TCP 通信的核心邏輯 二、TCP 服務器編程步驟 步驟 1&#xff1a;創建監聽 Socket 步驟 2&#xff1a;綁定地址與端口&#xff08;bind&#xff09; 步驟 3&#xff1a;設置監聽狀態&#xff08;listen&#xff09; 步驟 4&#xff1a;接收客戶端連接&#xff08…

MyBatis-Plus核心內容

MyBatis-Plus MyBatis-Plus 是一個基于 MyBatis的增強工具&#xff0c;旨在簡化開發過程&#xff0c;減少重復代碼。它在MyBatis的基礎上增加了CRUD操作封裝&#xff0c;條件構造器、代碼生成器等功能。 一、核心特性與優勢 1. 核心特性 無侵入&#xff1a;只做增強不做改變&am…

計算機網絡摘星題庫800題筆記 第4章 網絡層

第4章 網絡層4.1 網絡層概述題組闖關1.在 Windows 的網絡配置中&#xff0c;“默認網關” 一般被設置為 ( ) 的地址。 A. DNS 服務器 B. Web 服務器 C. 路由器 D. 交換機1.【參考答案】C 【解析】只有在計算機上正確安裝網卡驅動程序和網絡協議&#xff0c;并正確設置 IP 地址信…

非root用戶在linux中配置zsh(已解決ncurses-devel報錯)

Zsh&#xff08;Z Shell&#xff09;是一款功能強大的交互式 Unix shell&#xff0c;以其高度可定制性和豐富的功能著稱&#xff0c;被視為 Bash 的增強替代品。它支持智能補全、主題美化、插件擴展&#xff08;如 Oh My Zsh 框架&#xff09;、自動糾錯、全局別名等特性&#…