RabbitMQ 消息隊列安裝及入門

市面常見消息隊列中間件對比

技術名稱吞吐量 /IO/并發時效性(類似延遲)消息到達時間可用性可靠性優勢應用場景
activemq萬級簡單易學中小型企業、項目
rabbitmq萬級極高(微秒)極高生態好(基本什么語言都支持)、時效性高、易學適合絕大數的分布式應用
kafka10萬 QBS高(毫秒)極高極高吞吐量大、可靠性、可用性、強大的數據流處理能力適合大規模處理數據的場景、比如構建日志手機系統、實時數據傳輸、事件流收集傳輸
rocketmq10萬 QBS高ms極高極高吞吐量大、可靠性、可用性、可擴展性適用于金融等可靠性要求較高的場景、適合大規模的消息處理。金融、電商、大規模社交
pulsar10萬 QBS高ms極高可靠性、可用性很高、新興(技術架構先進)適合大規模、高并發的分布式系統(云原生)適合實時分析、事件流處理、物聯網數據處理。

RabbitMQ?

RabbitMQ 是基于 AMQP 高級消息隊列協議的。

?實際使用可根據官方文檔的 demo 。

官方文檔:RabbitMQ Tutorials | RabbitMQ

模型

生產者:通俗就是發消息的人,比如在外賣軟件上點餐的人

消費者:通俗就是處理消息的任務,比如外賣軟件上的商家,需要根據顧客的要求制作餐

交換機:負責把消息轉發到對應的隊列中,比如有外賣的時候,系統給附近的外面小哥派單

隊列:存放消息的地方,等待消費者消費,比如商家肯定不是只做一份餐,做好的餐放在一個指定的位置等待外賣小哥來取餐

路由:轉發,就是怎么把消息從一個地方轉到另一個地方,通常加在交換機和隊列之間,比如系統指定某個范圍的外賣小哥接這單

安裝

1. 首先安裝 RabbitMQ,直接官網下載即可,如果下載速度慢,可以換個網絡,或者找找有沒有國內鏡像。(當初我下載的時候找了半天的鏡像都是版本比較老的,結果想著掛一晚上下載,結果官網突然就快了,白折騰了。)

官方網站:Installing on Windows | RabbitMQ

國內鏡像:Index of rabbitmq-server-local/v3.12.7

一路 next ,傻瓜式安裝即可

安裝之后檢查服務中是否已經運行了。

2. 安裝監控面板

在 RabbitMQ 安裝目錄下的 sbin 目錄下的CMD 輸入下面的命令

rabbitmq-plugins.bat enable rabbitmq_management

?安裝成功:

默認端口號是 5672,webUI 是 15672

在瀏覽器輸入地址打開管理界面:http://localhost:15672

默認賬號密碼是 guest

注意:1. 安裝目錄不能是中文,不能有空格等非法字符,否則頁面打不開

????????? ?2. 如果想要在遠程服務器訪問 RabbitMQ 管理面板,需要創建管理員賬號,比如在寶塔面板使用時寶塔面板提供的 admin賬號,地址就是寶塔面板的 IP?

創建賬號:access-control | RabbitMQ

入門

依賴引入

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

單消費者和生產者

一對一的關系

1. 生產者代碼

public class SingleProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//創建連接ConnectionFactory factory = new ConnectionFactory();//設置了本地連接,如果修改了用戶名和密碼,需要設置/*factory.setPassword();factory.setUsername();*/factory.setHost("localhost");//建立連接、創建頻道//頻道,類似客戶端,用于調用serverConnection connection = factory.newConnection();Channel channel = connection.createChannel();//創建隊列,隊列持久化,第二份參數設置為 truechannel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";//發送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}

channel 頻道:理解為操作消息隊列的 Client,通過 channel 收發消息,提供了和消息對了 server 建立通信的傳輸方法

channel.queueDeclare 方法參數:

queue:這是一個字符串參數,代表要聲明的隊列的名稱。如果隊列不存在,則會自動創建一個新的隊列。

durable:這是一個布爾值參數,表示隊列是否持久化。如果設置為true,則隊列會在服務器重啟后仍然存在;如果設置為false,則隊列在服務器重啟后會被刪除。默認值為false。

exclusive:這也是一個布爾值參數,表示隊列是否為獨占模式。如果設置為true,則只有當前連接可以訪問該隊列;如果設置為false,則其他連接也可以訪問該隊列。默認值為false。

autoDelete:這是另一個布爾值參數,表示隊列是否自動刪除。如果設置為true,則當最后一個消費者取消訂閱時,隊列將被刪除;如果設置為false,則隊列將一直存在,直到手動刪除或服務器重啟。默認值為false。

arguments:這是一個可選參數,用于設置隊列的其他屬性,比如消息的最大長度、最大優先級等。

channel.basicPublish 參數:

exchange:這是一個字符串參數,代表交換機的名稱。如果不需要使用特定的交換機,可以傳遞一個空字符串("")。交換機是RabbitMQ中用于接收生產者發送的消息并根據綁定規則路由到隊列的組件。

routingKey:這也是一個字符串參數,它指定了發布消息的隊列。無論通道綁定到哪個隊列,最終發布的消息都會包含這個指定的路由鍵。路由鍵是用來確定消息應該發送到哪個隊列的重要信息。

message:這是要發布的消息本身,通常是字節數組的形式。

properties:這是一個可選參數,用于設置消息的屬性,比如消息的優先級、過期時間等。

在使用channel.basicPublish時,需要注意以下幾點:

exchange和routingKey不能為空:在AMQImpl類中的實現要求這兩個參數都不能為null,否則會拋出異常。

交換機類型:根據不同的需求,可以選擇不同類型的交換機,如fanout、direct或topic。每種類型的交換機都有其特定的路由規則。

非命名隊列:在某些情況下,比如日志系統,可以使用非命名隊列,這樣消費者可以接收到所有相關的日志消息,而不是特定的部分。

2. 消費者代碼

public class SingleConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明隊列,同一個消息隊列參數必須一致channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定義了如何處理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//接收、消費消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}

?channel.basicConsume 參數:

  1. queue:這是一個字符串參數,代表要消費的隊列的名稱。如果隊列不存在,則會拋出異常。
  2. onMessage:這是一個回調函數,當有新的消息到達時會被調用。該函數需要接收兩個參數:一個表示消息內容的Delivery對象和一個表示通道的Channel對象。
  3. consumerTag:這是一個可選參數,用于標識消費者。如果沒有指定,則會自動生成一個唯一的標識符。
  4. autoAck:這是一個布爾值參數,表示是否自動確認消息。如果設置為true,則在消息被處理后會自動發送確認信息;如果設置為false,則需要手動發送確認信息。默認值為false。
  5. arguments:這是一個可選參數,用于設置消費者的其他屬性,比如消息的最大長度、最大優先級等。

在使用channel.basicConsume時,需要注意以下幾點:

  1. 隊列名稱:隊列名稱應該是唯一的,否則會拋出異常。
  2. 消息處理:在onMessage回調函數中,需要對消息進行處理,并根據需要發送確認信息。
  3. 消費者標識符:可以通過設置consumerTag來標識消費者,以便在后續操作中進行識別和管理。
  4. 消費者屬性:可以通過設置消費者的其他屬性來控制消費者的行為,比如設置消息的最大長度、最大優先級等。

多消費者

多個消費者,比如一個工廠生產商品,一個商店賣不完,分給多個商店一起賣

生產者代碼和上面一樣


public class MultiProducer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//創建連接ConnectionFactory factory = new ConnectionFactory();//設置本地連接factory.setHost("localhost");//創建隊列,創建頻道,類似客戶端try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//隊列持久化channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);//設置消息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){//輸入消息String message = scanner.nextLine();//發送消息channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}

控制處理任務的積壓數,最多同時處理任務數

channel.basicQos(1); //最多處理1個

消息確認機制

ack 確認、nack 消息失敗、reject 拒絕

當消息拿走之后會有一個確認機制,保證消息成功被消費。當消費者接收消息會給一個反饋,確認消息的狀態,成功消息才會被移除。

支持配置 autoack ,建議修改為 false,根據實際情況手動確認。

//手動確認
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
//手動拒絕
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);

?消費者代碼

public class MultiConsumer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//創建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");for (int i = 0; i < 2; i++) {final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();//隊列持久化,參數要一致channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//控制處理任務的積壓數,最多同時處理任務數channel.basicQos(1);//定義了如何處理消息int finalI = i;DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {//處理工作的邏輯System.out.println(" [x] Received '" +"消費者:" + finalI + " 消息:"+ message + "'");//睡一定時間,模擬機器處理能力有限Thread.sleep(20000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//接收消息,消費消息,開啟消息監聽channel.basicConsume(TASK_QUEUE_NAME, false , deliverCallback, consumerTag -> {});}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/14481.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/14481.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/14481.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

為什么MySQL推薦使用utf8mb4代替utf8?

前言 在MySQL數據庫的世界里&#xff0c;字符集的選擇直接影響著數據的存儲和檢索方式&#xff0c;尤其是對于多語言支持至關重要的應用而言。近年來&#xff0c;utf8mb4字符集逐漸成為MySQL中存儲Unicode字符的標準選擇&#xff0c;逐步取代了傳統的utf8字符集。本文將詳細探…

leetcode124 二叉樹中的最大路徑和-dp

題目 二叉樹中的 路徑 被定義為一條節點序列&#xff0c;序列中每對相鄰節點之間都存在一條邊。同一個節點在一條路徑序列中 至多出現一次 。該路徑 至少包含一個 節點&#xff0c;且不一定經過根節點。 路徑和 是路徑中各節點值的總和。 給你一個二叉樹的根節點 root &…

【Crypto】Rabbit

文章目錄 一、Rabbit解題感悟 一、Rabbit 題目提示很明顯是Rabbit加密&#xff0c;直接解 小小flag&#xff0c;拿下&#xff01; 解題感悟 提示的太明顯了

redis核心面試題二(實戰優化)

文章目錄 10. redis配置mysql實戰優化[重要]11. redis之緩存擊穿、緩存穿透、緩存雪崩12. redis實現分布式session 10. redis配置mysql實戰優化[重要] // 最初實現OverrideTransactionalpublic Product createProduct(Product product) {productRepo.saveAndFlush(product);je…

MQTT 5.0 報文解析 05:DISCONNECT

歡迎閱讀 MQTT 5.0 報文系列 的第五篇文章。在上一篇中&#xff0c;我們已經介紹了 MQTT 5.0 的 PINGREQ 和 PINGRESP 報文。現在&#xff0c;我們將介紹下一個控制報文&#xff1a;DISCONNECT。 在 MQTT 中&#xff0c;客戶端和服務端可以在斷開網絡連接前向對端發送一個 DIS…

手把手教你搭建一個花店小程序商城

如果你是一位花店店主&#xff0c;想要為你的生意搭建一個精美的小程序商城&#xff0c;以下是你將遵循的五個步驟。 步驟1&#xff1a;登錄喬拓云平臺進入后臺 首先&#xff0c;你需要登錄喬拓云平臺的后臺管理頁面。你可以在電腦或移動設備上的瀏覽器中輸入喬拓云的官方網站…

2024.5.26 機器學習周報

目錄 引言 Abstract 文獻閱讀 1、題目 2、引言 3、創新點 4、Motivation 5、naive Lite-HRNet 6、Lite-HRNet 7、實驗 深度學習 解讀SAM(Segment Anything Model) 1、SAM Task 2、SAM Model 2.1、Patch Embedding 2.2、Positiona Embedding 2.3、Transformer …

移動端適配:vw適配方案

vw (Viewport Width) 是一種長度單位&#xff0c;代表視口寬度的百分比。1vw 等于視口寬度的1%。在網頁設計和前端開發中&#xff0c;vw 單位常用于實現響應式設計和屏幕適配&#xff0c;尤其是針對不同尺寸和分辨率的移動設備。 為什么使用vw適配&#xff1f; 響應式: 使用v…

互聯網醫院開發:引領智慧醫療新時代

隨著科技的迅猛發展和互聯網的普及&#xff0c;傳統醫療模式正在迎來一場深刻的變革。互聯網醫院的崛起&#xff0c;打破了時間和空間的限制&#xff0c;為患者和醫療機構帶來了更加便捷、高效、安全的醫療服務體驗。本文將從技術角度深入探討互聯網醫院的開發&#xff0c;包括…

【openpcdet中yaml文件的DATA_AUGMENTOR學習】

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 前言一、代碼二、詳細解釋DISABLE_AUG_LISTAUG_CONFIG_LIST1. gt_sampling2. random_world_flip3. random_world_rotation4. random_world_scaling 總結 前言 提示…

多線程(八)

一、wait和notify 等待 通知 機制 和join的用途類似,多個線程之間隨機調度,引入 wait notify 就是為了能夠從應用層面上,干預到多個不同線程代碼的執行順序.( 這里說的干預,不是影響系統的線程調度策略 內核里的線程調度,仍然是無序的. 相當于是在應用程序…

Pod容器資源限制和探針

目錄 一、資源限制 1.Pod和容器的資源請求和限制 2.CPU 資源單位 案例一 案例二 二、健康檢查&#xff0c;又稱為探針&#xff08;Probe&#xff09; 1.探針的三種規則 2.Probe支持三種檢查方法 3.探測獲得的三種結果 案例一&#xff1a;exec 案例二&#xff1a;htt…

OneMO同行 心級服務:中移物聯OneMO模組助力客戶終端寒冷環境下的穩定運行

中移物聯OneMO模組以客戶為中心&#xff0c;基于中國移動心級服務要求&#xff0c;開展“OneMO同行 心級服務 標定一流”高標服務主題活動&#xff0c;升級“服務內容““服務方式”和“服務意識”&#xff0c;為行業客戶提供全新的服務體驗。 近日&#xff0c;某車載監控設備…

Hive語法學習總結

Hive SQL語法學習總結 hive參數庫操作1.創建庫2.具體案例3.庫的其他操作 表和庫的路徑演示表的操作創建表插入數據 hive參數 一 hive常用交互命令hive -e sql語句hive -f sql文件 //文件中是sql語句二 參數的設置方式一&#xff1a;在客戶端中設置參數(當次有效)set 參數名參…

ACM實訓第十七天

Is It A Tree? 問題 考試時應該做不出來&#xff0c;果斷放棄 樹是一種眾所周知的數據結構&#xff0c;它要么是空的(null, void, nothing)&#xff0c;要么是一個或的集合滿足以下屬性的節點之間有向邊連接的節點較多。 ?只有一個節點&#xff0c;稱為根節點&#xff0c;它…

【Crypto】摩絲

文章目錄 一、摩斯解題感悟 一、摩斯 很明顯莫爾斯密碼 iloveyou還挺浪漫 小小flag&#xff0c;拿下 解題感悟 莫爾斯密碼這種題還是比較明顯的

【董曉算法】競賽常用知識之圖論3(最近公共祖先)

前言&#xff1a; 本系列是學習了董曉老師所講的知識點做的筆記 董曉算法的個人空間-董曉算法個人主頁-嗶哩嗶哩視頻 (bilibili.com) 動態規劃系列&#xff08;還沒學完&#xff09; 【董曉算法】動態規劃之線性DP問題-CSDN博客 【董曉算法】動態規劃之背包DP問題&#xff…

智能鎖千千萬,誰是你的NO.1,親身實測凱迪仕傳奇大師K70旗艦新品

智能鎖千千萬&#xff0c;誰是你的NO.1。歡迎來到智哪兒評測室&#xff0c;這次我們為大家帶來了凱迪仕傳奇大師K70系列的一款重磅新品。 在科技的浪潮中&#xff0c;家居安全領域正經歷著前所未有的變革。智能鎖越來越成為家的安全守護神&#xff0c;以及智能生活的得力助手。…

Android 11 Framework實時監聽Activity堆棧變化

核心類 Framework中有一個類SystemActivityMonitoringService專門用于監控Activity堆棧變化&#xff0c;屬于隱藏Api&#xff0c;應用側無法調用。此類位于 packages/services/Car/service/src/com/android/car/SystemActivityMonitoringService.java 方法 void registerTa…

Mysql信息脫敏

類似微信姓名脫敏&#xff1a; SELECT CONCAT( REPEAT(*, CHAR_LENGTH(real_name) -1 ), RIGHT(real_name, 1) ) name from user_info電話號脫敏&#xff1a; SELECT CONCAT(LEFT(mobile_phone, 3), REPEAT(*, 4 ), RIGHT(mobile_phone, 4) ) phone from user_info