springBoot集成emqx 實現mqtt消息的發送訂閱

介紹

我們可以想象這么一個場景,我們java應用想要采集到電表a的每小時的用電信息,我們怎么拿到電表的數據?一般我們會想 直接 java 后臺發送請求給電表,然后讓電表返回數據就可以了,事實上,我們java應用發送請求請求電表的數據信息并不是發到電表上,而是發送到 服務端 (broker)上,請求服務器 給我們電表的信息,而電表會把數據 按照mqtt協議 源源不斷的發送到服務端,服務端可以把數據存儲到物聯網數據庫上,也可以由我們java應用手動存儲到物聯網數據庫上

而我們怎么知道電表發送到服務端的哪里,java應用又怎么請求到該電表發送的位置?

這就 引出了 一個概念,主題 (topic) ,這個topic在mqtt中不需要手動的創建,只要又客戶端訂閱或者發布消息,主題就會被自動創建出來

而我們服務端用的最多的就是 集成好的emqx服務器,本文我們也用的是集成好的emqx的服務端

,我們先是 一個電表 訂閱好一個固定的主題,然后 源源不斷的往服務端發消息,然后我們java應用訂閱這個主題,這樣 java應用就能持續的拿到電表的數據了

具體什么是主題,主題怎么設置的 ,mqtt協議的具體協議內容,直接登錄emqx官網查看即可

MQTT 最全教程:從入門到精通 | EMQ

而emqx服務器是怎么在linux系統上搭建的呢,具體直接看文檔即可,輸入文檔對應的yum命令就可以直接 在linux服務器上安裝了

在 CentOS/RHEL 上安裝 EMQX | EMQX文檔

文本主要書寫代碼的實現

代碼實現

我們的yml文件如下

?我們后續 java應用訂閱消息 都要到服務端 emqx 的1883端口

實體類如下

?

這里解釋一下 clientid 是不固定的,隨機的每一個發布/訂閱消息的客戶端都有一個唯一的clientid

而username 和password 是 客戶端連接到 服務端的認證賬戶,多個客戶端可以使用一個 賬號密碼

客戶端代碼實現

@Slf4j
@Component
@RequiredArgsConstructor
public class EMQXClient {
private  final MqttDefaultProperties mqttDefaultProperties;
private  final IMessageCallbackImpl mqttCallback;private IMqttClient mqttClient;/*** 初始化客戶端對象*/
public  boolean initMqttClient(String clientId,String serverUrl)  {MemoryPersistence memoryPersistence = new MemoryPersistence();try {if(Objects.isNull(clientId)){clientId= mqttDefaultProperties.getDefaultClientId();}if(Objects.isNull(serverUrl)){serverUrl= mqttDefaultProperties.getServerUrl();}mqttClient = new MqttClient(serverUrl, clientId, memoryPersistence);} catch (MqttException e) {log.info("mqtt創建異常:{}", e.getMessage());return  false;}return true;
}public  boolean initMqttClient()  {MemoryPersistence memoryPersistence = new MemoryPersistence();try {mqttClient = new MqttClient(mqttDefaultProperties.getServerUrl(),mqttDefaultProperties.getDefaultClientId(),  memoryPersistence);} catch (MqttException e) {log.info("mqtt創建異常:{}", e.getMessage());return  false;}return true;}/*** 獲取連接* @return*/public   boolean connect()  {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();//當客戶端會話關閉的時候  對應的broker也關閉mqttConnectOptions.setCleanSession(true);//自動重連mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(mqttDefaultProperties.getDefaultUserName());mqttConnectOptions.setPassword(mqttDefaultProperties.getDefaultPassword().toCharArray());mqttClient.setCallback(mqttCallback);try {mqttClient.connect(mqttConnectOptions);} catch (MqttException e) {log.info("客戶端連接異常:{}",e.getMessage());return  false;}return true;}public   boolean connect(String username,String password)  {if(Objects.isNull(username)){username=mqttDefaultProperties.getDefaultUserName();}if(Objects.isNull(password)){password= mqttDefaultProperties.getDefaultPassword();}MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();//當客戶端會話關閉的時候  對應的broker也關閉mqttConnectOptions.setCleanSession(true);//自動重連mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttClient.setCallback(mqttCallback);try {mqttClient.connect(mqttConnectOptions);} catch (MqttException e) {log.info("客戶端連接異常:{}",e.getMessage());return  false;}return true;
}/*** 斷開連接* @return*/public  boolean disConnect(){try {mqttClient.disconnect();} catch (MqttException e) {log.info("客戶端斷開連接異常:{}",e.getMessage());return  false;}
return true;
}/**** @param topic 主題* @param msg 消息內容* @param qosEnum* @param retain  新的訂閱者來了是否能拿到之前的 最新的一次消息* @return*/public  boolean publish(String topic, String msg, QosEnum qosEnum,boolean retain){int uniqueInt = (int) (System.nanoTime() & 0xFFFFFFFFL);//取納秒時間戳低32位MqttMessage mqttMessage = new MqttMessage();mqttMessage.setPayload(msg.getBytes());mqttMessage.setQos(qosEnum.getType());mqttMessage.setRetained(retain);mqttMessage.setId(uniqueInt);try {mqttClient.publish(topic,mqttMessage);} catch (MqttException e) {log.info("客戶端發送消息失敗:{}",e.getMessage());return  false;}return  true;}/**** @param topicFilters 要訂閱的主題 例子 testtopic/#* @param qosEnum* @return*/public boolean subscribe(String topicFilters,QosEnum qosEnum){try {mqttClient.subscribe(topicFilters,qosEnum.getType());} catch (MqttException e) {log.info("訂閱主題失敗:{}",e.getMessage());return  false;}return true;}
public boolean unSubscribe(String topicFilter){try {mqttClient.unsubscribe( topicFilter);} catch (MqttException e) {log.info("取消訂閱主題失敗:{}",e.getMessage());return false;}return  true;
}}

我們著重關注的是

我們想連接 服務端 是不是得有 一個client ,那這個client就對應IMqttclient

,我們java應用客戶端連接上服務端之后,是不是得訂閱主題,訂閱之后的邏輯在哪里,就在

IMessageCallbackImpl

這里面就是 書寫的 客戶端收到服務端發來的消息之后的處理情況

@Slf4j
@Component
public class IMessageCallbackImpl  implements MessageCallback {@Overridepublic void connectionLost(Throwable cause) {//丟失對服務端的連接后觸發該方法回調,此處可以做一些特殊處理,比如重連 或者記錄 日志之類的log.info("丟失了對broker的連接");}/*** 訂閱到消息后的回調* 該方法由mqtt客戶端同步調用,在此方法未正確返回之前,不會發送ack確認消息到broker* 一旦該方法向外拋出了異常客戶端將異常關閉,當再次連接時;所有QoS1,QoS2且客戶端未進行ack確認的消息都將由broker服務器再次發送到客戶端* @param topic* @param message* @throws Exception*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {log.info("訂閱到了消息;topic={},messageid={},qos={},msg={}",topic,message.getId(),message.getQos(),new String(message.getPayload()));}/*** 消息發布完成且收到ack確認后的回調* QoS0:消息被服務端發出后觸發一次* QoS1:當收到broker的PUBACK消息后觸發* QoS2:當收到broer的PUBCOMP消息后觸發* @param token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {int messageId = token.getMessageId();String[] topics = token.getTopics();log.info("消息發送完成,messageId={},topics={}",messageId,topics);}
}

?

我們用一個 bean 在初始化的時候就訂閱一個主題,這樣 只要有 客戶端往主題上發消息,我們就能收到了

而我們這個時候 沒有硬件,怎么辦呢,很簡單,直接下載一個mqttx 模擬硬件發送消息到主題,啟動springboot,就能看到消息的發送與接收了

當然 這實現的緊緊是最簡單的協議的發送接收,后面還有許多的高級功能等我們使用,具體的可以查閱官方文檔

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/896969.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/896969.shtml
英文地址,請注明出處:http://en.pswp.cn/news/896969.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

vue Table 表格自適應窗口高度,表頭固定

當表格內縱向內容過多時&#xff0c;可選擇固定表頭。 代碼很簡單&#xff0c;其實就是在table 里面定一個 height 屬性即可。 <template><el-table:data"tableData"height"250"borderstyle"width: 100%"><el-table-columnprop…

多線程-JUC

簡介 juc&#xff0c;java.util.concurrent包的簡稱&#xff0c;java1.5時引入。juc中提供了一系列的工具&#xff0c;可以更好地支持高并發任務 juc中提供的工具 可重入鎖 ReentrantLock 可重入鎖&#xff1a;ReentrantLock&#xff0c;可重入是指當一個線程獲取到鎖之后&…

【每日學點HarmonyOS Next知識】Web Header更新、狀態變量嵌套問題、自定義彈窗、stack圓角、Flex換行問題

【每日學點HarmonyOS Next知識】Web Header更新、狀態變量嵌套問題、自定義彈窗、stack圓角、Flex換行問題 1、HarmonyOS 有關webview Header無法更新的問題&#xff1f; 業務A頁面 打開 webivew B頁面&#xff0c;第一次打開帶了header請求&#xff0c;然后退出webview B頁面…

【ATXServer2】Android無法正確顯示手機屏幕

文章目錄 現象原因分析與解決排查手機內部minicap 解決minicap問題查看移動端Android SDK版本查看minicap支持版本單次方案多次方案 最后問題-如何支持Android SDK 32 現象 原因分析與解決 由于atxserver2在與Android動終端的鏈接過程中使用了agent&#xff1a;atxserver2-and…

【前端跨域】CORS:跨域資源共享的機制與實現

在現代Web開發中&#xff0c;跨域資源共享&#xff08;Cross-Origin Resource Sharing&#xff0c;簡稱CORS&#xff09;是一種非常重要的技術&#xff0c;用于解決瀏覽器跨域請求的限制 CORS允許服務器明確指定哪些外部源可以訪問其資源&#xff0c;從而在保證安全的前提下實…

【設計模式】單例模式|餓漢模式|懶漢模式|指令重排序

目錄 1.什么是單例模式&#xff1f; 2.如何保證單例&#xff1f; 3.兩種寫法 &#xff08;1&#xff09;餓漢模式&#xff08;早創建&#xff09; &#xff08;2&#xff09;懶漢模式&#xff08;緩執行&#xff0c;可能不執行&#xff09; 4.應用場景 &#x1f525;5.多…

RocketMQ順序消費機制

RocketMQ的順序消費機制通過生產端和消費端的協同設計實現&#xff0c;其核心在于局部順序性&#xff0c;即保證同一隊列&#xff08;MessageQueue&#xff09;內的消息嚴格按發送順序消費。以下是詳細機制解析及關鍵源碼實現&#xff1a; 一、順序消費的核心機制 1. 生產端路…

【JavaEE】-- 多線程(初階)4

文章目錄 8.多線程案例8.1 單例模式8.1.1 餓漢模式8.1.2 懶漢模式 8.2 阻塞隊列8.2.1 什么是阻塞隊列8.2.2 生產者消費者模型8.2.3 標準庫中的阻塞隊列8.2.4 阻塞隊列的應用場景8.2.4.1 消息隊列 8.2.5 異步操作8.2.5 自定義實現阻塞隊列8.2.6 阻塞隊列--生產者消費者模型 8.3 …

【C++設計模式】第四篇:建造者模式(Builder)

注意&#xff1a;復現代碼時&#xff0c;確保 VS2022 使用 C17/20 標準以支持現代特性。 分步驟構造復雜對象&#xff0c;實現靈活裝配 1. 模式定義與用途 核心目標&#xff1a;將復雜對象的構建過程分離&#xff0c;使得同樣的構建步驟可以創建不同的表示形式。 常見場景&am…

vuex中的state是響應式的嗎?

在 Vue.js 中&#xff0c;Vuex 的 state 是響應式的。這意味著當你更改 state 中的數據時&#xff0c;依賴于這些數據的 Vue 組件會自動更新。這是通過 Vue 的響應式系統實現的&#xff0c;該系統使用了 ES6 的 Proxy 對象來監聽數據的變化。 當你在 Vuex 中定義了一個 state …

若依框架中的崗位與角色詳解

若依框架中的崗位與角色詳解 一、核心概念與定位 崗位&#xff08;Post&#xff09; 業務職能導向&#xff1a;崗位是用戶在組織架構中的職務標識&#xff08;如“開發人員”“項目經理”&#xff09;&#xff0c;用于描述工作職責而非直接控制權限。崗位與部門關聯&#xff…

SQL經典常用查詢語句

1. 基礎查詢語句 1.1 查詢表中所有數據 在SQL中&#xff0c;查詢表中所有數據是最基本的操作之一。通過使用SELECT * FROM table_name;語句&#xff0c;可以獲取指定表中的所有記錄和列。例如&#xff0c;假設有一個名為employees的表&#xff0c;包含員工的基本信息&#xf…

EP 架構:未來主流方向還是特定場景最優解?

DeepSeek MoE架構采用跨節點專家并行&#xff08;EP&#xff09;架構&#xff0c;在提升推理系統性能方面展現出巨大潛力。這一架構在發展進程中也面臨諸多挑戰&#xff0c;其未來究竟是會成為行業的主流方向&#xff0c;還是僅適用于特定場景&#xff0c;成為特定領域的最優解…

[密碼學實戰]Java實現國密(SM2)密鑰協商詳解:原理、代碼與實踐

一、代碼運行結果 二、國密算法與密鑰協商背景 2.1 什么是國密算法&#xff1f; 國密算法是由中國國家密碼管理局制定的商用密碼標準&#xff0c;包括&#xff1a; SM2&#xff1a;橢圓曲線公鑰密碼算法&#xff08;非對稱加密/簽名/密鑰協商&#xff09;SM3&#xff1a;密碼…

動漫短劇開發公司,短劇小程序搭建快速上線

在當今快節奏的生活里&#xff0c;人們的娛樂方式愈發多元&#xff0c;而動漫短劇作為新興娛樂形式&#xff0c;正以獨特魅力迅速崛起&#xff0c;成為娛樂市場的耀眼新星。近年來&#xff0c;動漫短劇市場呈爆發式增長&#xff0c;吸引眾多創作者與觀眾目光。 從市場規模來看…

第四十五:創建一個vue 的程序

html <div id"app">{{ msg }}<h2>{{ web.title }}</h2><h3>{{ web.url }}</h3> </div> js /*<div id"app"></div> 指定一個 id 為 app 的 div 元素{{ }} 插值表達式, 可以將 Vue 實例中定義的數據在視圖…

docer swarm集群部署springboot項目

1.準備兩臺服務器&#xff0c;安裝好docker、docker-compose 因為用到了docker倉庫&#xff0c;安裝harbor,可以從github下載離線安裝包 2. 我這邊用到了gitlab-ci,整體流程也都差不多 1&#xff09;打包mvn clean install 2&#xff09;打鏡像 docker-compose -f docker-compo…

Python測試框架Pytest的參數化

上篇博文介紹過&#xff0c;Pytest是目前比較成熟功能齊全的測試框架&#xff0c;使用率肯定也不斷攀升。 在實際工作中&#xff0c;許多測試用例都是類似的重復&#xff0c;一個個寫最后代碼會顯得很冗余。這里&#xff0c;我們來了解一下pytest.mark.parametrize裝飾器&…

開發博客系統

前言 準備工作 數據庫表分為實體表和關系表 第一&#xff0c;建數據庫表 然后導入前端頁面 創建公共模塊 就是統一返回值&#xff0c;異常那些東西 自己造一個自定義異常 普通類 mapper 獲取全部博客 我們只需要返回id&#xff0c;title&#xff0c;content&#xff0c;us…

【Spring Boot 應用開發】-05 命令行參數

Spring Boot 常用命令行參數 Spring Boot 支持多種命令行參數&#xff0c;這些參數可以在啟動應用時通過命令行直接傳遞。以下是一些常用的命令行參數及其詳細說明&#xff1a; 1. 基本配置參數 --server.port端口號 指定應用程序運行的HTTP端口&#xff0c;默認為8080。 jav…