- 👏作者簡介:大家好,我是愛吃芝士的土豆倪,24屆校招生Java選手,很高興認識大家
- 📕系列專欄:Spring源碼、JUC源碼、Kafka原理
- 🔥如果感覺博主的文章還不錯的話,請👍三連支持👍一下博主哦
- 🍂博主正在努力完成2023計劃中:源碼溯源,一探究竟
- 📝聯系方式:nhs19990716,加我進群,大家一起學習,一起進步,一起對抗互聯網寒冬👀
文章目錄
- 安裝部署
- 安裝 zookeeper 集群
- 安裝 kafka 集群
- Kafka 運維監控
- Kafka-Eagle 簡介
- Kafka-Eagle 安裝
- 啟動 KafkaEagle
- 訪問 web 界面
- 命令行工具
- 概述
- topic 管理操作:kafka-topics
- 查看 topic 列
- 查看 topic 狀態信息
- 創建 topic
- 刪除 topic
- 增加分區數
- 動態配置 topic 參數
- 生產者:kafka-console-producer
- 消費者:kafka-console-consumer
- 消費組
- 消費位移的記錄
- 配置管理 kafka-config
- 動態配置 topic 參數
- kafka是如何做到可以動態修改配置的?
安裝部署
安裝 zookeeper 集群
配置zookeeper集群的核心就是,以下每個zookeeper都要有
vi zoo.cfg
dataDir=/opt/apps/data/zkdata
server.1=doitedu01:2888:3888
server.2=doitedu02:2888:3888
server.3=doitedu03:2888:3888
安裝 kafka 集群
核心操作如下:
vi server.properties#為依次增長的:0、1、2、3、4,集群中唯一 id
broker.id=0# 數據存儲的?錄
# 每一個broker都把自己所管理的數據存儲在自己的本地磁盤
log.dirs=/opt/data/kafkadata#底層存儲的數據(日志)留存時長(默認 7 天)
log.retention.hours=168#底層存儲的數據(日志)留存量(默認 1G)
log.retention.bytes=1073741824#指定 zk 集群地址
zookeeper.connect=doitedu01:2181,doitedu02:2181,doitedu03:2181
查看框架兼容的版本:查看依賴的pom / 要去查看對應框架的源碼,然后進入查看
Kafka 運維監控
kafka 自身并沒有集成監控管理系統,因此對 kafka 的監控管理比較不便,好在有大量的第三方監控管
理系統來使用,比如Kafka Eagle
Kafka-Eagle 簡介
Kafka-Eagle 安裝
官方文檔地址:https://docs.kafka-eagle.org/
其中修改最核心的配置
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
####################################### 給kafka集群起一個名字
efak.zk.cluster.alias=cluster1
# 告訴它kafka集群所連接的zookeeper在哪里
cluster1.zk.list=doit01:2181,doit02:2181,doit03:2181######################################
# broker size online list
######################################
# 告訴集群broker服務器有多少臺
cluster1.efak.broker.size=3# 還需要一種數據庫,可以針對性的選擇######################################
# kafka sqlite jdbc driver address 本地的嵌入式數據庫
######################################
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/opt/data/kafka-eagle/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org######################################
# kafka mysql jdbc driver address
######################################
#efak.driver=com.mysql.cj.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior
=convertToNull
#efak.username=root
#efak.password=123456如上,數據庫選擇的是 sqlite,需要手動創建所配置的 db 文件存放目錄:/opt/data/kafka-eagle/db/
如果,數據庫選擇的是 mysql,則要放一個 mysql 的 jdbc 驅動 jar 包到 kafka-eagle 的 lib 目錄中
啟動 KafkaEagle
訪問 web 界面
初始是一個快速看板,快速得到一些最要緊的信息。
啟動,先啟動zookeeper,再kafka
kafka默認對客戶端暴漏的連接端口是 9092、zookeeper默認暴漏的端口是2181
顯示信息比如說有多少分區、分布的均勻率、數據傾斜的比例、Leader傾斜的比例等參數。
Kafka-Eagle的意義:
1.對生產力的提升。
2.監控Kafka集群的手段。
命令行工具
概述
Kafka 中提供了許多命令行工具(位于$KAFKA HOME/bin 目錄下)用于管理集群的變更。
命令 | 描述 |
---|---|
kafka-console-consumer.sh | 用于消費消息 |
kafka-console-producer.sh | 用于生產消息 |
kafka-topics.sh | 用于管理主題 |
kafka-server-stop.sh | 用于關閉 Kafka 服務 |
kafka-server-start.sh | 用于啟動 Kafka 服務 |
kafka-configs.sh | 用于配置管理 |
kafka-consumer-perf-test.sh | 用于測試消費性能 |
kafka-producer-perf-test.sh | 用于測試生產性能 |
kafka-dump-log.sh | 用于查看數據日志內容 |
kafka-preferred-replica-election.sh | 用于優先副本的選舉 |
kafka-reassign-partitions.sh | 用于分區重分配 |
topic 管理操作:kafka-topics
查看 topic 列
bin/kafka-topics.sh --list --zookeeper doit01:2181
查看 topic 狀態信息
(1)查看 topic 詳細信息
bin/kafka-topics.sh --zookeeper doitedu01:2181 --describe --topic topic-doit29
從上面的結果中,可以看出,topic 的分區數量,以及每個分區的副本數量,Configs:compression.type=gzip 代表著我們topic的數據是壓縮的。
創建 topic
./kafka-topics.sh --zookeeper doitedu01:2181 --create --replication-factor 3
--partitions 3 --topic test
參數解釋:
--replication-factor 副本數量
--partitions 分區數量
--topic topic 名稱
本方式,副本的存儲位置是系統自動決定的;
手動指定分配方案:分區數,副本數,存儲位置
bin/kafka-topics.sh --create --topic tpc-1 --zookeeper doitedu01:2181
--replica-assignment 0:1:3,1:2:6
該 topic,將有如下 partition:
partition0 ,所在節點: broker0、broker1、broker3
partition1 ,所在節點: broker1、broker2、broker6而順序的不同有可能導致leader位置的不同
刪除 topic
bin/kafka-topics.sh --zookeeper doitedu01:2181 --delete --topic test
刪除 topic,server.properties 中需要一個參數處于啟用狀態: delete.topic.enable = true
使用 kafka-topics .sh 腳本刪除主題的行為本質上只是在 ZooKeeper 中的 /admin/delete_topics 路徑
下建一個與待刪除主題同名的節點,以標記該主題為待刪除的狀態。然后由 Kafka 控制器異步完成。
增加分區數
kafka-topics.sh --zookeeper doit01:2181 --alter --topic doitedu-tpc2 --partitions 3
Kafka 只支持增加分區,不支持減少分區(增加分區,不涉及歷史數據的合并,是一個輕量級的操作,而減少分區,必然涉及到歷史數據的轉移合并,代價太大)
原因是:減少分區,代價太大(數據的轉移,日志段拼接合并)
如果真的需要實現此功能,則完全可以重新創建一個分區數較小的主題,然后將現有主題中的消息按
照既定的邏輯復制過去;
動態配置 topic 參數
通過管理命令,可以為已創建的 topic 增加、修改、刪除 topic level 參數
添加/修改 指定 topic 的配置參數:
kafka-topics.sh --alter --topic tpc2 --config compression.type=gzip --zookeeper doit01:2181
# --config compression.type=gzip 修改或添加參數配置
# --add-config compression.type=gzip 添加參數配置
# --delete-config compression.type 刪除配置參
topic 配置參數文檔地址: https://kafka.apache.org/documentation/#topicconfigs
當然其核心的配置參數有蠻多的
- broker的配置參數
- consumer的配置參數
- producer的配置參數
- topic的配置參數
生產者:kafka-console-producer
bin/kafka-console-producer.sh --broker-list doitedu01:9092 --topic test
>hello word
>kafka
>nihao
其實存在著一些思考和問題,比如我們根本不知道到底是不是寫進去了,那么我們應該怎么辦?
消費者:kafka-console-consumer
消費者在消費的時候,需要指定要訂閱的主題,還可以指定消費的起始偏移量
起始偏移量的指定策略有 3 種:
? earliest 從最早的開始消費
? latest 從最新的開始消費
? 指定的 offset( 分區號:偏移量) 從你指定的位置開始消費
? 從之前所記錄的偏移量開始消費
kafka 的 topic 中的消息,是有序號的(序號叫消息偏移量),而且消息的偏移量是在各個 partition 中
獨立維護的,在各個分區內,都是從 0 開始遞增編號!
消費消息(從開始的開始消費)
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092 --from-beginning
--topic test
但是會存在一種情況,比如說 先生產了很多消息進集群中,然后開始消費的話,可能不會保證有序,因為數據是存儲在不同的分區中的,消費者在消費的時候,是先把一個分區的數據消費完,然后再去消息其他分區。所以這也就導致了全局順序不一致的情況。
如果不加 --from-beginning 默認從最新的開始消費 當再次執行消費者的時候,會返回0條,因為已經沒有最新的了,已經存在的都叫老數據了。
如果此時還想讓消費者 消費到數據,那就去生產新的數據。
指定要消費的分區,和要消費的起始 offset
bin/kafka-console-consumer.sh --bootstrap-server
doitedu01:9092,doitedu02:9092,doitedu03:9092 --topic doit14 --offset 2 --partition 0
在這里其實要明白的一個點就是,生產者把數據寫入topic的時候,默認是把數據在多個分區間輪詢寫入。
每一個消息都有一個序號,對應的消息的序號(offset)遞增都是每個分區內管理的,消息的offset在topic中并不會有全局的遞增號。所以offset是在各個分區內獨立維護的,那么也就意味著每個分區中,都有offset=0的消息
消費組
消費組是 kafka 為了提高消費并行度的一種機制!
如果只有一個消費者,那么就會是這樣
消費者輪詢消費對應的分區。
而如果topic中數據量太大,而你需要多個并行處理任務去處理topic中的數據,那么就需要消費組。
消費組內的各個消費者之間,分擔數據讀取任務的最小單位是分區。
同一個分區只會被消費組內某一個消費者來負責讀取。
而如果出現,消費者組 中消費者 大于 分區數,那么就會剩下來。
在kafka的底層邏輯中,任何一個消費者都有自己所屬的組
組和組之間,沒有任何關系,大家都可以消費到目標topic中所有的數據,但是組內的各個消費者,就只能讀到自己所分配的 分區
如何讓多個消費者組成一個組: 就是讓這些消費者的 groupId 相同即可!
KAFKA 中的消費組,可以動態增減消費者
而且消費組中的消費者數量發生任意變動,都會重新分配分區消費任務
消費位移的記錄
kafka 的消費者,可以記錄自己所消費到的消息偏移量,記錄的這個偏移量就叫(消費位移);
記錄這個消費到的位置,作用就在于消費者重啟后可以接續上一次消費到位置來繼續往后面消費;
其實講白了就是為了斷點續傳。
例如上圖 消費者A,正在讀,突然消費者組里新增了一個消費者,那么這個程序,讀的進程會被中斷,先停,重新分配一下分區,然后再來。
kafka消費者是有這個功能的,它會自己去記消費到那條消息了,萬一消費者崩了,重啟也知道從哪里繼續消費。
其消費的本質是按照組來記偏移量,整個偏移量組內共享,并不是按照單個消費者來記,畢竟消費者組里的消費者可以動態收縮!!!
如果此時又來了一個新的消費者組來消費 topic,那么就沒有對應的偏移量。
有一個比較經典的問題:
如果我們消費一個數據,已經讀到了,但是還沒有來得及更新偏移量,正要更新偏移量的時候,崩潰了,那么此時重啟之后會發生什么?
此時就會被重復消費。
該模式主要是 先讀后記,如果是先記后讀,可能連讀都讀不到!!!
但是還有另外的一個情況,就是可能重復的數據不止一條。
比如 消費了好幾條,再記錄!一次去讀一批,然后去更新偏移量。
kafka的消費者去讀取數據,是消費者主動向broker去請求拉取,而不是broker服務器來推送(具體拉取多少條是有參數配置的)
如果拉取的速度比進行的速度要快的話,那么消費者就經常的處于饑餓的狀態,如果進來的速度比我拉取的要快,那么就會造成數據大量的積壓。
如果在不同機器啟動同一個消費者組里的消息者,還是能夠共享偏移量的
是因為偏移量數據并沒有存儲在本地磁盤上,在0.11.x之前,消費者確實是把自己消費到的位置(消費位移)記錄到zk上,之后,是記錄在kafka的一個內部的topic中( __consumer_offsets)。
類似于mysql,也是一樣的邏輯,內部也有一些系統內部表
通過指定 formatter 工具類,來對__consumer_offsets 主題中的數據進行解析;
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092 --topic __consumer_offsets --formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
consumer去記錄偏移量的時候,不是 讀到一批數據就記錄一次,也不是記錄一次后再去讀數據,而是周期性定期去提交當前的位移。
如果真的發生了更新,那就去改數字,沒發生更新,就和原來一樣。(周期性 5s 去提交當前的位移)
其實也可以從上面的記錄中就可以看到,kafka的消費者,記錄新的消費位移,并不是去修改上一次的,而是重新記錄(追加新記錄,像日志一樣)。
所以就像之前說的kafka的配置文件里面,數據存儲目錄,不叫data.dirs 而是叫 log.dirs,kafka之所以把自己的數據存儲目錄稱之為 log目錄,是因為,他底層存儲數據的特性,類似于 “日志” 數據只能不斷追加。
這種日志也不能刪除,只能將超過日期的日志進行截斷,留下的各個消費者組的都有。當然針對這一點,因為消費者組再啟動消費的時候,是可以顯示指定起始偏移量,也就是說,可以忽略之前所記錄的偏移量。
如果需要獲取某個特定 consumer-group 的消費偏移量信息,則需要計算該消費組的偏移量記錄所在
分區: Math.abs(groupID.hashCode()) % numPartitions __consumer_offsets 的分區數為:50
配置管理 kafka-config
kafka-configs.sh 腳本是專門用來進行動態參數配置操作的,這里的操作是運行狀態修改原有的配置,
如此可以達到動態變更的目的;
動態配置的參數,會被存儲在 zookeeper 上,因而是持久生效的
可用參數的查閱地址: https://kafka.apache.org/documentation/#configuration
kafka-configs.sh 腳本包含:變更 alter、查看 describe 這兩種指令類型;
kafka-configs. sh 支持主題、 broker 、用戶和客戶端這 4 個類型的配置。
kafka-configs.sh 腳本使用 entity-type 參數來指定操作配置的類型,并且使 entity-name 參數來指定操
作配置的名稱。
比如查看 topic 的配置可以按如下方式執行
bin/kafka-configs.sh --zookeeper doit01:2181 --describe --entity-type topics --entity-name tpc_2
比如查看 broker 的動態配置可以按如下方式執行:
bin/kafka-configs.sh --describe --entity-type brokers --entity-name 0 --zookeeper doit01:2181
entity-type 和 entity-name 的對應關系
示例:添加 topic 級別參數
bin/kafka-configs.sh --zookeeper doit:2181 --alter --entity-type topics --entity-name tpc22 --add-config
cleanup.policy=compact,max.message.bytes=10000
使用 kafka-configs.sh 腳本來變更( alter )配置時,會在 ZooKeeper 中創建一個命名形式為:
/config//的節點,并將變更的配置寫入這個節點
示例:添加 broker
kafka-configs.sh --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms=1000
--bootstrap-server doit01:9092,doit02:9092,doit03:9092
動態配置 topic 參數
通過管理命令,可以為已創建的 topic 增加、修改、刪除 topic level 參
添加/修改 指定 topic 的配置參數:
kafka-topics.sh --topic doitedu-tpc2 --alter --config compression.type=gzip --zookeeper doit01:2181
如果利用 kafka-configs.sh 腳本來對 topic、producer、consumer、broker 等進行參數動態
添加、修改配置參數
bin/kafka-configs.sh --zookeeper doitedu01:2181 --entity-type topics --entity-name tpc_1
--alter --add-config compression.type=gzip
刪除配置參數
bin/kafka-configs.sh --zookeeper doitedu01:2181 --entity-type topics --entity-name tpc_1
--alter --delete-config compression.type
kafka是如何做到可以動態修改配置的?
Kafka之所以能夠動態配置,是因為它設計時考慮到了在運行時動態更改配置的需求。Kafka的配置信息存儲在Zookeeper中,而不是像傳統的配置文件那樣靜態地存儲在本地磁盤上。這樣一來,當需要更改配置時,只需要在Zookeeper上修改對應的配置節點,Kafka會自動檢測到變化并按照新的配置進行運行。
Kafka實現動態配置的原理是基于Zookeeper的Watcher機制。當Kafka啟動時,會將配置信息存儲在Zookeeper的一個特定目錄下,并且通過Watcher監聽該目錄的變化。當配置信息發生變化時,Zookeeper會通知Kafka,Kafka會重新加載新的配置并應用到運行中的服務。
以下是一個簡化的偽代碼示例,展示了Kafka動態配置的實現原理:
# Kafka啟動時初始化配置
def initialize_config():config = load_config_from_zookeeper() # 從Zookeeper加載配置apply_config(config) # 應用配置# 從Zookeeper加載配置
def load_config_from_zookeeper():config_data = zookeeper.get('/kafka/config') # 從Zookeeper獲取配置數據return parse_config(config_data) # 解析配置數據# 解析配置數據
def parse_config(config_data):# 將配置數據解析為可用的配置對象config = Config()config.load_from_dict(config_data)return config# 應用配置
def apply_config(config):# 根據配置更新Kafka的運行時參數update_kafka_config(config)# 監聽配置變化
def watch_config_changes():while True:changes = zookeeper.watch('/kafka/config') # 監聽配置目錄config = parse_config(changes) # 解析配置變化apply_config(config) # 應用配置# 修改配置
def modify_config(config_changes):zookeeper.set('/kafka/config', config_changes) # 更新Zookeeper上的配置數據# Kafka啟動時初始化配置
initialize_config()# 啟動監聽配置變化的線程
start_thread(watch_config_changes)# 修改配置的示例
modify_config(new_config_changes)