目錄
- 1. MQTT是什么?
- 2. 開發交互UI
- 3. 服務器核心代碼
- 4. 客戶端核心代碼
- 5. 消息訂閱與發布
- 6. 通訊測試
- 7. MQTT與PLC通訊
- 最后. 核心總結
1. MQTT是什么?
MQTT(Message Queuing Terlemetry Transport)消息隊列遙測協議;是一種輕量級的發布/訂閱消息傳輸協議,專為IOT、低帶寬、高延遲的網絡環境設計,具有高效、低耗、海量設備連接特性。
1.通訊原理
- 發布(Publisher):發布消息到特定主題(Topic)
- 訂閱(Subscriber):訂閱主題接收消息
- 代理(Broker):消息路由管理,接收發布的消息,頒發給所有訂閱者
示例:傳感器(Publisher)發送消息到主題(Topic),代理(Broker)接收消息并檢查該主題的所有客戶端,訂閱者(Subscriber)實時接收消息
2.消息結構
- 主題(topic)+負載(payload)
3.必要條件
- MQTT Broker: 消息代理服務器
- 客戶端庫: 設備或應用需集成MQTT客戶端
- 端口: 默認非加密端口 1883,加密端口 8883(TLS/SSL)
- 連接認證: 支持用戶名/密碼、客戶端證書等認證方式
- QoS服務質量/策略
QoS等級 | 描述 | 適用場景 |
---|---|---|
0 | 最多一次(無確認) | 實時性高,允許丟數據 |
1 | 至少一次(需確認) | 數據需可靠但不重復 |
2 | 精確一次(握手確認) | 關鍵數據,嚴格不重復 |
4. 關鍵特性
遺囑消息(Last Will)
- 設備異常斷開時,Broker 自動發布預設消息(如“設備離線”),通知其他客戶端。
保留消息(Retained)
- Broker 保存主題的最新消息,新訂閱者首次連接時立即獲取。
主題層級(Topic Hierarchy)
- 支持多級通配符(+ 單層,# 多層)
- 例如:
home/floor1/temperature; home/+/status(匹配所有樓層狀態)
5.應用場景
- 移動設備遠程監控(AGV狀態上報)
- 跨廠區數據匯聚(通過云平臺中轉)
2. 開發交互UI
創建WindowsFormAPP項目,NuGet安裝MQTTnet開源庫(項目-屬性-框架;需與程序包的依賴項一致<否則安裝錯誤>:PM> NuGet\Install-Package MQTTnet -Version 2.8.2
)
控件
- label、TextBox、ComboBox、Button
- ListView(Dock=停靠<視圖=Details;小Imagelist=Imagelist1;編輯列=Infoname、Content>)
容器
- SplitContainer(Orienting=垂直;SplitterWith=1;BorderStyle=邊框)
組件
- Imagelist(添加圖像)
- timer(Enabled=true、Interval=1000)
狀態
- StatusStrip(系統時間、連接數量、版本說明)
Server端
- 窗體設置(Text=標題、Font=字體、StartPosition=位置、FormBoardStyle=邊框)
- 給定服務IP,固定端口號,設置開始、停止服務、快捷打開客戶端按鈕
- 設置日志消息顯示窗口;設置狀態欄
Client端
- 技巧:復制FrmServer(修改窗口、Designer代碼)
- 容器:在SP1的Panel2添加SplitContainer2(Listview放在SP2的Panel1中);
- 設置SP1的FixedPanel的Panel1不動;設置SP2的FixedPanel的Panel2不動
- 客戶端可發布主題消息
- 設置連接、訂閱、取消訂閱、發布主題按鈕;
- 主題信息、給定QoS策略
3. 服務器核心代碼
初始化-public FrmServer(){…}
- 獲取IP集合
(Dns.GetHostAddresses)
- 綁定控件
(cmb_iplist.DataSource、.SelectedIndex)
創建服務器對象(IMqttServer)(->服務啟動按鈕點擊事件)
- 創建服務器配置 _
var optionsBuilder = new MqttServerOptionsBuilder()
– 驗證用戶密碼_.WithConnectionValidator(c =>...
- 實例化服務對象_
mqttServer = new MqttFactory().CreateMqttServer();
- 創建MQTT事件方法_
mqttServer.ClientConnected += MqttServer_ClientConnected;
– 方法日志顯示_this.AddLog(0, "MQTT客戶端已連接" + "ClietID:" + e.ClientId.Length);
- 啟動服務_
mqttServer.StartAsync(optionsBuilder.Build());
日志對象
- 創建委托_
public delegate void AddlogDelegate(int index, string info);
- 委托方法_
private void AddlogMothod(int index, string info){...}
- 委托對象_
private AddlogDelegate AddLog;
- 對象綁定方法 _
this.AddLog = this.AddlogMothod;
- 對象應用eg:_
this.AddLog(0, "MQTT服務端已停止");
狀態欄
- 系統時間_
this.tss_time.Text = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
- 連接個數(連接/斷開事件更新)_
this.tss_connnum.Text = mqttServer.GetClientSessionsStatusAsync().Result.Count.ToString();
其他
- 停止服務_
mqttServer.StopAsync();
- 清空日志_
this.lst_info.Items.Clear();
- 打開客戶端(注意“重新生成解決方案”)_
new FrmClient().Show();
- 當前時間_
DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
- 關閉窗體停止服務
4. 客戶端核心代碼
初始化
- 獲取IP集合
(Dns.GetHostAddresses)
- 綁定控件
(cmb_iplist.DataSource、.SelectedIndex)
快捷打開客戶端,ServerIP、Port自動填寫
- 創建帶參數的初始化構造方法_
public FrmClient(string ip, string port)
創建客戶端對象(IMqttClient)(->客戶端連接按鈕點擊事件)
- 創建客戶端配置_
var option = new MqttClientOptions() { ClientId = Guid.NewGuid().ToString("D") };
- 創建通道配置_
option.ChannelOptions = new MqttClientTcpOptions()
- 是否啟用賬戶_
if (this.chk_isuserpwd.Checked)
- 創建會話配置_
option.CleanSession = true;
- 創建客戶端對象_
mqttClient = new MqttFactory().CreateMqttClient();
- 連接服務器_
mqttClient.ConnectAsync(option);
- 綁定事件方法(添加日志)_
mqttClient.Connected += mqttClient_Connected;
日志顯示
- 創建委托方法_與服務端一致
- 創建委托對象_與服務端一致
- 初始化對象綁定方法_與服務端一致
- 對象引用eg_與服務端一致
其他
- 斷開連接_
mqttClient.DisconnectAsync();
5. 消息訂閱與發布
客戶端可以正確連接后,只有訂閱與發布消息,才算真正進行數據通訊;消息的訂閱和發布均在客戶端進行,服務端只需提供一個服務供客戶連接(橋梁的作用);小節解釋消息訂閱發布的核心代碼。
QoS服務策略(下拉框獲取)
- QoS服務策略(枚舉類型)_
this.cmb_qos_pub.DataSource = Enum.GetNames(typeof(MqttQualityOfServiceLevel));
消息訂閱
- 訂閱主題_
mqttClient.SubscribeAsync(new List<TopicFilter>(){...});
1
消息取消訂閱
- 取消訂閱_
mqttClient.UnsubscribeAsync(this.txt_topic_sub.Text);
消息發布
- 創建消息對象_
var msg = new MqttApplicationMessage(){...}
2 - 發布消息_
mqttClient.PublishAsync(msg);
6. 通訊測試
7. MQTT與PLC通訊
將msg對象中的Payload更改為PLC的寄存器即可
自動發布
- 添加定時器timer1_設置頻率,事件(消息內容,自動發布)
- 連接成功時啟動定時器_
this.timer1.Enabled = true;
消息內容
- 添加引用(西門子通訊庫)
- 創建PLC對象
- 讀取PLC變量
- 添加到msg中的Payload中_
Payload = Encoding.UTF8.GetBytes(plcmsg),
最后. 核心總結
1. 開發實現
服務端:
- 功能:IP/端口配置、啟動/停止服務、連接監控
- 初始化 MqttServer 對象
- 處理連接/斷開事件(日志記錄、狀態更新)
客戶端:
- 功能:連接/斷開、訂閱/取消主題、消息發布
- 配置 MqttClient(IP、端口、認證)
- 實現訂閱 (SubscribeAsync) /發布 (PublishAsync)
2. 進階應用
PLC 集成:
- 自動發布:定時器讀取 PLC 數據并推送
- 數據格式:Payload 封裝寄存器值(如 Siemens PLC 數據)
異常測試:
- 基礎測試:服務端啟停、客戶端連接/斷開
- 消息流驗證:訂閱發布功能、QoS 策略生效
- 異常測試:網絡斷開重連、遺囑消息觸發
附:關鍵代碼片段
服務端啟動:
var options = new MqttServerOptionsBuilder().WithDefaultPort(1883).Build();
mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.StartAsync(options);
客戶端發布消息:
var msg = new MqttApplicationMessage {Topic = "sensor/temp",Payload = Encoding.UTF8.GetBytes("25℃"),QoS = MqttQualityOfServiceLevel.AtLeastOnce
};
client.PublishAsync(msg);
PLC 數據讀取:
var plcValue = SiemensPLC.Read("DB1.DBD0"); // 讀取浮點數
var payload = $"{{\"temperature\": {plcValue}}}";
源碼鏈接
學習鏈接
創建TopicFilter對象_
new TopicFilter(this.txt_topic_sub.Text, (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel),this.cmb_qos_sub.Text))
??消息對象賦值_
Topic = this.txt_topic_pub.Text, QualityOfServiceLevel =(MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), this.cmb_qos_pub.Text), Payload = Encoding.UTF8.GetBytes(this.txt_msg_pub.Text), Retain = false,
??