基本概念和操作
基本概念
簡單概念
:::color4
- Broker:如果將kafka比喻成數據倉庫網絡,那么Broker就是網絡中的倉庫節點,比如快遞站,在該節點內可以獨立運行,并且多個Broker可以連接起來,構建kafka集群
- Topic:數據存儲時的分類標簽,即在存儲時需要指定對應的topic類型(比如訂單消息類型與物流消息)
- Consumer:消息的消費者,這里與RocketMq的區別是這里的消費者是主動獲取消息,而RocketMq是新增一個代理,先通過代理主動獲取消息,再利用代理推消息給消費者,使得消費者看起來是被動接收
- Partition:實際存儲數據的 “小分區”,可以將一個topic多分存儲到不同的partition,好處是:1??如果有消息丟失,一整個topic不會一起丟失 2??可以擴大存儲內容,比如需要存儲100T的數據,那么可以多分,每個partition只需要存儲10T即可【topic是概念的,partition才是物理上的存儲】
:::
- Topic(主題)是邏輯上對消息的分類,比如 “物流”“訂單” 這兩個 Topic,分別對應物流相關、訂單相關的消息。
- 然后,Partition(分區)是物理上存儲消息的單元。一個 Topic 可以有多個 Partition,像 “物流” Topic 有 partition1、partition2,“訂單” Topic 有 partition3。這些 Partition 可以分布在同一個 Broker 上(圖里就是這種情況),也可以分布在不同 Broker 上,具體看集群部署和需求。
分區
:::color1
即:在kafka中,分區是真正存儲數據的地方,生產者在生成消息時,可以指定topic和分區,同時不同的分區也可以分布在不同的Broker上,即不同的服務器
:::
分區詳解- 分布式存儲能力
當一個主題(Topic)的消息量非常大時,僅靠單一存儲單元難以承載。通過將 Topic 劃分成多個分區(圖中的 P1、P2、P3、P4 等),可以把消息分散存儲到不同的分區中。
這些分區又可以分布在 Kafka 集群的不同 Broker(服務器節點)上,從而實現消息的分布式存儲,突破了單節點存儲容量和性能的限制。
- 并行處理與并發存儲
不同的生產者(圖中的 Producer client 1、Producer client 2)可以同時向不同的分區發送消息。比如 Producer client 1 向 Partition 1 發送消息,Producer client 2 可以同時向 Partition 3、Partition 4 發送消息。這種機制讓 Kafka 具備了并發存儲消息的能力,提升了整體的消息處理吞吐量,能更好地應對高并發的消息生產場景。
- 負載均衡
分區的存在使得消息生產和存儲的壓力可以分散到多個分區甚至多個 Broker 上。不會出現所有消息都集中在一個存儲單元,導致該單元負載過高,而其他單元閑置的情況,實現了負載均衡,保障了系統的穩定和高效運行。
副本
- 保障數據可用性:當存儲數據的分片/分區(Partition)所在節點出現故障時,由于存在副本(多個分片副本),數據不會丟失,系統仍能通過副本對外提供服務,提升了 Kafka 系統的可用性。
- 解決數據一致性問題(同時也帶來挑戰):通過主(Leader)從(Follower)架構,Leader 負責數據的讀寫操作,并將數據同步給 Follower。雖然副本存在可能導致部分副本寫數據成功、部分失敗的一致性問題,但整體上通過主從同步機制,盡可能保證數據在多個副本間的一致性,為后續主從選舉等操作提供基礎,確保系統在故障切換后數據仍能保持相對一致。
- 與 Partition(分區)的聯系:副本是基于 Partition 存在的,每個 Partition 可以有多個副本,這些副本分布在不同的 Broker 上,進一步增強了 Partition 的可靠性和可用性。
- 與 Broker(節點)的聯系:副本分布在不同的 Broker 上,不同的 Broker 可以承載同一個 Partition 的不同副本(Leader 或 Follower),Broker 是副本的物理承載節點,多個 Broker 共同支撐起副本機制,實現數據的分布式存儲與備份。
- 與 Leader/Follower(主從)的聯系:副本分為 Leader 副本和 Follower 副本,Leader 副本處理讀寫請求并同步數據給 Follower 副本,Follower 副本接收同步數據,二者共同構成副本的主從架構,是副本機制的核心組成部分,保障了數據在多副本間的流轉與一致性維護。
- 與 Consumer(消費者)的聯系:消費者從 Leader 副本讀取數據,因為 Follower 不提供讀寫(主要為保證一致性),所以消費者的讀操作依賴于 Leader 副本,而副本的存在(尤其是 Leader 故障時能選舉新 Leader)保障了消費者能持續從可用的 Leader 副本讀取數據,不中斷消費流程。
常用命令
傳播消息
單播消息(group.id相同 )
即在同一個消費組里,只會有一個消費者能夠消費到某個topic中的消息
多播消息(group.id 不相同)
多個消費組在同一個topic下,那么當生產者向topic發送消息時,多個消費組都可以接收到消息
消費組
- CURRENT-OFFSET(當前偏移量)
- 含義:表示消費組當前已經消費到的消息在分區中的偏移量。簡單來說,就是消費組在對應的分區里,已經處理過了多少條消息,這個數字是從 0 開始計數的。它反映了消費組的消費進度。
- 舉例:假設你訂閱了一個名為
<font style="color:rgb(0, 0, 0);">news-topic</font>
的 Kafka 主題,這個主題有一個分區。你的消費組是<font style="color:rgb(0, 0, 0);">reader-group</font>
,當<font style="color:rgb(0, 0, 0);">CURRENT-OFFSET</font>
的值是 100 時,就意味著<font style="color:rgb(0, 0, 0);">reader-group</font>
這個消費組已經成功消費了這個分區里從第 0 條到第 99 條,總共 100 條消息。
- LOG-END-OFFSET(日志結束偏移量)
- 含義:也叫水位(HW),代表主題對應分區中當前最新消息的結束偏移量,也就是分區里目前總共存儲了多少條消息(從 0 開始計數)。它反映了分區中消息的總量。
- 舉例:還是以
<font style="color:rgb(0, 0, 0);">news-topic</font>
主題和它的一個分區為例,如果<font style="color:rgb(0, 0, 0);">LOG-END-OFFSET</font>
的值是 150 ,這就表示在這個分區里,目前一共有 150 條消息(從第 0 條到第 149 條 )。
- LAG(滯后量)
- 含義:是
**<font style="color:#DF2A3F;">LOG-END-OFFSET</font>**
減去**<font style="color:#DF2A3F;">CURRENT-OFFSET</font>**
的值,用來衡量消費組消費消息的速度和分區中消息產生速度之間的差距,即還有多少條消息沒有被消費組處理。如果<font style="color:rgb(0, 0, 0);">LAG</font>
的值為 0,說明消費組已經把分區里當前所有消息都消費完了,沒有積壓;如果值越大,說明積壓未消費的消息越多。 - 舉例:在
<font style="color:rgb(0, 0, 0);">news-topic</font>
主題和<font style="color:rgb(0, 0, 0);">reader-group</font>
消費組這個場景中,<font style="color:rgb(0, 0, 0);">CURRENT-OFFSET</font>
是 100,<font style="color:rgb(0, 0, 0);">LOG-END-OFFSET</font>
是 150 ,那么<font style="color:rgb(0, 0, 0);">LAG = 150 - 100 = 50</font>
,這就表示<font style="color:rgb(0, 0, 0);">reader-group</font>
這個消費組還有 50 條消息沒有消費。
__consumer_offsets
是 Kafka 內部用于存儲消費者組(Consumer Group)消費偏移量(offset)的主題(Topic)
原理:消費者會定期將消費分區的偏移量提交給 <font style="color:rgba(0, 0, 0, 0.85);">__consumer_offsets</font>
這個內部主題
作用:
- 支持消費者故障恢復和再均衡:當消費者出現故障重啟時,可以從這里獲取對應的偏移量,即可繼續由上次位置繼續進行消費
- 便于管理和監控消費狀態:通過查看
<font style="color:rgba(0, 0, 0, 0.85);">__consumer_offsets</font>
中記錄的偏移量信息,運維人員和開發者可以了解各個消費者組的消費進度,判斷是否存在消息積壓等情況。比如,
稀疏索引
稀疏索引的作用是通過「跳躍式記錄關鍵位置」,減少查詢時需要遍歷的數據量,尤其適合順序存儲的數據。
舉例假設數據按偏移量 1~9 順序排列(類似數組的連續存儲):
1 → 2 → 3 → 4 → 5 → 6 → 7 → 8 → 9
沒有索引時:要找偏移量 5 的位置,只能從 1 開始依次往后查(1→2→3→4→5),共需 5 步。
有稀疏索引表 (1,4,8) 時:
- 先查索引表,發現 4 < 5 < 8,說明目標 5 一定在索引 4 和 8 之間
- 于是直接從索引 4 的位置開始往后找,只需查 4→5 這 2 步就夠了
相當于索引幫你「跳過了前面 1→2→3 的無效遍歷」,尤其當數據量很大時(比如偏移量到 10000),稀疏索引能大幅減少查詢步驟(比如索引表是 1,100,200...,查 500 時直接定位到 200 開始找,而不是從 1 遍歷)。
Kafka集群
集群的搭建
Kafka 集群是由多個 Broker 節點組成的,每個 Broker 都可以接收生產者發送的消息,存儲消息,并為消費者提供消息讀取服務。之所以按照上述方式配置就形成了集群,主要是基于以下幾個關鍵方面:
為什么稱之為集群唯一的 Broker 標識(broker.id)
broker.id
是每個 Broker 在集群中的唯一身份標識。在這個配置里,kafka1
的broker.id
是 10 ,kafka2
的是 11,kafka3
的是 12 。Kafka 集群正是通過這些不同的broker.id
來區分各個 Broker 節點。有了唯一標識,在集群內部進行諸如選舉分區首領(Leader)、同步數據等操作時,就能準確識別和管理每個 Broker ,就好比每個員工都有唯一工號,方便公司管理和安排工作。
不同的監聽地址(listeners)
每個 Broker 都配置了不同的監聽地址和端口,kafka1
監聽localhost:9093
,kafka2
監聽localhost:9094
,kafka3
監聽localhost:9095
。這意味著它們可以獨立接收來自生產者(Producer)和消費者(Consumer)的請求,并且相互之間不會因為端口沖突等問題而影響通信。就像不同的門店有各自的地址,顧客(生產者和消費者)可以分別前往不同門店(Broker)辦理業務。
獨立的日志存儲目錄(log.dirs)
每個 Broker 的log.dirs
配置指定了獨立的日志存儲路徑,kafka1
的日志存儲在/Users/muse/Lesson/ServerContext/kafka-cluster/kafka1/kafka-logs
,kafka2
和kafka3
也類似。這保證了各個 Broker 產生的消息日志是分開存儲的,它們可以獨立管理自己的消息數據,互不干擾。例如,當一個 Broker 出現故障時,不會影響其他 Broker 日志數據的正常讀寫和管理。
集群通信與協同
在啟動后,這些配置好的 Broker 會通過 Kafka 內部的通信協議自動發現彼此,然后進行通信和協同工作。它們會參與 1.分區副本的管理,比如決定哪些 Broker 上的分區副本作為首領(Leader)副本,哪些作為跟隨(Follower)副本 。2.當生產者發送消息時,消息會根據分區策略被分發到不同 Broker 上的分區;消費者消費消息時,也能從多個 Broker 上拉取消息,實現負載均衡和高可用性 。
比如,在一個高并發的電商訂單處理場景中,多個訂單消息可以被均勻地發送到集群中不同的 Broker 上存儲,消費者也能從不同 Broker 上高效地獲取訂單消息進行處理。
通過對broker.id
、listeners
、log.dirs
等關鍵參數的配置,讓這三個 Kafka 實例各自成為獨立的 Broker,并且能夠相互通信、協同工作,共同承擔消息的存儲和處理任務,從而構成了一個 Kafka 集群。
集群的啟動
多分區與多副本
一個主題的多個分區可以存儲在不同的Broker中,同一分區的Leader副本和Follower副本可以存儲在不同的Broker中,避免數據丟失
多分區消費者組
1.一個partition只能被一個消費組中的一個消費者消費,但是多個消費者組,不是有多個消費者可以同時消費partition嗎 那么會不會因為同時有多個消費者造成消息的非順序呢?
答:不會,因為不同消費者組的偏移量互不影響【消費組 A 用于處理訂單的業務邏輯,消費組 B 用于對訂單數據進行統計分析,它們同時消費訂單主題的某個分區,消費組 A 按照自己的順序處理訂單,消費組 B 也按照自己的順序拉取和分析訂單數據,二者不會相互干擾消息順序】
2.因為一個partition只能被同一個消費組者中的一個消費者消費,所以當消費組中的消費者>partition時,就會造成有一些消費者空閑
mq流程講解
- 在創建主題topic時
- 指令:
<font style="color:rgb(0, 0, 0);">bin/kafka-topics.sh --create --bootstrap-server <server-address> --replication-factor <factor> --partitions <num> --topic <topic-name></font>
- 作用:在 Kafka 集群中創建新的主題。通過
<font style="color:rgb(0, 0, 0);">--replication-factor</font>
指定副本因子,確定每個分區的副本數量,以提升數據的可靠性和容錯性;<font style="color:rgb(0, 0, 0);">--partitions</font>
用于設置主題的分區數量(同一個分區存儲著消息和其副本),幫助實現消息的分布式存儲和并行處理;**<font style="color:#DF2A3F;">--topic</font>**
則明確要創建的主題名稱。
:::color1
分區創建之初,Kafka 會從該分區的多個副本中選舉出一個作為 Leader 副本,其他副本成為 Follower 副本(如果Leader副本故障,則會從Follower選舉出一個作為Leader副本)
:::
- 示例:
<font style="color:rgb(0, 0, 0);">bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 2 --topic orders</font>
,表示在本地 Kafka 集群(地址為<font style="color:rgb(0, 0, 0);">localhost:9092</font>
)上創建一個名為<font style="color:rgb(0, 0, 0);">orders</font>
的主題,該主題有 2 個分區,每個分區有 3 個副本。
- 發送消息(生產者)
- 指令:
<font style="color:rgb(0, 0, 0);">bin/kafka-console-producer.sh --bootstrap-server <server-address> --topic <topic-name></font>
,然后在控制臺輸入消息內容回車發送。 - 作用:
+ 1??生產消息時,,可以指定存儲在哪個主體和對應的哪個分區
+ 2??消息會先存儲在Leader副本,然后kafka再自動同步到Follower副本中 - 示例:
<font style="color:rgb(0, 0, 0);">bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic orders</font>
,進入生產者控制臺,之后輸入的消息都會發送到<font style="color:rgb(0, 0, 0);">orders</font>
主題。
- 指令:
- 集群情況下
1??同一個分區上的Leader和Follower副本會存儲在不同的Broker中,避免相同數據全部丟失【但單機Broker的同一分區的不同副本仍然存在一起,因為不能分了】
同一個分區的多個副本會盡量分布在不同的 Broker 上。比如
<font style="color:rgb(0, 0, 0);">orders</font>
主題分區 0 的 3 個副本,可能分別存儲在 Broker A、Broker B、Broker C 上。這樣當某個 Broker 出現故障時,其他 Broker 上的副本可以繼續提供服務,保證消息的讀寫操作不受太大影響。
- Rebalance機制
- Controller
- 成為集群控制核心:集群里的 Broker 在 ZooKeeper(ZK)中創建臨時序號節點,序號最小(最先創建)的節點會成為 Controller,負責管理整個集群所有分區和副本的狀態。
- 處理 Leader 副本故障:當某個分區的 Leader 副本出現故障時,Controller 會為該分區選舉出新的 Leader 副本,保證分區的正常讀寫操作不受影響。
- 管理 ISR 集合變化:若檢測到某個分區的 ISR(In-Sync Replicas,同步副本)集合發生變化,Controller 會通知所有 Broker 更新相關元數據信息,確保集群各節點對分區副本同步狀態有一致認知。
- 同步新分區信息:使用
<font style="color:rgb(0, 0, 0);">kafka-topics.sh</font>
腳本為某個 Topic 增加分區數量時,Controller 會讓新分區被其他節點感知到,使整個集群能協同處理新分區的消息。
- 高水位和LEO
高頻面試題
1.如何防止消息丟失
答案
答:發送方是利用**確保消息備份成功**來保證消息不丟失,
接收方是**將自動提交改為手動提交**,即將消費消息后的 offset 移動改為手動提交,避免有時候因為網絡抖動問題造成消息消費中斷,但是如果是自動提交設置就已經移動了 offset,造成消息丟失,這就是自動提交的弊端
ack介紹:發出消息持久化機制參數
acks=0: 表示producer不需要等待任何broker確認收到消息的ACK回復,就可以繼續發送下一條消息。性能最高,但是最容易丟失消息
acks=1: 表示至少等待leader已經成功將數據寫入本地log,但是不需要等待所有follower都寫入成功,就可以繼續發送下一條消息。 這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息就會丟失。
acks=-1: 表示kafka ISR列表中所有的副本同步數據成功,才返回消息給客戶端,這是最強的數據保證。
min.insync.replicas 這個配置是用來設置同步副本個數的下限的, 并不是只有 min.insync.replicas 個副本同步成功就返回ack。而是,只要acks=all就意味著ISR列表里面的副本必須都要同步成功。
2.如何防止消息的重復消費?
答案
冪等性就是指:無論你重復操作多少次,產生的結果都和只操作一次是一樣的。(即 1 的 n 次方=1,0 的 n 次方=0,n 為 1 和 0 時,符合結果,即為冪等元)
比如生成 order.id,將這個 id 在緩存中緩存 10min,這樣子就只會產生一個 id
3.如何做到順序消費
答案
解釋:
發送方:非 0 是因為避免導致消息丟失,如果消失丟失,即使后部分是有序的,那么整體也是無序的,
接收方:由于一個 topic 可以有多個消費者組同時消費一個分區,所以這里面都順序無法保證,但是可以讓只配置一個消費者組去消費
4.如何解決消息積壓問題
答案
消息積壓:則利用加快消費能力解決,
1??可以分為提升一個消費者(即利用多線程),
2??提升整體消費者(比如增加 消費者**組** 數量,注意因為信息是存在 partition 中的,而一個 partition 中的消息只能被一個消費者組中的一個消費者消費,所以增多了消費者組,那么就得增加 partition 的數量,使得不會造成某消費者組中的消費者空閑等待)
3??設置消息的過期時間段(比如訂單應該 15min 餒處理,那么在處理前先判斷是否過期快速排查)
5.如何實現延遲隊列
**延遲隊列是一種允許消息在指定的延遲時間之后才被投遞和消費的消息隊列。 **
答案
這里為什么實現了呢?
- 先利用一個 topic 去存儲需要被延遲業務實現到隊列(比如**取消訂單業務**邏輯需要在 30min 后再實現該邏輯,那么就得等待 30min 之后執行)
- 在生產消息時帶上 creat_time 記錄創建時間
- 在消費者消費時,利用** now_time - creat_time判斷是否>30min,如果是>,那么就去執行業務邏輯,如果是<30min,那么就無需消費,保持 offset,暫停消費**(因為根據順序存儲的原理,后面的更不會>30min)
- 接著比如每 10s 去通過前面記錄的 offset 繼續判斷并且消費信息,即再去判斷是否>30min,重復步驟3、4.【以達到延遲的效果,即延遲了必須達到30min才能執行消費的邏輯】
5.kafka 如何做到單機上百萬的高吞吐量?
答案
高吞吐量的含義
在計算機領域,吞吐量是指系統在單位時間內處理請求或傳輸數據的能力 。高吞吐量意味著系統能在相同時間內處理更多的請求、傳輸更多的數據。對于 Kafka 來說,單機上達到百萬級的高吞吐量,就是指在一臺機器上,Kafka 每秒能高效處理百萬數量級的消息寫入和讀取操作。
寫入數據快的原因
- 磁盤順序寫
- 原理:傳統磁盤隨機寫時,磁頭需要頻繁移動來定位不同的存儲位置,尋道時間長,嚴重影響寫入速度。而 Kafka 將消息追加到分區日志文件末尾,屬于順序寫操作。順序寫不需要磁頭頻繁移動,大大減少了尋道時間,提高了寫入效率。
- 對比:與傳統數據庫的隨機寫相比,數據庫中數據更新、插入位置不確定,會導致磁頭頻繁移動,寫入性能受限。例如,關系型數據庫在插入數據時,可能需要更新索引、數據頁等,涉及大量隨機 I/O 操作,而 Kafka 的順序寫方式在寫入速度上具有明顯優勢。
- 頁面緩存
- 原理:操作系統會將磁盤上的數據塊緩存到內存的頁面緩存中。Kafka 寫入數據時,先寫入頁面緩存,并不立即刷入磁盤。后續如果有讀取操作,直接從頁面緩存讀取,減少磁盤 I/O。而且,當頁面緩存中的數據積累到一定量或者滿足特定條件時,才會批量刷入磁盤,批量操作也提升了整體寫入效率。
- 對比:如果沒有頁面緩存,每次寫入都直接操作磁盤,會產生大量的小 I/O 請求,效率低下。像一些不使用頁面緩存的簡單文件存儲系統,每次寫入都要等待磁盤操作完成,性能遠不如 Kafka。
讀取數據快的原因
- 原理:傳統的數據讀取過程中,數據會在用戶空間和內核空間多次拷貝,例如從磁盤讀取數據到內核緩沖區,再從內核緩沖區拷貝到用戶緩沖區,這會消耗大量的 CPU 和內存資源。而零拷貝技術(如 Linux 中的 sendfile 函數)允許數據直接在內核空間完成傳輸,跳過用戶空間,減少了數據拷貝次數,大大提升了數據傳輸效率,從而實現高讀取吞吐量。
- 對比:一些不支持零拷貝技術的系統,在讀取數據時由于多次數據拷貝,會占用較多的 CPU 和內存資源,導致讀取性能下降。例如,在一些早期的網絡文件傳輸系統中,數據需要在多個緩沖區之間拷貝,傳輸效率遠低于使用零拷貝技術的 Kafka。