簡介
TDengine 中的流計算,功能相當于簡化版的 FLINK , 具有實時計算,計算結果可以輸出到超級表中存儲,同時也可用于窗口預計算,加快查詢速度。
創建流式計算
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name[(field1_name, field2_name [PRIMARY KEY], ...)] [TAGS (create_definition [, create_definition] ...)] SUBTABLE(expression) AS subquery
stream_options: {TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time | FORCE_WINDOW_CLOSE]WATERMARK timeIGNORE EXPIRED [0|1]DELETE_MARK timeFILL_HISTORY [0|1]IGNORE UPDATE [0|1]
}
其中 subquery 是 select 普通查詢語法的子集。
subquery: SELECT select_listfrom_clause[WHERE condition][PARTITION BY tag_list]window_clause
支持會話窗口、狀態窗口、滑動窗口、事件窗口和計數窗口。其中,狀態窗口、事件窗口和計數窗口搭配超級表時必須與 partition by tbname 一起使用。對于數據源表是復合主鍵的流,不支持狀態窗口、事件窗口、計數窗口的計算。
stb_name 是保存計算結果的超級表的表名,如果該超級表不存在,會自動創建;如果已存在,則檢查列的 schema 信息。詳見 寫入已存在的超級表。
TAGS 子句定義了流計算中創建TAG的規則,可以為每個 partition 對應的子表生成自定義的TAG值,詳見 自定義 TAG
create_definition:col_name column_definition
column_definition:type_name [COMMENT 'string_value']
subtable 子句定義了流式計算中創建的子表的命名規則,詳見 流式計算的 partition。
window_clause: {SESSION(ts_col, tol_val)| STATE_WINDOW(col)| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition| COUNT_WINDOW(count_val[, sliding_val])
}
其中:
-
SESSION 是會話窗口,tol_val 是時間間隔的最大范圍。在 tol_val 時間間隔范圍內的數據都屬于同一個窗口,如果連續的兩條數據的時間超過 tol_val,則自動開啟下一個窗口。該窗口的 _wend 等于最后一條數據的時間加上 tol_val。
-
STATE_WINDOW 是狀態窗口,col 用來標識狀態量,相同的狀態量數值則歸屬于同一個狀態窗口,col 數值改變后則當前窗口結束,自動開啟下一個窗口。
-
INTERVAL 是時間窗口,又可分為滑動時間窗口和翻轉時間窗口。INTERVAL 子句用于指定窗口相等時間周期,SLIDING 字句用于指定窗口向前滑動的時間。當 interval_val 與 sliding_val 相等的時候,時間窗口即為翻轉時間窗口,否則為滑動時間窗口,注意:sliding_val 必須小于等于 interval_val。
-
EVENT_WINDOW 是事件窗口,根據開始條件和結束條件來劃定窗口。當 start_trigger_condition 滿足時則窗口開始,直到 end_trigger_condition 滿足時窗口關閉。start_trigger_condition 和 end_trigger_condition 可以是任意 TDengine 支持的條件表達式,且可以包含不同的列。
-
COUNT_WINDOW 是計數窗口,按固定的數據行數來劃分窗口。count_val 是常量,是正整數,必須大于等于 2,小于 2147483648。count_val 表示每個 COUNT_WINDOW 包含的最大數據行數,總數據行數不能整除 count_val 時,最后一個窗口的行數會小于 count_val。sliding_val 是常量,表示窗口滑動的數量,類似于 INTERVAL 的 SLIDING。
窗口的定義與時序數據特色查詢中的定義完全相同,詳見 TDengine 特色查詢
例如,如下語句創建流式計算。第一個流計算,自動創建名為 avg_vol 的超級表,以一分鐘為時間窗口、30 秒為前向增量統計這些電表的平均電壓,并將來自 meters 表的數據的計算結果寫入 avg_vol 表,不同 partition 的數據會分別創建子表并寫入不同子表。
第二個流計算,自動創建名為 streamt0 的超級表,將數據按時間戳的順序,以 voltage < 0 作為窗口的開始條件,voltage > 9 作為窗口的結束條件,劃分窗口做聚合運算,并將來自 meters 表的數據的計算結果寫入 streamt0 表,不同 partition 的數據會分別創建子表并寫入不同子表。
第三個流計算,自動創建名為 streamt1 的超級表,將數據按時間戳的順序,以 10 條數據為一組,劃分窗口做聚合運算,并將來自 meters 表的數據的計算結果寫入 streamt1 表,不同 partition 的數據會分別創建子表并寫入不同子表。
CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);CREATE STREAM streams0 INTO streamt0 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname EVENT_WINDOW START WITH voltage < 0 END WITH voltage > 9;CREATE STREAM streams1 IGNORE EXPIRED 1 WATERMARK 100s INTO streamt1 AS
SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WINDOW(10);
流式計算的 partition
可以使用 PARTITION BY TBNAME,tag,普通列或者表達式,對一個流進行多分區的計算,每個分區的時間線與時間窗口是獨立的,會各自聚合,并寫入到目的表中的不同子表。
不帶 PARTITION BY 子句時,所有的數據將寫入到一張子表。
在創建流時不使用 SUBTABLE 子句時,流式計算創建的超級表有唯一的 tag 列 groupId,每個 partition 會被分配唯一 groupId。與 schemaless 寫入一致,我們通過 MD5 計算子表名,并自動創建它。
若創建流的語句中包含 SUBTABLE 子句,用戶可以為每個 partition 對應的子表生成自定義的表名,例如:
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
PARTITION 子句中,為 tbname 定義了一個別名 tname, 在 PARTITION 子句中的別名可以用于 SUBTABLE 子句中的表達式計算,在上述示例中,流新創建的子表將以前綴 ‘new-’ 連接原表名作為表名(從 v3.2.3.0 開始,為了避免 SUBTABLE 中的表達式無法區分各個子表,即誤將多個相同時間線寫入一個子表,在指定的子表名后面加上 _stableName_groupId)。
注意,子表名的長度若超過 TDengine 的限制,將被截斷。若要生成的子表名已經存在于另一超級表,由于 TDengine 的子表名是唯一的,因此對應新子表的創建以及數據的寫入將會失敗。
流式計算讀取歷史數據
正常情況下,流式計算不會處理創建前已經寫入源表中的數據,若要處理已經寫入的數據,可以在創建流時設置 fill_history 1 選項,這樣創建的流式計算會自動處理創建前、創建中、創建后寫入的數據。流計算處理歷史數據的最大窗口數是 2000 萬,超過限制會報錯。例如:
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
結合 fill_history 1 選項,可以實現只處理特定歷史時間范圍的數據,例如:只處理某歷史時刻(2020 年 1 月 30 日)之后的數據
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' interval(10s)
再如,僅處理某時間段內的數據,結束時間可以是未來時間
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' and ts < '2023-01-01' interval(10s)
如果該流任務已經徹底過期,并且您不再想讓它檢測或處理數據,您可以手動刪除它,被計算出的數據仍會被保留。
刪除流式計算
DROP STREAM [IF EXISTS] stream_name;
僅刪除流式計算任務,由流式計算寫入的數據不會被刪除。
展示流式計算
SHOW STREAMS;
若要展示更詳細的信息,可以使用:
SELECT * from information_schema.`ins_streams`;
流式計算的觸發模式
在創建流時,可以通過 TRIGGER 指令指定流式計算的觸發模式。
對于非窗口計算,流式計算的觸發是實時的;對于窗口計算,目前提供 4 種觸發模式,默認為 WINDOW_CLOSE。
-
AT_ONCE:寫入立即觸發
-
WINDOW_CLOSE:窗口關閉時觸發(窗口關閉由事件時間決定,可配合 watermark 使用)
-
MAX_DELAY time:若窗口關閉,則觸發計算。若窗口未關閉,且未關閉時長超過 max delay 指定的時間,則觸發計算。
-
FORCE_WINDOW_CLOSE:以操作系統當前時間為準,只計算當前關閉窗口的結果,并推送出去。窗口只會在被關閉的時刻計算一次,后續不會再重復計算。該模式當前只支持 INTERVAL 窗口(不支持滑動);FILL_HISTORY 必須為 0,IGNORE EXPIRED 必須為 1,IGNORE UPDATE 必須為 1;FILL 只支持 PREV、NULL、NONE、VALUE。
由于窗口關閉是由事件時間決定的,如事件流中斷、或持續延遲,則事件時間無法更新,可能導致無法得到最新的計算結果。
因此,流式計算提供了以事件時間結合處理時間計算的 MAX_DELAY 觸發模式。MAX_DELAY 最小時間是 5s,如果低于 5s,創建流計算時會報錯。
MAX_DELAY 模式在窗口關閉時會立即觸發計算。此外,當數據寫入后,計算觸發的時間超過 max delay 指定的時間,則立即觸發計算
流式計算的窗口關閉
流式計算以事件時間(插入記錄中的時間戳主鍵)為基準計算窗口關閉,而非以 TDengine 服務器的時間,以事件時間為基準,可以避免客戶端與服務器時間不一致帶來的問題,能夠解決亂序數據寫入等等問題。流式計算還提供了 watermark 來定義容忍的亂序程度。
在創建流時,可以在 stream_option 中指定 watermark,它定義了數據亂序的容忍上界。
流式計算通過 watermark 來度量對亂序數據的容忍程度,watermark 默認為 0。
T = 最新事件時間 - watermark
每次寫入的數據都會以上述公式更新窗口關閉時間,并將窗口結束時間 < T 的所有打開的窗口關閉,若觸發模式為 WINDOW_CLOSE 或 MAX_DELAY,則推送窗口聚合結果。
圖中,縱軸表示不同時刻,對于不同時刻,我們畫出其對應的 TDengine 收到的數據,即為橫軸。
橫軸上的數據點表示已經收到的數據,其中藍色的點表示事件時間(即數據中的時間戳主鍵)最后的數據,該數據點減去定義的 watermark 時間,得到亂序容忍的上界 T。
所有結束時間小于 T 的窗口都將被關閉(圖中以灰色方框標記)。
T2 時刻,亂序數據(黃色的點)到達 TDengine,由于有 watermark 的存在,這些數據進入的窗口并未被關閉,因此可以被正確處理。
T3 時刻,最新事件到達,T 向后推移超過了第二個窗口關閉的時間,該窗口被關閉,亂序數據被正確處理。
在 window_close 或 max_delay 模式下,窗口關閉直接影響推送結果。在 at_once 模式下,窗口關閉只與內存占用有關。
流式計算對于過期數據的處理策略
對于已關閉的窗口,再次落入該窗口中的數據被標記為過期數據.
TDengine 對于過期數據提供兩種處理方式,由 IGNORE EXPIRED 選項指定:
-
增量計算,即 IGNORE EXPIRED 0。
-
直接丟棄,即 IGNORE EXPIRED 1:默認配置,忽略過期數據
無論在哪種模式下,watermark 都應該被妥善設置,來得到正確結果(直接丟棄模式)或避免頻繁觸發重算帶來的性能開銷(重新計算模式)。
流式計算對于修改數據的處理策略
TDengine 對于修改數據提供兩種處理方式,由 IGNORE UPDATE 選項指定:
-
檢查數據是否被修改,即 IGNORE UPDATE 0,如果數據被修改,則重新計算對應窗口。
-
不檢查數據是否被修改,全部按增量數據計算,即 IGNORE UPDATE 1,默認配置。
寫入已存在的超級表
[field1_name, ...]
在本頁文檔頂部的 [field1_name, …] 是用來指定 stb_name 的列與 subquery 輸出結果的對應關系的。如果 stb_name 的列與 subquery 輸出結果的位置、數量全部匹配,則不需要顯示指定對應關系。如果 stb_name 的列與 subquery 輸出結果的數據類型不匹配,會把 subquery 輸出結果的類型轉換成對應的 stb_name 的列的類型。創建流計算時不能指定 stb_name 的列和 TAG 的數據類型,否則會報錯。
對于已經存在的超級表,檢查列的schema信息
- 檢查列的 schema 信息是否匹配,對于不匹配的,則自動進行類型轉換,當前只有數據長度大于 4096byte 時才報錯,其余場景都能進行類型轉換。
- 檢查列的個數是否相同,如果不同,需要顯示的指定超級表與 subquery 的列的對應關系,否則報錯;如果相同,可以指定對應關系,也可以不指定,不指定則按位置順序對應。
自定義 TAG
用戶可以為每個 partition 對應的子表生成自定義的 TAG 值。
CREATE STREAM streams2 trigger at_once INTO st1 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as cc interval(10s));
PARTITION 子句中,為 concat(“tag-”, tbname) 定義了一個別名 cc,對應超級表 st1 的自定義 TAG 的名字。在上述示例中,流新創建的子表的 TAG 將以前綴 ‘new-’ 連接原表名作為 TAG 的值。
會對TAG信息進行如下檢查
- 檢查 tag 的 schema 信息是否匹配,對于不匹配的,則自動進行數據類型轉換,當前只有數據長度大于 4096byte 時才報錯,其余場景都能進行類型轉換。
- 檢查 tag 的個數是否相同,如果不同,需要顯示的指定超級表與 subquery 的 tag 的對應關系,否則報錯;如果相同,可以指定對應關系,也可以不指定,不指定則按位置順序對應。
清理中間狀態
DELETE_MARK time
DELETE_MARK 用于刪除緩存的窗口狀態,也就是刪除流計算的中間結果。如果不設置,默認值是 10 年
T = 最新事件時間 - DELETE_MARK
流式計算支持的函數
- 所有的 單行函數 均可用于流計算。
- 以下 19 個聚合/選擇函數 不能 應用在創建流計算的 SQL 語句。此外的其他類型的函數均可用于流計算。
- leastsquares
- percentile
- top
- bottom
- elapsed
- interp
- derivative
- irate
- twa
- histogram
- diff
- statecount
- stateduration
- csum
- mavg
- sample
- tail
- unique
- mode
暫停、恢復流計算
1.流計算暫停計算任務
PAUSE STREAM [IF EXISTS] stream_name;
沒有指定 IF EXISTS,如果該 stream 不存在,則報錯;如果存在,則暫停流計算。指定了 IF EXISTS,如果該 stream 不存在,則返回成功;如果存在,則暫停流計算。
2.流計算恢復計算任務
RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
沒有指定 IF EXISTS,如果該 stream 不存在,則報錯,如果存在,則恢復流計算;指定了 IF EXISTS,如果 stream 不存在,則返回成功;如果存在,則恢復流計算。如果指定 IGNORE UNTREATED,則恢復流計算時,忽略流計算暫停期間寫入的數據。
狀態數據備份與同步
流計算的中間結果成為計算的狀態數據,需要在流計算整個生命周期中進行持久化保存。為了確保流計算中間狀態能夠在集群環境下在不同的節點間可靠地同步和遷移,從 v3.3.2.1 開始,需要在運行環境中部署 rsync 軟件,還需要增加以下的步驟:
- 在配置文件中配置 snode 的地址(IP + 端口)和狀態數據備份目錄(該目錄系 snode 所在的物理節點的目錄)。
- 然后創建 snode。
完成上述兩個步驟以后才能創建流。
如果沒有創建 snode 并正確配置 snode 的地址,流計算過程中將無法生成檢查點(checkpoint),并可能導致后續的計算結果產生錯誤。
snodeAddress 127.0.0.1:873
checkpointBackupDir /home/user/stream/backup/checkpoint/
創建 snode 的方式
使用以下命令創建 snode(stream node),snode 是流計算中有狀態的計算節點,可用于部署聚合任務,同時負責備份不同的流計算任務生成的檢查點數據。
CREATE SNODE ON DNODE [id]
其中的 id 是集群中的 dnode 的序號。請注意選擇的dnode,流計算的中間狀態將自動在其上進行備份。
從 3.3.4.0 版本開始,在多副本環境中創建流會進行 snode 的存在性檢查,要求首先創建 snode。如果 snode 不存在,無法創建流。
訪問官網
更多內容歡迎訪問 TDengine 官網