介紹:
在當今互聯互通的世界里,設備之間高效可靠的通信至關重要。MQTT(消息隊列遙測傳輸)就是為此而設計的輕量級消息傳遞協議。本文將探討 MQTT 是什么、它的優勢以及如何在 .NET 框架中設置和實現它。最后,您將對 MQTT 有一個清晰的了解,并通過一個實際示例來幫助您入門。
1.什么是MQTT?
定義和概述:MQTT 代表消息隊列遙測傳輸 (Message Queuing Telemetry Transport)。它是一種輕量級的發布-訂閱網絡協議,用于在設備之間傳輸消息。對于需要較少代碼占用空間或網絡帶寬有限的遠程位置的連接,MQTT 非常有用。
MQTT的優點:
?? ?? 低帶寬使用率:旨在最大限度地減少網絡帶寬使用率,使其成為資源受限環境的理想選擇。
?? ?? 高效的消息傳遞:確保以不同的服務質量 (QoS) 級別可靠地傳遞消息。
?? ?? 可擴展性:適用于小型到大型實施,無縫處理數千臺設備。
?? ?? 易于使用:易于實施并與各種平臺集成。
與傳統方法的比較:
?? ?? 沒有 MQTT:HTTP 等傳統通信方法更繁重且效率更低,尤其是對于物聯網應用而言。
?? ?? 使用 MQTT:提供輕量、高效、可靠的消息傳遞機制,提高性能并減少延遲。
2. 在 .NET Framework 中設置 MQTT
分步指南:
?? ?安裝 MQTT 包:
?? ??? ?1、在 Visual Studio 中打開您的 .NET 項目。
?? ??? ?2、在解決方案資源管理器中右鍵單擊您的項目并選擇“管理 NuGet 包”。
?? ??? ?3、搜索并安裝 M2Mqtt 包。
編寫 MQTT 客戶端代碼:創建一個新類來處理 MQTT 客戶端操作。初始化 MQTT 客戶端,配置連接選項,并連接到代理。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Exceptions;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace MQTT
{
? ? public class MqttManager
? ? {
? ? ? ? #region Observer Pattern
? ? ? ? private HashSet<string> topics = new HashSet<string>();
? ? ? ? private HashSet<IMqttListener> listeners = new HashSet<IMqttListener>();
? ? ? ? private readonly ushort keepAlivePeriod = 30;
? ? ? ? private readonly string BrokerIp;
? ? ? ? private readonly int? BrokerPort = null;
? ? ? ? private string clientId;
? ? ? ? private string Username = null;
? ? ? ? private string Password = null;
? ? ? ? private MqttClient mqttClient;
? ? ? ? public MqttManager(string brokerIp, int? brokerPort = null, string username = null, string password = null)
? ? ? ? {
? ? ? ? ? ? BrokerIp = brokerIp;
? ? ? ? ? ? BrokerPort = brokerPort;
? ? ? ? ? ? Username = username;
? ? ? ? ? ? Password = password;
? ? ? ? ? ? clientId = RandomClientId();
? ? ? ? ? ? Debug.WriteLine($"mqttClient clientId: {clientId}");
? ? ? ? ? ? InitMQTT();
? ? ? ? }
? ? ? ? public void Register(string topic, IMqttListener listener)
? ? ? ? {
? ? ? ? ? ? if (!topics.Contains(topic))
? ? ? ? ? ? {
? ? ? ? ? ? ? ? Subscribe(topic);
? ? ? ? ? ? }
? ? ? ? ? ? listeners.Add(listener);
? ? ? ? }
? ? ? ? public void UnRegister(IMqttListener listener)
? ? ? ? {
? ? ? ? ? ? listeners.Remove(listener);
? ? ? ? }
? ? ? ? public void NotifyListeners(string topic, string message)
? ? ? ? {
? ? ? ? ? ? foreach (IMqttListener listener in listeners)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? listener.OnMqttMessage(topic, message);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? #endregion
? ? ? ? #region Init Mqtt ? ? ? ?
? ? ? ? async private void InitMQTT()
? ? ? ? {
? ? ? ? ? ? Debug.WriteLine($"mqttClient.Try connecting to brokerIp: {BrokerIp}");
? ? ? ? ? ? mqttClient = new MqttClient(BrokerIp);
? ? ? ? ? ? mqttClient.MqttMsgPublishReceived += (sender, e) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? string topic = e.Topic;
? ? ? ? ? ? ? ? string message = Encoding.UTF8.GetString(e.Message);
? ? ? ? ? ? ? ? Debug.WriteLine($"mqttClient.MessageReceived. {topic} -> {message}");
? ? ? ? ? ? ? ? NotifyListeners(topic, message);
? ? ? ? ? ? };
? ? ? ? ? ? var connectToBroker = new Func<Task>(() =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? return Task.Run(() =>
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? byte resultCode = mqttClient.Connect(clientId, Username, Password, true, keepAlivePeriod);
? ? ? ? ? ? ? ? ? ? if (resultCode == 0)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? foreach (var topic in topics)
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? Subscribe(topic);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? });
? ? ? ? ? ? });
? ? ? ? ? ? mqttClient.ConnectionClosed += async (s, e) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? await Task.Delay(TimeSpan.FromSeconds(5));
? ? ? ? ? ? ? ? while (!mqttClient.IsConnected)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? try
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? await connectToBroker();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? catch (Exception ex)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? Console.WriteLine($"An error occurred: {ex.Message}");
? ? ? ? ? ? ? ? ? ? ? ? await Task.Delay(TimeSpan.FromSeconds(5));
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? };
? ? ? ? ? ? while (!mqttClient.IsConnected)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? try
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? await connectToBroker();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? catch (Exception ex)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? Console.WriteLine($"An error occurred: {ex.Message}");
? ? ? ? ? ? ? ? ? ? await Task.Delay(TimeSpan.FromSeconds(5));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? Console.WriteLine("Connected to Broker Successfully!");
? ? ? ? }
? ? ? ? async private void Subscribe(string topic)
? ? ? ? {
? ? ? ? ? ? Debug.WriteLine($"mqttClient.TryToSubscribe -> {topic}");
? ? ? ? ? ? mqttClient.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
? ? ? ? ? ? topics.Add(topic);
? ? ? ? }
? ? ? ? private string RandomClientId()
? ? ? ? {
? ? ? ? ? ? return $"Server-{Guid.NewGuid()}";
? ? ? ? }
? ? }
? ? public interface IMqttListener
? ? {
? ? ? ? void OnMqttMessage(string topic, string message);
? ? }
}
3.示例實現:處理消息
場景:實現 MQTT 客戶端來訂閱主題并處理傳入的消息。
詳細步驟:
?? ?? 訂閱主題:修改客戶端代碼以訂閱特定主題。
?? ?? 處理傳入消息:使用事件處理程序來處理傳入消息。
public class BoostMqtt : Singleton<BoostMqtt>, IMqttListener
{
? ? public readonly MqttManager MqttManager;
? ? private const string TOPIC = "example_topic";
? ? public static ConcurrentQueue<MqttMessage> MqttMessageQueue = new ConcurrentQueue<MqttMessage>();
? ? private BoostMqtt()
? ? {
? ? ? ? string brokerIp = GetBrokerIp();
? ? ? ? int? brokerPort = GetBrokerPort();
? ? ? ? (string username, string password) = GetUsernamePassword();
? ? ? ? MqttManager = new MqttManager(brokerIp, brokerPort, username, password);
? ? ? ? MqttManager.Register(TOPIC, this);
? ? ? ? StartProcessingMqttMessagesFromQueue();
? ? }
? ? ~BoostMqtt()
? ? {
? ? ? ? MqttManager?.UnRegister(this);
? ? }
? ? private string GetBrokerIp()
? ? {
? ? ? ? try
? ? ? ? {
? ? ? ? ? ? return ConfigurationManager.AppSettings["MqttBrokerIpAddress"];
? ? ? ? }
? ? ? ? catch (Exception e)
? ? ? ? {
? ? ? ? ? ? Logging.Instance.WriteErrorToLog(e);
? ? ? ? ? ? return string.Empty;
? ? ? ? }
? ? }
? ? private int? GetBrokerPort()
? ? {
? ? ? ? try
? ? ? ? {
? ? ? ? ? ? if (int.TryParse(ConfigurationManager.AppSettings["MqttBrokerPort"], out int port))
? ? ? ? ? ? {
? ? ? ? ? ? ? ? return port;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? catch (Exception e)
? ? ? ? {
? ? ? ? ? ? Logging.Instance.WriteErrorToLog(e);
? ? ? ? }
? ? ? ? return null;
? ? }
? ? private (string, string) GetUsernamePassword()
? ? {
? ? ? ? try
? ? ? ? {
? ? ? ? ? ? string username = ConfigurationManager.AppSettings["MqttUsername"];
? ? ? ? ? ? string password = ConfigurationManager.AppSettings["MqttPassword"];
? ? ? ? ? ? return (username, password);
? ? ? ? }
? ? ? ? catch (Exception e)
? ? ? ? {
? ? ? ? ? ? Logging.Instance.WriteErrorToLog(e);
? ? ? ? ? ? return (null, null);
? ? ? ? }
? ? }
? ? public void StartProcessingMqttMessagesFromQueue()
? ? {
? ? ? ? int numThreads = Environment.ProcessorCount;
? ? ? ? for (int i = 0; i < numThreads; i++)
? ? ? ? {
? ? ? ? ? ? var thread = new Thread(() =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? while (true)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? try
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? if (MqttMessageQueue.TryDequeue(out MqttMessage message))
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ProcessMqttMessage(message.Topic, message.Message);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? catch (Exception e)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? Logging.Instance.WriteErrorToLog(e);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? Thread.Sleep(1);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? ? ? thread.Start();
? ? ? ? }
? ? }
? ? public void ProcessMqttMessage(string topic, string original_message)
? ? {
? ? ? ? if (topic.Contains("example_topic"))
? ? ? ? {
? ? ? ? ? ? // Process the message here
? ? ? ? }
? ? }
}
4. 比較 HiveMQ、RabbitMQ、Kafka 和 MQTT
????? HiveMQ:針對物聯網進行了優化,具有廣泛的監控和管理功能,非常適合大規模工業和商業物聯網部署。
????? RabbitMQ:提供多種協議和復雜路由功能的靈活性,適用于企業消息傳遞和微服務架構。
????? Kafka:擅長高吞吐量、實時數據處理,適用于大數據和事件流應用。
????? MQTT:輕量級且高效,適用于低帶寬、高延遲環境,是物聯網和移動應用的理想選擇。
結論:
?? ?本文介紹了 MQTT 的基礎知識、優勢以及如何在 .NET 框架中設置它。我們還演示了一個使用 MQTT 處理消息的實際示例。利用 MQTT 輕量級且高效的協議,您可以顯著改善應用程序中設備之間的通信。
如果您喜歡此文章,請收藏、點贊、評論,謝謝,祝您快樂每一天。?