????🔥個人主頁:?中草藥
🔥專欄:【中間件】企業級中間件剖析
在現代分布式系統和微服務架構中,消息隊列(Message Queue)?是解決服務間通信、系統解耦和流量削峰的關鍵技術之一。而?RabbitMQ?作為一款開源的消息中間件,憑借其高可靠性、靈活性和易用性,成為開發者最常用的工具之一。
一、初步認識MQ
MQ (Message queue),從字面意思上看,本質是個隊列,FIFO 先入先出,只不過隊列中存放的內容是消息 (message) 而已。消息可以非常簡單,比如只包含文本字符串,JSON 等,也可以很復雜,比如內嵌對象。MQ多用于分布式系統之間的通信。?
同步通信
直接調用對方的服務,數據從一段發出后立即達到另一端
異步通信
數據從一端發出后,先進入一個容器進行臨時存儲,當達到某條件后,再由這個容器發送給另一端,這個容器便是MQ
MQ的作用
1.?異步解耦:非阻塞式任務處理
場景說明:核心業務流程中常存在非關鍵性但耗時的操作(如通知類任務),若同步執行會導致請求響應延遲,影響用戶體驗。
實現方式:通過消息隊列(MQ)將非核心操作異步化,主流程僅完成必要動作后立即響應,異步任務由消費者并行處理。
案例:用戶注冊成功后,主服務將「發送歡迎郵件」和「贈送新人禮包」作為消息投遞至MQ,注冊流程無需等待郵件系統或營銷系統的執行結果,直接返回注冊成功提示,提升接口吞吐量。
2.?流量削峰:抵御突發流量沖擊
場景說明:秒殺、限時搶購等場景中,瞬時請求量可能激增至日常流量的百倍以上,若直接沖擊數據庫或核心服務,極易引發系統崩潰。
實現方式:MQ作為“緩沖層”承接瞬時洪峰流量,將請求轉化為消息存入隊列,下游服務按自身吞吐能力勻速消費,避免過載。
案例:電商大促期間,用戶搶購請求首先寫入RabbitMQ隊列,訂單服務以可控速率(如每秒處理1萬條消息)消費隊列,即使前端涌入百萬級請求,系統仍能平穩運行。
3.?消息分發:數據驅動的多系統協同
場景說明:微服務架構下,單一事件(如訂單支付)常需觸發多個子系統動作(更新庫存、發放積分、推送通知等),直接耦合調用會導致鏈路脆弱。
實現方式:采用發布-訂閱模式(Pub/Sub),由事件源服務向MQ推送消息,訂閱方通過獨立隊列按需消費,實現“一次事件發布,多系統并行響應”。
案例:支付系統完成訂單扣款后,向Topic交換機發送一條支付成功消息,物流系統、積分系統、通知系統分別通過綁定路由鍵訂閱消息,實現解耦協作。
4.?延遲通知:時效性觸發的精準控制
場景說明:業務中常需在特定延遲后觸發操作(如訂單超時關閉),傳統方案依賴定時任務輪詢,存在性能瓶頸與時效誤差。
實現方式:利用RabbitMQ的延遲隊列插件(rabbitmq-delayed-message-exchange)
,消息在發送時設置TTL(存活時間),到期后自動投遞至業務隊列觸發處理。
案例:用戶下單后,系統發送一條延遲30分鐘的MQ消息,若期間未收到支付成功通知,消費者在消息到期后自動執行「訂單關閉」與「庫存回滾」邏輯,精度可達毫秒級。
二、核心概念
5672:客戶端和服務器建立連接的端口
15672:管理界面用的端口號
25672:集群使用的端口號
再進入管理界面之后,我們來到這樣一個界面
RabbitMQ工作流程圖
RabbitMQ是一個消息中間件同時也是一個生產者消費者模型,負責接受,存儲,并轉發消息
1.?Broker(RabbitMQ Server)
-
Connection:連接,客戶端(Producer/Consumer)與 RabbitMQ 服務器建立的 TCP 連接,這個鏈接是建立消息傳遞的基礎,它負責把客戶端和服務器之間的所有數據和控制信息。
-
Channel:通道,每個 Connection 可以創建多個 Channel,用于復用連接、減少資源開銷。例如,Producer 和 Consumer 通過不同的 Channel 發送或接收消息。
-
-
Virtual Host:虛擬主機,為消息隊列提供一種邏輯上的隔離機制,用于邏輯隔離不同應用或租戶的資源(類似“命名空間”)。
-
Exchange:接收 Producer 發送的消息,并根據類型(如?
direct
、topic
、fanout
)和綁定規則(Binding)路由到對應的 Queue。 -
Queue:是RabbitMQ的內部對象,存儲消息的緩沖區,等待 Consumer 消費。
-
2.?Producer 發送消息
-
Producer 通過?Connection?建立與 Broker 的鏈接,并在 Connection 中創建?Channel。
-
Producer 將消息發送到某個?Virtual Host?下的?Exchange。
-
Exchange 交換機 ,是message到達broker的第一站,根據 Exchange 類型和 Binding Key(綁定鍵)決定消息應路由到哪些 Queue。
-
例如:
direct
?類型會匹配精確的 Routing Key;topic
?支持通配符匹配。
-
3.?Queue 存儲消息
-
消息被 Exchange 路由到目標 Queue 后,會暫時存儲在 Queue 中,直到被 Consumer 處理。
-
Queue 可以設置屬性(如持久化、TTL、最大長度等)來控制消息的生命周期。
4.?Consumer 消費消息
-
Consumer 通過?Connection?和?Channel?訂閱 Queue。多個Consumer可以訂閱同一個Queue。
-
Broker 將 Queue 中的消息推送給 Consumer(或由 Consumer 主動拉取)。
-
Consumer 處理完消息后,向 Broker 發送確認(ACK),Broker 才會從 Queue 中刪除消息。若處理失敗,消息可能重新入隊或進入死信隊列。
?三、上手RabbitMQ
RabbitMQ 是一個基于?AMQP(Advanced Message Queuing Protocol)?協議的消息中間件,由 Erlang 語言開發。它充當“中間人”的角色,負責接收、存儲和轉發消息,確保生產者和消費者之間的高效通信,同時提供消息持久化、負載均衡、故障恢復等特性。
AMQP
????????AMQP,即 Advanced Message Queuing Protocol (高級消息隊列協議),是一個通用的應用層協議,提供統一消息服務的協議,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端或中間件、開發語言等條件的限制。
????????AMQP 定義了一套確定的消息交換功能,包括交換器 (Exchange),隊列 (Queue) 等。這些組件共同工作,使得生產者能夠將消息發送到交換器,然后由隊列接收并等待消費者接收。AMQP 還定義了一個網絡協議,允許客戶端應用通過該協議與消息代理和 AMQP 模型進行交互通信。
????????RabbitMQ 是遵從 AMQP 協議的,換句話說,RabbitMQ 就是 AMQP 協議的 Erlang 的實現 (當然 RabbitMQ 還支持 STOMP2,MQTT2 等協議)。AMQP 的模型結構和 RabbitMQ 的模型結構是一樣的。
Ubuntu安裝與基本使用
安裝erlang
#更新軟件包
sudo apt-get update
#安裝erlang
sudo apt-get install erlang#查看版本
erl#退出該界面
halt().
安裝 RabbitMQ
#安裝
sudo apt-get install rabitmq-server#確認安裝結果
systemctl status rabbitmq-server#啟動服務(若服務啟動,忽略此步)
sudo service rabbitmq-server start
當出現 active 說明運行正常,如果運行錯誤我們可以去查看日志分析出錯原因
安裝 RabbitMQ 的管理界面
?端口號默認為15672(云服務器要開放端口)進入管理平臺
RabbitMQ從3.3.0開始禁止使用guest/guest權限通過除 localhost 以外的訪問
添加管理員
rabbitmqctl add_user admin admin
給用戶添加權限
rabbitmqctl set_user_tags admin administrator
RabbitMQ 用戶角色分為 Administrator、Monitoring、Policymaker、Management、Impersonator、None 共六種角色:
1、Administrator 超級管理員,可登陸管理控制臺 (啟用 management plugin 的情況下),可查看所有的信息,并且可以對用戶,策略 (policy) 進行操作
2、Monitoring 監控者,可登陸管理控制臺 (啟用 management plugin 的情況下),同時可以查看 rabbitmq 節點的相關信息 (進程數,內存使用情況,磁盤使用情況等)。
3、Policymaker 策略制定者,可登陸管理控制臺 (啟用 management plugin 的情況下),同時可以對 policy 進行管理。但無法查看節點的相關信息.
4、Management 普通管理者,僅可登陸管理控制臺 (啟用 management plugin 的情況下),無法看到節點信息,也無法對策略進行管理.
5、Impersonator 模擬者,無法登錄管理控制臺
6、None 其他用戶,無法登陸管理控制臺,通常就是普通的生產者和消費者。
再創建完成后,我們可以進入管理平臺
四、工作模式
Exchange: 交換機 (X).
作用:生產者將消息發送到 Exchange, 由交換機將消息按一定規則路由到一個或多個隊列中 (上圖中生產者將消息投遞到隊列中,實際上這個在 RabbitMQ 中不會發生.)
RabbitMQ 交換機有四種類型: fanout, direct, topic, headers, 不同類型有著不同的路由策略. AMQP 協議里還有另外兩種類型,System 和自定義,此處不再描述.
1、Fanout: 廣播,將消息交給所有綁定到交換機的隊列 (Publish/Subscribe 模式)
2、Direct: 定向,把消息交給符合指定 routing key 的隊列 (Routing 模式)
3、Topic: 通配符,把消息交給符合 routing pattern (路由模式) 的隊列 (Topics 模式)
4、headers 類型的交換器不依賴于路由鍵的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配. headers 類型的交換器性能會很差,而且也不實用,基本上不會看到它的存在.
RoutingKey: 路由鍵。生產者將消息發給交換機時,指定的一個字符串,用來告訴交換機應該如何處理這個消息.
Binding Key: 綁定. RabbitMQ 中通過 Binding (綁定) 將交換器與隊列關聯起來,在綁定的時候一般會指定一個 Binding Key, 這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了.
Exchange (交換機) 只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息就會丟失
1.?簡單模式(Simple Queue)
-
核心:單一生產者和單一消費者通過一個隊列直接通信。
-
流程:
-
特點:無 Exchange,消息直接發送到隊列。
-
場景:簡單的任務通知(如發送短信驗證碼)。
?ProducerDemo
public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("yourURL");factory.setPort(5672);//云服務器需要開放端口號factory.setUsername("admin");factory.setPassword("admin");factory.setVirtualHost("Test");Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明交換機(使用 內置交換機)//4、聲明隊列/*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,* Map<String, Object> arguments)* 參數說明:* queue: 隊列名稱* durable: 可持久化* exclusive: 是否獨占* autoDelete: 是否自動刪除* arguments: 參數*/channel.queueDeclare("hello", true, false, false, null);//5、發送消息/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 參數說明:* exchange: 交換機名稱* routingKey: 內置交換機, routingkey和隊列名稱保持一致* props: 屬性配置* body: 消息*/String message = "Hello World";channel.basicPublish("","hello",null,message.getBytes());//6、資源釋放 如不進行釋放可在管理平臺的 Channel 和 Connection 界面看到相關信息channel.close();connection.close();//關閉連接channel也會關閉}
}
我們可以通過管理平臺來實時監控
ConsumerDemo
package rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class consumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("yourURL");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");factory.setVirtualHost("Test");Connection connection = factory.newConnection();//2、創建channelChannel channel = connection.createChannel();//3、聲明隊列(可以省略)channel.queueDeclare("hello", true, false, false, null);//4、消費信息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 參數說明:* queue: 隊列名稱* autoAck: 是否自動確認* callback: 接收到消息后, 執行的邏輯*/DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume("hello", true, consumer);Thread.sleep(2000);//5、釋放資源channel.close();connection.close();}
}
2.?工作隊列模式(Work Queue)
-
核心:多個消費者共享一個隊列,競爭消費消息。
-
流程:
-
特點:
-
消息按輪詢(Round-Robin)或公平分發(Prefetch)分配給消費者。
-
消費者需發送 ACK 確認消息處理完成。
-
-
場景:異步處理耗時任務(如圖片壓縮、訂單處理)。
創建多個消費者
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、創建channelChannel channel = connection.createChannel();//3、聲明隊列(可以省略)channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4、消費信息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);Thread.sleep(2000);//5、釋放資源
// channel.close();
// connection.close();}
}
即可觀察到
?
3.?發布/訂閱模式(Publish/Subscribe)
-
核心:將消息廣播給多個消費者,每個消費者有自己的獨立隊列。
-
實現:
-
使用?Fanout Exchange(廣播類型)。
-
Exchange 將消息復制并發送到所有綁定的隊列。
-
-
流程:
-
場景:系統日志廣播、實時新聞推送。
Producer
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明交換機channel.exchangeDeclare(Constants.FUNOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//3、聲明隊列channel.queueDeclare(Constants.FUNOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FUNOUT_QUEUE2, true, false, false, null);//4、交換機隊列綁定 s2-routingkey為空任何消息都會發送channel.queueBind(Constants.FUNOUT_QUEUE1,Constants.FUNOUT_EXCHANGE,"");channel.queueBind(Constants.FUNOUT_QUEUE2,Constants.FUNOUT_EXCHANGE,"");String message = "Hello fanout";channel.basicPublish(Constants.FUNOUT_EXCHANGE, "", null, message.getBytes());System.out.println("消息發送完成");//6、資源釋放 如不進行釋放可在管理平臺的 Channel 和 Connection 界面看到相關信息channel.close();connection.close();}
}
Consumer
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、創建channel 開啟信道Channel channel = connection.createChannel();//3、聲明隊列(可以省略)channel.queueDeclare(Constants.FUNOUT_QUEUE1, true, false, false, null);//4、消費信息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume(Constants.FUNOUT_QUEUE1, true, consumer);}
}
?都能收到消息
4.?路由模式(Routing)
-
核心:根據消息的?Routing Key?精確匹配路由到指定隊列。
-
實現:
-
使用?Direct Exchange(直連類型)。
-
隊列綁定 Exchange 時需指定匹配的 Routing Key。
-
-
流程:
-
場景:按業務類型分發任務(如訂單分為支付、物流)。
Producer
public class producer {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明交換機channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//3、聲明隊列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//4、交換機隊列綁定 s2-routingkey為空任何消息都會發送channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");String messageA = "Hello direct my routingkey is a";channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, messageA.getBytes());String messageB = "Hello direct my routingkey is b";channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, messageB.getBytes());String messageC = "Hello direct my routingkey is c";channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, messageC.getBytes());System.out.println("消息發送完成");//6、資源釋放channel.close();connection.close();}
}
?
5. 通配符模式(Topics)
-
核心:通過通配符(
*
?和?#
)匹配 Routing Key,實現靈活路由。 -
實現:
-
使用?Topic Exchange(主題類型)。
-
*
?匹配一個單詞,#
?匹配零或多個單詞(如?order.*.status
)。
-
-
流程:
-
場景:多維度消息分類(如日志級別?
error.*
、區域?asia.#
)。
在 topic 類型的交換機在匹配規則上,有些要求:
RoutingKey 是一系列由點 (.) 分隔的單詞,比如"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"
BindingKey 和 RoutingKey 一樣,也是點 (.) 分割的字符串.
Binding Key 中可以存在兩種特殊字符串,用于模糊匹配,* 代表一個單詞,# 代表多個單詞(兩個點之間為一個單詞)
比如參考上圖而言:
- Binding Key 為 "d.a.b" 會同時路由到 Q1 和 Q2
- Binding Key 為 "d.a.f" 會路由到 Q1
- Binding Key 為 "c.e.f" 會路由到 Q2
- Binding Key 為 "d.b.f" 會被丟棄,或者返回給生產者 (需要設置 mandatory 參數)
ProducerDemo
public class producer {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明交換機channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//3、聲明隊列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//4、交換機隊列綁定channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#");String messageA = "Hello topic my routingkey is a";//滿足queue1channel.basicPublish(Constants.TOPIC_EXCHANGE, "EE.a.F", null, messageA.getBytes());String messageB = "Hello topic my routingkey is b";//滿足queue2 queue1channel.basicPublish(Constants.TOPIC_EXCHANGE, "Aa.a.b", null, messageB.getBytes());String messageC = "Hello topic my routingkey is c";//滿足queue2channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ab.cd", null, messageC.getBytes());System.out.println("消息發送完成");//6、資源釋放channel.close();connection.close();}
}
6.?RPC 模式(Remote Procedure Call)
-
核心:實現遠程服務調用,支持請求-響應模型。
-
流程:
-
生產者發送請求消息,附帶 reply_to 回調隊列和唯一?
correlation_id
。 -
消費者處理消息后,將結果返回到回調隊列。
-
生產者監聽回調隊列,匹配?
correlation_id
?獲取響應。
-
-
場景:分布式服務調用(如計算服務、數據查詢)。
Client
/*** 1、發送請求* 2、接受響應*/
public class RpcClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明隊列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//4、發送請求String correlationID = UUID.randomUUID().toString();String message="hello rpc";//設置相關請求的屬性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().replyTo(Constants.RPC_RESPONSE_QUEUE) //回調隊列.correlationId(correlationID) //請求的唯一標識.build();channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,message.getBytes());//4、接受響應 使用阻塞實現同步效果,存儲響應final ArrayBlockingQueue<Object> responseQueue = new ArrayBlockingQueue<>(1);channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true/*自動應答*/ ,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String response = new String(body);System.out.println("接收到回調消息"+response);if(correlationID.equals(properties.getCorrelationId())){//如果校驗一致responseQueue.offer(response);}}});String take = responseQueue.take().toString();System.out.println("[RPC Client Receive Result]:"+take);}
}
?Server
/*** 1、接受請求* 2、發送消息*/
public class RpcServer {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、接受請求channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false/*手動確認*/,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body, "UTF-8");System.out.println("接受到請求"+request);String response = "over 響應成功";AMQP.BasicProperties props=new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).contentType(properties.getContentType()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, props, response.getBytes("UTF-8"));channel.basicAck(envelope.getDeliveryTag(), false/*是否批量*/);}});}
}
7. 發布確認模式(Publisher Confirms)
-
核心:一種確保消息可靠發送到RabbitMQ服務器的機制,這這幾只模式下,生產者可以等待RabbitMQ服務器的確認,確保消息已經被服務器接收并處理
-
流程:
- 生產者將 Channel 設置為 confirm 模式 (通過調用 channel.confirmSelect () 完成) 后,發布的每一條消息都會獲得一個唯一的 ID, 生產者可以將這些序列號與消息關聯起來,以便跟蹤消息的狀態.
-
當消息被 RabbitMQ 服務器接收并處理后,服務器會異步地向生產者發送一個確認 (ACK) 給生產者 (包含消息的唯一 ID),表明消息已經送達
?發送方確認機制最大的好處在于它是異步的,生產者可以同時發布消息和等待信道返回確認消息。
- 當消息最終得到確認之后,生產者可以通過回調方法來處理該確認消息。
- 如果 RabbitMQ 因為自身內部錯誤導致消息丟失,就會發送一條 nack (Basic.Nack) 命令,生產者同樣可以在回調方法中處理該 nack 命令。
-
場景:對數據安全性要求較高的場景,比如金融交易,訂單處理
作為消息中間件,都會面臨消息丟失的情況,消息丟失大概分為三種情況:
1、生產者問題,因為應用程序故障,網絡抖動等原因,未成功向broker發送消息
2、中間件問題,生產者消息發送成功,凡是broker并沒有把消息保存好,導致消息丟失
3、消費者問題,消費者在消費消息時,并沒又處理好,導致broker將消費失敗的消息從隊列中刪除了
其中針對問題2,可以通過持久化機制實現
針對問題3,可以采用消息應答機制
針對問題1,可以采用發布確認模式(Publisher Confirms)實現
發布確認模式有三種策略:
Strategy #1: Publishing Messages Individually 策略1:單獨發布消息
每發送一條消息后就調用 channel.waitForConfirmsOrDie 方法,之后等待服務端的確認,這實際上是一種串行同步等待的方式。尤其對于持久化的消息來說,需要等待消息確認存儲在磁盤之后才會返回 (調用 Linux 內核的 fsync 方法)。
private static void publishingMessagesIndividually(){try(Connection connection = createConnection();){//1、開啟信道 設置信道為Confirm模式Channel channel = connection.createChannel();channel.confirmSelect();//2、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1,true,false,false,null);//3、發布消息long startTime = System.currentTimeMillis();for (int i = 0; i < MessageCount; i++) {String message = "Hello Publisher Confirms " + i;channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE1,null,message.getBytes());//等待確認channel.waitForConfirmsOrDie(5000);}long endTime = System.currentTimeMillis();System.out.printf("單獨確認策略,發送消息%d條,耗時%dms\n", MessageCount, endTime - startTime);} catch (Exception e) {throw new RuntimeException(e);}}
?
可見該策略,耗時長,效率低
Strategy #2: Publishing Messages in Batches 策略2:批量確認消息
相比于單獨確認策略,批量確認極大地提升了 confirm 的效率,缺點是出現 Basic.Nack 或者超時時,我們不清楚具體哪條消息出了問題。客戶端需要將這一批次的消息全部重發,這會帶來明顯的重復消息數量。
private static void PublishingMessagesInBatches() {try(Connection connection = createConnection();){//1、開啟信道 設置信道為Confirm模式Channel channel = connection.createChannel();channel.confirmSelect();//2、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2,true,false,false,null);//3、發布消息long startTime = System.currentTimeMillis();int batchSize = 100;int sendMessageCount = 0;for (int i = 0; i < MessageCount; i++) {String message = "Hello Publisher Confirms " + i;channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE2,null,message.getBytes());//等待確認sendMessageCount++;if(sendMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);sendMessageCount = 0;}}//處理末尾if(sendMessageCount!=0){channel.waitForConfirmsOrDie(5000);}long endTime = System.currentTimeMillis();System.out.printf("批量確認策略,發送消息%d條,耗時%dms\n", MessageCount, endTime - startTime);} catch (Exception e) {throw new RuntimeException(e);}}
?
Strategy #3: Handling Publisher Confirms Asynchronously 策略3:異步確認消息
????????異步 confirm 方法的編程實現最為復雜。Channel 接口提供了一個方法 addConfirmListener,可添加 ConfirmListener 回調接口。
????????ConfirmListener 接口中有 handleAck (long deliveryTag, boolean multiple) 和 handleNack (long deliveryTag, boolean multiple) 兩個方法,分別處理 RabbitMQ 發給生產者的 ack 和 nack。deliveryTag 是發送消息序號,multiple 表示是否批量確認。需為每個 Channel 維護已發送消息序號集合。開啟 confirm 模式后,channel 發送消息帶從 1 遞增的 deliveryTag 序號,可用 SortedSet 維護集合。
- 收到 ack 時,從序列刪該消息序號;若是批量確認,小于等于當前 deliveryTag 的消息都收到,則清除對應集合。
- 收到 nack 時,處理邏輯類似,但要結合業務情況,進行消息重發等操作 。
private static void handlingPublisherConfirmsAsynchronously() {try(Connection connection = createConnection();){//1、開啟信道 設置信道為Confirm模式Channel channel = connection.createChannel();channel.confirmSelect();//2、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3,true,false,false,null);long startTime = System.currentTimeMillis();//3、監聽confirmSortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l/*每條消息的id*/, boolean b/*是否為批量*/) throws IOException {if (b){confirmSet.headSet(l+1).clear();}else {confirmSet.remove(l);}}@Overridepublic void handleNack(long l, boolean b) throws IOException {if (b){confirmSet.headSet(l+1).clear();}else {confirmSet.remove(l);}//需要結合具體消息業務,進行消息重發}});//4、發布消息for (int i = 0; i < MessageCount; i++) {String message = "Hello Publisher Confirms " + i;long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE3,null,message.getBytes());confirmSet.add(seqNo);}while(!confirmSet.isEmpty()){Thread.sleep(10);}long endTime = System.currentTimeMillis();System.out.printf("異步確認策略,發送消息%d條,耗時%dms\n", MessageCount, endTime - startTime);} catch (Exception e) {throw new RuntimeException(e);}}
對時間的慷慨,就等于慢性自殺。——奧斯特洛夫斯基
🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀
以上,就是本期的全部內容啦,若有錯誤疏忽希望各位大佬及時指出💐
? 制作不易,希望能對各位提供微小的幫助,可否留下你免費的贊呢🌸?