一、冪等性實現
1.1 什么是冪等性?
冪等性是指同一條消息無論被消費多少次,業務結果都只生效一次,防止重復扣款、重復發貨等問題。
RabbitMQ 的投遞模式是“至少一次交付”(at-least-once delivery),如果消費者處理失敗或者沒有及時確認,消息會被多次投遞。如果業務本身不具備冪等性,就可能導致重復扣款、重復發貨等嚴重后果。
1.2 實現思路
RabbitMQ 只負責消息的可靠投遞,而不會記錄每條消息是否已經被成功消費。因此,需要由消費者端維護消費狀態,常見做法是借助 Redis 實現去重邏輯。
消息在生產階段應攜帶全局唯一的 message_id(例如訂單號:order:10010)。在消費邏輯中,先通過 Redis 的原子命令 SETNX 嘗試寫入該 message_id:①如果 SETNX返回1,表示第一次消費,可以處理;②如果返回0,表示已消費,直接忽略
?二、消息重放實現
在RabbitMQ中,ack和nack機制是保證可靠投遞、實現重放的關鍵。
2.1 ack和nack
如果你的消費邏輯里既沒有調用ack,也沒有調用nack,消息狀態會一直unacked。只要沒確認,就永遠不會刪除消息。
(1) ack
確認消息已被消費成功。當消費者調用:
ch.basic_ack(delivery_tag=method.delivery_tag)
RabbitMQ就會把消息從隊列里永久刪除。只要你ack了,這條消息就不可能再來了。
(2) nack
告訴RabbitMQ“我沒處理好”。有兩種方式:
# 發送nack并重入隊列
# RabbitMQ會立刻把消息放回隊列,再投遞給其他消費者。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)# 發送nack不重入隊列
# 消息就會被丟棄(或者,如果綁定了死信隊列,就轉入死信隊列)。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
2.2 實現代碼
下方代碼實現了以下關鍵功能:
1. 消息通過 SETNX + EXPIRE 在 Redis 中寫入冪等標記,確保同一消息只會被一個消費者處理。
2. 如果標記已存在,判斷是“已完成”還是“正在處理”,分別選擇直接確認或稍后重試。
3. 業務處理成功后將標記更新為 done 并延長過期,表示消費已完成。
4. 如果處理失敗,刪除標記以便下次重新消費,并根據重試次數決定是否放棄或重試。
生產者代碼
import pika
import uuidconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)message_id = str(uuid.uuid4())
body = "test message" # 可以通過推送body = "fail message" 模擬消費異常properties = pika.BasicProperties(delivery_mode=2,message_id=message_id
)channel.basic_publish(exchange='',routing_key='test_queue',body=body,properties=properties
)print(f"[x] Sent '{body}' with message_id {message_id}")connection.close()
?消費者代碼
import pika
import redis
import time# Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)# RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)MAX_RETRY = 5def callback(ch, method, properties, body):message_id = properties.message_idif not message_id:import hashlibmessage_id = hashlib.md5(body).hexdigest()redis_key = f"msg:{message_id}"retry_key = f"retry:{message_id}"# 嘗試用SETNX寫入冪等標記result = r.setnx(redis_key, "processing")if not result:status = r.get(redis_key)if status and status.decode() == "done":# 已經處理過ch.basic_ack(delivery_tag=method.delivery_tag)print(f"[!] Duplicate message detected: {message_id}")else:# 正在處理,稍后重試ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)print(f"[!] Message {message_id} is being processed by another consumer.")return# SETNX成功,要設置過期時間,防止永久占用r.expire(redis_key, 300) # 300秒try:# 獲取重試次數retry_count = r.get(retry_key)if retry_count is None:retry_count = 0else:retry_count = int(retry_count)print(f"[x] Processing message: {body.decode()} (retry: {retry_count})")# 模擬失敗if "fail" in body.decode():raise Exception("Simulated failure")# 業務邏輯# ...# 處理成功,改為done并延長過期r.set(redis_key, "done")r.expire(redis_key, 24*60*60)r.delete(retry_key)ch.basic_ack(delivery_tag=method.delivery_tag)print("[+] Message processed successfully")except Exception as e:retry_count += 1r.set(retry_key, retry_count)r.expire(retry_key, 24*60*60)print(f"[!] Error processing message (retry {retry_count}): {e}")# 失敗時刪除冪等標記,下次可以繼續處理r.delete(redis_key)if retry_count >= MAX_RETRY:ch.basic_ack(delivery_tag=method.delivery_tag)print("[!] Max retries reached, moving message to dead letter log.")else:ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test_queue',on_message_callback=callback,auto_ack=False
)print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()