RabbitMQ實踐學習筆記

RabbitMQ實踐

以下是關于RabbitMQ實踐的整理,涵蓋常見場景和示例代碼(基于Markdown格式)。內容按模塊分類,避免步驟詞匯,直接提供可操作的方法:

基礎連接與隊列聲明

使用Python的pika庫建立連接并聲明隊列:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')  # 聲明持久化隊列可添加參數 durable=True

消息發布示例:

channel.basic_publish(exchange='',routing_key='hello',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))  # 消息持久化


工作隊列模式

消費者端的公平分發(prefetch)設置:

channel.basic_qos(prefetch_count=1)  # 每次只處理一條消息
channel.basic_consume(queue='task_queue', on_message_callback=callback)

消息確認機制:

def callback(ch, method, properties, body):print("Received %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動確認


發布/訂閱模式

聲明扇形交換機:

channel.exchange_declare(exchange='logs', exchange_type='fanout')

臨時隊列綁定:

result = channel.queue_declare(queue='', exclusive=True)
channel.queue_bind(exchange='logs', queue=result.method.queue)

路由與主題模式

直連交換機實現路由鍵過濾:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

主題交換機匹配模式:

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.critical')

RPC遠程調用

客戶端發送請求并監聽回調隊列:

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queuechannel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to=callback_queue,correlation_id=corr_id,),body=request_body)

服務端響應處理:

def on_request(ch, method, props, body):response = process_request(body)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)


消息TTL與死信隊列

設置消息過期時間:

channel.basic_publish(exchange='',routing_key='queue_with_ttl',body='expiring message',properties=pika.BasicProperties(expiration='60000'))  # 60秒后過期

配置死信交換機:

args = {"x-dead-letter-exchange": "dlx_exchange"}
channel.queue_declare(queue='work_queue', arguments=args)


插件與擴展

延遲消息通過插件實現:

headers = {"x-delay": 5000}  # 延遲5秒
channel.basic_publish(exchange='delayed_exchange',routing_key='delayed_queue',body='delayed msg',properties=pika.BasicProperties(headers=headers))

集群與高可用

配置鏡像隊列策略:

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'  # 同步所有節點

客戶端連接多個節點:

hosts = [{'host': 'node1'}, {'host': 'node2'}]
connection = pika.BlockingConnection(pika.ConnectionParameters(hosts))

監控與管理

通過API獲取隊列狀態:

import requests
response = requests.get('http://localhost:15672/api/queues', auth=('guest', 'guest'))

以上示例覆蓋了RabbitMQ的核心功能場景,包括消息模式、可靠性保證和擴展功能。實際應用時需根據業務需求調整參數和錯誤處理機制。更多細節可參考RabbitMQ官方文檔和社區最佳實踐。

RabbitMQ 源碼分析(C++ 客戶端)

RabbitMQ 是一個開源的消息代理軟件,采用 Erlang 語言編寫。C++ 客戶端是 RabbitMQ 官方提供的客戶端庫之一,用于在 C++ 項目中與 RabbitMQ 服務器進行交互。以下是對 RabbitMQ C++ 客戶端源碼的分析框架和關鍵點。

源碼結構與核心模塊

RabbitMQ C++ 客戶端的源碼主要包含以下幾個核心模塊:

AMQP 協議實現
C++ 客戶端基于 AMQP 0-9-1 協議實現,核心代碼位于 amqp.hamqp.c 中。這部分負責協議的編解碼、幀的構造與解析。

Socket 通信層
使用系統套接字(Socket)實現與 RabbitMQ 服務器的 TCP 通信。代碼位于 amqp_socket.hamqp_socket.c,支持普通 Socket 和 SSL/TLS 加密通信。

連接管理
amqp_connection.h 中定義了連接的生命周期管理,包括連接建立、心跳檢測、連接關閉等邏輯。

通道管理
通過 amqp_channel.h 實現多路復用機制,單個 TCP 連接可以支持多個邏輯通道(Channel),每個通道獨立處理消息。

消息發布與消費
amqp_queue.hamqp_exchange.h 實現了隊列和交換器的聲明、綁定操作。消息的發布(basic_publish)和消費(basic_consume)邏輯在 amqp_basic.h 中定義。


關鍵流程分析

連接建立流程

  1. 調用 amqp_new_connection 創建連接對象。
  2. 通過 amqp_socket_open 建立 Socket 連接。
  3. 發送協議頭并協商參數(amqp_login)。

消息發布流程

  1. 聲明交換器(amqp_exchange_declare)。
  2. 構造消息屬性(amqp_basic_properties_t)。
  3. 調用 amqp_basic_publish 發送消息幀。

消息消費流程

  1. 聲明隊列并綁定到交換器(amqp_queue_bind)。
  2. 調用 amqp_basic_consume 訂閱隊列。
  3. 通過 amqp_consume_message 循環接收消息。

核心數據結構

幀(Frame)結構
RabbitMQ 使用幀作為通信的基本單位,幀類型包括:

  • METHOD:傳遞 AMQP 方法(如 queue.declare)。
  • HEADER:消息屬性。
  • BODY:消息內容。

連接狀態機
通過 amqp_connection_state_t 管理連接狀態,包括協議版本、通道列表、心跳超時等。


示例代碼片段

以下是使用 C++ 客戶端發布消息的簡化代碼:

#include <amqp.h>
#include <amqp_tcp_socket.h>void publish_message() {amqp_connection_state_t conn = amqp_new_connection();amqp_socket_t *socket = amqp_tcp_socket_new(conn);amqp_socket_open(socket, "localhost", 5672);amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");amqp_channel_open(conn, 1);amqp_basic_publish(conn, 1, amqp_cstring_bytes("exchange_name"),amqp_cstring_bytes("routing_key"), 0, 0, NULL,amqp_cstring_bytes("Hello, RabbitMQ!"));amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);amqp_connection_close(conn, AMQP_REPLY_SUCCESS);amqp_destroy_connection(conn);
}


性能優化與調試

連接復用
通過通道復用 TCP 連接,減少頻繁建立連接的開銷。

異步模式
使用 amqp_consume_message 的非阻塞模式,結合事件循環實現高吞吐量。

調試工具

  • 啟用 AMQP_DEBUG 宏打印協議幀日志。
  • 使用 Wireshark 抓包分析 AMQP 流量。

參考資源

  1. RabbitMQ 官方文檔
  2. AMQP 0-9-1 協議規范
  3. rabbitmq-c 源碼倉庫

通過分析源碼,可以更深入地理解 RabbitMQ 的通信機制和設計思想,便于定制化開發或性能調優。

基于Python和RabbitMQ的基礎連接與隊列

以下是基于Python和RabbitMQ的基礎連接與隊列聲明的實例代碼示例,涵蓋不同場景和參數配置:


基礎連接與簡單隊列聲明

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
connection.close()

聲明持久化隊列

channel.queue_declare(queue='durable_queue', durable=True)


聲明帶自定義參數的隊列

channel.queue_declare(queue='custom_queue', arguments={'x-message-ttl': 60000})


聲明排他隊列

channel.queue_declare(queue='exclusive_queue', exclusive=True)


聲明自動刪除隊列

channel.queue_declare(queue='auto_delete_queue', auto_delete=True)


聲明隊列并綁定死信交換器

args = {'x-dead-letter-exchange': 'dlx_exchange'}
channel.queue_declare(queue='with_dlx', arguments=args)


聲明優先級隊列

channel.queue_declare(queue='priority_queue', arguments={'x-max-priority': 10})


聲明延遲隊列(通過插件)

channel.queue_declare(queue='delayed_queue', arguments={'x-delayed-type': 'direct'})


聲明長度限制隊列

channel.queue_declare(queue='length_limited', arguments={'x-max-length': 1000})


聲明多消費者隊列

channel.queue_declare(queue='multi_consumer', durable=True)
channel.basic_qos(prefetch_count=1)  # 公平分發

使用SSL加密連接

ssl_params = pika.SSLOptions(ssl.SSLContext(), 'localhost')
connection = pika.BlockingConnection(pika.ConnectionParameters(port=5671, ssl_options=ssl_params))

連接集群節點

params = pika.ConnectionParameters(host='node1.cluster')
connection = pika.BlockingConnection(params)

使用連接池

from rabbitmq import ConnectionPool
pool = ConnectionPool(pika.ConnectionParameters, max_size=10)

聲明隊列并返回隊列信息

result = channel.queue_declare(queue='passive_queue', passive=True)
print(f"Queue has {result.method.message_count} messages")

聲明無參數默認隊列

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

使用異步連接適配器

import pika
connection = pika.SelectConnection(pika.ConnectionParameters('localhost'))

連接失敗重試機制

parameters = pika.ConnectionParameters(host='localhost',connection_attempts=5,retry_delay=3
)

聲明隊列并綁定多個路由鍵

channel.queue_declare(queue='multi_binding')
channel.queue_bind(exchange='logs', queue='multi_binding', routing_key='info')
channel.queue_bind(exchange='logs', queue='multi_binding', routing_key='error')

使用URL參數連接

url = 'amqp://user:pass@host:port/vhost'
parameters = pika.URLParameters(url)
connection = pika.BlockingConnection(parameters)

聲明隊列時設置TTL

args = {'x-expires': 3600000}  # 1小時后自動刪除
channel.queue_declare(queue='temp_queue', arguments=args)

注意事項

  • 所有示例需預先安裝pika庫:pip install pika
  • 生產環境建議添加異常處理邏輯
  • 連接參數應根據實際RabbitMQ服務器配置調整
  • 隊列屬性(如durable)需與已存在隊列的屬性一致,否則會報錯

每個示例均可獨立運行,建議根據實際需求組合使用不同參數。

RabbitMQ 工作隊列模式與 prefetch 設置

RabbitMQ 的工作隊列模式(Work Queue)用于在多個消費者之間分發任務。通過 prefetch 參數可以控制消費者未確認消息的最大數量,從而優化任務分配效率。

Python 實現工作隊列模式

安裝 RabbitMQ 的 Python 客戶端庫:

pip install pika

生產者代碼示例(發送任務):

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)for i in range(10):message = f"Task {i}"channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化)print(f" [x] Sent {message}")connection.close()

消費者代碼示例(處理任務):

import pika
import timedef callback(ch, method, properties, body):print(f" [x] Received {body.decode()}")time.sleep(body.count(b'.'))  # 模擬耗時任務print(" [x] Done")ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動確認connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # 關鍵 prefetch 設置
channel.basic_consume(queue='task_queue', on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

prefetch 參數詳解

prefetch_count=1 表示每個消費者最多只能有一個未確認的消息。該設置能實現:

  • 公平調度:避免某個消費者積壓大量消息,而其他消費者空閑
  • 負載均衡:新任務會自動分配給空閑的消費者
  • 流量控制:防止消費者過載

對于需要更高吞吐量的場景,可以適當增大 prefetch_count 值,但需注意:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/92576.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/92576.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/92576.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

量子生成對抗網絡:量子計算與生成模型的融合革命

引言&#xff1a;當生成對抗網絡遇上量子計算在人工智能與量子計算雙重浪潮的交匯處&#xff0c;量子生成對抗網絡&#xff08;Quantum Generative Adversarial Networks, QGAN&#xff09;正成為突破經典算力瓶頸的關鍵技術。傳統生成對抗網絡&#xff08;GAN&#xff09;在圖…

VBA 多個選項,將選中的選項錄入當前選中的單元格

1、使用LISTBOX插件&#xff0c;選中后回車錄入 維護好數據&#xff0c;并新增一個activeX列表框插件 Private Sub Worksheet_SelectionChange(ByVal Target As Range)If Target.Count > 1 Then Exit SubIf Target.Row > 2 And Target.Row < 10 And Target.Column 2…

【NLP輿情分析】基于python微博輿情分析可視化系統(flask+pandas+echarts) 視頻教程 - 主頁-微博點贊量Top6實現

大家好&#xff0c;我是java1234_小鋒老師&#xff0c;最近寫了一套【NLP輿情分析】基于python微博輿情分析可視化系統(flaskpandasecharts)視頻教程&#xff0c;持續更新中&#xff0c;計劃月底更新完&#xff0c;感謝支持。今天講解主頁-微博點贊量Top6實現 視頻在線地址&…

SAP調用外部API

SAP需求將中文字符轉化為對應的拼音具體思路,由于sap中沒有將中文字符轉化為拼音的函數或方法類,則以http請求訪問外部服務器發布的API服務,然后獲取其返回值即可1.調用外部網站上提供的api缺點:免費次數有限,后需要充值這里是用www格式的json報文*&----------------------…

(12)機器學習小白入門YOLOv:YOLOv8-cls 模型微調實操

YOLOv8-cls 模型微調實操 (1)機器學習小白入門YOLOv &#xff1a;從概念到實踐 (2)機器學習小白入門 YOLOv&#xff1a;從模塊優化到工程部署 (3)機器學習小白入門 YOLOv&#xff1a; 解鎖圖片分類新技能 (4)機器學習小白入門YOLOv &#xff1a;圖片標注實操手冊 (5)機器學習小…

基于Matlab傳統圖像處理技術的車輛車型識別與分類方法研究

隨著計算機視覺和圖像處理技術的發展&#xff0c;車輛檢測與識別已經成為智能交通系統中的一個重要研究方向。傳統圖像處理方法通過對圖像進行預處理、特征提取、分類與識別&#xff0c;提供了一種無需復雜深度學習模型的解決方案。本研究基于MATLAB平臺&#xff0c;采用傳統圖…

未來趨勢:LeafletJS 與 Web3/AI 的融合

引言 LeafletJS 作為一個輕量、靈活的 JavaScript 地圖庫&#xff0c;以其模塊化設計和高效渲染能力在 Web 地圖開發中占據重要地位。隨著 Web3 和人工智能&#xff08;AI&#xff09;的興起&#xff0c;地圖應用的開發范式正在發生變革。Web3 技術&#xff08;如區塊鏈、去中…

Spring AI 系列之二十一 - EmbeddingModel

之前做個幾個大模型的應用&#xff0c;都是使用Python語言&#xff0c;后來有一個項目使用了Java&#xff0c;并使用了Spring AI框架。隨著Spring AI不斷地完善&#xff0c;最近它發布了1.0正式版&#xff0c;意味著它已經能很好的作為企業級生產環境的使用。對于Java開發者來說…

LFU算法及優化

繼上一篇的LRU算法的實現和講解&#xff0c;這一篇來講述LFU最近使用頻率高的數據很大概率將會再次被使用,而最近使用頻率低的數據,將來大概率不會再使用。做法&#xff1a;把使用頻率最小的數據置換出去。這種算法更多是從使用頻率的角度&#xff08;但是當緩存滿時&#xff0…

關于原車一鍵啟動升級手機控車的核心信息及注意事項

想知道如何給原車已經有一鍵啟動功能的車輛加裝手機遠程啟動。這是個很實用的汽車改裝需求&#xff0c;尤其適合想在冬天提前熱車、夏天提前開空調的車主。一、適配方案與核心功能 ?升級專車專用4G手機控車模塊?&#xff0c;推薦安裝「移動管家YD361-3」系統&#xff0c;該方…

數據結構與算法:類C語言有關操作補充

數據結構與算法:類C語言操作補充 作為老師,我將詳細講解類C語言(如C或C++)中的關鍵操作,包括動態內存分配和參數傳遞。這些內容在數據結構與算法中至關重要,例如在實現動態數組、鏈表或高效函數調用時。我會用通俗易懂的語言和代碼示例逐步解釋,確保你輕松掌握。內容基…

Go 并發(協程,通道,鎖,協程控制)

一.協程&#xff08;Goroutine&#xff09;并發&#xff1a;指程序能夠同時執行多個任務的能力&#xff0c;多線程程序在一個核的cpu上運行&#xff0c;就是并發。并行&#xff1a;多線程程序在多個核的cpu上運行&#xff0c;就是并行。并發主要由切換時間片來實現"同時&q…

圖機器學習(15)——鏈接預測在社交網絡分析中的應用

圖機器學習&#xff08;15&#xff09;——鏈接預測在社交網絡分析中的應用0. 鏈接預測1. 數據處理2. 基于 node2vec 的鏈路預測3. 基于 GraphSAGE 的鏈接預測3.1 無特征方法3.2 引入節點特征4. 用于鏈接預測的手工特征5. 結果對比0. 鏈接預測 如今&#xff0c;社交媒體已成為…

每日一算:華為-批薩分配問題

題目描述"吃貨"和"饞嘴"兩人到披薩店點了一份鐵盤&#xff08;圓形&#xff09;披薩&#xff0c;并囑咐店員將披薩按放射狀切成大小相同的偶數個小塊。但是粗心的服務員將披薩切成了每塊大小都完全不同的奇數塊&#xff0c;且肉眼能分辨出大小。由于兩人都…

Transfusion,Show-o and Show-o2論文解讀

目錄 一、Transfusion 1、概述 2、方法 二、Show-o 1、概述 2、方法 3、訓練 三、Show-o2 1、概述 2、模型架構 3、訓練方法 4、實驗 一、Transfusion 1、概述 Transfusion模型應該是Show系列&#xff0c;Emu系列的前傳&#xff0c;首次將文本和圖像生成統一到單…

聊聊 Flutter 在 iOS 真機 Debug 運行出現 Timed out *** to update 的問題

最近剛好有人在問&#xff0c;他的 Flutter 項目在升級之后出現 Error starting debug session in Xcode: Timed out waiting for CONFIGURATION_BUILD_DIR to update 問題&#xff0c;也就是真機 Debug 時始終運行不了的問題&#xff1a; 其實這已經是一個老問題了&#xff0c…

《R for Data Science (2e)》免費中文翻譯 (第1章) --- Data visualization(2)

寫在前面 本系列推文為《R for Data Science (2)》的中文翻譯版本。所有內容都通過開源免費的方式上傳至Github&#xff0c;歡迎大家參與貢獻&#xff0c;詳細信息見&#xff1a; Books-zh-cn 項目介紹&#xff1a; Books-zh-cn&#xff1a;開源免費的中文書籍社區 r4ds-zh-cn …

【機器學習【9】】評估算法:數據集劃分與算法泛化能力評估

文章目錄一、 數據集劃分&#xff1a;訓練集與評估集二、 K 折交叉驗證&#xff1a;提升評估可靠性1. 基本原理1.1. K折交叉驗證基本原理1.2. 邏輯回歸算法與L22. 基于K折交叉驗證L2算法三、棄一交叉驗證&#xff08;Leave-One-Out&#xff09;1、基本原理2、代碼實現四、Shuff…

CodeBuddy三大利器:Craft智能體、MCP協議和DeepSeek V3,編程效率提升的秘訣:我的CodeBuddy升級體驗之旅(個性化推薦微服務系統)

&#x1f31f; 嗨&#xff0c;我是Lethehong&#xff01;&#x1f31f; &#x1f30d; 立志在堅不欲說&#xff0c;成功在久不在速&#x1f30d; &#x1f680; 歡迎關注&#xff1a;&#x1f44d;點贊??留言收藏&#x1f680; &#x1f340;歡迎使用&#xff1a;小智初學計…

Spring Boot 整合 Redis 實現發布/訂閱(含ACK機制 - 事件驅動方案)

Spring Boot整合Redis實現發布/訂閱&#xff08;含ACK機制&#xff09;全流程一、整體架構二、實現步驟步驟1&#xff1a;添加Maven依賴<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter…