As the cluster's Manager, the Master plays a vital role in keeping the cluster running healthily. Below, let's look at how the Master, answering the call of the Client (Leader), manages its Workers.
1. Assets (fields)
1. A daemon, single-threaded scheduled executor for forwarding messages:
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
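The helper above comes from Spark's ThreadUtils. A minimal sketch of what it does under the hood (the method name and shape here are illustrative, not Spark's actual implementation) looks like this:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}

// Illustrative stand-in for ThreadUtils.newDaemonSingleThreadScheduledExecutor:
// a single-threaded scheduler whose worker thread is a daemon, so it never
// keeps the JVM alive on its own.
def newDaemonSingleThreadScheduler(threadName: String): ScheduledExecutorService = {
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, threadName)
      t.setDaemon(true) // daemon: does not block JVM shutdown
      t
    }
  }
  Executors.newSingleThreadScheduledExecutor(factory)
}
```

The daemon flag matters because a housekeeping thread like forwardMessageThread must never prevent the Master JVM from exiting.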
2. A hadoopConf derived from the sparkConf:
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
3. A boolean flag: if true, an app's execution is spread across as many workers as possible; otherwise the app uses up one worker's resources before moving on to the next worker:
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
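To make the difference between the two strategies concrete, here is a hypothetical simplification (the real logic lives in the Master's scheduling code and handles many more constraints; `assignCores` and its parameters are made up for illustration):

```scala
// Given the cores an app still needs and each worker's free cores, return how
// many cores to take from each worker under the two strategies.
def assignCores(needed: Int, freeCores: Seq[Int], spreadOut: Boolean): Seq[Int] = {
  val assigned = Array.fill(freeCores.length)(0)
  var remaining = needed
  if (spreadOut) {
    // spread out: round-robin, one core at a time, across workers with capacity
    var pos = 0
    while (remaining > 0 && assigned.zip(freeCores).exists { case (a, f) => a < f }) {
      if (assigned(pos) < freeCores(pos)) {
        assigned(pos) += 1
        remaining -= 1
      }
      pos = (pos + 1) % freeCores.length
    }
  } else {
    // consolidate: exhaust each worker's free cores before moving on
    for (i <- freeCores.indices if remaining > 0) {
      val take = math.min(remaining, freeCores(i))
      assigned(i) = take
      remaining -= take
    }
  }
  assigned.toSeq
}
```

With 4 cores needed and three workers each offering 4 free cores, spreading out yields (2, 1, 1) while consolidating yields (4, 0, 0).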
4. The default maximum number of cores an app may use, set to the maximum Int value:
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
5. There are also fields describing workers, drivers, apps and so on; they are straightforward, so for reasons of space they are not listed one by one.
2. Skills (methods)
Since the Master is essentially an RpcEndpoint, we will walk through it in the order of its lifecycle. If that is unfamiliar, see the article:
Spark RPC communication source code analysis http://www.cnblogs.com/yourarebest/p/5297157.html
1. The constructor is simply the Master's default primary constructor.
2. The onStart method: it starts the Jetty-based WebUI and the REST server, and selects the persistence engine and the leader-election agent.
override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
//start the Jetty server and bind the webUI port
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
//every WORKER_TIMEOUT_MS (60s by default), forwardMessageThread asks the Master to check for timed-out workers
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
//start the REST server, default port 6066
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
//start() returns the bound port
restServerBoundPort = restServer.map(_.start())
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
//once the metrics systems are started, attach the master and app metrics servlet handlers to the web UI
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
//serializer used by the persistence engine to serialize recovery state
val serializer = new JavaSerializer(conf)
//three recovery modes are supported; the chosen engine persists application/worker state so it can be recovered later
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}
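The CheckForWorkerTimeOut message scheduled in onStart eventually makes the Master scan its workers for expired heartbeats. The core of that check can be sketched as follows (this `WorkerInfo` is a made-up simplification of Spark's class, keeping only the heartbeat timestamp):

```scala
// Simplified stand-in for Spark's WorkerInfo: only the last heartbeat matters here.
case class WorkerInfo(id: String, lastHeartbeatMs: Long)

// A worker whose last heartbeat is older than the timeout is treated as dead
// and would be removed from the Master's bookkeeping.
def timedOutWorkers(workers: Seq[WorkerInfo], nowMs: Long, timeoutMs: Long): Seq[WorkerInfo] =
  workers.filter(w => nowMs - w.lastHeartbeatMs > timeoutMs)
```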
3. The onStop method: it stops the master and application metrics systems, cancels the asynchronously scheduled tasks, stops the WebUI and REST servers, and shuts down the persistence engine and the leader-election agent.
override def onStop() {
masterMetricsSystem.report()
applicationMetricsSystem.report()
//prevent the asynchronously delivered CompleteRecovery message from restarting the master
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel(true)
}
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
leaderElectionAgent.stop()
}
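The cancel-then-shutdownNow sequence above can be exercised in isolation. This sketch uses plain java.util.concurrent (no Spark classes) to show that a periodic task, like checkForWorkerTimeOutTask, stops producing work once cancelled:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

val scheduler = Executors.newSingleThreadScheduledExecutor()
val ticks = new AtomicInteger(0)
// A periodic task, playing the role of checkForWorkerTimeOutTask.
val task = scheduler.scheduleAtFixedRate(
  new Runnable { override def run(): Unit = ticks.incrementAndGet() },
  0, 10, TimeUnit.MILLISECONDS)
Thread.sleep(100)
task.cancel(true)       // cancel the pending/running task first, as onStop does
scheduler.shutdownNow() // then stop the executor's thread
```

Cancelling the ScheduledFuture before shutting down the pool mirrors onStop's order and avoids a queued run firing during teardown.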
There is one more important method, receive; let's leave it for the next post.