kafka 腳本發送_Kafka筆記歸納(第五部分:一致性保證,消息重復消費場景及解決方式)...

28fcb114ce2f3fba9fdcc18109c6976e.png

寫在開頭:

本章是Kafka學習歸納第五部分,著重于強調Kafka的事一致性保證,消息重復消費場景及解決方式,記錄偏移量的主題,延時隊列的知識點。

文章內容輸出來源:拉勾教育大數據高薪訓練營。

一致性保證

水位標記

水位或水印(watermark)一詞,表示位置信息,即位移(offset)。Kafka源碼中使用的名字是高水位,HW(high watermark)。

LEO和HW

每個分區副本對象都有兩個重要的屬性:LEO和HW

LEO:即日志末端位移(log end offset),記錄了該副本日志中下一條消息的位移值。如果 LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,Leader LEO和 Follower LEO的更新是有區別的。

HW:即上面提到的水位值。對于同一個副本對象而言,其HW值不會大于LEO值。小于等于 HW值的所有消息都被認為是“已備份”的(replicated)。Leader副本和Follower副本的HW更新不同

5d31db3c27b0b6e6c85dee37962a9dc5.png

上圖中,HW值是7,表示位移是 0~7 的所有消息都已經處于“已提交狀態”(committed),而LEO值是14,8~13的消息就是未完全備份(fully replicated)——為什么沒有14?LEO指向的是下一條消息到來時的位移。

消費者無法消費分區下Leader副本中位移大于分區HW的消息

Follower副本何時更新LEO

Follower副本不停地向Leader副本所在的broker發送FETCH請求,一旦獲取消息后寫入自己的日志中進行備份。那么Follower副本的LEO是何時更新的呢?首先我必須言明,Kafka有兩套Follower副本

LEO:

1. 一套LEO保存在Follower副本所在Broker的副本管理機中;

2. 另一套LEO保存在Leader副本所在Broker的副本管理機中。Leader副本機器上保存了所有的follower副本的LEO。

Kafka使用前者幫助Follower副本更新其HW值;利用后者幫助Leader副本更新其HW。

1. Follower副本的本地LEO何時更新? Follower副本的LEO值就是日志的LEO值,每當新寫入一條消息,LEO值就會被更新。當Follower發送FETCH請求后,Leader將數據返回給Follower,此時Follower開始Log寫數據,從而自動更新LEO值。

2. Leader端Follower的LEO何時更新? Leader端的Follower的LEO更新發生在Leader在處理 Follower FETCH請求時。一旦Leader接收到Follower發送的FETCH請求,它先從Log中讀取 相應的數據,給Follower返回數據前,先更新Follower的LEO。

Follower副本何時更新HW

Follower更新HW發生在其更新LEO之后,一旦Follower向Log寫完數據,嘗試更新自己的HW值。

比較當前LEO值與FETCH響應中Leader的HW值,取兩者的小者作為新的HW值。

即:如果Follower的LEO大于Leader的HW,Follower HW值不會大于Leader的HW值。

Leader副本何時更新LEO

和Follower更新LEO相同,Leader寫Log時自動更新自己的LEO值。

Leader副本何時更新HW值

Leader的HW值就是分區HW值,直接影響分區數據對消費者的可見性

Leader會嘗試去更新分區HW的四種情況:

1. Follower副本成為Leader副本時:Kafka會嘗試去更新分區HW。

2. Broker崩潰導致副本被踢出ISR時:檢查下分區HW值是否需要更新是有必要的。

3. 生產者向Leader副本寫消息時:因為寫入消息會更新Leader的LEO,有必要檢查HW值是否需要更新

4. Leader處理Follower FETCH請求時:首先從Log讀取數據,之后嘗試更新分區HW值

結論:

當Kafka broker都正常工作時,分區HW值的更新時機有兩個:

1. Leader處理PRODUCE請求時

2. Leader處理FETCH請求時。

Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。當嘗試確定分區HW時,它會選出所有滿足條件的副本,比較它們的LEO(包括Leader的LEO),并選擇最小的LEO值作為HW值

需要滿足的條件,(二選一):

1. 處于ISR中

2. 副本LEO落后于Leader LEO的時長不大于 replica.lag.time.max.ms 參數值(默認是10s)

如果Kafka只判斷第一個條件的話,確定分區HW值時就不會考慮這些未在ISR中的副本,但這些副本已經具備了“立刻進入ISR”的資格,因此就可能出現分區HW值越過ISR中副本LEO的情況——不允許。因為分區HW定義就是ISR中所有副本LEO的最小值

消息重復的場景及解決方案

消息重復和丟失是kafka中很常見的問題,主要發生在以下三個階段:

1. 生產者階段

2. broke階段

3. 消費者階段

生產者階段重復場景

生產發送的消息沒有收到正確的broke響應,導致生產者重試。

生產者發出一條消息,broke落盤以后因為網絡等種種原因發送端得到一個發送失敗的響應或者網絡中斷,然后生產者收到一個可恢復的Exception重試消息導致消息重復。

生產者發送重復解決方案

啟動kafka的冪等性

要啟動kafka的冪等性,設置: enable.idempotence=true ,以及 ack=all 以及 retries > 1

ack=0,不重試。 可能會丟消息,適用于吞吐量指標重要性高于數據丟失,例如:日志收集。

生產者和broke階段消息丟失場景

ack=0,不重試

生產者發送消息完,不管結果了,如果發送失敗也就丟失了。

ack=1,leader crash

生產者發送消息完,只等待Leader寫入成功就返回了,Leader分區丟失了,此時Follower沒來及同步,消息丟失

unclean.leader.election.enable 配置true

允許選舉ISR以外的副本作為leader,會導致數據丟失,默認為false。生產者發送異步消息,只等待Lead寫入成功就返回,Leader分區丟失,此時ISR中沒有Follower,Leader從OSR中選舉,因為OSR中本來落后于Leader造成消息丟失

解決生產者和broke階段消息丟失

禁用unclean選舉,ack=all

ack=all / -1,tries > 1,unclean.leader.election.enable=false

生產者發完消息,等待Follower同步完再返回,如果異常則重試。副本的數量可能影響吞吐量,不超過5個,一般三個。 不允許unclean Leader選舉。

配置:min.insync.replicas > 1

當生產者將 acks 設置為 all (或 -1 )時, min.insync.replicas>1 。指定確認消息寫成功需要的最小副本數量。達不到這個最小值,生產者將引發一個異常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

當一起使用時, min.insync.replicas 和 ack 允許執行更大的持久性保證。一個典型的場景是創建一個復制因子為3的主題,設置min.insync復制到2個,用 all 配置發送。將確保如果大多數副本沒有收到寫操作,則生產者將引發異常。

失敗的offset單獨記錄

生產者發送消息,會自動重試,遇到不可恢復異常會拋出,這時可以捕獲異常記錄到數據庫或緩存,進行單獨處理。

消費者數據重復場景及解決方案

數據消費完沒有及時提交offset到broker。

消息消費端在消費過程中掛掉沒有及時提交offset到broke,另一個消費端啟動拿之前記錄的offset開始消費,由于offset的滯后性可能會導致新啟動的客戶端有少量重復消費。

解決方案

取消自動提交

每次消費完或者程序退出時手動提交。這可能也沒法保證一條重復。

下游做冪等

一般是讓下游做冪等或者盡量每消費一條消息都記錄offset,對于少數嚴格的場景可能需要把 offset或唯一ID(例如訂單ID)和下游狀態更新放在同一個數據庫里面做事務來保證精確的一次更新或者在下游數據表里面同時記錄消費offset,然后更新下游數據的時候用消費位移做樂觀鎖拒絕舊位移的數據更新。

__consumer_offsets

Kafka 1.0.2將consumer的位移信息保存在Kafka內部的topic中,即__consumer_offsets主題,并且默認提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息。

創建topic “tp_test_01”

kafka-topics.sh --zookeeper node1:2181/myKafka --create -- 
topic tp_test_01 --partitions 5 --replication-factor 1

使用kafka-console-producer.sh腳本生產消息

[root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> messages.txt; 
done 
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic 
tp_test_01 < messages.txt 

由于默認沒有指定key,所以根據round-robin方式,消息分布到不同的分區上。 (本例中生產了60條消息)

驗證消息生產成功

kafka-run-class.sh  kafka.tools.GetOffsetShell 
--broker-list node1:9092 --topic tp_test_01 --time  -1

78cc76e1d5c96e5e6eaf8e10b9043382.png

創建一個console consumer group

kafka-console-consumer.sh 
--bootstrap-server linux121:9092 --topic tp_test_01 --from-beginning

獲取該consumer group的group id(后面需要根據該id查詢它的位移信息)

kafka-consumer-groups.sh --bootstrap-server linux121:9092 --list

d78de41e681721442d3a21b91f84ccc0.png

查詢__consumer_offsets topic所有內容

注意:運行下面命令前先要在consumer.properties中設置exclude.internal.topics=false

kafka-console-consumer.sh --topic __consumer_offsets 
--bootstrap-server node1:9092 --formatter 
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" 
--consumer.config config/consumer.properties --from-beginning 

默認情況下__consumer_offsets有50個分區,如果你的系統中consumer group也很多的話,那么這個命令的輸出結果會很多。

計算指定consumer group在__consumer_offsets topic中分區信息

這時候就用到了group.id :console-consumer-77682

Kafka會使用下面公式計算該group位移保存在__consumer_offsets的哪個分區上:

Math.abs(groupID.hashCode()) % numPartitions 

ea6c644321739f52490d9946c4a957b1.png

9efeba35cb4e97bb92a5036bc220397c.png

__consumer_offsets的分區41保存了這個consumer group的位移信息。

獲取指定consumer group的位移信息

kafka-simple-consumer-shell.sh --topic __consumer_offsets 
--partition 41  --broker-list linux121:9092 
--formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

a852b2f9067f8699287963355472df8f.png

可以看到__consumer_offsets topic的每一日志項的格式都是:

[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

延時隊列

兩個follower副本都已經拉取到了leader副本的最新位置,此時又向leader副本發送拉取請求,而leader副本并沒有新的消息寫入,那么此時leader副本該如何處理呢?可以直接返回空的拉取結果給follower副本,不過在leader副本一直沒有新消息寫入的情況下,follower副本會一直發送拉取請求,并且總收到空的拉取結果,消耗資源。

95549b512b92a6125053e4bd4276c745.png

Kafka在處理拉取請求時,會先讀取一次日志文件,如果收集不到足夠多(fetchMinBytes,由參數fetch.min.bytes配置,默認值為1)的消息,那么就會創建一個延時拉取操作(DelayedFetch)以等待拉取到足夠數量的消息。當延時拉取操作執行時,會再讀取一次日志文件,然后將拉取結果返回給follower副本。

延遲操作不只是拉取消息時的特有操作,在Kafka中有多種延時操作,比如延時數據刪除、延時生產等。

對于延時生產(消息)而言,如果在使用生產者客戶端發送消息的時候將acks參數設置為-1,那么就意味著需要等待ISR集合中的所有副本都確認收到消息之后才能正確地收到響應的結果,或者捕獲超時異常。

4da1cbad3a68e740f8e433800ce56ffc.png

假設某個分區有3個副本:leader、follower1和follower2,它們都在分區的ISR集合中。不考慮ISR變動的情況,Kafka在收到客戶端的生產請求后,將消息3和消息4寫入leader副本的本地日志文件。

由于客戶端設置了acks為-1,那么需要等到follower1和follower2兩個副本都收到消息3和消息4后才能告知客戶端正確地接收了所發送的消息。如果在一定的時間內,follower1副本或follower2副本沒能夠完全拉取到消息3和消息4,那么就需要返回超時異常給客戶端。生產請求的超時時間由參數request.timeout.ms配置,默認值為30000,即30s。

b1bd0bc6373d47c5e529990f796d225f.png

那么這里等待消息3和消息4寫入follower1副本和follower2副本,并返回相應的響應結果給客戶端的動作是由誰來執行的呢?在將消息寫入leader副本的本地日志文件之后,Kafka會創建一個延時的生產操作(DelayedProduce),用來處理消息正常寫入所有副本或超時的情況,以返回相應的響應結果給客戶端。

延時操作需要延時返回響應的結果,首先它必須有一個超時時間(delayMs),如果在這個超時時間內沒有完成既定的任務,那么就需要強制完成以返回響應結果給客戶端。其次,延時操作不同于定時操作,定時操作是指在特定時間之后執行的操作,而延時操作可以在所設定的超時時間之前完成,所以延時操作能夠支持外部事件的觸發。

就延時生產操作而言,它的外部事件是所要寫入消息的某個分區的HW(高水位)發生增長。也就是說,隨著follower副本不斷地與leader副本進行消息同步,進而促使HW進一步增長,HW每增長一次都會檢測是否能夠完成此次延時生產操作,如果可以就執行以此返回響應結果給客戶端;如果在超時時間內始終無法完成,則強制執行。

延時拉取操作,是由超時觸發或外部事件觸發而被執行的。超時觸發很好理解,就是等到超時時間之后觸發第二次讀取日志文件的操作。外部事件觸發就稍復雜了一些,因為拉取請求不單單由follower副本發起,也可以由消費者客戶端發起,兩種情況所對應的外部事件也是不同的。如果是follower副本的延時拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消費者客戶端的延時拉取,它的外部事件可以簡單地理解為HW的增長。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/530367.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/530367.shtml
英文地址,請注明出處:http://en.pswp.cn/news/530367.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Java基礎篇】集合排序

所謂集合排序是指對集合內的元素進行排序。 集合工具類Collections中提供了兩種排序算法&#xff0c;分別是&#xff1a; Collections.sort(List list)Collections.sort(List list,Comparator c) Collections.sort(List list)這種方式需要對象實現Comparable接口&#xff0c;…

語言nomogram校準曲線圖_預測模型的概率校準

1.背景 機器學習分為:監督學習,無監督學習,半監督學習(也可以用hinton所說的強化學習)等。在這里,先簡要介紹一下監督學習從給定的訓練數據集中學習出一個函數(模型參數),當新的數據到來時,可以根據這個函數預測結果。監督學習的訓練集要求包括輸入輸出,也可以說是特征和…

eclipse debug 工程源碼時出現source not found問題解決

問題描述&#xff1a;使用eclipse debug啟動應用&#xff0c;并且打斷點在工程的源碼上面&#xff0c;提示source not found。 問題解決&#xff1a; 1、選中工程&#xff0c;右鍵Debug As》Debug Configurations 2、在Java Application下面選中需要debug的程序&#xff0c;然…

代碼中有個get是啥意思_是時候秀一波了,甩掉get和set,Lombok讓代碼更簡潔

前言前幾天有個新來的同事(實習生)驚訝的對我說&#xff1a;我們的代碼里好多錯誤&#xff0c;我的程序本地都啟動不了。我一臉懵逼的質問他&#xff1a;目前線上的代碼&#xff0c;怎么會有問題嗎&#xff1f;他不服氣的說&#xff1a;你來看嘛&#xff0c;就是有問題&#xf…

JavaWeb工程師知識圖譜

一個工作快三年的的Java菜鳥&#xff0c;總結梳理了一下JavaWeb工程師必須掌握的一些知識點&#xff08;持續更新中。。。&#xff09;。 預覽效果 xmind原始文件 百度云盤 鏈接&#xff1a;https://pan.baidu.com/s/1hp3MWGOX2I8APw75Suu52Q 提取碼&#xff1a;j6w6

【Java中級篇】基于jxl讀寫Excel文件遇到的問題

發生異常&#xff0c;并且提示&#xff1a;unable to recognize ole stream 遇到這個問題需要將Excel文件另存為Excel 97-2003&#xff08;*.xls&#xff09;

松下a6伺服x4接線圖_2021中山東鳳松下溫控器回收價高同行

2021中山東鳳松下溫控器回收價高同行西門子TDC&#xff0c;西門子存儲卡,西門子變頻器等全線西門子自動化產品。小汪 滿意的價格&#xff0c;快的付款速度&#xff0c;熱誠歡迎全國各地朋友洽談合作。具體回收業務&#xff1a;SIEMENS可編程控制器 1、SIMATIC&#xff0c;S7系列…

eclipse啟動發生Failed to load JNI shared library

今天啟動eclipse發生下面的情況 從網上知道是eclipse和jdk位數不一致導致的。 輸入java -version ,查看JDK是多少位&#xff0c;顯示64位的就是64位JDK&#xff0c;未顯示的為32位的JDK。 eclipse的安裝目錄下有一個叫eclipse.ini的配置文件&#xff0c;打開后能看到 x86_64說…

imx226_相機選型器

-- 全部 --AR1820HS (8)CMV2000-2E5C1PP (2)CMV2000-3E12M1PP (2)CMV2000-3E5M1PP (2)CMV4000-3E12M1PP (1)CMV4000-3E5C1PP (5)CMV4000-3E5M1PP (5)EV76C560ABT (25)EV76C560ACT (25)EV76C570ABT (23)EV76C570ACT (23)EV76C661ABT (20)IMX174LLJ-C (8)IMX174LQJ-C (8)IMX178LL…

算法的時間復雜度和空間復雜度的原理

一、算法分析 如何判斷一個算法的好壞呢&#xff1f;首先算法必須要正確&#xff0c;這是最基本的要求。其次&#xff1a; 算法花費的時間算法占用的空間小&#xff08;輔助存儲空間&#xff09;算法要容易調試&#xff0c;測試&#xff0c;理解&#xff0c;編碼&#xff0c;…

5條件篩選功能_一分鐘,徹底學會Excel高級篩選,坐等升職加薪!

Excel中高級篩選是普通篩選的加強&#xff0c;能夠實現更加復雜的篩選功能。請您看下面的示例圖&#xff1a;數據示例圖如果要求篩選出班級為2班且語文成績大于100分的數據&#xff0c;那么使用普通篩選連續篩選兩次就可以得到結果。請您看下面的操作演示&#xff1a;普通篩選操…

數據結構之樹【完善中】

一、樹的概念 樹是一種分組的層次結構。 樹的定義&#xff1a; 樹是n(n>0)個數據元素的集合,在任意一棵非空樹中&#xff0c;有如下特征 有且只有一個根結點&#xff08;無前驅結點&#xff09;當n>1時&#xff0c;其他結點被分為若干個互不相交集合&#xff0c;并且…

phpgif圖片包_PHP生成GIF動態圖片驗證碼

1 <?php2 /**3 * 調用示例4 **/5 session_start();6 $randCode ;7 //驗證碼隨機8 $str"abcdefghjkmnpqrstuvwsyzABCDEFGHJKMNPQRSTUVWSYZ23456789";9 for($i0;$i<4;$i){10 $safe.substr($str,rand(0,strlen($str)),1);11 }12 $_SESSION["imgcode"]…

工程圖標注粗糙度_Inventor教程之工程圖標注實例

1工程圖標注實例對以下實體零件進行全部的標注演示。操作步驟如下&#xff1a;(1)打開文件。運行Inventor&#xff0c;單擊“快速入門”選項卡“啟動”面板上的“打開”按鈕&#xff0c;在“打開”對話框中選擇“實體零件”&#xff0c;單擊“打開”按鈕進入實體零件。(2)新建工…

Oracle數據庫 invalid character問題解決

今天使用PL/SQL Developer這個工具來操作Oracle數據時發現了一個問題&#xff1a; select * from tb_student_grade pivot(max(grade) for course in(math as 數學,chinese as 語文,english as 英語)); 執行這個SQL語句提示invalid character,原因是我的數據庫編碼是AMERICAN…

定時線程_SpringBoot定時任務,@Async多線程異步執行

一、使用SpringBoot實現定時任務這個不是重點&#xff0c;就簡單的實現一下&#xff0c;至于cron表達式怎么寫也不是重點&#xff0c;自行百度即可。1-1、基于 Scheduled 注解的方式import org.springframework.scheduling.annotation.EnableScheduling;import org.springframe…

SpringBoot入門一

SpringBoot能夠很簡單的創建一個直接運行的單體Spring應用 特性&#xff1a; 單體Spring應用內置的tomcat、Jetty提供默認的starter來構建配置自動配置Spring和第三方庫 推薦一個很好的學習教程&#xff0c;https://blog.csdn.net/u010486495/article/details/79348302 1 構…

mysql怎么把datetime類型轉換_mysql怎樣實現time轉datetime

mysql實現time轉datetime的方法&#xff1a;使用在sql語句中【FROM_UNIXTIME(時間值)】&#xff0c;代碼為【insert into test(time) values(FROM_UNIXTIME(%d))",time(NULL)】。mysql實現time轉datetime的方法&#xff1a;FROM_UNIXTIME(time(NULL))將liunx系統的time_t類…

SpringBoot入門二

參考Spring Boot Starters - 御坂研究所 創建自己的starter starter是依賴的一種synthesize&#xff08;合成&#xff09;。 starter會把需要用到的依賴全部包含進來&#xff0c;避免開發者自己手動引入依賴。 starter的邏輯 pom.xml<parent><groupId>org.spri…

Tomcat入門

一&#xff0c;tomcat啟動 雙擊startup.bat,如果出現一閃而過的情況&#xff0c;在文件的末尾添加pause&#xff0c;就可以看到環境變量設置的路徑是否正確 如果無法在電腦的高級系統設置中設置環境變量&#xff0c;可以在setclasspath.bat中設置環境變量 set JAVA_HOMEC:\P…