文章目錄
- 01. Kafka 分區位移
- 02. Kafka 消費位移
- 03. kafka 消費位移的作用
- 04. Kafka 消費位移的提交
- 05. kafka 消費位移的存儲位置
- 06. Kafka 消費位移與消費者提交的位移
- 07. kafka 消費位移的提交時機
- 08. Kafka 維護消費狀態跟蹤的方法
01. Kafka 分區位移
對于Kafka中的分區而言,它的每條消息都有唯一的offset,用來表示消息在分區中對應的位置。偏移量從0開始,每個新消息的偏移量比前一個消息的偏移量大1。
每條消息在分區中的位置信息由一個叫位移(Offset)的數據來表征。分區位移總是從 0 開始,假設一個生產者向一個空分區寫入了 10 條消息,那么這 10 條消息的位移依次是 0、1、2、…、9。
02. Kafka 消費位移
對于kafka中的消費者而言,也有一個offset的概念,消費者使用 offset 來表示消費到分區中某個消息所在的位置。
消費位移(偏移量)是指消費者在消費分區中的消息時,記錄的已經消費的消息的位移。消費者會定期地將已經消費的消息的位移提交到Kafka集群中,以便在下一次啟動時從上次消費的位置繼續消費。
每個消費者在消費消息的過程中必然需要有個字段記錄它當前消費到了分區的哪個位置上,這個字段就是消費者位移(Consumer Offset)。注意,這和分區位移完全不是一個概念。“分區位移”表征的是分區內的消息位置,它是不變的,即一旦消息被成功寫入到一個分區上,它的位移值就是固定的了。而消費者位移則不同,它可能是隨時變化的,畢竟它是消費者消費進度的指示器嘛。另外每個消費者有著自己的消費者位移。
03. kafka 消費位移的作用
消費者位移(偏移量)是指消費者在消費分區中的消息時,記錄的已經消費的消息的位移。它的作用主要有以下幾個方面:
① 消費者可以通過記錄偏移量來實現斷點續傳。當消費者下線或者重啟時,它可以通過記錄的偏移量來恢復之前的消費狀態,從而避免重復消費已經處理過的消息。
② Kafka 通過偏移量來保證消息的順序性。在同一個分區中,消息的順序是有序的,消費者可以通過記錄偏移量來保證消費的順序性。
③ Kafka 還可以通過偏移量來實現消息的回溯。消費者可以通過指定偏移量來重新消費之前的消息,這在某些場景下非常有用,比如重新處理之前出現的錯誤。
總之,Kafka 消息偏移量是非常重要的一個概念,它可以幫助消費者實現斷點續傳、保證消息的順序性以及實現消息的回溯等功能。
04. Kafka 消費位移的提交
消費者可以通過訂閱一個或多個主題來拉取消息。當消費者調用 poll() 方法時,它會從 Kafka 集群中拉取一批消息,這些消息會被緩存在消費者的本地緩存中,等待消費者進一步處理。在消費者處理完這批消息后,它可以再次調用 poll() 方法來拉取下一批消息。如果消費者在處理消息時發生了錯誤,那么這批消息將會被重新拉取,直到消費者成功地處理它們為止。
因此每次調用poll()方法,它總是會返回還沒有被消費者讀取過的記錄,這意味著我們可以追蹤哪些記錄是被群組里的哪個消費者讀取過的。要做到這一點,就需要記錄上一次消費時的消費位移。并且這個消費位移必須做持久化保存,而不是單單保存在內存中,否則消費者重啟之后就無法知曉之前的消費位移。再考慮一種情況,當有新的消費者加入時,那么必然會有再均衡的動作,對于同一分區而言,它可能在再均衡動作之后分配給新的消費者,如果不持久化保存消費位移,那么這個新的消費者也無法知曉之前的消費位移。
在舊消費者客戶端中,消費位移是存儲在 ZooKeeper 中的。而在新消費者客戶端中,消費位移存儲在Kafka內部的主題 __consumer_offsets 中。這里把將消費位移存儲起來(持久化)的動作稱為“提交”,消費者在消費完消息之后需要執行消費位移的提交。
05. kafka 消費位移的存儲位置
消費者默認將 offset 保存在Kafka一個內置的 topic 中,該 topic 為 __consumer_offsets。
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --list
__consumer_offsets
消費者會向一個叫作 __consumer_offset 的內置主題發送消息,消息里包含每個分區的偏移量。如果消費者一直處于運行狀態,那么偏移量就沒有什么實際作用。但是,如果消費者發生崩潰或有新的消費者加入群組,則會觸發再均衡。再均衡完成之后,每個消費者可能會被分配新的分區,而不是之前讀取的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最后一次提交的偏移量,然后從偏移量指定的位置繼續讀取消息。
消費 offset 案例:
① __consumer_offsets 為 Kafka 中的 topic,那就可以通過消費者進行消費。但是需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默認是 true,表示不能消費系統主題。為了查看該系統主題數據,所以該參數修改為 false。
② 創建主題 haha
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2 --topic haha
Created topic test1.
③ 啟動生產者生產數據:
[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic haha
>hello,haha!
>你好,haha!
>
④ 啟動消費者消費數據:
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic haha --group group-haha --from-beginning
hello,haha!
你好,haha!
⑤ 查看消費者消費主題__consumer_offsets:
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning# key是[消費者組,消費的主題,消費的分區],value中已經消費的消息在當前分區的offset+1
[group-haha,haha,2]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
06. Kafka 消費位移與消費者提交的位移
如下圖,x 表示某一次拉取操作中此分區消息的最大偏移量,假設當前消費者已經消費了 x 位置的消息,那么我們就可以說消費者的消費位移為 x,不過當前消費者需要提交的消費位移并不是 x,而是 x+1,它表示下一條需要拉取的消息的位置。
如果使用自動提交或不指定提交的偏移量,那么將默認提交poll()返回的最后一個位置之后的偏移量,即提交比客戶端從poll()返回的最后一個位置大1的偏移量。在進行手動提交或需要提交特定的偏移量時,一定要記住這一點。
07. kafka 消費位移的提交時機
當前一次 poll() 操作所拉取的消息集為[x+2,x+7],x+2代表上一次提交的消費位移,說明已經完成了x+1之前(包括x+1在內)的所有消息的消費,x+5表示當前正在處理的位置。
① 如果最后一次提交的偏移量大于客戶端處理的最后一條消息的偏移量,那么處于兩個偏移量之間的消息就會丟失:
如圖,如果拉取到消息之后就進行了位移提交,即提交了x+8,那么當前消費x+5的時候遇到了異常,在故障恢復之后,我們重新拉取的消息是從x+8開始的。也就是說,x+5至x+7之間的消息并未能被消費,如此便發生了消息丟失的現象。
② 如果最后一次提交的偏移量小于客戶端處理的最后一條消息的偏移量,那么處于兩個偏移量之間的消息就會被重復處理:
如圖,如果消費完所有拉取到的消息之后才進行位移提交,那么當消費x+5的時候遇到了異常,在故障恢復之后,我們重新拉取的消息是從x+2開始的。也就是說,x+2至x+4之間的消息又重新消費了一遍,故而又發生了重復消費的現象。
08. Kafka 維護消費狀態跟蹤的方法
在Kafka中,消費者組可以通過消費者偏移量(consumer offset)來跟蹤它們在分區中消費的消息。消費者偏移量是一個整數,表示消費者已經成功讀取的消息的位置。當消費者讀取消息時,它會將偏移量保存在內存中,以便在下一次讀取消息時能夠從正確的位置開始讀取。