前言
本章我們來一次快速入門RabbitMQ——生產者與消費者。需要構建一個生產端與消費端的模型。什么意思呢?我們的生產者發送一條消息,投遞到RabbitMQ集群也就是Broker。
我們的消費端進行監聽RabbitMQ,當發現隊列中有消息后,就進行消費。
1. 環境準備
本次整合主要采用SpringBoot框架,需要對SpringBoot的使用有一定了解。
2.大概步驟
我們來看下大概步驟:
- ConnectionFacorty:獲取連接工廠
- Connection:一個連接
- Channel:數據通信信道,可發送和接收消息
- Queue:具體的消息存儲隊列
- Producer & Consumer 生產者和消費者
這個連接工廠需要配置一些相應的信息,例如: RabbitMQ節點的地址,端口號,VirtualHost等等。
Channel是我們RabbitMQ所有消息進行交互的關鍵。
3. 項目實戰
3.1 連接工廠
public class ConnectionUtils {public static Connection getConnection() throws IOException, TimeoutException {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設置服務地址factory.setHost("127.0.0.1");//端口factory.setPort(5672);//amqp協議 端口 類似與mysql的3306//設置賬號信息,用戶名、密碼、vhostfactory.setVirtualHost("/vhost_cp");factory.setUsername("user_cp");factory.setPassword("123456");// 通過工程獲取連接Connection connection = factory.newConnection();return connection;}
}
3.2 生產端
public class Producer {public static void main(String[] args) throws Exception {System.out.println("Producer start...");//1 創建ConnectionFactoryConnection connection = ConnectionUtils.getConnection();//2 通過connection創建一個ChannelChannel channel = connection.createChannel();//3 通過Channel發送數據
for(int i=0; i < 5; i++){String msg = "Hello RabbitMQ!";//1 exchange 2 routingKey channel.basicPublish("", "test001", null, msg.getBytes());}//4 記得要關閉相關的連接channel.close();connection.close();}
}
3.3 消費端
public class Consumer {public static void main(String[] args) throws Exception {System.out.println("Consumer start...");//1 創建ConnectionFactoryConnection connection = ConnectionUtils.getConnection();//2通過connection創建一個ChannelChannel channel = connection.createChannel();//3聲明(創建)一個隊列String queueName = "test001";channel.queueDeclare(queueName, true, false, false, null);//4創建消費者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//5設置Channelchannel.basicConsume(queueName, true, queueingConsumer);while(true){//6 獲取消息Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消費端: " + msg);//Envelope envelope = delivery.getEnvelope();}}
}
3.4 源碼解析
channel.queueDeclare(queueName, true, false, false, null);
第一個參數:queue:隊列的名稱
第二個參數:durable 是否持久化。true消息會持久化到本地,保證重啟服務后消息不會丟失
第三個參數:exclusive :表示獨占方式,設置為true 在某些情景下有必要,例如:順序消費。表示只有一個channel可以去監聽,其他channel都不能夠監聽。目的就是為了保證順序消費。
第四個參數:autoDelete:隊列如果與Exchange未綁定,則自動刪除
第五個參數:arguments:擴展參數
channel.basicConsume(QUEUE_NAME, true, consumer);
第二個參數 autoAck:自動簽收消息
3.5 運行程序
(1)啟動消費端
(2)查看管控臺
可以看到已經有一個連接,一個信道,一個消費者等信息了。
可以看到信道目前的狀態是空閑狀態。
隊列中多了test001隊列。
關于管控臺的介紹可以看這篇文章:消息中間件——RabbitMQ(四)命令行與管控臺的基本操作!
(3)運行生產端
可以看到生產端發送完消息之后停下了,消費端迅速接收到了消息。也可以繼續通過管控臺觀察消費的情況。
(4) 問題
注意:
這里面可能有一個問題:為什么要先啟動消費端呢?
因為在消費端創建的隊列,我們必須要有隊列,才能夠發送消息。
另一個問題:在生產端代碼中:
channel.basicPublish("", "test001", null, msg.getBytes());
并沒有設置exchange,只設置了隊列名稱,消費端卻依然能夠消費到消息,這是為什么呢?
答:發消息的一定要指定Exchange,如果不指定Exchange或者Exchange為空的話,它會默認走第一個
它的路由規則:將相同命名的隊列Queue的消息路由過去,如果路由不過去,將會把消息刪除。