MQTT(Message Queueing Telemetry Transport) 消息隊列遙測傳輸,在物聯網領域應用的很廣泛,它是基于Publish/Subscribe模式,具有簡單易用,支持QoS,傳輸效率高的特點。
它被設計用于低帶寬,不穩定或高延遲的網絡環境,因此非常適合于設備之間的數據通信。
EMQX提供了MQTT的服務器,并且可以在后臺網頁查看面板,還支持中文顯示。
下載鏈接:Directory listing for EMQX: / | EMQ
由于5.0之后的版本不再支持Windows所以使用的是4.0版本的包,在下載完壓縮包后,不用安裝,進入cmd導航到安裝的bin目錄下(注意:路徑中不能包含中文),執行命令:emqx start,看見沒有報錯就說明啟動成功了。
之后在瀏覽器里輸入:http://127.0.0.1:18083 進入面板。
在WebSocket菜單里可以模擬發布/訂閱的操作,接下來我們將使用C#完成這一系列的操作。
1、連接主機
首先新建一個WPF項目,然后在Nuget中下載MQTTnet。
// 連接主機MqttFactory factory = new MqttFactory();_client = factory.CreateMqttClient();var options = new MqttClientOptionsBuilder().WithTcpServer(this.ipAddress.Text, Convert.ToInt32(this.port.Text)).WithClientId(this.clientId.Text).Build();var result = await _client.ConnectAsync(options, CancellationToken.None);if (result.ResultCode == MqttClientConnectResultCode.Success){this.log.Text = DateTime.Now.ToString() + " 連接成功" + Environment.NewLine + this.log.Text;}else{this.log.Text = DateTime.Now.ToString() + $" 連接失敗,{result.ReasonString}" + Environment.NewLine + this.log.Text;return;}
上述使用的是TCP的方式進行連接,需要主機地址,端口號,客戶編號(一個用于區分用戶的字符串)。
2、訂閱消息
訂閱消息分為兩塊,一個是消息的回顯,一個是訂閱消息。
// 訂閱消息var option = new MqttClientSubscribeOptions();MqttQualityOfServiceLevel level;switch (this.subscribeQos.SelectedIndex){case 0:level = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce;break;case 1:level = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce;break;case 2:level = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce;break;default:throw new Exception("請選擇服務質量");}option.TopicFilters = new List<MqttTopicFilter>(){new MqttTopicFilter(){Topic = this.subscribeTopic.Text,QualityOfServiceLevel = level}};this._client.SubscribeAsync(option, CancellationToken.None);// 將訂閱的消息回顯到日志區this._client.ApplicationMessageReceivedAsync += e =>{var task = Task.Factory.StartNew(() => {try{var array = e.ApplicationMessage.PayloadSegment.Array;if (array == null){return;}var str = Encoding.UTF8.GetString(array);// 跨線程更新UIApplication.Current.Dispatcher.Invoke(() => {this.log.Text = DateTime.Now.ToString() + " 收到消息:" + str + Environment.NewLine + this.log.Text;});}catch (Exception ex){this.log.Text = DateTime.Now.ToString() + $" {ex.Message}" + Environment.NewLine + this.log.Text;}});return task;};this.log.Text = DateTime.Now.ToString() + " 訂閱成功" + Environment.NewLine + this.log.Text;
訂閱消息只需要兩個參數:主題Topic和服務質量QoC,主題是用來區分不同頻段的消息,避免出現沖突,如果想接收到所有的消息可以這么寫:topicXXX/#,#就代表不限制范圍,如果打算只接受固定區域的消息,則需要將#改成某個字符串。
服務質量QoC是用來控制可用性的,0是最低等級,最多只發送一次,1是中級,至少發一次,但有可能出現重復接收的情況,2是最高級,只發一次,不會多也不會少。
將消息回顯需要注冊ApplicationMessageReceivedAsync事件,傳入的參數是回顯對象,返回值是一個Task類型,是在Task中獲取回顯的值并完成控件的更新操作。
3、發布消息
發布消息的參數比訂閱多兩個:消息內容Payload,持久會話(在恢復連接后保留之前的訂閱和消息傳遞狀態)
var msg = new MqttApplicationMessage();msg.Topic = this.topic.Text;msg.PayloadSegment = Encoding.UTF8.GetBytes(this.msg.Text);msg.Retain = isSave.IsChecked??false;MqttQualityOfServiceLevel level;switch (this.publishQos.SelectedIndex){case 0:level = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce;break;case 1:level = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce;break;case 2:level = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce;break;default:throw new Exception("請選擇服務質量");}msg.QualityOfServiceLevel = level;var resultPublish = await _client.PublishAsync(msg, CancellationToken.None);if (resultPublish.IsSuccess == true){this.log.Text = DateTime.Now.ToString() + " 發送成功" + Environment.NewLine + this.log.Text;}else{this.log.Text = DateTime.Now.ToString() + " 發送失敗" + Environment.NewLine + this.log.Text;}