消息隊列(MQ)對于開發者來說是一個經常聽到的詞匯,但在實際開發中,大多數人并不會真正用到它。網上已經有很多關于 MQ 概述和原理的詳細講解,官網文檔和技術博客也都介紹得很深入,因此,我在這里就不再贅述。
我一直認為,學習一項技術不僅要知道它是什么,更重要的是知道怎么用,以及在哪些場景下應該用。所以這篇文章主要就是站在一個新手的角度進行描述以及實現MQ的實際運用。
使用MQ的常見情景
-
系統解耦:比如電商系統,訂單系統 → 庫存系統 → 物流系統?訂單系統發送“新訂單”消息到 MQ,庫存系統和物流系統各自訂閱處理。即使庫存系統或物流系統短暫不可用,消息仍然可以暫存,系統整體不會受影響。這一方面說實話不是架構師也沒必要太過關注,畢竟系統的底層普通開發也沒這個資格去搭建。只是用于了解,不要因為這段話阻攔學習的腳步。
-
流量削峰,降低并發:這個比較好理解,也是最能遇到的情況。用戶請求先進入 MQ 隊列,由后臺的消費端按照數據庫的最大承載能力逐步處理請求。確保數據庫不會被瞬間壓垮,提高系統穩定性。還是電商系統常用些。
-
異步任務處理:郵件、短信、推送通知,日志處理等。
理論上MQ能做的不止這些,拋磚引玉,一起深入學習吧。
對MQ進行拆分理解
MQ里常說生產者,消費者等。我會通過簡單的例子來描述:
-
生產者:一個游戲,我是GM,我要發送公告,玩家分為普通玩家和VIP玩家等。在這里,發布公告的人就是消息的生產者。應該很好理解嗷?
-
交換器:如上述,有普通玩家和VIP玩家等,我的公告在普通玩家面前必然是拽的很啊,但是VIP玩家面前還是要舔下的……那么我會發布一條給普通玩家的消息,和一條給VIP玩家的消息。交換器的作用在我看來就是消息的承載體,類似一條運輸船,負責把消息運輸給玩家們。產生消息的地方很多,但是交換器不用關心是誰發布了消息,他只承載你的消息。
-
隊列:如上述,有了運輸船。那么隊列有點像是碼頭了。普通玩家進普通碼頭,VIP玩家進黃金碼頭。各自碼頭停泊各自的船。總不會在普通碼頭取出黃金碼頭的貨哦?
補充:交換器是有類型的:Direct(直連交換器)Fanout(扇形交換器)Topic(主題交換器)Headers(頭交換器)
概念不多說了。比較常用的是Direct,Fanout
Direct:通過路由鍵進行匹配,運輸船是一艘,但是分為普通區和VIP區,玩家憑借船票(路由鍵)進行取貨(取消息)
Fanout:只要是是綁定了某個交換器的隊列都能進行取貨。玩家進普通碼頭就拿普通貨,進黃金碼頭拿黃金貨。當然這是舉例子,玩家的隊列還是要看你如何分配的。
-
消費者:說了這么多,玩家就是消費者嗷。
MQ代碼演示?
最新代碼是通過 事件總線?來跨方法傳遞信息和觸發動作。通過發布和訂閱事件,模塊之間能夠解耦通信,使得事件的發布和處理不再依賴于直接調用方法的方式,而是通過事件總線進行跨模塊、跨方法的異步傳遞。這種方式提高了系統的靈活性和擴展性,同時保持了模塊之間的松耦合。
長代碼警告,有興趣可以fork倉庫進行實際操練?VerEasy.Core。
必要的知識點大致如此,通過代碼+注釋的形式來演示更好理解。
我這里是NETCore項目,所以還是接口的形式方便依賴注入。
接口部分代碼
public interface IRabbitMQPersistentConnection{/// <summary>/// 是否已經連接:判斷MQ是否是連接狀態/// </summary>bool IsConnected { get; }/// <summary>/// 嘗試連接:斷連重連方法/// </summary>/// <returns></returns>Task<bool> TryConnectAsync();/// <summary>/// 唯一通道:發布通道可以隨時關閉,消費通道需要保持打開狀態,否則無法進行消費。/// </summary>IChannel Channel { get; }/// <summary>/// 唯一連接:同理,一個連接可以有N個通道,無需建立過多連接。/// </summary>IConnection Connection { get; }/// <summary>/// 釋放/// </summary>/// <returns></returns>Task DisposeAsync();/// <summary>/// 發布:發布消息/// </summary>/// <param name="msg"></param>/// <param name="exChangeName"></param>/// <param name="routeKey"></param>/// <param name="type"></param>/// <returns></returns>Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);/// <summary>/// 訂閱:訂閱隊列。/// </summary>/// <returns></returns>Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);}
?接口實現部分代碼
public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection{//構造函數注入,獲取MQ的地址賬號密碼端口,如果不傳就用我默認配置的。public RabbitMQPersistentConnection(IConnectionFactory? connectionFactory = null, int retryCount = 5){_connectionFactory = connectionFactory ?? new ConnectionFactory{HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()};//使用Policy進行重連,這個是重連次數=5_retryCount = retryCount;}//私有變量,獲取連接成功時創建的Mq通道。private IChannel _channel = default!;public IChannel Channel{get{return _channel;}}/// <summary>/// RabbitMQ 連接工廠/// </summary>private readonly IConnectionFactory _connectionFactory;/// <summary>/// 私有變量 RabbitMQ 連接上下文/// </summary>private IConnection _connection = default!;/// <summary>/// 重連次數/// </summary>private readonly int _retryCount;/// <summary>/// 標志是否已釋放/// </summary>private bool _disposed;/// <summary>/// 是否有效連接/// </summary>public bool IsConnected{get{return _connection != null && _connection.IsOpen && !_disposed;}}public IConnection Connection{get{return _connection;}}/// <summary>/// 手動釋放/// </summary>/// <returns></returns>public async Task DisposeAsync(){if (_disposed) return;_disposed = true;try{await _connection.DisposeAsync();}catch (IOException ex){Console.WriteLine(ex.Message);}}/// <summary>/// 重連機制/// </summary>/// <returns></returns>public async Task<bool> TryConnectAsync(){var policy = Policy.Handle<SocketException>()//捕獲連接異常.Or<BrokerUnreachableException>()//無法連接異常.WaitAndRetryAsync(_retryCount, x =>TimeSpan.FromSeconds(Math.Pow(2, x)), (ex, time) =>{//日志});try{await policy.ExecuteAsync(async () =>{//重建連接【賦值給私有化變量,通過get同步給接口里的Connection和Channel】_connection = await _connectionFactory.CreateConnectionAsync();_channel = await _connection.CreateChannelAsync();});//如果連接成功if (IsConnected){// 連接成功后,注冊連接關閉、異常、阻塞的事件處理程序_connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;_connection.CallbackExceptionAsync += OnCallbackExceptionAsync;_connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;return true;}else{return false;}}catch (Exception ex){Console.WriteLine($"重連失敗,最終拋出異常: {ex.Message}");return false;}}private async Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e){if (_disposed) return;Console.WriteLine("RabbitMQ連接關閉,正在嘗試重連...");await TryConnectAsync();}private async Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e){if (_disposed) return;Console.WriteLine($"RabbitMQ連接出現異常,正在嘗試重連... 異常信息: {e.Exception.Message}");await TryConnectAsync();}private async Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e){if (_disposed) return;Console.WriteLine("RabbitMQ連接被阻塞,正在嘗試重連...");await TryConnectAsync();}//發布消息public async Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout){//判斷是否連接狀態,沒有連接就重連if (!IsConnected){await TryConnectAsync();}//創建通道,因為是發布消息,通道不用常打開,所以使用了USINGusing var channel = await _connection.CreateChannelAsync();//【ExchangeDeclareAsync】聲明交換機,exchange:交換機名稱,type:交換機類型await channel.ExchangeDeclareAsync(exchange: exChangeName, type: type);//msg就是消息,需要傳遞Byte[]var body = Encoding.UTF8.GetBytes(msg);//啟動消息持久化,我的項目里使用MQ來進行公告的推送,使用的Fanout類型交換機,故此消息保持持久化。var properties = new BasicProperties(){Persistent = true,};//發布消息await channel.BasicPublishAsync(exchange: exChangeName,routingKey: routeKey,mandatory: false,basicProperties: properties,body: body);}//訂閱消息public async Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout){if (!IsConnected){await TryConnectAsync();}//【queue】隊列string queueName = string.IsNullOrWhiteSpace(routeKey) ? exChangeName : routeKey;//【durable】持久化隊列,MQ服務器不會刪除它。QueueDeclareOk queueDeclareResult = await Channel.QueueDeclareAsync(queue: queueName,durable: true,exclusive: false,autoDelete: false);//根據queue,exchange,routingKey 對 交換機和隊列進行綁定,如果是Fanout類型不需要routeKey。await Channel.QueueBindAsync(queue: queueName, exchange: exChangeName, routingKey: routeKey);//創建消費者var consumer = new AsyncEventingBasicConsumer(Channel);//消費者消費后執行方法consumer.ReceivedAsync += async (model, ea) =>{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);//確認消息已被消費,這樣后續該消息就不會被該隊列繼續消費到了。await Channel.BasicAckAsync(ea.DeliveryTag, multiple: false);};//啟動消費者隊列,將消費者和隊列綁定await Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer);}}
?
MQ服務注入
? ? ? ? ? ? if (Appsettings.AppStr("RabbitMQ:Enable").ObjToBool()){services.AddSingleton<IRabbitMQPersistentConnection>(x =>{var connectionFactory = new ConnectionFactory(){HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()};var mq = new RabbitMQPersistentConnection(connectionFactory);return mq;});}
?