?
?
實現
package cn.itcast.akkaimport akka.actor.{Actor, ActorSystem, Props} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactoryimport scala.collection.mutableimport scala.concurrent.duration._ class Master(val host: String, val port: Int) extends Actor {//保存WorkerID 到 WorkerInfo的映射val idToWorker = new mutable.HashMap[String, WorkerInfo]()//保存所的WorkerInfo信息val workers = new mutable.HashSet[WorkerInfo]()val CHECK_INTERVAL = 15000override def preStart(): Unit = {//導入隱式轉換import context.dispatchercontext.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)}override def receive: Receive = {//Worker發送個Mater的注冊消息case RegisterWorker(workerId, cores, memory) => {if (!idToWorker.contains(workerId)) {//封裝worker發送的信息val workerInfo = new WorkerInfo(workerId, cores, memory)//保存workerInfoidToWorker(workerId) = workerInfoworkers += workerInfo//Master向Worker反饋注冊成功的消息sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}@$host:$port/user/${Master.MASTER_NAME}")}}//Worker發送給Master的心跳信息case Heartbeat(workerId) => {if (idToWorker.contains(workerId)) {val workerInfo = idToWorker(workerId)val currentTime = System.currentTimeMillis()//更新上一次心跳時間workerInfo.lastHeartbeatTime = currentTime}}//檢測超時的Workercase CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val deadWorkers: mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)// for(w <- deadWorkers) {// idToWorker -= w.id// workers -= w// }deadWorkers.foreach(w => {idToWorker -= w.idworkers -= w})println("alive worker size : " + workers.size)}} }object Master {val MASTER_SYSTEM = "MaterActorSystem"val MASTER_NAME = "Master"def main(args: Array[String]) {// val host = args(0)// val port = args(1).toIntval host = "127.0.0.1"val port = 8888val confStr =s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//ActorSystem是單例的,用于創建Acotor并監控actorval actorSystem = ActorSystem(MASTER_SYSTEM, conf)//通過ActorSystem創建ActoractorSystem.actorOf(Props(new Master(host, port)), MASTER_NAME)actorSystem.awaitTermination()} }
package cn.itcast.akka trait Message extends Serializable//Worker -> Master case class RegisterWorker(id: String, cores: Int, memory: Int) extends Message//Master -> Worker case class RegisteredWorker(masterUrl: String) extends Message//Worker -> Master case class Heartbeat(id: String) extends Message//Worker internal message case object SendHeartbeat//Master internal message case object CheckTimeOutWorker
package cn.itcast.akkaimport java.util.UUID import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._ class Worker(val cores: Int, val memory: Int, val masterHost: String, val masterPort: Int) extends Actor {//Master的引用var master: ActorSelection = _//Worker的IDval workerId = UUID.randomUUID().toString//masterUrlvar masterUrl: String = _val HEARTBEAT_INTERVAL = 10000//preStart在構造器之后receive之前執行override def preStart(): Unit = {//首先跟Master建立連接master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_NAME}")//通過master的引用向Master發送注冊消息master ! RegisterWorker(workerId, cores, memory)}override def receive: Receive = {//Master發送給Worker注冊成功的消息case RegisteredWorker(masterUrl) => {this.masterUrl = masterUrl//啟動定時任務,向Master發送心跳//導入隱式轉換import context.dispatchercontext.system.scheduler.schedule(0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat)}case SendHeartbeat => {//向Master發送心跳master ! Heartbeat(workerId)}} }object Worker {def main(args: Array[String]) {//Worker的地址和端口// val host = args(0)// val port = args(1).toInt// val cores = args(2).toInt// val memory = args(3).toIntval host = "127.0.0.1"val port = 9999val cores = 8val memory = 1024//Master的地址和端口// val masterHost = args(4)// val masterPort = args(5).toIntval masterHost = "127.0.0.1"val masterPort = 8888val confStr =s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//單例的ActorSystemval actorSystem = ActorSystem("WorkerActorSystem", conf)//通過actorSystem來創建Actorval worker = actorSystem.actorOf(Props(new Worker(cores, memory, masterHost, masterPort)), "Worker")actorSystem.awaitTermination()} }
package cn.itcast.akka class WorkerInfo(val id: String, val cores: Int, val memory: Int) {//TODOvar lastHeartbeatTime: Long = _}
?
?