VEC系列-RabbitMQ 入門筆記

消息隊列(MQ)對于開發者來說是一個經常聽到的詞匯,但在實際開發中,大多數人并不會真正用到它。網上已經有很多關于 MQ 概述和原理的詳細講解,官網文檔和技術博客也都介紹得很深入,因此,我在這里就不再贅述。

我一直認為,學習一項技術不僅要知道它是什么,更重要的是知道怎么用,以及在哪些場景下應該用。所以這篇文章主要就是站在一個新手的角度進行描述以及實現MQ的實際運用。

使用MQ的常見情景

  1. 系統解耦:比如電商系統,訂單系統 → 庫存系統 → 物流系統?訂單系統發送“新訂單”消息到 MQ,庫存系統和物流系統各自訂閱處理。即使庫存系統或物流系統短暫不可用,消息仍然可以暫存,系統整體不會受影響。這一方面說實話不是架構師也沒必要太過關注,畢竟系統的底層普通開發也沒這個資格去搭建。只是用于了解,不要因為這段話阻攔學習的腳步。

  2. 流量削峰,降低并發:這個比較好理解,也是最能遇到的情況。用戶請求先進入 MQ 隊列,由后臺的消費端按照數據庫的最大承載能力逐步處理請求。確保數據庫不會被瞬間壓垮,提高系統穩定性。還是電商系統常用些。

  3. 異步任務處理:郵件、短信、推送通知,日志處理等。

理論上MQ能做的不止這些,拋磚引玉,一起深入學習吧。

對MQ進行拆分理解

MQ里常說生產者,消費者等。我會通過簡單的例子來描述:

  • 生產者:一個游戲,我是GM,我要發送公告,玩家分為普通玩家和VIP玩家等。在這里,發布公告的人就是消息的生產者。應該很好理解嗷?

  • 交換器:如上述,有普通玩家和VIP玩家等,我的公告在普通玩家面前必然是拽的很啊,但是VIP玩家面前還是要舔下的……那么我會發布一條給普通玩家的消息,和一條給VIP玩家的消息。交換器的作用在我看來就是消息的承載體,類似一條運輸船,負責把消息運輸給玩家們。產生消息的地方很多,但是交換器不用關心是誰發布了消息,他只承載你的消息。

  • 隊列:如上述,有了運輸船。那么隊列有點像是碼頭了。普通玩家進普通碼頭,VIP玩家進黃金碼頭。各自碼頭停泊各自的船。總不會在普通碼頭取出黃金碼頭的貨哦?

補充:交換器是有類型的:Direct(直連交換器)Fanout(扇形交換器)Topic(主題交換器)Headers(頭交換器)

概念不多說了。比較常用的是Direct,Fanout

Direct:通過路由鍵進行匹配,運輸船是一艘,但是分為普通區和VIP區,玩家憑借船票(路由鍵)進行取貨(取消息)

Fanout:只要是是綁定了某個交換器的隊列都能進行取貨。玩家進普通碼頭就拿普通貨,進黃金碼頭拿黃金貨。當然這是舉例子,玩家的隊列還是要看你如何分配的。

  • 消費者:說了這么多,玩家就是消費者嗷。

MQ代碼演示?

最新代碼是通過 事件總線?來跨方法傳遞信息和觸發動作。通過發布和訂閱事件,模塊之間能夠解耦通信,使得事件的發布和處理不再依賴于直接調用方法的方式,而是通過事件總線進行跨模塊、跨方法的異步傳遞。這種方式提高了系統的靈活性和擴展性,同時保持了模塊之間的松耦合。

長代碼警告,有興趣可以fork倉庫進行實際操練?VerEasy.Core

必要的知識點大致如此,通過代碼+注釋的形式來演示更好理解。

我這里是NETCore項目,所以還是接口的形式方便依賴注入。

接口部分代碼

    public interface IRabbitMQPersistentConnection{/// <summary>/// 是否已經連接:判斷MQ是否是連接狀態/// </summary>bool IsConnected { get; }/// <summary>/// 嘗試連接:斷連重連方法/// </summary>/// <returns></returns>Task<bool> TryConnectAsync();/// <summary>/// 唯一通道:發布通道可以隨時關閉,消費通道需要保持打開狀態,否則無法進行消費。/// </summary>IChannel Channel { get; }/// <summary>/// 唯一連接:同理,一個連接可以有N個通道,無需建立過多連接。/// </summary>IConnection Connection { get; }/// <summary>/// 釋放/// </summary>/// <returns></returns>Task DisposeAsync();/// <summary>/// 發布:發布消息/// </summary>/// <param name="msg"></param>/// <param name="exChangeName"></param>/// <param name="routeKey"></param>/// <param name="type"></param>/// <returns></returns>Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);/// <summary>/// 訂閱:訂閱隊列。/// </summary>/// <returns></returns>Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);}

?接口實現部分代碼

    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection{//構造函數注入,獲取MQ的地址賬號密碼端口,如果不傳就用我默認配置的。public RabbitMQPersistentConnection(IConnectionFactory? connectionFactory = null, int retryCount = 5){_connectionFactory = connectionFactory ?? new ConnectionFactory{HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()};//使用Policy進行重連,這個是重連次數=5_retryCount = retryCount;}//私有變量,獲取連接成功時創建的Mq通道。private IChannel _channel = default!;public IChannel Channel{get{return _channel;}}/// <summary>/// RabbitMQ 連接工廠/// </summary>private readonly IConnectionFactory _connectionFactory;/// <summary>/// 私有變量 RabbitMQ 連接上下文/// </summary>private IConnection _connection = default!;/// <summary>/// 重連次數/// </summary>private readonly int _retryCount;/// <summary>/// 標志是否已釋放/// </summary>private bool _disposed;/// <summary>/// 是否有效連接/// </summary>public bool IsConnected{get{return _connection != null && _connection.IsOpen && !_disposed;}}public IConnection Connection{get{return _connection;}}/// <summary>/// 手動釋放/// </summary>/// <returns></returns>public async Task DisposeAsync(){if (_disposed) return;_disposed = true;try{await _connection.DisposeAsync();}catch (IOException ex){Console.WriteLine(ex.Message);}}/// <summary>/// 重連機制/// </summary>/// <returns></returns>public async Task<bool> TryConnectAsync(){var policy = Policy.Handle<SocketException>()//捕獲連接異常.Or<BrokerUnreachableException>()//無法連接異常.WaitAndRetryAsync(_retryCount, x =>TimeSpan.FromSeconds(Math.Pow(2, x)), (ex, time) =>{//日志});try{await policy.ExecuteAsync(async () =>{//重建連接【賦值給私有化變量,通過get同步給接口里的Connection和Channel】_connection = await _connectionFactory.CreateConnectionAsync();_channel = await _connection.CreateChannelAsync();});//如果連接成功if (IsConnected){// 連接成功后,注冊連接關閉、異常、阻塞的事件處理程序_connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;_connection.CallbackExceptionAsync += OnCallbackExceptionAsync;_connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;return true;}else{return false;}}catch (Exception ex){Console.WriteLine($"重連失敗,最終拋出異常: {ex.Message}");return false;}}private async Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e){if (_disposed) return;Console.WriteLine("RabbitMQ連接關閉,正在嘗試重連...");await TryConnectAsync();}private async Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e){if (_disposed) return;Console.WriteLine($"RabbitMQ連接出現異常,正在嘗試重連... 異常信息: {e.Exception.Message}");await TryConnectAsync();}private async Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e){if (_disposed) return;Console.WriteLine("RabbitMQ連接被阻塞,正在嘗試重連...");await TryConnectAsync();}//發布消息public async Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout){//判斷是否連接狀態,沒有連接就重連if (!IsConnected){await TryConnectAsync();}//創建通道,因為是發布消息,通道不用常打開,所以使用了USINGusing var channel = await _connection.CreateChannelAsync();//【ExchangeDeclareAsync】聲明交換機,exchange:交換機名稱,type:交換機類型await channel.ExchangeDeclareAsync(exchange: exChangeName, type: type);//msg就是消息,需要傳遞Byte[]var body = Encoding.UTF8.GetBytes(msg);//啟動消息持久化,我的項目里使用MQ來進行公告的推送,使用的Fanout類型交換機,故此消息保持持久化。var properties = new BasicProperties(){Persistent = true,};//發布消息await channel.BasicPublishAsync(exchange: exChangeName,routingKey: routeKey,mandatory: false,basicProperties: properties,body: body);}//訂閱消息public async Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout){if (!IsConnected){await TryConnectAsync();}//【queue】隊列string queueName = string.IsNullOrWhiteSpace(routeKey) ? exChangeName : routeKey;//【durable】持久化隊列,MQ服務器不會刪除它。QueueDeclareOk queueDeclareResult = await Channel.QueueDeclareAsync(queue: queueName,durable: true,exclusive: false,autoDelete: false);//根據queue,exchange,routingKey 對 交換機和隊列進行綁定,如果是Fanout類型不需要routeKey。await Channel.QueueBindAsync(queue: queueName, exchange: exChangeName, routingKey: routeKey);//創建消費者var consumer = new AsyncEventingBasicConsumer(Channel);//消費者消費后執行方法consumer.ReceivedAsync += async (model, ea) =>{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);//確認消息已被消費,這樣后續該消息就不會被該隊列繼續消費到了。await Channel.BasicAckAsync(ea.DeliveryTag, multiple: false);};//啟動消費者隊列,將消費者和隊列綁定await Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer);}}

?

MQ服務注入

? ? ? ? ? ? if (Appsettings.AppStr("RabbitMQ:Enable").ObjToBool()){services.AddSingleton<IRabbitMQPersistentConnection>(x =>{var connectionFactory = new ConnectionFactory(){HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()};var mq = new RabbitMQPersistentConnection(connectionFactory);return mq;});}

?


我在注入各種服務時,添加了一些日志進行輸出,效果如下:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/897127.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/897127.shtml
英文地址,請注明出處:http://en.pswp.cn/news/897127.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

js中??是什么意思

在 JavaScript 中&#xff0c;?? 是一個邏輯運算符&#xff0c;稱為 空值合并運算符&#xff08;Nullish Coalescing Operator&#xff09;。它用于檢查左側的值是否為 null 或 undefined&#xff0c;如果是&#xff0c;則返回右側的值&#xff1b;否則返回左側的值。 語法 …

常見限流算法

限流是指在高并發、大流量請求的情況下&#xff0c;限制新的流量對系統的訪問&#xff0c;以保證系統服務的安全性。常見的限流算法及其詳細介紹如下&#xff1a; 計數器算法&#xff08;Fixed Window Counter&#xff09; 原理&#xff1a;使用一個固定時間窗口內的計數器來…

YOLOv12本地部署教程——42%速度提升,讓高效目標檢測觸手可及

YOLOv12 是“你只看一次”&#xff08;You Only Look Once, YOLO&#xff09;系列的最新版本&#xff0c;于 2025 年 2 月發布。它引入了注意力機制&#xff0c;提升了檢測精度&#xff0c;同時保持了高效的實時性能。在保持速度的同時&#xff0c;顯著提升了檢測精度。例如&am…

【原創】C# HttpClient 讀取流數據的問題

默認情況下HttpClient中有緩存&#xff0c;在讀取流數據的時候&#xff0c;往往要等一小會兒&#xff0c;然后讀出一大堆。 我們在請求OpenAI類的大模型的時候&#xff0c;往往要一邊讀取一邊顯示&#xff08;輸出&#xff09;&#xff0c;這時候需要禁止HttpClient 中內置的緩…

能源行業標桿:信創系統在智能電網中的3個創新應用案例

在當今數字化浪潮洶涌澎湃的時代&#xff0c;信息技術應用創新&#xff08;信創&#xff09;已成為推動我國經濟社會發展的重要引擎。智能電網作為能源行業的核心領域&#xff0c;其信息化建設對于保障國家能源安全和促進能源轉型具有重要意義。今天&#xff0c;讓我們一同探索…

AcWing 藍橋杯集訓·每日一題2025·5526. 平衡細菌

5526. 平衡細菌 題意 給定一個序列 ( a i ) (a_i) (ai?)&#xff0c;每次操作可以選擇一個位置 (p)&#xff0c;令從 ( a p ) (a_p) (ap?) 開始的每個數都加上一個以 (1) 或者 (-1) 為公差的從 ( 1 / ? 1 ) (1 / -1) (1/?1) 開始的等差數列。求最小化讓序列歸零的操作…

PTA 7-6 列出連通集

題目詳情&#xff1a; 給定一個有 n 個頂點和 m 條邊的無向圖&#xff0c;請用深度優先遍歷&#xff08;DFS&#xff09;和廣度優先遍歷&#xff08;BFS&#xff09;分別列出其所有的連通集。假設頂點從 0 到 n?1 編號。進行搜索時&#xff0c;假設我們總是從編號最小的頂點出…

ES中數據刷新策略refresh

在 Elasticsearch 中&#xff0c;插入數據時的 refresh 參數控制文檔在寫入后何時對搜索可見&#xff0c;其行為直接影響數據可見性和系統性能。以下是 refresh 參數的三個可選值&#xff08;true、false、wait_for&#xff09;的詳細說明及適用場景&#xff1a; 1. refreshtr…

用Python的Pandas庫解鎖數據科學:從入門到實戰

用Python的Pandas庫解鎖數據科學&#xff1a;從入門到實戰 引言 Python的Pandas庫&#xff08;名稱源自"Panel Data"&#xff09;作為數據科學生態系統的基石&#xff0c;憑借其強大的數據結構和靈活的操作功能&#xff0c;已成為全球超過90%數據工作者的首選工具。…

如何提高域名解析速度?

在搭建網站或使用在線服務時&#xff0c;許多人會問&#xff1a;“為什么我的網站加載速度這么慢?”“如何提高域名解析速度?”“域名解析速度對網站性能有什么影響?”域名解析速度直接影響用戶訪問網站的體驗&#xff0c;因此&#xff0c;了解如何提高域名解析速度尤為重要…

深度學習語義分割數據集全景解析

一、語義分割任務概述 語義分割是計算機視覺領域的核心任務之一&#xff0c;目標是通過算法將圖像中的每個像素精準劃分到對應的語義類別&#xff08;如道路、車輛、行人等&#xff09;。高質量標注數據集是推動該領域發展的關鍵因素。本文將系統梳理主流數據集的技術特征與適…

貪心算法一

> 作者&#xff1a;?舊言~ > 座右銘&#xff1a;松樹千年終是朽&#xff0c;槿花一日自為榮。 > 目標&#xff1a;了解什么是貪心算法&#xff0c;并且掌握貪心算法。 > 毒雞湯&#xff1a;有些事情&#xff0c;總是不明白&#xff0c;所以我不會堅持。早安! >…

基于websocket的多用戶網頁五子棋 --- 測試報告

目錄 功能測試自動化測試性能測試 功能測試 1.登錄注冊頁面 2.游戲大廳頁面 3.游戲房間頁面 自動化測試 1.使用腦圖編寫web自動化測試用例 2.創建自動化項目&#xff0c;根據用例通過selenium來實現腳本 根據腦圖進行測試用例的編寫&#xff1a; 每個頁面一個測試類&am…

docker學習與使用

一、docker概述 1.docker是什么 是一個開源的應用容器引擎&#xff0c;基于go語言開發并遵循apache2.0協議開源 是在Linux容器里運行應用的開源工具 是一種輕量級的 “虛擬機” Docker的容器技術,可以在一臺主機上輕松為任何應用創建一個輕量級的、可移植的、自給自足的容器…

2025-03-04 學習記錄--C/C++-C語言 判斷是否是素數

合抱之木&#xff0c;生于毫末&#xff1b;九層之臺&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。&#x1f4aa;&#x1f3fb; C語言 判斷是否是素數 一、代碼 ?? #include <stdio.h> #include <stdbool.h> // 使用 bool 類型// 判斷是否是…

如何將飛書多維表格與DeepSeek R1結合使用:效率提升的完美搭檔

將飛書的多維表格與DeepSeek R1結合使用&#xff0c;就像為你的數據管理和分析之旅裝上一臺渦輪增壓器。兩者的合作&#xff0c;不僅僅在速度上讓人耳目一新&#xff0c;更是將智能化分析帶入了日常的工作場景。以下是它們如何相輔相成并改變我們工作方式的一些分享。 --- 在…

離散傅里葉變換(Discrete Fourier Transform, DFT)及其在圖像處理中的應用

離散傅里葉變換&#xff08;DFT&#xff09;及其在圖像處理中的應用 什么是離散傅里葉變換&#xff1f; 離散傅里葉變換&#xff08;Discrete Fourier Transform, DFT&#xff09;是一種強大的數學工具&#xff0c;用于將離散信號從時域&#xff08;或空間域&#xff09;轉換…

在 macOS 上使用 CLion 進行 Google Test 單元測試

介紹 Google Test&#xff08;GTest&#xff09;是 Google 開源的 C 單元測試框架&#xff0c;它提供了簡單易用的斷言、測試夾具&#xff08;Fixtures&#xff09;和測試運行機制&#xff0c;使 C 開發者能夠編寫高效的單元測試。 本博客將介紹如何在 macOS 上使用 CLion 配…

Oracle SQL優化實戰要點解析(11)——索引、相關子查詢及NL操作(1)

11.1. 充分利用索引有序特性,避免發生大表上的FTS,以及對中間大數據集的排序。 11.1.1. 適用場景 從一個或多個大表(例如:億行級或TB級數據量)中過濾出全列大數據集(例如:數百萬或千萬行數據),對該大數據集按其中某列進行排序,最終,只取最前面的少部分數據(例如:…

軟考架構師筆記-計算機網絡

1.9 計算機網絡 OSI/RM 七層模型 物理層 二進制傳輸(中繼器、集線器) (typedef) 數據鏈路層 傳送以幀為單位的信息(網橋、交換機、網卡) 網絡層 分組傳輸和路由選擇(三層交換機、路由器)ARP/RARP/IGMP/ICMP/IP 傳輸層 端到端的連接(TCP/UDP)在前向糾錯系統中&#xff0c;當接…