ThingsBoard MQTT 連接認證過程 源碼分析+圖例

整個連接過程如圖所示:

?高清圖片鏈接

1、環境準備

  • thingsboard3.5.1 源碼啟動。(不懂怎么啟動的,大家可以看我的博文ThingsBoard3.5.1源碼啟動)
  • MQTTX 客戶端(用來連接 thingsboard MQTT)
  • 默認配置。queue.type=in-memory,cache.type=caffeine

因為我們的目的,是快速了解 thingsboard 的啟動過程,所以所有的配置全部采用默認的方式。默認消息隊列采用內存隊列ConcurrentHashMap,緩存也采用內存緩存caffeine。

使用 customerA 用戶賬號密碼登錄,使用設備A1 AccessToken 連接。

2、源碼分析

2.1 連接消息生產

2.1.1 入口

大家知道MQTT是基于TCP協議之上的輕量級通信協議,而TCP協議是面向連接、請求響應的通信協議。所以在 thingsboard 這一側必然有一個服務器實現,用來等待客戶端的連接。這個實現就是MqttTransportService

thingsboard 采用?netty 來實現一個MQTT server。

org.thingsboard.server.transport.mqtt.MqttTransportService@PostConstructpublic void init() throws Exception {log.info("Setting resource leak detector level to {}", leakDetectorLevel);ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));log.info("Starting MQTT transport...");bossGroup = new NioEventLoopGroup(bossGroupThreadCount);workerGroup = new NioEventLoopGroup(workerGroupThreadCount);ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, false)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);serverChannel = b.bind(host, port).sync().channel();if (sslEnabled) {b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, true)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);sslServerChannel = b.bind(sslHost, sslPort).sync().channel();}log.info("Mqtt transport started!");}

其中,關系到 netty server 性能的?bossGroupThreadCount,workerGroupThreadCount

thingsboard 提取出兩個參數變量?

NETTY_BOSS_GROUP_THREADS

NETTY_WORKER_GROUP_THREADS

方便用戶根據自己的設備臺數、部署架構,來優化自己的 netty 性能。

?netty server 的請求處理過程如下圖所示,圓圈為具體實現類,方框為方法。

在?MqttTransportHandler#processMqttMsg 方法中,因為我們的消息類型是連接,所以我們會進入?processConnect 方法。

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {if (msg.fixedHeader() == null) {log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());ctx.close();return;}deviceSessionCtx.setChannel(ctx);if (CONNECT.equals(msg.fixedHeader().messageType())) {processConnect(ctx, (MqttConnectMessage) msg);} else if (deviceSessionCtx.isProvisionOnly()) {processProvisionSessionMsg(ctx, msg);} else {enqueueRegularSessionMsg(ctx, msg);}}

?在?MqttTransportHandler#processConnect?方法中,由于采用 AccessToken 的授權方式,所以會進入?processAuthTokenConnect

MqttTransportHandler#processAuthTokenConnect?方法中,獲取我們在MQTTX填的用戶名、密碼,然后委托給?DefaultTransportService#process 處理

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());String userName = msg.payload().userName();String clientId = msg.payload().clientIdentifier();deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {deviceSessionCtx.setProvisionOnly(true);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));} else {X509Certificate cert;if (sslHandler != null && (cert = getX509Certificate()) != null) {processX509CertConnect(ctx, cert, msg);} else {processAuthTokenConnect(ctx, msg);}}}private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {String userName = connectMessage.payload().userName();log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder().setClientId(connectMessage.payload().clientIdentifier());if (userName != null) {request.setUserName(userName);}byte[] passwordBytes = connectMessage.payload().passwordInBytes();if (passwordBytes != null) {String password = new String(passwordBytes, CharsetUtil.UTF_8);request.setPassword(password);}transportService.process(DeviceTransportType.MQTT, request.build(),new TransportServiceCallback<>() {@Overridepublic void onSuccess(ValidateDeviceCredentialsResponse msg) {onValidateDeviceResponse(msg, ctx, connectMessage);}@Overridepublic void onError(Throwable e) {log.trace("[{}] Failed to process credentials: {}", address, userName, e);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));ctx.close();}});}

2.1.2?DefaultTransportService

?一路跟下去

DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法會調用 TbQueueProducer接口 send 方法,往主題?tb_transport.api.requests 發送消息。TbQueueProducer實現類是InMemoryTbQueueProducer

void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);if (messagesStats != null) {messagesStats.incrementTotal();}// 將消息發送給消息隊列topic是tb_transport.api.requestsrequestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {@Overridepublic void onSuccess(TbQueueMsgMetadata metadata) {if (messagesStats != null) {messagesStats.incrementSuccessful();}log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);}@Overridepublic void onFailure(Throwable t) {if (messagesStats != null) {messagesStats.incrementFailed();}pendingRequests.remove(requestId);future.setException(t);}});}
1、TbQueueProducer 接口的實現類有很多個,具體發送消息的實現類是哪一個呢?
因為我們使用內存隊列方式啟動,所以實現類是 InMemoryTbQueueProducer

2、怎么確定發送的主題是?tb_transport.api.requests

主題是通過 requestTemplate獲取的

而?requestTemplate又是?DefaultTbQueueRequestTemplate的一個屬性,通過 Builder 構建器注入進來的。

對于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同種消息隊列的實現方式。我們現在所用的是內存隊列,所以進入InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory中,對于DefaultTbQueueRequestTemplate.requestTemplate

的初始化,使用的是TbQueueTransportApiSettings的配置。

requestsTopic 讀取的,就是?tb_transport.api.requests 這一主題。

3、更進一步

認真分析初始化過程,得出下面請求主題的初始化圖。

2.1.3?InMemoryTbQueueProducer

InMemoryTbQueueProducer#send 調用?DefaultInMemoryStorage#put 方法?

DefaultInMemoryStorage 往自己持有的?ConcurrentHashMap 中存放消息,

key 是主題 tb_transport.api.requests,value 是存放有消息的?LinkedBlockingQueue 內存隊列

2.1.4 一個更抽象的發送模型

TbQueueProducer 往隊列 queue 發送消息,主題 tb_transport.api.requests,而不管這個消息的實現是內存隊列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 從queue中消費消息。至此,生產連接請求消息的過程結束。

2.2?消費消息

2.2.1?InMemoryTbQueueConsumer

我們知道現在消息生產者接口?TbQueueProducer 的實現類是?InMemoryTbQueueProducer,則它必然有一個消息消費者實現接口?TbQueueConsumer,消費者實現類是 InMemoryTbQueueConsumer

?InMemoryTbQueueConsumer 中對于消息的消費只有把消息從 ConcurrentHashMap 拉取出來的邏輯,而沒有具體處理的邏輯,則處理的邏輯,是存在于調用這個 poll 方法的地方。

org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer@Overridepublic List<T> poll(long durationInMillis) {if (subscribed) {@SuppressWarnings("unchecked")List<T> messages = partitions.stream().map(tpi -> {try {return storage.get(tpi.getFullTopicName());} catch (InterruptedException e) {if (!stopped) {log.error("Queue was interrupted.", e);}return Collections.emptyList();}}).flatMap(List::stream).map(msg -> (T) msg).collect(Collectors.toList());if (messages.size() > 0) {return messages;}try {Thread.sleep(durationInMillis);} catch (InterruptedException e) {if (!stopped) {log.error("Failed to sleep.", e);}}}return Collections.emptyList();}

poll 方法的調用端,全局是搜不到的。

?我們可以探究一下它的構造方法,看看誰初始化了它,則誰就有可能調用它的 poll 方法。排除掉它自己,有兩個類初始化了?InMemoryTbQueueConsumer,分別是?InMemoryMonolithQueueFactory 和?InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory 訂閱的主題,是?tb_transport.api.responses 不是我們要找的?tb_transport.api.requests,忽略。

2.2.2?InMemoryMonolithQueueFactory

我們先來看一下?InMemoryMonolithQueueFactoryInMemoryMonolithQueueFactory 里面有一個方法,傳入的 TbQueueTransportApiSettings,剛好就是我們請求消息的主題配置類。

org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Overridepublic TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());}org.thingsboard.server.queue.settings.TbQueueTransportApiSettings 
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {// tb_transport.api.requests@Value("${queue.transport_api.requests_topic}")private String requestsTopic;
}

查看對方法?createTransportApiRequestConsumer 的調用,找到一個非具體隊列實現的調用類TbCoreTransportApiService

2.2.3 TbCoreTransportApiService

TbCoreTransportApiService 初始化 init 方法,會創建 TbQueueConsumer——也就是具體的實現類 InMemoryTbQueueConsumer 注入到?DefaultTbQueueResponseTemplate.requestTemplate,然后執行?DefaultTbQueueResponseTemplate#init() 方法。

?2.2.4?DefaultTbQueueResponseTemplate

至此,我們找到了?InMemoryTbQueueConsumer#poll 調用的地方。

繼續往下,看看對于消息 requests,是怎么消費的。?

?2.2.5?DefaultTransportApiService

?通過 AccessToken 查找到設備的授權 DeviceCredentials (即device_credentials表記錄)然后構造 DeviceInfo 返回給設備端。

org.thingsboard.server.service.transport.DefaultTransportApiService// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);if (credentials != null && credentials.getCredentialsType() == credentialsType) {return getDeviceInfo(credentials);} else {return getEmptyTransportApiResponseFuture();}}

?

2.2.6 消費消息流程圖

3、總結

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/21248.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/21248.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/21248.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

7-15 位模式(dump_bits)---PTA實驗C++

一、題目描述 為方便調試位運算相關程序&#xff0c;先做個展現位模式的小工具。 建議參照以下接口實現&#xff1a; // 利用函數重載特性&#xff1a;string dump_bits(char x);string dump_bits(short x);string dump_bits(int x);string dump_bits(long long x);// 或用函…

JVM類加載過程

在Java虛擬機規范中&#xff0c;把描述類的數據從class文件加載到內存&#xff0c;并對數據進行校驗、轉換解析和初始化&#xff0c;最終形成可以被虛擬機直接使用的java.lang.Class對象&#xff0c;這個過程被稱作類加載過程。一個類在整個虛擬機周期內會經歷如下圖的階段&…

C++編程法則365天一天一條(323)main函數執行之前和之后的動作

在C和C程序中&#xff0c;main 函數之前和之后執行的函數是由編譯器、鏈接器和運行時環境共同決定的。以下是一些通常會在這些階段執行的關鍵函數&#xff1a; 在 main 函數之前執行的函數 啟動代碼&#xff08;Start-up Code&#xff09;: 這是由編譯器提供的一段代碼&#…

DIYP對接駱駝后臺IPTV管理,退出菜單中顯示用戶名已經網絡信息,MAC,剩余天數,套餐名稱等

演示&#xff1a;https://url03.ctfile.com/f/1779803-1042599473-4dc000?p8976 (訪問密碼: 8976) 后臺加上EPG&#xff0c;增加一些播放源的動態端口替換。 前臺app上&#xff0c;退出菜單中顯示用戶名已經網絡信息&#xff0c;MAC&#xff0c;剩余天數&#xff0c;套餐名稱…

Python知識點17---包

提前說一點&#xff1a;如果你是專注于Python開發&#xff0c;那么本系列知識點只是帶你入個門再詳細的開發點就要去看其他資料了&#xff0c;而如果你和作者一樣只是操作其他技術的Python API那就足夠了。 Python的包&#xff0c;你可以把它看成是一個大的模塊&#xff0c;它…

JAVA基礎|多線程

什么是線程&#xff1f; 線程&#xff08;Thread&#xff09;是一個程序內部的一條執行流程。 多線程是什么&#xff1f; 多線程是指從軟硬件上實現的多條執行流程的技術&#xff08;多條線程由CPU負責調度執行&#xff09; 一. 如何在程序中創建出多條線程&#xff1f; Ja…

新接手業務的線上Bug特別多怎么辦?

文章目錄 接手&#xff1a;保證質量順利過渡緊急質量審計臨時增加測試頻次灰度發布加強監控與預警建立快速反饋機制 打補丁&#xff1a;針對性解決質量問題Bug 分析與分類測試策略優化環境一致性 搞基建&#xff1a;全流程質量控制需求分析與評審設計階段的評審與驗證代碼質量控…

Windows10系統中安裝與配置PyTorch(無GPU版本)

文章目錄 1. 什么是PyTorch2. PyTorch的安裝與配置&#xff08;無GPU&#xff09;2.1 創建環境2.2 安裝pytorch庫&#xff08;無GPU&#xff09;2.3 驗證安裝結果 1. 什么是PyTorch PyTorch 是一種用于構建深度學習模型且功能完備的開源框架&#xff0c;通常用于處理圖像識別和…

JVM學習-自定義類加載器

為什么要自定義類加載器 隔離加載類 在某些框架內進行中間件與應用的模塊隔離&#xff0c;把類加載到不同的環境&#xff0c;如Tomcat這類Web應用服務器&#xff0c;內部自定義了好幾種類加載器&#xff0c;用于隔離同一個Web應用服務器上的不同應用程序 修改類加載的方式 …

OpenCV 的幾種查找圖像中輪廓邊緣的方法

原始圖片&#xff1a; 1、Sobel() Sobel 算子結合了高斯平滑和微分&#xff0c;用于計算圖像的梯度&#xff0c;從而突出顯示邊緣。 import cv2# 讀取圖像 image cv2.imread(image.png, cv2.IMREAD_GRAYSCALE)# 使用 Sobel 算子查找水平和垂直邊緣 sobel_x cv2.Sobel(image…

建筑企業有閑置資質怎么辦?

如果建筑企業擁有閑置資質&#xff0c;可以考慮以下幾種方式來充分利用這些資質&#xff1a; 1. 租賃或轉讓資質&#xff1a; 將閑置的建筑資質租賃給其他企業或個人使用&#xff0c;或者通過轉讓的方式將資質出售給有需要的企業或個人。 2. 提供咨詢服務&#xff1a; 利用建…

git分布式版本控制系統(四)

目前世界上最先進的分布式版本控制系統 官方網址&#xff1a;https://git-scm.com 學習目標&#xff1a; 1 了解 git 前世今生 2 掌握 git 基礎概念、基礎操作 3 各種 git 問題處理 4 互聯網常用 gitflow(工作流程規范) 5 git 代碼提交規范 6 git 分支管理及命名規范 常見問…

OneForall工具的下載安裝和使用(Windows和Linux)

目錄 OneForall的介紹 OneForall的下載 OneForall的安裝 安裝要求 安裝步驟&#xff08;git 版&#xff09; 安裝&#xff08;kali&#xff09; OneForall的使用命令 在Windows 在Linux&#xff08;kali&#xff09; OneForall的結果說明 免責聲明 本文所提供的文字和…

車輛前向碰撞預警系統性能要求和測試規程

前言 本文整理《GB/T 33577-2017 智能運輸系統-車輛前向碰撞預警系統性能要求和測試規程》國標文件關鍵信息,FCW系統性能和測試右給深層次的認識。 術語和定義 車輛前向碰撞預警系統 forward vehicle collision warning system自車 subject vehicle(SV)目標車輛 target ve…

【Linux】查找和壓縮

一、文件查找 1、命令查找 which 2、文件查找、依賴數據庫 locate 3、文件查找 find 語法&#xff1a;find [path] [options] [expression] [action] ①按文件名 -name按名 -iname可不區分大小寫 ②按文件大小 5M&#xff1a;5M以上文件 5M&#xff1a;5M文件 -…

高中數學:解三角形相關公式總結及用法總結

一、正弦定理 二、余弦定理 三、三角形面積公式 由正弦定理&#xff0c;可以推出三角形的面積公式&#xff1a; S*ab*sinC S*ac*sinB S*bc*sinA 四、使用方法總結 五、練習 例題1 解析 對條件等式進行變形&#xff0c;結合余弦定理&#xff0c;求出∠A的度數&#xff0c;從而…

【面經分享-CPP篇】[建議收藏!!] C++基礎20問-01

&#x1f36d; 大家好這里是清隆學長 &#xff0c;一枚熱愛算法的程序員 ? 本系列打算持續跟新c面試基礎 &#x1f44f; 感謝大家的訂閱? 和 喜歡&#x1f497; 文章目錄 1.題目&#xff1a;解釋C中的RAII機制。2.題目&#xff1a;解釋C中的智能指針及其類型。3.題目&#xf…

從內存到sql的upsert

業務的upsert ? 在寫業務時&#xff0c;大家一開始都會以順序流程的方式開始著手寫代碼&#xff0c;CR時再看代碼&#xff0c;會有不一樣的感覺。 1. 需求描述 ? 現有一張數據庫表&#xff0c;表字段結構如下&#xff1a; 字段名稱類型描述uuidstring數據的唯一鍵datastrin…

代碼隨想錄算法訓練營第四十六天|KM52. 攜帶研究材料、518. 零錢兌換 II、377. 組合總和 Ⅳ

代碼隨想錄算法訓練營第四十六天 KM52. 攜帶研究材料 題目鏈接&#xff1a;KM52. 攜帶研究材料 確定dp數組以及下標的含義&#xff1a;j的含義是當前背包的最大容量&#xff0c;dp[j]背包內物品的總價值確定遞推公式&#xff1a;背包最大容量固定為j&#xff0c;每個循環嘗試…

Nginx01-HTTP簡介與Nginx簡介(安裝、命令介紹、目錄介紹、配置文件介紹)

目錄 HTTP簡介HTTP原理查看訪問網站的詳細流程curl -vwget --debug 查看網站訪問量HTTP協議版本HTTP協議交互HTTP 請求請求報文起始行請求頭 HTTP響應響應報文起始行響應頭 Nginx常見的Web服務常見網站服務 安裝NginxNginx目錄結構Nginx啟動管理Nginx常用命令 Nginx配置文件主配…