使用Scala,Play和Akka連接到RabbitMQ(AMQP)

在本文中,我們將研究如何從Scala連接到RabbitMQ,以便可以從應用程序中支持AMQP協議。 在此示例中,我將使用Play Framework 2.0作為容器(有關更多信息,請參閱我在該主題上的其他文章 )在其中運行應用程序,因為Play使得使用Scala進行開發變得更加容易。 本文還將使用Akka actor發送和接收RabbitMQ的消息。

什么是AMQP

首先,快速介紹AMQP。 AMQP代表“高級消息隊列協議”,是消息傳遞的開放標準。 AMQP 主頁聲明其愿景是:“成為所有消息中間件之間互操作性的標準協議”。 AMQP定義了用于交換消息的傳輸級別協議,該協議可用于集成來自許多不同平臺,語言和技術的應用程序。
有許多工具可以實現此協議,但是RabbitMQ引起了越來越多的關注。 RabbitMQ是使用AMQP的基于Erlang的開源消息代理。 所有會說AMQP的應用程序都可以連接并使用RabbitMQ。 因此,在本文中,我們將展示如何將基于Play2 / Scala / Akka的應用程序連接到RabbitMQ。 在本文中,我們將向您展示如何實現兩種最常見的方案:

  • 發送/接收:我們將配置一個發件人每隔幾秒鐘發送一條消息,并使用兩個偵聽器以循環方式從隊列中讀取消息。
  • 發布/訂閱:在此示例中,我們將創建幾乎相同的方案,但是這次,偵聽器將同時獲得消息。

我假設您已經安裝了RabbitMQ。 如果不是,請按照其網站上的說明進行操作。

設置基本的Play 2 / Scala項目

在此示例中,我創建了一個新的Play 2項目。 這樣做很容易:

jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ_            _ _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/ play! 2.0-RC2, http://www.playframework.orgThe new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQWhat is the application name? 
> PlayAndRabbitMQWhich template do you want to use for this new application? 1 - Create a simple Scala application2 - Create a simple Java application3 - Create an empty project> 1OK, application PlayAndRabbitMQ is created.Have fun!

我曾經使用scala-ide插件在Eclipse上工作,所以我執行play eclipsify并將項目導入Eclipse。
我們需要做的下一步是建立正確的依賴關系。 Play為此使用sbt,并允許您從項目目錄中的build.scala文件配置依賴項。 我們將添加的唯一依賴關系是RabbitMQ的Java客戶端庫。 即使Lift提供了一個基于Scala的AMQP庫,但我發現直接使用RabbitMQ也是一樣容易。 添加依賴項后,我的build.scala如下所示:

import sbt._
import Keys._
import PlayProject._object ApplicationBuild extends Build {val appName         = "PlayAndRabbitMQ"val appVersion      = "1.0-SNAPSHOT"val appDependencies = Seq("com.rabbitmq" % "amqp-client" % "2.8.1")val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings()
}

將rabbitMQ配置添加到配置文件

對于我們的示例,我們可以配置一些東西。 將消息發送到的隊列,要使用的交換以及運行RabbitMQ的主機。 在實際情況下,我們將需要設置更多的配置選項,但是在這種情況下,我們只有這三個。 將以下內容添加到您的application.conf中,以便我們可以從我們的應用程序中引用它。

#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1

現在,我們可以使用ConfigFactory訪問這些配置文件。 為了便于訪問,請創建以下對象:

object Config {val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}

初始化與RabbitMQ的連接

在研究如何使用RabbitMQ發送和接收消息之前,我們還有一個要定義的對象。 要使用RabbitMQ,我們需要一個連接。 我們可以使用ConnectionFactory獲得與服務器的連接。 查看javadocs以獲取有關如何配置連接的更多信息。

object RabbitMQConnection {private val connection: Connection = null;/*** Return a connection if one doesn't exist. Else create* a new one*/def getConnection(): Connection = {connection match {case null => {val factory = new ConnectionFactory();factory.setHost(Config.RABBITMQ_HOST);factory.newConnection();}case _ => connection}}
}

在應用程序啟動時啟動偵聽器

在查看RabbitMQ代碼之前,我們還需要做一件事。 我們需要確保在應用程序啟動時注冊了消息偵聽器,并且發件人開始發送。 播放2提供了
為此的GlobalSettings對象,您可以在應用程序啟動時擴展該對象以執行代碼。 對于我們的示例,我們將使用以下對象(請記住,該對象需要存儲在默認名稱空間中:

import play.api.mvc._
import play.api._
import rabbitmq.Senderobject Global extends GlobalSettings {override def onStart(app: Application) {Sender.startSending}
}

我們將在下面的部分中查看此Sender.startSending操作,該操作將初始化所有發送者和接收者。

設置發送和接收方案

讓我們看一下Sender.startSending代碼,該代碼將設置一個將msg發送到特定隊列的發送方。 為此,我們使用以下代碼:

object Sender {def startSending = {// create the connectionval connection = RabbitMQConnection.getConnection();// create the channel we use to sendval sendingChannel = connection.createChannel();// make sure the queue exists we want to send tosendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new SendingActor(channel = sendingChannel, queue = Config.RABBITMQ_QUEUE))), "MSG to Queue");}
}class SendingActor(channel: Channel, queue: String) extends Actor {def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish("", queue, null, msg.getBytes());Logger.info(msg);}case _ => {}}
}

在此代碼中,我們采取以下步驟:

  1. 使用工廠檢索到RabbitMQ的連接
  2. 在此連接上創建一個通道,用于與RabbitMQ通信
  3. 使用通道創建隊列(如果尚不存在)
  4. 安排Akka每秒向演員發送一條消息。

所有這些都應該非常簡單。 唯一(有點)復雜的部分是調度部分。 此調度操作的作用是這樣的。 我們告訴Akka安排要發送給演員的消息。 我們希望在發射之前有2秒的延遲,并且我們希望每秒重復一次此作業。 應該用于此的actor是SendingActor,您也可以在此清單中看到。 該參與者需要訪問通道以發送消息,并且該參與者還需要知道接收消息的位置。 這是隊列。
因此,該Actor每秒將收到一條消息,附加一個時間戳,并使用提供的通道將此消息發送到隊列:channel.basicPublish(“”,queue,null,msg.getBytes());。 現在我們每秒發送一條消息,在此隊列上有可以接收消息的偵聽器將是很好的。 為了接收消息,我們還創建了一個Actor,可以在特定隊列上無限期地進行監聽。

class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {// called on the initial rundef receive = {case _ => startReceving}def startReceving = {val consumer = new QueueingConsumer(channel);channel.basicConsume(queue, true, consumer);while (true) {// wait for the messageval delivery = consumer.nextDelivery();val msg = new String(delivery.getBody());// send the message to the provided callback function// and execute this in a subactorcontext.actorOf(Props(new Actor {def receive = {case some: String => f(some);}})) ! msg}}
}

這個actor比我們以前發送的actor要復雜一些。 當該參與者收到消息(消息的種類無關緊要)時,它將開始偵聽創建該消息的隊列。 它通過使用提供的通道創建使用者來實現此目的,并告訴使用者開始在指定隊列上偵聽。 Consumer.nextDelivery()方法將阻塞,直到消息在配置的隊列中等待。 收到消息后,將創建一個新的Actor,將消息發送到該Actor。 這個新的參與者將消息傳遞給提供的方法,您可以在其中放置業務邏輯。
要使用此偵聽器,我們需要提供以下參數:

  • 頻道:允許訪問RabbitMQ
  • 隊列:監聽消息的隊列
  • f:收到消息后我們將執行的功能。

第一個示例的最后一步是將所有內容粘合在一起。 為此,我們向Sender.startSending方法添加了幾個方法調用。

def startSending = {...val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);// create an actor that starts listening on the specified queue and passes the// received message to the provided callbackval callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);// setup the listener that sends to a specific queue using the SendingActorsetupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);...}private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");}

在此代碼中,您可以看到我們定義了一個回調函數,并使用此回調函數以及隊列和通道來創建ListeningActor。 我們使用scheduleOnce方法在單獨的線程中啟動此偵聽器。 現在,使用此代碼,我們可以運行應用程序(播放運行),打開localhost:9000來啟動應用程序,我們應該看到類似以下輸出的內容。

[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822

在這里,您可以清楚地看到消息的循環處理方式。

設置發布和訂閱方案

一旦我們運行了上面的代碼,添加發布/訂閱功能就變得非常簡單。 現在,我們使用PublishingActor代替SendingActor:

class PublishingActor(channel: Channel, exchange: String) extends Actor {/*** When we receive a message we sent it using the configured channel*/def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish(exchange, "", null, msg.getBytes());Logger.info(msg);}case _ => {}}
}

RabbitMQ使用交換來允許多個收件人接收相同的消息(以及許多其他高級功能)。 來自其他參與者的代碼唯一的變化是,這次我們將消息發送到交換而不是隊列。 偵聽器代碼完全相同,我們唯一需要做的就是將隊列連接到特定的交換機。 這樣,該隊列上的偵聽器就可以接收發送到交換機的消息。 我們再次根據之前使用的設置方法執行此操作。

...// create a new sending channel on which we declare the exchangeval sendingChannel2 = connection.createChannel();sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");// define the two callbacks for our listenersval callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);// create a channel for the listener and setup the first listenerval listenChannel1 = connection.createChannel();setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback3);// create another channel for a listener and setup the second listenerval listenChannel2 = connection.createChannel();setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback4);// create an actor that is invoked every two seconds after a delay of// two seconds with the message "msg"Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new PublishingActor(channel = sendingChannel2, exchange = Config.RABBITMQ_EXCHANGEE))), "MSG to Exchange");...

我們還為setupListener創建了一個重載方法,該方法作為一個附加參數,也接受要使用的交換的名稱。

private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {channel.queueBind(queueName, exchange, "");Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");}

在這小段代碼中,您可以看到我們將提供的隊列(在我們的示例中是一個隨機名稱)綁定到指定的交易所。 之后,我們將創建一個新的監聽器,如我們之前所見。
現在運行此代碼將產生以下輸出:

[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006

如您所見,在這種情況下,兩個偵聽器都收到相同的消息。 這幾乎涵蓋了本文的全部內容。 如您所見,為RabbitMQ使用基于Java的客戶端api綽綽有余,并且可以從Scala輕松使用。 請注意,盡管該示例尚未準備好投入生產,但您應注意關閉連接,并很好地關閉偵聽器和參與者。 這里沒有顯示所有關閉代碼。

參考:通過我們的JCG合作伙伴 Jos Dirksen在Smart Java博客上使用Scala,Play和Akka連接到RabbitMQ(AMQP) 。


翻譯自: https://www.javacodegeeks.com/2012/04/connect-to-rabbitmq-amqp-using-scala.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/372999.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/372999.shtml
英文地址,請注明出處:http://en.pswp.cn/news/372999.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

阿爾法貝塔閥原理_圖總結 - 阿爾法個貝塔 - 博客園

一.思維導圖二.概念筆記圖的存儲結構1. 鄰接矩陣定義:設圖G有n (n大于等于1) 個頂點,則鄰接矩陣是一個n階方陣。當矩陣中的 [i,j] !0(下標從1開始) ,代表其對應的第i個頂點與第j個頂點是連接的特點無向圖的鄰接矩陣是對稱矩陣,n個頂點的無向圖…

WebApi Post 后臺無法獲取參數的解決方案

事件回放: 之前一段時間,公司里前端用的Angularjs 發送http請求也是用的ng的組件,后臺是.Net的WebApi 前端 var data {PArgs: {PageIndex: 0,PageSize: 8,RowsCount: 0} };$http.post("/Api/Test/ABC", data).success(function (d…

南京大學計算機系周小莉,周會群

媒體報道:南京大學周會群:用計算機聰明地做實驗Q《中國教育網絡》A周會群Q:南京大學的高性能計算中心非常特殊,分布在物理,化學、天文、地球科學四個不同的學科中,為什么采取這種模式?A&#xf…

不要慫,就是GAN (生成式對抗網絡) (五):無約束條件的 GAN 代碼與網絡的 Graph...

GAN 這個領域發展太快,日新月異,各種 GAN 層出不窮,前幾天看到一篇關于 Wasserstein GAN 的文章,講的很好,在此把它分享出來一起學習:https://zhuanlan.zhihu.com/p/25071913。相比 Wasserstein GAN &#…

用于MyBatis CRUD操作的Spring MVC 3控制器

到目前為止,我們已經為域類“ User ”創建了CRUD數據庫服務,并且還將MyBatis配置與Spring Configuration文件集成在一起。 接下來,我們將使用Spring MVC創建一個網頁,以使用MyBatis CRUD服務對數據庫執行操作。 使用MyBatis 3創建…

2pin接口耳機_拆解報告:雷柏首款真無線耳機XS200

-----我愛音頻網拆解報告第185篇-----雷柏是一家歷史悠久的鼠標和鍵盤廠商,截至目前,雷柏(rapoo)總共出了四款耳機,此前曾推出過三款藍牙耳機, 分別是S500 藍牙立體聲麥克風耳機,S200 藍牙立體聲麥克風耳機&#xff0c…

html表單中陰影,html5中input表單加邊框,陰影效果.doc

文檔介紹:CSS:input:focus{border-color:#99;}獲取焦點時改變顏色focus能同時改變寬度長度背景色…….form,p(margin-bottom:30px;margin-left:20px;).shadow,.one,.two,.three,.four,.five,.six( height:50px; width:280px; border:C;).shadow( -moz-box-shadow:C;…

帶有GSON和抽象類的JSON

經過多年使用org.json庫在Java中支持JSON數據交換格式后,我已切換到Google Gson 。 org.json是一個較低級的庫,因此您必須創建JSONObject,JSONArray,JSONString等…并執行其他低級工作。 Gson簡化了這項工作。 它提供了簡單的toJs…

深入理解javascript原型和閉包(3)——prototype原型

轉載,原文地址http://www.cnblogs.com/wangfupeng1988/p/3978131.html 既typeof之后的另一位老朋友! prototype也是我們的老朋友,即使不了解的人,也應該都聽過它的大名。如果它還是您的新朋友,我估計您也是javascript的…

python 溫度 符號_Python通過小實例入門學習---1.0(溫度轉換)

1.安裝Python 3 下載地址: Welcome to Python.org?www.python.org 2.“溫度轉換”實例:攝氏度--->華氏度 / 華氏度--->攝氏度 TempConvert.py TempStr = input("請輸入帶有符號的溫度值:") if TempStr[-1] in ["f","F"]:C = (eval(Tem…

mysql 修改root密碼

1.找到配置文件my.ini ,然后將其打開,可以選擇用記事本打開 C:\Program Files (x86)\MySQL\MySQL Server 5.0 2.打開后,搜索mysqld關鍵字,找到后,在mysqld下面添加skip-grant-tables,保存退出。 PS&#x…

聯想計算機CDROM啟動,聯想電腦光驅啟動問題?

1、開機按del鍵或f2進入bios設置(不同主板按鍵不一樣,一般是DEL,也可能是F2,可以參考下主板說明),將計算機的啟動模式調成從光盤啟動。也就是從cdrom啟動,根據主板的不同,bios設置有所差異(一般是&#xff…

沒有J2EE容器的JNDI和JPA

我們希望通過盡可能簡單的設置來測試一些JPA代碼。 計劃僅使用Java和Maven,不使用應用程序服務器或其他J2EE容器。 我們的JPA配置需要兩件事才能成功運行: 數據庫來存儲數據, JNDI訪問數據庫。 這篇文章分為兩個部分。 第一部分顯示了如何…

string 大小寫轉換

STL的algorithm庫確實給我們提供了這樣的便利&#xff0c;使用模板函數transform可以輕松解決這個問題&#xff0c;開發人員只需要提供一個函數對象&#xff0c;例如將char轉成大寫的toupper函數或者小寫的函數tolower函數。 transform原型&#xff1a; 1 #include <string&…

linux服務器上svn的log_如何在 Centos 8 / RHEL 8 上安裝和配置 VNC 服務器 | Linux 中國...

在 Centos 8 和 RHEL 8 系統中&#xff0c;默認未安裝 VNC 服務器&#xff0c;它需要手動安裝。在本文中&#xff0c;我們將通過簡單的分步指南&#xff0c;介紹如何在 Centos 8 / RHEL 8 上安裝 VNC 服務器。-- Pradeep KumarVNC(虛擬網絡計算Virtual Network Computing)服務器…

怎么把網頁保存到本地計算機,在IE瀏覽器中,將網頁保存到本地計算機中,若只需保存其中的文字、超鏈接和表格信息,應該選擇的保存類型為( )...

2.(2017高一上東臺月考)閱讀下面一段資料&#xff0c;判斷在給出的幾種說法中不正確的是( )資料&#xff1a;IP電話與傳統電話IP電話是按國際互聯網協議規定的網絡技術內容開通的電話業務&#xff0c;中文翻譯為網絡電話或互聯網電話&#xff0c;它是利用國際互聯網Inetrnet為…

html_博客博主

csdn: 工匠若水 http://blog.csdn.net/yanbober yunama: IT藍豹&#xff1a;http://www.itlanbao.com/&#xff1b; http://ask.dcloud.net.cn/docs/; 博客園&#xff1a; https://www.cnblogs.com/guweiwei/category/965437.html轉載于:https://www.cnblogs.com/awkflf11/p/55…

Windows上的Java線程CPU分析

本文將為您提供一個教程&#xff0c;介紹如何在Windows OS上快速查明Java線程貢獻者與CPU嚴重問題有關。 Windows與Linux&#xff0c;Solaris和AIX等其他操作系統一樣&#xff0c;使您可以在進程級別監視CPU利用率&#xff0c;還可以監視在進程中執行任務的單個線程。 在本教程…

flask 繼承模版的基本使用1

轉載于:https://www.cnblogs.com/wanghaonull/p/6399492.html

東芝2303am維護清零_東芝打印機2303A怎樣清零

展開全部東芝e68a843231313335323631343130323136353331333365653137打印機是按照相關要求生產的正規產品&#xff0c;其清零方式與正規產品相同。因此此處將介紹常用的打印機清零方法。打印機清零一般分兩種&#xff1a;一種是手工清零&#xff0c;另一種是軟件清零。一、手工…