六、MQTT源碼簡單瀏覽

1、MQTT程序分層

1.1、MQTT客戶端工作流程

(1)連接MQTT服務器。

(2)客戶端向服務器發送訂閱主題。

(3)客戶端等待MQTT的消息。

(4)客戶端向服務器發送消息。

2.2、MQTT程序結構

  • APP層
    • while循環或一個進程中:等待消息,處理消息;
      發送消息(如檢測到著火,向服務端發送消息)
  • 協議層:MQTT(或其他的SSH、FTP)
    • MQTT的內部實現
  • 驅動層
    • MQTT把這一層看作平臺,會提供多線程、定時器(涉及心跳包)、網卡收發
    • 提供相應網絡模塊的驅動程序
    • 移植一個操作系統(FreeRTOS、RTthreed、Linux)

2、源碼瀏覽

從示例中emqx平臺的代碼開始分析,主要包括以下方面

  • 連接服務器
  • 創建線程
  • 發布消息
  • 訂閱消息
  • 接收訂閱的消息并處理

2.1、連接服務器

(1)打開mqttclient\example\emqx\emqx.c文件。

(2)瀏覽main函數。

(3)函數調用過程

main()client = mqtt_lease();                       // 客戶端結構體的內存分配mqtt_set_port(client, "1883");               // 設置要連接的服務器的端口mqtt_set_host(client, "120.25.213.14");      // 設置要連接的服務器IPmqtt_connect(client);                        // 服務器連接 mqtt_connect_with_results(c);            // 以阻塞模式連接服務器,等待連接結果// 網絡初始化rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);// 網絡連接rc = network_connect(c->mqtt_network);// nettype:網絡類型;TCP連接nettype_tcp_connect(n);    // 這個需要程序自己提供,平臺相關函數platform_net_socket_connect();

(4)platform_net_socket_connect()函數功能及內容

  • xx
  • xx

2.2、創建發布消息線程

(1)打開mqttclient\example\emqx\emqx.c文件。

(2)瀏覽main函數。

(3)函數調用過程

mainres = pthread_create(&thread1, NULL, mqtt_publish_thread, client);mqtt_publish_thread();        // 發布消息線程函數// 1、構造要發送的消息 mqtt_message_t msg;memset(&msg, 0, sizeof(msg));msg.payload = (void *) buf;mqtt_publish(client, "topic1", &msg);  // 發布消息// 2、根據平臺相關的函數發送數據包mqtt_send_packet();network_write();nettype_tcp_write();// 這個函數需要自己提供,平臺相關函數platform_net_socket_write_timeout();

(4)platform_net_socket_write_timeout()函數功能及內容:

  • xx
  • xx

2.3、mqtt_yield_thread線程

  • 接收訂閱的消息
  • 發送心跳包
  • 處理錯誤

(1)打開mqttclient\example\emqx\emqx.c文件。

(2)瀏覽main函數。

(3)核心調用過程

mainmqtt_connect(client);             // 服務器連接mqtt_connect_with_results(c); // 以阻塞模式連接服務器,等待連接結果 // 網絡初始化rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);// 網絡連接rc = network_connect(c->mqtt_network);/* send connect packet */// 發送連接包if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)goto exit;// 等待回應if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {}/* connect success, and need init mqtt thread */// 連接成功就初始化線程mqtt_yield_threadc->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, ...);

2.4、處理訂閱消息函數

(1)打開mqttclient\example\emqx\emqx.c文件。

(2)瀏覽main函數。

  • 訂閱消息函數: mqtt_subscribe();
// 訂閱消息且指定處理函數為topic1_handler
mqtt_subscribe(client, "topic1", QOS0, topic1_handler);// 沒有指定處理函數的會調用默認處理函數
mqtt_subscribe(client, "topic2", QOS1, NULL);
mqtt_subscribe(client, "topic3", QOS2, NULL);

(3)訂閱消息處理函數(default_msg_handler)所在位置:

// int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
mqtt_subscribe(client, "topic2", QOS1, NULL); // 訂閱主題topic2// 定義消息處理結構體,結構體內容見下段代碼message_handlers_t *msg_handler = NULL;   // 如果未指定handler(處理消息的函數指針),則使用默認的處理程序if (NULL == handler)handler = default_msg_handler; // 將消息處理結構體記錄到一個鏈表中:包含主題是啥,接收訂閱消息處理函數是啥msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);  

消息處理結構體

typedef struct message_handlers {mqtt_list_t         list;mqtt_qos_t          qos;const char*         topic_filter;   // 記錄消息的主題message_handler_t   handler;        // 函數指針,指向處理消息的函數
} message_handlers_t;

2.5、訂閱消息的接收

(1)因為消息何時到來是不知道的,所以消息的接收是放在線程中不斷查詢的。(或者在中斷中接收到去通知線程)

(2)找到mqtt_yield_thread線程函數。

(3)消息接收到調用消息處理函數的流程:

mqtt_yield_thread()  // 線程函數while(1){rc = mqtt_yield(c, c->mqtt_cmd_timeout);// 處理MQTT報文rc = mqtt_packet_handle(c, &timer);// 讀取MQTT報文rc = mqtt_read_packet(c, &packet_type, timer);// 根據報文類型調用如下函數rc = mqtt_publish_packet_handle(c, timer);  // 服務器發布的消息mqtt_deliver_message(c, &topic_name, &msg);// 獲取MQTT消息處理程序msg_handler = mqtt_get_msg_handler(c, topic_name);// 傳遞消息給處理函數;參數:客戶端,消息數據    msg_handler->handler(c, &md);   }

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/713861.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/713861.shtml
英文地址,請注明出處:http://en.pswp.cn/news/713861.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

[法規規劃|方案實操]數據資產入表,城投將獲融資新渠道

2023年8月,財政部發布了《企業數據資源相關會計處理暫行規定》,并從2024年1月1日開始實施,標志著數據資產正式納入企業的資產負債表。這一舉措被視為數據資產從理論走向實踐的重大一步。 數據資產入表對城投運營模式的影響 隨著全球經濟格局…

Vue3速成

文章目錄 day 11. 創建vue3工程3. 響應式數據4. 計算屬性 day 25. watch 監視6. watchEffect7. 標簽的ref屬性8. 回顧TS中的接口_泛型_自定義類型 day 1 1. 創建vue3工程 相關代碼如下: ## 創建vue工程 npm create vuelastest## 安裝node_modules npm install //…

JSON 文件里的 “$schema” 是干什么用的?

最近我在做一些前端項目,我發現有的配置文件,比如 .prettierrc.json 或者 tsconfig.json 里面都會看到一個 $schema 字段,有點好奇,就查了一下。 什么是 JSON Schema JSON Schema是一種基于JSON (JavaScript Object Notation) 的…

【Leetcode】2369. 檢查數組是否存在有效劃分

文章目錄 題目思路代碼結果 題目 題目鏈接 給你一個下標從 0 開始的整數數組 nums ,你必須將數組劃分為一個或多個 連續 子數組。 如果獲得的這些子數組中每個都能滿足下述條件 之一 ,則可以稱其為數組的一種 有效 劃分: 子數組 恰 由 2 個…

MATLAB算法實戰應用案例精講-【圖像處理】三維重建(最終篇)

目錄 前言 相機定標和三維重建 針孔相機模型和變形 三維成像 一、機器視覺系統組成

大數據智能化-長視頻領域

隨著數字化時代的到來,長視頻領域的發展迎來了新的機遇和挑戰。在這一背景下,大數據智能化技術的應用成為長視頻行業提升用戶體驗、優化運營管理的重要手段之一。本文將從優愛騰3大長視頻背景需求出發,分析靜態資源CDN、視頻文件存儲與分發、…

網絡安全、信息安全、計算機安全,有何區別?

這三個概念都存在,一般人可能會混為一談。 究竟它們之間是什么關系?并列?交叉? 可能從廣義上來說它們都可以用來表示安全security這樣一個籠統的概念。 但如果從狹義上理解,它們應該是有區別的,區別在哪呢&…

力扣hot100題解(python版36-40題)

36、二叉樹的中序遍歷 給定一個二叉樹的根節點 root ,返回 它的 *中序 遍歷* 。 示例 1: 輸入:root [1,null,2,3] 輸出:[1,3,2]示例 2: 輸入:root [] 輸出:[]示例 3: 輸入&am…

tcping實用小工具

Tcping實用小工具命令詳解 一、tcping介紹 tcping:tcping命令基于tcp協議監控,可以從較低級別的協議獲得簡單的,可能不可靠的數據報服務。 原則上,TCP應該能夠在從容硬線連接到分組交換或電路交換網絡的各種通信系統之上操作。 …

【機器學習基礎】層次聚類-BIRCH聚類

🚀個人主頁:為夢而生~ 關注我一起學習吧! 💡專欄:機器學習 歡迎訂閱!相對完整的機器學習基礎教學! ?特別提醒:針對機器學習,特別開始專欄:機器學習python實戰…

企業微信私有部署:實現高效溝通與信息安全

隨著移動互聯網的快速發展,企業微信作為一種高效、便捷的通訊工具,已經成為了眾多企業的首選。然而,對于一些對信息安全有特殊要求的大型企業而言,使用公有版企業微信并不能滿足其安全需求。因此,企業微信私有部署應運…

matplotlib.animation 3d姿態動畫

目錄 演示效果: 演示代碼: 保存為gif 演示效果: 演示代碼: import numpy as np import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D from matplotlib.animation import FuncAnimation# 定義人體關鍵點…

【c++入門】純粹的五位偶數

說明 純粹偶數指的是一個數的各個位都是偶數的數,比如:24686;請編程求出10000~n中,所有的五位的純粹偶數有多少個? 輸入數據 一個整數n(n為一個5位的整數) 輸出數據 一個整數,代…

網絡防御第6次作業

防病毒網關 按照傳播方式分類 病毒 病毒是一種基于硬件和操作系統的程序,具有感染和破壞能力,這與病毒程序的結構有關。病毒攻擊的宿主程序是病毒的棲身地,它是病毒傳播的目的地,又是下一次感染的出發點。計算機病毒感染的一般過…

Java基礎 - Stream 流:Stream API的中間操作

在上一篇博客中,我介紹了構建 Stream 流的多種方式,以及 Stream 流的特點和優勢。如果你還沒有閱讀,你可以點擊這里查看。 Java基礎 - Stream 流:構建流的多種方式 在這篇博客中,我將探索 Stream API 的中間操作&…

動態規劃(算法競賽、藍橋杯)--分組背包DP

1、B站視頻鏈接&#xff1a;E16 背包DP 分組背包_嗶哩嗶哩_bilibili #include <bits/stdc.h> using namespace std; const int N110; int v[N][N],w[N][N],s[N]; // v[i,j]:第i組第j個物品的體積 s[i]:第i組物品的個數 int f[N][N]; // f[i,j]:前i組物品&#xff0c;能放…

學習JavaEE的日子 Day21 枚舉

Day21 1.枚舉的引入 需求&#xff1a;編寫季節類&#xff08;Season&#xff09;&#xff0c;該類只有四個對象&#xff08;spring&#xff0c;summer&#xff0c;autumn&#xff0c;winter&#xff09; 概念&#xff1a;枚舉&#xff08;enum&#xff09;全稱為 enumeration&…

基帶信號處理設計原理圖:2-基于6U VPX的雙TMS320C6678+Xilinx FPGA K7 XC7K420T的圖像信號處理板

基于6U VPX的雙TMS320C6678Xilinx FPGA K7 XC7K420T的圖像信號處理板 綜合圖像處理硬件平臺包括圖像信號處理板2塊&#xff0c;視頻處理板1塊&#xff0c;主控板1塊&#xff0c;電源板1塊&#xff0c;VPX背板1塊。 一、板卡概述 圖像信號處理板包括2片TI 多核DSP處理…

Linux進程管理:(二)進程調度原語

文章說明&#xff1a; Linux內核版本&#xff1a;5.0 架構&#xff1a;ARM64 參考資料及圖片來源&#xff1a;《奔跑吧Linux內核》 Linux 5.0內核源碼注釋倉庫地址&#xff1a; zhangzihengya/LinuxSourceCode_v5.0_study (github.com) 進程調度的概念比較簡單&#xff0c…

Java學習筆記NO.17

T1&#xff1a;合并兩個排序好的整數數組 import java.util.Arrays;public class MergeSortedArrays {public static int[] mergeArrays(int[] arr1, int[] arr2) {int[] result new int[arr1.length arr2.length];int i 0, j 0, k 0;while (i < arr1.length &&am…