1.簡介
Kafka的poll()方法消費無法精準的掌握其消費的起始位置,auto.offset.reset
參數也只能在比較粗粒度的指定消費方式。更細粒度的消費方式kafka提供了seek()
方法可以指定位移消費允許消費者從特定位置(如固定偏移量、時間戳或分區首尾)開始消費消息。
2.指定消費位置
2.1.從特定偏移量開始消費
使用seek(TopicPartition partition, long offset)
指定具體偏移量。
源碼分析:
seek()
方法更新消費者內部的subscriptions
對象的position
字段,記錄目標偏移量。- 后續
poll()
時,Fetcher
類根據此位置向Broker發送拉取請求。
代碼示例:
consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 確保分配到分區
while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();
}
// 設置所有分區從offset=100開始消費
assignment.forEach(tp -> consumer.seek(tp, 100));
2.2.從時間戳開始消費
使用offsetsForTimes()
獲取時間戳對應的偏移量,再調用seek()
。
源碼分析:
offsetsForTimes()
向Broker發送ListOffsetRequest
,查詢滿足時間戳條件的最早或最新偏移量。
代碼實例:
Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 獲取24小時前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});
2.3.從分區首尾消費
使用seekToBeginning()
或seekToEnd()
,或通過beginningOffsets()
/endOffsets()
獲取首尾偏移量后手動設置。
代碼實例:
// 從分區末尾開始消費(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));
2.4.注意事項
-
分區分配與poll()的依賴
seek()
必須在分區分配完成后調用,否則會拋出IllegalStateException
。需通過循環poll()
確保分配到分區。 -
數據過期問題
若指定偏移量對應的消息已被刪除(如日志清理導致),seek()
將失效。此時需使用beginningOffsets()
獲取當前最小有效偏移量。 -
異步提交與位移覆蓋風險
異步提交(commitAsync()
)失敗時不會重試,可能因位移回滾導致重復消費。需結合同步提交(commitSync()
)保證原子性。 -
seek()
方法提供了我們可以將消費者位移保存在外部的能力,還可以配合在均衡監聽器來提供更加精準的消費能力。
3.完整代碼實例
public class SeekToTimestampDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "seek-demo");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton("test-topic"));// 等待分區分配Set<TopicPartition> assignment = new HashSet<>();while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}// 獲取24小時前的時間戳對應偏移量Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);// 指定位移offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) {consumer.seek(tp, offsetAndTs.offset());} else {// 處理無有效偏移量的情況(如從頭開始)consumer.seekToBeginning(Collections.singleton(tp));}});while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));}}
}