Kafka服務端NIO操作原理解析(二)

Kafka系列文章

基于Kafka2.1解讀Producer原理
基于Kafka2.1解讀Consumer原理
Kafka服務端NIO操作原理解析(一)


文章目錄

  • Kafka系列文章
  • 前言
  • 一、基本認知
  • 二、Acceptor的主體流程
    • 2.1 run方法源碼
    • 2.2 acceptNewConnections方法源碼
    • 2.3 主體邏輯流程示意圖
  • 三、Processor的主體流程
    • 3.1 run方法源碼
    • 3.2 主體邏輯流程示意圖
      • 3.2.1 configureNewConnections
      • 3.2.2 processNewResponses
      • 3.2.3 poll
        • pollSelectionKeys
      • 3.2.4 processCompletedReceives
      • 3.2.5 processCompletedSends
  • 四、問題
  • 總結


前言

廢話不多說,繼續上一篇文章,我們繼續基于Kafka3.7解讀服務端的nio原理


Kafka服務端IO類關系圖

一、基本認知

可以看到Acceptor和Processor都是線程,所以實際Kafka服務端在啟動(執行startup)方法之后,會基于一個Acceptor多個Processor的模式啟動,并把這多個線程執行起來。

二、Acceptor的主體流程

acceptor的類圖示意圖
對于Acceptor來說,主要就是通過selector監聽accept事件,然后選擇一個processor來進行后續操作

2.1 run方法源碼

  /*** Accept loop that checks for new connection attempts*/override def run(): Unit = {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)try {while (shouldRun.get()) {try {acceptNewConnections()closeThrottledConnections()}catch {// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due// to a select operation on a specific channel or a bad request. We don't want// the broker to stop responding to requests from other clients in these scenarios.case e: ControlThrowable => throw ecase e: Throwable => error("Error occurred", e)}}} finally {debug("Closing server socket, selector, and any throttled sockets.")CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket, this))throttledSockets.clear()}}

可以看到主要流程在acceptNewConnections中,下面我們看看acceptNewConnections代碼

2.2 acceptNewConnections方法源碼

  /*** Listen for new connections and assign accepted connections to processors using round-robin.*/private def acceptNewConnections(): Unit = {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && shouldRun.get()) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {accept(key).foreach { socketChannel =>// Assign the channel to the next processor (using round-robin) to which the// channel can be added without blocking. If newConnections queue is full on// all processors, block until the last one is able to accept a connection.var retriesLeft = synchronized(processors.length)var processor: Processor = nulldo {retriesLeft -= 1processor = synchronized {// adjust the index (if necessary) and retrieve the processor atomically for// correct behaviour in case the number of processors is reduced dynamicallycurrentProcessorIndex = currentProcessorIndex % processors.lengthprocessors(currentProcessorIndex)}currentProcessorIndex += 1} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))}} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")} catch {case e: Throwable => error("Error while accepting connection", e)}}}}

2.3 主體邏輯流程示意圖

Acceptor的主體流程示意圖
注意看到Processor的accept操作做了兩件事

1. 將socketChannel放到了自身的newConnections里
2. wakeup一下自身的selector

備注:newConnections是線程安全的ArrayBlockingQueue

三、Processor的主體流程

Processor的類圖關系示意圖

每個Processor是會處理多個socketChannel的:

channels變量維護多個KafkaChannel
explicitlyMutedChannels:維護的是被mute掉的channels
completedReceives:維護單次poll操作從每個channel讀取到的數據
completedSends:維護的是單次poll操作已經通過nio寫出去的數據

explicitly:強調 “主動、明確地”,即用戶通過手動操作(而非系統默認或其他自動設置)進行的行為
注意:completedReceives和completedSends單次執行poll操作的數據

3.1 run方法源碼

override def run(): Unit = {try {while (shouldRun.get()) {try {// setup any new connections that have been queued upconfigureNewConnections()// register any new responses for writingprocessNewResponses()poll()processCompletedReceives()processCompletedSends()processDisconnected()closeExcessConnections()} catch {// We catch all the throwables here to prevent the processor thread from exiting. We do this because// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would// be either associated with a specific socket channel or a bad request. These exceptions are caught and// processed by the individual methods above which close the failing channel and continue processing other// channels. So this catch block should only ever see ControlThrowables.case e: Throwable => processException("Processor got uncaught exception.", e)}}} finally {debug(s"Closing selector - processor $id")CoreUtils.swallow(closeAll(), this, Level.ERROR)}}

3.2 主體邏輯流程示意圖

processor主體流程示意圖
可以看到我這個主體流程示意圖并沒有把processDisconnected()和closeExcessConnections()方法放進來,因為這兩個方法對于我們理解Kafka的nio不太重要,所以暫時忽略

3.2.1 configureNewConnections

1. 從自身的newConnections里獲取socketChannel
2. 將socketChannel封裝成KafkaChannel,并在selector上注冊讀事件

3.2.2 processNewResponses

一句話總結:將需要發送出去的數據拷貝到KafkaChannel上,也就是做真正發送的準備操作

1. 從自身responseQueue里poll一個可發送的response
2. 將response拷貝到對應KafkaChannel的send對象上
3. 在selector上注冊寫事件
4. 同時將response放到inflightRespones里:為后續執行回調函數使用

當然,responseQueue也是線程安全的,不過是LinkedBlockingDeque

3.2.3 poll

這個方法是服務端真正執行IO操作的邏輯,包括讀和寫

1. 清掉completedReceives和completedSends
2. 執行poll操作
3. 處理poll到的selectionKeys
pollSelectionKeys

attemptRead

1. 從socketChannel讀取數據
2. 將讀取到的數據存到selector的completedReceives里
3. 把當前KafkaChannel里的receive給清掉

attemptWrite
注意:此處KafkaChannel上的send數據來自于「3.2.2 processNewResponses」

4. 將KafkaChannel上send數據通過KafkaChannel寫出去
5. 將該數據append到selector的completedSends上
6. 把當前KafkaChannel里的send給清掉

3.2.4 processCompletedReceives

一言以蔽之:對上一步「3.2.3 poll」讀取到的數據進行處理

1. 將讀取到數據封裝成Request放到requestChannel的requestQueue里
2. 將該數據的Channel給禁言:mute
3. 清掉整個selector的completedReceives

mute操作主要兩件事:①刪除掉該channel在selector上注冊的讀事件②將該channel放到explicitlyMutedChannels里

3.2.5 processCompletedSends

一言以蔽之:對「3.2.3 poll」操作里發出去的response執行回調

1. 從inflightRespones里讀取response,執行該response的回調方法
2. 如果該channel被禁言了,解除禁言
3. 清掉整個selector的completedSends

解除禁言操作主要兩件事:①該channel在selector上注冊讀事件②將該channel從explicitlyMutedChannels里remove掉

四、問題

看完processor的主要操作,咱們就冒出兩個問題:

  1. processor的responseQueue里的數據是誰寫入的?
  2. 我們寫到requestChannel#requestQueue里的數據誰去處理了?

這兩個問題就是Kafka server的核心計算邏輯了,而本文著重講了Kafka server的核心IO邏輯


總結

Acceptor和Processor兩大組件主體流程示意圖

在上一篇簡單概括加原生nio的引導之后,本文詳細介紹了Kafka server端Acceptor和Processor是如何工作來處理讀入和寫出的邏輯。
后續咱們就要基于咱們的問題來介紹Kafka server的計算邏輯了:讀進來的數據怎么處理,寫出去的response是怎么來的~

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/95017.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/95017.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/95017.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

當服務器負載過高時該怎么辦?

當服務器出現負載過高時,要對內存、網絡等硬件設備進行查看,其中CPU是重中之重,對CPU使用率進行查看和了解,確定哪些進程占用了大量CPU資源,如果是某個應用程序進程導致CPU高負載,需分析該程序是否存在算法…

跨境電商增長突圍:多維變局下的戰略重構與技術賦能

在全球化商業浪潮與數字技術迭代的雙重驅動下,跨境電商行業正經歷前所未有的變革。平臺規則的動態調整、市場需求的非線性波動、技術工具的持續革新,共同構成了復雜多變的競爭格局。對于賣家而言,如何在不確定性中錨定增長路徑,已…

軟件運行時 ffmpeg.dll 丟失怎么辦?從原因排查到完美修復的完整方案

在計算機的軟件生態系統中,ffmpeg.dll 扮演著一個至關重要卻又常被普通用戶忽視的角色。這個看似普通的文件,實際上是眾多多媒體相關軟件能夠正常運行的基石之一。當你的視頻剪輯軟件突然無法導入特定格式的視頻,或者心愛的游戲在啟動時彈出令…

Axure 11

下載 Axure RP - UX Prototypes, Specifications, and Diagrams in One Tool 激活 Axure RP11 2025-8-7日親測有效: 49bb9513c40444b9bcc3ce49a7a022f9 漢化 Axure Rp 11 安裝、漢化、授權_axure11漢化包-CSDN博客 中文社區 https://www.axure.com.cn/

論文閱讀:User Behavior Simulation with Large Language Model-based Agents

https://dl.acm.org/doi/pdf/10.1145/3708985 https://www.doubao.com/chat/15495707526837250 Advances and Challenges in Foundation Agents–Memory調研 論文翻譯 基于大型語言模型代理的用戶行為模擬 摘要 在推薦系統、社交網絡等以人為中心的應用中,高質…

基于ECharts的智慧社區數據可視化

引言數據可視化是現代Web應用的重要組成部分,它能將復雜的數據以直觀的圖形方式呈現,幫助用戶快速理解數據背后的信息。本文將介紹如何使用ECharts庫實現智慧社區住戶數據的可視化展示,包括柱狀圖和餅圖兩種常見圖表類型的實現與優化。技術棧…

Qt 綜述:從基礎到一般應用

摘要: Qt,作為久經考驗的跨平臺C開發框架,以其強大的功能、豐富的類庫和靈活的機制,在桌面應用、嵌入式系統、網絡編程等領域占據重要地位。本文將深入解析Qt的核心技術體系,涵蓋基礎架構、核心機制、UI開發、外觀定制…

基于PyTorch一文講清楚損失函數與激活函數并配上詳細的圖文講解

PyTorch損失函數與激活函數 目錄 激活函數詳解損失函數詳解實戰案例性能優化技巧 激活函數詳解 1. 什么是激活函數? 激活函數是神經網絡中的關鍵組件,它決定了神經元是否應該被激活。沒有激活函數,神經網絡就只是線性變換的堆疊&#xf…

蒼穹外賣-Day1 | 環境搭建、nginx、git、令牌、登錄加密、接口文檔、Swagger

目錄 nginx: nginx反向代理和負載均衡概念 nginx反向代理和負載均衡如何配置? 后端環境:maven管理 sky-common sky-pojo sky-server: 后端環境搭建--Git進行版本控制 后端數據庫--Mysql 前后端聯調 前后端運行&#xf…

論文閱讀-ZeroDCE和ZeroDCE++

文章目錄1 概述2 模塊2.1 總體說明2.2 LE曲線(Light Enhance Curve)2.3 DCE-Net2.4 無監督損失2.4.1 空間一致性損失2.4.2 曝光控制損失2.4.3 顏色恒定損失2.4.4 照明平滑度損失2.5 ZeroDCE3 效果3.1 不同損失函數組合的效果3.2 參數設置影響3.3 訓練數據…

Web自動化技術選擇

我想學習自動化技術,我的訴求是: 1.我想做自動報社保功能,先從我們自己的系統里面下載Excel,然后自動登錄到社保局的系統,自動填寫Excel,自動上傳Excel。 2.可以自動的到社保局的系統里面查下數據&#xff…

【celeba】-數據集的介紹

CelebA 數據集在 MTCNN 中的使用 1 數據集結構 CelebA_副本/ ├── Anno/ │ ├── list_bbox_celeba.txt # 邊界框 │ ├── list_landmarks_celeba.txt # 5 關鍵點 │ ├── list_attr_celeba.txt # 40 屬性 │ └── identity_CelebA.txt …

解讀 GPT-5:從“博士級 AI 專家”能力到 OpenAI API Key 獲取與實踐(提示工程→性能調優全流程)

GPT-5深度解讀:一位“博士級專家”的誕生與思考第一部分:新范式——化繁為簡的統一智能體 OpenAI的GPT-5,遠不止是一次常規的模型升級。它的發布,標志著一種顛覆性的架構思想變革:從過去那個讓用戶在各種“Turbo”、“…

8.3.1 注冊服務中心Etcd

etcd是什么 etcd 是一個分布式鍵值對存儲,設計用來可靠而快速的保存關鍵數據并提供訪問。通過分布式鎖, leader選舉保障可靠的分布式協同。 etcd 特點 完全復制,集群中的每個節點均擁有全量數據 強一致性,etcd通過raft共識算法…

異或循環冗余

異或代碼CRC8-ITU例程比較計算CRC16異或改進測試重點代碼 def fun_crc(datas):crc 0xFF poly 0x07 for i in range(len(datas)):for j in range(7, -1, -1):if (crc & 0x80) 0x80: crc (crc << 1) ^ polyelse:crc (crc << 1)if (datas[i] & 2**j):#值…

一款輕量、免費、無廣告,強大的演示工具,支持屏幕放大、涂鴉、截圖、錄屏

軟件介紹 ZoomIt&#xff0c;是一款輕量、免費、無廣告&#xff0c;強大的演示工具&#xff0c;支持屏幕放大、涂鴉、截圖、錄屏等。在系統托盤中不顯示的運行&#xff0c;可自定義快捷鍵&#xff0c;使用方面。 軟件基礎功能 屏幕放大&#xff1a;按下快捷鍵“Ctrl1”可進入…

從街亭失守看管理

最近看了《三國演義》&#xff0c;重溫了街亭失守事件&#xff0c;從馬謖最耀眼的登場來看就是&#xff0c;火燒藤甲兵計策和諸葛亮不謀而合&#xff0c;說明馬謖確實是有真材實料的&#xff0c;但在守街亭的任務上&#xff0c;諸葛亮也躊躇過又對先帝的遺言“馬謖不可重用”記…

全面解析 URL 重定向原理:從協議、實現到安全實踐

一、什么是 URL 重定向&#xff1f; URL 重定向&#xff08;URL Redirection&#xff09;是 Web 技術中一種將用戶請求的 URL 自動轉向另一個目標 URL 的機制。用戶可能完全不會察覺跳轉發生&#xff0c;因為瀏覽器在幕后完成了一切。 重定向通常用于以下場景&#xff1a; 網…

多任務 Transformer 模型的高效任務間注意力

抽象 在計算機視覺和更廣泛的深度學習領域&#xff0c;Transformer 架構已被公認為許多應用程序的最先進技術。然而&#xff0c;對于多任務學習&#xff0c;與單任務模型相比&#xff0c;可能需要更多的查詢&#xff0c;考慮到實際的硬件限制&#xff0c;它的多頭注意力通常接近…

QT的常用控件說明

文章目錄基本的代碼的模板Label控件font字體相關Button 控件CheckBox 控件Radio控件ComboBox控件LineEdit 控件基本的代碼的模板 class MainWindow(QWidget):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs) # 調用父類初始化方法# 聲明窗口實例# 代…