【RabbitMQ】----RabbitMQ 的7種工作模式

1.Simple(簡單模式)

??P:?產者,也就是要發送消息的程序

??C:消費者,消息的接收者

??Queue:消息隊列,圖中??背景部分.類似?個郵箱,可以緩存消息;?產者向其中投遞消息,消費者從其中取出消息.

??特點:?個?產者P,?個消費者C,消息只能被消費?次.也稱為點對點(Point-to-Point)模式.適?場景:消息只能被單個消費者處理

工作流程

1.生產者生成一條消息,并將其發送到交換機。
2.交換機根據消息屬性(在簡單模式下可能是固定的路由鍵或不需要路由鍵)將消息發送到指定的隊列。
3.消費者監聽并綁定到該隊列,當隊列中有消息時,消費者將其取出并處理。
4.消息被消費者處理后,會自動從隊列中刪除(除非設置了消息持久化或手動確認機制)。?

應用場景

簡單模式適用于那些需要一對一消息傳遞的場景,例如:

1.手機短信發送:一個生產者(如短信服務)將短信內容發送到交換機,交換機將其發送到指定的隊列,消費者(如短信網關)從隊列中接收短信并發送到用戶的手機上。
2.郵件單發:一個生產者(如郵件服務)將郵件內容發送到交換機,交換機將其發送到指定的隊列,消費者(如郵件發送器)從隊列中接收郵件并發送到指定的郵箱。

優點

1.結構簡單,易于理解和實現。
2.消息傳遞可靠,確保消息按照到達的順序被處理。

缺點

1.一個隊列只能被一個消費者監聽和消費,可能無法充分利用系統資源。
2.在高并發場景下,單個消費者可能無法及時處理所有消息,導致消息堆積。?

使用案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
編寫生產者代碼
創建連接
  //1.建立連接工廠ConnectionFactory factory = new ConnectionFactory();//2. 設置參數factory.setHost("182.92.242.181");factory.setPort(5672); //默認值 5672factory.setVirtualHost("bite");//虛擬機名稱, 默認 /factory.setUsername("study");//?戶名,默認guestfactory.setPassword("study");//密碼, 默認guest//3. 創建連接ConnectionConnection connection = factory.newConnection();
?創建Channel

?產者和消費者創建的channel并不是同?個

Channel channel = connection.createChannel();
聲明一個隊列
        /*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,*                                  Map<String, Object> arguments)*  參數說明:*  queue: 隊列名稱*  durable: 可持久化*  exclusive: 是否獨占*  autoDelete: 是否自動刪除*  arguments: 參數*/channel.queueDeclare("hello", true, false, false, null);
發送消息
        /*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 參數說明:* exchange: 交換機名稱* routingKey: 內置交換機, routingkey和隊列名稱保持一致* props: 屬性配置* body: 消息*/for (int i = 0; i < 10; i++) {String msg = "hello rabbitmq~"+i;channel.basicPublish("","hello", null, msg.getBytes());}
釋放資源
    channel.close();connection.close();

注意:一定是先關閉通道后關閉連接

編寫消費者代碼
創建連接+創建Channel+聲明隊列
        ConnectionFactory factory = new ConnectionFactory();factory.setHost("182.92.242.181");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("bite");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("hello",true, false, false, null);
消費消息
        /*** basicConsume(String queue, boolean autoAck, Consumer callback)* 參數說明:* queue: 隊列名稱* autoAck: 是否自動確認* callback: 接收到消息后, 執行的邏輯*/DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume("hello", true, consumer);
釋放資源
    channel.close();connection.close();

2.Work Queue(工作隊列)

概念

在工作隊列模式中,一個生產者(producer)將任務發布到隊列中,多個消費者(consumer)從隊列中獲取任務并執行。這種模式的主要目標是提高任務的并行處理能力,從而提高系統的吞吐量和效率。

?個?產者P,多個消費者C1,C2.在多個消息的情況下,Work Queue 會將消息分派給不同的消費者,每個消費者都會接收到不同的消息.

?特點:消息不會重復,分配給不同的消費者.

????????適?場景:任務分發:將任務分發給多個工作者(消費者),以便并行處理。這對于需要高吞吐量和任務處理效率的應用程序非常有用。例如,圖像處理、視頻編碼、數據轉換等應用可以使用工作隊列模式來并行處理大量任務。

工作原理

1.生產者發送任務:生產者將任務封裝為消息,并將其發送到RabbitMQ隊列中。
2.RabbitMQ分發任務:RabbitMQ根據配置的分發策略(如輪詢或公平分發)將任務分發給消費者。
3.消費者處理任務:消費者從隊列中獲取任務并執行。在處理完任務后,消費者會向RabbitMQ發送確認消息,表示任務已完成。
4.RabbitMQ確認任務完成:在收到消費者的確認消息后,RabbitMQ會將該任務從隊列中移除。

注意事項

1.消息確認:為了確保消息不會丟失,消費者在處理完任務后需要向RabbitMQ發送確認消息。如果消費者在處理任務時失敗或崩潰,RabbitMQ會將該任務重新分發給其他消費者。
2.負載均衡:RabbitMQ默認采用輪詢方式將消息分發給消費者。如果需要更復雜的負載均衡策略,可以考慮使用其他分發策略或自定義交換機類型。
3.錯誤處理:在生產者和消費者中都需要添加適當的錯誤處理邏輯,以處理可能出現的異常情況,如連接失敗、消息發送失敗等。

代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量類
public class Constants {public static final String HOST = "182.92.242.181";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "bite";//工作隊列模式public static final String WORK_QUEUE = "work.queue";
}
編寫生產者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列   使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 發送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息發送成功~");//6. 資源釋放channel.close();connection.close();}
}
編寫消費者1代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列   使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//6. 資源釋放
//        channel.close();
//        connection.close();}
}
編寫消費者2代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列   使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//TODOSystem.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//        //6. 資源釋放
//        channel.close();
//        connection.close();}
}

先運行生產者,后運行消費者

查看管理界面

我們此時會看到,先啟動的消費者會消費掉隊列中所有的消息。

先運行消費者,后運行生產者

此時我們能看到,兩個消費者都能夠消費消息。

3.Publish/Subscribe(發布/訂閱)

概念
RabbitMQ的發布訂閱模式(Publish/Subscribe)是一種消息傳遞模式,它允許消息生產者(Publisher)將消息發送到交換機(Exchange),然后交換機根據路由規則將消息廣播到一個或多個隊列,最后由消費者(Subscriber)從隊列中接收并處理消息。

圖中X表?交換機, 在訂閱模型中,多了?個Exchange??, 過程略有變化。

概念介紹

Exchange: 交換機 (X).
作?: ?產者將消息發送到Exchange, 由交換機將消息按?定規則路由到?個或多個隊列中(上圖中?產者將消息投遞到隊列中, 實際上這個在RabbitMQ中不會發?. )

RabbitMQ交換機有四種類型: fanout,direct, topic, headers, 不同類型有著不同的路由策略. AMQP協議?還有另外兩種類型, System和?定義, 此處不再描述.

1. Fanout:?播,將消息交給所有綁定到交換機的隊列(Publish/Subscribe模式)
2. Direct:定向,把消息交給符合指定routing key的隊列(Routing模式)
3. Topic:通配符,把消息交給符合routing pattern(路由模式)的隊列(Topics模式)

Exchange(交換機)只負責轉發消息, 不具備存儲消息的能?, 因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息就會丟失
RoutingKey: 路由鍵.?產者將消息發給交換器時, 指定的?個字符串, ?來告訴交換機應該如何處理這個消息.
Binding Key:綁定. RabbitMQ中通過Binding(綁定)將交換器與隊列關聯起來, 在綁定的時候?般會指定?個Binding Key, 這樣RabbitMQ就知道如何正確地將消息路由到隊列了.


?如下圖: 如果在發送消息時, 設置了RoutingKey 為orange, 消息就會路由到Q1

當消息的Routing key與隊列綁定的Bindingkey相匹配時,消息才會被路由到這個隊列.

BindingKey其實也屬于路由鍵中的?種, 官?解釋為:the routingkey to use for the binding.
可以翻譯為:在綁定的時候使?的路由鍵. ?多數時候,包括官??檔和RabbitMQJava API 中都把
BindingKey和RoutingKey看作RoutingKey, 為了避免混淆,可以這么理解:

1. 在使?綁定的時候,需要的路由鍵是BindingKey.
2. 在發送消息的時候,需要的路由鍵是RoutingKey

特點和優勢

1.解耦合:生產者和消費者之間通過交換機進行解耦。生產者無需知道消息將被傳遞到哪些隊列,消費者也無需知道消息來自哪個生產者。這種解耦合使得系統更加靈活和可擴展。
2.多播:支持多個消費者同時處理同一條消息,實現消息的多播效果。這有助于提高系統的并行處理能力和容錯性。
3.靈活性:可以根據需要使用不同類型的交換機和綁定規則,以滿足不同的消息傳遞需求。RabbitMQ提供了多種交換機類型,如直接交換機、扇形交換機、主題交換機等。

應用場景

發布訂閱模式適用于需要將消息廣播給多個消費者的場景,例如:

1.實時通知:如系統狀態更新、訂單狀態變更等實時事件的通知。通過發布訂閱模式,可以將這些事件廣播給所有感興趣的消費者。
2.日志記錄:將應用程序的日志信息廣播到多個日志處理服務進行處理和存儲。這有助于實現日志的集中管理和分析。
3.事件處理:在事件驅動架構中,將事件作為消息發布到交換機,由多個消費者訂閱并處理這些事件。這有助于實現事件的異步處理和分布式處理。

注意事項

1.消息持久化:為了確保消息在RabbitMQ服務器重啟后不會丟失,可以將消息和隊列標記為持久性。這樣,即使服務器發生故障,消息仍然可以被消費者接收和處理。
2.消息確認:RabbitMQ支持消息確認機制,確保消息在成功處理后才會從隊列中刪除。這有助于防止消息丟失和重復處理。
3.負載均衡:在發布訂閱模式中,多個消費者可以監聽同一個隊列或不同的隊列。為了實現負載均衡,可以配置多個消費者來處理同一個隊列中的消息。

代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量類
public class Constants {public static final String HOST = "182.92.242.181";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "bite";//發布訂閱模式public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";
}
編寫生產者代碼
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明交換機channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//4. 聲明隊列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//5. 交換機和隊列綁定channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");//6. 發布消息String msg = "hello fanout....";channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());System.out.println("消息發送成功");//7. 釋放資源channel.close();connection.close();}
}
編寫消費者1代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}
編寫消費者2代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}

由此我們可以看到,交換機收到的生產者生產的一條消息被廣播到了兩個隊列,消費者都能夠分別從這兩個隊列中得到一條消息并消費。?

4.Routing(路由模式)

?路由模式是發布訂閱模式的變種,在發布訂閱基礎上,增加路由 key?發布訂閱模式是?條件的將所有消息分發給所有消費者,路由模式是 Exchange 根據 RoutingKey 的規則,將數據篩選后發給對應的消費者隊列 適合場景:需要根據特定規則分發消息的場景.

?????如系統打印?志,?志等級分為 error,warning,info,debug,就可以通過這種模式,把不同的?志發送到不同的隊列,最終輸出到不同的文件

工作原理

1.生產者發送消息:生產者將消息發送到RabbitMQ的交換機,并指定一個或多個路由鍵。
2.交換機根據路由鍵路由消息:交換機接收消息后,根據消息的路由鍵和綁定規則,將消息路由到與之匹配的隊列中。匹配規則由交換機的類型和綁定規則決定。
3.消費者監聽隊列:消費者可以選擇監聽特定的隊列或多個隊列,以接收他們感興趣的消息。
4.消息處理:消費者從隊列中接收消息,并進行相應的處理。

特點

1.靈活路由:生產者可以根據需要指定不同的路由鍵來發送消息,交換機根據路由鍵將消息路由到不同的隊列。
2.定向傳遞:消息只會被發送到與之匹配的隊列中,消費者只需關注他們感興趣的消息,而不需要接收所有的消息。
3.精確控制:通過定義不同的路由規則,RabbitMQ的路由模式可以實現各種復雜的消息傳遞需求,如日志級別過濾、消息過濾等。

實現步驟

1.創建交換機:創建一個類型為Direct的交換機。
2.創建隊列:創建多個隊列,用于存儲不同類型的消息。
3.綁定隊列和交換機:使用路由鍵將隊列和交換機進行綁定。
4.發送消息:生產者將消息發送到交換機,并指定路由鍵。
5.接收消息:消費者監聽特定的隊列,接收并處理消息。

代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量類
public class Constants {public static final String HOST = "182.92.242.181";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "bite";//路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";
}
編寫生產者代碼
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 路由模式生產者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明交換機channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//4. 聲明隊列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5. 綁定交換機和隊列channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6. 發送消息String msg = "hello direct, my routingkey is a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());String msg_b = "hello direct, my routingkey is b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());String msg_c = "hello direct, my routingkey is c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());System.out.println("消息發送成功");//7. 釋放資源channel.close();connection.close();}
}
編寫消費者1代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}
編寫消費者2代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);}
}
運行代碼

查看管理界面,可以看到一個隊列消息數目為1,另一條隊列消息數目為3

5.Topics(通配符模式)

路由模式的升級版, 在routingKey的基礎上,增加了通配符的功能, 使之更加靈活.

Topics和Routing的基本原理相同,即:?產者將消息發給交換機,交換機根據RoutingKey將消息轉發給與RoutingKey匹配的隊列. 類似于正則表達式的?式來定義Routingkey的模式.

不同之處是:routingKey的匹配?式不同,Routing模式是相等匹配,topics模式是通配符匹配.

應用場景

RabbitMQ的通配符模式在需要根據消息的特定屬性進行路由和過濾的場景中非常有用。例如,在一個日志系統中,可以使用通配符模式來將不同級別的日志消息路由到不同的隊列中,以便進行不同的處理和分析。

優勢

通配符模式的優勢在于它可以靈活地匹配消息,使得消息可以根據不同的條件進行過濾和選擇。通過合理地定義綁定和路由鍵,可以實現復雜的消息過濾和路由策略,提高系統的靈活性和性能。?

代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量類
public class Constants {public static final String HOST = "182.92.242.181";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "bite";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic_queue1";public static final String TOPIC_QUEUE2 = "topic_queue2";
}
編寫生產者代碼
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 通配符模式生產者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明交換機channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//4. 聲明隊列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5. 綁定交換機和隊列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6. 發送消息String msg = "hello topic, my routingkey is ae.a.f....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes());  //轉發到Q1String msg_b = "hello topic, my routingkey is ef.a.b....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //轉發到Q1和Q2String msg_c = "hello topic, my routingkey is c.ef.d....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//轉發Q2System.out.println("消息發送成功");//7. 釋放資源channel.close();connection.close();}
}
編寫消費者1代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);}
}
編寫消費者2代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);}
}
運行代碼

觀察管理界面可以看到兩個隊列都各自收到了2條消息,與預期符合。

6.RPC(RPC通信)

在 RPC 通信的過程中,沒有?產者和消費者,?較像咱們 RPC 遠程調?,?概就是通過兩個隊列實現了?個可回調的過程

1.客戶端發送請求:

  • 客戶端連接到RabbitMQ服務器。
  • 客戶端聲明一個用于發送RPC請求的隊列(通常是固定的,如rpc_queue)。
  • 客戶端創建一個臨時的回調隊列,并在發送請求時,將回調隊列的名稱作為消息屬性(reply_to)發送給交換機。
  • 客戶端為每個請求生成一個唯一的correlation_id,并將其作為消息屬性發送,以便在接收響應時能夠匹配請求與響應。

2.交換機路由請求:

交換機接收到RPC請求后,根據路由鍵將請求路由到服務端監聽的隊列。

3.服務端處理請求:

服務端(消費者)從隊列中接收請求。
服務端處理請求,并生成響應。
服務端將響應發送到客戶端指定的回調隊列,并在消息屬性中設置相同的correlation_id。

4.客戶端接收響應:

客戶端監聽其回調隊列以接收響應。
當接收到響應時,客戶端檢查correlation_id以確定響應是否與之前的請求匹配。
如果匹配,客戶端處理響應;如果不匹配,客戶端可能丟棄該響應。

代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量類
public class Constants {public static final String HOST = "182.92.242.181";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "bite";//rpc 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
}

編寫客戶端代碼

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;/*** rpc 客戶端* 1. 發送請求* 2. 接收響應*/
public class RpcClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//3. 發送請求String msg = "hello rpc...";//設置請求的唯一標識String correlationID = UUID.randomUUID().toString();//設置請求的相關屬性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());//4. 接收響應//使用阻塞隊列, 來存儲響應信息final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回調消息: "+ respMsg);if (correlationID.equals(properties.getCorrelationId())){//如果correlationID校驗一致response.offer(respMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);String result = response.take();System.out.println("[RPC Client 響應結果]:"+ result);}
}
編寫服務端代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** RPC server* 1. 接收請求* 2. 發送響應*/
public class RpcServer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 接收請求channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body,"UTF-8");System.out.println("接收到請求:"+ request);String response = "針對request:"+ request +", 響應成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);}
}
運行程序(先運行客戶端,再運行服務端)

可以在管理界面看到其中一個隊列中有1條消息

7.Publisher Confirms(發布確認)

發布確認模式
概述

發布確認模式用于確保消息已經被正確地發送到RabbitMQ服務器,并被成功接收和持久化。通過使用發布確認,生產者可以獲得對消息的可靠性保證,避免消息丟失。這一機制基于通道(Channel)級別,通過兩個階段的確認來保證消息的可靠性。

消息丟失問題

作為消息中間件, 都會?臨消息丟失的問題.
消息丟失?概分為三種情況:

1. ?產者問題. 因為應?程序故障, ?絡抖動等各種原因, ?產者沒有成功向broker發送消息.
2. 消息中間件??問題. ?產者成功發送給了Broker, 但是Broker沒有把消息保存好, 導致消息丟失.
3. 消費者問題. Broker 發送消息到消費者, 消費者在消費消息時, 因為沒有處理好, 導致broker將消費失敗的消息從隊列中刪除了

RabbitMQ也對上述問題給出了相應的解決?案. 問題2可以通過持久化機制. 問題3可以采?消息應答機制.
針對問題1, 可以采?發布確認(Publisher Confirms)機制實現.?

發布確認的三種模式

RabbitMQ的發布確認模式主要有三種形式:單條確認、批量確認和異步確認。

單條確認(Single Publisher Confirm)

特點:在發布一條消息后,等待服務器確認該消息是否成功接收。
優點:實現簡單,每條消息的確認狀態清晰。
缺點:性能開銷較大,特別是在高并發的場景下,因為每條消息都需要等待服務器的確認。

批量確認(Batch Publisher Confirm)

特點:允許在一次性確認多個消息是否成功被服務器接收。
優點:在大量消息的場景中可以提高效率,因為可以減少確認消息的數量。
缺點:當一批消息中有一條消息發送失敗時,整個批量確認失敗。此時需要重新發送整批消息,但不知道是哪條消息發送失敗,增加了調試和處理的難度。

異步確認(Asynchronous Confirm)

特點:通過回調函數處理消息的確認和未確認事件,更加靈活。
優點:在異步場景中能夠更好地處理消息的狀態,提高了系統的并發性能和響應速度。
缺點:實現相對復雜,需要處理回調函數的邏輯和狀態管理。

實現步驟
1.設置通道為發布確認模式:在生產者發送消息之前,需要將通道設置為發布確認模式。這可以通過調用channel.confirmSelect()方法來實現。
2.發送消息并等待確認:生產者發送消息時,每條消息都會分配一個唯一的、遞增的整數ID(DeliveryTag)。生產者可以通過調用channel.waitForConfirms()方法來等待所有已發送消息的確認,或者通過其他方式處理確認回調。
3.處理確認回調:為了處理確認回調,需要創建一個ConfirmCallback接口的實現。在實現的handleAck()方法中,可以處理成功接收到確認的消息的邏輯;在handleNack()方法中,可以處理未成功接收到確認的消息的邏輯。

應用場景

發布確認模式適用于對數據安全性要求較高的場景,如金融交易、訂單處理等。在這些場景中,消息的丟失或重復都可能導致嚴重的業務問題。通過使用發布確認模式,可以確保消息被正確地發送到RabbitMQ服務器,并被成功接收和持久化,從而提高了系統的可靠性和穩定性。

代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量類
public class Constants {public static final String HOST = "182.92.242.181";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "bite";//publisher confirmspublic static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";
}
單條確認
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 100;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {//Strategy #1: Publishing Messages Individually//單獨確認publishingMessagesIndividually();}/*** 單獨確認*/private static void publishingMessagesIndividually() throws Exception {try(Connection connection = createConnection()) {//1. 開啟信道Channel channel = connection.createChannel();//2. 設置信道為confirm模式channel.confirmSelect();//3. 聲明隊列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4. 發送消息, 并等待確認long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待確認channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("單獨確認策略, 消息條數: %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start);}}
}
運行代碼

批量確認
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 10000;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {//Strategy #2: Publishing Messages in Batches//批量確認publishingMessagesInBatches();}/*** 批量確認* @throws Exception*/private static void publishingMessagesInBatches() throws Exception{try(Connection connection = createConnection()) {//1. 開啟信道Channel channel = connection.createChannel();//2. 設置信道為confirm模式channel.confirmSelect();//3. 聲明隊列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 發送消息, 并進行確認long start = System.currentTimeMillis();int batchSize = 100;int outstandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount++;if (outstandingMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if (outstandingMessageCount>0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量確認策略, 消息條數: %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start);}}
}
運行代碼
異步確認
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 10000;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {//Strategy #3: Handling Publisher Confirms Asynchronously//異步確認handlingPublisherConfirmsAsynchronously();}/*** 異步確認*/private static void handlingPublisherConfirmsAsynchronously() throws Exception{try (Connection connection = createConnection()){//1. 開啟信道Channel channel = connection.createChannel();//2. 設置信道為confirm模式channel.confirmSelect();//3. 聲明隊列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4. 監聽confirm//集合中存儲的是未確認的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}//業務需要根據實際場景進行處理, 比如重發, 此處代碼省略}});//5. 發送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("異步確認策略, 消息條數: %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start);}}
}

綜上,速度上:異步>批量>單條

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/98373.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/98373.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/98373.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

今日分享:C++ -- list 容器

&#x1f60e;【博客主頁&#xff1a;你最愛的小傻瓜】&#x1f60e; &#x1f914;【本文內容&#xff1a;C list容器 &#x1f60d;】&#x1f914; --------------------------------------------------------------------------------------------------------------------…

【Python】數據可視化之分布圖

分布圖主要用來展示某些現象或數據在地理空間、時間或其他維度上的分布情況。它可以清晰地反映出數據的空間位置、數量、密度等特征&#xff0c;幫助人們更好地理解數據的內在規律和相互關系。 目錄 單變量分布 變量關系組圖 雙變量關系 核密度估計 山脊分布圖 單變量分布…

DDD+WebAPI實戰

DDD+WebAPI實戰 DDD(領域驅動設計,Domain-Driven Design)是一種面向對象的設計方法,它強調將業務邏輯封裝在模型中,并通過這些模型來驅動整個應用的設計。在.NET環境中,特別是在使用ASP.NET Core和Web API構建應用時,DDD可以幫助我們更好地組織代碼,使得業務邏輯更加清…

人力資源管理的思維方法學習筆記1

北京師范大學政府管理學院1.課程介紹&#xff1a; 講述視角上&#xff0c;本課程側重人力資源管理的思維方式&#xff0c;即人力資源管理理論和時間的不同視角和主導范式的分析。這既是對人力資源管理理論發展的凝練&#xff0c;也是對人力資源管理實踐演進過程的總結。對于把握…

適應新環境:Trae編輯器下的IDEA快捷鍵定制

介紹&#xff1a;學習如何在Trae編輯器中配置IntelliJ IDEA風格的快捷鍵&#xff0c;減少開發環境間的切換成本&#xff0c;提升編碼效率。通過安裝插件或手動調整&#xff0c;讓你更快適應新工具大家好&#xff0c;我是凱哥Java本文標簽&#xff1a;代碼編輯效率、Trae快捷鍵、…

基于YOLO8的汽車碰撞事故檢測系統【數據集+源碼+文章】

基于YOLOv8和Streamlit的汽車碰撞事故檢測系統 文末附下載地址 開發目的 隨著城市化進程的加快和機動車保有量的持續攀升&#xff0c;道路交通安全問題日益突出&#xff0c;汽車碰撞事故頻發不僅嚴重威脅駕乘人員的生命安全&#xff0c;也對公共秩序、應急響應效率及交通管理…

Unity FARO 測量臂:從零構建實時數字孿生系統

前言:當精準測量遇見實時渲染 在高端制造、質量檢測和逆向工程領域,法奧 (FARO) 測量臂是精準的代名詞。它能以亞毫米級的精度捕捉現實世界中的三維坐標。現在,想象一下,如果我們將這種精度與 Unity 的強大實時渲染能力結合起來,會發生什么? 我們將得到一個數字孿生 (D…

延遲 隊列

概念 延遲隊列顧名思義就是消息不立即發送給消費者消費&#xff0c;而是延遲一段時間再交給消費者。 RabbitMQ本身沒有直接支持延遲隊列的的功能&#xff0c;但是可以通過前面所介紹的TTL死信隊列的方式組合 模擬出延遲隊列的功能. RabbitMQ 有些版本還支持延遲隊列的插件安…

Windows+Docker一鍵部署CozeStudio私有化,保姆級

在 ?Windows環境? 下&#xff0c;通過docker&#xff0c;使用 ?火山引擎Doubao-Seed-1.6模型&#xff0c;面向 ?小白新手? 的 ?Coze Studio私有化部署詳細步驟。整個過程分為四大階段&#xff0c;包含每一步的指令、成功標志。 Coze Studio 私有化部署指南&#xff08;W…

【HEMCO Reference Guide 參考指南第二期】配置文件的結構和語法

配置文件的結構和語法 HEMCO 配置文件的結構和語法(The HEMCO configuration file) 1. Settings(設置) 2. Extension Switches(擴展模塊開關) 3. Base Emissions(基礎排放配置) 4. Scale Factors(縮放因子) 5. Masks(掩膜區域) 6. Data Collections(數據集合) 參…

01.單例模式基類模塊

一、單例模式的構成1、私有的靜態成員變量2、公共的靜態成員屬性或方法3、私有構造函數using System.Collections; using System.Collections.Generic; using UnityEngine;public class BaseManager : MonoBehaviour {void Start(){}// Update is called once per framevoid Up…

[網絡入侵AI檢測] 深度前饋神經網絡(DNN)模型

第4章&#xff1a;深度前饋神經網絡&#xff08;DNN&#xff09;模型 歡迎回來&#x1f43b;??? 在第1章&#xff1a;分類任務配置&#xff08;二分類 vs. 多分類&#xff09;中&#xff0c;我們學習了如何配置模型以回答不同類型的問題&#xff1b;在第2章&#xff1a;數…

【目錄-多選】鴻蒙HarmonyOS開發者基礎

All look at the answer 針對包含文本元素的組件&#xff0c;例如Text、Button、TextInput等&#xff0c;可以使用下列哪些屬性關于ForEach(arr, itemGenerator, index)組件的描述正確的是下面哪些容器組件是可以滾動的關于Tabs組件和TabContent組件&#xff0c;下列描述正確的…

第一講 Vscode+Python+anaconda 安裝

1、vscode下載和安裝官網下載最新版&#xff1a;https://code.visualstudio.com/Download注&#xff1a;文件夾最好不要出現中文和空格 2、將vscode修改為中文環境注意&#xff1a;右下角彈出提示框&#xff0c;點擊“yes”若不慎關閉了對話框&#xff0c;也不要緊&#xff0c;…

《sklearn機器學習——回歸指標2》

均方對數誤差&#xff08;mean_squared_log_error函數&#xff09; mean_squared_log_error函數計算與平方&#xff08;二次方&#xff09;對數誤差或損失的期望值相一致的風險指標。 Mean Squared Logarithmic Error 參數與返回值 函數簡介 mean_squared_log_error 是用于計算…

當電力設計遇上AI:良策金寶AI如何重構行業效率邊界?

在工程設計行業&#xff0c;我們常說“經驗為王”。一個資深工程師的價值&#xff0c;往往體現在他對規范的熟悉、對計算的把握、對圖紙的掌控。但今天&#xff0c;這個“王座”正在被重新定義。不是經驗不重要了&#xff0c;而是——效率的邊界&#xff0c;正在被AI重構。以良…

【深度學習】重采樣(Resampling)

在深度學習的背景下&#xff0c;重采樣主要涉及兩個方面&#xff1a; 數據層面的重采樣&#xff1a;處理不平衡數據集。模型層面的重采樣&#xff1a;在神經網絡內部進行上采樣&#xff08;UpSampling&#xff09;或下采樣&#xff08;DownSampling&#xff09;&#xff0c;常見…

計算機實現乘法運算的方式---ChatGPT 5 thinking作答

計算機如何實現“乘法” 下面分層次把乘法在數據表示 → 整數硬件/軟件 → 大整數 → 浮點數 → 特殊場景里的主流實現方式講清楚&#xff0c;并給出取舍建議與簡單偽代碼。0&#xff09;前置&#xff1a;數的表示 無符號整數&#xff1a;按二進制位權求值。有符號整數&#xf…

Ubuntu 安裝 / 配置 VNC

一、基礎環境準備 1. 更新 sudo apt update 2. 安裝 VNC 服務器 & 輕量桌面(XFCE) # 安裝 TightVNC 服務器 + XFCE 桌面(推薦輕量方案) sudo apt install tightvncserver xfce4 xfce4-goodies xterm -y二、核心配置:讓 VNC 加載桌面環境 1. 初始化 VNC 密碼(首次…

計算機大數據畢業設計推薦:基于Spark的新能源汽車保有量可視化分析系統

精彩專欄推薦訂閱&#xff1a;在下方主頁&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f496;&#x1f525;作者主頁&#xff1a;計算機畢設木哥&#x1f525; &#x1f496; 文章目錄 一、項目介紹二、…