一、依賴與血緣關系
- 依賴:兩個相鄰 RDD 之間的關系
- 血緣關系:多個連續的 RDD 的依賴
- 由于 RDD 不會保存數據,為了提高容錯性,每個 RDD 都會保存自己的血緣關系,一旦某個轉換過程出現錯誤,可以根據血緣關系重新從數據源開始讀取計算
object TestRDDDependency {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")val sc = new SparkContext(conf)val rdd1 = sc.textFile("data/word.txt")println(rdd1.toDebugString) // 打印血緣關系println(rdd1.dependencies) // 打印依賴關系println("----------------------")val rdd2 = rdd1.flatMap(_.split(" "))println(rdd2.toDebugString) // 打印血緣關系println(rdd2.dependencies) // 打印依賴關系println("----------------------")val rdd3 = rdd2.map((_, 1))println(rdd3.toDebugString) // 打印血緣關系println(rdd3.dependencies) // 打印依賴關系println("----------------------")val rdd4 = rdd3.reduceByKey(_ + _)println(rdd4.toDebugString) // 打印血緣關系println(rdd4.dependencies) // 打印依賴關系println("----------------------")}
}
二、寬窄依賴
-
窄依賴:OneToOneDependency,表示每一個父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一個 Partition 使用,類比喻為獨生子女
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
-
寬依賴:ShuffleDependency,表示同一個父 (上游) RDD 的 Partition 被子 (下游) RDD 的多個 Partition 依賴或者說子 RDD 的一個 Partition 需要父 RDD 的多個 Partition 的數據,所以會引起 Shuffle 操作,類比喻為多生
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false ) extends Dependency[Product2[K, V]]
三、階段劃分
- 窄依賴由于上游和下游的 RDD 分區是一對一的,所以整個的執行過程是不受其它分區執行結果的影響,每個分區只需要一個 task 就可以完成計算任務
-
寬依賴由于存在 shuffle 操作,下游的 RDD 分區的數據計算需要等待上游 RDD 相關分區的數據全部執行完成后才能開始,所以存在不同階段的劃分,上游和下游 RDD 的每個分區都需要一個 task 來完成計算任務,所有階段的劃分和執行順序可以由有向無環圖 (DAG) 的形式來表示
-
階段劃分源碼:
/**結論:1.默認會至少存在一個階段,即 resultStage,最后執行的階段2.當存在 shuffle 依賴時,每存在一個會增加一個階段(shuffleMapStage)3.階段的數量 = shuffle 依賴數量 + 1 */ // 行動算子觸發作業執行 rdd.collect()// collect() 深入底層 dagScheduler.runJob()// runJob() 中會調用 submitJob(),其中會調用 handleJobSubmitted() // handleJobSubmitted() 中的階段劃分 try {finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch {... }// createResultStage() 方法 private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {val parents = getOrCreateParentStages(rdd, jobId) // 判斷是否有上一階段val id = nextStageId.getAndIncrement()val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // 至少存在一個 resultStage 階段stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage }// getOrCreateParentStages(),判斷是否有上一階段 private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {// getShuffleDependencies(rdd):獲取當前 rdd 的 shuffle 依賴getShuffleDependencies(rdd).map { shuffleDep =>// 為 shuffle 依賴創建 ShuffleMapStage 階段getOrCreateShuffleMapStage(shuffleDep, firstJobId)}.toList }// getShuffleDependencies(rdd):獲取當前 rdd 的 shuffle 依賴 private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {val parents = new HashSet[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new Stack[RDD[_]]waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {val toVisit = waitingForVisit.pop()if (!visited(toVisit)) {visited += toVisittoVisit.dependencies.foreach {case shuffleDep: ShuffleDependency[_, _, _] =>parents += shuffleDepcase dependency =>waitingForVisit.push(dependency.rdd)}}}parents }
四、任務劃分
-
RDD 任務劃分中間分為:Application、Job、Stage 和 Task
- Application:初始化一個 SparkContext 即生成一個 Application
- Job:一個 Action 算子就會生成一個 Job
- Stage:Stage 等于寬依賴 (ShuffleDependency) 的個數加 1
- Task:一個 Stage 階段中,最后一個 RDD 的分區個數就是 Task 的個數
-
Application -> Job -> Stag e-> Task 之間每一層都是 1 對 n 的關系
-
任務劃分源碼:
val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage => partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage => partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}} }// val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()// override def findMissingPartitions(): Seq[Int] = {mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions) }