一. 簡介
1> 什么是MQ
消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。
其主要用途:不同進程Process/線程Thread之間通信。
那么為什么會產生消息隊列呢?有幾個原因:
-
不同進程(process)之間傳遞消息時,兩個進程之間耦合程度過高,改動一個進程,引發必須修改另一個進程,為了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),所有兩進程之間傳遞的消息,都必須通過消息隊列來傳遞,單獨修改某一個進程,不會影響另一個;
-
不同進程(process)之間傳遞消息時,為了實現標準化,將消息的格式規范化了,并且,某一個進程接受的消息太多,一下子無法處理完,并且也有先后順序,必須對收到的消息進行排隊,因此誕生了事實上的消息隊列;
MQ框架非常之多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ。本文主要介紹RabbitMq。
2> 什么是RabbitMQ
RabbitMQ 是一個消息代理:它接受和轉發消息。您可以將其視為郵局:當您將要寄的郵件放入郵箱時,您可以確信信使最終會將郵件發送給您的收件人。在本例中,RabbitMQ 是郵箱、郵局和信使。
RabbitMQ 與郵局的主要區別在于它不處理紙張,而是接受、存儲和轉發二進制數據塊——消息。
RabbitMQ 和一般意義上的消息傳遞使用了一些術語。
-
生產_僅僅意味著發送。發送消息的程序稱為_生產者
-
_隊列_是 RabbitMQ 中郵箱的名稱。雖然消息會流經 RabbitMQ 和您的應用程序,但它們只能存儲在_隊列_中。_隊列_僅受主機內存和磁盤限制的約束,它本質上是一個大型消息緩沖區。許多_生產者_可以發送消息到一個隊列,并且許多_消費者_可以嘗試從一個_隊列_中接收數據。這就是我們表示隊列的方式
其是實現 AMQP(高級消息隊列協議)的消息中間件的一種,最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 RabbitMQ 主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者無法快速消費,那么需要一個中間層。保存這個數據。
AMQP,即 Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ 是一個開源的 AMQP 實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
3> 相關概念
通常我們談到隊列服務,會有三個概念:發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上,多做了一層抽象,在發消息者和隊列之間,加入了交換器 (Exchange)。這樣發消息者和隊列就沒有直接聯系,轉而變成發消息者把消息給交換器,交換器根據調度策略再把消息給隊列。那么,其中比較重要的概念有4個,分別為:虛擬主機,交換機,隊列,和綁定。
- 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。為什么需要多個虛擬主機呢?很簡單, RabbitMQ 當中,用戶只能在虛擬主機的 粒度進行權限控制。 因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ 服務器 都有一個默認的虛擬主機“/”。
- 交換機:Exchange 用于轉發消息,但是它不會做存儲 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的 消息。這里有一個比較重要的概念:路由鍵。消息到交換機的時候,交互機會轉發到對應的隊列中,那么究竟轉發到哪個隊列,就要根據
該路由鍵。 - 綁定:也就是交換機需要和隊列相綁定,這其中如上圖所示,是多對多的關系。
二. 實現
Spring Boot 集成 RabbitMQ
Spring Boot 集成 RabbitMQ 非常簡單,如果只是簡單的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 項目對消息各種支持。
1. 簡單使用
1>配置 pom 包,主要是添加 spring-boot-starter-amqp 的支持
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2>配置文件application.yml
配置 RabbitMQ 的安裝地址、端口以及賬戶信息
# 配置文件
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: rootpassword: 123456# RabbitMQ配置rabbitmq:host: 192.168.146.1port: 5672username: adminpassword: 123456
我這里還配置了數據庫
3>隊列配置
package com.nianxi.mybatisplus.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue Queue() {return new Queue("hello");}
}
4>發送者
rabbitTemplate 是 Spring Boot 提供的默認實現
package com.nianxi.mybatisplus.mapper;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class HelloSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String context = "hello " + new Date();System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("hello", context);}
}
5>接收者
package com.nianxi.mybatisplus.mapper;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver : " + hello);}
}
6> 測試
package com.nianxi.mybatisplus;import com.nianxi.mybatisplus.mapper.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RabbitMqHelloTest {@Autowiredprivate HelloSender helloSender;@Testpublic void hello() throws Exception {helloSender.send();}
}
注意:發送者和接收者的 queue name 必須一致,不然不能接收
2.RabbitTemplate
**RabbitTemplate
**是SpringAMQP提供的一個高級消息操作模板,**用于在與RabbitMQ進行交互時進行消息的發送和接收操作。**它是對底層AMQP協議的封裝,簡化了與RabbitMQ的交互過程, 是SpringAMQP中的核心類,提供聲明式方式處理RabbitMQ,包括發送和接收消息、消息轉換、屬性設置及回調機制。通過配置和正確使用,簡化了RabbitMQ的集成與操作。
1> 發送消息
**RabbitTemplate
**提供了多種發送消息的方法,包括同步發送和異步發送。通過指定交換機、路由鍵和消息體,我們可以將消息發送到 RabbitMQ 服務器上的指定位置。此外,RabbitTemplate
還支持消息的確認機制,以確保消息被成功發送和接收。
rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);
2> 接收消息
除了發送消息外,**RabbitTemplate
**還提供了接收消息的功能。通過調用相關方法,我們可以從指定的隊列中接收消息,并進行相應的處理。這通常涉及到監聽隊列、處理消息和確認消息接收等步驟。
Message receivedMessage = rabbitTemplate.receive("queueName");
MyMessage myMessage = rabbitTemplate.receiveAndConvert("queueName", MyMessage.class);
3> 消息轉換
**RabbitTemplate
支持消息的自動轉換。這意味著我們可以將 Java 對象作為消息體發送,而RabbitTemplate
會自動將其轉換為可序列化的格式(如 JSON 或 XML)。同樣地,當從隊列中接收消息時,RabbitTemplate
**也可以自動將消息體轉換回 Java 對象。
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
rabbitTemplate.setMessageConverter(messageConverter);
4> 消息屬性設置
在發送消息時,我們可以設置各種消息屬性,如消息的優先級、持久化標志、過期時間等。這些屬性可以通過**MessageProperties
對象進行設置,并在發送消息時傳遞給RabbitTemplate
**。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; @Service
public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange, String routingKey, String message, int priority, boolean persistent, int ttl) { // 創建MessageProperties MessageProperties properties = new MessageProperties(); // 設置優先級,值范圍0-9,其中0為最低優先級,9為最高優先級 properties.setPriority(priority); // 設置消息持久化 properties.setDeliveryMode(persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); // 設置消息的過期時間,單位為毫秒 properties.setExpiration(String.valueOf(ttl)); // 使用MessageBuilder構建Message對象 Message msg = MessageBuilder.withBody(message.getBytes()) .setContentEncoding("UTF-8") .setContentType("text/plain") .setMessageId(UUID.randomUUID().toString()) // 可選,設置消息ID .setTimestamp(new Date()) // 可選,設置時間戳 .setHeaders(Collections.singletonMap("x-custom-header", "value")) // 可選,設置自定義頭 .andProperties(properties) .build(); // 發送消息 rabbitTemplate.convertAndSend(exchange, routingKey, msg); }
}
5> 回調機制
**RabbitTemplate
**支持發送消息時的回調機制。這意味著在發送消息后,我們可以注冊一個回調函數來處理發送結果或接收響應。這對于需要異步處理發送結果或接收響應的場景非常有用。
**setConfirmCallback
方法是RabbitTemplate
**類中的一個回調方法,用于處理消息的確認(acknowledgment)結果。當消息成功發送到RabbitMQ的交換機時,會觸發確認回調,你可以在該回調中處理相應的邏輯。
-
correlationData
:關聯數據,可以是任意類型的對象,通常用于唯一標識消息。 -
ack
:布爾值,表示消息是否成功發送到交換機。true
表示成功,false
表示失敗。 -
cause
:失敗的原因,當ack
為false
時,此參數會提供一個可選的異常信息。rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息發送成功
System.out.println(“Message sent successfully”);
} else {
// 消息發送失敗,進行處理
System.out.println("Message sent failed: " + cause);
}
});
6> 異步消息處理
RabbitTemplate
支持異步消息處理,你可以注冊ConfirmCallback
和ReturnCallback
來處理消息的確認和返回結果。ConfirmCallback
用于確認消息是否成功發送到交換機,ReturnCallback
用于處理無法路由到隊列的消息。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// 消息發送成功} else {// 消息發送失敗,進行處理}
});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 處理無法路由到隊列的消息
});
3.使用 RabbitTemplate 的步驟
1> 配置 RabbitTemplate
在使用**RabbitTemplate
**之前,我們需要對其進行配置。這通常涉及到設置連接工廠、交換機、隊列和綁定等。這些配置可以通過 XML 配置或 Java 配置完成。
2> 創建 RabbitTemplate 實例
一旦配置完成,我們可以創建一個**RabbitTemplate
**實例。這個實例將使用我們提供的配置來與 RabbitMQ 服務器進行交互。
3> 發送消息
使用**RabbitTemplate
**的發送方法,我們可以將消息發送到指定的交換機和路由鍵。我們可以指定消息體、消息屬性和其他發送選項。
4> 接收消息
要接收消息,我們可以使用**RabbitTemplate
**的接收方法或結合監聽器來監聽指定的隊列。當消息到達時,我們可以處理消息并執行相應的業務邏輯。
5> 處理異常和錯誤
在使用**RabbitTemplate
**時,我們還需要考慮異常和錯誤處理。例如,當發送消息失敗或接收消息時發生異常時,我們需要有相應的處理機制來確保系統的穩定性和可靠性。
4.RabbitTemplate 的優勢與注意事項
優勢:
- 簡化操作:
RabbitTemplate
封裝了底層細節,使得開發者能夠專注于業務邏輯的實現,而無需關心底層的消息傳輸細節。 - 靈活性:
RabbitTemplate
提供了豐富的配置選項和擴展點,使得開發者能夠根據實際需求進行定制和優化。 - 性能優化:
RabbitTemplate
內部進行了性能優化,如連接池管理、消息緩存等,以提高消息傳輸的效率和可靠性。
注意事項:
- 配置正確性:確保
RabbitTemplate
的配置正確無誤,包括連接工廠、交換機、隊列和綁定等的設置。錯誤的配置可能導致消息無法正確發送或接收。 - 異常處理:在使用
RabbitTemplate
時,要充分考慮異常處理機制,確保在發生異常時能夠及時發現并處理。 - 資源釋放:在使用完
RabbitTemplate
后,要確保釋放相關資源,如關閉連接、釋放連接池中的連接等,以避免資源泄漏和性能問題。