RabbitMQ 是一個 消息隊列中間件(Message Broker),實現了 AMQP 協議,常用于服務之間解耦、異步處理、流量削峰等場景。
我幫你分成兩個部分來講:核心原理 + 常見用法。
🧩 一、核心原理
RabbitMQ 的核心是 生產者(Producer) → 交換機(Exchange) → 隊列(Queue) → 消費者(Consumer) 這一條鏈路。
1. 基礎概念
- Producer:消息的發送方。
- Consumer:消息的接收方。
- Queue:存儲消息的隊列。
- Exchange(交換機):接收生產者的消息,按照規則路由到隊列。
- Binding:交換機和隊列之間的綁定規則。
- Routing Key:消息的“路由標識”,交換機根據它決定發給哪個隊列。
- Broker:RabbitMQ 服務本身。
2. 交換機類型(Exchange Types)
RabbitMQ 支持幾種路由模式:
- direct:按 routing key 精確匹配,把消息路由到對應隊列。
- fanout:廣播模式,消息發送到所有綁定的隊列。
- topic:模糊匹配路由(如
order.*
),支持通配符。 - headers:根據消息頭部的鍵值對匹配(較少用)。
3. ACK & 消息可靠性
- ACK:消費者處理完消息后確認(acknowledge)。
- 未 ACK:RabbitMQ 會重新投遞消息(保證至少一次投遞)。
- 持久化:隊列持久化 + 消息持久化,確保 RabbitMQ 重啟后消息不丟失。
- 死信隊列(DLX):處理無法投遞或過期的消息。
4. 工作模式
- 簡單隊列(Simple Queue):一個生產者 → 一個消費者。
- 工作隊列(Work Queue):一個生產者 → 多個消費者(任務分攤)。
- 發布訂閱(Pub/Sub):廣播消息(fanout exchange)。
- 路由模式(Routing):按 key 精確路由(direct exchange)。
- 主題模式(Topic):模糊匹配路由(topic exchange)。
🛠? 二、常見用法(Python 示例)
安裝依賴:
pip install pika
1. 簡單隊列
生產者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
消費者:
import pikadef callback(ch, method, properties, body):print(f" [x] Received {body}")connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
2. 工作隊列(任務分攤)
- 一個生產者發送很多任務。
- 多個消費者并發消費,每個消息只會被一個消費者處理。
- 常用在 后臺任務處理。
設置 prefetch_count=1
可以讓 RabbitMQ 公平分發任務:
channel.basic_qos(prefetch_count=1)
3. 發布/訂閱(fanout)
生產者:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='log message')
消費者:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='logs', queue=queue_name)
這樣消費者會自動生成臨時隊列,接收廣播。
4. 路由模式(direct)
生產者:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Error log')
消費者:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
queue_name = channel.queue_declare(queue='', exclusive=True).method.queue
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')
🎯 總結
- RabbitMQ = 生產者 → 交換機 → 隊列 → 消費者。
- 交換機類型決定了 消息的路由方式(direct、fanout、topic、headers)。
- 可靠性依賴 ACK + 持久化 + 死信隊列。
- 常見場景:異步任務、削峰填谷、解耦微服務、日志廣播。
好問題 👍,這幾個是 RabbitMQ 保證消息可靠性 的關鍵機制。我幫你逐個拆開講:
🟢 1. ACK(消息確認機制)
- 默認行為:消費者從隊列里拿到消息后,RabbitMQ 就認為它“已消費”,會立即從隊列里刪除。
- 風險:如果消費者拿到消息后宕機/異常,消息就丟了。
👉 ACK 就是解決這個問題的機制:
-
自動 ACK (
auto_ack=True
)- 一旦消費者收到消息,就立刻確認,哪怕還沒處理完。
- 風險:消費者掛了,消息丟失。
-
手動 ACK (
auto_ack=False
)(推薦)- 消費者處理完任務后,再調用
channel.basic_ack()
確認。 - 如果消費者掛了,RabbitMQ 會把消息重新投遞給別的消費者。
- 消費者處理完任務后,再調用
例子:
def callback(ch, method, properties, body):print("處理消息:", body)# 處理完成后手動確認ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
🔑 作用:確保消息至少被處理一次,不會因為消費者掛掉而丟失。
🟢 2. 持久化(Persistence)
RabbitMQ 的數據默認存在內存里,服務一旦重啟,消息就沒了。
👉 持久化保證 RabbitMQ 重啟后消息不丟。
持久化分三層:
-
隊列持久化(聲明時加
durable=True
):channel.queue_declare(queue='task_queue', durable=True)
→ RabbitMQ 重啟后,這個隊列還在。
-
消息持久化(生產者發送時設置
delivery_mode=2
):channel.basic_publish(exchange='',routing_key='task_queue',body='Hello',properties=pika.BasicProperties(delivery_mode=2, # 2 表示持久化消息))
→ RabbitMQ 重啟后,消息仍然在隊列里。
-
交換機持久化(聲明時加
durable=True
)。
🔑 作用:保證即使 RabbitMQ 崩潰或重啟,消息不會丟失。
🟢 3. 死信隊列(Dead Letter Queue, DLQ)
當某些消息 無法被正常消費 時,RabbitMQ 可以把它們轉移到另一個隊列里(死信隊列),避免消息丟失。
死信隊列觸發的幾種情況:
- 消費者 拒絕消息(nack/reject) 且
requeue=False
。 - 消息在隊列里 過期(TTL 超時)。
- 隊列滿了,無法再接收新消息。
👉 配置死信隊列的方法:
args = {'x-dead-letter-exchange': 'dlx_exchange', # 指定死信交換機'x-dead-letter-routing-key': 'dlx_key' # 指定路由 key
}
channel.queue_declare(queue='task_queue', durable=True, arguments=args)
然后消息會被轉發到 死信隊列,便于后續人工排查或重試。
🔑 作用:防止消息丟失 & 提供兜底處理機制。
🎯 總結
- ACK:保證消費者掛掉時消息不會丟(至少投遞一次)。
- 持久化:保證 RabbitMQ 崩潰/重啟時消息不會丟。
- 死信隊列:保證異常消息有去處(過期/拒絕/無法投遞)。
這三個機制配合起來,RabbitMQ 就能實現 高可靠消息傳遞。
好問題 👍!RabbitMQ 里的 隊列滿了(或者說消息堆積過多)是一個常見的情況,處理思路分兩類:
🟢 1. 隊列為什么會滿?
隊列本質上是內存+磁盤結構,如果消費者消費不過來,就會導致消息積壓。幾種常見原因:
- 消費者處理能力不足(速度比不上生產者)。
- 沒有限制隊列長度,消息無限堆積。
- 消費者掛掉了,沒人消費。
- 某些消息過大,占滿內存/磁盤。
🟢 2. RabbitMQ 的應對機制
(1) 設置隊列最大長度/容量(防止無限堆積)
channel.queue_declare(queue='task_queue',durable=True,arguments={'x-max-length': 1000, # 最大消息數'x-max-length-bytes': 10485760 # 最大字節數 (10MB)}
)
超過限制后,舊消息會被丟棄(FIFO),或者轉發到死信隊列(推薦)。
(2) 配置死信隊列(DLQ)
當隊列滿了時,新來的消息可以自動進入死信隊列:
channel.queue_declare(queue='task_queue',durable=True,arguments={'x-max-length': 1000,'x-dead-letter-exchange': 'dlx_exchange','x-dead-letter-routing-key': 'dlx_key'}
)
👉 新消息進不來時,直接進入 DLQ,避免消息丟失。
(3) 限流(QoS)
消費者可以設置一次最多處理多少條消息,避免被“壓垮”:
channel.basic_qos(prefetch_count=1) # 一次只取 1 條,處理完再取
這樣 RabbitMQ 會 公平調度,不會把大量消息推給一個消費者。
(4) 水平擴展消費者
如果是消費能力不足,最直接的辦法就是:多開幾個消費者。
RabbitMQ 會按照 Round Robin(輪詢) 或 公平分發 把消息分配下去。
(5) 生產端限流 / 拒絕
RabbitMQ 本身不對生產者限流,但你可以在應用層做:
- 使用 發布確認(Publisher Confirms),如果消息積壓,可以選擇暫停生產。
- 用 消息速率控制(Rate Limit),比如令牌桶算法,減緩生產速度。
🟢 3. 總結
當隊列滿了,可以這樣處理:
- 預防堆積 → 設置
x-max-length
/x-max-length-bytes
。 - 兜底方案 → 配置死信隊列,把溢出的消息轉移出來。
- 消費優化 →
basic_qos
+ 增加消費者實例。 - 生產端調節 → 啟用發布確認,動態調整生產速度。
👉 最佳實踐:
- 設置合理的隊列長度 + 消息 TTL。
- 配死信隊列,確保不會無聲丟失。
- 消費端橫向擴展,必要時加緩存層(Kafka 更適合高吞吐)