Flink 并行度的設置

在 Apache Flink 中,并行度(Parallelism) 是控制任務并發執行的核心參數之一。Flink 提供了 多個層級設置并行度的方式,優先級從高到低如下:


🧩 一、Flink 并行度的四個設置層級

層級描述設置方式
Operator Level為某個具體的算子設置并行度operator.setParallelism(n)
Execution Environment Level為整個流處理環境設置默認并行度env.setParallelism(n)
Client Level(提交作業時)通過命令行指定全局并行度flink run -p n
System Level(系統配置)flink-conf.yaml 中定義全局默認值parallelism.default: n

? 二、各層級設置詳解與示例

1. Operator Level(算子級別)

  • 優先級最高
  • 可以為特定算子設置不同并行度,適用于數據傾斜或資源敏感操作
🔧 示例:
DataStream<String> stream = env.fromElements("a", "b", "c");// 單獨為 map 算子設置并行度為4
stream.map(new MyMapFunction()).setParallelism(4).print();
? 適用場景:
  • 某個算子計算密集,需要更多資源
  • 數據源分區數較少,但后續算子可并行化處理

2. Execution Environment Level(執行環境級別)

  • 設置整個 Job 的默認并行度
  • 如果未對某些算子單獨設置,并使用此值
🔧 示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 所有算子默認并行度為4DataStream<String> stream = env.fromElements("a", "b", "c");
stream.map(new MyMapFunction()).print(); // 默認并行度為4
? 適用場景:
  • 多數算子使用相同并行度
  • 統一配置便于管理和維護

3. Client Level(客戶端提交作業時)

  • 使用命令行參數動態設置并行度
  • 不修改代碼即可適配不同運行環境(如測試/生產)
🔧 示例:
flink run -p 4 -c com.example.MyJob ./myjob.jar
? 適用場景:
  • 快速調整不同集群資源配置
  • 測試階段快速驗證性能

4. System Level(系統級別)

  • flink-conf.yaml 中設置全局默認并行度
  • 對所有提交的作業生效(除非被更高級別覆蓋)
🔧 示例(flink-conf.yaml):
parallelism.default: 4
? 適用場景:
  • 所有作業共享相同的默認資源配置
  • 避免手動重復設置

📊 三、并行度優先級對比表

設置方式是否推薦場景覆蓋關系
Operator Level???特定算子優化最高優先級
Execution Environment Level??整體統一配置被 Operator 覆蓋
Client Level (-p)?動態部署被前兩者覆蓋
System Level (flink-conf.yaml)??兜底默認值最低優先級

💡 四、并行度設置建議

? 推薦做法:

  • 開發/測試環境:使用 .setParallelism()-p 命令行設置較小值(如1~4)
  • 生產環境
    • 使用 flink-conf.yaml 設置基礎并行度
    • 使用 env.setParallelism() 明確控制默認值
    • 為關鍵算子單獨設置更高并行度(如窗口聚合、復雜邏輯)

?? 示例組合:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 默認并行度env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(8) // Kafka Source 并行度設為8(等于topic分區數).map(new MyMapFunction()) // 使用默認并行度4.keyBy(keySelector).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new MyProcessWindowFunction()) // 可選 setParallelism().print();

🧠 五、并行度與資源的關系

并行度TaskManager 數量Slot 數量資源要求
≤ TM × slot? 正常運行? 正常運行資源充足
> TM × slot? 無法啟動? 無法啟動資源不足

? 建議:確保總并行度 ≤ 總 slot 數量


📈 六、實際調優建議

場景建議設置
Kafka Source并行度 = Kafka Topic 分區數
Map / FlatMap根據 CPU 利用率設置
Keyed Window Aggregation可適當提高并行度提升吞吐
Join / CoGroup視數據分布決定是否提高并行度
Sink若寫入慢可適當增加并行度

? 七、完整示例(Java + Shell)

Java 設置(Env + Operator):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);env.fromElements("a", "b", "c").map(x -> x).setParallelism(2) // 覆蓋默認值.print();env.execute("Parallelism Example");

Shell 設置(Client Level):

flink run -p 8 -c com.example.MyJob ./myjob.jar

? 八、總結

層級用途是否推薦使用
Operator Level控制單個算子并行度??? 強烈推薦用于關鍵路徑優化
Execution Environment Level設置默認并行度?? 推薦作為基礎配置
Client Level動態設置并行度? 適合多環境部署
System Level全局兜底配置?? 推薦配合其他方式使用

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/81553.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/81553.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/81553.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

OpenCV 筆記(39):頻域中的拉普拉斯算子

1. 拉普拉斯算子 在該系列的第八篇文章中&#xff0c;我們曾經介紹過在二維空間拉普拉斯算子的定義為&#xff1a; 這是對函數 的二階偏導數之和。 2. 拉普拉斯算子的傅里葉變換及其推導 在該系列的第三十二篇文章中&#xff0c;我們曾給介紹過下面的公式 二維連續傅里葉變換&…

入職軟件開發與實施工程師了后........

時隔幾個月沒有創作的我又回來了&#xff0c;這幾個月很忙&#xff0c;我一直在找工作&#xff0c;在自考&#xff08;順便還處理了一下分手的事&#xff09;&#xff0c;到處奔波&#xff0c;心力交瘁。可能我骨子里比較傲吧。我不愿意著急謀生&#xff0c;做我不愿意做的普通…

多卡跑ollama run deepseek-r1

# 設置環境變量并啟動模型 export CUDA_VISIBLE_DEVICES0,1,2,3 export OLLAMA_SCHED_SPREAD1 # 啟用多卡負載均衡 ollama run deepseek-r1:32b 若 deepseek-r1:32b 的顯存需求未超過單卡容量&#xff08;如單卡 24GB&#xff09;&#xff0c;Ollama 不會自動啟用多卡 在run…

09、底層注解-@Import導入組件

09、底層注解-Import導入組件 Import是Spring框架中的一個注解&#xff0c;用于將組件導入到Spring的應用上下文中。以下是Import注解的詳細介紹&#xff1a; #### 基本用法 - **導入配置類** java Configuration public class MainConfig { // 配置內容 } Configuration Impo…

題解:P12207 [藍橋杯 2023 國 Python B] 劃分

鏈接 題目描述 給定 40 個數&#xff0c;請將其任意劃分成兩組&#xff0c;每組至少一個元素。每組的權值為組內所有元素的和。劃分的權值為兩組權值的乘積。請問對于以下 40 個數&#xff0c;劃分的權值最大為多少。 5160 9191 6410 4657 7492 1531 8854 1253 4520 9231126…

配置ssh服務-ubuntu到Windows拷貝文件方法

背景&#xff1a; 在工作中&#xff0c;需要頻繁從ubuntu到Windows拷貝文件&#xff0c;但有時間總是無法拷出&#xff0c;每次重啟虛擬機又比較麻煩并且效率較低。可以使用scp服務進行拷貝&#xff0c;不僅穩定而且高效&#xff0c;現將配置過程進行梳理&#xff0c;以供大家參…

線程池模式與C#中用法

一、線程池模式解析 1. 核心概念 線程池是一種 管理線程生命周期的技術&#xff0c;主要解決以下問題&#xff1a; 減少線程創建/銷毀開銷&#xff1a;復用已存在的線程 控制并發度&#xff1a;避免無限制創建線程導致資源耗盡 任務隊列&#xff1a;有序處理異步請求 2. …

設置IDEA打開新項目使用JDK17

由于最近在學習Spring-AI&#xff0c;所以JDK8已經不適用了&#xff0c;但是每次創建新項目都還是JDK8&#xff0c;每次調來調去很麻煩 把Projects和SDKs都調整為JDK17即可 同時&#xff0c;Maven也要做些更改&#xff0c;主要是添加build標簽 <build><plugins>&…

初識MySQL · 索引

目錄 前言&#xff1a; 重溫磁盤 認識索引 為什么這么做&#xff0c;怎么做 重談page 聚簇索引VS非聚簇索引 回表查詢 索引分類 前言&#xff1a; 前文我們主要是介紹了MySQL的一些基本操作&#xff0c;增刪查改一類的操作都介紹了&#xff0c;并且因為大多數情況下&am…

MySQL——7、復合查詢和表的內外連接

復合查詢和表的內外連接 1、基本查詢回顧2、多表查詢3、自連接4、子查詢4.1、單行子查詢4.2、多行子查詢4.3、多列子查詢4.4、在from子句中使用子查詢4.5、合并查詢 5、表的內連和外連5.1、內連接5.2、外連接5.2.1、左外連接5.2.2、右外連接 1、基本查詢回顧 1.1、查詢工資高于…

MYSQL故障排查和環境優化

一、MySQL故障排查 1. 單實例常見故障 &#xff08;1&#xff09;連接失敗類問題 ERROR 2002 (HY000): Cant connect to MySQL server 原因&#xff1a;MySQL未啟動或端口被防火墻攔截。 解決&#xff1a;啟動MySQL服務&#xff08;systemctl start mysqld&#xff09;或開放…

7GB顯存如何部署bf16精度的DeepSeek-R1 70B大模型?

構建RAG混合開發---PythonAIJavaEEVue.js前端的實踐-CSDN博客 服務容錯治理框架resilience4j&sentinel基礎應用---微服務的限流/熔斷/降級解決方案-CSDN博客 conda管理python環境-CSDN博客 快速搭建對象存儲服務 - Minio&#xff0c;并解決臨時地址暴露ip、短鏈接請求改…

數字圖像處理——圖像壓縮

背景 圖像壓縮是一種減少圖像文件大小的技術&#xff0c;旨在在保持視覺質量的同時降低存儲和傳輸成本。隨著數字圖像的廣泛應用&#xff0c;圖像壓縮在多個領域如互聯網、移動通信、醫學影像和衛星圖像處理中變得至關重要。 技術總覽 當下圖像壓縮JPEG幾乎一統天下&#xff…

抖音視頻怎么去掉抖音號水印

你是不是經常遇到這樣的煩惱&#xff1f;看到喜歡的抖音視頻&#xff0c;想保存下來分享給朋友或二次創作&#xff0c;卻被抖音號水印擋住了畫面&#xff1f;別著急&#xff0c;今天教你幾種超簡單的方法&#xff0c;輕松去除水印&#xff0c;高清無水印視頻一鍵保存&#xff0…

RISC-V 開發板 MUSE Pi Pro PCIE 測試以及 fio 崩潰問題解決

視頻講解&#xff1a; RISC-V 開發板 MUSE Pi Pro PCIE 測試以及 fio 崩潰問題解決 板子上有一個m.2的pcie插槽&#xff0c;k1有三個pcie控制器&#xff0c;pcie0和usb3復用一個phy&#xff0c;所以實際開發板就兩個&#xff0c;測試的話&#xff0c;上一個nvme硬盤&#xff0c…

超級管理員租戶資源初始化與授權管理設計方案

背景說明 在多租戶系統中&#xff0c;資源&#xff08;如功能模塊、系統菜單、服務能力等&#xff09;需按租戶維度進行授權管理。超級管理員在創建新租戶時&#xff0c;需要初始化該租戶的資源授權信息。 兩種可選方案 方案描述方案 A&#xff1a;前端傳入選中的資源列表創…

stm32week16

stm32學習 十一.中斷 4.使用中斷 EXTI的配置步驟&#xff1a; 使能GPIO時鐘設置GPIO輸入模式使能AFIO/SYSCFG時鐘設置EXTI和IO對應關系設置EXTI屏蔽&#xff0c;上/下沿設置NVIC設計中斷服務函數 HAL庫的使用&#xff1a; 使能GPIO時鐘&#xff1a;__HAL_RCC_GPIOx_CLK_EN…

什么是RDMA?

什么是RDMA&#xff1f; RDMA(RemoteDirect Memory Access)技術全稱遠程直接內存訪問&#xff0c;就是為了解決網絡傳輸中服務器端數據處理的延遲而產生的。它將數據直接從一臺計算機的內存傳輸到另一臺計算機&#xff0c;無需雙方操作系統的介入。這允許高吞吐、低延遲的網絡…

golang 安裝gin包、創建路由基本總結

文章目錄 一、安裝gin包和熱加載包二、路由簡單場景總結 一、安裝gin包和熱加載包 首先終端新建一個main.go然后go mod init ‘項目名稱’執行以下命令 安裝gin包 go get -u github.com/gin-gonic/gin終端安裝熱加載包 go get github.com/pilu/fresh終端輸入fresh 運行 &…

【數據結構篇】鏈式結構二叉樹

目錄&#xff1a; 一 二叉鏈的概念與結構&#xff1a; 1.1 概念&#xff1a; 1.2 結構&#xff1a; 二 二叉鏈的實現&#xff1a; 2.1 二叉樹的構建&#xff1a; 2.2 二叉樹的遍歷&#xff1a; 2.2.1 前序遍歷&#xff1a; 2.2.2 中序遍歷&#xff1a; 2.2.3 后序遍歷…