【消息隊列】RabbitMQ “消息隊列模式” 以及NET8集成

在 .NET 8 中集成 RabbitMQ 消息隊列,可以使用官方推薦的 RabbitMQ.Client 庫或封裝好的 MassTransit/EasyNetQ 等高級庫。以下是 RabbitMQ 的基本集成代碼 和 常見消息模式 的實現。

RabbitMQ 本身并沒有直接支持延時消息的功能,但是可以通過一些機制來實現延時消息的效果。以下是兩種常用的方法:

  • TTL(Time To Live)+ 死信交換機(Dead Letter Exchange, DLX)

可以為隊列或消息設置 TTL,當消息的 TTL 到期后,如果沒有被消費,就會變成死信。
設置了死信交換機(DLX)的隊列中的死信會被轉發到指定的 DLX 上,然后可以由綁定到這個 DLX 的隊列進行處理,這樣就實現了延時消息的功能。

  • 使用插件 rabbitmq-delayed-message-exchange

RabbitMQ 提供了一個官方插件 rabbitmq-delayed-message-exchange,它允許你創建一個特殊的交換機類型,該交換機能夠接受帶有延遲時間的消息,并在指定的時間后將消息投遞給相應的隊列。
這個插件需要安裝并啟用,并且要求 Erlang/OPT 版本在 18.0 及以上。

一、. RabbitMQ 基礎集成(.NET 8)

安裝 NuGet 包

dotnet add package RabbitMQ.Client

配置 RabbitMQ 連接

csharp
using RabbitMQ.Client;public class RabbitMQService
{private readonly IConnection _connection;private readonly IModel _channel;public RabbitMQService(string hostname = "localhost", string username = "guest", string password = "guest"){var factory = new ConnectionFactory{HostName = hostname,UserName = username,Password = password,DispatchConsumersAsync = true // 啟用異步消費};_connection = factory.CreateConnection();_channel = _connection.CreateModel();}public void Dispose(){_channel?.Close();_connection?.Close();}
}
二. RabbitMQ 常見消息模式
(1)、 簡單隊列(Simple Queue)

場景:生產者發送消息到隊列,消費者從隊列接收消息(一對一)。

  • 生產者(Producer)
csharp
public void SendMessage(string queueName, string message)
{_channel.QueueDeclare(queue: queueName,durable: true, // 持久化隊列exclusive: false,autoDelete: false);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchange: "",routingKey: queueName,basicProperties: null,body: body);
}
  • 消費者(Consumer)
csharp
public void ReceiveMessages(string queueName)
{_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += async (model, ea) =>{var body = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received: {body}");await Task.Yield(); // 模擬異步處理_channel.BasicAck(ea.DeliveryTag, false); // 手動ACK};_channel.BasicConsume(queue: queueName,autoAck: false, // 關閉自動ACKconsumer: consumer);
}
(2)、 工作隊列(Work Queue)

場景:多個消費者競爭消費同一個隊列的消息(任務分發)。

  • 生產者
    同 簡單隊列 的 SendMessage 方法。

  • 消費者

csharp
// 啟動多個消費者實例,RabbitMQ 會輪詢分發消息
for (int i = 0; i < 3; i++) // 3個消費者
{Task.Run(() =>{using var service = new RabbitMQService();service.ReceiveMessages("task_queue");Console.WriteLine($"Consumer {i} started...");Thread.Sleep(Timeout.Infinite);});
}
(3)、 發布/訂閱(Pub/Sub)【Fannout】

場景:一個生產者發送消息到交換機(Exchange),多個隊列綁定到交換機,每個隊列有自己的消費者(廣播模式)。

  • 生產者(發送到交換機)
csharp
public void PublishToExchange(string exchangeName, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // Fanout 廣播var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, "", null, body);
}
  • 消費者(綁定隊列到交換機)
csharp
public void SubscribeToExchange(string exchangeName, string queueName)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, "");var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{var body = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received: {body}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(4) 、路由模式(Routing)

場景:根據 RoutingKey 定向投遞消息到特定隊列。

  • 生產者
csharp
public void SendWithRouting(string exchangeName, string routingKey, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, routingKey, null, body);
}
  • 消費者
csharp
public void ReceiveWithRouting(string exchangeName, string queueName, string routingKey)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, routingKey);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(5) 、主題模式(Topic)

場景:使用通配符(*、#)匹配 RoutingKey,實現靈活路由。
*(星號)匹配單個單詞
#(井號)匹配多個單詞

  • 生產者
csharp
public void SendWithTopic(string exchangeName, string topic, string message)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish(exchangeName, topic, null, body);
}
  • 消費者
csharp
public void ReceiveWithTopic(string exchangeName, string queueName, string topicPattern)
{_channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);_channel.QueueBind(queueName, exchangeName, topicPattern);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{Console.WriteLine($"Received {ea.RoutingKey}: {Encoding.UTF8.GetString(ea.Body.Span)}");return Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: true, consumer);
}
(6)、 RPC(遠程過程調用)

場景:客戶端發送請求并等待服務端響應(同步通信)。

  • 客戶端(RPC Client)
public string Call(string message, string queueName = "rpc_queue")
{var correlationId = Guid.NewGuid().ToString();var replyQueueName = _channel.QueueDeclare().QueueName;var properties = _channel.CreateBasicProperties();properties.ReplyTo = replyQueueName;properties.CorrelationId = correlationId;var body = Encoding.UTF8.GetBytes(message);_channel.BasicPublish("", queueName, properties, body);var tcs = new TaskCompletionSource<string>();var consumer = new EventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{if (ea.BasicProperties.CorrelationId == correlationId){var response = Encoding.UTF8.GetString(ea.Body.Span);tcs.SetResult(response);}};_channel.BasicConsume(replyQueueName, autoAck: true, consumer);return tcs.Task.Result; // 同步等待響應(生產環境建議用異步)
}
  • 服務端(RPC Server)
csharp
public void StartRpcServer(string queueName = "rpc_queue")
{_channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false);var consumer = new AsyncEventingBasicConsumer(_channel);consumer.Received += async (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($"Received RPC call: {message}");var response = $"Response to: {message}";var responseBytes = Encoding.UTF8.GetBytes(response);var properties = ea.BasicProperties;var replyProps = _channel.CreateBasicProperties();replyProps.CorrelationId = properties.CorrelationId;_channel.BasicPublish("",properties.ReplyTo,replyProps,responseBytes);_channel.BasicAck(ea.DeliveryTag, false);await Task.CompletedTask;};_channel.BasicConsume(queueName, autoAck: false, consumer);
}

三. 完整示例(.NET 8 Worker Service)

  • 生產者項目
// Program.cs
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();var rabbitMQ = new RabbitMQService();
rabbitMQ.SendMessage("hello_queue", "Hello, RabbitMQ!");app.Run();
  • 消費者項目(Worker Service)
csharp
// Program.cs
IHost host = Host.CreateDefaultBuilder(args).ConfigureServices(services =>{services.AddHostedService<Worker>();}).Build();await host.RunAsync();// Worker.cs
public class Worker : BackgroundService
{private readonly RabbitMQService _rabbitMQ;public Worker(){_rabbitMQ = new RabbitMQService();}protected override async Task ExecuteAsync(CancellationToken stoppingToken){_rabbitMQ.ReceiveMessages("hello_queue");while (!stoppingToken.IsCancellationRequested){await Task.Delay(1000, stoppingToken);}}
}
四. 總結

| 模式 | 適用場景 | 關鍵點 |
| 簡單隊列 | 一對一消息傳遞 | QueueDeclare + BasicPublish |
| 工作隊列 | 任務分發(競爭消費) | 多個消費者監聽同一隊列 |
| 發布/訂閱 | 廣播消息 | ExchangeType.Fanout |
| 路由模式 | 定向路由 | ExchangeType.Direct + RoutingKey |
| 主題模式 | 靈活匹配路由 | ExchangeType.Topic + */# |
| RPC | 同步請求-響應 | ReplyTo + CorrelationId |

推薦實踐
連接管理:使用 IHostedService 或單例模式管理 IConnection 和 IModel。
異常處理:監聽 Connection.ConnectionShutdown 事件并重連。
性能優化:啟用 DispatchConsumersAsync = true 支持異步消費。
高級封裝:考慮使用 MassTransit 或 EasyNetQ 簡化開發。
通過以上模式,可以靈活應對 異步任務處理、事件驅動架構、微服務通信 等場景。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/918450.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/918450.shtml
英文地址,請注明出處:http://en.pswp.cn/news/918450.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker 鏡像常見標簽(如 `標準`、`slim`、`alpine` 和 `noble`)詳細對比

以下是 Docker 鏡像常見標簽&#xff08;如 標準、slim、alpine 和 noble&#xff09;的詳細對比&#xff0c;涵蓋基礎系統、體積、適用場景及注意事項&#xff1a;1. 標準鏡像&#xff08;無后綴&#xff09; 基礎系統&#xff1a;完整 Linux 發行版&#xff08;如 Debian、Ub…

(1-9-2)Java 工廠模式

目錄 1.設計模式與分類 2. 工廠模式 2.1 工廠模式概述 2.2 簡單工廠 2.3 學生推薦就業Demo 00.項目目錄 01. 創建抽象接口Job 02. 創建實體類 03. 創建推薦工作工廠類 04. 創建學生推薦就業客戶類 3. i18n國際化語言標題自適應 00. 實現效果 01. 創建抽象接口 02…

QT遠程開發技巧

交叉編譯時野火提供的文件 交叉編譯器 GNU官網可以下載, QT庫文件這里給的QT-everyWhere版本,是開源版本需要自行編譯。(遠程服務器通常是PC架構【AMD64】,直接apt-get install下載的qt也是Pc架構的,不能直接在板卡【ARM64】上運行,必須對源碼交叉編譯) 注意QT遵從GPL開源…

Linux操作系統從入門到實戰(十九)進程狀態

Linux操作系統從入門到實戰&#xff08;十九&#xff09;進程狀態前言一、什么是進程狀態二、狀態本質三、最核心的3種狀態1. 就緒狀態2. 運行狀態3. 阻塞狀態四、狀態變化的核心1/兩種資源如何影響狀態&#xff1f;五、操作系統怎么管理這些狀態&#xff1f;六、Linux里結構體…

容器技術之docker

容器技術之Docker一、什么是Docker二、為什么會出現Docker2.1 環境一致性問題2.2 虛擬化技術的局限性2.3 微服務架構的興起三、重要概念3.1 什么是鏡像3.2 什么是容器3.3 總結&#xff1a;3.4 Docker倉庫四、安裝Docker&#xff08;Ubuntu系統下&#xff09;1.卸載老的版本2.更…

數據結構與算法:樹狀數組

前言 太難了…… 一、樹狀數組使用場景 樹狀數組一般用來維護可差分的信息&#xff0c;比如累加和&#xff0c;累乘積等。舉個例子&#xff0c;當整個數組的累加和為sum1&#xff0c;一個區間內的累加和為sum2&#xff0c;那么除了這個區間剩下部分的累加和就是sum1-sum2&am…

“一車一碼一池一充”:GB 17761-2024新國標下電動自行車的安全革命

2025年9月1日&#xff0c;電動自行車行業將迎來一場深刻變革。隨著強制性國家標準GB 17761-2024《電動自行車安全技術規范》的全面實施&#xff0c;我國超3.5億電動自行車用戶的安全出行將獲得全新的技術保障。在這場安全升級中&#xff0c;“一車一碼一池一充”的全鏈條管控機…

QT聊天項目DAY18

1.文件傳輸1.1 客戶端采用分塊傳輸(20MB/塊)&#xff0c;以及MD5碼校驗并將讀出的二進制數據采用Base64編碼進行傳輸1.1.0 通信協議1.1.1 UI采用垂直布局&#xff0c;該布局大小為570 * 160&#xff0c;間隔全是0&#xff0c;UI方面不詳細介紹了1.1.2 MainWindow頭文件#ifndef …

centos系統sglang單節點本地部署大模型

前置工作 本地部署大模型的基本概念和前置工作-CSDN博客 模型部署 這里通過docker容器進行部署。我這里是h20*8,部署deepseek-v3-0324,這個配置和模型都比較大,大家根據自己的硬件對應調整 步驟一 我們要通過sglang部署模型,先拉取sglang的docker鏡像,這里下載失敗的…

【dij算法/最短路/分層圖】P4568 [JLOI2011] 飛行路線

題目描述 Alice 和 Bob 現在要乘飛機旅行&#xff0c;他們選擇了一家相對便宜的航空公司。該航空公司一共在 nnn 個城市設有業務&#xff0c;設這些城市分別標記為 000 到 n?1n-1n?1&#xff0c;一共有 mmm 種航線&#xff0c;每種航線連接兩個城市&#xff0c;并且航線有一定…

告別傳統,CVPR三論文用GNN動態圖重塑視覺AI

本文選自gongzhonghao【圖靈學術SCI論文輔導】關注我們&#xff0c;掌握更多頂會頂刊發文資訊今天&#xff0c;為大家推薦一個極具前沿價值與實用潛力的研究方向&#xff1a;圖神經網絡&#xff08;GNN&#xff09;。作為深度學習領域的新興力量&#xff0c;圖神經網絡在近年頂…

HTTP/HTTPS代理,支持RSA和SM2算法

在日常工作和學習中&#xff0c;我們經常遇到HTTP和HTTPS的相關問題&#xff0c;要解決這些問題&#xff0c;有時就需要搭建各種實驗環境&#xff0c;重現業務場景&#xff0c;比如&#xff1a; 將HTTP轉為HTTPS。本地只能發送HTTP請求&#xff0c;但是遠程服務器卻只能接收HT…

如何提高AI寫作論文的查重率?推薦七個AI寫作論文工具

隨著AI技術在學術領域的廣泛應用&#xff0c;越來越多的學生和研究人員開始使用AI寫作工具來提高寫作效率&#xff0c;幫助完成畢業論文、科研論文等。然而&#xff0c;AI生成的內容是否會提高論文的查重率&#xff1f;是否能有效避免重復和提高通過率&#xff1f;這些問題成為…

跨平臺、低延遲、可嵌入:實時音視頻技術在 AI 控制系統中的進化之路

引言&#xff1a;面向未來的實時音視頻基座 在萬物互聯與智能化加速落地的時代&#xff0c;實時音視頻技術早已不再只是社交娛樂的附屬功能&#xff0c;而是智慧城市、應急指揮、遠程操控、工業智造、教育培訓、安防監控等系統的“神經中樞”。一條高性能、可控、低延遲的視頻…

Spring WebFlux開發指導

Spring WebFlux是一個響應式的web服務器端應用開發框架&#xff0c;響應式是指&#xff0c;當前端組件的狀態發生變化&#xff0c;則生成事件通知&#xff0c;根據需求可異步或者同步地向服務器端接口發送請求&#xff0c;當服務器端網絡IO組件的狀態發生變化&#xff0c;則生成…

09-docker鏡像手動制作

文章目錄一.手動制作單服務的nginx鏡像1.啟動一個基礎容器&#xff0c;此處我使用的是centos7鏡像。2.修改容器中的軟件源3.安裝nginx服務并啟動nginx服務4.修復nginx的首頁文件5.退出容器6.將退出的容器提交為鏡像7.測試鏡像的可用性二.手動制作多服務的nginx sshd鏡像1.啟用…

Android.mk教程

語法 Android.mk 的必備三行 LOCAL_PATH : $(call my-dir) # Android.mk的目錄&#xff0c;call調用函數include $(CLEAR_VARS) # 除了LOCAL_PATH清除所有LOCAL_XXXinclude $(BUILD_SHARED_LIBRARY) # BUILD_XXX, 指定構建類型 # BUILD_SHARED_LIBRARY → .so動態庫 # BUILD…

稠密檢索:基于神經嵌入的高效語義搜索范式

本文由「大千AI助手」原創發布&#xff0c;專注用真話講AI&#xff0c;回歸技術本質。拒絕神話或妖魔化。搜索「大千AI助手」關注我&#xff0c;一起撕掉過度包裝&#xff0c;學習真實的AI技術&#xff01; 1. 背景與定義 稠密檢索&#xff08;Dense Retrieval&#xff09;是一…

AI日報0807 | GPT-5或今晚1點來襲:四大版本全曝光

關注&#xff1a;未來世界2099每日分享&#xff1a;全球最新AI資訊【應用商業技術其他】服務&#xff1a;【學習Q】【資源Q】【學習資料】【行業報告】&#xff08;無限免費下載&#xff09;應用 1、訊飛星火代碼畫布震撼上線&#xff1a;動嘴就能開發&#xff0c;工作效率翻倍…

認識爬蟲 —— 正則表達式提取

本質是對字符串的處理&#xff0c;正則表達式描述的是一種字符串匹配的模式。簡而言之&#xff0c;用具備一定特征意義的表達式對字符串進行檢查&#xff0c;將符合條件的子字符串提取出來。導入模塊import re一、單字符匹配match(表達式&#xff0c;匹配對象)&#xff1a;匹配…