文章目錄
- 一. RabbitMQ概述
- 1.1 同步/異步
- 1.1.1 同步調用
- 1.1.2 異步調用
- 1.2 消息中間件
- 1.2.1 概念
- 1.2.2 作用
- 1.2.3 常見的消息中間件
- 1.2.4 其他中間件
- 1.3 RabbitMQ
- 1.3.1 簡介
- 1.3.2 特點
- 1.3.3 方式
- 1.3.4 架構
- 1.3.5 運行流程
- 二. 安裝
- 2.1 Docker 安裝 RabbitMQ
- 三. 簡單隊列(Simple Queue)
- 3.1 消息模式
- 3.2 簡單隊列(Simple Queue)
- 3.2.1 面板操作
- 3.2.2 代碼操作
- 四. 工作隊列(Work Queue)
- 4.1 消息模式
- 4.2 工作隊列(Work Queue)
- 4.2.1 輪詢模式
- 4.2.2 公平模式
- 4.3 總結
- 五. 發布訂閱(Pub/Sub)
- 5.1 理解
- 5.1.1 發布訂閱模式具體實現
- 生產者
- 消費者
- 六. 路由模式(Routing)
- 6.1 圖形化界面理解
- 6.1.1 創建direct交換機
- 6.1.2 交換機與隊列綁定
- 6.1.3 模擬生產者發送消息
- 6.1.4 查看隊列消息
- 6.2 發布訂閱模式具體實現
- 6.2.1 生產者
- 6.1.2 消費者
- 七. 主題模式(Topic)
- 7.1 圖像界面理解
- 7.2 主題模式具體實現
- 7.2.1 生產者
- 7.2.2 消費者
- 八. SpringBoot集成RabbitMQ
一. RabbitMQ概述
1.1 同步/異步
1.1.1 同步調用
同步調用是指客戶端調用遠程服務時,需要等待服務端返回結果后才能繼續執行,典型的場景如遠程調用 HTTP 服務。
同步調用的優勢:時效性強,等到結果后才返回–需要查詢結果的通常用同步調用
同步調用的問題:
- 拓展性差–增加功能要改代碼
- 性能下降–調用鏈路長,每次都是阻塞等待上一個服務
- 級聯失敗問題–一個服務掛掉,整個鏈路上的服務都出問題
1.1.2 異步調用
異步調用是指客戶端調用遠程服務時,不需要等待服務端返回結果,而是直接返回一個代表請求的消息,典型的場景如消息隊列。當服務端處理完請求后,通過消息隊列通知客戶端。
異步調用的優勢:提升性能–不需要等待結果,可以繼續執行–適合處理耗時長的請求
1.2 消息中間件
1.2.1 概念
消息中間件(Message Queue,MQ)是一種應用程序之間的數據交換方式,它是一種分布式系統架構模式,用于在不同的應用程序之間傳遞消息。消息中間件的主要功能是實現應用間的松耦合,讓信息的發送方和接收方不需直連,而是通過消息中間件進行交互。
- 消息: 簡單的說就是軟件之間通訊時傳遞的數據,它可以是很簡單的數字、字母,也可以是很復雜的嵌套對象數據。
- 中間件:最簡單的理解是第三者,本來軟件A和軟件B間的通訊兩者直接傳遞消息就可以了,但是,此時中間件作為第三者,非要先讓軟件A通訊的消息先發給它,再由它發給軟件B(感覺就是中間商一樣),下面通過圖來更好的理解它們。
- 消息隊列:是消息中間件的一種實現方式。
總結:消息中間件則是將軟件與軟件之間的交互方式進行存儲和管理的一種技術,也可以看做是一種容器
1.2.2 作用
- 異步通信:消息隊列可以實現應用間的異步通信,應用只需要將消息放入隊列,不用等待回復,就可以繼續執行。
比如我們最常見的短信驗證碼功能,當我們在界面點擊“獲取驗證碼”后,我們還可以同時進行其他的操作,如輸入更新的密碼等,此時,我們不需要一直等到手機收到短信了才進行下一步的操作,這就是異步處理,提高了用戶體驗。
- 解耦合:通過消息隊列解耦合應用,應用之間不再需要直連,而是通過消息隊列進行通信。
比如常見的訂單系統,當有訂單下單時,我們需要減去庫存,但如果訂單、庫存的邏輯都放在一個系統中,不止處理事件需要很長,系統的耦合性比較高,此時,使用消息中間件,可以實現將訂單業務和庫存業務抽出來做不同的系統,每次下單的時候可以將下單信息放入消息中間間中,然后庫存系統去訂閱它,只有有訂單數據就進行減去庫存操作,這樣就將應用解耦了
- 削峰填谷:通過消息隊列可以有效地削峰填谷,避免應用因消息處理過慢而出現性能問題。
如常見的秒殺系統,如果有5萬個商品可以秒殺,沒有消息中間件的話,所有的請求都一次性到后臺,此時系統很容易卡死,引入消息中間件如消息隊列,此時可以在隊列中設置好可以存儲數據的數量,這樣每次用戶請求會先但消息隊列中,消息隊列就減去1,當消息隊列中存儲長度為0時,直接返回秒殺失敗,這樣就避免了所有用戶請求可能在同一時間到達系統后臺,達到流量削峰的作用
- 廣播消費:消息隊列可以實現廣播消費,一個消息可以被多個消費者消費
1.2.3 常見的消息中間件
- ActiveMQ:Apache 出品,主要用于企業級的消息中間件,支持多種協議,包括 AMQP、MQTT、STOMP 等。
- RabbitMQ:RabbitMQ 是一款開源的消息隊列軟件,由 Erlang 語言編寫,基于 AMQP 協議。
- Kafka:Apache 出品,主要用于大數據實時處理,支持多種協議,包括 Apache Kafka、Apache Pulsar 等。
- RocketMQ:阿里巴巴開源的消息隊列軟件,主要用于微服務架構。
1.2.4 其他中間件
- 分布式消息中間件:RocketMQ、Kafka、ActiveMQ 等。
- 負載均衡中間件:Nginx、LVS、HAProxy 等。
- 緩存中間件:Redis、Memcached 等。
- 數據庫中間件:MySQL、MongoDB 等。
- 搜索引擎中間件:ElasticSearch、Solr 等。
- 日志中間件:Logstash、Flume 等。
- 容器中間件:Docker、Kubernetes 等。
1.3 RabbitMQ
RabbitMQ官網
1.3.1 簡介
RabbitMQ 是一款開源的消息隊列軟件,由 Erlang 語言編寫,基于 AMQP 協議。RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲和轉發消息。RabbitMQ 是一個在分布式系統中用于存儲和轉發消息的消息隊列,它可以實現可靠的消息傳遞,支持多種消息隊列協議,包括 AMQP、STOMP、MQTT 等。RabbitMQ 是一個非常靈活的消息隊列,可以支持多種應用場景,如任務隊列、事件驅動、數據流、消息分發等。
1.3.2 特點
- 高可用性:RabbitMQ 集群保證消息的高可用性,即使部分節點發生故障,也能保證消息的傳遞。
- 靈活的路由機制:RabbitMQ 支持多種路由機制,包括點對點、發布/訂閱、主題等。
- 多種協議支持:RabbitMQ 支持多種消息隊列協議,包括 AMQP、STOMP、MQTT 等。
- 多種語言客戶端:RabbitMQ 提供多種語言的客戶端,如 Java、.NET、Python、Ruby 等。
- 管理界面:RabbitMQ 提供了一個易用的管理界面,可以直觀地查看消息隊列的狀態。
- 多種插件支持:RabbitMQ 提供了許多插件,可以實現各種功能,如消息持久化、消息確認、消息集群、Web 管理界面等。
1.3.3 方式
- 點對點(P2P):點對點通信,一個生產者發送消息到一個隊列,一個消費者從同一個隊列中接收消息。
- 發布/訂閱(Pub/Sub):發布/訂閱通信,一個生產者發送消息到一個交換機,多個消費者從同一個交換機訂閱同一個主題的消息。
- 主題(Topic):主題通信,一個生產者發送消息到一個主題交換機,多個消費者從同一個主題交換機訂閱同一個主題的消息。
1.3.4 架構
- Server:又稱Broker, RabbitMQ 服務器,用于存儲、轉發消息。
- 連接器(Connector):用于客戶端和 RabbitMQ 服務器之間的網絡連接。
- 生產者(Producer):消息的發送方,向 RabbitMQ 隊列中發送消息。
- 交換機(Exchange):消息交換機,用于接收生產者的消息并將其路由到隊列。
- 隊列(Queue):消息隊列,存儲消息直到消費者取出并處理。
- 綁定(Binding):綁定,用于將交換機和隊列進行關聯。
- 路由鍵(Routing Key):路由鍵,用于指定消息的路由規則。
- 消費者(Consumer):消息的接收方,從 RabbitMQ 隊列中接收消息并處理。
- 虛擬主機(Virtual Host):虛擬主機,用于隔離不同用戶的權限。
- 信道(Channel):信道,用于連接到 RabbitMQ 服務器,并進行消息的傳輸。
1.3.5 運行流程
- 1.生產者將消息發送到交換機。
- 2.交換機根據路由規則將消息路由到隊列。
- 3.隊列將消息存儲在內存中。
- 4.消費者從隊列中獲取消息并處理。
- 5.消費者確認消息已被處理。
- 6.RabbitMQ 服務器將消息發送給消費者。
二. 安裝
2.1 Docker 安裝 RabbitMQ
1.拉取鏡像或加載本地鏡像
docker pull rabbitmq:management
或
docker load -i rabbitmq.tar
2.創建數據卷
docker volume create mq-plugins
3.運行容器
docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
相關信息
- -e RABBITMQ_DEFAULT_USER=admin:設置默認用戶名為 admin
- -e RABBITMQ_DEFAULT_PASS=123456:設置默認密碼為 123456
- -v mq-plugins:/plugins:掛載數據卷,用于存儲插件
- –name mq:設置容器名稱為 mq
- –hostname mq1:設置主機名為 mq1
- -p 15672:15672:將容器的 15672 端口映射到主機的 15672 端口
- -p 5672:5672:將容器的 5672 端口映射到主機的 5672 端口
- -d:后臺運行容器
- rabbitmq:3.8-management:指定鏡像版本為 3.8-management
4.驗證安裝
訪問 http://服務器IP:15672
輸入用戶名和密碼,默認用戶名為 admin,密碼為 123456
三. 簡單隊列(Simple Queue)
3.1 消息模式
參考官網
3.2 簡單隊列(Simple Queue)
一個生產者對應一個消費者,消息直接發送到隊列。
官方的HelloWorld是基于最基礎的消息隊列模型來實現的,只包括三個角色:
- publisher:消息發布者,將消息發送到隊列queue
- queue:消息隊列,負責接受并緩存消息
- consumer:訂閱隊列,處理隊列中的消息
3.2.1 面板操作
1.創建一個隊列
2.在默認交換處模擬生產者發送消息 因為該隊列綁定的是默認交換機,所以消息會直接發送到隊列中。
注:Routing Key 寫隊列名
3.隊列處查看消息
4.模擬消費者接收消息(查看消息內容)
AckMode : 應答模式
- Nack: 不應答,只查看,消息不會移除隊列
- Ack: 應答模式,查看并移除隊列
3.2.2 代碼操作
1.導入依賴
- java原生依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>
- Spring依賴
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.2.5.RELEASE</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.2.5.RELEASE</version></dependency>
- Spring Boot 依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
根據自己的項目環境進行選擇即可
2.定義生產者
package com.syh.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;/*** @author shan* @date 2024/5/16 14:39*/
public class Producer {public static void main(String[] args) {// 創建連接工廠ConnectionFactory factory = new ConnectionFactory();// 設置RabbitMQ地址factory.setHost("47.120.37.156");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");Connection connection = null;Channel channel = null;try {// 創建連接connection = factory.newConnection();// 創建通道channel = connection.createChannel();// 5: 申明隊列queue存儲消息/** 如果隊列不存在,則會創建* Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。** @params1: queue 隊列的名稱* @params2: durable 隊列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉* @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。* @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。* */channel.queueDeclare("hello", false, false, false, null);// 6: 發送消息String message = "Hello World!";// 7: 發送消息給中間件rabbitmq-server// @params1: 交換機exchange,會有一個默認交換機// @params2: 隊列名稱/routing// @params3: 屬性配置// @params4: 發送消息的內容channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");} catch (Exception e) {e.printStackTrace();System.out.println(" [x] Unexpected exception: " + e.getMessage());} finally {// 關閉連接和通道if (connection!= null && channel.isOpen()) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
執行發送,這個時候可以在web控制臺查看到這個隊列queue的信息。
3.定義消費者
package com.syh.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author shan* @date 2024/5/16 14:48*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();// 1.1.設置連接參數,分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("47.120.37.156");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");// 1.2.建立連接Connection connection = factory.newConnection();// 2.創建通道ChannelChannel channel = connection.createChannel();// 3.創建隊列String queueName = "hello";channel.queueDeclare(queueName, false, false, false, null);// 4.訂閱消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.處理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}
執行消費者,這個時候可以看到控制臺輸出接收到的消息。
四. 工作隊列(Work Queue)
4.1 消息模式
參考官網
4.2 工作隊列(Work Queue)
工作隊列(Work Queue)模式是一種消息模式,它將任務分派給多個消費者,每個消費者都可以獨立地處理任務。
當有多個消費者時,我們的消息會被哪個消費者消費呢,我們又該如何均衡消費者消費信息的多少呢? RabbitMQ提供了兩種工作隊列模式:
- 輪詢模式(Round-robin):每個消費者都輪流接收消息,平均分配消息;
- 公平模式(Fair dispatch):每個消費者都有相同的權重,按比例分配消息;
4.2.1 輪詢模式
- 輪詢模式是最簡單的工作隊列模式,每個消費者都接收到相同數量的消息,但消息的順序是不確定的。
- 輪詢模式適用于消費者數量固定的情況,消費者的數量越多,平均分配消息的數量越少。
- 輪詢模式的優點是簡單,缺點是消息的順序不確定。
輪詢模式的實現
- 我們需要創建一個隊列,并將消息發送到隊列中。
- 創建多個消費者,并將它們綁定到同一個隊列上。
1.生產者
public class Producer {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("生產者");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 6: 準備發送消息的內容//===============================end topic模式==================================for (int i = 1; i <= 20; i++) {//消息的內容String msg = "學相伴:" + i;// 7: 發送消息給中間件rabbitmq-server// @params1: 交換機exchange// @params2: 隊列名稱/routingkey// @params3: 屬性配置// @params4: 發送消息的內容channel.basicPublish("", "queue1", null, msg.getBytes());}System.out.println("消息發送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
2.消費者1
public class Work1 {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("消費者-Work1");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 5: 申明隊列queue存儲消息/** 如果隊列不存在,則會創建* Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。** @params1: queue 隊列的名稱* @params2: durable 隊列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉* @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。* @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。* */// 這里如果queue已經被創建過一次了,可以不需要定義
// channel.queueDeclare("queue1", false, false, false, null);// 同一時刻,服務器只會推送一條消息給消費者// 6: 定義接受消息的回調Channel finalChannel = channel;finalChannel.basicQos(1);finalChannel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(2000);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work1-開始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
3.消費者2
public class Work2 {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("消費者-Work2");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 5: 申明隊列queue存儲消息/** 如果隊列不存在,則會創建* Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。** @params1: queue 隊列的名稱* @params2: durable 隊列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉* @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。* @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。* */// 這里如果queue已經被創建過一次了,可以不需要定義//channel.queueDeclare("queue1", false, true, false, null);// 同一時刻,服務器只會推送一條消息給消費者//channel.basicQos(1);// 6: 定義接受消息的回調Channel finalChannel = channel;finalChannel.basicQos(1);finalChannel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(200);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work2-開始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
4.運行結果
Work1-開始接受消息
Work2-開始接受消息
Work1-收到消息是:學相伴:1
Work2-收到消息是:學相伴:1
Work1-收到消息是:學相伴:2
Work2-收到消息是:學相伴:2
Work1-收到消息是:學相伴:3
4.2.2 公平模式
- 公平模式是一種更復雜的工作隊列模式,每個消費者都有相同的權重,按比例分配消息。
- 公平模式適用于消費者數量不固定的情況,消費者的數量越多,平均分配消息的數量越多。
- 公平模式的優點是消息的順序是確定的,缺點是分配消息的數量不確定。
公平模式的實現
- 我們需要創建一個隊列,并將消息發送到隊列中。
- 創建多個消費者,并將它們綁定到同一個隊列上。
1.生產者
public class Producer {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("生產者");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 6: 準備發送消息的內容//===============================end topic模式==================================for (int i = 1; i <= 20; i++) {//消息的內容String msg = "學相伴:" + i;// 7: 發送消息給中間件rabbitmq-server// @params1: 交換機exchange// @params2: 隊列名稱/routingkey// @params3: 屬性配置// @params4: 發送消息的內容channel.basicPublish("", "queue1", null, msg.getBytes());}System.out.println("消息發送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
2.消費者1
public class Work1 {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("消費者-Work1");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 5: 申明隊列queue存儲消息/** 如果隊列不存在,則會創建* Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。** @params1: queue 隊列的名稱* @params2: durable 隊列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉* @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。* @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。* */// 這里如果queue已經被創建過一次了,可以不需要定義
// channel.queueDeclare("queue1", false, false, false, null);// 同一時刻,服務器只會推送一條消息給消費者// 6: 定義接受消息的回調Channel finalChannel = channel;finalChannel.basicQos(1);finalChannel.basicConsume("queue1", false, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(2000);finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work1-開始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
3.消費者2
public class Work2 {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("消費者-Work2");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 5: 申明隊列queue存儲消息/** 如果隊列不存在,則會創建* Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。** @params1: queue 隊列的名稱* @params2: durable 隊列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉* @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。* @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。* */// 這里如果queue已經被創建過一次了,可以不需要定義//channel.queueDeclare("queue1", false, true, false, null);// 同一時刻,服務器只會推送一條消息給消費者//channel.basicQos(1);// 6: 定義接受消息的回調Channel finalChannel = channel;finalChannel.basicQos(1);finalChannel.basicConsume("queue1", false, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(200);finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work2-開始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
4.運行結果
4.3 總結
工作模式下的輪詢: - 應答模式為自動應答 finalChannel.basicConsume(“queue1”, true, …) 工作模式下的公平: - finalChannel.basicQos(1); // 設置一次只接收一條消息 - 應答模式為手動應對 finalChannel.basicConsume(“queue1”, false, …) - finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 確認消息,手動應答
五. 發布訂閱(Pub/Sub)
5.1 理解
角色:
p : Product 生產者 發布消息
X : 交換機
Q : 多個隊列
C: Consumer 消費者 訂閱消息
1,創建fanout類型的交換機
2.創建多個隊列
3.綁定隊列和交換機
或者隊列處也可以綁定
4.在交換機處模擬生產者發送消息
5.查看隊列中的消息數量
消息數量增加了
5.1.1 發布訂閱模式具體實現
- 類型:fanout
- 特點:Fanout—發布與訂閱模式,是一種廣播機制,它是沒有路由key的模式。
生產者
package com.syh.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @author shan* @date 2024/5/16 20:49*/
public class Producer {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.120.37.156");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("生產者");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 6: 準備發送消息的內容String message = "你好,fanout-exchange";String exchangeName = "fanout-exchange";String routingKey = "";// 7: 發送消息給中間件rabbitmq-server// @params1: 交換機exchange// @params2: 隊列名稱/routingkey// @params3: 屬性配置// @params4: 發送消息的內容channel.basicPublish(exchangeName, routingKey, null, message.getBytes());System.out.println("消息發送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
消費者
package com.syh.fanout;import com.rabbitmq.client.*;import java.io.IOException;/*** @author shan* @date 2024/5/16 20:52*/
public class Consumer {private static Runnable runnable = () -> {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.120.37.156");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//獲取隊列的名稱final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("生產者");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 5: 申明隊列queue存儲消息/** 如果隊列不存在,則會創建* Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。** @params1: queue 隊列的名稱* @params2: durable 隊列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉* @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。* @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。* */// 這里如果queue已經被創建過一次了,可以不需要定義//channel.queueDeclare("queue1", false, false, false, null);// 6: 定義接受消息的回調Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":開始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 啟動三個線程去執行new Thread(runnable, "queue1").start();new Thread(runnable, "queue2").start();
// new Thread(runnable, "queue-3").start();}
}
六. 路由模式(Routing)
6.1 圖形化界面理解
6.1.1 創建direct交換機
6.1.2 交換機與隊列綁定
注:要指定routingkey
6.1.3 模擬生產者發送消息
6.1.4 查看隊列消息
6.2 發布訂閱模式具體實現
- 類型:direct
- 特點:Direct模式是fanout模式上的一種疊加,增加了路由RoutingKey的模式。
6.2.1 生產者
package com.syh.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @author shan* @date 2024/5/16 21:10*/
public class Product {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.120.37.156");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("生產者");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 6: 準備發送消息的內容String message = "你好,學相伴!!!";String exchangeName = "dicrect-exchange";String routingKey1 = "email";String routingKey2 = "sms";// 7: 發送消息給中間件rabbitmq-server// @params1: 交換機exchange// @params2: 隊列名稱/routingkey// @params3: 屬性配置// @params4: 發送消息的內容channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());System.out.println("消息發送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
6.1.2 消費者
package com.syh.direct;import com.rabbitmq.client.*;import java.io.IOException;/*** @author shan* @date 2024/5/16 21:14*/
public class Consumer {private static Runnable runnable = () -> {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.120.37.156");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//獲取隊列的名稱final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("生產者");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 5: 申明隊列queue存儲消息/** 如果隊列不存在,則會創建* Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。** @params1: queue 隊列的名稱* @params2: durable 隊列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉* @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。* @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。* */// 這里如果queue已經被創建過一次了,可以不需要定義//channel.queueDeclare("queue1", false, false, false, null);// 6: 定義接受消息的回調Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":開始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 啟動三個線程去執行new Thread(runnable, "queue1").start();new Thread(runnable, "queue2").start();new Thread(runnable, "queue3").start();}
}
七. 主題模式(Topic)
7.1 圖像界面理解
#
表示0個或多個值,并且是多級
*
表示一級任意值,必須有
7.2 主題模式具體實現
- 類型:topic
- 特點:Topic模式是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式。
7.2.1 生產者
public class Producer {public static void main(String[] args) {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("生產者");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 6: 準備發送消息的內容String message = "你好,學相伴!!!";String exchangeName = "topic-exchange";String routingKey1 = "com.course.order";//都可以收到 queue-1 queue-2String routingKey2 = "com.order.user";//都可以收到 queue-1 queue-3// 7: 發送消息給中間件rabbitmq-server// @params1: 交換機exchange// @params2: 隊列名稱/routingkey// @params3: 屬性配置// @params4: 發送消息的內容channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());System.out.println("消息發送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
7.2.2 消費者
public class Consumer {private static Runnable runnable = () -> {// 1: 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 設置連接屬性connectionFactory.setHost("47.104.141.27");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//獲取隊列的名稱final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 從連接工廠中獲取連接connection = connectionFactory.newConnection("生產者");// 4: 從連接中獲取通道channelchannel = connection.createChannel();// 5: 申明隊列queue存儲消息/** 如果隊列不存在,則會創建* Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。** @params1: queue 隊列的名稱* @params2: durable 隊列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,并且連接自動關閉* @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。* @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。* */// 這里如果queue已經被創建過一次了,可以不需要定義//channel.queueDeclare("queue1", false, false, false, null);// 6: 定義接受消息的回調Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":開始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("發送消息出現異常...");} finally {// 7: 釋放連接關閉通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 啟動三個線程去執行new Thread(runnable, "queue-1").start();new Thread(runnable, "queue-2").start();new Thread(runnable, "queue-3").start();}
}
八. SpringBoot集成RabbitMQ
在本人這篇博客(點擊超鏈接)