概述
本文深入分析了 Flink 中 socketTextStream()
方法的源碼實現,從用戶API調用到最終返回 DataStream
的完整流程。
核心知識點
1. socketTextStream 方法重載鏈
// 用戶調用入口
env.socketTextStream("hostname", 9999)↓ 補充分隔符參數
env.socketTextStream("hostname", 9999, "\n") ↓ 補充重試次數參數
env.socketTextStream("hostname", 9999, "\n", 0)↓ 創建 SocketTextStreamFunction
addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream")
重載過程分析:
- 第一層:補充分隔符參數(默認 “\n”)
- 第二層:補充重試次數參數(默認 0)
- 最終:創建
SocketTextStreamFunction
并調用addSource
2. SourceFunction 的重要說明
@Deprecated
public class SocketTextStreamFunction implements SourceFunction<String>
?? 重要提醒:
SourceFunction
已被標記為@Deprecated
(過時)- 官方建議使用新的
Source
API - 基于
SourceFunction
的架構是老架構 - 新架構基于
org.apache.flink.api.connector.source.Source
3. addSource 方法的重載鏈
參數補充過程:
addSource(function, "Socket Stream")
addSource(function, "Socket Stream", null)
- 補充 TypeInformation 為 nulladdSource(function, "Socket Stream", null, CONTINUOUS_UNBOUNDED)
- 補充有界性
4. 核心處理邏輯分析
private <OUT> DataStreamSource<OUT> addSource(final SourceFunction<OUT> function,final String sourceName,@Nullable final TypeInformation<OUT> typeInfo,final Boundedness boundedness) {// 1. 非空檢查checkNotNull(function);checkNotNull(sourceName);checkNotNull(boundedness);// 2. 抽取類型信息TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);// 3. 判斷是否并行boolean isParallel = function instanceof ParallelSourceFunction;// 4. 序列化檢查clean(function);// 5. Function → Operatorfinal StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);// 6. 返回 DataStreamSourcereturn new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}
5. 四個核心概念的轉換
概念解釋:
-
Function: 用戶的業務邏輯封裝
SocketTextStreamFunction
- Socket連接和數據讀取邏輯- 繼承自
SourceFunction<String>
-
Operator: 算子的抽象
StreamSource<OUT, ?>
- 將Function包裝成算子- 繼承自
AbstractUdfStreamOperator
-
Transformation: 轉換操作的封裝
LegacySourceTransformation
- 包裝Operator和相關元信息- 包含類型信息、并行度、有界性等
-
DataStream: 面向用戶的流式API
DataStreamSource
- 繼承自DataStream
- 支持鏈式調用(map、filter、keyBy等)
6. 重要參數說明
TypeInformation(類型信息)
// 為什么需要 TypeInformation?
// Java 泛型在編譯后會被類型擦除,Flink需要顯式的類型信息來:
// 1. 創建序列化器/反序列化器
// 2. 根據不同類型產生不同的序列化機制
TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
Boundedness(有界性)
// CONTINUOUS_UNBOUNDED 表示無界流
// 在翻譯成物理執行計劃時會用到這個信息
// 有界流和無界流會生成不同的執行計劃
Boundedness.CONTINUOUS_UNBOUNDED
并行性檢查
// 檢查是否為并行源函數
boolean isParallel = function instanceof ParallelSourceFunction;
// SocketTextStreamFunction 不是 ParallelSourceFunction,所以 isParallel = false
7. DataStreamSource 的構造
public DataStreamSource(StreamExecutionEnvironment environment,TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,boolean isParallel,String sourceName,Boundedness boundedness) {// 調用父類構造,創建 LegacySourceTransformationsuper(environment, new LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism(), boundedness));// 如果不是并行的,設置并行度為1if (!isParallel) {setParallelism(1);}
}
8. 繼承關系分析
重要理解:
DataStreamSource
本質上就是一個DataStream
- 所有的鏈式調用API都定義在
DataStream
中 SingleOutputStreamOperator
這個命名容易誤導,它實際上是個DataStream
9. DataStream 的內部結構
public class DataStream<T> {// 兩個最重要的成員protected final StreamExecutionEnvironment environment; // 執行環境protected final Transformation<T> transformation; // 轉換操作
}
關系鏈:
DataStream
包含Transformation
Transformation
包含Operator
Operator
包含Function
10. 鏈式調用的實現
DataStream<String> stream = env.socketTextStream("localhost", 9999).map(...) // 返回 SingleOutputStreamOperator (實際是DataStream).filter(...) // 返回 SingleOutputStreamOperator .keyBy(...) // 返回 KeyedStream.window(...) // 返回 WindowedStream.sum(...) // 返回 SingleOutputStreamOperator.print(); // 返回 DataStreamSink
流程:
DataStreamSource
→ 各種變換 → DataStreamSink
總結
核心流程回顧
- 用戶調用
env.socketTextStream(hostname, port)
- 參數補全 通過重載方法逐步補充參數
- Function創建 創建
SocketTextStreamFunction
- addSource調用 進入核心處理邏輯
- 類型推斷 抽取輸出數據的類型信息
- 并行性檢查 判斷是否為并行源函數
- Function→Operator 封裝成
StreamSource
- Operator→Transformation 創建
LegacySourceTransformation
- 返回DataStream 創建
DataStreamSource
設計模式體現
- 裝飾器模式: Function → Operator → Transformation → DataStream
- 建造者模式: 通過重載方法逐步構建完整對象
- 模板方法模式: addSource的處理流程
關鍵技術點
- 類型擦除處理: 通過 TypeInformation 解決Java泛型擦除問題
- 序列化機制: 根據類型信息創建對應的序列化器
- 并行度控制: 非并行源強制設置并行度為1
- 有界性標識: 為后續執行計劃生成提供信息
下節預告
Flink Stream API 源碼走讀 map和 flatmap
注意: 基于 Flink 1.18 版本,SourceFunction
已被標記為過時,實際項目中建議使用新的 Source API。