場景
日志服務內置了20+類SQL函數。面對用戶復雜的業務場景,例如使用json來沉淀業務數據,普通的SQL函數可能就無法滿足需求,需要一些用戶自定義處理邏輯。為了處理json類的業務數據,我們可以采用把json展開成多行的形式進行統計分析,今天我們介紹使用UDF(lambda)的方式來編寫自定義邏輯,處理json、array、map類型的數據。
數據樣例:
__source__: 11.164.232.105
__tag__:__hostname__: vm-req-170103232316569850-tianchi111932.tc
__topic__: TestTopic_4
array_column: [1,2,3]
double_column: 1.23
map_column: {"a":1,"b":2}
text_column: 商品
lambda函數對array類型的數據進行求均值
為了遍歷每一個array元素,并且把計算所有元素的均值,我們通過reduce函數進行計算。
* | select array_column, reduce( cast( json_parse(array_column) as array(bigint)) , CAST(ROW(0.0, 0) AS ROW(sum DOUBLE, count INTEGER)) , (s,x) -> cast(row( x+ s.sum, s.count+1) as ROW(sum double, count INTEGER)), s -> IF(s.count = 0, NULL, s.sum / s.count))
reduce 函數的具體語義參考語法文檔。參數分為四部分
-
cast( json_parse(array_column) as array(bigint))
表示輸入的數組數據 -
CAST(ROW(0.0, 0) AS ROW(sum DOUBLE, count INTEGER))
定義起始狀態為一個復雜的row類型,分別記錄sum和count - 對每一個元素,計算累加值,
(s,x) -> cast(row( x+ s.sum, s.count+1) as ROW(sum double, count INTEGER))
s代表已經有的狀態,x代表新輸入的元素,計算結果通過cast強制定義為row類型 - 最后對最終狀態,計算avg值,
s -> IF(s.count = 0, NULL, s.sum / s.count)
。s代表最終狀態。
對所有行的array元素求avg:
* | select sum(rows.sum ) / sum(rows.count) from(select array_column, reduce( cast( json_parse(array_column) as array(bigint)) , CAST(ROW(0.0, 0) AS ROW(sum DOUBLE, count INTEGER)) , (s,x) -> cast(row( x+ s.sum, s.count+1) as ROW(sum double, count INTEGER)), s -> s) as rows from log )
通過子查詢的方式,先reduce每一行的array的sum 和count。之后在嵌套查詢中,求所有行的sum和count,最后相除求avg: