詳解 Spark 編程之 RDD 依賴關系

一、依賴與血緣關系

在這里插入圖片描述

  • 依賴:兩個相鄰 RDD 之間的關系
  • 血緣關系:多個連續的 RDD 的依賴
  • 由于 RDD 不會保存數據,為了提高容錯性,每個 RDD 都會保存自己的血緣關系,一旦某個轉換過程出現錯誤,可以根據血緣關系重新從數據源開始讀取計算
object TestRDDDependency {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")val sc = new SparkContext(conf)val rdd1 = sc.textFile("data/word.txt")println(rdd1.toDebugString) // 打印血緣關系println(rdd1.dependencies) // 打印依賴關系println("----------------------")val rdd2 = rdd1.flatMap(_.split(" "))println(rdd2.toDebugString) // 打印血緣關系println(rdd2.dependencies) // 打印依賴關系println("----------------------")val rdd3 = rdd2.map((_, 1))println(rdd3.toDebugString) // 打印血緣關系println(rdd3.dependencies) // 打印依賴關系println("----------------------")val rdd4 = rdd3.reduceByKey(_ + _)println(rdd4.toDebugString) // 打印血緣關系println(rdd4.dependencies) // 打印依賴關系println("----------------------")}
}

二、寬窄依賴

  • 窄依賴:OneToOneDependency,表示每一個父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一個 Partition 使用,類比喻為獨生子女

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
    
  • 寬依賴:ShuffleDependency,表示同一個父 (上游) RDD 的 Partition 被子 (下游) RDD 的多個 Partition 依賴或者說子 RDD 的一個 Partition 需要父 RDD 的多個 Partition 的數據,所以會引起 Shuffle 操作,類比喻為多生

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false
    ) extends Dependency[Product2[K, V]] 
    

三、階段劃分

  • 窄依賴由于上游和下游的 RDD 分區是一對一的,所以整個的執行過程是不受其它分區執行結果的影響,每個分區只需要一個 task 就可以完成計算任務

在這里插入圖片描述

  • 寬依賴由于存在 shuffle 操作,下游的 RDD 分區的數據計算需要等待上游 RDD 相關分區的數據全部執行完成后才能開始,所以存在不同階段的劃分,上游和下游 RDD 的每個分區都需要一個 task 來完成計算任務,所有階段的劃分和執行順序可以由有向無環圖 (DAG) 的形式來表示
    在這里插入圖片描述

  • 階段劃分源碼:

    /**結論:1.默認會至少存在一個階段,即 resultStage,最后執行的階段2.當存在 shuffle 依賴時,每存在一個會增加一個階段(shuffleMapStage)3.階段的數量 = shuffle 依賴數量 + 1
    */
    // 行動算子觸發作業執行
    rdd.collect()// collect() 深入底層
    dagScheduler.runJob()// runJob() 中會調用 submitJob(),其中會調用 handleJobSubmitted()
    // handleJobSubmitted() 中的階段劃分
    try {finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {...
    }// createResultStage() 方法
    private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {val parents = getOrCreateParentStages(rdd, jobId) // 判斷是否有上一階段val id = nextStageId.getAndIncrement()val stage = new  ResultStage(id, rdd, func, partitions, parents, jobId,  callSite) // 至少存在一個 resultStage 階段stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage
    }// getOrCreateParentStages(),判斷是否有上一階段
    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {// getShuffleDependencies(rdd):獲取當前 rdd 的 shuffle 依賴getShuffleDependencies(rdd).map { shuffleDep =>// 為 shuffle 依賴創建 ShuffleMapStage 階段getOrCreateShuffleMapStage(shuffleDep, firstJobId)}.toList
    }// getShuffleDependencies(rdd):獲取當前 rdd 的 shuffle 依賴
    private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {val parents = new HashSet[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new Stack[RDD[_]]waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {val toVisit = waitingForVisit.pop()if (!visited(toVisit)) {visited += toVisittoVisit.dependencies.foreach {case shuffleDep: ShuffleDependency[_, _, _] =>parents += shuffleDepcase dependency =>waitingForVisit.push(dependency.rdd)}}}parents
    }
    

四、任務劃分

  • RDD 任務劃分中間分為:Application、Job、Stage 和 Task

    • Application:初始化一個 SparkContext 即生成一個 Application
    • Job:一個 Action 算子就會生成一個 Job
    • Stage:Stage 等于寬依賴 (ShuffleDependency) 的個數加 1
    • Task:一個 Stage 階段中,最后一個 RDD 的分區個數就是 Task 的個數
  • Application -> Job -> Stag e-> Task 之間每一層都是 1 對 n 的關系

  • 任務劃分源碼:

    val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage => partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary,  part,  locs,  stage.latestInfo.taskMetrics,  properties, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage => partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}}
    }//
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()//
    override def findMissingPartitions(): Seq[Int] = {mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
    }
    

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/20336.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/20336.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/20336.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

JavaScript實現粒子數字倒計時效果附完整注釋

<!DOCTYPE html> <html lang="en"><head><meta charset

隨身wifi網絡卡頓怎么解決?隨身WiFi哪個牌子的最好用?排名第一名的隨身WiFi!

對于隨身wifi靠不靠譜這個問題&#xff0c;網上一直存在爭議。很多人的隨身wifi網速不穩定&#xff0c;信號看著滿格就是上不了網。關于隨身wifi卡頓到底該怎么解決呢&#xff1f; 1.如果是設備網絡在一個地方上網速度很快&#xff0c;換一個地方網絡就不行了&#xff0c;很可能…

股票買賣II

股票買賣II 時間限制&#xff1a;1秒 內存限制&#xff1a;128M 題目描述 給定一個長度為N的數組&#xff0c;數組中的第i個數字表示一個給定股票在第i天的價格。 設計一個算法來計算你所能獲取的最大利潤。你可以盡可能地完成更多的交易&#xff08;多次買賣一支股票…

解析Java中1000個常用類:Readable類,你學會了嗎?

在 Java 編程中,處理輸入流是一個常見的需求。Java 提供了多種方式來處理輸入流,例如 InputStream、Reader 等類和接口。 而 Readable 接口是 Java 提供的一個簡單而強大的接口,用于表示可讀的字符序列。 本文將詳細介紹 Readable 接口的用途、實現原理、應用場景,并通過…

Linux學習筆記(清晰且清爽)

本文首次發布于個人博客 想要獲得最佳的閱讀體驗&#xff08;無廣告且清爽&#xff09;&#xff0c;請訪問本篇筆記 Linux安裝 關于安裝這里就不過多介紹了&#xff0c;安裝版本是CentOS 7&#xff0c;詳情安裝步驟見下述博客在VMware中安裝CentOS7&#xff08;超詳細的圖文教…

QT之全局忽略編譯警告QMAKE_CXXFLAGS

全局忽略編譯警告QMAKE_CXXFLAGS 這個是Qt中用來給編譯器傳遞開關的&#xff0c;常寫在’pro’文件或’pri’文件中。 將所有的警告當成錯誤處理 QMAKE_CXXFLAGS -Werror return-type //函數有返回值 QMAKE_CXXFLAGS -Werror return-local-addr //返回局部變量地址 QMAKE…

Dubbo架構概覽:服務注冊與發現、遠程調用、監控與管理

Dubbo 是一個成熟的、高性能的、基于 Java 的微服務開發框架&#xff0c;它主要用于解決分布式系統中的服務治理問題&#xff0c;包括服務的注冊與發現、遠程過程調用&#xff08;RPC&#xff09;、服務監控與管理等多個關鍵環節。以下是Dubbo架構概覽的詳細介紹&#xff1a; …

3種使用OpenCV進行圖像合成的技巧

準備好探索圖像世界的魔法了嗎&#xff1f;今天&#xff0c;我們將用Python和OpenCV庫&#xff0c;一起解鎖三種超炫的圖像合成技巧&#xff0c;讓你的照片變得與眾不同&#xff01;&#x1f308; 1. 圖像融合&#xff1a;讓風景與夢境交織 想象一下&#xff0c;把日出的輝煌…

【前端每日基礎】day33——響應式布局

響應式布局是一種網頁設計的方法&#xff0c;它可以使網站在不同的設備上&#xff08;如桌面電腦、平板電腦、手機等&#xff09;以及不同的屏幕尺寸上呈現出最佳的顯示效果。響應式布局的目標是使用戶在任何設備上都能夠方便地訪問和瀏覽網站&#xff0c;而不需要使用不同版本…

ios v品會 api-sign算法

vip品會 api-sign算法還原 ios入門案例 視頻系列 IOS逆向合集-前言嗶哩嗶哩bilibili 一、ios難度與安卓對比 這里直接復制 楊如畫大佬的文章的內容&#xff1a; ios難度與安卓對比 很多人說ios逆向比安卓簡單&#xff0c;有以下幾個原因 1 首先就是閉源&#xff0c;安卓開源…

PH編程入門:從基礎到實踐的全方位解析

PH編程入門&#xff1a;從基礎到實踐的全方位解析 PH編程&#xff0c;作為一種獨特而強大的編程語言&#xff0c;正逐漸在各個領域展現其巨大的潛力。對于初學者來說&#xff0c;如何快速入門并掌握PH編程的精髓&#xff0c;是一個既充滿挑戰又充滿機遇的過程。本文將從四個方…

vscode過濾器@modified(查看配置了哪些設置)

文檔 visualstudio?docs?getstarted?settingshttps://code.visualstudio.com/docs/getstarted/settings 說明 使用modified可以過濾出&#xff1a; 配置過的設置&#xff08;和默認值不同&#xff09;&#xff1b; 在 settings.json 文件中配置了值的設置 步驟 1.打開…

Ubuntu Linux 24.04 使用certbot生成ssl證書

設置域名 1. 將需要生成SSL證書的域名解析到IP地址 idealand.xyz <> 64.176.82.190 檢查防火墻的設置 1. 首先查看防火墻的狀態&#xff1a; # ufw status 2. 如果防火墻開啟了&#xff0c;要開放80和443端口用于certbot驗證 # ufw allow 80 # ufw allow 443 生…

Vue3實戰筆記(53)—奇怪+1,VUE3實戰模擬股票大盤工作臺

文章目錄 前言一、實戰模擬股票大盤工作臺二、使用步驟總結 前言 實戰模擬股票大盤工作臺 一、實戰模擬股票大盤工作臺 接上文&#xff0c;這兩天封裝好的組件直接應用,上源碼&#xff1a; <template><div class"smart_house pb-5"><v-row ><…

JS對象由淺入深

對象 對象&#xff08;Object&#xff09;&#xff1a;JavaScript里的一種數據類型&#xff08;引用類型&#xff09;&#xff0c;也是用于存儲數據的 好處&#xff1a;可以用來詳細的描述某個事物&#xff0c;是用鍵值對形式存儲語義更明了 特點&#xff1a;對象數據是無序的&…

模型 FABE(特性 優勢 好處 證據)法則

說明&#xff1a;系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思維模型目錄。特性、優勢、好處、證據&#xff0c;一氣呵成。 1 FABE法則的應用 1.1 FABE法則營銷商用跑步機 一家高端健身器材公司的銷售代表正在向一家新開的健身房推銷他們的商用跑步機。以下…

數控切割編程:探索精密制造的奧秘與挑戰

數控切割編程&#xff1a;探索精密制造的奧秘與挑戰 在現代化制造領域&#xff0c;數控切割編程以其高精度、高效率的特性&#xff0c;成為眾多行業不可或缺的工藝手段。然而&#xff0c;對于初學者或外行人來說&#xff0c;數控切割編程往往顯得神秘且復雜。本文將從四個方面…

【數據分享】中國電力年鑒(2004-2022)

大家好&#xff01;今天我要向大家介紹一份重要的中國電力統計數據資源——《中國電力年鑒》。這份年鑒涵蓋了從2004年到2022年中國電力統計全面數據&#xff0c;并提供限時免費下載。&#xff08;無需分享朋友圈即可獲取&#xff09; 數據介紹 自1993年首次出版以來&#xf…

【數據結構】鏈表與順序表的比較

不同點&#xff1a; 順序表和鏈表是兩種常見的數據結構&#xff0c;他們的不同點在于存儲方式和插入、刪除操作、隨機訪問、cpu緩存利用率等方面。 一、存儲方式不同: 順序表&#xff1a; 順序表的存儲方式是順序存儲&#xff0c;在內存中申請一塊連續的空間&#xff0c;通…

解決OpenCV讀取目標圖像,cv2.imshow出現閃退的問題

前言 本文是該專欄的第17篇,后面將持續分享OpenCV計算機視覺的干貨知識,記得關注。 最近有粉絲朋友詢問到OpenCV讀取目標圖像出現的一個問題,在基于python語言“使用OpenCV讀取目標圖像的時候,利用cv2.imshow函數出現閃退”的情況。 而本文,筆者將詳細介紹針對上述問題,…