本章將帶大家來寫一個簡單的程序,使用 Java 創建RabbitMQ 的生產者和消費者
依賴引入
在 Maven 倉庫中輸入 amqp-client:
找到第一個 RabbitMQ Java Client ,點擊進去找到一個合適的版本然后將依賴引入到我們項目中的 pom.xml 文件中。
生產者代碼編寫
首先我們來回顧 RabbitMQ 的工作流程:
我們首先要建立連接Connection,通過 ConnectionFactory 來創建一個連接,在建立連接前我們需要設置好RabbitMQ 的參數:主機、端口號、用戶名、密碼、綁定的虛擬機。
//建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();
接著我們需要開啟信道 Channel,因為我們的Connection 是抽象的虛擬連接,并不是真正和RabbitMQ 虛擬機進行了連接,我們需要Connection 上的信道來進行通信:
//開啟信道Channel channel = connection.createChannel();
聲明交換機,當我們沒有顯式地聲明交換機的話,RabbitMQ 會使用默認的交換機,RabbitMQ默認會為每個連接自動創建一個名為 “”(空字符串)的直連交換機(Direct Exchange),所有隊列都會隱式綁定到該交換機,綁定的路由鍵(Routing Key)就是隊列名稱。
聲明隊列:
//聲明隊列,使用內置的交換機,如果隊列不存在會自動幫我們創建
channel.queueDeclare(Constants.HELLO_QUEUE, true, false, true, null);
參數介紹:
Queue.DeclareOk queueDeclare(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments) throws IOException;
queue: 隊列名稱
durable: 表示是否需要持久化,true 說明需要持久化,那么我們發送的消息就會進行落盤操作,如果RabbitMQ 重啟了也可以找到這個消息,即把消息保存在硬盤中,如果是 false ,表示不需要進行持久化操作,那么我們發送的消息就只會在內存中,如果 RabbitMQ 重啟了,消息也就不見了。
exclusive:表示是否獨占,true: 表示該隊列只對它的連接可見,且連接關閉時隊列自動刪除。
適用于臨時場景(如RPC回調隊列),確保隊列不被其他連接共享。
注意:若其他連接嘗試使用排他隊列,會拋異常。
autoDelete:表示是否自動刪除,true:當最后一個消費者取消訂閱(如斷開連接)時,隊列自動刪除。適用于動態隊列(如臨時任務隊列),無需手動清理。
注意:若隊列從未有過消費者,則不會觸發刪除。
arguments:設置隊列的高級參數,在后面的章節中會詳細展開。
發送消息:
for (int i = 0; i < 10; i++) {String msg = "hello work queue...." + i;channel.basicPublish("",Constants.HELLO_QUEUE, null, msg.getBytes());
}
參數說明:
void basicPublish(String exchange,
String routingKey,
BasicProperties props,
byte[] body) throws IOException;
exchange:交換機的名稱,由于上面我們沒有聲明交換機,所以這里填 “” 【空字符串】表示默認交換機
routingKey:路由鍵,也就說消息要發送到哪個隊列上,路由鍵即為標識,在下一章節中會詳細介紹。
props:屬性配置,這個在后面的章節中也會介紹
body:消息內容
資源釋放:
//6. 資源釋放
channel.close();
connection.close();
這里我們先釋放信道,然后再釋放連接。
注意:釋放連接之后,就不能釋放信道了!!!因為連接都不存在了,信道也不會存在。
因此你也可以只釋放連接。
運行程序之后,我們打開 RabbitMQ 的面板:
會發現我們成功將十條消息發送到隊列中
通過輸入Messages 數量,然后點擊 Get Message(s),就可以查看消息的詳細信息了。
消費者代碼編寫
和生產者類似,都需要先設置好虛擬機的參數,然后創建連接,創建信道:
//建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);
factory.setPort(Constants.PORT);
factory.setUsername(Constants.NAME);
factory.setPassword(Constants.PASSWORD);
factory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = factory.newConnection();
//開啟信道
Channel channel = connection.createChannel();
聲明隊列:
//聲明隊列
channel.queueDeclare(Constants.HELLO_QUEUE, true, false, true, null);
為什么消費者需要聲明隊列?
因為消費者要消費的前提是有隊列可以消費
在分布式系統中,生產者和消費者的代碼可能不會在同一臺機器上,如果生產者還沒有啟動,反而消費者先啟動了,消費者就會拋出異常,找不到隊列
因此為了避免發生上面的異常情況,我們也需要在消費者中聲明隊列,這樣,如果沒有隊列的話,RabbitMQ 就會幫我們創建隊列出來。
接收消息和處理消息:
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到的消息:" + new String(body));}
};
channel.basicConsume(Constants.HELLO_QUEUE, true, consumer);
處理消息的主邏輯我們通過 DefaultConsumer 來進行編寫,通過重寫 handleDelivery 方法,來實現我們需要的邏輯。
參數介紹:
/*** No-op implementation of {@link Consumer#handleDelivery}.*/
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException
{// no work to do
}
consumerTag :消費者表示,用于區分消費者,就像我們的身份證一樣
envelope:消息的元數據包裝對象。
properties:消息的附加屬性(元數據)
body:消息內容
最后我們可以不主動釋放連接,這樣我們就可以看到完整的打印信息了
RabbitMQ 的 Java 客戶端底層使用 ExecutorService 管理消費者線程。
這些線程默認是守護線程,因此主線程【main】結束后,它們會被 JVM 強制終止。
由于RabbitMQ 線程和 main 線程屬于多線程,因此根據多線程知識,我們可以知道如果主動釋放連接的話,main 線程就是立即結束,我們可能就看不到完整的信息了。