一、引言
Flink 作為一款強大的流處理框架,在其中扮演著關鍵角色。今天,咱們來聊聊 Flink 中一個極為重要的概念 —— Watermark(水位線),它是處理亂序數據和準確計算的關鍵。接下來我們直入主題,首先來看看今天的第一個問題Watermark是什么。
二、Watermark 是什么
(一)Watermark 定義?
Watermark(水位線)是 Flink 中用于處理事件時間(Event Time)的一種機制 ,它本質上是一種特殊的時間戳。簡單來說,Watermark 是插入到數據流中的一個標記,用于標記事件時間的上界。在 Flink 的流處理中,Watermark 可以看作是一個時間戳,它表示所有小于該時間戳的事件都已經到達,之后不會再有該時間戳之前的事件到來。?
比如,我們有一個訂單流,每個訂單都有一個下單時間(事件時間)。如果當前 Watermark 的值是 10:00,那就意味著所有下單時間在 10:00 之前的訂單都已經到達 Flink 系統,不會再有 10:00 之前下單的訂單數據了。?
Watermark 必須單調遞增,這是一個非常重要的特性。只有單調遞增,才能確保任務的事件時間時鐘在向前推進,而不是后退。如果 Watermark 不單調遞增,就會導致時間混亂,影響窗口計算等操作的準確性。?
(二)Watermark 作用?
Watermark 的主要作用是解決數據亂序問題 ,確保在事件時間語義下,窗口操作能夠準確地觸發計算。在實際的流處理場景中,由于網絡延遲、分布式系統等因素,數據到達 Flink 系統的順序往往和它們實際產生的時間順序不一致,也就是出現亂序問題。如果直接按照數據到達的順序進行窗口計算,結果可能會不準確。?
舉個例子,假設有一個統計每 10 分鐘內訂單金額總和的窗口操作。如果沒有 Watermark 機制,當一個 10:00 - 10:10 的窗口時間到了,系統可能會立刻觸發計算,而此時可能還有 10:00 - 10:10 之間產生的訂單數據因為網絡延遲還沒到達。這樣計算出來的訂單金額總和就會偏小,結果不準確。?
引入 Watermark 后,我們可以設置一個合理的延遲時間。比如設置延遲 5 分鐘,當系統接收到一個事件時間為 10:05 的訂單時,Watermark 的值可能是 10:00(10:05 - 5 分鐘)。此時,即使 10:00 - 10:10 的窗口時間到了,系統也不會立刻觸發計算,而是會等待,直到 Watermark 的值達到 10:10(10:15 - 5 分鐘),才會觸發窗口計算。這樣就給了可能遲到的訂單數據足夠的時間到達,保證了計算結果的準確性。?
總的來說,Watermark 就像是一個 “裁判”,它告訴 Flink 系統什么時候可以放心地進行窗口計算,不用擔心有遲到的數據會影響結果。通過設置 Watermark,我們可以在一定程度上平衡延遲和結果正確性,讓 Flink 在處理亂序數據時依然能夠給出準確的分析結果。
三、Watermark 工作原理?
(一)生成機制?
Watermark 的生成方式主要有兩種:周期性生成和基于事件的生成 。?
周期性生成:Flink 會按照一定的時間間隔周期性地生成 Watermark。在這種方式下,我們需要設置一個最大允許的亂序時間(maxOutOfOrderness)。例如,假設我們設置最大亂序時間為 5 秒,當 Flink 接收到一個事件時間為 10:05 的事件時,它會根據這個事件時間和最大亂序時間來生成 Watermark。此時 Watermark 的值為 10:00(10:05 - 5 秒)。這意味著系統認為所有 10:00 之前的事件都已經到達,不會再有更晚到達的 10:00 之前的事件了。?
在 Flink 中,我們可以通過BoundedOutOfOrdernessWatermarkStrategy來實現周期性生成 Watermark,示例代碼如下:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.OutputTag;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 從Kafka讀取數據DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成WatermarkSingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {// 從元素中提取事件時間戳return extractTimestamp(element);}));// 后續處理邏輯withWatermark.print();env.execute("Watermark Example");}private static long extractTimestamp(String element) {// 實際應用中根據數據格式提取事件時間戳// 這里只是示例,假設數據格式為 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}
基于事件的生成:基于事件的生成方式是當特定的事件發生時生成 Watermark ,這種方式適用于某些特殊的業務場景,比如在電商訂單流中,當接收到一個特殊的 “訂單批次結束” 事件時,生成 Watermark。在 Flink 中,我們可以通過實現AssignerWithPunctuatedWatermarks接口來實現基于事件的 Watermark 生成。這種方式需要我們在處理每個事件時,判斷是否滿足生成 Watermark 的條件。如果滿足,則生成一個新的 Watermark。 基于事件的生成方式示例代碼如下:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;public class PunctuatedWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 從Kafka讀取數據DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成WatermarkSingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {@Overridepublic long extractTimestamp(String element, long previousElementTimestamp) {// 從元素中提取事件時間戳return extractTimestamp(element);}@Overridepublic Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {// 判斷是否滿足生成Watermark的條件,這里假設當元素為 "END" 時生成Watermarkif ("END".equals(lastElement)) {return new Watermark(extractedTimestamp);}return null;}});// 后續處理邏輯withWatermark.print();env.execute("Punctuated Watermark Example");}private static long extractTimestamp(String element) {// 實際應用中根據數據格式提取事件時間戳// 這里只是示例,假設數據格式為 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}
(二)傳播機制?
Watermark 在 Flink 算子間的傳播過程是確保窗口計算準確性的關鍵 。當 Watermark 在數據流中生成后,它會隨著數據一起在算子間傳遞。在單并行度的情況下,Watermark 的傳播相對簡單,直接從上游算子傳遞到下游算子。比如,在一個簡單的數據流中,數據源生成 Watermark 后,經過 map 算子,再到 window 算子,Watermark 會依次傳遞下去。?
在多并行度的情況下,Watermark 的傳播會涉及到對齊(alignment)的概念 。假設一個數據源有多個并行子任務,每個子任務都會生成自己的 Watermark。當這些帶有不同 Watermark 的數據流匯聚到下游算子時,下游算子會選擇所有輸入流中最小的 Watermark 作為自己的 Watermark。這是因為只有當所有輸入流中小于等于該 Watermark 時間戳的事件都到達后,才能確保窗口計算的準確性。比如,有三個并行輸入流,它們的 Watermark 分別為 10:00、10:05 和 10:10,那么下游算子會選擇 10:00 作為自己的 Watermark。這樣做的目的是為了保證所有可能參與窗口計算的數據都已經到達,避免因為某個輸入流的數據延遲而導致窗口計算結果不準確。?
(三)觸發機制?
Watermark 達到一定條件時會觸發窗口計算 。具體來說,當 Watermark 的值大于或等于窗口的結束時間,并且該窗口內有數據時,就會觸發窗口計算。例如,我們定義一個 10:00 - 10:10 的窗口,設置最大亂序時間為 5 分鐘。當 Watermark 的值達到 10:10(假設事件時間為 10:15 的數據到達,生成的 Watermark 為 10:10,10:15 - 5 分鐘),并且 10:00 - 10:10 這個窗口內有數據時,就會觸發窗口計算,計算該窗口內的數據結果并輸出。?
再舉個例子,假設我們有一個統計每 5 分鐘內用戶訪問次數的窗口操作。如果沒有數據遲到,當時間到達 10:05 時,窗口內的數據統計完成,Watermark 達到 10:05,此時窗口計算被觸發,輸出 10:00 - 10:05 這個時間段內的用戶訪問次數。但如果有數據遲到,比如 10:03 產生的數據在 10:08 才到達,由于我們設置了一定的亂序時間,Watermark 不會立刻達到 10:05,而是會等待遲到的數據。當 Watermark 最終達到 10:05 時,窗口計算才會觸發,這樣就能保證遲到的數據也能被正確統計到 10:00 - 10:05 的窗口內 。
四、Watermark 生成策略?
(一)固定延遲策略?
固定延遲策略是一種較為簡單直觀的 Watermark 生成策略 。它的原理是在接收到的最大事件時間的基礎上,減去一個固定的延遲時間,以此來生成 Watermark。例如,我們設置固定延遲時間為 10 秒,當接收到的事件時間為 10:00:15 時,生成的 Watermark 時間戳就是 10:00:05(10:00:15 - 10 秒)。這表示系統認為所有 10:00:05 之前的事件都已經到達,不會再有更晚到達的 10:00:05 之前的事件了。?
在 Flink 中,使用固定延遲策略生成 Watermark 的代碼示例如下:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;public class FixedDelayWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 從Kafka讀取數據DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成Watermark,設置固定延遲10秒SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((element, recordTimestamp) -> {// 從元素中提取事件時間戳return extractTimestamp(element);}));// 后續處理邏輯withWatermark.print();env.execute("Fixed Delay Watermark Example");}private static long extractTimestamp(String element) {// 實際應用中根據數據格式提取事件時間戳// 這里只是示例,假設數據格式為 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}
(二)滑動策略?
滑動策略是指 Watermark 按照一定的時間間隔(滑動步長)和窗口大小進行生成 。與固定延遲策略不同,滑動策略下的 Watermark 不是簡單地基于最大事件時間減去固定延遲,而是在滑動窗口的基礎上生成。例如,我們設置滑動窗口大小為 5 分鐘,滑動步長為 1 分鐘。當第一個窗口(0:00 - 0:05)內的事件處理完成后,生成第一個 Watermark,其時間戳為 0:05。接著,當第二個窗口(0:01 - 0:06)內的事件處理完成后,生成第二個 Watermark,其時間戳為 0:06,以此類推。?
以電商訂單統計為例,如果我們想統計每 5 分鐘內的訂單金額總和,并且希望每分鐘更新一次統計結果,就可以使用滑動策略。假設我們設置滑動窗口大小為 5 分鐘,滑動步長為 1 分鐘。當接收到第一個訂單時,判斷其事件時間,將其分配到對應的窗口(例如 0:00 - 0:05)。隨著訂單的不斷接收,當 0:00 - 0:05 這個窗口內的訂單處理完成后,生成一個 Watermark,其時間戳為 0:05,表示 0:05 之前的訂單都已經到達。此時,開始計算 0:00 - 0:05 這個窗口內的訂單金額總和并輸出結果。接著,當 0:01 - 0:06 這個窗口內的訂單處理完成后,生成下一個 Watermark,時間戳為 0:06,計算并輸出 0:01 - 0:06 這個窗口內的訂單金額總和。這樣,我們就能每分鐘得到一個最近 5 分鐘內的訂單金額統計結果。?
(三)自定義策略?
在一些復雜的業務場景中,固定延遲策略和滑動策略可能無法滿足需求,這時就需要自定義 Watermark 生成策略 。自定義策略允許用戶根據具體的業務邏輯和數據特點,靈活地生成 Watermark。例如,在物聯網設備數據處理場景中,不同設備的數據產生頻率和延遲情況可能各不相同。有些設備可能每隔幾秒就發送一次數據,而有些設備可能由于網絡問題,數據延遲較高。在這種情況下,我們可以根據設備 ID 來分別設置不同的 Watermark 生成策略。對于數據產生頻率高、延遲低的設備,可以設置較小的固定延遲;對于數據延遲較高的設備,可以根據歷史數據的延遲情況,動態地調整 Watermark 的生成邏輯。?
在 Flink 中,實現自定義 Watermark 生成策略需要實現WatermarkStrategy接口 。通過重寫接口中的方法,我們可以定義自己的 Watermark 生成邏輯。例如,下面是一個簡單的自定義 Watermark 生成策略的示例代碼:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;public class CustomWatermarkStrategy implements WatermarkStrategy<String> {@Overridepublic WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomWatermarkGenerator();}private static class CustomWatermarkGenerator implements WatermarkGenerator<String> {private long maxTimestamp;private final long delay = 10000; // 延遲10秒public CustomWatermarkGenerator() {this.maxTimestamp = Long.MIN_VALUE;}@Overridepublic void onEvent(String event, long eventTimestamp, WatermarkOutput output) {maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - delay));}}
}
?在上述代碼中,我們定義了一個CustomWatermarkStrategy類,實現了WatermarkStrategy接口。在createWatermarkGenerator方法中,返回一個CustomWatermarkGenerator實例。CustomWatermarkGenerator類實現了WatermarkGenerator接口,通過onEvent方法記錄接收到的最大事件時間,在onPeriodicEmit方法中,根據最大事件時間減去固定延遲生成 Watermark。這樣,我們就實現了一個簡單的自定義 Watermark 生成策略,可以根據具體業務需求進行靈活調整 。
五、實際生產使用
定義數據源:這里我們以從 Kafka 讀取數據為例,假設 Kafka 中存儲的是訂單數據,每條數據格式為訂單ID,下單時間戳,訂單金額。示例代碼如下:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import java.util.Properties;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// Kafka配置Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");// 從Kafka讀取數據DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));// 后續代碼...}
}
設置 Watermark 生成器:我們采用固定延遲策略生成 Watermark,設置最大亂序時間為 5 秒。示例代碼如下:
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));// 設置Watermark生成器,固定延遲5秒SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {// 從元素中提取事件時間戳,假設數據格式為 "訂單ID,下單時間戳,訂單金額"String[] parts = element.split(",");return Long.parseLong(parts[1]);}));// 后續代碼...}
}
窗口操作與計算:對帶有 Watermark 的數據進行窗口操作,統計每 10 分鐘內的訂單總金額。示例代碼如下:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {String[] parts = element.split(",");return Long.parseLong(parts[1]);}));// 按訂單ID分組,進行窗口操作,統計每10分鐘內的訂單總金額DataStream<String> result = withWatermark.keyBy(value -> value.split(",")[0]).timeWindow(Time.minutes(10)).apply(new WindowFunction<String, String, String, TimeWindow>() {@Overridepublic void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {double totalAmount = 0;for (String element : input) {String[] parts = element.split(",");double amount = Double.parseDouble(parts[2]);totalAmount += amount;}out.collect("訂單ID: " + key + ", 窗口開始時間: " + window.getStart() + ", 窗口結束時間: " + window.getEnd() + ", 訂單總金額: " + totalAmount);}});result.print();env.execute("Watermark Example");}
}
代碼解析?
- 創建流處理環境:StreamExecutionEnvironment.getExecutionEnvironment()獲取當前運行環境的執行環境對象,env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)設置流處理的時間特性為事件時間,這是使用 Watermark 的前提。?
- 定義數據源:通過FlinkKafkaConsumer從 Kafka 中讀取數據,props中配置了 Kafka 的連接地址和消費者組 ID,SimpleStringSchema用于將 Kafka 中的數據反序列化為字符串。?
- 設置 Watermark 生成器:WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))表示采用固定延遲策略生成 Watermark,最大亂序時間為 5 秒。withTimestampAssigner方法用于從輸入數據中提取事件時間戳,這里假設數據格式為訂單ID,下單時間戳,訂單金額,通過Long.parseLong(parts[1])提取下單時間戳。?
- 窗口操作與計算:keyBy(value -> value.split(",")[0])按訂單 ID 進行分組,timeWindow(Time.minutes(10))定義了一個 10 分鐘的滾動窗口,apply方法中實現了具體的計算邏輯,遍歷窗口內的所有訂單數據,累加訂單金額,最后輸出統計結果。
?當我們運行上述代碼后,控制臺會輸出每 10 分鐘內每個訂單 ID 的訂單總金額。例如:
訂單ID: 1001, 窗口開始時間: 1633420800000, 窗口結束時間: 1633421400000, 訂單總金額: 1000.5
訂單ID: 1002, 窗口開始時間: 1633420800000, 窗口結束時間: 1633421400000, 訂單總金額: 2000.3
從結果可以看出,Watermark 機制有效地處理了亂序數據,確保了窗口計算的準確性。即使訂單數據到達的順序混亂,通過 Watermark 的延遲策略,Flink 也能準確地統計出每個窗口內的訂單總金額 。?
六、常見問題?
(一)Watermark 延遲設置?
在設置 Watermark 延遲時間時,一定要充分考慮業務場景的需求和數據的實際延遲情況 。如果延遲設置過小,可能無法給遲到的數據足夠的時間到達,導致窗口計算結果不準確,丟失重要數據。比如在電商促銷活動期間,訂單數據量會大幅增加,網絡延遲也可能更嚴重。如果 Watermark 延遲設置只有 1 分鐘,很可能一些在促銷活動剛開始幾分鐘內產生的訂單數據因為網絡擁堵,在 1 分鐘后才到達,這些遲到的訂單數據就無法被正確統計到相應的窗口內,影響銷售數據的統計準確性。?
相反,如果延遲設置過大,雖然能確保所有數據都能被處理,但會導致窗口計算的延遲增加,影響實時性 。假設一個實時監控系統,需要及時發現設備的異常情況并發出警報。如果 Watermark 延遲設置為 30 分鐘,即使設備在幾分鐘前就出現了異常,但由于 Watermark 延遲時間過長,要等 30 分鐘后才會觸發窗口計算,這就導致警報發出延遲,可能會給生產帶來嚴重的損失。?
(二)數據亂序處理?
當數據亂序嚴重時,原本設置的 Watermark 策略可能無法滿足需求 。比如在一些特殊的物聯網場景中,由于傳感器分布范圍廣,網絡狀況復雜,數據亂序情況非常嚴重。此時,可以考慮增加 Watermark 的延遲時間,以確保更多遲到的數據能夠被正確處理。例如,原本設置的延遲時間為 5 秒,在數據亂序嚴重的情況下,可以將延遲時間增加到 10 秒甚至更長。?
還可以結合其他技術手段,如使用側輸出流(Side Output)來處理遲到的數據 。側輸出流可以將遲到的數據單獨收集起來,進行后續的特殊處理。在電商訂單處理中,對于超過一定延遲時間(如 10 分鐘)的訂單數據,可以將其輸出到側輸出流,然后對這些數據進行單獨分析,看是否是由于某些特殊原因導致的延遲,如支付系統故障、物流信息更新不及時等。?
(三)Watermark 與窗口的配合?
Watermark 與窗口的大小和類型密切相關 ,直接影響窗口計算的正確性。如果窗口大小設置不合理,即使 Watermark 生成正常,也可能導致計算結果不準確。例如,在統計網站用戶活躍情況時,如果將窗口設置得過大,如以 1 天為一個窗口,那么在這一天內,可能會有大量的數據涌入,而且數據亂序情況也可能更復雜。即使 Watermark 能夠處理一定程度的亂序數據,但由于窗口時間跨度太長,仍然可能出現部分數據統計不準確的情況。相反,如果窗口設置得過小,如以 1 分鐘為一個窗口,雖然能提高實時性,但可能會導致頻繁的窗口計算,增加系統開銷,而且對于一些需要統計較長時間段內數據的業務場景,可能無法得到準確的結果。?
不同類型的窗口(如滾動窗口、滑動窗口、會話窗口)與 Watermark 的配合方式也有所不同 。滾動窗口是固定大小且不重疊的窗口,Watermark 達到窗口結束時間時觸發計算。滑動窗口有固定的窗口大小和滑動步長,Watermark 需要根據滑動步長和窗口大小來合理觸發計算。會話窗口則是根據事件之間的時間間隔來定義窗口,Watermark 在處理會話窗口時,需要考慮會話的開始和結束條件,以確保窗口計算的準確性。在實際應用中,需要根據業務需求選擇合適的窗口類型,并合理配置 Watermark,以實現準確高效的流處理 。
七、總結
Watermark 作為 Flink 流處理中處理事件時間和亂序數據的關鍵機制,其重要性不言而喻。它本質是一種特殊的時間戳,用于標記事件時間的上界,確保在亂序數據的情況下,窗口操作能夠準確地觸發計算。通過周期性生成或基于事件的生成方式,Watermark 在數據流中不斷推進,在單并行度和多并行度的情況下,以不同的傳播機制在算子間傳遞,最終在滿足觸發條件時,準確地觸發窗口計算。?
在實際應用中,我們可以根據業務場景選擇合適的 Watermark 生成策略,如固定延遲策略、滑動策略或自定義策略 。在代碼實現方面,通過搭建 Flink 開發環境,按照引入依賴、創建流處理環境、定義數據源、設置 Watermark 生成器、進行窗口操作與計算的步驟,能夠實現基于 Watermark 的流處理任務。同時,我們也要注意 Watermark 延遲設置、數據亂序處理以及 Watermark 與窗口的配合等問題,以確保流處理的準確性和高效性。