水位線特點
- 插入到數據流中的一個標記,可以認為是一個特殊的數據
- 主要內容是一個時間戳
- 水位線是基于數據的時間戳生成的,即
事件時間
- 水位線必須單調遞增
- 水位線可以通過設置延遲,來保證正確處理亂序數據
- 一個水位線,表示事件時間已經達到了時間戳t
- 水位線是Flink流處理中保證結果正確性的核心機制
窗口
錯誤理解:窗口是一個固定位置的框,數據流源源不斷地流過來,到某個時間窗口該關閉了,就停止收集數據,觸發計算并窗口關閉
輸出結果。
Flink中窗口是動態創建的,當有落在這個窗口區間范圍的數據達到時,才創建對應的窗口。事實上,觸發計算和窗口關閉兩個行為可以分開。
總體原則
水位線出現表示這個時間之前的數據已經全部到齊,之后再也不會出現了,不過要保證絕對正確,就必須等足夠長的時間,這會帶來更高的延遲。水位線是流處理中對低延遲和結果正確性的一個權衡機制。
水位線生成方案
水位線的生成位置:越靠近數據源越好
WatermarkStrategy:水位線策略對象
1. 水位線生成器 WatermarkGenerator
- onEvent() 給每條數據生成水位線
- onPeriodicEmit():周期性生成水位線
2. 時間戳分配器 TimestampAssigner
- extractTimestamp()
有序流水位線生成的代碼如下:
public class Flink01_UserDefineWaterMarkStrategy {public static void main(String[] args) {//1.創建運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默認是最大并行度env.setParallelism(1);//設置生成水位線的周期env.getConfig().setAutoWatermarkInterval(1000);//tom,/home,1000SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] words = line.split(",");return new Event(words[0].trim(),words[1].trim(),Long.valueOf(words[2].trim()));});ds.print("input");ds.assignTimestampsAndWatermarks(new MyWatermarkStrategy());ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyWatermarkStrategy implements WatermarkStrategy<Event>{/*** 創建水位線生成器* @param context* @return*/@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWatermarkGenerator();}public static class MyWatermarkGenerator implements WatermarkGenerator<Event>{private Long maxTs = Long.MIN_VALUE;/*** 每一條數據調用一次,用于生成一次水位線* @param event* @param eventTimestamp* @param output*/@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {//有序流,每條數據生成水位線
// System.out.println("有序流每條數據生成水位線===》"+eventTimestamp);
// output.emitWatermark(new Watermark(eventTimestamp));maxTs = Math.max(maxTs, eventTimestamp);}/*** 周期性生成水位線* 默認周期是200ms* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//有序流,周期性生成水位線System.out.println("有序流周期性生成水位線===》"+maxTs);output.emitWatermark(new Watermark(maxTs));}}/*** 創建時間戳分配器,用于從數據中提取時間戳* @param context* @return*/@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new MyTimestampAssigner();}}public static class MyTimestampAssigner implements TimestampAssigner<Event>{/*** 從數據中提取時間戳* @param element The element that the timestamp will be assigned to.* @param recordTimestamp The current internal timestamp of the element, or a negative value, if* no timestamp has been assigned yet.* @return*/@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTs();}}}