kafka學習筆記(三、消費者Consumer使用教程——從指定位置消費)

在這里插入圖片描述


1.簡介

Kafka的poll()方法消費無法精準的掌握其消費的起始位置,auto.offset.reset參數也只能在比較粗粒度的指定消費方式。更細粒度的消費方式kafka提供了seek()方法可以指定位移消費允許消費者從特定位置(如固定偏移量、時間戳或分區首尾)開始消費消息。

2.指定消費位置

2.1.從特定偏移量開始消費

使用seek(TopicPartition partition, long offset)指定具體偏移量。

源碼分析:

  • seek()方法更新消費者內部的subscriptions對象的position字段,記錄目標偏移量。
  • 后續poll()時,Fetcher類根據此位置向Broker發送拉取請求。

代碼示例:

consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 確保分配到分區
while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();
}
// 設置所有分區從offset=100開始消費
assignment.forEach(tp -> consumer.seek(tp, 100));

2.2.從時間戳開始消費

使用offsetsForTimes()獲取時間戳對應的偏移量,再調用seek()

源碼分析:

offsetsForTimes()向Broker發送ListOffsetRequest,查詢滿足時間戳條件的最早或最新偏移量。

代碼實例:

Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 獲取24小時前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});

2.3.從分區首尾消費

使用seekToBeginning()seekToEnd(),或通過beginningOffsets()/endOffsets()獲取首尾偏移量后手動設置。

代碼實例:

// 從分區末尾開始消費(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));

2.4.注意事項

  1. 分區分配與poll()的依賴
    seek()必須在分區分配完成后調用,否則會拋出IllegalStateException。需通過循環poll()確保分配到分區。

  2. 數據過期問題
    若指定偏移量對應的消息已被刪除(如日志清理導致),seek()將失效。此時需使用beginningOffsets()獲取當前最小有效偏移量。

  3. 異步提交與位移覆蓋風險
    異步提交(commitAsync())失敗時不會重試,可能因位移回滾導致重復消費。需結合同步提交(commitSync())保證原子性

  4. seek()方法提供了我們可以將消費者位移保存在外部的能力,還可以配合在均衡監聽器來提供更加精準的消費能力。

3.完整代碼實例

public class SeekToTimestampDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "seek-demo");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton("test-topic"));// 等待分區分配Set<TopicPartition> assignment = new HashSet<>();while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}// 獲取24小時前的時間戳對應偏移量Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);// 指定位移offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) {consumer.seek(tp, offsetAndTs.offset());} else {// 處理無有效偏移量的情況(如從頭開始)consumer.seekToBeginning(Collections.singleton(tp));}});while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/907369.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/907369.shtml
英文地址,請注明出處:http://en.pswp.cn/news/907369.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【JS進階】JavaScript 中 this 值的確定規則

JavaScript 中 this 值的確定規則 1. 默認綁定&#xff08;獨立函數調用&#xff09; 當函數作為普通函數調用時&#xff0c;this 指向全局對象&#xff08;瀏覽器中是 window&#xff0c;Node.js 中是 global&#xff09;&#xff0c;嚴格模式下是 undefined。 function sh…

【凌智視覺模塊】rv1106 部署 pp-humseg 模型

人像分割簡介 ? 凌智視覺模塊 是一款基于rv1106芯片開發的視覺模塊&#xff0c;專注于視覺模型部署與開發。 人像分割是一種基于計算機視覺的技術&#xff0c;通過深度學習算法精準識別圖像或視頻中的人物主體&#xff0c;將其與背景進行像素級分離。該技術可實時運行于移動端…

wangeditor富文本編輯器+vue3粘貼內容樣式處理

又是一個風格和日立的上午&#xff0c;某只菜鳥高高興興的騎著小電驢去上班&#xff0c;本著上班只要不遲到的理念飛速前行&#xff08;遲到扣錢啊~&#xff09;&#xff0c;高高興興的行走在路上。來到工位剛拴上我的繩子組長就開始滴滴俺&#xff0c;頓時我心中大感不妙&…

實測,大模型誰更懂數據可視化?

大家好&#xff0c;我是 Ai 學習的老章 看論文時&#xff0c;經常看到漂亮的圖表&#xff0c;很多不知道是用什么工具繪制的&#xff0c;或者很想復刻類似圖表。 實測&#xff0c;大模型 LaTeX 公式識別&#xff0c;出乎預料 前文&#xff0c;我用 Kimi、Qwen-3-235B-A22B、…

深度學習-梯度消失和梯度爆炸

梯度消失 在某些神經網絡中&#xff0c;隨著網絡深度的增加&#xff0c;梯度在隱藏層反向傳播時傾向于變小&#xff0c;這就意味著&#xff0c;前面隱藏層中的神經元要比后面的學習起來更慢&#xff0c;這種現象就叫做“梯度消失”&#xff1b; 梯度爆炸 如果我們進行一些特殊…

Go 語言基礎 2 Func,流程控制

更多個人筆記見&#xff1a; github個人筆記倉庫 gitee 個人筆記倉庫 個人學習&#xff0c;學習過程中還會不斷補充&#xff5e; &#xff08;后續會更新在github上&#xff09; 文章目錄 Func 函數函數棧概念 函數表示類型 Anonymous func 匿名函數closure 閉包基礎示例http利…

【Linux 學習計劃】-- 倒計時、進度條小程序

目錄 \r 、\n、fflush 倒計時 進度條 進度條進階版 結語 \r 、\n、fflush 首先我們先來認識這三個東西&#xff0c;這將會是我們接下來兩個小程序的重點之一 首先是我們的老演員\n&#xff0c;也就是回車加換行 這里面其實包含了兩個操作&#xff0c;一個叫做回車&…

從零實現wss通信示例(WebSocket SSL)

客戶端和服務端代碼框架跟上一篇一致,僅增加了ssl的證書部分用于加密通信,明文通信(ws協議)見上一篇【https://blog.csdn.net/suoxd123/article/details/148093934】 1. 證書創建 1. 安裝openssl 【官網地址】:https://slproweb.com/products/Win32OpenSSL.html 1.2 …

mysql 索引失效有哪些

InnoDB存儲引擎根據索引類型不同&#xff0c;分為聚簇索引和二級索引 聚簇索引&#xff1a;葉子節點存放的是實際數據 二級索引&#xff1a;存放的是主鍵值&#xff0c;不是實際數據 1.對索引使用左或者左右模糊匹配 select * from t_user where name like %林‘&#xff1b…

LabVIEW通用測控平臺設計

基于 LabVIEW 圖形化編程環境&#xff0c;設計了一套適用于工業自動化、科研測試領域的通用測控平臺。通過整合研華、NI等品牌硬件&#xff0c;實現多類型數據采集、實時控制及可視化管理。平臺采用模塊化架構&#xff0c;支持硬件靈活擴展&#xff0c;解決了傳統測控系統開發周…

華為OD機試真題——智能駕駛(2025A卷:200分)Java/python/JavaScript/C/C++/GO最佳實現

2025 A卷 200分 題型 本專欄內全部題目均提供Java、python、JavaScript、C、C++、GO六種語言的最佳實現方式; 并且每種語言均涵蓋詳細的問題分析、解題思路、代碼實現、代碼詳解、3個測試用例以及綜合分析; 本文收錄于專欄:《2025華為OD真題目錄+全流程解析+備考攻略+經驗分…

速賣通,國際站測評補單,如何平衡效率和安全

測評能夠幫助賣家讓平臺更喜歡自己的產品&#xff0c;給予更好排名的同時也讓后續進入店鋪的買家更容易認可自己的產品。這是進行真實交易后形成的評價&#xff0c;而不是通過機器軟件生成&#xff0c;形成虛擬數據后&#xff0c;那種刷評形式產生的評論。它符合任何電商平臺的…

學習路之PHP--easyswoole3.3入門及文件熱加載

學習路之PHP--easyswoole入門 一、框架說明二、常用命令三、文件熱加載 一、框架說明 目錄結構 目錄結構 project 項目部署目錄 ├─App 應用目錄(可以有多個) │ ├─HttpController 控制器目錄 │ │ └─Index.php …

設計模式26——解釋器模式

寫文章的初心主要是用來幫助自己快速的回憶這個模式該怎么用&#xff0c;主要是下面的UML圖可以起到大作用&#xff0c;在你學習過一遍以后可能會遺忘&#xff0c;忘記了不要緊&#xff0c;只要看一眼UML圖就能想起來了。同時也請大家多多指教。 解釋器模式&#xff08;Interp…

第三屆寧波技能大賽網絡安全賽項樣題

2025 第三屆寧波技能大賽網絡安全賽項樣題 模塊A: 網絡安全事件響應、數字取證調查和應用安全任務一:應急響應任務二:操作系統取證任務三:網絡數據包分析任務四:代碼審計 模塊B:CTF 奪旗-攻擊模塊C:CTF 奪旗-防御需要環境培訓可以私信博主&#xff01;&#xff01;&#xff01;…

GO語言進階:掌握進程OS操作與高效編碼數據轉換

&#x1f49d;&#x1f49d;&#x1f49d;歡迎蒞臨我的博客&#xff0c;很高興能夠在這里和您見面&#xff01;希望您在這里可以感受到一份輕松愉快的氛圍&#xff0c;不僅可以獲得有趣的內容和知識&#xff0c;也可以暢所欲言、分享您的想法和見解。 推薦&#xff1a;「storms…

IO進程(進程 Process)

什么是進程&#xff1f; 1.概念 程序&#xff1a;編譯好的可執行文件&#xff0c;存放在磁盤上的指令和數據的有序集合。 由此可見程序是靜態的&#xff0c;沒有執行的概念。 進程&#xff1a;是程序的一次執行的過程&#xff0c;是一個可調度的任務&#xff0c;也是執行一…

CSS傳統布局與定位詳解與TDK三大標簽SEO優化

一、傳統布局基礎 1. 文檔流布局 瀏覽器默認的文檔流布局方式遵循以下規則&#xff1a; 塊級元素&#xff08;如<div>、<p>、<h1>&#xff09;&#xff1a; 獨占一行寬度默認100%可以設置寬高、內外邊距 div {width: 500px;height: 200px;margin: 10px …

【GraphQL】深入解析 Apollo Client:從架構到實踐的一站式 GraphQL 解決方案

深入解析 Apollo Client&#xff1a;從架構到實踐的一站式 GraphQL 解決方案 1. 引言 GraphQL 作為現代 API 開發的核心技術&#xff0c;其靈活性和高效性正在重塑數據交互模式。Apollo Client 作為 GraphQL 生態中最受歡迎的客戶端庫&#xff0c;憑借強大的緩存機制、框架集…

docker學習基本使用教程

docker是一款用于開發部署和運行容器化平臺&#xff0c;能將應用及其依賴打包成輕量級、可移植的容器&#xff0c;實現一次構建&#xff0c;隨處運行。docker是cs架構程序&#xff08;客戶端和服務端&#xff09;&#xff0c;docker客戶端向docker守護進程發送請求&#xff0c;…