Flink Stream API 源碼走讀 - window 和 sum

本文核心觀點

核心觀點:WindowedStream 是一個"假流",它比 KeyedStream 更虛,只是一個 API 的過渡器,不是真正意義上的 DataStream,需要調用函數回歸。

  1. 虛擬化時刻:從真實流到虛擬流
    KeyedStream<T,K> keyedStream = …; // 半虛擬流
    WindowedStream<T,K,W> windowedStream = keyedStream.window(assigner); // 完全虛擬流

  2. 回歸時刻:從虛擬流回到真實流
    windowedStream.sum()
    return input.transform(opName, resultType, operator); // 回到DataStream標準流程

一、window() 方法的特殊性發現

1.1 只有 KeyedStream 才有 window 方法

//  DataStream 上沒有 window 方法
DataStream<String> stream = ...;
// stream.window(assigner); // 編譯錯誤!//  只有 KeyedStream 才有 window 方法
KeyedStream<String, String> keyedStream = stream.keyBy(...);
WindowedStream<String, String, TimeWindow> windowedStream = keyedStream.window(assigner);

為什么這樣設計?

  • 窗口操作需要基于 Key 進行分組
  • 每個 Key 都有獨立的窗口狀態
  • 保證相同 Key 的數據進入同一個窗口實例

1.2 KeyedStream 的特殊 API 設計

public class KeyedStream<T, KEY> extends DataStream<T> {// 繼承 DataStream 的所有方法:map, filter, flatMap...// KeyedStream 特有的窗口 APIpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner);public WindowedStream<T, KEY, GlobalWindow> countWindow(long size);// KeyedStream 特有的聚合 APIpublic SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function);public SingleOutputStreamOperator<T> sum(int positionToSum);public SingleOutputStreamOperator<T> max(int positionToMax);// ... 其他聚合操作
}

設計理念

  • 繼承性:保留 DataStream 的所有基礎能力
  • 擴展性:增加基于 Key 的特殊操作
  • 狀態性:支持有狀態的聚合操作

二、WindowedStream 的"虛擬"本質

2.1 WindowedStream 的創建過程

// KeyedStream.java
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream<>(this, assigner);  // 僅僅是創建對象
}

關鍵發現:window() 方法沒有創建任何 Transformation!

2.2 WindowedStream 的內部結構

public class WindowedStream<T, K, W extends Window> {// 僅有兩個成員變量private final KeyedStream<T, K> input;           // 上游流的引用private final WindowOperatorBuilder<T, K, W> builder;  // 算子構建器// 注意:沒有繼承 DataStream!
}

2.3 WindowedStream 構造函數解析

public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;  // 保存上游流引用// 創建窗口算子構建器,用于構建窗口操作的核心組件// WindowOperatorBuilder是構建者模式的實現,負責組裝窗口操作所需的各種組件this.builder = new WindowOperatorBuilder<>(// 窗口分配器:決定數據元素被分配到哪個窗口// 例如:TumblingEventTimeWindows、SlidingEventTimeWindows等windowAssigner,// 窗口觸發器:決定何時觸發窗口計算和輸出結果// 每種窗口分配器都有其默認的觸發器策略// 例如:EventTimeTrigger用于事件時間窗口,ProcessingTimeTrigger用于處理時間窗口windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),// 執行配置:包含序列化器、并行度等運行時配置信息input.getExecutionConfig(),// 輸入數據類型信息:用于序列化和反序列化輸入數據input.getType(),// Key選擇器:從輸入數據中提取分組鍵,確保相同key的數據進入同一個窗口實例input.getKeySelector(),// Key類型信息:用于序列化和反序列化分組鍵input.getKeyType());
}

重要理解

  • 構造函數只是組裝配置信息,沒有創建算子
  • 比 KeyedStream 更"虛",KeyedStream 好歹有個 PartitionTransformation
  • WindowedStream 什么 Transformation 都沒有

2.4 WindowedStream 的"虛擬"特性

流類型虛擬化程度特性描述
DataStream🟢 真實流? 有 Transformation
? 支持鏈式調用
? 可直接執行
KeyedStream🟡 半虛擬流? 有 PartitionTransformation
? 支持鏈式調用
? 支持窗口API
?? 無實際算子
WindowedStream🔴 完全虛擬流? 無 Transformation
? 斷開鏈式調用
? 只支持窗口聚合API
?? 純過渡器

WindowedStream 的特殊性

  1. 不繼承 DataStream - 徹底斷開鏈式調用
  2. 純 API 過渡器 - 只是工具類,不是真正的流
  3. 強制聚合 - 必須調用聚合操作才能回到正常流
  4. 臨時狀態 - 無法直接使用,必須轉換

WindowedStream 的特殊性

  1. 不繼承 DataStream - 徹底斷開鏈式調用
  2. 純 API 過渡器 - 只是工具類,不是真正的流
  3. 強制聚合 - 必須調用聚合操作才能回到正常流
  4. 臨時狀態 - 無法直接使用,必須轉換

三、sum() 方法的完整解析

3.1 sum() 方法的調用鏈

// WindowedStream.java - 入口方法
public SingleOutputStreamOperator<T> sum(int positionToSum) {// 創建內置的求和聚合器return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}// aggregate 方法 - 中轉
private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {return reduce(aggregator);  // 轉發給 reduce
}

關鍵理解

  • sum() 只是一個便利方法
  • 內部使用 Flink 預定義的 SumAggregator
  • 最終還是調用 reduce() 方法

3.2 SumAggregator 的本質

// SumAggregator 的繼承關系
public class SumAggregator<T> extends AggregationFunction<T> implements ReduceFunction<T> {private final int positionToSum;  // 要求和的字段位置// 實現具體的求和邏輯
}

重要發現

  • SumAggregator 就是一個 ReduceFunction
  • 與用戶自定義的 MapFunction 地位完全相同
  • Flink 內部預寫好的函數,用戶也可以自己實現

3.3 reduce() 方法的三層重載

// 第一層:只有 ReduceFunction(我們的入口)
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {function = input.getExecutionEnvironment().clean(function);  // 清理函數return reduce(function, new PassThroughWindowFunction<>());  // 添加默認 WindowFunction
}// 第二層:ReduceFunction + WindowFunction
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function) {// 推斷輸出類型TypeInformation<R> resultType = getWindowFunctionReturnType(function, inputType);return reduce(reduceFunction, function, resultType);  // 繼續傳遞
}// 第三層:完整參數(最終實現)
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function,TypeInformation<R> resultType) {// 1. 清理函數(序列化檢查)function = input.getExecutionEnvironment().clean(function);reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);// 2. 生成算子名稱和描述final String opName = builder.generateOperatorName();final String opDescription = builder.generateOperatorDescription(reduceFunction, function);// 3. 通過 builder 根據function 創建WindowOperatorOneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);// 4. 根據Operator 創建 OperatorFactory -> transformation -> DataStreamreturn input.transform(opName, resultType, operator).setDescription(opDescription);
}

重載鏈的設計目的

  • 逐步補充參數:從簡單到復雜
  • 提供默認值:PassThroughWindowFunction 作為默認窗口函數
  • 類型推斷:自動推斷輸出類型
  • 函數清理:確保函數可序列化

3.4 PassThroughWindowFunction 的巧妙設計

// 第一層 reduce 方法中的關鍵一行
return reduce(function, new PassThroughWindowFunction<>());

PassThroughWindowFunction 的作用

// PassThroughWindowFunction 的簡化實現
public class PassThroughWindowFunction<T, K, W extends Window>implements WindowFunction<T, T, K, W> {@Overridepublic void apply(K key, W window, Iterable<T> input, Collector<T> out) {// 直接透傳,不做任何處理for (T element : input) {out.collect(element);}}
}

為什么需要 PassThroughWindowFunction?

  • 接口統一:WindowOperator 需要 ReduceFunction + WindowFunction 兩個函數
  • 透明傳遞:用戶只想要聚合結果,不需要額外處理
  • 適配器模式:將單一的 ReduceFunction 適配為完整的窗口處理流程
用戶調用sum
只有ReduceFunction
SumAggregator
自動添加
PassThroughWindowFunction
WindowOperator需要的
完整函數對

五、回到 DataStream 的標準流程

5.1 關鍵的回歸時刻

// WindowedStream 的最后一步 - 回到正軌!
return input.transform(opName, resultType, operator);

這一行代碼的重要性

  • inputKeyedStream(繼承自 DataStream
  • 調用的是 DataStream.transform() 方法
  • WindowedStream 完成使命,回到標準流程

5.2 transform() 方法的標準處理

// DataStream.java - 標準的 transform 方法
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 包裝算子為工廠return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

5.3 doTransform() 的核心邏輯

protected <R> SingleOutputStreamOperator<R> doTransform(...) {// 1. 創建物理 TransformationOneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,    // 上游:PartitionTransformation (keyBy產生的)operatorName,          // "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, SumAggregator, PassThroughWindowFunction)"operatorFactory,       // SimpleOperatorFactory(WindowOperator)outTypeInfo,          // 輸出類型信息environment.getParallelism(),  // 并行度false);               // 不是并行度敏感的// 2. 創建新的 DataStreamSingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator<>(environment, resultTransform);// 3. 添加到執行環境 - 重要!getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

關鍵步驟解析

  1. 創建物理 Transformation:包含真正的算子
  2. 構建新的 DataStream:恢復正常的流
  3. 注冊到環境:只有物理 Transformation 才會被注冊

六、調用時序圖

在這里插入圖片描述

導航鏈接

上節鏈接:Flink Stream API 源碼走讀 - keyBy

下節預告:Flink Stream API 源碼走讀 - print

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/93660.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/93660.shtml
英文地址,請注明出處:http://en.pswp.cn/web/93660.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

藍牙 GFSK RX Core 架構解析

GFSK RX Core分為以下幾個模塊&#xff1a; 1.Frequency offset compensation CORDIC 2.A low pass filter 3.A power estimator for packet detection,RSSI and digital gaion computation for DPSK path 4.A demodulator implemented as Phase Shift Discriminator 5.A drequ…

微電網管控系統中python多線程緩存與SQLite多數據庫文件連接池實踐總結(含源碼)

1. 引言 在分散的微電網能源管理場景中,系統采用集中式云平臺模式,為100個獨立微電網用戶提供高并發數據寫入服務面臨三大挑戰:用戶數據隔離、I/O性能瓶頸、多線程安全性。本文揭示一種新式的分片鎖+三級緩存+sqlite多數據庫文件連接池架構,在保持SQLite輕量級優勢的同時,…

InfluxDB 開發工具鏈:IDE 插件與調試技巧(一)

引言 ** 在當今數字化時代&#xff0c;時間序列數據的處理與分析在眾多領域中都扮演著至關重要的角色。無論是物聯網設備產生的海量傳感器數據&#xff0c;還是金融市場中實時波動的交易數據&#xff0c;又或是服務器運維過程中不斷產生的性能指標數據&#xff0c;這些都屬于…

計算機網絡-IPv6

1、IPv6基礎IPv4與IPv6的對比&#xff1a;問題IPv4的缺陷IPv6的優勢地址空間IPv4地址采用32比特標識&#xff0c;能提供的地址數量是43億&#xff0c;分配很不均衡。針對IPv4的地址短缺問題&#xff0c;有幾種解決方案&#xff1a;無類別域間路由CIDR&#xff08;Classless Int…

整體設計 之“凝聚式中心點”原型 --整除:智能合約和DBMS的深層融合 之2

摘要&#xff08;CSDN的AI助手自動生成的&#xff09;本文提出了一種基于"整除"數學原型的智能合約與DBMS融合架構設計&#xff0c;將SQL查詢語句的四個關鍵段&#xff08;SELECT、FROM、WHERE、BY&#xff09;分別映射到整除運算的四個要素&#xff08;商、被除數、…

【趙渝強老師】TiDB表數據與鍵值對的映射關系

TiDB實例將表中的每一行數據映射成RocksDB中的鍵值對&#xff0c;則需要考慮如何構造Key和Value。首先&#xff0c;OLTP場景下有大量針對單行或者多行的增、刪、改、查等操作&#xff0c;要求數據庫具備快速讀取一行數據的能力。因此&#xff0c;對應的Key最好有一個唯一ID&…

帶操作系統的延時函數

delay.c:#include "delay.h"/*** brief 微秒級延時* param nus 延時時長&#xff0c;范圍&#xff1a;0~233015* retval 無*/ void delay_us(uint32_t nus) {uint32_t ticks;uint32_t tcnt 0, told, tnow;uint32_t reload SysTick->LOAD; //重…

ES Module 和 CommonJS的區別

ES Module&#xff08;ESM&#xff0c;ES6 模塊系統&#xff09;和 CommonJS 是 JavaScript 中兩種主流的模塊規范&#xff0c;分別用于現代前端和 Node.js 環境&#xff08;早期&#xff09;&#xff0c;它們在語法、加載機制、特性等方面有顯著區別。以下是詳細對比&#xff…

貓頭虎AI分享|一款智能量化交易系統:QuantCell,從數據收集到策略執行全流程自動化

貓頭虎AI分享&#xff5c;一款智能量化交易系統&#xff1a;QuantCell&#xff0c;從數據收集到策略執行全流程自動化 在當今金融市場中&#xff0c;量化交易系統已經成為越來越多投資者和機構的重要選擇。無論是股票、期貨還是加密貨幣&#xff0c;自動化交易與人工智能的結合…

直播美顏SDK架構揭秘:動態貼紙功能的實現原理與性能優化

如今&#xff0c;美顏SDK 已經不再只是“磨皮、美白”的基礎工具&#xff0c;而是逐漸進化為一個涵蓋 人臉識別、實時特效、動態貼紙交互 的復雜技術體系。尤其是 動態貼紙功能 的加入&#xff0c;讓主播與觀眾之間的互動更加生動有趣&#xff0c;也成為提升用戶粘性與平臺差異…

Docker安裝CDC

Docker安裝CDC拉取鏡像離線形式安裝上傳文件并創建docker-compose.yml把鏡像加載到docker中啟動容器連接數據庫創建賬號&#xff0c;并給賬號授權設置wal_level確認wal_level的值創建鏈接查詢連接狀態使用kafdrop消息中看不到修改之前的信息怎么辦補充拉取鏡像 docker pull co…

如何在win服務器中部署若依項目

一、安裝jdk的環境&#xff1a; 這一步很簡單&#xff0c;直接拿到安裝包雙擊安裝即可。 二、配置jdk的環境變量默認安裝的路徑為&#xff1a;C:\Program Files (x86)\Java\jdk1.7.0_51安裝完成之后進行環境變量配置右擊計算機&#xff08;此電腦&#xff09;點擊屬性點擊高級系…

CSS從入門到精通完整指南

第一部分&#xff1a;CSS基礎入門1.1 什么是CSSCSS&#xff08;層疊樣式表&#xff0c;Cascading Style Sheets&#xff09;是用于描述HTML文檔外觀和格式的樣式語言。CSS將內容與表現分離&#xff0c;讓HTML專注于內容結構&#xff0c;CSS專注于視覺效果。1.2 CSS語法結構選擇…

重溫k8s基礎概念知識系列二(Pod)

文章目錄1、Pod概念2、K8s 中的 Pod 的兩種用法3、定義Pod4、Pod的創建資源5、Pod 模板6、容器探針7、總結干貨8、 K8s Pod 經典面試題速查表Pod是Kubernetes中最小的單元&#xff1a; 1、Pod概念 Pod 是可以在 Kubernetes中創建和管理的、最小的可部署的計算單元。它由一組、一…

設計模式之靜態代理

一些個人理解 顧名思義&#xff0c;就是代理一個對象。 那么&#xff0c;既然要代理一個東西&#xff0c;就要傳入它吧? 【1】所以將代理對象當作屬性【【2】往往通過構造方法傳入被代理的目標對象】。 既然要代理&#xff0c;那必然要和代理對象擁有相同的功能吧? 所以實現了…

牛津大學xDeepMind 自然語言處理(1)

牛津大學xDeepMind 自然語言處理 Natural Language Processing 詞向量與詞匯語義學 Word Vectors and Lexical Semantics 詞語表示的基本問題與分布語義思想 傳統詞語表示&#xff08;如獨熱向量&#xff09;存在稀疏、正交、語義弱的問題&#xff0c;無法表達語義相似性。分布…

StarRocks數據庫集群的完整部署流程

目錄 依賴環境 下載安裝包 部署FE 部署BE 搭建集群 停止集群 依賴環境 詳見&#xff1a;StarRocks 部署&#xff1a;依賴環境-CSDN博客 下載安裝包 在官方網站下載安裝包&#xff1a;StarRocks 部署FE 創建元數據目錄。 mkdir -p <meta_dir> 修改 FE 配置文件 f…

簡單的 VSCode 設置

以下是我使用的vscode設置。雖然有些主觀&#xff0c;但很實用。1 主題。我放棄了那些炫酷的主題。我選擇了Tokyo Night (Storm)。理由是&#xff1a;它平靜、賞心悅目&#xff0c;并且與代碼形成了美麗的對比&#xff0c;卻又不顯得刺眼。2. 字體。我切換到了 JetBrains Mono …

Rust 條件語句

Rust 條件語句 在編程語言中&#xff0c;條件語句是程序流程控制的重要組成部分。Rust 作為一種系統編程語言&#xff0c;其條件語句的設計簡潔而強大。本文將詳細介紹 Rust 中的條件語句&#xff0c;包括其語法、用法以及一些高級特性。 1. 基本條件語句 Rust 中的基本條件語句…

【Java EE進階 --- SpringBoot】初識Spring(創建SpringBoot項目)

樂觀學習&#xff0c;樂觀生活&#xff0c;才能不斷前進啊&#xff01;&#xff01;&#xff01; 我的主頁&#xff1a;optimistic_chen 我的專欄&#xff1a;c語言 &#xff0c;Java, Java EE初階&#xff0c; Java數據結構 歡迎大家訪問~ 創作不易&#xff0c;大佬們點贊鼓勵…