🎉歡迎大家觀看AUGENSTERN_dc的文章(o゜▽゜)o☆??
🎉感謝各位讀者在百忙之中抽出時間來垂閱我的文章,我會盡我所能向的大家分享我的知識和經驗📖
🎉希望我們在一篇篇的文章中能夠共同進步!!!
🌈個人主頁:AUGENSTERN_dc
🔥個人專欄:C語言?|Java | 數據結構?| 算法 | MySQL?| RabbitMQ?| Redis
?個人格言:
一重山有一重山的錯落,我有我的平仄
一筆鋒有一筆鋒的著墨,我有我的舍得
接下來,我會向大家介紹如何快速入門RabbitMQ,以及如何編寫一個簡單的RabbitMQ代碼
1. 引入依賴
在編寫我們的代碼之前,我們需要引入RabbitMQ的依賴:
如果你使用的是Maven, 你可以使用以下依賴:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>
?2. 生產者消費者模型
在編寫代碼之前,大家需要了解生產者消費者模型:
生產者-消費者模型(Producer-Consumer Model): 是一種經典的多線程同步問題,用于解決生產者線程和消費者線程之間的數據共享和同步問題。它在多線程編程、并發編程以及分布式系統中被廣泛應用。
2.1 主要角色
生產者-消費者模型涉及兩個主要角色:
1. 生產者(producer)
- 負責生成數據并將其放入緩沖區(Buffer)。
- 如果緩沖區已滿,生產者需要等待,直到緩沖區有空間可以存放數據。
2. 消費者(consumer)
- 負責從緩沖區中取出數據并消費。
- 如果緩沖區為空,消費者需要等待,直到緩沖區中有數據可以消費。
緩沖區(Buffer)是一個共享資源,用于存儲生產者生成的數據,供消費者消費。
2.2?關鍵問題
生產者-消費者模型需要解決以下兩個關鍵問題:
1. 互斥訪問:
- 多個線程(生產者和消費者)需要訪問共享的緩沖區,因此需要確保對緩沖區的訪問是互斥的,避免數據競爭和不一致。
2. 同步問題:
- 生產者需要在緩沖區有空間時才能生產數據。
- 消費者需要在緩沖區有數據時才能消費數據。
- 需要一種機制來協調生產者和消費者之間的同步。
3. 編寫生產者代碼
3.1 創建連接
要想使用創建一個生產者,首先需要將生產者和RabbitMQ的服務器進行連接
// 1. 創建連接??
ConnectionFactory factory = new ConnectionFactory();
//2. 設置參數
factory.setHost("你的RabbitMQ服務器IP");//ip 默認值localhost
factory.setPort(5672); //默認值5672
factory.setVirtualHost("test");//虛擬機名稱, 默認 /
factory.setUsername("guest");//??名,默認guest
factory.setPassword("guest");//密碼, 默認guest
//3. 創建連接Connection
Connection connection = factory.newConnection();
RabbitMQ 默認的?于客?端連接的TCP 端?號是5672, 需要提前進?開放
3.2 創建Channel
//4. 創建channel通道
Channel channel = connection.createChannel();
?產者和消費者創建的channel并不是同?個
3.3 聲明一個隊列Queue
/*queueDeclare(String queue, boolean durable, boolean exclusive, booleanautoDelete, Map<String, Object> arguments)1.queue: 隊列名稱2.durable: 是否持久化.true-設置隊列為持久化, 待久化的隊列會存盤,服務器重啟之后, 消息不丟失。3.exclusive:* 是否獨占, 只能有?個消費者監聽隊列* 當Connection關閉時, 是否刪除隊列4.autoDelete: 是否?動刪除, 當沒有Consumer時, ?動刪除掉5.arguments: ?些參數
*/
//如果沒有?個hello_world 這樣的?個隊列, 會?動創建, 如果有, 則不創建
channel.queueDeclare("hello",true,false,false,null);
3.4 發送消息
當一個新的RabbitMQ節點啟動時,他會預聲明(declare)幾個內置的交換機, 內置交換機名稱是空字符串(""), 生產者發送的消息會根據隊列名稱直接路由到對應的隊列.
例如: 如果有?個名為 "hello" 的隊列, ?產者可以直接發送消息到 "hello" 隊列, ?消費者可以從 "hello" 隊列中接收消息, ?不需要關?交換機的存在. 這種模式?常適合簡單的應?場景,其中?產者和消費者之間的通信是?對?的.
?
//6. 通過channel發送消息到隊列中
/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)1.exchange: 交換機名稱, 簡單模式下, 交換機會使?默認的""2.routingKey: 路由名稱, routingKey = 隊列名稱3.props: 配置信息4.body: 發送消息的數據
*/
String msg = "Hello World";
//使?的是內置交換機. 使?內置交換機時, routingKey要和隊列名稱?樣, 才可以路由到對應的隊列上去
channel.basicPublish("","hello",null,msg.getBytes());
System.out.println(msg + "消息發送成功");
3.5 釋放資源
//顯式地關閉Channel是個好習慣, 但這不是必須的, Connection關閉的時候,Channel也會?動關閉.
channel.close();
connection.close();
4. 編寫消費者代碼
4.1 創建連接
和生產者類似, 想要接收RabbtiMQ的消息, 首先需要和RabbitMQ建立一個連接
//1. 創建連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的RabbitMQ服務器IP");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("test");
?這里要注意和生產者使用同一個虛擬機
4.2 創建Channel
//2. 創建Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
4.3 聲明隊列
//3. 聲明隊列
channel.queueDeclare("hello", true, false, false, null);
?這里需要注意, 要和生產者使用同一個隊列, 這樣生產者發送的消息, 才能被消費者正常接收
4.4 消費資源
//4. 消費資源
/**
* 參數說明:
* consumerTag : 消費者標簽, 通常是消費者在訂閱隊列時指定的.
* envelope : 包含消息的封包信息,如隊列名稱, 交換機等.
* properties : ?些配置信息
* body : 消息的具體內容
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {//從隊列中收到消息就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}
};
/**
* 參數說明:
* queue: 隊列名稱
* autoAck: 是否自動確認
* callback: 接收到消息后, 執行的邏輯是什么
*/
channel.basicConsume("hello", true, consumer);
這里的DefaultConsumer 是 RabbitMQ提供的?個默認消費者, 實現了Consumer 接?.
Consumer ?于定義消息消費者的?為. 當我們需要從RabbitMQ接收消息時, 需要提供?個實現了Consumer 接?的對象.
4.5 釋放資源
// 5. 釋放資源
channel.close();
connection.close();
當我們運行生產者代碼時, 就會向RabbitMQ服務器發送一條消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1.建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");Connection connection = factory.newConnection();//2. 開啟通道Channel channel = connection.createChannel();//3. 聲明交換機 使用內置的交換機//4. 聲明隊列/*** 參數說明:* queue: 隊列名稱* durable: 可持久化* exclusive: 是否獨占* autoDelete: 是否自動刪除* arguments: 參數*/channel.queueDeclare("hello", true, false, false, null);//5. 發送消息/*** 參數說明:* exchange: 交換機名稱* routingKey: 路由的規則, 使用內置交換機, routingKey和隊列名稱保持一致* props: 屬性配置* body: 要發送的消息*/String msg = "hello rabbitmq";channel.basicPublish("", "hello", null, msg.getBytes());System.out.println("消息發送成功!!");//6. 資源釋放channel.close();connection.close();}
}
當我運行消費者代碼時, 就會從RabbitMQ服務器中獲取一條消息
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 創建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");//2. 創建ChannelConnection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 聲明隊列channel.queueDeclare("hello", true, false, false, null);//4. 消費資源/*** 參數說明:* queue: 隊列名稱* autoAck: 是否自動確認* callback: 接收到消息后, 執行的邏輯是什么*/DefaultConsumer consumer = new DefaultConsumer(channel) {//從隊列中收到消息就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume("hello", true, consumer);//5. 關閉資源
// Thread.sleep(1000);channel.close();connection.close();}
}
依次運行生產者消費者代碼, 就能得到以下結果
以上就是本章的所有內容, 謝謝大家觀看!!