RabbitMQ重復消費如何解決

消息重復消費的原因

  1. 生產者重試:網絡波動導致生產者未收到 Broker 確認,重復發送消息。
  2. 消費者失敗:消費者處理消息后未發送 ACK,消息重新入隊。
  3. 集群故障轉移:主節點宕機,未確認消息被重新投遞。

解決方案

1. 消費者冪等性設計

原理:確保同一消息多次處理的結果與一次處理相同。

實現方式
  • 數據庫唯一約束
    利用業務字段(如訂單號)的唯一性約束,避免重復插入數據。

    CREATE TABLE orders (id VARCHAR(64) PRIMARY KEY, -- 唯一訂單號amount DECIMAL(10,2)
    );
    
    // Java 示例(使用 MyBatis)
    public void processOrder(Order order) {try {orderMapper.insert(order); // 唯一約束沖突時會拋出異常// 業務邏輯...} catch (DuplicateKeyException e) {// 已處理過該訂單,直接跳過log.warn("訂單已存在: {}", order.getId());}
    }
    
  • Redis 原子操作
    使用 Redis 記錄已處理消息的 ID,通過 SETNX 命令實現原子性檢查。

    // Java 示例(使用 Spring Data Redis)
    public boolean isMessageProcessed(String messageId) {Boolean result = redisTemplate.opsForValue().setIfAbsent("msg:" + messageId, "1", Duration.ofMinutes(30));return Boolean.TRUE.equals(result);
    }public void consumeMessage(Message message) {String messageId = message.getMessageId();if (!isMessageProcessed(messageId)) {// 已處理過,直接返回return;}// 業務邏輯...
    }
    

2. 消息全局唯一 ID

原理:為每條消息分配唯一 ID,消費者記錄已處理 ID。

實現步驟
  1. 生產者端:發送消息時附加唯一 ID。

    // Java 示例(使用 RabbitTemplate)
    public void sendOrder(Order order) {String messageId = UUID.randomUUID().toString();Message message = MessageBuilder.withBody(order.toJson().getBytes()).setHeader("messageId", messageId).build();rabbitTemplate.send("order.exchange", "order.key", message);
    }
    
  2. 消費者端:處理前檢查 ID 是否已存在。

    // Java 示例(使用 @RabbitListener)
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Message message, Channel channel) throws IOException {String messageId = message.getMessageProperties().getHeader("messageId");if (redisTemplate.hasKey("processed:" + messageId)) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}// 業務邏輯...redisTemplate.opsForValue().set("processed:" + messageId, "1", Duration.ofDays(1));channel.basicAck(deliveryTag, false);
    }
    

3. 手動確認模式(Manual ACK)

原理:消費者處理完消息后手動發送 ACK,避免消息因異常重新入隊。

配置與代碼
  1. 配置手動 ACK(Spring Boot):

    spring:rabbitmq:listener:simple:acknowledge-mode: manual
    
  2. 消費者邏輯

    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {// 業務邏輯...channel.basicAck(deliveryTag, false); // 確認消息} catch (Exception e) {channel.basicNack(deliveryTag, false, true); // 重入隊列}
    }
    

4. 消息去重表

原理:在數據庫中維護一張去重表,記錄已處理的消息 ID。

表結構
CREATE TABLE message_dedup (message_id VARCHAR(128) PRIMARY KEY,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
消費者邏輯
public void consumeMessage(Message message) {String messageId = extractMessageId(message);try {jdbcTemplate.update("INSERT INTO message_dedup (message_id) VALUES (?)", messageId);// 業務邏輯...} catch (DuplicateKeyException e) {// 消息已處理,直接ACKchannel.basicAck(deliveryTag, false);}
}

5. 消息過期與死信隊列

原理:設置消息 TTL,超時未處理則轉入死信隊列,避免無限重試。

配置隊列 TTL 和死信交換
// Java 配置示例
@Bean
public Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 60000); // 消息60秒過期args.put("x-dead-letter-exchange", "dlx.exchange");args.put("x-dead-letter-routing-key", "dlx.key");return new Queue("order.queue", true, false, false, args);
}@Bean
public DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange");
}@Bean
public Queue dlxQueue() {return new Queue("dlx.queue");
}@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}

方案對比與選型

方案優點缺點適用場景
數據庫唯一約束無需額外組件高并發下數據庫壓力大低頻業務(如訂單創建)
Redis 原子操作高性能需維護 Redis 高可用高頻業務(如支付回調)
手動ACK避免消息丟失需處理ACK異常所有需要可靠消費的場景
消息去重表數據持久化增加數據庫寫入壓力數據一致性要求高的場景
死信隊列避免消息堆積需額外處理死信消息需要異常消息兜底的場景

總結

  • 冪等性設計是核心:無論消息重復多少次,業務結果保持一致。
  • 組合使用多種方案:例如“手動ACK + Redis去重”兼顧可靠性與性能。
  • 監控與告警:通過 RabbitMQ 管理界面監控消息積壓情況,設置閾值告警。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/73096.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/73096.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/73096.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Node-RED基礎1

目錄 一、概述二、安裝三、基操四、通訊五、數據六、節點七、 應用END 一、概述 Rode-Red是什么&#xff1f; 基于Node.js的物聯網開發工具&#xff0c;做API、通訊&#xff1b;提供了一些基本的監控功能&#xff0c;可在編輯器界面中查看節點的運行狀態、消息流量等信息。通…

java登神之階之順序表

一、了解List接口 在Java中&#xff0c;List接口是一個非常重要的集合框架接口&#xff0c;它繼承自Collection接口&#xff08;Collection接口繼承Iterable接口&#xff09;。List接口定義了一個有序集合&#xff0c;允許我們存儲元素集合。并且可以根據元素的索引來訪問集合中…

redux_舊版本

reduxjs/toolkit&#xff08;RTK&#xff09;是 Redux 官方團隊推出的一個工具集&#xff0c;旨在簡化 Redux 的使用和配置。它于 2019 年 10 月 正式發布&#xff0c;此文章記錄一下redux的舊版本如何使用&#xff0c;以及引入等等。 文件目錄如下&#xff1a; 步驟 安裝依…

MySQL:SQL優化實際案例解析(持續更新)

文章目錄 一、MySQL&#xff1a;SQL優化1、時間格式化問題&#xff08;字符串&#xff09;2、in/inner join的問題 一、MySQL&#xff1a;SQL優化 1、時間格式化問題&#xff08;字符串&#xff09; -- 優化前 SELECT * FROM test_table WHERE date_format( begin_time, %Y-%…

【含文檔+PPT+源碼】基于Python的美食數據的設計與實現

項目介紹 本課程演示的是一款基于Python的美食數據分析系統&#xff0c;主要針對計算機相關專業的正在做畢設的學生與需要項目實戰練習的 Java 學習者。 包含&#xff1a;項目源碼、項目文檔、數據庫腳本、軟件工具等所有資料 帶你從零開始部署運行本套系統 該項目附帶的源碼…

vue調整表格樣式之深度修改

舉例&#xff1a; <div class"grid-item"><h3>日數據</h3><el-table :data"dailyData" v-loading"loading"><el-table-column label"銷售姓名" align"center" prop"salesName" />…

【Go每日一練】統計字符出現的次數

&#x1f47b;創作者&#xff1a;丶重明 &#x1f47b;創作時間&#xff1a;2025年3月9日 &#x1f47b;擅長領域&#xff1a;運維 目錄 1.&#x1f636;?&#x1f32b;?題目&#xff1a;統計字符出現的次數2.&#x1f636;?&#x1f32b;?代碼中可用的資源3.&#x1f636;…

uniapp在APP平臺(Android/iOS)選擇非媒體文件

TOC 背景 在我們APP開發過程中&#xff0c;經常會有這樣一個需求場景&#xff1a;從手機中選擇文件然后進行上傳&#xff0c;這些文件主要分為兩類&#xff0c;媒體文件和非媒體文件。而媒體文件選擇在APP平臺我們可以使用uni.chooseImage和uni.chooseVideo這兩個API來實現。…

【eNSP實戰】配置交換機端口安全

拓撲圖 目的&#xff1a;讓交換機端口與主機mac綁定&#xff0c;防止私接主機。 主機PC配置不展示&#xff0c;按照圖中配置即可。 開始配置之前&#xff0c;使用PC1 ping 一遍PC2、PC3、PC4、PC5&#xff0c;讓交換機mac地址表刷新一下記錄。 LSW1查看mac地址表 LSW1配置端…

卡爾曼濾波算法從理論到實踐:在STM32中的嵌入式實現

摘要&#xff1a;卡爾曼濾波&#xff08;Kalman Filter&#xff09;是傳感器數據融合領域的經典算法&#xff0c;在姿態解算、導航定位等嵌入式場景中廣泛應用。本文將從公式推導、代碼實現、參數調試三個維度深入解析卡爾曼濾波&#xff0c;并給出基于STM32硬件的完整工程案例…

Redis----大key、熱key解決方案、腦裂問題

文章中相關知識點在往期已經更新過了&#xff0c;如果有友友不理解可翻看往期內容 出現腦裂問題怎么保證集群還是高可用的 什么是腦裂問題 腦裂說的就是當我們的主節點沒有掛&#xff0c;但是因為網絡延遲較大&#xff0c;然后和主節點相連的哨兵通信較差&#xff0c;之后主…

python總結(3)

創建自定義類 終于要創建自定義類了!下面是一個簡單的示例: class Person:def set_name(self, name):self.name namedef get_name(self):return self.namedef greet(self):print("Hello, world! Im {}.".format(self.name))這個示例包含三個方法定義&#xff0c;它…

word畢業論文“et al.”替換為“等”——宏

Sub 中文參考文獻改等()中文參考文獻改等 宏Selection.Find.ClearFormattingSelection.Find.Replacement.ClearFormattingWith Selection.Find.Text "([一-龥], )et al.".Replacement.Text "\1等.".Forward True.Wrap wdFindContinue.Format False.Ma…

網絡編程-----服務器(多路復用IO 和 TCP并發模型)

一、單循環服務器模型 1. 核心特征 while(1){newfd accept();recv();close(newfd);}2. 典型應用場景 HTTP短連接服務&#xff08;早期Apache&#xff09;CGI快速處理簡單測試服務器 3. 綜合代碼 #include <stdio.h> #include <sys/types.h> /* See NO…

typora高亮方案+鼠標側鍵一鍵改色

引言 在typora里面有一個自定義的高亮, <mark></mark>>但是單一顏色就太難看了, 我使用人工智能, 搜索全網藝術家, 匯集了幾種好看的格式,并且方便大家側鍵一鍵 調用, 是不是太方便啦 ! 示例 午夜模式 春意盎然 深海藍調 石墨文檔 秋日暖陽 蜜桃宣言 使用方法 …

swift -(5) 匯編分析結構體、類的內存布局

一、結構體 在 Swift 標準庫中&#xff0c;絕大多數的公開類型都是結構體&#xff0c;而枚舉和類只占很小一部分 比如Bool、 Int、 Double、 String、 Array、 Dictionary等常見類型都是結構體 ① struct Date { ② var year: Int ③ var month: Int ④ …

從零開始學機器學習——初探分類器

首先給大家介紹一個很好用的學習地址:https://cloudstudio.net/columns 在上一節中,我們已經完成了對所有經過均衡和清洗的菜品數據的準備工作。接下來,我們將考慮使用多種算法,也就是不同的分類器,來構建模型。我們將利用這個數據集和各種分類器,基于一組特定的配料,預…

黑色RGB是什么

>好的&#xff0c;用戶現在問的是“黑色rgb是什么”。我需要先理解用戶的問題。之前用戶在使用MATLAB調整網格線顏色時&#xff0c;可能看到了默認顏色是黑色&#xff0c;或者之前我提到過默認顏色是[0.15 0.15 0.15]&#xff0c;而用戶可能現在想知道黑色的RGB值具體是什么…

做到哪一步才算精通SQL

做到哪一步才算精通SQL-Structured Query Language 數據定義語言 DDL for StructCREATE&#xff1a;用來創建數據庫、表、索引等對象ALTER&#xff1a;用來修改已存在的數據庫對象DROP&#xff1a;用來刪除整個數據庫或者數據庫中的表TRUNCATE&#xff1a;用來刪除表中所有的行…

《深度解析DeepSeek-M8:量子經典融合,重塑計算能效格局》

在科技飛速發展的今天&#xff0c;量子計算與經典算法的融合成為了前沿領域的焦點。DeepSeek-M8的“量子神經網絡混合架構”&#xff0c;宛如一把鑰匙&#xff0c;開啟了經典算法與量子計算協同推理的全新大門&#xff0c;為諸多復雜問題的解決提供了前所未有的思路。 量子計算…