Task Submission Flow
Overview
Having covered the startup flow of Spark's Master and of the Worker, the next thing to run is the Executor process on the Worker. This article continues the analysis with the Executor startup and task submission flow.
Spark-submit
A job is submitted to the cluster through spark-submit.
The launch script starts the application's main class. Taking WordCount as the example: spark-submit --class cn.apache.spark.WordCount
- bin/spark-class -> org.apache.spark.deploy.SparkSubmit: the main method of this class is invoked
- doRunMain receives the main class of the user-defined Spark application: cn.apache.spark.WordCount
- Reflection is used to obtain a reference to that class: mainClass = Utils.classForName(childMainClass)
- Reflection is then used to invoke the main method of cn.apache.spark.WordCount
Let's look at SparkSubmit's main method:
```scala
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  // Match on the action type
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
```
Here the action is SUBMIT, so the submit method is called:
```scala
private[spark] def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    // ...
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          // childMainClass is the fully qualified name of the class that holds your app's main
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      // ...
    }
  }
  // ...
  // Call the doRunMain defined above
  doRunMain()
}
```
submit calls doRunMain(), which in turn calls runMain. Let's look at runMain:
```scala
private def runMain(/* ... */) {
  // ...
  try {
    // Load the class via reflection
    mainClass = Class.forName(childMainClass, true, loader)
  } catch {
    // ...
  }
  // Get a handle on the main method via reflection
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  // ...
  try {
    // Invoke the application's main method
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
```
This is the core of the flow: as the comments above make clear, the main method of the class we wrote is invoked via reflection. That covers the overall submission flow.
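To make the reflection step concrete, here is a minimal standalone sketch, assuming a hypothetical target class (cn.apache.spark.WordCount is used only as an illustrative name and must be on the classpath); it locates and invokes a static main the same way runMain does:

```scala
import java.lang.reflect.Modifier

object ReflectiveLauncher {
  def main(args: Array[String]): Unit = {
    // Load the target class by its fully qualified name (illustrative class name)
    val mainClass = Class.forName("cn.apache.spark.WordCount")
    // Look up the static main(Array[String]) method
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers),
      "The main method in the given main class must be static")
    // Invoke it; the receiver is null because the method is static
    mainMethod.invoke(null, args)
  }
}
```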
SparkSubmit sequence diagram
Executor Startup Flow
Once SparkSubmit has invoked our program's main method via reflection, our own code starts to run. A Spark program needs to create a SparkContext object, so that is where we begin.
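For reference, a WordCount driver of the kind used as the example earlier might look roughly like the sketch below (the input/output paths and transformations are illustrative, not from the original article); the call that matters here is `new SparkContext(conf)`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Building the SparkContext is what kicks off the whole Executor startup flow below
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    // Illustrative job: count words in an input file and write the result out
    sc.textFile(args(0))
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))

    sc.stop()
  }
}
```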
The SparkContext constructor is long; the parts we care about are the following:
```scala
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  // ...
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // Delegate to SparkEnv.createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  // createSparkEnv is called here and returns a SparkEnv object holding many important
  // properties, the most important of which is the ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  // Create and start the TaskScheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // Create the DAGScheduler
  dagScheduler = new DAGScheduler(this)

  // Start the TaskScheduler
  taskScheduler.start()
  // ...
}
```
The SparkContext constructor mainly does three things: it creates a SparkEnv, a TaskScheduler, and a DAGScheduler. Let's first look at what createTaskScheduler does:
```scala
// Create a TaskScheduler based on the given master URL
private def createTaskScheduler(/* ... */) = {
  // ...
  // Match the URL to pick the deployment mode
  master match {
    // ...
    // This is Spark's Standalone mode
    case SPARK_REGEX(sparkUrl) =>
      // First create the TaskScheduler
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      // Very important
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      // Initialize the scheduling pool; the default scheduling mode is FIFO
      scheduler.initialize(backend)
      (backend, scheduler)
    // ...
  }
}
```
The master URL is matched to Standalone mode, and SparkDeploySchedulerBackend and TaskSchedulerImpl are created. These two objects are very important; they are the core of task-scheduling startup. scheduler.initialize(backend) is then called to perform the initialization.
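As an aside, the Standalone branch above is selected by a regular-expression extractor over the master URL. A minimal, self-contained sketch of that kind of matching is shown below (the regex is an assumption modelled on Spark's SPARK_REGEX, and the URLs are made up):

```scala
object MasterUrlMatch {
  // Simplified stand-in for Spark's SPARK_REGEX: captures everything after "spark://"
  private val SparkRegex = """spark://(.*)""".r

  def main(args: Array[String]): Unit = {
    val master = "spark://node1:7077,node2:7077"
    master match {
      case SparkRegex(sparkUrl) =>
        // A comma-separated list is split so the client can register with every Master
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        println(masterUrls.mkString(", "))  // spark://node1:7077, spark://node2:7077
      case other =>
        println(s"Not a standalone master URL: $other")
    }
  }
}
```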
Starting the TaskScheduler
Once initialization is complete, execution returns to the SparkContext constructor, which goes on to call taskScheduler.start() to start the TaskScheduler. Let's look at the start method:
```scala
override def start() {
  // Call the backend implementation's start method
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    import sc.env.actorSystem.dispatcher
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
        SPECULATION_INTERVAL milliseconds) {
      Utils.tryOrExit { checkSpeculatableTasks() }
    }
  }
}
```
Here backend is the SparkDeploySchedulerBackend, so its start method is invoked:
```scala
override def start() {
  // CoarseGrainedSchedulerBackend's start method; this is where the DriverActor is created
  super.start()

  // The endpoint for executors to talk to us
  // Below, the arguments for launching the executor Java child process are prepared
  val driverUrl = AkkaUtils.address(
    AkkaUtils.protocol(actorSystem),
    SparkEnv.driverActorSystemName,
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // Command bundles the class name and arguments; it will ultimately launch the
  // org.apache.spark.executor.CoarseGrainedExecutorBackend child process
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  // ApplicationDescription wraps up these important parameters
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec)
  // The ClientActor is created inside AppClient
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  // Start the ClientActor
  client.start()

  waitForRegistration()
}
```
Here the parameters for launching the Executor are assembled: the class name plus its arguments are wrapped into an ApplicationDescription, which is finally passed into a newly created AppClient whose start method is then called.
AppClient creation sequence diagram
AppClient's start method
Next we focus on the start method:
```scala
def start() {
  // Just launch an actor; it will call back into the listener.
  actor = actorSystem.actorOf(Props(new ClientActor))
}
```
The start method creates the ClientActor that communicates with the Master; its preStart method is then invoked to register with the Master. Let's look at preStart:
```scala
override def preStart() {
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  try {
    // The ClientActor registers with the Master
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}
```
最后會調用該方法向所有Master注冊
```scala
def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    // Obtain a reference to the Master via actorSelection
    val actor = context.actorSelection(masterAkkaUrl)
    // Send the Master an asynchronous app-registration message
    actor ! RegisterApplication(appDescription)
  }
}
```
The app-registration message sent by the ClientActor carries the ApplicationDescription, which contains the requested resources, the class name of the Executor to launch, and related parameters.
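To make that payload concrete, here is a simplified sketch of its shape, with field names modelled on what the SparkDeploySchedulerBackend code above passes in; this is not Spark's actual definition, which differs in detail across versions:

```scala
// Hypothetical, simplified stand-ins for the real Spark classes, for illustration only
case class Command(
    mainClass: String,                  // e.g. "org.apache.spark.executor.CoarseGrainedExecutorBackend"
    arguments: Seq[String],             // e.g. --driver-url, --executor-id, --cores, ...
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String])

case class ApplicationDescription(
    name: String,                       // sc.appName
    maxCores: Option[Int],              // cap on total cores for the application
    memoryPerExecutorMB: Int,           // sc.executorMemory
    command: Command,                   // how to start each Executor process
    appUiUrl: String)                   // address of the application web UI
```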
The Master's receiver
```scala
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Create the app; sender is the ClientActor
    val app = createApplication(description, sender)
    // Register the app
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the app
    persistenceEngine.addApplication(app)
    // Reply to the ClientActor, telling it the app was registered successfully
    sender ! RegisteredApplication(app.id, masterUrl)
    // TODO schedule the resources
    schedule()
  }
}
```
`registerApplication(app)`:
```scala
def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.path.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // Record the app in the Master's collections
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  actorToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}
```
The Master stores the received information in its collections, persists it, and sends a RegisteredApplication message back to the ClientActor. It then runs the schedule() method, which iterates over the workers collection and calls launchExecutor.
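The details of Spark's resource allocation are out of scope here, but a heavily simplified sketch of the idea, using hypothetical types and a naive first-fit strategy rather than Spark's actual schedule() logic, looks like this:

```scala
// Hypothetical, simplified model of the Master's bookkeeping, for illustration only
case class WorkerSlot(id: String, var coresFree: Int, var memoryFreeMB: Int)
case class AppRequest(id: String, coresWanted: Int, coresPerExecutor: Int, memoryPerExecutorMB: Int)

object NaiveSchedule {
  /** First-fit assignment of executors to workers; returns (workerId, appId) launch decisions. */
  def schedule(workers: Seq[WorkerSlot], apps: Seq[AppRequest]): Seq[(String, String)] = {
    val launches = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
    for (app <- apps) {
      var coresLeft = app.coresWanted
      for (worker <- workers
           if coresLeft > 0
           && worker.coresFree >= app.coresPerExecutor
           && worker.memoryFreeMB >= app.memoryPerExecutorMB) {
        // "Launch" one executor on this worker and account for the resources it consumes
        worker.coresFree -= app.coresPerExecutor
        worker.memoryFreeMB -= app.memoryPerExecutorMB
        coresLeft -= app.coresPerExecutor
        launches += ((worker.id, app.id))
      }
    }
    launches.toSeq
  }
}
```

With that picture in mind, here is the actual launchExecutor: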
```scala
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Record how many of this worker's resources the executor uses
  worker.addExecutor(exec)
  // The Master sends the Worker a message telling it to launch an Executor
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // The Master also tells the ClientActor that the executor has been added
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
```
Here the Master sends the Worker the message to launch an Executor:
`worker.actor ! LaunchExecutor(masterUrl,exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)`
application.desc contains the launch information for the Executor class.
```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // ...
  appDirectories(appId) = appLocalDirs
  // Create an ExecutorRunner; this is important: it holds the Executor's
  // launch configuration and arguments
  val manager = new ExecutorRunner(
    appId,
    execId,
    appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
    cores_,
    memory_,
    self,
    workerId,
    host,
    webUi.boundPort,
    publicAddress,
    sparkHome,
    executorDir,
    akkaUrl,
    conf,
    appLocalDirs,
    ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  // Start the ExecutorRunner
  manager.start()
  // ...
```
The Worker's receiver gets the launch-Executor message; the appDesc object holds the Command, the Executor implementation class, and its arguments.
manager.start() creates a thread:
```scala
def start() {
  // Start a thread
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    // Use a child thread to help the Worker launch the Executor child process
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()

  // Shutdown hook that kills actors on shutdown.
  shutdownHook = new Thread() {
    override def run() {
      killProcess(Some("Worker shutting down"))
    }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}
```
The thread calls the fetchAndRunExecutor() method; let's look at it:
```scala
def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
      sparkHome.getAbsolutePath, substituteVariables)
    // Build the command line
    val command = builder.command()
    logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

    builder.directory(executorDir)
    builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // Launch the child process
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      command.mkString("\"", "\" \"", "\""), "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    // ...
  }
}
```
Here the class name and arguments are assembled into a command line; the details of the assembly don't matter for our purposes. In the end builder.start() launches a child JVM process via the system runtime, and that process's main class is CoarseGrainedExecutorBackend. At this point the Executor process is up and running.
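For intuition, the following minimal sketch shows how a ProcessBuilder can launch a JVM child process and redirect its output, which is essentially what fetchAndRunExecutor does; the paths, JVM options, and the cn.apache.spark.WordCount main class are illustrative assumptions, not taken from Spark's CommandUtils:

```scala
import java.io.File

object LaunchChildJvm {
  def main(args: Array[String]): Unit = {
    // Illustrative command line: run some main class on the current classpath
    val javaBin = new File(new File(System.getProperty("java.home"), "bin"), "java").getAbsolutePath
    val command = java.util.Arrays.asList(
      javaBin, "-Xmx512m", "-cp", System.getProperty("java.class.path"),
      "cn.apache.spark.WordCount")                       // hypothetical child main class

    val builder = new ProcessBuilder(command)
    builder.directory(new File("."))                     // working directory for the child
    builder.environment.put("SPARK_LOCAL_DIRS", "/tmp")  // illustrative environment variable
    builder.redirectOutput(new File("stdout"))           // redirect child stdout to a file
    builder.redirectError(new File("stderr"))            // redirect child stderr to a file

    val process = builder.start()
    val exitCode = process.waitFor()                     // block until the child exits
    println(s"Child process exited with code $exitCode")
  }
}
```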
Executor creation sequence diagram
Starting the Executor's Task-Scheduling Objects
Once the Executor process has started, the first thing it does is execute its main method, whose code is as follows:
```scala
// Entry point of the Executor process
def main(args: Array[String]) {
  // ...
  // Parse the arguments
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
    printUsageAndExit()
  }
  // Start the Executor
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
```
It then executes the run method:
```scala
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {
  // ...
  // Create the CoarseGrainedExecutorBackend actor through the actorSystem;
  // CoarseGrainedExecutorBackend communicates with the DriverActor
  env.actorSystem.actorOf(
    Props(classOf[CoarseGrainedExecutorBackend],
      driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
    name = "Executor")
  // ...
  env.actorSystem.awaitTermination()
}
```
The run method creates the CoarseGrainedExecutorBackend actor, which will communicate with the DriverActor; its preStart lifecycle method is then invoked:
```scala
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  // The Executor establishes a connection to the DriverActor
  driver = context.actorSelection(driverUrl)
  // The Executor sends the DriverActor a registration message
  driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
```
The Executor sends the DriverActor a registration message: `driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)`
When the DriverActor's receiver gets the message:
```scala
def receiveWithLogging = {
  // Registration message sent by the Executor to the DriverActor
  case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorDataMap.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      logInfo("Registered executor: " + sender + " with ID " + executorId)
      // The DriverActor replies to the Executor that registration succeeded
      sender ! RegisteredExecutor
      addressToExecutorId(sender.path.address) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val (host, _) = Utils.parseHostPort(hostPort)
      // Wrap up the Executor's information
      val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      CoarseGrainedSchedulerBackend.this.synchronized {
        // Add the Executor's info object to the collection
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      // This is what will later drive the actual task execution
      makeOffers()
    }
}
```
In its receiver, the DriverActor wraps the Executor's information into a map for safekeeping and sends the reply `sender ! RegisteredExecutor` back to the CoarseGrainedExecutorBackend:
```scala
override def receiveWithLogging = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    val (hostname, _) = Utils.parseHostPort(hostPort)
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  // ...
}
```
On receiving this message, CoarseGrainedExecutorBackend creates an Executor object in preparation for running tasks. With that, Executor creation is complete; the next article will cover task scheduling.
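As a recap, the registration handshake in this last step can be modelled with a few message types like the following sketch; these are hypothetical, simplified case classes, not Spark's actual definitions, which differ in detail:

```scala
// Hypothetical, simplified registration protocol between the executor backend and the driver
sealed trait ClusterMessage
// Executor backend -> DriverActor: "here I am, with this much capacity"
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) extends ClusterMessage
// DriverActor -> executor backend: registration accepted, an Executor can now be created
case object RegisteredExecutor extends ClusterMessage
// DriverActor -> executor backend: registration rejected (e.g. duplicate executor ID)
case class RegisterExecutorFailed(reason: String) extends ClusterMessage
```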