Kafka Controller 元數據解析與故障恢復實戰指南

#作者:張桐瑞

文章目錄

  • 1 生產案例:Controller 選舉在故障恢復中的關鍵作用
    • 1.1 問題背景
    • 1.2 核心操作原理:
  • 2 Controller 元數據全景:從 ZooKeeper 到內存的數據鏡像
    • 2.1元數據核心載體:ControllerContext 類
    • 2.2核心元數據深度解析

1 生產案例:Controller 選舉在故障恢復中的關鍵作用

1.1 問題背景

某 Kafka 集群部分核心主題分區一直處于“不可用”狀態,通過kafka-toics.sh命令查看,發現分區leader一直處于Leader=-1的 “不可用” 狀態。嘗試重啟舊 Leader 所在 Broker 無效,并且由于是生產環境,不能通過重啟整個集群來隨意重啟,這畢竟是一個非常缺乏計劃性的事情。

如何在避免重啟集群的情況下,干掉已有Controller并執行新的Controller選舉呢?答案就在源碼中的ControllerZNode.path上,也就是ZooKeeper的/controller節點。倘若我們手動刪除/controller節點,Kafka集群就會觸發Controller選舉。于是,我們馬上實施這個方案,效果出奇得好:之前的受損分區全部恢復正常,業務數據得以正常生產和消費。

1.2 核心操作原理:

Controller 選舉后會觸發集群元數據全量同步(“重刷” 分區狀態),原理如下:

  1. ZooKeeper 的/controller節點存儲當前 Controller 信息,刪除該節點會觸發集群重新選舉 Controller
  2. 新 Controller 上任后通過UpdateMetadataRequest向所有 Broker 同步最新元數據,修復分區狀態
    注意事項:此操作需謹慎,生產環境建議通過 API 或工具觸發元數據更新,避免直接操作 ZooKeeper 節點。

2 Controller 元數據全景:從 ZooKeeper 到內存的數據鏡像

Kafka Controller 作為集群元數據的 “真理之源副本”,其核心作用是:

  • 緩存 ZooKeeper 元數據,避免 Broker 直接與 ZooKeeper 交互
  • 未來將替代 ZooKeeper 成為唯一元數據中心(社區規劃)

2.1元數據核心載體:ControllerContext 類

類定義路徑:
core/src/main/scala/kafka/controller/ControllerContext.scala
數據結構概覽:
該類封裝 17 項元數據,以下是核心字段解析(按重要性排序):
在這里插入圖片描述

2.2核心元數據深度解析

2.2.1 ControllerStats

private[controller] class ControllerStats extends KafkaMetricsGroup {// 統計每秒發生的Unclean Leader選舉次數val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)// Controller事件通用的統計速率指標的方法val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>state.rateAndTimeMetricName.map { metricName =>state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))}}.toMap
}

其中,前者是計算Controller每秒執行的Unclean Leader選舉數量,通常情況下,執行Unclean Leader選舉可能造成數據丟失,一般不建議開啟它。一旦開啟,你就需要時刻關注這個監控指標的值,確保Unclean Leader選舉的速率維持在一個很低的水平,否則會出現很多數據丟失的情況。

后者是統計所有Controller狀態的速率和時間信息,單位是毫秒。當前,Controller定義了很多事件,比如,TopicDeletion是執行主題刪除的Controller事件、ControllerChange是執行Controller重選舉的事件。ControllerStats的這個指標通過在每個事件名后拼接字符串RateAndTimeMs的方式,為每類Controller事件都創建了對應的速率監控指標。

2.2.2 offlinePartitionCount
該字段統計集群中所有離線或處于不可用狀態的主題分區數量。所謂的不可用狀態,就是我最開始舉的例子中“Leader=-1”的情況。

ControllerContext中的updatePartitionStateMetrics方法根據給定主題分區的當前狀態和目標狀態,來判斷該分區是否是離線狀態的分區。如果是,則累加offlinePartitionCount字段的值,否則遞減該值。方法代碼如下:

// 更新offlinePartitionCount元數據
private def updatePartitionStateMetrics(partition: TopicPartition, currentState: PartitionState,targetState: PartitionState): Unit = {// 如果該主題當前并未處于刪除中狀態if (!isTopicDeletionInProgress(partition.topic)) {// targetState表示該分區要變更到的狀態// 如果當前狀態不是OfflinePartition,即離線狀態并且目標狀態是離線狀態// 這個if語句判斷是否要將該主題分區狀態轉換到離線狀態if (currentState != OfflinePartition && targetState == OfflinePartition) {offlinePartitionCount = offlinePartitionCount + 1// 如果當前狀態已經是離線狀態,但targetState不是// 這個else if語句判斷是否要將該主題分區狀態轉換到非離線狀態} else if (currentState == OfflinePartition && targetState != OfflinePartition) {offlinePartitionCount = offlinePartitionCount - 1}}
}

該方法首先要判斷,此分區所屬的主題當前是否處于刪除操作的過程中。如果是的話,Kafka就不能修改這個分區的狀態,那么代碼什么都不做,直接返回。否則,代碼會判斷該分區是否要轉換到離線狀態。如果targetState是OfflinePartition,那么就將offlinePartitionCount值加1,畢竟多了一個離線狀態的分區。相反地,如果currentState是offlinePartition,而targetState反而不是,那么就將offlinePartitionCount值減1。

2.2.3 shuttingDownBrokerIds
顧名思義,該字段保存所有正在關閉中的Broker ID列表。當Controller在管理集群Broker時,它要依靠這個字段來甄別Broker當前是否已關閉,因為處于關閉狀態的Broker是不適合執行某些操作的,如分區重分配(Reassignment)以及主題刪除等。
另外,Kafka必須要為這些關閉中的Broker執行很多清掃工作,Controller定義了一個onBrokerFailure方法,它就是用來做這個的。代碼如下:

private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {info(s"Broker failure callback for ${deadBrokers.mkString(",")}")// deadBrokers:給定的一組已終止運行的Broker Id列表// 更新Controller元數據信息,將給定Broker從元數據的replicasOnOfflineDirs中移除deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)// 找出這些Broker上的所有副本對象val deadBrokersThatWereShuttingDown =deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))if (deadBrokersThatWereShuttingDown.nonEmpty)info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")// 執行副本清掃工作val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)onReplicasBecomeOffline(allReplicasOnDeadBrokers)// 取消這些Broker上注冊的ZooKeeper監聽器unregisterBrokerModificationsHandler(deadBrokers)
}

該方法接收一組已終止運行的Broker ID列表,首先是更新Controller元數據信息,將給定Broker從元數據的replicasOnOfflineDirs和shuttingDownBrokerIds中移除,然后為這組Broker執行必要的副本清掃工作,也就是onReplicasBecomeOffline方法做的事情。

該方法主要依賴于分區狀態機和副本狀態機來完成對應的工作。在后面的課程中,我們會專門討論副本狀態機和分區狀態機,這里你只要簡單了解下它要做的事情就行了。后面等我們學完了這兩個狀態機之后,你可以再看下這個方法的具體實現原理。
這個方法的主要目的是把給定的副本標記成Offline狀態,即不可用狀態。具體分為以下這幾個步驟:

  1. 利用分區狀態機將給定副本所在的分區標記為Offline狀態;
  2. 將集群上所有新分區和Offline分區狀態變更為Online狀態;
  3. 將相應的副本對象狀態變更為Offline。

2.2.4 liveBrokers
該字段保存當前所有運行中的Broker對象。每個Broker對象就是一個的三元組。ControllerContext中定義了很多方法來管理該字段,如addLiveBrokersAndEpochs、removeLiveBrokers和updateBrokerMetadata等。我拿updateBrokerMetadata方法進行說明,以下是源碼:

def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {liveBrokers -= oldMetadataliveBrokers += newMetadata}

每當新增或移除已有Broker時,ZooKeeper就會更新其保存的Broker數據,從而引發Controller修改元數據,也就是會調用updateBrokerMetadata方法來增減Broker列表中的對象。

2.2.5 liveBrokerEpochs
該字段保存所有運行中Broker的Epoch信息。Kafka使用Epoch數據防止Zombie Broker,即一個非常老的Broker被選舉成為Controller。
另外,源碼大多使用這個字段來獲取所有運行中Broker的ID序號,如下面這個方法定義的那樣:
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet – shuttingDownBrokerIds

liveBrokerEpochs的keySet方法返回Broker序號列表,然后從中移除關閉中的Broker序號,剩下的自然就是處于運行中的Broker序號列表了。

2.2.6 epoch & epochZkVersion
這兩個字段一起說,因為它們都有“epoch”字眼,放在一起說,可以幫助你更好地理解兩者的區別。epoch實際上就是ZooKeeper中/controller_epoch節點的值,你可以認為它就是Controller在整個Kafka集群的版本號,而epochZkVersion實際上是/controller_epoch節點的dataVersion值。

Kafka使用epochZkVersion來判斷和防止Zombie Controller。這也就是說,原先在老Controller任期內的Controller操作在新Controller不能成功執行,因為新Controller的epochZkVersion要比老Controller的大。
另外,你可能會問:“這里的兩個Epoch和上面的liveBrokerEpochs有啥區別呢?”實際上,這里的兩個Epoch值都是屬于Controller側的數據,而liveBrokerEpochs是每個Broker自己的Epoch值。

2.2.7 allTopics
該字段保存集群上所有的主題名稱。每當有主題的增減,Controller就要更新該字段的值。

比如Controller有個processTopicChange方法,從名字上來看,它就是處理主題變更的。我們來看下它的代碼實現,我把主要邏輯以注釋的方式標注了出來:

private def processTopicChange(): Unit = {if (!isActive) return // 如果Contorller已經關閉,直接返回val topics = zkClient.getAllTopicsInCluster(true) // 從ZooKeeper中獲取當前所有主題列表val newTopics = topics -- controllerContext.allTopics // 找出當前元數據中不存在、ZooKeeper中存在的主題,視為新增主題val deletedTopics = controllerContext.allTopics -- topics // 找出當前元數據中存在、ZooKeeper中不存在的主題,視為已刪除主題controllerContext.allTopics = topics // 更新Controller元數據// 為新增主題和已刪除主題執行后續處理操作registerPartitionModificationsHandlers(newTopics.toSeq)val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)deletedTopics.foreach(controllerContext.removeTopic)addedPartitionReplicaAssignment.foreach {case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)}info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +s"[$addedPartitionReplicaAssignment]")if (addedPartitionReplicaAssignment.nonEmpty)onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)}

2.2.8 partitionAssignments
該字段保存所有主題分區的副本分配情況。在我看來,這是Controller最重要的元數據了。事實上,你可以從這個字段衍生、定義很多實用的方法,來幫助Kafka從各種維度獲取數據。

比如,如果Kafka要獲取某個Broker上的所有分區,那么,它可以這樣定義:

partitionAssignments.flatMap {case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {case (_, partitionAssignment) => partitionAssignment.replicas.contains(brokerId)}.map {case (partition, _) => new TopicPartition(topic, partition)}}.toSet

再比如,如果Kafka要獲取某個主題的所有分區對象,代碼可以這樣寫:

partitionAssignments.getOrElse(topic, mutable.Map.empty).map {case (partition, _) => new TopicPartition(topic, partition)}.toSet

實際上,這兩段代碼分別是ControllerContext.scala中partitionsOnBroker方法和partitionsForTopic兩個方法的主體實現代碼。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/89474.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/89474.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/89474.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《尋北技術的全面剖析與應用前景研究報告》

一、引言 1.1 研究背景與意義 尋北,作為確定地理北極方向的關鍵技術,在眾多領域中扮演著舉足輕重的角色。在軍事領域,精確的尋北對于武器系統的瞄準、導彈的精確制導以及部隊的戰略部署都至關重要。例如,火炮在發射前需要精確尋…

深入比較 Gin 與 Beego:Go Web 框架的兩大選擇

引言 在 Go 語言生態系統中,Gin 和 Beego 是兩個非常受歡迎的 Web 框架。它們各自有著不同的設計理念和目標用戶群體。本文將對這兩個框架進行深入比較,并幫助你理解它們之間的區別,以便根據項目需求做出合適的選擇。 一、Gin 概述 Gin是一…

全新大模型開源,騰訊(int4能打DeepSeek) Vs 谷歌(2GB運行多模態)

大家好,我是 Ai 學習的老章 最近除了阿里 Qwen3 模型更新了圖片生成和處理能力,大家都可以玩轉吉卜力風格 還有幾個最近發布的大模型值得關注 1 是騰訊開源了 80B 混元 A13B 模型,亮點是精度無損的 int4 很能打 2 是谷歌開源的小參數 Gemm…

向量數據庫milvus中文全文檢索取不到數據的處理辦法

?檢查中文分詞配置? Milvus 2.5 支持原生中文全文檢索,但需顯式配置中文分詞器: 創建集合時指定分詞器類型為 chinese python schema.add_field(field_name"text", datatypeDataType.VARCHAR, max_length65535, enable_analyzerTrue, an…

Stable Diffusion 項目實戰落地:從0到1 掌握ControlNet 第一篇 打造光影字形的創意秘技

大家好呀,歡迎來到 AI造字工坊! 在這篇文章中,我們將帶領你走進一個神奇的世界——ControlNet。你可能聽說過它,但可能還沒摸清它的深奧之處。 今天,我們就來揭開它神秘的面紗,輕松帶你玩轉字形設計! 話說回來,相信大家對圖片生成、提示詞、放大操作、抽卡這些基本操…

從零用java實現 小紅書 springboot vue uniapp (12)實現分類篩選與視頻筆記功能

移動端演示 http://8.146.211.120:8081/#/ 管理端演示 http://8.146.211.120:8088/#/ 項目整體介紹及演示 前言 在前面的系列文章中,我們已經基本完成了小紅書項目的核心框架搭建和圖文筆記的發布、展示流程。為了豐富App的功能和用戶體驗,今天我們將在…

Python與Web3.py庫交互實踐

目錄 Python與Web3.py庫交互實踐引言:連接Python與區塊鏈的橋梁1. 環境配置與基礎連接1.1 安裝Web3.py1.2 連接以太坊節點2. 基礎區塊鏈交互2.1 賬戶與余額查詢2.2 創建并發送交易3. 智能合約交互3.1 加載和部署合約3.2 與已部署合約交互4. 高級功能實踐4.1 事件監聽4.2 與ERC…

《匯編語言:基于X86處理器》第6章 條件處理(2)

本章向程序員的匯編語言工具箱中引入一個重要的內容,使得編寫出來的程序具備作決策的功能。幾乎所有的程序都需要這種能力。首先,介紹布爾操作,由于能影響CPU狀態標志,它們是所有條件指令的核心。然后,說明怎樣使用演繹…

深度剖析NumPy核心函數reshape()

深度剖析NumPy核心函數reshape reshape()函數基礎概念reshape()函數語法與參數詳解reshape()函數使用示例基本的形狀重塑使用-1自動計算維度多維數組的形狀重塑不同order參數的效果 reshape()函數的應用場景數據預處理機器學習模型輸入算法實現 當我們使用np.array()創建好數組…

Linux平臺MinGW32/MinGW64交叉編譯完全指南:原理、部署與組件詳解

一、MinGW是什么?為什么需要交叉編譯? MinGW(Minimalist GNU for Windows)是一套在Linux上構建Windows應用程序的完整工具鏈。它允許開發者: 在Linux環境下編譯Windows可執行文件(.exe/.dll)避…

為什么我畫的頻譜圖和audacity、audition不一樣?

文章目錄 系列文章目錄 目錄 文章目錄 前言 一、問題引入 二、使用步驟 三、分析和改進 總結 前言 我們知道audacity和audition都有頻譜分析這個窗口,一般過程肯定是分幀加窗,fft變換然后呈現, 大體這個過程是沒問題的,但為什…

責任鏈模式 Go 語言實戰

責任鏈模式(Chain of Responsibility) 責任鏈模式是一種行為設計模式,它允許將請求沿著處理者鏈進行傳遞,直到有一個處理者能夠處理它。這個模式的主要目的是解耦請求的發送者和接收者,使得多個對象都有機會處理這個請…

使用開源項目youlai_boot 導入到ecplise 中出現很多錯誤

我是使用ecplise 導入得youlai_boot 這個項目,但是導入到ecplise 中一直出現報錯,然后各種maven clean 和maven install 以及update Maven 都沒有效果不知道怎么辦才好,怎么樣解決這個問題,原來是我本地的環境中沒有安裝 lombok.…

06_Americanas精益管理項目_數據分析

文章目錄 Americanas精益管理項目_數據分析(一)思維方法1、數據分析思維2、零售行業-萬能「人貨場」分析框架(二)商品分析1、品類銷量分析2、銷量趨勢分析3、帕累托法則分析4、商品TopN分析(三)用戶分析(四)場景分析Americanas精益管理項目_數據分析 數據分析與數據開…

ES6從入門到精通:類與繼承

ES6 類的基本概念 ES6 引入了基于類的面向對象編程語法,通過 class 關鍵字定義類。類可以包含構造函數、方法和屬性。 class Person {constructor(name) {this.name name;}greet() {console.log(Hello, ${this.name}!);} }const person new Person(Alice); pers…

【經驗】新版Chrome中Proxy SwitchyOmega2已實效,改為ZeroOmega

1、問題描述 手欠更新了 Chrome 導致無法“上網”,原因是 Proxy SwitchyOmega2 已實效。 2、解決方法 2.1 下載 新版Chrome中Proxy SwitchyOmega2已實效,改為ZeroOmega; 想方設法去下載 ZeroOmega 的crx包,最新的為&#xff1…

在windows上設置python的環境

安裝好了python,再具體說下python語言的相關環境。 #01 關于Python Python 是一個高級別的、邊運行邊解釋的、動態類型的編程語言,以簡潔的語法、強大的功能和豐富的資源庫而聞名。廣泛應用于 Web 開發、數據分析、人工智能、自動化腳本等多個領域。 目前 Python 語言有兩…

3D 建模與點云建模:從虛擬構建到實景復刻的數字孿生雙引擎

在數字化浪潮席卷全球的當下,3D 建模與點云建模如同數字世界的左膀右臂,一個以抽象化的創意構建虛擬藍圖,一個以高精度的實景數據復刻現實世界。它們不僅深刻重塑了影視娛樂、工業制造、建筑設計等傳統領域,更成為數字孿生技術蓬勃…

智能檢測原理和架構

大家讀完覺得有幫助記得關注和點贊!!! 智能檢測系統基于AI和大數據分析技術,通過主動感知、行為建模與實時響應構建動態防御體系。其核心在于將傳統規則匹配升級為**多模態威脅認知**,實現對新型攻擊(如AI…

2025年6月個人工作生活總結

本文為 2025年6月工作生活總結。 研發編碼 某國產操作系統curl下載sftp服務器文件問題記錄 場景: 某國產系統curl版本信息: # curl --version curl 7.71.1 (x86_64-koji-linux-gnu) libcurl/7.71.1 OpenSSL/1.1.1f-fips zlib/1.2.11 brotli/1.0.7 li…