1、MQTT程序分層
1.1、MQTT客戶端工作流程
(1)連接MQTT服務器。
(2)客戶端向服務器發送訂閱主題。
(3)客戶端等待MQTT的消息。
(4)客戶端向服務器發送消息。
2.2、MQTT程序結構
- APP層
- while循環或一個進程中:等待消息,處理消息;
發送消息(如檢測到著火,向服務端發送消息)
- while循環或一個進程中:等待消息,處理消息;
- 協議層:MQTT(或其他的SSH、FTP)
- MQTT的內部實現
- 驅動層
- MQTT把這一層看作平臺,會提供多線程、定時器(涉及心跳包)、網卡收發
- 提供相應網絡模塊的驅動程序
- 移植一個操作系統(FreeRTOS、RTthreed、Linux)
2、源碼瀏覽
從示例中emqx平臺的代碼開始分析,主要包括以下方面
- 連接服務器
- 創建線程
- 發布消息
- 訂閱消息
- 接收訂閱的消息并處理
2.1、連接服務器
(1)打開mqttclient\example\emqx\emqx.c文件。
(2)瀏覽main函數。
(3)函數調用過程
main()client = mqtt_lease(); // 客戶端結構體的內存分配mqtt_set_port(client, "1883"); // 設置要連接的服務器的端口mqtt_set_host(client, "120.25.213.14"); // 設置要連接的服務器IPmqtt_connect(client); // 服務器連接 mqtt_connect_with_results(c); // 以阻塞模式連接服務器,等待連接結果// 網絡初始化rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);// 網絡連接rc = network_connect(c->mqtt_network);// nettype:網絡類型;TCP連接nettype_tcp_connect(n); // 這個需要程序自己提供,平臺相關函數platform_net_socket_connect();
(4)platform_net_socket_connect()函數功能及內容
- xx
- xx
2.2、創建發布消息線程
(1)打開mqttclient\example\emqx\emqx.c文件。
(2)瀏覽main函數。
(3)函數調用過程
mainres = pthread_create(&thread1, NULL, mqtt_publish_thread, client);mqtt_publish_thread(); // 發布消息線程函數// 1、構造要發送的消息 mqtt_message_t msg;memset(&msg, 0, sizeof(msg));msg.payload = (void *) buf;mqtt_publish(client, "topic1", &msg); // 發布消息// 2、根據平臺相關的函數發送數據包mqtt_send_packet();network_write();nettype_tcp_write();// 這個函數需要自己提供,平臺相關函數platform_net_socket_write_timeout();
(4)platform_net_socket_write_timeout()函數功能及內容:
- xx
- xx
2.3、mqtt_yield_thread線程
- 接收訂閱的消息
- 發送心跳包
- 處理錯誤
(1)打開mqttclient\example\emqx\emqx.c文件。
(2)瀏覽main函數。
(3)核心調用過程
mainmqtt_connect(client); // 服務器連接mqtt_connect_with_results(c); // 以阻塞模式連接服務器,等待連接結果 // 網絡初始化rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);// 網絡連接rc = network_connect(c->mqtt_network);/* send connect packet */// 發送連接包if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)goto exit;// 等待回應if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {}/* connect success, and need init mqtt thread */// 連接成功就初始化線程mqtt_yield_threadc->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, ...);
2.4、處理訂閱消息函數
(1)打開mqttclient\example\emqx\emqx.c文件。
(2)瀏覽main函數。
- 訂閱消息函數: mqtt_subscribe();
// 訂閱消息且指定處理函數為topic1_handler
mqtt_subscribe(client, "topic1", QOS0, topic1_handler);// 沒有指定處理函數的會調用默認處理函數
mqtt_subscribe(client, "topic2", QOS1, NULL);
mqtt_subscribe(client, "topic3", QOS2, NULL);
(3)訂閱消息處理函數(default_msg_handler)所在位置:
// int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
mqtt_subscribe(client, "topic2", QOS1, NULL); // 訂閱主題topic2// 定義消息處理結構體,結構體內容見下段代碼message_handlers_t *msg_handler = NULL; // 如果未指定handler(處理消息的函數指針),則使用默認的處理程序if (NULL == handler)handler = default_msg_handler; // 將消息處理結構體記錄到一個鏈表中:包含主題是啥,接收訂閱消息處理函數是啥msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);
消息處理結構體
typedef struct message_handlers {mqtt_list_t list;mqtt_qos_t qos;const char* topic_filter; // 記錄消息的主題message_handler_t handler; // 函數指針,指向處理消息的函數
} message_handlers_t;
2.5、訂閱消息的接收
(1)因為消息何時到來是不知道的,所以消息的接收是放在線程中不斷查詢的。(或者在中斷中接收到去通知線程)
(2)找到mqtt_yield_thread線程函數。
(3)消息接收到調用消息處理函數的流程:
mqtt_yield_thread() // 線程函數while(1){rc = mqtt_yield(c, c->mqtt_cmd_timeout);// 處理MQTT報文rc = mqtt_packet_handle(c, &timer);// 讀取MQTT報文rc = mqtt_read_packet(c, &packet_type, timer);// 根據報文類型調用如下函數rc = mqtt_publish_packet_handle(c, timer); // 服務器發布的消息mqtt_deliver_message(c, &topic_name, &msg);// 獲取MQTT消息處理程序msg_handler = mqtt_get_msg_handler(c, topic_name);// 傳遞消息給處理函數;參數:客戶端,消息數據 msg_handler->handler(c, &md); }