RabbitMQ學習總結(5)——發布和訂閱實例詳解

2019獨角獸企業重金招聘Python工程師標準>>> hot3.png

一、Publish/Subscribe(發布/訂閱)(using the Java Client)

 在前面的教程中,我們創建了一個work Queue(工作隊列)。工作隊列背后的假設是每個任務是交付給一個工作者(worker) 也就是均勻分給每個消費者。在本部分,我們將做一些完全不同的事情,我們將提供一個消息到多個消費者。這種模式被稱為“發布/訂閱”。
 為了說明這個模式,我們將構建一個簡單的日志系統。它將包括兩個項目:
第一個將發出日志消息 第二個將接收并打印它們。
 在我們的日志系統,每運行一次,接收器項目將得到消息的副本。這樣我們能夠運行一個接收機并且可以直接記錄到磁盤,同時我們可以運行另一個接收器,看到屏幕上的日志。  注:從本質上講,發表日志消息廣播給所有的接收者。
 下面讓我們腦中帶幾個問題,讓我們一步一步去解決:
  
如果我把消息分配給所有的消費者,我們將怎么做呢?

二、Exchanges(交換機)

在前部分的教程中,我們從一個隊列發送和接收消息。現在是時候讓Rabbit推出完整的消息模型。
讓我們快速復習我們前面的教程::
  • 生產者是一個用戶發送消息的應用程序。
  • 一個隊列是存儲消息的緩沖區。
  • 消費者是一個用戶應用程序接收消息。
RabbitMQ的消息模型的核心思想是,生產者從未直接向隊列發送任何消息。實際上,經常生產者甚至不知道消息是否會被運送到任何隊列。
相反,生產者只能發送Exchanges (消息交換區)。交換是一個非常簡單的事情。 一方面它從生產者那收到消息并推他們到另一邊隊列。交換區必須知道如何處理它收到一條消息:
  1. 它應該被加到一個特定的隊列嗎?
  2. 它應該被加到多隊列?
  3. 或者它應該丟棄嗎?
交換的規則定義的類型。
如上圖所示:X表示Exchange(交換機);
有一些可用的交換類型direct,?topic,?headers?and?fanout。我們將專注于最后一個——fanout。讓我們創建一個這種類型的交換,稱之為日志:
channel.exchangeDeclare("logs", "fanout");
fanout交換非常簡單。你大概可以猜到的名字,只是廣播所有的消息接收隊列它知道。而這正是我們需要為我們的記錄器。
問題:
①? exchange list?列出所有?(交換機)列表
sudo rabbitmqctl list_exchanges
Listing exchanges ...direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.
在此列表中有一些amq* 交換器 與默認(匿名)交換。這些都是默認創建的,但可能你不需要使用它們。
② 缺省名字的 exchange(交換機)
在前部分的教程中我們對exchange 一無所知,,但仍然能夠將消息發送到隊列。這是可能的,因為我們是使用一個 默認的交換,我們確定的空字符串(" ")
記得之前我們發布一個消息:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數是該交換區的名稱;空字符串表示默認或無名的交換,:如果routingKey存在的話,消息路由到指定的隊列的名稱。
現在,我們可以發布我們的交換器:
channel.basicPublish( "logs", "", null, message.getBytes());

三、Temporary queues(臨時隊列)

	你可能記得以前我們使用的隊列都是指定名稱的(還記得hello和task_queue嗎?)。對我們來說命名一個隊列是至關重要的,
 當你想在生產者和消費者中分享隊列的時候,給一個隊列的名稱是必須的。
但是那些都不是日志記錄系統所需要的,我們希望能夠獲得所有的日志信息,而不只是其中的一部分,而且我們只對當前正在傳遞的信息感興趣,對舊的日志信息不感興趣,要解決這些問題,我們需要分兩個步驟:
首先當我們鏈接到RabbitMQ服務器的時候,需要一個新的、空的隊列,為了做到這點,可以創建一個隨機名的隊列, 或者更好的方法就是讓服務器選擇一個隨機的隊列名。 其次,當斷開與隊列的連接時,消費者應該被自動刪除掉 在Java客戶端,我們通過一個無參數的queueDeclare()方法為我們創建一個非持久的、唯一的、能自動刪除的隊列與隊列名稱
String queueName = channel.queueDeclare().getQueue();
在這點上,queueName包含了一個隨機隊列名稱。例如它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

四、Bindings(綁定)

我們已經創建了一個fanout exchange和一個隊列,現在我們需要告訴exchange去發送消息到隊列中,exchange和隊列之間的關系被稱為一個綁定(binding)
channel.queueBind(queueName, "logs", "");
注意:從現在開始我們從logs exchange將被添加消息到隊列中,使用rabbitmqctl list_bingdins能列出所有的綁定。

五、Putting it all together(發布者/訂閱者 實現)

生產者代碼和之前的發送消息的代碼并沒有太大的區別,最重要的變化是,我們現在要將發布的消息傳遞給logs exchange來代替無名的exchange(之前的是"")
在發送消息時需要提供一個routingKey,它對于fanout exchange是非常重要的,不能被忽視的,這里的EmitLog.java代碼如下
[java]? view plain copy print ? 在CODE上查看代碼片 派生到我的代碼片
  1. </pre><pre?name="code"?class="java">import?java.io.IOException;??
  2. import?com.rabbitmq.client.ConnectionFactory;??
  3. import?com.rabbitmq.client.Connection;??
  4. import?com.rabbitmq.client.Channel;??
  5. ??
  6. public?class?EmitLog?{??
  7. ??
  8. ????private?static?final?String?EXCHANGE_NAME?=?"logs";??
  9. ??
  10. ????public?static?void?main(String[]?argv)??
  11. ??????????????????throws?java.io.IOException?{??
  12. ??
  13. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  14. ????????factory.setHost("localhost");??
  15. ????????Connection?connection?=?factory.newConnection();??
  16. ????????Channel?channel?=?connection.createChannel();??
  17. ??
  18. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");??
  19. ??
  20. ????????String?message?=?getMessage(argv);??
  21. ??
  22. ????????channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes());??
  23. ????????System.out.println("?[x]?Sent?'"?+?message?+?"'");??
  24. ??
  25. ????????channel.close();??
  26. ????????connection.close();??
  27. ????}??
  28. ????//...??
  29. }??

接收端:
[java]? view plain copy print ? 在CODE上查看代碼片 派生到我的代碼片
  1. import?java.io.IOException;??
  2. import?com.rabbitmq.client.ConnectionFactory;??
  3. import?com.rabbitmq.client.Connection;??
  4. import?com.rabbitmq.client.Channel;??
  5. import?com.rabbitmq.client.QueueingConsumer;??
  6. ??
  7. public?class?ReceiveLogs?{??
  8. ??
  9. ????private?static?final?String?EXCHANGE_NAME?=?"logs";??
  10. ??
  11. ????public?static?void?main(String[]?argv)??
  12. ??????????????????throws?java.io.IOException,??
  13. ??????????????????java.lang.InterruptedException?{??
  14. ??
  15. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  16. ????????factory.setHost("localhost");??
  17. ????????Connection?connection?=?factory.newConnection();??
  18. ????????Channel?channel?=?connection.createChannel();??
  19. ??
  20. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");??
  21. ????????String?queueName?=?channel.queueDeclare().getQueue();??
  22. ????????channel.queueBind(queueName,?EXCHANGE_NAME,?"");??
  23. ??
  24. ????????System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");??
  25. ??
  26. ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
  27. ????????channel.basicConsume(queueName,?true,?consumer);??
  28. ??
  29. ????????while?(true)?{??
  30. ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();??
  31. ????????????String?message?=?new?String(delivery.getBody());??
  32. ??
  33. ????????????System.out.println("?[x]?Received?'"?+?message?+?"'");??
  34. ????????}??
  35. ????}??
  36. }??


像以前一樣,我們開始做編譯
$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java
如果你想將日志保存到一個文件,打開一個控制臺并運行
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
如果你想看到日志在你的屏幕上,產生一個新的終端并運行:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs
發布日志類型:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog
使用rabbitmqctl list_bindings實際上您可以驗證綁定和隊列的代碼是否是我們想要的? 有兩個ReceiveLogs。
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

轉載于:https://my.oschina.net/zhanghaiyang/blog/599408

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/448594.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/448594.shtml
英文地址,請注明出處:http://en.pswp.cn/news/448594.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

iOS有哪些數據類型/基本數據類型?

簡述 本文主要探究使用OC作為iOS開發語言時&#xff0c;我們能使用哪些數據類型。 一切類型始于C。 C語言的類型 基本數據類型&#xff1a; 基本數據類型&#xff08;fundamental data types&#xff09;也叫原始數據類型&#xff08;primitive data types&#xff09; 整型、字…

李洋瘋狂C語言之將”you are come from shanghai ”倒置為”shanghai from come are you”,將句子中的單詞位置倒置,而不改變單詞內部結構

題目: 編寫一個C函數,將”you are come from shanghai ”倒置為”shanghai from come are you”,及將句子中的單詞位置倒置,而不改變單詞內部結構 #include <stdio.h> #include <string.h> void change(char *p1, char *p2); //函數聲明 int main() {char str[] …

馬桶怎么清洗才干凈無異味?

方法/步驟 在馬桶水箱中一定要放上潔廁寶&#xff1a; 潔廁寶里面含有多種去除馬桶中雜質以及異味的功能&#xff0c;另外它還帶有香香的味道&#xff0c;我們一按沖馬桶的按鈕&#xff0c;放出來的總是藍色的水&#xff0c;十分的美觀和好看&#xff0c;但是這并不是花瓶般的作…

白話解說:阻塞和非阻塞,同步和異步

阻塞和非阻塞&#xff0c;同步和異步是node.js里經常遇到的詞匯&#xff0c;舉例說明&#xff1a; 我要看足球比賽&#xff0c;但是媽媽叫我燒水&#xff0c;電視機在客廳&#xff0c;燒水要在廚房。家里有2個水壺&#xff0c;一個是普通的水壺&#xff0c;另一個是水開了會叫的…

蘇嵌點滴(一)

來蘇嵌也有12天了&#xff0c;也漸漸開始習慣這樣的生活&#xff0c;每天睜眼到閉眼&#xff0c;全都是代碼。每天都得學習很多新的知識&#xff0c;C語言學到現在也學得差不多了&#xff0c;還有明天一天課。 指針、數組這些C語言中的重點&#xff0c;還是需要一點時間消化的…

Mysql學習總結(8)——MySql基本查詢、連接查詢、子查詢、正則表達查詢講解...

2019獨角獸企業重金招聘Python工程師標準>>> 查詢數據指從數據庫中獲取所需要的數據。查詢數據是數據庫操作中最常用&#xff0c;也是最重要的操作。用戶可以根據自己對數據的需求&#xff0c;使用不同的查詢方式。通過不同的查詢方式&#xff0c;可以獲得不同的數據…

安裝OpenCL和AMD驅動程序

我們將安裝AMD OpenCL軟件開發工具包&#xff08;SDK&#xff09;和AMD驅動程序。 userubuntu:~$ mkdir AMD-APP-SDK-v2.5-lnx64 userubuntu:~$ cd AMD-APP-SDK-v2.5-lnx64/ userubuntu:~$ wgethttp://developer.amd.com/Downloads/AMD-APP-SDK-v2.5-lnx64.tgz userubuntu:~$ t…

Node.js -- Stream 使用小例 ( 流運用 :讀取、寫入、寫出、拷貝)

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 Stream 是一個抽象接口&#xff0c;Node 中有很多對象實現了這個接口。例如&#xff0c;對http 服務器發起請求的request 對象就是一個 …

李洋瘋狂C語言之有關“you are come from shanghai”逆序(二)

今天將指針和函數全部學完了&#xff0c;之前這題的做法&#xff0c;現在看來有點繁瑣&#xff0c;于是乎做了一些修改&#xff0c;下面是新的代碼 //you are from shanghai逆序#include <stdio.h> #include <string.h> //下面要用到strlenvoid reverse(c…

sync - 清空文件系統緩沖區

總覽 (SYNOPSIS) sync [OPTION] 描述 (DESCRIPTION) 強迫把更改的塊寫入磁盤&#xff0c; 并更新超級塊。 --help顯示幫助然后終止。--version顯示版本信息然后終止。 轉載于:https://www.cnblogs.com/fanweisheng/p/11101219.html

學會用好 Visual Studio Code

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 Visual Studio Code是個牛逼的編輯器&#xff0c;啟動非常快&#xff0c;完全可以用來代替其他文本文件編輯工具。又可以用來做開發&…

蘇嵌點滴(二)

今天把指針和函數講完了&#xff0c;這些都還能接受&#xff0c;之后老師和我們講了遞歸&#xff0c;有點難度。 晚上電腦還出了點狀況&#xff0c;一個晚自習全用來重裝系統和學習軟件套裝X_X&#xff0c;調試完已經接近下課&#xff0c;遞歸還沒來得及看。 放學后&#xff…

Maven學習總結(五)——聚合與繼承

2019獨角獸企業重金招聘Python工程師標準>>> Maven學習總結(五)——聚合與繼承 一、聚合 如果我們想一次構建多個項目模塊&#xff0c;那我們就需要對多個項目模塊進行聚合 1.1、聚合配置代碼 1 <modules> 2 <module>模塊一</module> 3 …

19-6/28作業:100以內偶數求和

?作業要求 ?分析思路 在循環里面增加約束&#xff0c;使累加1變成累加2 ?do-while循環代碼 public class GaoSiFor { public static void main(String[] args) { //定義兩個變量 int sum 0; int i 0; //開始循環 do{ //…

Visual Studio Code 常用快捷鍵

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 一、常用快捷鍵 編輯器與窗口管理 打開一個新窗口&#xff1a; CtrlShiftN 關閉窗口&#xff1a; CtrlShiftW 新建文件 CtrlN 文件之間切…

李洋瘋狂C語言之用遞歸解決李白喝酒問題(附填空題解法)

這是14年藍橋杯的一道填空題 題目&#xff1a;“李白街上走&#xff0c;提壺去買酒&#xff0c;遇店加一倍&#xff0c;見花喝一斗”&#xff0c; 途中&#xff0c;遇見5次店&#xff0c;見了10此花&#xff0c;壺中原有2斗酒&#xff0c;最后剛好喝 完酒&#xff0c;要求最…

coco creator編輯動畫坑之拖圖片

如圖所示&#xff0c;批量選中多張圖片之后&#xff0c;拖到右下角的框中&#xff0c;發現有時候可以有時候不行。這個我覺得是個軟件的bug 后來經過測試發現只有在粉紅色圈的高度才可以正確放入圖片&#xff0c;否則都放不了。轉載于:https://www.cnblogs.com/codeDevotee/p/1…

Spring MVC常用注解說明

2019獨角獸企業重金招聘Python工程師標準>>> 使用Spring MVC的注解及其用法和其它相關知識來實現控制器功能。02 之前在使用Struts2實現MVC的注解時&#xff0c;是借助struts2-convention這個插件&#xff0c;如今我們使用Spring自帶的spring-webmvc組件來實現同…

Ubuntu12.10中安裝ati顯卡驅動amd driver 13.1

1. 先安裝依賴庫 [plain] view plaincopyprint?sudo apt-get install build-essential cdbs dh-make dkms execstack dh-modaliases fakeroot libqtgui4 debhelper debconf libstdc6 dkms libqtgui4 libelfg0 linux-headers-generic 2. 如果是64位系統&#xff0c;需要安裝3…

李洋瘋狂C語言之用遞歸解決李白喝酒問題(二)

之前用遞歸求出了多少種情況&#xff0c;如果要打印出所有的結果&#xff0c;就需要一個數組來存放可能的情況&#xff1a; /****************************************************** 題目&#xff1a;“李白街上走&#xff0c;提壺去買酒&#xff0c;遇店加一倍&#xff0c;…