C#MQTT協議服務器與客戶端通訊實現
- 1 DLL版本
- 2 服務器
- 3 客戶端
1 DLL版本
MQTTnet.DLL版本-2.7.5.0
基于比較老的項目中應用的DLL,其他更高版本變化可能較大,謹慎參考。
2 服務器
開啟服務器
關閉服務器
綁定事件【客戶端連接服務器事件】
綁定事件【客戶端斷開(服務器)連接事件】
綁定事件【客戶端訂閱主題事件】
綁定事件【客戶端退訂主題事件】
綁定事件【接收客戶端(發送)消息事件】
using System;
using System.Net;
using MQTTnet;
using MQTTnet.Server;namespace Demo_MQTT.Model
{public class ServerModel{private static MqttServer _mqttServer = null;private readonly Action<string> _callbackLog;public ServerModel(Action<string> callbackLog){_callbackLog = callbackLog;}/// <summary>/// 綁定客戶端連接服務器事件/// </summary>private void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e){WriteLog($"客戶端[{e.Client.ClientId}]已連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}/// <summary>/// 綁定客戶端斷開連接事件/// </summary>private void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e){WriteLog($"客戶端[{e.Client.ClientId}]已斷開連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}/// <summary>/// 綁定客戶端訂閱主題事件/// </summary>private void Server_ClientSubscribedTopic(object sensor, MqttClientSubscribedTopicEventArgs e){WriteLog($">>> 客戶端{e.ClientId}訂閱主題{e.TopicFilter.Topic}");}/// <summary>/// 綁定客戶端退訂主題事件/// </summary>/// <param name="e"></param>private void Server_ClientUnsubscribedTopic(object sensor, MqttClientUnsubscribedTopicEventArgs e){WriteLog($">>> 客戶端{e.ClientId}退訂主題{e.TopicFilter}");}/// <summary>/// 綁定接收客戶端消息事件/// </summary>private void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e){WriteLog($"接收到{e.ClientId}發送來的消息! {DateTime.Now:yyyy-MM-dd HH:mm:ss} {Environment.NewLine}");}private void WriteLog(string log){_callbackLog?.Invoke(log);}/// <summary>/// 開啟服務器/// </summary>/// <param name="ip">IP地址</param>/// <param name="port">端口號</param>public void StartServer(string ip, int port){if (_mqttServer == null){var optionsBuilder = new MqttServerOptionsBuilder().WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)).WithConnectionBacklog(1000).WithDefaultEndpointPort(port);IMqttServerOptions options = optionsBuilder.Build();try{_mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;_mqttServer.ClientConnected += MqttServer_ClientConnected;_mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;_mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;_mqttServer.ClientSubscribedTopic += Server_ClientSubscribedTopic;_mqttServer.ClientUnsubscribedTopic += Server_ClientUnsubscribedTopic;_mqttServer.StartAsync(options);}catch (Exception ex){Console.WriteLine(ex.Message);return;}WriteLog($"MQTT服務器啟動成功 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}}/// <summary>/// 關閉服務器/// </summary>public void CloseServer(){_mqttServer?.StopAsync();}}
}
3 客戶端
連接服務器
屬性:客戶端連接狀態
客戶端斷開重連線程
獲取所有訂閱主題
訂閱主題
退訂主題
發送消息
綁定事件【客戶端連接服務器事件】
綁定事件【客戶端斷開(服務器)連接事件】
綁定事件【客戶端接收消息事件】
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;namespace Demo_MQTT.Model
{public class ClientModel{/// <summary>/// 記錄所有訂閱主題,用于斷開重連時重新訂閱主題/// </summary>private readonly List<string> _subscribeTopics = new List<string>();private MqttClient _mqttClient = null;private string _serverIp;private int _nServerPort;private bool _isRunningReConnectThreadStart = false;private string _clienID;/// <summary>/// 接受消息回調函數,參數:主題,消息內容/// </summary>private readonly Action<string, byte[]> _callbackReceived;private readonly Action<string> _callbackLog;/// <summary>/// 構造函數/// </summary>/// <param name="callbackReceived">接受消息回調函數,參數:主題,消息內容</param>/// <param name="callbackLog"></param>public ClientModel(Action<string, byte[]> callbackReceived, Action<string> callbackLog){_callbackReceived = callbackReceived;_callbackLog = callbackLog;}/// <summary>/// 連接服務器/// </summary>private async void ConnectServer(){try{if (_mqttClient == null){_mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;_mqttClient.Connected += (s, a) => WriteLog($"【{_clienID}】已連接到MQTT服務器!");_mqttClient.Disconnected += (s, a) => WriteLog($"【{_clienID}】已斷開MQTT連接!");_mqttClient.ApplicationMessageReceived += (sender, args) =>{_callbackReceived?.Invoke(args.ApplicationMessage.Topic, args.ApplicationMessage.Payload);};}if (_mqttClient.IsConnected) return;IMqttClientOptions options = new MqttClientOptions{ChannelOptions = new MqttClientTcpOptions(){Server = _serverIp,Port = _nServerPort},CleanSession = true};_clienID = options.ClientId;await _mqttClient.ConnectAsync(options);if (_mqttClient.IsConnected){ReConnectThreadStart();SubscribeAsync();}}catch (Exception ex){WriteLog("連接到MQTT服務器失敗!");}}/// <summary>/// 客戶端重連服務器線程-啟動/// </summary>/// <returns></returns>private void ReConnectThreadStart(){if (_isRunningReConnectThreadStart) return;if (_mqttClient != null){new Task(() =>{_isRunningReConnectThreadStart = true;Thread.Sleep(5000);while (true){Thread.Sleep(1000);if (!IsConnect){WriteLog($"客戶端[{_clienID}]斷開連接,嘗試重新連接服務器中...");int i;for (i = 0; i < 60; i++){if (IsConnect) break;WriteLog($"嘗試第{i + 1}次連接服務器");ConnectServer();Thread.Sleep(1000);if (IsConnect) break;}_isRunningReConnectThreadStart = i < 60;}if (!_isRunningReConnectThreadStart) break;}}).Start();}}private void WriteLog(string log){_callbackLog?.Invoke(log);}/// <summary>/// 客戶端連接狀態/// </summary>public bool IsConnect => _mqttClient?.IsConnected == true;/// <summary>/// 連接服務器/// </summary>/// <param name="serverIp">服務器IP</param>/// <param name="serverPort">服務器端口</param>/// <param name="topic"></param>public async void ConnectServer(string serverIp, int serverPort){_serverIp = serverIp;_nServerPort = serverPort;await Task.Run(() => { ConnectServer(); });}/// <summary>/// 關閉客戶端,斷開客戶端和服務器的連接/// </summary>public void CloseClient(){_mqttClient.DisconnectAsync();}/// <summary>/// 發送消息/// </summary>/// <param name="topic">發送主題</param>/// <param name="cmd">發送內容</param>[Obsolete("Obsolete")]public void PublishAsync(string topic, string cmd){var bytes = Encoding.UTF8.GetBytes(cmd);var mode = MqttQualityOfServiceLevel.AtMostOnce;var appMsg = new MqttApplicationMessage(topic, bytes, mode, false);_mqttClient.PublishAsync(appMsg);//發送消息}/// <summary>/// 訂閱主題/// </summary>/// <param name="topics">主題標識</param>public void SubscribeAsync(params string[] topics){foreach (var topic in topics){if (!_subscribeTopics.Contains(topic)){_subscribeTopics.Add(topic);}}var topicFilters = _subscribeTopics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();_mqttClient?.SubscribeAsync(topicFilters);}/// <summary>/// 退訂已訂閱主題/// </summary>/// <param name="topics">主題標識</param>public void UnSubscribeAsync(params string[] topics){if (topics == null || topics.Length == 0) return;var topicFilters = topics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();_mqttClient.SubscribeAsync(topicFilters);}/// <summary>/// 獲取所有訂閱主題/// </summary>public string[] GetAllTopic => _subscribeTopics.ToArray();}
}