本文核心觀點
核心觀點:WindowedStream 是一個"假流",它比 KeyedStream 更虛,只是一個 API 的過渡器,不是真正意義上的 DataStream,需要調用函數回歸。
虛擬化時刻:從真實流到虛擬流
KeyedStream<T,K> keyedStream = …; // 半虛擬流
WindowedStream<T,K,W> windowedStream = keyedStream.window(assigner); // 完全虛擬流回歸時刻:從虛擬流回到真實流
windowedStream.sum()
return input.transform(opName, resultType, operator); // 回到DataStream標準流程
一、window() 方法的特殊性發現
1.1 只有 KeyedStream 才有 window 方法
// DataStream 上沒有 window 方法
DataStream<String> stream = ...;
// stream.window(assigner); // 編譯錯誤!// 只有 KeyedStream 才有 window 方法
KeyedStream<String, String> keyedStream = stream.keyBy(...);
WindowedStream<String, String, TimeWindow> windowedStream = keyedStream.window(assigner);
為什么這樣設計?
- 窗口操作需要基于 Key 進行分組
- 每個 Key 都有獨立的窗口狀態
- 保證相同 Key 的數據進入同一個窗口實例
1.2 KeyedStream 的特殊 API 設計
public class KeyedStream<T, KEY> extends DataStream<T> {// 繼承 DataStream 的所有方法:map, filter, flatMap...// KeyedStream 特有的窗口 APIpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner);public WindowedStream<T, KEY, GlobalWindow> countWindow(long size);// KeyedStream 特有的聚合 APIpublic SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function);public SingleOutputStreamOperator<T> sum(int positionToSum);public SingleOutputStreamOperator<T> max(int positionToMax);// ... 其他聚合操作
}
設計理念:
- 繼承性:保留 DataStream 的所有基礎能力
- 擴展性:增加基于 Key 的特殊操作
- 狀態性:支持有狀態的聚合操作
二、WindowedStream 的"虛擬"本質
2.1 WindowedStream 的創建過程
// KeyedStream.java
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream<>(this, assigner); // 僅僅是創建對象
}
關鍵發現:window() 方法沒有創建任何 Transformation!
2.2 WindowedStream 的內部結構
public class WindowedStream<T, K, W extends Window> {// 僅有兩個成員變量private final KeyedStream<T, K> input; // 上游流的引用private final WindowOperatorBuilder<T, K, W> builder; // 算子構建器// 注意:沒有繼承 DataStream!
}
2.3 WindowedStream 構造函數解析
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input; // 保存上游流引用// 創建窗口算子構建器,用于構建窗口操作的核心組件// WindowOperatorBuilder是構建者模式的實現,負責組裝窗口操作所需的各種組件this.builder = new WindowOperatorBuilder<>(// 窗口分配器:決定數據元素被分配到哪個窗口// 例如:TumblingEventTimeWindows、SlidingEventTimeWindows等windowAssigner,// 窗口觸發器:決定何時觸發窗口計算和輸出結果// 每種窗口分配器都有其默認的觸發器策略// 例如:EventTimeTrigger用于事件時間窗口,ProcessingTimeTrigger用于處理時間窗口windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),// 執行配置:包含序列化器、并行度等運行時配置信息input.getExecutionConfig(),// 輸入數據類型信息:用于序列化和反序列化輸入數據input.getType(),// Key選擇器:從輸入數據中提取分組鍵,確保相同key的數據進入同一個窗口實例input.getKeySelector(),// Key類型信息:用于序列化和反序列化分組鍵input.getKeyType());
}
重要理解:
- 構造函數只是組裝配置信息,沒有創建算子
- 比 KeyedStream 更"虛",KeyedStream 好歹有個 PartitionTransformation
- WindowedStream 什么 Transformation 都沒有
2.4 WindowedStream 的"虛擬"特性
流類型 | 虛擬化程度 | 特性描述 |
---|---|---|
DataStream | 🟢 真實流 | ? 有 Transformation ? 支持鏈式調用 ? 可直接執行 |
KeyedStream | 🟡 半虛擬流 | ? 有 PartitionTransformation ? 支持鏈式調用 ? 支持窗口API ?? 無實際算子 |
WindowedStream | 🔴 完全虛擬流 | ? 無 Transformation ? 斷開鏈式調用 ? 只支持窗口聚合API ?? 純過渡器 |
WindowedStream 的特殊性:
- 不繼承 DataStream - 徹底斷開鏈式調用
- 純 API 過渡器 - 只是工具類,不是真正的流
- 強制聚合 - 必須調用聚合操作才能回到正常流
- 臨時狀態 - 無法直接使用,必須轉換
WindowedStream 的特殊性:
- 不繼承 DataStream - 徹底斷開鏈式調用
- 純 API 過渡器 - 只是工具類,不是真正的流
- 強制聚合 - 必須調用聚合操作才能回到正常流
- 臨時狀態 - 無法直接使用,必須轉換
三、sum() 方法的完整解析
3.1 sum() 方法的調用鏈
// WindowedStream.java - 入口方法
public SingleOutputStreamOperator<T> sum(int positionToSum) {// 創建內置的求和聚合器return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}// aggregate 方法 - 中轉
private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {return reduce(aggregator); // 轉發給 reduce
}
關鍵理解:
- sum() 只是一個便利方法
- 內部使用 Flink 預定義的
SumAggregator
- 最終還是調用 reduce() 方法
3.2 SumAggregator 的本質
// SumAggregator 的繼承關系
public class SumAggregator<T> extends AggregationFunction<T> implements ReduceFunction<T> {private final int positionToSum; // 要求和的字段位置// 實現具體的求和邏輯
}
重要發現:
SumAggregator
就是一個ReduceFunction
- 與用戶自定義的
MapFunction
地位完全相同 - Flink 內部預寫好的函數,用戶也可以自己實現
3.3 reduce() 方法的三層重載
// 第一層:只有 ReduceFunction(我們的入口)
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {function = input.getExecutionEnvironment().clean(function); // 清理函數return reduce(function, new PassThroughWindowFunction<>()); // 添加默認 WindowFunction
}// 第二層:ReduceFunction + WindowFunction
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function) {// 推斷輸出類型TypeInformation<R> resultType = getWindowFunctionReturnType(function, inputType);return reduce(reduceFunction, function, resultType); // 繼續傳遞
}// 第三層:完整參數(最終實現)
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function,TypeInformation<R> resultType) {// 1. 清理函數(序列化檢查)function = input.getExecutionEnvironment().clean(function);reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);// 2. 生成算子名稱和描述final String opName = builder.generateOperatorName();final String opDescription = builder.generateOperatorDescription(reduceFunction, function);// 3. 通過 builder 根據function 創建WindowOperatorOneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);// 4. 根據Operator 創建 OperatorFactory -> transformation -> DataStreamreturn input.transform(opName, resultType, operator).setDescription(opDescription);
}
重載鏈的設計目的:
- 逐步補充參數:從簡單到復雜
- 提供默認值:PassThroughWindowFunction 作為默認窗口函數
- 類型推斷:自動推斷輸出類型
- 函數清理:確保函數可序列化
3.4 PassThroughWindowFunction 的巧妙設計
// 第一層 reduce 方法中的關鍵一行
return reduce(function, new PassThroughWindowFunction<>());
PassThroughWindowFunction 的作用:
// PassThroughWindowFunction 的簡化實現
public class PassThroughWindowFunction<T, K, W extends Window>implements WindowFunction<T, T, K, W> {@Overridepublic void apply(K key, W window, Iterable<T> input, Collector<T> out) {// 直接透傳,不做任何處理for (T element : input) {out.collect(element);}}
}
為什么需要 PassThroughWindowFunction?
- 接口統一:WindowOperator 需要 ReduceFunction + WindowFunction 兩個函數
- 透明傳遞:用戶只想要聚合結果,不需要額外處理
- 適配器模式:將單一的 ReduceFunction 適配為完整的窗口處理流程
五、回到 DataStream 的標準流程
5.1 關鍵的回歸時刻
// WindowedStream 的最后一步 - 回到正軌!
return input.transform(opName, resultType, operator);
這一行代碼的重要性:
input
是KeyedStream
(繼承自DataStream
)- 調用的是
DataStream.transform()
方法 - WindowedStream 完成使命,回到標準流程
5.2 transform() 方法的標準處理
// DataStream.java - 標準的 transform 方法
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 包裝算子為工廠return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
5.3 doTransform() 的核心邏輯
protected <R> SingleOutputStreamOperator<R> doTransform(...) {// 1. 創建物理 TransformationOneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation, // 上游:PartitionTransformation (keyBy產生的)operatorName, // "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, SumAggregator, PassThroughWindowFunction)"operatorFactory, // SimpleOperatorFactory(WindowOperator)outTypeInfo, // 輸出類型信息environment.getParallelism(), // 并行度false); // 不是并行度敏感的// 2. 創建新的 DataStreamSingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator<>(environment, resultTransform);// 3. 添加到執行環境 - 重要!getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}
關鍵步驟解析:
- 創建物理 Transformation:包含真正的算子
- 構建新的 DataStream:恢復正常的流
- 注冊到環境:只有物理 Transformation 才會被注冊
六、調用時序圖
導航鏈接
上節鏈接:Flink Stream API 源碼走讀 - keyBy
下節預告:Flink Stream API 源碼走讀 - print