在 Apache Flink 中,并行度(Parallelism) 是控制任務并發執行的核心參數之一。Flink 提供了 多個層級設置并行度的方式 ,優先級從高到低如下:
🧩 一、Flink 并行度的四個設置層級
層級 描述 設置方式 Operator Level 為某個具體的算子設置并行度 operator.setParallelism(n)
Execution Environment Level 為整個流處理環境設置默認并行度 env.setParallelism(n)
Client Level(提交作業時) 通過命令行指定全局并行度 flink run -p n
System Level(系統配置) 在 flink-conf.yaml
中定義全局默認值 parallelism.default: n
? 二、各層級設置詳解與示例
1. Operator Level(算子級別)
優先級最高 可以為特定算子設置不同并行度,適用于數據傾斜或資源敏感操作
🔧 示例:
DataStream < String > stream = env. fromElements ( "a" , "b" , "c" ) ;
stream. map ( new MyMapFunction ( ) ) . setParallelism ( 4 ) . print ( ) ;
? 適用場景:
某個算子計算密集,需要更多資源 數據源分區數較少,但后續算子可并行化處理
2. Execution Environment Level(執行環境級別)
設置整個 Job 的默認并行度 如果未對某些算子單獨設置,并使用此值
🔧 示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ;
env. setParallelism ( 4 ) ; DataStream < String > stream = env. fromElements ( "a" , "b" , "c" ) ;
stream. map ( new MyMapFunction ( ) ) . print ( ) ;
? 適用場景:
3. Client Level(客戶端提交作業時)
使用命令行參數動態設置并行度 不修改代碼即可適配不同運行環境(如測試/生產)
🔧 示例:
flink run -p 4 -c com.example.MyJob ./myjob.jar
? 適用場景:
4. System Level(系統級別)
在 flink-conf.yaml
中設置全局默認并行度 對所有提交的作業生效(除非被更高級別覆蓋)
🔧 示例(flink-conf.yaml
):
parallelism.default : 4
? 適用場景:
📊 三、并行度優先級對比表
設置方式 是否推薦 場景 覆蓋關系 Operator Level ??? 特定算子優化 最高優先級 Execution Environment Level ?? 整體統一配置 被 Operator 覆蓋 Client Level (-p) ? 動態部署 被前兩者覆蓋 System Level (flink-conf.yaml) ?? 兜底默認值 最低優先級
💡 四、并行度設置建議
? 推薦做法:
開發/測試環境 :使用 .setParallelism()
或 -p
命令行設置較小值(如1~4)生產環境 : 使用 flink-conf.yaml
設置基礎并行度 使用 env.setParallelism()
明確控制默認值 為關鍵算子單獨設置更高并行度(如窗口聚合、復雜邏輯)
?? 示例組合:
StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ;
env. setParallelism ( 4 ) ; env. fromSource ( kafkaSource, WatermarkStrategy . noWatermarks ( ) , "Kafka Source" ) . setParallelism ( 8 ) . map ( new MyMapFunction ( ) ) . keyBy ( keySelector) . window ( TumblingEventTimeWindows . of ( Time . seconds ( 5 ) ) ) . process ( new MyProcessWindowFunction ( ) ) . print ( ) ;
🧠 五、并行度與資源的關系
并行度 TaskManager 數量 Slot 數量 資源要求 ≤ TM × slot ? 正常運行 ? 正常運行 資源充足 > TM × slot ? 無法啟動 ? 無法啟動 資源不足
? 建議:確保總并行度 ≤ 總 slot 數量
📈 六、實際調優建議
場景 建議設置 Kafka Source 并行度 = Kafka Topic 分區數 Map / FlatMap 根據 CPU 利用率設置 Keyed Window Aggregation 可適當提高并行度提升吞吐 Join / CoGroup 視數據分布決定是否提高并行度 Sink 若寫入慢可適當增加并行度
? 七、完整示例(Java + Shell)
Java 設置(Env + Operator):
StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ;
env. setParallelism ( 4 ) ; env. fromElements ( "a" , "b" , "c" ) . map ( x -> x) . setParallelism ( 2 ) . print ( ) ; env. execute ( "Parallelism Example" ) ;
Shell 設置(Client Level):
flink run -p 8 -c com.example.MyJob ./myjob.jar
? 八、總結
層級 用途 是否推薦使用 Operator Level 控制單個算子并行度 ??? 強烈推薦用于關鍵路徑優化 Execution Environment Level 設置默認并行度 ?? 推薦作為基礎配置 Client Level 動態設置并行度 ? 適合多環境部署 System Level 全局兜底配置 ?? 推薦配合其他方式使用