#作者:張桐瑞
文章目錄
- 1 生產案例:Controller 選舉在故障恢復中的關鍵作用
- 1.1 問題背景
- 1.2 核心操作原理:
- 2 Controller 元數據全景:從 ZooKeeper 到內存的數據鏡像
- 2.1元數據核心載體:ControllerContext 類
- 2.2核心元數據深度解析
1 生產案例:Controller 選舉在故障恢復中的關鍵作用
1.1 問題背景
某 Kafka 集群部分核心主題分區一直處于“不可用”狀態,通過kafka-toics.sh命令查看,發現分區leader一直處于Leader=-1的 “不可用” 狀態。嘗試重啟舊 Leader 所在 Broker 無效,并且由于是生產環境,不能通過重啟整個集群來隨意重啟,這畢竟是一個非常缺乏計劃性的事情。
如何在避免重啟集群的情況下,干掉已有Controller并執行新的Controller選舉呢?答案就在源碼中的ControllerZNode.path上,也就是ZooKeeper的/controller節點。倘若我們手動刪除/controller節點,Kafka集群就會觸發Controller選舉。于是,我們馬上實施這個方案,效果出奇得好:之前的受損分區全部恢復正常,業務數據得以正常生產和消費。
1.2 核心操作原理:
Controller 選舉后會觸發集群元數據全量同步(“重刷” 分區狀態),原理如下:
- ZooKeeper 的/controller節點存儲當前 Controller 信息,刪除該節點會觸發集群重新選舉 Controller
- 新 Controller 上任后通過UpdateMetadataRequest向所有 Broker 同步最新元數據,修復分區狀態
注意事項:此操作需謹慎,生產環境建議通過 API 或工具觸發元數據更新,避免直接操作 ZooKeeper 節點。
2 Controller 元數據全景:從 ZooKeeper 到內存的數據鏡像
Kafka Controller 作為集群元數據的 “真理之源副本”,其核心作用是:
- 緩存 ZooKeeper 元數據,避免 Broker 直接與 ZooKeeper 交互
- 未來將替代 ZooKeeper 成為唯一元數據中心(社區規劃)
2.1元數據核心載體:ControllerContext 類
類定義路徑:
core/src/main/scala/kafka/controller/ControllerContext.scala
數據結構概覽:
該類封裝 17 項元數據,以下是核心字段解析(按重要性排序):
2.2核心元數據深度解析
2.2.1 ControllerStats
private[controller] class ControllerStats extends KafkaMetricsGroup {// 統計每秒發生的Unclean Leader選舉次數val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)// Controller事件通用的統計速率指標的方法val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>state.rateAndTimeMetricName.map { metricName =>state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))}}.toMap
}
其中,前者是計算Controller每秒執行的Unclean Leader選舉數量,通常情況下,執行Unclean Leader選舉可能造成數據丟失,一般不建議開啟它。一旦開啟,你就需要時刻關注這個監控指標的值,確保Unclean Leader選舉的速率維持在一個很低的水平,否則會出現很多數據丟失的情況。
后者是統計所有Controller狀態的速率和時間信息,單位是毫秒。當前,Controller定義了很多事件,比如,TopicDeletion是執行主題刪除的Controller事件、ControllerChange是執行Controller重選舉的事件。ControllerStats的這個指標通過在每個事件名后拼接字符串RateAndTimeMs的方式,為每類Controller事件都創建了對應的速率監控指標。
2.2.2 offlinePartitionCount
該字段統計集群中所有離線或處于不可用狀態的主題分區數量。所謂的不可用狀態,就是我最開始舉的例子中“Leader=-1”的情況。
ControllerContext中的updatePartitionStateMetrics方法根據給定主題分區的當前狀態和目標狀態,來判斷該分區是否是離線狀態的分區。如果是,則累加offlinePartitionCount字段的值,否則遞減該值。方法代碼如下:
// 更新offlinePartitionCount元數據
private def updatePartitionStateMetrics(partition: TopicPartition, currentState: PartitionState,targetState: PartitionState): Unit = {// 如果該主題當前并未處于刪除中狀態if (!isTopicDeletionInProgress(partition.topic)) {// targetState表示該分區要變更到的狀態// 如果當前狀態不是OfflinePartition,即離線狀態并且目標狀態是離線狀態// 這個if語句判斷是否要將該主題分區狀態轉換到離線狀態if (currentState != OfflinePartition && targetState == OfflinePartition) {offlinePartitionCount = offlinePartitionCount + 1// 如果當前狀態已經是離線狀態,但targetState不是// 這個else if語句判斷是否要將該主題分區狀態轉換到非離線狀態} else if (currentState == OfflinePartition && targetState != OfflinePartition) {offlinePartitionCount = offlinePartitionCount - 1}}
}
該方法首先要判斷,此分區所屬的主題當前是否處于刪除操作的過程中。如果是的話,Kafka就不能修改這個分區的狀態,那么代碼什么都不做,直接返回。否則,代碼會判斷該分區是否要轉換到離線狀態。如果targetState是OfflinePartition,那么就將offlinePartitionCount值加1,畢竟多了一個離線狀態的分區。相反地,如果currentState是offlinePartition,而targetState反而不是,那么就將offlinePartitionCount值減1。
2.2.3 shuttingDownBrokerIds
顧名思義,該字段保存所有正在關閉中的Broker ID列表。當Controller在管理集群Broker時,它要依靠這個字段來甄別Broker當前是否已關閉,因為處于關閉狀態的Broker是不適合執行某些操作的,如分區重分配(Reassignment)以及主題刪除等。
另外,Kafka必須要為這些關閉中的Broker執行很多清掃工作,Controller定義了一個onBrokerFailure方法,它就是用來做這個的。代碼如下:
private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {info(s"Broker failure callback for ${deadBrokers.mkString(",")}")// deadBrokers:給定的一組已終止運行的Broker Id列表// 更新Controller元數據信息,將給定Broker從元數據的replicasOnOfflineDirs中移除deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)// 找出這些Broker上的所有副本對象val deadBrokersThatWereShuttingDown =deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))if (deadBrokersThatWereShuttingDown.nonEmpty)info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")// 執行副本清掃工作val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)onReplicasBecomeOffline(allReplicasOnDeadBrokers)// 取消這些Broker上注冊的ZooKeeper監聽器unregisterBrokerModificationsHandler(deadBrokers)
}
該方法接收一組已終止運行的Broker ID列表,首先是更新Controller元數據信息,將給定Broker從元數據的replicasOnOfflineDirs和shuttingDownBrokerIds中移除,然后為這組Broker執行必要的副本清掃工作,也就是onReplicasBecomeOffline方法做的事情。
該方法主要依賴于分區狀態機和副本狀態機來完成對應的工作。在后面的課程中,我們會專門討論副本狀態機和分區狀態機,這里你只要簡單了解下它要做的事情就行了。后面等我們學完了這兩個狀態機之后,你可以再看下這個方法的具體實現原理。
這個方法的主要目的是把給定的副本標記成Offline狀態,即不可用狀態。具體分為以下這幾個步驟:
- 利用分區狀態機將給定副本所在的分區標記為Offline狀態;
- 將集群上所有新分區和Offline分區狀態變更為Online狀態;
- 將相應的副本對象狀態變更為Offline。
2.2.4 liveBrokers
該字段保存當前所有運行中的Broker對象。每個Broker對象就是一個的三元組。ControllerContext中定義了很多方法來管理該字段,如addLiveBrokersAndEpochs、removeLiveBrokers和updateBrokerMetadata等。我拿updateBrokerMetadata方法進行說明,以下是源碼:
def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {liveBrokers -= oldMetadataliveBrokers += newMetadata}
每當新增或移除已有Broker時,ZooKeeper就會更新其保存的Broker數據,從而引發Controller修改元數據,也就是會調用updateBrokerMetadata方法來增減Broker列表中的對象。
2.2.5 liveBrokerEpochs
該字段保存所有運行中Broker的Epoch信息。Kafka使用Epoch數據防止Zombie Broker,即一個非常老的Broker被選舉成為Controller。
另外,源碼大多使用這個字段來獲取所有運行中Broker的ID序號,如下面這個方法定義的那樣:
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet – shuttingDownBrokerIds
liveBrokerEpochs的keySet方法返回Broker序號列表,然后從中移除關閉中的Broker序號,剩下的自然就是處于運行中的Broker序號列表了。
2.2.6 epoch & epochZkVersion
這兩個字段一起說,因為它們都有“epoch”字眼,放在一起說,可以幫助你更好地理解兩者的區別。epoch實際上就是ZooKeeper中/controller_epoch節點的值,你可以認為它就是Controller在整個Kafka集群的版本號,而epochZkVersion實際上是/controller_epoch節點的dataVersion值。
Kafka使用epochZkVersion來判斷和防止Zombie Controller。這也就是說,原先在老Controller任期內的Controller操作在新Controller不能成功執行,因為新Controller的epochZkVersion要比老Controller的大。
另外,你可能會問:“這里的兩個Epoch和上面的liveBrokerEpochs有啥區別呢?”實際上,這里的兩個Epoch值都是屬于Controller側的數據,而liveBrokerEpochs是每個Broker自己的Epoch值。
2.2.7 allTopics
該字段保存集群上所有的主題名稱。每當有主題的增減,Controller就要更新該字段的值。
比如Controller有個processTopicChange方法,從名字上來看,它就是處理主題變更的。我們來看下它的代碼實現,我把主要邏輯以注釋的方式標注了出來:
private def processTopicChange(): Unit = {if (!isActive) return // 如果Contorller已經關閉,直接返回val topics = zkClient.getAllTopicsInCluster(true) // 從ZooKeeper中獲取當前所有主題列表val newTopics = topics -- controllerContext.allTopics // 找出當前元數據中不存在、ZooKeeper中存在的主題,視為新增主題val deletedTopics = controllerContext.allTopics -- topics // 找出當前元數據中存在、ZooKeeper中不存在的主題,視為已刪除主題controllerContext.allTopics = topics // 更新Controller元數據// 為新增主題和已刪除主題執行后續處理操作registerPartitionModificationsHandlers(newTopics.toSeq)val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)deletedTopics.foreach(controllerContext.removeTopic)addedPartitionReplicaAssignment.foreach {case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)}info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +s"[$addedPartitionReplicaAssignment]")if (addedPartitionReplicaAssignment.nonEmpty)onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)}
2.2.8 partitionAssignments
該字段保存所有主題分區的副本分配情況。在我看來,這是Controller最重要的元數據了。事實上,你可以從這個字段衍生、定義很多實用的方法,來幫助Kafka從各種維度獲取數據。
比如,如果Kafka要獲取某個Broker上的所有分區,那么,它可以這樣定義:
partitionAssignments.flatMap {case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {case (_, partitionAssignment) => partitionAssignment.replicas.contains(brokerId)}.map {case (partition, _) => new TopicPartition(topic, partition)}}.toSet
再比如,如果Kafka要獲取某個主題的所有分區對象,代碼可以這樣寫:
partitionAssignments.getOrElse(topic, mutable.Map.empty).map {case (partition, _) => new TopicPartition(topic, partition)}.toSet
實際上,這兩段代碼分別是ControllerContext.scala中partitionsOnBroker方法和partitionsForTopic兩個方法的主體實現代碼。