這周我們學習下消費者,仍然還是先從一個消費者的Hello World學起:
public?class?Consumer?{????public?static?void?main(String[]?args)?{????????//?1.?配置參數????????Properties?properties?=?new?Properties();????????properties.put("key.deserializer",????????????????"org.apache.kafka.common.serialization.StringDeserializer");????????properties.put("value.deserializer",????????????????"org.apache.kafka.common.serialization.StringDeserializer");????????properties.put("bootstrap.servers",?"localhost:9092");????????properties.put("group.id",?"group.demo");????????//?2.?根據參數創建KafkaConsumer實例(消費者)????????KafkaConsumer?consumer?=?new?KafkaConsumer<>(properties);????????//?3.?訂閱主題????????consumer.subscribe(Collections.singletonList("topic-demo"));????????try?{????????????// 4. 輪循消費????????????while?(true)?{????????????????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????????????????for?(ConsumerRecord?record?:?records)?{????????????????????System.out.println(record.value());????????????????}????????????}????????}?finally?{????????????//?5.?關閉消費者????????????consumer.close();????????}????}}
前兩步和生產者類似,配置參數然后根據參數創建實例,區別在于消費者使用的是反序列化器,以及多了一個必填參數group.id,用于指定消費者所屬的消費組。關于消費組的概念在《圖解Kafka中的基本概念》中介紹過了,消費組使得消費者的消費能力可橫向擴展,這次再介紹一個新的概念“再均衡”,其意思是將分區的所屬權進行重新分配,發生于消費者中有新的消費者加入或者有消費者宕機的時候。我們先了解再均衡的概念,至于如何再均衡不在此深究。
我們繼續看上面的代碼,第3步,subscribe訂閱期望消費的主題,然后進入第4步,輪循調用poll方法從Kafka服務器拉取消息。給poll方法中傳遞了一個Duration對象,指定poll方法的超時時長,即當緩存區中沒有可消費數據時的阻塞時長,避免輪循過于頻繁。poll方法返回的是一個ConsumerRecords對象,其內部對多個分區的ConsumerRecored進行了封裝,其結構如下:
public?class?ConsumerRecords?implements?Iterable>?{????????private?final?Map>>?records;????//?...????}
而ConsumerRecord則類似ProducerRecord,封裝了消息的相關屬性:
public?class?ConsumerRecord?{????private?final?String?topic;??//?主題????private?final?int?partition;??//?分區號????private?final?long?offset;??//?偏移量????private?final?long?timestamp;??//?時間戳????private?final?TimestampType?timestampType;??//?時間戳類型????private?final?int?serializedKeySize;??//?key序列化后的大小????private?final?int?serializedValueSize;??//?value序列化后的大小????private?final?Headers?headers;??//?消息頭部????private?final?K?key;??//?鍵????private?final?V?value;??//?值????private?final?Optional?leaderEpoch;??//?leader的周期號
相比ProdercerRecord的屬性更多,其中重點講下偏移量,偏移量是分區中一條消息的唯一標識。消費者在每次調用poll方法時,則是根據偏移量去分區拉取相應的消息。而當一臺消費者宕機時,會發生再均衡,將其負責的分區交給其他消費者處理,這時可以根據偏移量去繼續從宕機前消費的位置開始。

而為了應對消費者宕機情況,偏移量被設計成不存儲在消費者的內存中,而是被持久化到一個Kafka的內部主題__consumer_offsets中,在Kafka中,將偏移量存儲的操作稱作提交。而消費者在每次消費消息時都將會將偏移量進行提交,提交的偏移量為下次消費的位置,例如本次消費的偏移量為x,則提交的是x+1。

在代碼中我們并沒有看到顯示的提交代碼,那么Kafka的默認提交方式是什么?默認情況下,消費者會定期以auto_commit_interval_ms(5秒)的頻率進行一次自動提交,而提交的動作發生于poll方法里,在進行拉取操作前會先檢查是否可以進行偏移量提交,如果可以,則會提交即將拉取的偏移量。
下面我們看下這樣一個場景,上次提交的偏移量為2,而當前消費者已經處理了2、3、4號消息,正準備提交5,但卻宕機了。當發生再均衡時,其他消費者將繼續從已提交的2開始消費,于是發生了重復消費的現象。

我們可以通過減小自動提交的時間間隔來減小重復消費的窗口大小,但這樣仍然無法避免重復消費的發生。
按照線性程序的思維,由于自動提交是延遲提交,即在處理完消息之后進行提交,所以應該不會出現消息丟失的現象,也就是已提交的偏移量會大于正在處理的偏移量。但放在多線程環境中,消息丟失的現象是可能發生的。例如線程A負責調用poll方法拉取消息并放入一個隊列中,由線程B負責處理消息。如果線程A已經提交了偏移量5,而線程B還未處理完2、3、4號消息,這時候發生宕機,則將丟失消息。

從上述場景的描述,我們可以知道自動提交是存在風險的。所以Kafka除了自動提交,還提供了手動提交的方式,可以細分為同步提交和異步提交,分別對應了KafkaConsumer中的commitSync和commitAsync方法。我們先嘗試使用同步提交修改程序:
while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????}????consumer.commitSync();;}
在處理完一批消息后,都會提交偏移量,這樣能減小重復消費的窗口大小,但是由于是同步提交,所以程序會阻塞等待提交成功后再繼續處理下一條消息,這樣會限制程序的吞吐量。那我們改為使用異步提交:
while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????}????consumer.commitAsync();;}
異步提交時,程序將不會阻塞,但異步提交在提交失敗時也不會進行重試,所以提交是否成功是無法保證的。因此我們可以組合使用兩種提交方式。在輪詢中使用異步提交,而當關閉消費者時,再通過同步提交來保證提交成功。
try?{????while?(true)?{????????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????????for?(ConsumerRecord?record?:?records)?{????????????System.out.println(record.value());????????}????????consumer.commitAsync();????}}?finally?{????try?{????????consumer.commitSync();????}?finally?{????????consumer.close();????}}
上述介紹的兩種無參的提交方式都是提交的poll返回的一個批次的數據。若未來得及提交,也會造成重復消費,如果還想更進一步減少重復消費,可以在for循環中為commitAsync和commitSync傳入分區和偏移量,進行更細粒度的提交,例如每1000條消息我們提交一次:
Map?currentOffsets?=?new?HashMap<>();int?count?=?0;while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????????//?偏移量加1????????currentOffsets.put(new?TopicPartition(record.topic(),?record.partition()),???????????????????????????new?OffsetAndMetadata(record.offset()?+?1));????????if?(count?%?1000?==?0)?{????????????consumer.commitAsync(currentOffsets,?null);????????}????????count++;????}}
關于提交就介紹到這里。在使用消費者的代理中,我們可以看到poll方法是其中最為核心的方法,能夠拉取到我們需要消費的消息。所以接下來,我們一起深入到消費者API的幕后,看看在poll方法中,都發生了什么,其實現如下:
public?ConsumerRecords?poll(final?Duration?timeout)?{????return?poll(time.timer(timeout),?true);}
在我們使用設置超時時間的poll方法中,會調用重載方法,第二個參數includeMetadataInTimeout用于標識是否把元數據的獲取算在超時時間內,這里傳值為true,也就是算入超時時間內。下面再看重載的poll方法的實現:
private?ConsumerRecords?poll(final?Timer?timer,?final?boolean?includeMetadataInTimeout)?{????//?1.?獲取鎖并確保消費者沒有關閉????acquireAndEnsureOpen();????try?{????????//?2.記錄poll開始????????this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());????????//?3.檢查是否有訂閱主題????????if?(this.subscriptions.hasNoSubscriptionOrUserAssignment())?{????????????throw?new?IllegalStateException("Consumer?is?not?subscribed?to?any?topics?or?assigned?any?partitions");????????}????????do?{????????????//?4.安全的喚醒消費者????????????client.maybeTriggerWakeup();????????????//?5.更新偏移量(如果需要的話)????????????if?(includeMetadataInTimeout)?{????????????????updateAssignmentMetadataIfNeeded(timer,?false);????????????}?else?{????????????????while?(!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE),?true))?{????????????????????log.warn("Still?waiting?for?metadata");????????????????}????????????}????????????// 6.拉取消息????????????final?Map>>?records?=?pollForFetches(timer);????????????if?(!records.isEmpty())?{????????????????//?7.如果拉取到了消息或者有未處理的請求,由于用戶還需要處理未處理的消息????????????????//?所以會再次發起拉取消息的請求(異步),提高效率????????????????if?(fetcher.sendFetches()?>?0?||?client.hasPendingRequests())?{????????????????????client.transmitSends();????????????????}????????????????//?8.調用消費者攔截器處理????????????????return?this.interceptors.onConsume(new?ConsumerRecords<>(records));????????????}????????}?while?(timer.notExpired());????????return?ConsumerRecords.empty();????}?finally?{????????//?9.釋放鎖????????release();????????//?10.記錄poll結束????????this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());????}}
我們對上面的代碼逐步分析,首先是第1步acquireAndEnsureOpen方法,獲取鎖并確保消費者沒有關閉,其實現如下:
private?void?acquireAndEnsureOpen()?{????acquire();????if?(this.closed)?{????????release();????????throw?new?IllegalStateException("This?consumer?has?already?been?closed.");????}}
其中acquire方法用于獲取鎖,為什么這里會要上鎖。這是因為KafkaConsumer是線程不安全的,所以需要上鎖,確保只有一個線程使用KafkaConsumer拉取消息,其實現如下:
private?static?final?long?NO_CURRENT_THREAD?=?-1L;private?final?AtomicLong?currentThread?=?new?AtomicLong(NO_CURRENT_THREAD);private?final?AtomicInteger?refcount?=?new?AtomicInteger(0);private?void?acquire()?{????long?threadId?=?Thread.currentThread().getId();????if?(threadId?!=?currentThread.get()?&&?!currentThread.compareAndSet(NO_CURRENT_THREAD,?threadId))????????throw?new?ConcurrentModificationException("KafkaConsumer?is?not?safe?for?multi-threaded?access");????refcount.incrementAndGet();}
用一個原子變量currentThread作為鎖,通過cas操作獲取鎖,如果cas失敗,即獲取鎖失敗,表示發生了競爭,有多個線程在使用KafkaConsumer,則會拋出ConcurrentModificationException異常,如果cas成功,還會將refcount加一,用于重入。
再看第2、3步,記錄poll的開始以及檢查是否有訂閱主題。然后進入do-while循環,如果沒有拉取到消息,將在不超時的情況下一直輪循。
第4步,安全的喚醒消費者,并不是喚醒,而是檢查是否有喚醒的風險,如果程序在執行不可中斷的方法或是收到中斷請求,會拋出異常,這里我還不是很明白,先放一下。
第5步,更新偏移量,就是我們在前文說的在進行拉取操作前會先檢查是否可以進行偏移量提交。
第6步,pollForFetches方法拉取消息,其實現如下:
private?Map>>?pollForFetches(Timer?timer)?{????long?pollTimeout?=?coordinator?==?null???timer.remainingMs()?:????Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()),?timer.remainingMs());????//?1.如果消息已經有了,則立即返回????final?Map>>?records?=?fetcher.fetchedRecords();????if?(!records.isEmpty())?{????????return?records;????}????//?2.準備拉取請求????fetcher.sendFetches();????if?(!cachedSubscriptionHashAllFetchPositions?&&?pollTimeout?>?retryBackoffMs)?{????????pollTimeout?=?retryBackoffMs;????}????Timer?pollTimer?=?time.timer(pollTimeout);????//?3.發送拉取請求????client.poll(pollTimer,?()?->?{????????return?!fetcher.hasAvailableFetches();????});????timer.update(pollTimer.currentTimeMs());????//?3.返回消息????return?fetcher.fetchedRecords();}
如果fetcher已經有消息了則立即返回,這里和下面將要講的第7步對應。如果沒有消息則使用Fetcher準備拉取請求然后再通過ConsumerNetworkClient發送請求,最后返回消息。
為啥消息會已經有了呢,我們回到poll的第7步,如果拉取到了消息或者有未處理的請求,由于用戶還需要處理未處理的消息,這時候可以使用異步的方式發起下一次的拉取消息的請求,將數據提前拉取,減少網絡IO的等待時間,提高程序的效率。
第8步,調用消費者攔截器處理,就像KafkaProducer中有ProducerInterceptor,在KafkaConsumer中也有ConsumerInterceptor,用于處理返回的消息,處理完后,再返回給用戶。
第9、10步,釋放鎖和記錄poll結束,對應了第1、2步。
對KafkaConsumer的poll方法就分析到這里。最后用一個思維導圖回顧下文中較為重要的知識點:
