Flink Watermark原理與實戰

一、引言

Flink 作為一款強大的流處理框架,在其中扮演著關鍵角色。今天,咱們來聊聊 Flink 中一個極為重要的概念 —— Watermark(水位線),它是處理亂序數據和準確計算的關鍵。接下來我們直入主題,首先來看看今天的第一個問題Watermark是什么。

二、Watermark 是什么

(一)Watermark 定義?

Watermark(水位線)是 Flink 中用于處理事件時間(Event Time)的一種機制 ,它本質上是一種特殊的時間戳。簡單來說,Watermark 是插入到數據流中的一個標記,用于標記事件時間的上界。在 Flink 的流處理中,Watermark 可以看作是一個時間戳,它表示所有小于該時間戳的事件都已經到達,之后不會再有該時間戳之前的事件到來。?

比如,我們有一個訂單流,每個訂單都有一個下單時間(事件時間)。如果當前 Watermark 的值是 10:00,那就意味著所有下單時間在 10:00 之前的訂單都已經到達 Flink 系統,不會再有 10:00 之前下單的訂單數據了。?

Watermark 必須單調遞增,這是一個非常重要的特性。只有單調遞增,才能確保任務的事件時間時鐘在向前推進,而不是后退。如果 Watermark 不單調遞增,就會導致時間混亂,影響窗口計算等操作的準確性。?

(二)Watermark 作用?

Watermark 的主要作用是解決數據亂序問題 ,確保在事件時間語義下,窗口操作能夠準確地觸發計算。在實際的流處理場景中,由于網絡延遲、分布式系統等因素,數據到達 Flink 系統的順序往往和它們實際產生的時間順序不一致,也就是出現亂序問題。如果直接按照數據到達的順序進行窗口計算,結果可能會不準確。?

舉個例子,假設有一個統計每 10 分鐘內訂單金額總和的窗口操作。如果沒有 Watermark 機制,當一個 10:00 - 10:10 的窗口時間到了,系統可能會立刻觸發計算,而此時可能還有 10:00 - 10:10 之間產生的訂單數據因為網絡延遲還沒到達。這樣計算出來的訂單金額總和就會偏小,結果不準確。?

引入 Watermark 后,我們可以設置一個合理的延遲時間。比如設置延遲 5 分鐘,當系統接收到一個事件時間為 10:05 的訂單時,Watermark 的值可能是 10:00(10:05 - 5 分鐘)。此時,即使 10:00 - 10:10 的窗口時間到了,系統也不會立刻觸發計算,而是會等待,直到 Watermark 的值達到 10:10(10:15 - 5 分鐘),才會觸發窗口計算。這樣就給了可能遲到的訂單數據足夠的時間到達,保證了計算結果的準確性。?

總的來說,Watermark 就像是一個 “裁判”,它告訴 Flink 系統什么時候可以放心地進行窗口計算,不用擔心有遲到的數據會影響結果。通過設置 Watermark,我們可以在一定程度上平衡延遲和結果正確性,讓 Flink 在處理亂序數據時依然能夠給出準確的分析結果。

三、Watermark 工作原理?

(一)生成機制?

Watermark 的生成方式主要有兩種:周期性生成基于事件的生成 。?

周期性生成:Flink 會按照一定的時間間隔周期性地生成 Watermark。在這種方式下,我們需要設置一個最大允許的亂序時間(maxOutOfOrderness)。例如,假設我們設置最大亂序時間為 5 秒,當 Flink 接收到一個事件時間為 10:05 的事件時,它會根據這個事件時間和最大亂序時間來生成 Watermark。此時 Watermark 的值為 10:00(10:05 - 5 秒)。這意味著系統認為所有 10:00 之前的事件都已經到達,不會再有更晚到達的 10:00 之前的事件了。?

在 Flink 中,我們可以通過BoundedOutOfOrdernessWatermarkStrategy來實現周期性生成 Watermark,示例代碼如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.OutputTag;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 從Kafka讀取數據DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成WatermarkSingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {// 從元素中提取事件時間戳return extractTimestamp(element);}));// 后續處理邏輯withWatermark.print();env.execute("Watermark Example");}private static long extractTimestamp(String element) {// 實際應用中根據數據格式提取事件時間戳// 這里只是示例,假設數據格式為 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}

基于事件的生成:基于事件的生成方式是當特定的事件發生時生成 Watermark ,這種方式適用于某些特殊的業務場景,比如在電商訂單流中,當接收到一個特殊的 “訂單批次結束” 事件時,生成 Watermark。在 Flink 中,我們可以通過實現AssignerWithPunctuatedWatermarks接口來實現基于事件的 Watermark 生成。這種方式需要我們在處理每個事件時,判斷是否滿足生成 Watermark 的條件。如果滿足,則生成一個新的 Watermark。 基于事件的生成方式示例代碼如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;public class PunctuatedWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 從Kafka讀取數據DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成WatermarkSingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {@Overridepublic long extractTimestamp(String element, long previousElementTimestamp) {// 從元素中提取事件時間戳return extractTimestamp(element);}@Overridepublic Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {// 判斷是否滿足生成Watermark的條件,這里假設當元素為 "END" 時生成Watermarkif ("END".equals(lastElement)) {return new Watermark(extractedTimestamp);}return null;}});// 后續處理邏輯withWatermark.print();env.execute("Punctuated Watermark Example");}private static long extractTimestamp(String element) {// 實際應用中根據數據格式提取事件時間戳// 這里只是示例,假設數據格式為 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}

(二)傳播機制?

Watermark 在 Flink 算子間的傳播過程是確保窗口計算準確性的關鍵 。當 Watermark 在數據流中生成后,它會隨著數據一起在算子間傳遞。在單并行度的情況下,Watermark 的傳播相對簡單,直接從上游算子傳遞到下游算子。比如,在一個簡單的數據流中,數據源生成 Watermark 后,經過 map 算子,再到 window 算子,Watermark 會依次傳遞下去。?

在多并行度的情況下,Watermark 的傳播會涉及到對齊(alignment)的概念 。假設一個數據源有多個并行子任務,每個子任務都會生成自己的 Watermark。當這些帶有不同 Watermark 的數據流匯聚到下游算子時,下游算子會選擇所有輸入流中最小的 Watermark 作為自己的 Watermark。這是因為只有當所有輸入流中小于等于該 Watermark 時間戳的事件都到達后,才能確保窗口計算的準確性。比如,有三個并行輸入流,它們的 Watermark 分別為 10:00、10:05 和 10:10,那么下游算子會選擇 10:00 作為自己的 Watermark。這樣做的目的是為了保證所有可能參與窗口計算的數據都已經到達,避免因為某個輸入流的數據延遲而導致窗口計算結果不準確。?

(三)觸發機制?

Watermark 達到一定條件時會觸發窗口計算 。具體來說,當 Watermark 的值大于或等于窗口的結束時間,并且該窗口內有數據時,就會觸發窗口計算。例如,我們定義一個 10:00 - 10:10 的窗口,設置最大亂序時間為 5 分鐘。當 Watermark 的值達到 10:10(假設事件時間為 10:15 的數據到達,生成的 Watermark 為 10:10,10:15 - 5 分鐘),并且 10:00 - 10:10 這個窗口內有數據時,就會觸發窗口計算,計算該窗口內的數據結果并輸出。?

再舉個例子,假設我們有一個統計每 5 分鐘內用戶訪問次數的窗口操作。如果沒有數據遲到,當時間到達 10:05 時,窗口內的數據統計完成,Watermark 達到 10:05,此時窗口計算被觸發,輸出 10:00 - 10:05 這個時間段內的用戶訪問次數。但如果有數據遲到,比如 10:03 產生的數據在 10:08 才到達,由于我們設置了一定的亂序時間,Watermark 不會立刻達到 10:05,而是會等待遲到的數據。當 Watermark 最終達到 10:05 時,窗口計算才會觸發,這樣就能保證遲到的數據也能被正確統計到 10:00 - 10:05 的窗口內 。

四、Watermark 生成策略?

(一)固定延遲策略?

固定延遲策略是一種較為簡單直觀的 Watermark 生成策略 。它的原理是在接收到的最大事件時間的基礎上,減去一個固定的延遲時間,以此來生成 Watermark。例如,我們設置固定延遲時間為 10 秒,當接收到的事件時間為 10:00:15 時,生成的 Watermark 時間戳就是 10:00:05(10:00:15 - 10 秒)。這表示系統認為所有 10:00:05 之前的事件都已經到達,不會再有更晚到達的 10:00:05 之前的事件了。?

在 Flink 中,使用固定延遲策略生成 Watermark 的代碼示例如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;public class FixedDelayWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 從Kafka讀取數據DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));// 生成Watermark,設置固定延遲10秒SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((element, recordTimestamp) -> {// 從元素中提取事件時間戳return extractTimestamp(element);}));// 后續處理邏輯withWatermark.print();env.execute("Fixed Delay Watermark Example");}private static long extractTimestamp(String element) {// 實際應用中根據數據格式提取事件時間戳// 這里只是示例,假設數據格式為 "value, timestamp"String[] parts = element.split(",");return Long.parseLong(parts[1]);}
}

(二)滑動策略?

滑動策略是指 Watermark 按照一定的時間間隔(滑動步長)和窗口大小進行生成 。與固定延遲策略不同,滑動策略下的 Watermark 不是簡單地基于最大事件時間減去固定延遲,而是在滑動窗口的基礎上生成。例如,我們設置滑動窗口大小為 5 分鐘,滑動步長為 1 分鐘。當第一個窗口(0:00 - 0:05)內的事件處理完成后,生成第一個 Watermark,其時間戳為 0:05。接著,當第二個窗口(0:01 - 0:06)內的事件處理完成后,生成第二個 Watermark,其時間戳為 0:06,以此類推。?

以電商訂單統計為例,如果我們想統計每 5 分鐘內的訂單金額總和,并且希望每分鐘更新一次統計結果,就可以使用滑動策略。假設我們設置滑動窗口大小為 5 分鐘,滑動步長為 1 分鐘。當接收到第一個訂單時,判斷其事件時間,將其分配到對應的窗口(例如 0:00 - 0:05)。隨著訂單的不斷接收,當 0:00 - 0:05 這個窗口內的訂單處理完成后,生成一個 Watermark,其時間戳為 0:05,表示 0:05 之前的訂單都已經到達。此時,開始計算 0:00 - 0:05 這個窗口內的訂單金額總和并輸出結果。接著,當 0:01 - 0:06 這個窗口內的訂單處理完成后,生成下一個 Watermark,時間戳為 0:06,計算并輸出 0:01 - 0:06 這個窗口內的訂單金額總和。這樣,我們就能每分鐘得到一個最近 5 分鐘內的訂單金額統計結果。?

(三)自定義策略?

在一些復雜的業務場景中,固定延遲策略和滑動策略可能無法滿足需求,這時就需要自定義 Watermark 生成策略 。自定義策略允許用戶根據具體的業務邏輯和數據特點,靈活地生成 Watermark。例如,在物聯網設備數據處理場景中,不同設備的數據產生頻率和延遲情況可能各不相同。有些設備可能每隔幾秒就發送一次數據,而有些設備可能由于網絡問題,數據延遲較高。在這種情況下,我們可以根據設備 ID 來分別設置不同的 Watermark 生成策略。對于數據產生頻率高、延遲低的設備,可以設置較小的固定延遲;對于數據延遲較高的設備,可以根據歷史數據的延遲情況,動態地調整 Watermark 的生成邏輯。?

在 Flink 中,實現自定義 Watermark 生成策略需要實現WatermarkStrategy接口 。通過重寫接口中的方法,我們可以定義自己的 Watermark 生成邏輯。例如,下面是一個簡單的自定義 Watermark 生成策略的示例代碼:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;public class CustomWatermarkStrategy implements WatermarkStrategy<String> {@Overridepublic WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomWatermarkGenerator();}private static class CustomWatermarkGenerator implements WatermarkGenerator<String> {private long maxTimestamp;private final long delay = 10000; // 延遲10秒public CustomWatermarkGenerator() {this.maxTimestamp = Long.MIN_VALUE;}@Overridepublic void onEvent(String event, long eventTimestamp, WatermarkOutput output) {maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - delay));}}
}

?在上述代碼中,我們定義了一個CustomWatermarkStrategy類,實現了WatermarkStrategy接口。在createWatermarkGenerator方法中,返回一個CustomWatermarkGenerator實例。CustomWatermarkGenerator類實現了WatermarkGenerator接口,通過onEvent方法記錄接收到的最大事件時間,在onPeriodicEmit方法中,根據最大事件時間減去固定延遲生成 Watermark。這樣,我們就實現了一個簡單的自定義 Watermark 生成策略,可以根據具體業務需求進行靈活調整 。

五、實際生產使用

定義數據源:這里我們以從 Kafka 讀取數據為例,假設 Kafka 中存儲的是訂單數據,每條數據格式為訂單ID,下單時間戳,訂單金額。示例代碼如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import java.util.Properties;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// Kafka配置Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");// 從Kafka讀取數據DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));// 后續代碼...}
}

設置 Watermark 生成器:我們采用固定延遲策略生成 Watermark,設置最大亂序時間為 5 秒。示例代碼如下:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));// 設置Watermark生成器,固定延遲5秒SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {// 從元素中提取事件時間戳,假設數據格式為 "訂單ID,下單時間戳,訂單金額"String[] parts = element.split(",");return Long.parseLong(parts[1]);}));// 后續代碼...}
}

窗口操作與計算:對帶有 Watermark 的數據進行窗口操作,統計每 10 分鐘內的訂單總金額。示例代碼如下:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;public class WatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "watermark-group");DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>("order-topic", new SimpleStringSchema(), props));SingleOutputStreamOperator<String> withWatermark = source.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> {String[] parts = element.split(",");return Long.parseLong(parts[1]);}));// 按訂單ID分組,進行窗口操作,統計每10分鐘內的訂單總金額DataStream<String> result = withWatermark.keyBy(value -> value.split(",")[0]).timeWindow(Time.minutes(10)).apply(new WindowFunction<String, String, String, TimeWindow>() {@Overridepublic void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {double totalAmount = 0;for (String element : input) {String[] parts = element.split(",");double amount = Double.parseDouble(parts[2]);totalAmount += amount;}out.collect("訂單ID: " + key + ", 窗口開始時間: " + window.getStart() + ", 窗口結束時間: " + window.getEnd() + ", 訂單總金額: " + totalAmount);}});result.print();env.execute("Watermark Example");}
}

代碼解析?

  1. 創建流處理環境:StreamExecutionEnvironment.getExecutionEnvironment()獲取當前運行環境的執行環境對象,env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)設置流處理的時間特性為事件時間,這是使用 Watermark 的前提。?
  2. 定義數據源:通過FlinkKafkaConsumer從 Kafka 中讀取數據,props中配置了 Kafka 的連接地址和消費者組 ID,SimpleStringSchema用于將 Kafka 中的數據反序列化為字符串。?
  3. 設置 Watermark 生成器:WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))表示采用固定延遲策略生成 Watermark,最大亂序時間為 5 秒。withTimestampAssigner方法用于從輸入數據中提取事件時間戳,這里假設數據格式為訂單ID,下單時間戳,訂單金額,通過Long.parseLong(parts[1])提取下單時間戳。?
  4. 窗口操作與計算:keyBy(value -> value.split(",")[0])按訂單 ID 進行分組,timeWindow(Time.minutes(10))定義了一個 10 分鐘的滾動窗口,apply方法中實現了具體的計算邏輯,遍歷窗口內的所有訂單數據,累加訂單金額,最后輸出統計結果。

?當我們運行上述代碼后,控制臺會輸出每 10 分鐘內每個訂單 ID 的訂單總金額。例如:

訂單ID: 1001, 窗口開始時間: 1633420800000, 窗口結束時間: 1633421400000, 訂單總金額: 1000.5

訂單ID: 1002, 窗口開始時間: 1633420800000, 窗口結束時間: 1633421400000, 訂單總金額: 2000.3

從結果可以看出,Watermark 機制有效地處理了亂序數據,確保了窗口計算的準確性。即使訂單數據到達的順序混亂,通過 Watermark 的延遲策略,Flink 也能準確地統計出每個窗口內的訂單總金額 。?

六、常見問題?

(一)Watermark 延遲設置?

在設置 Watermark 延遲時間時,一定要充分考慮業務場景的需求和數據的實際延遲情況 。如果延遲設置過小,可能無法給遲到的數據足夠的時間到達,導致窗口計算結果不準確,丟失重要數據。比如在電商促銷活動期間,訂單數據量會大幅增加,網絡延遲也可能更嚴重。如果 Watermark 延遲設置只有 1 分鐘,很可能一些在促銷活動剛開始幾分鐘內產生的訂單數據因為網絡擁堵,在 1 分鐘后才到達,這些遲到的訂單數據就無法被正確統計到相應的窗口內,影響銷售數據的統計準確性。?

相反,如果延遲設置過大,雖然能確保所有數據都能被處理,但會導致窗口計算的延遲增加,影響實時性 。假設一個實時監控系統,需要及時發現設備的異常情況并發出警報。如果 Watermark 延遲設置為 30 分鐘,即使設備在幾分鐘前就出現了異常,但由于 Watermark 延遲時間過長,要等 30 分鐘后才會觸發窗口計算,這就導致警報發出延遲,可能會給生產帶來嚴重的損失。?

(二)數據亂序處理?

當數據亂序嚴重時,原本設置的 Watermark 策略可能無法滿足需求 。比如在一些特殊的物聯網場景中,由于傳感器分布范圍廣,網絡狀況復雜,數據亂序情況非常嚴重。此時,可以考慮增加 Watermark 的延遲時間,以確保更多遲到的數據能夠被正確處理。例如,原本設置的延遲時間為 5 秒,在數據亂序嚴重的情況下,可以將延遲時間增加到 10 秒甚至更長。?

還可以結合其他技術手段,如使用側輸出流(Side Output)來處理遲到的數據 。側輸出流可以將遲到的數據單獨收集起來,進行后續的特殊處理。在電商訂單處理中,對于超過一定延遲時間(如 10 分鐘)的訂單數據,可以將其輸出到側輸出流,然后對這些數據進行單獨分析,看是否是由于某些特殊原因導致的延遲,如支付系統故障、物流信息更新不及時等。?

(三)Watermark 與窗口的配合?

Watermark 與窗口的大小和類型密切相關 ,直接影響窗口計算的正確性。如果窗口大小設置不合理,即使 Watermark 生成正常,也可能導致計算結果不準確。例如,在統計網站用戶活躍情況時,如果將窗口設置得過大,如以 1 天為一個窗口,那么在這一天內,可能會有大量的數據涌入,而且數據亂序情況也可能更復雜。即使 Watermark 能夠處理一定程度的亂序數據,但由于窗口時間跨度太長,仍然可能出現部分數據統計不準確的情況。相反,如果窗口設置得過小,如以 1 分鐘為一個窗口,雖然能提高實時性,但可能會導致頻繁的窗口計算,增加系統開銷,而且對于一些需要統計較長時間段內數據的業務場景,可能無法得到準確的結果。?

不同類型的窗口(如滾動窗口、滑動窗口、會話窗口)與 Watermark 的配合方式也有所不同 。滾動窗口是固定大小且不重疊的窗口,Watermark 達到窗口結束時間時觸發計算。滑動窗口有固定的窗口大小和滑動步長,Watermark 需要根據滑動步長和窗口大小來合理觸發計算。會話窗口則是根據事件之間的時間間隔來定義窗口,Watermark 在處理會話窗口時,需要考慮會話的開始和結束條件,以確保窗口計算的準確性。在實際應用中,需要根據業務需求選擇合適的窗口類型,并合理配置 Watermark,以實現準確高效的流處理 。

七、總結

Watermark 作為 Flink 流處理中處理事件時間和亂序數據的關鍵機制,其重要性不言而喻。它本質是一種特殊的時間戳,用于標記事件時間的上界,確保在亂序數據的情況下,窗口操作能夠準確地觸發計算。通過周期性生成或基于事件的生成方式,Watermark 在數據流中不斷推進,在單并行度和多并行度的情況下,以不同的傳播機制在算子間傳遞,最終在滿足觸發條件時,準確地觸發窗口計算。?

在實際應用中,我們可以根據業務場景選擇合適的 Watermark 生成策略,如固定延遲策略、滑動策略或自定義策略 。在代碼實現方面,通過搭建 Flink 開發環境,按照引入依賴、創建流處理環境、定義數據源、設置 Watermark 生成器、進行窗口操作與計算的步驟,能夠實現基于 Watermark 的流處理任務。同時,我們也要注意 Watermark 延遲設置、數據亂序處理以及 Watermark 與窗口的配合等問題,以確保流處理的準確性和高效性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/89181.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/89181.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/89181.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Rust Web 全棧開發(五):使用 sqlx 連接 MySQL 數據庫

Rust Web 全棧開發&#xff08;五&#xff09;&#xff1a;使用 sqlx 連接 MySQL 數據庫Rust Web 全棧開發&#xff08;五&#xff09;&#xff1a;使用 sqlx 連接 MySQL 數據庫項目創建數據庫準備連接請求功能實現Rust Web 全棧開發&#xff08;五&#xff09;&#xff1a;使用…

【zynq7020】PS的“Hello World”

目錄 基本過程 新建Vivado工程 ZYNQ IP核設置 使用SDK進行軟件開發 基于Vivado2017 Vivado工程建立 SDK調試 固化程序 注&#xff1a;Vivado 2019.1 及之前&#xff1a;默認使用 SDK Vivado 2019.2-2020.1&#xff1a;逐步過渡&#xff0c;支持 SDK 與 Vitis 并存 Vi…

希爾排序和選擇排序及計數排序的簡單介紹

希爾排序法又稱縮小增量法。希爾排序法的基本思想是&#xff1a;先選定一個整數gap&#xff0c;把待排序文件中所有數據分成幾個組&#xff0c;所有距離為gap的數據分在同一組內&#xff0c;并對每一組內的數據進行排序。然后gap減減&#xff0c;重復上述分組和排序的工作。當到…

Solid Edge多項目并行,浮動許可如何高效調度?

在制造企業的數字化設計體系中&#xff0c;Solid Edge 作為主流 CAD 工具&#xff0c;因其靈活的建模能力、同步技術和強大的裝配設計功能&#xff0c;廣泛應用于機械設備、零部件制造等行業的研發場景。隨著企業設計任務復雜化&#xff0c;多項目并行成為常態&#xff0c;Soli…

Flink cdc 使用總結

Flink 與 Flink CDC 版本兼容對照表Flink 版本支持的 Flink CDC 版本關鍵說明Flink 1.11.xFlink CDC 1.2.x早期版本&#xff0c;需注意 Flink 1.11.0 的 Bug&#xff08;如 Upsert 寫入問題&#xff09;&#xff0c;建議使用 1.11.1 及以上。Flink 1.12.xFlink CDC 2.0.x&#…

企業培訓筆記:axios 發送 ajax 請求

文章目錄axios 簡介一&#xff0c;Vue工程中安裝axios二&#xff0c;編寫app.vue三&#xff0c;編寫HomeView.vue四&#xff0c;Idea打開后臺項目五&#xff0c;創建HelloController六&#xff0c;配置web訪問端口七&#xff0c;運行項目&#xff0c;查看效果&#xff08;一&am…

Maven下載與配置對Java項目的理解

目錄 一、背景 二、JAVA項目與Maven的關系 2.1標準java項目 2.2 maven 2.2.1 下載maven 1、下載 2、配置環境 2.2.2 setting.xml 1、配置settings.xml 2、IDEA配置maven 一、背景 在java項目中&#xff0c;新手小白很有可能看不懂整體的目錄結構&#xff0c;以及每個…

Mars3d的走廊只能在一個平面的無法折疊的解決方案

問題場景&#xff1a;1. Mars3d的CorridorEntity只能在一個平面修改高度值&#xff0c;無法根據坐標點位制作有高度值的走廊效果&#xff0c;想要做大蜀山盤山走廊的效果實現不了。解決方案&#xff1a;1.使用原生cesium實現對應的走廊的截面形狀、走廊的坐標點&#xff0c;包括…

LeetCode 每日一題 2025/7/7-2025/7/13

記錄了初步解題思路 以及本地實現代碼&#xff1b;并不一定為最優 也希望大家能一起探討 一起進步 目錄7/7 1353. 最多可以參加的會議數目7/8 1751. 最多可以參加的會議數目 II7/9 3439. 重新安排會議得到最多空余時間 I7/10 3440. 重新安排會議得到最多空余時間 II7/11 3169. …

Bash常見條件語句和循環語句

以下是 Bash 中常用的條件語句和循環語句分類及語法說明&#xff0c;附帶典型用例&#xff1a;一、條件語句 1. if 語句 作用&#xff1a;根據條件執行不同代碼塊 語法&#xff1a; if [ 條件 ]; then# 條件為真時執行 elif [ 其他條件 ]; then# 其他條件為真時執行 else# 所有…

uni-app 選擇國家區號

uni-app選擇國家區號組件 hy-countryPicker 我們在做登錄注冊功能的時候&#xff0c;可能會遇到選擇區號來使用不同國家手機號來登錄或者注冊的功能。這里我就介紹下我這個uni-app中使用的選擇區號的組件&#xff0c;包含不同國家國旗圖標。 效果圖 別的不說&#xff0c;先來…

客戶端主機宕機,服務端如何處理 TCP 連接?詳解

文章目錄一、客戶端主機宕機后迅速重啟1、服務端有數據發送2、服務端開啟「保活」機制3、服務端既沒有數據發送&#xff0c;也沒有開啟「保活」機制二、客戶端主機宕機后一直沒有重啟1、服務端有數據發送2、服務端開啟「保活」機制3、服務端既沒有數據發送&#xff0c;也沒有開…

《大數據技術原理與應用》實驗報告五 熟悉 Hive 的基本操作

目 錄 一、實驗目的 二、實驗環境 三、數據集 四、實驗內容與完成情況 4.1 創建一個內部表 stocks&#xff0c;字段分隔符為英文逗號&#xff0c;表結構下所示。 4.2 創建一個外部分區表 dividends&#xff08;分區字段為 exchange 和symbol&#xff09;&#xff0c;字段…

【橘子分布式】Thrift RPC(編程篇)

一、簡介 之前我們研究了一下thrift的一些知識&#xff0c;我們知道他是一個rpc框架&#xff0c;他作為rpc自然是提供了客戶端到服務端的訪問以及兩端數據傳輸的消息序列化&#xff0c;消息的協議解析和傳輸&#xff0c;所以我們今天就來了解一下他是如何實現這些功能&#xff…

清理C盤--辦法

c盤經常爆紅1、命令行2、屬性3、臨時文件

Java-71 深入淺出 RPC Dubbo 上手 父工程配置編寫 附詳細POM與代碼

點一下關注吧&#xff01;&#xff01;&#xff01;非常感謝&#xff01;&#xff01;持續更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持續更新中&#xff01;&#xff08;長期更新&#xff09; AI煉丹日志-29 - 字節跳動 DeerFlow 深度研究框斜體樣式架 私有…

創客匠人:創始人 IP 打造的內核,藏在有效的精神成長里

當創始人 IP 成為企業增長的重要引擎&#xff0c;許多人急于尋找 “爆款公式”&#xff0c;卻忽略了一個更本質的問題&#xff1a;IP 的生命力&#xff0c;終究源于創始人的精神成長。創客匠人在深耕知識付費賽道的過程中&#xff0c;見證了無數案例&#xff1a;那些能持續實現…

GPT和MBR分區

GPT&#xff08;GUID分區表&#xff09;和MBR&#xff08;主引導記錄&#xff09;是兩種不同的磁盤分區表格式&#xff0c;用于定義硬盤上分區的布局、位置及啟動信息&#xff0c;二者在設計、功能和適用場景上有顯著差異。以下從多個維度詳細對比&#xff1a; 一、核心定義與起…

c#進階之數據結構(字符串篇)----String

1、String介紹首先我們得明白&#xff0c;string和String代表的實際上是同一個類型&#xff0c;string是C#中的關鍵字&#xff0c;代表String類型&#xff0c;因此我們直接來學習String類型。從官方的底層實現代碼可以看出&#xff0c;當前String類型實際上就是一個Char類型的聚…

快速排序遞歸和非遞歸方法的簡單介紹

基本思想為&#xff1a;任取待排序元素序列中 的某元素作為基準值&#xff0c;按照該排序碼將待排序集合分割成兩子序列&#xff0c;左子序列中所有元素均小于基準值&#xff0c;右 子序列中所有元素均大于基準值&#xff0c;然后最左右子序列重復該過程&#xff0c;直到所有元…