Rabbitmq消息不丟失

目錄

  • 一、消息不丟失
    • 1.消息確認
    • 2.消息確認業務封裝
      • 2.1 發送確認消息測試
      • 2.2 消息發送失敗,設置重發機制

一、消息不丟失

消息的不丟失,在MQ角度考慮,一般有三種途徑:
1,生產者不丟數據
2,MQ服務器不丟數據
3,消費者不丟數據
保證消息不丟失有兩種實現方式:
1,開啟事務模式
2,消息確認模式
說明:開啟事務會大幅降低消息發送及接收效率,使用的相對較少,因此我們生產環境一般都采取消息確認模式,以下我們只是講解消息確認模式

1.消息確認

消息持久化
如果希望RabbitMQ重啟之后消息不丟失,那么需要對以下3種實體均配置持久化
Exchange
聲明exchange時設置持久化(durable = true)并且不自動刪除(autoDelete = false)
Queue
聲明queue時設置持久化(durable = true)并且不自動刪除(autoDelete = false)
message
發送消息時通過設置deliveryMode=2持久化消息

處理消息隊列丟數據的情況,一般是開啟持久化磁盤的配置。這個持久化配置可以和confirm機制配合使用,你可以在消息持久化磁盤后,再給生產者發送一個Ack信號。這樣,如果消息持久化磁盤之前,rabbitMQ陣亡了,那么生產者收不到Ack信號,生產者會自動重發。那么如何持久化呢,其實也很容易,就下面兩步:
1、將queue的持久化標識durable設置為true,則代表是一個持久的隊列
2、發送消息的時候將deliveryMode=2
這樣設置以后,rabbitMQ就算掛了,重啟后也能恢復數據

發送確認
有時,業務處理成功,消息也發了,但是我們并不知道消息是否成功到達了rabbitmq,如果由于網絡等原因導致業務成功而消息發送失敗,那么發送方將出現不一致的問題,此時可以使用rabbitmq的發送確認功能,即要求rabbitmq顯式告知我們消息是否已成功發送。

手動消費確認
有時,消息被正確投遞到消費方,但是消費方處理失敗,那么便會出現消費方的不一致問題。比如:訂單已創建的消息發送到用戶積分子系統中用于增加用戶積分,但是積分消費方處理卻都失敗了,用戶就會問:我購買了東西為什么積分并沒有增加呢?
要解決這個問題,需要引入消費方確認,即只有消息被成功處理之后才告知rabbitmq以ack,否則告知rabbitmq以nack

2.消息確認業務封裝

service-mq修改配置
開啟rabbitmq消息確認配置,在common的配置文件中都已經配置好了!

spring:rabbitmq:host: 192.168.121.140port: 5672username: adminpassword: adminpublisher-confirms-type: correlated  #交換機的確認publisher-returns: true  #隊列的確認listener:simple:acknowledge-mode: manual #默認情況下消息消費者是自動確認消息的,如果要手動確認消息則需要修改確認模式為manualprefetch: 1 # 消費者每次從隊列獲取的消息數量。此屬性當不設置時為:輪詢分發,設置為1為:公平分發

搭建rabbit-util模塊
由于消息隊列是公共模塊,我們把mq的相關業務封裝到該模塊,其他service微服務模塊都可能使用,因此我們把他封裝到一個單獨的模塊,需要使用mq的模塊直接引用該模塊即可
搭建方式如:
pom.xml

    <dependencies><!--rabbitmq消息隊列--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--rabbitmq 協議--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency></dependencies>

4.2.4 封裝發送端消息確認

/*** @Description 消息發送確認* <p>* ConfirmCallback  只確認消息是否正確到達 Exchange 中* ReturnCallback   消息沒有正確到達隊列時觸發回調,如果正確到達隊列不執行* <p>* 1. 如果消息沒有到exchange,則confirm回調,ack=false* 2. 如果消息到達exchange,則confirm回調,ack=true* 3. exchange到queue成功,則不回調return* 4. exchange到queue失敗,則回調return* */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 修飾一個非靜態的void()方法,在服務器加載Servlet的時候運行,并且只會被服務器執行一次在構造函數之后執行,init()方法之前執行。@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallbackrabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息發送成功:" + JSON.toJSONString(correlationData));} else {log.info("消息發送失敗:" + cause + " 數據:" + JSON.toJSONString(correlationData));}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 反序列化對象輸出System.out.println("消息主體: " + new String(message.getBody()));System.out.println("應答碼: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交換器 exchange : " + exchange);System.out.println("消息使用的路由鍵 routing : " + routingKey);}}

封裝消息發送

@Service
public class RabbitService {@Autowiredprivate RabbitTemplate rabbitTemplate;/***  發送消息* @param exchange 交換機* @param routingKey 路由鍵* @param message 消息*/public boolean sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);return true;}}

2.1 發送確認消息測試

消息發送端

@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitService rabbitService;/*** 消息發送*///http://localhost:8282/mq/sendConfirm@GetMapping("sendConfirm")public Result sendConfirm() {rabbitService.sendMessage("exchange.confirm", "routing.confirm", "來人了,開始接客吧!");return Result.ok();}
}

消息接收端

@Component
public class ConfirmReceiver {@SneakyThrows
@RabbitListener(bindings=@QueueBinding(value = @Queue(value = "queue.confirm",autoDelete = "false"),exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),key = {"routing.confirm"}))
public void process(Message message, Channel channel){System.out.println("RabbitListener:"+new String(message.getBody()));// false 確認一個消息,true 批量確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

測試:http://localhost:8282/mq/sendConfirm

2.2 消息發送失敗,設置重發機制

實現思路:借助redis來實現重發機制
模塊中添加依賴

<!-- redis -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- spring2.X集成redis所需common-pool2-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

自定義一個實體類來接收消息

@Data
public class GmallCorrelationData extends CorrelationData {//  消息主體private Object message;//  交換機private String exchange;//  路由鍵private String routingKey;//  重試次數private int retryCount = 0;//  消息類型  是否是延遲消息private boolean isDelay = false;//  延遲時間private int delayTime = 10;
}

修改發送方法

//  封裝一個發送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){//  將發送的消息 賦值到 自定義的實體類GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();//  聲明一個correlationId的變量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);//  發送消息的時候,將這個gmallCorrelationData 對象放入緩存。redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  調用發送消息方法//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);//  默認返回truereturn true;
}發送失敗調用重發方法  MQProducerAckConfig 類中修改
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {//  ack = true 說明消息正確發送到了交換機if (ack){System.out.println("哥們你來了.");log.info("消息發送到了交換機");}else {//  消息沒有到交換機log.info("消息沒發送到交換機");//  調用重試發送方法this.retrySendMsg(correlationData);}
}@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {System.out.println("消息主體: " + new String(message.getBody()));System.out.println("應答碼: " + code);System.out.println("描述:" + codeText);System.out.println("消息使用的交換器 exchange : " + exchange);System.out.println("消息使用的路由鍵 routing : " + routingKey);//  獲取這個CorrelationData對象的Id  spring_returned_message_correlationString correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//  因為在發送消息的時候,已經將數據存儲到緩存,通過 correlationDataId 來獲取緩存的數據String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);//  消息沒有到隊列的時候,則會調用重試發送方法GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);//  調用方法  gmallCorrelationData 這對象中,至少的有,交換機,路由鍵,消息等內容.this.retrySendMsg(gmallCorrelationData);
}/*** 重試發送方法* @param correlationData   父類對象  它下面還有個子類對象 GmallCorrelationData*/
private void retrySendMsg(CorrelationData correlationData) {//  數據類型轉換  統一轉換為子類處理GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;//  獲取到重試次數 初始值 0int retryCount = gmallCorrelationData.getRetryCount();//  判斷if (retryCount>=3){//  不需要重試了log.error("重試次數已到,發送消息失敗:"+JSON.toJSONString(gmallCorrelationData));} else {//  變量更新retryCount+=1;//  重新賦值重試次數 第一次重試 0->1 1->2 2->3gmallCorrelationData.setRetryCount(retryCount);System.out.println("重試次數:\t"+retryCount);//  更新緩存中的數據this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  調用發送消息方法 表示發送普通消息  發送消息的時候,不能調用 new RabbitService().sendMsg() 這個方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);}
}

測試:只需修改(錯誤信息)
在這里插入圖片描述
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/40151.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/40151.shtml
英文地址,請注明出處:http://en.pswp.cn/news/40151.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

設計HTML5列表和超鏈接

在網頁中&#xff0c;大部分信息都是列表結構&#xff0c;如菜單欄、圖文列表、分類導航、新聞列表、欄目列表等。HTML5定義了一套列表標簽&#xff0c;通過列表結構實現對網頁信息的合理排版。另外&#xff0c;網頁中還包含大量超鏈接&#xff0c;通過它實現網頁、位置的跳轉&…

C語言“牽手”微店商品詳情數據方法,微店商品詳情API接口申請指南

微店平臺的商品詳情通常包括以下信息&#xff1a; 商品名稱&#xff1a;展示商品的名稱&#xff0c;用于描述商品的特性和分類。 商品圖片&#xff1a;展示商品的圖片&#xff0c;可以有多張圖片以展示不同角度和細節。 商品價格&#xff1a;顯示商品的銷售價格&#xff0c;可…

nodejs服務后臺持續運行三種方法

nodejs服務后臺持續運行三種方法 一、利用 forever 推薦 forever是一個nodejs守護進程&#xff0c;完全由命令行操控。forever會監控nodejs服務&#xff0c;并在服務掛掉后進行重啟。 1、安裝 forever npm install forever -g 2、啟動服務 service forever start 3、使用…

小程序CSS button按鈕自定義高度之后不居中

問題&#xff1a; 按鈕設置高度后不居中 <view><button class"btn1" size"">Save</button> </view> page {font-size: 30rpx; }.btn1 {margin-top: 100rpx;height: 190rpx;background: linear-gradient(90deg, #FF8A06, #FF571…

Wi-Fi 安全在學校中的重要性

Wi-Fi 是教育機構的基礎設施&#xff0c;從在線家庭作業門戶到虛擬教師會議&#xff0c;應有盡有。大多數 K-12 管理員對自己的 Wi-Fi 網絡的安全性充滿信心&#xff0c;并認為他們現有的網絡安全措施已經足夠。 不幸的是&#xff0c;這種信心往往是錯誤的。Wi-Fi 安全雖然經常…

【數據結構OJ題】鏈表中倒數第k個結點

原題鏈接&#xff1a;https://www.nowcoder.com/practice/529d3ae5a407492994ad2a246518148a?tpId13&&tqId11167&rp2&ru/activity/oj&qru/ta/coding-interviews/question-ranking 目錄 1. 題目描述 2. 思路分析 3. 代碼實現 1. 題目描述 2. 思路分析 …

VectorStyler for Mac: 讓你的創意無限綻放的全新設計工具

VectorStyler for Mac是一款專為Mac用戶打造的矢量設計工具&#xff0c;它結合了功能強大的矢量編輯器和創意無限的樣式編輯器&#xff0c;讓你的創意無限綻放。 VectorStyler for Mac擁有直觀簡潔的用戶界面&#xff0c;讓你能夠輕松上手。它提供了豐富的矢量繪圖工具&#x…

JavaWeb博客項目--推薦算法--完整代碼及思路

基于用戶的協同過濾算法&#xff08;UserCF&#xff09; 因為我寫的是博客項目&#xff0c;博客數量可能比用戶數量還多 所以選擇基于用戶的協同過濾算法 重要思想 當要向用戶u進行推薦時&#xff0c;我們先找出與用戶u最相似的幾個用戶&#xff0c;再從這幾個用戶的喜歡的物…

數據可視化和數字孿生相互促進的關系

數據可視化和數字孿生是當今數字化時代中備受關注的兩大領域&#xff0c;它們在不同層面和領域為我們提供了深入洞察和智能決策的機會&#xff0c;隨著兩種技術的不斷融合發展&#xff0c;很多人會將他們聯系在一起&#xff0c;本文就帶大家淺談一下二者之間相愛相殺的關系。 …

Springboot集成ip2region離線IP地名映射-修訂版

title: Springboot集成ip2region離線IP地名映射 date: 2020-12-16 11:15:34 categories: springboot description: Springboot集成ip2region離線IP地名映射 1. 背景2. 集成 2.1. 步驟2.2. 樣例2.3. 響應實例DataBlock2.4. 響應實例RegionAddress 3. 打開瀏覽器4. 源碼地址&…

OpenShift 4 - 基于 MinIO 安裝 Red Hat Quay 鏡像倉庫

《OpenShift / RHEL / DevSecOps 匯總目錄》 說明&#xff1a;本文已經在 OpenShift 4.13 Quay 3.9 的環境中驗證 本文適合在單機 OpenShift 環境安裝 Red Hat Quay 鏡像倉庫。 另外《OpenShift 4 - 安裝 ODF 并部署紅帽 Quay (1 Worker)》也可以在單節點部署。 而《OpenShif…

前后端分離------后端創建筆記(11)用戶刪除

B站視頻&#xff1a;30-用戶刪除&結束語_嗶哩嗶哩_bilibili 1、現在我們要做一個刪除的功能 1.1 首先做一個刪除的功能接口&#xff0c;第一步先來到后端&#xff0c;做一個刪除的接口 2、刪除我們用Delete請求 3、方法名我給他改一下 3.1這里給他調一下刪除方法&#xf…

Java 中 List 集合排序方法

方式一&#xff1a; 調用List接口自己的sort方法排序 public static void main(String[] args) {List<Integer> numListnew ArrayList<>();numList.add(999);numList.add(123);numList.add(456);numList.add(66);numList.add(9);Collections.sort(numList); //使…

在一小時內構建您的深度學習應用程序

一、說明 我已經做了將近十年的數據分析。有時&#xff0c;我使用機器學習技術從數據中獲取見解&#xff0c;并且我習慣于使用經典 ML。 雖然我已經通過了神經網絡和深度學習的一些MOOC&#xff0c;但我從未在我的工作中使用過它們&#xff0c;這個領域對我來說似乎很有挑戰性。…

【Leetcode】91.解碼方法

一、題目 1、題目描述 一條包含字母 A-Z 的消息通過以下映射進行了 編碼 : A -> "1" B -> "2" ... Z -> "26"要 解碼 已編碼的消息,所有數字必須基于上述映射的方法,反向映射回字母(可能有多種方法)。例如,"11106" …

智能數據建模軟件DTEmpower 2023R2新版本功能介紹

DTEmpower是由天洑軟件自主研發的一款通用的智能數據建模軟件&#xff0c;致力于幫助工程師及工科專業學生&#xff0c;利用工業領域中的仿真、試驗、測量等各類數據進行挖掘分析&#xff0c;建立高質量的數據模型&#xff0c;實現快速設計評估、實時仿真預測、系統參數預警、設…

機器學習深度學習——自注意力和位置編碼(數學推導+代碼實現)

&#x1f468;?&#x1f393;作者簡介&#xff1a;一位即將上大四&#xff0c;正專攻機器學習的保研er &#x1f30c;上期文章&#xff1a;機器學習&&深度學習——注意力分數&#xff08;詳細數學推導代碼實現&#xff09; &#x1f4da;訂閱專欄&#xff1a;機器學習…

Cat(2):下載與安裝

1 github源碼下載 要安裝CAT&#xff0c;首先需要從github上下載最新版本的源碼。 官方給出的建議如下&#xff1a; 注意cat的3.0代碼分支更新都發布在master上&#xff0c;包括最新文檔也都是這個分支注意文檔請用最新master里面的代碼文檔作為標準&#xff0c;一些開源網站…

node.js內置模塊fs,path,http使用方法

NodeJs中分為兩部分 一是V8引擎為了解析和執行JS代碼。 二是內置API&#xff0c;讓JS能調用這些API完成一些后端操作。 內置API模塊(fs、path、http等) 第三方API模塊(express、mysql等) fs模塊 fs.readFile()方法&#xff0c;用于讀取指定文件中的內容。 fs.writeFile()方…