Kafka源碼P2-生產者緩沖區

歡迎來到啾啾的博客🐱。
記錄學習點滴。分享工作思考和實用技巧,偶爾也分享一些雜談💬。
有很多很多不足的地方,歡迎評論交流,感謝您的閱讀和評論😄。

目錄

  • 1 引言
  • 2 緩沖區
    • 2.1 消息在Partition內有序
    • 2.2 批次消息ProducerBatch
      • 2.2.1 內存分配
      • 2.2.2 線程安全
  • 3 發送消息Sender
  • 4 總結

1 引言

繼續看Kafka源碼,看其是如何批量發送消息的。

2 緩沖區

當調用producer.send(record)時,消息將先到緩沖區,在緩沖區按照目標的Topic-Partition進行組織,滿足以條件后隨批次發送給Broker。

// KafkaProducer.java
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// ... 省略了部分代碼 ...return doSend(record, callback); // 轉交給 doSend 方法
}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {// ...// 1. 等待元數據更新(如果需要的話)// ...// 2.【核心步驟】調用 RecordAccumulator 的 append 方法RecordAccumulator.RecordAppendResult result = accumulator.append(tp,timestamp, key, value, headers, interceptors, remainingWaitMs);// ...// 3. 喚醒 Sender 線程,告訴他“可能有新活兒干了”this.sender.wakeup();// ...return result.future;
}

我們看一下“緩沖區”RecordAccumulator。
![[Kafka源碼P2-緩沖區.png]]

2.1 消息在Partition內有序

RecordAccumulator維護了一個數據結構:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches

append代碼簡化如下:

// RecordAccumulator.java
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, ...) {// 1. 獲取該分區的批次隊列 batches中獲取,沒有則創建Deque<ProducerBatch> dq = getOrCreateDeque(tp);synchronized (dq) { // 對該分區的隊列加鎖,保證線程安全// 2. 嘗試追加到最后一個(當前活躍的)批次中ProducerBatch last = dq.peekLast();if (last != null) {FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, ...);if (future != null) {// 如果追加成功,直接返回return new RecordAppendResult(future, dq.size() > 1 || last.isFull(), false);}}// 3. 如果最后一個批次滿了,或者不存在,就需要一個新的批次// 從 BufferPool 申請一塊內存,大小由 batch.size 配置決定ByteBuffer buffer = free.allocate(batchSize, maxTimeToBlock);// 4. 創建一個新的 ProducerBatch (貨運箱)ProducerBatch batch = new ProducerBatch(tp, memoryRecordsBuilder, now);FutureRecordMetadata future = batch.tryAppend(timestamp, key, value, headers, ...); // 把當前消息放進去// 5. 將新的批次加入到隊列的末尾dq.addLast(batch);// ...return new RecordAppendResult(future, ...);}
}

可以看到batches的value類型為Deque,所以生產者可以維護發送時partition內的順序結構。
但是在網絡抖動時這樣做還是不夠,時序性還是難以保障,所以生產者還有別的配置:
每個連接上允許發送的未確認請求的最大數量

max.in.flight.requests.per.connection
  • 當 max.in.flight.requests.per.connection = 1 時,Sender 線程在發送完 Batch-1 后,會阻塞自己,直到 Batch-1 的請求得到響應(成功或失敗),它絕不會在此期間發送 Batch-2。這樣一來,即使 Batch-1 需要重試,Batch-2 也只能乖乖地在后面排隊。這就從根本上杜絕了因重試導致亂序的可能。
  • 默認 max.in.flight.requests.per.connection = 5。即它允許 Producer 在還沒收到 Batch-1 的 ACK 時,就繼續發送 Batch-2、3、4、5。這極大地提升了吞吐量(不用傻等),但犧牲了順序性。

一般max.in.flight.requests.per.connection還需要與生產者冪等性配合。

enable.idempotence = true

開啟冪等后,Producer 會被分配一個唯一的 Producer ID (PID),并且它發送的每一批消息都會帶上一個從0開始遞增的序列號。Broker 端會為每個 TopicPartition 維護這個 PID 和序列號。如果收到的消息序列號不是預期的下一個,Broker 就會拒絕它。

// NetworkClient.java// 這個方法判斷我們是否可以向某個節點發送更多數據
@Override
public boolean isReady(Node node, long now) {// ... 省略了連接狀態的檢查 ...// 檢查在途請求數是否小于該連接配置的上限return !connectionStates.isBlackedOut(node.idString(), now) &&canSendRequest(node.idString(), now);
}// canSendRequest 方法內部會調用 inFlightRequests.canSendMore()
// InFlightRequests.java
public boolean canSendMore(String nodeId) {// this.requests 是一個 Map<String, Deque<NetworkClient.InFlightRequest>>// 它記錄了每個節點上所有在途(已發送但未收到響應)的請求Deque<InFlightRequest> queue = requests.get(nodeId);// 如果隊列為空,當然可以發送if (queue == null) {return true;}// 將在途請求數 與 從配置中讀到的max.in.flight.requests.per.connection比較// this.maxInFlightRequestsPerConnection 就是你配置的那個值return queue.size() < this.maxInFlightRequestsPerConnection;
}

2.2 批次消息ProducerBatch

可以看到在結構private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches中,批次消息被封裝為ProducerBatch。

![[Kafka源碼P2-緩沖區-1.png]]

2.2.1 內存分配

這個類的核心是MemoryRecordsBuilder。
總所周知,頻繁地創建和銷毀對象,特別是大塊的byte[]對GC非常不友好。MemoryRecordsBuilder內部管理者一個巨大的、連續的ByteBuffer。
這個 ByteBuffer 不是每次創建 ProducerBatch 時都 new 出來的。它是在 RecordAccumulator 初始化時(我們在上面的RecordAccumulator中有看到BufferPool),從一個叫 BufferPool 的內存池中借用 (allocate) 的。當 ProducerBatch 發送完畢,這塊內存會歸還 (deallocate) 給池子,供下一個 ProducerBatch 復用
當你調用 tryAppend 添加消息時,消息的 key, value 等內容被直接序列化成字節,并寫入到這個 ByteBuffer 的末尾。它不是在發送時才做序列化,而是在追加時就完成了。

池化:對于那些需要頻繁創建和銷毀的、生命周期短暫的、昂貴的對象(如數據庫連接、線程、大塊內存),一定要使用池化技術。這能極大地降低GC壓力,提升系統穩定性。

Redis和Kafka都有共同的高頻內存使用的特性,也都設計了預分配和復用。Kafka生產者與其用多少申請多少,不如一次性申請一塊大內存,然后通過內部的指針移動(position, limit)來管理這塊內存的使用。

2.2.2 線程安全

ProducerBatch 會被多個線程訪問:

  • 你的業務線程(Producer主線程):調用 tryAppend() 往里面寫數據。
  • Sender 線程:檢查它是否已滿 (isFull)、是否超時 (isExpired),并最終把它發送出去。

ProducerBatch 內部有一個精密的“狀態機”,并用 volatile 和 synchronized 保護。

// ProducerBatch.java (簡化后)
private final List<Thunk> thunks;
private final MemoryRecordsBuilder recordsBuilder;// 【關鍵狀態】volatile 保證了多線程間的可見性
private volatile boolean closed; 
private int appends; // 記錄追加次數public FutureRecordMetadata tryAppend(...) {// 【關鍵檢查】在方法入口處檢查狀態,快速失敗if (this.closed) {return null; }// ... 將消息寫入 recordsBuilder ...// ...
}// 這個方法會被 Sender 線程調用
public void close() {this.closed = true;
}// 當批次被確認后,由 Sender 線程調用
public void done(long baseOffset, long logAppendTime, RuntimeException exception) {// for-each 循環是線程安全的,因為 thunks 列表在 close 之后就不再被修改for (Thunk thunk : this.thunks) {try {// 【核心】執行每個 send() 調用對應的回調函數thunk.callback.onCompletion(metadata, exception);} catch (Exception e) {// ...}}
}

總的來說是職責分離+最小化鎖的設計以保證線程安全。

3 發送消息Sender

Sender 是一個實現了 Runnable 接口的類,它在一個獨立的線程里無限循環,最終發送消息。

// Sender.java
public void run() {while (running) {try {runOnce();} catch (Exception e) {// ...}}
}void runOnce() {// ...// 1. 【核心】找出所有可以發送的批次// linger.ms 決定了可以等待的最長時間RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);// 2. 如果有準備好的節點(分區),就發送它們if (!result.readyNodes.isEmpty()) {// ...// 從累加器中“榨干”所有準備好的批次Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, ...);// ...// 將批次轉換成網絡請求并發送sendProducerData(now, ... , batches);}// ...
}

RecordAccumulator.ready() 方法是決定何時發送的關鍵。它會遍歷所有的 ProducerBatch,滿足以下任意一個條件的批次,就會被認為是 “ready”(準備就緒):

  1. 批次已滿:批次大小達到了 batch.size。
  2. 等待超時:批次從創建到現在,等待的時間超過了 linger.ms。
  3. 其他原因:比如 Producer 被關閉,或者有新的 Producer 加入導致需要立即發送等。

Sender 的工作模式是:
不斷地問accumulator.ready()有到linger.ms時間的或者裝滿batch.size的批次沒有。然后依據節點列表,通過NetworkClient發送ProducerBatch到Kafka Broker。

4 總結

Kafka Producer 在客戶端內部通過 RecordAccumulator 維護了一個按 TopicPartition 分類的內存緩沖區。當用戶調用 send() 方法時,消息并不會立即發送,而是被追加到對應分區的某個 ProducerBatch 中。一個獨立的 Sender 線程在后臺運行,它會持續檢查 RecordAccumulator 中的批次,一旦某個批次滿足了“大小達到 batch.size”或“等待時間超過 linger.ms”這兩個條件之一,Sender 線程就會將這個批次以及其他所有準備好的批次一同取出,打包成一個請求,通過網絡一次性發送給 Broker,從而實現批量發送,極大地提升了吞吐能力。

這個設計是經典的 “空間換時間”“攢一批再處理” 的思想,通過犧牲一點點延遲(linger.ms),換取了巨大的吞吐量提升。理解了這個機制,你就能更好地去配置 batch.size 和 linger.ms 這兩個核心參數,以平衡你的業務對吞吐和延遲的需求。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/85270.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/85270.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/85270.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

力扣網C語言編程題:三數之和

一. 簡介 本文記錄力扣網上的邏輯編程題&#xff0c;涉及數組方面的&#xff0c;這里記錄一下 C語言實現和Python實現。 二. 力扣網C語言編程題&#xff1a;三數之和 題目&#xff1a;三數之和 給你一個整數數組 nums &#xff0c;判斷是否存在三元組 [nums[i], nums[j], nu…

2.2 Windows MSYS2編譯FFmpeg 4.4.1

一、安裝編譯工具 # 更換pacman源 sed -i "s#mirror.msys2.org/#mirrors.ustc.edu.cn/msys2/#g" /etc/pacman.d/mirrorlist* pacman -Sy# 安裝依賴 pacman -S --needed base-devel mingw-w64-x86_64-toolchain pacman -S mingw-w64-x86_64-nasm mingw-w64-x86_64-ya…

驅動開發,隊列,環形緩沖區:以GD32 CAN 消息處理為例

對環形緩沖區進行進一步的優化和功能擴展&#xff0c;以應對更復雜的實際應用場景&#xff0c;特別是針對 CAN 總線消息處理的場景。 一、優化點 1&#xff1a;動態配置環形緩沖區大小在原始實現中&#xff0c;我們固定了緩沖區大小為 RINGBUFF_LEN 64。這種方式雖然簡單&am…

SQL基礎知識,MySQL學習(長期更新)

1、基本操作&#xff0c;增刪查改 INSERT INTO 表名 (字段1, 字段2, ...) VALUES (值1, 值2, ...); DELETE FROM 表名 WHERE 條件 SELECT * FROM 表名 WHERE 條件 UPDATE 表名 SET 字段1 值, 字段2 值, ... WHERE 條件; SELECT * INTO 新表 FROM 舊表 WHERE… INSERT INTO 語…

Git(一):初識Git

文章目錄 Git(一)&#xff1a;初識GitGit簡介核心功能分布式特性結構與操作優勢與適用場景 創建本地倉庫git init配置name與email--global 工作區、暫存區與版本庫git addgit commitcommit后.git的變化 Git(一)&#xff1a;初識Git Git簡介 Git 是一個分布式版本控制系統&…

第19天:初級數據庫學習筆記3

分組函數&#xff08;多行處理函數&#xff09; 即多個輸入對應一個輸出。前面講的數據處理函數是單行處理函數。&#xff08;在公司中常說單&#xff0c;多行處理函數&#xff09; 分組函數包括五個&#xff1a; max&#xff1a;最大值min&#xff1a;最小值avg&#xff1a…

Windows11下搭建Raspberry Pi Pico編譯環境

1. 系統與工具要求 PC平臺&#xff1a; Windows 11 專業版 Windows GCC: gcc-15.1.0-64.exe GNU Make: 4.3 Git: 2.49.0 cmake: 4.0.2 python:3.12.11 Arm GNU Toolchain Downloads – Arm Developer 2. 工具安裝與驗證 2.1 工具安裝 winget安裝依賴工具&#xff08;Windows …

【C語言極簡自學筆記】重講運算符

一、算術操作符 算術操作符描述把兩個操作數相加-第一個操作數減去第二個操作數*把兩個操作數相乘/分子除以分母%取模運算符&#xff0c;整除后的余數 注意&#xff1a;1.除號的兩端都是整數的時候執行的是整數的除法&#xff0c;兩端只要有一個浮點數&#xff0c;就執行浮點…

持續集成 CI/CD-Jenkins持續集成GitLab項目打包docker鏡像推送k8s集群并部署至rancher

Jenkins持續集成GitLab項目 GitLab提交分支后觸發Jenkis任務 之前是通過jar包在shell服務器上進行手動部署&#xff0c;麻煩且耗時。現通過Jenkins進行持續集成實現CI/CD。以test分支為例 提交即部署。 由于是根據自己實際使用過程 具體使用到了 gitlabjenkinsdockerharborra…

Apache Iceberg與Hive集成:非分區表篇

引言 在大數據處理領域&#xff0c;Apache Iceberg憑借其先進的表格式設計&#xff0c;為大規模數據分析帶來了新的可能。當Iceberg與Hive集成時&#xff0c;這種強強聯合為數據管理與分析流程提供了更高的靈活性和效率。本文將聚焦于Iceberg與Hive集成中的非分區表場景&#…

webpack 如何區分開發環境和生產環境

第一種方法: 方法出處&#xff1a;命令行接口&#xff08;CLI&#xff09; | webpack 中文文檔 1.利用webpack.config.js 返回的是個函數&#xff0c;利用函數的參數&#xff0c;來區分環境 具體步驟 1&#xff09; package.json文件&#xff1a;在npm scripts 命令后面追加 …

React組件通信——context(提供者/消費者)

Context 是 React 提供的一種組件間通信方式&#xff0c;主要用于解決跨層級組件 props 傳遞的問題。它允許數據在組件樹中"跨級"傳遞&#xff0c;無需顯式地通過每一層 props 向下傳遞。 一、Context 核心概念 1. 基本組成 React.createContext&#xff1a;創建 C…

“微信短劇小程序開發指南:從架構設計到上線“

1. 引言&#xff1a;短劇市場的機遇與挑戰 近年來&#xff0c;短視頻和微短劇市場呈現爆發式增長&#xff0c;用戶碎片化娛樂需求激增。短劇小程序憑借輕量化、社交傳播快、變現能力強等特點&#xff0c;成為內容創業的新風口。然而&#xff0c;開發一個穩定、流暢且具備商業價…

RPC與RESTful對比:兩種API設計風格的核心差異與實踐選擇

# RPC與RESTful對比&#xff1a;兩種API設計風格的核心差異與實踐選擇 ## 一、架構哲學與設計目標差異 1. **RPC&#xff08;Remote Procedure Call&#xff09;** - **核心思想**&#xff1a;將遠程服務調用偽裝成本地方法調用&#xff08;方法導向&#xff09; - 典型行為…

【pytest進階】pytest之鉤子函數

什么是 hook (鉤子)函數 經常會聽到鉤子函數(hook function)這個概念,最近在看目標檢測開源框架mmdetection,里面也出現大量Hook的編程方式,那到底什么是hook?hook的作用是什么? what is hook ?鉤子hook,顧名思義,可以理解是一個掛鉤,作用是有需要的時候掛一個東西…

深度學習計算——動手學深度學習5

環境&#xff1a;PyCharm python3.8 1. 層和塊 塊&#xff08;block&#xff09;可以描述 單個層、由多個層組成的組件或整個模型本身。 使用塊進行抽象的好處&#xff1a; 可將塊組合成更大的組件(這一過程通常是遞歸) 如 圖5.1.1所示。通過定義代碼來按需生成任意復雜度…

NodeJS的fs模塊的readFile和createReadStream區別以及常見方法

Node.js 本身沒有像 Java 那樣嚴格區分字符流和字節流&#xff0c;區別主要靠編碼&#xff08;encoding&#xff09;來控制數據是以 Buffer&#xff08;二進制字節&#xff09;形式還是字符串&#xff08;字符&#xff09;形式處理。 詳細解釋&#xff1a; 方面JavaNode.js字節…

基于二進制XOR運算的機器人運動軌跡與對稱圖像自動生成算法

原創&#xff1a;項道德&#xff08;daode3056,daode1212&#xff09; 新的算法出現&#xff0c;往往能給某些行業與產業帶來革命與突破。為探索機器人運動軌跡與對稱圖像自動生成算法&#xff0c;本人已經通過18種算法的測試&#xff0c;最終&#xff0c;以二進制的XOR運算為…

Spring AI 項目實戰(七):Spring Boot + Spring AI Tools + DeepSeek 智能工具平臺(附完整源碼)

系列文章 序號文章名稱1Spring AI 項目實戰(一):Spring AI 核心模塊入門2Spring AI 項目實戰(二):Spring Boot + AI + DeepSeek 深度實戰(附完整源碼)3Spring AI 項目實戰(三):Spring Boot + AI + DeepSeek 打造智能客服系統(附完整源碼)4Spring AI 項目實戰(四…

spring-webmvc @RequestHeader 典型用法

典型用法 基礎用法&#xff1a;獲取指定請求頭值 GetMapping("/info") public String getInfo(RequestHeader("User-Agent") String userAgent) {return "User-Agent: " userAgent; }如果請求中包含 User-Agent 請求頭&#xff0c;則其值將被…