目錄
窗口分類
1.按照驅動類型分類
1. 時間窗口(Time window)
2.計數窗口(Count window)
2.按照窗口分配數據的規則分類
窗口API分類
API調用
窗口分配器器:
窗口函數
增量聚合函數:
全窗口函數
flink sql 窗口函數
窗口 | Apache Flink
窗口分類
1.按照驅動類型分類
1. 時間窗口(Time window)
? ? 時間窗口以時間點定義窗口的開始和結束,因此截取出就是某一段時間的數據。當到達結束時間時窗口不在接受數據,觸發計算輸出結果,并關閉銷毀窗口。
flink有一個專門的類用來表示時間窗口TimeWindow,這個類只有兩個私有屬性;窗口的方法獲取最大時間戳為end-1,因此窗口[start,end)? 左開右閉;
@PublicEvolving
public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start = start;this.end = end;}@Overridepublic long maxTimestamp() {return end - 1;}
2.計數窗口(Count window)
計數窗口是基于元素個數截取,在到達固定個數是就觸發計算并關閉窗口。
3.全局窗口(Global Windows)
是計數窗口的底層實現,窗口分配器由GlobalWindows類提供,需要自定義觸發器實現窗口的計算;
stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
// .max().aggregate(new AvgPv()).print();查看源代碼,windou函數后見windowStrream時獲取默認的觸發器
@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder =new WindowOperatorBuilder<>(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()), //湖區觸發器input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}// 計數窗口底層采用全局窗口加計數器來實現public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {return window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));}
2.按照窗口分配數據的規則分類
滾動窗口(Tumbling Window):窗口大小固定,窗口沒有重疊;
滑動窗口 (Sliding Window):滑動窗口有重疊,也可以沒有重疊,如果窗口size和滑動size相等,等于滾動窗口;
會話窗口 (Session Window):基于會話對窗口進行分組,與其他兩個不同的是,會話窗口是借用會話窗口的超時失效機觸發窗口計算,當數據到來后會開啟一個窗口,如果在超時時間內有數據陸續到來,窗口不會關閉,反之會關閉;極端情況,如果數據總能在窗口超時時間到達前遠遠不斷的到來,該窗口會一直開啟不會關閉;
全局窗口 (Global Window):比較通用的窗口,該窗口會把數據分配到一個窗口中,窗口為全局有效,會把相同key的數據分配到同一個窗口中,默認不會觸發計算,跟沒有窗口一樣,需要自定義觸發器才能使用;
窗口API分類
窗口大的分類可以分為按鍵分區和非按鍵分區兩種:按鍵分需要經過keyby操作,會把數據進行分發,實現負載均分,可以并行處理更大的數據量。而非按鍵分區窗口,相當于并行度為1,使用上直接調用windowall(),因此一般并不推薦使用;
stream
.keyby(...) //流按鍵分區
.window(...) //定義窗口分配器
[.trigger()] //設置出發器
[.evictor()] //設置移除器
[.allowedLateness()] // 設置延遲時間
[.sideOutputLateData()] //設置側輸出流
.reduce/aggregate/fold/apply() //處理函數
[.getSideOutput()] //獲取側輸出流stream
.windowAll(...) //定義窗口分配器
[.trigger()] //設置出發器
[.evictor()] //設置移除器
[.allowedLateness()] // 設置延遲時間
[.sideOutputLateData()] //設置側輸出流
.reduce/aggregate/fold/apply() //處理函數
[.getSideOutput()] //獲取側輸出流
API調用
窗口操作包含兩個重要的概念:窗口分配器(window Assigners)和窗口函數(window function)兩部分;
窗口分配器用于構建窗口,確定窗口類型,確定數據劃分哪一個窗口,窗口函數制定數據的計算規則;
窗口分配器器:
作用:窗口分配器用來劃分窗口屬于哪一個窗口;
窗口按照時間可以劃分為:滾動、滑動和session,三種類型窗口;
窗口計數劃分:滾動和滑動兩種類型;
eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate();
窗口函數
窗口函數按照計算特點可以分為增量計算和全量計算;
增量聚合函數:數據到達后立即計算,窗口只保存中間結果。效率高,性能好,但不夠靈活。
全量聚合函數:緩存窗口的所有元素,觸發后統一計算,效率低,但計算靈活。
增量聚合函數:
數據進入窗口會參與計算,窗口結束前只需要保留一個聚合后的狀態值,內存壓力小。
1.規約函數(ReduceFunction):數據保存留一個狀態,輸入類型和輸出類型必須一致,來一條數據會處理將數據合并到狀態中;
stream.keyBy(r -> r.f0)// 設置滾動事件時間窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {// 定義累加規則,窗口閉合時,向下游發送累加結果return Tuple2.of(value1.f0, value1.f1 + value2.f1);}}).print();
sum、max、min等底層都是通過同名AggregateFunction實現(非下面的聚合函數),本質還是實現ReduceFunction結構重寫了reduce方法;
2.聚合函數(AggrateFunction):在規約函數基礎上進行完善。解決輸出和輸入類型必須一致的限制問題。實現應用更靈活;
// 所有數據設置相同的key,發送到同一個分區統計PV和UV,再相除stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).aggregate(new AvgPv()).print();public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {@Overridepublic Tuple2<HashSet<String>, Long> createAccumulator() {// 創建累加器return Tuple2.of(new HashSet<String>(), 0L);}@Overridepublic Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {// 屬于本窗口的數據來一條累加一次,并返回累加器accumulator.f0.add(value.user);return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<HashSet<String>, Long> accumulator) {// 窗口閉合時,增量聚合結束,將計算結果發送到下游return (double) accumulator.f1 / accumulator.f0.size();}@Overridepublic Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {return null;}}
全窗口函數
全窗口函數會將進入窗口的數據先進行緩存,然后在窗口關閉時一起計算,緩存數據會占用內存資源,如果一個窗口數據量太大時,可能出現內存溢出的問題;
全窗口函數可以劃分窗口函數(windowFunction)和處理窗口函數(processWindowFunction)兩種;
窗口函數(windowFunction):老版本通用窗口接口,window()后調用apply(),傳入實現windowFunction接口; 缺點是不能獲取上下文信息,也沒有更高級的功能。因為在功能上可以被processWindowFunction全覆蓋,因此主鍵被棄用;
DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param window The window that is being evaluated.* @param input The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
處理窗口函數(processWindowFunction):是窗口API中最底層通用的窗口函數接口,可以獲取到上問對象(context),實現為調用process方法傳入自定義繼承ProcessWindowFunction類;
input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());/* ... */public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {long count = 0;for (Tuple2<String, Long> in: input) {count++;}out.collect("Window: " + context.window() + "count: " + count);}
}
注意:一般增量窗口函數和全量窗口函數可以一起使用,window().aggregate()方法可以傳入兩個函數,第一個采用增量聚合函數,第二個傳入全量函數,這樣數據在進入窗口會觸發增量計算,窗口不會緩存數據。當窗口關閉觸發計算時,結果數據穿度到全量計算,參數Iterable中一般只有一個數據;
aggregate(acct1,acct2)
flink sql 窗口函數
flink sql 窗口也包含常見的滾動窗口、滑動窗口、session窗口,但還有一種累計窗口。
在flink1.13版本后flinksql支持累計窗口CUMULATE,可以實現沒5分鐘觸發一次計算,輸出當天的累計數據,使用樣例
SELECT cast(PROCTIME() as timestamp_ltz) as window_end_time,manufacturer_name,event_id,case when state is null then -1 else state end ,cast(sum(agg)as string ) as agg
FROM TABLE(CUMULATE(TABLE dm_cumulate, DESCRIPTOR(ts1), INTERVAL '5' MINUTES, INTERVAL '1' DAY(9)))
GROUP BYwindow_end,window_start,manufacturer_name,event_id,case when state is null then -1 else state end