【RabbitMQ】工作隊列和發布/訂閱模式的具體實現

文章目錄

  • 建立連接
  • 工作隊列模式實現
    • 創建隊列和交換機
    • 生產者代碼
    • 消費者代碼
    • 運行程序
      • 啟動消費者
      • 啟動生產者
  • 發布/訂閱模式實現
    • 創建隊列和交換機
    • 生產者代碼
      • 創建交換機
      • 聲明兩個隊列
      • 綁定隊列和交換機
      • 發送消息
      • 完整代碼
    • 消費者代碼
      • 完整代碼
    • 運行程序
      • 啟動生產者
      • 啟動消費者

建立連接

我們把建立連接時,創建的連接工廠部分創建成常量,方便后面進行使用

  • rabbitmq 包下,再創建一個 constant
package rabbitmq.constant;  public class Constants {  static public final String HOST = "localhost";  static public final int PORT = 5672;  static public final String USER_NAME = "study";  static public final String PASSWORD = "study";  static public final String VIRTUAL_HOST = "coding ";  
}

工作隊列模式實現

和簡單模式相比較,工作隊列與之不同的就是有多個消費者,其他都一樣。所以我們只需要多添加幾個消費者即可

創建隊列和交換機

Constants 中添加:

// 工作隊列模式  
public static final String WORK_QUEUE = "work.queue ";

生產者代碼

package rabbitmq.work;  import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  public class Producer {  public static void main(String[] args) throws IOException, TimeoutException {  // 1. 建立連接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 開啟信道  Channel channel = connection.createChannel();  //3. 聲明隊列  channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);  //4. 發送消息  for (int i = 0; i < 10; i++) {  String msg = "hello work queue..." + i;  channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());  }  System.out.println("消息發送成功!");  // 5. 資源釋放  channel.close();  connection.close();  }  
}

消費者代碼

package rabbitmq.work;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  public class Consumer1 {  public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  // 1. 建立連接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 開啟信道  Channel channel = connection.createChannel();  //3. 聲明隊列  channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);  //4. 消費消息  DefaultConsumer consumer = new DefaultConsumer(channel){  // 從隊列中收到消息,就會執行的方法  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  //TODO  System.out.println("接收到消息: " + new String(body));  }  };  channel.basicConsume(Constants.WORK_QUEUE, true, consumer);  // 等待程序執行完成  Thread.sleep(2000);  // 5. 釋放資源  
//        channel.close();  
//        connection.close();  }  
}
  • 多個消費者的代碼都一樣的

運行程序

我們先啟動兩個消費者,再啟動生產者

  • 如果先啟動生產者,再啟動消費者,由于消息較少,處理較快,那么第一個啟動的消費者就會瞬間把 10 條消息消費掉,所以我們先啟動兩個消費者,再啟動生產者

啟動消費者

我們將兩個消費者啟動

  • 我們可以看到 rabbitmq 客戶端里面,work.queue 隊列已經被創建了出來
  • image.png

啟動生產者

在啟動消費者之后,我們啟動生產者,發送 10 條消息到隊列中

  • 我們可以看到,連個該消費者將 10 條消息消費完了image.png

發布/訂閱模式實現

在發布/訂閱模式中,多了一個 Exchange 角色

Exchange 常見有三種類型,分別代表不同的路由規則

  1. Fanout: 廣播,將消息交給所有綁定到交換機的隊列(Publish/Subscribe
  2. Direct: 定向,將消息交給符合指定 routingKey 的隊列 (Routing 模式)
  3. Topic: 通配符,把消息交給符合 routing pattern(路由模式)的隊列(Topics 模式)
    也就分別對應不同的工作模式

image.png

創建隊列和交換機

Constants 中添加:

// 發布訂閱模式  
public static final String FANOUT_EXCHANGE = "fanout.exchange";  
public static final String FANOUT_QUEUE1 = "fanout.queue1";  
public static final String FANOUT_QUEUE2 = "fanout.queue2";

生產者代碼

發布/訂閱模式的生產者代碼和簡單模式類似,只是有些變化

  • 需要聲明交換機
  • 需要指出交換機和隊列之間的關系

創建交換機

相比于生產者代碼和簡單模式,這一步是關鍵的一步。我們需要聲明一個交換機,而不是使用默認交換機

channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
  • 我們會使用到 exchangeDeclare() 方法
Exchange.DeclareOk exchangeDeclare(String exchange,  
BuiltinExchangeType type,  
boolean durable,  
boolean autoDelete,  
boolean internal,  
Map<String, Object> arguments) throws IOException;

參數解釋:

  1. exchange:交換機名稱
  2. type:交換機類型
    • Direct("direct"):定向,直連,routing
    • Fanout("fanout"):扇形(廣播),每個隊列都能收到消息
    • TOPIC("topic"):通配符
    • HEADERS("headers"):參數匹配(工作時用到的少)
  3. durable:是否持久化
    • true:持久化
    • false:非持久化
    • 持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息
  4. autoDelete:自動刪除
    • 自動刪除的前提是至少有一個對類或者交換器與這個交換器綁定,之后所有與這個交換器綁定的對類或交換器都與此解綁
    • 而不是這種理解:當與此交換器連接的客戶端都斷開時,RabbitMQ 會自動刪除本交換器
  5. internal:內部使用,一般 false
    • 如果設置為 true,表示內部使用
    • 客戶端程序無法直接發送消息到這個交換器中,只能通過交換器路由到交換器這種方式
  6. argument:參數

聲明兩個隊列

// 如果沒有一個這樣的隊列,會自動創建;如果有,則不創建
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);  
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

綁定隊列和交換機

channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");  
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
  • 這里會用到 queueBind() 方法
queueBind(String queue, String exchange, String routingKey)

參數解釋:

  1. queue:對類名稱
  2. exchange:交換機名稱
  3. routingKey:路由 key,路由規則
  • 如果交換機類型為 fanoutroutingKey 設置為 “”,表示每個消費者都能收到全部信息

發送消息

String msg = "hello fanout...";  
// 第二個參數 routingKey 為空。因為這是廣播模式,交換機收到消息后需要全部轉發(綁定的時候設為空,發送的時候也為空  
channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());  
System.out.println("消息發送成功!");
  • 這里會用到 basicPublish() 方法
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)

參數解釋:

  1. Exchange:交換機名稱
  2. routingKey:如果交換機類型為 fanoutroutingKey 設置為 “”,表示每個消費者都能收到全部信息

完整代碼

package rabbitmq.fanout;  import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  public class Producer {  public static void main(String[] args) throws IOException, TimeoutException {  // 1. 建立連接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 開啟信道  Channel channel = connection.createChannel();  //3. 聲明交換機  /*  Exchange.DeclareOk exchangeDeclare(String exchange,        BuiltinExchangeType type,        boolean durable,        boolean autoDelete,        boolean internal,        Map<String, Object> arguments) throws IOException;        參數解釋:  exchange:交換機名稱  type:交換機類型  DIRECT("direct"),定向,直連,routing  FANOUT("fanout"),扇形(廣播),每個隊列都能收到消息  TOPIC("topic"),通配符  HEADERS("headers"),參數匹配(工作中用的少)  durable:是否持久化  autoDelete:自動刪除  internal:內部使用(一般false)  arguments:參數  */        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);  // 4. 聲明隊列  channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);  channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);  // 5. 綁定交換機和隊列  channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");  channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");  // 6. 發布消息  String msg = "hello fanout...";  // 第二個參數 routingKey 為空。因為這是廣播模式,交換機收到消息后需要全部轉發(綁定的時候設為空,發送的時候也為空  channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());  System.out.println("消息發送成功!");  // 7. 釋放資源  channel.close();  connection.close();  }  
}

消費者代碼

主要的步驟為:

  1. 創建 Channel
  2. 接收消息,并處理

完整代碼

package rabbitmq.fanout;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  public class Consumer1 {  public static void main(String[] args) throws IOException, TimeoutException {  // 1. 建立連接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  // 2.建立信道  Channel channel = connection.createChannel();  // 3.聲明隊列(如果隊列已經存在,就不會創建;不存在就會創建)  channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);  // 4. 消費信息  DefaultConsumer consumer = new DefaultConsumer(channel) {  // 從隊列中收到消息,就會執行的方法  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("接收到消息:" + new String(body));  }  };  channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);  }  }

運行程序

啟動生產者

  1. 消息全轉發

image.png|424

  • 我們可以看到兩個隊列中分別有了一條消息
  • 這就是發布訂閱模式,他會把收到的消息都轉發
  1. 交換機綁定了隊列
    image.png|374
  • 這里,我們可以看到交換機和隊列之間的綁定關系

啟動消費者

消費者 1:

接收到消息:hello fanout...

消費者 2:

接收到消息:hello fanout...

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/82868.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/82868.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/82868.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Codeforces Round 998 (Div. 3)

A. Fibonacciness 題目大意 給你四個數字abde&#xff0c;讓你找到一個中間值c&#xff0c;問 a b c a b c abc &#xff0c; b c d b c d bcd &#xff0c; c d e c d e cde 最多能有幾個式子成立 解題思路 顯然最多就六種情況&#xff0c;暴力枚舉即可 代…

火山引擎發展初始

火山引擎是字節跳動旗下的云計算服務品牌&#xff0c;其云服務業務的啟動和正式商業化時間線如下&#xff1a; 1. **初期探索&#xff08;2020年之前&#xff09;** 字節跳動在早期為支持自身業務&#xff08;如抖音、今日頭條等&#xff09;構建了強大的基礎設施和技術中…

【認知思維】光環效應:第一印象的持久力量

什么是光環效應 光環效應&#xff08;Halo Effect&#xff09;是指人們傾向于讓對某人或某物的一個顯著特質的印象影響對其他特質的評價的認知偏差。簡單來說&#xff0c;當我們對某人的一個特質&#xff08;如外表、智力或某項技能&#xff09;形成積極印象時&#xff0c;我們…

Java Solon v3.3.0 發布(國產優秀應用開發基座)

Solon 框架&#xff01; Solon 是新一代&#xff0c;Java 企業級應用開發框架。從零開始構建&#xff08;No Java-EE&#xff09;&#xff0c;有靈活的接口規范與開放生態。采用商用友好的 Apache 2.0 開源協議&#xff0c;是“杭州無耳科技有限公司”開源的根級項目&#xff…

力扣-104.二叉樹的最大深度

題目描述 給定一個二叉樹 root &#xff0c;返回其最大深度。 二叉樹的 最大深度 是指從根節點到最遠葉子節點的最長路徑上的節點數。 class Solution { public:int maxDepth(TreeNode* root) {if(!root){return 0;}return max(maxDepth(root->left), maxDepth(root->…

單反和無反(私人筆記)

① 單反相機&#xff1a; 定義&#xff1a; 單反相機&#xff08;Single-Lens Reflex&#xff0c;SLR&#xff09;是一種帶有反光鏡結構的數碼相機。光線通過鏡頭進入后&#xff0c;先被反光鏡反射到五棱鏡/五面鏡&#xff0c;再通過取景器進入人眼。按下快門時&#xff0c;反…

超詳細講解C語言轉義字符\a \b \r \t \? \n等等

轉義字符 C語言有一組字符很特殊&#xff0c;叫做轉義字符&#xff0c;顧名思義&#xff0c;改變原來的意思的字符。 1 \? ??)是一個三字母詞&#xff0c;在以前的編譯器它會被編譯為] (??會被編譯為[ 因此在以前輸入(are you ok ??)就會被編譯為are you ok ] 解決這個…

Java Spring MVC -01

SpringMVC 是一種基于 的實現 MVC 設計模式的請求驅動類型的輕量級 Web 框架&#xff0c;屬于 Spring FrameWork 的后續產品&#xff0c;已經融合在 Spring Web Flow 中。 First:SpringMVC-01-SpringMVC 概述 SpringMVC 是 Spring 框架的一個模塊&#xff0c;用于構建 Web 應…

Spring MessageSource 詳解:如何在國際化消息中傳遞參數

在開發多語言應用程序時,Spring 的 MessageSource 是處理國際化(i18n)文本的核心組件。它允許我們根據用戶的 Locale (區域設置) 顯示不同的消息。然而,很多時候我們的消息并不是靜態的,而是需要包含動態數據,比如用戶名、數量、文件名等。這時,我們就需要在獲取國際化消…

Datawhale 5月llm-universe 第1次筆記

課程地址&#xff1a;GitHub - datawhalechina/llm-universe: 本項目是一個面向小白開發者的大模型應用開發教程&#xff0c;在線閱讀地址&#xff1a;https://datawhalechina.github.io/llm-universe/ 難點&#xff1a;配置conda環境變量 我用的vscode github方法 目錄 重要…

基于Java的家政服務平臺設計與實現(代碼+數據庫+LW)

摘 要 現代經濟快節奏發展以及不斷完善升級的信息化技術&#xff0c;讓傳統數據信息的管理升級為軟件存儲&#xff0c;歸納&#xff0c;集中處理數據信息的管理方式。本家政服務平臺就是在這樣的大環境下誕生&#xff0c;其可以幫助管理者在短時間內處理完畢龐大的數據信息&a…

Android中LinearLayout線性布局使用詳解

Android中LinearLayout線性布局使用詳解 LinearLayout&#xff08;線性布局&#xff09;是Android中最基礎、最常用的布局之一&#xff0c;它按照水平或垂直方向依次排列子視圖。 基本特性 方向性&#xff1a;可以設置為水平(horizontal)或垂直(vertical)排列權重&#xff1…

LVS+keepalived實戰案例

目錄 部署LVS 安裝軟件 創建VIP 創建保存規則文件 給RS添加規則 驗證規則 部署RS端 安裝軟件 頁面內容 添加VIP 配置系統ARP 傳輸到rs-2 客戶端測試 查看規則文件 實現keepalived 編輯配置文件 傳輸文件給backup 修改backup的配置文件 開啟keepalived服務 …

(C語言)超市管理系統(測試版)(指針)(數據結構)(二進制文件讀寫)

目錄 前言&#xff1a; 源代碼&#xff1a; product.h product.c fileio.h fileio.c main.c 代碼解析&#xff1a; fileio模塊&#xff08;文件&#xff08;二進制&#xff09;&#xff09; 寫文件&#xff08;保存&#xff09; 函數功能 代碼逐行解析 關鍵知識點 讀文…

ubuntu----100,常用命令2

目錄 文件與目錄管理系統信息與管理用戶與權限管理網絡配置與管理軟件包管理打包與壓縮系統服務與任務調度硬件信息查看系統操作高級工具開發相關其他實用命令 在 Ubuntu 系統中&#xff0c;掌握常用命令可以大幅提升操作效率。以下是一些常用的命令&#xff0c;涵蓋了文件管理…

WiFi密碼查看器打開軟件自動獲取數據

相信有很大一部分人都不知道怎么看已經連過的WiFi密碼。 你還在手動查詢自己的電腦連接過得WiFi密碼嗎&#xff1f; —————【下 載 地 址】——————— 【本章單下載】&#xff1a;https://drive.uc.cn/s/dbbedf933dad4 【百款黑科技】&#xff1a;https://ucnygalh6…

開目新一代MOM:AI賦能高端制造的破局之道

導讀 INTRODUCTION 在高端制造業智能化轉型的深水區&#xff0c;企業正面臨著個性化定制、多工藝場景、動態生產需求的敏捷響應以及傳統MES柔性不足的考驗……在此背景下&#xff0c;武漢開目信息技術股份有限公司&#xff08;簡稱“開目軟件”&#xff09;正式發布新一代開目…

Android開發-視圖基礎

在Android應用開發中&#xff0c;視圖&#xff08;View&#xff09;是構建用戶界面的基本元素。無論是按鈕、文本框還是復雜的自定義控件&#xff0c;它們都是基于View類或其子類實現的。掌握視圖的基礎知識對于創建功能強大且美觀的應用至關重要。本文將深入探討Android中的視…

無人機信號線被電磁干擾導致停機

問題描述&#xff1a; 無人機飛控和電調之間使用PWM信號控制時候&#xff0c;無人機可以正常起飛&#xff0c;但是在空中懸停的時候會出現某一個電機停機&#xff0c;經排查電調沒有啟動過流過壓等保護&#xff0c;定位到電調和飛控之間的信號線被干擾問題。 信號線被干擾&am…

VSCode設置SSH免密登錄

引言 2025年05月13日20:21:14 原來一直用的PyCharn來完成代碼在遠程服務器上的運行&#xff0c;但是PyCharm時不時同步代碼會有問題。因此&#xff0c;嘗試用VSCode來完成代碼SSH遠程運行。由于VSCode每次進行SSH連接的時候都要手動輸入密碼&#xff0c;為了解決這個問題在本…