概述
每當我們調用Kafka的poll()方法或者使用Spring的@KafkaListener(其實底層也是poll()方法)注解消費Kafka消息時,它都會返回之前被寫入Kafka的記錄,即我們組中的消費者還沒有讀過的記錄。
這意味著我們有一種方法可以跟蹤該組消費者讀取過的記錄。 如前所述,Kafka的一個獨特特征是它不會像許多JMS隊列那樣跟蹤消費過的記錄。相反,它允許消費者使用Kafka跟蹤每個分區中的位置(位移,也稱偏移量)。
我們將更新分區中當前位置的操作稱為提交位移(commits offset)。
offset的作用和意義
offset 是 partition 中每條消息的唯一標識,是一個單調遞增且不變的值,由 kafka 自動維護,offset 用于定位和記錄消息在 partition 中的位置和消費進度,保證 partition 內的消息有序。
offset 是 Kafka 為每條消息分配的一個唯一的編號,它表示消息在分區中的順序位置。offset 是從 0 開始的,每當有新的消息寫入分區時,offset 就會加 1。offset 是不可變的,即使消息被刪除或過期,offset 也不會改變或重用。
offset 的作用主要有兩個:
- 一是用來定位消息。通過指定 offset,消費者可以準確地找到分區中的某條消息,或者從某個位置開始消費消息。
- 二是用來記錄消費進度。消費者在消費完一條消息后,需要提交 offset 來告訴 Kafka broker 自己消費到哪里了。這樣,如果消費者發生故障或重啟,它可以根據保存的 offset 來恢復消費狀態。
offset在語義上擁有兩種:Consumer Offset和Committed Offset。
Consumer & Committed
Consumer Offset
- 每個消費者在消費消息的過程中必然需要有個字段記錄它當前消費到了分區的哪個位置上,這個字段就是消費者位移(Consumer Offset),它是消費者消費進度的指示器。Consumer Offset保存在消費者客戶端中,表示當前消費序號,僅在poll()方法中使用。例如:
消費者第一次調用poll()方法后收到了20條消息,那么Current Offset就被設置為20。
這樣消費者下一次調用poll()方法時,Kafka就知道應該從序號為21的消息開始讀取。
這樣就能夠保證每次poll消息時,都能夠收到不重復的消息。
- 看上去Offset 就是一個數值而已,其實對于 Consumer Group 而言,它是一組 KV 對,Key 是分區,V 對應 Consumer 消費該分區的最新位移 TopicPartition->long
- 不過切記的是消費者位移是下一條消息的位移,而不是目前最新消費消息的位移。
Committed Offset
Committed Offset是保存在kafka客戶端上,主要通過commitSync
和commitAsync
API操作,SpringBoot集成Kafka中是使用ack.acknowledge()
。若消費者poll了消息但是不調用API,Committed Offset依舊為0。
Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的過程中,一個分區分配給了一個消費者,消費者將從Committed Offset記錄的序號后開始消費。又或者消費者調用了poll消費了5條消息并調用API更新了Committed Offset,然后宕機了過了一會兒又重啟了,消費者也可以通過Committed Offset得知從第6條消息消費。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的。Group Coordinator是運行在Kafka集群中每一個Broker內的一個進程。它主要負責Consumer Group的管理,Offset位移管理以及Consumer Rebalance。
offset 的存儲和管理
offset 的存儲和管理主要涉及到兩個方面:生產者端和消費者端。
生產者端
生產者在向 Kafka 發送消息時,可以指定一個分區鍵(Partition Key),Kafka 會根據這個鍵和分區算法來決定消息應該發送到哪個分區。如果沒有指定分區鍵,Kafka 會采用輪詢或隨機的方式來選擇分區。生產者也可以自定義分區算法。
當消息被寫入到分區后,Kafka broker 會為消息分配一個 offset,并返回給生產者。生產者可以根據返回的 offset 來確認消息是否成功寫入,并進行重試或其他處理。
消費者端
消費者在消費 Kafka 消息時,需要維護一個當前消費的 offset 值,以及一個已提交的 offset 值。當前消費的 offset 值表示消費者正在消費的消息的位置,已提交的 offset 值表示消費者已經確認消費過的消息的位置。
消費者在消費完一條消息后,需要提交 offset 來更新已提交的 offset 值。提交 offset 的方式有兩種:自動提交和手動提交。
無論是自動提交還是手動提交,offset 的實際存儲位置都是在 Kafka 的一個內置主題中:__consumer_offsets。這個主題有 50 個分區(可配置),每個分區存儲一部分消費組(Consumer Group)的 offset 信息。Kafka broker 會根據消費組 ID 和主題名來計算出一個哈希值,并將其映射到 __consumer_offsets 主題的某個分區上。__consumer_offsets主題包含每個分區需要提交的偏移量。 但是,如果消費者組的消費者崩潰或新的消費者加入消費者組,這將觸發重新平衡(rebalance),即消費者組內的消費者負責的分區會發生變化。 在重新平衡之后,可以為每個消費者分配一組新的分區而不是之前處理的分區。 然后消費者將讀取每個分區的已提交偏移量并從那里繼續。
__consumer_offsets 主題是 Kafka 0.9.0 版本引入的新特性,之前的版本是將 offset 存儲在 Zookeeper 中。但是 Zookeeper 不適合大量寫入,因此后來改為存儲在 Kafka 自身中,提高了性能和可靠性。
自動提交(Automatic Commit)
Kafka 提供了一個配置參數 enable.auto.commit,默認為 true,表示開啟自動提交功能。自動提交功能會在后臺定期(由 auto.commit.interval.ms 參數控制)將當前消費的 offset 值提交給 Kafka broker。
提交偏移量的最簡單方法是允許消費者來完成。 如果配置 enable.auto.commit=true
,則消費者每五秒鐘將提交客戶端從poll()收到的最大偏移量。 五秒間隔是默認值,可通過設置auto.commit.interval.ms
來控制。 就像消費者中的其他機制一樣,自動提交由poll loop驅動。 無論您何時輪詢,消費者都會檢查是否需要提交,如果是,它將提交它在上次輪詢中返回的偏移量。
它實際保證的是位移至少要隔一段時間才會提交,如果你是單線程處理消息,那么只有處理完消息后才會提交位移,可能遠比你設置的間隔長,因為你的處理邏輯可能需要一定的時間。
提交的時機
- Kafka 會保證在開始調用 poll 方法時,提交上次 poll 返回的所有消息。
- 從順序上來說,poll 方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現消費丟失的情況。
- 但自動提交位移的一個問題在于,它可能會出現重復消費,如果處理失敗了下次開始的時候就會從上一次提交的offset 開始處理
存在的問題
數據重復寫入
假設 Consumer 當前消費到了某個主題的最新一條消息,位移是 100,之后該主題沒有任何新消息產生,故 Consumer 無消息可消費了,所以位移永遠保持在 100。由于是自動提交位移,位移主題中會不停地寫入位移 =100 的消息。顯然 Kafka 只需要保留這類消息中的最新一條就可以了,之前的消息都是可以刪除的。
位移提交和rebalance
雖然自動提交很方便,但是它也有一定的不足。
請注意,默認情況下,自動提交每五秒鐘發生一次。 假設我們在最近的提交之后三秒鐘并且觸發了重新平衡。 在重新平衡之后,所有消費者將從最后提交的偏移開始消費。 在這種情況下,偏移量是三秒鐘之前的偏移量,因此在這三秒內到達的所有事件將被處理兩次。 可以將提交間隔配置為更頻繁地提交并減少記錄將被復制的窗口,但是不可能完全消除它們。這是自動提交機制的一個缺陷(其實就是重復消費的問題)。
啟用自動提交后,對poll的調用將始終提交上一輪詢返回的最后一個偏移量。 它不知道實際處理了哪些事件,因此在再次調用poll()之前,始終處理完poll()返回的所有事件至關重要, 因為和poll()一樣,close()方法也會自動提交偏移量。
其實仔細思考,手動提交也存在這個問題,因為rebalance會先讓所以的消費者停止消費,因為在kafka的角度來看,消息消費的那一刻,消費已經完成,所以停止消費的時候,你的邏輯很可能沒有完成,那么你的offset 也很可能沒有提交。在rebalance后分區重新分配的消費者會重新從服務端獲取分區的offset值,此時可能是消費端提交前的offset,也會產生重復消費問題。
自動提交很方便,但它們不能給開發人員足夠的控制以避免重復的消息。
手動提交
如果 enable.auto.commit 設置為 false,則表示關閉自動提交功能,此時消費者需要手動調用 commitSync 或 commitAsync 方法來提交 offset。手動提交功能可以讓消費者更靈活地控制何時以及如何提交 offset。
- 很多與 Kafka 集成的大數據框架都是禁用自動提交位移的,如 Spark、Flink 等。這就引出了另一種位移提交方式:手動提交位移,即設置 enable.auto.commit = false。一旦設置了 false,Kafka Consumer API 為你提供了位移提交的方法,如 consumer.commitSync 等。當調用這些方法時,Kafka 會向位移主題寫入相應的消息。
同步手動提交
實現方案
- 設置
enable.auto.commit
為 false - 代碼中手動提交
public static void main(String[] args) {while (true) {// 這里的參數指的是輪詢的時間間隔,也就是多長時間去拉一次數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {// 模擬消息的處理邏輯System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});try {//處理完當前批次的消息,在輪詢更多的消息之前,調用commitSync方法提交當前批次最新的消息consumer.commitSync();} catch (CommitFailedException e) {e.printStackTrace();}}
}
SrpingBoot中是通過ack.acknowledge()
達到手動提交的目的。
@KafkaListener(topics = "order", groupId = "order_group")
public void consume(ConsumerRecord<?, ?> record, Acknowledgment ack) {System.out.println("Received: " + record);ack.acknowledge();
}
存在的問題
從名字上來看,它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。如果提交過程中出現異常,該方法會將異常信息拋出。
commitSync()的問題在于,Consumer程序會處于阻塞狀態,直到遠端的Broker返回提交結果,這個狀態才會結束,需要注意的是同步提交會在提交失敗之后進行重試。
在任何系統中,因為程序而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程序的 TPS
異步手動提交
實現方案
- 設置
enable.auto.commit
為 false - 代碼中異步提交
下面都是三個測試用例都是異步提交,不同之處在于有沒有去實現回調函數。建議生產環境中一定要實現,至少記錄下日志。
@Test
public void asynCommit1(){while (true) {// 這里的參數指的是輪詢的時間間隔,也就是多長時間去拉一次數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});consumer.commitAsync();}
}@Test
public void asynCommit2(){while (true) {// 這里的參數指的是輪詢的時間間隔,也就是多長時間去拉一次數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});// 異步回調機制consumer.commitAsync(new OffsetCommitCallback(){@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception!=null){System.out.println(String.format("提交失敗:%s", offsets.toString()));}}});}
}@Test
public void asynCommit3(){while (true) {// 這里的參數指的是輪詢的時間間隔,也就是多長時間去拉一次數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});consumer.commitAsync((offsets, exception) ->{if (exception!=null){System.out.println(String.format("提交失敗:%s", offsets.toString()));}});}
}
從名字上來看它就不是同步的,而是一個異步操作。調用 commitAsync() 之后,它會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。由于它是異步的,Kafka 提供了回調函數(callback),供你實現提交之后的邏輯,比如記錄日志或處理異常等。
存在的問題
commitAsync 的問題在于,出現問題時它不會自動重試。因為它是異步操作,倘若提交失敗后自動重試,那么它重試時提交的位移值可能早已經“過期”或不是最新值了。因此,異步提交的重試其實沒有意義,所以 commitAsync 是不會重試的,所以只要在程序停止前最后一次提交成功即可。
這里提供一個解決方案,那就是不論成功還是失敗我們都將offsets信息記錄下來,如果最后一次提交成功那就忽略,如果最后一次沒有提交成功,我們可以在下次重啟的時候手動指定offset。
綜合異步和同步來提交
try {while (true) {// 這里的參數指的是輪詢的時間間隔,也就是多長時間去拉一次數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));records.forEach((ConsumerRecord<String, String> record) -> {System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());});consumer.commitAsync();}
} catch (CommitFailedException e) {System.out.println(String.format("提交失敗:%s", e.toString()));
} finally {consumer.commitSync();
}
同時使用了 commitSync() 和 commitAsync()。對于常規性、階段性的手動提交,我們調用 commitAsync() 避免程序阻塞,而在 Consumer 要關閉前,我們調用 commitSync() 方法執行同步阻塞式的位移提交,以確保 Consumer 關閉前能夠保存正確的位移數據。
精細化提交(分批提交)
設想這樣一個場景:你的 poll 方法返回的不是 500 條消息,而是 5000 條。那么,你肯定不想把這 5000 條消息都處理完之后再提交位移,因為一旦中間出現差錯,之前處理的全部都要重來一遍。這類似于我們數據庫中的事務處理。很多時候,我們希望將一個大事務分割成若干個小事務分別提交,這能夠有效減少錯誤恢復的時間。
對于一次要處理很多消息的 Consumer 而言,它會關心社區有沒有方法允許它在消費的中間進行位移提交。比如前面這個 5000 條消息的例子,你可能希望每處理完 100 條消息就提交一次位移,這樣能夠避免大批量的消息重新消費。
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
while (true) {// 這里的參數指的是輪詢的時間間隔,也就是多長時間去拉一次數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));for (ConsumerRecord<String, String> record : records) {// 數據的處理邏輯System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());// 記錄下offset 信息offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));if (count % 100 == 0) {// 回調處理邏輯是nullconsumer.commitAsync(offsets, null);}count++;}try {//處理完當前批次的消息,在輪詢更多的消息之前,調用commitSync方法提交當前批次最新的消息consumer.commitSync(offsets);} catch (CommitFailedException e) {e.printStackTrace();}
}
實際應用場景提交方式選擇
在實際應用中,我們通常會根據業務需求的不同,選擇不同的offset提交方式。
- 如果我們的業務對數據的一致性要求較高,不允許數據的丟失或重復,那么我們應該選擇手動提交offset。在手動提交offset的模式下,我們可以在處理完消息后,再提交offset,從而避免數據的重復消費。同時,我們還可以在處理消息和提交offset之間,增加一些容錯機制,例如將消息持久化到數據庫等,從而避免數據的丟失。
- 如果我們的業務對數據的一致性要求較低,更注重系統的吞吐量,那么我們可以選擇自動提交offset。在自動提交offset的模式下,消費者會在消費消息后的一段時間內,自動提交offset,無需手動管理,從而簡化了編程模型。
offset 的提交和重置
提交 offset 是消費者在消費完一條消息后,將當前消費的 offset 值更新到 Kafka broker 中的操作。提交 offset 的目的是為了記錄消費進度,以便在消費者發生故障或重啟時,能夠從上次消費的位置繼續消費。
重置 offset 是消費者在啟動或運行過程中,將當前消費的 offset 值修改為其他值的操作。重置 offset 的目的是為了調整消費位置,以便在需要重新消費或跳過某些消息時,能夠實現這個需求。
提交 offset
Consumer 需要向 Kafka 匯報自己的位移數據,這個匯報過程被稱為提交位移(Committing Offsets)。因為 Consumer 能夠同時消費多個分區的數據,所以位移的提交實際上是在分區粒度上進行的,即 Consumer 需要為分配給它的每個分區提交各自的位移數據。
- 位移提交的方式
- 從用戶的角度來說,位移提交分為自動提交和手動提交;
- 從 Consumer 端的角度來說,位移提交分為同步提交和異步提交。
- 大數據組件都關閉了自動提交,采取了手動提交。
前面已經介紹過自動提交和手動提交這兩種方式的區別和用法,這里不再贅述。需要注意的是,無論是自動提交還是手動提交,都不保證提交成功。因為 Kafka broker 可能發生故障或網絡延遲,導致提交失敗或延遲。因此,消費者需要處理提交失敗或延遲的情況。
提交失敗:如果提交失敗,消費者可以選擇重試或放棄。重試的話,可能會導致多次提交同一個 offset 值,但是不會影響正確性,因為 Kafka broker 會忽略重復的 offset 值。放棄的話,可能會導致下次啟動時重新消費已經消費過的消息,但是不會影響完整性,因為 Kafka 消息是冪等的。
提交延遲:如果提交延遲,消費者可以選擇等待或繼續。等待的話,可能會導致消費速度變慢,或者超過 session.timeout.ms
參數設置的時間而被認為已經死亡。繼續的話,可能會導致下次啟動時漏掉一些沒有提交成功的消息。
重置 offset
重置 offset 的方式有兩種:手動重置和自動重置。
-
手動重置是指消費者主動調用
seek
或seekToBeginning
或seekToEnd
方法來修改當前消費的 offset 值。手動重置可以讓消費者精確地控制從哪個位置開始消費。例如,如果想要重新消費某個分區的所有消息,可以調用seekToBeginning
方法將 offset 設置為 0;如果想要跳過某個分區的所有消息,可以調用seekToEnd
方法將 offset 設置為最大值;如果想要從某個具體的位置開始消費,可以調用seek
方法將 offset 設置為任意值。 -
自動重置是指消費者在啟動時根據
auto.offset.reset
參數來決定從哪個位置開始消費。消費者配置auto.offset.reset
表示Kafka中沒有存儲對應的offset信息的(有可能offset信息被刪除),亦或者offset所處位置信息過期了的情況,消費者從何處開始消費消息。auto.offset.reset
參數有三個可選值:earliest, latest 和 none。
earliest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none
topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
offset 的消費和保證
offset 的消費和保證主要涉及到兩個方面:順序性和一致性。
順序性
順序性是指 Kafka 消息是否按照發送和接收的順序進行處理。Kafka 只保證分區內的順序性,即同一個分區內的消息按照 offset 的順序進行發送和接收。但是不保證主題內或跨主題的順序性,即不同分區內的消息可能會亂序發送和接收。因此,如果需要保證主題內或跨主題的順序性,需要在生產者和消費者端進行額外的處理,例如使用同一個分區鍵或同一個消費組。
一致性
一致性是指 Kafka 消息是否能夠被正確地發送和接收,不會出現丟失或重復的情況。Kafka 提供了三種不同級別的一致性保證:最多一次(At most once),最少一次(At least once)和精確一次(Exactly once)。
- 最多一次:最多一次是指 Kafka 消息只會被發送或接收一次或零次,不會出現重復的情況,但是可能會出現丟失的情況。這種保證的實現方式是在生產者端關閉重試功能,在消費者端在消費消息之前提交 offset。這種保證適用于對消息丟失不敏感的場景,例如日志收集或監控。
- 最少一次:最少一次是指 Kafka 消息只會被發送或接收一次或多次,不會出現丟失的情況,但是可能會出現重復的情況。這種保證的實現方式是在生產者端開啟重試功能,在消費者端在消費消息之后提交 offset。這種保證適用于對消息重復不敏感的場景,例如計數或累加。
精確一次:精確一次是指 Kafka 消息只會被發送或接收一次,不會出現丟失或重復的情況。這種保證的實現方式是在生產者端和消費者端使用事務功能,在消費者端使用冪等功能。這種保證適用于對消息丟失和重復都敏感的場景,例如轉賬或支付。
重復消費、消息丟失
重復消費
如果提交的偏移量小于客戶端處理的最后一條消息的偏移量,那么最后處理的偏移量與提交的偏移量之間的消息將被處理兩次。
如下圖:
消息丟失
如果提交的偏移量大于客戶端實際處理的最后一條消息的偏移量,那么消費者組將忽略上次處理的偏移量與提交的偏移量之間的所有消息。如下圖:
CommitFailedException 異常處理
產生原因
從源代碼方面來說,CommitFailedException 異常通常發生在手動提交位移時,即用戶顯式調用 KafkaConsumer.commitSync() 方法時。因為KafkaConsumer.commitSync()有重試機制,所以一般的網絡原因可以排除,發生這個異常的原因主要就是超時了,但是這個超時不是說提交本身超時了,而是消息的處理時間超長,導致發生了Rebalance,已經將要提交位移的分區分配給了另一個消費者實例。
熟悉的錯誤:
Exception in thread “main” org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
解決方案
縮短單條消息處理的時間
比如之前下游系統消費一條消息的時間是 100 毫秒,優化之后成功地下降到 50 毫秒,那么此時 Consumer 端的 TPS 就提升了一倍。
增加 Consumer 端允許下游系統消費一批消息的最大時長
這取決于 Consumer 端參數 max.poll.interval.ms
的值。在最新版的 Kafka 中,該參數的默認值是 5 分鐘。如果你的消費邏輯不能簡化,那么提高該參數值是一個不錯的辦法。
值得一提的是,Kafka 0.10.1.0 之前的版本是沒有這個參數的,因此如果你依然在使用 0.10.1.0 之前的客戶端 API,那么你需要增加 session.timeout.ms
參數的值。不幸的是,session.timeout.ms
參數還有其他的含義,因此增加該參數的值可能會有其他方面的“不良影響”,這也是社區在 0.10.1.0 版本引入 max.poll.interval.ms
參數,將這部分含義從 session.timeout.ms
中剝離出來的原因之一。
減少下游系統一次性消費的消息總數
這取決于 Consumer 端參數 max.poll.records
的值。當前該參數的默認值是 500 條,表明調用一次 KafkaConsumer.poll 方法,最多返回 500 條消息。
可以說,該參數規定了單次 poll 方法能夠返回的消息總數的上限。
如果前兩種方法對你都不適用的話,降低此參數值是避免 CommitFailedException 異常最簡單的手段。
下游系統使用多線程來加速消費
這應該算是“最高級”同時也是最難實現的解決辦法了。具體的思路就是,讓下游系統手動創建多個消費線程處理 poll 方法返回的一批消息。
之前你使用 Kafka Consumer 消費數據更多是單線程的,所以當消費速度無法匹及 Kafka Consumer 消息返回的速度時,它就會拋出 CommitFailedException 異常。
如果是多線程,你就可以靈活地控制線程數量,隨時調整消費承載能力,再配以目前多核的硬件條件,該方法可謂是防止 CommitFailedException 最高檔的解決之道。
事實上,很多主流的大數據流處理框架使用的都是這個方法,比如 Apache Flink 在集成 Kafka 時,就是創建了多個 KafkaConsu
消費端消費后不提交offset情況的分析總結
故最近在使用kafka的過程中遇到了一個疑問,在查閱了一些資料和相關blog之后,做一下總結和記錄。
問題:消費者在消費消息的過程中,配置參數
spring.kafka.listener .ackMode
設置為不自動提交offset,在消費完數據之后如果不手動提交offset,那么在程序中和kafak中的數據會如何被處理呢?
spring.kafka.listener.ackMode
:指定消息確認模式,包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根據需求選擇不同的確認模式,用于控制消息的確認方式。
ackMode是個枚舉類型:
- RECORD
每處理一條commit一次 - BATCH(默認)
每次poll的時候批量提交一次,頻率取決于每次poll的調用頻率 - TIME
每次間隔ackTime的時間去commit - COUNT
累積達到ackCount次的ack去commit - COUNT_TIME
ackTime或ackCount哪個條件先滿足,就commit - MANUAL
處理完業務后,手動調用Acknowledgment.acknowledge()先將offset存放到map本地緩存,在下一次poll之前從緩存拿出來批量提交。最終也是批量提交。 - MANUAL_IMMEDIATE
每次處理完業務,手動調用Acknowledgment.acknowledge()后立即提交
參考Kafka系列之SpringBoot集成Kafka
————————————————————————————————————————————————————————————
首先簡單的介紹一下消費者對topic的訂閱。
- 客戶端的消費者訂閱了topic后,如果是單個消費者,那么消費者會順序消費這些topic分區中的數據;
- 如果是創建了消費組有多個消費者,那么kafak的服務端將這些topic平均分配給每個消費者。比如有2個topic,每個topic有2個分區,總共有4個分區,如果一個消費組開了四個消費者線程,那么每個消費者將被分配一個分區進行消費。一般建議是一個消費組里的消費者的個數與訂閱的topic的總分區數相等,這樣可以達到最高的吞吐量。如果消費者的個數大于訂閱的topic的總分區,那么多出的消費者將分配不到topic的分區,等于是白白創建了一個消費者線程,浪費資源。
我們進入正題,對開頭提出的問題的總結如些:
??
注意:以下情況均基于kafka的消費者關閉自動提交offset的條件下。亦是基于同一個消費者組的情況,因為不同的消費者組之間,他們彼此的offset偏移量是完全獨立的。
-
如果消費端在消費kafka的數據過程中,一直沒有提交offset,那么在此程序運行的過程中它不會重復消費。但是如果重啟之后,就會重復消費之前沒有提交offset的數據。
-
如果在消費的過程中有幾條或者一批數據數據沒有提交offset(比如異常情況程序沒有走到手動提交的代碼),后面其他的消息消費后正常提交offset至服務端,那么服務端會更新為消費后最新的offset,不會重新消費,就算重啟程序或者rebalance也不會重新消費。
-
消費端如果沒有提交offset,程序不會阻塞或者重復消費,除非在消費到這個你沒有提交offset的消息時你新增或者減少消費端,此時會發生rebalance現象,即可再次消費到這個未提交offset的數據,產生重復消費問題。因為客戶端也記錄了當前消費者的offset信息,所以程序會在每次消費了數據之后,自己記錄offset,而手動提交到服務端的offset與這個并沒有關系,所以程序會繼續往下消費。在發生rebalance現象之后,會從服務端得到最新的offset信息記錄到本地。所以說如果當前的消費的消息沒有提交offset,此時在你重新初始化消費者之后,可得到這條未提交消息的offset,從此位置開始消費。
總結就是如果消費端不提交或者拋異常,相當于一直沒有提交offset,在此程序運行過程中不會重復消費。除非是重啟,或者有新的消費者退出或者加入導致重新平衡的時候才會再次觸發消費;而且有新的消費端正常消費且提交offset以后,服務端就會更新最新的offset,這樣就算程序重啟或者重新平衡也不會重新消費。