為什么使用消息隊列
消息隊列(MQ)在分布式系統中用于解耦生產者和消費者,提高系統的異步處理能力、削峰填谷、增強可擴展性和可靠性。通過消息隊列,任務可以異步執行,避免系統因瞬時高并發而崩潰。
消息隊列場景
- 異步處理:耗時操作(如郵件發送、日志記錄)通過消息隊列異步完成,避免阻塞主流程。
- 削峰填谷:突發流量通過消息隊列緩沖,避免直接沖擊后端服務。
- 解耦:系統模塊間通過消息通信,降低直接依賴。
- 最終一致性:分布式事務中通過消息隊列實現數據最終一致性。
MQ的部署與實踐
- 安裝RabbitMQ
通過官方安裝包或Docker部署RabbitMQ,例如:docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
- .NET集成
安裝NuGet包RabbitMQ.Client
,初始化連接:var factory = new ConnectionFactory { HostName = "localhost" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel();
發布確認和消費確認
- 發布確認
啟用發布確認模式,確保消息成功到達Broker:channel.ConfirmSelect(); channel.BasicPublish(exchange: "", routingKey: "queue", mandatory: true, basicProperties: null, body: body); channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
- 消費確認
手動ACK確保消息被正確處理:var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => {// 處理邏輯channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "queue", autoAck: false, consumer: consumer);
路由模式
通過Direct
交換器實現路由鍵精確匹配:
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
channel.BasicPublish(exchange: "direct_logs", routingKey: "error", body: body);
主題模式
使用Topic
交換器支持通配符匹配路由鍵:
channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
channel.BasicPublish(exchange: "topic_logs", routingKey: "logs.error", body: body);
發布確認機制
通過事務或確認機制確保消息可靠性:
channel.TxSelect();
try {channel.BasicPublish(exchange: "", routingKey: "queue", body: body);channel.TxCommit();
} catch {channel.TxRollback();
}
消費重復問題
- 冪等性設計:業務邏輯需支持重復消費(如唯一鍵約束)。
- 去重表:記錄已處理消息ID,避免重復執行。
TTL消息過期特性
設置消息或隊列的TTL(Time-To-Live):
var args = new Dictionary<string, object> { { "x-message-ttl", 60000 } };
channel.QueueDeclare(queue: "ttl_queue", arguments: args);
延遲隊列和死信隊列
- 死信隊列:消息過期或被拒絕時轉發到死信隊列:
var args = new Dictionary<string, object> {{ "x-dead-letter-exchange", "dlx_exchange" } }; channel.QueueDeclare(queue: "main_queue", arguments: args);
- 延遲隊列:通過TTL+死信隊列模擬延遲效果。
消息持久化
確保消息和隊列持久化:
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.QueueDeclare(queue: "persistent_queue", durable: true);
channel.BasicPublish(exchange: "", routingKey: "persistent_queue", basicProperties: properties, body: body);
RabbitMQ集群模式
- 普通集群:節點間同步元數據,但消息不冗余。
- 鏡像隊列:消息在多個節點間鏡像復制,實現高可用。
# 加入集群 rabbitmqctl join_cluster rabbit@node1 rabbitmqctl set_policy ha-all "^ha." '{"ha-mode":"all"}'
以上內容涵蓋了RabbitMQ在.NET中的核心使用場景和高級特性,可根據實際需求選擇配置。
推薦學習 NetCoreKevin 框架
NetCoreKevin 是一個基于 .NET Core 的開源框架,專注于簡化微服務架構和身份驗證的實現。它內置了 IdentityServer4 集成、JWT 認證、API 網關等功能,適合構建現代化的分布式系統。
學習資源:
- GitHub 倉庫:NetCoreKevin
該框架提供了比標準 IdentityServer4 更簡潔的配置方式,適合快速開發企業級應用。