什么是AMQP
首先,快速介紹AMQP。 AMQP代表“高級消息隊列協議”,是消息傳遞的開放標準。 AMQP 主頁聲明其愿景是:“成為所有消息中間件之間互操作性的標準協議”。 AMQP定義了用于交換消息的傳輸級別協議,該協議可用于集成來自許多不同平臺,語言和技術的應用程序。
有許多工具可以實現此協議,但是RabbitMQ引起了越來越多的關注。 RabbitMQ是使用AMQP的基于Erlang的開源消息代理。 所有會說AMQP的應用程序都可以連接并使用RabbitMQ。 因此,在本文中,我們將展示如何將基于Play2 / Scala / Akka的應用程序連接到RabbitMQ。 在本文中,我們將向您展示如何實現兩種最常見的方案:
- 發送/接收:我們將配置一個發件人每隔幾秒鐘發送一條消息,并使用兩個偵聽器以循環方式從隊列中讀取消息。
- 發布/訂閱:在此示例中,我們將創建幾乎相同的方案,但是這次,偵聽器將同時獲得消息。
我假設您已經安裝了RabbitMQ。 如果不是,請按照其網站上的說明進行操作。
設置基本的Play 2 / Scala項目
在此示例中,我創建了一個新的Play 2項目。 這樣做很容易:
jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ_ _ _ __ | | __ _ _ _| |
| '_ \| |/ _' | || |_|
| __/|_|\____|\__ (_)
|_| |__/ play! 2.0-RC2, http://www.playframework.orgThe new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQWhat is the application name?
> PlayAndRabbitMQWhich template do you want to use for this new application? 1 - Create a simple Scala application2 - Create a simple Java application3 - Create an empty project> 1OK, application PlayAndRabbitMQ is created.Have fun!
我曾經使用scala-ide插件在Eclipse上工作,所以我執行play eclipsify并將項目導入Eclipse。
我們需要做的下一步是建立正確的依賴關系。 Play為此使用sbt,并允許您從項目目錄中的build.scala文件配置依賴項。 我們將添加的唯一依賴關系是RabbitMQ的Java客戶端庫。 即使Lift提供了一個基于Scala的AMQP庫,但我發現直接使用RabbitMQ也是一樣容易。 添加依賴項后,我的build.scala如下所示:
import sbt._
import Keys._
import PlayProject._object ApplicationBuild extends Build {val appName = "PlayAndRabbitMQ"val appVersion = "1.0-SNAPSHOT"val appDependencies = Seq("com.rabbitmq" % "amqp-client" % "2.8.1")val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings()
}
將rabbitMQ配置添加到配置文件
對于我們的示例,我們可以配置一些東西。 將消息發送到的隊列,要使用的交換以及運行RabbitMQ的主機。 在實際情況下,我們將需要設置更多的配置選項,但是在這種情況下,我們只有這三個。 將以下內容添加到您的application.conf中,以便我們可以從我們的應用程序中引用它。
#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1
現在,我們可以使用ConfigFactory訪問這些配置文件。 為了便于訪問,請創建以下對象:
object Config {val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}
初始化與RabbitMQ的連接
在研究如何使用RabbitMQ發送和接收消息之前,我們還有一個要定義的對象。 要使用RabbitMQ,我們需要一個連接。 我們可以使用ConnectionFactory獲得與服務器的連接。 查看javadocs以獲取有關如何配置連接的更多信息。
object RabbitMQConnection {private val connection: Connection = null;/*** Return a connection if one doesn't exist. Else create* a new one*/def getConnection(): Connection = {connection match {case null => {val factory = new ConnectionFactory();factory.setHost(Config.RABBITMQ_HOST);factory.newConnection();}case _ => connection}}
}
在應用程序啟動時啟動偵聽器
在查看RabbitMQ代碼之前,我們還需要做一件事。 我們需要確保在應用程序啟動時注冊了消息偵聽器,并且發件人開始發送。 播放2提供了
為此的GlobalSettings對象,您可以在應用程序啟動時擴展該對象以執行代碼。 對于我們的示例,我們將使用以下對象(請記住,該對象需要存儲在默認名稱空間中:
import play.api.mvc._
import play.api._
import rabbitmq.Senderobject Global extends GlobalSettings {override def onStart(app: Application) {Sender.startSending}
}
我們將在下面的部分中查看此Sender.startSending操作,該操作將初始化所有發送者和接收者。
設置發送和接收方案
讓我們看一下Sender.startSending代碼,該代碼將設置一個將msg發送到特定隊列的發送方。 為此,我們使用以下代碼:
object Sender {def startSending = {// create the connectionval connection = RabbitMQConnection.getConnection();// create the channel we use to sendval sendingChannel = connection.createChannel();// make sure the queue exists we want to send tosendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new SendingActor(channel = sendingChannel, queue = Config.RABBITMQ_QUEUE))), "MSG to Queue");}
}class SendingActor(channel: Channel, queue: String) extends Actor {def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish("", queue, null, msg.getBytes());Logger.info(msg);}case _ => {}}
}
在此代碼中,我們采取以下步驟:
- 使用工廠檢索到RabbitMQ的連接
- 在此連接上創建一個通道,用于與RabbitMQ通信
- 使用通道創建隊列(如果尚不存在)
- 安排Akka每秒向演員發送一條消息。
所有這些都應該非常簡單。 唯一(有點)復雜的部分是調度部分。 此調度操作的作用是這樣的。 我們告訴Akka安排要發送給演員的消息。 我們希望在發射之前有2秒的延遲,并且我們希望每秒重復一次此作業。 應該用于此的actor是SendingActor,您也可以在此清單中看到。 該參與者需要訪問通道以發送消息,并且該參與者還需要知道接收消息的位置。 這是隊列。
因此,該Actor每秒將收到一條消息,附加一個時間戳,并使用提供的通道將此消息發送到隊列:channel.basicPublish(“”,queue,null,msg.getBytes());。 現在我們每秒發送一條消息,在此隊列上有可以接收消息的偵聽器將是很好的。 為了接收消息,我們還創建了一個Actor,可以在特定隊列上無限期地進行監聽。
class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {// called on the initial rundef receive = {case _ => startReceving}def startReceving = {val consumer = new QueueingConsumer(channel);channel.basicConsume(queue, true, consumer);while (true) {// wait for the messageval delivery = consumer.nextDelivery();val msg = new String(delivery.getBody());// send the message to the provided callback function// and execute this in a subactorcontext.actorOf(Props(new Actor {def receive = {case some: String => f(some);}})) ! msg}}
}
這個actor比我們以前發送的actor要復雜一些。 當該參與者收到消息(消息的種類無關緊要)時,它將開始偵聽創建該消息的隊列。 它通過使用提供的通道創建使用者來實現此目的,并告訴使用者開始在指定隊列上偵聽。 Consumer.nextDelivery()方法將阻塞,直到消息在配置的隊列中等待。 收到消息后,將創建一個新的Actor,將消息發送到該Actor。 這個新的參與者將消息傳遞給提供的方法,您可以在其中放置業務邏輯。
要使用此偵聽器,我們需要提供以下參數:
- 頻道:允許訪問RabbitMQ
- 隊列:監聽消息的隊列
- f:收到消息后我們將執行的功能。
第一個示例的最后一步是將所有內容粘合在一起。 為此,我們向Sender.startSending方法添加了幾個方法調用。
def startSending = {...val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);// create an actor that starts listening on the specified queue and passes the// received message to the provided callbackval callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);// setup the listener that sends to a specific queue using the SendingActorsetupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);...}private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");}
在此代碼中,您可以看到我們定義了一個回調函數,并使用此回調函數以及隊列和通道來創建ListeningActor。 我們使用scheduleOnce方法在單獨的線程中啟動此偵聽器。 現在,使用此代碼,我們可以運行應用程序(播放運行),打開localhost:9000來啟動應用程序,我們應該看到類似以下輸出的內容。
[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822
在這里,您可以清楚地看到消息的循環處理方式。
設置發布和訂閱方案
一旦我們運行了上面的代碼,添加發布/訂閱功能就變得非常簡單。 現在,我們使用PublishingActor代替SendingActor:
class PublishingActor(channel: Channel, exchange: String) extends Actor {/*** When we receive a message we sent it using the configured channel*/def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish(exchange, "", null, msg.getBytes());Logger.info(msg);}case _ => {}}
}
RabbitMQ使用交換來允許多個收件人接收相同的消息(以及許多其他高級功能)。 來自其他參與者的代碼唯一的變化是,這次我們將消息發送到交換而不是隊列。 偵聽器代碼完全相同,我們唯一需要做的就是將隊列連接到特定的交換機。 這樣,該隊列上的偵聽器就可以接收發送到交換機的消息。 我們再次根據之前使用的設置方法執行此操作。
...// create a new sending channel on which we declare the exchangeval sendingChannel2 = connection.createChannel();sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");// define the two callbacks for our listenersval callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);// create a channel for the listener and setup the first listenerval listenChannel1 = connection.createChannel();setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback3);// create another channel for a listener and setup the second listenerval listenChannel2 = connection.createChannel();setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback4);// create an actor that is invoked every two seconds after a delay of// two seconds with the message "msg"Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new PublishingActor(channel = sendingChannel2, exchange = Config.RABBITMQ_EXCHANGEE))), "MSG to Exchange");...
我們還為setupListener創建了一個重載方法,該方法作為一個附加參數,也接受要使用的交換的名稱。
private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {channel.queueBind(queueName, exchange, "");Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");}
在這小段代碼中,您可以看到我們將提供的隊列(在我們的示例中是一個隨機名稱)綁定到指定的交易所。 之后,我們將創建一個新的監聽器,如我們之前所見。
現在運行此代碼將產生以下輸出:
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006
如您所見,在這種情況下,兩個偵聽器都收到相同的消息。 這幾乎涵蓋了本文的全部內容。 如您所見,為RabbitMQ使用基于Java的客戶端api綽綽有余,并且可以從Scala輕松使用。 請注意,盡管該示例尚未準備好投入生產,但您應注意關閉連接,并很好地關閉偵聽器和參與者。 這里沒有顯示所有關閉代碼。
參考:通過我們的JCG合作伙伴 Jos Dirksen在Smart Java博客上使用Scala,Play和Akka連接到RabbitMQ(AMQP) 。
翻譯自: https://www.javacodegeeks.com/2012/04/connect-to-rabbitmq-amqp-using-scala.html