Kafka數據寫入流程源碼深度剖析(Broker篇)

在Kafka數據寫入流程中,Broker端負責接收客戶端發送的消息,并將其持久化存儲,是整個流程的關鍵環節。本文將深入Kafka Broker的源碼,詳細解析消息接收、處理和存儲的具體實現。

一、網絡請求接收與解析

Broker通過Processor線程池接收來自客戶端的網絡請求,Processor線程基于Java NIO的Selector實現非阻塞I/O,負責監聽網絡連接和讀取數據。其核心處理邏輯如下:

public class Processor implements Runnable {private final Selector selector;private final KafkaApis kafkaApis;public Processor(Selector selector, KafkaApis kafkaApis) {this.selector = selector;this.kafkaApis = kafkaApis;}@Overridepublic void run() {while (!stopped) {try {// 輪詢獲取就緒的網絡事件selector.poll(POLL_TIMEOUT);Set<SelectionKey> keys = selector.selectedKeys();for (SelectionKey key : keys) {if (key.isReadable()) {// 讀取網絡數據NetworkReceive receive = selector.read(key);if (receive != null) {// 處理接收到的請求kafkaApis.handle(receive);}}}} catch (Exception e) {log.error("Processor failed to process requests", e);}}}
}

Selector檢測到有可讀事件時,會從對應的SocketChannel中讀取數據,并封裝成NetworkReceive對象,然后傳遞給KafkaApis進行進一步處理。

KafkaApis是Broker處理請求的核心組件,它根據請求類型調用相應的處理器:

public class KafkaApis {private final Map<ApiKeys, RequestHandler> requestHandlers;public KafkaApis(Map<ApiKeys, RequestHandler> requestHandlers) {this.requestHandlers = requestHandlers;}public void handle(NetworkReceive receive) {try {// 解析請求頭RequestHeader header = RequestHeader.parse(receive.payload());ApiKeys apiKey = ApiKeys.forId(header.apiKey());// 獲取對應的請求處理器RequestHandler handler = requestHandlers.get(apiKey);if (handler != null) {// 處理請求handler.handle(receive);} else {// 處理未知請求類型handleUnknownRequest(header, receive);}} catch (Exception e) {// 處理請求解析和處理過程中的異常handleException(receive, e);}}
}

對于生產者發送的消息寫入請求(ApiKeys.PRODUCE),會由ProduceRequestHandler進行處理。

二、消息寫入處理與驗證

ProduceRequestHandler負責處理生產者發送的消息寫入請求,其核心職責包括驗證請求合法性、將消息寫入對應分區日志以及生成響應。關鍵處理邏輯如下:

public class ProduceRequestHandler implements RequestHandler {private final LogManager logManager;private final ReplicaManager replicaManager;public ProduceRequestHandler(LogManager logManager, ReplicaManager replicaManager) {this.logManager = logManager;this.replicaManager = replicaManager;}@Overridepublic void handle(NetworkReceive receive) {try {// 解析ProduceRequestProduceRequest request = ProduceRequest.parse(receive.payload());// 驗證請求版本和元數據validateRequest(request);// 處理每個分區的消息Map<TopicPartition, PartitionData> partitionDataMap = new HashMap<>();for (Map.Entry<TopicPartition, MemoryRecords> entry : request.data().entrySet()) {TopicPartition tp = entry.getKey();MemoryRecords records = entry.getValue();// 獲取分區日志Log log = logManager.getLog(tp);if (log != null) {// 將消息追加到日志LogAppendInfo appendInfo = log.append(records);// 記錄分區數據信息partitionDataMap.put(tp, new PartitionData(appendInfo.offset(), appendInfo.logAppendTime()));} else {// 處理分區不存在的情況partitionDataMap.put(tp, new PartitionData(RecordBatch.NO_OFFSET, -1L));}}// 構建響應ProduceResponse response = new ProduceResponse(request.version(), request.correlationId(), partitionDataMap);// 發送響應sendResponse(response, receive);} catch (Exception e) {// 處理請求處理過程中的異常handleException(receive, e);}}
}

在上述代碼中,validateRequest方法會對請求的版本、主題和分區的合法性進行檢查;log.append方法將消息追加到對應分區的日志文件中;最后根據處理結果構建ProduceResponse響應,并發送回給生產者。

三、消息持久化存儲

Kafka使用日志(Log)來持久化存儲消息,每個分區對應一個日志實例。Log類負責管理日志文件、分段以及消息的讀寫操作,其核心的消息追加方法如下:

public class Log {private final LogSegmentManager segmentManager;// 省略其他成員變量public LogAppendInfo append(MemoryRecords records) throws IOException {try {// 獲取當前活躍的日志分段LogSegment segment = segmentManager.activeSegment();long offset = segment.sizeInBytes();long baseOffset = segment.baseOffset();// 將消息追加到日志分段long appended = segment.append(records);// 更新日志元數據updateHighWatermark(segment);// 返回追加信息return new LogAppendInfo(baseOffset + offset, time.milliseconds());} catch (Exception e) {// 處理寫入異常handleWriteException(e);throw e;}}
}

LogSegment類表示一個日志分段,它包含了日志文件、索引文件等,具體的消息寫入操作在LogSegmentappend方法中完成:

public class LogSegment {private final FileMessageSet fileMessageSet;// 省略其他成員變量public long append(MemoryRecords records) throws IOException {// 計算寫入位置long position = fileMessageSet.sizeInBytes();// 將消息寫入文件long written = fileMessageSet.append(records);// 更新索引updateIndex(records.sizeInBytes(), position);return written;}
}

FileMessageSet類負責實際的文件I/O操作,它利用Java NIO的FileChannel實現高效的磁盤寫入,并且支持零拷貝技術,進一步提升寫入性能:

public class FileMessageSet {private final FileChannel fileChannel;// 省略其他成員變量public long append(MemoryRecords records) throws IOException {try (FileLock lock = fileChannel.lock()) {// 使用零拷貝技術寫入數據long written = fileChannel.transferFrom(new ReadOnlyByteBufferChannel(records.buffer()), sizeInBytes(), records.sizeInBytes());sizeInBytes += written;return written;}}
}

通過上述一系列操作,Kafka將接收到的消息高效、可靠地持久化存儲到磁盤中,保證了數據的安全性和一致性。

通過對Kafka Broker端數據寫入流程的源碼剖析,我們全面了解了從網絡請求接收到消息持久化存儲的完整過程。各組件通過嚴謹的設計和高效的實現,確保了Kafka在高并發場景下能夠穩定、快速地處理大量消息寫入請求,為整個消息系統的可靠運行提供了堅實保障。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/85045.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/85045.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/85045.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

名稱 深度學習(監督學習) Iteration 一次 mini-batch 前向+反向傳播更新 Epoch 所有數據集訓練一遍。這兩個概念不一樣嗎?

你的問題非常專業&#xff0c;確實容易混淆&#xff0c;下面我用科研術語 通俗比喻來清晰地區分&#xff1a; ? 簡明對照表&#xff1a;Iteration vs. Epoch&#xff08;在監督學習中&#xff09; 名稱專業術語解釋通俗理解是否屬于監督學習&#xff08;深度學習&#xff09…

Blender 模型下載

1.CGModel網站 https://www.cgmodel.com/ 免費模型數量多&#xff0c;國風類題材的模型多&#xff0c;中文搜索方便 2.Blender官網 3.3d溜溜網 https://3d.3d66.com/ 4.free3d.com free3d.com

Spring涉及的設計模式以及實際使用場景(含代碼)

Spring涉及的設計模式以及實際使用場景(含代碼) 1.工廠模式&#xff08;Factory Pattern&#xff09; 作用: 隱藏對象創建的細節&#xff0c;通過工廠類統一管理對象的實例化。 場景&#xff1a;Spring的BeanFactory和ApplicationContext是工廠模式的典型實現。 // 通過App…

ROM 只讀存儲器 隨機存取

ROM&#xff08;Read-Only Memory&#xff0c;只讀存儲器&#xff09;的存取方式為&#xff1a; ? 隨機存取方式&#xff08;Random Access&#xff09; 盡管“ROM”強調的是“只讀”&#xff0c;它的數據訪問方式與 RAM 類似&#xff0c;都是隨機存取。 &#x1f50d; 解釋如…

opensuse解決微信無法登錄的問題

思路啟發 https://forum.suse.org.cn/t/topic/17183/2 實際解決 https://forum.suse.org.cn/t/topic/17204/5 解決方法 先安裝 sudo zypper install execstackcd /opt/wechatsudo bash -c execstack -c ./*.so

Adixen ASM380 氦氣檢漏儀 阿爾卡特Mobile high performance helium leak detector

Adixen ASM380 氦氣檢漏儀 阿爾卡特Mobile high performance helium leak detector

堆的自動管理

由于程序員必須編寫出到分配和釋放存儲器的明確的調用&#xff0c;所以用m a l l o c和f r e e完成指針的動態分配和重新分配是管理堆的手工( m a n u a l )方法。相反地&#xff0c;運行時棧則是由調用序列自動地( a u t o m a t i c a l l y )管理。在一種需要完全動態的運行…

智能出入庫管理系統:自動化管控平臺

部隊裝備庫室智能管控系統是集智能化、集成化、網絡化于一體的綜合管理系統&#xff0c;由智慧營區庫室綜合管控平臺、出入口控制子系統、智能QD柜子系統、裝備物資管理子系統、視頻監控系統、入侵報警子系統、環境監測子系統等七大核心子系統構成。各子系統通過數據自動交互&a…

歸并排序:高效分治的藝術

歸并排序(Merge Sort)原理詳解 歸并排序是一種基于分治法(Divide and Conquer)的高效排序算法,由馮諾依曼于1945年提出。它的核心思想是將大問題分解為小問題,解決小問題后再合并結果。 核心原理 1. 分治策略(Divide and Conquer) 分(Divide):將無序數組遞歸地拆…

知識庫建設方案有哪些?全面解析

知識庫建設方案主要包括本地部署方案、云端在線方案、混合部署方案。其中&#xff0c;云端在線方案以其靈活性、實時更新能力和低維護成本&#xff0c;逐漸成為大多數企業的首選方案。云端在線方案可隨時隨地提供實時更新的知識內容&#xff0c;確保企業員工和客戶始終獲得最新…

政務大廳智能引導系統:基于數字孿生的技術架構與實踐

本文面向政務信息化開發者、系統集成工程師、智能導視領域技術人員。解析政務大廳智能引導系統的技術實現路徑&#xff0c;提供從定位導航到數據驅動的技術方案&#xff0c;助力解決傳統導視系統效率低下、體驗不佳的技術痛點。 一、技術架構全景&#xff1a;從物理空間到數字映…

java設計模式[2]之創建型模式

文章目錄 一 創建型模式1.1 單例模式的設計與實現1.1.1 餓漢式模式1.1.2 懶漢式單例模式1.1.3 懶漢式單例模式完善1.1.4 雙重檢測鎖式1.1.4.1 volatile關鍵字1.1.4.2 在雙重檢查鎖定中的作用 1.1.5 靜態內部類式單例模式1.1.6 枚舉式單例模式1.1.7 反射暴力破解解決方案1.1.8 序…

PHP設計模式實戰:構建高性能API服務

在前一篇電子商務系統設計的基礎上,我們將深入探討如何運用設計模式構建高性能、可擴展的API服務。現代Web應用越來越依賴API作為前后端分離架構的核心,良好的API設計對系統性能和維護性至關重要。 倉庫模式實現數據訪問層 倉庫模式(Repository Pattern)可以抽象數據訪問邏…

ComfyUI Flux.1 ACE++ 圖像編輯原理詳解

關注不迷路&#xff0c;點贊走好運&#xff01;&#xff01;&#xff01; ComfyUI Flux.1 ACE 圖像編輯原理詳解 ——從“拼圖游戲”到“魔法畫筆”的技術革命 目錄 ACE 的核心思想&#xff1a;用“指令”指揮圖像生成 1.1 什么是上下文感知內容填充&#xff1f;1.2 條件單元&…

Datawhale-爬蟲

task1-初始爬蟲 爬蟲用python好&#xff0c;python庫多&#xff0c;功能全 反爬機制和反反爬機制 顧名思義&#xff0c;一個是防范爬蟲的&#xff0c;一個是應對限制爬蟲的方法 好的&#xff0c;我們來更深入地探討反爬機制和反反爬策略的細節&#xff0c;包括具體的技術手段…

雙token三驗證(Refresh Token 機制?)

單token存在的問題 我們都知道&#xff0c;token是我們在前后端數據傳輸的時候為了保證安全從而必須需要進行設置的東西&#xff0c;他的主要作用實際上就是為了保證我們的數據安全&#xff0c;進行身份驗證和授權&#xff0c;并且相對于session而言更加適合如今的分布式系統&a…

青少年編程與數學 01-011 系統軟件簡介 22 VMware 虛擬化軟件

青少年編程與數學 01-011 系統軟件簡介 22 VMware 虛擬化軟件 一、歷史沿革&#xff08;一&#xff09;創立階段&#xff08;1998-2003&#xff09;&#xff08;二&#xff09;快速擴張&#xff08;2004-2010&#xff09;&#xff08;三&#xff09;云時代轉型&#xff08;2011…

FPGA基礎 -- Verilog門級建模之奇偶校驗電路

? 一、什么是奇偶校驗&#xff08;Parity Check&#xff09; &#x1f4cc; 定義&#xff1a; 奇偶校驗是一種錯誤檢測編碼方式&#xff0c;用于判斷一個二進制數據在傳輸或存儲過程中是否發生了單比特錯誤。 奇校驗&#xff08;Odd Parity&#xff09;&#xff1a;總共有奇…

UWB協議精讀:IEEE 802.15.4z-2020,15. HRP UWB PHY, STS, HRP-ERDEV, BPRF, HPRF,

跟UWB相關的IEEE標準主要有2個: 1,IEEE 802.15.4-2020 2,IEEE 802.15.4z-2020 IEEE Std 802.15.4z? ‐ 2020 Amendment 1: Enhanced Ultra Wideband (UWB) Physical Layers (PHYs) and Associated Ranging Techniques scrambled timestamp sequence (STS): A sequence of…

6.IK分詞器拓展詞庫

比如一些行業專業詞匯、簡單無意義詞&#xff08;例如&#xff1a;的、得、地、是等)、網絡流行詞、后來形成的詞、再或者一些禁忌詞&#xff08;比如&#xff1a;領導人的名字、黃賭毒犯罪等詞要排除的&#xff09; 在es的插件目錄下查找配置文件&#xff1a; 找到IKAnalyzer…