.NET中RabbitMQ的使用

.NET中RabbitMQ的使用

概述

  MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。AMQP(高級消息隊列協議) 是一個異步消息傳遞所使用的應用層協議規范,作為線路層協議,而不是API(例如JMS),AMQP 客戶端能夠無視消息的來源任意發送和接受信息。AMQP的原始用途只是為金融界提供一個可以彼此協作的消息協議,而現在的目標則是為通用消息隊列架構提供通用構建工具。因此,面向消息的中間件 (MOM)系統,例如發布/訂閱隊列,沒有作為基本元素實現。AMQP當中有四個概念非常重要(一個虛擬主機持有一組交換機、隊列和綁定):

  1. virtual host,虛擬主機
  2. exchange,交換機
  3. queue,隊列
  4. binding,綁定

  更多理論性東西可以參考(在Windows上安裝Rabbit MQ 指南),針對隊列的講解相當詳細

Window下安裝RabbbitMQ

文件下載安裝

Rabbit MQ 是建立在強大的Erlang OTP平臺上,因此安裝Rabbit MQ的前提是安裝Erlang。通過下面兩個連接下載安裝3.2.3 版本:

  1. 下載并安裝?Erlang OTP For Windows?(vR16B03)
  2. 運行安裝?Rabbit MQ Server Windows Installer?(v3.2.3)

默認安裝的Rabbit MQ 監聽端口是5672。先安裝Erlang OTP后安裝RabbitMQ,安裝方式默認即可,RabbitMQ可以勾選安裝后臺服務、服務啟動和停止等操作。

激活Rabbit MQ's Management Plugin

使用Rabbit MQ 管理插件,可以更好的可視化方式查看Rabbit MQ 服務器實例的狀態,打開CMD命令,cd到安裝目錄(..\rabbitmq_server-3.2.3\sbin)下,輸入下面的命令激活:

rabbitmq-plugins enable rabbitmq_management

要重啟服務才能生效,可以執行

net stop RabbitMQ && net start RabbitMQ

輸入網址,打開監控頁面: ?http://localhost:15672 (默認賬號和密碼:guest 和guest)

配置RabbitMQ用戶權限

RabbitMQ是存在用戶權限的,默認是guest 密碼也是guest,隸屬于Administrator管理員下。現需要配置新用戶和權限,繼續打開CMD命令,cd到安裝目錄sbin下:

用戶操作指令:

復制代碼
::查詢服務狀態
rabbitmqctl status

::列舉虛擬主機列表
rabbitmqctl list_vhosts
::列舉用戶列表
rabbitmqctl list_users

:: 添加用戶和密碼 rabbitmqctl add_user hao abc123:: 設置權限 rabbitmqctl set_permissions yy ".*" ".*" ".*":: 分配用戶組 rabbitmqctl set_user_tags yy administrator
:: 刪除guest用戶
rabbitmqctl delete_user guest
::修改用戶密碼
rabbitmqctl change_password {username} ?{newpassowrd}
復制代碼

?

.NET中RabbitMQ使用

1、Nuget下載RabbitMQ.Client第三方類庫,版本V3.6.5,高版本與.NET Framework 4.5有沖突,RabbitMQ Client地址

2、利用RabbitMQ Clinet類庫編碼(代碼內容有注釋,此處不做詳細解釋,文章后有完整代碼的下載地址)

?

  <1>RabbitMQ的direct類型Exchange

   Producter發送消息代碼:

復制代碼
        /// <summary>/// 連接配置/// </summary>private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory(){HostName ="192.168.1.8",UserName="hao",Password="abc123",Port= 5672};/// <summary>/// 路由名稱/// </summary>const string ExchangeName = "justin.exchange";//隊列名稱const string QueueName = "justin.queue";public static void DirectExchangeSendMsg(){using (IConnection conn = rabbitMqFactory.CreateConnection()){using (IModel channel = conn.CreateModel()){channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);var props = channel.CreateBasicProperties();props.Persistent = true;string vadata = Console.ReadLine();while (vadata != "exit"){var msgBody = Encoding.UTF8.GetBytes(vadata);channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);        Console.WriteLine(string.Format("***發送時間:{0},發送完成,輸入exit退出消息發送",DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));vadata = Console.ReadLine();}                   }}}
復制代碼

   

  Customer接收消息代碼:

?

復制代碼
        /// <summary>/// 連接配置/// </summary>private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() {HostName = "192.168.1.8", UserName = "hao", Password = "abc123", Port = 5672};/// <summary>/// 路由名稱/// </summary>const string ExchangeName = "justin.exchange";//隊列名稱const string QueueName = "justin.queue";public static void DirectAcceptExchange(){using (IConnection conn = rabbitMqFactory.CreateConnection()){using (IModel channel = conn.CreateModel()){channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);while (true){ BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);if (msgResponse != null){var msgBody = Encoding.UTF8.GetString(msgResponse.Body);Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),msgBody));}//BasicGetResult msgResponse2 = channel.BasicGet(QueueName, noAck: false);process message ...//channel.BasicAck(msgResponse2.DeliveryTag, multiple: false);System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));}}}}
復制代碼

  

  但是這種處理速度較慢,因為循環線程等待。高效的接收消息的方式可以使用EventingBasicConsumer進行消息接收處理,修改代碼內容如下:

?

復制代碼
        public static void DirectAcceptExchangeEvent(){using (IConnection conn = rabbitMqFactory.CreateConnection()){using (IModel channel = conn.CreateModel()){//channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);//channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{                        var msgBody = Encoding.UTF8.GetString(ea.Body);Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));};channel.BasicConsume(QueueName, noAck: true, consumer: consumer);//已過時用EventingBasicConsumer代替//var consumer2 = new QueueingBasicConsumer(channel);//channel.BasicConsume(QueueName, noAck: true, consumer: consumer);//var msgResponse = consumer2.Queue.Dequeue(); //blocking//var msgBody2 = Encoding.UTF8.GetString(msgResponse.Body);Console.WriteLine("按任意值,退出程序");Console.ReadKey();}}}
復制代碼

  

  但是有些時候,消費者同一時間沒有能力處理太多的業務,導致分配過來的隊列消息不能及時處理完成,這個時候,我們可以設置BasicQos屬性,告訴Broker同一時間將未處理完成的消息分配其他消費者,所以接收消息的地方需要略做修改,代碼如下:

?

復制代碼
public static void DirectAcceptExchangeTask()
{using (IConnection conn = rabbitMqFactory.CreateConnection()){using (IModel channel = conn.CreateModel()){//channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//告訴broker同一時間只處理一個消息//channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var msgBody = Encoding.UTF8.GetString(ea.Body);Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));int dots = msgBody.Split('.').Length - 1;System.Threading.Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");//處理完成,告訴Broker可以服務端可以刪除消息,分配新的消息過來channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};//noAck設置false,告訴broker,發送消息之后,消息暫時不要刪除,等消費者處理完成再說channel.BasicConsume(QueueName, noAck: false, consumer: consumer);Console.WriteLine("按任意值,退出程序");Console.ReadKey();}}
}
復制代碼

  

  <2> RabbitMQ的Topic類型Exchange

  Producter 發送消息代碼:

復制代碼
        /// <summary>/// 連接配置/// </summary>private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory(){HostName ="192.168.1.8",UserName="hao",Password="abc123",Port= 5672};/// <summary>/// 路由名稱/// </summary>const string TopExchangeName = "topic.justin.exchange";//隊列名稱const string TopQueueName = "topic.justin.queue";public static void TopicExchangeSendMsg(){using (IConnection conn = rabbitMqFactory.CreateConnection()){using (IModel channel = conn.CreateModel()){channel.ExchangeDeclare(TopExchangeName, "topic", durable: false, autoDelete: false, arguments: null);channel.QueueDeclare(TopQueueName, durable: false, autoDelete: false, exclusive: false, arguments: null);channel.QueueBind(TopQueueName, TopExchangeName, routingKey: TopQueueName);//var props = channel.CreateBasicProperties();//props.Persistent = true;string vadata = Console.ReadLine();while (vadata != "exit"){var msgBody = Encoding.UTF8.GetBytes(vadata);channel.BasicPublish(exchange: TopExchangeName, routingKey: TopQueueName, basicProperties: null, body: msgBody);Console.WriteLine(string.Format("***發送時間:{0},發送完成,輸入exit退出消息發送", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));vadata = Console.ReadLine();}}}}
復制代碼

?

  Customer接收消息代碼:

?

復制代碼
        /// <summary>/// 連接配置/// </summary>private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() {HostName = "192.168.1.8", UserName = "hao", Password = "abc123", Port = 5672};/// <summary>/// 路由名稱/// </summary>const string TopExchangeName = "topic.justin.exchange";//隊列名稱const string TopQueueName = "topic.justin.queue";public static void TopicAcceptExchange(){using (IConnection conn = rabbitMqFactory.CreateConnection()){using (IModel channel = conn.CreateModel()){channel.ExchangeDeclare(TopExchangeName, "topic", durable: false, autoDelete: false, arguments: null);channel.QueueDeclare(TopQueueName, durable: false, autoDelete: false, exclusive: false, arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);channel.QueueBind(TopQueueName, TopExchangeName, routingKey: TopQueueName);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var msgBody = Encoding.UTF8.GetString(ea.Body);Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));int dots = msgBody.Split('.').Length - 1;System.Threading.Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};channel.BasicConsume(TopQueueName, noAck: false, consumer: consumer);Console.WriteLine("按任意值,退出程序");Console.ReadKey();}}}
復制代碼

參考資料:

在 Windows 上安裝Rabbit MQ 指南(http://www.cnblogs.com/shanyou/p/4067250.html)

.NET 環境中使用RabbitMQ(http://www.cnblogs.com/yangecnu/p/4227535.html)

RabbitMQ Tutorial(http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html)

源代碼下載

知道的越多,不知道的也就越多,多多學習!

轉載于:https://www.cnblogs.com/Jeely/p/10788484.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/277819.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/277819.shtml
英文地址,請注明出處:http://en.pswp.cn/news/277819.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

SQL Server死鎖診斷--同一行數據在不同索引操作下引起的死鎖

死鎖概述 對于數據庫中出現的死鎖&#xff0c;通俗地解釋就是&#xff1a;不同Session&#xff08;會話&#xff09;持有一部分資源&#xff0c;并且同時相互排他性地申請對方持有的資源&#xff0c;然后雙方都得不到自己想要的資源&#xff0c;從而造成的一種僵持的現象。當然…

python下載安裝搭建

python官網下載python運行環境&#xff08;https://www.python.org/downloads/&#xff09;&#xff0c;建議下載穩定版本&#xff0c;不推薦使用最新版本 安裝 然后我們打開CMD&#xff0c;在里面輸入python&#xff0c;就可以直接進入進行編碼了 如果輸入python出現下面錯誤 …

35個Java 代碼性能優化總結

前言代碼優化&#xff0c;一個很重要的課題。可能有些人覺得沒用&#xff0c;一些細小的地方有什么好修改的&#xff0c;改與不改對于代碼的運行效率有什么影響呢&#xff1f;這個問題我是這么考慮的&#xff0c;就像大海里面的鯨魚一樣&#xff0c;它吃一條小蝦米有用嗎&#…

MySQL講義

1 MySQL基礎知識 瑞典MySQL AB公司開發&#xff0c;由SUN收購&#xff0c;而后SUN被甲骨文并購&#xff0c;目前屬于Oracle公司。 MySQL是一種關聯數據庫管理系統 由于其體積小、速度快、總體擁有成本低、MySQL軟件采用了雙授權政策&#xff0c;分為社區版和企業版。 …

Teams Bot App Manifest 文件解析

這篇文章我們繼續以 Hello World Bot 這個 sample 來講一下 manifest template。 實際上在 Teams app 開發的時候&#xff0c;有 manifest 的概念&#xff0c;manifest 是用來說明這個 teams app 的一些基本信息和配置信息&#xff0c;比如 app 的名字&#xff0c;app有哪些能…

[Dart] Flutter開發中的幾個常用函數

幾個Flutter開發中的常用函數 /** 返回當前時間戳 */static int currentTimeMillis() {return new DateTime.now().millisecondsSinceEpoch;}/** 復制到剪粘板 */static copyToClipboard(final String text) {if (text null) return;Clipboard.setData(new ClipboardData(text…

Cordova入門系列(三)Cordova插件調用 轉發 https://www.cnblogs.com/lishuxue/p/6018416.html...

Cordova入門系列&#xff08;三&#xff09;Cordova插件調用 版權聲明&#xff1a;本文為博主原創文章&#xff0c;轉載請注明出處 上一章我們介紹了cordova android項目是如何運行的&#xff0c;這一章我們介紹cordova的核心內容&#xff0c;插件的調用。演示一個例子&#xf…

clojure with postgres

主要關注訪問pg。不關心其他db 1 clojure.java.jdbc https://github.com/clojure/java.jdbchttp://clojure-doc.org/articles/ecosystem/java_jdbc/reusing_connections.html這個最廣&#xff0c;需要配合不同DB[org.clojure/java.jdbc "0.7.9"] [org.postgresql/pos…

lua入門

https://en.blog.nic.cz/2015/08/12/embedding-luajit-in-30-minutes-or-so/

shell腳本傳可選參數 getopts 和 getopt的方法

寫了一個shell腳本&#xff0c;需要向shell腳本中傳參數供腳本使用&#xff0c;達到的效果是傳的參數可以是可選參數 下面是一個常規化的shell腳本&#xff1a; echo "執行的文件名為: $0";echo "第一個參數名為: $1";echo "第二個參數名為: $2"…

Teams Tab App 代碼深入淺出 - 配置頁面

上一篇文章我們使用Teams Toolkit 來創建、運行 tab app。這篇文章我們深入來分析看一下tab app 的代碼。 先打開代碼目錄&#xff0c;可以看到在 src 目錄下有入口文件 index.tsx&#xff0c;然后在 components 目錄下有更多的一些 tsx 文件&#xff0c;tsx 是 typescript的一…

labelme標注的json文件數據轉成coco數據集格式(可處理目標框和實例分割)

這里主要是搬運一下能找到的 labelme標注的json文件數據轉成coco數據集格式&#xff08;可處理目標框和實例分割&#xff09;的代碼&#xff0c;以供需要時參考和提供相關幫助。 1、官方labelme實現 如下是labelme官方網址&#xff0c;提供了源代碼&#xff0c;以及相關使用方…

EpSON TM-82II驅動在POS系統上面安裝問題處理

按照品牌名稱&#xff0c;在網上下載的安裝包為apstmt82.rar 下面講解一下&#xff0c;如何的解決愛普生打印機在POS機器上面的安裝問題&#xff0c;這個算是一個比較奇特的故障問題&#xff0c;不像其它的新北冰洋&#xff08;SN3C&#xff09;的U80_U80II&#xff0c;SeNor的…

打印圖片的屬性和實現另存圖片功能以及使用numpy

上一篇我們已經學了如何讀取圖片的功能了以及和opencv的環境搭建了&#xff0c;今天接著來學習&#xff0c;哈哈哈&#xff0c;今天剛好五一&#xff0c;也沒閑著&#xff0c;繼續學習。 1、 首先我們來實現打印出圖片的一些屬性功能&#xff0c; 先來看一段代碼&#xff1a; 1…

Ubuntu 18.04下命令安裝VMware Tools

2019獨角獸企業重金招聘Python工程師標準>>> sudo apt-get upgrade sudo apt-get install open-vm-tools-desktop -y sudo reboot 轉載于:https://my.oschina.net/u/574036/blog/1829455

phpstorm PHP language level無法選擇

phpstorm PHP7新特性一直提示紅色波浪線&#xff0c;應該是沒有設置PHP 版本&#xff0c;但是打開PHPstorm---preference--lannguage&frameworks--PHP &#xff0c; 發現PHP language level 無法選擇PHP7.2 &#xff0c;查看旁邊的提示信息說是同步了composer 的原因&#…

Qfile

打開方式&#xff1a; 1 void AddStudents::write_to_file(QString src){2 QFile file("stu.txt");3 if (!file.open(QIODevice::Append | QIODevice::Text)){4 QMessageBox::critical(this,"打開文件錯誤","確認");5 r…

多層裝飾器、帶參數裝飾器

# 帶參數的裝飾器 # import time # FLAGE False # 加個標志位&#xff0c;使全部的裝飾器可以失效或有效 # def timmer_out(flag): # def timmer(func): # def inner(*args,**kwargs): # if flag: # start time.time() # …

IDEA svn 菜單不見了,解決方法

2019獨角獸企業重金招聘Python工程師標準>>> 參考地址: http://www.cnblogs.com/signheart/p/193448a98f92bd0cc064dbd772dd9f48.html,我是第二種方法解決的! 轉載于:https://my.oschina.net/liuchangng/blog/1829679

蘇寧易購:Hadoop失寵前提是出現更強替代品

在筆者持續調研國內Hadoop生態系統生存現狀的同時&#xff0c;KDnuggets發布的2018年數據科學和機器學習工具調查報告再次將“Hadoop失寵”言論復活。報告一出&#xff0c;“Hadoop被拋棄”幾個字瞬時成為各大標題黨的最愛&#xff0c;充斥在不同的新聞平臺。這些報告和數據是否…