Flink Slot 不足導致任務Pending修復方案

當前有3個虛擬機節點,每個節點配置的slot節點數量是4,${FLINK_HOME}/conf/flink-conf.yaml 關于slot的配置如下:

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4state.backend: rocksdb
# checkpoint的時間根據集群性能和數據量確定
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://cdh01:8020/chkpoints
state.backend.incremental: true

其中,${HADOOP_HOME}/etc/hadoop/yarn-site.xml中關于yarn單個容器允許分配的最大內存設置如下:

	<!-- yarn單個容器允許分配的最大最小內存 --><property><name>yarn.scheduler.minimum-allocation-mb</name><value>512</value></property><property><name>yarn.scheduler.maximum-allocation-mb</name><value>4096</value></property><!-- yarn容器允許管理的物理內存大小 --><property><name>yarn.nodemanager.resource.memory-mb</name><value>4096</value></property>

啟動yarn-session以Flink session cluster供FLINK SQL以共享方式運行:
$FLINK_HOME/bin/yarn-session.sh -d

啟動FLINK SQL Client:
$FLINK_HOME/bin/sql-client.sh embedded -i $FLINK_HOME/conf/sql_client_init.sql -s yarn-session

如圖,提交23個 FlinkSQL任務(flink-cdc -> hudi-ods),由于slot數量不夠,將導致12個任務之外的剩余任務處于pending的狀態(黑紫任務)。
在這里插入圖片描述

查看某個pending任務的log,看到報錯如下:

2025-xx-xx 21:45:56
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at akka.actor.Actor.aroundReceive(Actor.scala:517)at akka.actor.Actor.aroundReceive$(Actor.scala:515)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)... 26 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms... 27 more

從報錯來看,核心問題是 Flink集群總Slot(12個)不足以支撐23個任務的并發需求,導致11個任務因無法分配Slot而處于Pending狀態,最終觸發超時(300秒后仍未獲取到資源)。解決思路圍繞“增加可用Slot”和“優化資源分配”展開,具體方案如下:

一、核心原因拆解

Flink的Slot是任務執行的最小資源單元(每個Slot對應一個TaskManager的一個CPU核心,默認1個Slot/核心),當前12個Slot最多同時運行12個任務。23個任務并發時,超出的11個任務會進入Pending隊列,若超過slot.request.timeout(默認300秒)仍未獲取到Slot,就會報超時錯誤。

二、解決方案(按優先級排序)

方案1:增加TaskManager節點或每個節點的Slot數量(最直接)

通過擴展Flink集群的Slot總數,滿足23個任務的并發需求(需至少23個Slot)。

1.1 增加每個TaskManager的Slot數量(無需新增虛擬機)

若現有3臺虛擬機的CPU資源充足(如每臺虛擬機有10核CPU,當前僅用4核),可通過修改配置增加單節點的Slot數:

  1. 停止Flink集群:
    ./bin/stop-cluster.sh
    
  2. 修改flink-conf.yaml配置文件:
    # 每個TaskManager的Slot數量(默認1,根據CPU核數調整,建議不超過物理核數)
    taskmanager.numberOfTaskSlots: 10  # 3臺虛擬機 × 10個Slot = 30個Slot,滿足23個任務
    

    注意:numberOfTaskSlots不能超過虛擬機的物理CPU核心數(如虛擬機是10核,最大設為10,留1核給系統)。

  3. 重啟Flink集群:
    ./bin/start-cluster.sh
    
  4. 驗證Slot總數:
    • 訪問Flink Web UI(如http://hadoop103:8081),在左側“Cluster Overview”中查看“Total Slots”是否為30(3×10)。
1.2 新增TaskManager節點(現有節點資源不足時)

若3臺虛擬機的CPU已達上限,需新增1臺虛擬機作為TaskManager節點:

  1. 在新虛擬機上安裝Flink(與現有集群版本一致),并配置flink-conf.yaml
    # 指向現有JobManager的地址(如hadoop103)
    jobmanager.rpc.address: hadoop103
    # 新節點的Slot數量(如8個,3臺舊節點×4 + 1臺新節點×8 = 20,仍不夠可設為11)
    taskmanager.numberOfTaskSlots: 11
    
  2. 在新節點上啟動TaskManager:
    ./bin/taskmanager.sh start
    
  3. 驗證:Web UI中“Total Slots”是否達到23+。
方案2:優化任務資源分配(避免資源浪費)

若無法增加Slot總數,可通過優化任務的資源配置,減少單個任務的Slot占用,提升Slot利用率。

2.1 降低單個任務的并行度(Parallelism)

Flink任務的并行度默認等于Slot數量,若部分任務無需高并行(如簡單查詢),可手動降低并行度,減少Slot占用:

-- 執行任務時指定并行度(如設為1,僅占用1個Slot)
Flink SQL> set execution.parallelism = 1;
-- 再執行任務(如查詢)
Flink SQL> select * from order_info_cdc;

說明:默認情況下,每個任務的并行度 = parallelism.defaultflink-conf.yaml配置,默認1),若之前設為更高值(如2),會導致單個任務占用多個Slot。

2.2 合并小任務(減少任務總數)

若20個任務中有多個相似的輕量任務(如多個簡單查詢),可合并為1個任務,減少Slot需求:

  • 例:將多個select * from table where ...合并為union all查詢:
    select * from table1 where type=1
    union all
    select * from table1 where type=2;
    
    合并后1個任務僅占用1個Slot,替代原來的2個任務。
方案3:配置Slot共享組(Share Group)

Flink支持“Slot共享組”,允許不同任務共享同一個Slot(僅適用于非密集型任務),提升Slot利用率。

  1. 在創建表或執行任務時,指定共享組名稱:
    -- 執行任務時指定共享組(如所有查詢共享“query_group”)
    Flink SQL> set execution.share-group = query_group;
    -- 執行多個任務,會共享同一個Slot(需確保任務非CPU密集型)
    Flink SQL> select * from table1;
    Flink SQL> select * from table2;
    
  2. 注意:共享組僅適合輕量任務(如簡單查詢),若任務是CPU/內存密集型(如大表聚合),共享會導致性能下降。
方案4:延長Slot請求超時時間(臨時規避)

若任務可接受等待(如非實時任務),可延長slot.request.timeout,給Pending任務更多時間等待釋放的Slot:

  1. 修改flink-conf.yaml
    # Slot請求超時時間(單位:毫秒,設為10分鐘=600000ms)
    slot.request.timeout: 600000
    
  2. 重啟Flink集群生效。

注意:這只是臨時方案,若Slot總數始終不足,任務仍會最終失敗,需配合方案1/2徹底解決。

三、驗證與監控

  1. 查看任務狀態:
    • 訪問Flink Web UI,在“Jobs”頁面查看Pending任務是否已分配Slot(狀態從“Pending”變為“Running”)。
  2. 監控Slot使用:
    • 在Web UI“Cluster Overview”中查看“Used Slots”和“Available Slots”,確保Available Slots ≥ 0(無任務Pending)。

四、生產環境建議

  • 優先方案1:生產環境中,核心任務需保證資源充足,建議Slot總數預留20%-30%冗余(如20個任務,配置28個Slot),避免突發任務導致Pending。
  • 規范并行度配置:根據任務復雜度設置并行度(如大表處理設為4-8,簡單查詢設為1-2),避免盲目使用高并行度浪費資源。
  • 使用YARN/K8s動態資源:若Flink部署在YARN/K8s上,可開啟動態資源分配(yarn.containers.dynamic/K8s的Pod動態擴縮容),自動根據任務需求增減Slot,無需手動配置。

通過以上方案,可徹底解決Slot不足導致的任務Pending問題,確保所有任務正常運行。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/97179.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/97179.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/97179.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

亞馬遜合規風控升級:詳情頁排查與多賬號運營安全構建

2025年亞馬遜掀起的大規模掃號行動&#xff0c;聚焦商品詳情頁合規性審查&#xff0c;標志著跨境電商合規監管進入嚴風控時代&#xff0c;此次行動以關鍵詞規范與定價誠信為核心&#xff0c;大量賣家因內容違規遭遇賬號停用&#xff0c;對于賣家而言&#xff0c;構建系統化的合…

FISCO-BCOS-Python 模板

基于Python-SDK的FISCO BCOS區塊鏈HelloWorld模板&#xff0c;提供了簡單的問候語設置和查詢功能。本項目采用現代Python開發實踐&#xff0c;包含完整的配置管理、測試框架和項目結構。 快速開始 倉庫地址&#xff1a;git clone https://gitee.com/atanycosts/python-fisco-te…

移動端(微信等)使用 vConsole調試console

本文介紹了一種在移動端真機上進行調試的方法——使用VConsole。通過簡單的安裝步驟和代碼配置&#xff0c;開發者可以在移動端直接查看console.log輸出&#xff0c;極大提升了調試效率。 摘要生成于 C知道 &#xff0c;由 DeepSeek-R1 滿血版支持&#xff0c; 前往體驗 >作…

云計算資源分配問題

這里寫目錄標題一、云計算資源的基本類型二、資源分配的目標三、資源分配的方式四、資源分配的技術與工具五、挑戰與優化方向六、實際應用場景舉例總結云計算資源分配是指在云計算環境中&#xff0c;根據用戶需求、應用程序性能要求以及系統整體效率&#xff0c;將計算、存儲、…

深度學習之第二課PyTorch與CUDA的安裝

目錄 簡介 一、PyTorch 與 CUDA 的核心作用 1.PyTorch 2.CUDA 二、CUDA的安裝 1.查看 2.下載安裝 3.檢查是否安裝成功 三、PyTorch的安裝 1.GPU版本安裝 2.CPU版本安裝 簡介 在深度學習的實踐旅程中&#xff0c;搭建穩定且高效的開發環境是一切實驗與項目的基礎&…

Ubuntu22.04 安裝和使用標注工具labelImg

文章目錄一、LabelImg 的安裝及配置1. 安裝2. 配置二、使用1. 基礎操作介紹2. 創建自定義標簽2.1 修改 predefined_classes.txt2.2 直接軟件界面新增3. 圖像標注3.1 重命名排序3.2 標注3.2 voc2yolo 格式轉換3.3 視頻轉圖片Yolo系列 —— Ubuntu 安裝和使用標注工具 labelImgYo…

Jenkins與Docker搭建CI/CD流水線實戰指南 (自動化測試與部署)

更多云服務器知識&#xff0c;盡在hostol.com你是否已經厭倦了那個“人肉”部署的重復循環&#xff1f;每一次 git push 之后&#xff0c;都像是一個莊嚴的儀式&#xff0c;你必須虔誠地打開SSH&#xff0c;小心翼翼地敲下一連串的 git pull, npm install, docker build, docke…

【數據可視化-100】使用 Pyecharts 繪制人口遷徙圖:步驟與數據組織形式

&#x1f9d1; 博主簡介&#xff1a;曾任某智慧城市類企業算法總監&#xff0c;目前在美國市場的物流公司從事高級算法工程師一職&#xff0c;深耕人工智能領域&#xff0c;精通python數據挖掘、可視化、機器學習等&#xff0c;發表過AI相關的專利并多次在AI類比賽中獲獎。CSDN…

5G相對于4G網絡的優化對比

5G網絡作為新一代移動通信技術&#xff0c;相比4G實現了全方位的性能提升和架構優化。5G通過高速率、低時延和大連接三大核心特性&#xff0c;有效解決了4G網絡面臨的數據流量爆炸式增長和物聯網應用瓶頸問題 &#xff0c;同時引入了動態頻譜共享、網絡切片等創新技術&#xff…

AR智能巡檢:智慧工地的高效安全新引擎

在建筑行業,工地安全管理與施工效率的提升一直是核心議題。隨著增強現實(AR)技術的快速發展,AR智能巡檢系統正逐步成為智慧工地的“標配”,通過虛實結合、實時交互和智能分析,推動建筑行業邁入數字化、智能化的新階段。本文將從技術原理、應用場景、核心優勢及未來趨勢等…

TypeScript:枚舉類型

1. 什么是枚舉類型&#xff1f;枚舉&#xff08;Enum&#xff09;是TypeScript中一種特殊的數據類型&#xff0c;用于定義一組命名的常量值。它允許開發者用一個友好的名稱來代表數值或字符串&#xff0c;避免使用“魔法數字”或硬編碼值。基本語法&#xff1a;enum Direction …

Maven 編譯打包一個比較有趣的問題

前言最近做項目&#xff0c;發現一個比較有意思的問題&#xff0c;其實發現了問題的根源還是很好理解&#xff0c;但是如果突然看到會非常的難以理解。在Java項目中&#xff0c;明明包名錯誤了&#xff0c;居然可以正常編譯打包&#xff0c;IDEA報錯了&#xff0c;但是mvn命令正…

Leetcode貪心算法

題目&#xff1a;劃分字母區間 題號&#xff1a;763class Solution {public List<Integer> partitionLabels(String s) {List<Integer> list new LinkedList();int[] edge new int[27];char[] chars s.toCharArray();for(int i 0; i <chars.length;i){edge…

【密碼學基礎】加密消息語法 CMS:給數字信息裝個 “安全保險箱”

如果說數字世界是一座繁忙的城市&#xff0c;那么我們每天發送的郵件、合同、軟件安裝包就是穿梭在城市里的 “包裹”。有些包裹里裝著隱私&#xff08;比如銀行賬單&#xff09;&#xff0c;有些裝著重要承諾&#xff08;比如電子合同&#xff09;&#xff0c;還有些關系到設備…

leetcode算法刷題的第二十天

1.leetcode 39.組合總和 題目鏈接 這道題里面的數組里面的數字是可以重復使用的&#xff0c;那可能就會有人想&#xff0c;出現了0怎么辦&#xff0c;有這個想法的很好&#xff0c;但是題目要求數組里面的數字最小值為1&#xff0c;這就可以讓人放心了。但是有總和的限制&…

使用Spoon報錯Driver class ‘com.microsoft.sqlserver.jdbc.SQLServerDriver‘ could not be found解決方法

使用Spoon報錯Driver class ‘com.microsoft.sqlserver.jdbc.SQLServerDriver’ could not be found 產生原因 出現這個錯誤是因為Spoon無法找到用于連接MS SQL Server的JDBC驅動程序。該驅動程序是一個jar文件,通常需要手動下載并配置。 解決方案 下載JDBC驅動程序: 訪問 M…

【實時Linux實戰系列】基于實時Linux的音頻實時監控系統

在當今數字化時代&#xff0c;音頻監控系統在許多領域都有著廣泛的應用&#xff0c;例如安全監控、工業環境監測、智能交通等。音頻實時監控系統能夠實時采集、分析音頻信號&#xff0c;并在檢測到異常時發出警報&#xff0c;這對于提高安全性、優化生產流程和提升用戶體驗都有…

改造thinkphp6的命令行工具和分批次導出大量數據

文章目錄基本用法傳入參數addArgumentaddOption參數提示導出數據示例準備工作執行導出基本用法 在thinkphp6框架中&#xff0c;自帶了命令行工具&#xff0c;通過配置 config/console.php &#xff0c;添加自定義的命令&#xff1a; return [commands > [//...//新增的自定…

外匯中高頻 CTA 風控策略回測案例

在匯率波動日益頻繁、企業與機構對風險管理要求不斷提高的背景下&#xff0c;外匯交易策略已成為資產配置與對沖操作的重要工具。其中&#xff0c;CTA 策略在外匯交易中具有非常重要的實際應用價值&#xff0c;在風險控制、趨勢捕捉、資金效率與交易實用性之間取得了良好平衡。…

【iOS】內存管理及部分Runtime復習

1.繼承鏈關于繼承鏈存在兩個指針 類的superclass指向父類 父類的sp指向根類 根類的sp指向空 元類的sp指向父類的元類 最終指向根元類 而根元類的sp指向根類 而關于isa指針 對象的isa指針指向它所屬的類 類的isa指針指向元類 元類的isa指針指向根元類 根元類的isa指針指向自己2.…