rabbitmq 入門知識點

RabbitMQ 是一個 消息隊列中間件(Message Broker),實現了 AMQP 協議,常用于服務之間解耦、異步處理、流量削峰等場景。
我幫你分成兩個部分來講:核心原理 + 常見用法


🧩 一、核心原理

RabbitMQ 的核心是 生產者(Producer) → 交換機(Exchange) → 隊列(Queue) → 消費者(Consumer) 這一條鏈路。

1. 基礎概念

  • Producer:消息的發送方。
  • Consumer:消息的接收方。
  • Queue:存儲消息的隊列。
  • Exchange(交換機):接收生產者的消息,按照規則路由到隊列。
  • Binding:交換機和隊列之間的綁定規則。
  • Routing Key:消息的“路由標識”,交換機根據它決定發給哪個隊列。
  • Broker:RabbitMQ 服務本身。

2. 交換機類型(Exchange Types)

RabbitMQ 支持幾種路由模式:

  1. direct:按 routing key 精確匹配,把消息路由到對應隊列。
  2. fanout:廣播模式,消息發送到所有綁定的隊列。
  3. topic:模糊匹配路由(如 order.*),支持通配符。
  4. headers:根據消息頭部的鍵值對匹配(較少用)。

3. ACK & 消息可靠性

  • ACK:消費者處理完消息后確認(acknowledge)。
  • 未 ACK:RabbitMQ 會重新投遞消息(保證至少一次投遞)。
  • 持久化:隊列持久化 + 消息持久化,確保 RabbitMQ 重啟后消息不丟失。
  • 死信隊列(DLX):處理無法投遞或過期的消息。

4. 工作模式

  • 簡單隊列(Simple Queue):一個生產者 → 一個消費者。
  • 工作隊列(Work Queue):一個生產者 → 多個消費者(任務分攤)。
  • 發布訂閱(Pub/Sub):廣播消息(fanout exchange)。
  • 路由模式(Routing):按 key 精確路由(direct exchange)。
  • 主題模式(Topic):模糊匹配路由(topic exchange)。

🛠? 二、常見用法(Python 示例)

安裝依賴:

pip install pika

1. 簡單隊列

生產者:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消費者:

import pikadef callback(ch, method, properties, body):print(f" [x] Received {body}")connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2. 工作隊列(任務分攤)

  • 一個生產者發送很多任務。
  • 多個消費者并發消費,每個消息只會被一個消費者處理。
  • 常用在 后臺任務處理

設置 prefetch_count=1 可以讓 RabbitMQ 公平分發任務:

channel.basic_qos(prefetch_count=1)

3. 發布/訂閱(fanout)

生產者:

channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='log message')

消費者:

channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='logs', queue=queue_name)

這樣消費者會自動生成臨時隊列,接收廣播。


4. 路由模式(direct)

生產者:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Error log')

消費者:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
queue_name = channel.queue_declare(queue='', exclusive=True).method.queue
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

🎯 總結

  • RabbitMQ = 生產者 → 交換機 → 隊列 → 消費者
  • 交換機類型決定了 消息的路由方式(direct、fanout、topic、headers)。
  • 可靠性依賴 ACK + 持久化 + 死信隊列
  • 常見場景:異步任務、削峰填谷、解耦微服務、日志廣播。

好問題 👍,這幾個是 RabbitMQ 保證消息可靠性 的關鍵機制。我幫你逐個拆開講:


🟢 1. ACK(消息確認機制)

  • 默認行為:消費者從隊列里拿到消息后,RabbitMQ 就認為它“已消費”,會立即從隊列里刪除。
  • 風險:如果消費者拿到消息后宕機/異常,消息就丟了。

👉 ACK 就是解決這個問題的機制

  • 自動 ACK (auto_ack=True)

    • 一旦消費者收到消息,就立刻確認,哪怕還沒處理完。
    • 風險:消費者掛了,消息丟失。
  • 手動 ACK (auto_ack=False)(推薦)

    • 消費者處理完任務后,再調用 channel.basic_ack() 確認。
    • 如果消費者掛了,RabbitMQ 會把消息重新投遞給別的消費者。

例子:

def callback(ch, method, properties, body):print("處理消息:", body)# 處理完成后手動確認ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

🔑 作用:確保消息至少被處理一次,不會因為消費者掛掉而丟失。


🟢 2. 持久化(Persistence)

RabbitMQ 的數據默認存在內存里,服務一旦重啟,消息就沒了。
👉 持久化保證 RabbitMQ 重啟后消息不丟

持久化分三層:

  1. 隊列持久化(聲明時加 durable=True):

    channel.queue_declare(queue='task_queue', durable=True)
    

    → RabbitMQ 重啟后,這個隊列還在。

  2. 消息持久化(生產者發送時設置 delivery_mode=2):

    channel.basic_publish(exchange='',routing_key='task_queue',body='Hello',properties=pika.BasicProperties(delivery_mode=2,  # 2 表示持久化消息))
    

    → RabbitMQ 重啟后,消息仍然在隊列里。

  3. 交換機持久化(聲明時加 durable=True)。

🔑 作用:保證即使 RabbitMQ 崩潰或重啟,消息不會丟失。


🟢 3. 死信隊列(Dead Letter Queue, DLQ)

當某些消息 無法被正常消費 時,RabbitMQ 可以把它們轉移到另一個隊列里(死信隊列),避免消息丟失。

死信隊列觸發的幾種情況:

  1. 消費者 拒絕消息(nack/reject)requeue=False
  2. 消息在隊列里 過期(TTL 超時)
  3. 隊列滿了,無法再接收新消息。

👉 配置死信隊列的方法:

args = {'x-dead-letter-exchange': 'dlx_exchange',  # 指定死信交換機'x-dead-letter-routing-key': 'dlx_key'     # 指定路由 key
}
channel.queue_declare(queue='task_queue', durable=True, arguments=args)

然后消息會被轉發到 死信隊列,便于后續人工排查或重試。

🔑 作用:防止消息丟失 & 提供兜底處理機制。


🎯 總結

  • ACK:保證消費者掛掉時消息不會丟(至少投遞一次)。
  • 持久化:保證 RabbitMQ 崩潰/重啟時消息不會丟。
  • 死信隊列:保證異常消息有去處(過期/拒絕/無法投遞)。

這三個機制配合起來,RabbitMQ 就能實現 高可靠消息傳遞

好問題 👍!RabbitMQ 里的 隊列滿了(或者說消息堆積過多)是一個常見的情況,處理思路分兩類:


🟢 1. 隊列為什么會滿?

隊列本質上是內存+磁盤結構,如果消費者消費不過來,就會導致消息積壓。幾種常見原因:

  • 消費者處理能力不足(速度比不上生產者)。
  • 沒有限制隊列長度,消息無限堆積。
  • 消費者掛掉了,沒人消費。
  • 某些消息過大,占滿內存/磁盤。

🟢 2. RabbitMQ 的應對機制

(1) 設置隊列最大長度/容量(防止無限堆積)

channel.queue_declare(queue='task_queue',durable=True,arguments={'x-max-length': 1000,            # 最大消息數'x-max-length-bytes': 10485760   # 最大字節數 (10MB)}
)

超過限制后,舊消息會被丟棄(FIFO),或者轉發到死信隊列(推薦)。


(2) 配置死信隊列(DLQ)

當隊列滿了時,新來的消息可以自動進入死信隊列:

channel.queue_declare(queue='task_queue',durable=True,arguments={'x-max-length': 1000,'x-dead-letter-exchange': 'dlx_exchange','x-dead-letter-routing-key': 'dlx_key'}
)

👉 新消息進不來時,直接進入 DLQ,避免消息丟失。


(3) 限流(QoS)

消費者可以設置一次最多處理多少條消息,避免被“壓垮”:

channel.basic_qos(prefetch_count=1)  # 一次只取 1 條,處理完再取

這樣 RabbitMQ 會 公平調度,不會把大量消息推給一個消費者。


(4) 水平擴展消費者

如果是消費能力不足,最直接的辦法就是:多開幾個消費者
RabbitMQ 會按照 Round Robin(輪詢)公平分發 把消息分配下去。


(5) 生產端限流 / 拒絕

RabbitMQ 本身不對生產者限流,但你可以在應用層做:

  • 使用 發布確認(Publisher Confirms),如果消息積壓,可以選擇暫停生產。
  • 消息速率控制(Rate Limit),比如令牌桶算法,減緩生產速度。

🟢 3. 總結

當隊列滿了,可以這樣處理:

  1. 預防堆積 → 設置 x-max-length / x-max-length-bytes
  2. 兜底方案 → 配置死信隊列,把溢出的消息轉移出來。
  3. 消費優化basic_qos + 增加消費者實例。
  4. 生產端調節 → 啟用發布確認,動態調整生產速度。

👉 最佳實踐:

  • 設置合理的隊列長度 + 消息 TTL。
  • 配死信隊列,確保不會無聲丟失。
  • 消費端橫向擴展,必要時加緩存層(Kafka 更適合高吞吐)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/921329.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/921329.shtml
英文地址,請注明出處:http://en.pswp.cn/news/921329.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

點控云智能客服:以AI重塑服務體驗,登頂行業第一的革新之路

在數字化浪潮席卷全球的今天,客戶服務已成為企業核心競爭力之一。智能客服作為連接企業與客戶的重要橋梁,其效能與體驗直接關系到企業的品牌形象與市場口碑。近日,權威機構發布的《中國智能客服市場競爭力報告》顯示,點控云智能客…

9.5 IO-線程day5

信號量打印ABC#include <stdio.h> #include <string.h> #include <stdlib.h> #include <25061head.h> sem_t sem[1]; void *callback(void *arg) {while(1){sem_wait(&sem[0]);printf("A\n");sleep(1);sem_post(&sem[1]);}pthread_e…

老師如何高效收集學生學籍信息,完成收集工作?

開學的時光總是忙碌而充實&#xff0c;除了要熱情地迎接新生、用心地備課&#xff0c;還有一件讓人頭疼不已的事情——學生學籍信息的收集。上學期開學&#xff0c;我承擔起了收集班級新生信息的重任&#xff0c;滿心以為提前準備好的紙質表格&#xff0c;在新生報到那天發給家…

JAVA層的權限與SELinux的關系

Java 層權限是應用程序級別的“門禁卡”&#xff0c;而 SELinux 是系統級別的“防火墻規則和強制訪問控制”。即使你擁有進入大樓的“門禁卡”&#xff08;Java 權限&#xff09;&#xff0c;如果“防火墻規則”&#xff08;SELinux 策略&#xff09;不允許你的進程與目標服務或…

Screen 三步上手

好的&#xff0c;這是給同事的簡潔版說明&#xff1a;Screen 三步上手 開新窗口&#xff1a;干活前先開個帶名字的窗口&#xff0c;不怕斷連。 screen -S 任務名看所有窗口&#xff1a;隨時查看都有哪些任務在后臺跑。 screen -ls重回窗口&#xff1a;斷連后重新登錄&#xff0…

flink 偽代碼

import java.util.*; import java.util.concurrent.*;// 核心接口定義 interface StreamOperator {void open();void processElement(Object element);void close(); }interface SourceFunction extends StreamOperator {void run(SourceContext ctx); }interface SinkFunction…

一招快速識別你的電腦是機械硬盤還是固態硬盤

你是否經常覺得電腦開機慢、軟件打開卡頓&#xff1f;其中一個關鍵原因&#xff0c;可能就在于你使用的是機械硬盤&#xff08;HDD&#xff09;還是固態硬盤&#xff08;SSD&#xff09;。固態硬盤讀寫速度快&#xff0c;能顯著提升系統響應速度&#xff1b;而機械硬盤雖然容量…

52核心52線程,Intel下一代CPU憋了個大的

被逼急了的 Intel&#xff0c;可能正在憋大招&#xff01;如大伙兒所見&#xff0c;Intel 這兩年日子已經不能用「慘」來形容。其過去引以為傲的 PC 處理器&#xff0c;特別是高性能桌面處理器領域&#xff0c;如今算是徹底被 AMD 打懵了。無他&#xff0c;己方產品是連年擺爛&…

【LeetCode 熱題 100】1. 兩數之和——(解法二)哈希表

Problem: 1. 兩數之和 文章目錄整體思路完整代碼時空復雜度時間復雜度&#xff1a;O(N)空間復雜度&#xff1a;O(N)整體思路 這段代碼旨在高效地解決 “兩數之和” 問題。與 O(N^2) 的暴力枚舉法相比&#xff0c;此版本采用了一種經典的 “空間換時間” 策略&#xff0c;利用 …

MySQL主從同步--主從復制進階

MySQL支持一臺主庫同時向多臺從庫進行復制&#xff0c;從庫同時也可以作為其他從服務器的主庫&#xff0c;實現鏈狀復制。1、MySQL支持的binlog二進制日志復制類型- 基于語句&#xff08;statement&#xff09;的復制在主服務器上執行SQL語句&#xff0c;在從服務器上執行同樣的…

WPF外部打開html文件

注意&#xff1a;這是一份提供WPF外部瀏覽器打開html的方法&#xff0c;而不是WPF內部嵌入html 需要通過瀏覽器打開&#xff0c;否則無法使用地址欄拼接參數的形式操作html 下面是打開html的方法↓string localHtmlPath "C:\Users\pangb\Downloads\Help\幫助文檔 - 副本.…

Go初級之十:錯誤處理與程序健壯性

Go初級之十&#xff1a;錯誤處理與程序健壯性為什么選這個主題&#xff1f; 錯誤處理是 Go 語言中一個非常獨特且重要的設計哲學。它體現了 Go 的“顯式錯誤處理”思想&#xff0c;與其它語言&#xff08;如 Java/Python&#xff09;的異常機制不同。在實際開發中&#xff0c;幾…

Xsens解碼人形機器人訓練的語言

隨著人形機器人在現實世界的應用中變得越來越普遍&#xff0c;了解實現其類似人類運動的技術至關重要。在Xsens我們滿懷熱情地探索這一領域&#xff0c;致力于為人形機器人訓練開發最佳的動作捕捉解決方案。為了幫助您更好地理解所遇到的術語&#xff0c;我們創建了一份概述&am…

25年下載chromedriver.140

前提&#xff1a; 因為我需要用seleium模擬瀏覽器獲取數據&#xff0c;需要用到這個chromedriver 驅動。 1.chrome瀏覽器版本號 先檢查你的chrome 的版本號是多少&#xff0c;就下載對應的 chromedriver 【三個點】--->【幫助】------>【關于 Google chrome 】 我的版本…

深度學習玩游戲, 模型玩游戲,大模型+游戲 llm+game, 機器學習玩游戲,人工智能游戲陪伴,模型陪玩游戲

1. 論文地址 Think in Games: Learning to Reason in Games via Reinforcement Learning with Large Language Models 2. 中文&#xff1a; Think in Games&#xff1a;做一個在王者榮耀中會玩和思考的Agent 3. 我記得幾年前&#xff0c;相關文章還是使用dqn算法。玩雅利達小…

并查集|棧

lc1668不能直接跳class Solution { public:int maxRepeating(string sequence, string word) {int k 0, n sequence.size(), wn word.size(), t 0;for (int i 0; i < n - wn; i) {if (sequence.substr(i, wn) word) {t 1;int j i wn;while (j wn < n &&…

問題三ai思路

好的&#xff0c;我把“路線A&#xff1a;分類建模擇時”的代碼按功能分段給出&#xff0c;并為每段配上簡明解釋。你可以將這些段落依次粘貼到已完成清洗后的 df 變量之后直接運行。 0. 依賴導入&#xff08;一次即可&#xff09; 作用&#xff1a;導入所需庫&#xff1b;后續…

Java第十四幕集合啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦

集合1 Collection接口1.1 集合概述集合是一個裝對象的容器。集合中只能存放引用數據類型的對象。集合中有一些大小是固定的&#xff0c;有一些是不固定的。有一些是有序的&#xff0c;有些是無序的。有些可以有重復元素&#xff0c;有一些不可以有重復元素1.2 集合常用方法publ…

硬件基礎:串口通信

數據傳輸方式&#xff08;按位傳輸方式&#xff09;并行通信通過多條數據線同時傳輸多個數據位&#xff0c;速度較快但成本高&#xff0c;抗干擾能力弱&#xff0c;適用于短距離通信&#xff0c;如早期的打印機接口。串行通信通過單條或少數數據線逐位傳輸數據&#xff0c;線路…

從Java全棧到云原生:一場技術深度對話

從Java全棧到云原生&#xff1a;一場技術深度對話 面試官與應聘者互動記錄 面試官&#xff1a;你好&#xff0c;歡迎來到我們的面試。先簡單介紹一下你自己吧。 應聘者&#xff1a;您好&#xff0c;我叫李明&#xff0c;28歲&#xff0c;碩士學歷&#xff0c;有5年Java全棧開發…