在 .NET 8 中集成 RabbitMQ 消息隊列,可以使用官方推薦的 RabbitMQ.Client
庫或封裝好的 MassTransit/EasyNetQ
等高級庫。以下是 RabbitMQ 的基本集成代碼 和 常見消息模式 的實現。
RabbitMQ 本身并沒有直接支持延時消息的功能,但是可以通過一些機制來實現延時消息的效果。以下是兩種常用的方法:
- TTL(Time To Live)+ 死信交換機(Dead Letter Exchange, DLX)
可以為隊列或消息設置 TTL,當消息的 TTL 到期后,如果沒有被消費,就會變成死信。
設置了死信交換機(DLX)的隊列中的死信會被轉發到指定的 DLX 上,然后可以由綁定到這個 DLX 的隊列進行處理,這樣就實現了延時消息的功能。
- 使用插件 rabbitmq-delayed-message-exchange
RabbitMQ 提供了一個官方插件 rabbitmq-delayed-message-exchange,它允許你創建一個特殊的交換機類型,該交換機能夠接受帶有延遲時間的消息,并在指定的時間后將消息投遞給相應的隊列。
這個插件需要安裝并啟用,并且要求 Erlang/OPT 版本在 18.0 及以上。
一、. RabbitMQ 基礎集成(.NET 8)
安裝 NuGet 包
dotnet add package RabbitMQ.Client
配置 RabbitMQ 連接
csharp
using RabbitMQ.Client;public class RabbitMQService
{private readonly IConnection _connection;private readonly IModel _channel;public RabbitMQService(string hostname = "localhost", string username = "guest", string password = "guest"){var factory = new ConnectionFactory{HostName = hostname,UserName = username,Password = password,DispatchConsumersAsync = true // 啟用異步消費};_connection = factory.CreateConnection();_channel = _connection.CreateModel();}public void Dispose(){_channel?.Close();_connection?.Close();}
}
二. RabbitMQ 常見消息模式
(1)、 簡單隊列(Simple Queue)
場景:生產者發送消息到隊列,消費者從隊列接收消息(一對一)。
- 生產者(Producer)
csharp
public void SendMessage(string queueName, string message)
{_channel.QueueDeclare(queue: queueName,durable: true, // 持久化隊列exclusive: false,autoDelete: false);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchange: "",routingKey: queueName,basicProperties: null,body: body);
}
- 消費者(Consumer)
csharp
public void ReceiveMessages(string queueName)
{_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += async (model, ea) =>{var body = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received: {body}");await Task.Yield(); // 模擬異步處理_channel.BasicAck(ea.DeliveryTag, false); // 手動ACK};_channel.BasicConsume(queue: queueName,autoAck: false, // 關閉自動ACKconsumer: consumer);
}
(2)、 工作隊列(Work Queue)
場景:多個消費者競爭消費同一個隊列的消息(任務分發)。
-
生產者
同 簡單隊列 的 SendMessage 方法。 -
消費者
csharp
// 啟動多個消費者實例,RabbitMQ 會輪詢分發消息
for (int i = 0; i < 3; i++) // 3個消費者
{Task.Run(() =>{using var service = new RabbitMQService();service.ReceiveMessages("task_queue");Console.WriteLine($"Consumer {i} started...");Thread.Sleep(Timeout.Infinite);});
}
(3)、 發布/訂閱(Pub/Sub)【Fannout】
場景:一個生產者發送消息到交換機(Exchange),多個隊列綁定到交換機,每個隊列有自己的消費者(廣播模式)。
- 生產者(發送到交換機)
csharp
public void PublishToExchange(string exchangeName, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // Fanout 廣播var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, "", null, body);
}
- 消費者(綁定隊列到交換機)
csharp
public void SubscribeToExchange(string exchangeName, string queueName)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, "");var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{var body = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received: {body}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(4) 、路由模式(Routing)
場景:根據 RoutingKey 定向投遞消息到特定隊列。
- 生產者
csharp
public void SendWithRouting(string exchangeName, string routingKey, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, routingKey, null, body);
}
- 消費者
csharp
public void ReceiveWithRouting(string exchangeName, string queueName, string routingKey)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, routingKey);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(5) 、主題模式(Topic)
場景:使用通配符(*、#)匹配 RoutingKey,實現靈活路由。
*(星號)匹配單個單詞
#(井號)匹配多個單詞
- 生產者
csharp
public void SendWithTopic(string exchangeName, string topic, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, topic, null, body);
}
- 消費者
csharp
public void ReceiveWithTopic(string exchangeName, string queueName, string topicPattern)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, topicPattern);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(6)、 RPC(遠程過程調用)
場景:客戶端發送請求并等待服務端響應(同步通信)。
- 客戶端(RPC Client)
public string Call(string message, string queueName = "rpc_queue")
{var correlationId = Guid.NewGuid().ToString();var replyQueueName = _channel.QueueDeclare().QueueName;var properties = _channel.CreateBasicProperties();properties.ReplyTo = replyQueueName;properties.CorrelationId = correlationId;var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish("", queueName, properties, body);var tcs = new TaskCompletionSource<string>();var consumer = new EventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{if (ea.BasicProperties.CorrelationId == correlationId){var response = Encoding.UTF8.GetString(ea.Body.Span);tcs.SetResult(response);}};_channel.BasicConsume(replyQueueName, autoAck: true, consumer);return tcs.Task.Result; // 同步等待響應(生產環境建議用異步)
}
- 服務端(RPC Server)
csharp
public void StartRpcServer(string queueName = "rpc_queue")
{_channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += async (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received RPC call: {message}");var response = $"Response to: {message}";var responseBytes = Encoding.UTF8.GetBytes(response);var properties = ea.BasicProperties;var replyProps = _channel.CreateBasicProperties();replyProps.CorrelationId = properties.CorrelationId;_channel.BasicPublish("",properties.ReplyTo,replyProps,responseBytes);_channel.BasicAck(ea.DeliveryTag, false);await Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: false, consumer);
}
三. 完整示例(.NET 8 Worker Service)
- 生產者項目
// Program.cs
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();var rabbitMQ = new RabbitMQService();
rabbitMQ.SendMessage("hello_queue", "Hello, RabbitMQ!");app.Run();
- 消費者項目(Worker Service)
csharp
// Program.cs
IHost host = Host.CreateDefaultBuilder(args).ConfigureServices(services =>{services.AddHostedService<Worker>();}).Build();await host.RunAsync();// Worker.cs
public class Worker : BackgroundService
{private readonly RabbitMQService _rabbitMQ;public Worker(){_rabbitMQ = new RabbitMQService();}protected override async Task ExecuteAsync(CancellationToken stoppingToken){_rabbitMQ.ReceiveMessages("hello_queue");while (!stoppingToken.IsCancellationRequested){await Task.Delay(1000, stoppingToken);}}
}
四. 總結
| 模式 | 適用場景 | 關鍵點 |
| 簡單隊列 | 一對一消息傳遞 | QueueDeclare + BasicPublish |
| 工作隊列 | 任務分發(競爭消費) | 多個消費者監聽同一隊列 |
| 發布/訂閱 | 廣播消息 | ExchangeType.Fanout |
| 路由模式 | 定向路由 | ExchangeType.Direct + RoutingKey |
| 主題模式 | 靈活匹配路由 | ExchangeType.Topic + */# |
| RPC | 同步請求-響應 | ReplyTo + CorrelationId |
推薦實踐
連接管理:使用 IHostedService 或單例模式管理 IConnection 和 IModel。
異常處理:監聽 Connection.ConnectionShutdown 事件并重連。
性能優化:啟用 DispatchConsumersAsync = true 支持異步消費。
高級封裝:考慮使用 MassTransit 或 EasyNetQ 簡化開發。
通過以上模式,可以靈活應對 異步任務處理、事件驅動架構、微服務通信 等場景。