Kafka消費者組位移重設指南

#作者:張桐瑞

文章目錄

  • 一、Kafka 與傳統消息引擎的核心差異
  • 二、重設消費者組位移的核心原因
  • 三、重設位移的兩大維度與七種策略
  • 四、重設位移的實現方式
    • (一)Java API 方式
    • (二)命令行腳本方式(Kafka 0.11+)
  • 五、注意事項

一、Kafka 與傳統消息引擎的核心差異

特性Kafka傳統消息引擎(如 RabbitMQ、ActiveMQ)
消息處理方式基于日志結構,只讀不刪除,支持消息重演破壞性處理,成功消費后刪除消息
位移控制消費者自主控制位移,可靈活修改實現重復消費由中間件自動管理,通常無法回溯
適用場景高吞吐量、低單消息處理耗時、強順序性要求復雜消息處理邏輯、弱順序性要求

二、重設消費者組位移的核心原因

  1. 重復消費歷史數據
    1)修正消費邏輯錯誤后,需要重新處理歷史消息。
    2)業務需求變更(如數據重新計算、補寫下游存儲)。
  2. 跳過異常消息
    1)處理 corrupted 消息或消費邏輯拋出異常時,通過指定位移跳過無效消息。
  3. 動態調整消費進度
    2)基于時間維度(如消費近 30 分鐘數據)或位移維度(如從最新 / 最早位置開始)靈活調整消費起點。
  4. 回滾消費進度
    1)代碼變更失敗后,需回滾到歷史位移繼續消費。

三、重設位移的兩大維度與七種策略

(一)位移維度策略

策略說明典型場景
Earliest重置到主題當前最早位移(可能大于 0,受日志保留策略影響)重新消費主題所有可保留的歷史消息
Latest重置到主題最新末端位移跳過所有歷史消息,從最新消息開始消費
Current重置到消費者當前提交的最新位移回滾代碼變更后,恢復到重啟前的消費位置
Specified-Offset指定絕對位移值手動跳過某條異常消息(如位移 1234)
Shift-By-N指定相對位移偏移量(N 可正可負)向前跳過 100 條(N=-100)或向后跳過 50 條(N=50)

(二)時間維度策略

策略說明格式要求典型場景
DateTime重置到指定時間之后的最小位移YYYY-MM-DDTHH:mm:ss.SSS(如2023-10-01T12:00:00.000)重新消費昨天 0 點之后的數據
Duration重置到相對當前時間的間隔位移符合 ISO-8601 的PnDTnHnMnS(如PT15M表示 15 分鐘前)消費 30 分鐘前的所有消息

四、重設位移的實現方式

(一)Java API 方式

核心方法

方法作用
seek(TopicPartition partition, long offset)為單個分區設置絕對位移
seekToBeginning(Collection<TopicPartition> partitions)將多個分區重置到最早位移
seekToEnd(Collection<TopicPartition> partitions)將多個分區重置到最新位移
offsetsForTimes(Map<TopicPartition, Long> timestamps)根據時間戳查找對應的位移

示例代碼

  1. Earliest 策略
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singleton("test-topic"));consumer.poll(0); // 觸發元數據更新List<TopicPartition> partitions = consumer.partitionsFor("test-topic").stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toList());consumer.seekToBeginning(partitions); // 重置所有分區到最早位移
}
  1. DateTime 策略(重設到 2023-10-01 12:00:00)
long timestamp = LocalDateTime.of(2023, 10, 1, 12, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeMap = consumer.partitionsFor("test-topic").stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toMap(tp -> tp, tp -> timestamp));
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timeMap);
offsets.forEach((tp, oa) -> consumer.seek(tp, oa.offset()));

(二)命令行腳本方式(Kafka 0.11+)

bin/kafka-consumer-groups.sh --bootstrap-server <broker地址> --group <消費組名> --reset-offsets [策略參數] --execute
策略	命令示例
Earliest	--to-earliest
Latest	--to-latest
Current	--to-current
Specified-Offset	--to-offset 1234
Shift-By-N	--shift-by -100(向前跳 100 條)
DateTime	--to-datetime "2023-10-01T12:00:00.000"
Duration	--by-duration PT30M(30 分鐘前)

五、注意事項

  1. 消費組狀態
    1)重設位移時,確保消費組未處于運行狀態,避免位移沖突。
  2. 日志保留策略
    1)Earliest策略受log.retention.hours等配置限制,可能無法重置到 0 位移。
  3. 分區分配
    1)API 方式需顯式處理所有分區(如通過partitionsFor獲取分區列表),避免遺漏。
  4. 事務性消息
    1)若消費事務性主題,需結合isolation.level=read_committed確保一致性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/84573.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/84573.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/84573.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

分類模型:邏輯回歸

1、針對設計&#xff1a;二分類 Logistic 回歸最初是為二分類問題設計的&#xff0c; Logistic 回歸基于概率&#xff0c;通過 Sigmoid 函數轉換輸入特征的線性組合&#xff0c;將任意實數映射到 [0, 1] 區間內。 通過引入一個決策規則&#xff08;通常是概率的閾值&#xff…

CppCon 2015 學習:C++ WAT

這段代碼展示了 C 中的一些有趣和令人困惑的特性&#xff0c;尤其是涉及數組訪問和某些語法的巧妙之處。讓我們逐個分析&#xff1a; 1. assert(map[“Hello world!”] e;) 這一行看起來很不尋常&#xff0c;因為 map 在這里被用作數組下標訪問器&#xff0c;但是在前面沒有…

vscode自定義主題語法及流程

vscode c/c 主題 DIY 啟用自己的主題(最后步驟) 重啟生效 文件–>首選項–>主題–>顏色主題: 也可以在插件里找到哈 手把手教你制作 在C:\Users\jlh.vscode\extensions下自己創建一個文件夾 里面有兩個文件和一個文件夾 具體內容: package.json: {"name&…

前端傳遞日期范圍(開始時間和結束時間),后端解析及查詢

前端技術&#xff1a;Vue3 TypeScript Element Plus 后端技術&#xff1a;Java Spring Boot MyBatis 應用效果&#xff1a; 原來方案 1、前端日期控件使用 el-date-picker&#xff0c;日期顯示格式和日期值返回格式都為&#xff1a;YYYY-MM-DD <el-form :model"…

零基礎設計模式——行為型模式 - 命令模式

第四部分&#xff1a;行為型模式 - 命令模式 (Command Pattern) 接下來&#xff0c;我們學習行為型模式中的命令模式。這個模式能將“請求”封裝成一個對象&#xff0c;從而讓你能夠參數化客戶端對象&#xff0c;將請求排隊或記錄請求日志&#xff0c;以及支持可撤銷的操作。 …

禁止 Windows 更新后自動重啟

Windows 默認會在安裝重要更新后自動重啟&#xff0c;但你可以調整設置來避免這種情況&#xff1a; ??方法 1&#xff1a;通過組策略&#xff08;適用于 Windows 專業版/企業版&#xff09;?? 按 Win R&#xff0c;輸入 gpedit.msc 打開 ??本地組策略編輯器??。導航…

GoldenDB簡述

GoldenDB是國產的分布式數據庫。它徹底解決了事務一致性&#xff0c;數據實時一致性的問題。采用的是Shared Nothing&#xff08;分片式存儲&#xff09;的分布式架構。就是不共享數據&#xff0c;各自節點持有各自的數據。對比不共享的&#xff0c;還有其他兩種分布式架構&…

訓練過程中的 Loss ?

文章目錄 在我們訓練的過程中&#xff0c;設置好這個epochs也就是訓練的輪次&#xff0c;然后計算這個損失函數&#xff0c;我們可以知道這個具體的訓練的情況&#xff0c;那么在訓練的過程中&#xff0c;這個損失函數的變化有哪些情況&#xff1f;對應的一個解釋情況是怎么樣的…

S2B2B農產品供應鏈交易多平臺開發有哪些發展前景?如何維護?

一、S2B2B農產品供應鏈交易多平臺開發的未來發展前景 本文將由小編為您介紹關于S2B2B農產品供應鏈交易多平臺開發的內容&#xff0c;希望能夠幫助大家。在數字化時代&#xff0c;農產品供應鏈的數字化轉型成為了一種必然趨勢。S2B2B(Supplier to Business to Business)模式通過…

關于有害的過度使用 std::move

翻譯&#xff1a;2023 11 月 24 日On harmful overuse of std::move cppreference std::move 論 std::move 的有害過度使用 - The Old New Thing C 的 std::move 函數將其參數轉換為右值引用&#xff0c;這使得其內容可以被另一個操作“消費”&#xff08;移動&#xff09;。…

Ubuntu24.04 onnx 模型轉 rknn

前面的環境配置有點懶得寫&#xff0c;教程也很多&#xff0c;可以自己找 rknn-toolkit2 gitee 地址&#xff1a;pingli/rknn-toolkit2 試了很多開源的代碼&#xff0c;都沒辦法跑通&#xff0c; 最后自己改了一版 微調后的 qwen2 模型適用 from rknn.api import RKNN impor…

Electron通信流程

前言 今天講Electron框架的通信流程&#xff0c;首先我們需要知道為什么需要通信。這得益于Electron的多進程模型&#xff0c;它主要模仿chrome的多進程模型如下圖&#xff1a; 作為應用開發者&#xff0c;我們將控制兩種類型的進程&#xff1a;主進程和渲染器進程 。 …

uni-app項目實戰筆記1--創建項目和實現首頁輪播圖功能

ps:本筆記來自B站咸蝦米壁紙項目 一.創建項目&#xff0c;完成項目初始化搭建 1.在HBuilder X創建wallper項目&#xff0c;使用默認模塊&#xff0c;選擇vue&#xff1b; 2.在項目根目錄下創建common目錄&#xff0c;用于存放靜態資源&#xff0c;創建項目時自動生成static目…

機械制造系統中 PROFINET 與 PROFIBUS-DP 的融合應用及捷米科技解決方案

在機械制造領域&#xff0c;工業通信網絡的兼容性與靈活性直接影響產線的自動化水平與生產效率。當前&#xff0c;多數機械制造系統采用PROFINET 控制器構建核心網絡架構&#xff0c;并通過微波無線連接實現設備互聯。隨著工業網絡的發展&#xff0c;系統中常需同時集成PROFINE…

MCP 協議系列序言篇:開啟 AI 應用融合新時代的鑰匙

文章目錄 序言&#xff1a;AI 應用層進入 MCP 時代為什么 MCP 開啟 AI 應用融合新時代的鑰匙為什么是 MCP&#xff1f;它與 Function Calling、Agent 有什么區別&#xff1f;Function CallingAI AgentMCP&#xff08;Model Context Protocol&#xff09; MCP 如何工作MCP Serve…

【threejs】每天一個小案例講解:光照

代碼倉 GitHub - TiffanyHoo/three_practices: Learning three.js together! 可自行clone&#xff0c;無需安裝依賴&#xff0c;直接liver-server運行/直接打開chapter01中的html文件 運行效果圖 知識要點 常見光照類型及其特點如下&#xff1a; 1. 環境光&#xff08;Ambi…

大模型在輸尿管下段積水預測及臨床應用的研究

目錄 一、引言 1.1 研究背景與意義 1.2 研究目的 1.3 研究范圍與限制 1.4 文獻綜述 1.5 研究方法和框架 二、相關理論與概念 2.1 大模型技術原理 2.2 輸尿管下段積水病理機制 2.3 大模型在醫學預測領域的應用 三、大模型預測輸尿管下段積水的方法 3.1 數據收集 3.…

gitlab相關操作

2025.06.11今天我學習了如何在終端使用git相關操作&#xff1a; 一、需要修改新的倉庫git地址的時候&#xff1a; &#xff08;1&#xff09;檢查當前遠程倉庫 git remote -v 輸出示例&#xff1a; origin https://github.com/old-repo.git (fetch) origin https://github.c…

51c自動駕駛~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留&#xff0c;CCA-Attention為LLM長文本建模帶來突破性進展 琶洲實驗室、華南理工大學聯合推出關鍵上下文感知注意力機制&#xff08;CCA-Attention&#xff09;&#xff0c;…

通過共享內存在多程序之間實現數據通信

注&#xff1a;以下內容為與 GPT-4O 共同創作完成 以共享內存的方式實現多程序之間的數據通信&#xff0c;尤其適合在一臺機器上的多程序之間進行高頻數據交換。 以下示例展示了 sender.py 向 receiver.py 發送數據并接收經 receiver.py 處理后的數據&#xff0c;以及如何通過…