文章目錄
- 補充:HTTP協議
- MQTT協議
- MQTT的核心特性
- MQTT vs HTTP:關鍵對比
- EMQX
- 項目集成EMQX
- 集成配置
- 客戶端和回調方法
- 具體接口和方法處理
- 處理類
補充:HTTP協議
- HTTP是一種應用層協議,使用TCP作為傳輸層協議,默認端口是80,基于請求和響應的方式,即客戶端發起請求,服務器響應請求并返回數據(HTML,JSON)。在HTTP/1.1中,使用了長連接技術,允許一個連接復用多個請求和響應,減少了TCP三次握手的消耗。
- HTTP的基本結構
- **請求行:**包含請求方法(GET, POST等)、請求URL、協議版本。
- **請求頭:**包括各種元數據,如Connection、Host、Content-Type等。
- **空行:**標識頭部與載荷的分界線
- **請求體:**通常在POST請求中出現,包含請求的具體數據。
- HTTP的**無狀態性:**HTTP是無狀態協議,每次請求都是獨立的,不會記錄上一次請求的任何信息,如果需要記錄用戶狀態,需要額外機制,如:**Cookies:**瀏覽器在發送請求時,可以攜帶上次訪問時服務器存儲的Cookies(小型文本數據),服務器通過這些Cookies來識別用戶的身份或維持會話狀態。
- **高開銷:**每次請求都需要建立TCP連接,導致網絡開銷較大,尤其在頻繁請求的場景下。
- 實時性差:HTTP通常是客戶端主動發起請求,服務器無法主動推送數據。
MQTT協議
- MQTT(Message Queuing Telemetry Transport)是一種輕量級的發布/訂閱式消息傳輸協議,專為低帶寬、高延遲或不穩定的網絡環境設計。使用TCP協議進行傳輸,端口為1883(非加密)和8883(加密),客戶端通過發布(Publish)消息到某個主題(Topic),而其他訂閱(Subscribe)該主題的客戶端會接收到消息。現已成為物聯網(IoT)領域最流行的通信協議之一。
- **主題(Topic):**消息的標簽,決定消息的去向,訂閱者根據主題來接收消息。
- **QoS(Quality of Service)級別:**決定消息傳輸的可靠性。MQTT支持三個級別的QoS:
- QoS 0:最多一次發送,不保證消息送達。
- QoS 1:至少一次發送,確保消息至少送達一次。
- QoS 2:只有一次發送,確保消息只送達一次。
- **保留標志:**用于確保客戶端在訂閱時能接收到最后一條消息。
MQTT基于客戶端-服務器架構,其中:
- 發布者(Publisher):發送消息的客戶端
- 訂閱者(Subscriber):接收消息的客戶端
- 代理(Broker):接收所有消息并過濾后分發給相關訂閱者的服務器
MQTT的核心特性
- 輕量高效:最小化協議開銷,報文頭僅2字節
- 發布/訂閱模式:解耦消息生產者和消費者
- 三種服務質量(QoS)等級:
- QoS 0:最多一次(可能丟失)
- QoS 1:至少一次(可能重復)
- QoS 2:恰好一次(確保可靠)
- 持久會話:可恢復中斷的連接
- 遺囑消息:客戶端異常斷開時發送預設消息
- 主題過濾:支持多級通配符(#和+)
MQTT vs HTTP:關鍵對比
特性 | MQTT | HTTP |
---|---|---|
通信模式 | 發布/訂閱 | 請求/響應 |
連接開銷 | 保持長連接(Keep-Alive) | 通常短連接(可配置Keep-Alive) |
消息方向 | 雙向通信 | 客戶端發起請求 |
協議開銷 | 極小(最小2字節頭) | 較大(包含大量頭信息) |
實時性 | 高(消息即時推送) | 低(依賴輪詢或WebSocket) |
適用場景 | IoT、實時消息、低帶寬環境 | Web服務、API交互 |
消息推送 | 服務器可主動推送 | 傳統HTTP需客戶端輪詢 |
功耗 | 低 | 相對較高 |
安全性 | 支持TLS加密 | 支持HTTPS加密 |
EMQX
-
EMQX 是一款大規模可彈性伸縮的云原生分布式物聯網 MQTT 消息服務器。作為全球最具擴展性的 MQTT 消息服務器,EMQX 提供了高效可靠海量物聯網設備連接,能夠高性能實時移動與處理消息和事件流數據,幫助您快速構建關鍵業務的物聯網平臺與應用。
-
EMQX文檔
-
EMQX的docker安裝:開始在linux上安裝1Panel,然后再應用商店中進行一鍵安裝。
-
EMQX特性:
- 開放源碼:基于 Apache 2.0 許可證完全開源,自 2013 年起 200+ 開源版本迭代。
- MQTT 5.0:100% 支持 MQTT 5.0 和 3.x 協議標準,更好的伸縮性、安全性和可靠性。
- 海量連接:單節點支持 500 萬 MQTT 設備連接,集群可擴展至 1 億并發 MQTT 連接。
- 高性能:單節點支持每秒實時接收、移動、處理與分發數百萬條的 MQTT 消息。
- 低時延:基于 Erlang/OTP 軟實時的運行時系統設計,消息分發與投遞時延低于 1 毫秒。
- 高可用:采用 Masterless 的大規模分布式集群架構,實現系統高可用和水平擴展。
- 根據業務流程圖可以看出,系統與柜機交互是通過MQTT協議進行
項目集成EMQX
集成配置
- 引入依賴
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
- MqttTest
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttTest {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://ip:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 連接選項MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留會話connOpts.setCleanSession(true);// 設置回調client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {// 連接丟失后,一般在這里面進行重連System.out.println("連接斷開,可以做重連");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息會執行到這里面System.out.println("接收消息主題:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息內容:" + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}});// 建立連接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 訂閱client.subscribe(subTopic);// 消息發布所需參數MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
- 配置yaml文件
emqx:client:clientId: xt001username: xxxpassword: xxxserverURI: tcp://ip:1883keepAliveInterval: 10connectionTimeout: 30
- Emqx配置對象類(EmqxProperties)
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;@Data
@Component
@ConfigurationProperties(prefix = "emqx.client")
public class EmqxProperties {private String clientId;private String username;private String password;private String serverURI;private int keepAliveInterval;private int connectionTimeout;
}
- Emqx常量(EmqxConstants)
/*** Emqx常量信息**/
public class EmqxConstants {/** 充電寶插入,柜機發布Topic消息, 服務器監聽消息 */public final static String TOPIC_POWERBANK_CONNECTED = "/sys/powerBank/connected";/** 用戶掃碼,服務器發布Topic消息 柜機監聽消息 */public final static String TOPIC_SCAN_SUBMIT = "/sys/scan/submit/%s";/** 充電寶彈出,柜機發布Topic消息,服務器監聽消息 */public final static String TOPIC_POWERBANK_UNLOCK = "/sys/powerBank/unlock";/** 柜機屬性上報,服務器監聽消息 */public final static String TOPIC_PROPERTY_POST = "/sys/property/post";
}
客戶端和回調方法
- EmqxClientWrapper
import com.share.device.emqx.callback.OnMessageCallback;
import com.share.device.emqx.config.EmqxProperties;
import com.share.device.emqx.constant.EmqxConstants;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class EmqxClientWrapper {@Autowiredprivate EmqxProperties emqxProperties;@Autowiredprivate MqttClient client;@Autowiredprivate OnMessageCallback onMessageCallback;@PostConstructprivate void init() {MqttClientPersistence mqttClientPersistence = new MemoryPersistence();try {//新建客戶端 參數:MQTT服務的地址,客戶端名稱,持久化client = new MqttClient(emqxProperties.getServerURI(), emqxProperties.getClientId(), mqttClientPersistence);// 設置回調client.setCallback(onMessageCallback);// 建立連接connect();} catch (MqttException e) {log.info("MqttClient創建失敗");throw new RuntimeException(e);}}public Boolean connect() {// 設置連接的配置try {client.connect(mqttConnectOptions());log.info("連接成功");// 訂閱String[] topics = {EmqxConstants.TOPIC_POWERBANK_CONNECTED, EmqxConstants.TOPIC_POWERBANK_UNLOCK, EmqxConstants.TOPIC_PROPERTY_POST};client.subscribe(topics);return true;} catch (MqttException e) {log.info("連接失敗");e.printStackTrace();}return false;}/*創建MQTT配置類*/private MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(emqxProperties.getUsername());options.setPassword(emqxProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);//是否自動重新連接options.setCleanSession(true);//是否清除之前的連接信息options.setConnectionTimeout(emqxProperties.getConnectionTimeout());//連接超時時間options.setKeepAliveInterval(emqxProperties.getKeepAliveInterval());//心跳return options;}/*** 發布消息* @param topic* @param data*/public void publish(String topic, String data) {try {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(2);client.publish(topic, message);} catch (MqttException e) {log.info("消息發布失敗");e.printStackTrace();}}}
- 回調消息處理類 :OnMessageCallback
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component;@Slf4j @Component public class OnMessageCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable cause) {// 連接丟失后,一般在這里面進行重連System.out.println("連接斷開,可以做重連");}@Override public void messageArrived(String topic, MqttMessage message) {// subscribe后得到的消息會執行到這里面System.out.println("接收消息主題:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息內容:" + new String(message.getPayload()));try {// 根據主題選擇不同的處理邏輯MassageHandler massageHandler = messageHandlerFactory.getMassageHandler(topic);if(null != massageHandler) {String content = new String(message.getPayload());massageHandler.handleMessage(JSONObject.parseObject(content));}} catch (Exception e) {e.printStackTrace();log.error("mqtt消息異常:{}", new String(message.getPayload()));} }@Override public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete()); } }
具體接口和方法處理
- 定義策略接口:MassageHandler
public interface MassageHandler {/*** 策略接口* @param message*/void handleMessage(JSONObject message);
}
- 具體Handler處理
import java.lang.annotation.*;
// 自定義注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GuiguEmqx {String topic();
}
- 充電寶插入處理類:PowerBankConnectedHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_CONNECTED)
public class PowerBankConnectedHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
- 充電寶彈出處理類:PowerBankUnlockHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_UNLOCK)
public class PowerBankUnlockHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
- 屬性上報:PropertyPostHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_PROPERTY_POST)
public class PropertyPostHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
處理類
- MessageHandlerFactory
public interface MessageHandlerFactory {MassageHandler getMassageHandler(String topic);
}
- MessageHandlerFactoryImpl
@Service
public class MessageHandlerFactoryImpl implements MessageHandlerFactory, ApplicationContextAware {private Map<String, MassageHandler> handlerMap = new HashMap<>();/*** 初始化bean對象* @param ioc*/@Overridepublic void setApplicationContext(ApplicationContext ioc) {// 獲取對象Map<String, MassageHandler> beanMap = ioc.getBeansOfType(MassageHandler.class);for (MassageHandler massageHandler : beanMap.values()) {GuiguEmqx guiguEmqx = AnnotatedElementUtils.findAllMergedAnnotations(massageHandler.getClass(), GuiguEmqx.class).iterator().next();if (null != guiguEmqx) {String topic = guiguEmqx.topic();// 初始化到maphandlerMap.put(topic, massageHandler);}}}@Overridepublic MassageHandler getMassageHandler(String topic) {return handlerMap.get(topic);}
}