為了更好的閱讀體驗,建議移步至筆者的博客閱讀:JetLinks設備接入的認識與理解
1、認識 JetLinks
1.1、官網文檔
官網:https://www.jetlinks.cn/
JetLinks 有兩個產品:JetLinks-lot和JetLinks-view
官方文檔:
-
JetLinks 物聯網基礎平臺
-
JetLinks 物聯網平臺開發手冊
1.2、JetLinks
JetLinks 是可支持多種方式接入設備的物聯網設備管理平臺
https://hanta.yuque.com/px7kg1/yfac2l/fwqriw24lp3cy2lw
JetLinks IOT 是一個開源的、企業級的物聯網平臺,它集成了設備管理、數據安全通信、消息訂閱、規則引擎等一系列物聯網核心能力,支持以平臺適配設備的方式連接海量設備,采集設備數據上云,提供云端API,通過調用云端API實現遠程控制。JetLinks物聯網平臺還支持多種設備接入協議,并提供了豐富的協議庫。
支持:多協議(MQTT、HTTP、CoAP、UDP、TCP、WebSocket)自定義編解碼插件接入;
支持:云平臺對接接入;
支持: ModBus/TCP、OPC UA通道接入;
支持:基于GB/T 28181國標協議視頻接入;
支持:自研邊緣計算網關接入。
1.3、產品架構的理解
https://hanta.yuque.com/px7kg1/yfac2l/tvlxz93cht8zyl94
1.3.1、理解1
通過不同層級功能職責的封裝、組合,以支持多設備、多協議接入平臺
-
設備連接層:支持MQTT、TCP、UDP、CoAP、HTTP、WebSocket協議,提供統一設備接入的能力。
-
設備管理層:提供設備注冊、配置、維護和監控的功能,支持設備屬性、狀態實時展示和歷史屬性、設備日志記錄查詢等。
-
業務邏輯層:提供規則引擎、數據轉發和數據解析等功能,支持多種業務場景下的數據處理和交互操作。
-
應用開發層:提供RESTAPI和WebSocket接口,支持前端對接和自定義應用開發。同時還提供了可視化的數據展示和操作頁面,方便用戶快速搭建物聯網應用系統。
1.3.2、理解2
設備接入JetLinks物聯網平臺后,可實現:設備通訊、數據的采集、認證、流轉、存儲、分析和實時監控
13.3、理解3
開發者需要自行實現編解碼器邏輯,才可以讓平臺對設備數據進行全面管理
1.4、基本概念
https://hanta.yuque.com/px7kg1/yfac2l/dagxgfzc3vnul0sn
1.4.1、產品
產品是指一組具有相同功能和規格的設備集合,通常由同一家生產廠家制造。
設備可能是傳感器、執行器、控制器等各種不同類型的物聯網設備,它們可以通過網絡連接到物聯網平臺。通過將這些設備組合到一個產品中,企業可以對這些設備進行統一管理和監控,以便更有效地控制其行為和狀態。
1.4.2、設備
設備是指物理存在的、可通過網絡連接的單個物聯網設備。
設備可以是各種類型的物品,例如傳感器、執行器、控制器等。這些設備通過物聯網連接到平臺,以便與其他設備或應用程序進行通信、交換數據和接收命令。
1.4.3、物模型
物模型說明:http://doc.jetlinks.cn/function-description/metadata_description.html
物模型是物理空間中的實體在云端的數字化表示,有 4 個緯度:屬性、功能、事件、標簽。
-
屬性:用于描述設備運行時具體信息和狀態。例如溫濕度傳感器包含“溫度”、“濕度”兩個屬性。
-
功能:設備可被外部調用的能力或方法,可設置輸入參數和輸出參數。相比于屬性,服務可通過一條指令實現更復雜的業務邏輯
-
事件:用于描述設備上報云端的多個參數,多用于復雜報文結構或設備本身在某個閾值觸發的報文。
-
標簽:統一為設備添加拓展字段,添加后將在設備信息頁顯示。
2、開發手冊
社區版后端工程:
- github 倉庫:https://github.com/jetlinks/jetlinks-community
- gitee 倉庫:https://gitee.com/jetlinks/jetlinks-community
2.1、模塊說明
社區版系統模塊說明:https://hanta.yuque.com/px7kg1/nn1gdr/gfqb3xmxg8fsvyxf#lR7Pd
- jetlinks-components # 組件庫
- common-component # 通用組件、工具類等
- configure-component # 統一配置模塊
- dashboard-component # 儀表盤模塊
- elasticsearch-component # ElasticSearch集成
- gateway-component # 網關模塊,統一定義網關接口等信息
- io-component # IO模塊,文件管理等
- logging-component # 日志模塊
- network-component # 網絡組件模塊,統一定義網絡組件規范以及默認實現
- http-component # http模塊
- mqtt-component # mqtt模塊
- network-core # 網絡組件核心模塊
- tcp-component # tcp模塊
- notify-component # 通知模塊,統一定義通知規范以及默認實現
- notify-core # 通知模塊核心
- notify-dingtalk # 釘釘通知模塊
- notify-email # 郵件通知模塊
- notify-sms # 短信通知模塊
- notify-voice # 語音通知模塊
- notify-webhook # webhook通知模塊
- notify-wechat # 微信通知模塊
- protocol-component # 協議模塊
- relation-component # 關系模塊,用于描述物與物之間的關系
- rule-engine-component # 規則引擎模塊,集成規則引擎通用功能
- script-component # 腳本模塊,封裝腳本引擎
- tdengine-component # 對tdengine的支持
- things-component # 物管理模塊
- timeseries-component # 時序數據組件
- jetlinks-manager # 管理功能
- authentication-manager # 用戶,權限管理模塊
- device-manager # 設備管理模塊
- logging-manager # 日志管理模塊
- network-manager # 網絡組件管理模塊
- notify-manager # 通知管理模塊
- rule-engine-manager # 規則引擎管理模塊
- jetlinks-standalone #單例模塊,啟動JetLinks平臺
2.2、技術選型
技術棧 | 描述 |
---|---|
Java8 | 編程語言 |
hsweb Framework | 業務基礎框架 |
Spring Boot 2.7.x | 響應式web支持 |
vert.x,netty | 高性能網絡框架 |
R2DBC | 關系型數據庫響應式驅動 |
Postgresql | 關系型數據庫,可更換為mysql、sqlserver |
ElasticSearch | 設備數據與日志存儲,可更換為其他中間件 |
Redis | 用戶信息與權限緩存、設備注冊中心緩存 |
scalecube | 基于JVM的分布式服務框架,支持響應式 |
micrometer | 監控指標框架 |
2.3、必要的開發知識
響應式編程:http://doc.jetlinks.cn/dev-guide/reactor.html
事件驅動:http://doc.jetlinks.cn/dev-guide/event-driver.html
添加自定義模塊:https://hanta.yuque.com/px7kg1/dev/wdymp6flcfa1vwh5
3、設備接入流程
設備接入流程:http://doc.jetlinks.cn/function-description/device_message_description.html#%E8%AE%BE%E5%A4%87%E6%8E%A5%E5%85%A5%E6%B5%81%E7%A8%8B
HTTP協議設備接入:https://hanta.yuque.com/px7kg1/yfac2l/qlr6nz5btr5rwrgk
3.1、流程圖
3.2、開發:協議包
開發者自行實現自定義協議,官方教程:http://doc.jetlinks.cn/dev-guide/custom-message-protocol.html
官方提供了協議開發示例工程:https://github.com/jetlinks/jetlinks-official-protocol
JetLinks 官方協議 jar 包:https://github.com/jetlinks/jetlinks-official-protocol/blob/v3/package/jetlinks-official-protocol-3.0.0.jar
-
編寫
自定義編解碼器
:創建
org.jetlinks.core.message.codec.DeviceMessageCodec
接口實現類,重寫encode()
、decode()
、getSupportTransport()
方法 -
編寫
協議的元信息
創建
org.jetlinks.core.metadata.DefaultConfigMetadata
對象并設置對應屬性 -
編寫
自定義設備協議支持提供商
:創建
org.jetlinks.core.spi.ProtocolSupportProvider
接口實現類,并重寫create()
方法,在
create()
方法中將:將自定義編解碼器
注冊到協議中 -
配置路由配置:
在
org.jetlinks.core.spi.ProtocolSupportProvider
接口實現類的create()
方法中創建org.jetlinks.core.defaults.CompositeProtocolSupport
對象,在其中配置路由配置、身份認證(可選)
3.3、添加:協議包
將協議包上傳到協議管理中
3.4、添加:網絡組件
- 配置:本地和公網的接口地址、端口號
- 配置:接口處理的服務類型
3.5、添加:設備接入網關
將上述的協議包和網絡組件進行綁定
3.6、添加:產品
-
配置:產品信息
-
綁定:上述的自定義網絡組件(官方定義:設備接入)
-
配置:認證信息
-
配置存儲策略
-
配置:物模型
- 屬性定義
- 功能定義
- 事件定義
- 標簽定義
-
啟用:產品
行式存儲
ElasticSearch-行式存儲是系統默認情況下使用的存儲方案。每一個屬性值都保存為一條索引記錄。
典型應用場景:設備每次只會上報一部分屬性, 以及支持讀取部分屬性數據的時候。
列式存儲
一個屬性作為一列,一條屬性消息作為一條索引記錄進行存儲。
典型應用場景:適合設備每次都上報所有的屬性值的場景。
3.7、添加:設備
- 配置:設備ID、名稱
- 綁定:上述配置好的產品(只能配置狀態是正常的產品,即已啟用的產品)
- 啟動:設備
- (可選)默認繼承了所屬產品的物模型。可以配置專屬當前設備的物模型
4、理解協議包
關于協議包:https://hanta.yuque.com/px7kg1/nn1gdr/kcqv8dn8y6778t2a
協議包主要包含 4 個部分
-
數據傳輸協議:協議包約定了常見的網絡通信協議,例如MQTT、HTTP、TCP、CoAP等,來實現物聯網設備與JetLinks平臺之間的數據傳輸。開發者可根據設備實際情況選擇對應的通信協議。
-
數據解析標準:協議包定義了一套設備數據解析標準,使得各種類型的物聯網設備通過網絡協議傳輸至JetLinks后,根據協議包內的數據解析標準將不同類型的報文轉換成平臺統一的消息。
-
設備管理功能:協議包內可以獲取平臺內定義的設備數據,包括設備信息、設備配置、設備狀態等,方便開發者在接入設備時獲取設備相關數據進行自定義的業務邏輯處理。
-
身份認證:協議包支持物聯網設備的身份認證,用戶可以在協議包內編寫身份認證邏輯來驗證連接的客戶端身份,以保護設備和數據的安全。
5、理解自定義編解碼器開發流程
5.1、步驟1
自定義 DeviceMessageCodec 接口實現類,重寫 encode()、decode() 方法
5.1.1、消息編碼
重寫 DeviceMessageCodec 接口中的encode()
方法
5.1.2、消息解碼
重寫 DeviceMessageCodec 接口中的decode()
方法
5.2、步驟2
自定義 ProtocolSupportProvider 接口實現類,配置元數據信息
5.3、步驟3
配置路由與 DeviceMessage 的綁定關系
5.4、步驟4
自定義 Authenticator 接口實現并配置
6、理解編解碼涉及的核心類關系
協議加載設計:https://hanta.yuque.com/px7kg1/nn1gdr/gascdx49ia6u4lsf
平臺統一設備消息定義:http://doc.jetlinks.cn/function-description/device_message_description.html
7、協議包上傳邏輯分析
7.1、步驟1:上傳協議 jar
后端接口
POST
/api/file/upload
請求報文
表單請求,接收參數名為:file 的文件數據對象
Content-Disposition: form-data; name="file"; filename="jetlinks-official-protocol-3.0.0.jar"
Content-Type: application/octet-stream
接口類:org.jetlinks.community.io.file.web.FileManagerController#upload
處理邏輯
-
獲取文件信息,并將文件數據保存到本地指定目錄
-
默認文件目錄為:./data/files/yyyyMMdd/
-
重命名 jar 文件名,生成規則:md5(uuid())
-
計算當前文件的 md5 和 sha256 值
-
-
將文件相關信息保存到數據庫中,數據對象:
org.jetlinks.community.io.file.FileEntity
-
保存成功的文件數據記錄主鍵和文件信息一起通過接口返回
響應報文
返回文件數據相關記錄信息,核心信息:
{ "message": "success", "result": { "id": "9c9ce661a1fadb8019ca50145b33a074", "name": "jetlinks-official-protocol-3.0.0.jar", "extension": "jar", "length": 102512, "md5": "24504ceb0d6570b84b86e6180d9fca9f", "sha256": "fb0c6144ad056326e26eb829c13759b5080da095c7bb02386c7f064ac059f24e", "createTime": 1699859432789, "creatorId": "1199596756811550720", "options": [], "others": { "accessKey": "c24b19b0c91119c6673fa1a06a4d2ae0" } }, "status": 200, "timestamp": 1699859454032 }
7.2、步驟2:確定協議
7.2.1、后端接口
PATCH
/api/protocol
接口類:org.jetlinks.community.device.web.ProtocolSupportController
7.2.2、請求報文
{"id": "1722876422724329472","name": "官方協議v3.0","description": "","type": "jar","state": 1,"creatorId": "1199596756811550720","createTime": 1699600723328,"configuration": {"location": "http://localhost:5173/api/file/9c9ce661a1fadb8019ca50145b33a074?accessKey=c24b19b0c91119c6673fa1a06a4d2ae0"}
}
7.2.3、處理邏輯
- 將前端請求的文件信息保存到數據庫中,數據對象:
org.jetlinks.community.device.entity.ProtocolSupportEntity
前端邏輯:將步驟1 的響應結果拼接成:文件地址(用戶不可編輯)+ 用戶填寫的協議包基本信息(名稱、類型、說明)
org.jetlinks.community.device.web.ProtocolSupportController
實現了org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController
接口。
org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController
接口又繼承了三個接口:org.hswebframework.web.crud.web.reactive.ReactiveServiceSaveController
、org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController
、org.hswebframework.web.crud.web.reactive.ReactiveServiceDeleteController
ProtocolSupportController
package org.jetlinks.community.device.web;import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.jetlinks.community.device.service.LocalProtocolSupportService;@RestController
@RequestMapping("/protocol")
public class ProtocolSupportControllerimplements ReactiveServiceCrudController<ProtocolSupportEntity, String> {@Autowired@Getterprivate LocalProtocolSupportService service;}
ReactiveServiceCrudController
package org.hswebframework.web.crud.web.reactive;public interface ReactiveServiceCrudController<E, K> extendsReactiveServiceSaveController<E, K>,ReactiveServiceQueryController<E, K>,ReactiveServiceDeleteController<E, K> {
}
PATH /api/protocol
接口實際由:ReactiveServiceSaveController
接口提供的默認 save()
方法處理數據,最終調用getService()
方法進行save()
操作。
package org.hswebframework.web.crud.web.reactive;import org.hswebframework.web.authorization.annotation.Authorize;public interface ReactiveServiceSaveController<E, K> {@Authorize(ignore = true)ReactiveCrudService<E, K> getService();@PatchMapping@Operation(summary = "保存數據", description = "如果傳入了id,并且對應數據存在,則嘗試覆蓋,不存在則新增.")default Mono<SaveResult> save(@RequestBody Flux<E> payload) {return Authentication.currentReactive().flatMapMany(auth -> payload.map(entity -> applyAuthentication(entity, auth))).switchIfEmpty(payload).as(getService()::save);}}
由于ProtocolSupportController
注入了org.jetlinks.community.device.service.LocalProtocolSupportService
,并且屬性名為:service,因此ProtocolSupportController
的getService()
就是ReactiveServiceSaveController
接口的getService()
方法實現。顯而易見,確定協議的核心邏輯就在:LocalProtocolSupportService
的save()
方法。
org.jetlinks.community.device.service.LocalProtocolSupportService
類繼承了org.hswebframework.web.crud.service.GenericReactiveCrudService
抽象類,而GenericReactiveCrudService
抽象類又實現了org.hswebframework.web.crud.service.ReactiveCrudService
接口,在ReactiveCrudService
中有save()
方法
ProtocolSupportController
package org.jetlinks.community.device.service;import org.jetlinks.community.reference.DataReferenceManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;@Service
public class LocalProtocolSupportService extends GenericReactiveCrudService<ProtocolSupportEntity, String> {@Autowiredprivate ProtocolSupportManager supportManager;@Autowiredprivate DataReferenceManager referenceManager;}
GenericReactiveCrudService
package org.hswebframework.web.crud.service;import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.springframework.beans.factory.annotation.Autowired;public abstract class GenericReactiveCrudService<E, K> implements ReactiveCrudService<E, K> {@Autowiredprivate ReactiveRepository<E, K> repository;@Overridepublic ReactiveRepository<E, K> getRepository() {return repository;}}
GenericReactiveCrudService 注入了 ReactiveRepository 接口,該接口的實現類為:org.hswebframework.ezorm.rdb.mapping.defaults.DefaultReactiveRepository
,里面實現了save()
方法:
package org.hswebframework.ezorm.rdb.mapping.defaults;import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;public class DefaultReactiveRepository<E, K> extends DefaultRepository<E> implements ReactiveRepository<E, K> {@Overridepublic Mono<SaveResult> save(Publisher<E> data) {return Flux.from(data).collectList().filter(CollectionUtils::isNotEmpty).flatMap(list -> doSave(list).reactive().as(this::setupLogger)).defaultIfEmpty(SaveResult.of(0, 0));}
}
上述doSave()
方法是org.hswebframework.ezorm.rdb.mapping.defaults.DefaultRepository
抽象類提供的默認方法:
package org.hswebframework.ezorm.rdb.mapping.defaults;import org.hswebframework.ezorm.rdb.mapping.events.EventResultOperator;public abstract class DefaultRepository<E> {protected SaveResultOperator doSave(Collection<E> data) {RDBTableMetadata table = getTable();UpsertOperator upsert = operator.dml().upsert(table.getFullName());return EventResultOperator.create(() -> {upsert.columns(getProperties());List<String> ignore = new ArrayList<>();for (E e : data) {upsert.values(Stream.of(getProperties()).map(property -> getInsertColumnValue(e, property, (prop, val) -> ignore.add(prop))).toArray());}upsert.ignoreUpdate(ignore.toArray(new String[0]));return upsert.execute();},SaveResultOperator.class,table,MappingEventTypes.save_before,MappingEventTypes.save_after,getDefaultContextKeyValue(instance(data),type("batch"),tableMetadata(table),upsert(upsert)));}
}
上述EventResultOperator
的create()
方法中,發布了EntitySavedEvent<E>
事件(通過 Spring的ApplicationEventPublisher 發送事件)。
在org.jetlinks.community.device.service.ProtocolSupportHandler
中訂閱了EntitySavedEvent<ProtocolSupportEntity>
事件:
package org.jetlinks.community.device.service;import lombok.AllArgsConstructor;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.community.reference.DataReferenceManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;@Component
@AllArgsConstructor
public class ProtocolSupportHandler {private final DataReferenceManager referenceManager;private ProtocolSupportLoader loader;private ProtocolSupportManager supportManager;@EventListenerpublic void handleCreated(EntityCreatedEvent<ProtocolSupportEntity> event) {event.async(reloadProtocol(event.getEntity()));}@EventListenerpublic void handleSaved(EntitySavedEvent<ProtocolSupportEntity> event) {event.async(reloadProtocol(event.getEntity()));}@EventListenerpublic void handleModify(EntityModifyEvent<ProtocolSupportEntity> event) {event.async(reloadProtocol(event.getAfter()));}// 重新加載協議private Mono<Void> reloadProtocol(Collection<ProtocolSupportEntity> protocol) {return Flux.fromIterable(protocol).filter(entity -> entity.getState() != null).map(entity -> entity.getState() == 1 ? entity.toDeployDefinition() : entity.toUnDeployDefinition()).flatMap(def -> loader//加載一下檢驗是否正確,然后就卸載.load(def).doOnNext(ProtocolSupport::dispose).thenReturn(def)).onErrorMap(err -> new BusinessException("error.unable_to_load_protocol", 500, err.getMessage())).flatMap(supportManager::save).then();}}
上述ProtocolSupportLoader
接口的實現類為org.jetlinks.community.protocol.SpringProtocolSupportLoader
,其中load()
方法會動態加載 jar 包為org.jetlinks.core.spi.ProtocolSupportProvider
接口實現,并執行create()
方法。
7.2.4、響應報文
{"message": "success","result": {"added": 0,"updated": 1,"total": 1},"status": 200,"timestamp": 1699859463951
}
8、加載協議包時機
8.1、加載協議包時機1
通過org.jetlinks.community.device.service.ProtocolSupportHandler
監聽EntityCreatedEvent<ProtocolSupportEntity>
、EntitySavedEvent<ProtocolSupportEntity>
、EntityModifyEvent<ProtocolSupportEntity>
事件,調用ProtocolSupportLoader
的load()
方法加載協議
在 ProtocolSupportLoader 的 load() 方法中:會調用 org.jetlinks.core.spi.ProtocolSupportProvider 接口實現,并執行 create() 方法
8.2、加載協議包時機2
通過org.jetlinks.community.protocol.LazyInitManagementProtocolSupports
實現org.springframework.boot.CommandLineRunner
接口,在項目啟動時執行init()
方法,調用ProtocolSupportLoader
的load()
方法加載協議
在 ProtocolSupportLoader 的 load() 方法中:會調用 org.jetlinks.core.spi.ProtocolSupportProvider 接口實現,并執行 create() 方法
9、設備網關加載機制
- 通過 DeviceGatewayEventHandler 實現 CommandLineRunner 接口,在項目啟動時執行 init() 方法
- 通過 DeviceGatewayEventHandler 監聽 DeviceGatewayEntity 的保存、創建、更新事件
為了更好的閱讀體驗,建議移步至筆者的博客閱讀:JetLinks設備接入的認識與理解