Java連接Emqx實現訂閱發布消息

一:前提

? ? ? ? 安裝了Emqx開源版、MQTTX客戶端

二:訂閱發布實現步驟

1.引入依賴?

<!--MQTT客戶端-->
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>

2.編輯配置文件

mqtt:broker:uri: tcp://127.0.0.1:31883client:id: mqtt-am-client-${random.uuid}# 訂閱主題配置(支持多個)inTopics:- topic: test/topic1qos: 0- topic: test/topic2qos: 1- topic: test/topic3qos: 2# 發布主題配置(支持多個)outTopics:- topic: out/topic1qos: 0username: ampassword: LGyPtuAB4th5pkeepAliveInterval: 60

3.讀取配置文件

package com.wtzn.web.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.List;@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {private Broker broker;private Client client;private List<TopicConfig> inTopics;private List<TopicConfig> outTopics;private String userName;private String password;private int KeepAliveInterval;@Datapublic static class Broker {private String uri;}@Datapublic static class Client {private String id;}@Datapublic static class TopicConfig {private String topic;private int qos;}}

4.創建Mqtt客戶端

package com.wtzn.web.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Autowiredprivate MqttProperties mqttProperties;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();// 此客戶端的用戶名和密碼options.setUserName(mqttProperties.getUserName());options.setPassword(mqttProperties.getPassword().toCharArray());options.setCleanSession(true);// 設置遺囑消息//  options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下線,這是我的遺囑".getBytes(), 2, true);// 連接超時重試options.setConnectionTimeout(5000); //毫秒options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());options.setAutomaticReconnect(true);//網絡中斷重連client.connect(options);return client;}
}

5.controller層

package com.wtzn.web.controller;import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.LinkedList;@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@SaIgnore@PostMapping("/mqtt")public void publish() {try {//  LinkedList<Payload> payloadLinkedList=new LinkedList<>();for(int i=1; i<=10000; i++){Payload payload=new Payload();payload.setTemperature(i);//  payloadLinkedList.add(payload);mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));}} catch (MqttException e) {log.error("發布消息失敗{}", e.getMessage());}log.info("發布消息成功");}}

6.service層

package com.wtzn.web.service;import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Arrays;@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {@Autowiredprivate MqttClient mqttClient;@Autowiredprivate MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {mqttClient.setCallback(this);/*       mqttClient.subscribe(mqttProperties.getInTopic());log.info("訂閱主題{}", mqttProperties.getInTopic());
*/mqttProperties.getInTopics().forEach(x -> {try {mqttClient.subscribe(x.getTopic(), x.getQos());log.info("訂閱主題{}", x.getTopic());} catch (MqttException e) {throw new RuntimeException(e);}});}@PreDestroypublic void destroy() throws MqttException {mqttClient.disconnect();log.info("與服務器斷開連接");}/*** @description: 發送消息* @param: [message]* @return: void**/public void publish(String topic,int qos,String message) throws MqttException {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(qos);mqttClient.publish(topic, mqttMessage);log.info("向主題【{}】發布消息:【{}】", topic, message);}/*** @description: 接收消息* @param: [topic, message]* @return: void**/@Overridepublic void messageArrived(String topic, MqttMessage message) throws MqttException {Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);log.info("接收到來自【{}】的消息【{}】", topic, payload.getTemperature());/*  if (payload.getTemperature() > 37) {publish("發燒");}*/}@Overridepublic void connectionLost(Throwable cause) {log.error("連接丟失:{}", cause.getMessage());}@SneakyThrows@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {if( token!=null ){MqttMessage message = null;try {message = token.getMessage();} catch (MqttException e) {throw new RuntimeException(e);}String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();String str = message==null ? null : new String(message.getPayload());log.info("deliveryComplete: topic={}, message={}", topic, str);} else {log.info("deliveryComplete: null");}log.info("消息已送達");}@Overridepublic void connectComplete(boolean b, String s) {mqttProperties.getInTopics().forEach(x -> {try {mqttClient.subscribe(x.getTopic(), x.getQos());log.info("訂閱主題{}", x.getTopic());} catch (MqttException e) {throw new RuntimeException(e);}});}
}

7.dao層

package com.wtzn.web.domain.bo;import lombok.Data;@Data
public class Payload {private Integer temperature;
}

三:測試

1.PostMan直接調用測試

2、下載MQTTX客戶端進行測試

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913815.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913815.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913815.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ReactNative【實戰系列教程】我的小紅書 7 -- 消息(含彈窗菜單,右上角角標,空白頁等)

最終效果彈窗菜單 點擊右上角群聊按鈕后&#xff0c;彈窗菜單無消息代碼實現app/(tabs)/message.tsx import icon_no_collection from "/assets/icons/icon_no_collection.webp"; import FloatMenu, {FloatMenuRef, } from "/modules/message/components/FloatM…

Jenkins詳細教程 - 從入門到精通

目錄 1. 什么是Jenkins 1.1 簡單理解 1.2 技術定義 1.3 核心特點 2. 為什么需要Jenkins 2.1 傳統開發的痛點 手工發布的問題 真實場景舉例 2.2 Jenkins的解決方案 自動化CI/CD流程 3. 核心概念解析 3.1 Job(任務) Job示例 3.2 Build(構建) 3.3 Pipeline(流水…

bash 判斷 /opt/wslibs-cuda11.8 是否為軟連接, 如果是,獲取連接目的目錄并自動創建

以下是實現該功能的 Bash 腳本&#xff1a; bash #!/bin/bash LINK_PATH“/opt/wslibs-cuda11.8” 檢查是否為軟鏈接 if [ -L "KaTeX parse error: Expected EOF, got # at position 24: …H" ]; then#? 獲取軟鏈接的絕對目標路徑…(readlink -f “$LINK_PATH”) # …

【性能測試】jmeter+Linux環境部署和分布式壓測,一篇打通...

目錄&#xff1a;導讀 前言一、Python編程入門到精通二、接口自動化項目實戰三、Web自動化項目實戰四、App自動化項目實戰五、一線大廠簡歷六、測試開發DevOps體系七、常用自動化測試工具八、JMeter性能測試九、總結&#xff08;尾部小驚喜&#xff09; 前言 1、linux獲取動態…

Java 17 新特性筆記

Java 17 是一個 長期支持版本&#xff08;LTS&#xff09;&#xff0c;于 2021 年 9 月發布&#xff0c;是繼 Java 11 之后的重要里程碑。它整合了 Java 12~16 的眾多特性&#xff0c;并引入新的語言增強、JDK API 改進、性能優化和安全增強。 Java 17 版本信息 發布時間&…

WWDC 25 風云再起:SwiftUI 7 Charts 心法從 2D 到 3D 的華麗蛻變

概述 在 iOS 開發這個波譎云詭的江湖中&#xff0c;SwiftUI 可謂是一位后起之秀&#xff0c;以其簡潔明快的招式迅速在 UI 框架領域中嶄露頭角。 而其中的 Charts 框架&#xff0c;更是如同江湖中的 “數據可視化寶典”那樣&#xff0c;讓各位禿頭少俠們能夠輕松將復雜的數據轉…

Vue+Element Plus 中按回車刷新頁面問題排查與解決

VueElement Plus 中按回車刷新頁面問題排查與解決原因分析解決方案方法一&#xff1a;阻止默認行為 submit.prevent方法二&#xff1a;只監聽回車并觸發搜索最終推薦寫法如下&#xff1a;在使用 Vue 3 Element Plus 開發后臺系統時&#xff0c;我們常常會通過 搭配 實現搜索功…

x86匯編語言入門基礎(三)匯編指令篇3 位移運算

位移運算指令&#xff1a;SHL邏輯移位&#xff0c;SAR算術移位&#xff0c; ROR循環右移 1. SHL 邏輯移位 Shift Left, SHL代表向左移位&#xff0c;SHR代表向右移位 指令格式&#xff1a;shl op1, op2 目的操作數 op1&#xff1a;寄存器/內存地址源操作數 op2&#xff1a;寄…

Java-69 深入淺出 RPC 單體架構 垂直架構 分布式架構 微服務架構

點一下關注吧&#xff01;&#xff01;&#xff01;非常感謝&#xff01;&#xff01;持續更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持續更新中&#xff01;&#xff08;長期更新&#xff09; AI煉丹日志-29 - 字節跳動 DeerFlow 深度研究框斜體樣式架 私有…

Android 如何阻止應用自升級

問題背景 1.打開PlayStore,然后登陸賬戶 2.退出應用過幾分鐘后,應用會自動更新到新版本 3.再次打開應用,問題即可復現 一聯網進入playStore應用并且登錄谷歌賬號,退出幾分鐘,在進入,發現應用版本號更新了,應用進行了自我升級,關鍵是升級之后谷歌商店就用不了了,就…

Docker-構建鏡像并實現LNMP架構

一、搭建LNMP基礎配置1、制作Nginx鏡像制作dockerfilevim dockerfileFROM centos:7 RUN rm -rf /etc/yum.repos.d/* RUN curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo RUN yum clean all RUN yum makecache RUN yum -y install z…

Python之--基本知識

基本輸出語法結構: print(輸出內容)print()函數完整的語法格出: print (value,...,sep,end\n,fileNone)只有字符串可以用連接基本輸入語法結構: xinput(提示文字’)注意事項: 無論輸入的數據是什么 x 的數據類型都是字符串類型示例&#xff1a;name input("Enter your na…

VS CodeC51 單片機開發環境搭建

文章目錄前言1.安裝插件2.創建EIDE項目&#xff08;51單片機&#xff09;3.配置工具鏈&#xff08;第一次使用需要配置&#xff09;4.編譯與下載5.項目文件簡介與串口調試工具6.推薦插件7.打包模板與導出模板8.51單片機串口無法識別問題前言 需要安裝keil c51版本需要配置好C/…

國密算法(SM2/SM3/SM4)

文章目錄國密算法&#xff08;SM2/SM3/SM4&#xff09;詳解&#xff1a;從性能對比到Java手機號安全處理實戰一、 國密核心算法簡介二、 性能深度對比三、 Java實戰&#xff1a;手機號的安全處理方案一&#xff1a;使用SM3哈希存儲&#xff08;推薦用于驗證場景&#xff09;方案…

從前端轉go開發的學習路線

從前端開發轉向 Go&#xff08;Golang&#xff09;后端開發&#xff0c;是一個非常可行也很實用的方向&#xff0c;特別是在做 高性能微服務、分布式系統、云原生&#xff08;如Kubernetes&#xff09; 等方面。以下是一份適合你&#xff08;有多年開發經驗的前端開發者&#x…

node或瀏覽器上傳文件到阿里云OSS

阿里云配置 進入阿里云OSS Bucket 列表的某個 Bucket 倉庫下&#xff0c;點擊訪問控制 RAM 創建用戶 勾上 創建 AccessKey ID 和 AccessKey Secret 復制 AccessKey 信息 用文檔保存 創建角色 選擇云賬號 復制 ARN 用文檔保存&#xff0c;然后 新增權限 搜索 oss 選擇 AliyunOSS…

26考研物理復試面試常見問答問題匯總,物理專業保研推免夏令營面試問題匯總,物理本科知識專業面試最全攻略!

還在為物理考研復試面試發愁&#xff1f;還在為物理招聘的專業面試抓狂&#xff1f;還在為即將到來的物理夏令營面試不知從何下手、翻遍了厚厚的教材卻抓不住重點&#xff1f;別慌&#xff0c;接下來我會從「考研的物理復試經歷」「物理面試攻略」「物理面試基礎問答題匯總很全…

(5)機器學習小白入門 YOLOv:數據需求與圖像不足應對策略

(1)機器學習小白入門YOLOv &#xff1a;從概念到實踐 (2)機器學習小白入門 YOLOv&#xff1a;從模塊優化到工程部署 (3)機器學習小白入門 YOLOv&#xff1a; 解鎖圖片分類新技能 (4)機器學習小白入門YOLOv &#xff1a;圖片標注實操手冊 (5)機器學習小白入門 YOLOv&#xff1a;…

百年制造名企,三菱重工引領“智”造新范式

日前&#xff0c;由深圳軟件協會指導、法大大和信息俠聯合出品的《制造行業合同數智化升級白皮書》&#xff08;以下簡稱“白皮書”&#xff09;正式發布&#xff0c;并首次提出 “電子簽法律AI” 雙輪驅動模型。在制造行業面臨供應鏈協同、合規風控及全球化出海等多重挑戰的當…

【學習筆記】計算機操作系統(七)—— 文件管理

第七章 文件管理 文章目錄第七章 文件管理7.1 文件和文件系統7.1.1 數據項、記錄和文件7.1.2 文件名和類型7.1.3 文件系統的層次結構7.1.4 文件操作7.2 文件的邏輯結構7.2.1 文件邏輯結構的類型7.2.2 順序文件(Sequential File)7.2.3 記錄尋址7.2.4 索引文件(Index File)7.2.5 …