目錄
- 一、為什么需要帶有 subscribe 的 group.id
- 二、我們需要使用commitSync手動提交偏移量嗎?
- 三、如果我想手動提交偏移量,該怎么做?
一、為什么需要帶有 subscribe 的 group.id
- 消費概念:
Kafka 使用消費者組的概念來實現主題的并行消費 - 每條消息都將在每個消費者組中傳遞一次,無論該組中實際有多少個消費者。所以 group 參數是強制性的,如果沒有組,Kafka 將不知道如何對待訂閱同一主題的其他消費者。 - 偏移量:
每當我們啟動一個消費者時,它都會加入一個消費者組,然后根據該消費者組中的其他消費者數量,為其分配要讀取的分區。對于這些分區,它會檢查列表讀取偏移量是否已知,如果找到,它將從這一點開始讀取消息。如果沒有找到偏移量,則參數 auto.offset.reset 控制是從分區中最早的消息還是從最新的消息開始讀取。
二、我們需要使用commitSync手動提交偏移量嗎?
-
是否需要手動提交偏移?
是否需要提交偏移量取決于作為參數 enable.auto.commit 選擇的值。默認情況下,此設置為 true,這意味著消費者將定期自動提交其偏移量(由auto.commit.interval.ms 決定提交的頻率)。如果將其設置為 false,那么將需要自己提交偏移量。這種默認行為可能也是導致很多發現 kafka 總是從最新的開始消費的原因,由于偏移量是自動提交的,因此它將使用該偏移量。 -
有沒有辦法從頭開始重播消息?
如果想每次都從頭開始讀取,可以調用seekToBeginning,如果不帶參數調用,它將重置為所有訂閱分區中的第一條消息,或者僅重置您傳入的那些分區。 -
seekToBeginning
查找每個給定分區的第一個偏移量。poll(long) 該函數延遲計算,僅在調用或時才查找所有分區中的第一個偏移量position(TopicPartition)。如果未提供分區,則查找所有當前分配的分區的第一個偏移量。public class MyListener implements ConsumerSeekAware {...@Overridepublic void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {callback.seekToBeginning(assignments.keySet());}}
-
有沒有辦法從最后開始重播消息?
有的,可以使用 seekToEnd() 查找所有分配的分區到最后。或者使用 seekToTimestamp(long time)- 查找所有分配的分區到該時間戳表示的偏移量。public class MyListener extends AbstractConsumerSeekAware {@KafkaListener(...)void listn(...) {...} }public class SomeOtherBean {MyListener listener;...void someMethod() {this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);}}
三、如果我想手動提交偏移量,該怎么做?
-
1、禁用自動提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
提交方法
對于手動提交,KafkaConsumers提供了兩種方法,即 commitSync() 和 commitAsync()。commitSync()是一個阻塞調用,在偏移量成功提交后返回,commitAsync()則立即返回。如果想知道提交是否成功,可以為回調處理程序 ( OffsetCommitCallback) 提供一個方法參數。請注意,在兩次提交調用中,消費者都會提交最新poll()調用的偏移量。
舉個例子:假設一個分區主題有一個消費者并且最后一次調用poll()返回偏移量為 4、5、6 的消息。提交時,偏移量 6 將被提交,因為這是消費者客戶端跟蹤的最新偏移量。
同時,commitSync() 和 commitAsync() 都允許更多地控制我們想要提交的偏移量:如果你使用允許你指定的相應重載,那么Map<TopicPartition, OffsetAndMetadata>消費者將僅提交指定的偏移量(即,映射可以包含分配的分區的任何子集) ,并且指定的偏移量可以為任意值)。 -
同步提交:
阻塞線程,直到提交成功或遇到不可恢復的錯誤(在這種情況下,它被拋出給調用者)while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());consumer.commitSync();} }
對于 for 循環中的每次迭代,只有在consumer.commitSync()成功返回或因拋出異常而中斷后,代碼才會移至下一次迭代。
-
異步提交:
是一種非阻塞方法。調用它不會阻塞線程。相反,它將繼續處理以下指令,無論最終是成功還是失敗。while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());consumer.commitAsync(callback);} }
對于 for 循環中的每次迭代,無論consumer.commitAsync()最終會發生什么,代碼都會移至下一次迭代。并且,提交的結果將由定義的回調函數處理。
-
權衡:延遲與數據一致性
1、如果必須確保數據一致性,請選擇commitSync(),因為它將確保在執行任何進一步操作之前,你將知道偏移量提交是成功還是失敗。但由于它是同步和阻塞的,你將花費更多的時間來等待提交完成,這會導致高延遲。
2、如果可以接受某些數據不一致并希望具有低延遲,請選擇commitAsync(),因為它不會等待完成。相反,它只會發出提交請求并稍后處理來自 Kafka 的響應(成功或失敗),同時代碼將繼續執行。