1. MQTT簡介
MQTT是一種物聯網消息協議,為Message Queuing Telemetry Transport的縮寫,即消息隊列傳輸探測,協議基于發布訂閱模式進行通信,有開銷低、帶寬小、輕量的特點,通常應用在物聯網數據采集、移動應用、智能硬件、電力、能源等領域。
相關概念
三種身份:
- 客戶端(Client):MQTT 客戶端是發送和接收消息的應用程序。
- 服務器(Broker):也叫“代理”,服務器是處理消息的應用程序,位于發布者和訂閱者中間,負責接收消息,并按照某種規則發送給訂閱者。
- 主題(Topic): 主題是消息的標識符,用于區分不同類型的消息。
MQTT 消息
MQTT傳輸的消息可以分為:主題(topic)和負載(payload)兩部分
- 主題,可以理解為消息的類型
- 負載,可以理解為消息的內容
消息服務質量QoS(Quality of Service)
Qos用于保證在不同的網絡環境下消息傳遞的可靠性,分為3個等級
- 0 消息最多傳遞一次,消息發布完全依賴底層TCP/IP網絡,可能會發生消息丟失, 也就是發出去就不管了,也被叫做“即發即棄”
- 1 消息傳遞至少 1 次,確保消息到達,但消息重復可能會發生,發送者將會存儲發送的信息直到發送者收到一次來自接收者的PUBACK格式的應答。
- 2 消息僅傳送一次,確保消息到達一次
2. SpringBoot集成Mqtt
Spring集成Mqtt常用的有兩種方式,一種是直接使用Mqtt的客戶端庫,如Eclipse Paho
,另外一種是spring integration mqtt
第一種:使用Mqtt客戶端庫
依賴引入:org.eclipse.paho.client.mqttv3
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version>
</dependency>
服務端配置
public class MqttSendMsgService {private static String clientId = "test";private static String username = "admin";private static String password = "xxxxxx";private static String broker = "tcp://xxxxx:1883";public ReturnT<String> mqttSend(String param) {MqttClient client;try {client = new MqttClient(broker, clientId, new MemoryPersistence());client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("Message arrived: " + mqttMessage.getPayload());}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("Delivery complete");}});MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName(username);connOpts.setPassword(password.toCharArray());client.connect(connOpts);log.info("Connected to MQTT Broker!");//主題String topic="test/simple";//消息String content="發送測試";MqttMessage message = new MqttMessage();message.setQos(1);message.setRetained(false);message.setPayload(content.getBytes());//消息發送client.publish(topic,message);} catch (MqttException e) {e.printStackTrace();}return ReturnT.SUCCESS;}
}
上面這種使用起來比較簡單,生產環境使用最多的還是下面這種
第二種:使用 Spring integration進行集成,這里以發送消息為例
依賴引入
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version>
</dependency>
添加yaml配置
mqtt.url = tcp://xxxxx:1883
mqtt.username = admin
mqtt.password = 123456
mqtt.clientId = test
mqtt.defaultTopic = /test/send
mqtt.keepAliveInterval = 60
mqtt.automaticReconnect = true
mqtt.cleanSession = false
mqtt.connectionTimeout = 30
mqtt.maxInflight = 1024
添加對應的屬性配置類
@Component
public class MqttConfigProperties {@Value("${mqtt.url}")private String url;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.clientId}")private String clientId;@Value("${mqtt.defaultTopic}")private String defaultTopic;@Value("${mqtt.keepAliveInterval}")private Integer keepAliveInterval;@Value("${mqtt.automaticReconnect}")private Boolean automaticReconnect;@Value("${mqtt.cleanSession}")private Boolean cleanSession;@Value("${mqtt.connectionTimeout}")private Integer connectionTimeout;@Value("${mqtt.maxInflight}")private Integer maxInflight;
}
創建客戶端配置類
@Configuration
@IntegrationComponentScan
public class MqttConfig {@Autowiredprivate MqttConfigProperties mqttConfigProperties;@Beanpublic MqttConnectOptions mqttConnectOptions() {log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfigProperties.getUsername());options.setPassword(mqttConfigProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());options.setCleanSession(mqttConfigProperties.getCleanSession());options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());options.setMaxInflight(mqttConfigProperties.getMaxInflight());return options;}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(mqttConnectOptions);return factory;}// 推送通道@Beanpublic MessageChannel mqttOutputChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutputChannel")public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);messageHandler.setAsync(true);messageHandler.setDefaultQos(1);messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());log.info("初始化mqttOutputChannel...");return messageHandler;}}
發送網關接口
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {/*** 發送消息** @param topic* @param data*/void send(@Header(MqttHeaders.TOPIC) String topic, String data);
}
這樣,在發送消息時,直接將消息網關注入,調用發送方法就可以發送了
mqttGateway.send(topic, JSONObject.toJSONString(msg));
參考:
https://mqtt.org/