C#MQTT協議服務器與客戶端通訊實現(客戶端包含斷開重連模塊)

C#MQTT協議服務器與客戶端通訊實現

  • 1 DLL版本
  • 2 服務器
  • 3 客戶端

1 DLL版本

MQTTnet.DLL版本-2.7.5.0
基于比較老的項目中應用的DLL,其他更高版本變化可能較大,謹慎參考。

2 服務器

開啟服務器
關閉服務器
綁定事件【客戶端連接服務器事件】
綁定事件【客戶端斷開(服務器)連接事件】
綁定事件【客戶端訂閱主題事件】
綁定事件【客戶端退訂主題事件】
綁定事件【接收客戶端(發送)消息事件】

using System;
using System.Net;
using MQTTnet;
using MQTTnet.Server;namespace Demo_MQTT.Model
{public class ServerModel{private static MqttServer _mqttServer = null;private readonly Action<string> _callbackLog;public ServerModel(Action<string> callbackLog){_callbackLog = callbackLog;}/// <summary>/// 綁定客戶端連接服務器事件/// </summary>private void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e){WriteLog($"客戶端[{e.Client.ClientId}]已連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}/// <summary>/// 綁定客戶端斷開連接事件/// </summary>private void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e){WriteLog($"客戶端[{e.Client.ClientId}]已斷開連接 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}/// <summary>/// 綁定客戶端訂閱主題事件/// </summary>private void Server_ClientSubscribedTopic(object sensor, MqttClientSubscribedTopicEventArgs e){WriteLog($">>> 客戶端{e.ClientId}訂閱主題{e.TopicFilter.Topic}");}/// <summary>/// 綁定客戶端退訂主題事件/// </summary>/// <param name="e"></param>private void Server_ClientUnsubscribedTopic(object sensor, MqttClientUnsubscribedTopicEventArgs e){WriteLog($">>> 客戶端{e.ClientId}退訂主題{e.TopicFilter}");}/// <summary>/// 綁定接收客戶端消息事件/// </summary>private void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e){WriteLog($"接收到{e.ClientId}發送來的消息! {DateTime.Now:yyyy-MM-dd HH:mm:ss} {Environment.NewLine}");}private void WriteLog(string log){_callbackLog?.Invoke(log);}/// <summary>/// 開啟服務器/// </summary>/// <param name="ip">IP地址</param>/// <param name="port">端口號</param>public void StartServer(string ip, int port){if (_mqttServer == null){var optionsBuilder = new MqttServerOptionsBuilder().WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)).WithConnectionBacklog(1000).WithDefaultEndpointPort(port);IMqttServerOptions options = optionsBuilder.Build();try{_mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;_mqttServer.ClientConnected += MqttServer_ClientConnected;_mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;_mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;_mqttServer.ClientSubscribedTopic += Server_ClientSubscribedTopic;_mqttServer.ClientUnsubscribedTopic += Server_ClientUnsubscribedTopic;_mqttServer.StartAsync(options);}catch (Exception ex){Console.WriteLine(ex.Message);return;}WriteLog($"MQTT服務器啟動成功 {DateTime.Now:yyyy-MM-dd HH:mm:ss}{Environment.NewLine}");}}/// <summary>/// 關閉服務器/// </summary>public void CloseServer(){_mqttServer?.StopAsync();}}
}

3 客戶端

連接服務器
屬性:客戶端連接狀態
客戶端斷開重連線程
獲取所有訂閱主題
訂閱主題
退訂主題
發送消息
綁定事件【客戶端連接服務器事件】
綁定事件【客戶端斷開(服務器)連接事件】
綁定事件【客戶端接收消息事件】

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;namespace Demo_MQTT.Model
{public class ClientModel{/// <summary>/// 記錄所有訂閱主題,用于斷開重連時重新訂閱主題/// </summary>private readonly List<string> _subscribeTopics = new List<string>();private MqttClient _mqttClient = null;private string _serverIp;private int _nServerPort;private bool _isRunningReConnectThreadStart = false;private string _clienID;/// <summary>/// 接受消息回調函數,參數:主題,消息內容/// </summary>private readonly Action<string, byte[]> _callbackReceived;private readonly Action<string> _callbackLog;/// <summary>/// 構造函數/// </summary>/// <param name="callbackReceived">接受消息回調函數,參數:主題,消息內容</param>/// <param name="callbackLog"></param>public ClientModel(Action<string, byte[]> callbackReceived, Action<string> callbackLog){_callbackReceived = callbackReceived;_callbackLog = callbackLog;}/// <summary>/// 連接服務器/// </summary>private async void ConnectServer(){try{if (_mqttClient == null){_mqttClient = new MqttFactory().CreateMqttClient() as MqttClient;_mqttClient.Connected += (s, a) => WriteLog($"【{_clienID}】已連接到MQTT服務器!");_mqttClient.Disconnected += (s, a) => WriteLog($"【{_clienID}】已斷開MQTT連接!");_mqttClient.ApplicationMessageReceived += (sender, args) =>{_callbackReceived?.Invoke(args.ApplicationMessage.Topic, args.ApplicationMessage.Payload);};}if (_mqttClient.IsConnected) return;IMqttClientOptions options = new MqttClientOptions{ChannelOptions = new MqttClientTcpOptions(){Server = _serverIp,Port = _nServerPort},CleanSession = true};_clienID = options.ClientId;await _mqttClient.ConnectAsync(options);if (_mqttClient.IsConnected){ReConnectThreadStart();SubscribeAsync();}}catch (Exception ex){WriteLog("連接到MQTT服務器失敗!");}}/// <summary>/// 客戶端重連服務器線程-啟動/// </summary>/// <returns></returns>private void ReConnectThreadStart(){if (_isRunningReConnectThreadStart) return;if (_mqttClient != null){new Task(() =>{_isRunningReConnectThreadStart = true;Thread.Sleep(5000);while (true){Thread.Sleep(1000);if (!IsConnect){WriteLog($"客戶端[{_clienID}]斷開連接,嘗試重新連接服務器中...");int i;for (i = 0; i < 60; i++){if (IsConnect) break;WriteLog($"嘗試第{i + 1}次連接服務器");ConnectServer();Thread.Sleep(1000);if (IsConnect) break;}_isRunningReConnectThreadStart = i < 60;}if (!_isRunningReConnectThreadStart) break;}}).Start();}}private void WriteLog(string log){_callbackLog?.Invoke(log);}/// <summary>/// 客戶端連接狀態/// </summary>public bool IsConnect => _mqttClient?.IsConnected == true;/// <summary>/// 連接服務器/// </summary>/// <param name="serverIp">服務器IP</param>/// <param name="serverPort">服務器端口</param>/// <param name="topic"></param>public async void ConnectServer(string serverIp, int serverPort){_serverIp = serverIp;_nServerPort = serverPort;await Task.Run(() => { ConnectServer(); });}/// <summary>/// 關閉客戶端,斷開客戶端和服務器的連接/// </summary>public void CloseClient(){_mqttClient.DisconnectAsync();}/// <summary>/// 發送消息/// </summary>/// <param name="topic">發送主題</param>/// <param name="cmd">發送內容</param>[Obsolete("Obsolete")]public void PublishAsync(string topic, string cmd){var bytes = Encoding.UTF8.GetBytes(cmd);var mode = MqttQualityOfServiceLevel.AtMostOnce;var appMsg = new MqttApplicationMessage(topic, bytes, mode, false);_mqttClient.PublishAsync(appMsg);//發送消息}/// <summary>/// 訂閱主題/// </summary>/// <param name="topics">主題標識</param>public void SubscribeAsync(params string[] topics){foreach (var topic in topics){if (!_subscribeTopics.Contains(topic)){_subscribeTopics.Add(topic);}}var topicFilters = _subscribeTopics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();_mqttClient?.SubscribeAsync(topicFilters);}/// <summary>/// 退訂已訂閱主題/// </summary>/// <param name="topics">主題標識</param>public void UnSubscribeAsync(params string[] topics){if (topics == null || topics.Length == 0) return;var topicFilters = topics.Select(topic => new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)).ToList();_mqttClient.SubscribeAsync(topicFilters);}/// <summary>/// 獲取所有訂閱主題/// </summary>public string[] GetAllTopic => _subscribeTopics.ToArray();}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/901123.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/901123.shtml
英文地址,請注明出處:http://en.pswp.cn/news/901123.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【連載3】基礎智能體的進展與挑戰綜述

基礎智能體的進展與挑戰綜述 從類腦智能到具備可進化性、協作性和安全性的系統 【翻譯團隊】劉軍(liujunbupt.edu.cn) 錢雨欣玥 馮梓哲 李正博 李冠諭 朱宇晗 張霄天 孫大壯 黃若溪 2. 認知 人類認知是一種復雜的信息處理系統&#xff0c;它通過多個專門的神經回路協調運行…

Python語言介紹

Python 是一種高級、通用、解釋型的編程語言&#xff0c;由 Guido van Rossum 于 1991 年首次發布。其設計哲學強調代碼的可讀性和簡潔性。 Python通過簡潔的語法和強大的生態系統&#xff0c;成為當今最受歡迎的編程語言之一。 一、核心特點 Python 是一種解釋型、面向對象、…

什么是回表?哪些數據庫存在回表?

目錄 一、什么是回表1. 回表的核心流程2. 示例說明3. 回表的性能問題4. 總結 二、哪些數據庫會有回表1. MySQL&#xff08;InnoDB&#xff09;2. Oracle3. 其他數據庫&#xff08;如 SQL Server、PostgreSQL&#xff09;4. 總結 三、非聚集索引與聚集索引的區別及產生原因1. 聚…

ssh 免密登錄服務器(vscode +ssh 免密登錄)

每次打開vscode連接服務器都需要輸入密碼&#xff0c;特別繁瑣。 然后自己在網上翻閱了一下教程&#xff0c;發現說的內容比較啰嗦&#xff0c;而且個人感覺非常有誤導性傾向。 因此自己直接干脆寫一個簡便易懂的教程算了。 &#xff08;以經過本人親測&#xff0c;真實可靠&am…

基于低空經濟的無人機操控與維護實訓室解決方案

一、低空經濟時代下的無人機人才需求 1.1 低空經濟發展趨勢與政策機遇 在當前經濟與科技飛速發展的大背景下&#xff0c;低空經濟作為國家戰略性新興產業&#xff0c;正以迅猛之勢崛起&#xff0c;展現出無限的潛力與活力。其應用場景極為廣泛&#xff0c;涵蓋了物流、安防、…

PyTorch實現二維卷積與邊緣檢測:從原理到實戰

本文通過PyTorch實現二維互相關運算、自定義卷積層&#xff0c;并演示如何通過卷積核檢測圖像邊緣。同時&#xff0c;我們將訓練一個卷積核參數&#xff0c;使其能夠從數據中學習邊緣特征。 1. 二維互相關運算的實現 互相關運算&#xff08;Cross-Correlation&#xff09;是卷…

數字政府網絡架構建設方案

數字政府網絡架構建設方案 一、引言 隨著信息技術的快速發展&#xff0c;數字政府建設已成為提升政府治理能力和服務水平的關鍵。網絡架構作為數字政府的核心基礎設施&#xff0c;對于保障數據安全、提高服務效率、促進信息共享具有重要意義。本方案旨在為數字政府網絡架構建…

Python map函數介紹

在 Python 里&#xff0c;map() 是一個內置函數&#xff0c;其用途是將指定的函數應用于可迭代對象&#xff08;像列表、元組等&#xff09;的每個元素&#xff0c;最終返回一個新的迭代器。此迭代器所包含的元素是原可迭代對象中每個元素經過指定函數處理后的結果。map() 函數…

【服務器端表單字符驗證】

文章目錄 一、實驗目的二、核心代碼實現三、調試關鍵問題四、總結 一、實驗目的 掌握JSP表單驗證在服務器端的實現技術&#xff0c;實現對用戶輸入字符的非空及長度為5的驗證&#xff0c;返回對應提示信息并優化用戶交互。 二、核心代碼實現 前端表單 <form action"…

dify windos,linux下載安裝部署,提供百度云盤地址

dify下載安裝 dify1.0.1 windos安裝包百度云盤地址 通過網盤分享的文件&#xff1a;dify-1.0.1.zip 鏈接: 百度網盤 請輸入提取碼 提取碼: 1234 dify安裝包 linux安裝包百度云盤地址 通過網盤分享的文件&#xff1a;dify-1.0.1.tar.gz 鏈接: 百度網盤 請輸入提取碼 提取碼…

C++ Primer 5e 習題2.5: 指出如下字面量常量的類型

Exercise 2.5: Determine the type of each of the following literals. Explain the differences among the literals in each of the four examples: (a) ‘a’, L’a’, “a”, L"a" (b) 10, 10u, 10L, 10uL, 012, 0xC © 3.14, 3.14f, 3.14L (d) 10, 10u, 10…

CFS 調度器兩種調度類型普通調度 和 組調度

在 Linux 的 CFS&#xff08;Completely Fair Scheduler&#xff09; 調度器中&#xff0c;確實存在兩種調度類型&#xff1a;普通調度 和 組調度。這兩種調度類型分別適用于不同的場景&#xff0c;并通過三個關鍵維度&#xff08;權重、搶占優先級、最大配額&#xff09;來影響…

AF3 ProteinDataset類的_get_masked_sequence方法解讀

AlphaFold3 protein_dataset模塊 ProteinDataset 類 _get_masked_sequence 方法屬于作用是為需要預測的殘基生成掩碼。該掩碼以二進制張量形式呈現,其中 1 代表需要預測的部分,0 代表其他部分。此方法會依據多個參數來選定要掩碼的殘基,這些參數包含 mask_whole_chains、mas…

【音視頻】SDL渲染YUV格式像素

SDL視頻顯示的流程 實現流程 準備視頻文件 準備一個格式為yuv420p&#xff0c;分辨率為320x240的yuv數據&#xff0c;并且將視頻文件放入項目構建的目錄下&#xff1a; 初始化SDL 初始化SDL的視頻模塊 //初始化 SDL if(SDL_Init(SDL_INIT_VIDEO)) {fprintf( stderr, "…

關于群暉安裝tailscale后無法直鏈的問題

問題是我局域網的ipv6無法正確獲取到ip, 通過命令可以看到ipv6沒有ip tailscale netcheck C:\Users\Administrator>tailscale netcheck 2025/04/12 23:43:34 attempting to fetch a DERPMap from https://controlplane.tailscale.comReport:* Time: 2025-04-12T15:43:38.27…

[數據結構]Trie字典樹

GPT的介紹 &#x1f9e0; 一句話總結&#xff1a; 字典樹是一種專門用來存很多字符串的“超級前綴樹”&#xff0c;查找某個字符串或前綴的時候&#xff0c;特別快&#xff01; ?? 舉個生活例子&#xff08;類比&#xff09;&#xff1a; 你想做一個詞典&#xff08;Dictio…

04-算法打卡-數組-二分查找-leetcode(69)-第四天

1 題目地址 69. x 的平方根 - 力扣&#xff08;LeetCode&#xff09;69. x 的平方根 - 給你一個非負整數 x &#xff0c;計算并返回 x 的 算術平方根 。由于返回類型是整數&#xff0c;結果只保留 整數部分 &#xff0c;小數部分將被 舍去 。注意&#xff1a;不允許使用任何內…

AI領域再突破,永洪科技榮獲“2025人工智能+創新案例”獎

在2025年的今天&#xff0c;人工智能已從技術概念全面滲透至產業核心。中國作為全球AI技術應用的前沿陣地&#xff0c;正通過“人工智能”行動加速推進技術與實體經濟深度融合。 這一背景下&#xff0c;永洪科技憑借其“國內某頭部ICT人力資源板塊GenAI項目”榮獲“2025全國企業…

反序列化漏洞介紹與挖掘指南

目錄 反序列化漏洞介紹與挖掘指南 一、漏洞核心原理與危害 二、漏洞成因與常見場景 1. 漏洞根源 2. 高危場景 三、漏洞挖掘方法論 1. 靜態分析 2. 動態測試 3. 利用鏈構造 四、防御與修復策略 1. 代碼層防護 2. 架構優化 3. 運維實踐 五、工具與資源推薦 總結 反…

從零開始的C++編程 2(類和對象下)

目錄 1.構造函數初始化列表 2.類型轉換 3.static成員 4.友元 5.內部類 6.匿名對象 1.構造函數初始化列表 ①之前我們實現構造函數時&#xff0c;初始化成員變量主要使?函數體內賦值&#xff0c;構造函數初始化還有?種?式&#xff0c;就是初始化列表&#xff0c;初始化…