MQTT協議
MQTT 是一種基于發布/訂閱模式的輕量級消息傳輸協議,專門針對低帶寬和不穩定網絡環境的物聯網應用而設計,可以用極少的代碼為聯網設備提供實時可靠的消息服務。MQTT 協議廣泛應用于物聯網、移動互聯網、智能硬件、車聯網、智慧城市、遠程醫療、電力、石油與能源等領域。
MQTT 與其他協議對比
MQTT vs HTTP
MQTT 的最小報文僅為 2 個字節,比 HTTP 占用更少的網絡開銷。
MQTT 與 HTTP 都能使用 TCP 連接,并實現穩定、可靠的網絡連接。
MQTT 基于發布訂閱模型,HTTP 基于請求響應,因此 MQTT 支持雙工通信。
MQTT 可實時推送消息,但 HTTP 需要通過輪詢獲取數據更新。
MQTT 是有狀態的,但是 HTTP 是無狀態的。
MQTT 可從連接異常斷開中恢復,HTTP 無法實現此目標。
MQTT vs XMPP
MQTT 協議設計簡單輕量、路由靈活,將在移動互聯網、物聯網消息領域,全面取代 PC 時代的 XMPP 協議。
MQTT 報文體積小且編解碼容易,XMPP 基于繁重的 XML,報文體積大且交互繁瑣。
MQTT 基于發布訂閱模式,相比 XMPP 基于 JID 的點對點消息路由更為靈活。
MQTT 支持 JSON、二進制等不同類型報文。XMPP 采用 XML 承載報文,二進制必須 Base64 編碼等處理。
MQTT 通過 QoS 保證消息可靠傳輸,XMPP 主協議并未定義類似機制。
MQTT VS HTTP 請求響應
HTTP 是萬維網數據通信的基礎,其簡單易用無客戶端依賴,被廣泛應用于各個行業。在物聯網領域,HTTP也可以用于連接物聯網設備和 Web 服務器,實現設備的遠程監控和控制。
雖然使用簡單、開發周期短,但是基于請求響應的 HTTP 在物聯網領域的應用卻有一定的局限性。
首先,協議層面 HTTP 報文相較與 MQTT 需要占用更多的網絡開銷;
其次,HTTP 是一種無狀態協議,這意味著服務器在處理請求時不會記錄客戶端的狀態,也無法實現從連接異常斷開中恢復;
最后,請求響應模式需要通過輪詢才能獲取數據更新,而 MQTT 通過訂閱即可獲取實時數據更新。
發布訂閱模式的松耦合特性,也給 MQTT 帶來了一些副作用。由于發布者并不知曉訂閱者的狀態,因此發布者也無法得知訂閱者是否收到了消息,或者是否正確處理了消息。為此,MQTT 5.0 增加了請求響應特性,以實現訂閱者收到消息后向某個主題發送應答,發布者收到應答后再進行后續操作。
MQTT VS消息隊列
盡管 MQTT 與消息隊列的很多行為和特性非常接近,比如都采用發布/訂閱模式,但是他們面向的場景卻有著顯著的不同。
消息隊列主要用于服務端應用之間的消息存儲與轉發,這類場景往往數據量大但客戶端數量少。
MQTT 是一種消息傳輸協議,主要用于物聯網設備之間的消息傳遞,這類場景的特點是海量的設備接入、管理與消息傳輸。
在一些實際的應用場景中,MQTT 與消息隊列往往會被結合起來使用,以使 MQTT 服務器能專注于處理設備的連接與設備間的消息路由。比如先由 MQTT 服務器接收物聯網設備上報的數據,然后再通過消息隊列將這些數據轉發到不同的業務系統進行處理。
不同于消息隊列,MQTT 主題不需要提前創建。MQTT 客戶端在訂閱或發布時即自動的創建了主題,開發者無需再關心主題的創建,并且也不需要手動刪除主題。
MQTT常見的特點
1)輕量高效,節省帶寬:MQTT 將協議本身占用的額外消耗最小化,消息頭部最小只需要占用 2 個字節,可穩定運行在帶寬受限的網絡環境下。同時,MQTT 客戶端只需占用非常小的硬件資源,能運行在各種資源受限的邊緣端設備上。
2)可靠性較強:提供了多種消息的質量等級
3)安全的雙向通訊:依賴于發布訂閱模式,MQTT 允許在設備和云之間進行雙向消息通信。發布訂閱模式的優點在于:發布者與訂閱者不需要建立直接連接,也不需要同時在線,而是由消息服務器負責所有消息的路由和分發工作。
安全性是所有物聯網應用的基石,MQTT 支持通過 TLS/SSL 確保安全的雙向通信,同時 MQTT 協議中提供的客戶端 ID、用戶名和密碼允許我們實現應用層的身份驗證和授權。
4)多語言支持:PHP、Node.js、python、java、golang
5)海量連接支持:連接海量的物聯網設備,離不開 MQTT 服務器的支持。目前,MQTT 服務器中支持并發連接數最多的是EMQX。最近發布的 EMQX 5.0 通過一個 23 節點的集群達成了 1 億 MQTT 連接+每秒 100 萬消息吞吐,這使得 EMQX 5.0 成為目前為止全球最具擴展性的 MQTT 服務器。
6)在線狀態感知:為了應對網絡不穩定的情況,MQTT 提供了心跳保活(Keep Alive)機制。在客戶端與服務端長時間無消息交互的情況下,Keep Alive 保持連接不被斷開,若一旦斷開,客戶端可即時感知并立即重連。
同時,MQTT 設計了遺愿(Last Will)消息,讓服務端在發現客戶端異常下線的情況下,幫助客戶端發布一條遺愿消息到指定的 MQTT 主題。
另外,部分 MQTT 服務器如 EMQX 也提供了上下線事件通知功能,當后端服務訂閱了特定主題后,即可收到所有客戶端的上下線事件,這樣有助于后端服務統一處理客戶端的上下線事件。
MQTT的常見概念
1)MQTT客戶端:任何運行的MQTT客戶端庫(MQTT開發工具的SDK)的應用或設備都是MQTT客戶端
2)MQTT Broker:實現了MQTT通訊的代理軟件
3)主題:存在于MQTT Broker中的,就是一個普通字符串,使用主體來對消息進行分類的
MQTT快速入門
EMQX:一款實現了MQTT協議的,開源的MQTT消息代理軟件,MQTT定義了消息通訊的規則和流程,而EMQX則是遵循這些規則的軟件,使得設備能夠一句MQTT協議進行有效通信。
官網地址:EMQX:用于物聯網、車聯網和工業物聯網的企業級 MQTT 平臺
下載emqx
在windows上下載emqx,網址:Directory listing for EMQX: / | EMQ
將下載的emqx-5.3.2-windows-amd64.zip
解壓出來,解壓目錄不能存在中文、空格、特殊字符
打開其中的bin文件夾,在地欄復制地址
用管理員 身份在cmd中進行操作
.\emqx.cmd install 將發行版安裝為 Windows 服務
.\emqx.cmd start 啟動服務和 Erlang 節點
.\emqx.cmd stop 停止服務和 Erlang 節點
.\emqx.cmd restart 運行停止命令和啟動命令
.\emqx.cmd uninstall 卸載服務并終止正在運行的節點
.\emqx.cmd ping 檢查節點是否正在運行
.\emqx.cmd ctl 運行管理命令
.\emqx.cmd console 在Windows shell 中啟動 Erlang 版本
.\emqx.cmd attach 連接到正在運行的節點并打開交互式控制臺
.\emqx.cmd remote_console - 與附加相同
.\emqx.cmd list 顯示已安裝的 Erlang 服務的列表
.\emqx.cmd usage 顯示可用命令
啟動 emqx服務,輸入命令:.\emqx.cmd console
提示EMQX 版本號 is running now!,則說明運行成功
瀏覽器輸入localhost:18083回車,即可訪問EMQX控制臺,在登錄頁面輸入初始化賬號 :用戶名:admin 密碼:public
連接MQTT
下載并安裝MQTTX,網址:MQTTX:全功能 MQTT 客戶端工具
進入EMQX控制臺,訪問控制->客戶端認證->創建認證方式(以選擇密碼認證為例)->添加用戶
打開MQTTX客戶端,建立連接兩個 sub和pub
sub添加訂閱aa
pub發送消息
sub接收到消息
MQTT控制報文
控制報文簡介
MQTT 控制報文是 MQTT 數據傳輸的最小單元。MQTT 客戶端和服務端通過交換控制報文來完成它們的工作,比如訂閱主題和發布消息。
MQTT 目前定義了 15 種控制報文類型,如果按照功能進行分類,我們可以將這些報文分為連接、發布、訂閱三個類別:
CONNECT 報文用于客戶端向服務端發起連接,
CONNACK 報文則作為響應返回連接的結果。
如果想要結束通信,或者遇到了一個必須終止連接的錯誤,客戶端和服務端可以發送一個 DISCONNECT 報文然后關閉網絡連接。
AUTH 報文是 MQTT 5.0 引入的全新的報文類型,它僅用于增強認證,為客戶端和服務端提供更安全的身份驗證。
PINGREQ 和 PINGRESP 報文用于連接保活和探活,客戶端定期發出 PINGREQ 報文向服務端表示自己仍然活躍,然后根據 PINGRESP 報文是否及時返回判斷服務端是否活躍。
PUBLISH 報文用于發布消息,余下的四個報文分別用于 QoS 1 和 2 消息的確認流程。
SUBSCRIBE 報文用于客戶端向服務端發起訂閱,UNSUBSCRIBE 報文則正好相反,
SUBACK 和UNSUBACK 報文分別用于返回訂閱和取消訂閱的結果。
MQTT報文格式
在MQTT中,無論是什么類型的控制報文,它們都由固定報頭、可變報頭和有效載荷三個部分組成。
固定報頭固定存在于所有控制報文中,而可變報頭和有效載荷是否存在以及它們的內容則取決于具體的報文類型。例如用于維持連接的 PINGREQ 報文就只有一個固定報頭,用于傳遞應用消息的 PUBLISH 報文則完整地包含了這三個部分。
固定報頭
固定報頭由報文類型、標識位和報文剩余長度三個字段組成。
報文類型位于固定報頭第一個字節的高 4 位,它是一個無符號整數,很顯然,它表示當前報文的類型,例如 1 表示這是一個 CONNECT 報文,2 表示 CONNACK 報文等等。事實上,除了報文類型和剩余長度這兩個字段,MQTT報文剩余部分的內容基本都取決于具體的報文類型,所以這個字段也決定了接收方應該如何解析報文的后續內容。
固定報頭第一個字節中剩下的低 4 位包含了由控制報文類型決定的標識位。不過到 MQTT 5.0 為止,只有 PUBLISH 報文的這四個比特位被賦予了明確的含義:
Bit 3:DUP,表示當前 PUBLISH 報文是否是一個重傳的報文。
Bit 2,1:QoS,表示當前 PUBLISH 報文使用的服務質量等級。
Bit 0:Retain,表示當前 PUBLISH 報文是否是一個保留消息。
其他所有的報文中,這 4 位都仍是保留的,即它們是一個固定的,不可隨意變更的值。
最后的剩余長度指示了當前控制報文剩余部分的字節數,也就是可變報頭和有效載荷這兩個部分的長度。
所以 MQTT 控制報文的總長度實際上等于固定報頭的長度加上剩余長度。
可變報頭
可變報頭的內容取決于具體的報文類型。例如 CONNECT 報文的可變報頭按順序包含了協議名、協議級別、連接標識、Keep Alive 和屬性這五個字段。PUBLISH 報文的可變報頭則按順序包含了主題名、報文標識符和屬性這三個字段。
有效載荷
最后是有效載荷部分。我們可以將報文的可變報頭看作是它的附加項,而有效載荷則用于實現這個報文的核心目的。
比如在 PUBLISH 報文中,Payload 用于承載具體的應用消息內容,這也是 PUBLISH 報文最核心的功能。
而 PUBLISH 報文的可變報頭中的 QoS、Retain 等字段,則是圍繞著應用消息提供一些額外的能力。
SUBSCRIBE 報文也是如此,Payload 包含了想要訂閱的主題以及對應的訂閱選項,這也是 SUBSCRIBE 報文最主要的工作。
MQTT報文驗證
wireshark工具抓取通訊數據報文
下載地址:Wireshark · Download
QOS
QOS是什么
很多時候,使用 MQTT 協議的設備都運行在網絡受限的環境下,而只依靠底層的 TCP 傳輸協議,并不能完全保證消息的可靠到達。因此,MQTT 提供了 QoS 機制,其核心是設計了多種消息交互機制來提供不同的服務質量,來滿足用戶在各種場景下對消息可靠性的要求。
MQTT 定義了三個 QoS 等級,分別為:
QoS 0,消息最多發送一次--->可能丟失消息
QoS 1,消息至少發送一次--->可以保證收到消息,但消息可能重復
QoS 2,消息只發送一次--->可以保證消息既不丟失也不重復
QoS 等級從低到高,不僅意味著消息可靠性的提升,也意味著傳輸復雜程度的提升。
在一個完整的從發布者到訂閱者的消息投遞流程中,QoS 等級是由發布者在 PUBLISH 報文中指定的,大部分情況下 Broker 向訂閱者轉發消息時都會維持原始的 QoS 不變。不過也有一些例外的情況,根據訂閱者的訂閱要求,消息的 QoS 等級可能會在轉發的時候發生降級。
例如,訂閱者在訂閱時要求 Broker 可以向其轉發的消息的最大 QoS 等級為 QoS 1,那么后續所有 QoS 2 消息都會降級至 QoS 1 轉發給此訂閱者,而所有 QoS 0 和 QoS 1 消息則會保持原始的 QoS 等級轉發。
QoS 0 - 最多交付一次
QoS 0 是最低的 QoS 等級。QoS 0 消息即發即棄,不需要等待確認,不需要存儲和重傳,因此對于接收方來說,永遠都不需要擔心收到重復的消息。
為什么 QoS 0 消息會丟失?
當我們使用 QoS 0 傳遞消息時,消息的可靠性完全依賴于底層的 TCP 協議。
而 TCP 只能保證在連接穩定不關閉的情況下消息的可靠到達,一旦出現連接關閉、重置,仍有可能丟失
當前處于網絡鏈路或操作系統底層緩沖區中的消息。這也是 QoS 0 消息最主要的丟失場景。
QoS 1 - 至少交付一次
為了保證消息到達,QoS 1 加入了應答與重傳機制,發送方只有在收到接收方的 PUBACK 報文以后,才能認為消息投遞成功,在此之前,發送方需要存儲該 PUBLISH 報文以便下次重傳。
QoS 1 需要在 PUBLISH 報文中設置 Packet ID,而作為響應的 PUBACK 報文,則會使用與 PUBLISH 報文相同的 Packet ID,以便發送方收到后刪除正確的 PUBLISH 報文緩存。
為什么 QoS 1 消息會重復?
對于發送方來說,沒收到 PUBACK 報文分為以下兩種情況:
1.PUBLISH 未到達接收方
2.PUBLISH 已經到達接收方,接收方的 PUBACK 報文還未到達發送方
在第一種情況下,發送方雖然重傳了 PUBLISH 報文,但是對于接收方來說,實際上仍然僅收到了一次消息。
但是在第二種情況下,在發送方重傳時,接收方已經收到過了這個 PUBLISH 報文,這就導致接收方將收到重復的消息。
雖然重傳時 PUBLISH 報文中的 DUP 標志會被設置為 1,用以表示這是一個重傳的報文。但是接收方并不能因此假定自己曾經接收過這個消息,仍然需要將其視作一個全新的消息。
QoS 2 - 只交付一次
QoS 2 解決了 QoS 0、1 消息可能丟失或者重復的問題,但相應地,它也帶來了最復雜的交互流程和最高的開銷。每一次的 QoS 2 消息投遞,都要求發送方與接收方進行至少兩次請求/響應流程。
首先,發送方存儲并發送 QoS 為 2 的 PUBLISH 報文以啟動一次 QoS 2 消息的傳輸,然后等待接收方回復 PUBREC 報文。這一部分與 QoS 1 基本一致,只是響應報文從 PUBACK 變成了PUBREC。
當發送方收到 PUBREC 報文,即可確認對端已經收到了 PUBLISH 報文,發送方將不再需要重傳這個報文,并且也不能再重傳這個報文。所以此時發送方可以刪除本地存儲的 PUBLISH 報文,然后發送一個 PUBREL 報文,通知對端自己準備將本次使用的 Packet ID 標記為可用了。與 PUBLISH 報文一樣,我們需要確保 PUBREL 報文到達對端,所以也需要一個響應報文,并且這個 PUBREL 報文需要被存儲下來以便后續重傳。
當接收方收到 PUBREL 報文,也可以確認在這一次的傳輸流程中不會再有重傳的 PUBLISH 報文到達,因此回復 PUBCOMP 報文表示自己也準備好將當前的 Packet ID 用于新的消息了。
當發送方收到 PUBCOMP 報文,這一次的 QoS 2 消息傳輸就算正式完成了。在這之后,發送方可以再次使用當前的 Packet ID 發送新的消息,而接收方再次收到使用這個 Packet ID 的 PUBLISH 報文時,也會將它視為一個全新的消息。
為什么 QoS 2 消息不會重復?
QoS 2 消息保證不會丟失的邏輯與 QoS 1 相同,所以這里我們就不再重復了。
與 QoS 1 相比,QoS 2 新增了 PUBREL 報文和 PUBCOMP 報文的流程,也正是這個新增的流程帶來了消息不會重復的保證。
在我們更進一步之前,我們先快速回顧一下 QoS 1 消息無法避免重復的原因。
當我們使用 QoS 1 消息時,對接收方來說,回復完 PUBACK 這個響應報文以后 Packet ID 就重新可用了,也不管響應是否確實已經到達了發送方。所以就無法得知之后到達的,攜帶了相同 Packet ID 的PUBLISH 報文,到底是發送方因為沒有收到響應而重傳的,還是發送方因為收到了響應所以重新使用了這個 Packet ID 發送了一個全新的消息。
所以,消息去重的關鍵就在于,通信雙方如何正確地同步釋放 Packet ID,換句話說,不管發送方是重傳消息還是發布新消息,一定是和對端達成共識了的。而 QoS 2 中增加的 PUBREL 流程,正是提供了幫助通信雙方協商 Packet ID 何時可以重用的能力。
QoS 2 規定,發送方只有在收到 PUBREC 報文之前可以重傳 PUBLISH 報文。一旦收到 PUBREC 報文并發出 PUBREL 報文,發送方就進入了 Packet ID 釋放流程,不可以再使用當前 Packet ID 重傳PUBLISH 報文。同時,在收到對端回復的 PUBCOMP 報文確認雙方都完成 Packet ID 釋放之前,也不可以使用當前 Packet ID 發送新的消息。
因此,對于接收方來說,能夠以 PUBREL 報文為界限,凡是在 PUBREL 報文之前到達的 PUBLISH 報文,都必然是重復的消息;而凡是在 PUBREL 報文之后到達的 PUBLISH 報文,都必然是全新的消息。一旦有了這個前提,我們就能夠在協議層面完成 QoS 2 消息的去重。
不同 QoS 的適用場景和注意事項
QoS 0
QoS 0 的缺點是可能會丟失消息,消息丟失的頻率依賴于你所處的網絡環境,并且可能使你錯過斷開連接期間的消息,不過優點是投遞的效率較高。
所以我們通常選擇使用 QoS 0 傳輸一些高頻且不那么重要的數據,比如傳感器數據,周期性更新,即使遺漏幾個周期的數據也可以受。
QoS 1
QoS 1 可以保證消息到達,所以適合傳輸一些較為重要的數據,比如下達關鍵指令、更新重要的有實時性要求的狀態等。
但因為 QoS 1 還可能會導致消息重復,所以當我們選擇使用 QoS 1 時,還需要能夠處理消息的重復,或者能夠允許消息的重復。
在我們決定使用 QoS 1 并且不對其進行去重處理之前,我們需要先了解,允許消息的重復,可能意味著什么。
如果我們不對 QoS 1 進行去重處理,我們可能會遭遇這種情況,發布方以 1、2 的順序發布消息,但最終訂閱方接收到的消息順序可能是 1、2、1、2。如果 1 表示開燈指令,2 表示關燈指令,我想大部分用戶都不會接受自己僅僅進行了開燈然后關燈的操作,結果燈在開和關的狀態來回變化。
QoS 2
QoS 2 既可以保證消息到達,也可以保證消息不會重復,但傳輸成本最高。如果我們不愿意自行實現去重方案,并且能夠接受 QoS 2 帶來的額外開銷,那么 QoS 2 將是一個合適的選擇。通常我們會在金融、航空等行業場景下會更多地見到 QoS 2 的使用。
如何為 QoS 1 消息去重?
在我們介紹 QoS 1 的時候講到,QoS 1 消息的重復在協議層面上是無法避免的。所以如果我們想要對 QoS1 消息進行去重,只能從業務層面入手。
一個比較常用且簡單的方法是,在每個 PUBLISH 報文的 Payload 中都帶上一個時間戳或者一個單調遞增的計數,這樣上層業務就可以根據當前收到消息中的時間戳或計數是否大于自己上一次接收的消息中的時間戳或計數來判斷這是否是一個新消息。
何時向后分發 QoS 2 消息?
我們已經了解到,QoS 2 的流程是非常長的,為了不影響消息的實時性,我們可以在第一次收到 PUBLISH 報文時,就啟動消息的向后分發。當然一旦開始向后分發,后續收到在 PUBREL 報文之前到達的 PUBLISH 報文,都不能再重復分發操作,以免消息重復。
不同 QoS 的性能有差距么?
以 EMQX 為例,在相同的硬件配置下進行點對點通信,通常 QoS 0 與 QoS 1 能夠達到的吞吐比較接近,不過 QoS 1 的 CPU 占用會略高于 QoS 0,負載較高時,QoS 1 的消息延遲也會進一步增加。而 QoS 2 能夠達到的吞吐一般僅為 QoS 0、1 的一半左右。
主題與通配符
主題
MQTT 主題本質上是一個 UTF-8 編碼的字符串,是 MQTT 協議進行消息路由的基礎。MQTT 主題類似URL 路徑,使用斜杠/進行分層
為了避免歧義且易于理解,通常不建議主題以/開頭或結尾,例如/chat或chat/
不同于消息隊列中的主題(比如 Kafka 和 Pulsar),MQTT 主題不需要提前創建。MQTT 客戶端在訂閱或發布時即自動的創建了主題,開發者無需再關心主題的創建,并且也不需要手動刪除主題。
主題通配符
MQTT 主題通配符包含單層通配符+及多層通配符#,主要用于客戶端一次訂閱多個主題。
注意: 通配符只能用于訂閱,不能用于發布。
單層通配符
加號 (“+” U+002B) 是用于單個主題層級匹配的通配符。在使用單層通配符時,單層通配符必須占據整個層級,例如:
多層通配符
井字符號(“#” U+0023)是用于匹配主題中任意層級的通配符。多層通配符表示它的父級和任意數量的子層級,在使用多層通配符時,它必須占據整個層級并且必須是主題的最后一個字符,例如:
系統主題-以 $ SYS/開頭的主題
以 $SYS/ 開頭的主題為系統主題,系統主題主要用于獲取 MQTT 服務器自身運行狀態、消息統計、客戶端上下線事件等數據。目前,MQTT 協議暫未明確規定 $SYS/ 主題標準,但大多數 MQTT 服務器都遵循該標準建議。
例如,EMQX 服務器支持通過以下主題獲取集群狀態。
EMQX 還支持客戶端上下線事件、收發流量、消息收發、系統監控等豐富的系統主題,用戶可通過訂閱$SYS/# 主題獲取所有系統主題消息。
不同場景主題設計
智能家居
比如我們用傳感器監測臥室、客廳以及廚房的溫度、濕度和空氣質量,可以設計以下幾個主題:
myhome/bedroom/temperature?
myhome/bedroom/humidity
myhome/bedroom/airquality
myhome/livingroom/temperature?
myhome/livingroom/humidity
myhome/livingroom/airquality
myhome/kitchen/temperature?
myhome/kitchen/humidity
myhome/kitchen/airquality
接下來,可以通過訂閱 myhome/bedroom/+ 主題獲取臥室的溫度、濕度及空氣質量數據,訂閱myhome/+/temperature 主題獲取三個房間的溫度數據,訂閱 myhome/# 獲取所有的數據。
充電樁
充電樁的上行主題格式為 ocpp/cp/${cid}/notify/${action},下行主題格式為 ocpp/cp/${cid}/reply/${action}。
ocpp/cp/cp001/notify/bootNotification
充電樁上線時向該主題發布上線請求。
ocpp/cp/cp001/notify/startTransaction?
向該主題發布充電請求。
ocpp/cp/cp001/reply/bootNotification
充電樁上線前需訂閱該主題接收上線應答。
ocpp/cp/cp001/reply/startTransaction
充電樁發起充電請求前需訂閱該主題接收充電請求應答。
即時消息
chat/user/${user_id}/inbox?
一對一聊天:用戶上線后訂閱該收件箱主題 ,將能接收到好友發送給自己的消息。給好友回復消息時,只需要將該主題的 user_id 換為好友的的 id 即可。
chat/group/${group_id}/inbox?
群聊:用戶加群成功后,可訂閱該主題獲取對應群組的消息,回復群聊時直接給該主題發布消息即可。
req/user/${user_id}/add
添加好友:可向該主題發布添加好友的申請(user_id 為對方的 id)。
接收好友請求:用戶可訂閱該主題(user_id 為自己的 id)接收其他用戶發起的好友請求。
resp/user/${user_id}/add
接收好友請求的回復:用戶添加好友前,需訂閱該主題接收請求結果(user_id 為自己的 id)。
回復好友申請:用戶向該主題發送消息表明是否同意好友申請(user_id 為對方的 id)。
user/${user_id}/state?
用戶在線狀態:用戶可以訂閱該主題獲取好友的在線狀態。
MQTT 主題常見問題及解答
主題的層級及長度有什么限制嗎?
MQTT 協議規定主題的長度為兩個字節,因此主題最多可包含 65,535 個字符。
建議主題層級為 7 個以內。使用較短的主題名稱和較少的主題層級意味著較少的資源消耗,例如
my-home/room1/data 比 my/home/room1/data 更好。
服務器對主題數量有限制嗎?
不同消息服務器對最大主題數量的支持各不一致,目前 EMQX 的默認配置對主題數量沒有限制,但是主題數量越多將會消耗越多的服務器內存。考慮到連接到 MQTT Broker 的設備數量一般較多,我們建議一個客戶端訂閱的主題數量最好控制在 10 個以內。
通配符主題訂閱與普通主題訂閱性能是否一致?
通配符主題訂閱的性能弱于普通主題訂閱,且會消耗更多的服務器資源,用戶可根據實際業務情況選擇訂閱類型。
同一個主題能被共享訂閱與普通訂閱同時使用嗎?
可以,但是不建議同時使用。
常見的 MQTT 主題使用建議有哪些?
不建議使用 # 訂閱所有主題;
不建議主題以 / 開頭或結尾,例如 /chat 或 chat/;
不建議在主題里添加空格及非 ASCII 特殊字符;
同一主題層級內建議使用下劃線 _ 或橫桿 - 連接單詞(或者使用駝峰命名);
盡量使用較少的主題層級;
當使用通配符時,將唯一值的主題層(例如設備號)越靠近第一層越好。例如,
device/00000001/command/# 比 device/command/00000001/# 更好。
持久會話
什么是 MQTT 持久會話?
不穩定的網絡及有限的硬件資源是物聯網應用需要面對的兩大難題,MQTT 客戶端與服務器的連接可能隨時會因為網絡波動及資源限制而異常斷開。為了解決網絡連接斷開對通信造成的影響,MQTT 協議提供了持久會話功能。
MQTT 客戶端在發起到服務器的連接時,可以設置是否創建一個持久會話。持久會話會保存一些重要的數據,以使會話能在多個網絡連接中繼續。持久會話主要有以下三個作用:
避免因網絡中斷導致需要反復訂閱帶來的額外開銷。
避免錯過離線期間的消息。
確保 QoS 1 和 QoS 2 的消息質量保證不被網絡中斷影響。
持久會話需要存儲哪些數據?
通過上文我們知道持久會話需要存儲一些重要的數據,以使會話能被恢復。這些數據有的存儲在客戶端,有的則存儲在服務端。
客戶端中存儲的會話數據:
已發送給服務端,但是還沒有完成確認的 QoS 1 與 QoS 2 消息。
從服務端收到的,但是還沒有完成確認的 QoS 2 消息。
服務端中存儲的會話數據:
會話是否存在,即使會話狀態其余部分為空。
已發送給客戶端,但是還沒有完成確認的 QoS 1 與 QoS 2 消息。
等待傳輸給客戶端的 QoS 0 消息(可選),QoS 1 與 QoS 2 消息。
從客戶端收到的,但是還沒有完成確認的 QoS 2 消息,遺囑消息和遺囑延時間隔。
MQTT Clean Session 的使用
Clean Session 是用來控制會話狀態生命周期的標志位,為 true 時表示創建一個新的會話,在客戶端斷開
連接時,會話將自動銷毀。為 false 時表示創建一個持久會話,在客戶端斷開連接后會話仍然保持,直到會話超時注銷。
注意: 持久會話能被恢復的前提是客戶端使用固定的 Client ID 再次連接,如果 Client ID 是動態的,那么連接成功后將會創建一個新的持久會話。
MQTT 5.0 中將 Clean Session 拆分成了 Clean Start 與 Session Expiry Interval。Clean Start 用于指定連接時是創建一個全新的會話還是嘗試復用一個已存在的會話,Session Expiry Interval 用于指定網絡連接斷開后會話的過期時間。
Clean Start 為 true 時表示必須丟棄任何已存在的會話,并創建一個全新的會話;為 false 時表示必須使用與 Client ID 關聯的會話來恢復與客戶端的通信(除非會話不存在)。
Session Expiry Interval 解決了 MQTT 3.1.1 中持久會話永久存在造成的服務器資源浪費問題。設置為 0或未設置,表示斷開連接時會話即到期;設置為大于 0 的數值,則表示會話在網絡連接關閉后會保持多少秒;設置為 0xFFFFFFFF 表示會話永遠不會過期。
會話相關問題
當會話結束后,保留消息還存在么?
MQTT 保留消息不是會話狀態的一部分,它們不會在會話結束時被刪除。
客戶端如何知道當前會話是被恢復的會話?
MQTT 協議從 v3.1.1 開始,就為 CONNACK 報文設計了 Session Present 字段。當服務器返回的該字
段值為 1 時,表示當前連接將會復用服務器保存的會話。客戶端可通過該字段值決定在連接成功后是否需
要重新訂閱。
使用持久會話時有哪些建議?
不能使用動態 Client ID,需要保證客戶端每次連接的 Client ID 都是固定的。
根據服務器性能、網絡狀況、客戶端類型等合理評估會話過期時間。設置過長會占用更多的服務端資源,設置過短會導致未重連成功會話就失效。
當客戶端確定不再需要會話時,可使用 Clean Session 為 true 進行重連,重連成功后再斷開連接。
如果是 MQTT 5.0 則可在斷開連接時直接設置 Session Expiry Interval 為 0,表示連接斷開后會話即失效。
保留消息
什么是 MQTT 保留消息?
發布者發布消息時,如果 Retained 標記被設置為 true,則該消息即是 MQTT 中的保留消息(RetainedMessage)。MQTT 服務器會為每個主題存儲最新一條保留消息,以方便消息發布后才上線的客戶端在訂閱主題時仍可以接收到該消息。
何時使用 MQTT 保留消息?
發布訂閱模式雖然能讓消息的發布者與訂閱者充分解耦,但也存在一個缺點,即訂閱者無法主動向發布者請求消息。訂閱者何時收到消息完全依賴于發布者何時發布消息,這在某些場景中就產生了不便。
借助保留消息,新的訂閱者能夠立即獲取最近的狀態,而不需要等待無法預期的時間,例如:
智能家居設備的狀態只有在變更時才會上報,但是控制端需要在上線后就能獲取到設備的狀態;
傳感器上報數據的間隔太長,但是訂閱者需要在訂閱后立即獲取到最新的數據;
傳感器的版本號、序列號等不會經常變更的屬性,可在上線后發布一條保留消息告知后續的所有訂閱者。
MQTT 保留消息的使用
若要使用 MQTT 保留消息,只需在消息發布時將 Retained 狀態設置為 true 即可
如何判斷一條消息是否是保留消息?
當客戶端訂閱了有保留消息的主題后,即會收到該主題的保留消息,可通過消息中的保留標志位判斷是否是保留消息。需要注意的是,在保留消息發布前訂閱主題,將不會收到保留消息。需要待保留消息發布后,重新訂閱該主題,才會收到保留消息。
先訂閱主題 sensor/t2,然后向該主題發布一條保留消息,該訂閱會立即收到一條消息,但是該消息并不是保留消息。當我們刪除該訂閱,再次重新訂閱 sensor/t2 主題時,立即收到了剛剛發布的保留消息。
注意:
1)可以通過Dasgboard查看保留消息
2)MQTT服務器會為每個主題存儲最新一條保留消息
3)在保留消息發布前訂閱主題,將不會收到保留消息。需要待保留消息發布后,重新訂閱該主題,才會收到保留消息。
保留消息將保存多久?如何刪除?
服務器只會為每個主題保存最新一條保留消息,保留消息的保存時間與服務器的設置有關。若服務器設置保留消息存儲在內存,則 MQTT 服務器重啟后消息即會丟失;若存儲在磁盤,則服務器重啟后保留消息仍然存在。
保留消息存儲方式默認存儲為ram,存儲在內存中,emqx重啟后消息丟失;可以設置為disc,存儲在磁盤中,重啟后消息不會丟失
保留消息雖然存儲在服務端中,但它并不屬于會話的一部分。也就是說,即便發布這個保留消息的會話已結束,保留消息也不會被刪除。刪除保留消息有以下幾種方式:
客戶端往某個主題發送一個 Payload 為空的保留消息,服務端就會刪除這個主題下的保留消息;
在 MQTT 服務器上刪除,比如 EMQX MQTT 服務器提供了在 Dashboard 上刪除保留消息的功能;
MQTT 5.0 新增了消息過期間隔屬性,發布時可使用該屬性設置消息的過期時間,不管消息是否為保留消息,都將會在過期時間后自動被刪除。
消息過期間隔
什么是消息過期間隔?
消息過期間隔是 MQTT 5.0 引入的一個新特性,它允許發布端為有時效性的消息設置一個過期間隔,如果該消息在服務端中停留超過了這個指定的間隔,那么服務端將不會再將它分發給訂閱端。默認情況下,消息中不會包含消息過期間隔,這表示該消息永遠不會過期。
MQTT 的持久會話可以為離線客戶端緩存尚未發送的消息,然后在客戶端恢復連接時發送。但如果客戶端離線時間較長,可能有一些壽命較短的消息已經沒有必要必須發送給客戶端了,繼續發送這些過期的消息,只會浪費網絡帶寬和客戶端資源。
以聯網汽車為例,我們可以向車輛發送建議車速使它能夠在綠燈期間通過路口,這類消息通常僅在車輛到達下一個路口之前有效,生命周期非常短暫。而前方擁堵提醒這類消息的生命周期則會更長一些,一般會在半小時到 1 小時內有效。
如果客戶端在發布消息時設置了過期間隔,那么服務端在轉發這個消息時也會包含過期間隔,但過期間隔的值會被更新為服務端接收到的值減去該消息在服務端停留的時間。
何時使用消息過期間隔?
消息過期間隔非常適合在以下場景下使用:
-
與時間強綁定的消息。比如優惠還剩最后兩小時這個消息,如果用戶在兩個小時后才收到它,不會有任何的意義。
-
周期性告知最新狀態的消息。仍然以道路擁堵提醒為例,我們需要周期性向車輛發送擁堵的預計結束時間,這個時間會隨最新的道路情況而發生變化。所以當最新的消息到達后,之前還未發送的消息也沒有必要繼續發送了。此時消息的過期間隔將由我們實際的發送周期決定。
-
保留消息。相比于需要再次發送 Payload 為空的保留消息來清除對應主題下的保留消息,為其設置過期時間然后由服務器自動刪除顯然更加方便,這也可以有效避免保留消息占用過多的存儲資源。
遺囑消息
遺囑消息是 MQTT 為那些可能出現意外斷線的設備提供的將遺囑優雅地發送給第三方的能力。意外斷線包
括但不限于:
因網絡故障或網絡波動,設備在保持連接周期內未能通訊,連接被服務端關閉
設備意外掉電
設備嘗試進行不被允許的操作而被服務端關閉連接,例如訂閱自身權限以外的主題等
遺囑消息可以看作是一個簡化版的 PUBLISH 消息,他也包含 Topic, Payload, QoS 等字段。遺囑消息會在設備與服務端連接時,通過 CONNECT 報文指定,然后在設備意外斷線時由服務端將該遺囑消息發布到連接時指定的遺囑主題(Will Topic)上。這也意味著服務端必須在回復 CONNACK 之前完成遺囑消息的存儲,以確保之后任一時刻發生意外斷線的情況,服務端都能保證遺囑消息被發布。
Will Retain 的使用場景,它是保留消息與遺囑消息的結合。如果訂閱該遺囑主題(WillTopic)的客戶端不能保證遺囑消息發布時在線,那么建議為遺囑消息設置 Will Retain,避免訂閱端錯過遺囑消息。
Will Flag 通常是 MQTT 協議實現方關心的字段,它用于標識 CONNECT 報文中是否會包含 WillProperties、Will Topic 等字段。
最后一個是 MQTT 5.0 新增的 Will Properties 字段,屬性本身也是 MQTT 5.0 的一個新特性,不同類型的報文有著不同的屬性,例如 CONNECT 報文有會話過期間隔(Session Expiry Interval)、最大報文長度(Maximum Packet Size)等屬性,SUBSCRIBE 報文則有訂閱標識符(Subscription Identifier)等屬性。
Will Properties 中的消息過期間隔(Message Expiry Interval)等屬性與 PUBLISH 報文中的用法基本一致,只有一個遺囑延遲間隔(Will Delay Interval)是遺囑消息特有的屬性。
遺囑延遲間隔顧名思義,就是在連接斷開后延遲一段時間才發布遺囑消息。它的一個重要用途就是避免在設備因網絡波動短暫斷開連接,但能夠快速恢復連接繼續提供服務時發出遺囑消息,并對遺囑消息訂閱方造成困擾。
需要注意的是,具體延遲多久發布遺囑消息,除了遺囑延遲間隔,還受限于會話過期間隔,取決于兩者誰先發生。所以當我們將會話過期間隔設置為 0 時,即會話在網絡連接關閉時過期,那么不管遺囑延遲間隔的值是多少,遺囑消息都會在網絡連接斷開時立即發布。
延遲發布
EMQX的延遲發布功能是一種強大的消息處理機制,它允許用戶按照配置的時間間隔延遲發布MQTT消息。以下是對EMQX延遲發布功能的詳細解析:
一、功能概述
EMQX的延遲發布功能依賴于其內置的emqx_mod_delayed
模塊。當客戶端使用特殊主題前綴$delayed/{DelayInterval}
發布消息到EMQX時,將觸發延遲發布功能。這意味著消息不會立即被發布到目標主題,而是會在指定的延遲時間后發布。
二、主題格式
延遲發布主題的具體格式如下:
-
$delayed/
:這是延遲發布功能的前綴,所有使用該前綴的主題都將被視為需要延遲發布的消息。 -
{DelayInterval}
:指定MQTT消息延遲發布的時間間隔,單位是秒。允許的最大間隔是4294967秒。如果{DelayInterval}
無法被解析為一個整型數字,EMQX將丟棄該消息,客戶端不會收到任何信息。 -
{TopicName}
:MQTT消息的目標主題名稱。
例如,$delayed/15/x/y
表示15秒后將MQTT消息發布到主題x/y
;$delayed/60/a/b
表示1分鐘后將MQTT消息發布到主題a/b
。
三、使用步驟
-
啟用模塊:首先,需要在EMQX的模塊中啟用
emqx_mod_delayed
模塊。這通常可以在EMQX的管理界面或配置文件中完成。
-
發布延遲消息:在發布消息時,將主題設置為上述的延遲發布主題格式。例如,如果想要在10秒后發布消息到主題
test/topic
,則可以將主題設置為$delayed/10/test/topic
。 -
等待延遲時間:在指定的延遲時間后,消息將被自動發布到目標主題。
四、應用場景
EMQX的延遲發布功能在多種場景下都非常有用,例如:
-
消息調度:在某些應用中,可能需要按照特定的時間間隔發布消息。通過延遲發布功能,可以輕松實現這一需求。
-
負載均衡:在高并發場景下,通過延遲發布功能可以平滑消息發布速率,減輕系統的瞬時負載。
-
消息去重:在訂閱者集群中,通過延遲發布和消息ID的唯一性檢查,可以確保消息只被消費一次,避免重復消費問題。
五、注意事項
-
延遲時間限制:EMQX對延遲時間有一定的限制,最大允許間隔為4294967秒。在實際應用中,需要根據具體需求設置合理的延遲時間。
-
模塊狀態:確保
emqx_mod_delayed
模塊已經啟用,否則延遲發布功能將無法正常工作。 -
消息丟棄:如果延遲時間間隔無法被解析為整型數字,或者由于其他原因導致消息無法被處理,EMQX將丟棄該消息。因此,在發布延遲消息時,需要確保主題格式正確且延遲時間合理。
用戶屬性
什么是用戶屬性
用戶屬性(User Properties)其實是一種自定義屬性,允許用戶向 MQTT 消息添加自己的元數據,傳輸額外的自定義信息以擴充更多應用場景。
它由一個用戶自定義的 UTF-8 的鍵/值對數組組成,并在消息屬性字段中配置,只要不超過最大的消息大小,可以使用無限數量的用戶屬性來向 MQTT 消息添加元數據,并在發布者、MQTT 服務器和訂閱者之間傳遞信息。
如果你熟悉 HTTP 協議的話,該功能與 HTTP 的 Header 的概念非常類似。用戶屬性有效地允許用戶擴展 MQTT 協議,并且可以出現在所有消息和響應中。因為用戶屬性是由用戶定義的,它們只對該用戶的實現有意義。
為什么需要使用用戶屬性
MQTT 3 的協議擴展性能力較差,用戶屬性其實就是為了解決這個問題,它支持在消息中傳遞任何信息,確保了用戶可擴展標準協議的功能。
對于選擇和配置不同的消息類型,用戶屬性可以在客戶端與 MQTT 服務器之間,或者客戶端和客戶端之間發送。在連接客戶端中配置用戶屬性時,只能在 MQTT 服務器上接收,無法在客戶端中接收。如果在發送消息的時候配置用戶屬性,則可以在其它客戶端中接收。常用的有以下兩種用戶屬性配置。
連接客戶端的用戶屬性
當客戶端與 MQTT 服務器發起連接時,服務器可以預先定義好一些需要并且可以使用到的元數據信息,即用戶屬性,當連接成功后,MQTT 服務可以拿到連接發送過來的相關信息進行使用,因此連接客戶端的用戶屬性依賴于 MQTT 服務器。
消息發布的用戶屬性
消息發布時的用戶屬性可能是較為常用的,因為它們可以在客戶端與客戶端之間進行元數據信息傳遞。比如可以在發布時添加一些常見的信息:消息編號,時間戳,文件,客戶端信息和路由信息等屬性。
除上述較為常用的用戶屬性設置外,還可以在訂閱 Topic 時,取消訂閱時,斷開連接時配置用戶屬性。
訂閱
訂閱的組成:
1)主題過濾器:決定了服務端將為我們轉發哪些主題下的消息
2)訂閱選項:允許我們進一步定制服務端的轉發行為
訂閱選項
MQTT 5.0 提供了 4 個訂閱選項,分別是 QoS、No Local、Retain As Published、Retain Handling
QoS
QoS 是最常用的一個訂閱選項,它表示服務端在向訂閱端發送消息時可以使用的最大 QoS 等級。
客戶端可能會在訂閱時指定一個小于 2 的 QoS,因為它的實現不支持 QoS 1 或者 QoS 2。而如果服務端支持的最大 QoS 小于客戶端訂閱時請求的最大 QoS,那么顯然服務端將無法滿足客戶端的要求,這時服務端就會通過訂閱的響應報文(SUBACK)告知訂閱端最終授予的最大 QoS 等級,訂閱端可以自行評估是否接受并繼續通信。
一個簡單的計算公式:
服務端最終授予的最大 QoS = min ( 服務端支持的最大 QoS, 客戶端請求的最大 QoS )
但是,我們在訂閱時請求的最大 QoS,并不能限制發布端發布消息時使用的 QoS。當我們訂閱時請求的最大 QoS,小于消息發布時的 QoS 時,為了盡可能地投遞消息,服務端不會忽略這些消息,而是會在轉發時對這些消息的 QoS 進行降級處理。
同樣,我們也有一個簡單的計算公式:
消息被轉發時的 QoS = min ( 消息原始的 QoS, 服務端最終授予的最大 QoS )
No Local
No Local 只有 0 和 1 兩個可取值,為 1 表示服務端不能將消息轉發給發布這個消息的客戶端,為 0 則相反。
這個選項通常被用在橋接場景中。橋接本質上是兩個 MQTT Server 建立了一個 MQTT 連接,然后相互訂閱一些主題,Server 將客戶端的消息轉發給另一個 Server,而另一個 Server 則可以將消息繼續轉發給它的客戶端。
那么最簡單的一個例子,我們假設兩個 MQTT Server 分別是 Server A 和 Server B,它們分別向對方訂閱了 # 主題。現在,Server A 將一些來自客戶端的消息轉發給了 Server B,而當 Server B 查找匹配的訂閱時,Server A 也會位于其中。如果 Server B 將消息轉發給了 Server A,那么同樣 Server A 在收到消息后又會把它們再次轉發給 Server B,這樣就陷入了無休止的轉發風暴。而如果 Server A 和 Server B 在訂閱 # 主題的同時,將 No Local 選項設置為 1,就可以完美地避免這個問題。
Retain As Published
Retain As Published 同樣只有 0 和 1 兩個可取值,為 1 表示服務端在向此訂閱轉發應用消息時需要保持消息中的 Retain 標識不變,為 0 則表示必須清除。
Retain As Published 與 No Local 一樣,同樣也是主要適用于橋接場景。我們知道當服務端收到一條保留消息時,除了將它存儲起來,還會將它像普通消息一樣轉發給當前已經存在的訂閱者,并且在轉發時會清除消息的 Retain 標識。
這在橋接場景下帶來了一些問題。我們繼續沿用前面的設定,當 Server A 將保留消息轉發給 Server B 時,由于消息中的 Retain 標識已經被清除,Server B 將不會知道這原本是一條保留消息,自然不會再存儲它。這就導致了保留消息無法跨橋接使用。
那么在 MQTT 5.0 中,我們可以讓橋接的服務端在訂閱時將 Retain As Published 選項設置為 1,來解決這個問題。
Retain Handling
Retain Handling 這個訂閱選項被用來向服務端指示當訂閱建立時,是否需要發送保留消息。
我們知道默認情況下,只要訂閱建立,那么服務端中與訂閱匹配的保留消息就會下發。
但某些時候,客戶端可能并不想接收保留消息,比如客戶端在連接時復用了會話,但是客戶端無法確認上一次連接中是否成功創建了訂閱,所以它可能會再次發起訂閱。如果訂閱已經存在,那么可能保留消息已經被消費過了,也可能服務端已經在會話中緩存了一些離線期間到達的消息,這時客戶端可能并不希望服務端發布保留消息。
另外,客戶端也可能在任何時刻都不想收到保留消息,即使是第一次訂閱。比如我們將開關狀態作為保留消息發送,但對某個訂閱端來說,開關事件將觸發一些操作,那么在這種情況下不發送保留消息是很有用的。
這三種不同的行為,我們可以通過 Retain Handling 來選擇。
將 Retain Handling 設置為 0,表示只要訂閱建立,就發送保留消息;
將 Retain Handling 設置為 1,表示只有建立全新的訂閱而不是重復訂閱時,才發送保留消息;
將 Retain Handling 設置為 2,表示訂閱建立時不要發送保留消息。
共享訂閱
在普通的訂閱中,我們每發布一條消息,所有匹配的訂閱端都會收到該消息的副本。當某個訂閱端的消費速度無法跟上消息的生產速度時,我們沒有辦法將其中一部分消息分流到其他訂閱端中來分擔壓力。這使訂閱端容易成為整個消息系統的性能瓶頸。
所以 MQTT 5.0 引入了共享訂閱特性,它使得 MQTT 服務端可以在使用特定訂閱的客戶端之間均衡地分配消息負載。這表示,當我們有兩個客戶端共享一個訂閱時,那么每個匹配該訂閱的消息都只會有一個副本投遞給其中一個客戶端。
共享訂閱不僅為消費端帶來了極佳的水平擴展能力,使我們可以應對更高的吞吐量,還為其帶來了高可用性,即使共享訂閱組中的一個客戶端斷開連接或發生故障,其他客戶端仍然可以繼續處理消息,在必要時還可以接管原先流向該客戶端的消息流。
共享訂閱是 MQTT 5.0 引入的新特性,用于在多個訂閱者之間實現訂閱的負載均衡,MQTT 5.0 規定的共享訂閱主題以 $share 開頭。
下圖中,3 個訂閱者用共享訂閱的方式訂閱了同一個主題 $share/g/topic,其中 topic 是它們訂閱的真實主題名,而 $share/g/ 是共享訂閱前綴(g/ 是群組名,可為任意 UTF-8 編碼字符串)。
使用共享訂閱,我們不需要對客戶端的底層代碼進行任何改動,只需要在訂閱時使用遵循以下命名規范的
$share/{Share Name}/{Topic Filter}
其中 $share 是一個固定的前綴,以便服務端知道這是一個共享訂閱主題。{Topic Filter} 則是我們實際想要訂閱的主題。
中間的 {Share Name} 是一個由客戶端指定的字符串,表示當前共享訂閱使用的共享名。很多時候,{Share Name} 這個字段也會被叫作 Group Name 或者 Group ID,這確實會更容易理解一些。
需要共享同一個訂閱的一組訂閱會話,必須使用相同的共享名。所以 $share/consumer1/sport/# $share/consumer2/sport/# 屬于不同的共享訂閱組。當一個消息同時與多個共享訂閱組使用的過濾器匹配時,服務端會在每個匹配的共享訂閱組中選擇一個會話發送該消息的副本。這在某個主題的消息有多個不同類型的消費者時非常有用。
共享訂閱的負載均衡策略
共享訂閱的核心在于服務端如何在客戶端之間分配消息負載。比較常見的負載均衡策略有以下幾種:
隨機(Random),在共享訂閱組內隨機選擇一個會話發送消息。
輪詢(Round Robin),在共享訂閱組內按順序選擇一個會話發送消息,循環往復。
哈希(Hash),基于某個字段的哈希結果來分配。
粘性(Sticky),在共享訂閱組內隨機選擇一個會話發送消息,此后保持這一選擇,直到該會話結束再重復這一過程。
本地優先(Local),隨機選擇,但優先選擇與消息的發布者處于同一節點的會話,如果不存在這樣的會話,則退化為普通的隨機策略。
隨機 和 輪詢 這兩種策略實現的均衡效果較為接近,所以它們在應用場景上的區別不大,但 隨機 策略實際的均衡效果通常還會受到服務端采用的隨機算法的影響。
在實際應用中,消息之間可能存在關聯,比如屬于同一張圖片的多個分片顯然不適合分發給多個訂閱者。在這種情況下,我們就需要基于Client ID 或者 Topic 的 哈希 策略來選擇會話。這可以保證來自同一個發布端或者主題的消息始終由共享訂閱組中的同一個會話處理。當=然,粘性 策略也有相同的效果。
本地優先 策略比 隨機 策略更合適在集群中使用,優先選擇本地訂閱端的策略可以有效降低消息的延遲。
不過使用這一策略的前提是我們可以確保發布端和訂閱端比較均衡地分布在每個節點上,以免不同訂閱端上的消息負載差別過大。
共享訂閱使用場景
以下是幾個典型的共享訂閱的使用場景:
后端消費能力與消息的生產能力不匹配時,我們可以借助共享訂閱讓更多的客戶端一起分擔負載。
系統需要保證高可用性,特別是在大量消息流入的關鍵業務上,我們可以通過共享訂閱來避免單點故障。
消息的流入量可能會在未來快速增長,需要消費端能夠水平擴展,我們可以通過共享訂閱來提供高擴展性。
共享訂閱使用建議
在共享訂閱組內使用相同的 QoS
MQTT 雖然允許一個共享訂閱組內的會話使用不同的 QoS 等級,但這可能會使消息在投遞給同一個組內的不同會話時存在不同的質量保證。相應地,在出現一些問題的時候,我們的調試也將變得困難重重。所以我們最好在共享訂閱組內使用相同的 QoS。
合理地設置會話過期時間
持久會話與共享訂閱一起使用是非常常見的。但需要注意,即便共享訂閱組中的某個客戶端離線,但只要它的會話與訂閱仍在存在時,MQTT 服務端仍然會向此會話分發消息。考慮到客戶端可能因為故障等原因長時間離線,如果會話的過期時間過長,那么這段時間內將有很多消息因為被投遞給離線客戶端而無法得到處理。
一個更好的選擇可能是,一旦訂閱端離線,即便會話沒有過期,MQTT 服務端在分配消息負載時也不再考慮這個訂閱端。雖然與普通訂閱的行為不同,但這是 MQTT 協議允許的。
排它訂閱
概念:
排它訂閱是MQTT協議中一種特殊的訂閱方式,它允許對主題進行互斥訂閱,即一個主題在同一時刻僅被允許存在一個訂閱者。在當前訂閱者未取消訂閱前,其他訂閱者將無法訂閱該主題。這種訂閱方式確保了消息的獨占性,避免了多個客戶端同時處理同一主題的消息,從而減少了消息處理的沖突和復雜性。
使用:
-
前綴規則:在MQTT中,排它訂閱通常通過使用特定的主題前綴(如
$exclusive/
)來標識。例如,主題$exclusive/t/1
就是一個排它訂閱的主題。 -
配置啟用:排它訂閱功能需要在EMQX等MQTT Broker中進行配置和啟用。用戶可以通過配置文件(如
etc/emqx.conf
)或Dashboard來開啟排它訂閱功能。 -
訂閱操作:一旦排它訂閱功能被啟用,客戶端就可以嘗試訂閱帶有
$exclusive/
前綴的主題。如果某個客戶端已經訂閱了該主題,那么其他客戶端在嘗試訂閱同一主題時將失敗,直到前一個客戶端取消訂閱為止。 -
錯誤處理:當客戶端嘗試訂閱已被其他客戶端訂閱的排它主題時,MQTT Broker會返回一個錯誤碼(如0x97),表示該主題已被訂閱。
自動訂閱
概念:
自動訂閱是MQTT協議中一種便捷的消息訂閱方式,它允許設備在成功連接到MQTT Broker后,自動根據預設的規則訂閱指定的主題。這種方式簡化了設備連接和消息訂閱的流程,提高了系統的自動化程度。
使用:
-
配置規則:在MQTT Broker中,用戶可以通過配置文件、Dashboard或外部數據庫等方式來設置自動訂閱的規則。這些規則包括要訂閱的主題、服務質量(QoS)等選項。在DashBoard中,管理-->MQTT高級屬性-->自動訂閱-->添加
-
連接觸發:當設備成功連接到MQTT Broker時,Broker會根據預設的規則自動為設備訂閱相應的主題。這意味著設備無需手動發送訂閱請求即可開始接收消息。
-
動態調整:在某些高級場景中,MQTT Broker還支持動態調整自動訂閱的規則。例如,可以根據設備的類型、狀態或外部條件來動態添加、刪除或修改訂閱的主題。
-
注意事項:雖然自動訂閱提高了系統的自動化程度,但也需要用戶謹慎配置規則,以避免不必要的消息訂閱和資源浪費。同時,對于敏感或重要的消息主題,建議采用手動訂閱方式進行精確控制。