MQTT:Java集成MQTT

目錄

  • Git項目路徑
  • 一、原生java架構
    • 1.1 導入POM文件
    • 1.2 編寫測試用例
  • 二、SpringBoot集成MQTT
    • 2.1 導入POM文件
    • 2.2 在YML文件中增加配置
    • 2.3 新建Properties配置文件映射配置
    • 2.4 創建連接工廠
    • 2.5 增加入站規則配置
    • 2.6 增加出站規則配置
    • 2.7 創建消息發送網關
    • 2.8 測試消息發送
    • 2.9 項目結構


Git項目路徑

一、原生java架構

1.1 導入POM文件

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

1.2 編寫測試用例

package com.ming;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;/*** 使用java原生方法連接MQTT*/
public class MqttPahoTest {private final String serverURI = "tcp://localhost:1883";private final String clientId = "emqx_spring_client_132";/*** 建立連接* @throws MqttException*/@Testpublic MqttClient createConnection() throws MqttException {// 創建MQTT對象MqttClient client = new MqttClient(serverURI, clientId, new MemoryPersistence());// 發送建立連接的請求MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName("admin");mqttConnectOptions.setPassword("admin".toCharArray());mqttConnectOptions.setCleanSession(true);client.connect(mqttConnectOptions);return client;}/*** 發送消息* @throws MqttException*/@Testpublic void sendMsg() throws MqttException {// 創建對象MqttClient client = createConnection();// 發送消息MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(2);mqttMessage.setPayload("Hello World".getBytes());client.publish("java/a", mqttMessage);// 關閉連接client.disconnect();client.close();}/*** 接收消息* @throws MqttException*/@Testpublic void receiveMsg() throws MqttException {// 創建MQTT對象MqttClient client = new MqttClient(serverURI, clientId, new MemoryPersistence());// 發送建立連接的請求MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName("admin");mqttConnectOptions.setPassword("admin".toCharArray());mqttConnectOptions.setCleanSession(true);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {  // 當連接丟失時的回調System.out.println("Connection lost...");}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {  // 消息接收回調System.out.println(String.format("%s ---> %s", topic, new String(mqttMessage.getPayload())));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {  // 消息傳輸完畢System.out.println("Delivery complete");}});client.connect(mqttConnectOptions);// 訂閱主題client.subscribe("java/b", 2);while (true);}
}

二、SpringBoot集成MQTT

2.1 導入POM文件

<!-- spring boot 項目集成消息中間件基礎依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- spring boot 項目和MQTT客戶端集成依賴 -->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.3</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

2.2 在YML文件中增加配置

spring:mqtt:username: adminpassword: adminurl: tcp://localhost:1883subClientId: sub_client_id_123subTopic: atguigu/iot/lamp/line1,atguigu/iot/lamp/line2pubClientId: pub_client_id_123

2.3 新建Properties配置文件映射配置

package com.ming.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {private String username;private String password;private String url;private String subClientId;private String subTopic;private String pubClientId;
}

2.4 創建連接工廠

package com.ming.config;import com.ming.properties.MqttConfigurationProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;/*** 創建MQTT的配置類 配置連接工廠*/
@Configuration
public class MqttConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;@Beanpublic MqttPahoClientFactory mqttPahoClientFactory() {DefaultMqttPahoClientFactory mqttPahoClientFactory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfigurationProperties.getUsername());options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});mqttPahoClientFactory.setConnectionOptions(options);return mqttPahoClientFactory;}
}

2.5 增加入站規則配置

package com.ming.config;import com.ming.handler.ReceiverMessageHandler;
import com.ming.properties.MqttConfigurationProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** MQTT入站規則配置類(接收消息)*/
@Configuration
public class MqttInboundConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;@Autowiredprivate ReceiverMessageHandler receiverMessageHandler;/*** 消息通道* @return*/@Beanpublic MessageChannel messageInboundChannel() {return new DirectChannel();}/*** 配置入站適配器,作用:設置訂閱主題,以及指定消息的相關屬性* @return*/@Beanpublic MessageProducer messageProducer() {MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getUrl(),mqttConfigurationProperties.getSubClientId(),mqttPahoClientFactory,mqttConfigurationProperties.getSubTopic().split(","));mqttPahoMessageDrivenChannelAdapter.setQos(1);mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());return mqttPahoMessageDrivenChannelAdapter;}/*** 消息處理器* @return*/@Bean@ServiceActivator(inputChannel = "messageInboundChannel")public MessageHandler messageHandler() {return receiverMessageHandler;}
}
package com.ming.handler;import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;/*** 當訂閱的主題有消息時就會觸發此處回調*/
@Component
public class ReceiverMessageHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {Object payload = message.getPayload();  // 獲取消息的內容System.out.println(message.getHeaders().get("mqtt_receivedTopic"));  // 主題名稱System.out.println(payload);  // 消息主體}
}

2.6 增加出站規則配置

package com.ming.config;import com.ming.properties.MqttConfigurationProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** MQTT出站規則配置類(發送消息)*/
@Configuration
public class MqttOutboundConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;/*** 消息通道* @return*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** 配置出站適配器* @return*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutboundMessageHandler() {MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(mqttConfigurationProperties.getUrl(),mqttConfigurationProperties.getPubClientId(),mqttPahoClientFactory);mqttPahoMessageHandler.setDefaultQos(0);mqttPahoMessageHandler.setDefaultTopic("default");mqttPahoMessageHandler.setAsync(true);return mqttPahoMessageHandler;}
}

2.7 創建消息發送網關

package com.ming.getway;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;/*** MQTT發送消息的網關*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGetWay {public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos,String payload);
}
package com.ming.service;import com.ming.getway.MqttGetWay;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MqttMessageSender {@Autowiredprivate MqttGetWay mqttGetWay;public void sendMsg(String topic, String message) {mqttGetWay.sendMsgToMqtt(topic, message);}public void sendMsg(String topic, int qos, String message) {mqttGetWay.sendMsgToMqtt(topic, qos, message);}
}

2.8 測試消息發送

package com.ming;import com.ming.service.MqttMessageSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest(classes = SpringMqttDemoApplication.class)
public class MqttMessageSenderTest {@Autowiredprivate MqttMessageSender mqttMessageSender;@Testpublic void sendToMsg(){mqttMessageSender.sendMsg("java/c","hello mqtt spring boot ...");}
}

2.9 項目結構

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/93168.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/93168.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/93168.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

day 16 stm32 IIC

1.IIC概述1基于對話的形式完成&#xff0c;不需要同時進行發送和接收所以刪掉了一根數據線&#xff0c;變成半雙工2為了安全起見添加了應答機制3可以接多個模塊&#xff0c;且互不干擾4異步時序&#xff0c;要求嚴格&#xff0c;發送過程中不能暫停&#xff0c;所以需要同步時序…

AMD KFD的BO設計分析系列 0:開篇

開啟我始終不敢碰的GPU存儲系列&#xff0c;先上個圖把核心關系表達下&#xff0c;以此紀念。注&#xff1a;圖中kfdm_mm誤寫&#xff0c;應該為kfd_mm&#xff0c;不修改了&#xff0c;請大家不要介意。

EUDR的核心內容,EUDR認證的好處,EUDR意義

近年來&#xff0c;全球森林退化問題日益嚴峻&#xff0c;毀林行為不僅加劇氣候變化&#xff0c;還威脅生物多樣性和原住民權益。為應對這一挑戰&#xff0c;歐盟于2023年6月正式實施《歐盟零毀林法案》&#xff08;EU Deforestation-free Regulation, EUDR&#xff09;&#x…

數據分析專欄記錄之 -基礎數學與統計知識

數據分析專欄記錄之 -基礎數學與統計知識&#xff1a; 1、描述性統計 均值 data_set [10, 20, 30, 40, 50] mean sum(data_set)/len(data_set)np 里面的函數&#xff0c;對二維進行操作時&#xff0c; 默認每一列 mean1 np.mean(data_set) print(mean, mean1)s 0 for i…

《星辰建造師:C++多重繼承的奇幻史詩》

&#x1f30c;&#x1f525; 《星辰建造師&#xff1a;多重繼承與this指針的終極史詩》 &#x1f525;&#x1f30c;—— 一場融合魔法、科技與哲學的C奇幻冒險&#x1f320;&#x1f30c; 序章&#xff1a;代碼宇宙的誕生 &#x1f30c;&#x1f320;在無盡的代碼維度中&#…

云計算-OpenStack 運維開發實戰:從 Restful API 到 Python SDK 全場景實現鏡像上傳、用戶創建、云主機部署全流程

一、python-Restful Api 簡介 Restful API 是一種軟件架構風格,基于 HTTP 協議設計,通過統一的接口(如 URL 路徑)和標準的 HTTP 方法(GET/POST/PUT/DELETE 等)實現資源(如數據、文件等)的操作,具有無狀態、可緩存、客戶端 - 服務器分離等特點。方法如下 用 GET 請求獲…

RxJava 在 Android 中的深入解析:使用、原理與最佳實踐

前言RxJava 是一個基于觀察者模式的響應式編程庫&#xff0c;它通過可觀察序列和函數式操作符的組合&#xff0c;簡化了異步和事件驅動程序的開發。在 Android 開發中&#xff0c;RxJava 因其強大的異步處理能力和簡潔的代碼風格而廣受歡迎。本文將深入探討 RxJava 的使用、核心…

面試實戰 問題三十 HTTP協議中TCP三次握手與四次揮手詳解

HTTP協議中TCP三次握手與四次揮手詳解 在HTTP協議中&#xff0c;連接建立和斷開依賴于底層的TCP協議。雖然HTTP本身不定義握手過程&#xff0c;但所有HTTP通信都通過TCP三次握手建立連接&#xff0c;通過四次揮手斷開連接。以下是詳細解析&#xff1a;一、TCP三次握手&#xff…

讀《精益數據分析》:雙邊市場的核心指標分析

雙邊市場數據分析指南&#xff1a;從指標體系到實戰落地&#xff08;基于《精益數據分析》框架&#xff09;在互聯網平臺經濟中&#xff0c;雙邊市場&#xff08;如電商、出行、外賣、自由職業平臺等&#xff09;的核心矛盾始終是"供需平衡與效率優化"。這類平臺連接…

Queue參考代碼

queue.c #include "queue.h" #include "stdlib.h" // 初始化循環隊列 void initializeCircularQueue(CircularQueue *cq, uint8_t *buffer, uint32_t size) {cq->front 0;cq->rear 0;cq->count 0;cq->size size;cq->data buffer; }…

通過時間計算地固系到慣性系旋轉矩陣

通過時間計算地固系到慣性系旋轉矩陣 1. 引言 在航天工程和衛星導航領域&#xff0c;經常需要在地固坐標系(ECEF)和慣性坐標系(ECI)之間進行轉換。本文將詳細介紹如何根據UTC時間計算這兩個坐標系之間的旋轉矩陣&#xff0c;并提供完整的C語言實現。 2. 基本概念 2.1 坐標系定義…

【Datawhale AI 夏令營】金融文檔分析檢索增強生成系統的架構演變與方法論進展

# **金融文檔分析檢索增強生成系統的架構演變與方法論進展****第一部分&#xff1a;基礎原則和基線系統分析****第一部分&#xff1a;金融領域檢索增強生成范式的解構****第二部分&#xff1a;基線剖析&#xff1a;流水線的二分法****同步軌跡 (SimpleRAG)****異步改進 (AsyncS…

C語言相關簡單數據結構:順序表

目錄 1.順序表的概念及結構 1.1 線性表 如何理解邏輯結構和物理結構&#xff1f; 1.2 順序表分類 順序表和數組的區別&#xff1a; 順序表分類&#xff1a; 靜態順序表 動態順序表 1.3 動態順序表的實現 初始化 尾插 頭插 尾刪 頭刪 在指定位置之前插入數據 刪…

nginx配置代理服務器

Nginx 作為代理服務器時&#xff0c;主要用于反向代理&#xff08;最常用&#xff0c;轉發客戶端請求到后端服務&#xff09;或正向代理&#xff08;較少用&#xff0c;為客戶端提供訪問外部網絡的代理&#xff09;。以下是兩種場景的具體配置示例&#xff1a; 一、反向代理配置…

MySQL數據庫知識體系總結 20250813

一、數據庫的原理 1.數據庫的分類 我們可以根據數據的結構類型&#xff0c;將數據分成三類&#xff0c;分別是&#xff1a;結構化數據&#xff0c;半結構化數據&#xff0c;非結構化數據。 要點&#xff1a;對于結構化數據來講通常是先有結構再有數據。要點&#xff1a;對于半…

C++ 中構造函數參數對父對象的影響:父子控件管理機制解析

文章目錄C 中構造函數參數對父對象的影響&#xff1a;父子控件管理機制解析1. Qt 中的父對象管理機制2. 構造函數傳遞父對象的不同方式2.1. 父控件是 QWidget parent&#xff08;通用方式&#xff09;分析&#xff1a;2.2. 父控件是 Books_Client parent&#xff08;限制父控件…

直播美顏SDK開發實戰:高性能人臉美型的架構與實現

在直播行業里&#xff0c;美顏已經不再是錦上添花&#xff0c;而是標配中的標配。無論是游戲主播、帶貨達人&#xff0c;還是唱歌、跳舞的才藝主播&#xff0c;直播美顏SDK往往決定了用戶的第一印象和停留時長。尤其是高性能人臉美型技術&#xff0c;不僅能讓主播的五官更加自然…

JavaWeb(蒼穹外賣)--學習筆記18(Apache POI)

前言 本篇文章是學習B站黑馬程序員蒼穹外賣的學習筆記&#x1f4d1;。我的學習路線是Java基礎語法-JavaWeb-做項目&#xff0c;管理端的功能學習完之后&#xff0c;就進入到了用戶端微信小程序的開發&#xff0c;用戶端開發的流程大致為用戶登錄—商品瀏覽&#xff08;其中涉及…

OpenJDK 17 源碼 安全點輪詢的信號處理流程

OpenJDK 17 源碼&#xff0c;安全點輪詢的信號處理流程如下&#xff08;重點分析安全點輪詢相關部分&#xff09;&#xff1a;核心信號處理流程信號觸發&#xff1a;當線程訪問安全點輪詢內存頁時&#xff08;SafepointMechanism::is_poll_address&#xff09;&#xff0c;會觸…

InfluxDB 在工業控制系統中的數據監控案例(一)

工業控制系統數據監控的重要性**在工業領域&#xff0c;生產過程的復雜性和連續性使得數據監控成為保障生產穩定運行的關鍵環節。通過實時收集、處理和分析生產數據&#xff0c;企業能夠及時掌握設備運行狀態、產品質量信息以及生產流程的各項參數&#xff0c;從而為生產決策提…