Kafka series
Understanding the Producer, based on Kafka 2.1
Understanding the Consumer, based on Kafka 2.1
How the Kafka server performs NIO operations (Part 1)
Contents
- Kafka series
- Preface
- I. Basic concepts
- II. The Acceptor's main flow
- 2.1 Source of the run method
- 2.2 Source of the acceptNewConnections method
- 2.3 Flow diagram of the main logic
- III. The Processor's main flow
- 3.1 Source of the run method
- 3.2 Flow diagram of the main logic
- 3.2.1 configureNewConnections
- 3.2.2 processNewResponses
- 3.2.3 poll
- pollSelectionKeys
- 3.2.4 processCompletedReceives
- 3.2.5 processCompletedSends
- IV. Questions
- Summary
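Preface
Without further ado, picking up from the previous article, we continue to walk through the server-side NIO internals, based on Kafka 3.7.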
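I. Basic concepts
Acceptor and Processor both run as threads. So once the Kafka server has started (that is, after its startup method has run), it operates in a one-Acceptor, many-Processors model, with each of them running on its own thread.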
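II. The Acceptor's main flow
The Acceptor's main job is to listen for accept events through its selector and then pick a Processor to handle everything that follows.
2.1 Source of the run method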
/**
 * Accept loop that checks for new connection attempts
 */
override def run(): Unit = {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  try {
    while (shouldRun.get()) {
      try {
        acceptNewConnections()
        closeThrottledConnections()
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket, selector, and any throttled sockets.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    throttledSockets.foreach(throttledSocket => closeSocket(throttledSocket.socket, this))
    throttledSockets.clear()
  }
}
The main work happens in acceptNewConnections, so let's look at its code next.
2.2 Source of the acceptNewConnections method
/**
 * Listen for new connections and assign accepted connections to processors using round-robin.
 */
private def acceptNewConnections(): Unit = {
  val ready = nioSelector.select(500)
  if (ready > 0) {
    val keys = nioSelector.selectedKeys()
    val iter = keys.iterator()
    while (iter.hasNext && shouldRun.get()) {
      try {
        val key = iter.next
        iter.remove()

        if (key.isAcceptable) {
          accept(key).foreach { socketChannel =>
            // Assign the channel to the next processor (using round-robin) to which the
            // channel can be added without blocking. If newConnections queue is full on
            // all processors, block until the last one is able to accept a connection.
            var retriesLeft = synchronized(processors.length)
            var processor: Processor = null
            do {
              retriesLeft -= 1
              processor = synchronized {
                // adjust the index (if necessary) and retrieve the processor atomically for
                // correct behaviour in case the number of processors is reduced dynamically
                currentProcessorIndex = currentProcessorIndex % processors.length
                processors(currentProcessorIndex)
              }
              currentProcessorIndex += 1
            } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
          }
        } else
          throw new IllegalStateException("Unrecognized key state for acceptor thread.")
      } catch {
        case e: Throwable => error("Error while accepting connection", e)
      }
    }
  }
}
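2.3 Flow diagram of the main logic
Note that the Processor's accept method does two things:
1. It puts the socketChannel into the Processor's own newConnections queue
2. It wakes up the Processor's own selector
Note: newConnections is a thread-safe ArrayBlockingQueue.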
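To make the hand-off concrete, here is a minimal Java sketch of the round-robin assignment together with the two steps inside accept. It is not Kafka's code: ProcessorStub, RoundRobinAssigner, and the wakeups counter are hypothetical names, connections are plain strings instead of SocketChannel objects, and the counter merely stands in for waking the selector.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Processor; only the hand-off part is modeled. */
final class ProcessorStub {
    final BlockingQueue<String> newConnections;          // Kafka queues SocketChannel objects here
    final AtomicInteger wakeups = new AtomicInteger();   // stands in for waking the selector

    ProcessorStub(int queueCapacity) {
        this.newConnections = new ArrayBlockingQueue<>(queueCapacity);
    }

    /** Step 1: queue the connection (blocking only if mayBlock). Step 2: wake up. */
    boolean accept(String connection, boolean mayBlock) {
        boolean accepted = newConnections.offer(connection);
        if (!accepted && mayBlock) {
            try {
                newConnections.put(connection);          // waits until the queue has room
                accepted = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while queueing connection", e);
            }
        }
        if (accepted) {
            wakeups.incrementAndGet();
        }
        return accepted;
    }
}

/** Hypothetical stand-in for the Acceptor's processor selection. */
final class RoundRobinAssigner {
    private final List<ProcessorStub> processors;
    private int currentIndex = 0;

    RoundRobinAssigner(List<ProcessorStub> processors) {
        this.processors = processors;
    }

    /** Try each processor once without blocking; block only on the last candidate. */
    ProcessorStub assign(String connection) {
        int retriesLeft = processors.size();
        ProcessorStub processor;
        do {
            retriesLeft--;
            currentIndex = currentIndex % processors.size();
            processor = processors.get(currentIndex);
            currentIndex++;
        } while (!processor.accept(connection, retriesLeft == 0));
        return processor;
    }
}
```

Trying every processor without blocking first, and blocking only on the last candidate, means a single processor with a full queue does not stall the acceptor while the others still have room.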
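III. The Processor's main flow
Each Processor handles many socketChannels. The state it works with, kept on its selector, is:
channels: holds the KafkaChannel instances
explicitlyMutedChannels: holds the channels that have been muted
completedReceives: holds the data read from each channel during a single poll call
completedSends: holds the data that a single poll call has finished writing out through NIO
"Explicitly" stresses that the action was deliberate: the mute was requested by the caller by hand, rather than applied by a system default or some other automatic setting.
Note: completedReceives and completedSends only ever hold the data of a single poll call.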
3.1 Source of the run method
override def run(): Unit = {
  try {
    while (shouldRun.get()) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        poll()
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
        closeExcessConnections()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
        // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. These exceptions are caught and
        // processed by the individual methods above which close the failing channel and continue processing other
        // channels. So this catch block should only ever see ControlThrowables.
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug(s"Closing selector - processor $id")
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
  }
}
3.2 Flow diagram of the main logic
Note that this flow diagram leaves out processDisconnected() and closeExcessConnections(): these two methods matter little for understanding Kafka's NIO, so we ignore them for now.
3.2.1 configureNewConnections
1. Take a socketChannel from the Processor's own newConnections queue
2. Wrap the socketChannel in a KafkaChannel and register it on the selector for read events
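The two steps above can be sketched with plain Java NIO. This is only an illustration under stated assumptions: NewConnectionDrainer and demoChannel are hypothetical names, a string attachment stands in for the KafkaChannel wrapper, and the demo channel is the read end of a java.nio Pipe so that the example runs without opening real sockets.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of draining newConnections and registering for reads. */
final class NewConnectionDrainer {
    final BlockingQueue<SelectableChannel> newConnections = new ArrayBlockingQueue<>(16);
    final Selector selector;

    NewConnectionDrainer() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Drains the queue and registers each channel for OP_READ; returns the count. */
    int configureNewConnections() {
        int registered = 0;
        SelectableChannel channel;
        while ((channel = newConnections.poll()) != null) {
            try {
                channel.configureBlocking(false);   // selectors require non-blocking channels
                // The attachment plays the role of the KafkaChannel wrapper (assumption).
                channel.register(selector, SelectionKey.OP_READ, "connection-" + registered);
                registered++;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return registered;
    }

    /** Demo helper: the read end of a pipe stands in for an accepted socket. */
    static SelectableChannel demoChannel() {
        try {
            return Pipe.open().source();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the queue is drained on the Processor's own thread, registration never races with that thread's select call; the Acceptor only needs to enqueue and wake the selector up.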
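3.2.2 processNewResponses
In one sentence: copy the data that needs to be sent onto the KafkaChannel, which is the preparation for the actual send.
1. Poll a sendable response from the Processor's own responseQueue
2. Copy the response onto the send object of the corresponding KafkaChannel
3. Register a write event on the selector
4. Also put the response into inflightResponses, so that its callback can be run later
The responseQueue is thread-safe as well, but it is a LinkedBlockingDeque.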
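As a rough illustration of these four steps, the sketch below models them with plain collections. The names ResponseStager, Response, and ChannelState are hypothetical, and the interestOps integer stands in for the channel's SelectionKey; Kafka's real method also handles other response types that are left out here.

```java
import java.nio.channels.SelectionKey;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;

/** Hypothetical sketch of staging responses for sending. */
final class ResponseStager {
    /** Hypothetical response: target connection, payload, completion callback. */
    static final class Response {
        final String connectionId;
        final byte[] payload;
        final Runnable onComplete;

        Response(String connectionId, byte[] payload, Runnable onComplete) {
            this.connectionId = connectionId;
            this.payload = payload;
            this.onComplete = onComplete;
        }
    }

    /** Hypothetical per-connection state: KafkaChannel plus its SelectionKey. */
    static final class ChannelState {
        byte[] pendingSend;
        int interestOps = 0;   // bit set of SelectionKey.OP_* flags
    }

    final LinkedBlockingDeque<Response> responseQueue = new LinkedBlockingDeque<>();
    final Map<String, ChannelState> channels = new HashMap<>();
    final Map<String, Response> inflightResponses = new HashMap<>();

    /** Moves queued responses onto their channels; returns how many were staged. */
    int processNewResponses() {
        int staged = 0;
        Response response;
        while ((response = responseQueue.poll()) != null) {          // step 1
            ChannelState channel = channels.get(response.connectionId);
            if (channel == null) {
                continue;                                            // connection already gone
            }
            channel.pendingSend = response.payload;                  // step 2
            channel.interestOps |= SelectionKey.OP_WRITE;            // step 3
            inflightResponses.put(response.connectionId, response);  // step 4
            staged++;
        }
        return staged;
    }
}
```

Nothing is written to the network at this point; the bytes only leave the process during the poll step that follows.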
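3.2.3 poll
This method is where the server actually performs IO, both reads and writes:
1. Clear completedReceives and completedSends
2. Run the poll operation
3. Process the selectionKeys returned by the poll
pollSelectionKeys
attemptRead
1. Read data from the socketChannel
2. Store the data that was read in the selector's completedReceives
3. Clear the receive held by the current KafkaChannel
attemptWrite
Note: the send data on the KafkaChannel here comes from "3.2.2 processNewResponses".
4. Write the send data held by the KafkaChannel out through the KafkaChannel
5. Append that data to the selector's completedSends
6. Clear the send held by the current KafkaChannel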
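The poll flow above can be reproduced in miniature with plain Java NIO. The following is a sketch under simplifying assumptions, not Kafka's Selector: MiniPoller and its methods are hypothetical names, the pending send is kept as the key's attachment, and whatever a single read returns is treated as a complete receive, whereas Kafka's real receives are length-prefixed and may take several polls to complete.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical miniature of the poll step. */
final class MiniPoller {
    final Selector selector;
    final List<String> completedReceives = new ArrayList<>();
    final List<String> completedSends = new ArrayList<>();

    MiniPoller() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void registerForRead(SelectableChannel channel) {
        register(channel, SelectionKey.OP_READ, null);
    }

    /** Registers a channel together with its pending send (kept as the attachment). */
    void registerSend(SelectableChannel channel, String payload) {
        register(channel, SelectionKey.OP_WRITE,
                 ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
    }

    private void register(SelectableChannel channel, int ops, Object attachment) {
        try {
            channel.configureBlocking(false);
            channel.register(selector, ops, attachment);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void poll(long timeoutMs) {
        completedReceives.clear();                       // results only live for one poll
        completedSends.clear();
        try {
            selector.select(timeoutMs);
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isValid() && key.isReadable()) attemptRead(key);
                if (key.isValid() && key.isWritable()) attemptWrite(key);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void attemptRead(SelectionKey key) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int n = ((ReadableByteChannel) key.channel()).read(buffer);
        if (n > 0) {
            buffer.flip();
            completedReceives.add(StandardCharsets.UTF_8.decode(buffer).toString());
        } else if (n < 0) {
            key.cancel();                                // peer closed the connection
        }
    }

    private void attemptWrite(SelectionKey key) throws IOException {
        ByteBuffer send = (ByteBuffer) key.attachment();
        ((WritableByteChannel) key.channel()).write(send);
        if (!send.hasRemaining()) {
            completedSends.add(new String(send.array(), StandardCharsets.UTF_8));
            key.attach(null);                            // clear the finished send
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }

    /** Demo helper: a pipe gives a writable and a readable end without real sockets. */
    static Pipe openPipe() {
        try {
            return Pipe.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Registering both ends of one pipe shows the "single poll" lifetime of the two lists: the first poll completes the send, and a second poll picks the same bytes up as a receive while the send list is already empty again.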
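3.2.4 processCompletedReceives
In a nutshell: process the data read in the previous step, "3.2.3 poll".
1. Wrap the data that was read in a Request and put it on the requestChannel's requestQueue
2. Mute the channel the data came from
3. Clear the selector's entire completedReceives
Muting does two main things: (1) it removes the read event the channel registered on the selector, and (2) it adds the channel to explicitlyMutedChannels.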
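At the NIO level, muting comes down to flipping one bit in the key's interest set. Here is a minimal sketch of that idea, assuming a hypothetical MuteTracker class that tracks SelectionKey objects directly (Kafka tracks KafkaChannel objects instead); demoKey is a helper for the example only and again uses a pipe instead of a socket.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of mute and unmute on a selection key. */
final class MuteTracker {
    final Set<SelectionKey> explicitlyMutedChannels = new HashSet<>();

    /** Stop reading from this channel: drop OP_READ and remember the mute. */
    void mute(SelectionKey key) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        explicitlyMutedChannels.add(key);
    }

    /** Reverse of mute: forget the mute and restore OP_READ. */
    void unmute(SelectionKey key) {
        if (explicitlyMutedChannels.remove(key)) {
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
        }
    }

    /** Demo helper: registers the read end of a pipe for OP_READ. */
    static SelectionKey demoKey() {
        try {
            Pipe.SourceChannel source = Pipe.open().source();
            source.configureBlocking(false);
            return source.register(Selector.open(), SelectionKey.OP_READ);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With OP_READ removed, the selector simply stops reporting the channel as readable, so no further request is read from that connection until it is unmuted.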
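3.2.5 processCompletedSends
In a nutshell: run the callbacks for the responses that were sent out during "3.2.3 poll".
1. Look up the response in inflightResponses and run its callback
2. If the channel is muted, unmute it
3. Clear the selector's entire completedSends
Unmuting does two main things: (1) it registers the channel's read event on the selector again, and (2) it removes the channel from explicitlyMutedChannels.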
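The three steps can be modeled with plain collections. This is a hedged sketch rather than Kafka's implementation: CompletedSendHandler is a hypothetical name, connections are identified by strings, callbacks are plain Runnable objects, and removing the entry from inflightResponses (instead of only reading it) is my assumption about how the lookup is done.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of handling sends that finished in the last poll. */
final class CompletedSendHandler {
    final Map<String, Runnable> inflightResponses = new HashMap<>();  // connection id -> callback
    final Set<String> explicitlyMutedChannels = new HashSet<>();      // muted connection ids
    final List<String> completedSends = new ArrayList<>();            // filled by the last poll

    void processCompletedSends() {
        for (String connectionId : completedSends) {
            // Step 1: find the in-flight response and run its callback.
            Runnable callback = inflightResponses.remove(connectionId);
            if (callback == null) {
                throw new IllegalStateException(
                    "Send completed but no in-flight response for " + connectionId);
            }
            callback.run();
            // Step 2: unmute, so the next request on this connection can be read.
            explicitlyMutedChannels.remove(connectionId);
        }
        // Step 3: the list only describes one poll, so clear it.
        completedSends.clear();
    }
}
```

Taken together with the mute in 3.2.4, this suggests that a connection has at most one request being handled at a time: reads stop when a request is queued and resume only once its response has been fully sent.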
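IV. Questions
Having gone through the Processor's main operations, two questions come up:
- Who writes the data into the Processor's responseQueue?
- Who processes the data we put on requestChannel#requestQueue?
These two questions lead into the Kafka server's core computation logic, whereas this article has focused on its core IO logic.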
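Summary
Following the brief overview and the introduction to native NIO in the previous article, this article has described in detail how the Acceptor and Processor on the Kafka server work to handle reading data in and writing data out.
Next, we will follow up on our two questions and look at the Kafka server's computation logic: how the data that was read in gets processed, and where the responses that are written out come from.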