大數據學習——akka自定義RPC

?

?

實現

package cn.itcast.akkaimport akka.actor.{Actor, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactoryimport scala.collection.mutableimport scala.concurrent.duration._

class Master(val host: String, val port: Int) extends Actor {//保存WorkerID 到 WorkerInfo的映射val idToWorker = new mutable.HashMap[String, WorkerInfo]()//保存所的WorkerInfo信息val workers = new mutable.HashSet[WorkerInfo]()val CHECK_INTERVAL = 15000override def preStart(): Unit = {//導入隱式轉換import context.dispatchercontext.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)}override def receive: Receive = {//Worker發送個Mater的注冊消息case RegisterWorker(workerId, cores, memory) => {if (!idToWorker.contains(workerId)) {//封裝worker發送的信息val workerInfo = new WorkerInfo(workerId, cores, memory)//保存workerInfoidToWorker(workerId) = workerInfoworkers += workerInfo//Master向Worker反饋注冊成功的消息sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}@$host:$port/user/${Master.MASTER_NAME}")}}//Worker發送給Master的心跳信息case Heartbeat(workerId) => {if (idToWorker.contains(workerId)) {val workerInfo = idToWorker(workerId)val currentTime = System.currentTimeMillis()//更新上一次心跳時間workerInfo.lastHeartbeatTime = currentTime}}//檢測超時的Workercase CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val deadWorkers: mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)//      for(w <- deadWorkers) {//        idToWorker -= w.id//        workers -= w//      }deadWorkers.foreach(w => {idToWorker -= w.idworkers -= w})println("alive worker size : " + workers.size)}}
}object Master {val MASTER_SYSTEM = "MaterActorSystem"val MASTER_NAME = "Master"def main(args: Array[String]) {//    val host = args(0)//    val port = args(1).toIntval host = "127.0.0.1"val port = 8888val confStr =s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//ActorSystem是單例的,用于創建Acotor并監控actorval actorSystem = ActorSystem(MASTER_SYSTEM, conf)//通過ActorSystem創建ActoractorSystem.actorOf(Props(new Master(host, port)), MASTER_NAME)actorSystem.awaitTermination()}
}
package cn.itcast.akka

trait Message extends Serializable//Worker -> Master
case class RegisterWorker(id: String, cores: Int, memory: Int) extends Message//Master -> Worker
case class RegisteredWorker(masterUrl: String) extends Message//Worker -> Master
case class Heartbeat(id: String) extends Message//Worker internal message
case object SendHeartbeat//Master internal message
case object CheckTimeOutWorker
package cn.itcast.akkaimport java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._

class Worker(val cores: Int, val memory: Int, val masterHost: String, val masterPort: Int) extends Actor {//Master的引用var master: ActorSelection = _//Worker的IDval workerId = UUID.randomUUID().toString//masterUrlvar masterUrl: String = _val HEARTBEAT_INTERVAL = 10000//preStart在構造器之后receive之前執行override def preStart(): Unit = {//首先跟Master建立連接master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_NAME}")//通過master的引用向Master發送注冊消息master ! RegisterWorker(workerId, cores, memory)}override def receive: Receive = {//Master發送給Worker注冊成功的消息case RegisteredWorker(masterUrl) => {this.masterUrl = masterUrl//啟動定時任務,向Master發送心跳//導入隱式轉換import context.dispatchercontext.system.scheduler.schedule(0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat)}case SendHeartbeat => {//向Master發送心跳master ! Heartbeat(workerId)}}
}object Worker {def main(args: Array[String]) {//Worker的地址和端口//    val host = args(0)//    val port = args(1).toInt//    val cores = args(2).toInt//    val memory = args(3).toIntval host = "127.0.0.1"val port = 9999val cores = 8val memory = 1024//Master的地址和端口//    val masterHost = args(4)//    val masterPort = args(5).toIntval masterHost = "127.0.0.1"val masterPort = 8888val confStr =s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//單例的ActorSystemval actorSystem = ActorSystem("WorkerActorSystem", conf)//通過actorSystem來創建Actorval worker = actorSystem.actorOf(Props(new Worker(cores, memory, masterHost, masterPort)), "Worker")actorSystem.awaitTermination()}
}
package cn.itcast.akka
class WorkerInfo(val id: String, val cores: Int, val memory: Int) {//TODOvar lastHeartbeatTime: Long = _}

?

?

轉載于:https://www.cnblogs.com/feifeicui/p/10996077.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/248765.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/248765.shtml
英文地址,請注明出處:http://en.pswp.cn/news/248765.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

從Client應用場景介紹IdentityServer4(一)

從Client應用場景介紹IdentityServer4&#xff08;一&#xff09; 原文:從Client應用場景介紹IdentityServer4&#xff08;一&#xff09;一、背景 IdentityServer4的介紹將不再敘述&#xff0c;百度下可以找到&#xff0c;且官網的快速入門例子也有翻譯的版本。這里主要從Clie…

開發常用代碼筆記

Vue 使用moment插件對時間進行格式化&#xff08;全局設置&#xff09; 下載插件 npm install moment --save 在main.js中引入插件 import moment from ‘moment’ 在main.js中定義全局過濾器 Vue.filter(dataFilter,function (dataStr,patten YYYY-MM-DD HH:mm:ss) {retur…

springboot 參數校驗詳解

https://www.jianshu.com/p/89a675b7c900 在日常開發寫rest接口時&#xff0c;接口參數校驗這一部分是必須的&#xff0c;但是如果全部用代碼去做&#xff0c;顯得十分麻煩&#xff0c;spring也提供了這部分功能,本文來探究一下如何實現 1.配置 spring-boot-starter-web包自動依…

微信小程序——賬號及開發工具

1. 注冊微信小程序賬號 點擊我進入微信公眾平臺 進入后點擊立即注冊 注冊成功且登錄后進入小程序管理后臺 2. 安裝開發者工具 點擊進入開發文檔 進入安裝開發工具&#xff08;穩定版本&#xff09; 一路默認下一步進行安裝 3. 開發者工具的使用 使用注冊微信小程序的微信號…

CSS注意的地方

content-box和border-box的區別 2018年02月27日 22:20:16 sulingliang 閱讀數&#xff1a;8011盒子模型 盒子寬度&#xff1a;paddingbordercontent-width 盒子高度&#xff1a;paddingbordercontent-height 如圖所示 盒子模型content-box 說明&#xff1a;在內容寬度和高度之…

機器學習筆記(6) 線性回歸

先從最簡單的例子開始,假設我們有一組樣本(如下圖的一個個黑色的圓點),只有一個特征,如下圖,橫軸是特征值,縱軸是label。比如橫軸是房屋面積,縱軸是房屋價格. 現在我們要做什么呢&#xff1f;我們試圖找到一條直線yaxb,可以盡量好的擬合這些點. 你可能要問了,為啥是直線,不是曲…

仿微信朋友圈項目梳理

項目功能簡介&#xff1a; 用戶通過手機號驗證碼進行登錄和注冊 可以瀏覽動態列表中的所有動態 登錄成功后用戶可以發表自己的動態 也可以對自己認可欣賞的動態進行點贊和評論 也可以通過動態結識志同道合的朋友 進行聊天和探討 前端&#xff1a;采用Vue框架搭建 weui進行頁面…

如何處理大流量高并發

1.動靜分離。 將網站中的靜態資源單獨拆分出來, 比如 css, js, 圖片, 視頻資源單獨存儲在一臺服務器上, 或者直接使用云存儲平臺, 七牛云或者阿里云之類的, 這樣能有效的降低主服務器的運行壓力 2.CDN加速。 云平臺提供 CDN 加速, 可以對資源進行全國服務器節點的分發, 提高全國…

echarts鼠標事件以及自定義數據獲取

事件添加方法&#xff1a; 對應官網位置&#xff1a;https://www.echartsjs.com/api.html#events 鼠標事件包括 click、dblclick、mousedown、mousemove、mouseup、mouseover、mouseout、globalout、contextmenu。 myChart.on(click, function (params) {console.log(params); …

[數學]點、線、面分割問題

平面分割問題 p條直線相交于一點時&#xff0c;分割的圖形有 2*(n-1) 個&#xff0c;此時再加一條直線&#xff0c;在 2*(n-1) 的基礎上再加 n條&#xff0c;此時為2*n n條曲線&#xff0c;其中有m條相交于一點&#xff0c;每兩個曲線都交于兩點 平面上有n條直線&#xff0c;且…

移動開發

1.移動端基礎 1.1 瀏覽器現狀 PC端瀏覽器 360瀏覽器、谷歌瀏覽器、火狐瀏覽器、QQ瀏覽器、百度瀏覽器&#xff08;停止服務&#xff09;、搜狗瀏覽器、IE瀏覽器 移動端瀏覽器 UC、QQ瀏覽器、歐朋瀏覽器、百度手機瀏覽器、360、搜狗、獵豹、谷歌等其他手機自帶的瀏覽器 國…

Django之路由系統

Django的路由系統 Django 1.11版本 URLConf官方文檔 URL配置(URLconf)就像Django 所支撐網站的目錄。它的本質是URL與要為該URL調用的視圖函數之間的映射表。 你就是以這種方式告訴Django&#xff0c;對于這個URL調用這段代碼&#xff0c;對于那個URL調用那段代碼。 URLconf配置…

微信小程序——操作數據庫

案例一&#xff1a;統計用戶的訪問次數 業務需求&#xff1a; 統計每個用戶對程序的訪問次數將訪問次數存儲到數據庫中訪問次數應該與用戶進行關聯 業務邏輯&#xff1a; 如果用戶是第一次訪問此程序&#xff0c;向數據庫添加一條記錄&#xff1a;{openid&#xff1a;45454…

shop--12.阿里云部署以及域名綁定

一、申請阿里云服務器&#xff08;1&#xff09;PC訪問阿里云https://www.aliyun.com/&#xff0c;申請阿里云帳號&#xff08;可以用您的支付寶帳號登錄&#xff0c;因為支付寶帳號已經進行了實名認證&#xff0c;使用起來更方便&#xff09;并登錄&#xff08;2&#xff09;找…

微信小程序——獲取用戶的運動步數

程序獲取用戶信息步驟 點擊參考微信文檔中的授權首先程序先向用戶申請訪問哪些權限用戶做出選擇后返回給程序程序攜帶權限訪問服務器如果用戶允許則返回信息如果用戶為允許則不返回 自定義函數getUserRun 為獲取用戶的微信運動數據 頁面加載調用此函數函數中執行下面操作 1…

C++之前置自增與后置自增

關于前置自增與后置自增的區別我是參考這里&#xff1a;http://bbs.bccn.net/thread-454977-1-1.html 簡單復述下&#xff0c;比如x; 與 x; 在C中&#xff0c;x這個表達式的值為原先x的值1&#xff0c;副作用是x的值增加了1&#xff1b;&#xff08;C中不是這樣定義的&#xff…

第一次個人作業

該作業所屬課程&#xff1a;https://edu.cnblogs.com/campus/xnsy/SoftwareEngineeringClass2作業要求地址&#xff1a;https://edu.cnblogs.com/campus/xnsy/SoftwareEngineeringClass2/homework/3340團隊名稱&#xff1a;腦殼痛 作業的目標 1.通過測試其他組的軟件項目學習其…

微信小程序——解決上傳并部署云函數時報錯ResourceNotFound.Function, 未找到函數版本,請創建后再試。 (7f2d9d2d-5eac-4575-9n57-acd66cfa587g

1. 上傳部署我們的云函數 2. 報錯 錯誤信息為&#xff1a;Error: ResourceNotFound.Function, 未找到函數版本&#xff0c;請創建后再試。 (7f2d9d2d-5eac-4575-9b57-acd66cfa587e) 3. 原因 原因是可能我們在調試的時候不小心將我們開發控制臺中的云函數刪除了 4. 解決辦法…

前端面試題——HTML基礎篇

如何進行網站的性能優化 content方面 減少http請求 合并文件 css精靈圖減少 DNS 查詢 DNS緩存 將資源分布到恰當數量的主機名減少 DOM 元素的數量 Server方面 使用CDN配置Etag對組件使用 Gzip 壓縮 Cookie方面 減小cookie大小 css方面 將樣式表放到頁面頂部不使用css表…

【IT界的廚子】醬香鱸魚

食材: 前世曾經回眸的鱸魚一條(主要選刺少的魚&#xff0c;適合孩子吃&#xff0c;大人吃隨意&#xff0c;草魚比較大) 五花肉少許(肥一些的) 豆腐 輔料: 蔥姜 蒜(選) 大料 香菜 調味: 啤酒(兩罐) 黃豆醬或豆瓣醬(選) 老抽 生抽 料酒 鹽 步驟: 1、魚肉劃開&#xff0c;方便燉的…