介紹
我們可以想象這么一個場景,我們java應用想要采集到電表a的每小時的用電信息,我們怎么拿到電表的數據?一般我們會想 直接 java 后臺發送請求給電表,然后讓電表返回數據就可以了,事實上,我們java應用發送請求請求電表的數據信息并不是發到電表上,而是發送到 服務端 (broker)上,請求服務器 給我們電表的信息,而電表會把數據 按照mqtt協議 源源不斷的發送到服務端,服務端可以把數據存儲到物聯網數據庫上,也可以由我們java應用手動存儲到物聯網數據庫上
而我們怎么知道電表發送到服務端的哪里,java應用又怎么請求到該電表發送的位置?
這就 引出了 一個概念,主題 (topic) ,這個topic在mqtt中不需要手動的創建,只要又客戶端訂閱或者發布消息,主題就會被自動創建出來
而我們服務端用的最多的就是 集成好的emqx服務器,本文我們也用的是集成好的emqx的服務端
,我們先是 一個電表 訂閱好一個固定的主題,然后 源源不斷的往服務端發消息,然后我們java應用訂閱這個主題,這樣 java應用就能持續的拿到電表的數據了
具體什么是主題,主題怎么設置的 ,mqtt協議的具體協議內容,直接登錄emqx官網查看即可
MQTT 最全教程:從入門到精通 | EMQ
而emqx服務器是怎么在linux系統上搭建的呢,具體直接看文檔即可,輸入文檔對應的yum命令就可以直接 在linux服務器上安裝了
在 CentOS/RHEL 上安裝 EMQX | EMQX文檔
文本主要書寫代碼的實現
代碼實現
我們的yml文件如下
?我們后續 java應用訂閱消息 都要到服務端 emqx 的1883端口
實體類如下
?
這里解釋一下 clientid 是不固定的,隨機的每一個發布/訂閱消息的客戶端都有一個唯一的clientid
而username 和password 是 客戶端連接到 服務端的認證賬戶,多個客戶端可以使用一個 賬號密碼
客戶端代碼實現
@Slf4j
@Component
@RequiredArgsConstructor
public class EMQXClient {
private final MqttDefaultProperties mqttDefaultProperties;
private final IMessageCallbackImpl mqttCallback;private IMqttClient mqttClient;/*** 初始化客戶端對象*/
public boolean initMqttClient(String clientId,String serverUrl) {MemoryPersistence memoryPersistence = new MemoryPersistence();try {if(Objects.isNull(clientId)){clientId= mqttDefaultProperties.getDefaultClientId();}if(Objects.isNull(serverUrl)){serverUrl= mqttDefaultProperties.getServerUrl();}mqttClient = new MqttClient(serverUrl, clientId, memoryPersistence);} catch (MqttException e) {log.info("mqtt創建異常:{}", e.getMessage());return false;}return true;
}public boolean initMqttClient() {MemoryPersistence memoryPersistence = new MemoryPersistence();try {mqttClient = new MqttClient(mqttDefaultProperties.getServerUrl(),mqttDefaultProperties.getDefaultClientId(), memoryPersistence);} catch (MqttException e) {log.info("mqtt創建異常:{}", e.getMessage());return false;}return true;}/*** 獲取連接* @return*/public boolean connect() {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();//當客戶端會話關閉的時候 對應的broker也關閉mqttConnectOptions.setCleanSession(true);//自動重連mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(mqttDefaultProperties.getDefaultUserName());mqttConnectOptions.setPassword(mqttDefaultProperties.getDefaultPassword().toCharArray());mqttClient.setCallback(mqttCallback);try {mqttClient.connect(mqttConnectOptions);} catch (MqttException e) {log.info("客戶端連接異常:{}",e.getMessage());return false;}return true;}public boolean connect(String username,String password) {if(Objects.isNull(username)){username=mqttDefaultProperties.getDefaultUserName();}if(Objects.isNull(password)){password= mqttDefaultProperties.getDefaultPassword();}MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();//當客戶端會話關閉的時候 對應的broker也關閉mqttConnectOptions.setCleanSession(true);//自動重連mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttClient.setCallback(mqttCallback);try {mqttClient.connect(mqttConnectOptions);} catch (MqttException e) {log.info("客戶端連接異常:{}",e.getMessage());return false;}return true;
}/*** 斷開連接* @return*/public boolean disConnect(){try {mqttClient.disconnect();} catch (MqttException e) {log.info("客戶端斷開連接異常:{}",e.getMessage());return false;}
return true;
}/**** @param topic 主題* @param msg 消息內容* @param qosEnum* @param retain 新的訂閱者來了是否能拿到之前的 最新的一次消息* @return*/public boolean publish(String topic, String msg, QosEnum qosEnum,boolean retain){int uniqueInt = (int) (System.nanoTime() & 0xFFFFFFFFL);//取納秒時間戳低32位MqttMessage mqttMessage = new MqttMessage();mqttMessage.setPayload(msg.getBytes());mqttMessage.setQos(qosEnum.getType());mqttMessage.setRetained(retain);mqttMessage.setId(uniqueInt);try {mqttClient.publish(topic,mqttMessage);} catch (MqttException e) {log.info("客戶端發送消息失敗:{}",e.getMessage());return false;}return true;}/**** @param topicFilters 要訂閱的主題 例子 testtopic/#* @param qosEnum* @return*/public boolean subscribe(String topicFilters,QosEnum qosEnum){try {mqttClient.subscribe(topicFilters,qosEnum.getType());} catch (MqttException e) {log.info("訂閱主題失敗:{}",e.getMessage());return false;}return true;}
public boolean unSubscribe(String topicFilter){try {mqttClient.unsubscribe( topicFilter);} catch (MqttException e) {log.info("取消訂閱主題失敗:{}",e.getMessage());return false;}return true;
}}
我們著重關注的是
我們想連接 服務端 是不是得有 一個client ,那這個client就對應IMqttclient
,我們java應用客戶端連接上服務端之后,是不是得訂閱主題,訂閱之后的邏輯在哪里,就在
IMessageCallbackImpl
這里面就是 書寫的 客戶端收到服務端發來的消息之后的處理情況
@Slf4j
@Component
public class IMessageCallbackImpl implements MessageCallback {@Overridepublic void connectionLost(Throwable cause) {//丟失對服務端的連接后觸發該方法回調,此處可以做一些特殊處理,比如重連 或者記錄 日志之類的log.info("丟失了對broker的連接");}/*** 訂閱到消息后的回調* 該方法由mqtt客戶端同步調用,在此方法未正確返回之前,不會發送ack確認消息到broker* 一旦該方法向外拋出了異常客戶端將異常關閉,當再次連接時;所有QoS1,QoS2且客戶端未進行ack確認的消息都將由broker服務器再次發送到客戶端* @param topic* @param message* @throws Exception*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {log.info("訂閱到了消息;topic={},messageid={},qos={},msg={}",topic,message.getId(),message.getQos(),new String(message.getPayload()));}/*** 消息發布完成且收到ack確認后的回調* QoS0:消息被服務端發出后觸發一次* QoS1:當收到broker的PUBACK消息后觸發* QoS2:當收到broer的PUBCOMP消息后觸發* @param token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {int messageId = token.getMessageId();String[] topics = token.getTopics();log.info("消息發送完成,messageId={},topics={}",messageId,topics);}
}
?
我們用一個 bean 在初始化的時候就訂閱一個主題,這樣 只要有 客戶端往主題上發消息,我們就能收到了
而我們這個時候 沒有硬件,怎么辦呢,很簡單,直接下載一個mqttx 模擬硬件發送消息到主題,啟動springboot,就能看到消息的發送與接收了
當然 這實現的緊緊是最簡單的協議的發送接收,后面還有許多的高級功能等我們使用,具體的可以查閱官方文檔