MQTT
文章目錄
- MQTT
- 安裝
- 簡介
- MQTT客戶端代碼
安裝
安裝Paho MQTT C庫:
sudo apt-get install libpaho-mqtt3-dev
頭文件包含:
#include "MQTTClient.h"
編譯選項:
gcc -o $@ $^ -lpaho-mqtt3c
簡介
MQTT協議全稱是(Message Queuing Telemetry Transport),即消息隊列遙測傳輸協議。
是一種基于發布/訂閱(Publish/Subscribe)模式的輕量級通訊協議,并且該協議構建于TCP/IP協議之上,TCP協議本身就具有高可靠性的特點,因此基于其上的MQTT協議同樣也是具有高可靠、低開銷的特點,之所以低開銷,是以為MQTT協議傳輸的最小的報文也只有兩個字節。
在物聯網開發中,MQTT不是唯一的選擇,與MQTT互相競爭的協議有XMPP和CoAP協議等。
關于發布和訂閱的概念我們拿抖音平臺來舉個例子,我們每一個用戶就都是一個客戶端,而抖音就是MQTT協議中的服務器,當我們(用戶一)關注某一個視頻發布者(用戶二)時,這樣一個關注的行為就可以理解為訂閱;同時用戶二也可以關注你,那么這就是相互訂閱。當用戶二發布作品的時候,這個作品是發布到了抖音平臺,也就是我們現在的服務器,這個過程就是消息的發布。
在這里需要注意的是:用戶二(客戶端)發布的消息并不是直接發布給了用戶一,而是發布到了抖音平臺(服務器),由于用戶一訂閱了用戶二的消息(相當于點了關注),所以抖音平臺(服務器)就會向用戶一推送這個消息(注意發布和推送的區別)。這就是MQTT協議訂閱&發布的一個簡單比喻。
實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。
MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分
- Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload)
- payload,可以理解為消息的內容,是指訂閱者具體要使用的內容
MQTT客戶端代碼
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"#define ADDRESS "thirdparty.mqtt.yoplore.com" //服務器的IP地址
#define CLIENTID "client01" //發布者的姓名(唯一的,如果發布者和訂閱者用同一個姓名,就會出現頂號的現象)
#define TOPIC1 "MQTT topic/topic1" //訂閱主題
#define TOPIC2 "MQTT topic/topic2" //訂閱主題
#define QOS 1 //服務登記(0.最多一次,1.最少一次,2.確保一次)
#define TIMEOUT 10000L //響應時間//定義一個傳遞令牌
volatile MQTTClient_deliveryToken deliveredtoken;//令牌交付回調函數,當消息成功交付給 MQTT 服務器時調用
/**context:用戶自定義的上下文指針,此處未使用dt:消息交付的令牌
*/
void delivered(void *context, MQTTClient_deliveryToken dt)
{printf("Message with token value %d delivery confirmed\n", dt);deliveredtoken = dt;
}//接受訂閱信息的回調函數,當接收到訂閱主題的消息時調用
/**context:用戶自定義的上下文指針,此處未使用topicName:接收到消息的主題名稱topicLen:主題名稱的長度message:接收到的 MQTT 消息結構體指針
*/
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{int i;char *payloadptr;printf("Message arrived\n");printf("topic: %s\n", topicName);printf("message: ");payloadptr = message->payload;for (i = 0; i < message->payloadlen; i++){putchar(*payloadptr);payloadptr++;}putchar('\n');MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}
//斷開鏈接的回調函數
void connlost(void *context, char *cause)
{printf("\nConnection lost\n");printf("cause: %s\n", cause);
}int main(int argc, char *argv[])
{printf("\nCreating MQTTClient\n");// 消息緩沖區char buf[1024];// 1、定義一個MQTT客戶端結構體指針MQTTClient client;// 2、創建一個MQTT客戶端MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;MQTTClient_create(&client, ADDRESS, CLIENTID,MQTTCLIENT_PERSISTENCE_NONE, NULL);conn_opts.keepAliveInterval = 20;// 連接保活時間conn_opts.cleansession = 1; // 設置是否清除會話,1為清除// 定義一個 MQTT 消息結構體,用于存儲要發布的消息MQTTClient_message publish_msg=MQTTClient_message_initializer;// 令牌tokenMQTTClient_deliveryToken token;// 設置回調MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);// 鏈接int rc = MQTTClient_connect(client, &conn_opts);if (rc != MQTTCLIENT_SUCCESS) {printf("Failed to connect, return code %d\n", rc);return EXIT_FAILURE;}// 訂閱多個主題rc = MQTTClient_subscribe(client, TOPIC1, QOS);if (rc != MQTTCLIENT_SUCCESS) {printf("Failed to subscribe to %s, return code %d\n", TOPIC1, rc);return EXIT_FAILURE;}rc = MQTTClient_subscribe(client, TOPIC2, QOS);if (rc != MQTTCLIENT_SUCCESS) {printf("Failed to subscribe to %s, return code %d\n", TOPIC2, rc);return EXIT_FAILURE;}//用戶退出char ch;while (1) {// 發送信息printf("請輸入要發布的內容(輸入 'q' 或 'Q' 退出):\n");if (fgets(buf, sizeof(buf), stdin) == NULL) {printf("讀取輸入失敗\n");continue;}// 去除換行符(fgets會將換行符一并讀取)size_t len = strlen(buf);if (len > 0 && buf[len - 1] == '\n') {buf[len - 1] = '\0';}// 檢查是否退出if (buf[0] == 'q' || buf[0] == 'Q') {break;}publish_msg.payload = (void *)buf;publish_msg.payloadlen = strlen(buf);rc = MQTTClient_publishMessage(client, TOPIC2, &publish_msg, &token);//用于將消息發布到指定的主題if (rc != MQTTCLIENT_SUCCESS) {printf("Failed to publish message, return code %d\n", rc);continue;}rc = MQTTClient_waitForCompletion(client, token, 1000); //用于等待指定的消息交付完成if (rc != MQTTCLIENT_SUCCESS) {printf("Failed to wait for message completion, return code %d\n", rc);continue;}printf("buf中的內容: %s\n", buf);}MQTTClient_disconnect(client,10000);MQTTClient_destroy(&client);printf("\nExiting\n");return 0;
}