參考資料
- https://doc.akka.io/libraries/akka-core/current/typed/actors.html#first-example
關于scala語法的注意事項
extends App
是個語法糖,等同于直接在伴生對象中編寫main 方法- 對象是通過apply方法創建的,也可以通過對象的名稱單獨創建(此時實際上會調用apply方法)
- case class 樣例類用于定義不可變類,可以用于模式匹配
- trait類似接口但是可以包括抽象方法,具體方法,子類。帶有sealed表示只能在定義它的同一個文件中被繼承,常用于更加安全的模式匹配,例如如果消息類型為sealed trait,則actor可以安全接受多種消息。
helloworld示例
代碼的整體示意圖如下
- HelloWorldMain創建ActorSystem,作為一個actorref指向HelloWorldMain actor。使用此引用向HelloWorldMain actor發送SayHello消息
- HelloWorldMain actor初始化Helloworld actor和HelloWorldBot,以及在收到SayHello消息后向HelloWorld actor發送Greet消息(其中帶有HelloWorldBot的actorref)
- HelloWorld actor收到消息后向HelloWorldBot發送Greeted消息
- HelloWorldBot actor收到消息后greetingCounter計數增加,并向HelloWorld actor返回Greet消息。當greetingCounter超過max時暫停行為。
代碼示例
//#imports
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors//#hello-world-actor
object HelloWorld {// 使用樣例類定義消息類型final case class Greet(whom: String, replyTo: ActorRef[Greeted])final case class Greeted(whom: String, from: ActorRef[Greet])// Behaviors.receive函數接收一個函數作為參數,{}是為了容納多行lambda表達式def apply(): Behavior[Greet] = Behaviors.receive { (context, message) =>context.log.info("Hello {}!", message.whom)message.replyTo ! Greeted(message.whom, context.self) // 向message.replyTo發送消息Greeted,其中context.self是自身的actorrefBehaviors.same // 設置后續的消息處理邏輯不變}
}//#hello-world-bot
object HelloWorldBot {def apply(max: Int): Behavior[HelloWorld.Greeted] = {bot(0, max)}private def bot(greetingCounter: Int, max: Int): Behavior[HelloWorld.Greeted] =Behaviors.receive { (context, message) =>val n = greetingCounter + 1context.log.info("Greeting {} for {}", n, message.whom)if (n == max) {Behaviors.stopped // 到達max次數后停止行為,避免無限循環} else {message.from ! HelloWorld.Greet(message.whom, context.self)bot(n, max)}}
}//#hello-world-main
object HelloWorldMain {final case class SayHello(name: String)def apply(): Behavior[SayHello] =Behaviors.setup { context =>val greeter = context.spawn(HelloWorld(), "greeter") // 初始化HelloWorldBehaviors.receiveMessage { message => // 收到消息后創建HelloWorldBotval replyTo = context.spawn(HelloWorldBot(max = 3), message.name)greeter ! HelloWorld.Greet(message.name, replyTo)Behaviors.same}}def main(args: Array[String]): Unit = {val system: ActorSystem[HelloWorldMain.SayHello] =ActorSystem(HelloWorldMain(), "hello")system ! HelloWorldMain.SayHello("World")Thread.sleep(3000)system.terminate()}
}
chatroom示例
代碼的整體示意圖如下
- Main啟動后,初始化chatRoom和Gabbler客戶端。向ChatRoom發送GetSession消息(帶有client actorref)
- chatRoom創建session actor,用來隔離會話
- chatRoom向client發送SessionGranted消息(帶有session actorref)
- client(Gabbler)收到SessionGranted后向session actor發送PostMessage消息
- session 收到SessionGranted后向room發送PublishSessionMessage
- room返回NotifyClient給session
- 然后按照NotifyClient(帶有MessagePosted)中的client actorref將MessagePosted轉發給特定的client
- client收到MessagePosted之后完成并推出
整體的思路
- ChatRoom Actor:作為中央樞紐,負責管理所有的會話(Sessions)。每個連接到聊天室的客戶端都會通過
GetSession
消息與 ChatRoom 交互,并獲得一個專屬的會話 Actor。 - Session Actor:每個客戶端都有一個對應的 Session Actor,用于處理該客戶端的消息收發、保持客戶端狀態等。
- client(Gabbler)Actor:模擬客戶端行為,可以發送消息給 ChatRoom 或者其他客戶端。
- 客戶端間通信:通過 ChatRoom 轉發消息來實現客戶端間的通信。當一個客戶端發送消息時,它實際上是將消息發送給了 ChatRoom,然后由 ChatRoom 將消息廣播給所有其他在線的客戶端。
代碼示例
定義消息,這里實際上等同于定義actor之間的通信協議
- RoomCommand,用來獲取session
- SessionEvent,用來管理session和發送message
- SessionCommand,用來發送message和通知client
object ChatRoom {sealed trait RoomCommandfinal case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) extends RoomCommandsealed trait SessionEventfinal case class SessionGranted(handle: ActorRef[PostMessage]) extends SessionEventfinal case class SessionDenied(reason: String) extends SessionEventfinal case class MessagePosted(screenName: String, message: String) extends SessionEventsealed trait SessionCommandfinal case class PostMessage(message: String) extends SessionCommandprivate final case class NotifyClient(message: MessagePosted) extends SessionCommand
}
ChatRoom actor部分
object ChatRoom {// PublishSessionMessage消息將包含的ChatRoom消息傳播到所有連接的客戶端private final case class PublishSessionMessage(screenName: String, message: String) extends RoomCommanddef apply(): Behavior[RoomCommand] =chatRoom(List.empty)private def chatRoom(sessions: List[ActorRef[SessionCommand]]): Behavior[RoomCommand] =Behaviors.receive { (context, message) =>message match {// 如果收到GetSession,create a child actor for further interaction with the clientcase GetSession(screenName, client) =>val ses = context.spawn(session(context.self, screenName, client),name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name))client ! SessionGranted(ses)chatRoom(ses :: sessions) // ::用于將ses添加到sessions頭。由于Akka 的行為是不可變的(每次更改狀態都必須返回一個新的 behavior),所以通常通過 遞歸函數 + 參數攜帶狀態 的方式來模擬“狀態變化”// 如果接收到 PublishSessionMessage 就向所有的session發送notification,每個session都帶有client內容。等于是申請chatroom允許發送case PublishSessionMessage(screenName, message) =>val notification = NotifyClient(MessagePosted(screenName, message))sessions.foreach(_ ! notification) // 將消息轉發給session中的所有clientBehaviors.same}}// 用于創建session actor,接受SessionCommand消息private def session(room: ActorRef[PublishSessionMessage],screenName: String,client: ActorRef[SessionEvent]): Behavior[SessionCommand] =Behaviors.receiveMessage {// 向room中的所有其他用戶發送消息case PostMessage(message) =>room ! PublishSessionMessage(screenName, message)Behaviors.same// room發布消息通知clientcase NotifyClient(message) =>client ! messageBehaviors.same}
}
客戶端部分
object Gabbler {import ChatRoom._def apply(): Behavior[SessionEvent] =Behaviors.setup { context =>Behaviors.receiveMessage {case SessionDenied(reason) =>context.log.info("cannot start chat room session: {}", reason)Behaviors.stoppedcase SessionGranted(handle) =>handle ! PostMessage("Hello World!")Behaviors.samecase MessagePosted(screenName, message) =>context.log.info("message has been posted by '{}': {}", screenName, message)Behaviors.stopped}}
actorsystem入口
- 這里使用了Behaviors.setup。
Behaviors.setup
和Behaviors.receiveMessage
都是用于定義 Actor 行為的工廠方法,區別是Behaviors.setup
允許你在初始化階段訪問ActorContext
,而Behaviors.receiveMessage
不直接提供對上下文的訪問,專注于消息處理邏輯 Main
Actor,對應于傳統 Java 應用程序中的main
方法
object Main {def apply(): Behavior[NotUsed] =Behaviors.setup { context =>val chatRoom = context.spawn(ChatRoom(), "chatroom")val gabblerRef = context.spawn(Gabbler(), "gabbler")context.watch(gabblerRef) //監控gabbler actor,如果gabbler 終止了,當前 Actor 將收到一個 Terminated(gabblerRef) 信號chatRoom ! ChatRoom.GetSession("ol’ Gabbler", gabblerRef)// 處理 Terminated 信號Behaviors.receiveSignal {case (_, Terminated(_)) =>Behaviors.stopped}}def main(args: Array[String]): Unit = {ActorSystem(Main(), "ChatRoomDemo")}
}
運行結果如下,按照預期邏輯,目前只有一個client,并且發送消息收到響應后推出