kafka consumer配置拉取速度慢_Kafka消費者的使用和原理

這周我們學習下消費者,仍然還是先從一個消費者的Hello World學起:

public?class?Consumer?{????public?static?void?main(String[]?args)?{????????//?1.?配置參數????????Properties?properties?=?new?Properties();????????properties.put("key.deserializer",????????????????"org.apache.kafka.common.serialization.StringDeserializer");????????properties.put("value.deserializer",????????????????"org.apache.kafka.common.serialization.StringDeserializer");????????properties.put("bootstrap.servers",?"localhost:9092");????????properties.put("group.id",?"group.demo");????????//?2.?根據參數創建KafkaConsumer實例(消費者)????????KafkaConsumer?consumer?=?new?KafkaConsumer<>(properties);????????//?3.?訂閱主題????????consumer.subscribe(Collections.singletonList("topic-demo"));????????try?{????????????// 4. 輪循消費????????????while?(true)?{????????????????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????????????????for?(ConsumerRecord?record?:?records)?{????????????????????System.out.println(record.value());????????????????}????????????}????????}?finally?{????????????//?5.?關閉消費者????????????consumer.close();????????}????}}

前兩步和生產者類似,配置參數然后根據參數創建實例,區別在于消費者使用的是反序列化器,以及多了一個必填參數group.id,用于指定消費者所屬的消費組。關于消費組的概念在《圖解Kafka中的基本概念》中介紹過了,消費組使得消費者的消費能力可橫向擴展,這次再介紹一個新的概念“再均衡”,其意思是將分區的所屬權進行重新分配,發生于消費者中有新的消費者加入或者有消費者宕機的時候。我們先了解再均衡的概念,至于如何再均衡不在此深究。

我們繼續看上面的代碼,第3步,subscribe訂閱期望消費的主題,然后進入第4步,輪循調用poll方法從Kafka服務器拉取消息。給poll方法中傳遞了一個Duration對象,指定poll方法的超時時長,即當緩存區中沒有可消費數據時的阻塞時長,避免輪循過于頻繁。poll方法返回的是一個ConsumerRecords對象,其內部對多個分區的ConsumerRecored進行了封裝,其結構如下:

public?class?ConsumerRecords?implements?Iterable>?{????????private?final?Map>>?records;????//?...????}

而ConsumerRecord則類似ProducerRecord,封裝了消息的相關屬性:

public?class?ConsumerRecord?{????private?final?String?topic;??//?主題????private?final?int?partition;??//?分區號????private?final?long?offset;??//?偏移量????private?final?long?timestamp;??//?時間戳????private?final?TimestampType?timestampType;??//?時間戳類型????private?final?int?serializedKeySize;??//?key序列化后的大小????private?final?int?serializedValueSize;??//?value序列化后的大小????private?final?Headers?headers;??//?消息頭部????private?final?K?key;??//?鍵????private?final?V?value;??//?值????private?final?Optional?leaderEpoch;??//?leader的周期號

相比ProdercerRecord的屬性更多,其中重點講下偏移量,偏移量是分區中一條消息的唯一標識。消費者在每次調用poll方法時,則是根據偏移量去分區拉取相應的消息。而當一臺消費者宕機時,會發生再均衡,將其負責的分區交給其他消費者處理,這時可以根據偏移量去繼續從宕機前消費的位置開始。

186eb45cca4e4eec9f4981883d999062.png

而為了應對消費者宕機情況,偏移量被設計成不存儲在消費者的內存中,而是被持久化到一個Kafka的內部主題__consumer_offsets中,在Kafka中,將偏移量存儲的操作稱作提交。而消費者在每次消費消息時都將會將偏移量進行提交,提交的偏移量為下次消費的位置,例如本次消費的偏移量為x,則提交的是x+1。

2d92c93c956090c42bcc559ab3a537a5.png

在代碼中我們并沒有看到顯示的提交代碼,那么Kafka的默認提交方式是什么?默認情況下,消費者會定期以auto_commit_interval_ms(5秒)的頻率進行一次自動提交,而提交的動作發生于poll方法里,在進行拉取操作前會先檢查是否可以進行偏移量提交,如果可以,則會提交即將拉取的偏移量。

下面我們看下這樣一個場景,上次提交的偏移量為2,而當前消費者已經處理了2、3、4號消息,正準備提交5,但卻宕機了。當發生再均衡時,其他消費者將繼續從已提交的2開始消費,于是發生了重復消費的現象。

12bd84d6f35bffdd86cc77ae58e57299.png

我們可以通過減小自動提交的時間間隔來減小重復消費的窗口大小,但這樣仍然無法避免重復消費的發生。

按照線性程序的思維,由于自動提交是延遲提交,即在處理完消息之后進行提交,所以應該不會出現消息丟失的現象,也就是已提交的偏移量會大于正在處理的偏移量。但放在多線程環境中,消息丟失的現象是可能發生的。例如線程A負責調用poll方法拉取消息并放入一個隊列中,由線程B負責處理消息。如果線程A已經提交了偏移量5,而線程B還未處理完2、3、4號消息,這時候發生宕機,則將丟失消息。

cfbbeae2c67daf5776415208989f662a.png

從上述場景的描述,我們可以知道自動提交是存在風險的。所以Kafka除了自動提交,還提供了手動提交的方式,可以細分為同步提交異步提交,分別對應了KafkaConsumer中的commitSync和commitAsync方法。我們先嘗試使用同步提交修改程序:

while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????}????consumer.commitSync();;}

在處理完一批消息后,都會提交偏移量,這樣能減小重復消費的窗口大小,但是由于是同步提交,所以程序會阻塞等待提交成功后再繼續處理下一條消息,這樣會限制程序的吞吐量。那我們改為使用異步提交:

while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????}????consumer.commitAsync();;}

異步提交時,程序將不會阻塞,但異步提交在提交失敗時也不會進行重試,所以提交是否成功是無法保證的。因此我們可以組合使用兩種提交方式。在輪詢中使用異步提交,而當關閉消費者時,再通過同步提交來保證提交成功。

try?{????while?(true)?{????????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????????for?(ConsumerRecord?record?:?records)?{????????????System.out.println(record.value());????????}????????consumer.commitAsync();????}}?finally?{????try?{????????consumer.commitSync();????}?finally?{????????consumer.close();????}}

上述介紹的兩種無參的提交方式都是提交的poll返回的一個批次的數據。若未來得及提交,也會造成重復消費,如果還想更進一步減少重復消費,可以在for循環中為commitAsync和commitSync傳入分區和偏移量,進行更細粒度的提交,例如每1000條消息我們提交一次:

Map?currentOffsets?=?new?HashMap<>();int?count?=?0;while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????????//?偏移量加1????????currentOffsets.put(new?TopicPartition(record.topic(),?record.partition()),???????????????????????????new?OffsetAndMetadata(record.offset()?+?1));????????if?(count?%?1000?==?0)?{????????????consumer.commitAsync(currentOffsets,?null);????????}????????count++;????}}

關于提交就介紹到這里。在使用消費者的代理中,我們可以看到poll方法是其中最為核心的方法,能夠拉取到我們需要消費的消息。所以接下來,我們一起深入到消費者API的幕后,看看在poll方法中,都發生了什么,其實現如下:

public?ConsumerRecords?poll(final?Duration?timeout)?{????return?poll(time.timer(timeout),?true);}

在我們使用設置超時時間的poll方法中,會調用重載方法,第二個參數includeMetadataInTimeout用于標識是否把元數據的獲取算在超時時間內,這里傳值為true,也就是算入超時時間內。下面再看重載的poll方法的實現:

private?ConsumerRecords?poll(final?Timer?timer,?final?boolean?includeMetadataInTimeout)?{????//?1.?獲取鎖并確保消費者沒有關閉????acquireAndEnsureOpen();????try?{????????//?2.記錄poll開始????????this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());????????//?3.檢查是否有訂閱主題????????if?(this.subscriptions.hasNoSubscriptionOrUserAssignment())?{????????????throw?new?IllegalStateException("Consumer?is?not?subscribed?to?any?topics?or?assigned?any?partitions");????????}????????do?{????????????//?4.安全的喚醒消費者????????????client.maybeTriggerWakeup();????????????//?5.更新偏移量(如果需要的話)????????????if?(includeMetadataInTimeout)?{????????????????updateAssignmentMetadataIfNeeded(timer,?false);????????????}?else?{????????????????while?(!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE),?true))?{????????????????????log.warn("Still?waiting?for?metadata");????????????????}????????????}????????????// 6.拉取消息????????????final?Map>>?records?=?pollForFetches(timer);????????????if?(!records.isEmpty())?{????????????????//?7.如果拉取到了消息或者有未處理的請求,由于用戶還需要處理未處理的消息????????????????//?所以會再次發起拉取消息的請求(異步),提高效率????????????????if?(fetcher.sendFetches()?>?0?||?client.hasPendingRequests())?{????????????????????client.transmitSends();????????????????}????????????????//?8.調用消費者攔截器處理????????????????return?this.interceptors.onConsume(new?ConsumerRecords<>(records));????????????}????????}?while?(timer.notExpired());????????return?ConsumerRecords.empty();????}?finally?{????????//?9.釋放鎖????????release();????????//?10.記錄poll結束????????this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());????}}

我們對上面的代碼逐步分析,首先是第1步acquireAndEnsureOpen方法,獲取鎖并確保消費者沒有關閉,其實現如下:

private?void?acquireAndEnsureOpen()?{????acquire();????if?(this.closed)?{????????release();????????throw?new?IllegalStateException("This?consumer?has?already?been?closed.");????}}

其中acquire方法用于獲取鎖,為什么這里會要上鎖。這是因為KafkaConsumer是線程不安全的,所以需要上鎖,確保只有一個線程使用KafkaConsumer拉取消息,其實現如下:

private?static?final?long?NO_CURRENT_THREAD?=?-1L;private?final?AtomicLong?currentThread?=?new?AtomicLong(NO_CURRENT_THREAD);private?final?AtomicInteger?refcount?=?new?AtomicInteger(0);private?void?acquire()?{????long?threadId?=?Thread.currentThread().getId();????if?(threadId?!=?currentThread.get()?&&?!currentThread.compareAndSet(NO_CURRENT_THREAD,?threadId))????????throw?new?ConcurrentModificationException("KafkaConsumer?is?not?safe?for?multi-threaded?access");????refcount.incrementAndGet();}

用一個原子變量currentThread作為鎖,通過cas操作獲取鎖,如果cas失敗,即獲取鎖失敗,表示發生了競爭,有多個線程在使用KafkaConsumer,則會拋出ConcurrentModificationException異常,如果cas成功,還會將refcount加一,用于重入。

再看第2、3步,記錄poll的開始以及檢查是否有訂閱主題。然后進入do-while循環,如果沒有拉取到消息,將在不超時的情況下一直輪循。

第4步,安全的喚醒消費者,并不是喚醒,而是檢查是否有喚醒的風險,如果程序在執行不可中斷的方法或是收到中斷請求,會拋出異常,這里我還不是很明白,先放一下。

第5步,更新偏移量,就是我們在前文說的在進行拉取操作前會先檢查是否可以進行偏移量提交。

第6步,pollForFetches方法拉取消息,其實現如下:

private?Map>>?pollForFetches(Timer?timer)?{????long?pollTimeout?=?coordinator?==?null???timer.remainingMs()?:????Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()),?timer.remainingMs());????//?1.如果消息已經有了,則立即返回????final?Map>>?records?=?fetcher.fetchedRecords();????if?(!records.isEmpty())?{????????return?records;????}????//?2.準備拉取請求????fetcher.sendFetches();????if?(!cachedSubscriptionHashAllFetchPositions?&&?pollTimeout?>?retryBackoffMs)?{????????pollTimeout?=?retryBackoffMs;????}????Timer?pollTimer?=?time.timer(pollTimeout);????//?3.發送拉取請求????client.poll(pollTimer,?()?->?{????????return?!fetcher.hasAvailableFetches();????});????timer.update(pollTimer.currentTimeMs());????//?3.返回消息????return?fetcher.fetchedRecords();}

如果fetcher已經有消息了則立即返回,這里和下面將要講的第7步對應。如果沒有消息則使用Fetcher準備拉取請求然后再通過ConsumerNetworkClient發送請求,最后返回消息。

為啥消息會已經有了呢,我們回到poll的第7步,如果拉取到了消息或者有未處理的請求,由于用戶還需要處理未處理的消息,這時候可以使用異步的方式發起下一次的拉取消息的請求,將數據提前拉取,減少網絡IO的等待時間,提高程序的效率。

第8步,調用消費者攔截器處理,就像KafkaProducer中有ProducerInterceptor,在KafkaConsumer中也有ConsumerInterceptor,用于處理返回的消息,處理完后,再返回給用戶。

第9、10步,釋放鎖和記錄poll結束,對應了第1、2步。

對KafkaConsumer的poll方法就分析到這里。最后用一個思維導圖回顧下文中較為重要的知識點:

2b77e380fd8dbd8f7b5e69582388de92.png

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/383065.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/383065.shtml
英文地址,請注明出處:http://en.pswp.cn/news/383065.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

前綴和

前綴和 輸入一個長度為n的整數序列。 接下來再輸入m個詢問&#xff0c;每個詢問輸入一對l, r。 對于每個詢問&#xff0c;輸出原序列中從第l個數到第r個數的和。 輸入格式 第一行包含兩個整數n和m。 第二行包含n個整數&#xff0c;表示整數數列。 接下來m行&#xff0c;…

子矩陣的和

題目描述 輸入一個n行m列的整數矩陣&#xff0c;再輸入q個詢問&#xff0c;每個詢問包含四個整數x1, y1, x2, y2&#xff0c;表示一個子矩陣的左上角坐標和右下角坐標。 對于每個詢問輸出子矩陣中所有數的和。 輸入格式 第一行包含三個整數n&#xff0c;m&#xff0c;q。 …

jmeter 循環取值賦值給form_JMeter系列(三)邏輯控制器詳解

循環控制器&#xff1a;指定迭代次數&#xff0c;可以用具體數字&#xff0c;也可以通過變量控制永遠&#xff1a;表示無限循環點擊查看示例&#xff1a;Jmeter實例(四)_圖片爬蟲簡單控制器&#xff1a;這是最基礎的一個控制器&#xff0c;它可以讓腳本分層&#xff0c;變成一個…

c 復雜的前置后置面試題_OPPO Reno拆解:優秀工藝由外而內,復雜用料不負旗艦之名...

OPPO的新系列Reno手機最近吸引了不少注意力&#xff0c;不管是消費者還是手機極客都對其優秀的性能和強大的配置抱有極大的興趣。最近&#xff0c;知名數碼博主愛玩客對Reno十倍變焦版進行了拆解&#xff0c;從內部結構向我們揭示了這部手機的強大之處。并且點評道&#xff1a;…

差分矩陣

題目描述 輸入一個n行m列的整數矩陣&#xff0c;再輸入q個操作&#xff0c;每個操作包含五個整數x1, y1, x2, y2, c&#xff0c;其中(x1, y1)和(x2, y2)表示一個子矩陣的左上角坐標和右下角坐標。 每個操作都要將選中的子矩陣中的每個元素的值加上c。 請你將進行完所有操作后…

python常用的開發環境包括_Python語言主要包括哪些集成開發環境?_學小易找答案...

【填空題】Python的標準隨機數生成器模塊是【簡答題】Why does critical thinking matter?【簡答題】采集瓶子的外形進行創意設計 用點、線、面進行裝飾填充 A4紙手繪,構圖要有新意,要飽滿【簡答題】How can a lack of critical thinking cause a loss of personal freedom?【…

最長連續不重復子序列

題目描述 給定一個長度為n的整數序列&#xff0c;請找出最長的不包含重復數字的連續區間&#xff0c;輸出它的長度。 輸入格式 第一行包含整數n。 第二行包含n個整數&#xff08;均在0~100000范圍內&#xff09;&#xff0c;表示整數序列。 輸出格式 共一行&#xff0c;包…

ocp跟oce的區別 oracle_Oracle視頻10g 11g認證視頻教程 OCA/OCP 從入門到精通 數據庫DBA...

一、認證Oracle OCP認證(Database 10g Administrator Certified Professional)為Oracle公司的數據庫專家的認證。擁有OCP認證說明你擁有了大型Oracle數據庫管理的技術能力&#xff0c;具備了成為大型企業核心數據庫系統管理員的資格。OCE 1Z0-051&#xff1a;Oracle Database 1…

小愛同學app安卓版_小愛同學app下載-小米小愛同學下載2.9.21安卓版-西西軟件下載...

小米小愛同學是小米AI音箱的配套軟件&#xff0c;小愛同學是AI音箱的擬人虛擬形象&#xff0c;是一個二次元的萌妹子&#xff0c;如果你購買了小米AI音箱可以通過跟小愛同學交流來讓小米智能音箱幫你完成你想要的服務。小愛同學支持海量互聯網內容&#xff0c;包括在線音樂&…

python畫太極八卦圖_先天太極八卦圖的唯一正確畫法

我們先百度一下先天太極八卦圖.↑&#xff0c;看看結果百度出來的圖片第一頁上半部分&#xff0c;結果非常驚人&#xff0c;40張圖片&#xff0c;沒有一張是正確的。錯誤原因分為兩大類&#xff1a;1.太極圖旋轉方向或陰陽魚所在位置錯誤 2.八卦中每卦的三爻畫法錯誤1. 先天太極…

函數無法識別_PostgreSQL找不到最佳函數問題解析

最近給項目做支持&#xff0c;由于函數類型問題&#xff0c;加了幾條函數定義。用戶使用函數場景是func(string, string)。當時給用戶添加了一條函數定義&#xff1a;func(text, text)。后來由于和其他函數沖突改成了func(varchar, varchar)。varchar和text同樣都是字符串類型&…

Xshell鏈接不上云服務器的解決方案

1.ssh拒絕請求 先該配置文件 https://blog.csdn.net/u012206617/article/details/83026777?ops_request_misc&request_id&biz_id102&utm_termssh%E6%9C%8D%E5%8A%A1%E5%99%A8%E6%8B%92%E7%BB%9D%E4%BA%86%E5%AF%86%E7%A0%81%20%E8%AF%B7%E5%86%8D%E8%AF%95%E4%B8…

框架controller找不到_SpingBoot框架知識詳解

Spring boot框架1、什么是Spring Boot&#xff1f;? Spring Boot是Spring開源組織下的子項目&#xff0c;是Spring組件一站式解決方案&#xff0c;主要是簡化了使用Spring的難度&#xff0c;簡省了繁重的配置&#xff0c;提供了各種啟動器&#xff0c;開發者能快速上手。Sprin…

架構的演變

基本概念 在介紹架構之前&#xff0c;為了避免部分讀者對架構設計中的一些概念不了解&#xff0c;下面對幾個最基礎的概念進行介紹。 1.什么是分布式&#xff1f; 系統中的多個模塊在不同服務器上部署&#xff0c;即可稱為分布式系統&#xff0c;如Tomcat和數據庫分別部署在…

axure8.0導出頁面打不開問題_excel怎么轉pdf?excel打不開?轉換成PDF就行了

excel轉pdf怎么做&#xff1f;年底最后一天了&#xff0c;我都被一堆的Excel文件搞得頭疼&#xff0c;在這些時間里&#xff0c;要讓我對幾個G的文件進行操作&#xff0c;我已經是忙得不可開交&#xff0c;而在最后的最后&#xff0c;我的主管還說他的電腦無法打開我的Excel 了…

質數相關問題

試除法判定質數 題目描述 給定n個正整數ai&#xff0c;判定每個數是否是質數。 輸入格式 第一行包含整數n。 接下來n行&#xff0c;每行包含一個正整數ai。 輸出格式 共n行&#xff0c;其中第 i 行輸出第 i 個正整數ai是否為質數&#xff0c;是則輸出“Yes”&#xff0c…

python怎么爬蟲理數據_Python神技能 | 使用爬蟲獲取汽車之家全車型數據

最近想在工作相關的項目上做技術改進&#xff0c;需要全而準的車型數據&#xff0c;尋尋覓覓而不得&#xff0c;所以就只能自己動手豐衣足食&#xff0c;到網上獲&#xff08;竊&#xff09;得&#xff08;取&#xff09;數據了。汽車之家是大家公認的數據做的比較好的汽車網站…

linux運算_CentOS「linux」學習筆記22:算術運算符、邏輯運算符、關系運算符

?linux基礎操作&#xff1a;主要介紹啦算術運算符、邏輯運算符、關系運算符1.算術運算符[主要用來計算數值]注意使用expr運算時運算符和數值之間需要有空格&#xff0c;其他方式運算時不能有空格。常用算術運算符號&#xff1a;表示相加&#xff0c;&#xff0d;表示相減&…

python實現小型搜索引擎設計_基于JAVA的中小型飯店餐飲管理系統的設計與實現...

好程序設計擅長JAVA(SSM,SSH,SPRINGBOOT)、PYTHON(DJANGO/FLASK)、THINKPHP、C#、安卓、微信小程序、MYSQL、SQLSERVER等&#xff0c;歡迎咨詢今天將為大家分析一個中小型飯店餐飲管理系統(俗話說“民以食為天”,中國的飲食文化有著久遠的歷史。“吃”不僅僅指的是填飽肚子,它早…

評估報告有效期過期了怎么辦_托福成績過期了怎么辦?

托福成績是有期限的&#xff0c;考生申請美國大學的時候也只能在托福成績有效期內。所以考托福的時候一定要關注一下托福成績什么時候過期&#xff0c;以及大學申請的截止日期&#xff0c;提前做好安排。下面我們一起看看關于托福成績有效期的相關問題。托福成績有效期是多久&a…