1. 相關概念
1.1 代理:Broker
使用Kafka前,我們都會啟動Kafka服務進程,這里的Kafka服務進程我們一般會稱之為Kafka Broker 或 Kafka Server。因為Kafka是分布式消息系統所以再實際的生產環境中,是需要多個服務進程形成集群提供消息服務的。所以每一個服務節點都是一個broker,而且在 Kafka 集群中,為了區分不同的服務節點,每一個 broker都應該有一個不重復的全局ID,稱之為 broker.id,這個 ID 可以在 kafka 軟件的配置文件 server.properties 中進行配置。
############################# Server Basics ######################
# The id of the broker. This must be set to a unique integer for each broker
# 集群ID
broker.id=0
咱們的 Kafka 集群中每一個節點都有自己的ID,整數且唯一。
主機 | kafka-broker1 | kafka-broker2 | kafka-broker3 |
---|---|---|---|
broker.id | 1 | 2 | 3 |
1.2 控制器:Controller
Kafka是分布式消息傳輸系統,所以存在多個 Broker 服務節點,但是它的軟件架構采用的是分布式系統中比較常見的主從(Master - Slave) 架構,也就說需要從多個 Broker 中找到一個用于管理整個 Kafka 集群的 Master 節點,這個節點,我們就稱之為 Controller。它是 Apache Kafka 的核心組件非常重要。它的主要作用在 Apache ZooKeeper 的幫助下管理和協調控制整個 Kafka 集群。
如果在運行過程中,Controller 節點出現了故障,那么Kafka 會依托于 ZooKeeper選舉其他的節點作為新的 Controller,讓Kafka 集群實現高可用。
Kafka 集群中 Controler 的基本功能:
Broker 管理
監聽 /brokers/ids 節點相關的變化:
- Broker 輸了增加或減少的變化
- Broker 對應的數據變化
Topic 管理
- 新增:監聽 /brokers/ids 節點相關的變化
- 修改:監聽 /brokers/ids 節點相關的變化
- 刪除:監聽 /admin/delete_topics 節點相關的變化
Partation 管理
- 監聽 /admin/reassign_partitions節點相關的變化
- 監聽 /isr_change_notification節點相關的變化
- 監聽 /preferred_replica_election節點相關的變化
數據服務
啟動分區狀態機和副本狀態機
2. 啟動ZooKeeper
Kafka 集群中含有多個服務節點,而分布式系統中經典的主從(Master-Slave)架構就要求從多個服務節點中找一個節點作為集群管理Master,Kafka 集群中的這個Master,我們稱之為集群控制器 Controller。
如果此時Controller節點出現故障,它就不能再管理集群功能,那么其他的Slave節點該如何是好呢?
如果從剩余的兩個Slave節點中選一個節點出來作為新的集群控制器是不是一個不錯的方案,我們將這個選擇的過程稱之為:選舉(elect)。方案是不錯,但是問題就在于選哪一個Slave節點呢?不同的軟件實現類似的選舉功能都會有一些選舉算法,而Kafka是依賴于ZooKeeper軟件實現Broker節點選舉功能。
ZooKeeper 如何實現 Kafka 的節點選舉呢?這就要說到我們用到 ZooKeeper 的3個功能:
- 一個是在 ZooKeeper軟件中創建節點 Node,創建一個 Node時,我們會設定這個節點時持久化創建,還是臨時創建,就是Node 一旦創建后會一直存在,而臨時創建,是根據當前的客戶端連接創建的臨時節點 Node,一旦客戶端連接斷開,那么這個臨時節點 Node 也會被自動刪除,所以這樣的節點稱之為臨時節點。
- ZooKeeper 節點是不允許有重復的,所以多個客戶端創建同一個節點,只能有一個創建成功。
- 另外一個是客戶端可以在 ZooKeeper 的節點上增加監聽器,用于監聽節點的狀態變化,一旦監聽的節點狀態發生變化,那么監聽器就會觸發響應,實現待監聽功能。
Kafka 是如何利用 ZooKeeper 實現 Controller 節點的選舉的:
1)第一次啟動Kafka 集群時,會同時啟動多個 Broker 節點,每一個 Broker 節點就會連接 ZooKeeper,并嘗試創建一個臨時節點 /controller
2)因為 ZooKeeper 中一個系欸但不允許重復創建,所以多個 Broker 節點,最終只能有一個 Broker 節點可以創建成功,那么這個創建成功的 Broker 節點聚會自動作為 Kafka 集群控制節點,用于管理整個 Kafa 集群。
3)沒有選舉成功的其他 Slave 節點會創建 Node 監聽器,用于監聽 /controller 節點的狀態變化。
4)一旦Controller 節點出現故障或掛掉了,那么對應的 ZooKeeper 客戶端連接就會中斷。ZooKeeper 中的 /controller 節點就會自動被刪除,而其他那些 Slave 節點因為增加了監聽器,所以當監聽到 /controller 節點被刪除后,就會馬上向 ZooKeeper 發出創建 /controller 節點的請求,一旦創建成功,那么該Broker 就變成了新的 Controller 節點了。
現在我們能明白啟動 Kafka 集群之前為什么要先啟動 ZooKeeper 集群了吧。就說因為 ZooKeeper 可以協助 Kafka 進行集群管理。
3. 啟動Kafka
ZooKeeper 已經啟動好了,那我們現在可以啟動多個 Kafka Broker節點構建 Kafka 集群了。構建的過程中,每一個 Broker 節點就是一個 Java 進程,而在這個進程中,有很多需要 提前準備好,并進行初始化的內部組件對象。
3.1 初始化 ZooKeeper
Kafka Broker 啟動時,首先會創建 ZooKeeper 客戶端(KafkaZkClinet),用于 ZooKeeper 進行交互。客戶端對象創建完成后,會通過該客戶端對象向 ZooKeeper 發送創建 Node 的請求,注意,這里創建的Node都是持久化Node。
節點 | 類型 | 說明 |
---|---|---|
/admin/delete_topics | 持久化節點 | 配置需要刪除的topic,因為刪除過程中,可能broker下線,或執行失敗,那么就需要在broker重新上線后,根據當前節點繼續刪除操作,一旦topic所有的分區數據全部刪除,那么當前節點的數據才會進行清理 |
/brokers/ids | 持久化節點 | 服務節點ID標識,只要broker啟動,那么就會在當前節點中增加子節點,brokerID不能重復 |
/brokers/topics | 持久化節點 | 服務節點中的主題詳細信息,包括分區,副本 |
/brokers/seqid | 持久化節點 | seqid主要用于自動生產brokerId |
/config/changes | 持久化節點 | kafka的元數據發生變化時,會向該節點下創建子節點。并寫入對應信息 |
/config/clients | 持久化節點 | 客戶端配置,默認為空 |
/config/brokers | 持久化節點 | 服務節點相關配置,默認為空 |
/config/ips | 持久化節點 | IP配置,默認為空 |
/config/topics | 持久化節點 | 主題配置,默認為空 |
/config/users | 持久化節點 | 用戶配置,默認為空 |
/consumers | 持久化節點 | 消費者節點,用于記錄消費者相關信息 |
/isr_change_notification | 持久化節點 | ISR列表發生變更時候的通知,在kafka當中由于存在ISR列表變更的情況發生,為了保證ISR列表更新的及時性,定義了isr_change_notification這個節點,主要用于通知Controller來及時將ISR列表進行變更。 |
/latest_producer_id_block | 持久化節點 | 保存PID塊,主要用于能夠保證生產者的任意寫入請求都能夠得到響應。 |
/log_dir_event_notification | 持久化節點 | 主要用于保存當broker當中某些數據路徑出現異常時候,例如磁盤損壞,文件讀寫失敗等異常時候,向ZooKeeper當中增加一個通知序號,Controller節點監聽到這個節點的變化之后,就會做出對應的處理操作 |
/cluster/id | 持久化節點 | 主要用于保存kafka集群的唯一id信息,每個kafka集群都會給分配要給唯一id,以及對應的版本號 |
3.2 初始化服務
Kafka Broker 中有很多的服務對象,用于實現內部管理和外部通信操作。
3.2.1 啟動任務調度器
每一個Broker 在啟動時都會創建內部調度器(KafkaScheduler) 并啟動,用于完成節點內部的工作任務。底層就是Java中的定時任務線程池。
3.2.2 創建數據管理器
每一個 Broker 在啟動時都會創建數據管理器(LogManager),用于接收到消息后,完成后續的數據創建,查詢,清理等處理。
3.2.3 創建遠程數據管理器
每一個 Broker 在啟動時都會創建遠程數據管理器(RemoteLogManager),用于和其他 Broker 節點進行數據狀態同步。
3.2.4 創建副本管理器
每一個 Broker 在啟動時都會創建副本管理器(ReplicaManager),用于對主題的副本進行處理。
3.2.5 創建 ZK 元數據緩存
每一個 Broker 在啟動時會將 ZK 的關于 Kafka 的元數據進行緩存,創建元數據對象(ZKMetadataCache)
3.2.6 創建 Broker 通信對象
每一個 Broker 在啟動時會創建 Broker 之間的通道管理器對象(BrokerToControllerChannelManager),用于管理Broker 和 Controller 之間的通信。
3.2.7 創建網絡通信對象
每一個 Broker 在啟動時會創建自己的網絡通信對象(SockerServer),用于和其他 Broker 之間的通信,其中包含了 Java 用于 NIO 通信的 Channel、Selector 對象。
3.2.8 注冊 Broker 節點
Broker啟動時,會通過 ZK 客戶端對象向 ZK 注冊當前的 Broker 節點ID,注冊后,創建的 ZK節點為臨時節點。如果當前 Broker 的 ZK 客戶端斷開和ZK的連接,注冊的節點會被刪除。
3.3 啟動控制器
控制器(KafkaController)是每一個 Broker 啟動時都會創建的核心對象,用于和ZK 之間建立連接并申請自己為整個 Kafka 集群的 Master 管理者。如果申請成功,那么會完成管理者的初始化操作,并建立和其他 Broker 之間的數據通道接收各種事件,進行封裝后交給事件管理器,并定義了 process 方法,用于真正處理各類事件。
3.3.1 初始化通道管理器
創建通道管理器(ControllerChannelManager),該管理器維護了 Controlelr 和 集群所有Broker 節點之間的網絡連接,并向 Broker 發送控制類請求及接受響應。
3.3.2 初始化事件管理器
創建事件管理器(ControllerEventManager)維護了 Controller 和集群所有Broker節點之間的網絡連接,并向 Broker 發送控制類請求及接受響應。
3.3.3 初始化狀態管理器
創建狀態管理器(ControllerChangerHandler)可以監聽 /controller 節點的操作,一旦節點創建(ControllerChange),刪除(Reelect),數據發生變化(ControllerChange),那么監聽后執行相應的處理。
3.3.4 啟動控制器
控制器對象啟動后,會向事件管理器發送 Startup 事件,事件處理現場接收到事件后會通過 ZK 客戶端向 ZK 申請 /controller 節點,申請成功后,執行當前節點成為 Controller 的一系列操作。主要是注冊各類 ZooKeeper 監聽器、刪除日志路徑變更和 ISR 副本變更通知事件、啟動 Controller 通道管理器,以及啟動副本狀態機和分區狀態機。