時間機制
Flink中的時間機制主要用在判斷是否觸發時間窗口window的計算。
在Flink中有三種時間概念:ProcessTime、IngestionTime、EventTime。
ProcessTime:是在數據抵達算子產生的時間(Flink默認使用ProcessTime)
IngestionTime:是在DataSource生成數據產生的時間
EventTime:是數據本身攜帶的時間,具有實際業務含義,不是Flink框架產生的時間
水位機制
由于網絡原因、故障等原因,數據的EventTIme并不是單調遞增的,是亂序的,有時與當前實際時間相差很大。
水位(watermark)用在EventTime語義的窗口計算,可以當作當前計算節點的時間。當水位超過窗口的endtime,表示事件時間t <= T的數據都**已經到達,**這個窗口就會觸發WindowFunction計算。當水位超過窗口的endtime+允許遲到的時間,窗口就會消亡。本質是DataStream中的一種特殊元素,每個水印都攜帶有一個時間戳。
- 隊列中是亂序的數據,流入長度3s的窗口。2s的數據進入[0,4)的窗口中
- 2s、3s、1s的數據進入[0,4)的窗口,7s的數據分配到[4,8)的窗口中
- 水印4s到達,代表4s以前的數據都已經到達。觸發[0,4)的窗口計算,[4,8)的窗口等待數據
- 水印9s到達,[4,8)的窗口觸發
多并行情況下,不同的watermark流到算子,取最小的wartermark當作當前算子的watermark。
如果所有流入水印中時間戳最小的那個都已經達到或超過了窗口的結束時間,那么所有流的數據肯定已經全部收齊,就可以安全地觸發窗口計算了。
生成水位
首先設置env為事件時間
使用 DataStream API 實現 Flink 任務時,Watermark Assigner 能靠近 Source 節點就靠近 Source 節點,能前置盡量前置。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//測試數據,間隔1s發送
DataStreamSource<Tuple2<String, Long>> source = env.addSource(new SourceFunction<Tuple2<String, Long>>() {@Overridepublic void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {ctx.collect(Tuple2.of("aa", 1681909200000L));//2023-04-19 21:00:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681909500000L));//2023-04-19 21:05:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681909800000L));//2023-04-19 21:10:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910100000L));//2023-04-19 21:15:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910400000L));//2023-04-19 21:20:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910700000L));//2023-04-19 21:25:00Thread.sleep(Long.MAX_VALUE);}@Overridepublic void cancel() {}});
抽取EventTime、生成Watermark
周期性水位–AssignerWithPeriodicWatermarks�(常用)
周期性生成水位。周期默認的時間是 200ms.
源碼如下:
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}
自定義實現AssignerWithPeriodicWatermarks,代碼如下:
source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {private long currentTimestamp;@Nullable@Override// 生成watermarkpublic Watermark getCurrentWatermark() {return new Watermark(currentTimestamp);}@Override//獲取事件時間public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {if (element.f1>=currentTimestamp){currentTimestamp = element.f1;}return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系統當前時間:"+ DateUtil.date());System.out.println("當前水位時間:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口開始時間:"+DateUtil.date(context.window().getStart()));System.out.println("窗口結束時間:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("數據攜帶時間:"+DateUtil.date(element.f1)));}}).print();
運行結果如下:
水位時間到達2023-04-19 21:10:00觸發窗口2023-04-19 21:00:00到2023-04-19 21:10:00,窗口中的數據為2023-04-19 21:00:00和2023-04-19 21:05:00
水位時間到達2023-04-19 21:20:00觸發窗口2023-04-19 21:10:00到2023-04-19 21:20:00,窗口中的數據為2023-04-19 21:10:00和2023-04-19 21:15:00
長時間等待后,2023-04-19 21:20:00到2023-04-19 21:30:00是存在一個2023-04-19 21:25:00的數據,一直沒有觸發。這是因為沒有新的數據進入,周期性生成的watermark一直是2023-04-19 21:20:00。所以后面窗口即使有數據也沒有觸發計算。
BoundedOutOfOrdernessTimestampExtractor�
BoundedOutOfOrdernessTimestampExtractor實現了AssignerWithPeriodicWatermarks接口,是flink內置的實現類。
主要源碼如下:
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {if (maxOutOfOrderness.toMilliseconds() < 0) {throw new RuntimeException("Tried to set the maximum allowed " +"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");}this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;}public abstract long extractTimestamp(T element);@Overridepublic final Watermark getCurrentWatermark() {long potentialWM = currentMaxTimestamp - maxOutOfOrderness;if (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);}@Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {long timestamp = extractTimestamp(element);if (timestamp > currentMaxTimestamp) {currentMaxTimestamp = timestamp;}return timestamp;}
BoundedOutOfOrdernessTimestampExtractor產生的時間戳和水印是允許“有界亂序”的,構造它時傳入的參數maxOutOfOrderness就是亂序區間的長度,而實際發射的水印為通過覆寫extractTimestamp()方法提取出來的時間戳減去亂序區間,相當于讓水印把步調“放慢一點”。這是Flink為遲到數據提供的第一重保障。
當然,亂序區間的長度要根據實際環境謹慎設定,設定得太短會丟較多的數據,設定得太長會導致窗口觸發延遲,實時性減弱。
設置maxOutOfOrderness為5min,代碼如下:
source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系統當前時間:"+ DateUtil.date());System.out.println("當前水位時間:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口開始時間:"+DateUtil.date(context.window().getStart()));System.out.println("窗口結束時間:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("數據攜帶時間:"+DateUtil.date(element.f1)));}}).print();
運行結果如下:
看起來和我們自定義實現結果一樣。但是10min的水位時間是來自數據15min減去延遲時間5min得來的。
同理20min的水位時間是來自數據25min減去延遲時間5min得來的。
我們可以設置延遲時間為10min,看一下結果。最后一條數據是25min,那么最后的水位線就是25min-10min=15min。只會觸發00-10的窗口。
同樣的,由于沒有后續數據導致后面的窗口沒有觸發。
AscendingTimestampExtractor
AscendingTimestampExtractor要求生成的時間戳和水印都是單調遞增的。用戶實現從數據中獲取自增的時間戳extractAscendingTimestamp與上一次時間戳比較。如果出現減少,則打印warn日志。
源碼如下:
public abstract long extractAscendingTimestamp(T element);@Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {final long newTimestamp = extractAscendingTimestamp(element);if (newTimestamp >= this.currentTimestamp) {this.currentTimestamp = newTimestamp;return newTimestamp;} else {violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}@Overridepublic final Watermark getCurrentWatermark() {return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}
間斷性水位線
適用于根據接收到的消息判斷是否需要產生水位線的情況,用這種水印生成的方式并不多見。
舉例如下,數據為15min的時候生成水位。
source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {DateTime date = DateUtil.date(lastElement.f1);return date.minute()==15?new Watermark(lastElement.f1):null;}@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系統當前時間:"+ DateUtil.date());System.out.println("當前水位時間:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口開始時間:"+DateUtil.date(context.window().getStart()));System.out.println("窗口結束時間:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("數據攜帶時間:"+DateUtil.date(element.f1)));}}).print();
結果如下:
15min的數據生成了15min的水位,只觸發了00-10的窗口。
窗口處理遲到的數據
allowedLateness�
Flink提供了WindowedStream.allowedLateness()方法來設定窗口的允許延遲。也就是說,正常情況下窗口觸發計算完成之后就會被銷毀,但是設定了允許延遲之后,窗口會等待allowedLateness的時長后再銷毀。在該區間內的遲到數據仍然可以進入窗口中,并觸發新的計算。當然,窗口也是吃資源大戶,所以allowedLateness的值要適當。給個完整的代碼示例如下。
source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系統當前時間:"+ DateUtil.date());System.out.println("當前水位時間:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口開始時間:"+DateUtil.date(context.window().getStart()));System.out.println("窗口結束時間:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("數據攜帶時間:"+DateUtil.date(element.f1)));}}).print();
side output
側輸出(side output)是Flink的分流機制。遲到數據本身可以當做特殊的流,我們通過調用WindowedStream.sideOutputLateData()方法將遲到數據發送到指定OutputTag的側輸出流里去,再進行下一步處理(比如存到外部存儲或消息隊列)。代碼如下。
// 側輸出的OutputTagOutputTag<Tuple2<String, Long>> lateOutputTag = new OutputTag<>("late_data_output_tag");SingleOutputStreamOperator<Object> process = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).sideOutputLateData(lateOutputTag).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系統當前時間:" + DateUtil.date());System.out.println("當前水位時間:" + DateUtil.date(context.currentWatermark()));System.out.println("窗口開始時間:" + DateUtil.date(context.window().getStart()));System.out.println("窗口結束時間:" + DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("數據攜帶時間:" + DateUtil.date(element.f1)));}});//處理側輸出數據
// process.getSideOutput(lateOutputTag).addSink()
最后的window不觸發解決方法
自定義自增水位
周期性獲取watermark時,自定義增加水位
source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {private long currentTimestamp;@Nullable@Override// 生成watermarkpublic Watermark getCurrentWatermark() {currentTimestamp+=60000;return new Watermark(currentTimestamp);}@Override//獲取事件時間public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {if (element.f1>=currentTimestamp){currentTimestamp = element.f1;}return element.f1;}})
結果如下:
自定義trigger
當watermark不能滿足關窗條件時,我們給注冊一個晚于事件時間的處理時間定時器使它一定能達到關窗條件。
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class MyTrigger extends Trigger<Object, TimeWindow> {@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());ctx.registerProcessingTimeTimer(window.maxTimestamp() + 30000L);return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());return TriggerResult.FIRE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (time == window.maxTimestamp()) {ctx.deleteProcessingTimeTimer(window.maxTimestamp() + 30000L);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());ctx.deleteProcessingTimeTimer(window.maxTimestamp() + 30000L);}
}
Test.java
參考鏈接:
https://www.jianshu.com/p/c612e95a5028
https://blog.csdn.net/lixinkuan328/article/details/104129671
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/event-time/generating_watermarks/
https://cloud.tencent.com/developer/article/1573079
https://blog.csdn.net/m0_73707775/article/details/129560540?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3&utm_relevant_index=10