Window Functions
a) Overview
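After the window assigner is defined, you need to specify the computation to run on each window's data once the window fires. This is the job of the window function.
There are three kinds of window functions: ReduceFunction, AggregateFunction, and ProcessWindowFunction.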
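- The first two can be executed more efficiently, because Flink can aggregate the data incrementally as each element arrives in the window.
- A ProcessWindowFunction, by contrast, receives an Iterable over all elements in the window, along with meta-information about the window.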
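A windowed transformation that uses a ProcessWindowFunction is less efficient than one using either of the other two, because Flink has to buffer all of the window's elements until the window fires. To mitigate this, a ProcessWindowFunction can be combined with a ReduceFunction or an AggregateFunction: the window's elements are then aggregated incrementally, and the ProcessWindowFunction still receives the window's metadata.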
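To make the efficiency difference concrete, here is a minimal plain-Java sketch of what a window has to keep before it fires. It does not use Flink; WindowStateModel, IncrementalWindow, and BufferingWindow are hypothetical names. The incremental variant folds each element into a single running sum, as a ReduceFunction or AggregateFunction allows; the buffering variant keeps every element, as a ProcessWindowFunction on its own requires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (no Flink): compares the state a window
// keeps before firing under incremental aggregation vs. full buffering.
final class WindowStateModel {

    // Incremental style (ReduceFunction / AggregateFunction): each arriving
    // element is folded into one running value, so state size is constant.
    static final class IncrementalWindow {
        private long sum = 0L;

        void onElement(long value) {
            sum += value;
        }

        long fire() {
            return sum;
        }
    }

    // Buffering style (ProcessWindowFunction alone): every element is kept
    // until the window fires, then the whole collection is iterated.
    static final class BufferingWindow {
        private final List<Long> buffer = new ArrayList<>();

        void onElement(long value) {
            buffer.add(value);
        }

        int bufferedCount() {
            return buffer.size();
        }

        long fire() {
            long sum = 0L;
            for (long v : buffer) {
                sum += v;
            }
            return sum;
        }
    }
}
```

Feeding the values 1 to 100 into both yields the same sum (5050), but the buffering variant ends up holding 100 elements while the incremental one holds a single long.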
b) ReduceFunction
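A ReduceFunction specifies how two input elements are combined into one output element; the input and output types must be the same.
Flink uses the ReduceFunction to aggregate the window's elements incrementally.
Example: sum the second field of the tuples in a window.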
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            // Keep the key (f0) and add up the second field (f1)
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });
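The incremental behavior of a ReduceFunction can be modeled in plain Java as below. This is a sketch, not Flink's implementation: Pair is a hypothetical stand-in for Tuple2<String, Long>, and ReducingWindow only illustrates the idea that the window stores at most one value and combines each arriving element into it, which is why the input and output types must match.

```java
import java.util.function.BinaryOperator;

// Hypothetical stand-in for Flink's Tuple2<String, Long>.
final class Pair {
    final String f0;
    final long f1;

    Pair(String f0, long f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Plain-Java model of incremental reduction (not Flink's implementation):
// the window holds at most one value and combines each new element into it.
final class ReducingWindow {
    private final BinaryOperator<Pair> reducer;
    private Pair current; // null until the first element arrives

    ReducingWindow(BinaryOperator<Pair> reducer) {
        this.reducer = reducer;
    }

    void onElement(Pair value) {
        // The first element becomes the current value; later ones are reduced into it.
        current = (current == null) ? value : reducer.apply(current, value);
    }

    Pair fire() {
        return current;
    }
}
```

Using the same logic as the ReduceFunction above, new ReducingWindow((v1, v2) -> new Pair(v1.f0, v1.f1 + v2.f1)) fed with ("a", 3), ("a", 4), ("a", 5) ends up holding ("a", 12).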
c) AggregateFunction
A ReduceFunction is a special case of an AggregateFunction. An AggregateFunction has three type parameters: the input type (IN), the accumulator type (ACC), and the output type (OUT).
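The input type is the element type of the input stream. The AggregateFunction interface has methods to create an initial accumulator (createAccumulator), add an input element to an accumulator (add), merge two accumulators (merge), and extract an output of type OUT from an accumulator (getResult).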
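As with ReduceFunction, Flink aggregates the input elements incrementally as they arrive in the window.
Example: compute the average of the second field of all elements in a window.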
// The accumulator keeps a running sum (f0) and a count (f1); getResult computes the average.
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
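The role of merge() can be illustrated without Flink. In the sketch below, Agg is a hypothetical mirror of the four AggregateFunction methods (not the Flink interface), SumCount replaces Tuple2<Long, Long>, and the input type is simplified to Long. Two partial accumulators are built separately and then merged, which is the situation merge() exists for, for example when Flink merges session windows.

```java
// Hypothetical plain-Java mirror of the four AggregateFunction methods.
interface Agg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Accumulator holding (sum, count), the same role as Tuple2<Long, Long> above.
final class SumCount {
    final long sum;
    final long count;

    SumCount(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

// Same average logic as AverageAggregate, with Long input instead of Tuple2.
final class AverageAgg implements Agg<Long, SumCount, Double> {
    public SumCount createAccumulator() {
        return new SumCount(0L, 0L);
    }

    public SumCount add(Long value, SumCount acc) {
        return new SumCount(acc.sum + value, acc.count + 1);
    }

    public Double getResult(SumCount acc) {
        // Returns NaN for an empty accumulator (0.0 / 0)
        return ((double) acc.sum) / acc.count;
    }

    public SumCount merge(SumCount a, SumCount b) {
        // Combine two partial aggregates field by field
        return new SumCount(a.sum + b.sum, a.count + b.count);
    }
}
```

Merging an accumulator built from 1 and 2 with one built from 3 and 4 gives sum 10 and count 4, so getResult returns 2.5, the same as aggregating all four values in one accumulator.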
d) ProcessWindowFunction
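A ProcessWindowFunction receives an Iterable over all elements in the window, plus a Context object that gives access to time and state information, which makes it more flexible than the other window functions. This flexibility comes at the cost of performance and resource consumption, because the window's elements cannot be aggregated incrementally and must instead be buffered until the window fires.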
The signature of ProcessWindowFunction is as follows:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * Deletes any state in the {@code Context} when the Window expires (the watermark passes its
     * {@code maxTimestamp} + {@code allowedLateness}).
     *
     * @param context The context to which the window is being evaluated
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public void clear(Context context) throws Exception {}

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }
}
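The key parameter is the key extracted by the KeySelector specified in keyBy(). If the key is specified as a tuple index or as a string field name, its type is always Tuple, and you have to cast it manually to a tuple of the correct size to extract the key fields.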
Example: use a ProcessWindowFunction to count the elements in a window and output the window's information along with the count.
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new MyProcessWindowFunction());

public class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        // Count the buffered elements of this window
        long count = 0;
        for (Tuple2<String, Long> in : input) {
            count++;
        }
        // context.window() gives access to the window being evaluated
        out.collect("Window: " + context.window() + " count: " + count);
    }
}
e) ProcessWindowFunction with incremental aggregation
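A ProcessWindowFunction can be combined with a ReduceFunction or an AggregateFunction so that elements are aggregated incrementally as they arrive in the window. When the window closes, the ProcessWindowFunction receives the aggregated result: the Iterable passed to process() contains only that single pre-aggregated value. This gives you incremental aggregation of the window's elements together with access to the window's metadata from the ProcessWindowFunction.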
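The data flow can be sketched in plain Java. PreAggregatedWindow, ProcessStep, and fire are hypothetical names, not part of Flink, and the sketch assumes the window received at least one element. The point it shows is that the process step only ever sees a single-element Iterable holding the pre-aggregated value, which is why the examples that follow read it with iterator().next().

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model (not Flink): a reduce step pre-aggregates the
// window's elements, and the process step receives only the result, wrapped in
// a single-element Iterable, together with window metadata (here, its start).
final class PreAggregatedWindow {

    // Stand-in for ProcessWindowFunction.process(key, context, elements, out).
    interface ProcessStep<T, R> {
        R process(String key, long windowStart, Iterable<T> elements);
    }

    // Assumes `arrivals` is non-empty.
    static <T, R> R fire(String key, long windowStart, List<T> arrivals,
                         BinaryOperator<T> reducer, ProcessStep<T, R> step) {
        T acc = null;
        for (T element : arrivals) {
            // Incremental aggregation: fold each element in as it "arrives".
            acc = (acc == null) ? element : reducer.apply(acc, element);
        }
        // Only the aggregated value is handed to the process step.
        return step.process(key, windowStart, Collections.singletonList(acc));
    }
}
```

With readings 7, 3, 9 and a "keep the smaller value" reducer, the process step sees exactly one element, 3, along with the window start.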
Incremental window aggregation with ReduceFunction
Example: combine a ReduceFunction with a ProcessWindowFunction to return the smallest element in a window together with the window's start time.
DataStream<SensorReading> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        // Keep the reading with the smaller value
        return r1.value() > r2.value() ? r2 : r1;
    }
}

private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

    public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
        // The Iterable holds only the pre-aggregated minimum
        SensorReading min = minReadings.iterator().next();
        out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
    }
}
Incremental window aggregation with AggregateFunction
Example: combine an AggregateFunction with a ProcessWindowFunction to compute the average and emit it together with the window's key.
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

    public void process(String key,
                        Context context,
                        Iterable<Double> averages,
                        Collector<Tuple2<String, Double>> out) {
        Double average = averages.iterator().next();
        out.collect(new Tuple2<>(key, average));
    }
}
f) Using per-window state in ProcessWindowFunction
In addition to accessing keyed state, a ProcessWindowFunction can also use keyed state whose scope is limited to the window currently being processed.
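Here, the "window" in per-window refers to a window instance for a particular key, for example the time window from 12:00 to 13:00 for user-id xyz. Depending on the window definition, many different window instances are created, one for each combination of key and time period.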
For example, if you process events with 1000 different keys and all of them currently fall into the [12:00, 13:00) time window, you get 1000 window instances, each with its own keyed per-window state.
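The window-instance count can be checked with a small plain-Java sketch. The class WindowInstances is hypothetical; it identifies a window instance by the pair (key, window start) and, as a simplifying assumption, computes the start of a tumbling window as timestamp - (timestamp % size), which holds only for non-negative millisecond timestamps and no window offset.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical plain-Java illustration (not Flink code): a window instance is
// identified by the pair (key, window). Window starts for a tumbling window are
// computed as timestamp - (timestamp % size), assuming non-negative
// millisecond timestamps and no window offset.
final class WindowInstances {
    static final long HOUR_MS = 60L * 60L * 1000L;

    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Number of distinct (key, window) instances the events fall into.
    static int countInstances(String[] keys, long[] timestampsMs, long sizeMs) {
        Set<String> instances = new HashSet<>();
        for (int i = 0; i < keys.length; i++) {
            instances.add(keys[i] + "|" + windowStart(timestampsMs[i], sizeMs));
        }
        return instances.size();
    }
}
```

With 1000 distinct keys whose events all fall between 12:00 and 13:00, countInstances returns 1000 for one-hour windows; events for user-id xyz at 12:00 and at 13:00 land in two separate instances.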
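The Context object received by process() has two methods that give access to the two kinds of state:
- globalState(), which accesses keyed state that is not scoped to a window;
- windowState(), which accesses keyed state scoped to the current window.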
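This feature is useful when a window can fire more than once, for example when late data causes it to fire again, or when a custom trigger fires the window early based on speculation. In such cases you might store information about previous firings, or the number of firings so far, in per-window state.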
When using per-window state, remember to clean it up when the window is removed; this should be done in the clear() method.
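The following plain-Java sketch models the two state scopes for the firing-count use case described above. It is not Flink's KeyedStateStore API: FiringCounter is a hypothetical class that keeps a per-key map (playing the role of globalState()) and a per-(key, window) map (playing the role of windowState()), and its clear() removes only the per-window entry, mirroring what clear(Context) is meant to do.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the two state scopes (not Flink's API):
// "global" state is keyed by key only; "window" state by (key, window).
final class FiringCounter {
    private final Map<String, Integer> globalFirings = new HashMap<>(); // per key
    private final Map<String, Integer> windowFirings = new HashMap<>(); // per key + window

    private static String windowId(String key, long windowStart) {
        return key + "|" + windowStart;
    }

    // Called on every firing, e.g. when late data re-triggers a window.
    // Returns how many times this particular window has fired so far.
    int onFire(String key, long windowStart) {
        globalFirings.merge(key, 1, Integer::sum);
        return windowFirings.merge(windowId(key, windowStart), 1, Integer::sum);
    }

    // Mirrors clear(Context): drop only the state of the window being removed.
    void clear(String key, long windowStart) {
        windowFirings.remove(windowId(key, windowStart));
    }

    int globalCount(String key) {
        return globalFirings.getOrDefault(key, 0);
    }

    int windowStateSize() {
        return windowFirings.size();
    }
}
```

Without the clear() call, one entry per (key, window) would stay in windowFirings forever, which is the leak the note in the ProcessWindowFunction signature warns about.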
WindowFunction (legacy)
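In some places where a ProcessWindowFunction can be used, a WindowFunction can be used as well. It is the older version of ProcessWindowFunction: it provides less contextual information and lacks some advanced features, such as per-window state.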
The signature of WindowFunction is as follows:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param window The window that is being evaluated.
     * @param input The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
It can be used like this:
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());