Spark 運行流程核心組件(三)任務執行

一、啟動模式

1、standalone

在這里插入圖片描述

  1. 資源申請:Driver向Master申請Executor資源
  2. Executor啟動:Master調度Worker啟動Executor
  3. 注冊通信:Executor直接向Driver注冊

2、YARN

在這里插入圖片描述

  1. Driver向YARN ResourceManager(RM)申請AM容器

  2. RM分配NodeManager(NM)啟動AM(yarn-client 僅資源代理,不運行用戶代碼)

  3. AM向RM注冊

  4. AM根據申請Executor容器

5.RM分配多個NM

6.每個NM啟動ExecutorBackend進程

**7.**注冊通信:Executor向AM內的Driver注冊

二、Executor端任務執行的核心組件

  1. Driver 端組件
    • CoarseGrainedSchedulerBackend:負責與Executor通信
    • TaskSchedulerImpl:任務調度核心邏輯
    • DAGScheduler:DAG調度與Stage管理
    • BlockManagerMaster:塊管理器協調器
    • MapOutputTrackerMaster:Shuffle輸出跟蹤器
  2. Executor 端組件
    • CoarseGrainedExecutorBackend:Executor的通信端點
    • Executor:任務執行引擎
    • TaskRunner:任務執行線程封裝
    • BlockManager:本地數據塊管理
    • ShuffleManager:Shuffle讀寫控制
    • ExecutorSource:指標監控

三、Executor 端任務執行核心流程

1、任務接收與初始化

  • CoarseGrainedExecutorBackend 接收任務
case LaunchTask(data) =>if (executor == null) {exitExecutor(1, "Received LaunchTask command but executor was null")} else {val taskDesc = TaskDescription.decode(data.value)logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}")executor.launchTask(this, taskDesc)}
  • Executor 任務啟動
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {val taskId = taskDescription.taskIdval tr = createTaskRunner(context, taskDescription)runningTasks.put(taskId, tr)val killMark = killMarks.get(taskId)if (killMark != null) {tr.kill(killMark._1, killMark._2)killMarks.remove(taskId)}threadPool.execute(tr)if (decommissioned) {log.error(s"Launching a task while in decommissioned state.")}}

2、任務執行

  • TaskRunner.run
// 1. 類加載與依賴管理
updateDependencies(taskDescription.artifacts.files,taskDescription.artifacts.jars,taskDescription.artifacts.archives,isolatedSession)
// 2. 反序列化任務
task = ser.deserialize[Task[Any]](taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
// 3. 內存管理
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
task.setTaskMemoryManager(taskMemoryManager)// 4. 任務執行
val value = Utils.tryWithSafeFinally {val res = task.run(taskAttemptId = taskId,attemptNumber = taskDescription.attemptNumber,metricsSystem = env.metricsSystem,cpus = taskDescription.cpus,resources = resources,plugins = plugins)threwException = falseres} {// block 釋放val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)// memory 釋放val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()if (freedMemory > 0 && !threwException) {val errMsg = log"Managed memory leak detected; size = " +log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {throw SparkException.internalError(errMsg.message, category = "EXECUTOR")} else {logWarning(errMsg)}}if (releasedLocks.nonEmpty && !threwException) {val errMsg =log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})"if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {throw SparkException.internalError(errMsg.message, category = "EXECUTOR")} else {logInfo(errMsg)}}}// 5. 狀態上報
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  • Task run
// hadoop.caller.context.enabled = true
// 添加 HDFS 審計日志 , 用于問題排查 。 
// e.g. 小文件劇增 定位spark 任務
new CallerContext("TASK",SparkEnv.get.conf.get(APP_CALLER_CONTEXT),appId,appAttemptId,jobId,Option(stageId),Option(stageAttemptId),Option(taskAttemptId),Option(attemptNumber)).setCurrentContext()// 任務啟動
context.runTaskWithListeners(this)

3、shuffle處理

  • ShuffleMapTask

為下游 Stage 準備 Shuffle 數據(Map 端輸出),生成 MapStatus(包含數據位置和大小信息)。

val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// 從廣播 序列化 rdd 、 dep
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0Lval rdd = rddAndDep._1
val dep = rddAndDep._2
// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {partitionId
} else {context.taskAttemptId()
}
dep.shuffleWriterProcessor.write(rdd.iterator(partition, context),dep,mapId,partitionId,context)
  • ResultTask
override def runTask(context: TaskContext): U = {// Deserialize the RDD and the func using the broadcast variables.val threadMXBean = ManagementFactory.getThreadMXBeanval deserializeStartTimeNs = System.nanoTime()val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0Lval ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime} else 0Lfunc(context, rdd.iterator(partition, context))
}

四、核心通信機制

消息類型方向內容
RegisterExecutorExecutor→DriverexecutorId, hostPort
RegisteredExecutorDriver→Executor注冊成功確認
LaunchTaskDriver→Executor序列化的TaskDescription
StatusUpdateExecutor→DrivertaskId, state, result data
KillTaskDriver→Executor終止指定任務
StopExecutorDriver→Executor關閉Executor指令
HeartbeatExecutor→Driver心跳+指標數據

五、Executor 線程模型

Executor JVM Process
├── CoarseGrainedExecutorBackend (netty)
├── ThreadPool (CacheThreadPool)
│   ├── TaskRunner 1
│   ├── TaskRunner 2
│   └── ... 
├── BlockManager
│   ├── MemoryStore (on-heap/off-heap)
│   └── DiskStore
└── ShuffleManager├── SortShuffleWriter└── UnsafeShuffleWriter

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/93764.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/93764.shtml
英文地址,請注明出處:http://en.pswp.cn/web/93764.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

rabbitmq發送的延遲消息時間過長就立即消費了

RabbitMQ延遲消息在設置過長時間后被立即消費的問題,通常與以下原因有關: TTL限制問題 RabbitMQ對消息TTL(Time To Live)有32位整數限制(0-4294967295毫秒),約49.7天。超過該值的延遲時間會導致消息立即被消費解決方案:確保設置的…

kafka的pull的依據

1. 每次 pull() 是否必須在提交上一批消息的 offset 之后?絕對不需要! 提交 offset 和調用 poll() (拉取消息) 是兩個完全獨立的行為。消費者可以連續調用 poll() 多次,期間完全不提交任何 offset。 這是 Kafka 消費者的正常工作模式。提交 o…

學習嵌入式的第二十一天——數據結構——鏈表

單向鏈表特點:存儲的內存空間不連續 。為了彌補順序存儲存劣勢。優勢 插入,刪除 O(1) 動態存儲 ,在程序運行期間決定大小。劣勢: 不能隨機訪問 O(N) 節點-> 數據域指針域 順序表(數組) 只有數據域鏈表的操作代碼&#xff1…

Rust Web 全棧開發(十三):發布

Rust Web 全棧開發(十三):發布Rust Web 全棧開發(十三):發布發布 teacher_service發布 svr測試 teacher_service 和 svr發布 wasm-client測試 wasm-clientRust Web 全棧開發(十三)&a…

Zephyr 中的 bt_le_per_adv_set_data 函數的介紹和應用方法

目錄 概述 1 函數接口介紹 1.1 函數原型 1.2 功能詳解 2 使用方法 2.1 創建流程 2.1.1 創建擴展廣播實例 2.1.2 設置周期性廣播數據 2.1.3 配置周期性廣播參數 2.1.4 啟動廣播 2.2 主流程函數 2.3 關鍵配置 (prj.conf) 3 高級用法 3.1 大數據分片傳輸 3.2 動態數…

Ansible 角色管理指南

Ansible 角色管理指南 實驗環境設置 以下命令用于準備實驗環境,創建一個工作目錄并配置基本的Ansible設置: # 創建web工作目錄并進入 [azurewhiskycontroller ~]$ mkdir web && cd web# 創建Ansible配置文件 [azurewhiskycontroller web]$ cat &…

【補充】數據庫中有關系統編碼和校驗規則的簡述

一、字符集和校驗規則1.創建數據庫案例數據庫創建方法:使用CREATE DATABASE語句創建數據庫字符集指定方式:通過CHARACTER SETutf8指定數據庫編碼格式默認配置說明:未指定字符集時默認使用utf8和utf8_general_ci配置文件位置&…

計算機網絡 HTTP1.1、HTTP2、HTTP3 的核心對比及性能分析

以下是 HTTP/1.1、HTTP/2、HTTP/3 的核心對比及性能分析,重點關注 HTTP/3 的性能優勢:📊 HTTP 協議演進對比表特性HTTP/1.1 (1997)HTTP/2 (2015)HTTP/3 (2022)傳輸層協議TCPTCPQUIC (基于 UDP)連接建立TCP 三次握手 TLS 握手 (高延遲)同 HTT…

【計算機視覺與深度學習實戰】07基于Hough變換的答題卡識別技術:原理、實現與生物識別拓展(有完整代碼)

1. 引言 在人工智能和計算機視覺快速發展的今天,自動化圖像識別技術已經滲透到社會生活的各個角落。從工業質檢到醫學影像分析,從自動駕駛到教育評估,計算機視覺技術正在重塑我們與數字世界的交互方式。在這眾多應用中,答題卡識別技術作為教育信息化的重要組成部分,承載著…

《WASM驅動本地PDF與Excel預覽組件的深度實踐》

WASM為何能成為本地文件解析的核心載體,首先需要跳出“前端只能處理輕量任務”的固有認知,從“性能與兼容性平衡”的角度切入。PDF與Excel這類文件格式的解析,本質是對復雜二進制數據的解碼與重構——PDF包含嵌套的對象結構、字體渲染規則和矢量圖形描述,Excel則涉及單元格…

Oracle Free 實例重裝系統操作指南

之前申請了兩臺 x86 架構的 Oracle 機器,偶爾用來部署開源項目測試,有一臺在測試 SSH 相關功能時 “變磚”,網上看重裝系統發現很繁瑣就沒去打理,近期又想到這個機器,發現去年就有了官方重裝方法,簡單配置下…

Linux 基礎指令與權限管理

一、Linux 操作系統概述1.1 操作系統的核心價值操作系統的本質是 "使計算機更好用"。它作為用戶與硬件之間的中間層,負責內存管理、進程調度、文件系統管理和設備驅動管理等核心功能,讓用戶無需直接操作硬件即可完成復雜任務。在服務器領域&am…

深度學習-167-MCP技術之工具函數的設計及注冊到MCP服務器的兩種方式

文章目錄 1 MCP協議概述 1.1 MCP的原理 1.2 兩種主要的通信模式 2 工具函數的設計與實現 2.1 tools.py(工具函數) 2.2 工具函數的設計原則 2.3 工具函數的測試 3 MCP服務器的構建與配置 3.1 安裝mcp庫 3.2 main.py(MCP服務器) 3.2.1 方式一(add_tool方法) 3.2.2 方式二(@mcp.to…

哈希:兩數之和

問題描述:在一個整數數組中,找到兩數之和為target的兩個值,返回找到的兩個值的下標。 nums[3,3] target6 返回:[0,1] 說明:返回結果,索引無順序要求;有唯一的答案;不能使用兩次相…

PHP反序列化的CTF題目環境和做題復現第5集_POP鏈構造4

1 題目 下載yii2.0.37版本,https://github.com/yiisoft/yii2/releases/tag/2.0.37 放在phpstudy的www目錄下或ubuntu的/var/www/html的目錄下。 3 EXP <?php namespace PHPUnit\Framework\MockObject{class MockTrait {private $classCode = "system(whoami);php…

廣東省省考備考(第八十一天8.19)——資料分析、數量(強化訓練)

資料分析 錯題解析解析解析解析解析今日題目正確率&#xff1a;67% 數量&#xff1a;數學運算解析解析解析標記題解析今日題目正確率&#xff1a;80%

決策樹剪枝及數據處理

一、核心決策樹算法&#xff08;3 類主流算法&#xff09;1. ID3 算法&#xff1a;用 “信息增益” 選屬性ID3 是決策樹的 “開山鼻祖” 之一&#xff0c;它的核心邏輯是 “選能讓數據最‘純’的屬性”—— 這里的 “純” 用 “信息增益” 衡量。簡單說&#xff0c;“信息增益”…

Ansible 角色管理

環境準備# 創建一個叫web的文件夾并進入&#xff08;相當于新建一個工作目錄&#xff09;[lykcontroller ~]$ mkdir web && cd web?# 創建Ansible的配置文件ansible.cfg[lykcontroller web]$ cat > ansible.cfg <<EOF[defaults]remote_user lykinventory .…

Java面試準備指南!

現在已經是8月中旬了&#xff0c;秋招馬上就要開始了&#xff0c;不知道大家準備好了嗎&#xff1f;現階段找工作真的是千軍萬馬過獨木橋&#xff0c;沒有真本事&#xff0c;真的會被淘汰掉&#xff0c;現實就是如此的殘酷&#xff01; 為了能夠幫助到大家在秋招Java面試中脫穎…

Encoder-Decoder Model編碼器-解碼器模型

Encoder-Decoder編碼器-解碼器是一種深度學習模型&#xff0c;應用于圖像處理、語音識別、自然語言處理等領域。主要由編碼器和解碼器兩部分組成&#xff0c;這種結構能夠處理序列到序列的任務。編碼器-解碼器模型具備獨特的雙階段處理&#xff0c;先對輸入信息進行編碼&#x…