Apache Flink Kafka 寫連接器源碼深度剖析

一、架構概述

Apache Flink 提供的 Kafka 寫入連接器是實現與 Kafka 消息隊列集成的關鍵組件,支持多種語義保證和靈活配置選項。本文將深入分析 Flink Kafka 寫入連接器的源碼實現,包括架構設計、核心類、事務機制和性能優化等方面。

1.1 整體架構

Flink Kafka 寫入連接器的核心組件包括:

  • KafkaSink:寫入器的入口點,負責配置和創建寫入器
  • KafkaWriter:實際執行消息寫入的工作類
  • KafkaSerializationSchema:消息序列化接口
  • KafkaCommittableManager:管理事務提交的組件
  • FlinkKafkaProducer:舊版 Kafka 寫入器實現(基于 RichSinkFunction)

整體數據流路徑為:Flink 處理數據 -> SerializationSchema 序列化消息 -> KafkaWriter 寫入 Kafka。

二、核心類與實現

2.1 KafkaSink 與構建器

KafkaSink 是創建 Kafka 寫入器的主要入口點,采用構建器模式配置各項參數:

// KafkaSink.java
public class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, KafkaWriter<IN>> {private final String bootstrapServers;private final KafkaSerializationSchema<IN> serializationSchema;private final DeliveryGuarantee deliveryGuarantee;private final String transactionalIdPrefix;private final Duration kafkaProducerConfigCheckInterval;private final Properties kafkaProducerConfig;// 私有構造函數private KafkaSink(...) {// 參數初始化}// 構建器方法public static <IN> KafkaSinkBuilder<IN> builder() {return new KafkaSinkBuilder<>();}@Overridepublic Writer<IN, KafkaCommittable, KafkaWriterState> createWriter(Sink.InitContext context,List<KafkaWriterState> states) throws IOException {// 創建 KafkaWriterreturn new KafkaWriter<>(bootstrapServers,serializationSchema,deliveryGuarantee,transactionalIdPrefix,context.metricGroup(),context.getUserCodeClassLoader(),states,kafkaProducerConfig,kafkaProducerConfigCheckInterval);}@Overridepublic Committer<KafkaCommittable> createCommitter() throws IOException {// 創建提交器return new KafkaCommitter(bootstrapServers,deliveryGuarantee,kafkaProducerConfig);}@Overridepublic GlobalCommitter<KafkaCommittable, KafkaGlobalCommittable> createGlobalCommitter() throws IOException {// 創建全局提交器return new KafkaGlobalCommitter(bootstrapServers,deliveryGuarantee,kafkaProducerConfig);}// 其他方法...
}

KafkaSinkBuilder 提供了流式配置接口,允許設置各種參數:

KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("topic1").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();

2.2 KafkaWriter 實現

KafkaWriter 是實際執行消息寫入的核心類:

// KafkaWriter.java
public class KafkaWriter<IN> implements SinkWriter<IN, KafkaCommittable, KafkaWriterState> {private final KafkaSerializationSchema<IN> serializationSchema;private final DeliveryGuarantee deliveryGuarantee;private final String transactionalIdPrefix;private final int subtaskId;private final int totalNumberOfSubtasks;private final KafkaProducer<byte[], byte[]> kafkaProducer;private final Map<Long, TransactionHolder> ongoingTransactions;private final List<TransactionHolder> pendingTransactions;private final List<TransactionHolder> completedTransactions;private final List<KafkaWriterState> recoveredStates;private final Duration producerConfigCheckInterval;private final Properties kafkaProducerConfig;private TransactionHolder currentTransaction;private long currentCheckpointId;public KafkaWriter(...) {// 初始化參數this.serializationSchema = serializationSchema;this.deliveryGuarantee = deliveryGuarantee;this.transactionalIdPrefix = transactionalIdPrefix;this.subtaskId = subtaskId;this.totalNumberOfSubtasks = totalNumberOfSubtasks;this.ongoingTransactions = new LinkedHashMap<>();this.pendingTransactions = new ArrayList<>();this.completedTransactions = new ArrayList<>();this.recoveredStates = recoveredStates;this.producerConfigCheckInterval = producerConfigCheckInterval;this.kafkaProducerConfig = kafkaProducerConfig;// 創建 KafkaProducerthis.kafkaProducer = createKafkaProducer();// 如果是精確一次語義,初始化事務if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {initializeTransactions();}}@Overridepublic void write(IN element, Context context) throws IOException {// 序列化消息ProducerRecord<byte[], byte[]> record = serializationSchema.serialize(element,context.timestamp(),context.partition(),context.topic());// 根據不同的語義保證寫入消息if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {// 在精確一次語義下,確保事務處于活動狀態ensureTransactionActive(context.currentProcessingTime());// 發送消息到 KafkakafkaProducer.send(record, (metadata, exception) -> {if (exception != null) {// 處理發送失敗的情況}});} else {// 在至少一次或最多一次語義下,直接發送消息kafkaProducer.send(record);}}@Overridepublic List<KafkaCommittable> prepareCommit(boolean flush) throws IOException {// 準備提交,返回待提交的事務List<KafkaCommittable> committables = new ArrayList<>();if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {// 對于精確一次語義,將當前事務標記為待提交if (currentTransaction != null) {pendingTransactions.add(currentTransaction);committables.add(currentTransaction.toCommittable());currentTransaction = null;}}return committables;}@Overridepublic List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {// 快照當前狀態List<KafkaWriterState> states = new ArrayList<>();if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {// 對于精確一次語義,創建事務狀態快照if (currentTransaction != null) {states.add(currentTransaction.toWriterState());}}return states;}// 其他核心方法...
}

2.3 事務管理器實現

Flink Kafka 寫入連接器通過事務機制實現精確一次語義:

// TransactionHolder.java
public class TransactionHolder {private final String transactionalId;private final long checkpointId;private final KafkaProducer<byte[], byte[]> producer;private final boolean isRecovered;private boolean isAborted;public TransactionHolder(String transactionalId,long checkpointId,KafkaProducer<byte[], byte[]> producer,boolean isRecovered) {this.transactionalId = transactionalId;this.checkpointId = checkpointId;this.producer = producer;this.isRecovered = isRecovered;this.isAborted = false;}public void begin() {producer.beginTransaction();}public void commit() {if (!isAborted) {producer.commitTransaction();}}public void abort() {if (!isAborted) {producer.abortTransaction();isAborted = true;}}// 轉換為可提交對象public KafkaCommittable toCommittable() {return new KafkaCommittable(transactionalId, checkpointId, isRecovered);}// 轉換為寫入器狀態public KafkaWriterState toWriterState() {return new KafkaWriterState(transactionalId, checkpointId);}// 其他方法...
}

三、精確一次語義實現

Flink Kafka 寫入連接器通過 Kafka 的事務 API 實現精確一次語義:

3.1 事務初始化

// KafkaWriter.java
private void initializeTransactions() {// 恢復之前的事務if (!recoveredStates.isEmpty()) {for (KafkaWriterState state : recoveredStates) {String transactionalId = state.getTransactionalId();long checkpointId = state.getCheckpointId();// 創建恢復的事務KafkaProducer<byte[], byte[]> producer = createTransactionalProducer(transactionalId);TransactionHolder recoveredTransaction = new TransactionHolder(transactionalId, checkpointId, producer, true);ongoingTransactions.put(checkpointId, recoveredTransaction);}// 按檢查點 ID 排序List<Long> sortedCheckpointIds = new ArrayList<>(ongoingTransactions.keySet());Collections.sort(sortedCheckpointIds);// 恢復事務狀態for (long checkpointId : sortedCheckpointIds) {TransactionHolder transaction = ongoingTransactions.get(checkpointId);try {transaction.producer.initTransactions();} catch (ProducerFencedException e) {// 處理異常}}// 創建新的當前事務createNewTransaction();} else {// 如果沒有恢復的狀態,直接創建新事務createNewTransaction();}
}

3.2 消息寫入與事務管理

// KafkaWriter.java
private void ensureTransactionActive(long currentTime) {// 檢查是否需要創建新事務if (currentTransaction == null) {createNewTransaction();}// 檢查生產者配置是否需要更新if (producerConfigCheckInterval != null && currentTime - lastProducerConfigCheckTime >= producerConfigCheckInterval.toMillis()) {checkAndRecreateProducerIfNeeded();lastProducerConfigCheckTime = currentTime;}
}private void createNewTransaction() {// 生成新的事務 IDString transactionalId = generateTransactionalId();currentCheckpointId++;// 創建新的事務生產者KafkaProducer<byte[], byte[]> producer = createTransactionalProducer(transactionalId);// 初始化事務producer.initTransactions();// 創建事務持有者currentTransaction = new TransactionHolder(transactionalId, currentCheckpointId, producer, false);// 開始事務currentTransaction.begin();// 記錄正在進行的事務ongoingTransactions.put(currentCheckpointId, currentTransaction);
}

3.3 事務提交與恢復

// KafkaCommitter.java
public class KafkaCommitter implements Committer<KafkaCommittable> {private final DeliveryGuarantee deliveryGuarantee;private final Properties kafkaProducerConfig;private transient Map<String, KafkaProducer<byte[], byte[]>> producers;public KafkaCommitter(String bootstrapServers,DeliveryGuarantee deliveryGuarantee,Properties kafkaProducerConfig) {this.deliveryGuarantee = deliveryGuarantee;this.kafkaProducerConfig = new Properties();this.kafkaProducerConfig.putAll(kafkaProducerConfig);this.kafkaProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);}@Overridepublic List<KafkaCommittable> commit(List<KafkaCommittable> committables) throws IOException {List<KafkaCommittable> failedCommittables = new ArrayList<>();for (KafkaCommittable committable : committables) {try {// 獲取或創建生產者KafkaProducer<byte[], byte[]> producer = getOrCreateProducer(committable.getTransactionalId());// 如果是恢復的事務,需要先初始化if (committable.isRecovered()) {producer.initTransactions();}// 提交事務producer.commitTransaction();} catch (Exception e) {// 記錄失敗的提交failedCommittables.add(committable);}}return failedCommittables;}// 其他方法...
}

四、性能優化與調優

Flink Kafka 寫入連接器提供了多種性能優化選項:

4.1 批量寫入配置

// 在構建 KafkaSink 時配置批量寫入參數
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(...).setProperty("batch.size", "16384")      // 批次大小,單位字節.setProperty("linger.ms", "5")          // 等待時間,增加批處理機會.setProperty("buffer.memory", "33554432") // 生產者緩沖區大小.build();

4.2 壓縮配置

// 配置消息壓縮
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(...).setProperty("compression.type", "lz4") // 壓縮類型:none, gzip, snappy, lz4, zstd.build();

4.3 異步發送配置

// 配置異步發送參數
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(...).setProperty("max.in.flight.requests.per.connection", "5") // 每個連接允許的最大未完成請求數.setProperty("acks", "all") // 確認模式:0, 1, all.build();

五、錯誤處理與恢復機制

Flink Kafka 寫入連接器提供了完善的錯誤處理和恢復機制:

5.1 重試機制

// 配置生產者重試參數
KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(...).setProperty("retries", "3")                 // 重試次數.setProperty("retry.backoff.ms", "100")      // 重試退避時間.setProperty("delivery.timeout.ms", "120000") // 消息傳遞超時時間.build();

5.2 異常處理

// KafkaWriter.java
private void handleSendException(ProducerRecord<byte[], byte[]> record, Exception exception) {// 記錄異常信息LOG.error("Error sending record to Kafka: {}", record, exception);// 根據異常類型進行不同處理if (exception instanceof RetriableException) {// 可重試異常,記錄重試次數retryCount++;if (retryCount > maxRetries) {// 超過最大重試次數,拋出異常throw new IOException("Failed to send record after retries", exception);}// 重試發送kafkaProducer.send(record, this::handleSendResult);} else {// 不可重試異常,立即拋出throw new IOException("Failed to send record", exception);}
}

六、總結

Flink Kafka 寫入連接器通過精心設計的架構和實現,提供了高性能、可靠且靈活的 Kafka 數據寫入能力。其核心組件包括寫入器、序列化器和事務管理器,共同實現了精確一次語義、批量寫入和錯誤恢復等關鍵特性。通過深入理解其源碼實現,開發者可以更好地使用和調優該連接器,滿足不同場景下的數據處理需求。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/86590.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/86590.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/86590.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

強化學習理論基礎:從Q-learning到PPO的算法演進(2)

文章目錄 Policy gradient思想(REINFORCE算法)優勢函數PPO(Proximal Policy Optimization)Policy gradient思想(REINFORCE算法) 下面我們來探討一下Policy gradient策略,也就是REINFORCE算法。 在玩剪刀石頭布這個簡單的游戲中,我們可以有不同的策略。一種是完全隨機地…

Oracle數據庫文件變成32k故障恢復--惜分飛

最近一個客戶數據庫重啟系統之后,數據文件大小變為了32kb,我接手的不是第一現場(客戶那邊嘗試了rman還原操作),查看alert日志,數據庫最初報錯 Wed Jun 18 13:09:23 2025 alter database open Block change tracking file is current. Read of datafile D:\APP\ADMINISTRATOR\OR…

移動端 uniapp 寫一個可自由拖拽的小鍵盤

寫之前要考慮&#xff1a; 鍵盤展開后&#xff0c;不能超過手機邊緣在底部展開鍵盤&#xff0c;鍵盤應出現在展開按鈕上方&#xff1b;以此類推重復點擊展開按鈕&#xff0c;關閉鍵盤 效果&#xff1a; 代碼如下&#xff0c;有些按鍵邏輯還需要優化 <template><vi…

《二分枚舉答案(配合數據結構)》題集

文章目錄 1、模板題集2、課內題集3、課后題集1. 字符串哈希2. 并查集3. ST表 1、模板題集 分巧克力 2、課內題集 倒水 冶煉金屬 連續子序列的個數 3、課后題集 括號內的整數代表完整代碼行數。 1. 字符串哈希 你猜猜是啥題(60) 2. 并查集 拯救萌萌(72) 3. ST表 GCD不小…

PY32F030單片機,優勢替代ST GD,主頻48MHz,帶LED數碼管驅動

PY32F030是一款高性能32位單片機&#xff0c;采用ARM Cortex-M0內核&#xff0c;工作頻率高達48MHz&#xff0c;具備64KB Flash和8KB SRAM。它支持1.7V~5.5V寬電壓范圍&#xff0c;集成多路I2C、SPI、USART通訊外設&#xff0c;配備12位ADC、16位定時器和比較器&#xff0c;適用…

Rockchip Uboot中修改固件探測的存儲介質

Rockchip Uboot中修改固件探測的存儲介質 Rockchip uboot中支持從 eMMC、SDcard、NAND 、SPI_NAND、SPI_NOR等存儲介質引導固件。 uboot的spl啟動的時候會默認呢都會去探測這些介質&#xff0c;這樣會導致探測時間變長&#xff0c;在實際產品中可以根據產品需求進行個性化的配…

動手學Python:從零開始構建一個“文字冒險游戲”

動手學Python&#xff1a;從零開始構建一個“文字冒險游戲” 大家好&#xff0c;我是你的技術向導。今天&#xff0c;我們不聊高深的框架&#xff0c;也不談復雜的算法&#xff0c;我們來做一點“復古”又極具趣味性的事情——用Python親手打造一個屬于自己的文字冒險游戲&…

基于Kafka實現企業級大數據遷移的完整指南

在大數據時代&#xff0c;數據遷移已成為企業數字化轉型過程中的常見需求。本文將詳細介紹如何利用Kafka構建高可靠、高性能的大數據遷移管道&#xff0c;涵蓋從設計到實施的完整流程。 一、為什么選擇Kafka進行數據遷移&#xff1f; Kafka作為分布式消息系統&#xff0c;具有…

GEO引領品牌大模型種草:邁向Web3.0與元宇宙的認知新空間

在數字技術的演進歷程中&#xff0c;我們正經歷著從Web2.0到Web3.0、從平面互聯網到沉浸式元宇宙的范式轉變。這一轉變不僅重塑了數字空間的形態和交互方式&#xff0c;更深刻改變了品牌與用戶的連接模式和價值創造邏輯。而在這個新興的數字疆域中&#xff0c;生成式引擎優化&a…

【機器學習與數據挖掘實戰 | 醫療】案例18:基于Apriori算法的中醫證型關聯規則分析

【作者主頁】Francek Chen 【專欄介紹】 ? ? ?機器學習與數據挖掘實戰 ? ? ? 機器學習是人工智能的一個分支,專注于讓計算機系統通過數據學習和改進。它利用統計和計算方法,使模型能夠從數據中自動提取特征并做出預測或決策。數據挖掘則是從大型數據集中發現模式、關聯…

83、高級特性-自定義starter細節

83、高級特性-自定義starter細節 自定義Spring Boot Starter可以將通用功能封裝成可復用的模塊&#xff0c;簡化其他項目的配置和使用。以下是創建自定義Starter的詳細步驟和關鍵細節&#xff1a; ### 1. 項目結構 通常&#xff0c;自定義Starter包含兩個模塊&#xff1a; ####…

專注推理查詢(ARQs):一種提升大型語言模型指令遵循度、決策準確性和防止幻覺的結構化方法

大型語言模型&#xff08;LLMs&#xff09;在客戶服務、自動化內容創作和數據檢索方面變得至關重要。然而&#xff0c;它們的有效性常常因其在多次交互中無法始終如一地遵循詳細指令而受到限制。在金融服務和客戶支持系統等高風險環境中&#xff0c;嚴格遵循指南是必不可少的&a…

華為云Flexus+DeepSeek征文 | DeepSeek驅動的醫療AI Agent:智能問診系統開發完整指南

華為云FlexusDeepSeek征文 | DeepSeek驅動的醫療AI Agent&#xff1a;智能問診系統開發完整指南 &#x1f31f; 嗨&#xff0c;我是IRpickstars&#xff01; &#x1f30c; 總有一行代碼&#xff0c;能點亮萬千星辰。 &#x1f50d; 在技術的宇宙中&#xff0c;我愿做永不停歇…

【大模型水印論文閱讀2】前綴文本編碼、均勻性約束

TOC &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f680; 感謝你的陪伴與支持~ 歡迎添加文末好友 &#x1f30c; 在所有感興趣的領域擴展知識&#xff0c;不定期掉落福利資訊(*^▽^*) 寫在最前面 版權聲明&#xff1a;本文為原創&#xff0c;遵循 CC 4.0 BY-SA 協議。…

破繭時刻,與光同行

凌晨五點的鬧鐘刺破薄霧&#xff0c;我摸黑打開臺燈。攤開的數學錯題本上&#xff0c;函數圖像在暖黃的光暈里舒展&#xff0c;像等待破譯的密碼。這樣的清晨已持續三百多個日夜&#xff0c;我知道&#xff0c;在無數個相似的時刻里&#xff0c;總有千萬盞臺燈在黑暗中次第亮起…

Learning PostgresSQL讀書筆記: 第8章 Triggers and Rules

本章將討論以下內容&#xff1a; ? 探索 PostgreSQL 中的規則 ? 管理 PostgreSQL 中的觸發器 ? 事件觸發器 探索 PostgreSQL 中的規則 文檔中的這段話闡述了rule和trigger的區別&#xff1a; PostgreSQL 規則系統允許定義在數據庫表中插入、更新或刪除時執行的替代操作。粗…

信創國產化替代中的開發語言選擇分析

在信息技術應用創新(信創)國產化替代過程中&#xff0c;選擇合適的開發語言至關重要。以下是適合信創環境的開發語言及其優勢分析&#xff1a; 主流適合信創的編程語言 1. Java 優勢&#xff1a;跨平臺特性(JVM)、豐富的生態體系、企業級應用成熟 信創適配&#xff1a;國內有…

Android 中 函數實現多個返回值的幾種方式

在編程中&#xff0c;函數通常只能返回一個值。但通過使用對象封裝、Pair、Triple、數組、列表或 Bundle 方式&#xff0c;可以輕松地返回多個值。 1、對象封裝方式 創建數據類來封裝需要返回的多個值。 data class Result(val code: Int, val message: String)fun getMultiV…

Leetcode百題斬-DP

又到了最好玩的dp了&#xff0c;各種玄學轉移也算是其樂無窮。前段時間剛做的LCA正是這種題的小試牛刀&#xff0c;如果當時就把這個專題刷完了&#xff0c;或許我現在已經從西溪園區跑到云谷園區了。 不過&#xff0c;恐怖如斯的dp專題居然只給了一道hard&#xff0c;基本也沒…

策略模式與工廠模式的黃金組合:從設計到實戰

策略模式和工廠模式是軟件開發中最常用的兩種設計模式&#xff0c;當它們結合使用時&#xff0c;能產生11>2的效果。本文將通過實際案例&#xff0c;闡述這兩種模式的協同應用&#xff0c;讓代碼架構更優雅、可維護性更強。 一、為什么需要組合使用&#xff1f; 單獨使用的…