Spring Boot 與 RabbitMQ 的深度集成實踐(三)

高級特性實現

消息持久化

在實際的生產環境中,消息的可靠性是至關重要的。消息持久化是確保 RabbitMQ 在發生故障或重啟后,消息不會丟失的關鍵機制。它涉及到消息、隊列和交換機的持久化配置。

首先,配置隊列持久化。在創建隊列時,將durable參數設置為true,表示該隊列是持久化隊列。當 RabbitMQ 服務器重啟時,持久化隊列會從磁盤中恢復,而不是被重新創建。例如,在之前創建隊列的配置類中:

 

@Bean

public Queue directQueue() {

return new Queue("direct.queue", true);

}

這里創建的direct.queue隊列通過true參數設置為持久化隊列。這樣,即使服務器出現故障,隊列中的消息也不會丟失。

對于交換機,同樣可以通過durable參數來設置持久化。以直連交換機為例:

 

@Bean

public DirectExchange directExchange() {

return new DirectExchange("direct.exchange", true, false);

}

direct.exchange交換機被設置為持久化,保證了在服務器重啟后,交換機的配置信息仍然存在,不會影響消息的路由。

在消息層面,Spring AMQP 默認會將消息設置為持久化。當使用RabbitTemplate發送消息時,消息的MessageProperties中的deliveryMode屬性默認被設置為MessageDeliveryMode.PERSISTENT,表示消息是持久化的。這意味著消息不僅會被存儲在內存中,還會被寫入磁盤,從而在服務器重啟后仍然可用。例如,在生產者類RabbitMQProducer中發送消息時:

 

public void sendMessage(String exchange, String routingKey, String message) {

rabbitTemplate.convertAndSend(exchange, routingKey, message);

System.out.println("Sent message: " + message);

}

這里發送的消息會自動被標記為持久化,確保了消息在傳輸過程中的可靠性。通過配置消息、隊列和交換機的持久化,可以大大提高消息系統的可靠性,避免因服務器故障導致的消息丟失問題,為企業級應用提供了堅實的消息保障。

消息確認機制

消息確認機制是保證消息在生產者和消費者之間可靠傳遞的重要手段,它分為生產者消息確認和消費者消息確認。

在生產者端,RabbitMQ 提供了confirm回調機制,用于確認消息是否成功發送到交換機。首先,在application.yml中開啟publisher - confirm - type配置:

 

spring:

rabbitmq:

publisher - confirm - type: correlated

這表示開啟了發布確認模式,當消息發送到交換機后,會觸發回調方法。

然后,在配置類中為RabbitTemplate設置ConfirmCallback回調:

 

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitMQConfig {

@Bean

public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

System.out.println("Message sent to exchange successfully, correlation data: " + correlationData);

} else {

System.out.println("Failed to send message to exchange, correlation data: " + correlationData + ", cause: " + cause);

}

});

return rabbitTemplate;

}

}

在這個回調中,correlationData包含了消息發送時的相關數據,如消息 ID 等;ack表示消息是否成功發送到交換機,true表示成功,false表示失敗;cause則是失敗的原因。通過這種方式,生產者可以根據回調結果來判斷消息的發送狀態,以便進行相應的處理,如記錄日志、重新發送消息等。

對于消費者,RabbitMQ 支持自動確認和手動確認兩種方式。自動確認是默認的方式,當消費者接收到消息后,RabbitMQ 會自動將消息從隊列中移除。然而,這種方式存在一定的風險,如果消費者在處理消息過程中出現異常,消息已經被確認移除,可能會導致數據丟失。

手動確認則更加安全可靠。在application.yml中,可以將消費者的確認模式設置為手動確認:

 

spring:

rabbitmq:

listener:

simple:

acknowledge - mode: manual

在消費者類中,通過Channel對象來手動確認消息:

 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component

public class RabbitMQConsumer implements ChannelAwareMessageListener {

@RabbitListener(queues = "direct.queue")

@Override

public void onMessage(Message message, Channel channel) throws Exception {

try {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

// 處理消息邏輯

System.out.println("Received message: " + new String(message.getBody()));

// 手動確認消息,multiple為false表示只確認當前消息

channel.basicAck(deliveryTag, false);

} catch (Exception e) {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

// 處理異常,例如將消息放入死信隊列或記錄日志

System.out.println("Error processing message: " + e.getMessage());

// 拒絕消息,requeue為false表示不重新入隊,消息會進入死信隊列(如果配置了死信隊列)

channel.basicNack(deliveryTag, false, false);

}

}

}

在手動確認模式下,消費者在成功處理消息后,通過channel.basicAck方法來確認消息;如果處理過程中出現異常,則通過channel.basicNack方法來拒絕消息,并可以根據業務需求決定是否將消息重新放入隊列。這種方式確保了消息在被正確處理后才會從隊列中移除,提高了消息處理的可靠性。

死信隊列和延遲隊列

死信隊列(Dead Letter Queue,DLQ)是一種特殊的隊列,用于處理那些無法被正常消費的消息。當消息在一個隊列中變成死信(dead message)之后,它能被重新發送到另一個交換器中,這個交換器就是死信交換器(DLX),綁定 DLX 的隊列就稱為死信隊列。

導致消息成為死信的常見原因有以下幾種:

  • 消息被拒絕:當消費者使用basic.reject或basic.nack方法拒絕消息,并且設置requeue參數為false時,消息會成為死信。這通常發生在消息內容不符合預期,或者消費者處理消息時出現嚴重錯誤,無法繼續處理該消息的情況下。例如,在處理訂單消息時,如果消息格式錯誤,無法解析訂單信息,消費者可以拒絕該消息并將其標記為死信。
  • 消息過期:如果為消息或隊列設置了生存時間(TTL,Time To Live),當消息在隊列中的存活時間超過了 TTL 值時,消息就會過期成為死信。例如,在電商場景中,用戶下單后生成的訂單消息,如果在一定時間內未被處理(如 30 分鐘),可以將其設置為過期,進入死信隊列進行后續處理,如取消訂單、通知用戶等。
  • 隊列達到最大長度:當隊列中的消息數量達到了其設置的最大長度限制時,新進入隊列的消息會被視為死信。這在一些對隊列容量有限制的場景中很有用,例如,為了防止隊列無限增長導致內存耗盡,可以設置隊列的最大長度,當隊列滿時,新消息進入死信隊列,以便進行特殊處理。

死信隊列在實際應用中有廣泛的場景。例如,在訂單處理系統中,當訂單消息處理失敗(如庫存不足、支付失敗等)時,可以將訂單消息放入死信隊列,由專門的處理程序對死信隊列中的消息進行分析和處理,如重新嘗試處理訂單、通知管理員等。在消息重試機制中,也可以利用死信隊列,當消息多次重試仍未成功時,將其放入死信隊列,避免消息在正常隊列中無限循環重試,占用資源。

在 Spring Boot 中配置死信隊列,首先需要創建正常隊列、死信隊列和死信交換機。以下是一個配置示例:

 

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class DeadLetterQueueConfig {

public static final String NORMAL_QUEUE = "normal.queue";

public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";

public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";

public static final String ROUTING_KEY = "routing.key";

// 創建正常隊列,并配置死信交換機和路由鍵

@Bean

public Queue normalQueue() {

Map<String, Object> args = new HashMap<>();

args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);

args.put("x-dead-letter-routing-key", ROUTING_KEY);

return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();

}

// 創建死信隊列

@Bean

public Queue deadLetterQueue() {

return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();

}

// 創建死信交換機

@Bean

public DirectExchange deadLetterExchange() {

return new DirectExchange(DEAD_LETTER_EXCHANGE);

}

// 綁定死信隊列和死信交換機

@Bean

public Binding bindingDeadLetterQueue(@Qualifier("deadLetterQueue") Queue queue,

@Qualifier("deadLetterExchange") DirectExchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);

}

}

在這個配置中,normalQueue方法創建了一個正常隊列,并通過x-dead-letter-exchange和x-dead-letter-routing-key參數配置了死信交換機和路由鍵。當正常隊列中的消息成為死信時,會根據這些配置被發送到死信交換機,再由死信交換機路由到死信隊列。deadLetterQueue方法創建了死信隊列,deadLetterExchange方法創建了死信交換機,最后通過bindingDeadLetterQueue方法將死信隊列和死信交換機進行綁定,建立起死信消息的路由通道。

延遲隊列是一種特殊的隊列,它允許消息在指定的延遲時間后才被消費者消費。在 AMQP 協議中,RabbitMQ 本身并沒有直接支持延遲隊列的功能,但可以通過死信隊列和 TTL(Time To Live)來模擬實現延遲隊列的效果。

具體實現原理是:生產者將消息發送到一個設置了 TTL 的正常隊列中,當消息在正常隊列中的存活時間超過了 TTL 值時,消息會成為死信,并被發送到死信隊列中。由于死信隊列有消費者監聽,所以當消息進入死信隊列時,就相當于延遲了 TTL 時間后被消費,從而實現了延遲隊列的功能。

在實際應用中,延遲隊列有很多場景。例如,在電商系統中,用戶下單后,如果在一定時間內(如 30 分鐘)未支付,訂單將被自動取消。可以將取消訂單的消息發送到延遲隊列,設置延遲時間為 30 分鐘,當 30 分鐘后,消息從延遲隊列中被消費,系統可以檢查訂單狀態,如果仍未支付,則取消訂單。在定時任務場景中,也可以利用延遲隊列來實現定時執行任務的功能,如定時發送郵件、定時生成報表等。

以電商訂單超時取消為例,展示如何配置延遲隊列。首先,在上述死信隊列配置的基礎上,為正常隊列設置 TTL:

 

// 創建正常隊列,并配置死信交換機、路由鍵和TTL

@Bean

public Queue normalQueue() {

Map<String, Object> args = new HashMap<>();

args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);

args.put("x-dead-letter-routing-key", ROUTING_KEY);

// 設置隊列中消息的TTL為30分鐘(30 * 60 * 1000毫秒)

args.put("x-message-ttl", 30 * 60 * 1000);

return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();

}

在生產者類中,發送訂單消息到正常隊列:

 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class OrderProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void sendOrderMessage(String orderInfo) {

rabbitTemplate.convertAndSend("", DeadLetterQueueConfig.NORMAL_QUEUE, orderInfo);

System.out.println("Sent order message: " + orderInfo);

}

}

在消費者類中,監聽死信隊列,處理超時訂單:

 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class OrderConsumer {

@RabbitListener(queues = DeadLetterQueueConfig.DEAD_LETTER_QUEUE)

public void handleTimeoutOrder(String orderInfo) {

System.out.println("Received timeout order message: " + orderInfo);

// 處理超時訂單邏輯,如取消訂單、通知用戶等

}

}

通過以上配置和代碼,實現了利用死信隊列和 TTL 模擬延遲隊列,實現電商訂單超時自動取消的功能。這種方式充分利用了 RabbitMQ 的特性,為分布式系統中的定時任務和延遲處理提供了靈活可靠的解決方案。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/81667.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/81667.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/81667.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

成功案例丨GEZE與Altair合作推動智能建筑系統開發

Altair 作為計算智能領域的全球領導者&#xff0c;將分別在北京、上海、成都、深圳舉辦 “AI驅動&#xff0c;仿真未來”Altair 區域技術交流會。屆時將匯聚行業專家與先鋒企業&#xff0c;共同探討仿真智能化如何賦能工業創新&#xff0c;分享最新仿真與 AI 技術的應用實踐。歡…

DDoS與CC攻擊:誰才是服務器的終極威脅?

在網絡安全領域&#xff0c;DDoS&#xff08;分布式拒絕服務&#xff09;與CC&#xff08;Challenge Collapsar&#xff09;攻擊是兩種最常見的拒絕服務攻擊方式。它們的目標都是通過消耗服務器資源&#xff0c;導致服務不可用&#xff0c;但攻擊方式、威脅程度和防御策略存在顯…

循環中使用el-form

循環中使用el-form 主要是校驗問題 el-table 的數據 :data“ruleForm.tableData” :prop“‘tableData.’ $index ‘.name’” :rules“rules.name” <el-button type"primary" click"addNewData">新增項目</el-button><el-form :model&…

SAP學習筆記 - 開發13 - CAP 之 添加數據庫支持(Sqlite)

上一章學習了CAP開發準備&#xff0c;添加Service。 SAP學習筆記 - 開發12 - CAP 之 開發準備&#xff0c;添加服務-CSDN博客 本章繼續學習CAP開發 - 添加數據庫支持&#xff08;Sqlite&#xff09;。 目錄 1&#xff0c;數據庫準備 - H2 內存數據庫 - Sqlite數據庫 a&…

【數據結構與算法】——圖(三)——最小生成樹

前言 本將介紹最小生成樹以及普里姆算法&#xff08;Prim&#xff09;和克魯斯卡爾&#xff08;Kruskal&#xff09; 本人其他博客&#xff1a;https://blog.csdn.net/2401_86940607 圖的基本概念和存儲結構&#xff1a;【數據結構與算法】——圖&#xff08;一&#xff09; 源…

Flink運維要點

一、Flink 運維核心策略 1. 集群部署與監控 資源規劃 按業務優先級分配資源&#xff1a;核心作業優先保障內存和 CPU&#xff0c;避免資源競爭。示例&#xff1a;為實時風控作業分配專用 TaskManager&#xff0c;配置 taskmanager.memory.process.size8g。 監控體系 集成 Prom…

面試點補充

目錄 1. 搭建lnmp Linux 系統基礎命令 nginx相關命令 MySQL 相關命令 PHP 相關命令 驗證命令 下載并部署 Discuz! X3.4 論壇 到 Nginx 網站 2. 腦裂 2.1 腦裂的定義 2.2 腦裂產生的原因 1. 主備節點之間的心跳線中斷 2. 優先級沖突 3. 系統或服務負載過高 2.3 如何…

天能股份SAP系統整合實戰:如何用8個月實現零業務中斷的集團化管理升級

目錄 天能股份SAP系統整合案例&#xff1a;技術驅動集團化管理的破局之路 一、企業背景&#xff1a;新能源巨頭的數字化挑戰 二、項目難點&#xff1a;制造業的特殊攻堅戰 1. 生產連續性剛性需求 2. 數據整合三重障礙 3. 資源限制下的技術突圍 三、解決方案&#xff1a;S…

嵌入式學習筆記 - STM32獨立看門狗IWDG與窗口看門狗WWDG的區別

下圖說明了獨立看門狗IWDG與窗口看門狗WWDG的區別: 從中可以看出&#xff1a; 一 復位 獨立看門狗在計數器技術導0時復位&#xff0c; 窗口看門狗在計數器計數到0X40時復位。 二 喂狗 獨立看門狗可以在計數器從預裝載值降低到0過過程中的任意時間喂狗&#xff0c; 窗口看…

配電房值守難題終結者:EdgeView智能監控的7×24小時守護

在電力行業數字化轉型的背景下&#xff0c;開關柜中的設備作為電能傳輸過程中的重要一環&#xff0c;其質量及運行狀態直接關系到電網的安全性、可靠性、穩定性和抵抗事故的能力。 然而&#xff0c;在開關柜的調試部署與運行使用階段&#xff0c;也常常會遇到設備標準不統一、…

B樹與B+樹全面解析

B樹與B樹全面解析 前言一、B 樹的基本概念與結構特性1.1 B 樹的定義1.2 B 樹的結構特性1.3 B 樹的節點結構示例 二、B 樹的基本操作2.1 查找操作2.2 插入操作2.3 刪除操作 三、B 樹的基本概念與結構特性3.1 B 樹的定義3.2 B 樹的結構特性3.3 B 樹的節點結構示例 四、B 樹與…

如何使用VCS+XA加密verilog和spice網表

如果要交付verilog&#xff0c;但是需要對方進行VCS仿真&#xff0c;那么可以用以下方法&#xff1a; 一、基于編譯指令的局部加密? ?適用場景?&#xff1a;需精確控制加密范圍&#xff08;如僅加密核心算法或敏感邏輯&#xff09;。 ?實現步驟?&#xff1a; ?代碼標注…

策略模式-枚舉實現

策略模式的實現方法有很多&#xff0c;可以通過策略類if,else實現。下面是用枚舉類實現策略模式的方法。 定義一個枚舉類&#xff0c;枚舉類有抽象方法&#xff0c;每個枚舉都實現抽象方法。這個策略&#xff0c;實現方法是工具類的很實現&#xff0c;代碼簡單好理解 枚舉實現…

大數據hadoop小文件處理方案

Hadoop處理小文件問題的解決方案可分為存儲優化、處理優化和架構優化三個維度,以下是綜合技術方案及實施要點: 一、存儲層優化方案 1.文件合并技術 離線合并:使用hadoop fs -getmerge命令將多個小文件合并為大文件并重新上傳; MapReduce合并:開發專用MR…

線程調度與單例模式:wait、notify與懶漢模式解析

一.wait 和 notify&#xff08;等待 和 通知&#xff09; 引入 wait notify 就是為了能夠從應用層面&#xff0c;干預到多個不同線程代碼的執行順序&#xff0c;可以讓后執行的線程主動放棄被調度的機會&#xff0c;等先執行的線程完成后通知放棄調度的線程重新執行。 自助取…

ros運行包,Ubuntu20.04成功運行LIO-SAM

zz:~/lio_sam_ws$ source devel/setup.bash zz:~/lio_sam_ws$ roslaunch lio_sam run.launch 創建包鏈接&#xff1a; 鏈接1&#xff1a;Ubuntu20.04成功運行LIO-SAM_ubuntu20.04運行liosam-CSDN博客 鏈接2&#xff1a;ubuntu 20.04 ROS 編譯和運行 lio-sam,并且導出PCD文件…

AI自動化工作流:開啟當下智能生產力的價值

舉手之言&#xff1a;AI自動化工作流創造了什么呢&#xff1f; AI自動化工作流 &#xff0c;顧名思義&#xff0c;是將人工智能&#xff08;AI&#xff09;技術與自動化流程相結合&#xff0c;通過智能化的方式來完成復雜的任務和操作。簡單來說&#xff0c;它就是利用AI的強大…

【設計模式】- 行為型模式2

觀察者模式 定義了一對多的依賴關系&#xff0c;讓多個觀察者對象同時監聽某一個對象主題。這個主題對象在狀態變化時&#xff0c;會通知所有的觀察者對象&#xff0c;讓他們能夠自動更新自己。 【主要角色】 抽象主題角色&#xff1a;把所有觀察者對象保存在一個集合里&…

mapbox-gl強制請求需要accessToken的問題

vue引入"mapbox-gl": "^2.15.0", 1.13以后得版本&#xff0c;都強制需要驗證這個mapboxgl.accessToken。 解決辦法&#xff1a;實例化地圖的代碼中&#xff0c;加入這個&#xff1a; const originalFetch window.fetch; window.fetch function ({ url…

已知6、7、8月月平均氣溫和標準差,求夏季季平均溫度與標準差

由下面定理&#xff0c;得出平方和的公式&#xff1a;&#xff08;即每天的溫度平方和&#xff09; 這樣就可以推出季平均的算法&#xff1a; 舉例&#xff1a;在Excel用公式算&#xff0c;不要手算&#xff1a; 因此季平均&#xff1a;(B2*C2B3*C3B4*C4)/SUM(B2:B4) 季標準差…