C# .NET Framework 中的高效 MQTT 消息傳遞

介紹:

在當今互聯互通的世界里,設備之間高效可靠的通信至關重要。MQTT(消息隊列遙測傳輸)就是為此而設計的輕量級消息傳遞協議。本文將探討 MQTT 是什么、它的優勢以及如何在 .NET 框架中設置和實現它。最后,您將對 MQTT 有一個清晰的了解,并通過一個實際示例來幫助您入門。

1.什么是MQTT?

定義和概述:MQTT 代表消息隊列遙測傳輸 (Message Queuing Telemetry Transport)。它是一種輕量級的發布-訂閱網絡協議,用于在設備之間傳輸消息。對于需要較少代碼占用空間或網絡帶寬有限的遠程位置的連接,MQTT 非常有用。

MQTT的優點:

?? ?? 低帶寬使用率:旨在最大限度地減少網絡帶寬使用率,使其成為資源受限環境的理想選擇。

?? ?? 高效的消息傳遞:確保以不同的服務質量 (QoS) 級別可靠地傳遞消息。

?? ?? 可擴展性:適用于小型到大型實施,無縫處理數千臺設備。

?? ?? 易于使用:易于實施并與各種平臺集成。

與傳統方法的比較:

?? ?? 沒有 MQTT:HTTP 等傳統通信方法更繁重且效率更低,尤其是對于物聯網應用而言。

?? ?? 使用 MQTT:提供輕量、高效、可靠的消息傳遞機制,提高性能并減少延遲。

2. 在 .NET Framework 中設置 MQTT

分步指南:

?? ?安裝 MQTT 包:

?? ??? ?1、在 Visual Studio 中打開您的 .NET 項目。

?? ??? ?2、在解決方案資源管理器中右鍵單擊您的項目并選擇“管理 NuGet 包”。

?? ??? ?3、搜索并安裝 M2Mqtt 包。

編寫 MQTT 客戶端代碼:創建一個新類來處理 MQTT 客戶端操作。初始化 MQTT 客戶端,配置連接選項,并連接到代理。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Exceptions;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace MQTT
{
? ? public class MqttManager
? ? {
? ? ? ? #region Observer Pattern
? ? ? ? private HashSet<string> topics = new HashSet<string>();
? ? ? ? private HashSet<IMqttListener> listeners = new HashSet<IMqttListener>();
? ? ? ? private readonly ushort keepAlivePeriod = 30;

? ? ? ? private readonly string BrokerIp;
? ? ? ? private readonly int? BrokerPort = null;
? ? ? ? private string clientId;

? ? ? ? private string Username = null;
? ? ? ? private string Password = null;

? ? ? ? private MqttClient mqttClient;

? ? ? ? public MqttManager(string brokerIp, int? brokerPort = null, string username = null, string password = null)
? ? ? ? {
? ? ? ? ? ? BrokerIp = brokerIp;
? ? ? ? ? ? BrokerPort = brokerPort;
? ? ? ? ? ? Username = username;
? ? ? ? ? ? Password = password;

? ? ? ? ? ? clientId = RandomClientId();
? ? ? ? ? ? Debug.WriteLine($"mqttClient clientId: {clientId}");

? ? ? ? ? ? InitMQTT();
? ? ? ? }

? ? ? ? public void Register(string topic, IMqttListener listener)
? ? ? ? {
? ? ? ? ? ? if (!topics.Contains(topic))
? ? ? ? ? ? {
? ? ? ? ? ? ? ? Subscribe(topic);
? ? ? ? ? ? }

? ? ? ? ? ? listeners.Add(listener);
? ? ? ? }

? ? ? ? public void UnRegister(IMqttListener listener)
? ? ? ? {
? ? ? ? ? ? listeners.Remove(listener);
? ? ? ? }

? ? ? ? public void NotifyListeners(string topic, string message)
? ? ? ? {
? ? ? ? ? ? foreach (IMqttListener listener in listeners)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? listener.OnMqttMessage(topic, message);
? ? ? ? ? ? }
? ? ? ? }

? ? ? ? #endregion

? ? ? ? #region Init Mqtt ? ? ? ?

? ? ? ? async private void InitMQTT()
? ? ? ? {
? ? ? ? ? ? Debug.WriteLine($"mqttClient.Try connecting to brokerIp: {BrokerIp}");
? ? ? ? ? ? mqttClient = new MqttClient(BrokerIp);
? ? ? ? ? ? mqttClient.MqttMsgPublishReceived += (sender, e) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? string topic = e.Topic;
? ? ? ? ? ? ? ? string message = Encoding.UTF8.GetString(e.Message);

? ? ? ? ? ? ? ? Debug.WriteLine($"mqttClient.MessageReceived. {topic} -> {message}");

? ? ? ? ? ? ? ? NotifyListeners(topic, message);
? ? ? ? ? ? };

? ? ? ? ? ? var connectToBroker = new Func<Task>(() =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? return Task.Run(() =>
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? byte resultCode = mqttClient.Connect(clientId, Username, Password, true, keepAlivePeriod);

? ? ? ? ? ? ? ? ? ? if (resultCode == 0)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? foreach (var topic in topics)
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? Subscribe(topic);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
? ? ? ? ? ? });

? ? ? ? ? ? mqttClient.ConnectionClosed += async (s, e) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? await Task.Delay(TimeSpan.FromSeconds(5));

? ? ? ? ? ? ? ? while (!mqttClient.IsConnected)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? try
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? await connectToBroker();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? catch (Exception ex)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? Console.WriteLine($"An error occurred: {ex.Message}");
? ? ? ? ? ? ? ? ? ? ? ? await Task.Delay(TimeSpan.FromSeconds(5));
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? };

? ? ? ? ? ? while (!mqttClient.IsConnected)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? try
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? await connectToBroker();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? catch (Exception ex)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? Console.WriteLine($"An error occurred: {ex.Message}");
? ? ? ? ? ? ? ? ? ? await Task.Delay(TimeSpan.FromSeconds(5));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }

? ? ? ? ? ? Console.WriteLine("Connected to Broker Successfully!");
? ? ? ? }

? ? ? ? async private void Subscribe(string topic)
? ? ? ? {
? ? ? ? ? ? Debug.WriteLine($"mqttClient.TryToSubscribe -> {topic}");
? ? ? ? ? ? mqttClient.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
? ? ? ? ? ? topics.Add(topic);
? ? ? ? }

? ? ? ? private string RandomClientId()
? ? ? ? {
? ? ? ? ? ? return $"Server-{Guid.NewGuid()}";
? ? ? ? }
? ? }

? ? public interface IMqttListener
? ? {
? ? ? ? void OnMqttMessage(string topic, string message);
? ? }
}

3.示例實現:處理消息

場景:實現 MQTT 客戶端來訂閱主題并處理傳入的消息。

詳細步驟:

?? ?? 訂閱主題:修改客戶端代碼以訂閱特定主題。

?? ?? 處理傳入消息:使用事件處理程序來處理傳入消息。

public class BoostMqtt : Singleton<BoostMqtt>, IMqttListener
{
? ? public readonly MqttManager MqttManager;
? ? private const string TOPIC = "example_topic";
? ? public static ConcurrentQueue<MqttMessage> MqttMessageQueue = new ConcurrentQueue<MqttMessage>();

? ? private BoostMqtt()
? ? {
? ? ? ? string brokerIp = GetBrokerIp();
? ? ? ? int? brokerPort = GetBrokerPort();
? ? ? ? (string username, string password) = GetUsernamePassword();

? ? ? ? MqttManager = new MqttManager(brokerIp, brokerPort, username, password);
? ? ? ? MqttManager.Register(TOPIC, this);

? ? ? ? StartProcessingMqttMessagesFromQueue();
? ? }

? ? ~BoostMqtt()
? ? {
? ? ? ? MqttManager?.UnRegister(this);
? ? }

? ? private string GetBrokerIp()
? ? {
? ? ? ? try
? ? ? ? {
? ? ? ? ? ? return ConfigurationManager.AppSettings["MqttBrokerIpAddress"];
? ? ? ? }
? ? ? ? catch (Exception e)
? ? ? ? {
? ? ? ? ? ? Logging.Instance.WriteErrorToLog(e);
? ? ? ? ? ? return string.Empty;
? ? ? ? }
? ? }

? ? private int? GetBrokerPort()
? ? {
? ? ? ? try
? ? ? ? {
? ? ? ? ? ? if (int.TryParse(ConfigurationManager.AppSettings["MqttBrokerPort"], out int port))
? ? ? ? ? ? {
? ? ? ? ? ? ? ? return port;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? catch (Exception e)
? ? ? ? {
? ? ? ? ? ? Logging.Instance.WriteErrorToLog(e);
? ? ? ? }
? ? ? ? return null;
? ? }

? ? private (string, string) GetUsernamePassword()
? ? {
? ? ? ? try
? ? ? ? {
? ? ? ? ? ? string username = ConfigurationManager.AppSettings["MqttUsername"];
? ? ? ? ? ? string password = ConfigurationManager.AppSettings["MqttPassword"];
? ? ? ? ? ? return (username, password);
? ? ? ? }
? ? ? ? catch (Exception e)
? ? ? ? {
? ? ? ? ? ? Logging.Instance.WriteErrorToLog(e);
? ? ? ? ? ? return (null, null);
? ? ? ? }
? ? }

? ? public void StartProcessingMqttMessagesFromQueue()
? ? {
? ? ? ? int numThreads = Environment.ProcessorCount;

? ? ? ? for (int i = 0; i < numThreads; i++)
? ? ? ? {
? ? ? ? ? ? var thread = new Thread(() =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? while (true)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? try
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? if (MqttMessageQueue.TryDequeue(out MqttMessage message))
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ProcessMqttMessage(message.Topic, message.Message);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? catch (Exception e)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? Logging.Instance.WriteErrorToLog(e);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? Thread.Sleep(1);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? ? ? thread.Start();
? ? ? ? }
? ? }

? ? public void ProcessMqttMessage(string topic, string original_message)
? ? {
? ? ? ? if (topic.Contains("example_topic"))
? ? ? ? {
? ? ? ? ? ? // Process the message here
? ? ? ? }
? ? }
}

4. 比較 HiveMQ、RabbitMQ、Kafka 和 MQTT

????? HiveMQ:針對物聯網進行了優化,具有廣泛的監控和管理功能,非常適合大規模工業和商業物聯網部署。

????? RabbitMQ:提供多種協議和復雜路由功能的靈活性,適用于企業消息傳遞和微服務架構。

????? Kafka:擅長高吞吐量、實時數據處理,適用于大數據和事件流應用。

????? MQTT:輕量級且高效,適用于低帶寬、高延遲環境,是物聯網和移動應用的理想選擇。

結論:

?? ?本文介紹了 MQTT 的基礎知識、優勢以及如何在 .NET 框架中設置它。我們還演示了一個使用 MQTT 處理消息的實際示例。利用 MQTT 輕量級且高效的協議,您可以顯著改善應用程序中設備之間的通信。

如果您喜歡此文章,請收藏、點贊、評論,謝謝,祝您快樂每一天。?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/86930.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/86930.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/86930.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

nn.Embedding 和 word2vec 的區別

理解它們的關鍵在于??區分概念層級和職責??。 可以將它們類比為&#xff1a; ??word2vec&#xff1a;?? 一個??專門制作高質量詞向量模型的“工廠”??。??nn.Embedding&#xff1a;?? 一個??可存儲、查找并訓練詞向量的“智能儲物柜”??&#xff08;作為…

華為云Flexus+DeepSeek征文|??華為云ModelArts Studio大模型 + WPS:AI智能PPT生成解決方案?

引言&#xff1a;告別繁瑣PPT制作&#xff0c;AI賦能高效辦公 ?? 在商業匯報、學術研究、產品發布等場景中&#xff0c;制作專業PPT往往需要耗費大量時間進行內容整理、邏輯梳理和視覺美化。??華為云ModelArts Studio大模型??與??WPS??深度結合&#xff0c;推出AI-P…

【連接redis超時】

報錯 客戶端輸出緩沖區超限 Client … scheduled to be closed ASAP for overcoming of output buffer limits 表示這些客戶端&#xff08;通過 psubscribe 命令進行發布訂閱操作&#xff09;的輸出緩沖區超過了 Redis 配置的限制&#xff0c;Redis 會關閉這些客戶端連接來避免…

PHP「Not enough Memory」實戰排錯筆記

目錄 PHP「Not enough Memory」實戰排錯筆記 1. 背景 2. 快速定位 3. 為什么 5 MB 的圖片能耗盡 128 MB&#xff1f; 3.1 粗略估算公式&#xff08;GD&#xff09; 4. 實際峰值監控 5. 解決過程 6. 最佳實踐與防御措施 7. 總結 PHP「Not enough Memory」實戰排錯筆記 —…

Java垃圾回收機制和三色標記算法

一、對象內存回收 對于對象回收&#xff0c;需要先判斷垃圾對象&#xff0c;然后收集垃圾。 收集垃圾采用垃圾收集算法和垃圾收集器。 判斷垃圾對象&#xff0c;通常采用可達性分析算法。 引用計數法 每個對象設置一個引用計數器。每被引用一次&#xff0c;計數器就加1&am…

基于python網絡數據挖掘的二手房推薦系統

基于網絡數據挖掘的二手房推薦系統設計與實現 【摘要】 隨著互聯網技術在房地產行業的深入應用&#xff0c;線上房源信息呈爆炸式增長&#xff0c;給購房者帶來了信息過載的挑戰。為了提升二手房篩選的效率與精準度&#xff0c;本文設計并實現了一個基于網絡數據挖掘的二手房推…

Java + 阿里云 Gmsse 實現 SSL 國密通信

前言 解決接口或頁面僅密信瀏覽器&#xff08;或 360 國密瀏覽器&#xff09;能訪問的問題 測試頁面 測試網站-中國銀行&#xff1a;https://ebssec.boc.cn/boc15/help.html 使用其他瀏覽器&#xff08;google&#xff0c;edge等&#xff09;打開 使用密信瀏覽器打開 解決…

國產數據庫分類總結

文章目錄 一、華為系數據庫1. 華為 GaussDB 二、阿里系數據庫1. 阿里云 OceanBase2. PolarDB&#xff08;阿里云自研&#xff09; 三、騰訊系數據庫1. TDSQL&#xff08;騰訊云&#xff09;2. TBase&#xff08;PostgreSQL增強版&#xff09; 四、傳統國產數據庫1. 達夢數據庫&…

解密閉包:函數如何記住外部變量

&#x1f9e0; 什么是閉包&#xff1f; 閉包是一個函數對象&#xff0c;它不僅記住它的代碼邏輯&#xff0c;還記住了定義它時的自由變量&#xff08;即非全局也非局部&#xff0c;但被內部函數引用的變量&#xff09;。即使外部函數已經執行完畢&#xff0c;這些自由變量的值…

I2C協議詳解及STM32 HAL庫硬件I2C卡死問題分析

一、I2C協議詳解 1. I2C協議概述 Inter-Integrated Circuit (I2C) 是由 Philips 半導體&#xff08;現 NXP 半導體&#xff09;于 1980 年代設計的一種同步串行通信總線協議。該協議采用半雙工通信模式&#xff0c;支持多主從架構&#xff0c;專為短距離、低速率的芯片間通信…

HTTP協議-后端接收請求

起因就是不知道post這個請求體中這些格式有什么區別&#xff0c;后端又怎么去接收這些不同格式的內容 Get請求 get請求是比較簡單的一類 正常的直接用參數接收&#xff08;不寫的話名字要匹配&#xff09;或者RequestParam都可以接收&#xff0c;用對象綁定也可以 resultful…

HTML5 實現的圣誕主題網站源碼,使用了 HTML5 和 CSS3 技術,界面美觀、節日氛圍濃厚。

以下是一個 HTML5 實現的圣誕主題網站源碼&#xff0c;使用了 HTML5 和 CSS3 技術&#xff0c;界面美觀、節日氛圍濃厚。它包括&#xff1a; 圣誕樹動畫 &#x1f384;雪花飄落特效 ??圣誕祝福語 &#x1f381;響應式布局&#xff0c;適配移動端 你可以將代碼保存為 index.…

Spring Cloud Bus 和 Spring Cloud Stream

Spring Cloud Bus 和 Spring Cloud Stream 都是 Spring Cloud 生態中的消息通信組件&#xff0c;但它們的定位和使用場景有顯著區別&#xff1a; 1. Spring Cloud Bus 核心定位&#xff1a;分布式系統的消息廣播&#xff08;配置刷新、事件傳播&#xff09;。 典型場景&#x…

磁懸浮軸承位移信號的高精度估計:卡爾曼濾波算法深度解析

無需位移傳感器,濾波算法如何實現微米級精度? 磁懸浮軸承作為革命性的非接觸式支承技術,憑借無磨損、無需潤滑、高轉速等優勢,在飛輪儲能、高速電機、人工心臟泵和航空航天領域獲得了廣泛應用。其核心控制依賴于對轉子位移信號的高精度實時檢測,傳統電渦流傳感器雖能提供位…

DAY 43 預訓練模型

目錄 一、預訓練的概念 二、 經典的預訓練模型 2.1 CNN架構預訓練模型 2.2 Transformer類預訓練模型 2.3 自監督預訓練模型 三、常見的分類預訓練模型介紹 3.1 預訓練模型的發展史 3.2 預訓練模型的訓練策略 知識點回顧&#xff1a; 預訓練的概念常見的分類預訓練模型圖像…

Redis:事物

&#x1f308; 個人主頁&#xff1a;Zfox_ &#x1f525; 系列專欄&#xff1a;Redis &#x1f525; 什么是事務 Redis的事務和MySQL的事務概念上是類似的.都是把?系列操作綁定成?組.讓這?組能夠批量執?. 但是注意體會Redis的事務和MySQL事務的區別: 弱化的原?性:redi…

CppCon 2018 學習:An allocator is a handle to a heap Lessons learned from std::pmr

“An allocator is a handle to a heap — Lessons learned from std::pmr” 翻譯過來就是&#xff1a;“分配器&#xff08;allocator&#xff09;是對堆&#xff08;heap&#xff09;的一種句柄&#xff08;handle&#xff09;——從 std::pmr 中學到的經驗”。 基礎概念 分…

設備健康實時監測方法演進:從傳感網絡到AI決策樹的工業智能實踐

引言&#xff1a;當設備運維遇上AIoT革命 在工業4.0進程中&#xff0c;?毫秒級設備狀態捕獲能力正成為智能工廠的核心競爭力。傳統監測方法因數據滯后、診斷粗放被詬病&#xff0c;本文將深入探討三大前沿實時監測技術路徑&#xff0c;并揭秘中訊燭龍系統如何通過深度強化學習…

劍指offer53_二叉樹的深度

二叉樹的深度 輸入一棵二叉樹的根結點&#xff0c;求該樹的深度。 從根結點到葉結點依次經過的結點&#xff08;含根、葉結點&#xff09;形成樹的一條路徑&#xff0c;最長路徑的長度為樹的深度。 數據范圍 樹中節點數量 [ 0 , 500 ] [0,500] [0,500]。 樣例 輸入&#…

探秘AI的秘密:leaked-system-prompts

揭秘:揭秘系統提示合集背后的秘密 在當今這個人工智能技術迅速發展的時代,了解和使用大型語言模型(LLM)已成為技術愛好者、開發者和研究人員的共同目標。而作為核心組成部分,系統提示(system prompts)的設計和應用直接影響了LLM的表現和功能。今天, 我們將為大家揭示一…