1.什么是消息隊列
消息隊列(Message Queue),我們一般簡稱為MQ。消息隊列中間件是分布式系統中重要的組件,具有異步性、松耦合、分布式、可靠性等特點。用于實現高性能、高可用、可伸縮和最終一致性架構。是大型分布式系統不可缺少的中間件。目前主流的消息隊列有RocketMQ、Kafka、RabbitMQ、ZeroMQ、MetaMQ等。消息隊列在很多業務場景中都會使用到,例如:異步處理、應用解耦、流量消鋒、數據同步、日志處理等等。下面是一個消息隊列最簡單的架構模型。
名詞解釋:
- Producer:消息的生產者,負責將消息發送到Broker
- Broker:消息處理中心(內部通常包含多個隊列,稱之為queue),負責消息的存儲等操作
- Consumer:消息消費者,負責從Broker中獲取消息并進行相應處理
2.RabbitMQ
2.1 簡介
RabbitMQ是流行的開源消息隊列其中的一種,用erlang語言開發。它基于AMQP協議(AMQP是應用層協議的一個開放標準,稱為高級消息隊列協議,專門為面向消息的中間件設計)的標準實現。RabbitMQ支持多種語言客戶端(如:Java、C#、Python、Ruby、C、PHP)等。在易用性、擴展性、高可用性等方面表現都不錯。
2.2 安裝
由于RabbitMQ是基于erlang語言編寫,在安裝前先必須安裝erlang環境。
官網地址:https://www.erlang.org/downloads。
最新版本為22.2,Windows用戶可直接下載OPT 22.2 Windows 64-bit Binary直接安裝即可。
接著去RabbitMQ官網下載最新版本的安裝包進行安裝。
官網地址:https://www.rabbitmq.com/install-windows.html#chocolatey
點擊下載rabbitmq-server-3.8.2.exe的Bintray安裝包,下載后直接打開安裝。
如果是macOS用戶,可以通過Homebrew直接安裝,并且Homebrew在安裝RabbitMQ時會自動下載并安裝erlang環境。
2.3 配置環境變量
將RabbitMQ安裝目錄下的sbin子目錄加入到環境變量的Path中。
2.4 啟動/停止服務
啟動或停止服務有應用方式啟動和服務啟動兩種方式。
應用方式啟動:
命令 | 說明 |
---|---|
rabbitmq-server | 直接啟動,關閉窗口后應用就會停止 |
rabbitmq-server -detached | 后臺啟動,后臺獨立進程方式運行,關閉窗口后應用不會關閉 |
rabbitmqctl stop | 停止應用 |
示例:
服務方式啟動:
當安裝完后可以在服務列表中查看到RabbitMQ這個服務,可以在這里直接啟用或停止。
也可以在命令行使用相關命令啟動或關閉服務(注意:控制臺要以管理員方式運行)
命令 | 說明 |
---|---|
rabbitmq-service start | 啟動服務 |
rabbitmq-service stop | 停止服務 |
rabbitmq-service disable | 禁用服務 |
rabbitmq-service enable | 啟用服務 |
示例:
2.5 可視化管理插件
RabbitMQ默認提供了一個rabbitmq_management可視化管理插件,方便我們通過web訪問的方式來管理和查看RabbitMQ。此插件默認是禁用的,因此需要手動啟用它。在命令行使用rabbitmq-plugins來啟用插件。如下:
rabbitmq-plugins enable rabbitmq_management
啟用后可以在瀏覽器中輸入http://localhost:15672來訪問登錄頁面,默認登陸賬號和密碼都為guest
。
登陸成功后進入功能管理首頁。
在后續的示例中會講解這里面的具體內容。
2.6 用戶管理
RabbitMQ默認提供了一個guest用戶,我們也可以創建新用戶并給用戶分配相應的權限。創建用戶有兩種方式,一種是使用rabbitmqctl工具,另一種是使用可視化的方式操作。
使用可視化操作:
在web管理登陸頁面登陸后,點擊Admin選項,這里會列出所有的用戶信息,默認只有一個guest用戶,如下:
點擊下面的Add a user,在展開的頁面中填寫新用戶的姓名、密碼以及身份標簽,確認無誤后點擊Add User按鈕保存。如下:
此時用戶列表就會多出一個新建的用戶,如下:
但這個用戶還不能正常使用,因為還未分配訪問的虛擬主機(虛擬主機的概念會在下個章節說明)以及權限,所以點擊列表中的用戶名(也就是wangl)跳轉到如下頁面:
說明:
- Virtual Host:設置虛擬主機的路徑,默認為“/”,因為沒有新創建別的虛擬主機,所以只有一個默認的。
- Configure regexp:設置用戶的配置權限,支持正則表達式(.*表示所有)。
- Write regexp:設置用戶的寫權限,支持正則表達式(.*表示所有)。
- Read regexp:設置用戶的讀權限,支持正則表達式(.*表示所有)。
最后點擊Set permission按鈕保存,然后回到用戶列表,這時新建的用戶就能正常使用了
登出后使用新用戶登陸來訪問。
使用rabbitmqctl工具:
在命令行可以使用rabbitmqctl,它是RabbitMQ中間件的一個命令行管理工具。
1.創建用戶:
命令:rabbitmqctl add_user username password
示例:rabbitmqctl add_user user1 123
2.刪除用戶:
命令:rabbitmqctl delete_user username
示例:rabbitmqctl delete_user user1
3.修改密碼:
命令:rabbitmqctl change_password username newpassword
示例:rabbitmqctl change_password user1 321
4.列出所有用戶:
命令:rabbitmqctl list_users
5.設置用戶權限:
命令:rabbitmqctl set_permissions [-p vhostpath] username
示例:rabbitmqctl set_permissions -p / user1 .* .* .*
6.刪除用戶權限:
命令:rabbitmqctl clear_permissions [-p vhostpath] username
示例:rabbitmqctl clear_permissions -p / user1
2.7 配置文件
不同的操作系統默認存放的配置文件目錄是不一樣的(也可以通過環境變量指定配置文件的目錄),下面列出在不同系統中默認配置文件的存放位置。
以Windows為例,我們在C:\Users%USERNAME%\AppData\Roaming\RabbitMQ目錄下創建一個名為rabbitmq.conf的配置文件。
使用記事本打開添加如下配置信息可以修改默認的配置。
listeners.tcp.default = 5673
management.listener.port = 15673
num_acceptors.tcp = 10
說明:
屬性 | 描述 | 默認值 |
---|---|---|
listeners.tcp.default | AMQP連接的默認監聽端口,也就是訪問RabbitMQ的默認端口號 | 5672 |
management.listener.port | 訪問web管理插件的默認端口 | 15672 |
num_acceptors.tcp | 接受tcp連接的erlang進程數 | 10 |
這里我們修改了默認的tcp連接端口以及web管理插件的默認端口,配置完成之后記得要重啟RabbitMQ服務,接著重新打開web管理頁面,使用修改后的端口進行訪問。
參考:https://www.linuxidc.com/Linux/2019-03/157354.htm
2.8 AMQP通信模型
名詞解釋:
- Broker:消息處理中心,也就是RabbitMQ Server。
- Virutal Host:虛擬主機相當于一個命名空間。用于隔離不同的Exchange和Queue。每個Virutal Host內部有自己的Exchange和Queue,他們之間互不影響。我們可以為不同用戶指定不同的Virutal Host,這樣不同用戶只能訪問當前設置的Virutal Host下的Exchange和Queue,而不能訪問其他的Virutal Host。在RabbitMQ有一個默認的Virutal Host就是“/”。我們也可以通過可視化插件或者使用rabbitmqctl工具來創建新的Virutal Host。
- Exchange:Exchange也稱之為交換機,核心作用就是將消息生產者(Producer)發送過來的message依據指定的路由規則發送到特定的Queue中。
- Queue:存放message的隊列,消息最終會被消息消費者(Consumer)取出消費。
- Producer:消息的生產者,負責將消息發送到交換機(Exchange)中。
- Consumer:消息消費者,負責從Queue中獲取消息并進行相應處理。
- Binding:Binding就是將一個或者多個消息隊列(Queue)綁定到交換機(Exchange)上。綁定時會設置一個路由的key(一種路由規則表達式)。這樣當Exchange接收到Producer發送的消息時,會根據路由規則將消息發送到具體的Queue中。
3. 基礎應用
RabbitMQ支持多種語言的客戶端,在這個章節中將使用Java客戶端來操作RabbitMQ。新建Maven項目并添加依賴。
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.0</version>
</dependency>
3.1 Queue
直接使用Queue是實現消息發布訂閱最簡單的一種方式,內部會通過一個默認的Exchange(交換機)來將消息路由到Queue中。
Producer示例:
public class Producer {/*** 消息隊列名稱*/private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) {//創建連接工廠并設置RabbitMQ主機地址,默認端口為5672ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道try (Connection conn = connectionFactory.newConnection();//使用連接對象構建一個消息通信的通道Channel channel = conn.createChannel()) {/*** 創建隊列* 參數一:隊列名稱* 參數二:隊列是否持久化(true為持久化)* 參數三:是否排他(true為排他),排他性指的是當exclusive為true時,* 隊列只對首次創建的connection是可見的,false則表示被所有創建的connection都可見* 參數四:如果設置為true,表示連接斷開時會自動刪除此隊列* 參數五:隊列的其他屬性設置,一個map集合*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello world";/*** 發布消息* 參數一:設置為"",表示未指定交換機的名稱,此時會通過一個默認的交換機來路由消息* 參數二:隊列名稱* 參數三:消息路由頭的其他屬性,這里未添添加任何屬性,設置為null* 參數四:消息體,將其轉換為字節數組*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());} catch (Exception e) {throw new RuntimeException(e);}}
}
運行Producer,打開web管理界面,在Queues的選項里可以查看到新創建一個名為"test_queue"的隊列,并且存有一條發布的消息,如下:
注意:隊列會在第一次使用時創建,如果之前已經創建則不會再創建。
Cosumer示例:
public class Consumer {/*** 消息隊列名稱*/private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {//創建連接工廠并設置RabbitMQ主機地址,默認端口為5672ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道Connection connection = connectionFactory.newConnection();//創建通信通道Channel channel = connection.createChannel();//創建隊列(如果存在則不再創建)channel.queueDeclare(QUEUE_NAME, false, false, false, null);//接收消息時所需的回調接口DeliverCallback callback = (consumerTag, delivery) -> {//獲取消息體String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerTag:" + consumerTag);};//接收消息/*** 接收消息* 參數一:隊列名稱* 參數二:是否自動簽收(true為自動簽收),自動簽收就是* 消息處理完后會自動給rabbitmq回饋一條消息,表示這條消息已經處理完畢* 參數三:消息的回調接口,也就是上面聲明的DeliverCallback,用于接收消息體* 參數四:消費者取消訂閱時的回調接口,會傳入一個consumerTag簽收標簽*/channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}
注意:Consumer在創建Connection時不要放在try-with-resources語句塊中,避免Connection自動關閉導致程序結束。因為Consumer運行后會產生阻塞,需要一直監聽隊列是否有新的消息,如果有則從隊列取出并消費。
運行Consumer,在控制臺查看接收的消息。
再次查看web管理控制臺,此時隊列的消息已經被消費掉。
大家可以反復運行Producer進行測試。
3.2 Exchange
前面的例子主要是講解Queue的用法,并通過一個默認的Exchange(交換機)來路由消息。在這個章節中我們主要來了解其他幾種Exchange的用法,Exchange的概念在前面的AMQP的通信模型中已經介紹過,它主要是根據路由key將轉發消息到綁定的隊列(Queue)上。
Exchange的類型有Topic、Direct、Fanout、Headers這四種。而Headers類型的交換機使用場景較少,我們主要學習Topic、Direct、Fanout這幾種交換機的用法。
3.2.1 Topic
作用:將消息中的Routing key
與該Exchange
關聯的所有Binding
中的Routing key
進行比較,如果匹配(可以通過通配符進行模糊匹配),則發送到該Binding
對應的Queue
中。
CusumerA示例:
public class ConsumerA {/*** 定義Exchange名稱*/private final static String EXCHANGE_NAME = "logs.topic";/*** 定義一個Queue名稱,這里指定為info.queue*/private final static String QUEUE_NAME = "info.queue";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址, 默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//聲明Exchange,類型指定為為topic, //第三個參數是否持久化,true為持久化,默認值為falsechannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);//聲明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//為queue和exchange綁定路由key(使用"*"進行模糊綁定),表示任意以".info"結尾的key//的消息都會發送到這個queue中channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.info");//消息回調接口DeliverCallback callback = (consumerTag, delivery) -> {//獲取路由keySystem.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());//獲取消息體String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerA receive message: " + message);};//接收消息channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
CusomerB示例:
public class ConsumerB {/*** 定義Exchange名稱*/private final static String EXCHANGE_NAME = "logs.topic";/*** 定義一個Queue名稱,這里指定為error.queue*/private final static String QUEUE_NAME = "error.queue";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址, 默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//聲明交換機,類型為topicchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//聲明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//為queue和exchange綁定路由key(使用"*"進行模糊綁定),表示任意以".error"結尾的key//的消息都會發送到這個queue中channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.error");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {//獲取路由keySystem.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());//獲取消息體String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerB receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}
分別運行ConsumerA和ConsumerB,打開web管理頁面,在Exchanges的頁面中我們可以看到創建了一個名為logs.exchange,類型為topic的Exchange。
在Queues的頁面中可以看到創建了error.queue和info.queue兩個queue。
在Exchanges頁面的列表中點擊logs.topic我們創建的這個exchange,可以查看Exchange和queue的綁定信息,以及路由的key。
同樣在Queues頁面的的列表中點擊error.queue或者info.queue,也可以查看相互綁定的信息。
Producer示例:
public class Producer {/*** Exchange名稱*/private final static String EXCHANGE_NAME = "logs.topic";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址,默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道try(Connection conn = factory.newConnection();Channel channel = conn.createChannel()) {//創建交換機,類型為topic//第三個參數是否持久化,true為持久化,默認值為falsechannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);//定義一個info的messageString infoMessage = "info message...";//定義一個error的messageString errorMessage = "error message...";//將消息發送到交換機,并指定不同路由key//第三個參數是否持久化消息,如果需要持久化則設置為MessageProperties.PERSISTENT_TEXT_PLAIN。//如果不需要持久化,則設置為nullchannel.basicPublish(EXCHANGE_NAME, "log.error", null, errorMessage.getBytes());channel.basicPublish(EXCHANGE_NAME, "log.info", null, errorMessage.getBytes());} catch (Exception e) {e.printStackTrace();}}
}
運行Producer,將兩條消息發送到Exchange,此時Exchange會根據消息中指定的路由key將消息不同的消息發送到不同的Queue中。
結果:
3.2.2 Direct
作用:將消息中的Routing key
與該Exchange
關聯的所有Binding
中的Routing key
進行比較,如果完全匹配(注意:是完全匹配),則發送到該Binding
對應的Queue
中。
ConsumerA示例:
public class ConsumerA {/*** Exchange名稱*/private final static String EXCHANGE_NAME = "logs.direct";/*** Queue名稱*/private final static String QUEUE_NAME = "info.queue";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址, 默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//聲明Exchange,類型為directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//為queue和exchange綁定路由key,這里不能使用模糊匹配,direct類型要求路由的key必須完全匹配channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.info");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerA receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}
ConsumerB示例:
public class ConsumerB {/*** 定義Exchange名稱*/private final static String EXCHANGE_NAME = "logs.direct";/*** 定義一個Queue名稱,這里指定為error.queue*/private final static String QUEUE_NAME = "error.queue";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址, 默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//創建交換機,類型為directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//創建queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//為queue和exchange綁定路由key,這里不能使用模糊匹配,direct類型要求路由的key必須完全匹配channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.error");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerB receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}
Producer示例:
public class Producer {/*** Exchange名稱*/private final static String EXCHANGE_NAME = "logs.direct";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址,默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道try(Connection conn = factory.newConnection();Channel channel = conn.createChannel()) {//創建交換機,類型為directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//定義一個info的messageString infoMessage = "info message...";//定義一個error的messageString errorMessage = "error message...";//將消息發送到交換機,并指定不同路由keychannel.basicPublish(EXCHANGE_NAME, "log.info", null, infoMessage.getBytes());channel.basicPublish(EXCHANGE_NAME, "log.error", null, errorMessage.getBytes());} catch (Exception e) {e.printStackTrace();}}
}
運行ConsumerA,ConsumerB以及Producer
結果:
3.2.3 Fanout
說明:直接將消息轉發到所有binding
的對應queue
中,這種exchange
在路由轉發的時候,忽略Routing key
,直接將消息發送到所有綁定的queue中,因此所有隊列都會接收到相同的消息,相當于廣播。
ConsumerA示例:
public class ConsumerA {/*** Exchange名稱*/private final static String EXCHANGE_NAME = "logs.fanout";/*** Queue名稱*/private final static String QUEUE_NAME = "info.queue";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址, 默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//聲明Exchange,類型為fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//聲明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//為queue和exchange綁定路由key,這里將路由key可設置為任意字符,通常設置為""channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "aa");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerA receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}
ConsumerB示例:
public class ConsumerB {/*** 定義Exchange名稱*/private final static String EXCHANGE_NAME = "logs.fanout";/*** 定義一個Queue名稱,這里指定為error.queue*/private final static String QUEUE_NAME = "error.queue";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址, 默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//聲明Exchange,類型為fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//聲明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//為queue和exchange綁定路由key,這里將路由key可設置為任意字符,通常設置為""channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "bb");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerB receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}
Producer示例:
public class Producer {/*** Exchange名稱*/private final static String EXCHANGE_NAME = "logs.fanout";public static void main(String[] args) throws Exception {//初始化連接工廠,并指定rabbitmq的主機地址,默認端口為5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//創建連接對象,并使用連接對象構建一個消息通信的通道try(Connection conn = factory.newConnection();Channel channel = conn.createChannel()) {//創建交換機,類型為fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//定義一個info的messageString infoMessage = "info message...";//定義一個error的messageString errorMessage = "error message...";//將消息發送到交換機,路由key可任意設置,通常設置為""channel.basicPublish(EXCHANGE_NAME, "", null, infoMessage.getBytes());channel.basicPublish(EXCHANGE_NAME, "", null, errorMessage.getBytes());} catch (Exception e) {e.printStackTrace();}}
}
運行ConsumerA,ConsumerB以及Producer
結果:
ConsumerA和ConsumerB同時都收到info和error的消息。
4. 整合Spring Boot
4.1 示例
添加依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml配置:
spring:rabbitmq:addresses: 127.0.0.1# 連接端口,默認5672port: 5672# 設置登陸認證的賬號密碼,默認為guestusername: guestpassword: guest# 虛擬主機地址,默認為"/"virtual-host: /# 設置連接誒超時時間connection-timeout: 5000# 配置消費者監聽設置listener:simple:# 最小消息消費線程數,這里表示每個Listener容器將開啟2個線程去處理消息# 在2.0版本后可以在@RabbitListener注解中配置該參數concurrency: 2# 最大消費線程數max-concurrency: 5# 每個消費線程能從隊列獲取的消息數量# 每個customer會從消息隊列中預取一些消息放入自己的LinkedBlockingQueue中進行消費,# 注意,每個customer線程都有自己對應的BlockingQueueprefetch: 1# 消息簽收模式# none:表示沒有任何的應答會被發送# manual:表示監聽者必須通過調用Channel.basicAck()來告知所有的消息# auto:表示自動應答,除非堅挺著拋出異常,這是默認配置方式acknowledge-mode: auto# 當消費者監聽器產生異常時是否將消息重新放回隊列,默認值為truedefault-requeue-rejected: true
配置類:
在配置類中主要聲明Exchange、Queue等Bean的裝配
@Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME = "order.exchange";public static final String QUEUE_NAME = "order.queue";public static final String ROUTER_KEY = "order.*";/*** 裝配Topic類型的Exchange* 也可以裝配其他類型如:DirectExchange、FanoutExchange* TopicExchange構造方法第一個參數指定交換機名稱,第二個參數是否持久化交換機,* 第三個參數是否自動刪除交換機*/@Beanpublic TopicExchange exchange(){//return new TopicExchange(EXCHANGE_NAME);return new TopicExchange(EXCHANGE_NAME, false, true);}/*** 裝配消息隊列* Queue構造方法第一個參數指定Queue的名稱,第二個參數表示是否持久化queue* @return*/@Beanpublic Queue queue() {return new Queue(QUEUE_NAME, false);}/*** 將queue綁定到exchange*/@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);}
}
Consumer示例:
@Service
public class ConsumerService {/*** 使用@RabbitListener注解進行監聽,通過queues屬性指定要從哪個queue中消費消息* @Payload注解標注的參數為轉換后的消息對象* @Headers注解標注的參數為消息頭* @param message 消息體內容* @param headers 消息頭* @param channel 消息通道*/@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(@Payload String message,@Headers Map<String, Object> headers,Channel channel) throws IOException {System.out.println("接收消息:" + message);}
}
上面的消費者使用的是自動簽收模式,如果設為手動簽收,也就是在yml中設置了acknowledge-mode: manual
,那么在簽收時需要調用Channel的basicAck()方法來確認簽收的消息。
//當手動確認簽收時,需要自行給rabbitmq回饋一條消息,這條消息已經處理完畢
//從headers獲取一個簽收標簽
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//確認簽收,basicAck方法參入一個簽收標簽,第二個參數表示是否支持批量簽收,false表示單個簽收
channel.basicAck(deliveryTag, false);
Producer示例:
@Service
public class ProducerService {/*** 注入RabbitTemplate*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送文本消息* @param message*/public void sendMessage(String message){//創建消息的唯一IDCorrelationData correlationData = new CorrelationData();//這里使用訂單ID作為消息的IDcorrelationData.setId(UUID.randomUUID().toString());//發送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);}
}
測試:
編寫單元測試,注入ProducerService來發送消息。
@SpringBootTest
class Ch04ApplicationTests {@Autowiredprivate ProducerService service;@Testpublic void testSendMessage() {service.sendMessage("Hello world");}}
先運行SpringBoot啟動類,然后執行單元測試,查看ConsumerService的接收結果。
4.2 @RabbitListener注解
@RabbitListener可以標注在方法上或者類上,Spring會根據不同的@RabbitListener注解創建并啟動不同的監聽容器(MessageListenerContainer),并通過queues屬性指定需要監聽的隊列。每個監聽容器都有自己的唯一標識,可以通過id屬性來標識,如果不指定id屬性則會自動創建一個默認的唯一標識。
/*** @param message 消息內容* @param headers 消息頭,需要@Headers或者@Header注解標注(可選參數)* @param channel 消息通道(可選參數)
*/
@RabbitListener(id="001", queues = "queue.a")
public void consumerA(String message,@Headers Map<String, Object> headers,Channel channel) {...
}@RabbitListener(id="002", queues = "queue.b")
public void consumerB(String message,@Headers Map<String, Object> headers,Channel channel) {...
}
除了可以通過配置類來聲明交換機、隊列與綁定,也可以使用@RabbitListener提供的bindings屬性來進行聲明綁定。例如:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "order.queue", durable = "true"),exchange = @Exchange(name = "order.exchange", type = ExchangeTypes.TOPIC),key = "order.*")
public void receive(Long id) {...
}
4.3 @RabbitHandler注解
當消費端需要接收不同的消息類型時,可以結合@RabbitHandler搭配使用。將@RabbitListener注解標注在類上,在不同方法上使用@RabbitHandler標注,這樣Listener監聽容器會根據消息轉換后的類型來調用相應的方法來處理。
@RabbitListener(queues = {"queue.a","queue.b"})
public class ConsumerService {@RabbitHandlerpublic void receiveA(String message) {...}@RabbitHandlerpublic void receiveB(User message) {...}@RabbitHandlerpublic void receiveC(Student message) {...}}
4.4 自定義消息轉換器
Spring默認使用的消息轉換器是SimpleMessageConverter,只能處理基于文本的內容,序列化的Java對象和字節數組。
當然也可以自定義MessageConverter,例如將發送的一個實體把它序列化成Json,接收時又將Json自動轉換為一個實體,那么可以使用Jackson2JsonMessageConverter。
添加依賴:
轉換Json時需要用到Jackson
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId>
</dependency>
配置類:
只需在配置類中添加Jackson2JsonMessageConverter的裝配
@Configuration
public class RabbitConfig { ...@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
Order示例:
用于Producer將一個Order序列化為Json后發送到MQ,Consumer從MQ接收Json后將其反序列化為一個Order對象。
public class Order {/*** 訂單ID*/private String orderId;/*** 訂單消息*/private String message;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}
}
Producer示例:
@Service
public class ProducerService {/*** 注入RabbitTemplate*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送對象,使用自定義消息轉換器轉換為json* @param order*/public void sendObject(Order order) {//創建消息的唯一IDCorrelationData correlationData = new CorrelationData();//這里使用訂單ID作為消息的IDcorrelationData.setId(order.getOrderId());//發送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", order, correlationData);}
}
Consumer示例:
@Service
public class ConsumerService {/*** 使用自定義消息轉換器* 使用@RabbitListener注解進行監聽,通過queues屬性指定要從哪個queue中消費消息* @Payload注解標注的參數為轉換后的消息對象* @Headers注解標注的參數為消息頭* @param order 轉換后的消息對象* @param headers 消息頭* @param channel 消息通道*/@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveObject(@Payload Order order,@Headers Map<String, Object> headers,Channel channel) throws IOException {System.out.println("接收消息:");System.out.println("訂單編號:" + order.getOrderId());System.out.println("訂單明細:" + order.getMessage());}
}
測試:
編寫單元測試方法
@Test
public void testSendObject() {Order orderDTO = new Order();orderDTO.setOrderId("10001");orderDTO.setMessage("test order...");service.sendObject(orderDTO);
}
先運行SpringBoot啟動類,執行單元測試并查看Consumer接收結果:
5. ACK機制
ACK (Acknowledge character)是一種應答確認符號。用于在網絡通信中,數據接收方成功接收到消息后會給發送方返回一個確認信息。
5.1 發送確認
5.1.1 ConfirmCallback
當消息的發送端發送一條消息到Broker時,為了確保這條消息成功發送到Exchange,因此Broker可以返回一個確認信息給發送端,也就是Producer的Confirm模式。
yml配置:
設置publisher-confirm-type為correlated
spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000# 啟用ConfirmCallback模式publisher-confirm-type: correlated
Producer示例:
public void sendMessage(String message){//使用uuid作為消息的唯一IDCorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());//發送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);//通過setConfirmCallback設置一個回調來確認消息是否成功發布到Exchange中//如果發布成功ack則為true,失敗為falserabbitTemplate.setConfirmCallback((cdata, ack, cause) -> {//獲取CorrelationData中的IDString eventId = cdata.getId();if (ack) {System.out.println("投遞成功:"+eventId);} else {System.out.println("投遞失敗:"+eventId );}});}
5.1.2 ReturnsCallback
上面的confrim模式只能確認消息是否正確到達Exchange中,但不能保證消息正確投遞到目標 queue里。如果一定要確保消息投遞到queue中,就需要使用ReturnCallback。
yml配置:
將publisher-returns和template.mandatory設置為true
spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000# 啟用ReturnCallback模式publisher-returns: true# 當mandatory標志位設置為true時,如果exchange根據自身類型和routingKey無法找到一個合適的queue,# 那么broker會調用basic.return方法將消息返還給生產者。設置為false時,出現上述情況broker會直接將消 息丟棄template:mandatory: true
Producer示例:
public void sendMessage(String message){//使用uuid作為消息的唯一IDCorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());//發送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);//通過setReturnsCallback設置回調來確認消息是否成功發布到queue中//注意,只有消息未正確到達queue時才會執行此回調此方法rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("ReturnsCallback=====>");System.out.println(returnedMessage.getMessage());System.out.println(returnedMessage.getReplyCode());System.out.println(returnedMessage.getReplyText());System.out.println(returnedMessage.getRoutingKey());});}
當消息未正確到達queue時,就會執行ReturnCallback。
5.2 消費確認
當消費端在消費一條消息時,Broker會等待消費端返回一條ACK來確認消息是否已成功消費,如果消費成功,那么Broker就會從隊列中移除此消息。在Springboot中配置ack有none、auto、manual三種模式。
5.2.1 NONE
none表示不做任何的簽收確認(相當于無ack),不管消費者是否正常消費消息,broker都認為消息已經被正常消費,并從broker中移除此消息。這樣會導致消費端在處理消息的過程中如果產生異常,那么消息就會丟失。
yml配置:
spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# ack確認機制# none:表示不做任何確認簽收(相當于無ack)acknowledge-mode: none
5.2.2 AUTO
auto表示自動確認,自動確認會根據消費端在處理消息的過程是否拋出異常來決定返回ack或者nack給broker。
yml設置:
spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# ack確認機制# auto:表示自動確認(默認配置)acknowledge-mode: auto# 當消費者產生異常時是否將消息重新放回隊列,默認值為truedefault-requeue-rejected: true
需要注意的是,在自動確認模式下,default-requeue-rejected設置為true并不能完全決定是否重新放回隊列,另外一個決定因素是具體裝配了哪一個MessageRecoverer(消息回收器)的Bean,它的作用是在消費消息失敗后要做什么樣的處理。默認使用是RejectAndDontRequeueRecoverer。下面分別說明幾種有常見的MessageRecoverer實現。
RejectAndDontRequeueRecoverer:
這是默認使用MessageRecoverer,只要在消費端拋出除AmqpRejectAndDontRequeueException以外的其他異常并且default-requeue-rejected設置為true的情況下,消息都會自動重新投遞到隊列中,否則就會丟棄。
ImmediateRequeueMessageRecoverer:
這個會在拋出除AmqpRejectAndDontRequeueException以外的其他異常會自動返回nack,會忽略default-requeue-rejected的設置,并立即將消息放回當前隊列。
@Configuration
public class RabbitConfig {/*** 裝配ImmediateRequeueMessageRecoverer* @return*/@Beanpublic MessageRecoverer messageRecoverer() {return new ImmediateRequeueMessageRecoverer();}
}
RepublishMessageRecoverer:
這個會在消費失敗后將消息投遞到自己指定的一個隊列中,由其他訂閱的消費者來處理。
@Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME = "error.exchange";public static final String QUEUE_NAME = "error.queue";public static final String ROUTER_KEY = "error.key";@Autowiredprivate RabbitTemplate rabbitTemplate;@Beanpublic DirectExchange exchange(){return new DirectExchange(EXCHANGE_NAME, false, true);}@Beanpublic Queue queue() {return new Queue(QUEUE_NAME, false, true);}@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);}/*** 裝配RepublishMessageRecoverer* @return*/@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE_NAME, ROUTER_KEY);}
}
5.2.3 MANUAL
manual表示手動確認,也就是在消費端的代碼中手動調用basicAck方法確認簽收。如果產生異常,可以通過basicNack或者basicReject拒絕簽收。需要注意的是,當ack模式為manual時,default-requeue-rejected設置是無效的,必須在basicNack或者basicReject拒絕簽收時指定是否重新放回隊列。
yml配置:
spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# ack確認機制# manual:表示手動確認acknowledge-mode: manual
手動確認簽收:
在消費端通過調用basicAck方法來確認簽收
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void receiveMessage(@Payload String message,@Headers Map<String, Object> headers,Channel channel) throws IOException {System.out.println("接收消息:" + message);//從headers中獲取一個唯一標識Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);//確認簽收//參數1:消息投遞的唯一標識//參數2:是否支持批量簽收(true表示批量確認,false表示單個確認)channel.basicAck(deliveryTag, false);
}
在手動確認時,方法參數多了headers和channel兩個參數。header表示消息的頭信息,channel表示當前的消息通道。在投遞一個消息時,消息頭中會包含一個delivery tag,這個值表示本次投遞的唯一標識,在同一個Channel中,這個值是唯一的。delivery tag長度為64為,值從1開始,每發送一次消息該值會遞增1。消費者端在確認消息時帶上此參數,用于告訴RabbitMQ某次投遞已經正確應答。通過調用channel的basicAck方法來確認應答。
拒絕簽收:
消費端在處理消息時可以依據業務規則來決定是否確認簽收或拒絕簽收。如果需要拒絕簽收,可以調用channel的basicNack或者basicReject方法
//參數1:消息投遞的標簽
//參數2:是否支持批量拒絕
//參數3:是否重新放回隊列(true表示放回)
channel.basicNack(deliveryTag, false, true);
//參數1:消息投遞的標簽
//參數2:是否重新放回隊列(true表示放回)
channel.basicReject(deliveryTag, true);
兩個方法區別在于basicReject一次只能拒絕單條消息,basicNack可以拒絕多條。并且這兩個方法在拒絕簽收時可以設置是否將消息重新放回消息隊列。
6. 重試機制
在消息投遞或者消費的過程因為網絡或異常導致消息不能正常投遞和消費時,可以采用重試機制。需要注意的是,這里的重試和RabbitMQ無關,RabbitMQ本身是不提供重試的功能,而是由Spring的retry框架實現,具體可以參考spring-retry模塊的使用。
6.1 發送端重試
發送端重試是針對RabbitTemplate,在消息的投遞過程中由于網絡原因連接失敗或者其他的錯誤導致消息沒有正常投遞到Broker,那么可以啟用template的retry功能。
yml:
spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000# 發送端重試template:retry:# 啟用重試機制enabled: true# 重試次數max-attempts: 3# 重試間隔時間(單位:毫秒)initial-interval: 2000ms
6.2 消費端重試
消費端重試主要針對的是消費者的Listener。當消費者在處理一條消息時,在這個過程如果Listener拋出異常或其他原因導致消息沒有正常被消費,那么可以啟用listener的rety功能。需要注意的是,當acknowledge-mode設置為auto并且default-requeue-rejected設置為true時,同時使用的是默認的MessageRecoverer(消息回收器),這樣當消費端拋出除AmqpRejectAndDontRequeueException以外的其他異常時會將消息重新放回隊列中,此時消費者又會從隊列中取出消息進行消費,那么就會導致無限循環消費,這是不合理的。正確的做法是需要指定重試的次數,并且到達該次數后讓RabbitMQ將此消息放到死信隊列中(死信隊列在下個章節講解)做相應處理或由人工解決。如果未配置死信隊列,那么達到次數后該消息將被丟棄。當然也可以配置RepublishMessageRecoverer,到達重試次數后將消息投遞到自己指定的交換機和隊列來處理,效果是一樣的。
yml配置:
spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# 消費端重試retry:# 啟用消費端重試enabled: true# 重試次數max-attempts: 3# 重試間隔時間(單位:毫秒)initial-interval: 2000ms
7. 死信隊列
7.1 概念
在消費端重試時,當到達重試次數后,此時被拒絕的消息就會變為死信(通常一個消息變為死信有幾種情況,例如被拒絕的消息、消息達到TLL過期時間、以及隊列達到了最大長度等),如果沒有相應的處理,那么broker將丟棄此消息。所以當這些重試之后都無法消費的消息,我們就將其放入死信隊列中做進一步的處理。而這個死信隊列本身也是一個普通的Queue。這個Queue也需要綁定一個Exchange,這個Exchange就稱之為死信交換機(DLX)。同樣這個Exchange可以是任意類型如Direct、Topic、Fanout的Exchange,與普通的Exchange沒有什么差異。因此當我們將一個消息發送到死信隊列時,通過這個死信交換機將消息發送到指定的Queue。下面給出一個具體的示例:
7.2 自動確認處理
可以結合Spring的retry進行重試,當到大重試次數后指定將消息投遞到死信交換機。
yml配置:
spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# 自動確認acknowledge-mode: auto# 重試設置(如果使用手動確認建議使用redis來實現重試次數)retry:# 啟用消費端重試監聽enabled: true# 重試次數max-attempts: 3# 重試間隔時間(單位:毫秒)initial-interval: 2000ms
配置類:
@Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME = "order.exchange";public static final String QUEUE_NAME = "order.queue";public static final String ROUTER_KEY = "order.*";//聲明死信交換機名稱public static final String DEAD_EXCHANGE_NAME = "dead.exchange";//聲明死信隊列名稱public static final String DEAD_QUEUE_NAME = "dead.queue";//死信隊列路由keypublic static final String DEAD_ROUTER_KEY = "dead.key";/*** 配置普通業務的Exchange*/@Beanpublic TopicExchange exchange() {return new TopicExchange(EXCHANGE_NAME, false, true);}/*** 裝配死信Exchange(DLX),可以是direct類型也可以是其他類型** @return*/@Beanpublic DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE_NAME, false, true);}/*** 配置普通業務的消息隊列并關聯死信交換機,當這個隊列中的消息被拒絕或達到重試次數后,* 通過死信路由的key將其發送到對應的死信交換機* @return*/@Beanpublic Queue queue() {//使用QueueBuilder.nonDurable(QUEUE_NAME)創建不持久化的queue,//如果需要創建持久化的queue使用durable(QUEUE_NAME)方法return QueueBuilder.nonDurable(QUEUE_NAME)//自動刪除//.autoDelete()//設置死信交換機的名稱.withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_NAME)//設置死信隊列路由的key.withArgument("x-dead-letter-routing-key", DEAD_ROUTER_KEY)//消息超過這個時間還未被消費則路由到死信交換機//.withArgument("x-message-ttl", 5000).build();}/*** 配置死信隊列*/@Beanpublic Queue deadQueue() {return new Queue(DEAD_QUEUE_NAME, false);}/*** 將queue綁定到exchange*/@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);}/*** 將死信隊列綁定到死信交換機上** @return*/@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTER_KEY);}/*** 裝配Jackson2JsonMessageConverter* @return*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}}
訂單實體
public class Order {/*** 訂單ID*/private String id;public String getId() {return id;}public void setId(String id) {this.id = id;}
}
消費端:
@Service
public class ConsumerService {/*** 在消費者執行中引發一個異常,此時Spring會自動執行retry功能,* 當達到retry次數時,該消息會自動路由到DLX中*/@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveObject(@Payload Order order,@Headers Map<String, Object> headers,Channel channel) throws Exception {System.out.println("訂單編號:" + order.getId());//產生異常System.out.println(10 / 0);}
}
死信隊列消費端:
@Service
public class DeadLetterService {/*** 監聽死信隊列,如果有消息進入死信隊列,將執行此方法做進一步的處理* @param message*/@RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME)public void receiveDeadLetter(@Payload Order order,@Headers Map<String, Object> headers,Channel channel) throws IOException {System.out.println("接收到死信消息,訂單ID:" + order.getId());}
}
發送端:
@Service
public class ProducerService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendObject(Order order) {//創建CorrelationDataCorrelationData correlationData = new CorrelationData();//這里使用訂單ID作為消息的IDcorrelationData.setId(order.getId());//發送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", order, correlationData);}
}
單元測試:
@SpringBootTest
class RabbitApplicationTests {@Autowiredprivate ProducerService service;@Testpublic void testSendObject() {Order orderDTO = new Order();orderDTO.setId("10001");service.sendObject(orderDTO);}}
7.2 手動確認處理
手動處理不需要retry的支持,可以結合Redis來存儲重試的次數,當達到重試次數后執行nack并將消息投遞到死信交換機中,重點在消費者中的代碼實現。
yml配置:
spring:# redis配置redis:host: 127.0.0.1port: 6379database: 0password: wanglconnect-timeout: 2s# rabbitmq設置rabbitmq:# rabbitmq服務器地址addresses: 127.0.0.1# 連接端口,默認是5672port: 5672# 賬號密碼username: guestpassword: guest# 虛擬主機地址,默認為"/"virtual-host: /# 連接的超時時間connection-timeout: 5000# 啟用ConfirmCallback模式(發送確認),當消息到達交換機后會返回一條ack給發送端publisher-confirm-type: correlated# 設置發送端重試template:retry:# 啟用重試機制enabled: true# 重試次數max-attempts: 3# 重試間隔時間(單位:毫秒)initial-interval: 2000ms# 消費者監聽設置listener:simple:# 最小的消費線程數量concurrency: 2# 最大的消費線程數量max-concurrency: 5# 限流,每個線程能從隊列獲取的消息數量prefetch: 1# 手動確認acknowledge-mode: manual
消費端:
@Service
public class ConsumerService {/*** 重置次數的key前綴*/private static final String ATTEMPTS_PREFIX = "attempts:";/*** 最大重試次數*/private static final Integer MAX_RETRY = 3;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveObject(@Payload Order order,@Headers Map<String, Object> headers,Channel channel) throws Exception {//獲取一個消息的標簽Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);try {log.info("訂單ID: " + order.getOrderId());//產生一個異常System.out.println(10 / 0);//正常執行則手動簽收消息channel.basicAck(tag, false);} catch (Exception e) {//如果產生異常則拒絕簽收并將消息放回隊列進行重試操作//從redis中獲取重試次數,increment會在Redis中執行自增并返回自增的值,這一步是原子操作的Long retryTotal = redisTemplate.opsForValue().increment(ATTEMPTS_PREFIX + order.getOrderId());//如果大于最大重試次數則放入死信if(retryTotal > MAX_RETRY) {//拒絕簽收,第三個參數設置為false表示不重新放回隊列,//如果配置了死信隊列則直接丟到死信隊列中channel.basicNack(tag, false, false);//刪除keyredisTemplate.delete(ATTEMPTS_PREFIX + order.getOrderId());} else {//拒絕簽收并重新放回隊列繼續執行重試channel.basicNack(tag, false, true);}}}
}
死信隊列消費端:
@Service
public class DeadLetterConsumer {/*** 監聽死信隊列* @param order* @param headers* @param channel*/@RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME)public void receiveDeadLetter(Order order,@Headers Map<String, Object> headers,Channel channel) throws IOException {log.info("接收到異常訂單,編號:" + order.getOrderId());//手動確認簽收Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(tag, false);}
}
8. 延遲隊列
所謂延遲隊列就是根據我們的業務要求將消息延遲進行處理。
-
在電商中,用戶下單后并沒有立即支付,如果在指定的時間內未支付,則取消該訂單
-
在系統發布一個通告,在某時刻之后通知到指定的人
8.1 實現方式
Rabbitmq實現延遲消費通常有兩種形式:
-
利用自身Time To Live(TTL)以及Dead Letter Exchanges(DLX)的特性實現
(也就是如果達到TTL時間未消費則投遞到死信隊列)
-
利用Rabbitmq插件rabbitmq_delayed_message_exchange(延遲投遞)
rabbitmq_delayed_message_exchange插件的實現方式簡單點說就是當發布消息后不會立即進入隊列,而是存儲在mnesia(一個分布式數據系統)表中,當達到延遲的時間后就立刻將消息投遞至目標隊列中。需要注意的是,插件能支持的最大延遲時間為(2^32)-1毫秒, 大約49天。
官方說明:
For each message that crosses an "x-delayed-message"
exchange, the plugin will try to determine if the message has to be expired by making sure the delay is within range, ie: Delay > 0, Delay =< ?ERL_MAX_T
(In Erlang a timer can be set up to (2^32)-1 milliseconds in the future).
8.2 安裝插件
在官網https://www.rabbitmq.com/community-plugins.html下載延遲消息插件。
注意對應rabbitmq版本,下載后將插件拷貝到rabbitmq的plugins目錄,拷貝后在終端使用以下命令可以看插件列表
rabbitmq-plugins list
啟用插件:
在終端使用以下命令啟用延遲插件。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
啟用插件后重啟RabbitMQ服務。
8.3 示例
這里以用戶下單后未支付的場景為例,如果在指定的時間內未支付,則取消該訂單。
創建訂單表:
create table order_info(order_id varchar(50) primary key,order_status tinyint(1) not null, -- 0:取消訂單 1:未支付 2:已支付order_message varchar(100)
);
添加依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId>
</dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.0.0</version>
</dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions>
</dependency>
yml配置:
spring:# 數據源配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/order?serverTimezone=GMT&useUnicode=true&characterEncoding=utf-8username: rootpassword: root# hikari連接池配置hikari:minimum-idle: 5maximum-pool-size: 20idle-timeout: 900000connection-timeout: 15000connection-test-query: select 1# 配置RabbitMQrabbitmq:addresses: 127.0.0.1# 連接端口,默認5672port: 5672# 設置登陸認證的賬號密碼,默認為guestusername: guestpassword: guest# 虛擬主機地址,默認為"/"virtual-host: /# 設置連接誒超時時間connection-timeout: 5000# 配置消費者監聽設置listener:simple:# 最小消息消費線程數concurrency: 2# 最大消息消費線程數max-concurrency: 5# 限流,每個消費線程能從隊列獲取的消息數量prefetch: 1# 自動應答acknowledge-mode: auto
# mybatis配置
mybatis:type-aliases-package: edu.nf.ch05.entitymapper-locations: classpath:/mappers/*.xml
配置類:
@Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME = "delay.exchange";public static final String QUEUE_NAME = "delay.queue";public static final String ROUTER_KEY = "order.message";/*** 自定義Exchange,設置延遲交換機類型為direct,也可以設置為topic等其他類型*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> params = new HashMap<>();params.put("x-delayed-type", "direct");return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", false, true, params);}/*** 裝配消息隊列* Queue構造方法第二個參數表示是否持久化消息* @return*/@Beanpublic Queue queue(){return new Queue(QUEUE_NAME, false);}/*** 將queue綁定到exchange*/@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(delayExchange()).with(ROUTER_KEY).noargs();}/*** 自定義消息轉換器* @return*/@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
Order示例:
@Data
public class Order {private String orderId;private Integer status;private String message;}
OrderDao示例:
public interface OrderDao {/*** 根據ID查詢訂單信息* @param orderId* @return*/Order getOrderById(String orderId);/*** 保存訂單信息* @param order*/void saveOrder(Order order);/*** 修改訂單* @param order*/void updateOrder(Order order);
}
Mapper映射配置:
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="edu.nf.ch05.dao.OrderDao"><resultMap id="orderMap" type="order"><id property="orderId" column="order_id"/><result property="status" column="order_status"/><result property="message" column="order_message"/></resultMap><select id="getOrderById" parameterType="string" resultMap="orderMap">select order_id, order_status, order_message from order_info where order_id = #{orderId}</select><insert id="saveOrder" parameterType="order">insert into order_info(order_id, order_status, order_message) values(#{orderId}, #{status}, #{message})</insert><update id="updateOrder" parameterType="order">update order_info set order_status = #{status} where order_id = #{orderId}</update>
</mapper>
ProducerService示例:
@Service
public class ProducerService {/*** 注入RabbitTemplate*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 注入OrderDao*/@Autowiredprivate OrderDao orderDao;/*** 發送消息* @param order 訂單對象* @param delayTime 延遲消費時長*/public void send(Order order, int delayTime) {//創建消息的唯一IDCorrelationData correlationData = new CorrelationData();correlationData.setId(order.getOrderId());//將訂單信息入庫,此時訂單狀態1,表示未支付orderDao.saveOrder(order);//發送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTER_KEY, order, messagePostProcessor -> {//通過消息的后置處理器設置延遲放入的時間messagePostProcessor.getMessageProperties().setDelay(delayTime);return messagePostProcessor;}, correlationData);}
}
ConsumerService示例:
@Service
@Slf4j
public class ConsumerService {/*** 注入OrderDao*/@Autowiredprivate OrderDao orderDao;/*** 接收消息* 這里會延遲接收,也就是在發送端指定的延遲時間后才才進行接收*/@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(Order order) {log.info("接收消息,訂單編號:" + order.getOrderId());//依據訂單編號查詢數據庫,如果訂單狀態為1則將其更新為0,表示取消訂單order = orderDao.getOrderById(order.getOrderId());if(order.getStatus() == 1){order.setStatus(0);orderDao.updateOrder(order);log.info("訂單已取消");}}
}
測試:
運行SpringBoot啟動程序:
@SpringBootApplication
@MapperScan("edu.nf.ch05.dao")
public class Ch05Application {public static void main(String[] args) {SpringApplication.run(Ch05Application.class, args);}}
執行單元測試:
@SpringBootTest
public class ProducerServiceTests {@Autowiredprivate ProducerService producerService;@Testvoid testSend() {Order order = new Order();order.setOrderId("100001");order.setMessage("test order...");order.setStatus(1);producerService.send(order, 10000);}}
查看數據庫,測試會錄入一條訂單信息,其狀態為1。
如果在指定的過期時間內未其他服務處理該訂單,那么消費者會從隊列中取出這條訂單信息,根據ID去數據庫查詢該訂單的狀態,如果為1(未支付)則自動取消訂單,將其狀態更新為0。
再次查看這條訂單記錄,此時的狀態已更新為0。