EdgeX 規則引擎eKuiper
一、架構設計
LF Edge eKuiper 是物聯網數據分析和流式計算引擎。它是一個通用的邊緣計算服務或中間件,為資源有限的邊緣網關或設備而設計。
eKuiper 采用 Go 語言編寫,其架構如下圖所示:
eKuiper 是 Golang 實現的輕量級物聯網邊緣分析、流式處理開源軟件,可以運行在各類資源受限的邊緣設備上。eKuiper 基于源 (Source),SQL (業務邏輯處理), 目標 (Sink) 的方式來支持流式數據處理。
- 源(Source):流式數據的數據源,例如來自于 MQTT 服務器的數據。在 EdgeX 的場景下,數據源就是 EdgeX 消息總線(EdgeX message bus),可以是來自于 ZeroMQ 或者 MQTT 服務器;
- SQL:SQL 是你流式數據處理指定業務邏輯的地方,eKuiper 提供了 SQL 語句可以對數據進行抽取、過濾和轉換;
- 目標(Sink):目標用于將分析結果發送到特定的目標。例如,將分析結果發送到另外的 MQTT 服務器,或者一個 HTTP Rest 地址;
使用 eKuiper,一般需要完成以下三個步驟。
- 創建流,就是你定義數據源的地方
- 寫規則為數據分析寫 SQL
- 指定一個保存分析結果的目標
- 部署,并且運行規則
二、EdgeX集成eKuiper
在不同的微服務之間,EdgeX 使用消息總線進行數據交換。它包含了一個抽象的消息總線接口,并分別實現了 ZeroMQ 與 MQTT,在不同的微服務之間信息交互的支持。eKuiper 和 EdgeX 的集成工作包含了以下三部分,
- 擴展了一個 EdgeX 消息總線源,支持從 EdgeX 消息總線中接收數據
- 為了可以分析數據,eKuiper 需知道傳入的數據流的格式。一般來說,用戶最好在創建流的時候指定被分析的流數據的格式。
如下所示,一個 demo 流包含了一個名為 temperature 的字段。這與在關系型數據庫中創建表格定義的時候非常像。在創建了流定義以后,eKuiper 可以在編譯或者運行時對進入的數據進行類型檢查,相應錯誤也會報告給用戶。
CREATE STREAM demo (temperature bigint) WITH (FORMAT="JSON"...)
然而在 EdgeX 中,數據類型定義在 EdgeX event/reading 中已經指定,為了提升使用體驗,用戶可以在創建流的時候不指定數據類型。當接收到來自于消息總線的數據的時候,會根規則轉換為相應的數據類型。
- 擴展支持 EdgeX 消息總線目標(sink),用于將處理結果寫回至 EdgeX 消息總線。用戶也可以選擇將分析結果發送到 eKuiper 之前已經支持的 RestAPI 接口等。
三、使用eKuiper規則引擎控制設備
該章節描述了如何在 EdgeX 中使用 eKuiper 規則引擎,根據分析結果來實現對設備的控制。為了便于理解,該文章使用 device-virtual 示例,它對 device-virtual 服務發送的數據進行分析,然后根據由 eKuiper 規則引擎生成的分析結果來控制設備。
在本文中,將創建并運行以下兩條規則。
- 監視 Random-UnsignedInteger-Device 設備的規則,如果 uint8 值大于 20,則向 Random-Boolean-Device 設備發送命令,并開啟布爾值的隨機生成。
- 監視 Random-Integer-Device 設備的規則,如果每20秒 int8 的平均值大于0,則向 Random-Boolean-Device