一、場景描述
昨天在博客《客戶端訂閱服務端事件的實現方法》中提出了利用websocket、服務端EventEmitter和客戶端mitt實現客戶端訂閱服務端事件,大大簡化了客戶端對服務端數據實時響應的邏輯。上述方案適用于單服務節點的情形。
對于由服務集群支撐的微服務架構,websocket提供的點對點通信已無法滿足前端訂閱后端集群事件的需求,升級方案是使用基于消息總線的通信方式。
二、幾種消息總線適用性比較
常用的消息總線包括Kafka、Redis和基于MQTT協議實現的EMQX。
Kafka和MQTT都是從發布/訂閱系統演化而來,但發展側重點不同。Kafka通過分布式架構提供了海量數據流的存儲,并保證數據流順序,它的設計目標是支持數據發布、訂閱和存儲。而MQTT用于網絡中傳輸小型數據包,其設計目的是實現簡單、可靠的設備間通信。
而Redis是從內存數據庫系統演化而來,發布/訂閱功能是把消息保存在內存中。與Kafka相比,其只能提供半持久化;與MQTT相比,其通信效率較低。
由于應用場景沒有對消息持久化的需求,且考慮到產業大腦平臺未來會接入工業互聯網,使用MQTT協議來搭建事件總線更利于平臺在工業互聯網環境下的擴展。
三、MQTT簡介
幾年前,本人曾寫過MQTT簡介。本文摘抄其中重要概念。
(一)MQTT協議
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是基于“訂閱/發布”模式的輕量級通信協議,該協議基于TCP/IP,能以極低的帶寬為海量(百萬級)跨域設備提供可靠的消息服務,因此在物聯網、小型移動終端、邊緣計算方面有廣泛應用。
所謂可靠的消息傳輸,體現為可配置消息的服務質量(QoS),有三種服務質量可選:
- 至多一次:
消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。應用場景如環境傳感器的數據采集,丟失一次記錄無所謂,因為不久后還會有第二次發送。 - 至少一次:
確保消息送達訂閱者,但消息可能重復,適用于冪等性操作。 - 只有一次:
最嚴格的消息服務質量,確保消息到達且僅到達一次訂閱者。應用場景如計費系統等。
MQTT協議中存在三種身份:消息總線(Broker)、發布者(Publish)和訂閱者(Subscribe),其中消息總線屬于服務器,后兩者都屬于客戶端。發布者和訂閱者可以是各種物聯網設備和小型終端,消息發布者可以同時也是消息訂閱者,如下圖所示。
MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:
- Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);
- payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。
訂閱消息時,可以在訂閱表達式中使用通配符篩選器對主題進行篩選,可同時訂閱所匹配的多個主題。
MQTT協議中主要有以下5個方法:
- connect:客戶端建立與服務器的連接
- disconnect:等待客戶端完成工作后,端口與總線的會話
- subscribe:客戶端向消息總線注冊訂閱主題
- unsubscribe:客戶端等待消息總線取消所注冊的訂閱
- publish:客戶端向消息總線發送某主題的消息
(二)開源消息總線EMQX
EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang語言開發的開源物聯網MQTT消息總線。其是一款由前華為員工開發的開源軟件,軟件主頁為https://www.emqx.io/。可根據操作系統類別選擇不同版本下載安裝,或通過docker部署。
軟件安裝后,通過 emqx start
以后臺方式啟動。啟動后將會開放兩個端口:
- 18083端口為控制臺端口,可通過瀏覽器訪問該端口,首次登錄的用戶名和密碼為admin和public。控制臺提供了總線監控、用戶權限管理、在線客戶端訂閱/發布等功能。
- 8083端口為通信端口,MQTT客戶端可通過該端口與EMQX消息總線通信。
(三)MQTT.js客戶端
MQTT.js是MQTT客戶端Nodejs SDK,可在瀏覽器(ES模塊)和Node.js環境(CommonJS模塊)下使用,前者可通過MQTT over WebSocket使用,后者既可以通過MQTT over WebSocket使用,也可以直接使用MQTT。區別僅僅是連接參數的協議頭不同。
1. 安裝和幫助文件
$ pnpm i mqtt -S #安裝
$ npx mqtt help #幫助
MQTT.js command line interface, available commands are:* publish publish a message to the broker* subscribe subscribe for updates from the broker* version the current MQTT.js version* help help about commandsLaunch 'mqtt help [command]' to know more about the commands.
2. 使用方法
// const mqtt = require('mqtt') //ES模塊
import mqtt from 'mqtt' //CommonJS模塊// 連接選項
const options = {clean: true, // true: 清除會話, false: 保留會話connectTimeout: 4000, // 超時時間// 認證信息clientId: 'user_id', // 要保證唯一性// 若在控制臺配置了用戶名和密碼:// username: 'xxx',// password: 'xxx',
}// 連接字符串, 通過協議指定使用的連接方式
// ws 未加密 WebSocket 連接
// wss 加密 WebSocket 連接
// mqtt 未加密 TCP 連接
// mqtts 加密 TCP 連接
// wxs 微信小程序連接
// alis 支付寶小程序連接
const connectUrl = 'ws://localhost:8084/mqtt'
const client = mqtt.connect(connectUrl, options)client.on('reconnect', error => {console.error('正在重連:', error)
})client.on('error', error => {console.error('連接失敗:', error)
})//收到消息
client.on('message', (topic, message) => {console.log('收到消息:', topic, message.toString()) //message是二進制流,需要轉換成字符串
})//訂閱主題
const topic='/user_id/#'
const qos=0 //0:最多交付1次;1:至少交付1次;2:只交付1次
client.subscribe(topic, qos, error=>{ //訂閱user_id主題下所有消息if(error){console.error('訂閱主題失敗:', error)return}console.log('訂閱成功')
})//發布消息
client.publish('user_id/a',JSON.stringify({a:123}),qos,error=>{if(error){console.error('發布消息失敗:', error)}
})//取消訂閱
client.unsubscrib(topic, qos, error=>{if(error){console.error('取消訂閱失敗', error)return}
})//斷開連接
if(client.connected){try{client.end(false,()=>{console.log('成功斷開連接')})catch(error){console.error('斷開連接失敗:', error)}}
}
(四)安全性
1. 排它訂閱
排它訂閱是 EMQX 支持的 MQTT 擴展功能。排它訂閱允許對主題進行互斥訂閱,一個主題同一時刻僅被允許存在一個訂閱者,在當前訂閱者未取消訂閱前,其他訂閱者都將無法訂閱對應主題。
2. JWT認證
系統整體采用JWT認證方式,通過一臺認證服務器頒發JWT Token。MQTT客戶端訪問EMQX總線時,攜帶由認證服務器頒發的JWT Token。