目錄
- 生產者ack機制
- 消費者ack模式
- 手動提交ACK
生產者ack機制
Kafka 生產者的 ACK 機制指的是生產者在發送消息后,對消息副本的確認機制。ACK 機制可以幫助生產者確保消息被成功寫入 Kafka 集群中的多個副本,并在需要時獲取確認信息。
Kafka 提供了三種 ACK 機制的配置選項,分別是:
-
acks=0:生產者在成功將消息發送到網絡緩沖區后即視為消息已被提交,不等待任何服務器響應。這種配置下,可能會出現消息丟失的情況。
-
acks=1:生產者在成功將消息發送到主題的分區 leader 后即視為消息已被提交。這種配置下,生產者會收到分區 leader
的確認,但仍有可能出現消息丟失的情況,例如當 leader 出現故障,而消息尚未復制到其他副本時。 -
acks=all 或acks=-1:生產者需要等待所有分區副本都成功寫入消息后才視為消息已被提交。這種配置下,生產者會等待所有分區副本的確認,確保消息被復制到足夠數量的副本后才返回提交確認。這是最安全的確認方式,但也會導致較長的等待時間。
在實際使用中,根據對消息可靠性和延遲的要求,可以選擇不同的 ACKs 級別。一般來說,如果對消息的可靠性要求較高,可以選擇較高的 ACKs 級別,但需要考慮相應的延遲成本。
我們可以通過spring.kafka.producer.acks來配置ack機制
spring.kafka.producer.acks=1
消費者ack模式
kafka支持的消費模式,在AbstractMessageListenerContainer.AckMode的枚舉中,下面就介紹下各個模式的區別
public enum AckMode {/*** Commit after each record is processed by the listener.*/RECORD,/*** Commit whatever has already been processed before the next poll.*/BATCH,/*** Commit pending updates after* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.*/TIME,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded.*/COUNT,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded or after {@link ContainerProperties#setAckTime(long)* ackTime} has elapsed.*/COUNT_TIME,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}.*/MANUAL,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}. The consumer* immediately processes the commit.*/MANUAL_IMMEDIATE,}
AckMode模式
RECORD:當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交
當使用 RECORD 確認模式時,消息監聽容器會在每個消息被單獨處理后進行確認。這意味著,如果一條消息被成功處理,它將作為單獨的記錄進行確認;如果處理失敗,也會針對該消息進行錯誤記錄。這種確認模式適用于需要精確處理每個消息的應用場景,例如確保每個消息都被正確處理。
BATCH:當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后提交
當使用 BATCH 確認模式時,消息監聽容器會在批量處理一組消息后進行確認。這意味著,消息監聽容器會將多個消息合并為批次,并將它們作為一組進行處理。只有在整個批次都被成功處理后,該批次的所有消息才會被確認。這種確認模式適用于需要提高處理效率的場景,例如批量處理大量消息以減少網絡傳輸和系統調用的開銷。
TIME:當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交
COUNT:當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交
COUNT_TIME:TIME或COUNT?有一個條件滿足時提交
MANUAL:這是手動確認模式,消費者需要顯式地調用 Acknowledgment.acknowledge() 方法來確認消息。只有當消費者調用 acknowledge() 方法后,才會向 Kafka 服務器發送確認消息。這種模式可以保證消息的可靠性和順序性,但需要消費者顯式地處理確認邏輯。
MANUAL_IMMEDIATE:這是立即手動確認模式,與 MANUAL 模式類似,但消費者在調用 acknowledge() 方法時,會立即向 Kafka 服務器發送確認消息。這種模式可以提高消息處理的速度,但可能會增加重復消費的風險。
MANUAL和MANUAL_IMMEDIATE的區別
MANUAL 和 MANUAL_IMMEDIATE 都是 Kafka 消費者的手動確認模式,它們的區別在于確認的時機不同。
MANUAL 模式下,消費者需要顯式地調用 Acknowledgment.acknowledge() 方法來確認消息,在調用該方法之后,消息才會被標記為已消費,并且確認消息會在下次 poll() 時發送到 Kafka 服務器。這種模式的優點是可以保證消息的可靠性和順序性,但需要消費者顯式地處理確認邏輯。
相比之下,MANUAL_IMMEDIATE 模式下,在消費者調用 Acknowledgment.acknowledge() 方法時,會立即向 Kafka 服務器發送確認消息。這種模式可以提高消息處理的速度,但可能會增加重復消費的風險,因為如果消息處理失敗,Kafka 不會再次發送該消息,而是認為該消息已經被成功消費了。
在實際使用中,應根據業務需求和性能要求來選擇合適的確認模式。如果要求消息的可靠性和順序性比較高,可以選擇 MANUAL 模式;如果要求處理速度比較高,可以選擇 MANUAL_IMMEDIATE 模式。
AckMode 可以通過配置文件或代碼進行設置。例如,在 Spring Boot 應用中,可以使用以下配置方式指定確認模式:
spring.kafka.listener.ack-mode=manual_immediate
手動提交ACK
kafka默認是自動提交ack的,很多時候,我們都需要手動提交,這就要進行以下配置
1、設置enable-auto-commit=false,禁止自動提交
2、設置ack-mode為manual_immediate
在配置文件進行如下配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate
3、監聽方法的入參加入Acknowledgment ack 參數,并在消費完成之后調用acknowledge方法,如下所示
@KafkaListener(topics = "my-topic2",groupId = "myGroup")public void receiveMessage2(String message, Acknowledgment ack){log.info("消費消息:"+message);//ack確認ack.acknowledge();}