RaabitMQ 快速入門

🎉歡迎大家觀看AUGENSTERN_dc的文章(o゜▽゜)o☆??

🎉感謝各位讀者在百忙之中抽出時間來垂閱我的文章,我會盡我所能向的大家分享我的知識和經驗📖

🎉希望我們在一篇篇的文章中能夠共同進步!!!

🌈個人主頁:AUGENSTERN_dc

🔥個人專欄:C語言?|Java | 數據結構?| 算法 | MySQL?| RabbitMQ?| Redis

?個人格言:

一重山有一重山的錯落,我有我的平仄

一筆鋒有一筆鋒的著墨,我有我的舍得


接下來,我會向大家介紹如何快速入門RabbitMQ,以及如何編寫一個簡單的RabbitMQ代碼

1. 引入依賴

在編寫我們的代碼之前,我們需要引入RabbitMQ的依賴:

如果你使用的是Maven, 你可以使用以下依賴:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

?2. 生產者消費者模型

在編寫代碼之前,大家需要了解生產者消費者模型:

生產者-消費者模型(Producer-Consumer Model): 是一種經典的多線程同步問題,用于解決生產者線程和消費者線程之間的數據共享和同步問題。它在多線程編程、并發編程以及分布式系統中被廣泛應用。

2.1 主要角色

生產者-消費者模型涉及兩個主要角色:

1. 生產者(producer)

  • 負責生成數據并將其放入緩沖區(Buffer)。
  • 如果緩沖區已滿,生產者需要等待,直到緩沖區有空間可以存放數據。

2. 消費者(consumer)

  • 負責從緩沖區中取出數據并消費。
  • 如果緩沖區為空,消費者需要等待,直到緩沖區中有數據可以消費。

緩沖區(Buffer)是一個共享資源,用于存儲生產者生成的數據,供消費者消費。

2.2?關鍵問題

生產者-消費者模型需要解決以下兩個關鍵問題:

1. 互斥訪問

  • 多個線程(生產者和消費者)需要訪問共享的緩沖區,因此需要確保對緩沖區的訪問是互斥的,避免數據競爭和不一致。

2. 同步問題:

  • 生產者需要在緩沖區有空間時才能生產數據。
  • 消費者需要在緩沖區有數據時才能消費數據。
  • 需要一種機制來協調生產者和消費者之間的同步。

3. 編寫生產者代碼

3.1 創建連接

要想使用創建一個生產者,首先需要將生產者和RabbitMQ的服務器進行連接

// 1. 創建連接??
ConnectionFactory factory = new ConnectionFactory();
//2. 設置參數
factory.setHost("你的RabbitMQ服務器IP");//ip 默認值localhost
factory.setPort(5672); //默認值5672
factory.setVirtualHost("test");//虛擬機名稱, 默認 /
factory.setUsername("guest");//??名,默認guest
factory.setPassword("guest");//密碼, 默認guest
//3. 創建連接Connection
Connection connection = factory.newConnection();

RabbitMQ 默認的?于客?端連接的TCP 端?號是5672, 需要提前進?開放

3.2 創建Channel

//4. 創建channel通道
Channel channel = connection.createChannel();

?產者和消費者創建的channel并不是同?個

3.3 聲明一個隊列Queue

/*queueDeclare(String queue, boolean durable, boolean exclusive, booleanautoDelete, Map<String, Object> arguments)1.queue: 隊列名稱2.durable: 是否持久化.true-設置隊列為持久化, 待久化的隊列會存盤,服務器重啟之后, 消息不丟失。3.exclusive:* 是否獨占, 只能有?個消費者監聽隊列* 當Connection關閉時, 是否刪除隊列4.autoDelete: 是否?動刪除, 當沒有Consumer時, ?動刪除掉5.arguments: ?些參數
*/
//如果沒有?個hello_world 這樣的?個隊列, 會?動創建, 如果有, 則不創建
channel.queueDeclare("hello",true,false,false,null);

3.4 發送消息

當一個新的RabbitMQ節點啟動時,他會預聲明(declare)幾個內置的交換機, 內置交換機名稱是空字符串(""), 生產者發送的消息會根據隊列名稱直接路由到對應的隊列.

例如: 如果有?個名為 "hello" 的隊列, ?產者可以直接發送消息到 "hello" 隊列, ?消費者可以從 "hello" 隊列中接收消息, ?不需要關?交換機的存在. 這種模式?常適合簡單的應?場景,其中?產者和消費者之間的通信是?對?的.
?

//6. 通過channel發送消息到隊列中
/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)1.exchange: 交換機名稱, 簡單模式下, 交換機會使?默認的""2.routingKey: 路由名稱, routingKey = 隊列名稱3.props: 配置信息4.body: 發送消息的數據
*/
String msg = "Hello World";
//使?的是內置交換機. 使?內置交換機時, routingKey要和隊列名稱?樣, 才可以路由到對應的隊列上去
channel.basicPublish("","hello",null,msg.getBytes());
System.out.println(msg + "消息發送成功");

3.5 釋放資源

//顯式地關閉Channel是個好習慣, 但這不是必須的, Connection關閉的時候,Channel也會?動關閉.
channel.close();
connection.close();

4. 編寫消費者代碼

4.1 創建連接

和生產者類似, 想要接收RabbtiMQ的消息, 首先需要和RabbitMQ建立一個連接

//1. 創建連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的RabbitMQ服務器IP");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("test");

?這里要注意和生產者使用同一個虛擬機

4.2 創建Channel

//2. 創建Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

4.3 聲明隊列

//3. 聲明隊列
channel.queueDeclare("hello", true, false, false, null);

?這里需要注意, 要和生產者使用同一個隊列, 這樣生產者發送的消息, 才能被消費者正常接收

4.4 消費資源

//4. 消費資源
/**
*  參數說明:
*  consumerTag : 消費者標簽, 通常是消費者在訂閱隊列時指定的.
*  envelope : 包含消息的封包信息,如隊列名稱, 交換機等.
*  properties : ?些配置信息
*  body : 消息的具體內容
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {//從隊列中收到消息就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}
};
/**
*  參數說明:
*  queue: 隊列名稱
*  autoAck: 是否自動確認
*  callback: 接收到消息后, 執行的邏輯是什么
*/
channel.basicConsume("hello", true, consumer);

這里的DefaultConsumer 是 RabbitMQ提供的?個默認消費者, 實現了Consumer 接?.

Consumer ?于定義消息消費者的?為. 當我們需要從RabbitMQ接收消息時, 需要提供?個實現了Consumer 接?的對象.

4.5 釋放資源

// 5. 釋放資源
channel.close();
connection.close();

當我們運行生產者代碼時, 就會向RabbitMQ服務器發送一條消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1.建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");Connection connection = factory.newConnection();//2. 開啟通道Channel channel = connection.createChannel();//3. 聲明交換機  使用內置的交換機//4. 聲明隊列/***  參數說明:*  queue: 隊列名稱*  durable: 可持久化*  exclusive: 是否獨占*  autoDelete: 是否自動刪除*  arguments: 參數*/channel.queueDeclare("hello", true, false, false, null);//5. 發送消息/***  參數說明:*  exchange: 交換機名稱*  routingKey: 路由的規則, 使用內置交換機, routingKey和隊列名稱保持一致*  props: 屬性配置*  body: 要發送的消息*/String msg = "hello rabbitmq";channel.basicPublish("", "hello", null, msg.getBytes());System.out.println("消息發送成功!!");//6. 資源釋放channel.close();connection.close();}
}

當我運行消費者代碼時, 就會從RabbitMQ服務器中獲取一條消息

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 創建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");//2. 創建ChannelConnection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 聲明隊列channel.queueDeclare("hello", true, false, false, null);//4. 消費資源/***  參數說明:*  queue: 隊列名稱*  autoAck: 是否自動確認*  callback: 接收到消息后, 執行的邏輯是什么*/DefaultConsumer consumer = new DefaultConsumer(channel) {//從隊列中收到消息就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume("hello", true, consumer);//5. 關閉資源
//        Thread.sleep(1000);channel.close();connection.close();}
}

依次運行生產者消費者代碼, 就能得到以下結果

以上就是本章的所有內容, 謝謝大家觀看!!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:
http://www.pswp.cn/bicheng/77034.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/77034.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/77034.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

語音識別——根據聲波能量、VAD 和 頻譜分析實時輸出文字

SenseVoiceSmall網絡結構圖 ASR(語音識別)是將音頻信息轉化為文字的技術。在實時語音識別中,一個關鍵問題是:如何決定將采集的音頻數據輸入大模型的最佳時機?固定時間間隔顯然不夠靈活,太短可能導致頻繁調用模型,太長則會延遲文字輸出。有沒有更智能的方式?答案是肯定…

AI大模型如何重塑科研范式:從“假說驅動”到“數據涌現”

??個人主頁??:慌ZHANG-CSDN博客 ????期待您的關注 ???? 一、引言:科研進入“模型共研”時代 傳統科研范式通常以“假設→實驗→驗證→理論”的方式推進,這一經典路徑建立在人類的認知能力與邏輯推理基礎上。然而,隨著數據規模的爆炸式增長與知識系統的高度復雜…

使用Python寫入JSON、XML和YAML數據到Excel文件

在當今數據驅動的技術生態中&#xff0c;JSON、XML和YAML作為主流結構化數據格式&#xff0c;因其層次化表達能力和跨平臺兼容性&#xff0c;已成為系統間數據交換的通用載體。然而&#xff0c;當需要將這類半結構化數據轉化為具備直觀可視化、動態計算和協作共享特性的載體時&…

面試題:Eureka和Nocas的區別

Eureka 與 Nacos 核心區別對比 一、功能定位與核心能力 ?維度??Eureka??Nacos??核心功能?專注服務注冊與發現&#xff0c;無配置管理功能?:ml-citation{ref“1,3” data“citationList”}集成服務注冊、發現、配置管理、動態DNS等?:ml-citation{ref“1,3” data“c…

2025年4月15日 百度一面 面經

目錄 1. 代理相關 從靜態代理到動態代理 2. cglib可以代理被final修飾的類嗎,為什么 3. JVM 體系結構 4. 垃圾回收算法 5. 什么是注解 如何使用 底層原理 6. synchronized和reentrantlock 7. 講一下你項目中 redis的分布式鎖 與java自帶的鎖有啥區別 8. post 請求和 ge…

AI改變生活

AI改變生活 人工智能&#xff08;AI&#xff09;在我們生活中的應用越來越廣泛&#xff0c;深刻地改變了我們的工作和生活方式。以下是一些AI實際應用的實例&#xff0c;以及它們如何影響我們的日常生活。 1. 智能助手 智能助手如Siri、Alexa和Google Assistant等&#xff0…

信奧賽之c++基礎(取模運算與數位分離)

?? 數字拆解大冒險——取模運算與數位分離魔法課 ?? 第一章:糖果分裝術——取模運算 ?? 分糖果游戲 7顆糖每人分3顆: 每人得到:7 / 3 = 2顆剩余糖果:7 % 3 = 1顆(%就是取模符號) 就像把糖果裝袋后剩下的零散糖粒!?? 取模運算說明書 算式比喻結果10 % 310顆糖分…

揭秘大數據 | 21、軟件定義計算

老夫先將這個小系列的前兩篇內容鏈接奉上&#xff0c;方便感興趣的朋友一氣讀之。 揭秘大數據 | 19、軟件定義的世界-CSDN博客 揭秘大數據 | 20、軟件定義數據中心-CSDN博客 今天&#xff0c;書接上文&#xff0c;開聊軟件定義計算的那些事兒&#xff01; 虛擬化是軟件定義…

FPGA-DDS技術的波形發生器

1.實驗目的 1.1掌握直接數字頻率合成&#xff08;DDS&#xff09;的基本原理及其實現方法。 1.2在DE2-115 FPGA開發板上設計一個可調頻率的正弦波和方波發生器&#xff0c;頻率范圍10Hz~5MHz&#xff0c;最小分辨率小于1kHz。 1.3使用Quartus II進行仿真&#xff0c;并通過S…

LeetCode[541]反轉字符串Ⅱ

思路&#xff1a; 題目給我們加了幾個規則&#xff0c;剩余長度小于2k&#xff0c;大于等于k就反轉k個&#xff0c;小于k就全部反轉&#xff0c;我們按照這個邏輯來就行。 第一就是大于等于k就反轉k個&#xff0c;我們for循環肯定是i2k了&#xff0c;接下來就是判斷是否大于等于…

實現定長的內存池

池化技術 所謂的池化技術&#xff0c;就是程序預先向系統申請過量的資源&#xff0c;然后自己管理起來&#xff0c;以備不時之需。這個操作的價值就是&#xff0c;如果申請與釋放資源的開銷較大&#xff0c;提前申請資源并在使用后并不釋放而是重復利用&#xff0c;能夠提高程序…

路由器原理與配置技術詳解

一、路由基礎原理 1.1 路由器的核心功能 網絡層設備&#xff1a;工作在OSI參考模型第三層&#xff0c;實現不同網絡間的互聯互通智能路徑選擇&#xff1a;基于路由表為數據包選擇最優傳輸路徑協議轉換&#xff1a;處理不同網絡接口間的協議差異&#xff08;如以太網與PPP&…

Leetcode 3518. Smallest Palindromic Rearrangement II

Leetcode 3518. Smallest Palindromic Rearrangement II 1. 解題思路2. 代碼實現 題目鏈接&#xff1a;Leetcode 3518. Smallest Palindromic Rearrangement II 1. 解題思路 這一題是題目Leetcode 3517. Smallest Palindromic Rearrangement I的升級版本&#xff0c;其主要的…

大模型——Crawl4AI 中的數據提取策略

大模型——Crawl4AI 中的數據提取策略 在本章中,將詳細介紹在 Crawl4AI 中可用的數據提取策略。這些策略包括: LLMExtractionStrategy:用于詳細內容提取。JsonCssExtractionStrategy:使用 CSS 選擇器進行結構化數據檢索。CosineStrategy:基于余弦相似性進行有效的語義分段…

職坐標解碼互聯網行業轉型發展新動能

當前&#xff0c;互聯網行業正以前所未有的速度重塑全球產業格局。工信部最新數據顯示&#xff0c;我國互聯網企業營收連續三年保持雙位數增長&#xff0c;其中百強企業在人工智能、物聯網等領域的投入強度同比提升40%&#xff0c;展現出強勁的技術引領力。與此同時&#xff0c…

linux多線(進)程編程——(4)進程間的傳音術(命名管道)

前言&#xff08;前情回顧&#xff09; 進程君&#xff08;父進程&#xff09;在開發出匿名管道這門傳音術后&#xff0c;解決了和自己孩子&#xff08;子進程&#xff09;間的溝通問題&#xff0c;父子關系趨于融洽。和孩子溝通后&#xff0c;進程君發現&#xff0c;自己脫離…

在IDEA里面建立maven項目(便于java web使用)

具體步驟&#xff1a; 第一次有的電腦你再創建項目的時候右下角會提醒你彈窗&#xff1a;讓你下載沒有的東西 一定要下載&#xff01;&#xff01;可能會很慢 運行結果&#xff1a; 因為他是默認的8080端口所以在運行的時候輸入的url如下圖&#xff1a; 新建了一個controller代…

【13】數據結構之樹結構篇章

目錄標題 樹Tree樹的定義樹的基本概念樹的存儲結構雙親表示法孩子表示法孩子兄弟表示法 二叉樹二叉樹與度不超過&#xff12;的普通樹的不同之處二叉樹的基本形態二叉樹的分類二叉樹的性質 二叉樹的順序存儲二叉樹的鏈式存儲二叉樹的鏈式存儲的結點結構樹的遍歷先序遍歷中序遍歷…

雷達生命探測儀,地震救援的生命探測先鋒|鼎躍安全

在地震、山體滑坡、坍塌建筑等突發災害中&#xff0c;會嚴重摧毀建筑物&#xff0c;造成倒塌和人員被困&#xff1b;在瓦礫堆、混凝土板層中&#xff0c;受困人員的生命安全常常面臨嚴峻威脅。傳統救援手段通常存在響應時間長、監測精度有限等不足。 救援現場往往環境復雜&…

512天,倔強生長:一位技術創作者的獨白

親愛的讀者與同行者&#xff1a; 我是倔強的石頭_&#xff0c;今天是我在CSDN成為創作者的第512天。當系統提示我寫下這篇紀念日文章時&#xff0c;我恍惚間想起了2023年11月19日的那個夜晚——指尖敲下《開端——》的標題&#xff0c;忐忑又堅定地按下了“發布”鍵。那時的我…