RabbitMQ實例教程:發布/訂閱者消息隊列

消息交換機(Exchange)


  RabbitMQ消息模型的核心理念是生產者永遠不會直接發送任何消息給隊列,一般的情況生產者甚至不知道消息應該發送到哪些隊列。


wKiom1YZeRCgrXh9AAA8_sKBnCU136.jpg

  相反的,生產者只能發送消息給交換機(Exchange)。交換機的作用非常簡單,一邊接收從生產者發來的消息,另一邊把消息推送到隊列中。交換機必須清楚的知道消息如何處理它收到的每一條消息。是否應該追加到一個指定的隊列?是否應該追加到多個隊列?或者是否應該丟棄?這些規則通過交換機的類型進行定義。


  交換機的類型有:direct,topic,headers 和 fanout。我們以fanout為例創建一個“logs”類型的交換機。


1
channel.exchangeDeclare("logs",?"fanout");


  fanout交換機非常簡單,它會廣播它收到的所有隊列的所有消息。


  交換機命名


  在前面的例子中,我們不了解交換機的任何概念,也能發送消息,這是因為我們使用了默認的交換機(""),但以后可以使用我們自定義的交換機了。


1
2
channel.basicPublish("",?"hello",?null,?message.getBytes());?//空字符串交換機
channel.basicPublish(?"logs",?"",?null,?message.getBytes());?//logs交換機


  臨時隊列(Temporary Queues)


  在前面的例子中,我們為隊列都指定了具體的名字(如hello和task_queue),給隊列命名是非常重要的事情,因為生產者和消費者是隊列名稱來傳遞消息的。


  但是對于日志來說的消息隊列,我們會監聽所有的日志消息,而不是其中的一些子集。而且我們只關注當前發生的消息而不是歷史消息,要解決這些問題需要這么做:


  首先,當我們連接Rabbit服務器時,我們需要一個新的空隊列。我們可以自己隨機生成一個隊列名字或者讓服務器隨機生成一個隊列名字。


  其次,當消息消費者失去連接時,隊列應該自動刪除。


  在Java中,我們使用不帶參數的queueDeclare()方法創建一個非持久化的,唯一的,用后自動刪除的隊列。


1
String?queueName?=?channel.queueDeclare().getQueue();


  queueName可能是像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 這樣的隨機隊列名。


  消息綁定(Bindings)


  前面我們創建了一個fanout類型的交換機和隊列。現在需要告訴交換機發送消息到隊列。交換機和隊列之間的關系就是消息綁定(binding)。


wKioL1YZemfT9Am4AAA7uKydZt4912.jpg

  使用下面的代碼logs交換機會將消息傳遞給隊列。


1
channel.queueBind(queueName,?"logs",?"");


  將交換機和消息綁定放在一起


wKiom1YZetLSZ2z_AABbZcUcUF0159.jpg


  現在我們有一個提交日志的的消息生產者,它與我們之前的消息發送者并沒有太大的區別,唯一不同的地方是我們將消息發送到 logs 交換機,而不是沒有名字的交換機。當發送消息時,我們需要提供一個路由,盡管它在 fanout 交換機中并沒有什么作用。下面是提交日志的Java代碼。


  EmitLog.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package?com.favccxx.favrabbit;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
public?class?EmitLog?{
?private?static?final?String?EXCHANGE_NAME?=?"logs";
?public?static?void?main(String[]?argv)?throws?Exception?{
??ConnectionFactory?factory?=?new?ConnectionFactory();
??factory.setHost("localhost");
??Connection?connection?=?factory.newConnection();
??Channel?channel?=?connection.createChannel();
??channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");
??String[]?sendMsgs?=?{"I",?"saw",?"a",?"dog"};
??String?message?=?getMessage(sendMsgs);
??channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes("UTF-8"));
??System.out.println("?[x]?Sent?'"?+?message?+?"'");
??channel.close();
??connection.close();
?}
?private?static?String?getMessage(String[]?strings)?{
??if?(strings.length?<?1)
???return?"info:?Hello?World!";
??return?joinStrings(strings,?"?");
?}
?private?static?String?joinStrings(String[]?strings,?String?delimiter)?{
??int?length?=?strings.length;
??if?(length?==?0)
???return?"";
??StringBuilder?words?=?new?StringBuilder(strings[0]);
??for?(int?i?=?1;?i?<?length;?i++)?{
???words.append(delimiter).append(strings[i]);
??}
??return?words.toString();
?}
}



  正如上面所示,與消息服務器建立連接后,聲明了一個交換機,這是因為系統不允許發布到空交換機。?如果沒有隊列綁定到交換機的話,消息就會丟失,但我們不用擔心。如果沒有消費者監聽消息的話,我們就丟棄該消息。


  接收消息代碼ReceiveLogs.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package?com.favccxx.favrabbit;
import?java.io.IOException;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?com.rabbitmq.client.Consumer;
import?com.rabbitmq.client.DefaultConsumer;
import?com.rabbitmq.client.Envelope;
public?class?ReceiveLogs?{
?private?static?final?String?EXCHANGE_NAME?=?"logs";
?public?static?void?main(String[]?argv)?throws?Exception?{
??ConnectionFactory?factory?=?new?ConnectionFactory();
??factory.setHost("localhost");
??Connection?connection?=?factory.newConnection();
??Channel?channel?=?connection.createChannel();
??channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");
??String?queueName?=?channel.queueDeclare().getQueue();
??channel.queueBind(queueName,?EXCHANGE_NAME,?"");
??System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");
??Consumer?consumer?=?new?DefaultConsumer(channel)?{
???@Override
???public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,
?????byte[]?body)?throws?IOException?{
????String?message?=?new?String(body,?"UTF-8");
????System.out.println("?[x]?Received?'"?+?message?+?"'");
???}
??};
??channel.basicConsume(queueName,?true,?consumer);
?}
}



  測試數據


  運行幾個日志消息接收者實例,使用日志消息發送者發送消息,發現每個日志消息接收者都接收到同樣的數據,說明發布訂閱成功。


1
?[x]?Received?'I?saw?a?dog'




本文轉自 genuinecx 51CTO博客,原文鏈接:http://blog.51cto.com/favccxx/1701738,如需轉載請自行聯系原作者

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/453907.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/453907.shtml
英文地址,請注明出處:http://en.pswp.cn/news/453907.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

OAuth 2.0(網轉)

&#xff08;一&#xff09;背景知識 OAuth 2.0很可能是下一代的“用戶驗證和授權”標準&#xff0c;目前在國內還沒有很靠譜的技術資料。為了弘揚“開放精神”&#xff0c;讓業內的人更容易理解“開放平臺”相關技術&#xff0c;進而長遠地促進國內開放平臺領域的發展&#xf…

kafka 自動提交 和 手動提交

Consumer 需要向 Kafka 匯報自己的位移數據&#xff0c;這個匯報過程被稱為提交位移&#xff08;Committing Offsets&#xff09;。因為 Consumer 能夠同時消費多個分區的數據&#xff0c;所以位移的提交實際上是在分區粒度上進行的&#xff0c;即 Consumer 需要為分配給它的每…

axios vue 回調函數_vue中ajax請求與axios包完美處理

這次給大家帶來vue中ajax請求與axios包完美處理&#xff0c;vue中ajax請求與axios包處理的注意事項有哪些&#xff0c;下面就是實戰案例&#xff0c;一起來看一下。在vue中&#xff0c;經常會用到數據請求&#xff0c;常用的有&#xff1a;vue-resourse、axios今天我說的是axio…

用int還是用Integer?

昨天例行code review時大家有討論到int和Integer的比較和使用。 這里做個整理&#xff0c;發表一下個人的看法。【int和Integer的區別】int是java提供的8種原始類型之一&#xff0c;java為每個原始類型提供了封裝類&#xff0c;Integer是int的封裝類。int默認值是0&#xff0c;…

前端之 JavaScript 常用數據類型和操作

JavaScript 常用數據類型有&#xff1a;數字、字符串、布爾、Null、Undefined、對象 JavaScript 擁有動態類型 JavaScript 擁有動態類型。這意味著相同的變量可用作不同的類型 var x; // 此時x是undefined var x 1; // 此時x是數字 var x "Alex" …

mysql備份還原(視圖、存儲過程)

最近在備份還原mysql的時候發現&#xff0c;視圖還原報錯&#xff0c;無法創建視圖&#xff0c;在網上查了下資料&#xff0c;找到以下信息&#xff1a;1、如果備份的數據庫含有視圖,還原時需要把my.ini中的character-set改為latin1,才能夠還原視圖。2、還原后,需要把latin1改為…

有關javabean的說法不正確的是_關于 JavaBean, 下列敘述中不正確的是 ( ) 。_學小易找答案...

【填空題】在使用 URL 傳值時傳輸的數據只能是 類型。【簡答題】陶器是人類最偉大的發明,比四大發明更有意義,你如何認為?(手機上直接回答提交)【單選題】對于 ( ) 作用范圍的 Bean, 當客戶離開這個頁面時 JSP 引擎取消為客戶的該頁 面分配的 Bean, 釋放他所占的內存空間。【填…

Postgres中tuple的組裝與插入

1.相關的數據類型 我們先看相關的數據類型&#xff1a; HeapTupleData(src/include/access/htup.h) typedef struct HeapTupleData {uint32 t_len; /* length of *t_data */ItemPointerData t_self; /* SelfItemPointer */Oid t_tableOid; /* ta…

Python 自動生成環境依賴包 requirements

一、生成當前 python 環境 安裝的所有依賴包 1、命令 # cd 到項目路徑下&#xff0c;執行以下命令 pip freeze > requirements.txt# 或者使用如下命令 pip list --formatfreeze > requirements.txt 2、常見問題 1、中使用 pip freeze > requirements.txt 命令導出…

DenyHosts 加固centos系統安全

DenyHosts是Python語言寫的一個程序&#xff0c;它會分析sshd的日志文件&#xff08;/var/log/secure&#xff09;&#xff0c;當發現重 復的攻擊時就會記錄IP到/etc/hosts.deny文件&#xff0c;從而達到自動屏IP的功能 DenyHosts官方網站 http://denyhosts.sourceforge.net 下…

在windows xp下編譯出ffmpeg.exe

找了好多資料&#xff0c;把自己的編譯成功過程詳細敘述&#xff0c;以避免后來者可以少浪費點時間。 1.安裝MSys 到http://sourceforge.net/project/showfiles.php?group_id2435下載文件&#xff1a;   bash-3.1-MSYS-1.0.11-tar.bz2   msysCORE-1.0.11-2007.01.19-1.ta…

手機uc怎么放大頁面_手機網站怎樣做可以提高用戶體驗度?——竹晨網絡

目前&#xff0c;手機已經占據了人們大多數的閑暇時間&#xff0c;互聯網的流量開始逐漸向移動端傾斜&#xff0c;重視移動端的用戶體驗&#xff0c;就可以給客戶端增加很多意想不到的功能。但是還是有很多公司和站長不知道手機網站應該怎么建才能符合用戶的使用習慣。下面&…

科技申報項目總結

這個項目分為三大模塊&#xff0c;管理員&#xff0c;專家以及單位模塊&#xff0c;具體頁面有&#xff1a;1單位信息&#xff1b;2項目申報&#xff1b;3專家信息&#xff1b;4項目評審&#xff1b;5 項目信息&#xff1b;6申報設置&#xff1b;7專家信息。 —-項目框架SSM&am…

kafka 異常:ERROR Failed to clean up log for __consumer_offsets-30 in dir /tmp/kafka-logs due to IOExce

問題概述 kafka進程不定期掛掉。ERROR Failed to clean up log for __consumer_offsets-30 in dir /tmp/kafka-logs due to IOException (kafka.server.LogDirFailureChannel)&#xff0c;報錯如下 [2020-12-07 16:12:36,803] ERROR Failed to clean up log for __consumer_o…

樹形控件(CTreeCtrl和CTreeView)

如何插入數據項目&#xff1f;如何添加鼠標右擊事件&#xff1f;插入數據項 通過InsertItem()方法&#xff0c;有四種重載樣式: HTREEITEM InsertItem(LPTVINSERTSTRUCT lpInsertStruct); HTREEITEM InsertItem(UINT nMask, LPCTSTR lpszItem, int nImage,int nSelectedImage, …

ffmpeg編譯(生成Windows或Win32平臺dll, lib)

ffmpeg編譯(生成Windows或Win32平臺dll, lib) 介紹&#xff1a;本文簡要介紹通過cygwin環境來編譯生成ffmpeg。 包括解碼組件libfaad與libopencore-amrnb的編譯。 1)安裝msys mingw環境 具體安裝過程可以看網上教程 我用的是&#xff1a;http://code.google.com/p/msys-cn/ 假…

2019python課件_2019版經典Python學習路線分享

Python有三大神器&#xff0c;包括numpy,scipy,matplotlib,因此適合用于數據處理。spark&#xff0c;Hadoop都開了Python的接口&#xff0c;所以使用Python做Python的mapreduce也非常簡單。因此它也備受歡迎&#xff0c;python學習大綱分享給大家。一、Python基礎1.2數據的存儲…

UML之涉眾/參與者(角色/執行者)(Actor)/業務主角(BusinessActor)/業務工人(BusinessWorker)/用戶/角色辨析【圖解】...

參考文檔&#xff1a; 【業務建模】(http://www.baike.com/wiki/%E4%B8%9A%E5%8A%A1%E5%BB%BA%E6%A8%A1) 【UML 核心元素之參與者】(http://www.voidcn.com/article/p-obarwwaq-tp.html) 【UML核心元素之參與者】(http://www.voidcn.com/article/p-ntpnhoue-da.html)轉載于:htt…

git 報錯:Please make sure you have the correct access rights and the repository exists

提示&#xff1a;Warning: Permanently added gitee.com,120.55.226.24 (ECDSA) to the list of known hosts.是公鑰出問題了&#xff0c;要先設置用戶和郵箱再重新生成ssh公鑰即可。 1、首先我得重新在git設置一下身份的名字和郵箱 進入到需要提交的文件夾底下&#xff08;…

java 實現excel 導出功能

實現功能&#xff1a;java導出excel表 1、jsp代碼 1 <form id"zhanwForm" action"<%path%>/conferences.do?" target"_self" method"get" > 2 <input type"hidden" name"method" value…