記一次Kafka重復消費解決過程

? ? ? ? 起因:車聯網項目開發,車輛發生故障需要給三個系統推送消息,故障上報較為頻繁,所以為了不阻塞主流程,采用了使用kafka。消費方負責推送并保存推送記錄,但在一次壓測中發現,實際只發生了10次故障,但是推送記錄卻有30多條。

????????問題排查,發現是因為其中一個系統宕機,導致往這個系統推送消息時,一直連接超時,導致每條消息的推送時長被拉長。而且kafka消息拉取參數max-poll-records設置了500,意味著一次會批量拉取500條消息到本地處理,而max.poll.interval.ms參數默認是5分鐘,當500條消息處理時長超過5分鐘后,就會認為消費者死掉了,觸發再均衡,導致同一個消息被重復消費。

解決:

? ? ? ? 主要是提高消費者的處理速度,避免不必要的Rebalance。主要采用2種措施:

  1. 減少每次拉去消息數max-poll-records,從500,降到20
  2. 拉取到消息之后異步處理(創建線程池,對推送消息的部分利用多線程處理)

常見配置

fetch.min.byte:配置Consumer一次拉取請求中能從Kafka中拉取的最小數據量,默認為1B,如果小于這個參數配置的值,就需要進行等待,直到數據量滿足這個參數的配置大小。調大可以提交吞吐量,但也會造成延遲

fetch.max.bytes,一次拉取數據的最大數據量,默認為52428800B,也就是50M,但是如果設置的值過小,甚至小于每條消息的值,實際上也是能消費成功的

fetch.wait.max.ms,若是不滿足fetch.min.bytes時,等待消費端請求的最長等待時間,默認是500ms

max.poll.records,單次poll調用返回的最大消息記錄數,如果處理邏輯很輕量,可以適當提高該值。一次從kafka中poll出來的數據條數,max.poll.records條數據需要在在session.timeout.ms這個時間內處理完,默認值為500

consumer.poll(100)?,100 毫秒是一個超時時間,一旦拿到足夠多的數據(fetch.min.bytes 參數設置),consumer.poll(100)會立即返回 ConsumerRecords<String, String> records。如果沒有拿到足夠多的數據,會阻塞100ms,但不會超過100ms就會返回

max.poll.interval.ms,兩次拉取消息的間隔,默認5分鐘;通過消費組管理消費者時,該配置指定拉取消息線程最長空閑時間,若超過這個時間間隔沒有發起poll操作,則消費組認為該消費者已離開了消費組,將進行再均衡操作(將分區分配給組內其他消費者成員)

若超過這個時間則報如下異常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has alreadyrebalanced and assigned the partitions to another member. This means that the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either by increasing the session timeout or byreducing the maximum size of batches returned in poll() with max.poll.records. 

  即:無法完成提交,因為組已經重新平衡并將分區分配給另一個成員。這意味著對poll()的后續調用之間的時間比配置的max.poll.interval.ms長,這通常意味著poll循環花費了太多的時間來處理消息。

可以通過增加max.poll.interval.ms來解決這個問題,也可以通過減少在poll()中使用max.poll.records返回的批的最大大小來解決這個問題。

max.partition.fetch.bytes:該屬性指定了服務器從每個分區返回給消費者的最大字節數,默認為 1MB。

session.timeout.ms:消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是 3s,將觸發再均衡操作。

對于每一個Consumer Group,Kafka集群為其從Broker集群中選擇一個Broker作為其Coordinator。Coordinator主要做兩件事:

  1. 維持Group成員的組成。這包括加入新的成員,檢測成員的存活性,清除不再存活的成員。

  2. 協調Group成員的行為。

poll機制

  •    每次poll的消息處理完成之后再進行下一次poll,是同步操作
  •    每次poll之前檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移
  •    每次poll時,consumer都將嘗試使用上次消費的offset作為起始offset,然后依次拉取消息
  •    poll(long timeout),timeout指等待輪詢緩沖區的數據所花費的時間,單位是毫秒

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/38022.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/38022.shtml
英文地址,請注明出處:http://en.pswp.cn/news/38022.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

“深入探究JVM內部機制:理解Java虛擬機的工作原理“

標題&#xff1a;深入探究JVM內部機制&#xff1a;理解Java虛擬機的工作原理 摘要&#xff1a;本文將深入探究Java虛擬機&#xff08;JVM&#xff09;的內部機制&#xff0c;幫助讀者理解JVM的工作原理。我們將介紹JVM的組成部分、類加載過程、內存管理和垃圾回收機制&#xf…

帶你了解ChatGPT

目錄 什么是ChatGPT 從ChatGPT角度看聊天機器人的歷史 聊天機器人的早期歷史 ChatGPT的出現 ChatGPT和其他聊天機器人的比較 總結 ChatGPT相比其他聊天機器人的優勢在哪里 1. 自然語言處理能力更強 2. 編程能力高&#xff0c;應用領域廣泛 3. 可以滿足個性化需求 4.…

Golang實現完整聊天室(內附源碼)

項目github地址&#xff1a; 由于我們項目的需要&#xff0c;我就研究了一下關于websocket的相關內容&#xff0c;去實現一個聊天室的功能。 經過幾天的探索&#xff0c;現在使用Gin框架實現了一個完整的聊天室消息實時通知系統。有什么不完善的地方還請大佬指正。 用到的技術…

使用自己的數據利用pytorch搭建全連接神經網絡進行回歸預測

使用自己的數據利用pytorch搭建全連接神經網絡進行回歸預測 1、導入庫2、數據準備3、數據拆分4、數據標準化5、數據轉換6、模型搭建7、模型訓練8、模型預測9、完整代碼 1、導入庫 引入必要的庫&#xff0c;包括PyTorch、Pandas等。 import numpy as np import pandas as pd f…

tp6 RabbitMQ

1、composer 安裝 AMQP 擴展 composer require php-amqplib/php-amqplib 2、RabbitMQ 配置 在 config 目錄下創建 rabbitmq.php 文件 <?php return [host>,port>5672,user>,password>,vhost>,exchange_name > ,queue_name > ,route_key > ,cons…

中國生產了5.07億臺,庫存高達近4億臺?國產手機徹底賣不動了?

統計數據顯示今年上半年中國的手機產量達到5.07億臺&#xff0c;國內市場手機出貨量僅有1.24億臺&#xff0c;都出現了下滑&#xff0c;那么中國手機的產量比銷量多出了3.83億臺&#xff0c;這些手機都成為了庫存&#xff1f; 中國手機市場確實不如早年那么輝煌&#xff0c;201…

【FAQ】安防監控視頻EasyCVR平臺分發的FLV視頻流在VLC中無法播放

眾所周知&#xff0c;TSINGSEE青犀視頻匯聚平臺EasyCVR可支持多協議方式接入&#xff0c;包括主流標準協議國標GB28181、RTSP/Onvif、RTMP等&#xff0c;以及廠家私有協議與SDK接入&#xff0c;包括海康Ehome、海大宇等設備的SDK等。在視頻流的處理與分發上&#xff0c;視頻監控…

P12-Retentive NetWork-RetNet挑戰Transformer

論文地址:https://arxiv.org/abs/2307.08621 目錄 Abstract 一.Introduction 二.Retentive Networks 2.1Retention 2.2Gated Multi-Scale Retention 2.3Overall Architecture of Retention Networks 2.4Relation to and Differences from Previous Methods 三.Experime…

Codeforces Round 892 (Div. 2)(VP)

A //b里放最小值&#xff0c;其他值放c。如果最大值最小值&#xff0c;則無解。 void solve() {int n; cin >> n;vi a(n); liter(x, a) cin >> x; sort(all(a));if (a[0] a[n - 1]){print(-1); return;}vi b, c;for (int i 0; i < sz(a); i){if (a[i] a[0])…

小米基于 Flink 的實時計算資源治理實踐

摘要&#xff1a;本文整理自小米高級軟件工程師張蛟&#xff0c;在 Flink Forward Asia 2022 生產實踐專場的分享。本篇內容主要分為四個部分&#xff1a; 發展現狀與規模框架層治理實踐平臺層治理實踐未來規劃與展望 點擊查看原文視頻 & 演講PPT 一、發展現狀與規模 如上圖…

【03】基礎知識:typescript中的函數

一、typescript 中定義函數的方法 函數聲明法 function test1(): string {return 返回類型為string }function test2(): void {console.log(沒有返回值的方法) }函數表達式/匿名函數 const test3 function(): number {return 1 }二、typescript 中 函數參數寫法 1、typesc…

helm安裝harbor + nerdctl 制作push 鏡像

參考 文章&#xff1a;Helm部署Harbor_helm harbor_風向決定發型丶的博客-CSDN博客 安裝好后使用 nerd containerd對接harbor_containerd 容器 insecure-registries 配置_檸是檸檬的檬的博客-CSDN博客 推送鏡像 Containerd 對接私有鏡像倉庫 Harbor - 知乎 接下來我們來…

麒麟系統相關

創建虛擬機 鏡像下載地址 選擇合適的鏡像&#xff0c;進入引導后注意不要選擇默認的第一條&#xff0c;選擇第二條進入安裝程序。 root密碼修改 使用命令 sudo passwd root 開啟ssh 配置好網絡后發現能ping通&#xff0c;但無法ssh連接&#xff0c;ps -ef | grep ssh 得…

01 qt快速入門

一 qt介紹 1.基本概念 1991年由Qt Company(奇趣)開發的跨平臺C++圖形用戶界面應用程序開發框架,GUI程序和非GUI程序。優點:一套源碼在不同的平臺通過不同的編譯器進行編譯,就可以運行到該平臺上目標機。面向對象的封裝機制來對其接口封裝。 GUI —圖形用戶界面(Graphic…

軟件測試面試題【2023整理版(含答案)】

01、您所熟悉的測試用例設計方法都有哪些&#xff1f;請分別以具體的例子來說明這些方法在測試用例設計工作中的應用。 答&#xff1a;有黑盒和白盒兩種測試種類&#xff0c;黑盒有等價類劃分方法 邊界值分析方法 錯誤推測方法 因果圖方法 判定表驅動分析方法 正交實驗設…

Vue組件之間的傳值匯總

組件之間的傳值 1、父傳子 props 2、父傳子 slot 3、父傳子 不建議用 attrs 4、 子傳父 ref 5、子傳父 emit 6、povide/inject只能在setup的時候用。 7、利用vuex和pinia去實現數據的交互 1、實現代碼App.vue <script setup>import TestProps from ./components/T…

stable-diffusion 模型效果+prompt

摘自個人印象筆記&#xff0c;圖不完整可查看原筆記&#xff1a;https://app.yinxiang.com/fx/55cda0c6-2af5-4d66-bd86-85da79c5574ePrompt運用規則及技巧 &#xff1a; 1. https://publicprompts.art/&#xff08;最適用于OpenArt 線上模型 https://openart.ai/&#xff09;…

【Vue-Router】別名

后臺返回來的路徑名不合理&#xff0c;但多個項目在使用中了&#xff0c;不方便改時可以使用別名。可以有多個或一個。 First.vue <template><h1>First Seciton</h1> </template>Second.vue&#xff0c;Third.vue代碼同理 UserSettings.vue <tem…

R語言生存分析(機器學習)(2)——Enet(彈性網絡)

彈性網絡&#xff08;Elastic Net&#xff09;:是一種用于回歸分析的統計方法&#xff0c;它是嶺回歸&#xff08;Ridge Regression&#xff09;和lasso回歸&#xff08;Lasso Regression&#xff09;的結合&#xff0c;旨在克服它們各自的一些限制。彈性網絡能夠同時考慮L1正則…

mysql 索引 區分字符大小寫

mysql 建立索引&#xff0c;特別是unique索引&#xff0c;是跟字符集、字符排序規則有關的。 對于utf8mb4_0900_ai_ci來說&#xff0c;0900代表Unicode 9.0的規范&#xff0c;ai表示accent insensitivity&#xff0c;也就是“不區分音調”&#xff0c;而ci表示case insensitiv…