MQTTServer服務器根據MQTTClient客戶端已訂閱的主題推送 分發消息

網絡讀卡器介紹:https://item.taobao.com/item.htm?ft=t&id=22173428704&spm=a21dvs.23580594.0.0.52de2c1bgK3bgZ

本示例使用了MQTTNet插件

C# MQTTNETServer?源碼

using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using MQTTnet;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet.Protocol;namespace MQTTNETServerForms
{public partial class Form1 : Form{private MqttServerOptionsBuilder optionBuilder;private IMqttServer server;//mqtt服務器對象List<TopicItem> Topics = new List<TopicItem>();public Form1(){InitializeComponent();}private void Form1_Load(object sender, EventArgs e){//創建服務器對象server = new MqttFactory().CreateMqttServer();server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(Server_ApplicationMessageReceived));//綁定消息接收事件server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(new Action<MqttServerClientConnectedEventArgs>(Server_ClientConnected));//綁定客戶端連接事件server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(new Action<MqttServerClientDisconnectedEventArgs>(Server_ClientDisconnected));//綁定客戶端斷開事件server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(new Action<MqttServerClientSubscribedTopicEventArgs>(Server_ClientSubscribedTopic));//綁定客戶端訂閱主題事件server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(new Action<MqttServerClientUnsubscribedTopicEventArgs>(Server_ClientUnsubscribedTopic));//綁定客戶端退訂主題事件server.StartedHandler = new MqttServerStartedHandlerDelegate(new Action<EventArgs>(Server_Started));//綁定服務端啟動事件server.StoppedHandler = new MqttServerStoppedHandlerDelegate(new Action<EventArgs>(Server_Stopped));//綁定服務端停止事件}/// 綁定消息接收事件private void Server_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e){string msg = e.ApplicationMessage.ConvertPayloadToString();WriteLog(">>> 收到消息:" + msg + ",QoS =" + e.ApplicationMessage.QualityOfServiceLevel + ",客戶端=" + e.ClientId + ",主題:" + e.ApplicationMessage.Topic);}/// 綁定客戶端連接事件private void Server_ClientConnected(MqttServerClientConnectedEventArgs e){Task.Run(new Action(() =>{lbClients.BeginInvoke(new Action(() =>{lbClients.Items.Add(e.ClientId);}));}));WriteLog(">>> 客戶端" + e.ClientId + "連接");}/// 綁定客戶端斷開事件private void Server_ClientDisconnected(MqttServerClientDisconnectedEventArgs e){Task.Run(new Action(() =>{lbClients.BeginInvoke(new Action(() =>{lbClients.Items.Remove(e.ClientId);}));}));WriteLog(">>> 客戶端" + e.ClientId + "斷開");}/// 綁定客戶端訂閱主題事件private void Server_ClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e){Task.Run(new Action(() =>{var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter.Topic);if (topic == null){topic = new TopicItem { Topic = e.TopicFilter.Topic, Count = 0 };Topics.Add(topic);}if (!topic.Clients.Exists(c => c == e.ClientId)){topic.Clients.Add(e.ClientId);topic.Count++;}lvTopic.Invoke(new Action(() =>{this.lvTopic.Items.Clear();}));foreach (var item in this.Topics){lvTopic.Invoke(new Action(() =>{this.lvTopic.Items.Add($"{item.Topic}:{item.Count}");}));}}));WriteLog(">>> 客戶端" + e.ClientId + "訂閱主題" + e.TopicFilter.Topic);}/// 綁定客戶端退訂主題事件private void Server_ClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e){Task.Run(new Action(() =>{var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter);if (topic != null){topic.Count--;topic.Clients.Remove(e.ClientId);}this.lvTopic.Items.Clear();foreach (var item in this.Topics){this.lvTopic.Items.Add($"{item.Topic}:{item.Count}");}}));WriteLog(">>> 客戶端" + e.ClientId + "退訂主題" + e.TopicFilter);}/// 綁定服務端啟動事件private void Server_Started(EventArgs e){WriteLog(">>> 服務端已啟動!");Invoke(new Action(() => {this.button1.Text = "停止服務";}));}/// 綁定服務端停止事件private void Server_Stopped(EventArgs e){WriteLog(">>> 服務端已停止!");Invoke(new Action(() => {this.button1.Text = "啟動MQTT服務";}));}/// 顯示日志public void WriteLog(string message){if (txtMsg.InvokeRequired){txtMsg.Invoke(new Action(() =>{txtMsg.Text = "";txtMsg.Text = (message + "\r");}));}else{txtMsg.Text = "";txtMsg.Text = (message + "\r");}}[Obsolete]private async void button1_Click(object sender, EventArgs e){if (button1.Text == "啟動MQTT服務")   /// 啟動服務{optionBuilder = new MqttServerOptionsBuilder().WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(this.txtIP.Text)).WithDefaultEndpointPort(int.Parse(this.txtPort.Text)).WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(5000)).WithConnectionValidator(t =>{string un = "", pwd = "";un = this.txtUname.Text;pwd = this.txtUpwd.Text;if (t.Username != un || t.Password != pwd){t.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;}else{t.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;}});var option = optionBuilder.Build();await server.StartAsync(option);}else{if (server != null)   //停止服務{                    server.StopAsync();}}}}
}

C# MQTTNETClient?源碼

using MQTTnet.Client.Options;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet;
using static System.Windows.Forms.Design.AxImporter;
using System.Net.Security;namespace MQTTNETClientForms
{public partial class Form1 : Form{private MqttFactory factory;private IManagedMqttClient mqttClient;//客戶端mqtt對象private MqttClientOptionsBuilder mqttClientOptions;private ManagedMqttClientOptionsBuilder options;private bool connstate;public Form1(){InitializeComponent();}private void Form1_Load(object sender, EventArgs e){}/// 顯示日志private void WriteLog(string message){if (txtMsg.InvokeRequired){txtMsg.Invoke(new Action(() =>{txtMsg.Text = (message);}));}else{txtMsg.Text = (message);}}/// 訂閱[Obsolete]private async void btnSub_Click(object sender, EventArgs e){if (connstate == false){WriteLog(">>> 請先與MQTT服務器建立連接!");return;}if (string.IsNullOrWhiteSpace(this.txtTopic.Text)){WriteLog(">>> 請輸入主題");return;}//在 MQTT 中有三種 QoS 級別: //At most once(0) 最多一次//At least once(1) 至少一次//Exactly once(2) 恰好一次//await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.tbTopic.Text).WithAtMostOnceQoS().Build());//最多一次, QoS 級別0await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.txtTopic.Text).WithAtLeastOnceQoS().Build());//恰好一次, QoS 級別1 WriteLog($">>> 成功訂閱");}/// 發布private async void btnPub_Click(object sender, EventArgs e){if (connstate==false){WriteLog(">>> 請先與MQTT服務器建立連接!");return;}if (string.IsNullOrWhiteSpace(this.txtTopik.Text)){WriteLog(">>> 請輸入主題");return;}var result = await mqttClient.PublishAsync(this.txtTopik.Text,this.txtContent.Text,MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);//恰好一次, QoS 級別1   WriteLog($">>> 主題:{this.txtTopik.Text},消息:{this.txtContent.Text},結果: {result.ReasonCode}");}private async void button1_Click(object sender, EventArgs e){if (button1.Text == "連接到MQTT服務器"){connstate = false;factory = new MqttFactory();mqttClient = factory.CreateManagedMqttClient();//創建客戶端對象//綁定斷開事件mqttClient.UseDisconnectedHandler(async ee =>{                   WriteLog("與服務器之間的連接斷開了,正在嘗試重新連接");Invoke(new Action(() => {this.button1.Text = "連接到MQTT服務器";}));// 等待 5s 時間await Task.Delay(TimeSpan.FromSeconds(5));try{if (factory == null) { factory = new MqttFactory() ; }//創建客戶端對象 if (mqttClient == null) { mqttClient = factory.CreateManagedMqttClient(); }//創建客戶端對象 mqttClient.UseConnectedHandler(tt =>{connstate = true;WriteLog(">>> 連接到服務成功");Invoke(new Action(() => {this.button1.Text = "斷開與MQTT服務器的連續";}));});}catch (Exception ex){connstate = false;WriteLog($"重新連接服務器失敗:{ex}");Invoke(new Action(() => {this.button1.Text = "連接到MQTT服務器";}));}});//綁定接收事件mqttClient.UseApplicationMessageReceivedHandler(aa =>{try{string msg = aa.ApplicationMessage.ConvertPayloadToString();WriteLog(">>> 消息:" + msg + ",QoS =" + aa.ApplicationMessage.QualityOfServiceLevel + ",客戶端=" + aa.ClientId + ",主題:" + aa.ApplicationMessage.Topic);}catch (Exception ex){WriteLog($"+ 消息 = " + ex.Message);}});//綁定連接事件mqttClient.UseConnectedHandler(ee =>{connstate =true;WriteLog(">>> 連接到服務成功");Invoke(new Action(() => {this.button1.Text = "斷開與MQTT服務器的連續";}));});var mqttClientOptions = new MqttClientOptionsBuilder().WithClientId(this.txtId.Text).WithTcpServer(this.txtIP.Text, int.Parse(this.txtPort.Text)).WithCredentials(this.txtName.Text, this.txtUpwd.Text);var options = new ManagedMqttClientOptionsBuilder().WithAutoReconnectDelay(TimeSpan.FromSeconds(5)).WithClientOptions(mqttClientOptions.Build()).Build();//開啟await mqttClient.StartAsync(options);                }else{if (mqttClient != null){if (mqttClient.IsStarted){await mqttClient.StopAsync();}mqttClient.Dispose();connstate = false;}button1.Text = "連接到MQTT服務器";}}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/87290.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/87290.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/87290.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【seismic unix 合并兩個su文件】

Seismic Unix簡介 Seismic Unix&#xff08;SU&#xff09;是由科羅拉多礦業學院開發的開源地震數據處理軟件包&#xff0c;基于Unix/Linux環境運行。它提供了一系列命令行工具&#xff0c;用于地震數據加載、處理、分析和可視化&#xff0c;支持SEG-Y格式和SU自定義格式。SU廣…

【vmware虛擬機使用】安裝vmware workstations17

安裝vmware17 本章學習目標VMware虛擬機簡介開始實操下載VMware workstation虛擬機安裝虛擬機配置虛擬機網絡 總結 本章學習目標 1.安裝vmware workstation虛擬機 2.自定義配置虛擬機網絡&#xff0c;避免網絡沖突 VMware虛擬機簡介 ? VMware的核心是Hypervisor&#xff0…

QT6 源(147)模型視圖架構里的表格窗體 QTableWidget 的范例代碼舉例,以及其條目 QTableWidgetItem 類型的源代碼。

&#xff08;1&#xff09;先用一個簡單的例子&#xff0c;學習一下本類里的成員函數的使用。生成如下圖的界面&#xff0c;表格窗體與初始數據&#xff1a; 查看其 ui_widget . h 文件 &#xff0c;里面的將是最標準的表格窗體的使用代碼 &#xff1a; #ifndef UI_WIDGET_H #…

URL時間戳參數深度解析:緩存破壞與前端優化的前世今生

&#x1f50d; URL時間戳參數深度解析&#xff1a;緩存破壞與前端優化的前世今生 在日常的Web開發中&#xff0c;你是否注意到很多接口URL后面都會帶有一個時間戳參數&#xff1f;比如 xxx/getMsg?_1751413509056。這個看似簡單的參數背后&#xff0c;卻隱藏著前端緩存策略、性…

分布式鎖實現方式:基于Redis的分布式鎖實現(Spring Boot + Redis)

Redis實現分布式鎖的原理 Redis分布式鎖基于其單線程執行命令的特性&#xff0c;通過原子操作實現多節點間的互斥訪問。下面從原理、實現、問題及優化四個方面詳細解析&#xff1a; 1.原子性與互斥性 Redis分布式鎖的核心是原子性操作&#xff1a; 獲取鎖&#xff1a;使用SE…

linux升級降級內核實驗

?實驗環境 vmware workstation 17 centos7.9 下載鏈接&#xff1a; https://vault.centos.org/7.9.2009/isos/x86_64/ ubuntu24.04 下載鏈接&#xff1a; https://old-releases.ubuntu.com/releases/24.04/ ?實驗目的 為了解決日常環境部署中某些驅動軟件依賴特定內…

華為云開始了“開發者空間 AI Agent 開發”活動

引言 今天在華為云App上偶然看到一個新活動&#xff1a;Developer Events_Developer Alliance-Huawei Cloud。這個活動要求開發者可結合自己的工作實踐&#xff0c;須在華為開發者空間內完成應用構建&#xff0c;應用構建類型和主題為AI Agent應用開發。 AI Agent平臺 華為開…

2025.6.26總結

今天和我做同一業務得同事進行了工作交接&#xff0c;主要給我講了怎么去執行自動化。包括性能自動化&#xff0c;API自動化&#xff0c;UI自動化&#xff0c;除了UI自動化要寫些代碼&#xff0c;其他跑得話也就在工具上配個參數&#xff0c;就是個搬磚得活&#xff0c;沒太大技…

ip網絡基礎

交換機工作原理&#xff1a; 自主學習mac地址并成mac地址表 根據mac地址表再進行單播、廣播轉發 主機通信原理&#xff08;局域網&#xff09;&#xff1a; 需要了解arp協議 拓撲圖&#xff1a; 首先&#xff0c;我們觀察icmp數據包&#xff0c;發現缺少目標mac地址&#…

AI大模型如何重塑軟件開發流程?

文章目錄 每日一句正能量前言一、AI大模型的定義與特點&#xff08;一&#xff09;定義&#xff08;二&#xff09;特點 二、AI大模型在軟件開發中的應用場景&#xff08;一&#xff09;代碼自動生成&#xff08;二&#xff09;智能測試&#xff08;三&#xff09;需求分析與設…

Kafka與RabbitMQ相比有什么優勢?

大家好&#xff0c;我是鋒哥。今天分享關于【Kafka與RabbitMQ相比有什么優勢&#xff1f;】面試題。希望對大家有幫助&#xff1b; Kafka與RabbitMQ相比有什么優勢&#xff1f; 超硬核AI學習資料&#xff0c;現在永久免費了&#xff01; Kafka與RabbitMQ在消息隊列的設計和應…

LeetCode 2090. 半徑為 k 的子數組平均值

題目鏈接 2090. 半徑為 k 的子數組平均值 題目描述 給定一個下標從 0 開始的整數數組 nums 和整數 k&#xff0c;構建并返回一個長度為 n 的數組 avgs&#xff0c;其中 avgs[i] 表示以下標 i 為中心、半徑為 k 的子數組的平均值。具體規則如下&#xff1a; 無效位置&#x…

深入理解C++11原子操作:從內存模型到無鎖編程

文章目錄 C并發編程的新紀元內存模型基礎&#xff1a;可見性與有序性數據競爭的根源happens-before關系memory_order枚舉詳解1. memory_order_relaxed2. memory_order_acquire/memory_order_release3. memory_order_seq_cst 原子操作詳解std::atomic模板核心原子操作1. 讀取與存…

DQL-1-基礎查詢

基礎查詢 DQL-1-基礎查詢 基礎查詢DQL - 介紹DQL - 語法DQL - 基本查詢案例 DQL - 介紹 SQL 英文全稱是 Data Query Language, 數據查詢語言, 用來查詢數據庫中表的記錄 查詢關鍵字: SELECT DQL - 語法 SELECT 字段列表FROM 表名列表WHERE條件列表GROUP BY分組字段列表HAVI…

Prompt 精通之路(七)- 你的終極 AI 寶典:Prompt 精通之路系列匯總

你的終極 AI 寶典&#xff1a;Prompt 精通之路系列匯總 標簽&#xff1a; #Prompt指南 #AI學習資源 #速查手冊 #ChatGPT #系列總結 &#x1f680; Prompt 精通之路&#xff1a;系列文章導航 第一篇&#xff1a;AI 時代的新語言&#xff1a;到底什么是 Prompt&#xff1f;為什么…

P27:RNN實現阿爾茨海默病診斷

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習記錄博客&#x1f356; 原作者&#xff1a;K同學啊 一、過程解讀 PyTorch 實戰&#xff1a;阿爾茨海默病數據預測模型 今天&#xff0c;我將帶大家一起探索一個基于 PyTorch 的深度學習小項目——利用 RNN 模…

HakcMyVM-Arroutada

信息搜集 主機發現 ┌──(kali?kali)-[~] └─$ nmap -sn 192.168.21.0/24 Starting Nmap 7.95 ( https://nmap.org ) at 2025-07-01 07:13 EDT Nmap scan report for 192.168.21.11 Host is up (0.00062s latency). MAC Address: 08:00:27:4E:CC:FB (PCS Systemtechnik/Or…

TEXT Submitting Solutions

前言 USACO 訓練項目配備了一個自動評分系統&#xff0c;用于批改你的作業題目。你可以直接在題目頁面提交你的程序&#xff1b;系統會對程序進行編譯和評分&#xff0c;幾秒鐘內就能將結果反饋給你。 支持的語言有 C、C&#xff08;含 C11 和 C14&#xff09;、PASCAL、Pyth…

Reactor 瞬態錯誤

在響應式編程中&#xff0c;retryWhen 操作符通過 RetrySignal 接口提供了對重試行為的精細控制&#xff0c;特別是在處理 瞬態錯誤&#xff08;transient errors&#xff09; 時。瞬態錯誤是指那些在一段時間內發生&#xff0c;但隨后會自行恢復的錯誤&#xff0c;例如網絡請求…

基于 SpringBoot+Vue.js+ElementUI 的小型超市商品管理系統設計與實現7000字論文設計

摘要 本論文設計并實現了一個基于 SpringBoot、Vue.js 和 ElementUI 的小型超市商品管理系統。該系統旨在為小型超市提供一個高效、便捷的商品管理解決方案&#xff0c;實現商品信息的錄入、查詢、修改、刪除等功能&#xff0c;同時支持庫存管理、銷售統計等業務需求。論文首先…