原文地址:http://jerryshao.me/architecture/2013/04/21/Spark%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B-scheduler%E6%A8%A1%E5%9D%97/
?
?
Background
Spark在資源管理和調度方式上采用了類似于Hadoop?YARN的方式,最上層是資源調度器,它負責分配資源和調度注冊到Spark中的所有應用,Spark選用Mesos或是YARN等作為其資源調度框架。在每一個應用內部,Spark又實現了任務調度器,負責任務的調度和協調,類似于MapReduce。本質上,外層的資源調度和內層的任務調度相互獨立,各司其職。本文對于Spark的源碼分析主要集中在內層的任務調度器上,分析Spark任務調度器的實現。
Scheduler模塊整體架構
scheduler
模塊主要分為兩大部分:
-
TaskSchedulerListener
。TaskSchedulerListener
部分的主要功能是監聽用戶提交的job,將job分解為不同的類型的stage以及相應的task,并向TaskScheduler
提交task。 -
TaskScheduler
。TaskScheduler
接收用戶提交的task并執行。而TaskScheduler
根據部署的不同又分為三個子模塊:ClusterScheduler
LocalScheduler
MesosScheduler
TaskSchedulerListener
Spark抽象了TaskSchedulerListener
并在其上實現了DAGScheduler
。DAGScheduler
的主要功能是接收用戶提交的job,將job根據類型劃分為不同的stage,并在每一個stage內產生一系列的task,向TaskScheduler
提交task。下面我們首先來看一下TaskSchedulerListener
部分的類圖:
- 用戶所提交的job在得到
DAGScheduler
的調度后,會被包裝成ActiveJob
,同時會啟動JobWaiter
阻塞監聽job的完成狀況。 - 于此同時依據job中
RDD
的dependency和dependency屬性(NarrowDependency
,ShufflerDependecy
),DAGScheduler
會根據依賴關系的先后產生出不同的stage DAG(result stage, shuffle map stage)。 - 在每一個stage內部,根據stage產生出相應的task,包括
ResultTask
或是ShuffleMapTask
,這些task會根據RDD
中partition的數量和分布,產生出一組相應的task,并將其包裝為TaskSet
提交到TaskScheduler
上去。
RDD的依賴關系和Stage的分類
在Spark中,每一個
RDD
是對于數據集在某一狀態下的表現形式,而這個狀態有可能是從前一狀態轉換而來的,因此換句話說這一個RDD
有可能與之前的RDD(s)
有依賴關系。根據依賴關系的不同,可以將RDD
分成兩種不同的類型:Narrow Dependency
和Wide Dependency
。
Narrow Dependency
指的是?child RDD
只依賴于parent RDD(s)
固定數量的partition。Wide Dependency
指的是child RDD
的每一個partition都依賴于parent RDD(s)
所有partition。它們之間的區別可參看下圖:
![]()
根據
RDD
依賴關系的不同,Spark也將每一個job分為不同的stage,而stage之間的依賴關系則形成了DAG。對于Narrow Dependency
,Spark會盡量多地將RDD
轉換放在同一個stage中;而對于Wide Dependency
,由于Wide Dependency
通常意味著shuffle操作,因此Spark會將此stage定義為ShuffleMapStage
,以便于向MapOutputTracker
注冊shuffle操作。對于stage的劃分可參看下圖,Spark通常將shuffle操作定義為stage的邊界。
DAGScheduler
在用戶創建SparkContext
對象時,Spark會在內部創建DAGScheduler
對象,并根據用戶的部署情況,綁定不同的TaskSechduler
,并啟動DAGcheduler
private var taskScheduler: TaskScheduler = {//... } taskScheduler.start() private var dagScheduler = new DAGScheduler(taskScheduler) dagScheduler.start()
而DAGScheduler
的啟動會在內部創建daemon線程,daemon線程調用run()
從block queue中取出event進行處理。
而run()
會調用processEvent
來處理不同的event。?
private def run() {SparkEnv.set(env)while (true) {val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)if (event != null) {logDebug("Got event of type " + event.getClass.getName)}if (event != null) {if (processEvent(event)) {return}}val time = System.currentTimeMillis() // TODO: use a pluggable clock for testabilityif (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {resubmitFailedStages()} else {submitWaitingStages()}} }
?
DAGScheduler
處理的event包括:
JobSubmitted
CompletionEvent
ExecutorLost
TaskFailed
StopDAGScheduler
根據event的不同調用不同的方法去處理。
本質上DAGScheduler
是一個生產者-消費者模型,用戶和TaskSchduler
產生event將其放入block queue,daemon線程消費event并處理相應事件。
Job的生與死
既然用戶提交的job最終會交由DAGScheduler
去處理,那么我們就來研究一下DAGScheduler
處理job的整個流程。在這里我們分析兩種不同類型的job的處理流程。
1.沒有shuffle和reduce的job
val textFile = sc.textFile("README.md") textFile.filter(line => line.contains("Spark")).count()
?
2.有shuffle和reduce的job
val textFile = sc.textFile("README.md")textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
?
首先在對RDD
的count()
和reduceByKey()
操作都會調用SparkContext
的runJob()
來提交job,而SparkContext
的runJob()
最終會調用DAGScheduler
的runJob()
runJob()
會調用prepareJob()
對job進行預處理,封裝成JobSubmitted
事件,放入queue中,并阻塞等待job完成
def runJob[T, U: ClassManifest](finalRdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: String,allowLocal: Boolean,resultHandler: (Int, U) => Unit) {if (partitions.size == 0) {return}val (toSubmit, waiter) = prepareJob(finalRdd, func, partitions, callSite, allowLocal, resultHandler)eventQueue.put(toSubmit)waiter.awaitResult() match {case JobSucceeded => {}case JobFailed(exception: Exception) =>logInfo("Failed to run " + callSite)throw exception} }
?
當daemon線程的processEvent()
從queue中取出JobSubmitted
事件后,會根據job劃分出不同的stage,并且提交stage:
首先,對于任何的job都會產生出一個finalStage
來產生和提交task。其次對于某些簡單的job,它沒有依賴關系,并且只有一個partition,這樣的job會使用local thread處理而并非提交到TaskScheduler
上處理。
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>val runId = nextRunId.getAndIncrement()val finalStage = newStage(finalRDD, None, runId)val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)clearCacheLocs()if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {runLocally(job)} else {activeJobs += jobresultStageToJob(finalStage) = jobsubmitStage(finalStage)}
其次,產生finalStage
后,需要調用submitStage()
,它根據stage之間的依賴關系得出stage DAG,并以依賴關系進行處理:
private def submitStage(stage: Stage) {if (!waiting(stage) && !running(stage) && !failed(stage)) {val missing = getMissingParentStages(stage).sortBy(_.id)if (missing == Nil) {submitMissingTasks(stage)running += stage} else {for (parent <- missing) {submitStage(parent)}waiting += stage}} }
?
對于新提交的job,finalStage
的parent stage還未獲得,因此submitStage
會調用getMissingParentStages()
來獲得依賴關系:
private def getMissingParentStages(stage: Stage): List[Stage] = {val missing = new HashSet[Stage]val visited = new HashSet[RDD[_]]def visit(rdd: RDD[_]) {if (!visited(rdd)) {visited += rddif (getCacheLocs(rdd).contains(Nil)) {for (dep <- rdd.dependencies) {dep match {case shufDep: ShuffleDependency[_,_] =>val mapStage = getShuffleMapStage(shufDep, stage.priority)if (!mapStage.isAvailable) {missing += mapStage}case narrowDep: NarrowDependency[_] =>visit(narrowDep.rdd)}}}}}visit(stage.rdd)missing.toList }
這里parent stage是通過RDD
的依賴關系遞歸遍歷獲得。對于Wide Dependecy
也就是Shuffle Dependecy
,Spark會產生新的mapStage
作為finalStage
的parent,而對于Narrow Dependecy
?Spark則不會產生新的stage。這里對stage的劃分是按照上面提到的作為劃分依據的,因此對于本段開頭提到的兩種job,第一種job只會產生一個finalStage
,而第二種job會產生finalStage
和mapStage
。
?
當stage DAG產生以后,針對每個stage需要產生task去執行,故在這會調用submitMissingTasks()
:
private def submitMissingTasks(stage: Stage) {val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)myPending.clear()var tasks = ArrayBuffer[Task[_]]()if (stage.isShuffleMap) {for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {val locs = getPreferredLocs(stage.rdd, p)tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)}} else {val job = resultStageToJob(stage)for (id <- 0 until job.numPartitions if (!job.finished(id))) {val partition = job.partitions(id)val locs = getPreferredLocs(stage.rdd, partition)tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)}}if (tasks.size > 0) {myPending ++= taskstaskSched.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))if (!stage.submissionTime.isDefined) {stage.submissionTime = Some(System.currentTimeMillis())}} else {running -= stage} }
首先根據stage所依賴的RDD
的partition的分布,會產生出與partition數量相等的task,這些task根據partition的locality進行分布;其次對于finalStage
或是mapStage
會產生不同的task;最后所有的task會封裝到TaskSet
內提交到TaskScheduler
去執行。
至此job在DAGScheduler
內的啟動過程全部完成,交由TaskScheduler
執行task,當task執行完后會將結果返回給DAGScheduler
,DAGScheduler
調用handleTaskComplete()
處理task返回:
private def handleTaskCompletion(event: CompletionEvent) {val task = event.taskval stage = idToStage(task.stageId)def markStageAsFinished(stage: Stage) = {val serviceTime = stage.submissionTime match {case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)case _ => "Unkown"}logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))running -= stage}event.reason match {case Success =>...task match {case rt: ResultTask[_, _] =>...case smt: ShuffleMapTask =>...}case Resubmitted =>...case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>...case other =>abortStage(idToStage(task.stageId), task + " failed: " + other)} }
每個執行完成的task都會將結果返回給DAGScheduler
,DAGScheduler
根據返回結果來進行進一步的動作。
RDD的計算
RDD
的計算是在task中完成的。我們之前提到task分為ResultTask
和ShuffleMapTask
,我們分別來看一下這兩種task具體的執行過程。
-
ResultTask
?
override def run(attemptId: Long): U = {val context = new TaskContext(stageId, partition, attemptId)try {func(context, rdd.iterator(split, context))} finally {context.executeOnCompleteCallbacks()}}
?
-
ShuffleMapTask
override def run(attemptId: Long): MapStatus = {val numOutputSplits = dep.partitioner.numPartitionsval taskContext = new TaskContext(stageId, partition, attemptId)try {val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])for (elem <- rdd.iterator(split, taskContext)) {val pair = elem.asInstanceOf[(Any, Any)]val bucketId = dep.partitioner.getPartition(pair._1)buckets(bucketId) += pair}val compressedSizes = new Array[Byte](numOutputSplits)val blockManager = SparkEnv.get.blockManagerfor (i <- 0 until numOutputSplits) {val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + ival iter: Iterator[(Any, Any)] = buckets(i).iteratorval size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)compressedSizes(i) = MapOutputTracker.compressSize(size)}return new MapStatus(blockManager.blockManagerId, compressedSizes)} finally {taskContext.executeOnCompleteCallbacks()}}
ResultTask
和ShuffleMapTask
都會調用RDD
的iterator()
來計算和轉換RDD
,不同的是:ResultTask
轉換完RDD
后調用func()
計算結果;而ShufflerMapTask
則將其放入blockManager
中用來shuffle。
?
RDD
的計算調用iterator()
,iterator()
在內部調用compute()
從RDD
依賴關系的根開始計算:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)} else {computeOrReadCheckpoint(split, context)} } private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {if (isCheckpointed) {firstParent[T].iterator(split, context)} else {compute(split, context)} }?
至此大致分析了TaskSchedulerListener
,包括DAGScheduler
內部的結構,job生命周期內的活動,RDD
是何時何地計算的。接下來我們分析一下task在TaskScheduler
內干了什么。
TaskScheduler
前面也提到了Spark實現了三種不同的TaskScheduler
,包括LocalSheduler
、ClusterScheduler
和MesosScheduler
。LocalSheduler
是一個在本地執行的線程池,DAGScheduler
提交的所有task會在線程池中被執行,并將結果返回給DAGScheduler
。MesosScheduler
依賴于Mesos進行調度,筆者對Mesos了解甚少,因此不做分析。故此章節主要分析ClusterScheduler
模塊。
ClusterScheduler
模塊與deploy模塊和executor模塊耦合較為緊密,因此在分析ClUsterScheduler
時也會順帶介紹deploy和executor模塊。
首先我們來看一下ClusterScheduler
的類圖:
ClusterScheduler
的啟動會伴隨SparkDeploySchedulerBackend
的啟動,而backend會將自己分為兩個角色:首先是driver,driver是一個local運行的actor,負責與remote的executor進行通行,提交任務,控制executor;其次是StandaloneExecutorBackend
,Spark會在每一個slave node上啟動一個StandaloneExecutorBackend
進程,負責執行任務,返回執行結果。
ClusterScheduler的啟動
在SparkContext
實例化的過程中,ClusterScheduler
被隨之實例化,同時賦予其SparkDeploySchedulerBackend
:
master match {...case SPARK_REGEX(sparkUrl) =>val scheduler = new ClusterScheduler(this)val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)scheduler.initialize(backend)schedulercase LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>...case _ =>...} } taskScheduler.start()
?
ClusterScheduler
的啟動會啟動SparkDeploySchedulerBackend
,同時啟動daemon進程來檢查speculative task:
override def start() {backend.start()if (System.getProperty("spark.speculation", "false") == "true") {new Thread("ClusterScheduler speculation check") {setDaemon(true)override def run() {while (true) {try {Thread.sleep(SPECULATION_INTERVAL)} catch {case e: InterruptedException => {}}checkSpeculatableTasks()}}}.start()} }
?
SparkDeploySchedulerBacked
的啟動首先會調用父類的start()
,接著它會啟動client,并由client連接到master向每一個node的worker發送請求啟動StandaloneExecutorBackend
。這里的client、master、worker涉及到了deploy模塊,暫時不做具體介紹。而StandaloneExecutorBackend
則涉及到了executor模塊,它主要的功能是在每一個node創建task可以運行的環境,并讓task在其環境中運行。
override def start() {super.start()val driverUrl = "akka://spark@%s:%s/user/%s".format(System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),StandaloneSchedulerBackend.ACTOR_NAME)val args = Seq(driverUrl, "", "", "")val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone"))val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome)client = new Client(sc.env.actorSystem, master, appDesc, this)client.start() }
?
在StandaloneSchedulerBackend
中會創建DriverActor
,它就是local的driver,以actor的方式與remote的executor進行通信。
override def start() {val properties = new ArrayBuffer[(String, String)]val iterator = System.getProperties.entrySet.iteratorwhile (iterator.hasNext) {val entry = iterator.nextval (key, value) = (entry.getKey.toString, entry.getValue.toString)if (key.startsWith("spark.")) {properties += ((key, value))}}driverActor = actorSystem.actorOf(Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME) }
?
在client實例化之前,會將StandaloneExecutorBackend
的啟動環境作為參數傳遞給client,而client啟動時會將此提交給master,由master分發給所有node上的worker,worker會配置環境并創建進程啟動StandaloneExecutorBackend
。
至此ClusterScheduler
的啟動,local driver的創建,remote executor環境的啟動所有過程都已結束,ClusterScheduler
等待DAGScheduler
提交任務。
ClusterScheduler提交任務
DAGScheduler
會調用ClusterScheduler
提交任務,任務會被包裝成TaskSetManager
并等待調度:
override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {val manager = new TaskSetManager(this, taskSet)activeTaskSets(taskSet.id) = manageractiveTaskSetsQueue += managertaskSetTaskIds(taskSet.id) = new HashSet[Long]()if (hasReceivedTask == false) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered")} else {this.cancel()}}}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)}hasReceivedTask = true;}backend.reviveOffers() }
?
在任務提交的同時會啟動定時器,如果任務還未被執行,定時器持續發出警告直到任務被執行。
同時會調用StandaloneSchedulerBackend
的reviveOffers()
,而它則會通過actor向driver發送ReviveOffers
,driver收到ReviveOffers
后調用makeOffers()
:
// Make fake resource offers on just one executor def makeOffers(executorId: String) {launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) } // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) {for (task <- tasks.flatten) {freeCores(task.executorId) -= 1executorActor(task.executorId) ! LaunchTask(task)} }
makeOffers()
會向ClusterScheduler
申請資源,并向executor提交LauchTask
請求。
接下來LaunchTask
會進入executor模塊,StandaloneExecutorBackend
在收到LaunchTask
請求后會調用Executor
執行task:
override def receive = {case RegisteredExecutor(sparkProperties) =>... case RegisterExecutorFailed(message) =>...case LaunchTask(taskDesc) =>logInfo("Got assigned task " + taskDesc.taskId)executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>... } def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {threadPool.execute(new TaskRunner(context, taskId, serializedTask)) }
?
Executor
內部是一個線程池,每一個提交的task都會包裝為TaskRunner
交由threadpool執行:
class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)extends Runnable {override def run() {SparkEnv.set(env)Thread.currentThread.setContextClassLoader(urlClassLoader)val ser = SparkEnv.get.closureSerializer.newInstance()logInfo("Running task ID " + taskId)context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)try {SparkEnv.set(env)Accumulators.clear()val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)updateDependencies(taskFiles, taskJars)val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)logInfo("Its generation is " + task.generation)env.mapOutputTracker.updateGeneration(task.generation)val value = task.run(taskId.toInt)val accumUpdates = Accumulators.valuesval result = new TaskResult(value, accumUpdates)val serializedResult = ser.serialize(result)logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)logInfo("Finished task ID " + taskId)} catch {case ffe: FetchFailedException => {val reason = ffe.toTaskEndReasoncontext.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))}case t: Throwable => {val reason = ExceptionFailure(t)context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))// TODO: Should we exit the whole executor here? On the one hand, the failed task may// have left some weird state around depending on when the exception was thrown, but on// the other hand, maybe we could detect that when future tasks fail and exit then.logError("Exception in task ID " + taskId, t)//System.exit(1)}}} }
其中task.run()
則真正執行了task中的任務,如前RDD的計算章節所述。返回值被包裝成TaskResult
返回。
至此task在ClusterScheduler
內運行的流程有了一個大致的介紹,當然這里略掉了許多異常處理的分支,但這不影響我們對主線的了解。
END
至此對Spark的Scheduler模塊的主線做了一個順藤摸瓜式的介紹,Scheduler模塊作為Spark最核心的模塊之一,充分體現了Spark與MapReduce的不同之處,體現了Spark DAG思想的精巧和設計的優雅。
當然Spark的代碼仍然在積極開發之中,當前的源碼分析在過不久后可能會變得沒有意義,但重要的是體會Spark區別于MapReduce的設計理念,以及DAG思想的應用。DAG作為對MapReduce框架的改進越來越受到大數據界的重視,hortonworks也提出了類似DAG的框架tez作為對MapReduce的改進。