kafka消費的模式及消息積壓處理方案

目錄

1、kafka消費的流程

2、kafka的消費模式

2.1、點對點模式

2.2、發布-訂閱模式

3、consumer消息積壓

3.1、處理方案

3.2、積壓量

4、消息過期失效

5、kafka注意事項

????????Kafka消費積壓(Consumer Lag)是指消費者處理消息的速度跟不上生產者發送消息的速度,導致消息在Kafka主題中堆積。

關于kakfa的架構圖,如下所示:

更多關于kafka的介紹,參考:關于MQ之kafka的深入研究-CSDN博客https://blog.csdn.net/weixin_50055999/article/details/148535599?spm=1011.2415.3001.5331


1、kafka消費的流程

????????之前的章節中,介紹了kafka消息由producer通過hash函數存放到broker節點后,每個broker節點由多個topic主題組成,可水平擴展。

????????每個topic由多個partitin組成,partition里面的內容有順序,跨partition無序。

對于點對點模式下:

????????消費組內每個消費者可以消費多個partition、同時保留offset偏移位置,保證下次消費。

對于發布訂閱模式

????????不同消費組內的消費者可以消費同一個patition,兩個消費組不受影響,各自保留彼此的offset的偏移位置。

如圖所示:

在消費者消費過程的流程如下:

由上圖可知:

1、每個topic里面包含多個partition。

2、每個partition里面的內容是按順序分布的。

3、每個消費者可以消費多個partition。

4、而partition只能被一個消費者消費。

對于不同消費者組,可以共同消費同一個topic里面的消息。


2、kafka的消費模式

Kafka 的消費訂閱模式取決于消費者組的配置方式,可以分為以下兩種主要模式:

2.1、點對點模式

特點:一條消息只能被一個消費者消費

實現方式

  • 所有消費者屬于同一個消費者組(相同的?group.id

  • Kafka 會在組內消費者之間自動平衡分區分配

// 消費者1和消費者2使用相同的group.id
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "my-consumer-group"); // 相同的組ID
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

工作流程

  1. 假設主題有3個分區(P0, P1, P2)

  2. 如果有1個消費者,它將消費所有3個分區

  3. 如果增加第二個消費者,Kafka會重新平衡:

    • 消費者1可能獲得P0和P1

    • 消費者2獲得P2

  4. 消息在每個分區內有序,且只被分配給該分區的消費者消費

2.2、發布-訂閱模式

特點一條消息可以被多個消費者(不同消費組)消費(本質還是點對點)

實現方式

  • 不同消費者組訂閱同一個主題

  • 每個消費者組都會收到完整的消息流

// 組A的消費者
Properties propsA = new Properties();
propsA.put("group.id", "group-a"); // 不同組ID
// ...其他配置
KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(propsA);// 組B的消費者
Properties propsB = new Properties();
propsB.put("group.id", "group-b"); // 不同組ID
// ...其他配置
KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(propsB);

工作流程

  1. 生產者發送消息到主題

  2. 組A的所有消費者(作為一個組)會收到消息的一個副本

  3. 組B的所有消費者(作為另一個獨立的組)也會收到消息的一個副本

  4. 在每個組內部,消息仍然遵循點對點模式(組內只有一個消費者收到)


3、consumer消息積壓

????????Kafka消息積壓的問題,核心原因是生產太快、消費太慢,處理速度長期失衡,從而導致消息積壓(Lag)的場景,積壓到超過隊列長度限制,就會出現還未被消費的數據產生丟失的場景。
? ? ? ?如果長時間不解決消息積壓,可能會引發資源緊張服務延遲崩潰等問題。解決消息積壓的關鍵是提高消費者的消費能力,并優化Kafka集群的整體處理效率。

3.1、處理方案

1. 如果是Kafka消費能力不足,則可以考慮增加?topic?的 partition 的個數(提高kafka的并行度)同時提升消費者組的消費者數量,消費數 = 分區數 (二者缺一不可)

2. 若是下游數據處理不及時,則提高每批次拉取的數量。批次拉取數量過少(拉取數據/處理時間 < 生產速度),使處理的數據小于生產的數據,也會造成數據積壓。

方法:

1. 增大partion數量。
2. 消費者加了并發,服務, 擴大消費線程。
3. 增加消費組服務數量。
4. kafka單機升級成了集群。
5. 避免消費者消費消息時間過長,導致超時。
6. 使Kafka分區之間的數據均勻分布。

3.2、積壓量

  • 生產量:Kafka Topic 在一個時間周期內各partition offset 起止時間差值之和。
  • 消費量:Kafka Topic 在一個時間周期內某個消費者的消費量。
  • 積壓量:Kafka Topic 的某個Consumer Group殘留在消息中間件未被及時消費的消息量。

4、消息過期失效

????????產生消息堆積,消費不及時,kafka數據有過期時間,一些數據就丟失了,主要是消費不及時。

當出現這種現象的時候,可參考以下經驗,進行規避:

1. 消費kafka消息時,應該盡量減少每次消費時間,可通過減少調用三方接口、讀庫等操作,
? ?從而減少消息堆積的可能性。
2. 如果消息來不及消費,可以先存在數據庫中,然后逐條消費(可以保存消費記錄,方便定位問題)。
3. 每次接受kafka消息時,先打印出日志,包括消息產生的時間戳。
4. kafka消息保留時間(修改kafka配置文件, 默認一周)
5. 任務啟動從上次提交offset處開始消費處理


5、kafka注意事項

1. 由于Kafka消息key設置,在Kafka producer處,給key加隨機后綴,使其均衡。
?
2. 數據量很大,合理的增加Kafka分區數是關鍵。
? ?Kafka分區數是Kafka并行度調優的最小單元,如果Kafka分區數設置的太少,
? ?會影響Kafka consumer消費的吞吐量. 如果利用的是Spark流和Kafka direct approach方式,
? ?也可以對KafkaRDD進行repartition重分區,增加并行度處理.


參考文章:

1、Kafka如何處理大量積壓消息_kafka消息堆積過多了怎么辦-CSDN博客https://blog.csdn.net/AlbenXie/article/details/128300018?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522dcefb6fbf11572c5ef4526b40c68a37c%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=dcefb6fbf11572c5ef4526b40c68a37c&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_click~default-1-128300018-null-null.142^v102^pc_search_result_base1&utm_term=kafka%E6%B6%88%E6%81%AF%E7%A7%AF%E5%8E%8B%E6%80%8E%E4%B9%88%E5%A4%84%E7%90%86&spm=1018.2226.3001.4187

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/87437.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/87437.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/87437.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

RAG實踐:Routing機制與Query Construction策略

Routing機制與Query Construction策略 前言RoutingLogical RoutingChatOpenAIStructuredRouting DatasourceConclusion Semantic RoutingEmbedding & LLMPromptRounting PromptConclusion Query ConstructionGrab Youtube video informationStructuredPrompt GithubReferen…

基于python的web系統界面登錄

#讓我們的電腦可以支持服務訪問 #需要一個web框架 #pip install Flask from flask import Flask, render_template,request from random import randint app Flask(__name__) app.route(/index) def index():uname request.args.get("uname")return f"主頁&am…

MATLAB Simulink 終極入門指南:從零設計智能控制系統

為什么工程師都愛Simulink? 想象一下:不寫一行代碼就能設計機器人控制器、飛行算法甚至核反應堆! MATLAB Simulink正是這樣的可視化神器。全球70%的汽車ECU、航天器控制系統用它開發。本文將帶你從零設計一個智能溫控系統,融入創新性的模糊PID控制,并生成可部署的C代碼!…

vue3 javascript 復雜數值計算操作技巧

在Vue 3中處理復雜數值計算&#xff0c;你可以采用多種策略來確保代碼的可讀性、可維護性和性能。以下是一些實用的技巧和最佳實踐&#xff1a; 1. 使用計算屬性&#xff08;Computed Properties&#xff09; Vue 3的computed屬性非常適合處理復雜的數值計算。它們是基于響應…

26.【.NET8 實戰--孢子記賬--從單體到微服務--轉向微服務】--單體轉微服務--角色權限管理

在現代企業級應用中&#xff0c;角色權限管理是保障系統安全和提升用戶體驗的核心基礎功能。一個高效的角色權限系統不僅能夠有效防止越權訪問&#xff0c;還能簡化系統的維護和擴展。本文將系統性介紹角色權限管理的核心實現思路&#xff0c;包括架構設計、性能優化、安全機制…

[VSCode] VSCode 設置 python 的編譯器

VSCode 設置 python 的編譯器 快捷鍵&#xff1a;CTRL SHIFT P 彈出 VSCode 的命令框輸入 Python : select Interpretor選擇自己需要的 python 環境&#xff1b;如 python 3.8 或者 python 3.10 版本

基于PEMFC質子交換膜燃料電池系統的simulink建模與仿真

目錄 1.課題概述 2.系統仿真結果 3.核心程序 4.系統仿真參數 5.系統原理簡介 6.參考文獻 7.完整工程文件 1.課題概述 本課題是一個燃料電池&#xff08;大概率是質子交換膜燃料電池&#xff0c;PEMFC &#xff09;的數學模型仿真框圖&#xff0c;用于模擬燃料電池的電特…

git-build-package 工具代碼詳細解讀

git-build-package&#xff08;gbp&#xff09;是一個用于從 Git 倉庫管理 Debian 軟件包的工具&#xff0c;其代碼架構和實現原理體現了對 Git 版本控制系統和 Debian 打包流程的深度整合。以下是對其代碼的詳細解讀&#xff1a; 代碼架構設計 gbp 的代碼架構設計圍繞其核心…

如何使用ChatGPT快速完成一篇論文初稿?

2小時寫完論文初稿&#xff0c;學境思源&#xff0c;聽起來是不是有點不真實&#xff1f;一鍵生成論文初稿&#xff01;但如果你有一個清晰的框架、良好的寫作節奏&#xff0c;acaids.com。再配合像ChatGPT這樣的寫作助手——真的可以做到。 這篇文章就是手把手告訴你&#xf…

Docker PowerJob

1. Docker PowerJob 1. 拉取PowerJob服務端鏡像 docker pull tjqq/powerjob-server:4.3.92. 創建數據卷目錄用于持久化數據 mkdir -p /home/docker/powerjob/logs mkdir -p /home/docker/powerjob/data mkdir -p /home/docker/powerjob/server mkdir -p /home/docker/powerjob…

Python數據可視化:NumPy生成與Matplotlib折線圖繪制

一、數據生成與可視化概述 在數據分析和科學計算領域,Python已成為最受歡迎的編程語言之一。這主要得益于其豐富的數據處理庫和強大的可視化工具。數據可視化是將抽象數據轉化為直觀圖形表示的過程,它能夠幫助我們發現數據中的模式、趨勢和異常值,從而做出更明智的決策。 …

26.多表查詢

1.笛卡爾集 創建倆表&#xff1a; -- 創建部門表&#xff08;dept&#xff09; use mysql_learn CREATE TABLE dept (deptno INT PRIMARY KEY, dname VARCHAR(50) NOT NULL, loc VARCHAR(50) );-- 創建員工表&#xff08;emp&#xff09; CREATE TABLE emp (em…

深度學習題目(僅供參考)

一、注意力和transformer 一、選擇題 注意力機制的核心步驟不包括&#xff1f; A. 計算注意力分布 B. 加權平均輸入信息 C. 隨機丟棄部分輸入 D. 打分函數計算相關性 答案&#xff1a;C&#xff08;硬性注意力雖隨機選擇輸入&#xff0c;但核心步驟仍為分布計算與加權&#xf…

WebWorker:提升前端性能的多線程利器

簡介 在現代Web開發中&#xff0c;隨著應用越來越復雜&#xff0c;JavaScript的單線程模型開始顯現其局限性。Web Workers的出現為解決這一問題提供了優雅的方案&#xff0c;它允許開發者在后臺線程中運行腳本&#xff0c;而不會影響主線程的性能。 Web Workers是HTML5標準的…

milvus教程:collection和scheme

環境配置&#xff1a;可以看上一節 一.數據庫使用 連接 Milvus Standalone創建數據庫 my_database_1&#xff08;無額外屬性&#xff09;創建數據庫 my_database_2&#xff08;設置副本數為 3&#xff09;列出所有數據庫查看默認數據庫&#xff08;default&#xff09;詳情修…

14:00開始面試,14:06就出來了,問的問題有點變態。。。

從小廠出來&#xff0c;沒想到在另一家公司又寄了。 到這家公司開始上班&#xff0c;加班是每天必不可少的&#xff0c;看在錢給的比較多的份上&#xff0c;就不太計較了。沒想到6月一紙通知&#xff0c;所有人不準加班&#xff0c;加班費不僅沒有了&#xff0c;薪資還要降40%…

Electron(01)

Electron Electron是什么 electron可以使用前端技術開發桌面應用&#xff0c;跨平臺性&#xff0c;開發一套應用&#xff0c;可以打包到三個平臺。 electron結合Chromium&#xff08;谷歌內核&#xff09;和 Node.js 和Native Api 當使用 Electron 時&#xff0c;很重要的一…

Kafka 攔截器深度剖析:原理、配置與實踐

引言 在構建高可用、可擴展的消息系統時&#xff0c;Kafka以其卓越的性能和穩定性成為眾多企業的首選。而Kafka攔截器作為Kafka生態中強大且靈活的功能組件&#xff0c;能夠在消息的生產和消費過程中實現自定義邏輯的注入&#xff0c;為消息處理流程帶來極大的擴展性和可控性。…

Flutter 與原生技術(Objective-C/Swift,java)的關系

在 iOS 開發中&#xff0c;Flutter 與原生技術&#xff08;Objective-C/Swift&#xff09;的關系 一、技術定位與核心差異 Flutter 語言&#xff1a;使用Dart 語言開發&#xff0c;通過 AOT&#xff08;提前編譯&#xff09;將代碼轉換為原生 ARM 指令&#xff0c;無需依賴 iOS…

最新期刊影響因子,基本包含全部期刊

原文鏈接&#xff1a;2024年期刊最新影響因子&#xff08;IF&#xff09; 2024年期刊最新影響因子&#xff08;IF&#xff09; BioinfoR生信筆記 &#xff0c;注于分享生物信息學相關知識和R語言繪圖教程。