Flink Stream API 源碼走讀 - socketTextStream

概述

本文深入分析了 Flink 中 socketTextStream() 方法的源碼實現,從用戶API調用到最終返回 DataStream 的完整流程。

核心知識點

1. socketTextStream 方法重載鏈

// 用戶調用入口
env.socketTextStream("hostname", 9999)↓ 補充分隔符參數
env.socketTextStream("hostname", 9999, "\n") ↓ 補充重試次數參數
env.socketTextStream("hostname", 9999, "\n", 0)↓ 創建 SocketTextStreamFunction
addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream")

重載過程分析:

  • 第一層:補充分隔符參數(默認 “\n”)
  • 第二層:補充重試次數參數(默認 0)
  • 最終:創建 SocketTextStreamFunction 并調用 addSource

2. SourceFunction 的重要說明

@Deprecated
public class SocketTextStreamFunction implements SourceFunction<String>

?? 重要提醒:

  • SourceFunction 已被標記為 @Deprecated(過時)
  • 官方建議使用新的 Source API
  • 基于 SourceFunction 的架構是老架構
  • 新架構基于 org.apache.flink.api.connector.source.Source

3. addSource 方法的重載鏈

addSource(function, sourceName)
addSource(function, sourceName, null)
addSource(function, sourceName, typeInfo, CONTINUOUS_UNBOUNDED)
核心處理邏輯

參數補充過程:

  1. addSource(function, "Socket Stream")
  2. addSource(function, "Socket Stream", null) - 補充 TypeInformation 為 null
  3. addSource(function, "Socket Stream", null, CONTINUOUS_UNBOUNDED) - 補充有界性

4. 核心處理邏輯分析

private <OUT> DataStreamSource<OUT> addSource(final SourceFunction<OUT> function,final String sourceName,@Nullable final TypeInformation<OUT> typeInfo,final Boundedness boundedness) {// 1. 非空檢查checkNotNull(function);checkNotNull(sourceName);checkNotNull(boundedness);// 2. 抽取類型信息TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);// 3. 判斷是否并行boolean isParallel = function instanceof ParallelSourceFunction;// 4. 序列化檢查clean(function);// 5. Function → Operatorfinal StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);// 6. 返回 DataStreamSourcereturn new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}

5. 四個核心概念的轉換

Function
用戶邏輯
Operator
算子封裝
Transformation
轉換操作
DataStream
用戶API

概念解釋:

  1. Function: 用戶的業務邏輯封裝

    • SocketTextStreamFunction - Socket連接和數據讀取邏輯
    • 繼承自 SourceFunction<String>
  2. Operator: 算子的抽象

    • StreamSource<OUT, ?> - 將Function包裝成算子
    • 繼承自 AbstractUdfStreamOperator
  3. Transformation: 轉換操作的封裝

    • LegacySourceTransformation - 包裝Operator和相關元信息
    • 包含類型信息、并行度、有界性等
  4. DataStream: 面向用戶的流式API

    • DataStreamSource - 繼承自 DataStream
    • 支持鏈式調用(map、filter、keyBy等)

6. 重要參數說明

TypeInformation(類型信息)
// 為什么需要 TypeInformation?
// Java 泛型在編譯后會被類型擦除,Flink需要顯式的類型信息來:
// 1. 創建序列化器/反序列化器
// 2. 根據不同類型產生不同的序列化機制
TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
Boundedness(有界性)
// CONTINUOUS_UNBOUNDED 表示無界流
// 在翻譯成物理執行計劃時會用到這個信息
// 有界流和無界流會生成不同的執行計劃
Boundedness.CONTINUOUS_UNBOUNDED
并行性檢查
// 檢查是否為并行源函數
boolean isParallel = function instanceof ParallelSourceFunction;
// SocketTextStreamFunction 不是 ParallelSourceFunction,所以 isParallel = false

7. DataStreamSource 的構造

public DataStreamSource(StreamExecutionEnvironment environment,TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,boolean isParallel,String sourceName,Boundedness boundedness) {// 調用父類構造,創建 LegacySourceTransformationsuper(environment, new LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism(), boundedness));// 如果不是并行的,設置并行度為1if (!isParallel) {setParallelism(1);}
}

8. 繼承關系分析

DataStream<T>
SingleOutputStreamOperator<T>
DataStreamSource<T>
包含所有流式API
map, filter, keyBy, window等

重要理解:

  • DataStreamSource 本質上就是一個 DataStream
  • 所有的鏈式調用API都定義在 DataStream
  • SingleOutputStreamOperator 這個命名容易誤導,它實際上是個 DataStream

9. DataStream 的內部結構

public class DataStream<T> {// 兩個最重要的成員protected final StreamExecutionEnvironment environment;  // 執行環境protected final Transformation<T> transformation;        // 轉換操作
}

關系鏈:

  • DataStream 包含 Transformation
  • Transformation 包含 Operator
  • Operator 包含 Function

10. 鏈式調用的實現

DataStream<String> stream = env.socketTextStream("localhost", 9999).map(...)           // 返回 SingleOutputStreamOperator (實際是DataStream).filter(...)        // 返回 SingleOutputStreamOperator  .keyBy(...)         // 返回 KeyedStream.window(...)        // 返回 WindowedStream.sum(...)           // 返回 SingleOutputStreamOperator.print();          // 返回 DataStreamSink

流程:
DataStreamSource → 各種變換 → DataStreamSink

總結

核心流程回顧

  1. 用戶調用 env.socketTextStream(hostname, port)
  2. 參數補全 通過重載方法逐步補充參數
  3. Function創建 創建 SocketTextStreamFunction
  4. addSource調用 進入核心處理邏輯
  5. 類型推斷 抽取輸出數據的類型信息
  6. 并行性檢查 判斷是否為并行源函數
  7. Function→Operator 封裝成 StreamSource
  8. Operator→Transformation 創建 LegacySourceTransformation
  9. 返回DataStream 創建 DataStreamSource

設計模式體現

  • 裝飾器模式: Function → Operator → Transformation → DataStream
  • 建造者模式: 通過重載方法逐步構建完整對象
  • 模板方法模式: addSource的處理流程

關鍵技術點

  • 類型擦除處理: 通過 TypeInformation 解決Java泛型擦除問題
  • 序列化機制: 根據類型信息創建對應的序列化器
  • 并行度控制: 非并行源強制設置并行度為1
  • 有界性標識: 為后續執行計劃生成提供信息

下節預告

Flink Stream API 源碼走讀 map和 flatmap


注意: 基于 Flink 1.18 版本,SourceFunction 已被標記為過時,實際項目中建議使用新的 Source API。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/93240.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/93240.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/93240.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

待辦事項小程序開發

1. 項目規劃功能需求&#xff1a;添加待辦事項標記完成/未完成刪除待辦事項分類或標簽管理&#xff08;可選&#xff09;數據持久化&#xff08;本地存儲&#xff09;2. 實現功能添加待辦事項&#xff1a;監聽輸入框和按鈕事件&#xff0c;將輸入內容添加到列表。 標記完成/未完…

【C#】Region、Exclude的用法

在 C# 中&#xff0c;Region 和 Exclude 是與圖形編程相關的概念&#xff0c;通常在使用 System.Drawing 命名空間進行 GDI 繪圖時出現。它們主要用于定義和操作二維空間中的區域&#xff08;幾何區域&#xff09;&#xff0c;常用于窗體裁剪、控件重繪、圖形繪制優化等場景。 …

機器學習 - Kaggle項目實踐(3)Digit Recognizer 手寫數字識別

Digit Recognizer | Kaggle 題面 Digit Recognizer-CNN | Kaggle 下面代碼的kaggle版本 使用CNN進行手寫數字識別 學習到了網絡搭建手法學習率退火數據增廣 提高訓練效果。 使用混淆矩陣 以及對分類出錯概率最大的例子單獨拎出來分析。 最終以99.546%正確率 排在 86/1035 …

新手如何高效運營亞馬遜跨境電商:從傳統SP廣告到DeepBI智能策略

"為什么我的廣告點擊量很高但訂單轉化率卻很低&#xff1f;""如何避免新品期廣告預算被大詞消耗殆盡&#xff1f;""為什么手動調整關鍵詞和出價總是慢市場半拍&#xff1f;""競品ASIN投放到底該怎么做才有效&#xff1f;""有沒有…

【論文閱讀 | CVPR 2024 | UniRGB-IR:通過適配器調優實現可見光-紅外語義任務的統一框架】

論文閱讀 | CVPR 2024 | UniRGB-IR&#xff1a;通過適配器調優實現可見光-紅外語義任務的統一框架?1&&2. 摘要&&引言3.方法3.1 整體架構3.2 多模態特征池3.3 補充特征注入器3.4 適配器調優范式4 實驗4.1 RGB-IR 目標檢測4.2 RGB-IR 語義分割4.3 RGB-IR 顯著目…

Hyperf 百度翻譯接口實現方案

保留 HTML/XML 標簽結構&#xff0c;僅翻譯文本內容&#xff0c;避免破壞富文本格式。采用「HTML 解析 → 文本提取 → 批量翻譯 → 回填」的流程。百度翻譯集成方案&#xff1a;富文本內容翻譯系統 HTML 解析 百度翻譯 API 集成 文件結構 app/ ├── Controller/ │ └──…

字節跳動 VeOmni 框架開源:統一多模態訓練效率飛躍!

資料來源&#xff1a;火山引擎-開發者社區 多模態時代的訓練痛點&#xff0c;終于有了“特效藥” 當大模型從單一語言向文本 圖像 視頻的多模態進化時&#xff0c;算法工程師們的訓練流程卻陷入了 “碎片化困境”&#xff1a; 當業務要同時迭代 DiT、LLM 與 VLM時&#xff0…

配置docker pull走http代理

之前寫了一篇自建Docker鏡像加速器服務的博客&#xff0c;需要用到境外服務器作為代理&#xff0c;但是一般可能沒有境外服務器&#xff0c;只有http代理&#xff0c;所以如果本地使用想走代理可以用以下方式 臨時生效&#xff08;只對當前終端有效&#xff09; 設置環境變量…

OpenAI 開源模型 gpt-oss 本地部署詳細教程

OpenAI 最近發布了其首個開源的開放權重模型gpt-oss&#xff0c;這在AI圈引起了巨大的轟動。對于廣大開發者和AI愛好者來說&#xff0c;這意味著我們終于可以在自己的機器上&#xff0c;完全本地化地運行和探索這款強大的模型了。 本教程將一步一步指導你如何在Windows和Linux…

力扣-5.最長回文子串

題目鏈接 5.最長回文子串 class Solution {public String longestPalindrome(String s) {boolean[][] dp new boolean[s.length()][s.length()];int maxLen 0;String str s.substring(0, 1);for (int i 0; i < s.length(); i) {dp[i][i] true;}for (int len 2; len …

Apache Ignite超時管理核心組件解析

這是一個非常關鍵且設計精巧的 定時任務與超時管理組件 —— GridTimeoutProcessor&#xff0c;它是 Apache Ignite 內核中負責 統一調度和處理所有異步超時事件的核心模塊。&#x1f3af; 一、核心職責統一管理所有需要“在某個時間點觸發”的任務或超時邏輯。它相當于 Ignite…

DAY 42 Grad-CAM與Hook函數

知識點回顧回調函數lambda函數hook函數的模塊鉤子和張量鉤子Grad-CAM的示例# 定義一個存儲梯度的列表 conv_gradients []# 定義反向鉤子函數 def backward_hook(module, grad_input, grad_output):# 模塊&#xff1a;當前應用鉤子的模塊# grad_input&#xff1a;模塊輸入的梯度…

基于 NVIDIA 生態的 Dynamo 風格分布式 LLM 推理架構

網羅開發&#xff08;小紅書、快手、視頻號同名&#xff09;大家好&#xff0c;我是 展菲&#xff0c;目前在上市企業從事人工智能項目研發管理工作&#xff0c;平時熱衷于分享各種編程領域的軟硬技能知識以及前沿技術&#xff0c;包括iOS、前端、Harmony OS、Java、Python等方…

《吃透 C++ 類和對象(中):拷貝構造函數與賦值運算符重載深度解析》

&#x1f525;個人主頁&#xff1a;草莓熊Lotso &#x1f3ac;作者簡介&#xff1a;C研發方向學習者 &#x1f4d6;個人專欄&#xff1a; 《C語言》 《數據結構與算法》《C語言刷題集》《Leetcode刷題指南》 ??人生格言&#xff1a;生活是默默的堅持&#xff0c;毅力是永久的…

Python 環境隔離實戰:venv、virtualenv 與 conda 的差異與最佳實踐

那天把項目部署到測試環境&#xff0c;結果依賴沖突把服務拉崩了——本地能跑&#xff0c;線上不能跑。折騰半天才發現&#xff1a;我和同事用的不是同一套 site-packages&#xff0c;版本差異導致運行時異常。那一刻我徹底明白&#xff1a;虛擬環境不是可選項&#xff0c;它是…

[ 數據結構 ] 時間和空間復雜度

1.算法效率算法效率分析分為兩種 : ①時間效率, ②空間效率 時間效率即為 時間復雜度 , 時間復雜度主要衡量一個算法的運行速度空間效率即為 空間復雜度 , 空間復雜度主要衡量一個算法所需要的額外空間2.時間復雜度2.1 時間復雜度的概念定義 : 再計算機科學中 , 算法的時間復雜…

一,設計模式-單例模式

目的設計單例模式的目的是為了解決兩個問題&#xff1a;保證一個類只有一個實例這種需求是需要控制某些資源的共享權限&#xff0c;比如文件資源、數據庫資源。為該實例提供一個全局訪問節點相較于通過全局變量保存重要的共享對象&#xff0c;通過一個封裝的類對象&#xff0c;…

AIStarter修復macOS 15兼容問題:跨平臺AI項目管理新體驗

AIStarter是全網唯一支持Windows、Mac和Linux的AI管理平臺&#xff0c;為開發者提供便捷的AI項目管理體驗。近期&#xff0c;熊哥在視頻中分享了針對macOS 15系統無法打開AIStarter的修復方案&#xff0c;最新版已完美兼容。本文基于視頻內容&#xff0c;詳解修復細節與使用技巧…

LabVIEW 紡織檢測數據傳遞

基于 LabVIEW 實現紡織檢測系統中上位機&#xff08;PC 機&#xff09;與下位機&#xff08;單片機&#xff09;的串口數據傳遞&#xff0c;成功應用于煮繭機溫度測量系統。通過采用特定硬件架構與軟件設計&#xff0c;實現了溫度數據的高效采集、傳輸與分析&#xff0c;操作簡…

ECCV-2018《Variational Wasserstein Clustering》

核心思想 該論文提出了一個基于最優傳輸(optimal transportation) 理論的新型聚類方法&#xff0c;稱為變分Wasserstein聚類(Variational Wasserstein Clustering, VWC)。其核心思想有三點&#xff1a;建立最優傳輸與k-means聚類的聯系&#xff1a;作者指出k-means聚類問題本質…