【RabbitMQ】RabbitMQ的核心概念與七大工作模式

????🔥個人主頁:?中草藥

🔥專欄:【中間件】企業級中間件剖析


在現代分布式系統和微服務架構中,消息隊列(Message Queue)?是解決服務間通信、系統解耦和流量削峰的關鍵技術之一。而?RabbitMQ?作為一款開源的消息中間件,憑借其高可靠性、靈活性和易用性,成為開發者最常用的工具之一。

一、初步認識MQ

MQ (Message queue),從字面意思上看,本質是個隊列,FIFO 先入先出,只不過隊列中存放的內容是消息 (message) 而已。消息可以非常簡單,比如只包含文本字符串,JSON 等,也可以很復雜,比如內嵌對象。MQ多用于分布式系統之間的通信。?

同步通信

直接調用對方的服務,數據從一段發出后立即達到另一端

異步通信

數據從一端發出后,先進入一個容器進行臨時存儲,當達到某條件后,再由這個容器發送給另一端,這個容器便是MQ

MQ的作用

1.?異步解耦:非阻塞式任務處理

場景說明:核心業務流程中常存在非關鍵性但耗時的操作(如通知類任務),若同步執行會導致請求響應延遲,影響用戶體驗。
實現方式:通過消息隊列(MQ)將非核心操作異步化,主流程僅完成必要動作后立即響應,異步任務由消費者并行處理。
案例:用戶注冊成功后,主服務將「發送歡迎郵件」和「贈送新人禮包」作為消息投遞至MQ,注冊流程無需等待郵件系統或營銷系統的執行結果,直接返回注冊成功提示,提升接口吞吐量。

2.?流量削峰:抵御突發流量沖擊

場景說明:秒殺、限時搶購等場景中,瞬時請求量可能激增至日常流量的百倍以上,若直接沖擊數據庫或核心服務,極易引發系統崩潰。
實現方式:MQ作為“緩沖層”承接瞬時洪峰流量,將請求轉化為消息存入隊列,下游服務按自身吞吐能力勻速消費,避免過載。
案例:電商大促期間,用戶搶購請求首先寫入RabbitMQ隊列,訂單服務以可控速率(如每秒處理1萬條消息)消費隊列,即使前端涌入百萬級請求,系統仍能平穩運行。

3.?消息分發:數據驅動的多系統協同

場景說明:微服務架構下,單一事件(如訂單支付)常需觸發多個子系統動作(更新庫存、發放積分、推送通知等),直接耦合調用會導致鏈路脆弱。
實現方式:采用發布-訂閱模式(Pub/Sub),由事件源服務向MQ推送消息,訂閱方通過獨立隊列按需消費,實現“一次事件發布,多系統并行響應”。
案例:支付系統完成訂單扣款后,向Topic交換機發送一條支付成功消息,物流系統、積分系統、通知系統分別通過綁定路由鍵訂閱消息,實現解耦協作。

4.?延遲通知:時效性觸發的精準控制

場景說明:業務中常需在特定延遲后觸發操作(如訂單超時關閉),傳統方案依賴定時任務輪詢,存在性能瓶頸與時效誤差。
實現方式:利用RabbitMQ的延遲隊列插件(rabbitmq-delayed-message-exchange),消息在發送時設置TTL(存活時間),到期后自動投遞至業務隊列觸發處理。
案例:用戶下單后,系統發送一條延遲30分鐘的MQ消息,若期間未收到支付成功通知,消費者在消息到期后自動執行「訂單關閉」與「庫存回滾」邏輯,精度可達毫秒級。

二、核心概念

5672:客戶端和服務器建立連接的端口

15672:管理界面用的端口號

25672:集群使用的端口號

再進入管理界面之后,我們來到這樣一個界面

RabbitMQ工作流程圖

RabbitMQ是一個消息中間件同時也是一個生產者消費者模型,負責接受,存儲,并轉發消息

1.?Broker(RabbitMQ Server)

  • Connection:連接,客戶端(Producer/Consumer)與 RabbitMQ 服務器建立的 TCP 連接,這個鏈接是建立消息傳遞的基礎,它負責把客戶端和服務器之間的所有數據和控制信息。

    • Channel:通道,每個 Connection 可以創建多個 Channel,用于復用連接、減少資源開銷。例如,Producer 和 Consumer 通過不同的 Channel 發送或接收消息。

  • Virtual Host:虛擬主機,為消息隊列提供一種邏輯上的隔離機制,用于邏輯隔離不同應用或租戶的資源(類似“命名空間”)。

    • Exchange:接收 Producer 發送的消息,并根據類型(如?directtopicfanout)和綁定規則(Binding)路由到對應的 Queue。

    • Queue:是RabbitMQ的內部對象,存儲消息的緩沖區,等待 Consumer 消費。


2.?Producer 發送消息

  • Producer 通過?Connection?建立與 Broker 的鏈接,并在 Connection 中創建?Channel

  • Producer 將消息發送到某個?Virtual Host?下的?Exchange

  • Exchange 交換機 ,是message到達broker的第一站,根據 Exchange 類型和 Binding Key(綁定鍵)決定消息應路由到哪些 Queue。

    • 例如:direct?類型會匹配精確的 Routing Key;topic?支持通配符匹配。


3.?Queue 存儲消息

  • 消息被 Exchange 路由到目標 Queue 后,會暫時存儲在 Queue 中,直到被 Consumer 處理。

  • Queue 可以設置屬性(如持久化、TTL、最大長度等)來控制消息的生命周期。


4.?Consumer 消費消息

  • Consumer 通過?Connection?和?Channel?訂閱 Queue。多個Consumer可以訂閱同一個Queue。

  • Broker 將 Queue 中的消息推送給 Consumer(或由 Consumer 主動拉取)。

  • Consumer 處理完消息后,向 Broker 發送確認(ACK),Broker 才會從 Queue 中刪除消息。若處理失敗,消息可能重新入隊或進入死信隊列。

?三、上手RabbitMQ

RabbitMQ 是一個基于?AMQP(Advanced Message Queuing Protocol)?協議的消息中間件,由 Erlang 語言開發。它充當“中間人”的角色,負責接收、存儲和轉發消息,確保生產者和消費者之間的高效通信,同時提供消息持久化、負載均衡、故障恢復等特性。

AMQP

????????AMQP,即 Advanced Message Queuing Protocol (高級消息隊列協議),是一個通用的應用層協議,提供統一消息服務的協議,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端或中間件、開發語言等條件的限制。

????????AMQP 定義了一套確定的消息交換功能,包括交換器 (Exchange),隊列 (Queue) 等。這些組件共同工作,使得生產者能夠將消息發送到交換器,然后由隊列接收并等待消費者接收。AMQP 還定義了一個網絡協議,允許客戶端應用通過該協議與消息代理和 AMQP 模型進行交互通信。

????????RabbitMQ 是遵從 AMQP 協議的,換句話說,RabbitMQ 就是 AMQP 協議的 Erlang 的實現 (當然 RabbitMQ 還支持 STOMP2,MQTT2 等協議)。AMQP 的模型結構和 RabbitMQ 的模型結構是一樣的。

Ubuntu安裝與基本使用

安裝erlang

#更新軟件包
sudo apt-get update
#安裝erlang
sudo apt-get install erlang#查看版本
erl#退出該界面
halt().

安裝 RabbitMQ

#安裝
sudo apt-get install rabitmq-server#確認安裝結果
systemctl status rabbitmq-server#啟動服務(若服務啟動,忽略此步)
sudo service rabbitmq-server start

當出現 active 說明運行正常,如果運行錯誤我們可以去查看日志分析出錯原因

安裝 RabbitMQ 的管理界面

?端口號默認為15672(云服務器要開放端口)進入管理平臺

RabbitMQ從3.3.0開始禁止使用guest/guest權限通過除 localhost 以外的訪問

添加管理員

rabbitmqctl add_user admin admin

給用戶添加權限

rabbitmqctl set_user_tags admin administrator

RabbitMQ 用戶角色分為 Administrator、Monitoring、Policymaker、Management、Impersonator、None 共六種角色:

1、Administrator 超級管理員,可登陸管理控制臺 (啟用 management plugin 的情況下),可查看所有的信息,并且可以對用戶,策略 (policy) 進行操作

2、Monitoring 監控者,可登陸管理控制臺 (啟用 management plugin 的情況下),同時可以查看 rabbitmq 節點的相關信息 (進程數,內存使用情況,磁盤使用情況等)。

3、Policymaker 策略制定者,可登陸管理控制臺 (啟用 management plugin 的情況下),同時可以對 policy 進行管理。但無法查看節點的相關信息.

4、Management 普通管理者,僅可登陸管理控制臺 (啟用 management plugin 的情況下),無法看到節點信息,也無法對策略進行管理.

5、Impersonator 模擬者,無法登錄管理控制臺

6、None 其他用戶,無法登陸管理控制臺,通常就是普通的生產者和消費者。

再創建完成后,我們可以進入管理平臺

四、工作模式

Exchange: 交換機 (X).

作用:生產者將消息發送到 Exchange, 由交換機將消息按一定規則路由到一個或多個隊列中 (上圖中生產者將消息投遞到隊列中,實際上這個在 RabbitMQ 中不會發生.)

RabbitMQ 交換機有四種類型: fanout, direct, topic, headers, 不同類型有著不同的路由策略. AMQP 協議里還有另外兩種類型,System 和自定義,此處不再描述.

1、Fanout: 廣播,將消息交給所有綁定到交換機的隊列 (Publish/Subscribe 模式)

2、Direct: 定向,把消息交給符合指定 routing key 的隊列 (Routing 模式)

3、Topic: 通配符,把消息交給符合 routing pattern (路由模式) 的隊列 (Topics 模式)

4、headers 類型的交換器不依賴于路由鍵的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配. headers 類型的交換器性能會很差,而且也不實用,基本上不會看到它的存在.
RoutingKey: 路由鍵。生產者將消息發給交換機時,指定的一個字符串,用來告訴交換機應該如何處理這個消息.
Binding Key: 綁定. RabbitMQ 中通過 Binding (綁定) 將交換器與隊列關聯起來,在綁定的時候一般會指定一個 Binding Key, 這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了.

Exchange (交換機) 只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息就會丟失

1.?簡單模式(Simple Queue)

  • 核心:單一生產者和單一消費者通過一個隊列直接通信。

  • 流程

  • 特點:無 Exchange,消息直接發送到隊列。

  • 場景:簡單的任務通知(如發送短信驗證碼)。

?ProducerDemo

public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("yourURL");factory.setPort(5672);//云服務器需要開放端口號factory.setUsername("admin");factory.setPassword("admin");factory.setVirtualHost("Test");Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明交換機(使用 內置交換機)//4、聲明隊列/*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,*                                  Map<String, Object> arguments)*  參數說明:*  queue: 隊列名稱*  durable: 可持久化*  exclusive: 是否獨占*  autoDelete: 是否自動刪除*  arguments: 參數*/channel.queueDeclare("hello", true, false, false, null);//5、發送消息/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 參數說明:* exchange: 交換機名稱* routingKey: 內置交換機, routingkey和隊列名稱保持一致* props: 屬性配置* body: 消息*/String message = "Hello World";channel.basicPublish("","hello",null,message.getBytes());//6、資源釋放 如不進行釋放可在管理平臺的 Channel 和 Connection 界面看到相關信息channel.close();connection.close();//關閉連接channel也會關閉}
}

我們可以通過管理平臺來實時監控

ConsumerDemo

package rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class consumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("yourURL");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");factory.setVirtualHost("Test");Connection connection = factory.newConnection();//2、創建channelChannel channel = connection.createChannel();//3、聲明隊列(可以省略)channel.queueDeclare("hello", true, false, false, null);//4、消費信息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 參數說明:* queue: 隊列名稱* autoAck: 是否自動確認* callback: 接收到消息后, 執行的邏輯*/DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume("hello", true, consumer);Thread.sleep(2000);//5、釋放資源channel.close();connection.close();}
}

2.?工作隊列模式(Work Queue)

  • 核心:多個消費者共享一個隊列,競爭消費消息。

  • 流程

  • 特點

    • 消息按輪詢(Round-Robin)或公平分發(Prefetch)分配給消費者。

    • 消費者需發送 ACK 確認消息處理完成。

  • 場景:異步處理耗時任務(如圖片壓縮、訂單處理)。

創建多個消費者

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、創建channelChannel channel = connection.createChannel();//3、聲明隊列(可以省略)channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4、消費信息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);Thread.sleep(2000);//5、釋放資源
//        channel.close();
//        connection.close();}
}

即可觀察到

?


3.?發布/訂閱模式(Publish/Subscribe)

  • 核心:將消息廣播給多個消費者,每個消費者有自己的獨立隊列。

  • 實現

    • 使用?Fanout Exchange(廣播類型)。

    • Exchange 將消息復制并發送到所有綁定的隊列。

  • 流程

  • 場景:系統日志廣播、實時新聞推送。

Producer

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明交換機channel.exchangeDeclare(Constants.FUNOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//3、聲明隊列channel.queueDeclare(Constants.FUNOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FUNOUT_QUEUE2, true, false, false, null);//4、交換機隊列綁定 s2-routingkey為空任何消息都會發送channel.queueBind(Constants.FUNOUT_QUEUE1,Constants.FUNOUT_EXCHANGE,"");channel.queueBind(Constants.FUNOUT_QUEUE2,Constants.FUNOUT_EXCHANGE,"");String message = "Hello fanout";channel.basicPublish(Constants.FUNOUT_EXCHANGE, "", null, message.getBytes());System.out.println("消息發送完成");//6、資源釋放 如不進行釋放可在管理平臺的 Channel 和 Connection 界面看到相關信息channel.close();connection.close();}
}

Consumer

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、創建channel 開啟信道Channel channel = connection.createChannel();//3、聲明隊列(可以省略)channel.queueDeclare(Constants.FUNOUT_QUEUE1, true, false, false, null);//4、消費信息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume(Constants.FUNOUT_QUEUE1, true, consumer);}
}

?都能收到消息


4.?路由模式(Routing)

  • 核心:根據消息的?Routing Key?精確匹配路由到指定隊列。

  • 實現

    • 使用?Direct Exchange(直連類型)。

    • 隊列綁定 Exchange 時需指定匹配的 Routing Key。

  • 流程

  • 場景:按業務類型分發任務(如訂單分為支付、物流)。

Producer

public class producer {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明交換機channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//3、聲明隊列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//4、交換機隊列綁定 s2-routingkey為空任何消息都會發送channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");String messageA = "Hello direct my routingkey is a";channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, messageA.getBytes());String messageB = "Hello direct my routingkey is b";channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, messageB.getBytes());String messageC = "Hello direct my routingkey is c";channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, messageC.getBytes());System.out.println("消息發送完成");//6、資源釋放channel.close();connection.close();}
}

?


5. 通配符模式(Topics)

  • 核心:通過通配符(*?和?#)匹配 Routing Key,實現靈活路由。

  • 實現

    • 使用?Topic Exchange(主題類型)。

    • *?匹配一個單詞,#?匹配零或多個單詞(如?order.*.status)。

  • 流程

  • 場景:多維度消息分類(如日志級別?error.*、區域?asia.#)。

在 topic 類型的交換機在匹配規則上,有些要求:

RoutingKey 是一系列由點 (.) 分隔的單詞,比如"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"

BindingKey 和 RoutingKey 一樣,也是點 (.) 分割的字符串.

Binding Key 中可以存在兩種特殊字符串,用于模糊匹配,* 代表一個單詞,# 代表多個單詞(兩個點之間為一個單詞)

比如參考上圖而言:

  • Binding Key 為 "d.a.b" 會同時路由到 Q1 和 Q2
  • Binding Key 為 "d.a.f" 會路由到 Q1
  • Binding Key 為 "c.e.f" 會路由到 Q2
  • Binding Key 為 "d.b.f" 會被丟棄,或者返回給生產者 (需要設置 mandatory 參數)

ProducerDemo

public class producer {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明交換機channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//3、聲明隊列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//4、交換機隊列綁定channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#");String messageA = "Hello topic my routingkey is a";//滿足queue1channel.basicPublish(Constants.TOPIC_EXCHANGE, "EE.a.F", null, messageA.getBytes());String messageB = "Hello topic my routingkey is b";//滿足queue2 queue1channel.basicPublish(Constants.TOPIC_EXCHANGE, "Aa.a.b", null, messageB.getBytes());String messageC = "Hello topic my routingkey is c";//滿足queue2channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ab.cd", null, messageC.getBytes());System.out.println("消息發送完成");//6、資源釋放channel.close();connection.close();}
}


6.?RPC 模式(Remote Procedure Call)

  • 核心:實現遠程服務調用,支持請求-響應模型。

  • 流程

    1. 生產者發送請求消息,附帶 reply_to 回調隊列和唯一?correlation_id

    2. 消費者處理消息后,將結果返回到回調隊列。

    3. 生產者監聽回調隊列,匹配?correlation_id?獲取響應。

  • 場景:分布式服務調用(如計算服務、數據查詢)。

Client


/*** 1、發送請求* 2、接受響應*/
public class RpcClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、聲明隊列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//4、發送請求String correlationID = UUID.randomUUID().toString();String message="hello rpc";//設置相關請求的屬性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().replyTo(Constants.RPC_RESPONSE_QUEUE) //回調隊列.correlationId(correlationID) //請求的唯一標識.build();channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,message.getBytes());//4、接受響應 使用阻塞實現同步效果,存儲響應final ArrayBlockingQueue<Object> responseQueue = new ArrayBlockingQueue<>(1);channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true/*自動應答*/ ,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String response = new String(body);System.out.println("接收到回調消息"+response);if(correlationID.equals(properties.getCorrelationId())){//如果校驗一致responseQueue.offer(response);}}});String take = responseQueue.take().toString();System.out.println("[RPC Client Receive Result]:"+take);}
}

?Server

/*** 1、接受請求* 2、發送消息*/
public class RpcServer {public static void main(String[] args) throws IOException, TimeoutException {//1、建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);//云服務器需要開放端口號factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();//2、開啟信道Channel channel = connection.createChannel();//3、接受請求channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false/*手動確認*/,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body, "UTF-8");System.out.println("接受到請求"+request);String response = "over 響應成功";AMQP.BasicProperties props=new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).contentType(properties.getContentType()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, props, response.getBytes("UTF-8"));channel.basicAck(envelope.getDeliveryTag(), false/*是否批量*/);}});}
}


7. 發布確認模式(Publisher Confirms)

  • 核心:一種確保消息可靠發送到RabbitMQ服務器的機制,這這幾只模式下,生產者可以等待RabbitMQ服務器的確認,確保消息已經被服務器接收并處理

  • 流程

    1. 生產者將 Channel 設置為 confirm 模式 (通過調用 channel.confirmSelect () 完成) 后,發布的每一條消息都會獲得一個唯一的 ID, 生產者可以將這些序列號與消息關聯起來,以便跟蹤消息的狀態.
    2. 當消息被 RabbitMQ 服務器接收并處理后,服務器會異步地向生產者發送一個確認 (ACK) 給生產者 (包含消息的唯一 ID),表明消息已經送達

?發送方確認機制最大的好處在于它是異步的,生產者可以同時發布消息和等待信道返回確認消息。

  1. 當消息最終得到確認之后,生產者可以通過回調方法來處理該確認消息。
  2. 如果 RabbitMQ 因為自身內部錯誤導致消息丟失,就會發送一條 nack (Basic.Nack) 命令,生產者同樣可以在回調方法中處理該 nack 命令。
  • 場景:對數據安全性要求較高的場景,比如金融交易,訂單處理

作為消息中間件,都會面臨消息丟失的情況,消息丟失大概分為三種情況:

1、生產者問題,因為應用程序故障,網絡抖動等原因,未成功向broker發送消息

2、中間件問題,生產者消息發送成功,凡是broker并沒有把消息保存好,導致消息丟失

3、消費者問題,消費者在消費消息時,并沒又處理好,導致broker將消費失敗的消息從隊列中刪除了

其中針對問題2,可以通過持久化機制實現

針對問題3,可以采用消息應答機制

針對問題1,可以采用發布確認模式(Publisher Confirms)實現

發布確認模式有三種策略:

Strategy #1: Publishing Messages Individually 策略1:單獨發布消息

每發送一條消息后就調用 channel.waitForConfirmsOrDie 方法,之后等待服務端的確認,這實際上是一種串行同步等待的方式。尤其對于持久化的消息來說,需要等待消息確認存儲在磁盤之后才會返回 (調用 Linux 內核的 fsync 方法)。

private static void publishingMessagesIndividually(){try(Connection connection = createConnection();){//1、開啟信道 設置信道為Confirm模式Channel channel = connection.createChannel();channel.confirmSelect();//2、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1,true,false,false,null);//3、發布消息long startTime = System.currentTimeMillis();for (int i = 0; i < MessageCount; i++) {String message = "Hello Publisher Confirms " + i;channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE1,null,message.getBytes());//等待確認channel.waitForConfirmsOrDie(5000);}long endTime = System.currentTimeMillis();System.out.printf("單獨確認策略,發送消息%d條,耗時%dms\n", MessageCount, endTime - startTime);} catch (Exception e) {throw new RuntimeException(e);}}

?

可見該策略,耗時長,效率低

Strategy #2: Publishing Messages in Batches 策略2:批量確認消息

相比于單獨確認策略,批量確認極大地提升了 confirm 的效率,缺點是出現 Basic.Nack 或者超時時,我們不清楚具體哪條消息出了問題。客戶端需要將這一批次的消息全部重發,這會帶來明顯的重復消息數量。

private static void PublishingMessagesInBatches() {try(Connection connection = createConnection();){//1、開啟信道 設置信道為Confirm模式Channel channel = connection.createChannel();channel.confirmSelect();//2、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2,true,false,false,null);//3、發布消息long startTime = System.currentTimeMillis();int batchSize = 100;int sendMessageCount = 0;for (int i = 0; i < MessageCount; i++) {String message = "Hello Publisher Confirms " + i;channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE2,null,message.getBytes());//等待確認sendMessageCount++;if(sendMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);sendMessageCount = 0;}}//處理末尾if(sendMessageCount!=0){channel.waitForConfirmsOrDie(5000);}long endTime = System.currentTimeMillis();System.out.printf("批量確認策略,發送消息%d條,耗時%dms\n", MessageCount, endTime - startTime);} catch (Exception e) {throw new RuntimeException(e);}}

?

Strategy #3: Handling Publisher Confirms Asynchronously 策略3:異步確認消息

????????異步 confirm 方法的編程實現最為復雜。Channel 接口提供了一個方法 addConfirmListener,可添加 ConfirmListener 回調接口。
????????ConfirmListener 接口中有 handleAck (long deliveryTag, boolean multiple) 和 handleNack (long deliveryTag, boolean multiple) 兩個方法,分別處理 RabbitMQ 發給生產者的 ack 和 nack。deliveryTag 是發送消息序號,multiple 表示是否批量確認。需為每個 Channel 維護已發送消息序號集合。開啟 confirm 模式后,channel 發送消息帶從 1 遞增的 deliveryTag 序號,可用 SortedSet 維護集合。

  1. 收到 ack 時,從序列刪該消息序號;若是批量確認,小于等于當前 deliveryTag 的消息都收到,則清除對應集合。
  2. 收到 nack 時,處理邏輯類似,但要結合業務情況,進行消息重發等操作 。
private static void handlingPublisherConfirmsAsynchronously() {try(Connection connection = createConnection();){//1、開啟信道 設置信道為Confirm模式Channel channel = connection.createChannel();channel.confirmSelect();//2、聲明隊列channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3,true,false,false,null);long startTime = System.currentTimeMillis();//3、監聽confirmSortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l/*每條消息的id*/, boolean b/*是否為批量*/) throws IOException {if (b){confirmSet.headSet(l+1).clear();}else {confirmSet.remove(l);}}@Overridepublic void handleNack(long l, boolean b) throws IOException {if (b){confirmSet.headSet(l+1).clear();}else {confirmSet.remove(l);}//需要結合具體消息業務,進行消息重發}});//4、發布消息for (int i = 0; i < MessageCount; i++) {String message = "Hello Publisher Confirms " + i;long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE3,null,message.getBytes());confirmSet.add(seqNo);}while(!confirmSet.isEmpty()){Thread.sleep(10);}long endTime = System.currentTimeMillis();System.out.printf("異步確認策略,發送消息%d條,耗時%dms\n", MessageCount, endTime - startTime);} catch (Exception e) {throw new RuntimeException(e);}}


對時間的慷慨,就等于慢性自殺。——奧斯特洛夫斯基

🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀

以上,就是本期的全部內容啦,若有錯誤疏忽希望各位大佬及時指出💐

? 制作不易,希望能對各位提供微小的幫助,可否留下你免費的贊呢🌸?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/896933.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/896933.shtml
英文地址,請注明出處:http://en.pswp.cn/news/896933.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

SQLAlchemy系列教程:理解SQLAlchemy元數據

SQLAlchemy是Python開發人員的強大ORM工具。SQLAlchemy中的元數據是對象-關系映射配置的集合&#xff0c;允許開發人員無縫地定義和使用數據庫模式。 使用元數據 SQLAlchemy中的元數據充當各種數據庫描述符&#xff08;如表、列和索引&#xff09;的容器。這使開發人員能夠通…

MacDroid for Mac v2.3 安卓手機文件傳輸助手 支持M、Intel芯片 4.7K

MacDroid 是Mac毒搜集到的一款安卓手機文件傳輸助手&#xff0c;在Mac和Android設備之間傳輸文件。您只需要將安卓手機使用 USB 連接到 Mac 電腦上即可將安卓設備掛載為本地磁盤&#xff0c;就像編輯mac磁盤上的文件一樣編輯安卓設備上的文件&#xff0c;MacDroid支持所有 Andr…

Android+SpringBoot的老年人健康飲食小程序平臺

感興趣的可以先收藏起來&#xff0c;還有大家在畢設選題&#xff0c;項目以及論文編寫等相關問題都可以給我留言咨詢&#xff0c;我會一一回復&#xff0c;希望幫助更多的人。 系統介紹 我將從經濟、生活節奏、技術融合等方面入手&#xff0c;詳細闡述居家養老管理模式興起的…

【星云 Orbit ? STM32F4】10. 在串口接收中斷里即時解析數據頭的程序框架

【星云 Orbit ? STM32F4】10. 串口中斷中即時解析數據頭的程序開發&#xff1a;實現高效實時數據處理 摘要 在嵌入式開發中&#xff0c;串口中斷處理是實現高效實時數據傳輸的關鍵技術之一。本文將詳細介紹如何在STM32F407微控制器上開發一個在串口接收中斷中即時解析數據頭的…

數據倉庫的特點

數據倉庫的主要特點可以概括為&#xff1a;面向主題、集成性、非易失性、時變性、高性能和可擴展性、支持復雜查詢和分析、分層架構以及數據質量管理。 1. 面向主題&#xff08;Subject-Oriented&#xff09; 數據倉庫是面向主題的&#xff0c;而不是面向事務的。這意味著數據…

SAP MDG —— MDG on S/4HANA 2023 FPS03 創新匯總

文章目錄 MDG 基于SAP S/4HANA 2023 FPS03的創新BP/C/S&#xff1a;消息控制BP/C/S&#xff1a;手工分配數據控制者MDG-F&#xff1a;使用S/4擴展數據校驗功能生成式AI可用于協助自定義對象的數據變更/同時可總結批量變更的內容 MDG 基于SAP S/4HANA 2023 FPS03的創新 由于從S…

抽獎系統(從0-1)(上)

hu項目的開發流程介紹 1. 項目啟動階段 ? 項?概述&#xff1a;介紹項?的背景、?標和預期成果。 ? 團隊組建&#xff1a;建跨職能團隊&#xff0c;包括產品經理、UI/UX 設計師、開發?員、測試?員等。 ? ??定義&#xff1a;明確團隊中各個??的職責和?作內容。 2. 需…

vim 調整字體

vim: 在vim 面板單擊右鍵&#xff0c;選擇references: terminal :也是單擊右鍵,選擇references:

UniApp 使用 u-loadmore 完整步驟

文章目錄 一、前期準備1. 安裝 uView - UI 二、使用 u-loadmore組件1. 創建頁面2. 編寫頁面代碼模板部分&#xff08;loadmore-demo.vue&#xff09;樣式部分腳本部分 三、要點補充1. u-loadmore 狀態說明2. 數據請求優化3. 性能優化4. 兼容性問題 在 UniApp 開發中&#xff0c…

Libgdx游戲開發系列教程(3)——通過柏林噪音算法地圖隨機地形

在B站刷到了隨機地圖生成的視頻,隨手學習下并做下記錄 注: 本篇使用javafx應用作演示,算是了解這個算法的使用,后續會再出篇libgdx生成地圖的示例 說明 拋開算法實現,首先認知柏林噪音算法 一般我們想要隨機數,會指定個范圍,如0.0-1.0之間任意小數,而柏林算法的結果范圍就是[…

LeetCode熱題100JS(20/100)第四天|?41. 缺失的第一個正數?|?73. 矩陣置零?|?54. 螺旋矩陣?|?48. 旋轉圖像?

41. 缺失的第一個正數 題目鏈接&#xff1a;41. 缺失的第一個正數 難度&#xff1a;困難 刷題狀態&#xff1a;1刷 新知識&#xff1a; 解題過程 思考 示例 1&#xff1a; 輸入&#xff1a;nums [1,2,0] 輸出&#xff1a;3 解釋&#xff1a;范圍 [1,2] 中的數字都在數組中…

e2studio開發RA2E1(17)---- ADC掃描多通道采樣

e2studio開發RA2E1.17-- ADC掃描多通道采樣 概述視頻教學樣品申請硬件準備參考程序源碼下載ADC屬性配置回調函數主程序演示結果 概述 在嵌入式系統中&#xff0c;ADC&#xff08;模數轉換器&#xff09;是一個非常重要的組件&#xff0c;它將模擬信號轉換為數字信號。為了提高…

FPGA標準庫-Open Logic

在現代技術發展的浪潮中&#xff0c;開源項目已經成為了推動技術創新和發展的核心力量。無論是人工智能、區塊鏈、云計算&#xff0c;還是傳統的嵌入式開發、操作系統&#xff0c;開源項目都在其中扮演著至關重要的角色。它們不僅促進了技術的快速迭代&#xff0c;也為全球開發…

FineReport 操作注意

1.父單元格重復的時候&#xff0c;如何取消合并 效果如下&#xff1a; 只需要在單元格中&#xff0c;將數據設置為【列表】即可。 2.待定

開源之夏經驗分享|Koupleless 社區黃興抗:在開源中培養工程思維

開源之夏經驗分享&#xff5c;Koupleless 社區黃興抗&#xff1a;在開源中培養工程思維 文|黃興抗 電子信息工程專業 Koupleless 社區貢獻者 就讀于南昌師范學院&#xff0c;電子信息工程專業的大三學生。 本文 2634 字&#xff0c;預計閱讀 7? 分鐘? 今天 SOFAStack 邀…

Ollama存在安全風險的情況通報及解決方案

據清華大學網絡空間測繪聯合研究中心分析&#xff0c;開源跨平臺大模型工具Ollama默認配置存在未授權訪問與模型竊取等安全隱患。鑒于目前DeepSeek等大模型的研究部署和應用非常廣泛&#xff0c;多數用戶使用Ollama私有化部署且未修改默認配置&#xff0c;存在數據泄露、算力盜…

線代[9]|線性代數主要內容及其發展簡史(任廣千《線性代數的幾何意義》的附錄1)

文章目錄 向量行列式矩陣線性方程組二次型 向量 向量又稱為矢量&#xff0c;最初應用與物理學。很多物理量如力、速度、位移以及電場強度、磁感應強度等等都是向量。大約公元前350年前&#xff0c;古希臘著名學者亞里士多德就知道了力可以表示成向量&#xff0c;兩個力的組合作…

H20半精度推理報錯:Floating point exception (core dumped)

Nvidia H20 顯卡在執行bf16&#xff0c;f16推理時程序異常中斷 時間是 2025年3月4日 課題組新到的8卡H20服務器在使用過程中&#xff0c;torch加載模型進行bf16的推理時&#xff0c;出現Floating point exception (core dumped)錯誤 當時一頭霧水&#xff0c;后來苦苦尋找&…

服務是否設置為開機自啟動

在 Linux 系統中&#xff0c;可以通過以下幾種方法檢查服務是否設置為開機自啟動&#xff1a; 方法 1&#xff1a;使用 systemctl 命令&#xff08;適用于 systemd 系統&#xff09; systemctl 是 systemd 系統的命令行工具&#xff0c;用于管理系統服務。以下是具體步驟&…

QT——基于 QListWidget 和 QStackedWidget 的頁面切換

Qt 練習題&#xff1a;基于 QListWidget 和 QStackedWidget 的頁面切換 Qt 練習題&#xff1a;基于 QListWidget 和 QStackedWidget 的頁面切換 題目描述&#xff1a; 請使用 Qt 設計一個窗口&#xff0c;其中包含一個 QListWidget 和一個 QStackedWidget。要求實現以下功能&a…