Kafka 源碼剖析:消息存儲與協議實現(二)

四、協議實現機制探秘

4.1 生產者協議

4.1.1 消息發送流程

Producer 在向 Kafka 集群發送消息時,首先會根據分區策略選擇目標分區 。常見的分區策略有輪詢、按消息鍵的哈希值分區以及自定義分區策略 。如果生產者在發送消息時指定了分區號,那么消息就會直接被發送到指定的分區;若未指定分區號,但指定了消息的鍵(key),則會根據鍵的哈希值對分區數量取模,得到的結果就是消息要發送到的分區號;若分區號和鍵都未指定,生產者會采用輪詢的方式,依次將消息發送到各個分區,以實現負載均衡。例如,假設有一個包含 3 個分區的 Topic,當生產者未指定分區和鍵時,第一條消息會被發送到分區 0,第二條發送到分區 1,第三條發送到分區 2,第四條又回到分區 0,以此類推。

消息發送前,Producer 會對消息進行序列化處理,將消息對象轉換為字節數組,以便在網絡中傳輸 。然后,消息會被批量處理,Producer 會將多條消息組合成一個批次(batch),這樣可以減少網絡請求的次數,提高傳輸效率 。批次的大小可以通過batch.size參數進行配置,默認值為 16384 字節 。當批次中的消息大小達到batch.size,或者等待時間達到linger.ms(默認值為 0,即不等待)時,Producer 就會將批次中的消息發送出去。

在發送消息時,Producer 會與目標分區的 Leader Partition 建立 TCP 連接,并將消息發送給它 。如果當前沒有可用的連接,Producer 會創建新的連接 。為了提高性能,Producer 還會對連接進行復用,避免頻繁地創建和銷毀連接 。在實際應用中,通過合理調整分區策略、批次大小以及連接管理參數,可以有效提升生產者的發送效率和系統的整體性能。例如,在高并發場景下,適當增大batch.size和linger.ms的值,可以減少網絡開銷,提高吞吐量,但同時也會增加消息的延遲;而在對消息實時性要求較高的場景下,則需要減小這些值,以降低延遲。

4.1.2 消息確認機制

Producer 發送消息后,需要等待 Broker 的確認(ACK),以確保消息已經成功發送到 Kafka 集群 。Kafka 提供了三種不同的確認級別,通過acks參數進行配置:

  • acks = 0:Producer 發送消息后,不需要等待 Broker 的確認,就認為消息已經成功發送 。這種方式的發送速度最快,但可靠性最低,因為如果在發送過程中出現網絡故障或 Broker 故障,消息可能會丟失 。例如,在一些對數據準確性要求不高的場景,如日志收集,為了追求高吞吐量,可以采用這種確認級別。
  • acks = 1:Producer 發送消息后,會等待 Leader Partition 將消息寫入日志文件后,才認為消息發送成功 。這種方式在一定程度上保證了消息的可靠性,但如果在 Leader Partition 將消息寫入日志后,還未將消息同步給 Follower Partition 時,Leader Partition 所在的 Broker 發生故障,那么這條消息可能會丟失 。在一些對數據可靠性有一定要求,但又希望保持較高吞吐量的場景下,可以選擇這種確認級別。
  • acks = -1 或 acks = all:Producer 發送消息后,會等待所有在同步副本集合(ISR)中的副本都將消息寫入日志后,才認為消息發送成功 。這種方式提供了最高的可靠性,即使 Leader Partition 發生故障,也能保證消息不會丟失 。不過,由于需要等待所有副本的確認,這種方式的延遲最高,吞吐量相對較低 。在對數據可靠性要求極高的場景,如金融交易數據處理,通常會采用這種確認級別。

消息確認機制直接影響著消息的可靠性語義。在選擇確認級別時,需要根據具體的業務需求,在可靠性和性能之間進行權衡 。例如,在電商訂單處理系統中,訂單信息的準確性至關重要,就應該選擇acks = -1的確認級別,以確保訂單消息不會丟失;而在一些實時監控系統中,對于偶爾丟失一些監控數據的容忍度較高,更注重系統的吞吐量和實時性,就可以選擇acks = 0或acks = 1的確認級別。

4.2 消費者協議

4.2.1 消息拉取流程

Consumer 向 Kafka 集群拉取消息時,首先會向 Kafka 集群發送 Fetch 請求 。在請求中,Consumer 需要指定要拉取消息的 Topic、Partition 以及起始的偏移量(offset) 。Kafka 集群接收到 Fetch 請求后,會根據請求中的信息,找到對應的 Partition 的 Leader Partition,并從 Leader Partition 中讀取消息 。

在拉取消息時,Consumer 可以通過fetch.min.bytes和fetch.max.bytes參數來控制每次拉取的最小和最大字節數 。fetch.min.bytes表示 Consumer 期望每次拉取到的最小數據量,默認值為 1 字節 。當 Broker 中可用的消息字節數小于fetch.min.bytes時,Broker 會等待,直到有足夠的數據可供拉取,或者等待時間超過fetch.wait.max.ms(默認值為 500 毫秒) 。fetch.max.bytes表示 Consumer 每次拉取消息的最大字節數,默認值為 52428800 字節(50MB) 。通過合理配置這兩個參數,可以優化 Consumer 的拉取性能。例如,在處理大數據量的場景下,可以適當增大fetch.max.bytes的值,減少拉取次數,提高效率;而在對實時性要求較高的場景下,可以減小fetch.min.bytes和fetch.wait.max.ms的值,降低延遲。

Consumer 在拉取消息時,還可以指定消費的起始位置(offset) 。如果 Consumer 是第一次消費某個 Partition 的消息,它可以從最早的消息開始消費(offset = 0),也可以從最新的消息開始消費(offset = -1) 。如果 Consumer 之前已經消費過該 Partition 的消息,它可以根據之前記錄的 offset 繼續消費 。在實際應用中,根據業務需求選擇合適的起始位置非常重要。例如,在數據備份和恢復場景中,可能需要從最早的消息開始消費,以確保數據的完整性;而在實時數據分析場景中,通常只需要從最新的消息開始消費,獲取最新的業務數據。

4.2.2 Offset 管理

在 Kafka 中,Offset 由 Consumer 自己維護 。Consumer 在消費消息的過程中,會定期將自己已經消費到的 offset 提交到 Kafka 集群中 。Offset 的提交方式有自動提交和手動提交兩種 。

  • 自動提交:Consumer 可以通過設置enable.auto.commit參數為true來開啟自動提交功能 。自動提交的時間間隔可以通過auto.commit.interval.ms參數進行配置,默認值為 5000 毫秒 。在自動提交模式下,Consumer 會每隔auto.commit.interval.ms毫秒,自動將當前消費到的 offset 提交到 Kafka 集群 。這種方式簡單方便,但存在一定的風險。例如,如果在自動提交 offset 后,Consumer 還未處理完消息就發生了故障,那么下次重啟后,Consumer 會從已提交的 offset 開始消費,可能會導致部分消息被重復消費。
  • 手動提交:Consumer 可以通過設置enable.auto.commit參數為false來關閉自動提交功能,然后使用commitSync()或commitAsync()方法手動提交 offset 。commitSync()方法會同步提交 offset,即等待 Kafka 集群確認提交成功后才返回 。這種方式可以確保 offset 的提交成功,但會阻塞 Consumer 的線程,影響消費效率 。commitAsync()方法會異步提交 offset,即不會等待 Kafka 集群的確認就返回 。這種方式不會阻塞 Consumer 的線程,提高了消費效率,但由于是異步提交,可能會出現提交失敗的情況 。在實際應用中,為了保證數據的準確性,通常會在消息處理完成后,手動提交 offset 。例如,在處理訂單消息時,當訂單處理完成后,再手動提交 offset,這樣可以確保不會重復處理訂單。

Offset 的更新時機對消息消費語義有著重要影響 。如果在消息處理之前就提交 offset,那么當 Consumer 發生故障重啟后,可能會導致部分消息未被處理就被認為已經消費,從而出現消息丟失的情況,這就是 “at most once” 的消費語義 。如果在消息處理完成后才提交 offset,那么當 Consumer 發生故障重啟后,可能會導致部分消息被重復消費,這就是 “at least once” 的消費語義 。在實際應用中,需要根據業務需求選擇合適的消費語義和 offset 更新時機 。例如,在一些對數據準確性要求極高的場景,如金融交易處理,通常會選擇 “at least once” 的消費語義,并在消息處理完成后再提交 offset;而在一些對數據準確性要求不高,但對實時性要求較高的場景,如實時監控數據處理,可以選擇 “at most once” 的消費語義,以提高處理速度。

4.3 副本同步協議

4.3.1 備份機制

Kafka 對每個 topic 的 partition 進行備份,以保證數據的可靠性和高可用性 。每個 partition 都有一個 Leader 副本和若干個 Follower 副本 。Leader 副本負責處理來自 Producer 和 Consumer 的讀寫請求,而 Follower 副本則負責從 Leader 副本同步消息,保持與 Leader 副本的數據一致性 。

Follower 副本通過向 Leader 副本發送 Fetch 請求來同步消息 。在 Fetch 請求中,Follower 副本會攜帶自己當前的日志末端偏移量(LEO,Log End Offset),表示它已經同步到的消息位置 。Leader 副本接收到 Fetch 請求后,會從自己的日志中讀取從 Follower 副本的 LEO 開始的消息,并將這些消息發送給 Follower 副本 。Follower 副本收到消息后,將其追加到自己的日志中,并更新自己的 LEO 。例如,假設 Follower 副本的 LEO 為 100,Leader 副本的日志中有消息 101、102、103,那么 Leader 副本會將消息 101、102、103 發送給 Follower 副本,Follower 副本接收并寫入這些消息后,將 LEO 更新為 104。

通過這種備份機制,即使某個 Broker 發生故障,導致其上的 Leader 副本不可用,Kafka 也可以從其他存活的 Follower 副本中選舉出新的 Leader 副本,繼續提供服務,從而保證數據的可靠性和系統的高可用性 。在實際的分布式系統中,這種備份機制能夠有效應對各種硬件故障和網絡問題,確保數據的安全和業務的連續性。例如,在一個大規模的電商系統中,Kafka 的備份機制可以保證訂單數據、用戶數據等關鍵信息不會因為個別服務器的故障而丟失或不可用。

4.3.2 ISR 與選舉機制

Kafka 通過動態維護同步備份集合(ISR,In-Sync Replicas)來確保數據的一致性和可靠性 。ISR 集合中包含了與 Leader 副本保持同步的 Follower 副本 。只有在 ISR 集合中的副本才有資格被選舉為新的 Leader 副本 。

Kafka 判斷 Follower 副本是否與 Leader 副本同步的依據是replica.lag.time.max.ms參數,默認值為 10000 毫秒(10 秒) 。如果一個 Follower 副本在超過replica.lag.time.max.ms的時間內沒有向 Leader 副本發送 Fetch 請求,或者雖然發送了 Fetch 請求,但在replica.lag.time.max.ms時間內沒有追上 Leader 副本的消息進度,那么這個 Follower 副本就會被認為與 Leader 副本不同步,會被從 ISR 集合中移除 。例如,假設replica.lag.time.max.ms為 10 秒,某個 Follower 副本在 15 秒內都沒有向 Leader 副本發送 Fetch 請求,那么它就會被移出 ISR 集合。

當 Leader 副本發生故障時,Kafka 會從 ISR 集合中選舉新的 Leader 副本 。選舉的過程由 Kafka 的 Controller 負責,Controller 是 Kafka 集群中的一個特殊的 Broker,它負責管理集群的元數據信息和分區的 Leader 選舉等工作 。在選舉新的 Leader 副本時,Controller 會優先選擇 ISR 集合中 LEO 最大的 Follower 副本作為新的 Leader 副本,因為這個副本的數據與原 Leader 副本最為接近,能夠最大程度地保證數據的一致性 。例如,ISR 集合中有三個 Follower 副本,它們的 LEO 分別為 100、105、103,那么 Controller 會選擇 LEO 為 105 的 Follower 副本作為新的 Leader 副本。

如果在選舉時 ISR 集合為空,即所有的 Follower 副本都與 Leader 副本不同步,此時 Kafka 可以選擇等待 ISR 集合中的副本恢復,或者從非 ISR 集合中的副本中選舉新的 Leader 副本 。從非 ISR 集合中選舉新的 Leader 副本可能會導致數據丟失,因為這些副本的數據可能與原 Leader 副本不一致 。因此,在實際應用中,需要根據具體的業務需求和數據一致性要求,合理配置相關參數,確保 ISR 集合的穩定性和可靠性 。例如,在對數據一致性要求極高的金融領域,通常會嚴格控制 ISR 集合的成員,避免從非 ISR 集合中選舉 Leader 副本,以防止數據丟失;而在一些對數據一致性要求相對較低,但對系統可用性要求較高的場景,如一些實時監控系統,可以適當放寬 ISR 集合的條件,允許在 ISR 集合為空時從非 ISR 集合中選舉 Leader 副本,以保證系統的持續運行。

五、總結與展望

通過對 Kafka 消息存儲與協議實現的深入剖析,我們全面了解了其內部的工作原理和機制 。在消息存儲方面,Kafka 采用了獨特的物理存儲結構,通過合理的分區分配策略和高效的文件管理機制,實現了海量消息的可靠存儲和快速檢索 。其文件格式設計以及索引文件機制,為消息的讀寫操作提供了堅實的支持,使得 Kafka 在高并發場景下依然能夠保持出色的性能 。

在協議實現方面,生產者協議、消費者協議和副本同步協議協同工作,保障了消息在 Kafka 集群中的高效傳輸、準確消費以及數據的一致性和高可用性 。生產者通過靈活的分區策略和消息確認機制,確保消息能夠可靠地發送到 Kafka 集群;消費者通過精確的消息拉取流程和可控的 Offset 管理方式,實現了對消息的有序消費;副本同步協議則通過備份機制和 ISR 與選舉機制,保證了數據在集群中的安全性和可恢復性 。

深入理解這些機制對于優化 Kafka 性能和解決生產問題具有重要意義 。在實際應用中,我們可以根據業務需求和場景特點,合理調整 Kafka 的配置參數,如分區數量、副本因子、消息確認級別、Offset 提交方式等,以達到最佳的性能表現 。同時,當遇到諸如消息丟失、重復消費、集群性能瓶頸等問題時,能夠依據對內部機制的理解,快速定位問題根源并找到解決方案 。

展望未來,隨著大數據和分布式系統技術的不斷發展,Kafka 在消息隊列領域有望繼續保持領先地位并不斷演進 。一方面,Kafka 可能會在性能優化、擴展性提升以及功能增強等方面持續創新,以滿足不斷增長的業務需求 。例如,進一步優化消息存儲和傳輸機制,提高集群的吞吐量和低延遲性能;增強對新的存儲介質和硬件架構的支持,提升系統的整體效率 。另一方面,隨著云計算、容器化技術的普及,Kafka 與這些新興技術的融合也將成為發展趨勢,實現更加便捷的部署、管理和運維 。此外,面對日益復雜的業務場景和數據處理需求,Kafka 可能會不斷拓展其應用領域,如在實時數據處理、人工智能模型訓練數據傳輸等方面發揮更大的作用 。總之,Kafka 作為消息隊列領域的佼佼者,未來充滿著無限的可能和發展空間 。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/911857.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/911857.shtml
英文地址,請注明出處:http://en.pswp.cn/news/911857.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Vue.js 與 TypeScript:最佳實踐

1. 引言 Vue.js 是一個漸進式、靈活的 JavaScript 框架,廣泛用于構建用戶界面和單頁應用(SPA)。而 TypeScript 是 JavaScript 的一個超集,添加了靜態類型和其他高級特性。將兩者結合使用,可以幫助開發者構建更具可維護…

webpack5 css-loader:從基礎到原理

webpack 處理樣式 webpack本身是不能識別樣式資源的,需要借助Loader來幫助webpack解析樣式資源,樣式資源包括但不限于css/less/sass/scss/styl 未使用樣式處理加載器前 運行webpack打包命令 bash npx webpack報錯信息如圖,提示無法識別css…

【GESP】C++三級練習 luogu-B2096 直方圖

GESP C三級練習,一維數組練習,難度★★☆☆☆。 題目題解詳見:【GESP】C三級練習 luogu-B2096 直方圖 | https://www.coderli.com/gesp-3-luogu-b2096/ 【GESP】C三級練習 luogu-B2096 直方圖 | OneCoderGESP C三級練習,一維數組…

【網站內容安全檢測】之2:從網站所有URL頁面中提取所有外部及內部域名信息

還沒寫成Go的,用Python吧,稍微慢一點 依賴內容(安裝命令pip install -r requirements.txt) requirements.txt aiohttp beautifulsoup44.12.2 tqdm4.66.1 redis5.2.1 motor3.3.1 pymongo4.6.0 chardet提取域名的程序 domain_extractor.py …

【LLaMA-Factory 實戰系列】四、API 篇 - 部署推理服務與批量調用實戰

【LLaMA-Factory 實戰系列】四、API 篇 - 部署推理服務與批量調用實戰 1. 引言2. 推理后端的選擇與對比3. 部署 API 推理服務3.1 創建 API 配置文件3.2 啟動 API 服務3.3 探索交互式 API 文檔 4. 編寫 Python 腳本進行批量調用4.1 準備工作4.2 批量調用腳本4.3 運行腳本并查看結…

C++工廠模式的作用(工廠方法、Factory Method、Factory Pattern)

文章目錄 代碼示例工廠的作用1. 對象創建的封裝 🏭2. 解耦客戶端和具體類 🔗3. 統一的創建入口 🚪4. 隱藏實現細節 🎭 在這個項目中的具體體現總結 代碼示例 https://gitee.com/arnold_s/my-learning-test/tree/master/20250610_…

9-C#修改任務管理的名稱

C#修改任務管理的名稱

Fisco Bcos學習 - 搭建第一個區塊鏈網絡

文章目錄 一、前言二、環境準備三、安裝依賴在 macOS 上安裝依賴在 Ubuntu 上安裝依賴在 CentOS 上安裝依賴 四、創建操作目錄并下載安裝腳本五、搭建單群組 4 節點聯盟鏈六、啟動 FISCO BCOS 鏈七、檢查進程八、檢查日志輸出 在數字化時代,區塊鏈技術正逐漸成為推動…

可視化圖解算法53:表達式求值

牛客網 面試筆試 TOP 101 1. 題目 描述 請寫一個整數計算器,支持加減乘三種運算和括號。 數據范圍:0≤∣s∣≤100,保證計算結果始終在整型范圍內 要求:空間復雜度: O(n),時間復雜度 O(n) 示例1 輸入…

小白成長之路-Nginx配置(二)

文章目錄 一、localtion配置1.匹配規則2.匹配優先級3.配置案例 二、rewrite1、 語法2、 可寫入字段3 配置案例4 if 指令5.sutoindex6. nginx配置中的常用變量 三、配置Nginx狀態統計1.下載vts模塊2.編譯nginx 提示:以下是本篇文章正文內容,下面案例可供參…

Qt的第一個程序

Qt的第一個程序 1.hello world2.使用圖形化拖拽方式3.使用C代碼的方式3.1.頭文件3.2.setText3.3.對象樹 4.設計MyLabel5.亂碼問題 🌟🌟hello,各位讀者大大們你們好呀🌟🌟 🚀🚀系列專欄&#xff…

圖書數據接口

基本說明: 接口地址:http://data.isbn.work/openApi/getInfoByIsbn?isbn{isbn}&appKey{appkey}返回格式:json請求方式:get請求示例:http://data.isbn.work/openApi/getInfoByIsbn?isbn9787513159074&appKey…

MongoDB原理

目錄 一、概念 二、架構 2.1 邏輯結構 2.2 數據模型 2.3 存儲引擎:WiredTiger 三、事務 一、概念 MongoDB是文檔數據庫,基本存儲單元是 文檔(Document),以BSON格式(一種類json的二進制形式&#xff…

《解碼音頻:從基礎到未來的聽覺探索》

音頻:開啟聲音世界的大門 在生活的每一個角落,音頻如影隨形,編織出豐富多彩的聽覺體驗。清晨,第一縷陽光尚未完全照進房間,手機里溫柔的鬧鐘鈴聲,將我們從睡夢中輕輕喚醒,開啟活力滿滿的一天。通…

web安全之h2注入系統學習

起初是在N1 Junior 2025 上面碰到一題,考點是h2的sql注入。由于之前沒有見過,趁此機會系統學習一番 實驗代碼 public class H2Inject {public static void main(String[] args) throws Exception{JdbcDataSource dataSource new JdbcDataSource();dataS…

AWS認證系列:考點解析 - cloud trail,cloud watch,aws config

🎯一句話總覽: 服務名類比/角色主要功能CloudTrail監控攝像頭錄像回放記錄“誰在什么時候做了什么操作”CloudWatch護士測體溫 護士喊醫生實時監控系統狀態,并能報警/自動應對AWS Config保安巡邏 記錄資產變更歷史記錄 AWS 資源的“配置狀…

Java八股文——數據結構「數據結構篇」

了解哪些數據結構? 面試官您好,我了解并使用過多種數據結構。在我的理解中,數據結構可以分為幾個大的類別,每一類都有其獨特的優勢和適用場景。 1. 線性結構 (Linear Structures) 這類結構的特點是數據元素之間存在一對一的線性…

C#測試調用EPPlus根據批注設置excel單元格內容

EPPlus也是常用的Excel文件操作庫,但不同于ClosedXML,使用EPPlus前需要設置授權信息,商業應用需要設置商業授權,個人使用或非商業應用也需要設置授權(測試的時候只需設置全名,保存excel文件時會保存到文件詳…

windows本地搭建skywalking, 線程池中traceId不丟失

1.從官網下載9.0.0版本 Downloads | Apache SkyWalking 其它歷史版本的 下載地址 Index of /dist/skywalking 這個頁面 可以下載 apm服務: apache-skywalking-apm-9.0.0.tar.gz agent的包: apache-skywalking-java-agent-9.0.0.tgz 2.解壓后, (看情況去config路徑下 appli…

多模態大語言模型arxiv論文略讀(135)

Agent S: An Open Agentic Framework that Uses Computers Like a Human ?? 論文標題:Agent S: An Open Agentic Framework that Uses Computers Like a Human ?? 論文作者:Saaket Agashe, Jiuzhou Han, Shuyu Gan, Jiachen Yang, Ang Li, Xin Eric…