安裝與配置
安裝 RabbitMQ
讀者可以在 RabbitMQ 官方文檔中找到完整的安裝教程:Downloading and Installing RabbitMQ — RabbitMQ
本文使用 Docker 的方式部署。
RabbitMQ 社區鏡像列表:https://hub.docker.com/_/rabbitmq
創建目錄用于映射存儲卷:
mkdir -p /opt/lib/rabbitmq
部署容器:
docker run -itd --name rabbitmq -p 5672:5672 -p 15672:15672 \
-v /opt/lib/rabbitmq:/var/lib/rabbitmq \
rabbitmq:3.12.8-management
部署時占用兩個端口。5672 是 MQ 通訊端口,15672 是 Management UI 工具端口。
打開 15672 端口,會進入 Web 登錄頁面,默認賬號密碼都是 guest。
關于 RabbitMQ Management UI 的使用方法,后續再介紹。
打開管理界面后會,在?Exchanges
?菜單中,可以看到如下圖表格。這些是默認的交換器。現在可以不需要了解這些東西,后面會有介紹。
Virtual host | Name | Type | Features |
---|---|---|---|
/ | (AMQP default) | direct | D |
/ | amq.direct | direct | D |
/ | amq.fanout | fanout | D |
/ | amq.headers | headers | D |
/ | amq.match | headers | D |
/ | amq.rabbitmq.trace | topic | D I |
/ | amq.topic | topic | D |
發布與訂閱模型
使用 C# 開發 RabbitMQ,需要使用 nuget 引入 RabbitMQ.Client,官網文檔地址:.NET/C# RabbitMQ Client Library — RabbitMQ
在繼續閱讀文章之前,請先創建一個控制臺程序。
生產者、消費者、交換器、隊列
為了便于理解,本文制作了幾十張圖片,約定一些圖形表示的含義:
對應生產者,使用如下圖表示:
對于消費者,使用如下圖表示:
對于消息隊列,使用如下圖表示:
對于交換器,使用如下圖表示:
在 RabbitMQ 中,生產者發布的消息是不會直接進入到隊列中,而是經過交換器(Exchange) 分發到各個隊列中。前面提到,部署 RabbitMQ 后,默認有 七個交換器,如?(AMQP default)
、amq.direct
?等。
當然,對于現在來說,我們不需要了解交換器,所以,在本節的教程中,會使用默認交換器完成實驗。
在忽略交換器存在的情況下,我們可以將生產和消費的流程簡化如下圖所示:
請一定要注意,圖中省略了交換器的存在,因為使用的是默認的交換器。但是生產者推送消息必須是推送到交換器,而不是隊列,這一句一定要弄清楚。
對于消費者來說,要使用隊列,必須確保隊列已經存在。
ConnectionFactory factory = new ConnectionFactory
{HostName = "localhost"
};// 連接
using IConnection connection = factory.CreateConnection();// 通道
using IModel channel = connection.CreateModel();channel.QueueDeclare(// 隊列名稱queue: "myqueue",// 持久化配置,隊列是否能夠在 broker 重啟后存活durable: false,// 連接關閉時被刪除該隊列exclusive: false,// 當最后一個消費者(如果有的話)退訂時,是否應該自動刪除這個隊列autoDelete: false,// 額外的參數配置arguments: null);
編寫一個消費者,消費該隊列中的消息,其完整代碼如下:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;ConnectionFactory factory = new ConnectionFactory
{HostName = "localhost"
};using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();channel.QueueDeclare(// 隊列名稱queue: "myqueue",// 持久化配置,隊列是否能夠在 broker 重啟后存活durable: false,// 連接關閉時被刪除該隊列exclusive: false,// 當最后一個消費者(如果有的話)退訂時,是否應該自動刪除這個隊列autoDelete: false,// 額外的參數配置arguments: null);// 定義消費者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($" [x] Received {message}");
};// 開始消費
channel.BasicConsume(queue: "myqueue",autoAck: true,consumer: consumer);Console.ReadLine();