Kafka consumer_offsets 主題深度剖析

Kafka consumer_offsets 主題深度剖析

在 Apache Kafka 的消息消費機制中,確保消息被可靠消費是一個核心問題。為了解決這個問題,Kafka 設計了一個特殊的內部主題 consumer_offsets,用于跟蹤和管理消費者組的消費進度。

consumer_offsets 的基本概念

consumer_offsets 是 Kafka 的一個內部主題,它具有以下特征:

  1. 默認包含 50 個分區(可通過 offsets.topic.num.partitions 配置)
  2. 使用 3 個副本因子(可通過 offsets.topic.replication.factor 配置)
  3. 采用日志壓縮(log compaction)的清理策略
  4. 消息格式為二進制的鍵值對

這個主題存儲了所有消費者組的位移信息。每個消費者組消費某個主題分區時,都會定期將自己的消費位置(offset)提交到這個主題中。當消費者重啟或發生再平衡時,可以從這個主題中恢復之前的消費位置,確保消息不會丟失或重復消費。

通過代碼來演示如何實現消費者位移的提交和管理:

public class ConsumerOffsetDemo {private final KafkaConsumer<String, String> consumer;private final String topic;private final String groupId;public ConsumerOffsetDemo(String bootstrapServers, String topic, String groupId) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 關閉自動提交,手動控制位移提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");this.consumer = new KafkaConsumer<>(props);this.topic = topic;this.groupId = groupId;}public void consumeAndCommit() {try {consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理消息processRecord(record);// 手動提交單條消息的位移Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));consumer.commitSync(offsets);}}} finally {consumer.close();}}
}

位移提交機制

位移提交是 consumer_offsets 主題的核心功能。當消費者消費消息時,需要定期將自己的消費進度提交到這個主題。提交的消息包含以下信息:

  1. key:包含 <消費者組ID, 主題名稱, 分區號> 的三元組
  2. value:包含 offset(位移)、timestamp(時間戳)等信息

提交方式分為自動提交和手動提交:

  1. 自動提交:由消費者自動定期提交,通過 auto.commit.interval.ms 配置提交間隔
  2. 手動提交:由應用程序控制提交時機,可以選擇同步提交或異步提交

下面是一個完整的位移監控實現:

public class OffsetMonitor {private final AdminClient adminClient;private final KafkaConsumer<byte[], byte[]> consumer;public OffsetMonitor(String bootstrapServers) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-monitor");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());this.consumer = new KafkaConsumer<>(props);}public Map<String, ConsumerGroupOffset> getConsumerGroupOffsets(String groupId) {Map<String, ConsumerGroupOffset> result = new HashMap<>();try {// 獲取消費者組的位移信息ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(groupId);Map<TopicPartition, OffsetAndMetadata> offsets = offsetsResult.partitionsToOffsetAndMetadata().get();// 獲取主題的結束位移Map<TopicPartition, Long> endOffsets = consumer.endOffsets(offsets.keySet());// 計算消費延遲for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {TopicPartition tp = entry.getKey();long committedOffset = entry.getValue().offset();long endOffset = endOffsets.get(tp);long lag = endOffset - committedOffset;result.put(tp.topic(), new ConsumerGroupOffset(committedOffset, endOffset, lag));}} catch (Exception e) {e.printStackTrace();}return result;}
}

位移管理和運維

在實際運維中,我們需要對 consumer_offsets 主題進行管理和監控。主要包括以下幾個方面:

  1. 位移重置:當需要重新消費某個主題的消息時,可以重置消費者組的位移
  2. 消費者組管理:包括刪除不再使用的消費者組等操作
  3. 監控告警:監控消費延遲,及時發現消費異常

下面是一個位移管理工具的實現:

public class OffsetManager {private final AdminClient adminClient;public OffsetManager(String bootstrapServers) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);}// 重置消費者組位移public void resetOffset(String groupId, String topic, int partition, long offset) {try {TopicPartition tp = new TopicPartition(topic, partition);Map<TopicPartition, OffsetAndMetadata> offsetMap = Collections.singletonMap(tp, new OffsetAndMetadata(offset));adminClient.alterConsumerGroupOffsets(groupId, offsetMap).all().get();System.out.printf("Successfully reset offset for group=%s, topic=%s, " +"partition=%d to %d%n",groupId, topic, partition, offset);} catch (Exception e) {e.printStackTrace();}}// 刪除消費者組public void deleteConsumerGroup(String groupId) {try {adminClient.deleteConsumerGroups(Collections.singleton(groupId)).all().get();System.out.printf("Successfully deleted consumer group: %s%n", groupId);} catch (Exception e) {e.printStackTrace();}}// 監控消費延遲public void monitorConsumerLag(String groupId, String topic) {try {TopicPartition tp = new TopicPartition(topic, 0);Map<TopicPartition, OffsetAndMetadata> offsetMap = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();long currentOffset = offsetMap.get(tp).offset();long endOffset = getEndOffset(tp);long lag = endOffset - currentOffset;if (lag > 10000) { // 設置告警閾值System.out.printf("Warning: High lag detected for group=%s, topic=%s: %d%n",groupId, topic, lag);}} catch (Exception e) {e.printStackTrace();}}private long getEndOffset(TopicPartition tp) {try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(new Properties())) {Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(tp));return endOffsets.get(tp);}}
}

consumer_offsets 主題是 Kafka 消息消費機制的核心組件,它通過存儲和管理消費位移信息,確保了消息消費的可靠性和可恢復性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/72898.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/72898.shtml
英文地址,請注明出處:http://en.pswp.cn/web/72898.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于javaweb的SpringBoot時裝購物系統設計與實現(源碼+文檔+部署講解)

技術范圍&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬蟲、數據可視化、小程序、安卓app、大數據、物聯網、機器學習等設計與開發。 主要內容&#xff1a;免費功能設計、開題報告、任務書、中期檢查PPT、系統功能實現、代碼編寫、論文編寫和輔導、論…

B站pwn教程筆記-5

復習和回顧 首先復習一下ELF文件在內存和磁盤中的不同。內存只關注讀寫這權限&#xff0c;會合并一些代碼段。 動態鏈接庫只在內存中單獨裝在一份 因為很多軟件都要用動態鏈接庫了&#xff0c;不可能一個個單獨復制一份。但是在有的調試環境下會單獨顯示出來各一份。 ld.so是裝…

云原生網絡拓撲:服務網格的量子糾纏效應

引言&#xff1a;數據平面的蟲洞躍遷 谷歌服務網格每日處理5萬億請求&#xff0c;Istio 1.20版本時延降低至0.8ms。螞蟻集團Mesh架構節省42%CPU開銷&#xff0c;AWS App Mesh實現100ms跨區故障切換。LinkedIn Envoy配置規則達1200萬條&#xff0c;騰訊云API網關QPS突破900萬。…

爬蟲——playwright獲取亞馬遜數據

目錄 playwright簡介使用playwright初窺亞馬遜安裝playwright打開亞馬遜頁面 搞數據搜索修改bug數據獲取翻頁優化結構 簡單保存 playwright簡介 playwright是微軟新出的一個測試工具&#xff0c;與selenium類似&#xff0c;不過與selenium比起來還是有其自身的優勢的&#xff…

Matrix-Breakout-2-Morpheus靶場通關心得:技巧與經驗分享

1.安裝靶機&#xff0c;并在虛擬機打開&#xff0c;確保和kali在同一個NAT網段 2.使用kali來確定該靶機的IP nmap -O 192.168.139.1/24 3.訪問該IP192.168.139.171 4.訪問robots.txt 5.掃描目錄 gobuster dir -u http://192.168.139.171 -x php,bak,txt,html -w /usr/share/d…

機器學習掃盲系列(2)- 深入淺出“反向傳播”-1

系列文章目錄 機器學習掃盲系列&#xff08;1&#xff09;- 序 機器學習掃盲系列&#xff08;2&#xff09;- 深入淺出“反向傳播”-1 文章目錄 前言一、神經網絡的本質二、線性問題解析解的不可行性梯度下降與隨機梯度下降鏈式法則 三、非線性問題激活函數 前言 反向傳播(Ba…

(一)飛行器的姿態歐拉角, 歐拉旋轉, 完全數學推導(基于坐標基的變換矩陣).(偏航角,俯仰角,橫滾角)

(這篇寫的全是基矢變換矩陣)不是坐標變換矩陣,坐標變換矩陣的話轉置一下,之后會有推導. 是通過M轉置變換到P撇點.

C語言和C++到底有什么關系?

C 讀作“C 加加”&#xff0c;是“C Plus Plus”的簡稱。 顧名思義&#xff0c;C 就是在 C 語言的基礎上增加了新特性&#xff0c;玩出了新花樣&#xff0c;所以才說“Plus”&#xff0c;就像 Win11 和 Win10、iPhone 15 和 iPhone 15 Pro 的關系。 C 語言是 1972 年由美國貝…

PCB畫圖軟件PROTEL99SE學習-05畫出銅箔來

sch設計的是各個器件的電連接。設計的就是各種節點的網絡表關系。不管你器件怎么擺放&#xff0c;好看不好看。都不重要。最終設計電路板是把網絡表中連線的網絡節點都用銅箔實物相連&#xff0c;讓他們導電。 網表導出后我們不用去看他&#xff0c;也不用管他的格式。 我們打開…

helm部署metricbeat

背景 在Elastic Stack 7.5版本之前&#xff0c;系統默認采用內置服務進行監控數據采集&#xff08;稱為內部收集機制&#xff09;&#xff0c;這種設計存在顯著局限性&#xff1a; 當ES集群崩潰時自帶的節點監控也會隨之崩潰&#xff0c;直到集群恢復前&#xff0c;崩潰期間的…

【菜鳥飛】AI多模態:vsCode下python訪問阿里云通義文生圖API

目標 有很多多模態的AI工具&#xff0c;用的少就用在線圖形化的&#xff0c;需要批量&#xff0c;就嘗試代碼生成&#xff0c;本文嘗試代碼調用多模態AI&#xff0c;阿里通義有免費額度&#xff0c;作為練手應該挺好&#xff0c;如果以后選其他的&#xff0c;技術也是相通的。…

從零實現本地文生圖部署(Stable Diffusion)

1. 依賴安裝 文件打包下載地址&#xff08;Stable Diffusion&#xff09; # git &#xff1a; 用于下載源碼 https://git-scm.com/downloads/win # Python 作為基礎編譯環境 https://www.python.org/downloads/ # Nvidia 驅動&#xff0c;用于編譯使用GPU顯卡硬件 https://ww…

緩存監控治理在游戲業務的實踐和探索

作者&#xff1a;來自 vivo 互聯網服務器團隊- Wang Zhi 通過對 Redis 和 Caffeine 的緩存監控快速發現和定位問題降低故障的影響面。 一、緩存監控的背景 游戲業務中存在大量的高頻請求尤其是對熱門游戲而言&#xff0c;而應對高并發場景緩存是一個常見且有效的手段。 游戲業…

WordPress漏洞

一&#xff0c;后臺修改模板拿WebShell 1&#xff0c;安裝好靶場后訪問 2&#xff0c;在如圖所示的位置選擇一個php文件寫入一句話木馬&#xff0c;我們這里選擇在404.php中寫入 3&#xff0c;訪問404.php 二&#xff0c;上傳主題拿WebShell 1&#xff0c;找到如圖所示的頁面…

【Linux系列】實時監控磁盤空間:`watch -n 1 ‘df -h‘` 命令詳解

&#x1f49d;&#x1f49d;&#x1f49d;歡迎來到我的博客&#xff0c;很高興能夠在這里和您見面&#xff01;希望您在這里可以感受到一份輕松愉快的氛圍&#xff0c;不僅可以獲得有趣的內容和知識&#xff0c;也可以暢所欲言、分享您的想法和見解。 推薦:kwan 的首頁,持續學…

騰訊云大模型知識引擎×DeepSeek:股票分析低代碼應用實踐

項目背景與發展歷程 在金融科技快速發展的今天&#xff0c;股票分析作為投資決策的核心環節&#xff0c;正面臨數據量激增和復雜性提升的挑戰。傳統股票分析依賴人工處理&#xff0c;效率低下且成本高昂&#xff0c;而人工智能&#xff08;AI&#xff09;的引入為這一領域帶來…

llama源碼學習·model.py[3]ROPE旋轉位置編碼(4)ROPE的應用

一、源碼注釋 def apply_rotary_emb(xq: torch.Tensor, # 查詢矩陣xk: torch.Tensor, # 鍵矩陣freqs_cis: torch.Tensor, # 旋轉嵌入 ) -> Tuple[torch.Tensor, torch.Tensor]:# 首先將xq和xk張量轉換為浮點數# 然后使用reshape將最后一個維度拆分為兩個維度&#xff0c;每…

dify重磅升級:從0.15.3安全升級1.1.0新手避坑指南

Docker Compose 部署 備份自定義的 docker-compose YAML 文件(可選) cd docker cp docker-compose.yaml docker-compose.yaml.-$(date +%Y-%m-%d-%H-%M).bak從 main 分支獲取最新代碼 git checkout main git pull origin main停止服務,命令,請在 docker 目錄下執行

高性能邊緣計算網關-高算力web組態PLC網關

高性能EG8200Pro邊緣計算算力網關-超強處理能力 樣機申請測試&#xff1a;免費測試超30天&#xff08;https://www.iotrouter.com/prototype/&#xff09; 產品主要特點和特色功能 設備概覽與連接能力 設備型號&#xff1a;EG8200P。主要特點&#xff1a; 支持多種工業協議&am…

穩定運行的以MongoDB數據庫為數據源和目標的ETL性能變差時提高性能方法和步驟

在使用 MongoDB 作為數據源和目標的 ETL&#xff08;提取、轉換、加載&#xff09;過程中&#xff0c;如果性能變差&#xff0c;可能是由于多種原因導致的。為了提高性能&#xff0c;可以按照以下方法和步驟進行排查和優化&#xff1a; 提高 MongoDB ETL 性能需要從多個方面入手…