MQTT 是機器對機器(M2M)/物聯網(IoT)連接協議。它被設計為一個極其輕量級的發布/訂閱消息傳輸協議,對于需要較小代碼占用空間和/或網絡帶寬非常寶貴的遠程連接非常有用,是專為受限設備和低帶寬、高延遲或不可靠的網絡而設計。這些原則也使該協議成為新興的“機器到機器”(M2M)或物聯網(IoT)世界的連接設備,以及帶寬和電池功率非常寶貴的移動應用的理想選擇。例如,它已被用于通過衛星鏈路與代理通信的傳感器、與醫療服務提供者的撥號連接,以及一系列家庭自動化和小型設備場景。它也是移動應用的理想選擇,因為它體積小,功耗低,數據包最小,并且可以有效地將信息分配給一個或多個接收器。MQTT 通信模型如下圖所示:
前提是需要先在電腦上運行MQTT服務器,emqx軟件非常合適,它還自帶MQTT客戶端,方便調試。具體下載和安裝方法見下面的博文,也可以去emqx官網下載最新版免費試用。
MQTT:windows最簡單搭建mqtt服務端及本地客戶端測試_emqx-windows-4.3.6-CSDN博客
修改網絡參數
在Hi3861開發板上運行上述四個測試程序之前,需要根據你的無線路由、Linux系統IP修改 net_params.h文件的相關代碼:
- PARAM_HOTSPOT_SSID 修改為你的熱點名稱
- PARAM_HOTSPOT_PSK 修改為你的熱點密碼;
- PARAM_SERVER_ADDR 修改為你的服務器IP地址;
- PARAM_SERVER_PORT 修改為你的服務器端口號;
- SERVER_IP_ADDR 修改為你的mqtt服務器IP地址;
- SERVER_IP_PORT 修改為mqtt的tcp端口,默認為1883;
- MQTT_TOPIC_SUB 修改為要訂閱的主題(Topic)名稱;
- MQTT_TOPIC_PUB 修改為要發布的主題(Topic)名稱;
- MQTT_CLIENT_ID MQTT的客戶端ID;
- MQTT_USER_NAME MQTT的用戶名;
- MQTT_PASSWORD MQTT的密碼;
代碼編寫
修改D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\BUILD.gn文件
# Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import("//build/lite/config/component/lite_component.gni")

# Demo component. Only the network_15_mqtt example is enabled; every other
# entry in the list is commented out.
lite_component("demo") {
  features = [
    #"base_00_helloworld:base_helloworld_example",
    #"base_01_led:base_led_example",
    #"base_02_loopkey:base_loopkey_example",
    #"base_03_irqkey:base_irqkey_example",
    #"base_04_adc:base_adc_example",
    #"base_05_pwm:base_pwm_example",
    #"base_06_ssd1306:base_ssd1306_example",
    #"kernel_01_task:kernel_task_example",
    #"kernel_02_timer:kernel_timer_example",
    #"kernel_03_event:kernel_event_example",
    #"kernel_04_mutex:kernel_mutex_example",
    #"kernel_05_semaphore_as_mutex:kernel_semaphore_as_mutex_example",
    #"kernel_06_semaphore_for_sync:kernel_semaphore_for_sync_example",
    #"kernel_07_semaphore_for_count:kernel_semaphore_for_count_example",
    #"kernel_08_message_queue:kernel_message_queue_example",
    #"wifi_09_hotspot:wifi_hotspot_example",
    #"wifi_10_sta:wifi_sta_example",
    #"tcp_11_server:tcp_server_example",
    #"tcp_12_client:tcp_client_example",
    #"udp_13_server:udp_server_example",
    #"udp_14_client:udp_client_example",
    "network_15_mqtt:network_mqtt_example",
  ]
}
創建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt文件夾
文件夾中創建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\BUILD.gn文件
# Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Static library for the MQTT example: the example entry point, the MQTT
# client wrapper, the Wi-Fi connection helper and the Paho MQTTPacket sources.
static_library("network_mqtt_example") {
  sources = [
    "network_mqtt_example.c",
    "network_mqtt.c",
    "wifi_connecter.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTConnectClient.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTConnectServer.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTDeserializePublish.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTFormat.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTPacket.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSerializePublish.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSubscribeClient.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSubscribeServer.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTUnsubscribeServer.c",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTUnsubscribeClient.c",
  ]

  include_dirs = [
    "//utils/native/lite/include",
    "//kernel/liteos_m/kal/cmsis",
    "//base/iot_hardware/peripheral/interfaces/kits",
    "//foundation/communication/wifi_lite/interfaces/wifiservice",

    # NOTE(review): this tutorial places the demo under //vendor/rtplay/rt_hi3861;
    # confirm that the hqyj path below exists in your source tree.
    "//vendor/hqyj/fs_hi3861/common/bsp/include",
    "//third_party/paho.mqtt.embedded-c/MQTTPacket/src",
    "//third_party/cJSON",
  ]
}
文件夾中創建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\network_mqtt.h文件,文件主要包含mqtt的函數。
/*
 * Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef NETWORK_MQTT_H
#define NETWORK_MQTT_H

#include <stdint.h> /* int8_t, used by the receive callback type below */

/**
 * @brief Open a TCP connection to the MQTT server.
 * @param ip_addr Server IPv4 address in dotted-decimal text form.
 * @param ip_port Server TCP port.
 * @return 0 on success; -1 on failure.
 */
int MQTTClient_connectServer(const char *ip_addr, int ip_port);

/**
 * @brief Close the TCP connection to the MQTT server.
 * @return 0 on success; -1 on failure.
 */
int MQTTClient_unConnectServer(void);

/**
 * @brief Subscribe to an MQTT topic.
 * @param subTopic Name of the topic to subscribe to.
 * @return 0 on success; -1 on failure.
 */
int MQTTClient_subscribe(char *subTopic);

/**
 * @brief Initialise the MQTT client by sending CONNECT and waiting for CONNACK.
 * @param clientID Client ID.
 * @param userName User name.
 * @param password Password.
 * @return 0 on success; -1 on failure.
 */
int MQTTClient_init(char *clientID, char *userName, char *password);

/**
 * @brief Publish a message.
 * @param pub_Topic   Name of the topic to publish to.
 * @param payloadData Data to publish.
 * @param payloadLen  Length of the data in bytes.
 * @return 0 on success; -1 on failure.
 */
int MQTTClient_pub(char *pub_Topic, unsigned char *payloadData, int payloadLen);

/**
 * @brief Read one incoming packet; a received PUBLISH is forwarded to
 *        p_MQTTClient_sub_callback.
 * @return Documented contract: 0 on success; -1 on failure.
 */
int MQTTClient_sub(void);

/**
 * Callback invoked by MQTTClient_sub() with the topic and payload of a
 * received message. The application assigns it before receiving.
 */
extern int8_t (*p_MQTTClient_sub_callback)(unsigned char *topic, unsigned char *payload);

#endif // !NETWORK_MQTT_H
文件夾中創建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\network_mqtt.c文件,文件主要包含mqtt的函數實現。
/** Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd* Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include "lwip/netifapi.h"
#include "lwip/sockets.h"
#include "wifi_device.h"
#include "ohos_init.h"
#include "MQTTPacket.h"
#include "network_mqtt.h"

#define MQTT_BUFF_MAX_SIZE 512

// TCP socket descriptor shared by the send/receive/close helpers in this file
int g_tcp_socket_fd = 0;
// Shared buffer used to serialize outgoing and parse incoming MQTT packets
unsigned char mqttBuff[MQTT_BUFF_MAX_SIZE] = {0};

// Send network data
/**
 * Transmit a buffer on the shared TCP socket (g_tcp_socket_fd).
 *
 * @param buf    Bytes to send.
 * @param buflen Number of bytes in buf.
 * @return 1 when send() reports a positive byte count, 0 otherwise.
 */
static int transport_sendPacketBuffer(unsigned char *buf, int buflen)
{
    int sent = send(g_tcp_socket_fd, buf, buflen, 0);

    if (sent > 0) {
        return 1;
    }
    return 0;
}
// Receive network data
/**
 * Read up to count bytes from the shared TCP socket (g_tcp_socket_fd).
 * Passed to MQTTPacket_read() as its data-fetch callback.
 *
 * @param buf   Destination buffer.
 * @param count Maximum number of bytes to read.
 * @return The value returned by recv(), unchanged.
 */
static int transport_getdata(unsigned char *buf, int count)
{
    return recv(g_tcp_socket_fd, buf, count, 0);
}

// Connect to the server
int MQTTClient_connectServer(const char *ip_addr, int ip_port)
{if (ip_addr == NULL) {return -1;}int res = 0; // 函數返回值struct sockaddr_in tcpServerConfig; // tcp服務器信息// 創建TCP套接字g_tcp_socket_fd = socket(AF_INET, SOCK_STREAM, 0);if (g_tcp_socket_fd < 0) {printf("Failed to create Socket\r\n");}// 連接TCP服務器tcpServerConfig.sin_family = AF_INET; // IPV4tcpServerConfig.sin_port = htons(ip_port); // 填寫服務器的IP端口號tcpServerConfig.sin_addr.s_addr = inet_addr(ip_addr); // 填寫服務器的IP地址res = connect(g_tcp_socket_fd, (struct sockaddr *)&tcpServerConfig, sizeof(tcpServerConfig)); // 連接服務器if (res == -1) {printf("Failed to connect to the server\r\n");return -1;}printf("Connection to server successful\r\n");return 0;
}// 斷開TCP服務器 0:成功, -1:失敗
/**
 * Close the shared TCP socket and reset g_tcp_socket_fd to 0, the value it
 * is initialised with.
 *
 * @return The result of close(): 0 on success, -1 on failure.
 */
int MQTTClient_unConnectServer(void)
{
    int ret = close(g_tcp_socket_fd);
    g_tcp_socket_fd = 0;

    // Report success only when close() actually succeeded.
    if (ret != 0) {
        printf("Failed to shut down the server connection\r\n");
        return ret;
    }
    printf("Server shut down successfully\r\n");
    return ret;
}

// MQTT client: subscribe to a topic
int MQTTClient_subscribe(char *subTopic)
{if (subTopic == NULL) {printf("Incorrect parameters\r\n");return -1;}int len = 0, res = 0;int subcount = 0, granted_qos = 0, req_qos = 0;unsigned short submsgid = 0;MQTTString topicString = MQTTString_initializer;/* subscribe */topicString.cstring = subTopic;len = MQTTSerialize_subscribe(mqttBuff, sizeof(mqttBuff), 0, 1, 1, &topicString, &req_qos);if (len <= 0) {printf("MQTTSerialize_subscribe Error %d\r\n", len);return -1;}res = transport_sendPacketBuffer(mqttBuff, len);if (res != 1) {printf("transport_sendPacketBuffer Error %d\r\n", res);return -1;}sleep(1);memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));/* wait for suback */if (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) != SUBACK) {printf("MQTTPacket_read Error\r\n");return -1;}if (MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, mqttBuff, sizeof(mqttBuff)) != 1) {printf("MQTTDeserialize_suback Error\r\n");return -1;}printf("MQTT subscribed to topics successfully\r\n");return 0;
}// 保持在線時長 60s
#define MQTT_KEEP_ALIVE 60
#define MQTT_DELAY_TIME 3// mqtt客戶端 初始化
int MQTTClient_init(char *clientID, char *userName, char *password)
{if (clientID == NULL || userName == NULL || password == NULL) {printf("Incorrect parameters\r\n");return -1;}int res = 0, len = 0, i = 0;int mqtt_read_len = 10;unsigned char sessionPresent = 0, connack_rc = 0;MQTTPacket_connectData mqttData = MQTTPacket_connectData_initializer;// 初始化MQTT客戶端mqttData.clientID.cstring = clientID;mqttData.username.cstring = userName;mqttData.password.cstring = password;mqttData.cleansession = true; // 是否初始化的時候,清除上一次的對話mqttData.keepAliveInterval = MQTT_KEEP_ALIVE;// 組MQTT消息包len = MQTTSerialize_connect(mqttBuff, sizeof(mqttBuff), &mqttData);if (len <= 0) {printf("MQTTSerialize_connect Error %d\r\n", res);return -1;}res = transport_sendPacketBuffer(mqttBuff, len);if (res != 1) {printf("transport_sendPacketBuffer Error %d\r\n", res);return -1;}sleep(MQTT_DELAY_TIME);/* 打印發送出去的數據幀,調試用 */printf("MQTT_sendPacket: \r\n");for (i = 0; i < len; i++) {printf("%x ", mqttBuff[i]);}printf("\r\n");memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));/* wait for connack */if (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) != CONNACK) {printf("MQTTPacket_read != CONNACK\r\n");}printf("MQTT_recvPacket: \r\n");/* 打印服務器返回的消息,調試用 */for (i = 0; i < mqtt_read_len; i++) {printf("%x ", mqttBuff[i]);}printf("\r\n");if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, mqttBuff, sizeof(mqttBuff)) != 1 || connack_rc != 0) {printf("Unable to connect, return code %d\r\n", connack_rc);memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));return -1;} else {printf("MQTT initialized successfully\r\n");}memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));return 0;
}#define MQTT_PUB_DATA_TIME (100 * 1000)int MQTTClient_pub(char *pub_Topic, unsigned char *payloadData, int payloadLen)
{if (payloadData == NULL) {printf("Incorrect parameters\r\n");return -1;}printf("pubTopic: %s\n", pub_Topic);printf("pubData: %s\n", payloadData);int ret = 0, len = 0;unsigned short retry_count = 5; // 重發次數unsigned char sendBuff[MQTT_BUFF_MAX_SIZE] = {0};MQTTString topicString = MQTTString_initializer;topicString.cstring = pub_Topic;len = MQTTSerialize_publish(sendBuff, sizeof(sendBuff), 0, 0, 0, 0, topicString,payloadData,payloadLen);while (--retry_count > 0) {ret = transport_sendPacketBuffer(sendBuff, len);if (ret == 1) {break;}printf("Send MQTT_Data Fail\r\n");usleep(MQTT_PUB_DATA_TIME);}if (!retry_count && ret != 1) {printf("transport_sendPacketBuffer Error %d\r\n", ret);return -1;}// printf("send==>%s", payloadData);return 0;
}
unsigned char mqtt_topic[200];
int8_t (*p_MQTTClient_sub_callback)(unsigned char *topic, unsigned char *payload);int MQTTClient_sub(void)
{int qos, payloadlen_in;unsigned char dup, retained;unsigned short msgid;unsigned char *payload_in;MQTTString receivedTopic;memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));// $oc/devices/63ad5a6cc4efcc747bd75973_lamp/sys/commands/request_id=42c20ffb-0885-4f6e-97b5-45d8f613efafif (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) == PUBLISH) {MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,&payload_in, &payloadlen_in, mqttBuff, sizeof(mqttBuff));printf("data: %s\n", receivedTopic.lenstring.data);printf("length: %d\n", strlen(receivedTopic.lenstring.data) - payloadlen_in);printf("payload_length: %d\n", payloadlen_in);memcpy_s(mqtt_topic, sizeof(mqtt_topic),receivedTopic.lenstring.data, strlen(receivedTopic.lenstring.data) - payloadlen_in);printf("topic: %s\n", mqtt_topic);printf("payload: %s\n", payload_in);p_MQTTClient_sub_callback(mqtt_topic, payload_in);}
}
文件夾中創建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\wifi_connecter.h文件,該頭文件包含wifi連接的宏。文件同tcp_12_client\wifi_connecter.h
文件夾中創建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\wifi_connecter.c文件,文件同tcp_12_client\wifi_connecter.c
文件夾中創建D:\DevEcoProjects\test\src\vendor\rtplay\rt_hi3861\demo\network_15_mqtt\network_mqtt_example.c文件
/** Copyright (c) 2023 Beijing HuaQing YuanJian Education Technology Co., Ltd* Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/#include <stdio.h>
#include <unistd.h>
#include <string.h>

#include "ohos_init.h"
#include "cmsis_os2.h"
#include "network_mqtt.h"
#include "wifi_connecter.h"

#define SERVER_IP_ADDR "192.168.137.1"
#define SERVER_IP_PORT 1883
#define MQTT_TOPIC_SUB "subTopic"
#define MQTT_TOPIC_PUB "pubTopic"
#define MQTT_CLIENT_ID "mqtt_client_123"
#define MQTT_USER_NAME "rtplay"
#define MQTT_PASSWORD "password"
#define TASK_STACK_SIZE (1024 * 5)
#define TASK_INIT_TIME 2 // s
#define MQTT_RECV_TASK_TIME (200 * 1000) // us
#define DELAY_TICKS_10 (10)

osThreadId_t mqtt_send_task_id; // task that connects and publishes data
osThreadId_t mqtt_recv_task_id; // task that receives subscribed data
/**
 * Receive callback: print the topic and payload of an incoming message.
 *
 * @param topic   NUL-terminated topic name (may be NULL).
 * @param payload NUL-terminated payload (may be NULL).
 * @return Always 0.
 */
int8_t mqtt_sub_payload_callback(unsigned char *topic, unsigned char *payload)
{
    const char *topicText = (topic != NULL) ? (const char *)topic : "(null)";
    const char *payloadText = (payload != NULL) ? (const char *)payload : "(null)";

    printf("[info] topic:[%s] recv<== %s\r\n", topicText, payloadText);
    return 0; // the function is declared int8_t, so it must return a value
}
/**
 * Receive task body: poll for incoming MQTT messages forever, pausing
 * MQTT_RECV_TASK_TIME microseconds between polls.
 */
void mqtt_recv_task(void)
{
    for (;;) {
        MQTTClient_sub();
        usleep(MQTT_RECV_TASK_TIME);
    }
}
void mqtt_send_task(void)
{// 連接WifiWifiDeviceConfig config = {0};// 準備AP的配置參數// strcpy(config.ssid, PARAM_HOTSPOT_SSID);// strcpy(config.preSharedKey, PARAM_HOTSPOT_PSK);strcpy_s(config.ssid, WIFI_MAX_SSID_LEN, PARAM_HOTSPOT_SSID);strcpy_s(config.preSharedKey, WIFI_MAX_KEY_LEN, PARAM_HOTSPOT_PSK);config.securityType = PARAM_HOTSPOT_TYPE;osDelay(DELAY_TICKS_10);int netId = ConnectToHotspot(&config);// 連接MQTT服務器if (MQTTClient_connectServer(SERVER_IP_ADDR, SERVER_IP_PORT) != 0) {printf("[error] MQTTClient_connectServer\r\n");} else {printf("[success] MQTTClient_connectServer\r\n");}sleep(TASK_INIT_TIME);// 初始化MQTT客戶端if (MQTTClient_init(MQTT_CLIENT_ID, MQTT_USER_NAME, MQTT_PASSWORD) != 0) {printf("[error] MQTTClient_init\r\n");} else {printf("[success] MQTTClient_init\r\n");}sleep(TASK_INIT_TIME);// 訂閱Topicif (MQTTClient_subscribe(MQTT_TOPIC_SUB) != 0) {printf("[error] MQTTClient_subscribe\r\n");} else {printf("[success] MQTTClient_subscribe\r\n");}sleep(TASK_INIT_TIME);osThreadAttr_t options;options.name = "mqtt_recv_task";options.attr_bits = 0;options.cb_mem = NULL;options.cb_size = 0;options.stack_mem = NULL;options.stack_size = TASK_STACK_SIZE;options.priority = osPriorityNormal;mqtt_recv_task_id = osThreadNew((osThreadFunc_t)mqtt_recv_task, NULL, &options);if (mqtt_recv_task_id != NULL) {printf("ID = %d, Create mqtt_recv_task_id is OK!\r\n", mqtt_recv_task_id);}while (1) {MQTTClient_pub(MQTT_TOPIC_PUB, "hello world!!!", strlen("hello world!!!"));sleep(TASK_INIT_TIME);}
}static void network_wifi_mqtt_example(void)
{printf("Enter network_wifi_mqtt_example()!\r\n");p_MQTTClient_sub_callback = &mqtt_sub_payload_callback;osThreadAttr_t options;options.name = "mqtt_send_task";options.attr_bits = 0;options.cb_mem = NULL;options.cb_size = 0;options.stack_mem = NULL;options.stack_size = TASK_STACK_SIZE;options.priority = osPriorityNormal;mqtt_send_task_id = osThreadNew((osThreadFunc_t)mqtt_send_task, NULL, &options);if (mqtt_send_task_id != NULL) {printf("ID = %d, Create mqtt_send_task_id is OK!\r\n", mqtt_send_task_id);}
}
SYS_RUN(network_wifi_mqtt_example);
使用build,編譯成功后,使用upload進行燒錄。
首先運行emqx
設置中文
運行后電腦ap會看到設備
串口會輸出
網頁中客戶端會看到模塊
主題也會看到
創建websocket連接服務
客戶端已經有了兩個
訂閱模塊的消息
可以收到模塊發送的消息
發送模塊訂閱的消息
模塊能收到消息