??????
目錄
1.添加包
2. 連接配置
? ? ? ? 2.1.連接字符串
?? 2.2.連接對象
3.創建連接服務
? ? ? ? 3.1.添加配置獲取方法
? ? ? ? 3.2.服務實現類
? ? ? ? 3.3.服務接口
4.創建生產者服務
? ? ? ? 4.1.生產者實現類
? ? ? ? ?4.2.生產者接口
5.創建消費者服務
? ? ? ? 5.1.消費者服務接口
5.2.消費者接口
6.注冊
7.簡單使用案例
????????7.1.實現
? ? ? ? 7.2.接口
????????7.3.控制器
????????在 .NET Core 應用程序中使用 RabbitMQ 有許多好處,主要體現在其作為消息隊列系統所帶來的靈活性、可靠性和擴展性等方面,還能促進微服務架構的實施,是構建現代分布式應用的理想選擇之一
1.添加包
? ? ? ? 添加?RabbitMQ.Client 包。
2. 連接配置
? ? ? ? 2.1.連接字符串
? ? ? ? dbsettings.json文件添加 RabbitMQ 連接配置
//RabbitMQ配置
"RabbitMQSettings": {"HostName": "ip地址", //地址"Port": "端口", //端口"UserName": "RabbitMQ用戶名", //用戶名"Password": "RabbitMQ密碼", //密碼"VirtualHost": "/", //本地虛擬地址"RetryCount": 5, //最大重試次數"RetryInterval": 5, //斷開重連次數"PrefetchCount": 5, //預取消息數量"ConsumerCount": 5 //消費者數量
}
?? 2.2.連接對象
namespace Frame3_DataRepository.RabbitMQRepository.BaseMQ
{/// <summary>/// 消息隊列配置類/// </summary>public class RabbitMQSettings{/// <summary>/// RabbitMQ 服務器地址/// </summary>public string HostName { get; set; }/// <summary>/// 端口號,默認5672/// </summary>public int Port { get; set; } = 5672;/// <summary>/// 用戶名/// </summary>public string UserName { get; set; }/// <summary>/// 密碼/// </summary>public string Password { get; set; }/// <summary>/// 虛擬主機,默認為//// </summary>public string VirtualHost { get; set; } = "/";/// <summary>/// 連接重試次數/// </summary>public int RetryCount { get; set; } = 5;/// <summary>/// 重試間隔(秒)/// </summary>public int RetryInterval { get; set; } = 5;/// <summary>/// 預取消息數量/// </summary>public ushort PrefetchCount { get; set; }/// <summary>/// 消費者數量/// </summary>public int ConsumerCount { get; set; }}/// <summary>/// 持久化/// </summary>public enum DeliveryMode : byte{NonPersistent = 1,Persistent = 2}/// <summary>/// 消費者狀態信息/// </summary>public class ConsumerStatus{/// <summary>/// 當前活躍消費者數量/// </summary>public int CurrentCount { get; set; }/// <summary>/// 最大允許消費者數量/// </summary>public int MaxCount { get; set; }/// <summary>/// 活躍消費者標簽列表/// </summary>public List<string> ActiveConsumers { get; set; } = new();}
}
? ? ? ? 案例如下
3.創建連接服務
? ? ? ? 先創建配置獲取方法,再創建?RabbitMqClient 服務實現類和?IRabbitMqClient 服務接口來MQ連接。
? ? ? ? 3.1.添加配置獲取方法
using Microsoft.Extensions.Configuration;namespace Frame4_LibraryCore.BaseConfig
{/// <summary>/// 全局配置/// </summary>public static class Config{/// <summary>/// 從指定的 JSON 配置文件中讀取配置,并反序列化為指定類型/// </summary>/// <typeparam name="T">目標配置類型(如 RedisSettings、DatabaseSettings 等)</typeparam>/// <param name="fileName">JSON 配置文件名(如 "appsettings.json")</param>/// <param name="sessions">配置節點名稱(如 "RedisSettings")</param>/// <returns>返回綁定后的強類型配置對象</returns>public static T GetSetting<T>(string fileName, string sessions){//創建 ConfigurationBuilder 實例,用于構建配置var builder = new ConfigurationBuilder()//設置配置文件的基礎路徑為當前程序運行目錄.SetBasePath(Directory.GetCurrentDirectory())//添加 JSON 文件作為配置源://- fileName: 指定要加載的 JSON 文件//- optional: false 表示文件必須存在,否則拋出異常//- reloadOnChange: true 表示文件修改時自動重新加載.AddJsonFile(fileName, optional: false, reloadOnChange: true);//構建配置對象(IConfigurationRoot)IConfigurationRoot config = builder.Build();//獲取指定配置節點(sessions),并將其反序列化為類型 Tvar conn = config.GetSection(sessions).Get<T>();//返回反序列化后的配置對象return conn;}}
}
?????????案例如下
?
? ? ? ? 3.2.服務實現類
using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;namespace Frame3_DataRepository.RabbitMQRepository
{/// <summary>/// 隊列服務實現類/// 提供消息隊列連接和通道管理功能/// </summary>public sealed class RabbitMqClient : BaseServiceSingleton, IRabbitMqClient{/// <summary>/// RabbitMQ 連接工廠實例/// </summary>private readonly IConnectionFactory _connectionFactory;/// <summary>/// 日志記錄器/// </summary>private readonly ILogger<RabbitMqClient> _logger;/// <summary>/// 連接重試最大次數/// </summary>private readonly int _retryCount;/// <summary>/// 重試間隔時間(秒)/// </summary>private readonly int _retryInterval;/// <summary>/// RabbitMQ 連接對象/// </summary>private IConnection _connection;/// <summary>/// 標識對象是否已被釋放/// </summary>private bool _disposed;/// <summary>/// 連接操作的線程鎖/// </summary>private readonly SemaphoreSlim _connectionLock = new(1, 1);/// <summary>/// 心跳檢測定時器,用于定期檢查連接狀態/// </summary>private Timer _heartbeatTimer;/// <summary>/// 心跳檢測間隔(秒),默認30秒/// </summary>private const int HeartbeatInterval = 30;/// <summary>/// 預取消息數量/// </summary>private readonly ushort _prefetchCount;/// <summary>/// 最大允許的消費者數量/// </summary>private readonly int _maxConsumerCount;/// <summary>/// 構造函數,初始化RabbitMQ服務/// </summary>/// <param name="logger">日志記錄器,從DI容器注入</param>/// <exception cref="ArgumentNullException">當必需參數為null時拋出</exception>public RabbitMqClient(ILogger<RabbitMqClient> logger){//參數校驗,確保依賴注入的參數不為null_logger = logger ?? throw new ArgumentNullException(nameof(logger));//讀取配置//var settingsValue = settings?.Value ?? throw new ArgumentNullException(nameof(settings));var settingsValue = Config.GetSetting<RabbitMQSettings>("dbsettings.json", "RabbitMQSettings");//從配置中初始化重試參數_retryCount = settingsValue.RetryCount;_retryInterval = settingsValue.RetryInterval;//配置連接工廠參數_connectionFactory = new ConnectionFactory{HostName = settingsValue.HostName, // 主機地址Port = settingsValue.Port, // 端口號UserName = settingsValue.UserName, // 用戶名Password = settingsValue.Password, // 密碼VirtualHost = settingsValue.VirtualHost, // 虛擬主機AutomaticRecoveryEnabled = true, // 啟用自動恢復NetworkRecoveryInterval = TimeSpan.FromSeconds(10), // 網絡恢復間隔10秒RequestedHeartbeat = TimeSpan.FromSeconds(60) // 設置心跳間隔60秒};// 初始化心跳檢測定時器_heartbeatTimer = new Timer(HeartbeatCheck, null, Timeout.Infinite, Timeout.Infinite);//初始化預取消息數量_prefetchCount = settingsValue.PrefetchCount;//初始化消費者數量_maxConsumerCount = settingsValue.ConsumerCount;}/// <summary>/// 檢查當前是否已建立有效連接/// </summary>public bool IsConnected => _connection?.IsOpen == true && !_disposed;/// <summary>/// 預取消息數量/// </summary>public ushort prefetchCount => _prefetchCount;/// <summary>/// 消費者數量/// </summary>public int ConsumerCount => _maxConsumerCount;/// <summary>/// 嘗試建立RabbitMQ連接/// </summary>/// <returns>是否連接成功</returns>public async Task<bool> TryConnectAsync(){// 已連接則直接返回成功if (IsConnected) return true;// 加鎖防止多線程同時創建連接await _connectionLock.WaitAsync();try{// 雙重檢查鎖定模式if (IsConnected) return true;//記錄建立連接_logger.LogInformation("正在建立RabbitMQ連接...");// 帶重試機制的連接邏輯for (int i = 0; i < _retryCount; i++){try{//創建新連接_connection = await _connectionFactory.CreateConnectionAsync().ConfigureAwait(false);//訂閱連接關閉事件_connection.ConnectionShutdownAsync += OnConnectionShutdown;//驗證連接是否成功建立if (IsConnected){//記錄連接成功_logger.LogInformation("RabbitMQ連接已成功建立");//連接成功后啟動心跳檢測 不用心跳檢測可注釋//StartHeartbeatCheck();return true;}}catch (BrokerUnreachableException ex){//專門處理Broker不可達異常_logger.LogWarning(ex, $"RabbitMQ服務不可達,第{i}次重試...");}catch (Exception ex){//處理其他類型的異常_logger.LogWarning(ex, $"RabbitMQ連接異常,第{i}次重試...");}//如果未達到最大重試次數,等待間隔時間后重試if (i < _retryCount){//等待間隔時間后重試await Task.Delay(_retryInterval * 1000).ConfigureAwait(false);}}//記錄連接到最大重試數_logger.LogError($"RabbitMQ連接失敗,已達到最大重試次數{_retryCount}");return false;}finally{//確保鎖被釋放_connectionLock.Release();}}/// <summary>/// 創建通道/// </summary>/// <returns>創建的通道對象</returns>/// <exception cref="InvalidOperationException">當連接不可用時拋出</exception>public async Task<IChannel> CreateChannelAsync(){//確保連接可用if (!IsConnected && !await TryConnectAsync().ConfigureAwait(false)){throw new InvalidOperationException("沒有可用的RabbitMQ連接");}try{return await _connection.CreateChannelAsync().ConfigureAwait(false);}catch (OperationInterruptedException ex){//處理RabbitMQ操作中斷異常_logger.LogError(ex, "創建RabbitMQ通道時操作被中斷");throw;}catch (Exception ex){//處理其他創建通道時的異常_logger.LogError(ex, "創建RabbitMQ通道失敗");throw;}}/// <summary>/// 連接關閉事件處理程序,自動嘗試重新連接/// </summary>private async Task OnConnectionShutdown(object sender, ShutdownEventArgs args){//記錄連接關閉事件,包括關閉原因_logger.LogWarning($"RabbitMQ連接已關閉,原因: {args}");//如果服務未被釋放,嘗試自動重新連接if (!_disposed){try{//異步嘗試重新連接,不阻塞當前線程await TryConnectAsync().ConfigureAwait(false);}catch (Exception ex){//記錄重連失敗異常_logger.LogError(ex, "重連失敗");}}}/// <summary>/// 心跳檢測回調方法,定期檢查連接狀態/// </summary>private async void HeartbeatCheck(object state){try{// 如果連接不存在或已關閉,嘗試重新連接if (!IsConnected){//記錄檢測斷開重新連接_logger.LogWarning("心跳檢測發現連接斷開,嘗試重新連接...");//建立連接await TryConnectAsync().ConfigureAwait(false);}}catch (Exception ex){// 記錄心跳檢測過程中的異常_logger.LogError(ex, "心跳檢測異常");}}/// <summary>/// 啟動心跳檢測定時器/// </summary>private void StartHeartbeatCheck(){// 設置定時器,定期執行心跳檢測_heartbeatTimer.Change(TimeSpan.FromSeconds(HeartbeatInterval),TimeSpan.FromSeconds(HeartbeatInterval));}/// <summary>/// 停止心跳檢測定時器/// </summary>private void StopHeartbeatCheck(){// 禁用定時器_heartbeatTimer.Change(Timeout.Infinite, Timeout.Infinite);}/// <summary>/// 釋放資源/// </summary>public void Dispose(){//釋放Dispose(true);//優化垃圾回收GC.SuppressFinalize(this);}/// <summary>/// 受保護的釋放方法/// </summary>/// <param name="disposing">是否主動釋放</param>private void Dispose(bool disposing){//如果已經釋放,直接返回if (_disposed) return;//如果是主動釋放,處理托管資源if (disposing){try{//停止心跳檢測StopHeartbeatCheck();//釋放定時器資源_heartbeatTimer?.Dispose();//取消事件訂閱if (_connection != null){_connection.ConnectionShutdownAsync -= OnConnectionShutdown;}//釋放連接資源_connection?.Dispose();//釋放線程鎖資源_connectionLock.Dispose();//記錄連接關閉日志_logger.LogInformation("RabbitMQ連接已關閉");}catch (Exception ex){//記錄資源釋放過程中的異常_logger.LogError(ex, "關閉RabbitMQ連接時出錯");}finally{//標記為已釋放狀態_disposed = true;}}}}
}
? ? ? ? 案例如下
? ? ? ? 3.3.服務接口
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;namespace Frame3_DataRepository.RabbitMQRepository
{/// <summary>/// 隊列服務接口/// 供生產者和消費者調用/// </summary>public interface IRabbitMqClient //: IDisposable{/// <summary>/// 獲取當前連接狀態/// true表示已建立有效連接,false表示連接不可用/// </summary>bool IsConnected { get; }/// <summary>/// 預取消息數量/// </summary>ushort prefetchCount { get; }/// <summary>/// 消費者數量/// </summary>int ConsumerCount { get; }/// <summary>/// 嘗試建立RabbitMQ連接/// </summary>/// <returns>是否連接成功</returns>Task<bool> TryConnectAsync();/// <summary>/// 異步創建RabbitMQ通道/// 用于消息發布、消費等操作/// </summary>/// <returns>RabbitMQ通道實例</returns>/// <exception cref="InvalidOperationException">當無法創建連接時拋出</exception>/// <exception cref="OperationInterruptedException">當RabbitMQ操作被中斷時拋出</exception>Task<IChannel> CreateChannelAsync();void Dispose();}
}
? ? ? ? ?案例如下
4.創建生產者服務
? ? ? ? 創建生產者服務實現類?MQProducerService 和接口?IMQProducerService
? ? ? ? 4.1.生產者實現類
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Text;
using System.Text.Json;namespace Frame3_DataRepository.RabbitMQRepository.Producer
{/// <summary>/// 生產者服務實現類/// RabbitMQ/// </summary>public sealed class MQProducerService : BaseService, IMQProducerService{/// <summary>/// 基礎連接服務/// </summary>private readonly IRabbitMqClient _iRabbitMQService;/// <summary>/// 日志記錄器/// </summary>private readonly ILogger<MQProducerService> _logger;/// <summary>/// 構造函數,注入依賴/// </summary>/// <param name="iRabbitMQService"></param>/// <param name="logger"></param>public MQProducerService(IRabbitMqClient iRabbitMQService, ILogger<MQProducerService> logger){//參數校驗,確保依賴注入的參數不為null_iRabbitMQService = iRabbitMQService ?? throw new ArgumentNullException(nameof(iRabbitMQService));_logger = logger ?? throw new ArgumentNullException(nameof(logger));}/// <summary>/// 發布消息-點對點模式(Point-to-Point)/// </summary>/// <typeparam name="T">消息類型,必須是class類型</typeparam>/// <param name="queueName">目標隊列名稱</param>/// <param name="message">要發布的消息對象</param>/// <param name="messageId">可選的消息ID,未提供時自動生成</param>/// <param name="exchange">可選交換機名稱,默認使用直接交換機</param>/// <param name="headers">可選的消息頭字典</param>/// <param name="withDLX">是否啟用死信隊列</param>/// <param name="maxRetryCount">最大重試次數(僅當啟用死信隊列時有效)</param>/// <returns>異步任務</returns>/// <exception cref="ArgumentNullException">當必要參數為空時拋出</exception>/// <exception cref="InvalidOperationException">當消息序列化失敗時拋出</exception>public async Task PublishByPTPAsync<T>(string queueName, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class{//參數校驗if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName), "隊列名稱不能為空");if (message == null)throw new ArgumentNullException(nameof(message), "消息內容不能為空");//生成消息ID(如果未提供則使用Guid)messageId = messageId.IsEmpty() ? messageId = Guid.NewGuid().ToString() : messageId;//聲明變量用于存儲序列化后的消息,便于錯誤處理var jsonMessage = string.Empty;//聲明RabbitMQ通道IChannel? channel = null;try{//創建RabbitMQ通道channel = await _iRabbitMQService.CreateChannelAsync();// 死信隊列配置var arguments = new Dictionary<string, object>();if (withDLX){// 死信交換機配置var dlxExchange = $"{queueName}.DLX";var dlxQueue = $"{queueName}.DLQ";// 聲明死信交換機和隊列await channel.ExchangeDeclareAsync(dlxExchange, ExchangeType.Direct, durable: true);await channel.QueueDeclareAsync(queue: dlxQueue,durable: true,exclusive: false,autoDelete: false,arguments: null);await channel.QueueBindAsync(dlxQueue, dlxExchange, dlxQueue);// 設置死信隊列參數arguments.Add("x-dead-letter-exchange", dlxExchange);arguments.Add("x-dead-letter-routing-key", dlxQueue);arguments.Add("x-max-retry-count", maxRetryCount); // 自定義屬性,記錄最大重試次數arguments.Add("x-max-length", 100000);arguments.Add("x-queue-mode", "lazy");}// 添加消費者數量限制arguments["x-max-consumers"] = _iRabbitMQService.ConsumerCount;//聲明隊列(確保隊列存在)var queueDeclareOk = await channel.QueueDeclareAsync(queue: queueName, //隊列名稱durable: true, //隊列持久化(服務器重啟后仍然存在)exclusive: false, //非獨占隊列autoDelete: false, //不自動刪除arguments: arguments//new Dictionary<string, object>//{// // 只允許一個活躍消費者// //["x-single-active-consumer"] = true,// ["x-max-consumers"] = _iRabbitMQService.ConsumerCount,//});//序列化消息為JSON格式jsonMessage = JsonSerializer.Serialize(message);//將消息內容轉換為UTF-8字節數組var body = Encoding.UTF8.GetBytes(jsonMessage);//創建消息屬性var properties = new BasicProperties{Persistent = true, //消息持久化(需要隊列也是持久化的才有效)MessageId = messageId, //設置唯一消息ID用于追蹤ContentType = "application/json", //明確指定內容類型為JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //添加時間戳};//設置消息頭(如果提供)if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}// 發布消息到隊列await channel.BasicPublishAsync(exchange: exchange ?? string.Empty, //交換機名稱(空字符串表示默認direct交換機)routingKey: queueName, //路由鍵(對于默認交換機就是隊列名)mandatory: false, //不強制要求消息被路由到隊列basicProperties: properties, //消息屬性body: body //消息體);//記錄成功日志(結構化日志)_logger.LogInformation("消息發布成功。\r\n交換機: {Exchange}\r\n消息ID: {MessageId}\r\n隊列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默認)", messageId, queueName, jsonMessage);}catch (JsonException jsonEx){// 專門處理JSON序列化錯誤_logger.LogError(jsonEx, "消息序列化失敗。\r\n類型: {MessageType}", typeof(T).Name);throw new InvalidOperationException("消息序列化失敗", jsonEx);}catch (OperationInterruptedException opEx){// 處理RabbitMQ操作中斷異常_logger.LogError(opEx, "RabbitMQ操作中斷。\r\n消息: {jsonMessage}\r\n隊列: {queueName}", jsonMessage, queueName);throw;}catch (Exception ex){// 處理其他所有異常_logger.LogError(ex, "消息發布失敗。\r\n交換機: {Exchange}\r\n隊列: {queueName}\r\n消息: {jsonMessage}", exchange ?? "(默認)", queueName, jsonMessage);throw;}finally{// 確保通道資源被釋放await channel?.CloseAsync();}}/// <summary>/// 發布消息-發布/訂閱模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息類型,必須是 class 類型</typeparam>/// <param name="exchangeName">目標 Exchange 名稱</param>/// <param name="message">要發布的消息對象</param>/// <param name="messageId">可選的消息唯一標識符,默認自動生成</param>/// <param name="headers">可選的消息頭字典</param>/// <returns>異步任務</returns>public async Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//校驗 Exchange 名稱是否為空或空白字符串if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名稱不能為空");//校驗消息內容是否為 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息內容不能為空");//如果未提供消息ID,則使用 Guid 生成唯一的 IDmessageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于存儲序列化后的 JSON 消息(便于日志和異常處理)var jsonMessage = string.Empty;//聲明 RabbitMQ 通道變量,初始為 nullIChannel? channel = null;try{//創建 RabbitMQ 通道channel = await _iRabbitMQService.CreateChannelAsync();//聲明 Fanout 類型的 Exchange(廣播模式)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Fanout, //扇出類型,廣播給所有綁定隊列durable: true, //可持久化autoDelete: false); //不自動刪除//將消息對象序列化為 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//將 JSON 消息轉換為 UTF-8 編碼的字節數組var body = Encoding.UTF8.GetBytes(jsonMessage);//創建消息屬性var properties = new BasicProperties{Persistent = true, //消息持久化//DeliveryMode = (DeliveryModes)DeliveryMode.Persistent,MessageId = messageId, //設置唯一消息IDContentType = "application/json", //內容類型為 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //添加時間戳};//如果提供了 Headers,則設置到消息屬性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//向 Exchange 發送消息(不指定 Routing Key,Fanout 類型忽略此參數)await channel.BasicPublishAsync(exchange: exchangeName, //目標 Exchange 名稱routingKey: string.Empty, //Fanout 類型不需要路由鍵mandatory: false, //不要求消息必須被投遞basicProperties: properties, //消息屬性body: body); //消息體字節數據//記錄消息發布成功的日志信息_logger.LogInformation("消息已發布到Exchange。\r\nExchange: {Exchange}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}", exchangeName, messageId, jsonMessage);}catch (JsonException jsonEx){//捕獲 JSON 序列化異常并記錄錯誤日志_logger.LogError(jsonEx, "消息序列化失敗。\r\n類型: {MessageType}", typeof(T).Name);//拋出自定義異常,包含原始異常信息throw new InvalidOperationException("消息序列化失敗", jsonEx);}catch (Exception ex){//捕獲其他所有異常并記錄錯誤日志_logger.LogError(ex, "消息發布到Exchange失敗。\r\nExchange: {Exchange}\r\n消息: {jsonMessage}", exchangeName, jsonMessage);//拋出異常throw;}finally{// 確保通道資源被釋放await channel?.CloseAsync();}}/// <summary>/// 發布消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息體的類型,必須為引用類型</typeparam>/// <param name="exchangeName">要發布的交換機名稱。不能為空或空白字符串。</param>/// <param name="routingKey">消息的路由鍵,用于匹配綁定隊列。不能為空或空白字符串。</param>/// <param name="message">要發送的消息對象,將被序列化為 JSON 格式。</param>/// <param name="messageId">消息的唯一標識符。如果未提供,則自動生成 Guid 字符串。</param>/// <param name="headers">可選的消息頭部信息,用于攜帶額外元數據。</param>/// <returns>異步任務</returns>public async Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//檢查 Exchange 名稱是否為空或空白字符if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名稱不能為空");//檢查路由鍵是否為空或空白字符if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentNullException(nameof(routingKey), "路由鍵不能為空");//檢查消息對象是否為 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息內容不能為空");//如果沒有提供 MessageId,則生成一個 Guid 字符串作為唯一標識messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于記錄日志的消息 JSON 字符串var jsonMessage = string.Empty;//聲明一個 IChannel 對象,初始為 nullIChannel? channel = null;try{//創建 RabbitMQ Channel(通道)channel = await _iRabbitMQService.CreateChannelAsync();//聲明一個 Direct 類型的 Exchange(如果不存在則創建)await channel.ExchangeDeclareAsync(exchange: exchangeName, //指定 Exchange 的名稱type: ExchangeType.Direct, //指定 Exchange 的類型為 Direct(直連模式)durable: true, //設置為持久化 Exchange,RabbitMQ 重啟后不會丟失autoDelete: false //不自動刪除 Exchange,即使最后一個隊列被解綁也不會自動刪除);//將消息對象序列化為 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//將 JSON 字符串轉換為 UTF-8 編碼的字節數組var body = Encoding.UTF8.GetBytes(jsonMessage);//創建并初始化 BasicProperties(消息屬性)var properties = new BasicProperties{Persistent = true, //設置消息持久化MessageId = messageId, //設置消息 IDContentType = "application/json", //內容類型為 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //時間戳};//如果提供了自定義 Headers,則復制到消息屬性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//發布消息到指定 Exchange 和路由鍵await channel.BasicPublishAsync(exchange: exchangeName, //指定消息要發送到的 Exchange 名稱routingKey: routingKey, //指定消息的路由鍵,用于 Exchange 路由決策mandatory: false, //如果為 true,當消息無法路由到任何隊列時會返回給生產者;false 則直接丟棄basicProperties: properties, //消息的屬性,如持久化、內容類型、消息 ID、時間戳等body: body //消息的實際內容(字節數組),通常是序列化后的 JSON 數據);//記錄日志:消息發送成功_logger.LogInformation("消息已通過路由鍵發送。\r\nExchange: {exchangeName}\r\n路由鍵: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",exchangeName, routingKey, messageId, jsonMessage);}catch (Exception ex){//捕獲異常并記錄日志_logger.LogError(ex, "消息發送失敗。\r\nExchange: {exchangeName}\r\n路由鍵: {routingKey}\r\n消息: {jsonMessage}",exchangeName, routingKey, jsonMessage);//拋出異常以便上層處理throw;}finally{// 確保通道資源被釋放await channel?.CloseAsync();}}/// <summary>/// 發布消息-主題模式(Topic)/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchangeName">要發布的交換機名稱。不能為空或空白字符串。</param>/// <param name="routingKey">消息的路由鍵,用于匹配 Topic 類型 Exchange 的綁定規則。不能為空或空白字符串。</param>/// <param name="message">要發送的消息對象,將被序列化為 JSON 格式。</param>/// <param name="messageId">消息的唯一標識符。如果未提供,則自動生成 Guid 字符串。</param>/// <param name="headers">可選的消息頭部信息,用于攜帶額外元數據。</param>/// <returns>異步任務</returns>public async Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class{//檢查 Exchange 名稱是否為空或空白字符if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名稱不能為空");//檢查路由鍵是否為空或空白字符if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentNullException(nameof(routingKey), "路由鍵不能為空");//檢查消息對象是否為 nullif (message == null)throw new ArgumentNullException(nameof(message), "消息內容不能為空");//如果沒有提供 MessageId,則生成一個 Guid 字符串作為唯一標識messageId = messageId.IsEmpty() ? Guid.NewGuid().ToString() : messageId;//用于記錄日志的消息 JSON 字符串var jsonMessage = string.Empty;//聲明一個 IChannel 對象,初始為 nullIChannel? channel = null;try{//創建 RabbitMQ Channel(通道)channel = await _iRabbitMQService.CreateChannelAsync();// 刪除已存在的 Exchange(如果不需要保留消息)//await channel.ExchangeDeleteAsync("TopicTest");//聲明一個 Topic 類型的 Exchange(如果不存在則創建)await channel.ExchangeDeclareAsync(exchange: exchangeName, //指定要聲明的 Exchange 名稱,名稱由外部傳入的 exchangeName 變量指定type: ExchangeType.Topic, //設置 Exchange 的類型為 Topic(主題模式),支持通配符路由鍵匹配durable: true, //設置 Exchange 為持久化,即使 RabbitMQ 重啟也不會丟失autoDelete: false //設置 Exchange 不自動刪除,即使最后一個綁定被移除后仍保留);//將消息對象序列化為 JSON 字符串jsonMessage = JsonSerializer.Serialize(message);//將 JSON 字符串轉換為 UTF-8 編碼的字節數組var body = Encoding.UTF8.GetBytes(jsonMessage);//創建并初始化 BasicProperties(消息屬性)var properties = new BasicProperties{Persistent = true, //設置消息持久化MessageId = messageId, //設置消息 IDContentType = "application/json", //內容類型為 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) //時間戳};//如果提供了自定義 Headers,則復制到消息屬性中if (headers != null && headers.Any()){properties.Headers = new Dictionary<string, object>(headers);}//發布消息到指定 Exchange 和路由鍵await channel.BasicPublishAsync(exchange: exchangeName, //指定消息要發送到的 Exchange 名稱routingKey: routingKey, //指定消息的路由鍵,用于 Exchange 路由決策mandatory: false, //如果為 true,當消息無法路由到任何隊列時會返回給生產者;false 則直接丟棄basicProperties: properties, //消息的屬性,如持久化、內容類型、消息 ID、時間戳等body: body //消息的實際內容(字節數組),通常是序列化后的 JSON 數據);//記錄日志:消息發送成功_logger.LogInformation("消息已通過路由鍵發送。\r\nExchange: {exchangeName}\r\n路由鍵: {routingKey}\r\n消息ID: {MessageId}\r\n消息: {jsonMessage}",exchangeName, routingKey, messageId, jsonMessage);}catch (Exception ex){//捕獲異常并記錄日志_logger.LogError(ex, "消息發送失敗。\r\nExchange: {exchangeName}\r\n路由鍵: {routingKey}\r\n消息: {jsonMessage}",exchangeName, routingKey, jsonMessage);//拋出異常以便上層處理throw;}finally{// 確保通道資源被釋放await channel?.CloseAsync();}}/// <summary>/// 發布消息-請求/響應(RPC)/// </summary>/// <typeparam name="TRequest">請求消息的類型,必須為引用類型</typeparam>/// <typeparam name="TResponse">期望的響應消息類型,必須為引用類型</typeparam>/// <param name="exchangeName">要發送請求的目標 Exchange 名稱。不能為空或空白字符串。</param>/// <param name="routingKey">用于路由請求消息的路由鍵。不能為空或空白字符串。</param>/// <param name="request">請求對象,將被序列化為 JSON 并作為消息體發送。</param>/// <param name="timeout">等待響應的超時時間。默認為 default(可能無限期等待)。</param>/// <returns>異步任務</returns>public async Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class{// 參數校驗if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentException("Exchange名稱不能為空", nameof(exchangeName));if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentException("路由鍵不能為空", nameof(routingKey));if (request == null)throw new ArgumentNullException(nameof(request));// 設置默認超時時間(30秒)var actualTimeout = timeout == default ? TimeSpan.FromSeconds(5) : timeout;if (actualTimeout <= TimeSpan.Zero)throw new ArgumentException("超時時間必須大于0", nameof(timeout));// 生成唯一 CorrelationIdvar correlationId = Guid.NewGuid().ToString();// 創建 TaskCompletionSourcevar tcs = new TaskCompletionSource<TResponse>();// 創建獨立 Channelvar channel = await _iRabbitMQService.CreateChannelAsync();try{// 在 PublishByPRCAsync 方法中,發送請求前添加:await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct, // RPC通常使用Directdurable: true, // 持久化autoDelete: false);// 聲明臨時隊列用于接收響應var replyQueue = await channel.QueueDeclareAsync(queue: "",durable: false,exclusive: true,autoDelete: true);// 創建消費者var consumer = new AsyncEventingBasicConsumer(channel);consumer.ReceivedAsync += (model, ea) =>{try{if (ea.BasicProperties?.CorrelationId == correlationId){var response = JsonSerializer.Deserialize<TResponse>(ea.Body.Span);tcs.TrySetResult(response);}}catch (Exception ex){tcs.TrySetException(ex);}return Task.CompletedTask;};// 開始監聽回復隊列var consumerTag = await channel.BasicConsumeAsync(queue: replyQueue,autoAck: true,consumer: consumer);// 構建消息屬性var props = new BasicProperties();props.ReplyTo = replyQueue;props.CorrelationId = correlationId;props.ContentType = "application/json";// 序列化請求體var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(request));// 發送請求await channel.BasicPublishAsync(exchange: exchangeName,routingKey: routingKey,mandatory: false,basicProperties: props,body: body);// 設置超時取消using var cts = new CancellationTokenSource(actualTimeout);cts.Token.Register(() =>{if (!tcs.Task.IsCompleted){tcs.TrySetException(new TimeoutException($"RPC請求超時({actualTimeout.TotalSeconds}秒)"));channel.BasicCancelAsync(consumerTag);}});return await tcs.Task;}finally{// 確保通道資源被釋放await channel?.CloseAsync();}}/// <summary>/// 將死信隊列中的消息重新發布到原始隊列(泛型版本)/// </summary>/// <typeparam name="T">消息體的類型(如 DTO 類)</typeparam>/// <param name="queueName">原始隊列名稱</param>/// <param name="batchSize">每次處理的消息批大小</param>/// <param name="delay">重發延遲時間(毫秒)</param>/// <returns>成功處理的消息數量</returns>public async Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class{// 檢查傳入的隊列名是否為空或空白字符串,若為空則拋出異常if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName));// 構造死信隊列(DLQ)名稱,格式為:{原始隊列名}.DLQvar dlxQueueName = $"{queueName}.DLQ";// 聲明一個 RabbitMQ 的 Channel 對象,用于后續操作IChannel? channel = null;// 記錄已處理的消息數量int processedCount = 0;try{// 創建一個新的 Channel 實例(通過服務注入的 _iRabbitMQService)channel = await _iRabbitMQService.CreateChannelAsync();// 檢查死信隊列是否存在(被動聲明方式)try{// 如果不存在會拋出異常,catch 中捕獲并記錄日志后返回 0await channel.QueueDeclarePassiveAsync(dlxQueueName);}catch{// 日志記錄:如果 DLQ 不存在,則直接返回 0_logger.LogWarning("死信隊列 {DLXQueueName} 不存在", dlxQueueName);return 0;}// 循環獲取最多 batchSize 條消息for (int i = 0; i < batchSize; i++){// 使用 BasicGet 從 DLQ 獲取一條消息(不自動確認)var result = await channel.BasicGetAsync(dlxQueueName, autoAck: false);// 如果沒有更多消息了,跳出循環if (result == null)break;// 獲取消息體內容,并轉為 byte[] 數組var body = result.Body.ToArray();// 獲取原始消息屬性(BasicProperties),用于后續操作var originalProperties = result.BasicProperties;// 獲取當前消息的 DeliveryTag,用于確認或拒絕消息var deliveryTag = result.DeliveryTag;try{// 如果設置了 delay > 0,則等待指定時間(模擬延遲重試)if (delay > 0)await Task.Delay(delay);// 生成新的唯一 MessageId,用于追蹤消息var messageId = Guid.NewGuid().ToString();// 創建新的 BasicProperties 實例,用于新消息的屬性設置var properties = new BasicProperties{Persistent = true, // 設置消息持久化(需隊列也持久化才生效)MessageId = messageId, // 設置唯一消息 IDContentType = "application/json", // 明確內容類型為 JSONTimestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()) // 添加當前時間戳};// 將消息重新發布到原始隊列,使用默認交換機(exchange 為空)await channel.BasicPublishAsync(exchange: string.Empty,routingKey: queueName,mandatory: false,basicProperties: properties,body: body);// 確認 DLQ 中該消息已被正確處理(ACK)await channel.BasicAckAsync(deliveryTag, multiple: false);// 成功處理計數器 +1processedCount++;// 日志記錄:消息已成功重新發布_logger.LogInformation("已重新發布死信消息 {MessageId} 到隊列 {QueueName}",properties.MessageId ?? "未知ID", queueName);}catch (Exception ex){// 日志記錄:消息重發失敗_logger.LogError(ex, "重新發布死信消息失敗,DeliveryTag={DeliveryTag}", deliveryTag);// 拒絕消息并重新入隊(回到 DLQ),requeue: true 表示重新入隊await channel.BasicNackAsync(deliveryTag, multiple: false, requeue: true);}}// 返回成功處理的消息數量return processedCount;}catch (Exception ex){// 日志記錄:整個處理過程中發生錯誤_logger.LogError(ex, "處理死信隊列 {DLXQueueName} 時發生錯誤", dlxQueueName);// 拋出異常,供上層調用者捕獲處理throw;}finally{// 確保通道資源被釋放await channel?.CloseAsync();}}}
}
? ? ? ? 案例如下
? ? ? ? ?4.2.生產者接口
namespace Frame3_DataRepository.RabbitMQRepository.Producer
{/// <summary>/// 生產者服務接口/// RabbitMQ/// </summary>public interface IMQProducerService{/// <summary>/// 發布消息-點對點模式(Point-to-Point)/// </summary>/// <param name="queue">隊列名稱</param>/// <typeparam name="T">消息類型,必須是引用類型</typeparam>/// <param name="message">要發布的消息對象</param>/// <param name="messageId">消息ID(可選,未提供時自動生成GUID)</param>/// <param name="exchange">交換機名稱(空字符串表示默認交換機)</param>/// <returns>異步任務</returns>/// <exception cref="ArgumentNullException">當隊列名或消息為空時拋出</exception>Task PublishByPTPAsync<T>(string queue, T message, string messageId = null, string exchange = null, IDictionary<string, object> headers = null, bool withDLX = true, int maxRetryCount = 3) where T : class;/// <summary>/// 發布消息-發布/訂閱模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息類型,必須是 class 類型</typeparam>/// <param name="exchangeName">目標 Exchange 名稱</param>/// <param name="message">要發布的消息對象</param>/// <param name="messageId">可選的消息唯一標識符,默認自動生成</param>/// <param name="headers">可選的消息頭字典</param>/// <returns>異步任務</returns>Task PublishByPubSubAsync<T>(string exchangeName, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 發布消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息體的類型,必須為引用類型</typeparam>/// <param name="exchangeName">要發布的交換機名稱。不能為空或空白字符串。</param>/// <param name="routingKey">消息的路由鍵,用于匹配綁定隊列。不能為空或空白字符串。</param>/// <param name="message">要發送的消息對象,將被序列化為 JSON 格式。</param>/// <param name="messageId">消息的唯一標識符。如果未提供,則自動生成 Guid 字符串。</param>/// <param name="headers">可選的消息頭部信息,用于攜帶額外元數據。</param>/// <returns>異步任務</returns>Task PublishByRoutingAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 發布消息-主題模式(Topic)/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchangeName">要發布的交換機名稱。不能為空或空白字符串。</param>/// <param name="routingKey">消息的路由鍵,用于匹配 Topic 類型 Exchange 的綁定規則。不能為空或空白字符串。</param>/// <param name="message">要發送的消息對象,將被序列化為 JSON 格式。</param>/// <param name="messageId">消息的唯一標識符。如果未提供,則自動生成 Guid 字符串。</param>/// <param name="headers">可選的消息頭部信息,用于攜帶額外元數據。</param>/// <returns>異步任務</returns>Task PublishByTopicAsync<T>(string exchangeName, string routingKey, T message, string messageId = null, IDictionary<string, object> headers = null) where T : class;/// <summary>/// 發布消息-請求/響應(RPC)/// </summary>/// <typeparam name="TRequest">請求消息的類型,必須為引用類型</typeparam>/// <typeparam name="TResponse">期望的響應消息類型,必須為引用類型</typeparam>/// <param name="exchangeName">要發送請求的目標 Exchange 名稱。不能為空或空白字符串。</param>/// <param name="routingKey">用于路由請求消息的路由鍵。不能為空或空白字符串。</param>/// <param name="request">請求對象,將被序列化為 JSON 并作為消息體發送。</param>/// <param name="timeout">等待響應的超時時間。默認為 default(可能無限期等待)。</param>/// <returns>異步任務</returns>Task<TResponse> PublishByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, TRequest request, TimeSpan timeout = default) where TRequest : class where TResponse : class;/// <summary>/// 將死信隊列中的消息重新發布到原始隊列(泛型版本)/// </summary>/// <typeparam name="T">消息體的類型(如 DTO 類)</typeparam>/// <param name="queueName">原始隊列名稱</param>/// <param name="batchSize">每次處理的消息批大小</param>/// <param name="delay">重發延遲時間(毫秒)</param>/// <returns>成功處理的消息數量</returns>Task<int> RepublishDeadLetterMessagesAsync<T>(string queueName, int batchSize = 100, int delay = 0) where T : class;}
}
? ? ? ? 案例如下
5.創建消費者服務
?????????創建消費者服務實現類?MQConsumerService 和接口?IMQConsumerService
? ? ? ? 5.1.消費者服務接口
using Frame3_DataRepository.RabbitMQRepository.BaseMQ;
using Frame6_LibraryUtility;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{/// <summary>/// 消費者服務實現類/// 提供可靠的消息消費功能,支持自動重試和錯誤處理/// </summary>public sealed class MQConsumerService : BaseServiceSingleton, IMQConsumerService{/// <summary>/// RabbitMQ 基礎服務/// </summary>private readonly IRabbitMqClient _rabbitMQService;/// <summary>/// 日志記錄器/// </summary>private readonly ILogger<MQConsumerService> _logger;/// <summary>/// 最大消費者數量限制/// </summary>private readonly int _maxConsumerCount;/// <summary>/// 當前消費者數量計數器(線程安全)/// </summary>private int _currentConsumerCount;/// <summary>/// 用于限制消費者數量的信號量/// </summary>private readonly SemaphoreSlim _consumerLimitSemaphore;/// <summary>/// 當前活躍的消費者字典,線程安全集合/// Key: 消費者標簽/// Value: (通道對象, 消費者對象)/// </summary>private readonly ConcurrentDictionary<string, (IChannel Channel, AsyncEventingBasicConsumer Consumer)> _activeConsumers;/// <summary>/// 構造函數,依賴注入初始化/// </summary>/// <param name="rabbitMQService">RabbitMQ基礎服務</param>/// <param name="logger">日志記錄器</param>/// <exception cref="ArgumentNullException">當參數為null時拋出</exception>public MQConsumerService(IRabbitMqClient rabbitMQService, ILogger<MQConsumerService> logger){_rabbitMQService = rabbitMQService ?? throw new ArgumentNullException(nameof(rabbitMQService));_logger = logger ?? throw new ArgumentNullException(nameof(logger));_activeConsumers = new ConcurrentDictionary<string, (IChannel, AsyncEventingBasicConsumer)>();_maxConsumerCount = _rabbitMQService.ConsumerCount;_currentConsumerCount = 0;_consumerLimitSemaphore = new SemaphoreSlim(_maxConsumerCount, _maxConsumerCount);}/// <summary>/// 消費消息-點對點(Point-to-Point)/// </summary>/// <typeparam name="T">消息類型</typeparam>/// <param name="queueName">要消費的隊列名稱</param>/// <param name="messageHandler">消息處理委托</param>/// <param name="prefetchCount">預取消息數量,控制消費者負載</param>/// <param name="autoAck">是否自動確認消息,建議設為false實現可靠消費</param>/// <param name="withDLX">是否啟用死信隊列</param>/// <returns>取消令牌源,用于停止消費</returns>public async Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class{//參數校驗if (string.IsNullOrWhiteSpace(queueName))throw new ArgumentNullException(nameof(queueName), "隊列名稱不能為空");if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息處理器不能為空");//等待獲取消費者槽位(帶超時防止死鎖)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){throw new InvalidOperationException($"等待消費者槽位超時,當前已有 {_maxConsumerCount} 個活躍消費者");}//當前消費者數量+1Interlocked.Increment(ref _currentConsumerCount);//創建取消令牌源var cancellationTokenSource = new CancellationTokenSource();//創建通道IChannel? channel = null;try{//賦值預讀取數量if (prefetchCount == 0){prefetchCount = _rabbitMQService.prefetchCount;}//創建通道channel = await _rabbitMQService.CreateChannelAsync();//設置QoS(服務質量),控制預取消息數量await channel.BasicQosAsync(prefetchSize: 0, //不限制預取消息總大小prefetchCount: prefetchCount, //prefetchCount > 0 ? prefetchCount : _rabbitMQService.prefetchCount, //每次預取的消息數量global: false //應用于當前消費者而非整個通道);//檢查隊列是否存在try{await channel.QueueDeclarePassiveAsync(queueName);}catch{_logger.LogError($"隊列 {queueName} 不存在");throw;}//創建消費者var consumer = new AsyncEventingBasicConsumer(channel);//注冊消息接收事件consumer.ReceivedAsync += async (model, ea) =>{try{//反序列化消息var message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//記錄接收日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\n隊列: {QueueName}\r\n消息體:{message}", ea.BasicProperties.MessageId, queueName, message?.ToJson());//處理消息await messageHandler(message);//如果不是自動確認模式,手動確認消息if (!autoAck){await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, //消息投遞標簽multiple: false //不批量確認);}}catch (JsonException jsonEx){//處理反序列化錯誤_logger.LogError(jsonEx, "消息反序列化失敗。\r\n隊列: {QueueName}", queueName);//拒絕消息,不重新入隊if (!autoAck){await channel.BasicRejectAsync(deliveryTag: ea.DeliveryTag, requeue: false);}}catch (Exception ex){//處理業務邏輯錯誤_logger.LogError(ex, "消息處理失敗。\r\n隊列: {QueueName}", queueName);//如果不是自動確認模式(autoAck=false),需要手動處理消息確認和重試邏輯if (!autoAck){//獲取當前消息的屬性對象var properties = ea.BasicProperties;//獲取消息頭,如果headers為null則創建新的字典var headers = properties.Headers ?? new Dictionary<string, object>();//獲取當前重試次數,如果不存在x-retry-count頭則默認為0int retryCount = headers.TryGetValue("x-retry-count", out var retryObj) ? Convert.ToInt32(retryObj) : 0;//獲取最大重試次數,如果不存在x-max-retry-count頭則默認為1int maxRetryCount = headers.TryGetValue("x-max-retry-count", out var maxRetryObj) ? Convert.ToInt32(maxRetryObj) : 1;//如果啟用了死信隊列(withDLX=true)且當前重試次數已達最大重試次數if (withDLX && retryCount >= maxRetryCount){//記錄警告日志,說明消息已達到最大重試次數_logger.LogWarning("消息已達到最大重試次數 {MaxRetryCount},將被移入死信隊列", maxRetryCount);//拒絕消息,requeue=false表示不重新入隊,消息將被路由到死信隊列await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}else{//創建新的消息屬性對象,復制原始消息的所有屬性var newProperties = new BasicProperties{ContentType = properties.ContentType, //復制內容類型ContentEncoding = properties.ContentEncoding, //復制內容編碼DeliveryMode = properties.DeliveryMode, //復制投遞模式(1-非持久化,2-持久化)Priority = properties.Priority, //復制消息優先級CorrelationId = properties.CorrelationId, //復制關聯ID(用于請求 - 響應模式)ReplyTo = properties.ReplyTo, //復制回復隊列名稱Expiration = properties.Expiration, //復制消息過期時間MessageId = properties.MessageId, //復制消息IDTimestamp = properties.Timestamp, //復制時間戳Type = properties.Type, //復制消息類型UserId = properties.UserId, //復制用戶IDAppId = properties.AppId, //復制應用IDClusterId = properties.ClusterId, //復制集群ID//復制消息頭,并更新重試次數Headers = new Dictionary<string, object>(headers){["x-retry-count"] = retryCount + 1 //重試次數+1}};//重新發布消息到原始隊列await channel.BasicPublishAsync(exchange: string.Empty, //exchange: 空字符串表示默認direct交換機routingKey: queueName, //使用原始隊列名稱mandatory: false, //false表示如果無法路由則丟棄消息basicProperties: newProperties, //使用更新后的屬性(包含新的重試次數)body: ea.Body //原始消息體);//確認原始消息已被處理(從隊列中移除)//- deliveryTag: 消息投遞標簽//- multiple: false表示只確認單條消息await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}}}};//開始消費隊列消息var consumerTag = await channel.BasicConsumeAsync(queue: queueName, //隊列名稱autoAck: autoAck, //自動確認設置consumer: consumer //消費者實例);//將消費者添加到活躍集合_activeConsumers.TryAdd(consumerTag, (channel, consumer));//注冊取消令牌回調,當取消時停止消費者cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//記錄消費者/總消費者數量_logger.LogInformation("成功啟動消費者,當前消費者數: {_currentConsumerCount}/{_maxConsumerCount},當前消費者標記: {consumerTag}", _currentConsumerCount, _maxConsumerCount, consumerTag);return cancellationTokenSource;}catch (Exception ex){//清理創建失敗的通道資源channel?.Dispose();//當前消費者數量-1Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();//記錄啟動失敗日志_logger.LogError(ex, "啟動消費者失敗。隊列: {QueueName}", queueName);throw;}//finally//{// // 如果出錯或任務完成,確保 channel 被釋放// channel?.Dispose();//}}/// <summary>/// 消費消息-發布/訂閱模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息類型</typeparam>/// <param name="exchangeName">要訂閱的Exchange名稱</param>/// <param name="messageHandler">消息處理委托</param>/// <param name="prefetchCount">預取消息數量,控制消費者負載</param>/// <param name="autoAck">是否自動確認消息</param>/// <returns>取消令牌源,用于停止消費</returns>public async Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校驗exchange名稱是否為空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名稱不能為空");//校驗消息處理器是否為空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息處理器不能為空");//等待獲取消費者槽位(防止并發消費者過多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//獲取失敗則拋出超時異常throw new InvalidOperationException($"等待消費者槽位超時,當前已有 {_maxConsumerCount} 個活躍消費者");}//原子增加當前消費者數量Interlocked.Increment(ref _currentConsumerCount);//創建取消令牌源用于后續停止消費var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{//如果未指定prefetchCount,則使用默認值if (prefetchCount == 0){prefetchCount = _rabbitMQService.prefetchCount;}//創建 RabbitMQ 通道channel = await _rabbitMQService.CreateChannelAsync();//設置QoS(服務質量),限制預取的消息數量await channel.BasicQosAsync(prefetchSize: 0,prefetchCount: prefetchCount,global: false);//聲明一個 fanout 類型的 Exchange(廣播模式)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Fanout,durable: true, //可持久化autoDelete: false); //不自動刪除//自定義臨時隊列名稱var queueName = "PubSub-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//創建臨時隊列(由RabbitMQ自動生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName, //隊列名稱durable: false, //隊列是否持久化 false:隊列僅存于內存,RabbitMQ 重啟后隊列丟失(適合臨時隊列) true:隊列會持久化到磁盤,RabbitMQ 重啟后仍存在(適合重要消息)exclusive: true, //隊列是否排他 true:隊列僅對當前連接可見,連接關閉后隊列自動刪除(適合臨時私有隊列) false:隊列可被多個消費者共享(默認值,適合常規隊列)autoDelete: true //隊列是否自動刪除 true:當最后一個消費者斷開連接后,隊列自動刪除(適合臨時隊列) false:隊列不會自動刪除,需手動調用 QueueDelete 刪除(默認值));//將隊列綁定到Exchange(fanout類型忽略routingKey)await channel.QueueBindAsync(queue: queueName,exchange: exchangeName,routingKey: "");//創建異步消費者對象var consumer = new AsyncEventingBasicConsumer(channel);//注冊消息接收事件處理邏輯consumer.ReceivedAsync += async (model, ea) =>{try{//反序列化消息體為泛型對象Tvar message = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//記錄收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息體:{message}", ea.BasicProperties.MessageId, exchangeName, message?.ToJson());//調用用戶定義的消息處理方法await messageHandler(message);//如果不是自動確認,則手動發送 Ack 確認消息已處理if (!autoAck){await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}}catch (JsonException jsonEx){//消息反序列化失敗,記錄錯誤日志_logger.LogError(jsonEx, "消息反序列化失敗。\r\nExchange: {ExchangeName}", exchangeName);//非自動確認模式下拒絕消息,不重入隊列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息處理過程中發生其他異常,記錄錯誤日志_logger.LogError(ex, "消息處理失敗。\r\nExchange: {ExchangeName}", exchangeName);//非自動確認模式下拒絕消息,并嘗試重新入隊if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};//啟動消費,開始監聽消息var consumerTag = await channel.BasicConsumeAsync(queue: queueName,autoAck: autoAck,consumer: consumer);//將消費者和通道保存起來以便后續取消操作_activeConsumers.TryAdd(consumerTag, (channel, consumer));//注冊取消回調,當取消令牌被觸發時調用StopConsumingcancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//記錄啟動成功日志_logger.LogInformation("成功啟動訂閱者,當前消費者數: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);//返回取消令牌源,供外部控制消費終止return cancellationTokenSource;}catch (Exception ex){//出現異常時釋放資源channel?.Dispose();//原子減少消費者計數Interlocked.Decrement(ref _currentConsumerCount);//釋放信號量槽位_consumerLimitSemaphore.Release();//記錄啟動失敗日志_logger.LogError(ex, "啟動訂閱者失敗。Exchange: {ExchangeName}", exchangeName);//拋出異常throw;}}/// <summary>/// 消費消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息反序列化的目標類型</typeparam>/// <param name="exchangeName">要綁定的 Exchange 名稱</param>/// <param name="routingKey">用于綁定隊列和 Exchange 的路由鍵</param>/// <param name="messageHandler">處理接收到的消息的異步回調函數</param>/// <param name="prefetchCount">預取消息數量,默認為0(未使用)</param>/// <param name="autoAck">是否自動確認消息</param>/// <returns>CancellationTokenSource,用于取消消費操作</returns>public async Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校驗exchange名稱是否為空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名稱不能為空");//校驗路由鍵是否為空if (routingKey == null)throw new ArgumentNullException(nameof(routingKey), "路由鍵不能為空");//校驗消息處理器是否為空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息處理器不能為空");//等待獲取消費者槽位(防止并發消費者過多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//獲取失敗則拋出超時異常throw new InvalidOperationException($"等待消費者槽位超時,當前已有 {_maxConsumerCount} 個活躍消費者");}//原子增加當前消費者數量Interlocked.Increment(ref _currentConsumerCount);// 創建 CancellationTokenSource,用于后續控制取消消費var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{// 創建一個新的 RabbitMQ Channelchannel = await _rabbitMQService.CreateChannelAsync();// 聲明一個 Direct 類型的 Exchange(如果不存在則創建)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Direct,durable: true, // Exchange 持久化autoDelete: false); // 不自動刪除//自定義臨時隊列名稱var queueName = "Routing-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//創建臨時隊列(由RabbitMQ自動生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName, //隊列名稱durable: false, //隊列是否持久化 false:隊列僅存于內存,RabbitMQ 重啟后隊列丟失(適合臨時隊列) true:隊列會持久化到磁盤,RabbitMQ 重啟后仍存在(適合重要消息)exclusive: true, //隊列是否排他 true:隊列僅對當前連接可見,連接關閉后隊列自動刪除(適合臨時私有隊列) false:隊列可被多個消費者共享(默認值,適合常規隊列)autoDelete: true //隊列是否自動刪除 true:當最后一個消費者斷開連接后,隊列自動刪除(適合臨時隊列) false:隊列不會自動刪除,需手動調用 QueueDelete 刪除(默認值));// 將隊列綁定到指定的 Exchange,并使用給定的 routingKeyawait channel.QueueBindAsync(queueName, exchangeName, routingKey);// 創建異步消費者var consumer = new AsyncEventingBasicConsumer(channel);// 注冊消息接收事件處理邏輯consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化消息體為泛型 T 對象var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//記錄收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息體:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());// 調用用戶定義的消息處理函數await messageHandler(msg);// 如果不是自動確認,則手動發送 Ack 確認消息已處理成功if (!autoAck)await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}catch (JsonException jsonEx){//消息反序列化失敗,記錄錯誤日志_logger.LogError(jsonEx, "消息反序列化失敗。\r\nExchange: {ExchangeName}", exchangeName);//非自動確認模式下拒絕消息,不重入隊列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息處理過程中發生其他異常,記錄錯誤日志_logger.LogError(ex, "消息處理失敗。\r\nExchange: {ExchangeName}", exchangeName);//非自動確認模式下拒絕消息,并嘗試重新入隊if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};// 開始消費隊列中的消息var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);// 記錄當前消費者信息以便后續取消或釋放資源_activeConsumers.TryAdd(consumerTag, (channel, consumer));// 注冊取消令牌,在取消時調用 StopConsuming 方法停止消費cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//記錄啟動成功日志_logger.LogInformation("成功啟動訂閱者,當前消費者數: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);// 返回 CancellationTokenSource,供外部控制取消return cancellationTokenSource;}catch (Exception ex){//出現異常時釋放資源channel?.Dispose();//原子減少消費者計數Interlocked.Decrement(ref _currentConsumerCount);//釋放信號量槽位_consumerLimitSemaphore.Release();//記錄啟動失敗日志_logger.LogError(ex, "啟動訂閱者失敗。Exchange: {ExchangeName}", exchangeName);//拋出異常throw;}}/// <summary>/// 消費消息-主題模式(Topic)/// </summary>/// <typeparam name="T">消息反序列化的目標類型</typeparam>/// <param name="exchangeName">要綁定的 Exchange 名稱</param>/// <param name="topicPattern">用于綁定隊列的 Topic 匹配規則(如 user.*)</param>/// <param name="messageHandler">處理接收到的消息的異步回調函數</param>/// <param name="prefetchCount">預取消息數量,默認為0(未使用)</param>/// <param name="autoAck">是否自動確認消息</param>/// <returns>CancellationTokenSource,用于取消消費操作</returns>public async Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class{//校驗exchange名稱是否為空if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentNullException(nameof(exchangeName), "Exchange名稱不能為空");//校驗匹配規則是否為空if (topicPattern == null)throw new ArgumentNullException(nameof(topicPattern), "topicPattern不能為空");//校驗消息處理器是否為空if (messageHandler == null)throw new ArgumentNullException(nameof(messageHandler), "消息處理器不能為空");//等待獲取消費者槽位(防止并發消費者過多)if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){//獲取失敗則拋出超時異常throw new InvalidOperationException($"等待消費者槽位超時,當前已有 {_maxConsumerCount} 個活躍消費者");}//原子增加當前消費者數量Interlocked.Increment(ref _currentConsumerCount);// 創建 CancellationTokenSource,用于后續控制取消消費var cancellationTokenSource = new CancellationTokenSource();IChannel? channel = null;try{// 創建一個新的 RabbitMQ Channelchannel = await _rabbitMQService.CreateChannelAsync();// 聲明一個 Topic 類型的 Exchange(如果不存在則創建)await channel.ExchangeDeclareAsync(exchange: exchangeName,type: ExchangeType.Topic,durable: true, // Exchange 持久化autoDelete: false); // 不自動刪除//自定義臨時隊列名稱var queueName = "Topic-" + DateTime.Now.ToString("yyyyMMddHHmmssfff");//創建臨時隊列(由RabbitMQ自動生成名字)//var queueResult = await channel.QueueDeclareAsync();var queueResult = await channel.QueueDeclareAsync(queue: queueName, //隊列名稱durable: false, //隊列是否持久化 false:隊列僅存于內存,RabbitMQ 重啟后隊列丟失(適合臨時隊列) true:隊列會持久化到磁盤,RabbitMQ 重啟后仍存在(適合重要消息)exclusive: true, //隊列是否排他 true:隊列僅對當前連接可見,連接關閉后隊列自動刪除(適合臨時私有隊列) false:隊列可被多個消費者共享(默認值,適合常規隊列)autoDelete: true //隊列是否自動刪除 true:當最后一個消費者斷開連接后,隊列自動刪除(適合臨時隊列) false:隊列不會自動刪除,需手動調用 QueueDelete 刪除(默認值));// 將隊列綁定到指定的 Exchange,并使用 Topic 模式匹配規則await channel.QueueBindAsync(queueName, exchangeName, topicPattern);// 創建異步消費者var consumer = new AsyncEventingBasicConsumer(channel);// 注冊消息接收事件處理邏輯consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化消息體為泛型 T 對象var msg = JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(ea.Body.Span));//記錄收到消息的日志_logger.LogInformation("收到消息。\r\n消息ID: {MessageId}\r\nExchange: {ExchangeName}\r\n消息體:{message}", ea.BasicProperties.MessageId, exchangeName, msg?.ToJson());// 調用用戶定義的消息處理函數await messageHandler(msg);// 如果不是自動確認,則手動發送 Ack 確認消息已處理成功if (!autoAck)await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);}catch (JsonException jsonEx){//消息反序列化失敗,記錄錯誤日志_logger.LogError(jsonEx, "消息反序列化失敗。\r\nExchange: {ExchangeName}", exchangeName);//非自動確認模式下拒絕消息,不重入隊列if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: false);}}catch (Exception ex){//消息處理過程中發生其他異常,記錄錯誤日志_logger.LogError(ex, "消息處理失敗。\r\nExchange: {ExchangeName}", exchangeName);//非自動確認模式下拒絕消息,并嘗試重新入隊if (!autoAck){await channel.BasicRejectAsync(ea.DeliveryTag, requeue: true);}}};// 開始消費隊列中的消息var consumerTag = await channel.BasicConsumeAsync(queueName, autoAck, consumer);// 記錄當前消費者信息以便后續取消或釋放資源_activeConsumers.TryAdd(consumerTag, (channel, consumer));// 注冊取消令牌,在取消時調用 StopConsuming 方法停止消費cancellationTokenSource.Token.Register(() => StopConsuming(consumerTag).ConfigureAwait(false));//記錄啟動成功日志_logger.LogInformation("成功啟動訂閱者,當前消費者數: {_currentConsumerCount}/{_maxConsumerCount}", _currentConsumerCount, _maxConsumerCount);// 返回 CancellationTokenSource,供外部控制取消return cancellationTokenSource;}catch (Exception ex){//出現異常時釋放資源channel?.Dispose();//原子減少消費者計數Interlocked.Decrement(ref _currentConsumerCount);//釋放信號量槽位_consumerLimitSemaphore.Release();//記錄啟動失敗日志_logger.LogError(ex, "啟動訂閱者失敗。Exchange: {ExchangeName}", exchangeName);//拋出異常throw;}}/// <summary>/// 消費消息-請求/響應(RPC)/// </summary>/// <typeparam name="TRequest">請求消息的類型</typeparam>/// <typeparam name="TResponse">響應消息的類型</typeparam>/// <param name="exchangeName">Exchange 名稱,通常為空字符串表示默認 Exchange</param>/// <param name="routingKey">用于監聽的隊列名稱(同時也是 routingKey)</param>/// <param name="handler">處理請求并返回響應的異步回調函數</param>/// <param name="prefetchCount">預取消息數量,默認為0(未使用)</param>/// <returns>CancellationTokenSource,用于取消消費操作</returns>public async Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class{// 參數校驗if (string.IsNullOrWhiteSpace(exchangeName))throw new ArgumentException("Exchange名稱不能為空", nameof(exchangeName));if (string.IsNullOrWhiteSpace(routingKey))throw new ArgumentException("路由鍵不能為空", nameof(routingKey));if (handler == null)throw new ArgumentNullException(nameof(handler), "消息處理器不能為空");// 等待獲取消費者槽位if (!await _consumerLimitSemaphore.WaitAsync(TimeSpan.FromSeconds(30))){throw new InvalidOperationException($"等待消費者槽位超時,當前已有 {_maxConsumerCount} 個活躍消費者");}try{Interlocked.Increment(ref _currentConsumerCount);// 創建新的 RabbitMQ Channelvar channel = await _rabbitMQService.CreateChannelAsync();try{// 設置預取數量(控制并發處理能力)if (prefetchCount > 0){await channel.BasicQosAsync(0, prefetchCount, false);}// 聲明隊列(與生產者保持一致)var queueDeclareOk = await channel.QueueDeclareAsync(queue: routingKey, //隊列名稱durable: true, //隊列是否持久化 false:隊列僅存于內存,RabbitMQ 重啟后隊列丟失(適合臨時隊列) true:隊列會持久化到磁盤,RabbitMQ 重啟后仍存在(適合重要消息)exclusive: false, //隊列是否排他 true:隊列僅對當前連接可見,連接關閉后隊列自動刪除(適合臨時私有隊列) false:隊列可被多個消費者共享(默認值,適合常規隊列)autoDelete: false, //隊列是否自動刪除 true:當最后一個消費者斷開連接后,隊列自動刪除(適合臨時隊列) false:隊列不會自動刪除,需手動調用 QueueDelete 刪除(默認值)arguments: null);// 綁定隊列到Exchangeawait channel.QueueBindAsync(queue: routingKey,exchange: exchangeName,routingKey: routingKey);// 創建異步消費者var consumer = new AsyncEventingBasicConsumer(channel);// 消息處理邏輯consumer.ReceivedAsync += async (model, ea) =>{try{// 反序列化請求var request = JsonSerializer.Deserialize<TRequest>(ea.Body.Span);// 處理請求var response = await handler(request);// 準備響應屬性var replyProps = new BasicProperties();replyProps.CorrelationId = ea.BasicProperties.CorrelationId;replyProps.ContentType = "application/json";// 發送響應await channel.BasicPublishAsync(exchange: "", // 默認ExchangeroutingKey: ea.BasicProperties.ReplyTo,mandatory: false,basicProperties: replyProps,body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(response)));// 確認消息await channel.BasicAckAsync(ea.DeliveryTag, false);}catch (Exception ex){_logger.LogError(ex, "處理RPC請求失敗: {CorrelationId}",ea.BasicProperties?.CorrelationId);// 拒絕消息且不重新入隊await channel.BasicNackAsync(ea.DeliveryTag, false, false);}};// 開始消費var consumerTag = await channel.BasicConsumeAsync(queue: routingKey,autoAck: false, // 手動確認consumer: consumer);// 創建取消令牌var cts = new CancellationTokenSource();// 注冊取消回調cts.Token.Register(async () =>{try{await channel.BasicCancelAsync(consumerTag);await channel.CloseAsync();}catch (Exception ex){_logger.LogWarning(ex, "取消消費者時發生錯誤");}finally{channel.Dispose();Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();}});return cts;}catch{// 發生異常時確保通道被關閉channel?.Dispose();throw;}}catch{// 發生異常時釋放信號量Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();throw;}}/// <summary>/// 停止指定消費者的消息消費/// </summary>/// <param name="consumerTag">消費者標簽</param>/// <returns>異步任務</returns>public async Task StopConsuming(string consumerTag){//從活躍集合中移除消費者if (_activeConsumers.TryRemove(consumerTag, out var consumerInfo)){try{//取消消費者訂閱await consumerInfo.Channel.BasicCancelAsync(consumerTag);//異步釋放通道資源await consumerInfo.Channel.DisposeAsync();//記錄停止成功日志_logger.LogInformation("已停止消費者。消費者標簽: {ConsumerTag}", consumerTag);}catch (OperationInterruptedException opEx){//記錄操作中斷警告日志_logger.LogWarning(opEx, "消費者取消操作被中斷。消費者標簽: {ConsumerTag}", consumerTag);}catch (Exception ex){//記錄停止失敗錯誤日志_logger.LogError(ex, "停止消費者時出錯。消費者標簽: {ConsumerTag}", consumerTag);throw;}finally{Interlocked.Decrement(ref _currentConsumerCount);_consumerLimitSemaphore.Release();_logger.LogInformation("當前消費者數: {CurrentCount}/{MaxCount}", _currentConsumerCount, _maxConsumerCount);}}else{// 記錄未找到消費者警告日志_logger.LogWarning("未找到對應的消費者。消費者標簽: {ConsumerTag}", consumerTag);}}/// <summary>/// 停止所有消費者的消息消費/// </summary>/// <returns>異步任務</returns>public async Task StopAllConsuming(){// 遍歷所有消費者標簽并停止foreach (var consumerTag in _activeConsumers.Keys.ToList()){try{await StopConsuming(consumerTag).ConfigureAwait(false);}catch (Exception ex){// 記錄單個消費者停止失敗日志,繼續停止其他消費者_logger.LogError(ex, "停止消費者時出錯。消費者標簽: {ConsumerTag}", consumerTag);}}}/// <summary>/// 獲取當前消費者狀態/// </summary>public ConsumerStatus GetConsumerStatus(){// 創建并返回一個新的 ConsumerStatus 對象,用于封裝當前消費者的運行狀態return new ConsumerStatus{// 設置當前消費者數量CurrentCount = _currentConsumerCount,// 設置最大消費者數量MaxCount = _maxConsumerCount,// 獲取當前所有活躍消費者的標識符列表ActiveConsumers = _activeConsumers.Keys.ToList()};}}}
? ? ? ? 案例如下
5.2.消費者接口
using Frame3_DataRepository.RabbitMQRepository.BaseMQ;namespace Frame3_DataRepository.RabbitMQRepository.Consumer
{/// <summary>/// 消費者服務接口/// RabbitMQ/// </summary>public interface IMQConsumerService{/// <summary>/// 消費消息-點對點(Point-to-Point)/// </summary>/// <typeparam name="T">消息類型</typeparam>/// <param name="queueName">要消費的隊列名稱</param>/// <param name="messageHandler">消息處理委托</param>/// <param name="prefetchCount">預取消息數量,控制消費者負載</param>/// <param name="autoAck">是否自動確認消息,建議設為false實現可靠消費</param>/// <returns>取消令牌源,用于停止消費</returns>/// <exception cref="ArgumentNullException">當必要參數為空時拋出</exception>Task<CancellationTokenSource> StartConsumingByPTPAsync<T>(string queueName, Func<T, Task> messageHandler, bool withDLX = true, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消費消息-發布/訂閱模式(Pub/Sub)/// </summary>/// <typeparam name="T">消息類型</typeparam>/// <param name="exchangeName">要訂閱的Exchange名稱</param>/// <param name="messageHandler">消息處理委托</param>/// <param name="prefetchCount">預取消息數量,控制消費者負載</param>/// <param name="autoAck">是否自動確認消息</param>/// <returns>取消令牌源,用于停止消費</returns>Task<CancellationTokenSource> StartConsumingByPubSubAsync<T>(string exchangeName, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消費消息-路由模式(Routing)/// </summary>/// <typeparam name="T">消息反序列化的目標類型</typeparam>/// <param name="exchangeName">要綁定的 Exchange 名稱</param>/// <param name="routingKey">用于綁定隊列和 Exchange 的路由鍵</param>/// <param name="messageHandler">處理接收到的消息的異步回調函數</param>/// <param name="prefetchCount">預取消息數量,默認為0(未使用)</param>/// <param name="autoAck">是否自動確認消息</param>/// <returns>CancellationTokenSource,用于取消消費操作</returns>Task<CancellationTokenSource> StartConsumingByRoutingAsync<T>(string exchangeName, string routingKey, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消費消息-主題模式(Topic)/// </summary>/// <typeparam name="T">消息反序列化的目標類型</typeparam>/// <param name="exchangeName">要綁定的 Exchange 名稱</param>/// <param name="topicPattern">用于綁定隊列的 Topic 匹配規則(如 user.*)</param>/// <param name="messageHandler">處理接收到的消息的異步回調函數</param>/// <param name="prefetchCount">預取消息數量,默認為0(未使用)</param>/// <param name="autoAck">是否自動確認消息</param>/// <returns>CancellationTokenSource,用于取消消費操作</returns>Task<CancellationTokenSource> StartConsumingByTopicAsync<T>(string exchangeName, string topicPattern, Func<T, Task> messageHandler, ushort prefetchCount = 0, bool autoAck = false) where T : class;/// <summary>/// 消費消息-請求/響應(RPC)/// </summary>/// <typeparam name="TRequest">請求消息的類型</typeparam>/// <typeparam name="TResponse">響應消息的類型</typeparam>/// <param name="exchangeName">Exchange 名稱,通常為空字符串表示默認 Exchange</param>/// <param name="routingKey">用于監聽的隊列名稱(同時也是 routingKey)</param>/// <param name="handler">處理請求并返回響應的異步回調函數</param>/// <param name="prefetchCount">預取消息數量,默認為0(未使用)</param>/// <returns>CancellationTokenSource,用于取消消費操作</returns>Task<CancellationTokenSource> StartConsumingByPRCAsync<TRequest, TResponse>(string exchangeName, string routingKey, Func<TRequest, Task<TResponse>> handler, ushort prefetchCount = 0) where TRequest : class where TResponse : class;/// <summary>/// 停止指定消費者的消息消費/// </summary>/// <param name="consumerTag">消費者標簽</param>/// <returns>異步任務</returns>Task StopConsuming(string consumerTag);/// <summary>/// 停止所有消費者的消息消費/// </summary>/// <returns>異步任務</returns>Task StopAllConsuming();/// <summary>/// 獲取當前消費者狀態/// </summary>/// <returns></returns>ConsumerStatus GetConsumerStatus();}
}
? ? ? ? 案例如下
6.注冊
? ? ? ? ?在 Program 或?Startup 中注冊隊列。
// 注冊 RabbitMQ 連接服務為單例(Singleton)// IRabbitMqClient 是一個接口,代表 RabbitMQ 客戶端連接的抽象// RabbitMqClient 是其具體實現類// 單例生命周期意味著在整個應用程序生命周期中只創建一次該實例,所有請求共享同一個實例builder.Services.AddSingleton<IRabbitMqClient, RabbitMqClient>();// 注冊 RabbitMQ 消息生產者服務為作用域(Scoped)// IMQProducerService 是用于發送消息的接口// MQProducerService 是其實現類// Scoped 生命周期表示在同一個請求上下文中使用同一個實例(適用于 Web 應用場景)builder.Services.AddScoped<IMQProducerService, MQProducerService>();// 注冊 RabbitMQ 消息消費者服務為單例(Singleton)// IMQConsumerService 是用于消費消息(接收并處理消息)的接口// MQConsumerService 是其實現類// 使用 Singleton 是因為消費者通常需要長時間運行、持續監聽隊列,適合整個應用周期內保持一個實例builder.Services.AddSingleton<IMQConsumerService, MQConsumerService>();
? ? ? ? ?案例如下
7.簡單使用案例
? ? ? ? 下面是 實現、接口和控制器的使用案例
????????7.1.實現
using Frame1_Service.IService.Product;
using Frame2_DataModel.Entity.Products;
using Frame3_DataRepository.RabbitMQRepository.Consumer;
using Frame3_DataRepository.RabbitMQRepository.Producer;
using Frame6_LibraryUtility;
using RabbitMQ.Client.Exceptions;namespace Frame1_Service.Service.Product
{public class RabbitMQTestSvr : BaseService, IRabbitMQTestSvr{/// <summary>/// 生產者/// </summary>private readonly IMQProducerService _iRabbitMQProducer;/// <summary>/// 消費者/// </summary>private readonly IMQConsumerService _iRabbitMQConsumer;/// <summary>/// 構造/// </summary>/// <param name="iRabbitMQProducer"></param>/// <param name="iRabbitMQConsumer"></param>public RabbitMQTestSvr(IMQProducerService iRabbitMQProducer, IMQConsumerService iRabbitMQConsumer){_iRabbitMQConsumer = iRabbitMQConsumer;_iRabbitMQProducer = iRabbitMQProducer;}/// <summary>/// 模擬消費邏輯/// </summary>/// <param name="model"></param>/// <returns></returns>private async Task ProcessOrderAsync(ProductsEntity model){Console.WriteLine("消費成功:" + model.ToJson());}/// <summary>/// 生產者-點對點(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 創建 Random 實例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "測試" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByPTPAsync<ProductsEntity>("ProducerTest", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消費者-點對點(Point-to-Point)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerTest(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByPTPAsync<ProductsEntity>("ProducerTest", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生產者-發布訂閱(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 創建 Random 實例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "測試" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByPubSubAsync<ProductsEntity>("PubSubTest", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消費者-發布訂閱(Pub/Sub)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerPubSub(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByPubSubAsync<ProductsEntity>("PubSubTest", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生產者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 創建 Random 實例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "測試" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消費者-路由模式(Routing)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerRouting(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByRoutingAsync<ProductsEntity>("RoutingTest", "Routing", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生產者-主題模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model){var result = new ResultModel<bool>() { Data = false };// 創建 Random 實例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "測試" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);await _iRabbitMQProducer.PublishByTopicAsync<ProductsEntity>("TopicTest", "Topic.test", model);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 消費者-主題模式(Topic)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerTopic(){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQConsumer.StartConsumingByTopicAsync<ProductsEntity>("TopicTest", "Topic.*", ProcessOrderAsync);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 生產者-請求響應模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model){var result = new ResultModel<CalculateResponse>();var request = new CalculateRequest { X = 5, Y = 7 };// 創建 Random 實例Random random = new Random();model.Id = Guid.NewGuid().ToString();model.ProductName = "測試" + (random.Next(1, 999)).ToString();model.Price = random.Next(1000, 9999);model.Stock = random.Next(1, 99);var response = await _iRabbitMQProducer.PublishByPRCAsync<CalculateRequest, CalculateResponse>("RPCTest", "RPC", request);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = response;return result;}/// <summary>/// 消費者-請求響應模式(RPC)/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> ConsumerRPC(){var result = new ResultModel<bool> { Data = false };try{//示例:模擬一個計算器服務(可替換為真實的 ICalculatorService)Func<CalculateRequest, Task<CalculateResponse>> handler = async req =>{await Task.Delay(10); //模擬異步處理return new CalculateResponse { Result = req.X + req.Y };};//啟動消費者var cts = await _iRabbitMQConsumer.StartConsumingByPRCAsync<CalculateRequest, CalculateResponse>(exchangeName: "RPCTest", routingKey: "RPC", handler);result.Data = true;result.Code = ResultCodeEnum.Success;result.Msg = "RPC消費者已啟動";}catch (OperationInterruptedException ex){result.Msg = "消息隊列服務不可用";}catch (Exception ex){result.Msg = "消費者初始化失敗";}return result;}/// <summary>/// 死信隊列重拋/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> Republish(string queueName){var result = new ResultModel<bool>() { Data = false };await _iRabbitMQProducer.RepublishDeadLetterMessagesAsync<ProductsEntity>(queueName);result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}/// <summary>/// 停止消費者/// </summary>/// <returns></returns>public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag){var result = new ResultModel<bool>() { Data = false };if (consumerTag.Equals("0")){await _iRabbitMQConsumer.StopAllConsuming();}else{await _iRabbitMQConsumer.StopConsuming(consumerTag);}result.Code = ResultCodeEnum.Success;result.Msg = "操作成功";result.Data = true;return result;}}
}
? ? ? ? 案例如下
?
? ? ? ? 7.2.接口
using Frame2_DataModel.Entity.Products;
using Frame6_LibraryUtility;namespace Frame1_Service.IService.Product
{public interface IRabbitMQTestSvr{/// <summary>/// 生產者-點對點(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerTest(ProductsEntity model);/// <summary>/// 消費者-點對點(Point-to-Point)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerTest();/// <summary>/// 生產者-發布訂閱(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model);/// <summary>/// 消費者-發布訂閱(Pub/Sub)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerPubSub();/// <summary>/// 生產者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerRouting(ProductsEntity model);/// <summary>/// 消費者-路由模式(Routing)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerRouting();/// <summary>/// 生產者-主題模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<bool>> ProducerTopic(ProductsEntity model);/// <summary>/// 消費者-主題模式(Topic)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerTopic();/// <summary>/// 生產者-請求響應模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model);/// <summary>/// 消費者-請求響應模式(RPC)/// </summary>/// <returns></returns>Task<ResultModel<bool>> ConsumerRPC();/// <summary>/// 死信隊列重拋/// </summary>/// <returns></returns>Task<ResultModel<bool>> Republish(string queueName);/// <summary>/// 停止消費者/// </summary>/// <returns></returns>Task<ResultModel<bool>> StopAllConsumer(string consumerTag);}
}
? ? ? ? 案例如下
????????7.3.控制器
using Frame1_Service.IService.Product;
using Frame1_Service.Service.Product;
using Frame2_DataModel.Entity.Products;
using Frame4_LibraryCore.BaseConfig;
using Frame6_LibraryUtility;
using Microsoft.AspNetCore.Mvc;namespace DemoAPI.Controllers
{/// <summary>/// 消息隊列控制器 -RabbitMQ/// </summary>//[Authorize]// 保護整個控制器[Route("api/[controller]/[action]")]//標記路由地址規格[ApiController] // 標記該類為 API 控制器,啟用一些默認的行為,如模型綁定、輸入驗證等[ApiExplorerSettings(GroupName = nameof(ApiVersionInfo.V1))]//設置控制器的API版本public class RabbitMQTestController : BaseController{private IRabbitMQTestSvr _iRabbitMQTestSvr;/// <summary>/// 構造/// </summary>/// <param name="iRabbitMQTestSvr"></param>public RabbitMQTestController(IRabbitMQTestSvr iRabbitMQTestSvr) {_iRabbitMQTestSvr = iRabbitMQTestSvr;}/// <summary>/// 生產者-點對點(Point-to-Point)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerTest(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTest(model);/// <summary>/// 消費者-點對點(Point-to-Point)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerTest() => await _iRabbitMQTestSvr.ConsumerTest();/// <summary>/// 生產者-發布訂閱(Pub/Sub)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerPubSub(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerPubSub(model);/// <summary>/// 消費者-發布訂閱(Pub/Sub)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerPubSub() => await _iRabbitMQTestSvr.ConsumerPubSub();/// <summary>/// 生產者-路由模式(Routing)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerRouting(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRouting(model);/// <summary>/// 消費者-路由模式(Routing)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerRouting() => await _iRabbitMQTestSvr.ConsumerRouting();/// <summary>/// 生產者-主題模式(Topic)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<bool>> ProducerTopic(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerTopic(model);/// <summary>/// 消費者-主題模式(Topic)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerTopic() => await _iRabbitMQTestSvr.ConsumerTopic();/// <summary>/// 生產者-請求響應模式(RPC)/// </summary>/// <param name="model"></param>/// <returns></returns>[HttpPost]public async Task<ResultModel<CalculateResponse>> ProducerRPC(ProductsEntity model) => await _iRabbitMQTestSvr.ProducerRPC(model);/// <summary>/// 消費者-請求響應模式(RPC)/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> ConsumerRPC() => await _iRabbitMQTestSvr.ConsumerRPC();/// <summary>/// 死信隊列重拋/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> Republish(string queueName) => await _iRabbitMQTestSvr.Republish(queueName);/// <summary>/// 停止消費者/// </summary>/// <returns></returns>[HttpGet]public async Task<ResultModel<bool>> StopAllConsumer(string consumerTag) => await _iRabbitMQTestSvr.StopAllConsumer(consumerTag);}
}
? ? ? ? 案例如下