【原】Spark中Master源碼分析(一)

Master作為集群的Manager,對于集群的健壯運行發揮著十分重要的作用。下面,我們一起了解一下Master是聽從Client(Leader)的號召,如何管理好Worker的吧。

1.家當(靜態屬性)

1.設置一個守護單線程的消息發送器,
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
2.根據sparkConf得到hadoopConf
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
3.一個bool類型的標識,如果設置為true,那么app的執行將會盡量分步到盡可能多的worker上,否則app的執行將會先用完一個worker的資源,然后再使用下一個worker的資源
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
4.設置執行app默認的最大核數為Int類型的最大值
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
5.還有一些關于worker、driver、app等的字段信息,都比較簡單,限于篇幅限制就不一一列出了

2.技能(方法)

由于Master上本質上是一個RpcEndpoint,所以我們按照它的生命周期進行介紹。如果不明白,請看文章

Spark Rpc通信源碼分析 http://www.cnblogs.com/yourarebest/p/5297157.html

1.構造函數就是Master默認的主構造器
2.onStart方法,主要功能是啟動Jetty的WebUI服務,Rest服務、選出持久化引擎及持久化代理

override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
//啟動JettyServer并綁定webUI端口號
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
//forwardMessageThread線程每1min中檢查Worker是否宕了
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
//啟動Rest服務,默認端口6066
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
//返回綁定的端口號
restServerBoundPort = restServer.map(.start())
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
//當metrics系統啟動后,將master和app的metrics servlet的hadnler給webui
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
//序列化Spark的配置文件
val serializer = new JavaSerializer(conf)
//支持三種持久化引擎,將Spark的配置參數持久化,便于以后恢復使用
val (persistenceEngine
, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}

3.onStop方法,停止master的metrics系統、停止app的metrics系統、取消異步執行的任務、停止WebUi服務、停止rest服務以及持久化引擎和選舉代理的停止。

override def onStop() {
masterMetricsSystem.report()
applicationMetricsSystem.report()
//避免異步發出的CompleteRecovery消息導致master的重啟
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel(true)
}
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
leaderElectionAgent.stop()
}

還有一個重要的方法receive方法,留到下一篇吧。

轉載于:https://www.cnblogs.com/yourarebest/p/5312965.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/458153.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/458153.shtml
英文地址,請注明出處:http://en.pswp.cn/news/458153.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

XML——XML介紹和基本語法

from:https://blog.csdn.net/gavin_john/article/details/51511180 1.XML歷史 gml(1969)->sgml(1985)->html(1993)->xml(1998) 1969 gml(通用標記語言),主要目的是要在不同的機器之間進行通信的數據規范1985 sgml(標準通用標記語言)1993 htm…

Tomcat7.0安裝配置

很久沒有通過博客對學習所得進行記錄了。 現在將使用Tomcat的一些經驗和心得寫到這里,作為記錄和備忘。如果有朋友看到,也請不吝賜教。 首先,我個人使用的是apache-tomcat-7.0.27你可以下載使用,前提條件你需要安裝JDK1.6或者1.7都…

TIFF圖像文件格式詳解

from:https://www.cnblogs.com/gywei/p/3393816.html 1 什么是TIFF? TIFF是Tagged Image File Format的縮寫。在現在的標準中,只有TIFF存在, 其他的提法已經舍棄不用了。做為一種標記語言,TIFF與其他文件格式最大的不…

java 抽象工廠模式簡單實例

抽象工廠模式:提供一個創建一系列的相關的或者依賴的對象的接口,無需指定它們的具體實現類,具體的時間分別在子類工廠中產生。 類似于工廠模式:隔離了具體類的生產實現,使得替換具體的工廠實現類很容易。包含有以下模塊…

圖像處理之積分圖應用三(基于NCC快速相似度匹配算法)

from:https://blog.csdn.net/jia20003/article/details/53021614 圖像處理之積分圖應用三(基于NCC快速相似度匹配算法) 基于Normalized cross correlation(NCC)用來比較兩幅圖像的相似程度已經是一個常見的圖像處理手段。在工業生產環節檢測…

深入淺出地理解機器人手眼標定

from:https://blog.csdn.net/qq_16481211/article/details/79764730 所謂手眼系統,就是人眼鏡看到一個東西的時候要讓手去抓取,就需要大腦知道眼鏡和手的坐標關系。如果把大腦比作B,把眼睛比作A,把手比作C,如果A和B的…

centos 6.5 安裝 mongodb

官方給出的鏈接地址:https://docs.mongodb.org/manual/tutorial/install-mongodb-on-red-hat/ 安裝后重要的日志 win10 上使用mongochef連接不上數據庫 解決方案: 修改 /etc/mongod.conf 將bindIP 改為0.0.0.0 監聽外網轉載于:https://www.cnblogs.com/l…

scala學習資料

1. scala-sbt 構建工具: http://www.scala-sbt.org/0.13/docs/zh-cn/Directories.html 2. 資料: http://www.ibm.com/developerworks/cn/java/j-lo-funinscala2/ https://www.zhihu.com/question/34548588?sortcreated http://nerd-is.in/2013-09/scala…

opencv3/C++ 機器學習-SVM應用實例:藥品(膠囊)識別與分類

from:https://blog.csdn.net/akadiao/article/details/79278072 版權聲明:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/akadiao/article/details/79278072 問題描述: 現對6種不同顏色藥品(膠囊…

Elasticsearch 搜索不到數據問題(_mapping 設置)

需求 由于 kibana3 中,不支持直接在請求的 url 中設置搜索的 type (是不是我不知道???)。 為了支持特定 type 的搜索,所以我設置了個下每個 panel 的查詢語句,讓它增加一個&#xff…

SVM之交叉驗證【轉】

交叉驗證(CrossValidation)方法思想簡介 以下簡稱交叉驗證(Cross Validation)為CV.CV是用來驗證分類器的性能一種統計分析方法,基本思想是把在某種意義下將原始數據(dataset)進行分組,一部分做為訓練集(train set),另一部分做為驗證集(validation set),首先用訓練集對分類器進…

linux命令學習-1-less

less 工具也是對文件或其它輸出進行分頁顯示的工具,應該說是linux正統查看文件內容的工具,功能極其強大。less 的用法比起 more 更加的有彈性。在 more 的時候,我們并沒有辦法向前面翻, 只能往后面看,但若使用了 less …

python問題匯總

問題1:如何解決python3中numpy報錯No module named numpy 打開terminal pip3 install numpy 問題2:ModuleNotFoundError No module named matplotlib 打開terminal pip3 install matplotlib

jspspy database help

.轉載于:https://www.cnblogs.com/outline/p/5316051.html

SVM 調參策略

轉自:SVM 調參策略:https://blog.csdn.net/u014484783/article/details/78220646 SVM 怎樣能得到好的結果 1. 對數據做歸一化(simple scaling) 2. 應用 RBF kernel 3. 用cross-validation和grid-search 得到最優的c和g 4. 用…

美好的?天 從ActionTab開始 美觀、智能、?效的新標簽? iTab 新標簽頁iTab新標簽頁Atop100工具推薦

文章目錄 ActionTabiTab 新標簽頁iTab新標簽頁,小組件,起始頁,標簽頁,日歷,股票,瀏覽器擴展 https://www.actiontab.cn/ ActionTab 收費???? iTab 新標簽頁iT…

Oracle學習之merge

--使用merge語句 create table new as select * from emp where 10; insert into new (empno,ename) select empno,ename from emp where deptno10;merge into new n using emp e on (n.empnoe.empno) when matched then update set n.sale.salwhen not matched then insert (…

機器學習中的算法(2)-支持向量機(SVM)基礎

from:http://www.cnblogs.com/LeftNotEasy/archive/2011/05/18/2034566.html 版權聲明: 本文由LeftNotEasy發布于http://leftnoteasy.cnblogs.com, 本文可以被全部的轉載或者部分使用,但請注明出處,如果有問題,請聯系wheeleastgm…

HDU 2586 How far away ?【LCA】

題目鏈接: http://acm.hdu.edu.cn/showproblem.php?pid2586 題意: 無向圖,給定邊及邊權重,任意兩點之間都有一條唯一的道路,道路上每個點只能出現一次。給定詢問,求詢問的結點之間的距離。 分析&#xff1…

深入理解拉格朗日乘子法(Lagrange Multiplier) 和KKT條件

from:https://blog.csdn.net/xianlingmao/article/details/7919597 在求取有約束條件的優化問題時,拉格朗日乘子法(Lagrange Multiplier) 和KKT條件是非常重要的兩個求取方法,對于等式約束的優化問題,可以應用拉格朗日乘子法去求…