Flink的 RecordWriter 數據通道 詳解

????????本文從基礎原理到代碼層面逐步解釋?Flink?的RecordWriter 數據通道,盡量讓初學者也能理解。


1. 什么是?RecordWriter

通俗理解

????????RecordWriter?是 Flink 中負責將數據從一個任務(Task)發送到下游任務的組件。想象一下,Flink 是一個巨大的工廠,數據像流水線上的包裹,RecordWriter?就是負責把包裹打包、貼上地址標簽,然后通過“傳送帶”送到下一個站點的工人。

????????在 Flink 的分布式計算中,數據處理分為多個并行任務(Task),每個任務可能需要把自己的處理結果發送給其他任務(比如下游的計算節點)。RecordWriter?的作用是:

  • 序列化數據:把數據變成可以在網絡上傳輸的字節流。
  • 分配數據:決定數據應該發送到哪個下游任務(基于分區策略,比如 keyBy)。
  • 發送數據:通過底層的網絡通道(比如 Netty)把數據傳出去。

官方定義

????????根據 Flink 官方文檔,RecordWriter?是 Flink 數據流(DataStream)處理中用于將記錄(Record)寫入到輸出通道的核心組件。它是 Flink 運行時(Runtime)層的一部分,位于任務的輸出端,負責將上游算子處理后的數據發送到下游算子的輸入端。


2.?RecordWriter?的工作原理(宏觀視角)

????????為了讓非專業人士理解,我們先從高層次看?RecordWriter?的工作流程,之后再深入到代碼和底層細節。

工作流程(類比快遞分揀)

  1. 接收包裹(數據記錄)RecordWriter?從上游算子(比如 Map 或 Filter)接收到一條數據記錄(Record),就像快遞員拿到一個包裹。
  2. 貼標簽(分區決策):根據用戶定義的分區策略(比如 keyBy 或 broadcast),RecordWriter?決定這個包裹要送到哪個下游站點(下游子任務)。
  3. 打包(序列化):包裹不能直接扔到傳送帶上,RecordWriter?會把數據“打包”成字節流(序列化),方便在網絡上傳輸。
  4. 選擇傳送帶(通道選擇):Flink 的任務之間通過邏輯通道(Channel)連接,RecordWriter?選擇合適的通道(對應下游的子任務)。
  5. 送上傳送帶(發送數據)RecordWriter?把打包好的數據通過底層的網絡棧(Netty)發送到下游任務。

核心問題

  • 如何確保數據高效傳輸??Flink 使用緩沖區(Buffer)管理數據,避免頻繁的網絡調用。
  • 如何保證數據順序或分區正確??依賴分區器(Partitioner)和通道選擇器(ChannelSelector)。
  • 如何處理分布式環境中的復雜性??Flink 的運行時通過?ResultPartition?和?RecordWriter?抽象化網絡通信。

3. 深入?RecordWriter?的源碼實現

????????現在我們結合 Flink 源碼(基于 1.17 版本),從底層逐步分析?RecordWriter?的實現。我會用注釋和偽代碼的方式解釋關鍵部分,并盡量用類比讓邏輯清晰。

3.1?RecordWriter?的類結構

????????RecordWriter?的核心代碼位于?org.apache.flink.runtime.io.network.api.writer?包中。主要類是?RecordWriter,它是一個抽象類,實際使用的是其子類,比如?RecordWriterDelegate?或?ChannelSelectorRecordWriter

public abstract class RecordWriter<T> {protected final ResultPartitionWriter partitionWriter; // 輸出分區protected final int numberOfChannels; // 下游通道數量protected final Random random; // 用于隨機分區protected RecordWriter(ResultPartitionWriter writer) {this.partitionWriter = writer;this.numberOfChannels = writer.getNumberOfSubpartitions();this.random = new Random();}// 核心方法:發送一條記錄public abstract void emit(T record) throws IOException, InterruptedException;
}
  • ResultPartitionWriterRecordWriter?依賴的分區寫入器,負責管理輸出緩沖區和實際的網絡發送。
  • numberOfChannels:下游子任務的數量,決定了數據可以發送到多少個通道。
  • emit:核心方法,負責將一條記錄發送出去。

3.2 數據發送的核心流程(emit 方法)

????????emit?方法是?RecordWriter?的核心入口,我們以?ChannelSelectorRecordWriter(支持自定義分區策略的實現)為例,逐步分析其實現。

源碼分析(簡化和注釋)

以下是?ChannelSelectorRecordWriter?的?emit?方法的核心邏輯(簡化版,帶詳細注釋):

public class ChannelSelectorRecordWriter<T> extends RecordWriter<T> {private final ChannelSelector<T> channelSelector; // 通道選擇器(決定分區)private final SerializationDelegate<T> serializationDelegate; // 序列化代理public ChannelSelectorRecordWriter(ResultPartitionWriter writer,ChannelSelector<T> channelSelector,SerializationDelegate<T> serializationDelegate) {super(writer);this.channelSelector = channelSelector;this.serializationDelegate = serializationDelegate;}@Overridepublic void emit(T record) throws IOException, InterruptedException {// 1. 設置待序列化的記錄serializationDelegate.setInstance(record);// 2. 使用通道選擇器決定目標通道int channelIndex = channelSelector.selectChannel(record);// 3. 將記錄寫入目標通道的緩沖區partitionWriter.emitRecord(serializationDelegate.getSerializedData(), // 序列化后的數據channelIndex // 目標通道索引);}
}
步驟拆解與類比
  1. 設置記錄(serializationDelegate.setInstance)

    • 類比:快遞員拿到包裹,先登記包裹內容。
    • 原理serializationDelegate?是一個序列化代理,負責將用戶的數據(比如 Java 對象)變成字節流。Flink 使用?SerializationDelegate?包裝用戶記錄,延遲實際序列化操作,以提高性能。
    • 源碼細節serializationDelegate.setInstance(record)?只是簡單地將記錄存儲到代理對象中,實際序列化發生在后續的?getSerializedData?調用時。
  2. 選擇通道(channelSelector.selectChannel)

    • 類比:快遞員根據包裹上的地址標簽,決定送到哪個分揀中心。
    • 原理ChannelSelector?是 Flink 提供的分區邏輯接口,用戶可以通過?keyBybroadcast?等算子自定義分區策略。selectChannel?方法返回一個整數(channelIndex),表示數據應該發送到哪個下游子任務。
    • 常見實現
      • KeyGroupStreamPartitioner:基于 Key 的哈希分區(keyBy)。
      • BroadcastPartitioner:將數據廣播到所有下游子任務。
      • ForwardPartitioner:直接發送到對應的下游任務(一對一)。
    • 推導
      • 假設用戶定義了?keyBy(x -> x.getId())ChannelSelector?會提取記錄的?id?字段,計算哈希值(比如?id.hashCode()),然后通過取模(hash % numberOfChannels)決定目標通道。
      • 公式:channelIndex=hash(key)mod??numberOfChannels
      • 這確保相同?key?的記錄總是發送到同一個下游任務,滿足 keyBy 的語義。
  3. 寫入緩沖區(partitionWriter.emitRecord)

    • 類比:快遞員把包裹裝進集裝箱(緩沖區),等待卡車運走。
    • 原理ResultPartitionWriter?是 Flink 運行時中管理輸出分區的組件。emitRecord?方法將序列化后的數據寫入目標通道的緩沖區(Buffer)。Flink 使用內存池(MemoryPool)管理緩沖區,避免頻繁分配內存。
    • 源碼細節
      public void emitRecord(BufferBuilder bufferBuilder, int targetSubpartition)throws IOException, InterruptedException {// 將序列化數據寫入 BufferBuilderBufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();// 添加到目標子分區的隊列addBufferConsumer(bufferConsumer, targetSubpartition);
      }
      
      • BufferBuilder:用于構建緩沖區,負責將數據寫入內存。
      • BufferConsumer:表示一個可消費的緩沖區,供下游任務讀取。
      • addBufferConsumer:將緩沖區加入目標子分區的隊列,等待網絡層發送。

3.3 序列化與緩沖區管理

序列化和緩沖區是?RecordWriter?性能的關鍵。

  • 序列化

    • Flink 使用?TypeSerializer(用戶定義或自動推導)將數據對象轉為字節流。
    • 類比:把包裹的內容拍成照片(字節流),方便通過網絡傳輸。
    • 源碼:SerializationDelegate.getSerializedData?調用?TypeSerializer.serialize
      public class SerializationDelegate<T> {private T instance;private final TypeSerializer<T> serializer;public StreamElement getSerializedData() throws IOException {// 使用序列化器將 instance 轉為字節流return serializer.serialize(instance);}
      }
      
  • 緩沖區管理

    • Flink 的緩沖區基于?NetworkBufferPool,每個緩沖區是一個固定大小的內存塊(默認 32KB)。
    • 類比:快遞員把多個小包裹裝進一個大集裝箱,避免頻繁調用卡車。
    • BufferBuilder?動態分配緩沖區,當緩沖區滿時,會觸發?BufferConsumer?的創建,并交給?ResultPartitionWriter

3.4 網絡傳輸

  • 底層實現RecordWriter?不直接處理網絡傳輸,而是通過?ResultPartitionWriter?將緩沖區交給 Flink 的網絡棧(基于 Netty)。
  • 類比:集裝箱裝滿后,卡車(Netty)把數據送到下游站點。
  • 原理
    • ResultPartitionWriter?將緩沖區寫入?PipelinableSubpartition?的隊列。
    • Flink 的網絡層定期檢查隊列,使用 Netty 的 Channel 將數據發送到下游 TaskManager。
    • Netty 使用 TCP 協議,確保數據可靠傳輸。

4. 完整步驟總結(帶推導)

????????為了讓初學者徹底理解,我將?RecordWriter?的工作流程總結為以下步驟,并為每一步提供通俗解釋和公式推導(如果適用)。

  1. 接收數據記錄

    • 描述:上游算子調用?RecordWriter.emit(record),傳入一條數據。
    • 類比:快遞員收到一個包裹。
    • 推導:無復雜計算,只是將?record?傳遞給?serializationDelegate
  2. 選擇目標通道

    • 描述:ChannelSelector.selectChannel(record)?返回目標通道索引。
    • 類比:快遞員看包裹地址,決定送到哪個分揀中心。
    • 推導:
      • 對于?keyBy?分區:
        • 提取 key:key=keySelector(record)
        • 計算哈希:hash=key.hashCode()
        • 選擇通道:channelIndex=hashmod??numberOfChannels
      • 對于廣播分區:返回所有通道索引。
      • 公式:channelIndex=f(record,numberOfChannels)
  3. 序列化數據

    • 描述:serializationDelegate.getSerializedData()?將記錄轉為字節流。
    • 類比:把包裹內容壓縮成數字信號。
    • 推導:序列化過程依賴?TypeSerializer,復雜度為?O(size?of?record)。
  4. 寫入緩沖區

    • 描述:partitionWriter.emitRecord?將字節流寫入目標通道的緩沖區。
    • 類比:把包裹裝進集裝箱。
    • 推導:
      • 緩沖區大小固定(默認 32KB)。
      • 如果緩沖區滿,觸發?BufferBuilder.finish(),創建一個新的?BufferConsumer
      • 公式:bufferSize≤maxBufferSize
  5. 發送數據

    • 描述:緩沖區通過 Netty 傳輸到下游任務。
    • 類比:卡車把集裝箱運到下一個站點。
    • 推導:網絡傳輸的吞吐量取決于 Netty 的配置(線程數、TCP 參數等)。

5. 非專業人士的通俗總結

如果你完全不了解編程或分布式系統,可以把?RecordWriter?想象成一個智能快遞員:

  • 任務:把包裹(數據)從一個工廠(任務)送到正確的下游工廠。
  • 步驟
    1. 拿到包裹,檢查地址(分區策略)。
    2. 把包裹壓縮打包(序列化)。
    3. 裝進集裝箱(緩沖區)。
    4. 選擇正確的傳送帶(通道)。
    5. 交給卡車(網絡)運走。
  • 聰明之處
    • 它會根據包裹的類型(key)確保送到正確的下游工廠。
    • 它會攢夠一車包裹再送(緩沖區),避免浪費時間。
    • 它還能同時處理很多包裹(并行處理)。

6. 常見問題解答(Q&A)

Q1:RecordWriter?如何保證數據不丟失?

  • :Flink 的?RecordWriter?通過緩沖區和 Netty 的可靠傳輸(TCP)確保數據不丟失。如果下游任務失敗,Flink 的檢查點(Checkpoint)機制會回滾并重試。

Q2:為什么需要序列化?

  • :序列化把復雜的數據對象(比如 Java 類)變成字節流,方便通過網絡傳輸。就像把一本書的內容拍成照片,方便快遞寄出。

Q3:ChannelSelector?怎么決定分區的?

  • ChannelSelector?根據用戶定義的邏輯(比如?keyBy?的 key)計算目標通道。對于?keyBy,它用哈希函數確保相同 key 的數據總是送到同一個下游任務。

7. 結合官方文檔的補充

根據 Flink 官方文檔(https://flink.apache.org/):

  • RecordWriter?是 Flink 運行時網絡棧的一部分,位于?ResultPartition?和下游?InputGate?之間。
  • 它支持多種分區策略(StreamPartitioner),用戶可以通過?DataStream?API 靈活配置。
  • Flink 的網絡傳輸基于高效的緩沖區管理和 Netty 框架,RecordWriter?是這一流程的起點。

文檔中還提到,RecordWriter?的設計目標是:

  • 高吞吐量:通過緩沖區批量發送數據。
  • 低延遲:優化序列化和通道選擇邏輯。
  • 靈活性:支持用戶自定義分區策略。

8. 總結

????????RecordWriter?是 Flink 數據流處理中不可或缺的組件,負責將數據高效、正確地發送到下游任務。通過序列化、分區選擇、緩沖區管理和網絡傳輸,它實現了分布式環境下數據流的可靠傳遞。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/75473.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/75473.shtml
英文地址,請注明出處:http://en.pswp.cn/web/75473.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Dubbo、HTTP、RMI之間的區別

Dubbo、HTTP、RMI之間的區別如下&#xff1a; 表格 復制 特性DubboHTTPRMI通信機制基于Netty的NIO異步通信&#xff0c;采用長連接&#xff0c;支持多種序列化方式基于標準的HTTP協議&#xff0c;無狀態&#xff0c;每次請求獨立基于Java原生的RMI機制&#xff0c;支持Java對…

wkhtmltopdf生成圖片的實踐教程,包含完整的環境配置、參數解析及多語言調用示例

歡迎來到濤濤聊AI&#xff0c;最近在研究HTML生成卡片的功能&#xff0c;一起學習下吧。 一、工具特性與安裝 wkhtmltoimage是基于WebKit引擎的開源命令行工具&#xff0c;可將HTML網頁轉換為JPG/PNG等圖片格式&#xff0c;支持CSS渲染、JavaScript執行和響應式布局。安裝方式…

【在Node.js項目中引入TypeScript:提高開發效率及框架選型指南】

一、TypeScript在Node.js中的核心價值 1.1 靜態類型檢測 // 錯誤示例&#xff1a;TypeScript會報錯 function add(a: number, b: string) {return a b }1.2 工具鏈增強 # 安裝必要依賴 npm install --save-dev typescript types/node ts-node tsconfig.json1.3 代碼維護性提…

化工企業數字化轉型:從數據貫通到生態重構的實踐路徑

一、戰略定位&#xff1a;破解行業核心痛點 化工行業面臨生產安全風險高&#xff08;全國危化品企業事故率年增5%&#xff09;、能耗與排放壓力大&#xff08;占工業總能耗12%&#xff09;、供應鏈協同低效&#xff08;庫存周轉率低于制造業均值30%&#xff09;三大挑戰。《石…

C#網絡編程(Socket編程)

文章目錄 0、寫在前面的話1、Socket 介紹1.1 Socket是什么1.2 Socket在網絡中的位置 2、C# 中的Socket參數2.1 超時控制參數2.2 緩沖區參數2.3 UDP專用參數 3、C# 中的Socket API3.1 Socket&#xff08;構造函數&#xff09;3.1.1 SocketType3.1.2 ProtocolType3.1.3 AddressFa…

Docker部署ES集群

引言&#xff1a; Elasticsearch&#xff08;ES&#xff09;作為分布式搜索引擎&#xff0c;其核心價值在于通過集群部署實現高可用性和數據冗余。 本實驗對比兩種典型部署方案&#xff1a; 原生Linux部署&#xff1a;直接安裝ES服務&#xff0c;適用于生產環境&#xff0c;資…

老硬件也能運行的Win11 IoT LTSC (OEM)物聯網版

#記錄工作 Windows 11 IoT Enterprise LTSC 2024 屬于物聯網相關的版本。 Windows 11 IoT Enterprise 是為物聯網設備和場景設計的操作系統版本。它通常針對特定的工業控制、智能設備等物聯網應用進行了優化和定制&#xff0c;以滿足這些領域對穩定性、安全性和長期支持的需求…

【教程】xrdp修改遠程桌面環境為xfce4

轉載請注明出處&#xff1a;小鋒學長生活大爆炸[xfxuezhagn.cn] 如果本文幫助到了你&#xff0c;歡迎[點贊、收藏、關注]哦~ 目錄 xfce4 vs GNOME對比 配置教程 1. 安裝 xfce4 桌面環境 2. 安裝 xrdp 3. 配置 xrdp 使用 xfce4 4. 重啟 xrdp 服務 5. 配置防火墻&#xff…

【數據結構 · 初階】- 順序表

目錄 一、線性表 二、順序表 1.實現動態順序表 SeqList.h SeqList.c Test.c 問題 經驗&#xff1a;free 出問題&#xff0c;2種可能性 解決問題 &#xff08;2&#xff09;尾刪 &#xff08;3&#xff09;頭插&#xff0c;頭刪 &#xff08;4&#xff09;在 pos 位…

windows主機中構建適用于K8S Operator開發環境

基于win 10 打造K8S應用開發環境&#xff08;wsl & kind&#xff09; 一、wsl子系統安裝 1.1 確認windows系統版本 cmd/powershell 或者win r 運行winver 操作系統要> 19044 1.2 開啟wsl功能 控制面板 -> 程序 -> 啟用或關閉Windows功能 開啟適用于Linu…

計算機視覺色彩空間全解析:RGB、HSV與Lab的實戰對比

計算機視覺色彩空間全解析&#xff1a;RGB、HSV與Lab的實戰對比 一、前言二、RGB 色彩空間?2.1 RGB 色彩空間原理?2.1.1 基本概念?2.1.2 顏色混合機制? 2.2 RGB 在計算機視覺中的應用?2.2.1 圖像讀取與顯示?2.2.2 顏色識別?2.2.3 RGB 色彩空間的局限性? 三、HSV 色彩空…

PyTorch多GPU訓練實戰:從零實現到ResNet-18模型

本文將介紹如何在PyTorch中實現多GPU訓練&#xff0c;涵蓋從零開始的手動實現和基于ResNet-18的簡潔實現。代碼完整可直接運行。 1. 環境準備與庫導入 import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2l from torchvisio…

micro介紹

micro介紹 Micro 的首要特點是易于安裝&#xff08;它只是一個靜態的二進制文件&#xff0c;沒有任何依賴關系&#xff09;和易于使用Micro 支持完整的插件系統。插件是用 Lua 編寫的&#xff0c;插件管理器可自動為你下載和安裝插件。使用簡單的 json 格式配置選項&#xff0…

Linux內核分頁——線性地址結構

每個進程通過一個指針&#xff08;即進程的mm_struct→pgd&#xff09;指向其專屬的頁全局目錄&#xff08;PGD&#xff09;&#xff0c;該目錄本身存儲在一個物理頁框中。這個頁框包含一個類型為pgd_t的數組&#xff0c;該類型是與架構相關的數據結構&#xff0c;定義在<as…

微信小程序開發:微信小程序上線發布與后續維護

微信小程序上線發布與后續維護研究 摘要 微信小程序作為移動互聯網的重要組成部分,其上線發布與后續維護是確保其穩定運行和持續優化的關鍵環節。本文從研究學者的角度出發,詳細探討了微信小程序的上線發布流程、后續維護策略以及數據分析與用戶反饋處理的方法。通過結合實…

分享一些使用DeepSeek的實際案例

文章目錄 前言職場辦公領域生活領域學習教育領域商業領域技術開發領域 前言 以下是一些使用 DeepSeek 的實際案例&#xff1a; DeepSeek使用手冊資源鏈接&#xff1a;https://pan.quark.cn/s/fa502d9eaee1 職場辦公領域 行業競品分析&#xff1a;剛入職的小李被領導要求一天內…

flink iceberg寫數據到hdfs,hive同步讀取

目錄 1、組件版本 環境變量配置 2、hadoop配置 hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml 3、hive配置 hive-env.sh hive-site.xml HIVE LIB 原始JAR 4、flink配置集成HDFS和YARN 修改iceberg源碼 編譯iceberg-flink-runtime-1…

qq郵箱群發程序

1.界面設計 1.1 環境配置 在外部工具位置進行配置 1.2 UI界面設計 1.2.1 進入QT的UI設計界面 在pycharm中按順序點擊&#xff0c;進入UI編輯界面&#xff1a; 點擊第三步后進入QT的UI設計界面&#xff0c;通過點擊按鈕進行界面設計&#xff0c;設計后進行保存到當前Pycharm…

【C++游戲引擎開發】第10篇:AABB/OBB碰撞檢測

一、AABB(軸對齊包圍盒) 1.1 定義 ?最小點: m i n = ( x min , y min , z min ) \mathbf{min} = (x_{\text{min}}, y_{\text{min}}, z_{\text{min}}) min=(xmin?,ymin?,zmin?)?最大點: m a x = ( x max , y max , z max ) \mathbf{max} = (x_{\text{max}}, y_{\text{…

大模型是如何把向量解碼成文字輸出的

hidden state 向量 當我們把一句話輸入模型后&#xff0c;例如 “Hello world”&#xff1a; token IDs: [15496, 995]經過 Embedding Transformer 層后&#xff0c;會得到每個 token 的中間表示&#xff0c;形狀為&#xff1a; hidden_states: (batch_size, seq_len, hidd…