一,基礎知識
1,消費者與消費組
- 每個消費者都有對應的消費組,不同消費組之間互不影響。
- Partition的消息只能被一個消費組中的一個消費者所消費, 但Partition也可能被再平衡分配給新的消費者。
- 一個Topic的不同Partition會根據分配策略(消費者客戶端參數partition.assignment strategy)分給不同消費者。

2,Kafka的消息模式
消息中間件一般有兩種消息投遞模式:點對點模式和發布/訂閱模式,Kafka 同時支持兩種。
- 如果所有的消費者都屬于同一消費組,那么所有的消息都會被均衡地投遞給每個消費者,即每條消息只會被一個消費者處理,這就相當于點對點模式;
- 如果所有的消費者都隸屬于不同的消費組,那么所有的消息都會被廣播給所有的消費者,即每條消息會被所有的消費者處理,這就相當于發布/訂閱模式;
二,Client開發
1,消費邏輯需要具備以下幾個步驟
- 配置消費者參數及創建消費者實例
- 訂閱主題
- 拉取消息并消費
- 提交消費位移
- 關閉消費者實例
public class Consumer {private static final String BROKER_LIST = "localhost:9092";private static final String TOPIC = "TOPIC-A";private static final String GROUP_ID = "GROUP-A";private static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);public static Properties initConfig() {Properties properties = new Properties();// 以下3個必須properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 客戶端IDproperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "eris-kafka-consumer");// 消費組IDproperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// 自動提交,默認為trueproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return properties;}public static void main(String[] args) {Properties properties = initConfig();KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);kafkaConsumer.subscribe(Arrays.asList(TOPIC));try {while (IS_RUNNING.get()) {// poll內部封裝了消費位移提交、消費者協調器、組協調器、消費者的選舉、分區分配與再均衡、心跳等// Duration用來控制在消費者的緩沖區里沒有可用數據時阻塞等待的時間,0表示不等待直接返回ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> r : records) {print("topic:" + r.topic() + ", patition:" + r.partition() + ", offset:" + r.offset());print("key:" + r.key() + ", value:" + r.value());}}} catch (WakeupException e) {// wakeup方法是KafkaConsumer中唯一可以從其他線程里安全調用的方法,調用wakeup后可以退出poll的邏輯,并拋出WakeupException。我們也不需處理WakeupException,它只是一種跳出循環的方式。} catch (Exception e) {e.printStackTrace();} finally {// maybe commit offset.kafkaConsumer.close();}}
}
注 意:KafkaConsumer是非線程安全的,wakeup()方法是 KafkaConsume 中唯一可以從其他線程里安全調用的方法。
2,subscribe有4個重載方法
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)// 在之后如果又創建了新主題,并且與正表達式相匹配,那么這個消費者也可以消費到新添加的Topic
public void subscribe (Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe (Pattern pattern)
3,assign訂閱指定的分區
消費者不僅可以通過 subscribe方法訂閱,還可以直接訂閱指定分區 ,如下:
consumer.assign(Arrays.asList(new TopicPartition ("topic-demo", 0))) ;
通過subscribe訂閱主題具有消費者自動再均衡功能 ,可以根據分區分配策略來自動分配各個消費者與分區的關系;
通過assign來訂閱分區時,是不具備消費者自動均衡的功能的。
4,取消訂閱
以下三行代碼等效。
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());
5,消息消費
消費者消費到的每條消息的類型為ConsumerRecord, 這個和生產者發送的ProducerRecord相對應。
注意與ConsumerRecords區別,ConsumerRecords實現了Iterable,是poll返回的對象。
public class ConsumerRecord<K, V> {private final String topic;private final int partition;pr vate final long offset;private final long timestamp;private final TimestampType timestampType;private final int serializedKeySize;private final int serializedValueSize;private final Headers headers;private final K key;private final V value;private volatile Long checksum;//省略若干方法
}
根據Partition或Topic來消費消息
① 根據分區對當前批次消息分類:public List<ConsumerRecord<K, V> records(TopicPart tion partition)for (TopicPartition tp : records.partitions()) {for (ConsumerRecord<String, String> record : records.records(tp)) {System.out.println(record.partition() + ":" + record.value());}
}② 根據主題對當前批次消息分類:public Iterable<ConsumerRecord<K, V> records(String topic)// ConsumerRecords類中并沒提供與partitions()類似的topics()方法來查看拉取的消息集中所含的主題列表。
for (String topic : Arrays.asList(TOPIC)) {for (ConsumerRecord<String, String> record : records.records(topic)) {System.out.println(record.topic() + ":" + record.value());}
}
6,反序列化
ConsumerRecord里的value,就是經過反序列化后的業務對象。
Kafka所提供的反序列器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer。
在實際應用中,在Kafka提供的序列化器和反序列化器滿足不了應用需求的前提下,推薦使用 Avro、JSON、Thrift、 ProtoBuf、Protostuff等通用的序列化工具來包裝,不建議使用自定義的序列化器或反序列化器。
三,位移提交
1,消費位移提交
在每次調用poll方法時,返回的是還沒有被消費過的消息集。
消費位移必須做持久化保存,否則消費者重啟之后就無法知曉之前的消費位移。再者,當有新的消費者加入時,那么必然會再均衡,某個分區可能在再均衡之后分配給新的消費者,如果不持久化保存消費位移,那么這個新消費者也無法知曉之前的消費位移。
在舊消費者客戶端中,消費位移存儲在ZooKeeper中,而在新消費者客戶端中則存儲在Kafka內部的主題 __consumer_offsets中。
這里把將消費位移持久化的動作稱為“提交”?,消費者在消費完消息之后需要執行消費位移的提交。

2,三個位移的關系


- lastConsumedOffset:當前消費到的位置,即poll拉到的該分區最后一條消息的offset
- committed offset:提交的消費位移
- position:下次拉取的位置
position =?committed offset = lastConsumedOffset +?1(當然position和committed offset 并不會一直相同)
TopicPartition tp = new TopicPartition("topic", 0);
kafkaConsumer.assign(Arrays.asList(tp));
long lastConsumedOffset = 0;while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 同步提交消費位移kafkaConsumer.commitSync();System.out.println("consumed off set is " + lastConsumedOffset);OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(tp);System.out.println("commited offset is " + offsetAndMetadata.offset());long posititon = kafkaConsumer.position(tp);System.out.println("he offset of t he next record is " + posititon);
}輸出結果:
consumed?offset is 377
commited offset is 378
the offset of the next record is 378
3,消息丟失與重復消費
- 如果poll后立馬提交位移,之后業務異常,再次拉取就從新位移開始,就丟失了數據。
- 如果poll后先處理數據,處理到一半異常了,或者最后提交位移異常,重新拉取會從之前的位移拉,就重復消費了。
4,自動提交位移原理
Kafka中默認的消費位移的提交方式是自動提交,這個由消費者客戶端參數enable.auto.commit配置,默認值為true。
不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端參數auto.commit.interval.ms配置,默認值為5秒,此參數生效的前提是enable.auto.commit參數為true。
在默認的方式下,消費者每隔5秒會將拉取到的每個分區中最大的消息位移進行提交。動位移提交的動作是在poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移。
自動提交讓編碼更簡潔,但隨之而來的是重復消費和消息丟失的問題。
5,手動提交位移
① ?同步提交
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {//do some logical processing?kafkaConsumer.commitSync();
}1)commitSync會根據poll拉取的最新位移來進行提交(注意提交的值對應于圖3-6 position的位置〉。2)可以使用帶參方法,提交指定位移:commitSync(final Map<TopicPartition OffsetAndMetadata> offsets)3)沒必要每條消息提交一次,可以改為批量提交。
② 異步提交
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public?void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)1)異步提交在執行的時候消費者線程不會被阻塞,可能在提交消費位移的結果還未返回之前就開始了新一次的poll操作。2)提交也可能會異常,可引入重試機制。但重試可能出現問題,若第一次commitAsync失敗在重試,第二次成功了,然后第一次重試也成功了,就會覆蓋位移為之前。解決方案:可以提交時維護一個序號,如果發現過期的序號就不再重試。
總結:不管是自動提交還是手動提交(同步、異步),都可能出現漏消費和重復消費,一般情況提交位移這一步操作很少失敗,至于業務異常如何影響提交,需要結合具體情況分析。
可以引入重試機制,重試提交或者業務處理。但重試會增加代碼邏輯復雜度。
還有對于消費者異常退出,重復消費的問題就很難避免,因為這種情況下無法及時提交消費位移,需要消費者的冪等處理。
如果消費者正常退出或發生再均衡況,那么可以 在退出或再均衡執行之前使用同步提交的方式做最后的把關。
try {while (IS_RUNNING.get()) {// poll records and do some logical processing .kafkaConsumer.commitAsync();}
} finally {try {kafkaConsumer.commitSync();} finally {kafkaConsumer.close();}
}
四,暫停或恢復消費
暫停某些分區在poll時返回數據給客戶端和恢復某些分區返回數據給客戶端。
public void pause(Collection<TopicPartition> partitions)
public roid resume(Collection<TopicPartition> partitions)
五,指定位移消費
有了消費位移的持久化,才使消費者在關閉、崩潰或者在遇到再均衡的時候,可以讓接替的消費者能夠根據存儲的消費位移繼續進行消費。
當新的消費組建立的時候,它根本沒有可以查找的消費位移。或者消費組內的新消費者訂閱了一個新的主題,它也沒有可以查找的消費位移。
auto.offset.reset 配置
① 作用:
- 決定從何處消費;
② 何時生效:
- 找不到消費位移記錄時;
- 位移越界時(seek);
③ 配置值:
- (默認值)auto.offset.reset=latest,從分區末尾開始消費;
- auto.offset.reset=earliest,從分區起始開始消費;

seek方法
① 定義:指定消費的位置,可以向前跳過若干消息或回溯消息。
//?partition:分區,offset:從哪個位置消費?
public void seek(TopicPartition partition, long offset)
② seek方法只能重置消費者分配到的分區的消費位置,而分區的分配是在 poll方法過程中實現的,也就是說在執行seek之前需要先執行一次poll,等到分配到分區之后才可以重置消費位置。
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {// 如采不為空,則說明已經成功分配到了分區kafkaConsumer.poll(Duration.ofMillis(1000));assignment = kafkaConsumer.assignment();
}for (TopicPartition tp : assignment) {kafkaConsumer.seek(tp, 10);
}
while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));//consume the record
}
③?如果對當前消費者未分配到的分區執行 seek方法,那么會報IllegalStateException。
④ 如果消費者啟動能找到位移記錄,又想從頭或者尾消費,可以通過seek結合endOffsets、beginningOffsets或者直接seekToBeginning、seekToEnd實現。
注意:分區的起始位置是0,但并不代表每時每刻都為0,因為日志清理會清理舊的數據 ,所以分區的起始位置自然會增加。
⑤ 按時間戳指定消費位置
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)給定待查分區和時間戳,返回大于等于該時間戳的第一條消息對應的offset和timestamp,對應于OffsetAndTimestamp中的offset、timestamp字段。
⑥ 將分區消費位移存儲在數據庫、文件等外部介質,再通過seek指定消費,可以配合再均衡監聽器來實現新消費者的繼續消費。
六,消費再均衡
再均衡是指分區的所屬權從一個消費者轉移到另一消費者的行為。它為消費組具備高可用性和伸縮性提供保障,使我們可以既方便又安全地刪除消費組內的消費者或往消費組內添加消費者。
不過, 在再均衡發生期間的這一小段時間,消費組會變得不可用。而且再均衡容易導致重復消費等問題,一般情況應盡量避免不必要的再均衡。
再均衡監聽器 ?ConsumerRebalanceListener
注冊:
subscribe(Collection<String> topics, ConsumerRebalanceListener listener) 和 subscribe(Patten pattern, ConsumerRebalanceListener listener)ConsumerRebalanceListener是一個接口,有2個方法。(1) void onPartitionsRevoked(Collection<TopicPartition> partitions)
再均衡開始之前和消費者停止讀取消息之后被調用。(2) void onPartitionsAssigned(Collection<TopicPartition> partitions)
新分配分區之后和消費者開始拉取消費之前被調用 。
示例①:消費時把位移暫存在Map,再均衡之前同步提交,避免發送再均衡的同時,異步提交還沒提交上去(重復消費)。
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
kafkaConsumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {kafkaConsumer.commitSync(currentOffsets);currentOffsets.clear();}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//do nothing .}
});
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {//process the recordcurrentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));
}
kafkaConsumer.commitAsync(currentOffsets, null);
示例②:把位移存在db,再均衡后通過seek定位繼續消費
kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// store offset in DB}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition tp : partitions) {// 從DB中讀取消費位移kafkaConsumer.seek(tp, getOffsetFromDB(tp));}}
}
七,消費者攔截器
與生產者攔截器對應,消費者攔截器需要自定義實現org.apache.kafka.clients.consumer.Consumerlnterceptor接口。
Consumerlnterceptor有3個方法:
//?poll方法返回之前調用,可以修改返回的消息內容、按照某種規則過濾消息等
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K , V> records);//?提交完消費位移之后調用,可以用來記錄跟蹤所提交的位移信息,比如當使用commitSync的無參方法時,我們不知道提交的消費位移,而onCommit方法卻可以做到這一點
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);public void close();
在消費者中也有攔截鏈的概念,和生產者的攔截鏈一樣, 也是按照interceptor.classes參數配置的攔截器的順序來一一執行的。如果在攔截鏈中某個攔截器執行失敗,那么下一個攔截器會接著從上一個執行成功的攔截器繼續執行。
八,消費者多線程模型
① 每個消費者單獨線程,只消費一個分區

模型簡單,自動、手動提交位移都很簡單。
優點:每個分區可以按順序消費
缺點:多個TCP連接消耗
② 一個消費者拉取,提交線程池處理

各分區的消費是散亂在各線程,提交位移(順序)復雜。
優點:拉取快,TCP連接少
缺點:分區消費順序不好保障。
上圖是自動提交位移。如果要手動提交,可考慮共享offsets方式,同時為了避免對同一個分區,后序批次提交了更大的位移,前序批次處理失敗造成的消息丟失,可以考慮滑動窗口機制。(參考《深入理解Kafka核心設計與實踐原理》)

