目錄
一. 作用:
二. RabbitMQ的5中隊列模式:
1. 簡單模式
2. Work模式
3. 發布/訂閱模式
4. 路由模式
5. 主題模式
三. 消息持久化:
消息過期時間
ACK應答?
四. 同步接收和異步接收:
應用場景
五. 基本使用 :
引入依賴庫:
配置文件RabbitMQConfig:?
創建消息任務類:?
解析:
一. 作用:
????????RabbitMQ主要用于消息隊列的實現。
二. RabbitMQ的5中隊列模式:
1. 簡單模式
一個生產者(發送方)對應一個消費者(接收方)
2. Work模式
一個生產者對應多個消費者,但是只能有一個消費者獲得消息(排他)
3. 發布/訂閱模式
一個消費者將消息首先發送到fanout交換器,交換器綁定到多個隊列,然后與之對應的所有消費者都能接收到消息(不排他)
4. 路由模式
生產者將消息發送到direct交換器,交換器按照關鍵字(Key),把消息路由到某個隊列
5. 主題模式
生產者將消息發送到Topic交換器,交換器按照復雜的規則,把消息路由到某個隊列
三. 消息持久化:
????????消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息可靠性的呢?答案就是消息持久化。持久化可以防止在異常情況下丟失數據。除了消息持久化之外,甚至交換器和隊列都能持久化。也就是說rabbitmq的消息會被存儲在磁盤上,只有當消費收到消息,rabbitmq確認消費者收到消息(Acknowledgments--簡稱ACK)后才會將消息從隊列中刪除。??
-
消息過期時間
? ? ? ? 如果消費者一直不接收消息,消息會一直保存在消息隊列當中,短期內可能不會有什么影響,但是如果經過長時間的積累后,消息會變得很多很多?,浪費大量的資源,內存。
? ? ? ? 為了應對這種情況,就可以對rabbitmq設置消息的過期時間,在規定時間內消息沒有被接收,就會刪除掉該消息。
-
ACK應答?
? ? ? ? 消費者接收到消息后,為了讓RabbitMQ?知道,就需要返回一個ACK應答,告訴RabbitMQ消費者已經收到了消息,如果收到消息后我們需要刪除該消息,只需要在ACK應答中加上deliveryTag標志位。
四. 同步接收和異步接收:
? ? ? ? 同步接收:指消費者調用方法時,會阻塞來等待消息,直到消息被成功消費或者隊列為空。(沒有消息等待消息再接著處理)。
? ? ? ? 異步接收:?指消費者不會在接收消息時阻塞,而是通過回調函數處理消息。消費者在等待消息的同時不會停下,可以處理其他任務。(當有消息時才來處理消息)。
-
應用場景
????????同步接收 :當消息的處理順序對業務邏輯非常重要,就使用同步接收,消費者一次只處理一個消息,確保了每條消息的處理順序。
? ? ? ? 異步接收:當處理消息的時間比較長,或者系統的并發量大時,采用異步接收會更好。
RabbitMQ還有一個殺手锏——同時使用異步收發和同步收發。
五. 基本使用 :
引入依賴庫:
<dependency>
?? ?<groupId>com.rabbitmq</groupId>
? ? <artifactId>amqp-client</artifactId>
? ? <version>5.9.0</version>
</dependency>
<dependency>
?? ?<groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>?
配置文件RabbitMQConfig:?
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Value("${rabbitmq.factoryHost}")private String host;@Beanpublic ConnectionFactory connectionFactory() {ConnectionFactory factory = new ConnectionFactory();factory.setHost(host);factory.setPort(5672);return factory;}
}
?host配置,我將rabbitMQ放在虛擬機上的,所有ip是虛擬機的地址:
創建消息任務類:?
@Slf4j
@Component
public class MessageTask {@Autowiredprivate ConnectionFactory factory;@Autowiredprivate MessageService messageService;/** 同步發送消息* */public void send(String topic, MessageEntity entity) {//向MongoDB保存消息數據,返回消息IDString id = messageService.insertMessage(entity);//向RabbitMQ發送消息try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){//連接到某個topicchannel.queueDeclare(topic, true, false, false, null);HashMap header = new HashMap();header.put("messageId",id);//創建AMQP協議參與對象,添加附加屬性AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();channel.basicPublish("",topic,properties,entity.getMsg().getBytes());log.debug("消息發送成功");} catch (Exception e){log.error(e.getMessage());throw new EmosException("向MQ發送消息失敗");}}/** 異步發送消息* */@Async("AsyncTaskExecutor")public void sendAsync(String topic, MessageEntity entity) {send(topic, entity);}/** 同步接收消息* */public int receive(String topic) {int i = 0;try (//接收消息數據Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 從隊列中獲取消息,不自動確認channel.queueDeclare(topic, true, false, false, null);//Topic中有多少條數據未知,所以使用死循環接收數據,直到接收不到消息,退出死循環while (true) {//創建響應接收數據,禁止自動發送Ack應答GetResponse response = channel.basicGet(topic, false);if (response != null) {AMQP.BasicProperties properties = response.getProps();Map<String, Object> header = properties.getHeaders(); //獲取附加屬性對象String messageId = header.get("messageId").toString();byte[] body = response.getBody();//獲取消息正文String message = new String(body);log.debug("從RabbitMQ接收的消息:" + message);MessageRefEntity entity = new MessageRefEntity();entity.setMessageId(messageId);entity.setReceiverId(Integer.parseInt(topic));entity.setReadFlag(false);entity.setLastFlag(true);messageService.insertRef(entity); //把消息存儲在MongoDB中//數據保存到MongoDB后,才發送Ack應答,讓Topic刪除這條消息long deliveryTag = response.getEnvelope().getDeliveryTag();channel.basicAck(deliveryTag, false);i++;} else {break; //接收不到消息,則退出死循環}}} catch (Exception e) {log.error("執行異常", e);}return i;}/** 異步接收消息* */@Asyncpublic int receiveAsync(String topic) {return receive(topic);}/** 同步刪除消息* */public void deleteQueue(String topic) {try(//接收消息數據Connection connection = factory.newConnection();Channel channel = connection.createChannel()){channel.queueDelete(topic);log.debug("成功刪除消息隊列:"+topic);} catch (Exception e){log.error("刪除消息隊列失敗:",e);throw new EmosException("刪除消息隊列失敗");}}/** 異步刪除消息* */@Asyncpublic void deleteAsync(String topic) {deleteQueue(topic);}
}
解析:
channel.queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
- queueName:隊列的名稱,用于標識消息的存儲位置。
- durable:
????????true,表示隊列是持久化的。
????????false,表示隊列是非持久化的。
- exclusive:
????????true:隊列僅供當前連接使用,連接斷開時隊列會自動刪除。
????????false:隊列可供多個連接共享。
-
autoDelete:
true:當隊列不再被任何消費者訂閱時,隊列會自動刪除。
false:隊列即使沒有消費者訂閱也會一直存在,直到手動刪除。 - arguments:額外的參數,null表示沒有額外參數
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 60000); // 設置隊列中消息的過期時間為 60 秒(60000 毫秒)channel.queueDeclare("myQueue", true, false, false, arguments);