充電寶項目中的MQTT(輕量高效的物聯網通信協議)

文章目錄

  • 補充:HTTP協議
  • MQTT協議
    • MQTT的核心特性
    • MQTT vs HTTP:關鍵對比
  • EMQX
  • 項目集成EMQX
    • 集成配置
    • 客戶端和回調方法
    • 具體接口和方法處理
    • 處理類

補充:HTTP協議

  • HTTP是一種應用層協議,使用TCP作為傳輸層協議,默認端口是80,基于請求和響應的方式,即客戶端發起請求,服務器響應請求并返回數據(HTML,JSON)。在HTTP/1.1中,使用了長連接技術,允許一個連接復用多個請求和響應,減少了TCP三次握手的消耗。
  • HTTP的基本結構
    • **請求行:**包含請求方法(GET, POST等)、請求URL、協議版本。
    • **請求頭:**包括各種元數據,如Connection、Host、Content-Type等。
    • **空行:**標識頭部與載荷的分界線
    • **請求體:**通常在POST請求中出現,包含請求的具體數據。

  • HTTP的**無狀態性:**HTTP是無狀態協議,每次請求都是獨立的,不會記錄上一次請求的任何信息,如果需要記錄用戶狀態,需要額外機制,如:**Cookies:**瀏覽器在發送請求時,可以攜帶上次訪問時服務器存儲的Cookies(小型文本數據),服務器通過這些Cookies來識別用戶的身份或維持會話狀態。
  • **高開銷:**每次請求都需要建立TCP連接,導致網絡開銷較大,尤其在頻繁請求的場景下。
  • 實時性差:HTTP通常是客戶端主動發起請求,服務器無法主動推送數據。

MQTT協議

  • MQTT(Message Queuing Telemetry Transport)是一種輕量級的發布/訂閱式消息傳輸協議,專為低帶寬、高延遲或不穩定的網絡環境設計。使用TCP協議進行傳輸,端口為1883(非加密)和8883(加密),客戶端通過發布(Publish)消息到某個主題(Topic),而其他訂閱(Subscribe)該主題的客戶端會接收到消息。現已成為物聯網(IoT)領域最流行的通信協議之一。

  • **主題(Topic):**消息的標簽,決定消息的去向,訂閱者根據主題來接收消息。
  • **QoS(Quality of Service)級別:**決定消息傳輸的可靠性。MQTT支持三個級別的QoS:
    • QoS 0:最多一次發送,不保證消息送達。
    • QoS 1:至少一次發送,確保消息至少送達一次。
    • QoS 2:只有一次發送,確保消息只送達一次。
  • **保留標志:**用于確保客戶端在訂閱時能接收到最后一條消息。

MQTT基于客戶端-服務器架構,其中:

  • 發布者(Publisher):發送消息的客戶端
  • 訂閱者(Subscriber):接收消息的客戶端
  • 代理(Broker):接收所有消息并過濾后分發給相關訂閱者的服務器

MQTT的核心特性

  1. 輕量高效:最小化協議開銷,報文頭僅2字節
  2. 發布/訂閱模式:解耦消息生產者和消費者
  3. 三種服務質量(QoS)等級
    • QoS 0:最多一次(可能丟失)
    • QoS 1:至少一次(可能重復)
    • QoS 2:恰好一次(確保可靠)
  4. 持久會話:可恢復中斷的連接
  5. 遺囑消息:客戶端異常斷開時發送預設消息
  6. 主題過濾:支持多級通配符(#和+)

MQTT vs HTTP:關鍵對比

特性MQTTHTTP
通信模式發布/訂閱請求/響應
連接開銷保持長連接(Keep-Alive)通常短連接(可配置Keep-Alive)
消息方向雙向通信客戶端發起請求
協議開銷極小(最小2字節頭)較大(包含大量頭信息)
實時性高(消息即時推送)低(依賴輪詢或WebSocket)
適用場景IoT、實時消息、低帶寬環境Web服務、API交互
消息推送服務器可主動推送傳統HTTP需客戶端輪詢
功耗相對較高
安全性支持TLS加密支持HTTPS加密

EMQX

  • EMQX 是一款大規模可彈性伸縮的云原生分布式物聯網 MQTT 消息服務器。作為全球最具擴展性的 MQTT 消息服務器,EMQX 提供了高效可靠海量物聯網設備連接,能夠高性能實時移動與處理消息和事件流數據,幫助您快速構建關鍵業務的物聯網平臺與應用。

  • EMQX文檔

  • EMQX的docker安裝:開始在linux上安裝1Panel,然后再應用商店中進行一鍵安裝。
    在這里插入圖片描述

  • EMQX特性:

    • 開放源碼:基于 Apache 2.0 許可證完全開源,自 2013 年起 200+ 開源版本迭代。
    • MQTT 5.0:100% 支持 MQTT 5.0 和 3.x 協議標準,更好的伸縮性、安全性和可靠性。
    • 海量連接:單節點支持 500 萬 MQTT 設備連接,集群可擴展至 1 億并發 MQTT 連接。
    • 高性能:單節點支持每秒實時接收、移動、處理與分發數百萬條的 MQTT 消息。
    • 低時延:基于 Erlang/OTP 軟實時的運行時系統設計,消息分發與投遞時延低于 1 毫秒。
    • 高可用:采用 Masterless 的大規模分布式集群架構,實現系統高可用和水平擴展。

  • 根據業務流程圖可以看出,系統與柜機交互是通過MQTT協議進行
    在這里插入圖片描述

項目集成EMQX

集成配置

  1. 引入依賴
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
  1. MqttTest
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttTest {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://ip:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 連接選項MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留會話connOpts.setCleanSession(true);// 設置回調client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {// 連接丟失后,一般在這里面進行重連System.out.println("連接斷開,可以做重連");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息會執行到這里面System.out.println("接收消息主題:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息內容:" + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}});// 建立連接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 訂閱client.subscribe(subTopic);// 消息發布所需參數MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
  1. 配置yaml文件
emqx:client:clientId: xt001username: xxxpassword: xxxserverURI: tcp://ip:1883keepAliveInterval: 10connectionTimeout: 30
  1. Emqx配置對象類(EmqxProperties)
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;@Data
@Component
@ConfigurationProperties(prefix = "emqx.client")
public class EmqxProperties {private String clientId;private String username;private String password;private String serverURI;private int keepAliveInterval;private int connectionTimeout;
}
  1. Emqx常量(EmqxConstants)
/*** Emqx常量信息**/
public class EmqxConstants {/** 充電寶插入,柜機發布Topic消息, 服務器監聽消息 */public final static String TOPIC_POWERBANK_CONNECTED = "/sys/powerBank/connected";/** 用戶掃碼,服務器發布Topic消息 柜機監聽消息  */public final static String TOPIC_SCAN_SUBMIT = "/sys/scan/submit/%s";/** 充電寶彈出,柜機發布Topic消息,服務器監聽消息  */public final static String TOPIC_POWERBANK_UNLOCK = "/sys/powerBank/unlock";/** 柜機屬性上報,服務器監聽消息  */public final static String TOPIC_PROPERTY_POST = "/sys/property/post";
}

客戶端和回調方法

  1. EmqxClientWrapper
import com.share.device.emqx.callback.OnMessageCallback;
import com.share.device.emqx.config.EmqxProperties;
import com.share.device.emqx.constant.EmqxConstants;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class EmqxClientWrapper {@Autowiredprivate EmqxProperties emqxProperties;@Autowiredprivate MqttClient client;@Autowiredprivate OnMessageCallback onMessageCallback;@PostConstructprivate void init() {MqttClientPersistence mqttClientPersistence = new MemoryPersistence();try {//新建客戶端 參數:MQTT服務的地址,客戶端名稱,持久化client = new MqttClient(emqxProperties.getServerURI(), emqxProperties.getClientId(), mqttClientPersistence);// 設置回調client.setCallback(onMessageCallback);// 建立連接connect();} catch (MqttException e) {log.info("MqttClient創建失敗");throw new RuntimeException(e);}}public Boolean connect() {// 設置連接的配置try {client.connect(mqttConnectOptions());log.info("連接成功");// 訂閱String[] topics = {EmqxConstants.TOPIC_POWERBANK_CONNECTED, EmqxConstants.TOPIC_POWERBANK_UNLOCK, EmqxConstants.TOPIC_PROPERTY_POST};client.subscribe(topics);return true;} catch (MqttException e) {log.info("連接失敗");e.printStackTrace();}return false;}/*創建MQTT配置類*/private MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(emqxProperties.getUsername());options.setPassword(emqxProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);//是否自動重新連接options.setCleanSession(true);//是否清除之前的連接信息options.setConnectionTimeout(emqxProperties.getConnectionTimeout());//連接超時時間options.setKeepAliveInterval(emqxProperties.getKeepAliveInterval());//心跳return options;}/*** 發布消息* @param topic* @param data*/public void publish(String topic, String data) {try {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(2);client.publish(topic, message);} catch (MqttException e) {log.info("消息發布失敗");e.printStackTrace();}}}
  1. 回調消息處理類 :OnMessageCallback
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.stereotype.Component;@Slf4j
    @Component
    public class OnMessageCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable cause) {// 連接丟失后,一般在這里面進行重連System.out.println("連接斷開,可以做重連");}@Override
    public void messageArrived(String topic, MqttMessage message) {// subscribe后得到的消息會執行到這里面System.out.println("接收消息主題:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息內容:" + new String(message.getPayload()));try {// 根據主題選擇不同的處理邏輯MassageHandler massageHandler = messageHandlerFactory.getMassageHandler(topic);if(null != massageHandler) {String content = new String(message.getPayload());massageHandler.handleMessage(JSONObject.parseObject(content));}} catch (Exception e) {e.printStackTrace();log.error("mqtt消息異常:{}", new String(message.getPayload()));}
    }@Override
    public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());
    }
    }
    

具體接口和方法處理

  1. 定義策略接口:MassageHandler
public interface MassageHandler {/*** 策略接口* @param message*/void handleMessage(JSONObject message);
}
  1. 具體Handler處理
import java.lang.annotation.*;
// 自定義注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GuiguEmqx {String topic();
}
  1. 充電寶插入處理類:PowerBankConnectedHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_CONNECTED)
public class PowerBankConnectedHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
  1. 充電寶彈出處理類:PowerBankUnlockHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_UNLOCK)
public class PowerBankUnlockHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
  1. 屬性上報:PropertyPostHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_PROPERTY_POST)
public class PropertyPostHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}

處理類

  1. MessageHandlerFactory
public interface MessageHandlerFactory {MassageHandler getMassageHandler(String topic);
}
  1. MessageHandlerFactoryImpl
@Service
public class MessageHandlerFactoryImpl implements MessageHandlerFactory, ApplicationContextAware {private Map<String, MassageHandler> handlerMap = new HashMap<>();/*** 初始化bean對象* @param ioc*/@Overridepublic void setApplicationContext(ApplicationContext ioc) {// 獲取對象Map<String, MassageHandler> beanMap = ioc.getBeansOfType(MassageHandler.class);for (MassageHandler massageHandler : beanMap.values()) {GuiguEmqx guiguEmqx = AnnotatedElementUtils.findAllMergedAnnotations(massageHandler.getClass(), GuiguEmqx.class).iterator().next();if (null != guiguEmqx) {String topic = guiguEmqx.topic();// 初始化到maphandlerMap.put(topic, massageHandler);}}}@Overridepublic MassageHandler getMassageHandler(String topic) {return handlerMap.get(topic);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/79270.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/79270.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/79270.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【iOS】UIPageViewController學習

UIPageViewController學習 前言創建一個UIPageViewController最簡單的使用 UIPageViewController的方法說明&#xff1a;效果展示 UIPageViewController的協議方法 前言 筆者最近在寫項目時想實現一個翻書效果&#xff0c;上網學習到了UIPageViewController今天寫本篇博客總結…

Linux搭建環境:從零開始掌握基礎操作(四)

? ? 您好&#xff0c;我是程序員小羊&#xff01; 前言 軟件測試第一步就是搭建測試環境&#xff0c;如何搭建好測試環境&#xff0c;需要具備兩項的基礎知識&#xff1a; 1、Linux 命令: 軟件測試第一個任務, 一般都需要進行環境搭建, 一部分&#xff0c;環境搭建內容是在服…

一天一個java知識點----Tomcat與Servlet

認識BS架構 靜態資源&#xff1a;服務器上存儲的不會改變的數據&#xff0c;通常不會根據用戶的請求而變化。比如&#xff1a;HTML、CSS、JS、圖片、視頻等(負責頁面展示) 動態資源&#xff1a;服務器端根據用戶請求和其他數據動態生成的&#xff0c;內容可能會在每次請求時都…

YOLOV8 OBB 海思3516訓練流程

YOLOV8 OBB 海思3516訓練流程 目錄 1、 下載帶GPU版本的torch(可選) 1 2、 安裝 ultralytics 2 3、 下載pycharm 社區版 2 4、安裝pycharm 3 5、新建pycharm 工程 3 6、 添加conda 環境 4 7、 訓練代碼 5 9、配置Ymal 文件 6 10、修改網絡結構 9 11、運行train.py 開始訓練模…

【深度學習】花書第18章——配分函數

直面配分函數 許多概率模型&#xff08;通常是無向圖模型&#xff09;由一個未歸一化的概率分布 p ~ ( x , θ ) \tilde p(\mathbf x,\theta) p~?(x,θ)定義。我們必須通過除以配分函數 Z ( θ ) Z(\pmb{ \theta}) Z(θ)來歸一化 p ~ \tilde p p~?。以獲得一個有效的概率分…

工作記錄1

日常總結、靈感記錄、學習要點。持續記錄 學海無涯,再好的記性也比不過爛筆頭,記錄一下學習日常、靈感、要點。 前言:最近看見一個博文,很有感觸,是某個大佬自己運營的網站,分享了他的各種經驗文章和自身的一些筆記。本人還沒有他這么屌,所以還是先在CSDN上小試牛刀吧…

Spring Boot(二十一):RedisTemplate的String和Hash類型操作

RedisTemplate和StringRedisTemplate的系列文章詳見&#xff1a; Spring Boot&#xff08;十七&#xff09;&#xff1a;集成和使用Redis Spring Boot&#xff08;十八&#xff09;&#xff1a;RedisTemplate和StringRedisTemplate Spring Boot&#xff08;十九&#xff09;…

智能指針之設計模式1

本文探討一下智能指針和GOF設計模式的關系&#xff0c;如果按照設計模式的背后思想來分析&#xff0c;可以發現圍繞智能指針的設計和實現有設計模式的一些思想體現。當然&#xff0c;它們也不是嚴格意義上面向對象的設計模式&#xff0c;畢竟它們沒有那么分明的類層次體系&…

中間件--ClickHouse-1--基礎介紹(列式存儲,MPP架構,分布式計算,SQL支持,向量化執行,億萬級數據秒級查詢)

1、概述 ClickHouse是一個用于聯機分析(OLAP)的列式數據庫管理系統(DBMS)。它由俄羅斯的互聯網巨頭Yandex為解決其內部數據分析需求而開發&#xff0c;并于2016年開源。專為大規模數據分析&#xff0c;實時數據分析和復雜查詢設計&#xff0c;具有高性能、實時數據和可擴展性等…

Go之Slice和數組:深入理解底層設計與最佳實踐

在Go語言中&#xff0c;數組&#xff08;Array&#xff09;和切片&#xff08;Slice&#xff09;是兩種看似相似卻本質不同的數據結構。本文將深入剖析它們的底層實現機制&#xff0c;并結合實際代碼示例&#xff0c;幫助開發者掌握核心差異和使用場景。 一、基礎概念&#xff…

力扣熱題100——普通數組(不普通)

普通數組但一點不普通&#xff01; 最大子數組和合并區間輪轉數組除自身以外數組的乘積缺失的第一個正數 最大子數組和 這道題是非常經典的適用動態規劃解決題目&#xff0c;但同時這里給出兩種解法 動態規劃、分治法 那么動態規劃方法大家可以在我的另外一篇博客總結中看到&am…

矩陣基礎+矩陣轉置+矩陣乘法+行列式與逆矩陣

GPU渲染過程 矩陣 什么是矩陣&#xff08;Matrix&#xff09; 向量 &#xff08;3&#xff0c;9&#xff0c;88&#xff09; 點乘&#xff1a;計算向量夾角 叉乘&#xff1a;計算兩個向量構成平面的法向量。 矩陣 矩陣有3行&#xff0c;2列&#xff0c;所以表示為M32 獲取固…

MySQL之text字段詳細分類說明

在 MySQL 中&#xff0c;TEXT 是用來存儲大量文本數據的數據類型。TEXT 類型可以存儲非常長的字符串&#xff0c;比 VARCHAR 類型更適合存儲大塊的文本數據。TEXT 數據類型分為以下幾個子類型&#xff0c;每個子類型用于存儲不同大小范圍的文本數據&#xff1a; TINYTEXT: 可以…

超詳細!Android 面試題大匯總與深度解析

一、Java 與 Kotlin 基礎 1. Java 的多態是如何實現的&#xff1f; 多態是指在 Java 中&#xff0c;同一個行為具有多個不同表現形式或形態的能力。它主要通過方法重載&#xff08;Overloading&#xff09;和方法重寫&#xff08;Overriding&#xff09;來實現。 方法重載&a…

如何提高webrtc操作跟手時間,降低延遲

第一次做webrtc項目&#xff0c;操作延遲&#xff0c;一直是個問題&#xff0c;多次調試都不能達到理想效果。偶爾發現提高jitterBuffer時間可以解決此問題。關鍵代碼 const _setJitter (values: number) > { const receives peerConnection.getReceivers();receives.f…

語音合成(TTS)從零搭建一個完整的TTS系統-第一節-效果演示

一、概述 語音合成又叫文字轉語音&#xff08;TTS-text to speech &#xff09;&#xff0c;本專題我們記錄從零搭建一個完整的語音合成系統&#xff0c;包括文本前端、聲學模型和聲碼器&#xff0c;從模型訓練到系統的工程化實現&#xff0c;模型可以部署在手機等嵌入式設備上…

實驗三 I/O地址譯碼

一、實驗目的 掌握I/O地址譯碼電路的工作原理。 二、實驗電路 實驗電路如圖1所示&#xff0c;其中74LS74為D觸發器&#xff0c;可直接使用實驗臺上數字電路實驗區的D觸發器&#xff0c;74LS138為地址譯碼器&#xff0c; Y0&#xff1a;280H&#xff5e;287H&…

Linux 使用Nginx搭建簡易網站模塊

網站需求&#xff1a; 一、基于域名[www.openlab.com](http://www.openlab.com)可以訪問網站內容為 welcome to openlab ? 二、給該公司創建三個子界面分別顯示學生信息&#xff0c;教學資料和繳費網站&#xff0c;基于[www.openlab.com/student](http://www.openlab.com/stud…

MyBatis 如何使用

1. 環境準備 添加依賴&#xff08;Maven&#xff09; 在 pom.xml 中添加 MyBatis 和數據庫驅動依賴&#xff1a; <dependencies><!-- MyBatis 核心庫 --><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId&g…

ArkTS組件的三個通用(通用事件、通用屬性、通用手勢)

文章目錄 通用事件點擊事件 onClick觸摸事件 onTouch掛載、卸載事件拖拽事件按鍵事件 onKeyEvent焦點事件鼠標事件懸浮事件組件區域變化事件 onAreaChange組件尺寸變化事件組件可見區域變化事件組件快捷鍵事件自定義事件分發自定義事件攔截 通用屬性尺寸設置位置設置布局約束邊…