整個連接過程如圖所示:
?高清圖片鏈接
1、環境準備
- thingsboard3.5.1 源碼啟動。(不懂怎么啟動的,大家可以看我的博文ThingsBoard3.5.1源碼啟動)
- MQTTX 客戶端(用來連接 thingsboard MQTT)
- 默認配置。queue.type=in-memory,cache.type=caffeine
因為我們的目的,是快速了解 thingsboard 的啟動過程,所以所有的配置全部采用默認的方式。默認消息隊列采用內存隊列ConcurrentHashMap,緩存也采用內存緩存caffeine。
使用 customerA 用戶賬號密碼登錄,使用設備A1 AccessToken 連接。
2、源碼分析
2.1 連接消息生產
2.1.1 入口
大家知道MQTT是基于TCP協議之上的輕量級通信協議,而TCP協議是面向連接、請求響應的通信協議。所以在 thingsboard 這一側必然有一個服務器實現,用來等待客戶端的連接。這個實現就是MqttTransportService
thingsboard 采用?netty 來實現一個MQTT server。
org.thingsboard.server.transport.mqtt.MqttTransportService@PostConstructpublic void init() throws Exception {log.info("Setting resource leak detector level to {}", leakDetectorLevel);ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));log.info("Starting MQTT transport...");bossGroup = new NioEventLoopGroup(bossGroupThreadCount);workerGroup = new NioEventLoopGroup(workerGroupThreadCount);ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, false)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);serverChannel = b.bind(host, port).sync().channel();if (sslEnabled) {b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, true)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);sslServerChannel = b.bind(sslHost, sslPort).sync().channel();}log.info("Mqtt transport started!");}
其中,關系到 netty server 性能的?bossGroupThreadCount,workerGroupThreadCount
thingsboard 提取出兩個參數變量?
NETTY_BOSS_GROUP_THREADS
NETTY_WORKER_GROUP_THREADS
方便用戶根據自己的設備臺數、部署架構,來優化自己的 netty 性能。
?netty server 的請求處理過程如下圖所示,圓圈為具體實現類,方框為方法。
在?MqttTransportHandler#processMqttMsg 方法中,因為我們的消息類型是連接,所以我們會進入?processConnect 方法。
org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {if (msg.fixedHeader() == null) {log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());ctx.close();return;}deviceSessionCtx.setChannel(ctx);if (CONNECT.equals(msg.fixedHeader().messageType())) {processConnect(ctx, (MqttConnectMessage) msg);} else if (deviceSessionCtx.isProvisionOnly()) {processProvisionSessionMsg(ctx, msg);} else {enqueueRegularSessionMsg(ctx, msg);}}
?在?MqttTransportHandler#processConnect?方法中,由于采用 AccessToken 的授權方式,所以會進入?processAuthTokenConnect
在 MqttTransportHandler#processAuthTokenConnect?方法中,獲取我們在MQTTX填的用戶名、密碼,然后委托給?DefaultTransportService#process 處理
org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());String userName = msg.payload().userName();String clientId = msg.payload().clientIdentifier();deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {deviceSessionCtx.setProvisionOnly(true);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));} else {X509Certificate cert;if (sslHandler != null && (cert = getX509Certificate()) != null) {processX509CertConnect(ctx, cert, msg);} else {processAuthTokenConnect(ctx, msg);}}}private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {String userName = connectMessage.payload().userName();log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder().setClientId(connectMessage.payload().clientIdentifier());if (userName != null) {request.setUserName(userName);}byte[] passwordBytes = connectMessage.payload().passwordInBytes();if (passwordBytes != null) {String password = new String(passwordBytes, CharsetUtil.UTF_8);request.setPassword(password);}transportService.process(DeviceTransportType.MQTT, request.build(),new TransportServiceCallback<>() {@Overridepublic void onSuccess(ValidateDeviceCredentialsResponse msg) {onValidateDeviceResponse(msg, ctx, connectMessage);}@Overridepublic void onError(Throwable e) {log.trace("[{}] Failed to process credentials: {}", address, userName, e);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));ctx.close();}});}
2.1.2?DefaultTransportService
?一路跟下去
DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法會調用 TbQueueProducer接口 send 方法,往主題?tb_transport.api.requests 發送消息。TbQueueProducer實現類是InMemoryTbQueueProducer
void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);if (messagesStats != null) {messagesStats.incrementTotal();}// 將消息發送給消息隊列topic是tb_transport.api.requestsrequestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {@Overridepublic void onSuccess(TbQueueMsgMetadata metadata) {if (messagesStats != null) {messagesStats.incrementSuccessful();}log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);}@Overridepublic void onFailure(Throwable t) {if (messagesStats != null) {messagesStats.incrementFailed();}pendingRequests.remove(requestId);future.setException(t);}});}
1、TbQueueProducer 接口的實現類有很多個,具體發送消息的實現類是哪一個呢? 因為我們使用內存隊列方式啟動,所以實現類是 InMemoryTbQueueProducer2、怎么確定發送的主題是?tb_transport.api.requests ?
主題是通過 requestTemplate獲取的
而?requestTemplate又是?DefaultTbQueueRequestTemplate的一個屬性,通過 Builder 構建器注入進來的。
對于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同種消息隊列的實現方式。我們現在所用的是內存隊列,所以進入InMemoryTbTransportQueueFactory
在InMemoryTbTransportQueueFactory中,對于DefaultTbQueueRequestTemplate.requestTemplate
的初始化,使用的是TbQueueTransportApiSettings的配置。
而 requestsTopic 讀取的,就是?tb_transport.api.requests 這一主題。
3、更進一步
認真分析初始化過程,得出下面請求主題的初始化圖。
2.1.3?InMemoryTbQueueProducer
InMemoryTbQueueProducer#send 調用?DefaultInMemoryStorage#put 方法?
DefaultInMemoryStorage 往自己持有的?ConcurrentHashMap 中存放消息,
key 是主題 tb_transport.api.requests,value 是存放有消息的?LinkedBlockingQueue 內存隊列
2.1.4 一個更抽象的發送模型
TbQueueProducer 往隊列 queue 發送消息,主題 tb_transport.api.requests,而不管這個消息的實現是內存隊列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 從queue中消費消息。至此,生產連接請求消息的過程結束。
2.2?消費消息
2.2.1?InMemoryTbQueueConsumer
我們知道現在消息生產者接口?TbQueueProducer 的實現類是?InMemoryTbQueueProducer,則它必然有一個消息消費者實現接口?TbQueueConsumer,消費者實現類是 InMemoryTbQueueConsumer
?InMemoryTbQueueConsumer 中對于消息的消費只有把消息從 ConcurrentHashMap 拉取出來的邏輯,而沒有具體處理的邏輯,則處理的邏輯,是存在于調用這個 poll 方法的地方。
org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer@Overridepublic List<T> poll(long durationInMillis) {if (subscribed) {@SuppressWarnings("unchecked")List<T> messages = partitions.stream().map(tpi -> {try {return storage.get(tpi.getFullTopicName());} catch (InterruptedException e) {if (!stopped) {log.error("Queue was interrupted.", e);}return Collections.emptyList();}}).flatMap(List::stream).map(msg -> (T) msg).collect(Collectors.toList());if (messages.size() > 0) {return messages;}try {Thread.sleep(durationInMillis);} catch (InterruptedException e) {if (!stopped) {log.error("Failed to sleep.", e);}}}return Collections.emptyList();}
poll 方法的調用端,全局是搜不到的。
?我們可以探究一下它的構造方法,看看誰初始化了它,則誰就有可能調用它的 poll 方法。排除掉它自己,有兩個類初始化了?InMemoryTbQueueConsumer,分別是?InMemoryMonolithQueueFactory 和?InMemoryTbTransportQueueFactory。
InMemoryTbTransportQueueFactory 訂閱的主題,是?tb_transport.api.responses 不是我們要找的?tb_transport.api.requests,忽略。
2.2.2?InMemoryMonolithQueueFactory
我們先來看一下?InMemoryMonolithQueueFactory。InMemoryMonolithQueueFactory 里面有一個方法,傳入的 TbQueueTransportApiSettings,剛好就是我們請求消息的主題配置類。
org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Overridepublic TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());}org.thingsboard.server.queue.settings.TbQueueTransportApiSettings
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {// tb_transport.api.requests@Value("${queue.transport_api.requests_topic}")private String requestsTopic;
}
查看對方法?createTransportApiRequestConsumer 的調用,找到一個非具體隊列實現的調用類TbCoreTransportApiService
2.2.3 TbCoreTransportApiService
TbCoreTransportApiService 初始化 init 方法,會創建 TbQueueConsumer——也就是具體的實現類 InMemoryTbQueueConsumer 注入到?DefaultTbQueueResponseTemplate.requestTemplate,然后執行?DefaultTbQueueResponseTemplate#init() 方法。
?2.2.4?DefaultTbQueueResponseTemplate
至此,我們找到了?InMemoryTbQueueConsumer#poll 調用的地方。
繼續往下,看看對于消息 requests,是怎么消費的。?
?2.2.5?DefaultTransportApiService
?通過 AccessToken 查找到設備的授權 DeviceCredentials (即device_credentials表記錄)然后構造 DeviceInfo 返回給設備端。
org.thingsboard.server.service.transport.DefaultTransportApiService// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);if (credentials != null && credentials.getCredentialsType() == credentialsType) {return getDeviceInfo(credentials);} else {return getEmptyTransportApiResponseFuture();}}
?