Flink核心功能與運行流程詳解

目錄

一、背景?

二、圖構建

三、任務執行流程(yarn per-job模式)

3.1 Flink組件

3.2 執行流程

四、分布式調度

4.1 TM的slot

4.2 TM的slot的CPU與內存

4.3 節點的部署

4.4 節點的狀態

4.5 節點部署流程

五、數據傳輸

5.1 內存分配

5.2 傳輸流程

六、高可用

6.1 節點高可用

6.2 作業高可用

6.2.1 作業異常

6.2.1 狀態后端

6.2.2 狀態存儲位置

6.2.3?分布式快照算法

6.2.4 exactly once

七、新特性

7.1 窗口

7.1.1 窗口存儲的位置

7.1.2 分配器

7.13 觸發器

7.14 整體流程

八、批流一體

1. 統一的編程模型:寫一次,跑在批或流上

2. 統一的執行引擎核心 (Runtime)

3. 統一的核心抽象:數據即流 (Data as Stream)

4. 執行策略的“自適應”而非“分裂”

一、背景?

很火的分布式計算框架,解析一下其核心功能與運行流程,只分析在流式計算下的情況,不分析批的情況,內容大量基于DeepSeek回復(已根據本人理解校驗過一遍正確性)

二、圖構建

我們的寫的計算邏輯會封裝在Operator相關類中

大概類似下圖所示:

Operator和Operator之間的關系即路由關系會封裝在TransForm中,如下圖所示,input是上一個轉換,OperatorFactory是當前計算邏輯

最后所有的路由關系會在Enviroment中transformation中保存

詳情可以看我的:短分享-Flink圖構建_路線圖的構建使用flink-CSDN博客

后面會根據路由構建StreamGraph->JobGraph->ExecutionGraph->物理執行圖

其中Operator的合并發生在StreamGraph->JobGraph中

ExecutionGraph算是拓撲圖

物理執行圖的對象中就包含具體的數據了

具體對象關系可以見下圖所示

三、任務執行流程(yarn per-job模式

下面是復制 DeepSeek的回答(個人感覺它的回答已經很全面了)

3.1 Flink組件

  1. Dispatcher?(調度器)

    • 職責:Flink 集群的“前臺”和“作業入口”。

      • 接收客戶端提交的 JobGraph(作業圖)。

      • 為每個提交的作業啟動一個新的?JobMaster

      • 提供 Web UI 和 REST API 的入口。

    • 高可用需求:如果 Dispatcher 掛了,新的作業提交、Web UI/REST 訪問都會中斷。需要選舉新的 Leader Dispatcher 來接管這些職責。

  2. ResourceManager?(資源管理器 - RM)

    • 職責:Flink 集群的“資源總監”。

      • 管理?TaskManager (TM)?資源(Slot)。

      • 與底層資源框架(如 YARN ResourceManager, Kubernetes API Server, Mesos Master)通信,申請/釋放資源。

      • 處理 TM 的注冊、心跳、Slot 報告。

      • 響應?JobMaster?的資源請求,為其分配 Slot。

    • 高可用需求:如果 RM 掛了,資源管理(申請、釋放、分配 Slot)癱瘓,作業無法啟動新任務或替換失敗任務。需要選舉新的 Leader RM 來恢復資源管理。

  3. JobMaster?(作業主管)

    • 職責每個 Flink 作業的“專屬管家”和“執行指揮官”。

      • 一個運行的作業對應一個?JobMaster?實例(由?Dispatcher?創建)。

      • 管理單個作業的執行生命周期(調度、部署、檢查點協調、故障恢復、保存點、作業取消/完成)。

      • 持有該作業的?ExecutionGraph(執行圖)。

      • 向?ResourceManager?申請 Slot 資源。

      • 管理分配給該作業的 TaskManager Task Slot。

      • 協調 Source、Sink 和算子的 Task。

    • 高可用需求:如果某個作業的?JobMaster?掛了,該作業的管理和執行就癱瘓了。需要為該作業選舉新的 Leader?JobMaster?來接管其恢復和執行(從 Checkpoint 恢復狀態,重新調度任務等)。

  4. BlobServer?/?BlobCache?(二進制大對象服務/緩存):

  • 職責:分布式文件緩存,用于存儲和分發用戶 JAR 包、配置文件等。

  • 高可用需求:通常 HA 對其影響較小,因為文件本身存儲在 HA 存儲(如 HDFS)中。

以上的幾個組件組成了JobManager

3.2 執行流程

  1. 作業提交 (flink run -m yarn-per-job ...):

    • 用戶在命令行執行?flink run -t yarn-per-job ...?提交 Flink 作業。

    • Flink Client (flink run) 開始工作:

      • 解析命令行參數和配置。

      • 生成作業的 JobGraph(優化后的執行計劃)。

      • 將作業的 JAR 文件、庫依賴、配置文件等上傳到分布式存儲(通常是 HDFS)的一個臨時位置。

      • 創建一個?YARN Client

      • YARN Client 向?YARN ResourceManager (RM)?提交一個新的?YARN Application。提交的信息包括:

        • 啟動 AM 所需的命令:org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

        • AM 所需的資源(如 1 vcore, 1024 MB 內存)。

        • 作業的配置 (flink-conf.yaml)。

        • 包含作業 JAR、依賴和配置的分布式存儲路徑

        • YARN_APPLICATION_NAME?(通常基于作業名)。

      • 提交后,Client 可以退出或保持連接等待作業結果(-d?參數決定)。

  2. YARN 資源分配與 AM/JM 啟動:

    • YARN ResourceManager (RM)?收到新的 Application 提交請求。

    • YARN RM 找到一個合適的?YARN NodeManager (NM)?節點(基于調度策略和資源可用性)。

    • YARN RM 向選定的 NM 發送指令,要求它啟動一個?Container?來運行?ApplicationMaster (AM)

    • YARN NodeManager (NM)?收到指令:

      • 準備 Container 的運行環境(本地目錄、資源限制)。

      • 執行啟動 AM 的命令:YarnJobClusterEntrypoint。這標志著?Flink JobManager (JM) 進程的啟動

      • NM 監控 Container 的生命周期并向 RM 報告狀態。

  3. Flink JobManager (JM) / AM 初始化:

    • YarnJobClusterEntrypoint?進程啟動。

    • 初始化 Flink 運行時環境:

      • 加載配置 (flink-conf.yaml?和從 Client 上傳的配置)。

      • 啟動?RPC 服務端點?(Akka / Netty)。

      • 啟動?Blob Server?(Artifact Server),使其準備好提供文件下載服務。

      • 啟動?Dispatcher?組件。

      • 啟動?Flink ResourceManager (RM),其具體實現是?YarnResourceManager

      • 啟動?Metrics Reporter,?Web UI?等組件。

    • YarnResourceManager?向?YARN ResourceManager (RM)?注冊自己作為該 YARN Application 的 AM。至此,Flink JM 進程正式承擔起 AM 的角色。

    • Dispatcher?從分布式存儲加載 Client 上傳的 Job JAR、依賴和配置。

    • Dispatcher?根據加載的 Job JAR 和配置,創建 JobGraph?(如果 Client 沒生成,這里會生成)。

    • Dispatcher?啟動一個 JobMaster?實例來管理這個即將運行的作業。它將 JobGraph 傳遞給 JobMaster。

  4. TaskManager (TM) 資源申請與啟動:

    • JobMaster?接收到 JobGraph 后,開始進行調度準備工作:

      • 解析 JobGraph 成 ExecutionGraph (包含所有并行任務實例)。

      • 計算作業所需的?Slot 總數?(基于算子的并行度)。

    • JobMaster?向?Flink ResourceManager (RM) (YarnResourceManager)?發出?Slot 資源請求

    • YarnResourceManager?收到 Slot 請求:

      • 根據配置 (taskmanager.numberOfTaskSlots) 和所需 Slot 總數,計算出需要啟動的?TM 實例數量

      • 考慮資源規格 (taskmanager.memory.process.size,?taskmanager.cpu.cores)。

      • 將這些需求轉化為向?YARN ResourceManager (RM)?申請特定數量、特定資源規格的?Container?的請求。

      • YarnResourceManager?通過 YARN AM-RM 協議發送?Container 申請給 YARN RM。申請中指定了啟動 TM 所需的資源(CPU, 內存)和啟動命令 (org.apache.flink.yarn.entrypoint.YarnTaskExecutorRunner)。

    • YARN ResourceManager (RM)?收到 Container 申請:

      • 根據集群資源和調度策略(Capacity/Fair Scheduler),在滿足條件的?YARN NodeManager (NM)?節點上進行資源分配。

      • 將分配好的 Container 信息(在哪個 NM 上、資源詳情)通過?Container Launch Context?的形式“發放”給 AM (YarnResourceManager)。

    • YarnResourceManager?收到 YARN RM 分配的 Container 信息:

      • 它通知對應的?YARN NodeManager (NM)?啟動 Container。發送的信息包含啟動 TM 的確切命令 (YarnTaskExecutorRunner) 和必要的環境變量(特別是?JM 的 RPC 地址Blob Server 地址)。

    • YARN NodeManager (NM)?收到啟動 Container 的指令:

      • 準備 Container 的運行環境。

      • 執行啟動命令:YarnTaskExecutorRunner。這標志著?Flink TaskManager (TM) 進程的啟動

      • NM 監控該 Container 的生命周期并向 YARN RM 報告狀態。

  5. TaskManager 注冊與 Slot 提供:

    • Flink TaskManager (TM)?進程 (YarnTaskExecutorRunner) 啟動后:

      • 加載配置。

      • 初始化 RPC 服務、內存管理器、網絡棧、I/O 管理器等。

      • 根據配置創建指定數量的?Task Slot

      • 獲取到啟動時傳入的?JM 的 RPC 地址

      • 主動向 JobManager 的 RPC 端點發起注冊請求。注冊信息包含自己的資源 ID、主機名、可用 Slot 信息、TM 的 RPC 地址等。

    • Flink ResourceManager (RM) (YarnResourceManager)?收到 TM 的注冊請求:

      • 記錄這個 TM 實例及其 Slot 資源。

      • 將 TM 注冊成功的信息通知給?SlotManager?(Flink RM 內部組件)。

    • SlotManager?知道有新的 Slot 可用后,通知?JobMaster:之前申請的 Slot 資源現在可用了。

    • JobMaster?收到 Slot 可用通知:

      • 開始實際的任務調度

      • 將具體的?Task(序列化的執行代碼和數據)部署到注冊上來的 TM 的 Slot 中。

      • 部署是通過 RPC 調用將?TaskDeploymentDescriptor?(TDD) 發送給 TM 來完成的。TDD 包含了執行任務所需的所有信息,包括從?JM 的 Blob Server?下載用戶代碼 JAR 文件的地址。

  6. 作業執行:

    • TaskManager (TM)?收到 JobMaster 部署 Task 的請求:

      • 如果需要,從?JM 的 Blob Server?下載用戶代碼 JAR 和依賴。

      • 在指定的 Slot 中反序列化并啟動 Task 線程執行用戶代碼。

      • 任務開始執行計算邏輯,處理數據流。

      • TM 定期向 JM 發送心跳和狀態更新(如指標信息)。

    • JobMaster?持續監控所有 Task 的執行狀態:

      • 協調檢查點 (Checkpointing) 和保存點 (Savepointing)。

      • 處理 Task 失敗(重試、重啟策略)。

      • 管理作業的生命周期狀態(RUNNING, FINISHED, FAILED, CANCELED)。

  7. 作業完成與集群清理:

    • 當作業最終狀態變為?FINISHEDFAILED?或?CANCELED

      • JobMaster?通知?Flink ResourceManager (RM)?(YarnResourceManager) 作業結束。

      • YarnResourceManager?向?YARN ResourceManager (RM)?發送?Application 完成?的最終狀態報告。

      • YarnResourceManager?主動請求 YARN RM?釋放所有為該 Application 分配的 Container?(即所有的 TM Container)。

      • YARN RM 通知對應的 NM 停止 TM Container。

      • Flink JobManager (JM) / AM?進程 (YarnJobClusterEntrypoint) 正常退出。

      • YARN NM 監控到 AM Container 退出,向 YARN RM 報告 Application 完成。

      • YARN RM 清理 Application 相關的記錄。

      • Flink Client 如果還在等待,會收到作業最終狀態的報告。

      • 臨時上傳到分布式存儲(HDFS)的作業 JAR、依賴和配置文件通常會被自動清理

四、分布式調度

有dag圖之后,要把節點deploy到機器上

4.1 TM的slot

首先要了解下機器能承受多少個節點并發執行

TM的slot個數是TM能承受的并發Task個數,由用戶配置

  • 關鍵配置參數是?taskmanager.numberOfTaskSlots

  • 你可以在?flink-conf.yaml?文件中設置這個值,或者在啟動 TaskManager 時通過命令行參數 (-Dtaskmanager.numberOfTaskSlots=<N>) 指定。

  • 理論上,你可以設置成任意正整數。

  • 實踐中,這個值通常設置為 TaskManager 所在機器/容器可用的 CPU 核心數(或其整數倍)

4.2 TM的slot的CPU與內存

因為TM其實是一個JVM,所以一個TM上的所有slot共享所在機器上的所有CPU資源

內存的話:

JVM堆內存:

  • 框架堆內存 (Framework Heap):?Flink 框架本身運行所需的內存(如 RPC、協調等)。

  • 任務堆內存 (Task Heap):?分配給用戶代碼(如 UDF、窗口狀態中的對象)使用的堆內存。

  • 元空間

堆外內存:

  • 托管內存 (Managed Memory):?這是最關鍵的部分。Flink 自己管理的一塊內存區域(可以是堆內或堆外,默認是堆外內存),主要用于:

    • 排序 (Sorting)

    • 哈希表 (Hash Tables - Joins/Aggregations)

    • RocksDB 狀態后端時的緩存 (Caching)?(當使用 RocksDBStateBackend 時,RocksDB 本身使用 JNI 管理自己的堆外內存,但 Flink 會預留 Managed Memory 給它做緩存)

    • 批處理作業的中間結果 (Batch Intermediate Results)

  • 網絡內存 (Network Memory):?用于任務之間數據傳輸(Shuffle)的緩沖(Buffers)。這對流處理和高吞吐批處理至關重要。

  • ?它們在?flink-conf.yaml?中通過不同的參數配置大小:

  • taskmanager.memory.managed.size/fraction?控制?Managed Memory?總量。

  • taskmanager.memory.network.min/max/fraction?控制?Network Memory?總量。

每個slot平分的是堆外內存,而不是JVM內存

4.3 節點的部署

上面說到了Flink會把算子構造ExecutionGraph,要去做節點部署時,會根據ExecutionGraph構造一個拓撲圖DefaultExecutionTopology,調度的順序依賴拓撲圖決定,這個我們后面再看,先看當決定要部署一個節點時,做了哪些工作。

ExecutionVertex相關任務的內容會被打包成TDD,發送給TaskExecutor

TaskExecutor和TaskManager的區別:

TaskManager?是 JVM 進程,TaskExecutor?是運行在這個 JVM 進程內的核心服務組件。TaskManager?進程啟動時會啟動?TaskExecutor?服務。我們通常說“任務部署到 TaskManager 上”,更精確地說是部署到 TaskManager 內部的?TaskExecutor?服務上

然后:

  • 創建:當?TaskExecutor?接收到?JobManager?發來的?TaskDeploymentDescriptor?(TDD) 并決定部署任務時,它會:

    1. 實例化?Task?對象(解析 TDD,加載類,初始化內部組件如狀態后端、輸入/輸出網關)。

    2. 將實例化好的?Task?對象提交給?TaskExecutor?的線程池?(ExecutorService)。

  • 啟動:線程池中的一個空閑線程被分配來執行這個?Task?對象的?run()?方法。這時,Task?線程開始運行,用戶代碼(算子邏輯)開始執行。

  • 執行

    • 線程執行?Task.run()?方法。

    • 這通常涉及:

      • 初始化算子狀態(從狀態后端恢復或初始化空狀態)。

      • 打開算子(調用算子的?open()?方法)。

      • 進入主循環:從輸入通道(InputGate)讀取數據 -> 調用算子鏈中的?processElement/processWatermark/processLatencyMarker?等方法處理數據 -> 將結果寫入輸出通道(ResultPartitionWriter)。

      • 處理檢查點(Checkpoint)觸發、任務取消(Cancel)等事件。

  • 結束

    • 正常結束:當所有輸入數據處理完畢(達到數據結束標記)或收到?JobManager?的正常停止指令時,任務會優雅結束:調用算子的?close()?方法,清理資源,報告完成狀態。

    • 異常結束:如果任務執行中拋出未捕獲的異常、被外部取消(如 Failover)、或超時等,線程會異常終止。TaskExecutor?會捕獲異常,清理資源,并向?JobManager?報告任務失敗。JobManager?會觸發重試或整個作業的 Failover。

4.4 節點的狀態

節點狀態:

節點部署后,在JM中用Execution表示,在TM中用Task表示。

它們有共同的狀態,且Task?的狀態變化會通過 RPC?主動上報給 JobManager,JobManager 更新?ExecutionGraph?中對應?Execution?的狀態。狀態共有如下幾種:

  1. CREATED?(已創建):

    • 頂點剛被創建時的初始狀態。

    • 尚未請求任何資源,也未被調度。

  2. SCHEDULED?(已調度):

    • 核心狀態!?表示該頂點已被調度

    • 調度器已成功為其請求并分配到了所需的?Slot

    • 該頂點的?TaskDeploymentDescriptor (TDD)?已生成已發送給目標 TaskExecutor (通過 RPC)。

    • 此時,該頂點的輸出?IntermediateResultPartition?狀態會被設置為?SCHEDULED?這滿足了依賴它的下游頂點的調度條件(輸入分區可消費)。

    • 注意:?任務尚未在 TaskExecutor 上啟動。它可能在 TaskExecutor 的接收隊列中,或者 TaskExecutor 正在創建 Task 對象。

  3. DEPLOYING?(部署中):

    • 可選的中間狀態 (并非所有實現都顯式使用)。

    • 表示目標 TaskExecutor?已接收到?JobMaster 發來的 TDD,并開始創建對應的?Task?對象和配置執行環境。

    • 任務尚未開始執行用戶代碼 (SourceFunction.run()?或?processElement())。

  4. RUNNING?(運行中):

    • 核心狀態!?表示任務已在 TaskExecutor 上成功啟動正在執行

    • TaskExecutor 已將?Task?對象提交給線程池,Task.run()?方法正在執行。

    • 任務已完成初始化 (調用了所有算子的?open()?方法),并已通知下游其輸出分區可用。

    • 任務可能正在:

      • (Source) 調用?SourceFunction.run()?產生數據。

      • (非 Source) 從?InputGate?讀取數據并進行處理。

      • 處理檢查點屏障 (Checkpoint Barrier)。

      • 將結果寫入?ResultPartitionWriter

    • 任務會定期向 JobMaster 發送心跳,表明其存活。

  5. FINISHED?(已完成):

    • 表示任務已成功完成其所有工作

    • 對于流任務:通常意味著收到了來自上游的終止信號 (如?EndOfPartitionEvent) 并處理完了所有輸入數據。

    • 對于批任務:意味著處理完了分配給它處理的所有數據分片。

    • 任務已調用所有算子的?close()?方法進行清理。

    • 任務釋放了占用的 Slot 資源 (通知 JobMaster)。

    • 任務的輸出?IntermediateResultPartition?狀態變為?PRODUCED?(如果所有并行任務都完成) 或?FINISHED

  6. CANCELING?(取消中):

    • 表示任務收到了取消請求?(例如用戶手動取消作業、發生全局故障觸發取消),正在執行取消操作。

    • 任務會嘗試中斷執行線程 (如果可能且安全),并盡快調用算子的?close()?方法。

    • 這是一個短暫的中間狀態。

  7. CANCELED?(已取消):

    • 表示任務已被成功取消

    • 任務已停止執行,并完成了清理工作。

    • 任務釋放了占用的 Slot 資源。

    • 任務的輸出?IntermediateResultPartition?狀態變為?CANCELED

  8. FAILED?(已失敗):

    • 核心狀態!?表示任務在執行過程中遇到了未處理的異常而失敗。

    • 任務執行線程因異常退出。

    • 任務會向 JobMaster 報告失敗原因 (FailureEnricher?可能添加額外信息)。

    • JobMaster 根據配置的重啟策略 (Restart Strategy)?決定是否以及如何重新調度該任務 (可能導致整個 ExecutionGraph 重啟或僅重啟失敗任務鏈)。

    • 任務占用的 Slot 會被釋放。

    • 任務的輸出?IntermediateResultPartition?狀態變為?FAILED

4.5 節點部署流程

前面介紹了一些基本的狀態、TaskExecutor等等,下面基于前面的基礎知識,詳細說明節點部署的全流程:

?從 ExecutionGraph 到物理部署 (JobMaster 側):

  1. ExecutionGraph 就緒:

    • 客戶端提交 JobGraph。

    • JobMaster 將 JobGraph 轉換成并行化的、包含所有并行子任務(ExecutionVertex)和中間結果分區(IntermediateResultPartition)的?ExecutionGraph

    • ExecutionGraph?代表了邏輯執行計劃在物理層面的并行化視圖,包含任務、數據交換方式(點對點、廣播等)、容錯信息(檢查點/保存點)等。

  2. 資源請求 (Slot Allocation):

    • JobMaster 的?Scheduler(通常是?DefaultScheduler)遍歷?ExecutionGraph

    • 對于每個?ExecutionVertex,調度器計算其所需的資源規格(ResourceProfile,主要是托管內存大小)。

    • 調度器通過?SlotPoolService?向 ResourceManager 請求相應數量和規格的 Slot。Slot 是 TaskManager 上資源的抽象單位(CPU、內存)。

  3. Slot 分配:

    • ResourceManager 從注冊的 TaskManager 資源池中選擇滿足要求的 Slot(可能從現有 TaskManager 分配,也可能觸發啟動新的 TaskManager)。

    • 選中的 Slot 被分配給 JobMaster 的?SlotPoolService

  4. 部署描述符生成 (TaskDeploymentDescriptor - TDD):

    • 一旦一個?ExecutionVertex?所需的所有輸入數據分區(上游任務產生的)都處于可消費狀態(?SCHEDULED?以上都可以),并且其所需的 Slot 已分配,調度器就會觸發該頂點的部署。

    • JobMaster 為該?ExecutionVertex?創建一個?TaskDeploymentDescriptor?(TDD)。TDD 包含了該特定子任務執行所需的所有信息:

      • Job ID / Job Master Actor Path:?用于任務與 JobMaster 通信(心跳、狀態更新)。

      • Execution Attempt ID:?該任務執行嘗試的唯一標識(用于容錯)。

      • 頂點配置:?序列化的?JobInformation(作業主類、配置等)和?TaskInformation(該子任務的算子鏈、輸入/輸出格式、檢查點配置、用戶代碼類加載器等)。

      • 輸入信息:

        • 輸入通道數量。

        • 每個輸入通道對應的?ResultPartitionID(上游任務產生的特定數據分區)。

        • 數據消費方式(點對點、廣播等)。

        • 對應上游 TaskManager 的位置信息(用于建立網絡連接)。

      • 輸出信息:

        • 輸出?ResultPartitionDescriptor(描述該任務將產生的數據分區的類型、位置等)。

      • Slot 信息:?分配到的 Slot 的 ID 和位置(哪個 TaskManager)。

      • 檢查點配置:?序列化的檢查點配置對象。

      • TaskManager 上的文件緩存:?需要預先傳輸到 TaskManager 的 JAR 包和其他文件列表。

  5. 任務分發:

    • JobMaster 通過 RPC(通常是 Akka 或 Netty)將 TDD 發送到持有分配給該任務 Slot 的?TaskExecutor(TaskManager 的核心組件)。

    • 分發通常是異步批量進行的。調度器會考慮拓撲順序(例如優先調度 Source 任務)和資源可用性,但分發本身是多線程并行執行的。

注意,在以上過程中:

  • 遍歷頂點是順序的:?調度器需要按邏輯順序(如拓撲順序、調度策略決定的順序)逐個處理 ExecutionVertex 來決定是否需要為其請求 Slot。

  • Slot 請求的發出是并發/異步的:?調度器向?SlotPoolService?提交請求,以及?SlotPoolService?向?ResourceManager?發送請求,都是非阻塞和并發的。一個頂點的請求發出后,下一個頂點的請求可以立即發出,無需等待前一個請求的響應。

  • ResourceManager 處理是并發的:?RM 并行處理接收到的多個 Slot 請求,并行查找資源池,并行分配 Slot 并返回響應。

  • 分配結果的接收是并發的:?SlotPoolService?和?Scheduler?并發地接收和處理來自 RM 的 Slot 分配結果通知。

TaskManager 側的任務執行 (TaskExecutor 側):

  1. 接收 TDD:

    • TaskExecutor 的 RPC 端點接收到 JobMaster 發來的?submitTask(TaskDeploymentDescriptor tdd)?調用。

  2. 創建 Task 對象:

    • TaskExecutor 反序列化 TDD 中的?TaskInformation?和其他必要信息。

    • 使用?TaskInformation?和 TDD 中的其他元數據(輸入/輸出描述符、JobID、ExecutionID 等)實例化一個?Task?對象。Task?是 Flink 運行時執行的基本單位,它封裝了一個算子鏈(可能包含多個算子,如?Source -> Map -> Sink)。

  3. 配置執行環境:

    • 根據 TDD 配置 Task 的?Environment?對象。這包括:

      • 設置輸入?InputGate:連接到上游任務產生的?ResultPartitionInputGate?負責從網絡(或本地)讀取數據。

      • 設置輸出?ResultPartitionWriter:用于將任務產生的數據寫入?ResultPartition,供下游任務消費。

      • 設置狀態后端(如果啟用檢查點)。

      • 配置用戶代碼類加載器(加載用戶定義的算子類)。

      • 初始化算子鏈中的每個算子(調用?Operator#setup?方法)。

  4. 線程分配與執行:

    • 關鍵點:?每個 Task 由一個獨立的線程執行。

    • TaskExecutor 內部維護一個線程池(通常是?ForkJoinPool?或固定大小的線程池,可通過?taskmanager.numberOfTaskSlots?配置間接影響)。

    • TaskExecutor 將實例化好的?Task?對象?提交給這個線程池執行。具體執行調用的是?Task.run()?方法。

    • 線程模型:

      • Slot 不是線程!?Slot 是資源的容器(內存)。一個 TaskManager 有?N?個 Slot(taskmanager.numberOfTaskSlots?配置項)。

      • 每個 Slot 可以運行一個或多個 Task。?在 Flink 中,一個 Slot 內可以運行由算子鏈接形成的 Task(一個線程)。默認情況下,一個 Slot 運行一個 Task(一個線程)。開啟 Slot Sharing 后,同一個 Slot 內可以運行來自同一個 Job 的多個不同算子鏈(不同 Task),這些 Task 共享 Slot 的內存資源,但每個 Task 仍然由獨立的線程執行。例如,一個 Slot 里可能同時運行一個 Source Task 線程和一個 Map Task 線程。

      • 總結:?TaskManager 的總并行線程數上限 ≈?taskmanager.numberOfTaskSlots * max(每個Slot內共享的Task數)?通常配置為 Slot 數等于 CPU 核心數,且每個 Slot 運行一個 Task(即一個線程),這樣線程數大致等于核心數。

  5. Task.run() 執行流程:

    • 初始化階段:

      • 調用算子鏈中所有算子的?Operator#open()?方法(用戶初始化代碼在此執行)。

      • 向 JobMaster 注冊(確認任務開始執行)。

      • 建立與上游?ResultPartition?的網絡連接(初始化?InputGate)。

      • 通知下游消費者(InputGate)本任務的?ResultPartition?已就緒。

    • 主循環階段 (處理數據):

      • 對于?Source Task:?主動調用?SourceFunction.run()?產生數據。

      • 對于?非 Source Task:?循環從?InputGate?請求數據(BufferOrEvent)。

      • 數據到達后,InputGate?將數據傳遞給算子鏈的頭部算子。

      • 數據在算子鏈內流動(OneInputStreamOperator#processElement?/?TwoInputStreamOperator#processElement1/2),可能經過轉換、計算、聚合等操作。

      • 處理后的數據被算子鏈尾部的算子通過?ResultPartitionWriter?寫入到對應的?ResultPartition?中。

      • 網絡交互:?InputGate?從網絡緩沖區讀取數據(來自上游),ResultPartitionWriter?將數據寫入網絡緩沖區(發送給下游)。這部分由 Netty 線程高效處理,Task 線程主要處理業務邏輯。

      • 檢查點屏障處理:?當接收到檢查點屏障(Checkpoint Barrier)時,任務會觸發異步快照操作(對齊階段、異步快照狀態),完成后將屏障繼續傳遞給下游。

    • 結束階段:

      • 正常結束:?所有輸入數據處理完畢(對于流任務可能是收到終止信號),調用算子鏈中所有算子的?Operator#close()?方法(用戶清理代碼在此執行)。

      • 異常結束:?發生未捕獲異常,任務標記為失敗,向 JobMaster 報告失敗原因。JobMaster 根據策略(重啟策略)決定是否重新調度該任務。

      • 無論正常或異常結束,最終都會釋放占用的 Slot 資源(通知 JobMaster)并清理?ResultPartition?等資源。

五、數據傳輸

5.1 內存分配

LocalBufferPool是Task級別的

NetwordBufferPool是TM級別的

5.2 傳輸流程

我們這里只探討遠程TM之間的數據交互,本地交互沒有背壓,邏輯簡單,就不討論了

1.?每個產出數據會被序列化成字節數組存儲到PipelinedSubpartition中的一個內存塊(BufferConsumer)中

ArrayDeque<BufferConsumer> buffers

當內存塊滿了之后會去localbufferpool中再去申請內存塊

2.?第一個數據到達時,會申請內存塊的之后會通知下游,以后的數據不會通知

3. 通知到下游后下游會發送初始的credit

4. 上游將接收到的 Credit 值累加到該下游對應的?creditsAvailable?計數器上

5.??上游檢查?creditsAvailable > 0?并且?本地有數據 Buffer(之前申請的內存塊)?可發送。

6.?上游發送數據:?調用?writeAndFlushNextMessageIfPossible()(或其內部調用的方法)真正將 Buffer 數據封裝成 Netty 消息發送出去。

7.?每發送一個 Buffer,上游的?creditsAvailable?計數器減 1。

8. 5-7過程會不斷的循環

8. 下游每次收到數據會根據backlog(上游積壓數)選擇是否更新Credit,下游處理完Buffer也會更新Credit,如果更新的話上游就會重復第4步驟

9. 下游RemoteInputChannel收到buffer后會從內存池localbufferpool中申請內存塊存儲相應數據,申請到的內存塊放到

ArrayDeque<Buffer> receivedBuffers

中,并把自己放到等待處理的List中

ArrayDeque<InputChannel> inputChannelsWithData

10 下游有線程在不斷的循環從inputChannelsWithData中取出對應的channel,然后獲取channel中的receivedBuffers的buffer進行處理,默認采取阻塞模式讀,即如果inputChannelsWithData沒數據就會調用inputChannelsWithData.wait()

六、高可用

6.1 節點高可用

首先要看Flink的運行模式,

1. Local Mode (單機模式)

2. Standalone Cluster Mode (獨立集群模式)

3. Apache Hadoop YARN Mode

  • YARN Session Mode:

    • 先向 YARN 申請一個長期運行的 Flink 集群(包含 JobManager 和 TaskManager 容器)。這個集群就像一個資源池。

    • 用戶隨后可以向這個 Session 集群提交多個作業。這些作業共享集群資源(JobManager, TaskManagers)。

    • 類似 Standalone 集群,但資源由 YARN 管理。

  • YARN Per-Job Mode:

    • 每個提交的作業單獨向 YARN 申請資源。YARN 會為每個作業啟動一個專屬的 JobManager 和一組 TaskManager。

    • 作業之間資源隔離性最好。

    • 作業完成后,其占用的所有資源(JobManager + TaskManagers)會被釋放回 YARN。

  • YARN Application Mode:

    • 類似于 Per-Job Mode,也是為每個應用(Application)啟動獨立的集群。

    • 關鍵區別:?main()?方法在 JobManager 上執行(在 YARN 的 Application Master 容器內),而不是在提交作業的客戶端上執行。這解決了客戶端依賴和負載問題,特別適合需要復雜依賴或大量參數傳遞的應用。

    • 是目前在 YARN 上運行 Flink 生產作業的推薦模式(優于 Per-Job),因為它更好地解耦了客戶端和集群。

4. Kubernetes Native Mode (云原生模式的主要代表)

總結:因為我的工作環境用的是Per-Job Mode,所以我這里只討論Per-Job Mode

當TM節點掛了,Flink的恢復流程(來自DeepSeek):

  1. 停止所有數據處理:?當 JM 檢測到故障(TM 掛掉或自身掛掉被新 JM 接管)并決定啟動恢復流程時,它做的第一件事通常是停止整個作業當前的數據處理流(或至少停止受影響子圖的數據流)。這是為了確保一個干凈的恢復起點。

  2. 重啟所有任務:

    • TM 故障:?JM 會重啟所有受故障 TM 影響的任務(即那些在故障 TM 上運行的任務)。其他健康的 TM 上運行的任務不會被 JM 主動停止或重啟。

    • JM 故障:?在新的 Leader JM 接管后,它會重新調度并啟動整個作業的所有任務(相當于整個作業重啟)。

  3. 狀態恢復指令:?JM 會向所有需要啟動的任務(無論是新啟動替換故障任務的副本,還是健康 TM 上被要求重啟的任務,或者在 JM 故障恢復時所有任務)發送指令,明確要求它們從指定的最新成功檢查點恢復狀態

  4. 健康任務的狀態重置:?這是最關鍵的一點:

    • 那些在健康 TM 上持續運行的任務,雖然在物理進程上沒有失敗,但它們的內存狀態已經“超前”于我們選定的恢復點(最新的完整檢查點)。它們可能已經處理了檢查點之后的一些數據,更新了本地狀態。

    • 為了確保整個作業狀態的一致性(即所有任務都從完全相同的邏輯點開始),JM?必須命令這些健康的任務也丟棄掉它們當前內存中持有的、屬于檢查點之后的狀態更新,并從持久化存儲中重新加載該任務在選定檢查點時刻的狀態快照

    • 簡單說:所有任務(新啟動的副本、健康的任務)在恢復開始時,都需要將自己的狀態回滾到那個全局一致的檢查點狀態。健康任務“知道”自己該回滾到哪里,因為它們持續參與了檢查點機制,保存了狀態句柄。

  5. 數據重放與狀態對齊:?在所有任務(包括健康的和新的)都成功加載了檢查點狀態后:

    • JM 命令 Source 任務從檢查點對應的位置開始重新讀取數據(重放)。

    • 數據流重新建立。

    • 為了實現 Exactly-Once 語義,Flink 會進行狀態對齊:任務在開始處理重放數據中檢查點之后的部分之前,會等待接收齊所有上游在檢查點時刻“飛行中”的數據(這些數據在故障發生時可能未被下游處理完)。這確保了即使健康任務重置了狀態,它們也不會丟失或重復處理任何數據,狀態更新會基于重放的數據流精確地重新計算出來。

當JM掛了,Flink的恢復流程(來自DeepSeek):

基礎知識:

  1. ? 無預先部署的備 JM
    在 YARN per-job 模式下,每個作業獨立啟動一個 Application Master(AM),該 AM 即 Flink JobManager(JM)。不存在預先啟動的備用 JM 進程。作業運行期間只有?一個 JM 實例(即 AM)。

  2. ? ZK 的選舉對象是 JM 內部的領導者組件
    當配置?high-availability: zookeeper?時,ZK 的選舉作用發生在?單個 JM 內部,而非多個 JM 之間:

    • Flink JM 由多個子服務構成(如?Dispatcher,?ResourceManager,?JobMaster)。

    • ZK 用于選舉這些子服務的 Leader(例如選舉?Dispatcher?的 Leader),確保 JM?內部服務的高可用

    • 不是選舉多個 JM 實例中的主備!

  3. 🚀 JM 整體故障恢復由 YARN 接管
    當整個 JM(AM)進程崩潰時:

    • YARN ResourceManager(RM)?檢測到 AM 失敗。

    • YARN RM 根據配置的?yarn.application-attempts?重啟一個新的 AM 容器(即新 JM)。

    • 新 JM 啟動后,通過 ZK 獲取原作業的元數據指針(如 JobGraph 路徑),完成恢復。

流程:

  1. TM 會停止執行任務:

    • 當原 JM 崩潰時,它與其管理的所有 TM 之間的心跳和 RPC 連接會立即中斷。

    • TM 配置了?heartbeat.timeout。當 TM 在這個超時時間內無法收到來自 JM 的心跳響應或任何通信時,它會判定當前的 JM 已經失效。

    • 判定 JM 失效后,TM 會主動關閉自己正在執行的所有任務 (Tasks)。?這是標準行為,因為任務的協調者(JM)已經不存在了,繼續執行可能造成狀態不一致或數據丟失。

    • 關閉任務后,TM 本身不會立即退出。它會嘗試向 YARN ResourceManager 重新注冊(taskmanager.registration.initial-backoff?和?max-backoff?參數控制重試間隔和次數),等待新的 JM 來認領它。然而,在 JM 崩潰恢復期間,這個重注冊過程通常不會成功,因為新 JM 尚未完全啟動或尚未通知 YARN RM 它已準備好接收 TM。

  2. YARN 會清理原 AM 的容器:

    • YARN RM 檢測到 AM 失聯(通常是 AM-RM 心跳超時)后,會將該 AM 標記為失敗。

    • YARN RM 的一個核心職責是管理集群資源。當它判定一個 AM 失敗后,它會負責清理該 AM 申請的所有容器,包括該 AM 自己的容器(原 JM)和它申請的所有 TM 容器

    • 這個清理過程是通過?yarn.resourcemanager.amliveliness-monitor.expiry-interval-ms?(默認 600000ms) 和?yarn.am.liveness-monitor.expiry-interval-ms?(默認 600000ms) 等參數控制的。一旦超時,YARN RM 會主動發送?SIGKILL?信號給這些容器。

    • 因此,即使 TM 自己沒有主動退出(在等待重注冊),YARN RM 最終也會強制殺死這些 TM 容器,回收它們占用的資源(CPU、內存)。

  3. 新 JM 重新申請 TM 資源是正確的:

    • 由于步驟 1 和 2,原有的 TM 容器最終都會被清理掉(任務已停止,容器將被回收)。

    • 新 JM 啟動后,它面對的是一個“空”的 TaskManager 集群(舊的 TM 要么自己關閉了,要么被 YARN 殺死了)。

    • 為了恢復作業執行,新 JM?必須向 YARN RM 重新申請所需數量的 TaskManager 資源。這是唯一能獲得計算資源來運行任務的方式。

    • 新申請的 TM 是全新的容器實例,與之前的 TM 無關。

  4. 恢復過程:

    • 新 JM 從 ZK 獲取輕量級元數據(leader 信息、JobGraph HDFS 路徑、最新 Checkpoint HDFS 路徑)。

    • 新 JM 從 HDFS (high-availability.storageDir) 下載完整的恢復數據(JobGraph、Checkpoint 元數據、用戶 Jar 包)。

    • 新 JM 向 YARN RM 申請新的 TM 容器資源。

    • 新的 TM 容器啟動,向新 JM 注冊。

    • 新 JM 將 JobGraph 中的任務 (Tasks) 調度到新注冊的 TM 上運行。

    • 關鍵點:任務恢復:新 JM 指示所有任務(尤其是那些有狀態的任務)從 HDFS 上持久化的最新 Checkpoint 或 Savepoint 恢復它們的狀態。對于 Source 任務,它們會根據 Checkpoint/Savepoint 中記錄的讀取位置(例如 Kafka offset)進行“重放”,確保數據一致性(精確一次或至少一次語義,取決于配置)。

6.2 作業高可用

6.2.1 作業異常

平時我們經常會說,作業異常自動拉起,這個自動拉起指的是?Task (任務) 的重啟次數,而不是整個 TaskManager (TM) 進程的重啟次數。?這里的“拉起”更準確地說是任務的重新調度和執行

  • 當 Flink 作業中的一個 Task (例如一個 MapFunction、一個 FlatMapFunction 等算子實例) 由于各種原因失敗時(包括拋出未捕獲的異常),Flink 的容錯機制會根據配置的?重啟策略?來決定如何處理。

  • 重啟策略會嘗試在可用的資源(Slot)上重新啟動這個失敗的 Task。這個重啟過程可能發生在:

    • 同一個 TM 的另一個 Slot 上(如果該 TM 還有資源且健康)。

    • 另一個健康的 TM 上(集群模式)。

    • 甚至是同一個 Slot 上(如果 TM 進程沒有崩潰)。

  • 關鍵點:?重啟策略配置的“重啟次數”(例如?restart-strategy.fixed-delay.attempts: 3)限制的是同一個 Task 實例(或者更精確地說,是同一個 ExecutionVertex)被重新調度執行的次數上限

  • 如果重啟次數已達到上限:JobManager 會認為該 Task 無法成功恢復,進而導致整個 Job 失敗

6.2.1 狀態后端

有些作業需要記錄消費過程中的一些信息,Flink提供了狀態供我們存儲信息,并在作業崩潰后恢復

Flink提供了兩種狀態,一種是算子狀態,一種是鍵控狀態

  1. 算子狀態 (Operator State / Non-Keyed State)

    • 作用域:?綁定到一個并行算子實例(一個任務 Task)。同一個算子并行度內的不同實例,其 Operator State 是彼此獨立的。

    • 訪問方式:?不需要數據流按 Key 分區。算子內的所有數據都可以訪問這個狀態。

    • 典型應用場景:

      • Source 連接器:?存儲 Kafka 分區的偏移量 (offset)。每個 Source 實例負責讀取特定分區,存儲對應分區的 offset。

      • 維護全局計數器或聚合 (謹慎使用):?例如,統計整個作業處理的總記錄數(如果并行度改變,需要小心處理狀態重分配)。

      • 廣播狀態 (Broadcast State):?一種特殊的 Operator State。它把一個低吞吐流(廣播流)的狀態廣播給所有下游算子的并行實例,使得所有實例都持有完全相同的狀態副本。常用于動態配置規則、模式匹配等。

    • 狀態重新分配 (Rescaling):?當作業并行度改變時(如增加或減少 TaskManager 或 Slot),Flink 需要將 Operator State 重新分配到新的(可能更多或更少的)算子實例上。用戶通常需要實現?CheckpointedFunction?或?ListCheckpointed?接口來定義狀態如何拆分 (snapshotState) 和合并 (restoreState?/?initializeState)。

  2. 鍵控狀態 (Keyed State)

    • 作用域:?綁定到數據流中每個 Key。更準確地說,是綁定到鍵控流 (KeyedStream)?中每個 Key 的每個算子實例

    • 訪問方式:?必須在?KeyedStream?上使用。算子只能訪問當前處理數據所屬 Key 對應的狀態。不同 Key 的狀態是隔離的。

    • 數據結構:?Flink 提供了幾種原生的、類型化的數據結構來存儲 Keyed State:

      • ValueState?存儲單個值 (e.g.,?T)。每個 Key 對應一個值。

      • ListState?存儲一個值的列表 (e.g.,?List)。每個 Key 對應一個列表。

      • MapState?存儲一個鍵值映射 (e.g.,?Map)。每個 Key 對應一個 Map。

      • ReducingState?/?AggregatingState?存儲一個值,該值是應用了 ReduceFunction 或 AggregateFunction 后聚合的結果。新數據到來時會自動聚合更新狀態值。

    • 典型應用場景:?這是 Flink 中最常用、最強大的狀態類型。

      • 窗口聚合 (Windowed Aggregations):?計算每個 Key 在窗口內的計數、求和、最大值等。

      • 狀態機 (State Machines):?實現復雜事件處理 (CEP) 或模式檢測,狀態轉換依賴于當前 Key 的歷史事件序列。

      • 去重 (Deduplication):?記錄每個 Key 最近看到的事件 ID 或時間戳,用于檢測或過濾重復數據。

      • 實時特征計算:?如計算每個用戶的會話時長、點擊率、最近 N 次行為等。

6.2.2 狀態存儲位置

上面提到了狀態,那么狀態存儲在哪里呢?Flink提供了幾種不同的存儲方法

1.?MemoryStateBackend?(內存狀態后端)

  • 狀態存儲位置:?堆內內存 (Heap Memory)

  • 機制:

    • 工作狀態 (Working State):?直接存儲在 TaskManager JVM 的堆內存中。例如,ValueState,?ListState?等對象直接存在于 Java 堆上。

    • 檢查點 (Checkpoint):?當觸發 Checkpoint 時,狀態會被序列化,然后發送給 JobManager,并存儲在?JobManager 的堆內存中。

  • 特點:

    • 速度最快:?內存訪問,無序列化/反序列化開銷(工作狀態)。

    • 容量最小:?受限于 TaskManager 堆大小(工作狀態)和 JobManager 堆大小(檢查點)。僅適用于小狀態(例如測試、實驗、極小狀態流)

    • GC 壓力大:?大量狀態對象直接存在于堆上,會顯著增加垃圾回收負擔和停頓時間。

  • 總結:?純堆內存儲。

2.?FsStateBackend?(文件系統狀態后端)

  • 狀態存儲位置:?堆內內存 (Heap Memory) + 外部文件系統 (External File System)

  • 機制:

    • 工作狀態 (Working State):?存儲在 TaskManager JVM 的堆內存中。與?MemoryStateBackend?相同。

    • 檢查點 (Checkpoint):?當觸發 Checkpoint 時,狀態會被序列化,然后異步寫入配置的分布式文件系統或對象存儲(如 HDFS, S3, GCS, OSS, NFS 等)。

  • 特點:

    • 工作狀態在堆內:?訪問快,但受 TaskManager 堆大小限制。

    • 檢查點在外部:?狀態大小不再受 JobManager 內存限制,理論上只受文件系統容量限制,支持大狀態

    • 恢復較慢:?恢復時需要從外部存儲讀取并反序列化狀態。

    • 仍有 GC 壓力:?工作狀態在堆內,大狀態應用 GC 壓力依然顯著。

  • 總結:?工作狀態堆內存儲,檢查點外部存儲。

3.?RocksDBStateBackend?(RocksDB 狀態后端)

  • 狀態存儲位置:?堆外內存 (Off-Heap Memory) + 本地磁盤 (Local Disk) + 外部文件系統 (External File System)

  • 機制:

    • 工作狀態 (Working State):

      • 主要存儲:?存儲在?TaskManager 節點的本地磁盤文件中(由 RocksDB 管理)。

      • 內存緩存:

        • Write Buffer (MemTable):?新寫入/更新的數據首先進入位于?JVM 堆外內存的 MemTable(可配置大小)。

        • Block Cache:?用于緩存頻繁讀取的 SST 文件塊,位于?JVM 堆外內存(使用 Flink 的?Managed Memory?配額)。這是提升讀取性能的關鍵

    • 檢查點 (Checkpoint):

      • 增量 Checkpoint (默認推薦):?只將 RocksDB 中自上次 Checkpoint 以來變化的數據文件復制到配置的外部文件系統(如 HDFS, S3)。效率高。

      • 全量 Checkpoint:?將整個 RocksDB 數據庫目錄復制到外部文件系統。

6.2.3?分布式快照算法

上面提到了狀態的存儲,那對于一個流式系統來說,什么時候觸發狀態的存儲呢?

Flink一般管這種觸發點叫checkpoint,存儲的內容叫做checkpoint文件,它的生成基于?Chandy-Lamport算法

Chandy-Lamport算法用于有向無環圖的checkpoint生成,用于有環圖的算法也有,但我沒看懂

如果我們的job作業中沒有使用狀態,就是普通的消費數據,那其實是用不到checkpoint(不用的話任務重啟的時候可能有丟很少數據的風險,還是看業務了)

具體觸發checkpoint的流程為(依舊來自于deepseek)

  1. Checkpoint Coordinator 發起 Checkpoint:

    • JobManager 中的?CheckpointCoordinator?組件負責整個作業的 Checkpoint。

    • 它會周期性地(基于配置的?checkpointInterval)向 JobManager 中管理所有 TaskManager 的組件發送觸發消息。

    • 關鍵點:?這個觸發是?全局統一?的。CheckpointCoordinator?會為這次 Checkpoint 生成一個唯一的、單調遞增的 ID(稱為?checkpointId)。

  2. Barrier 注入:

    • JobManager 通過 TaskManager 向該作業的?所有 Source Task?發送觸發消息,消息中包含當前的?checkpointId

    • 多 Source 處理的核心:?每個 Source Task?在接收到觸發消息后:

      • 在自己的數據流中插入一個特殊的?Barrier (n),其中?n?就是當前的?checkpointId

      • 這個 Barrier 會被插入到 Source 當前正在處理或即將發出的?所有數據記錄之后

      • Source Task 開始執行自己狀態的快照(例如 Kafka Source 會記錄當前消費的 offset 集合)。

      • 重要:?每個 Source Task 是?獨立地、幾乎同時地?注入屬于同一個?checkpointId?的 Barrier。它們不需要彼此等待或同步注入時刻。Barrier 會跟隨正常的數據流向下游流動。

  3. Barrier 傳播與對齊:

    • Barrier 隨著數據記錄一起流向下游算子。

    • 當算子(Operator)有多個輸入流時(這正是多 Source 或 keyBy/rebalance 等操作導致的結果),Checkpoint 的關鍵機制?Barrier 對齊?開始發揮作用:

      • 對齊階段:?算子會為每個輸入通道維護一個小的緩沖區。當它從某個輸入通道接收到 Barrier?n?時:

        • 它會將該通道?阻塞,暫停從該通道消費數據(但該通道接收到的 Barrier?n?之后?的數據依然會被放入緩沖區暫存,不會被處理也不會被丟棄)。

        • 它繼續處理其他?尚未接收到 Barrier?n?的輸入通道的數據。

      • 快照觸發:?只有當算子從?所有?輸入通道都收到了 Barrier?n?時:

        • 算子知道,在它所有輸入流中,屬于 Checkpoint?n?之前的所有數據都已到達并被處理(或已緩存在 Barrier 前的緩沖區)。

        • 此時,算子:

          • 執行其自身狀態的快照(例如 WindowOperator 的快照包含已觸發但未清除的窗口狀態、定時器等)。

          • 將 Barrier?n?廣播?到它所有的輸出通道(給下游算子)。

        • 在廣播完 Barrier?n?之后,算子解除對所有輸入通道的阻塞,開始處理之前被阻塞通道緩沖區中的數據。

    • 單輸入算子:?如果算子只有一個輸入通道,它在接收到 Barrier?n?時,會立即執行狀態快照,然后將 Barrier?n?發送到其所有輸出通道。無需對齊。

  4. 狀態快照與異步寫入:

    • 當算子(無論是 Source、Transformation 還是 Sink)執行自身狀態快照時(步驟 2 和 3 中提到的):

      • 這個快照操作通常是?異步?執行的,以避免阻塞主數據處理線程。

      • 算子將狀態拷貝到一個臨時位置(內存或本地磁盤),然后由后臺線程異步地將這份拷貝寫入配置的?持久化存儲后端(如 HDFS, S3, NFS, RocksDB 等)。

      • Exactly-Once 保證的核心:?在 Barrier 之后到達的數據(屬于 Checkpoint?n+1)不會影響 Checkpoint?n?的狀態快照內容。Barrier 對齊確保了快照點的一致性。

  5. 確認 (Acknowledge) 與完成:

    • 當 TaskManager 上的一個?Task(算子的一個并行實例)成功完成?以下兩件事:

      1. 將自己負責的所有狀態(算子狀態和/或 Keyed State)異步寫入持久化存儲。

      2. 將 Barrier?n?發送給了它的所有下游任務(對于 Sink 任務,沒有下游,發送 Barrier 這一步自然完成)。

    • 它會向 JobManager 的?CheckpointCoordinator?發送一個?Ack 消息。這個消息包含:

      • checkpointId?(n)

      • 該 Task 的狀態句柄(State Handle) - 指向持久化存儲中該 Task 狀態文件位置的指針。

      • 該 Task 的算子 ID 和子任務索引(Subtask Index)。

    • 多 Source/多 Task 處理的核心:?CheckpointCoordinator?會?收集所有 Task?發來的針對?checkpointId = n?的 Ack 消息。每個 Task 都是獨立報告完成的。

  6. Checkpoint 完成:

    • 當?CheckpointCoordinator?收到了所有 Task?對同一個?checkpointId = n?的 Ack 消息后:

      • 它知道這次 Checkpoint?n?在?整個作業的所有 Task?上都已成功完成。

      • 它將這次 Checkpoint?n?的元數據(包括所有 State Handle 的集合、checkpointId、時間戳等)持久化?到高可用的存儲(如 ZooKeeper 或配置的 StateBackend 支持的元數據存儲)。

      • (可選)它可能會觸發一些完成后的操作,如刪除更早的、不再需要的 Checkpoint。

    • 此時,Checkpoint?n?正式完成,可用于故障恢復。

6.2.4 exactly once

經常在網上能看到,Flink支持excatly once,這個once到底是什么once呢?是Flink消費source消費一次嗎?還是只會在中間算子中只會計算一次?還是輸出只有一次?

這里的excatly once指的是對于Flink中狀態數據的計算只有一次,因為Flink的重啟,異常恢復等等,source中的數據會被多次消費,同一個數據在中間算子中也會被也有可能多次被計算,也可能多次輸出,但這些數據在整個流程中,只被統計了一次

七、新特性

7.1 窗口

首先Flink為什么要有窗口?

流數據本質上是無界的,但很多數據比如排序、TOP N等,都需要一個有界的數據來計算,所以設計窗口解決這部分排序

7.1.1 窗口存儲的位置

如果key stream的window,存儲在key state,如果是data stream的window,存儲在Operator state

7.1.2 分配器

一個窗口算子內部通常會管理著多個(通常是大量的、動態變化的)窗口實例,如對于每分鐘的滾動窗口 (TumblingEventTimeWindows.of(Time.minutes(1))),?分配器會為每個?2023-10-27 10:00:00?到?2023-10-27 10:01:00、?2023-10-27 10:01:00?到?2023-10-27 10:02:00?等這樣的時間區間創建一個窗口實例

?WindowAssigner(分配器)?是?WindowOperator?內部使用的核心組件。它的職責非常明確且關鍵:對于到來的每一個數據元素,根據該元素攜帶的信息(主要是時間戳,有時也包括元素本身),計算出這個元素應該被分配到哪些窗口實例中

7.13 觸發器

決定何時對窗口的內容調用窗口函數進行計算(并可能輸出結果)。

我們拿滾動窗口來舉例子

如果滾動窗口基于處理時間,那系統上會有個定時器來定時觸發窗口的計算

如果滾動窗口基于事件時間,那觸發窗口的計算就依賴于水位線

水位線代表 位于水位線表示時間之前的元素都已到達,如果水位線時間大于窗口時間,就會觸發窗口的計算

水位線一般是在source節點產出的,它夾雜在正常數據中發出

有的節點可能上游有四五個節點,那么它的水位線是其接收的最小的水位線,看起來有點像barrier對齊。

7.14 整體流程

給個示例代碼:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 設置事件時間// 1. 定義 WatermarkStrategy
WatermarkStrategy<Order> strategy = WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // 允許10秒亂序.withTimestampAssigner((order, timestamp) -> order.getOrderTimestamp()); // 提取事件時間戳// 2. 應用 WatermarkStrategy 到數據源流
DataStream<Order> orderStream = env.addSource(new OrderSource()).assignTimestampsAndWatermarks(strategy);// 3. 定義基于事件時間的窗口處理
orderStream.keyBy(Order::getProductId).window(TumblingEventTimeWindows.of(Time.minutes(5))) // 5分鐘滾動窗口.reduce((o1, o2) -> ...) // 或 aggregate/process.print();
  1. OrderSource?產生訂單事件流。

  2. assignTimestampsAndWatermarks(strategy)?算子:

    • 使用?TimestampAssigner?從每個?Order?對象中提取?orderTimestamp?作為該事件的事件時間戳。

    • 內部維護一個狀態:currentMaxTimestamp(當前觀察到的最大事件時間戳)。

    • 每當一個新事件到達,更新?currentMaxTimestamp = max(currentMaxTimestamp, orderTimestamp)

    • 周期性生成水位線(默認200ms):?每隔 200ms,計算新的水位線?watermark = currentMaxTimestamp - 10000(10秒),并將這個水位線記錄插入到輸出流中向下游傳播。

  3. 下游的?keyBy?和?window?算子:

    • 接收帶有事件時間戳的事件和水位線。

    • 對于每個 Key(ProductId),窗口算子維護著 5 分鐘長的窗口(如?[10:00, 10:05),?[10:05, 10:10))。

    • 當水位線?W(T)?到達窗口算子時,算子檢查所有 Key 的所有窗口:對于結束時間?<= T?的窗口(例如,當?T >= 10:05?時,[10:00, 10:05)?窗口結束),觸發該窗口的計算(執行?reduce?函數)。

    • 事件時間戳為?10:03?的訂單,即使它在系統處理時間?10:08才到達(假設之前?currentMaxTimestamp?已經是?10:07),只要它在?10:07 - 10s = 9:57?之后到達(允許10s亂序),它仍然會被正確地放入?[10:00, 10:05)?窗口并被計算。如果它在水位線?W(10:05)(對應?currentMaxTimestamp=10:15)之后才到達,它就會被視為遲到數據(默認丟棄)。

八、批流一體

前面說了很多在流式計算下的情況,我們都知道Flink也支持批處理,它是如何支持批處理的呢?

比如對于Join,可以來一條數據做一次點查,批處理一般是先把全量數據load到機器,然后發往下游算子,Flink如何平衡這種情況?

首先Flink會去判斷作業是批的還是流的,如果作業都是批處理則是批作業,反之有一個是流處理則是流作業。

對于批作業,Flink會使用一些批處理優化

在Flink批處理匯總,Flinkgroup by 、reduce等操作是上游算子全部計算完成才會去算下游算子,會發EndOfPartitionEvent表示上游數據已計算完成,并且區別于spark,Flink采用推送的方式寫數據,而不是Spark的拉取數據。

有個常見問題就是,既然flink對于批處理也采取了另外一種方式,為什么說flink是批流一體的呢?

以下是來自deepseekd的回答:

1. 統一的編程模型:寫一次,跑在批或流上

  • DataStream API:?開發者使用同一套?DataStream?API 編寫代碼。無論是處理 Kafka 實時流 (env.addSource(kafkaSource)) 還是處理 HDFS 歷史文件 (env.readTextFile(path)),代碼邏輯(算子鏈:map,?filter,?keyBy,?window,?join完全一致。引擎根據 Source 的?Boundedness?自動適配底層執行策略。

  • Table API / SQL (更高層抽象):

    • 用戶寫一條標準 SQL?SELECT ... FROM A JOIN B ON ...

    • Flink 優化器根據?A?和?B?是流表還是批表

      • 流表 -> 生成基于狀態管理 + 時間屬性的執行計劃。

      • 批表 -> 生成基于?Sort-Merge-Join / Hash-Join?的執行計劃。

    • 用戶無需修改查詢!?同一份 SQL 可同時用于實時報表(流)和歷史數據分析(批)。

對比“調度層分流”方案:?需要維護兩套代碼(Flink 流作業 + Spark 批作業),即使邏輯相同。


2. 統一的執行引擎核心 (Runtime)

  • 相同的任務調度與資源管理:?無論是流作業還是批作業,都提交給同一個 JobManager,使用相同的 TaskManager?資源池,基于相同的分布式快照 (Checkpoint/Savepoint) 機制做容錯/恢復。

  • 相同的狀態后端 (State Backend):?HashMapStateBackend?(內存),?EmbeddedRocksDBStateBackend?(磁盤) 同時服務于:

    • 流處理:?存儲窗口狀態、雙流 JOIN 狀態。

    • 批處理:?在某些優化(如部分聚合)或復雜批處理算法中臨時存儲中間狀態。

  • 相同的網絡協議與序列化:?基于 Netty 的網絡棧、高效的二進制序列化(如 Flink 自帶的 TypeSerializer)同時用于流式的 Pipelined Shuffle 和批式的 Blocking Shuffle。

對比“調度層分流”方案:?Flink 和 Spark 是兩個獨立引擎,調度器、網絡協議、內存管理、狀態存儲完全不同。


3. 統一的核心抽象:數據即流 (Data as Stream)

  • 核心哲學:?Flink 將批處理視為流處理的一個特例

    • 無界流 (Unbounded Stream):?永不結束的實時數據流(標準流處理)。

    • 有界流 (Bounded Stream):?有限、已知終點的歷史數據集(即批處理)。

  • 帶來的革命性優勢:

    • 一致的語義:?時間語義(Event Time/Processing Time)、窗口計算、狀態處理在批和流上具有完全一致的定義。例如,一個 Event Time 的滾動窗口在流上持續輸出,在批上一次性輸出最終結果,但邏輯相同。

    • 漸進式算法:?批處理算法可視為流處理算法在“輸入結束”時的最終快照。例如,批處理的?Sort-Merge-Join?是流式 JOIN 狀態管理在“數據有界”條件下的終極優化形態。

    • 真正的混合計算:?可輕松實現?“增量批處理”?或?“重放歷史流”

      • Lambda 架構的終結:?無需維護實時流鏈路(Flink)和離線批鏈路(Spark/Hive),一套 Flink 作業即可處理實時數據并定期重跑修正歷史結果(利用有界流重放)。

      • 流批同源:?同一份代碼處理實時事件流和歷史回填數據,保證結果一致性。

對比“調度層分流”方案:?流和批是割裂的概念,兩套系統計算結果可能因實現差異而不一致(如窗口觸發機制不同)。


4. 執行策略的“自適應”而非“分裂”

  • 關鍵認知:?您觀察到的差異(Push vs Event 觸發下游、Pipelined vs Blocking Shuffle)不是“兩套引擎”,而是同一引擎在統一模型下對不同數據特性(有界/無界)的智能適配

    • 流處理模式 (無界流):?優先保證?低延遲、持續處理?-> 采用?Pipelined?數據傳輸,所有算子常駐內存,背壓流量控制。

    • 批處理模式 (有界流):?優先保證?高吞吐、資源效率、利用有界性優化?-> 采用?Blocking?交換 +?EndOfPartitionEvent?觸發下游,允許中間結果落盤,使用 Sort-Merge 等算法。

  • 引擎的“一體性”體現在:

    • 自動切換:?引擎根據 Source 的?Boundedness?自動選擇執行策略,無需用戶干預。

    • 共享基礎設施:?兩種模式復用相同的網絡棧、序列化器、內存管理器、故障恢復機制Blocking Shuffle?在 Flink 中也是通過流式引擎的擴展實現的(只是加了同步點)。

    • 統一優化器:?Table API/SQL 的優化器是統一的,根據輸入性質生成最優物理計劃。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/87078.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/87078.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/87078.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

linux 操作docker的基本命令docker倉庫

基本操作命令 docker run --nametest-host -itd centos7.6 /bin/bash 通過鏡像創建容器 登錄容器 [rootdocker101 ~]# docker exec -it test-host /bin/bash &#xff08;exec是執行&#xff0c;i是交互式。t叫tty&#xff09; 或者container id [rootdocker101 ~]# doc…

Netty學習路線圖 - 第四階段:Netty基礎應用

Netty學習路線圖 - 第四階段&#xff1a;Netty基礎應用 &#x1f4da; Netty學習系列之四 本文是Netty學習路線的第四篇&#xff0c;我們將用大白話講解Netty的基礎應用&#xff0c;帶你從理論走向實踐。 寫在前面 大家好&#xff01;在前面三篇文章中&#xff0c;我們學習了J…

開源項目推薦:MCP Registry——管理MCP服務器的利器

探索MCP Registry:未來模型上下文協議的核心注冊服務 隨著人工智能技術的迅速發展,機器學習模型的管理和配置變得愈發重要。今天,我們將探索一個頗具潛力的開源項目——MCP Registry。這是一個由社區驅動的注冊服務,專為模型上下文協議(Model Context Protocol,簡稱MCP)…

Spring Boot 統一功能處理:攔截器詳解

一、攔截器核心概念 作用&#xff1a;攔截器是 Spring 框架提供的核心功能&#xff0c;用于在請求處理前后執行預定義邏輯&#xff0c;實現統一處理&#xff08;如登錄校驗、日志記錄等&#xff09;。 核心方法&#xff1a; public class LoginInterceptor implements Handl…

在docker容器中安裝docker服務,基于fuse-overlayfs進行overlay掛載,而不是vfs

1、docker 安裝 正常安裝docker軟件&#xff0c;運行docker時&#xff0c;會提示&#xff1a;No docker socket 服務 2、啟動docker服務&#xff08;包含守護進程&#xff09; systemctl start docker #dockerd &if ! ps aux | grep -v grep | grep -q "dockerd&qu…

虛擬機配置注意事項

一.VM大部分產品免費&#xff0c;遇到付費的要斟酌一下 在小編之前的文章中有簡單下載VM的教程VMwareWorkstPro安裝-CSDN博客 二.配置過程中的設置大部分都可以在配置完成后更改 例如下圖設備所涉及到的&#xff0c;都是可以更改設置的 三.電腦關機時&#xff0c;要注意先把…

openGL+QT快速學習和入門案列

openGLQT快速學習和入門案列

深度學習03 人工神經網絡ANN

什么是神經網絡 人工神經網絡&#xff08; Artificial Neural Network&#xff0c; 簡寫為ANN&#xff09;也簡稱為神經網絡&#xff08;NN&#xff09;,是一種模仿生物神經網絡結構和功能的計算模型,人腦可以看做是一個生物神經網絡,由眾多的神經元連接而成.各個神經元傳遞復…

Linux中部署Jenkins保姆間教程

本文將以docker的方式&#xff0c;講述如何部署Jenkins 一、拉取Jenkins鏡像 1.1 最新版Jenkins介紹 最新版Jenkins地址&#xff1a;Download and deploy 當前最新版的如下圖所示&#xff1a; 1.2 各版本支持的JDK版本 地址如下&#xff1a;Java Support Policy 如果你安裝…

【軟考中級·軟件評測師】下午題·面向對象測試之架構考點全析:分層、分布式、微內核與事件驅動

一、分層架構&#xff1a;分層獨立與質量特性的雙向約束 分層架構通過“垂直分層&#xff08;表示層→服務層→業務邏輯層→數據層&#xff09;”實現職責隔離&#xff0c;是Web應用、企業級系統的主流架構模式。 1. 父類成員函數重測場景 子類繼承父類時&#xff0c;若父類…

C++ 快速回顧(五)

C 快速回顧&#xff08;五&#xff09; 前言一、Dll和Lib的區別區別在開發中使用 二、封裝并使用C庫1.封裝庫2.使用庫 三、封裝并使用C庫1.封裝庫2.使用庫 前言 用于快速回顧之前遺漏或者補充C知識 一、Dll和Lib的區別 靜態庫&#xff08;LIB&#xff09;在編譯時鏈接&#…

【ARM】解決ArmDS的工程沒有生成Map文件的問題

1、 文檔目標 在嵌入式開發過程中&#xff0c;使用Arm Development Studio&#xff08;簡稱ArmDS&#xff09;進行項目構建時&#xff0c;Map文件的生成是調試和分析代碼的重要環節。Map文件不僅記錄了程序中各個段&#xff08;sections&#xff09;的內存分布情況&#xff0c…

Java如何導出word(根據模板生成),通過word轉成pdf,放壓縮包

<!-- 導出word文檔所需依賴--><dependency><groupId>com.deepoove</groupId><artifactId>poi-tl</artifactId><version>1.10.0-beta</version></dependency><dependency><groupId>org.apache.poi</gr…

【C#】 DevExpress.XtraEditors.SidePanel

DevExpress.XtraEditors.SidePanel&#xff0c; 它是 DevExpress 提供的“側邊滑出”面板&#xff08;類似于抽屜、浮動信息區&#xff09;&#xff0c;非常適合做可隱藏的參數區、幫助區、臨時交互區等。 SidePanel 用法核心點 1. 基本用法 可容納其它控件&#xff0c;就像普…

1.1_2 計算機網絡的組成和功能

在這個視頻中&#xff0c;我們會探討計算機網絡的組成和功能。我們會從三個視角去探討計算機網絡由哪些部分組成&#xff0c;其次&#xff0c;我們會簡單的了解計算機網絡的功能。 首先我們可以把計算機網絡看作是由硬件、軟件和協議共同組成的一個龐大復雜的系統。首先在硬件上…

Linux驅動學習day11(定時器)

定時器 定時器主要作用就是&#xff1a;設置超時時間&#xff0c;執行超時函數。 按鍵按下存在抖動&#xff0c;為了消除抖動可以設置定時器&#xff0c;如上圖所示&#xff0c;按下一次按鍵會產生多次抖動&#xff0c;即會產生多次中斷&#xff0c;在每次中斷產生的時候&…

Java 編程之觀察者模式詳解

一、什么是觀察者模式&#xff1f; 觀察者模式&#xff08;Observer Pattern&#xff09;是一種行為型設計模式&#xff0c;用于對象之間的一對多依賴關系&#xff1a;當被觀察對象&#xff08;Subject&#xff09;狀態發生變化時&#xff0c;所有依賴它的觀察者&#xff08;O…

【C++】經典string類問題

目錄 1. 淺拷貝 2. 深拷貝 3. string類傳統寫法 4. string類現代版寫法 5. 自定義類實現swap成員函數 6. 標準庫swap函數的調用 7. 引用計數和寫時拷貝 1. 淺拷貝 若string類沒有顯示定義拷貝構造函數與賦值運算符重載&#xff0c;編譯器會自動生成默認的&#xff0c…

kotlin中object:的用法

在Kotlin中&#xff0c;object: 用于聲明匿名對象&#xff08;Anonymous Object&#xff09;&#xff0c;這是實現接口或繼承類的輕量級方式&#xff0c;無需顯式定義具名類。以下是核心用法和場景&#xff1a; 1. 基本語法 val obj object : SomeInterface { // 實現接口ov…

js代碼04

題目 非常好。我們剛剛看到了回調函數在處理多個異步操作時會變得多么混亂&#xff08;回調地獄&#xff09;。為了解決這個問題&#xff0c;現代 JavaScript 提供了一個更強大、更優雅的工具&#xff1a;Promise。 Promise&#xff0c;正如其名&#xff0c;是一個“承諾”。…