RabbitMQ教程總結

【譯】RabbitMQ教程一

  • 主要通過Hello Word對RabbitMQ有初步認識

【譯】RabbitMQ教程二

  • 工作隊列,即一個生產者對多個消費者
  • 循環分發、消息確認、消息持久、公平分發

【譯】RabbitMQ教程三

  • 如何同一個消息同時發給多個消費者
  • 開始引入RabbitMQ消息模型中的重要概念路由器Exchange以及綁定等
  • 使用了fanout類型的路由器

【譯】RabbitMQ教程四

  • 如何選擇性地接收消息
  • 使用了direct路由器

【譯】RabbitMQ教程五

  • 如何通過多重標準接收消息
  • 使用了topic路由器,可通過靈活的路由鍵和綁定鍵的設置,
    進一步增強消息選擇的靈活性

【譯】RabbitMQ教程六

  • 如何使用RabbitMQ實現一個簡單的RPC系統
  • 回調隊列callback queue和關聯標識correlation id

各教程代碼

  • 官方:GitHub rabbitmq-tutorials
  • 我整理的:rabbitmq-tutorial-java

RabbitMQ 一般工作流程

生產者和RabbitMQ服務器建立連接和通道,聲明路由器,同時為消息設置路由鍵,這樣,所有的消息就會以特定的路由鍵發給路由器,具體路由器會發送到哪個或哪幾個隊列,生產者在大部分場景中都不知道。(1個路由器,但不同的消息可以有不同的路由鍵)。
消費者和RabbitMQ服務器建立連接和通道,然后聲明隊列,聲明路由器,然后通過設置綁定鍵(或叫路由鍵)為隊列和路由器指定綁定關系,這樣,消費者就可以根據綁定鍵的設置來接收消息。(1個路由器,1個隊列,但不同的消費者可以設置不同的綁定關系)。


主要方法

  • 聲明隊列(創建隊列):可以生產者和消費者都聲明,也可以消費者聲明生產者不聲明,也可以生產者聲明而消費者不聲明。最好是都聲明。(生產者未聲明,消費者聲明這種情況如果生產者先啟動,會出現消息丟失的情況,因為隊列未創建)
    channel.queueDeclare(String queue, //隊列的名字boolean durable, //該隊列是否持久化(即是否保存到磁盤中)boolean exclusive,//該隊列是否為該通道獨占的,即其他通道是否可以消費該隊列boolean autoDelete,//該隊列不再使用的時候,是否讓RabbitMQ服務器自動刪除掉Map<String, Object> arguments)//其他參數
  • 聲明路由器(創建路由器):生產者、消費者都要聲明路由器---如果聲明了隊列,可以不聲明路由器。
    channel.exchangeDeclare(String exchange,//路由器的名字String type,//路由器的類型:topic、direct、fanout、headerboolean durable,//是否持久化該路由器boolean autoDelete,//是否自動刪除該路由器boolean internal,//是否是內部使用的,true的話客戶端不能使用該路由器Map<String, Object> arguments) //其他參數
  • 綁定隊列和路由器:只用在消費者

    channel.queueBind(String queue, //隊列String exchange, //路由器String routingKey, //路由鍵,即綁定鍵Map<String, Object> arguments) //其他綁定參數
  • 發布消息:只用在生產者

    channel.basicPublish(String exchange, //路由器的名字,即將消息發到哪個路由器String routingKey, //路由鍵,即發布消息時,該消息的路由鍵是什么BasicProperties props, //指定消息的基本屬性byte[] body)//消息體,也就是消息的內容,是字節數組
    • BasicProperties props:指定消息的基本屬性,如deliveryMode為2時表示消息持久,2以外的值表示不持久化消息
      //BasicProperties介紹
      String corrId = "";
      String replyQueueName = "";
      Integer deliveryMode = 2;
      String contentType = "application/json";
      AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).deliveryMode(deliveryMode).contentType(contentType).build();
  • 接收消息:只用在消費者
    channel.basicConsume(String queue, //隊列名字,即要從哪個隊列中接收消息boolean autoAck, //是否自動確認,默認trueConsumer callback)//消費者,即誰接收消息
    • 消費者中一般會有回調方法來消費消息
      Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, //該消費者的標簽Envelope envelope,//字面意思為信封:packaging data for the messageAMQP.BasicProperties properties, //message content header data byte[] body) //message bodythrows IOException {//獲取消息示例String message = new String(body, "UTF-8");//接下來就可以根據消息處理一些事情}};

路由器類型

  • fanout:會忽視綁定鍵,每個消費者都可以接受到所有的消息(前提是每個消費者都要有各自單獨的隊列,而不是共有同一隊列)。
  • direct:只有綁定鍵和路由鍵完全匹配時,才可以接受到消息。
  • topic:可以設置多個關鍵詞作為路由鍵,在綁定鍵中可以使用*#來匹配
  • headers:(可以忽視它的存在)

教程一 HelloWorld

看主要代碼

//生產者
channel.queueDeclare(QUEUE_NAME, false, false, false, null); ----①
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
//消費者
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, consumer);

這里,生產者和消費者都沒有聲明路由器,而是聲明了同名的隊列。生產者發布消息時,使用了默認的無名路由器(""),并以隊列的名字作為了路由鍵。消費者在消費時,由于沒有聲明路由器,這并不表示沒有路由器的存在,消費者此時使用的是默認的路由器,即Default exchange,該路由器和所有的隊列都進行綁定,并且使用隊列的名字作為了路由鍵進行綁定。所以,生產者使用默認路由器以隊列的名字作為了綁定鍵進行了消息發布,而消費者也使用了默認的路由器,并以隊列的名字作為綁定鍵進行了綁定。而默認路由器是direct類型,路由鍵和綁定鍵完全匹配時,消費者才能接受到消息,所以教程1中的消費者可以接收到消息。(為了認證這一點,可以將代碼①去掉,然后先運行消費者,讓它等待監聽,然后啟動生產者,發送消息,消費者同樣會收到消息。這里的生產者聲明隊列,只是讓RabbitMQ服務器先創建這個隊列,以免發送的消息因為找不到隊列而丟失。)


教程二 Work Queues

看主要代碼

//生產者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "1.";
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//消費者
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);---①...channel.basicAck(envelope.getDeliveryTag(), false);...---③
boolean autoAck = false;---②
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

這里也使用了默認的direct路由器。假如啟動多個工作者(消費者),按道理這些工作者應該可以接收到所有的消息啊,但是不要忘了這幾個工作者都是從同一個隊列中取消息,消息取出一個,隊列中就少一個,所以每個工作者都只是收到的消息的一部分。既然這幾個工作者都從同一個隊列中取消息,那每個工作者應該怎么取呢?

如果沒有代碼①,并且②設置為true,即自動確認收到消息,RabbitMQ只要發出消息就認為消費者收到了,此時RabbitMQ采取的是循環分發的策略,在這幾個工作者中循環輪流分發消息。每個工作者接受到的消息數量都是相同的。
如果有代碼①,并且②設置為false,則RabbitMQ會采取公平分發策略,即將消息發給空閑的工作者(空閑,工作者將消息處理完畢,執行了代碼③;不空閑,即工作者還在處理消息,還沒有給RabbitMQ發回確認信息,即還沒有執行代碼③)。
代碼①中的參數1:(prefetchCount)maximum number of messages that the server will deliver

為了防止隊列丟失,在聲明隊列的時候指定了durabletrue。為了防止消息丟失,設置了消息屬性BasicPropertiesMessageProperties.PERSISTENT_TEXT_PLAIN,讓我們看看值是什么:


MessageProperties.PERSISTENT_TEXT_PLAIN

可以看出里面包含了deliveryMode=2。從這張圖也可以看到BasicProperties屬性的全貌。

如果想讓多個消費者共同消費某些消息,只要讓他們共用同一隊列即可(當然前提是你得保證消息可以都進到這個隊列中來,如本例中使用direct路由器,消息的路由鍵和隊列的綁定鍵設為一致,當然也可以使用fanout路由器,路由鍵和綁定鍵隨意設置,不一致也能收到,因為fanout路由器會忽略路由鍵的設置)。


教程三 Publish/Subscribe

看主要代碼

 //生產者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));//消費者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();---①
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicConsume(queueName, true, consumer);

教程三才引出路由器的概念。生產者和消費者聲明了同樣的路由,并指明路由類型為fanout,該路由器會忽視路由鍵,將消息發布到所有綁定的隊列中(仍需要綁定,只是綁定時綁定鍵任意就行了)。
假如啟動多個消費者,因為代碼①中調用無參的聲明去惡劣方法channel.queueDeclare(),就會創建了一個非持久、獨特的、自動刪除的隊列,并返回一個自動生成的名字。所以多個消費者取消息時使用的是各自的隊列,不會存在多個消費者從同一個隊列取消息的情況。
這樣多個消費者就可以接收到同一消息。

如果想實現多個消費者都可以接收到所有的消息,只要讓他們各自使用單獨的隊列即可(當然前提是保證路由鍵和綁定鍵的設置可以讓消息都進入到隊列,如本例中使用fanout路由器,無需考慮綁定鍵和路由鍵)。


教程4 Routing

看主要代碼:

//生產者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
//消費者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
String[] severities = {"info", "warning", "error"};
for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);
} 
channel.basicConsume(queueName, true, consumer);

可以看出,教程3使用了direct路由器,該路由器的特點是可以設定路由鍵和綁定鍵,消費者只能從隊列中取出兩者匹配的消息。
在生產者發消息時,為消息設置不同的路由鍵(如例子中severity可以設為infowarnerror)。
消費者在通過為隊列設置多個綁定關系,來選擇想要接收的消息。
這里有一個概念叫做多重綁定,即多個隊列以相同的綁定鍵binding key綁定到同一個路由器上,此時direct路由器就會像fanout路由器一樣,將消息廣播給所有符合路由規則的隊列。


教程5 Topics

看主要代碼:

//生產者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = "";
String message = "msg...";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
//消費者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
String bingingKeys[] = {""};for (String bindingKey : bingingKeys) {channel.queueBind(queueName, EXCHANGE_NAME,   bindingKey);}
channel.basicConsume(queueName, true, consumer);

這里使用了topic路由器,它與direct路由器類似,不同在于,topic路由器可以為路由鍵設置多重標準。一個消息有一個路由鍵,direct路由器只能為路由鍵指定一個關鍵字,但是topic路由器可以在路由鍵中通過點號分割多個單詞來組成路由鍵,消費者在綁定的時候,可以設置多重標準來選擇接受。
舉個例子:假如日志根據嚴重級別infowarnerror,也可以根據來源分為cronkernauth。某個日志消息設置路由鍵為kern.info,表示來自kerninfo級別的日志。想要選擇接收消息的時候,direct路由器就辦不到,它要么可以根據嚴重級別來篩選,要么根據來源來篩選,而topic路由器則可以輕松應對,只要將綁定鍵設置為kern.info就可以精準獲取該類型的日志。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/387431.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/387431.shtml
英文地址,請注明出處:http://en.pswp.cn/news/387431.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

3 幫助命令、用戶管理、壓縮

幫助命令&#xff1a; man 命令或配置文件 獲得幫助信息 /l 查看所有和l相關的行 q 退出 man passwd 1命令的幫助 5配置文件的幫助 man 1 passwd man 5 passwd 默認查看命令的幫助 man 5 passwd 查看配置文件的幫助 whatis 命令 查看命令的功能性描述 whatis ls ap…

[bzoj1039] [ZJOI2008]無序運動Movement

Description D博士對物理有著深入的研究&#xff0c;經典物理、天體物理、量子物理都有著以他的名字命名的定理。最近D博士著迷于研究粒子運動的無規則性。對圣經深信不疑的他相信&#xff0c;上帝創造的任何事物必然是有序的、有理可循的&#xff0c;而不是無規則的、混沌的。…

關于shiro session失效報錯問題

最近做了一個項目&#xff0c;要用到shiro&#xff0c;做完之后發現有個異常經常發生org.apache.shiro.session.UnknownSessionException: There is no session with id &#xff0c;經過多天的研究&#xff0c;終于得以解決 登錄的時候異常信息&#xff1a; [java] view plain…

4 網絡、掛載、關機

網絡命令: 給在線用戶發信 write 用戶名 編輯時&#xff0c;Ctrl退格鍵刪除錯誤輸入 CtrlD 保存輸入信息 wall 給所有在線用戶發信 ping命令 -c指定發送次數 ping -c 3 192.168.231.1 ifconfig 查看網卡信息 ifconfig eth1 192.168.231.100 臨時設置IP地址 mail 用戶名 …

#191 sea(動態規劃)

假設已經求出了i個點j個橋的連通圖數量f[i][j]&#xff0c;容易由此推出最終答案&#xff0c;套路地枚舉1號點所在連通塊大小即可。 假設已經求出了i個點的邊雙連通圖數量h[i]&#xff0c;考慮由此推出f[i][j]。可以枚舉其中一座橋將圖劃分成兩個部分&#xff0c;固定1號點在其…

linux下獲取占用CPU資源最多的10個進程,可以使用如下命令組合: ps aux|head -1;ps aux|grep -v PID|sort -rn -k +3|head linux下

linux下獲取占用CPU資源最多的10個進程&#xff0c;可以使用如下命令組合&#xff1a; ps aux|head -1;ps aux|grep -v PID|sort -rn -k 3|head linux下獲取占用內存資源最多的10個進程&#xff0c;可以使用如下命令組合&#xff1a; ps aux|head -1;ps aux|grep -v PID|s…

自定義注解與validation結合使用案例

案例1&#xff1a; [java] view plaincopy import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import javax.validation.Constraint; import…

5 Vim編輯器的使用

vi filename 命令模式 a i o 插入模式 后前 行 Esc鍵 回到命令模式 Shift&#xff1a; 編輯模式 set nu加行號 執行完命令后直接回到命令模式 :set nu 設置行號 :set nonu 取消行號 移動命令&#xff1a; gg 到第一行 G 到最后一行 nG 到第n行 :n到第n行 $ 移至行…

機器學習實戰(筆記)------------KNN算法

1.KNN算法 KNN算法即K-臨近算法&#xff0c;采用測量不同特征值之間的距離的方法進行分類。 以二維情況舉例&#xff1a; 假設一條樣本含有兩個特征。將這兩種特征進行數值化&#xff0c;我們就可以假設這兩種特種分別為二維坐標系中的橫軸和縱軸&#xff0c;將一個樣本以點的形…

hive的安裝配置

hive只需安裝在一個節點上。 1、將安裝包解壓&#xff0c;cd入conf文件夾下&#xff0c;執行命令cp hive-default.xml hive-site.xml 2、更改hive-site.xml的配置項 </property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql:/…

Java注解Annotation 完成驗證

Java注解Annotation用起來很方便&#xff0c;也越來越流行&#xff0c;由于其簡單、簡練且易于使用等特點&#xff0c;很多開發工具都提供了注解功能&#xff0c;不好的地方就是代碼入侵比較嚴重&#xff0c;所以使用的時候要有一定的選擇性。 這篇文章將利用注解&#xff0c;來…

隱藏馬爾科夫模型HMM

概率圖模型 HMM 先從一個具體的例子入手,看看我們要解決的實際問題.例子引自wiki.https://en.wikipedia.org/wiki/Hidden_Markov_model Consider two friends, Alice and Bob, who live far apart from each other and who talk together daily over the telephone about what …

常用HQL

進入hive客戶端后&#xff1a; 1、建表&#xff1a; create table page_view(viewTime int, userid bigint,page_url string, referrer_url string,ip string comment IP Address of the User)comment This is the page view tablepartitioned by(dt string, country string)r…

阿里云天池 金融風控訓練營Task1 廣東工業站

Task1 賽題理解 一、學習知識點概要 本次學習先是介紹了賽題的背景和概況&#xff0c;題目以金融風控中的個人信貸為背景&#xff0c;給所給的47列特征中&#xff0c;根據貸款申請人的數據信息預測其是否有違約的可能&#xff0c;以此判斷是否通過貸款。隨后介紹了比賽中的評…

如何將.crt的ssl證書文件轉換成.pem格式

如何將.crt的ssl證書文件轉換成.pem格式摘自&#xff1a;https://www.landui.com/help/show-8127 2018-07-04 14:55:41 2158次 準備:有一臺安裝了php的linux操作系統執行下面的openssl命令即可&#xff1a;openssl x509 -in www.xx.com.crt -out www.xx.com.pem轉載于:https://…

SpringMVC學習記錄--Validator驗證分析

一.基于Validator接口的驗證. 首先創建User實例,并加入幾個屬性 ?12345678910111213141516171819202122232425262728293031323334<code class"hljs cs">public class User {private String username;private String password;private String nickname;public …

NTP時間服務器實現Linux時間同步

在linux下&#xff0c;可以通過自帶的NTP(Network Time Protocol)協議通過網絡使自己的系統保持精確的時間。 什么是NTP&#xff1f; NTP是用來使系統和一個時間源保持時間同步的協議。 在自己管理的網絡中建立至少一臺時間服務器來同步本地時間&#xff0c;這樣可以使得在不同…

阿里云天池 Python訓練營Task1:從變量到異常處理

本學習筆記為阿里云天池龍珠計劃Python訓練營的學習內容&#xff0c;學習鏈接為&#xff1a;https://tianchi.aliyun.com/specials/promotion/aicamppython?spm5176.22758685.J_6770933040.1.6f103da1tESyzu 目錄 一、學習知識點概要 二、學習內容 I.變量、運算符與數據類…

python回收機制

目錄 Python的垃圾回收機制引子:一、什么是垃圾回收機制&#xff1f;二、為什么要用垃圾回收機制&#xff1f;三、垃圾回收機制原理分析1、什么是引用計數&#xff1f;2、引用計數擴展閱讀&#xff1f;&#xff08;折疊&#xff09;Python的垃圾回收機制 引子: 我們定義變量會申…

安裝openssl-devel命令

centos&#xff1a; yum install openssl-devel ubuntu&#xff1a; sudo apt-get install openssl sudo apt-get install libssl-dev