文章目錄
- 概述
- 控制器的選舉與故障恢復
- 控制器的選舉
- 故障恢復
- 優雅關閉
- 分區leader的選舉
概述
在Kafka集群中會有一個或多個broker,其中有一個broker會被選舉為控制器(Kafka Controler),它負責管理整個集群中所有分區和副本的狀態。當某個分區的leader副本出現故障時,由控制器負責為該分區選舉新的leader副本。當檢測到某個分區的ISR集合發生變化時,由控制器負責通知所有broker更新其元數據信息。當使用kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控制器負責分區的重新分配。
這里主要講控制器的功能與實現,優雅關閉部分會簡單概括。
控制器的選舉與故障恢復
控制器的選舉
Kafka中的控制器選舉工作依賴于ZooKeeper,成功競選為控制器的broker會在ZooKeeper中創建/controller這個臨時(EPHEMERAL)節點,此臨時節點的內容參考如下:
(version":1,brokerid":0,“timestamp":”1529210278988”)
其中version在目前版本中固定為1,brokeria表示成為控制器的broker的ia編號,timestamp 表示競選成為控制器時的時間戳。
在任意時刻,集群中有且僅有一個控制器。每個broker啟動的時候會去嘗試讀取/controller節點的brokeria的值,如果讀取到brokerid的值不為-1,則表示已經有其他broker節點成功競選為控制器,所以當前broker就會放棄競選:如果ZooKeeper中不存在/controller,或者這個節點中的數據異常,那么就會嘗試去創建/controller節點。當前broker去創建節點的時候,也有可能其他broker同時去嘗試創建這個節點,只有創建成功的那個broker才會成為控制器,而創建失敗的broker競選失敗。每個broker都會在內存中保存當前控制器的brokerid值,這個值可以標識為activeControllerid。
ZooKeeper中還有一個與控制器有關的/controller_epoch節點,這個節點是持久(PERSISTENT)節點,節點中存放的是一個整型的controller_epoch值。controller_epoch用于記錄控制器發生變更的次數,即記錄當前的控制器是第幾代控制器,我們也可以稱之為“控制器的紀元”
controller_epch的初始值為1,即集群中第一個控制器的紀元為1,當控制器發生變更時,每選出一個新的控制器就將該字段值加1。每個和控制器交互的請求都會攜帶controller_epoch這個字段,如果請求的controller_epoch值小于內存中的controller_epoch值,則認為這個請求是向已經過期的控制器所發送的請求,那么這個請求會被認定為無效的請求。如果請求的controller_epoch值大于內存中的controller_epoch值,那么說明已經有新的控制器當選了。由此可見,Kafka通過controller_epoch來保證控制器的唯性,進而保證相關操作的一致性。
具備控制器身份的broker需要比其他普通的broker多一份職責,具體細節如下:
-
監聽分區相關的變化。為ZooKeeper中的/admin/reassign partitions節點注 冊
PartitionReassignmentHandler,用來處理分區重分配的動作。為ZooKeeper中的/isr change notification節點注冊IsrChangeNotificetionHandler,用來處理ISR集合變更的動作。為ZooKeeper中中的/admin/preferred-replica-election節點添加
PreferredReplicaElectionHandler,用來處理優先副本的選舉動作。 -
監聽主題相關的變化。為ZooKeeper中的/brokers/topics節點添加
TopicChangeHandler,用來處理主題增減的變化;為ZooKeeper中的/admin/ delete
topics節點添加TopicDeletionHandler,月用來處理刪除主題的動作。 -
監聽broker相關的變化。為ZooKeeper中的/brokers/ids節點添加BrokerChangeHandler,用來處理broker增減的變化。
-
從ZooKeeper中讀取獲取當前所有與主題、分區及broker有關的信息并進行相應的管理。對所有主題對應的ZooKeeper中的/brokers/topics/節點添加PartitionModificationsHandler,用來監聽主題中的分區分配變化。
-
啟動并管理分區狀態機和副本狀態機。
-
更新集群的元數據信息。
-
如果參數auto.leader.rebalance.enable設置為true,貝則還會開啟一個名為
“auto-leader-rebalance-task”的定時任務來負責維護分區的優先副本的均衡。
控制器在選舉成功之后會讀取ZooKeeper中各個節點的數據來初始化上下文信息(ControllerContext),并且需要管理這些上下文信息。比如為某個主題增加了若干分區,控制器在負責創建這些分區的同時要更新上下文信息,并且需要將這些變更信息同步到其他普通的broker節點中。不管是監聽器觸發的事件,還是定時任務觸發的事件,或者是其他事件都會讀取或更新控制器中的上下文信息,那么這樣就會涉及多線程間的同步。如果單純使用鎖機制來實現,那么整體的性能會大打折扣。針對這一現象,Kafka的控制器使用單線程基于事件隊列的模型,將每個事件都做一層封裝,然后按照事件發生的先后順序暫存到LinkedBlockingQueue中,最后使用一個專用的線程(ControllerEventThread)按照FIFO(FirstInputFirstOutput,先入先出)的原則順序處理各個事件,這樣不需要鎖機制就可以在多線程間維護線程安全,具體可以參考下圖。
在Kafka的早期版本中,并沒有采用KafkaController這樣一個概念來對分區和副本的狀態進行管理,而是依賴于ZooKeeper,每個broker都會在ZooKeeper上為分區和副本注冊大量的監聽器(Watcher)。當分區或副本狀態變化時,會喚醒很多不必要的監聽器,這種嚴重依賴
ZooKeeper的設計會有腦裂、羊群效應,以及造成ZooKeeper過載的隱患(舊版的消費者客戶端存在同樣的問題)。在目前的新版本的設計中,只有KafkaController在ZooKeeper上注冊相應的監聽器,其他的broker極少需要再監聽ZooKeeper中的數據變化,這樣省去了很多不必要的麻煩。不過每個broker還是會對/controller節點添加監聽器,以此來監聽此節點的數據變化(ControllerChangeHandler)
故障恢復
當/controller節點的數據發生變化時,每個broker都會更新自身內存中保存的activeControllerlds如果broker在數據變更前是控制器,在數據變更后自身的brokerid值與新的 activeControllerId值不一致,那么就需要“退位”,關閉相應的資源,比如關閉狀態機、注銷相應的監聽器等。有可能控制器由于異常而下線,造成/controller這個臨時節點被自動刪除;也有可能是其他原因將此節點刪除了。
當/controller節點被刪除時,每個broker都會進行選舉,如果broker在節點被刪除前是控制器,那么在選舉前還需要有一個“退位”的動作。如果有特殊需要,則可以手動刪除/controller節點來觸發新一輪的選舉。當然關閉控制器所對應的broker,以及手動向/controller節點寫入新的brokerid的所對應的數據,同樣可以觸發新一輪的選舉。
優雅關閉
Kafka自身提供了一個腳本工具,就是存放在其bin目錄下的kafka-server-stop.sh,這個腳本的內容非常簡單,具體內容如下:
PIDS=S (ps ax I grep -i kafkal.Kafka’ Igrep java l grep -v grep 1 awk print S1)') if[-Z”SPIDS”】;then echo rNokafkaserver to stop' exit 1 else kill -S TERM SPIDS fi
以上腳本直接執行會存在執行不成功,這是因為與Kafka進程有關的輸出信息太長,所以kafka-server-stop.sh腳本在很多情況下并不會奏效。
可以通過修改計算機PAGE_SIZE的大小或者修改腳本內容,這里我們可以直接修改kafka-server-storsh腳本的內容,將其中的第一行命令修改如下:
PIDS=S(ps axIgrep -i 'kafka’i grep javalgrep-v greplawkitprint Si}'
即把“.Kafka”去掉,這樣在絕大多數情況下是可以奏效的。如果有極端情況,即使這樣也不能關閉,那么只需要按照以下兩個步驟就可以優雅地關閉Kafka的服務進程:
(1)獲取Kafka的服務進程號PIDS。可以使用Java中的jps命令或使用Linux系統中的ps命令來查看。
(2)使用kill-STERMSPIDS或kill-15 $PIDS的方式來關閉進程,注意千萬不要使用kill-9的方式。
為什么這樣關閉的方式會是優雅的?Kafka服務入口程序中有一個名為“kafka-shutdown-hock”的關閉鉤子,待Kafka進程捕獲終止信號的時候會執行這個關閉鉤子中的內容,其中除了正常關閉一些必要的資源,還會執行一個控制關閉C ControlledShutdown )的動作。使用ControlledShutdown的方式關閉Kafka有兩個優點:一是可以讓消息完全同步到磁盤上,在服務下次重新上線時不需要進行日志的恢復操作;二是ControllerShutdown在關閉服務之前,會對其上的leader副本進行證移,這樣就可以減少分區的不可用時間。這里更加詳細的分析可以查下其他人的博客。
當然這里是通過腳本進行優雅關閉,你也可以自己通過KafkaAdminClient自己寫一個優雅關閉的接口去執行。這里詳細的設計與實現可以查下其他博客。
分區leader的選舉
分區leader副本的選舉由控制器負責具體實施。當創建分區(創建主題或增加分區都有創建分區的動作)或分區上線(比如分區中原先的leader副本下線,此時分區需要選舉一個新的leader上線來對外提供服務)的時候都需要執行leader的選舉動作,對應的選舉策略為OfflinePartitionLeaderElectionStrategy。這種策略的基本思路是按照AR集合中副本的順序查找第一個存活的副本,并且這個副本在ISR集合中。一個分區的AR集合在分配的時候就被指定并且只要不發生重分配的情況,集合內部副本的順序是保持不變的,而分區的ISR集合中副本的順序可能會改變。注意這里是根據AR的順序而不是ISR的順序進行選舉的。
如果ISR集合中沒有可用的副本,那么此時還要再檢查一下所配置的unclean.leader.election.enable參數(默認值為false)。如果這個參數配置為true那么表示允許從非ISR列表中的選舉leader,從AR列表中找到第一個存活的副本即為leader。
那么哪些情況下會出現leader選舉呢?
-
當分區進行重分配的時候也需要執行leader的選舉動作,對應的選舉策略為ReassignPartitionLeaderElectionStrategy。這個選舉策略的思路比較簡單:從重分配的AR列表中找到第一個存活的副本,且這個副本在日前的ISR列表中。
-
當發生優先副本的選舉時,直接將優先副本設置為leader即可,AR集合中的第一個副本即為優先副本(PreferredReplicaPartitionLeaderElectionStrategy)
-
還有一種情況會發生leader的選舉,當某節點被優雅地關閉(也就是執行 ControlledShutdown)時,位于這個節點上的leader副本都會下線,所以與此對應的分區需要執行leader的選舉。與此對應的選舉策略(ControlledShutdownPartitionLeaderElectionStrategy)為:從AR列表中找到第一個存活的副本,且這個副本在日前的ISR列表中,與此同時還要確保這個副本不處于正在被關閉的節點上。