要確保 RabbitMQ 在消費者(Python 服務)重啟或掛掉時消息不丟失,需結合 消息持久化、確認機制(ACK) 和 死信隊列(DLX) 實現高可靠性:
1. 消息持久化(Durability)
確保消息和隊列在 RabbitMQ 服務重啟后仍存在:
Java 發布者(設置持久化)
// 創建持久化隊列 + 持久化消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 聲明持久化隊列(durable=true)channel.queueDeclare("image_queue", true, false, false, null);// 發布持久化消息(deliveryMode=2)String message = "{\"task_id\": \"123\"}";channel.basicPublish("", "image_queue", new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build(),message.getBytes());System.out.println("消息已持久化發送");
}
Python 消費者(聲明持久化隊列)
channel.queue_declare(queue='image_queue', durable=True) # durable=True
2. 手動確認(Manual ACK)
消費者處理完消息后顯式發送 ACK,避免消息因崩潰丟失:
Python 消費者(關閉自動 ACK)
def callback(ch, method, properties, body):try:print(f"處理消息: {body.decode()}")# 業務邏輯...ch.basic_ack(delivery_tag=method.delivery_tag) # 手動確認except Exception as e:print(f"處理失敗: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # 重新入隊channel.basic_consume(queue='image_queue', on_message_callback=callback, auto_ack=False) # 關閉自動ACK
關鍵點:
auto_ack=False
:必須關閉自動確認。- 處理成功后調用
basic_ack
,失敗時basic_nack
并重新入隊。
3. 死信隊列(DLX)
處理因消費者崩潰或消息超時未被確認的消息:
Java 發布者(聲明死信交換機和隊列)
// 定義死信交換機和隊列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 死信交換機
args.put("x-message-ttl", 60000); // 消息存活時間(可選)channel.queueDeclare("image_queue", true, false, false, args);
channel.exchangeDeclare("dlx_exchange", "direct");
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "");
Python 消費者(監聽死信隊列)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.basic_consume(queue='dlx_queue', on_message_callback=handle_dlx_message, auto_ack=False)
作用:
- 消息超過 TTL 或消費者拒絕時,自動路由到死信隊列。
- 可對死信消息進行補償處理(如重試或記錄日志)。
4. 消費者高可用(HA)
確保消費者服務崩潰后能自動恢復:
方案 1:進程守護(如 systemd)
# /etc/systemd/system/python_consumer.service
[Unit]
Description=Python RabbitMQ Consumer
After=network.target[Service]
User=root
WorkingDirectory=/opt/app
ExecStart=/usr/bin/python3 /opt/app/consumer.py
Restart=always # 崩潰后自動重啟[Install]
WantedBy=multi-user.target
方案 2:容器化(Docker)
# Dockerfile
FROM python:3.9
COPY consumer.py /app/
CMD ["python", "/app/consumer.py"]
docker run -d --restart=unless-stopped my_consumer
5. 消息補償機制(可選)
極端情況下(如 RabbitMQ 崩潰),可通過數據庫記錄消息狀態:
Java 發布者(本地事務)
// 偽代碼:本地事務
try {saveToDatabase(task); // 1. 先存數據庫sendToRabbitMQ(task); // 2. 再發消息commitTransaction();
} catch (Exception e) {rollbackTransaction();// 定時任務掃描數據庫補償發送
}
完整流程圖
最佳實踐總結
措施 | 實現方式 | 作用 |
---|---|---|
消息持久化 | deliveryMode=2 + queueDeclare(durable=true) | 防止 RabbitMQ 重啟丟失消息 |
手動 ACK | auto_ack=false + basic_ack() /basic_nack() | 確保消息處理完成才刪除 |
死信隊列(DLX) | x-dead-letter-exchange + 獨立死信消費者 | 處理超時或失敗消息 |
消費者高可用 | systemd Restart=always 或 docker --restart=unless-stopped | 消費者崩潰后自動恢復 |
消息補償 | 數據庫記錄 + 定時任務掃描 | 極端情況下的兜底措施 |
通過以上組合方案,可確保消息在消費者崩潰、重啟或 RabbitMQ 異常時不丟失、不重復、可恢復。