基于開源庫編寫MQTT通訊

目錄

  • 1. MQTT是什么?
  • 2. 開發交互UI
  • 3. 服務器核心代碼
  • 4. 客戶端核心代碼
  • 5. 消息訂閱與發布
  • 6. 通訊測試
  • 7. MQTT與PLC通訊
  • 最后. 核心總結


1. MQTT是什么?

MQTT(Message Queuing Terlemetry Transport)消息隊列遙測協議;是一種輕量級的發布/訂閱消息傳輸協議,專為IOT、低帶寬、高延遲的網絡環境設計,具有高效、低耗、海量設備連接特性。

1.通訊原理

  • 發布(Publisher):發布消息到特定主題(Topic)
  • 訂閱(Subscriber):訂閱主題接收消息
  • 代理(Broker):消息路由管理,接收發布的消息,頒發給所有訂閱者

示例:傳感器(Publisher)發送消息到主題(Topic),代理(Broker)接收消息并檢查該主題的所有客戶端,訂閱者(Subscriber)實時接收消息

2.消息結構

  • 主題(topic)+負載(payload)

3.必要條件

  • MQTT Broker: 消息代理服務器
  • 客戶端庫: 設備或應用需集成MQTT客戶端
  • 端口: 默認非加密端口 1883,加密端口 8883(TLS/SSL)
  • 連接認證: 支持用戶名/密碼、客戶端證書等認證方式
  • QoS服務質量/策略
QoS等級描述適用場景
0最多一次(無確認)實時性高,允許丟數據
1至少一次(需確認)數據需可靠但不重復
2精確一次(握手確認)關鍵數據,嚴格不重復

4. 關鍵特性
遺囑消息(Last Will)

  • 設備異常斷開時,Broker 自動發布預設消息(如“設備離線”),通知其他客戶端。

保留消息(Retained)

  • Broker 保存主題的最新消息,新訂閱者首次連接時立即獲取。

主題層級(Topic Hierarchy)

  • 支持多級通配符(+ 單層,# 多層)
  • 例如:home/floor1/temperature; home/+/status(匹配所有樓層狀態)

5.應用場景

  • 移動設備遠程監控(AGV狀態上報)
  • 跨廠區數據匯聚(通過云平臺中轉)

2. 開發交互UI

創建WindowsFormAPP項目,NuGet安裝MQTTnet開源庫(項目-屬性-框架;需與程序包的依賴項一致<否則安裝錯誤>:PM> NuGet\Install-Package MQTTnet -Version 2.8.2

控件

  • label、TextBox、ComboBox、Button
  • ListView(Dock=停靠<視圖=Details;小Imagelist=Imagelist1;編輯列=Infoname、Content>)

容器

  • SplitContainer(Orienting=垂直;SplitterWith=1;BorderStyle=邊框)

組件

  • Imagelist(添加圖像)
  • timer(Enabled=true、Interval=1000)

狀態

  • StatusStrip(系統時間、連接數量、版本說明)

Server端

  • 窗體設置(Text=標題、Font=字體、StartPosition=位置、FormBoardStyle=邊框)
  • 給定服務IP,固定端口號,設置開始、停止服務、快捷打開客戶端按鈕
  • 設置日志消息顯示窗口;設置狀態欄

在這里插入圖片描述

Client端

  • 技巧:復制FrmServer(修改窗口、Designer代碼)
  • 容器:在SP1的Panel2添加SplitContainer2(Listview放在SP2的Panel1中);
  • 設置SP1的FixedPanel的Panel1不動;設置SP2的FixedPanel的Panel2不動
  • 客戶端可發布主題消息
  • 設置連接、訂閱、取消訂閱、發布主題按鈕;
  • 主題信息、給定QoS策略

在這里插入圖片描述在這里插入圖片描述


3. 服務器核心代碼

初始化-public FrmServer(){…}

  • 獲取IP集合(Dns.GetHostAddresses)
  • 綁定控件(cmb_iplist.DataSource、.SelectedIndex)

創建服務器對象(IMqttServer)(->服務啟動按鈕點擊事件)

  • 創建服務器配置 _ var optionsBuilder = new MqttServerOptionsBuilder()
    – 驗證用戶密碼_ .WithConnectionValidator(c =>...
  • 實例化服務對象_mqttServer = new MqttFactory().CreateMqttServer();
  • 創建MQTT事件方法_mqttServer.ClientConnected += MqttServer_ClientConnected;
    – 方法日志顯示_this.AddLog(0, "MQTT客戶端已連接" + "ClietID:" + e.ClientId.Length);
  • 啟動服務_mqttServer.StartAsync(optionsBuilder.Build());

日志對象

  • 創建委托_public delegate void AddlogDelegate(int index, string info);
  • 委托方法_private void AddlogMothod(int index, string info){...}
  • 委托對象_private AddlogDelegate AddLog;
  • 對象綁定方法 _this.AddLog = this.AddlogMothod;
  • 對象應用eg:_ this.AddLog(0, "MQTT服務端已停止");

狀態欄

  • 系統時間_this.tss_time.Text = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
  • 連接個數(連接/斷開事件更新)_this.tss_connnum.Text = mqttServer.GetClientSessionsStatusAsync().Result.Count.ToString();

其他

  • 停止服務_mqttServer.StopAsync();
  • 清空日志_this.lst_info.Items.Clear();
  • 打開客戶端(注意“重新生成解決方案”)_new FrmClient().Show();
  • 當前時間_DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
  • 關閉窗體停止服務

4. 客戶端核心代碼

初始化

  • 獲取IP集合(Dns.GetHostAddresses)
  • 綁定控件(cmb_iplist.DataSource、.SelectedIndex)

快捷打開客戶端,ServerIP、Port自動填寫

  • 創建帶參數的初始化構造方法_ public FrmClient(string ip, string port)

創建客戶端對象(IMqttClient)(->客戶端連接按鈕點擊事件)

  • 創建客戶端配置_ var option = new MqttClientOptions() { ClientId = Guid.NewGuid().ToString("D") };
  • 創建通道配置_ option.ChannelOptions = new MqttClientTcpOptions()
  • 是否啟用賬戶_ if (this.chk_isuserpwd.Checked)
  • 創建會話配置_option.CleanSession = true;
  • 創建客戶端對象_mqttClient = new MqttFactory().CreateMqttClient();
  • 連接服務器_ mqttClient.ConnectAsync(option);
  • 綁定事件方法(添加日志)_ mqttClient.Connected += mqttClient_Connected;

日志顯示

  • 創建委托方法_與服務端一致
  • 創建委托對象_與服務端一致
  • 初始化對象綁定方法_與服務端一致
  • 對象引用eg_與服務端一致

其他

  • 斷開連接_mqttClient.DisconnectAsync();

5. 消息訂閱與發布

客戶端可以正確連接后,只有訂閱與發布消息,才算真正進行數據通訊;消息的訂閱和發布均在客戶端進行,服務端只需提供一個服務供客戶連接(橋梁的作用);小節解釋消息訂閱發布的核心代碼。
QoS服務策略(下拉框獲取)

  • QoS服務策略(枚舉類型)_this.cmb_qos_pub.DataSource = Enum.GetNames(typeof(MqttQualityOfServiceLevel));

消息訂閱

  • 訂閱主題_mqttClient.SubscribeAsync(new List<TopicFilter>(){...});1

消息取消訂閱

  • 取消訂閱_mqttClient.UnsubscribeAsync(this.txt_topic_sub.Text);

消息發布

  • 創建消息對象_ var msg = new MqttApplicationMessage(){...} 2
  • 發布消息_mqttClient.PublishAsync(msg);

6. 通訊測試

在這里插入圖片描述


7. MQTT與PLC通訊

將msg對象中的Payload更改為PLC的寄存器即可
自動發布

  • 添加定時器timer1_設置頻率,事件(消息內容,自動發布)
  • 連接成功時啟動定時器_ this.timer1.Enabled = true;

消息內容

  • 添加引用(西門子通訊庫)
  • 創建PLC對象
  • 讀取PLC變量
  • 添加到msg中的Payload中_Payload = Encoding.UTF8.GetBytes(plcmsg),

在這里插入圖片描述


最后. 核心總結

1. 開發實現
服務端:

  • 功能:IP/端口配置、啟動/停止服務、連接監控
  • 初始化 MqttServer 對象
  • 處理連接/斷開事件(日志記錄、狀態更新)

客戶端:

  • 功能:連接/斷開、訂閱/取消主題、消息發布
  • 配置 MqttClient(IP、端口、認證)
  • 實現訂閱 (SubscribeAsync) /發布 (PublishAsync)

2. 進階應用

PLC 集成:

  • 自動發布:定時器讀取 PLC 數據并推送
  • 數據格式:Payload 封裝寄存器值(如 Siemens PLC 數據)

異常測試:

  • 基礎測試:服務端啟停、客戶端連接/斷開
  • 消息流驗證:訂閱發布功能、QoS 策略生效
  • 異常測試:網絡斷開重連、遺囑消息觸發

附:關鍵代碼片段
服務端啟動:

var options = new MqttServerOptionsBuilder().WithDefaultPort(1883).Build();
mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.StartAsync(options);

客戶端發布消息:

var msg = new MqttApplicationMessage {Topic = "sensor/temp",Payload = Encoding.UTF8.GetBytes("25℃"),QoS = MqttQualityOfServiceLevel.AtLeastOnce
};
client.PublishAsync(msg);

PLC 數據讀取:

var plcValue = SiemensPLC.Read("DB1.DBD0"); // 讀取浮點數
var payload = $"{{\"temperature\": {plcValue}}}";

源碼鏈接
學習鏈接


  1. 創建TopicFilter對象_new TopicFilter(this.txt_topic_sub.Text, (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel),this.cmb_qos_sub.Text)) ??

  2. 消息對象賦值_ Topic = this.txt_topic_pub.Text, QualityOfServiceLevel =(MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), this.cmb_qos_pub.Text), Payload = Encoding.UTF8.GetBytes(this.txt_msg_pub.Text), Retain = false, ??

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72691.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72691.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72691.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在VScode下配置C/C++環境(tasks.json、launch.json、c_cpp_properties.json)

文章目錄 1. tasks.json、launch.json配置文件中參數(屬性)的說明2. tasks.json介紹3. launch.json介紹4. 直接生成tasks.json、launch.json配置文件的另外一種方式5. c_cpp_properties.json介紹6. 運行多個C/C文件7. 命令行方式編譯C 1. tasks.json、launch.json配置文件中參數…

ORB-SLAM2源碼學習(六):相機跟蹤(局部地圖跟蹤和關鍵幀創建)

目錄 1.局部地圖跟蹤 1.1 更新局部關鍵幀UpdateLocalKeyFrames 1.2 更新局部地圖點&#xff08;來自局部關鍵幀&#xff09;UpdateLocalPoints() 1.3 投影匹配 2. 對比四種跟蹤方式以及使用的投影匹配 3.關鍵幀創建 3.1 判斷是否需要創建新關鍵幀: NeedNewKeyFrame() 3…

PostgreSQL時間計算大全:從時間差到時區轉換(保姆級教程)

一、時間計算的三大核心場景 當你遇到這些需求時&#xff0c;本文就是你的救星&#x1f31f;&#xff1a; 倒計時功能&#xff1a;計算活動剩余天數 用戶行為分析&#xff1a;統計操作間隔時間 跨國系統&#xff1a;多時區時間統一管理 報表生成&#xff1a;自動計算同比/環…

Qt6.8.2創建WebAssmebly項目使用FFmpeg資源

Qt6新出了WebAssmebly功能&#xff0c;可以將C寫的軟件到瀏覽器中運行&#xff0c;最近一段時間正在研究這方便內容&#xff0c;普通的控件響應都能實現&#xff0c;今天主要為大家分享如何將FFmpeg中的功能應用到瀏覽器中。 開發環境&#xff1a;window11&#xff0c;Qt6.8.2…

DeepSeek V3 源碼:從入門到放棄!

從入門到放棄 花了幾天時間&#xff0c;看懂了DeepSeek V3 源碼的邏輯。源碼的邏輯是不難的&#xff0c;但為什么模型結構需要這樣設計&#xff0c;為什么參數需要這樣設置呢&#xff1f;知其然&#xff0c;但不知其所以然。除了模型結構以外&#xff0c;模型的訓練數據、訓練…

【leetcode hot 100 240】搜索二維矩陣Ⅱ

解法一&#xff1a;直接查找 class Solution {public boolean searchMatrix(int[][] matrix, int target) {for(int i0; i<matrix.length; i){for(int j0; j<matrix[0].length; j){if(matrix[i][j]>target){break;}if(matrix[i][j]target){return true;}}}return fal…

UE4 組件 (對話組件)

制作一個可以生成對話氣泡&#xff0c;顯示對話臺詞的簡單組件。這個組件要的變量&#xff1a;臺詞&#xff08;外部傳入&#xff09;。功能&#xff1a;開始對話&#xff08;生成氣泡UI&#xff09; &#xff0c;結束對話。 一、對話組件創建 二、開始對話事件 1、注意這里獲…

自動化同步多服務器數據庫表結構

當項目每次進行版本升級的時候&#xff0c;如果在這次迭代中涉及表結構變更&#xff0c;需要將不同的生產環境下&#xff0c;都需要同步表結構的DDL語句&#xff0c;比較麻煩&#xff0c;而且還有可能忘記同步腳本&#xff0c;導致生產環境報錯.... 該方案采用SpringBootMybat…

DeepSeek安全:AI網絡安全評估與防護策略

&#x1f345; 點擊文末小卡片 &#xff0c;免費獲取網絡安全全套資料&#xff0c;資料在手&#xff0c;漲薪更快 本文基于現有的公開資料&#xff0c;從企業資深網絡安全專家的視角&#xff0c;系統梳理DeepSeek技術在網絡安全領域的潛在貢獻與核心風險&#xff0c;并結合中國…

【論文筆記】Attentive Eraser

標題&#xff1a;Attentive Eraser: Unleashing Diffusion Model’s Object Removal Potential via Self-Attention Redirection Guidance Source&#xff1a;https://arxiv.org/pdf/2412.12974 收錄&#xff1a;AAAI 25 作者單位&#xff1a;浙工商&#xff0c;字節&#…

【powerjob】 powerjobserver注冊服務IP錯誤

1、問題&#xff1a;powerjobserver 4.3.6 的服務器上有多個網卡對應多個ip,示例 eth0 :IP1 &#xff0c;docker0:IP2 和worker 進行通信時 正確的應該時IP1 但是注冊顯示獲取的確實IP2,導致 worker 通過ip2和server通信&#xff0c;網絡不通&#xff0c;注冊不上 2、解決方案 …

視頻錄像機視頻通道是指什么

視頻錄像機的視頻通道是指攝像機在監控矩陣或硬盤錄像機設備上的視頻輸入的物理位置。 與攝像頭數量關系&#xff1a;在視頻監控系統中&#xff0c;有多少個攝像頭就需要多少路視頻通道&#xff0c;通道數量決定了視頻錄像機可接入攝像頭的數量&#xff0c;一般硬盤錄像機有4路…

面試150,數組 / 字符串

27. 移除元素 class Solution:def removeElement(self, nums: List[int], val: int) -> int:# 把不等于 val 的值移動到前面n len(nums)left 0for right in range(n):if nums[right] ! val:nums[left] nums[right]left 1return left26. 刪除有序數組中的重復項 只保留 1…

【江科大STM32】TIM輸入捕獲模式PWMI模式測頻率

一、輸入捕獲測頻率 接線圖&#xff1a; 測信號的輸入引腳為PA6&#xff0c;信號從PA6進來&#xff0c;待測的PWM信號也是STM32自己生成的&#xff0c;輸出引腳是PA0&#xff0c;所以接線這里直接用一根線將PA0引到PA6就可以了。 如果有信號發生器的話&#xff0c;也可以設置成…

湖倉一體化及冷、熱、實時三級存儲

一、湖倉一體化&#xff08;Lakehouse&#xff09; 湖倉一體化&#xff08;Lakehouse&#xff09;是數據湖&#xff08;Data Lake&#xff09;與數據倉庫&#xff08;Data Warehouse&#xff09;的結合&#xff0c;旨在解決傳統數據架構中數據孤島、存儲冗余、計算性能不足等問…

go切片定義和初始化

1.簡介 切片是數組的一個引用&#xff0c;因此切片是引用類型&#xff0c;在進行傳遞時&#xff0c;遵守引用傳遞的機制。切片的使用和數組類似&#xff0c;遍歷切片、訪問切片的元素和切片的長度都一樣。。切片的長度是可以變化的&#xff0c;因此切片是一個可以動態變化的數…

游戲引擎學習第138天

倉庫:https://gitee.com/mrxiao_com/2d_game_3 資產&#xff1a;game_hero_test_assets_003.zip 發布 我們的目標是展示游戲運行時的完整過程&#xff0c;從像素渲染到不使用GPU的方式&#xff0c;我們自己編寫了渲染器并完成了所有的工作。今天我們開始了一些新的內容&#…

畢業項目推薦:基于yolov8/yolov5/yolo11的暴力行為檢測識別系統(python+卷積神經網絡)

文章目錄 概要一、整體資源介紹技術要點功能展示&#xff1a;功能1 支持單張圖片識別功能2 支持遍歷文件夾識別功能3 支持識別視頻文件功能4 支持攝像頭識別功能5 支持結果文件導出&#xff08;xls格式&#xff09;功能6 支持切換檢測到的目標查看 二、數據集三、算法介紹1. YO…

docker中kibana啟動后,通過瀏覽器訪問,出現server is not ready yet

問題&#xff1a;當我在瀏覽器訪問kibana時&#xff0c;瀏覽器給我報了server is not ready yet. 在網上試了很多方法&#xff0c;都未能解決&#xff0c;下面是我的方法&#xff1a; 查看kibana日志&#xff1a; docker logs -f kibana從控制臺打印的日志可以發現&#xff…

在 Docker 中,無法直接將外部多個端口映射到容器內部的同一個端口

Docker 的端口映射是一對一的&#xff0c;即一個外部端口只能映射到容器內部的一個端口。 1. 為什么不能多對一映射&#xff1f; 端口沖突&#xff1a; 如果外部多個端口映射到容器內部的同一個端口&#xff0c;Docker 無法區分外部請求應該轉發到哪個內部端口&#xff0c;會…