RabbitMQ 消費冪等性與消息重放實現

一、冪等性實現

1.1 什么是冪等性?

冪等性是指同一條消息無論被消費多少次,業務結果都只生效一次,防止重復扣款、重復發貨等問題。

RabbitMQ 的投遞模式是“至少一次交付”(at-least-once delivery),如果消費者處理失敗或者沒有及時確認,消息會被多次投遞。如果業務本身不具備冪等性,就可能導致重復扣款、重復發貨等嚴重后果。

1.2 實現思路

RabbitMQ 只負責消息的可靠投遞,而不會記錄每條消息是否已經被成功消費。因此,需要由消費者端維護消費狀態,常見做法是借助 Redis 實現去重邏輯。

消息在生產階段應攜帶全局唯一的 message_id(例如訂單號:order:10010)。在消費邏輯中,先通過 Redis 的原子命令 SETNX 嘗試寫入該 message_id:①如果 SETNX返回1,表示第一次消費,可以處理;②如果返回0,表示已消費,直接忽略

?二、消息重放實現

在RabbitMQ中,ack和nack機制是保證可靠投遞、實現重放的關鍵。

2.1 ack和nack

如果你的消費邏輯里既沒有調用ack,也沒有調用nack,消息狀態會一直unacked。只要沒確認,就永遠不會刪除消息。

(1) ack

確認消息已被消費成功。當消費者調用:

ch.basic_ack(delivery_tag=method.delivery_tag)

RabbitMQ就會把消息從隊列里永久刪除。只要你ack了,這條消息就不可能再來了。

(2) nack

告訴RabbitMQ“我沒處理好”。有兩種方式:

# 發送nack并重入隊列
# RabbitMQ會立刻把消息放回隊列,再投遞給其他消費者。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)# 發送nack不重入隊列
# 消息就會被丟棄(或者,如果綁定了死信隊列,就轉入死信隊列)。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

2.2 實現代碼

下方代碼實現了以下關鍵功能:

1. 消息通過 SETNX + EXPIRE 在 Redis 中寫入冪等標記,確保同一消息只會被一個消費者處理。
2. 如果標記已存在,判斷是“已完成”還是“正在處理”,分別選擇直接確認或稍后重試。
3. 業務處理成功后將標記更新為 done 并延長過期,表示消費已完成。
4. 如果處理失敗,刪除標記以便下次重新消費,并根據重試次數決定是否放棄或重試。

生產者代碼
import pika
import uuidconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)message_id = str(uuid.uuid4())
body = "test message"    # 可以通過推送body = "fail message" 模擬消費異常properties = pika.BasicProperties(delivery_mode=2,message_id=message_id
)channel.basic_publish(exchange='',routing_key='test_queue',body=body,properties=properties
)print(f"[x] Sent '{body}' with message_id {message_id}")connection.close()
?消費者代碼
import pika
import redis
import time# Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)# RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)MAX_RETRY = 5def callback(ch, method, properties, body):message_id = properties.message_idif not message_id:import hashlibmessage_id = hashlib.md5(body).hexdigest()redis_key = f"msg:{message_id}"retry_key = f"retry:{message_id}"# 嘗試用SETNX寫入冪等標記result = r.setnx(redis_key, "processing")if not result:status = r.get(redis_key)if status and status.decode() == "done":# 已經處理過ch.basic_ack(delivery_tag=method.delivery_tag)print(f"[!] Duplicate message detected: {message_id}")else:# 正在處理,稍后重試ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)print(f"[!] Message {message_id} is being processed by another consumer.")return# SETNX成功,要設置過期時間,防止永久占用r.expire(redis_key, 300)  # 300秒try:# 獲取重試次數retry_count = r.get(retry_key)if retry_count is None:retry_count = 0else:retry_count = int(retry_count)print(f"[x] Processing message: {body.decode()} (retry: {retry_count})")# 模擬失敗if "fail" in body.decode():raise Exception("Simulated failure")# 業務邏輯# ...# 處理成功,改為done并延長過期r.set(redis_key, "done")r.expire(redis_key, 24*60*60)r.delete(retry_key)ch.basic_ack(delivery_tag=method.delivery_tag)print("[+] Message processed successfully")except Exception as e:retry_count += 1r.set(retry_key, retry_count)r.expire(retry_key, 24*60*60)print(f"[!] Error processing message (retry {retry_count}): {e}")# 失敗時刪除冪等標記,下次可以繼續處理r.delete(redis_key)if retry_count >= MAX_RETRY:ch.basic_ack(delivery_tag=method.delivery_tag)print("[!] Max retries reached, moving message to dead letter log.")else:ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test_queue',on_message_callback=callback,auto_ack=False
)print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/89213.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/89213.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/89213.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【HarmonyOS 5】鴻蒙TEE(可信執行環境)詳解

【HarmonyOS 5】鴻蒙TEE(可信執行環境)詳解 一、TEE是什么? 1、TEE的定義: 可信執行環境(Trusted Execution Environment),簡稱TEE,是存在于智能手機、平板或任意移動設備主處理器…

算法: 冒泡排序

冒泡排序是一種簡單的排序算法,通過相鄰元素的比較和交換,使較大的元素逐漸"浮"到數組末尾。 時間復雜度:最佳 O(n) | 平均 O(n) | 最差 O(n) 空間復雜度:O(1) 穩定性:穩定 應用場景/前提條件 適用于小規模數據對幾乎已排序的數據效率較高…

基于SpringBoot的家電銷售展示平臺

源碼編號:S567 源碼名稱:基于SpringBoot的家電銷售展示平臺 用戶類型:雙角色,用戶、管理員 數據庫表數量:14 張表 主要技術:Java、Vue、ElementUl 、SpringBoot、Maven 運行環境:Windows/M…

java+vue+SpringBoo智慧旅游系統(程序+數據庫+報告+部署教程+答辯指導)

源代碼數據庫LW文檔(1萬字以上)開題報告答辯稿ppt部署教程代碼講解代碼時間修改工具 技術實現 開發語言:后端:Java 前端:vue框架:springboot數據庫:mysql 開發工具 JDK版本:JDK1.…

Docker 入門教程(三):鏡像操作命令

文章目錄 🐳 Docker 入門教程(三):鏡像操作命令獲取鏡像:docker pull查看鏡像:docker images刪除鏡像:docker rmi搜索鏡像:docker search鏡像打標簽:docker tag鏡像詳情與…

如何修改discuz文章標題字數限制 修改成255

在 Discuz! X3.5 中,文章(主題)標題字數的限制可以通過修改數據庫結構以及后臺配置來實現,以下是完整的修改方法,將標題長度限制改為 255 個字符: ? 一、修改數據庫字段長度 Discuz 默認標題字段是 subje…

基于BP神經網絡的26個英文字母識別

本課題旨在設計并實現一個基于BP(反向傳播)神經網絡的英文字母識別系統,實現對手寫或打印的26個英文字母(A-Z)的自動分類識別。項目首先對字母圖像進行預處理(如灰度化、歸一化、二值化和特征提取&#xff…

系統架構設計師論文分享-論云原生技術的應用

我的軟考歷程 摘要 2023年2月,我所在的公司做了開發紗線MES系統的決定,該系統為國內紗線工廠提供SAAS服務,旨在提高紗線工廠的智能化和數字化水平。我在該項目中被任命為系統架構設計師,全面掌管該項目的架構設計工作。該項目涉…

重置 MySQL root 密碼

引言 在linux可能存在安裝mysql安裝失敗,一直不出現默認密碼 /usr/local/mysql/mysql-8.0.26/bin/mysqld --defaults-file/etc/my.cnf --usermysql --basedir/usr/local/mysql/mysql-8.0.26 --datadir/usr/local/mysql/mysql-8.0.26/data --lower-case-table-name…

面試八股---HTML

面試八股 1、HTML 1.1 src和href的區別 src 用于替換當前元素,href 用于在當前文檔和引用資源之間確立聯系。 核心區別在于 href 關聯的資源(主要是 CSS)是用于描述頁面外觀的,瀏覽器可以先生成內容再應用樣式,因此…

氣候智能體:AI如何重構人類應對氣候危機的決策體系?

前言 前些天發現了一個巨牛的人工智能免費學習網站,通俗易懂,風趣幽默,忍不住分享一下給大家。點擊跳轉到網站 《氣候智能體:AI如何重構人類應對氣候危機的決策體系?》 展開全景式論述。文章結合2025年最新技術突破與…

UITableView的位置向下偏移, contentInsetAdjustmentBehavior使用詳情

一.contentInsetAdjustmentBehavior 作用: 在iOS 11及以后,蘋果引入了安全區域(Safe Area)的概念,當UITableView的frame超出了安全區域,系統會自定調整SafeAreaInsets的值,它可以自動調整內容的內邊距,使得內容不會被導航欄遮擋。…

騰訊云RayData全新推出“行業解決方案模板”,一鍵快捷制作3D數據可視化作品

點擊藍字? 關注我們 本文共計958字 預計閱讀時長3分鐘 騰訊云RayData Plus是一款專注于高視效的3D數據可視化的實時渲染工具。 功能全面:提供了三維、二維、動畫、數據、交互邏輯等各類能力; 零代碼制作:靈活的節點式創作,即便沒…

深度解析基于貝葉斯的垃圾郵件分類

貝葉斯垃圾郵件分類的核心邏輯是基于貝葉斯定理,利用郵件中的特征(通常是單詞)來計算該郵件屬于“垃圾郵件”或“非垃圾郵件”的概率,并根據概率大小進行分類。它是一種樸素貝葉斯分類器,因其假設特征(單詞…

WPF 3D 開發全攻略:實現3D模型創建、旋轉、平移、縮放

🎮 WPF 3D 入門實戰:從零打造一個可交互的立方體模型 標題: 🚀《WPF 3D 開發全攻略:實現旋轉、平移、縮放與法線顯示》 💡 引言 在現代圖形應用中,3D 可視化已經成為不可或缺的一部分。WPF 提供…

Ruby 安裝使用教程

一、Ruby 簡介 Ruby 是一種簡單快捷的面向對象腳本語言,以優雅、簡潔、易讀著稱。它常被用于 Web 開發(如 Ruby on Rails 框架)、自動化腳本、DevOps、命令行工具等領域。 二、Ruby 安裝教程 2.1 支持平臺 Ruby 支持跨平臺運行&#xff0c…

python | numpy小記(五):理解 NumPy 中的 `np.arccos`:反余弦函數

python | numpy小記(五):理解 NumPy 中的 np.arccos:反余弦函數 一、函數簽名與核心參數二、數學定義與取值范圍三、基礎使用示例四、與 Python 內建 math.acos 的對比五、常見問題與注意事項六、典型應用場景1. 三維向量夾角計算…

華為云Flexus+DeepSeek征文 | 華為云ModelArts與Reor的完美結合:創建高效本地AI筆記環境

華為云FlexusDeepSeek征文 | 華為云ModelArts與Reor的完美結合:創建高效本地AI筆記環境 引言一、ModelArts Studio平臺介紹華為云ModelArts Studio簡介ModelArts Studio主要特點 二、Reor介紹Reor簡介Reor主要特點 三、安裝Reor工具下載Reor軟件安裝Reor工具 四、開…

【啟發式算法】Dynamic A*(D*)算法詳細介紹(Python)

📢本篇文章是博主人工智能(AI)領域學習時,用于個人學習、研究或者欣賞使用,并基于博主對相關等領域的一些理解而記錄的學習摘錄和筆記,若有不當和侵權之處,指出后將會立即改正,還望諒…

報告怎么寫

替代方案(按場景選擇) 崗前準備階段 ? "熟悉業務流程/系統操作" ? "掌握XX工具/平臺的核心功能" ? "完成上崗前技術對接" 知識轉化場景 ? "梳理產品知識體系" ? "轉化技術文檔為實操方案" ? &…