為什么kafka 需要 subscribe 的 group.id?我們是否需要使用 commitSync 手動提交偏移量?

目錄

  • 一、為什么需要帶有 subscribe 的 group.id
  • 二、我們需要使用commitSync手動提交偏移量嗎?
  • 三、如果我想手動提交偏移量,該怎么做?

一、為什么需要帶有 subscribe 的 group.id

  • 消費概念:
    Kafka 使用消費者組的概念來實現主題的并行消費 - 每條消息都將在每個消費者組中傳遞一次,無論該組中實際有多少個消費者。所以 group 參數是強制性的,如果沒有組,Kafka 將不知道如何對待訂閱同一主題的其他消費者。
  • 偏移量
    每當我們啟動一個消費者時,它都會加入一個消費者組,然后根據該消費者組中的其他消費者數量,為其分配要讀取的分區。對于這些分區,它會檢查列表讀取偏移量是否已知,如果找到,它將從這一點開始讀取消息。如果沒有找到偏移量,則參數 auto.offset.reset 控制是從分區中最早的消息還是從最新的消息開始讀取。

二、我們需要使用commitSync手動提交偏移量嗎?

  • 是否需要手動提交偏移?
    是否需要提交偏移量取決于作為參數 enable.auto.commit 選擇的值。默認情況下,此設置為 true,這意味著消費者將定期自動提交其偏移量(由auto.commit.interval.ms 決定提交的頻率)。如果將其設置為 false,那么將需要自己提交偏移量。這種默認行為可能也是導致很多發現 kafka 總是從最新的開始消費的原因,由于偏移量是自動提交的,因此它將使用該偏移量。

  • 有沒有辦法從頭開始重播消息?
    如果想每次都從頭開始讀取,可以調用seekToBeginning,如果不帶參數調用,它將重置為所有訂閱分區中的第一條消息,或者僅重置您傳入的那些分區。

  • seekToBeginning
    查找每個給定分區的第一個偏移量。poll(long) 該函數延遲計算,僅在調用或時才查找所有分區中的第一個偏移量position(TopicPartition)。如果未提供分區,則查找所有當前分配的分區的第一個偏移量。

    public class MyListener implements ConsumerSeekAware {...@Overridepublic void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {callback.seekToBeginning(assignments.keySet());}}
    
  • 有沒有辦法從最后開始重播消息?
    有的,可以使用 seekToEnd() 查找所有分配的分區到最后。或者使用 seekToTimestamp(long time)- 查找所有分配的分區到該時間戳表示的偏移量。

    public class MyListener extends AbstractConsumerSeekAware {@KafkaListener(...)void listn(...) {...}
    }public class SomeOtherBean {MyListener listener;...void someMethod() {this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);}}
    

三、如果我想手動提交偏移量,該怎么做?

  • 1、禁用自動提交

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
  • 提交方法
    對于手動提交,KafkaConsumers提供了兩種方法,即 commitSync() 和 commitAsync()。commitSync()是一個阻塞調用,在偏移量成功提交后返回,commitAsync()則立即返回。如果想知道提交是否成功,可以為回調處理程序 ( OffsetCommitCallback) 提供一個方法參數。請注意,在兩次提交調用中,消費者都會提交最新poll()調用的偏移量。
    舉個例子:假設一個分區主題有一個消費者并且最后一次調用poll()返回偏移量為 4、5、6 的消息。提交時,偏移量 6 將被提交,因為這是消費者客戶端跟蹤的最新偏移量。
    同時,commitSync() 和 commitAsync() 都允許更多地控制我們想要提交的偏移量:如果你使用允許你指定的相應重載,那么Map<TopicPartition, OffsetAndMetadata>消費者將僅提交指定的偏移量(即,映射可以包含分配的分區的任何子集) ,并且指定的偏移量可以為任意值)。

  • 同步提交:
    阻塞線程,直到提交成功或遇到不可恢復的錯誤(在這種情況下,它被拋出給調用者)

    while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());consumer.commitSync();}
    }
    

    對于 for 循環中的每次迭代,只有在consumer.commitSync()成功返回或因拋出異常而中斷后,代碼才會移至下一次迭代。

  • 異步提交:
    是一種非阻塞方法。調用它不會阻塞線程。相反,它將繼續處理以下指令,無論最終是成功還是失敗。

    while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());consumer.commitAsync(callback);}
    }
    

    對于 for 循環中的每次迭代,無論consumer.commitAsync()最終會發生什么,代碼都會移至下一次迭代。并且,提交的結果將由定義的回調函數處理。

  • 權衡:延遲與數據一致性
    1、如果必須確保數據一致性,請選擇commitSync(),因為它將確保在執行任何進一步操作之前,你將知道偏移量提交是成功還是失敗。但由于它是同步和阻塞的,你將花費更多的時間來等待提交完成,這會導致高延遲。
    2、如果可以接受某些數據不一致并希望具有低延遲,請選擇commitAsync(),因為它不會等待完成。相反,它只會發出提交請求并稍后處理來自 Kafka 的響應(成功或失敗),同時代碼將繼續執行。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/43209.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/43209.shtml
英文地址,請注明出處:http://en.pswp.cn/news/43209.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

vscode | linux | c++ intelliense 被棄用解決方案

每日一句&#xff0c;vscode用的爽是爽&#xff0c;主要是可配置太強了。如果也很會研究&#xff0c;可以直接去咸魚接單了 廢話少說&#xff0c;直接整。 用著用著說是c intelliense被棄用&#xff0c;很多輔助功能無法使用&#xff0c;像查看定義、查看引用、函數跳轉、智能提…

基于Rust的QuickLZ壓縮算法的詳細實現與分析

1. 引言 QuickLZ是一種被廣泛應用的高效壓縮算法。在許多應用中&#xff0c;快速的數據壓縮和解壓縮是非常關鍵的&#xff0c;特別是在網絡傳輸和存儲空間有限的場景中。為了滿足現代軟件開發的需求&#xff0c;我們將使用Rust語言來實現這一算法。Rust是一種專為系統級編程而…

Nodejs沙箱逃逸--總結

一、沙箱逃逸概念 JavaScript和Nodejs之間有什么區別&#xff1a;JavaScript用在瀏覽器前端&#xff0c;后來將Chrome中的v8引擎單獨拿出來為JavaScript單獨開發了一個運行環境&#xff0c;因此JavaScript也可以作為一門后端語言&#xff0c;寫在后端&#xff08;服務端&#…

七夕特輯——3D愛心(可監聽鼠標移動)

前言 「作者主頁」&#xff1a;雪碧有白泡泡 「個人網站」&#xff1a;雪碧的個人網站 「推薦專欄」&#xff1a; ★java一站式服務 ★ ★ React從入門到精通★ ★前端炫酷代碼分享 ★ ★ 從0到英雄&#xff0c;vue成神之路★ ★ uniapp-從構建到提升★ ★ 從0到英雄&#xff…

【005】ts學習筆記【函數擴展】

函數擴展 參數類型 //注意&#xff0c;參數不能多傳&#xff0c;也不能少傳 必須按照約定的類型來 const fn (name: string , age : number ) : string > {return name age }let desc fn( "張三", 18) console.log(desc)可選參數與默認值 //可選的參數 和 默…

深入理解Flink Mailbox線程模型

文章目錄 整體設計processMail1.Checkpoint Tigger2.ProcessingTime Timer Trigger processInput兼容SourceStreamTask 整體設計 Mailbox線程模型通過引入阻塞隊列配合一個Mailbox線程的方式&#xff0c;可以輕松修改StreamTask內部狀態的修改。Checkpoint、ProcessingTime Ti…

@Repeatable的作用以及具體如何使用

文章目錄 1. 前言2. 先說結論3. 案例演示 1. 前言 最近無意看到某些注解上有Repeatable&#xff0c;出于比較好奇&#xff0c;因此稍微研究并寫下此文章。 2. 先說結論 Repeatable的作用&#xff1a;使被他注釋的注解可以在同一個地方重復使用。 具體使用如下&#xff1a; T…

CentOS7源碼安裝MySQL詳細教程

&#x1f60a; 作者&#xff1a; Eric &#x1f496; 主頁&#xff1a; https://blog.csdn.net/weixin_47316183?typeblog &#x1f389; 主題&#xff1a;CentOS7源碼安裝MySQL詳細教程 ?? 創作時間&#xff1a; 2023年08月014日 文章目錄 1、安裝的四種方式2、源碼安裝…

深度解析:DDoS攻擊與先進防御策略

目錄 DDoS 介紹 DDoS 攻擊理論 DDoS 介紹 DDoS&#xff08;分布式拒絕服務&#xff09;攻擊是一種惡意網絡活動&#xff0c;旨在通過同時向目標系統發送大量請求或流量&#xff0c;使其無法正常運行或提供服務。攻擊者通常利用網絡上的多個計算機和設備&#xff0c;形成一個&…

極智嘉x吉利汽車 x京東物流,引領汽車行業智慧物流新變革!

近日&#xff0c;中國領先的汽車制造商吉利汽車攜手中國領先的技術驅動的供應鏈解決方案及物流服務商京東物流、全球倉儲機器人引領者極智嘉(Geek)&#xff0c;在西安吉利汽車制造基地RDC倉庫率先落地SkyPick上存下揀解決方案&#xff0c;實現了全物流鏈精益化、智能化、一體化…

Spring-4-掌握Spring事務傳播機制

今日目標 能夠掌握Spring事務配置 Spring事務管理 1 Spring事務簡介【重點】 1.1 Spring事務作用 事務作用&#xff1a;在數據層保障一系列的數據庫操作同成功同失敗 Spring事務作用&#xff1a;在數據層或業務層保障一系列的數據庫操作同成功同失敗 1.2 案例分析Spring…

STM32--TIM定時器(2)

文章目錄 輸出比較PWM輸出比較通道參數計算舵機簡介直流電機簡介TB6612 PWM基本結構PWM驅動呼吸燈PWM驅動舵機PWM控制電機 輸出比較 輸出比較&#xff0c;簡稱OC&#xff08;Output Compare&#xff09;。 輸出比較的原理是&#xff0c;當定時器計數值與比較值相等或者滿足某種…

【數據結構OJ題】有效的括號

原題鏈接&#xff1a;https://leetcode.cn/problems/valid-parentheses/ 目錄 1. 題目描述 2. 思路分析 3. 代碼實現 1. 題目描述 2. 思路分析 這道題目主要考查了棧的特性&#xff1a; 題目的意思主要是要做到3點匹配&#xff1a;類型、順序、數量。 題目給的例子是比較…

【Hibench 】完成 HDP-Spark 性能測試

&#x1f341; 博主 "開著拖拉機回家"帶您 Go to New World.?&#x1f341; &#x1f984; 個人主頁——&#x1f390;開著拖拉機回家_Linux,Java基礎學習,大數據運維-CSDN博客 &#x1f390;?&#x1f341; &#x1fa81;&#x1f341; 希望本文能夠給您帶來一定的…

單片機實訓報告

這周我們進行了單片機實訓&#xff0c;一周中我們通過七個項目1&#xff1a;P1 口輸入/輸出 2&#xff1a;繼電器控制 3 音頻控制 4&#xff1a;子程序設計 5&#xff1a;字符碰頭程序設計 6&#xff1a;外部中斷 7&#xff1a; 急救車與交通信號燈&#xff0c;練習編寫了子程…

mysql 設置 mysql 日志時間與系統時間保持一致

臨時設置 mysql> show variables like %log_timestamps%;-----------------------| Variable_name | Value |-----------------------| log_timestamps | UTC |-----------------------1 row in set (0.00 sec)系統是 CST &#xff0c; nysql 是 UTC當UTC時間為0點時&am…

docker的使用方法總結

Docker是一個非常強大的工具&#xff0c;它可以用于創建、部署和運行應用程序。以下是一些docker相關的常用指令&#xff0c; 1、查看docker版本 docker version 2、查看正在運行的Docker容器 docker ps 3、查看所有的docker容器&#xff08;包括沒有運行的容器&#xff0…

Python 之 Http 獲取網頁的 html 數據,并去掉 html 格式等相關信息

Python之 Http 獲取網頁的 html 數據,并去掉 html 格式等相關信息 目錄 Python之 Http 獲取網頁的 html 數據,并去掉 html 格式等相關信息

SCF金融公鏈新加坡啟動會 創新驅動未來

新加坡迎來一場引人矚目的金融科技盛會&#xff0c;SCF金融公鏈啟動會于2023年8月13日盛大舉行。這一受矚目的活動將為金融科技領域注入新的活力&#xff0c;并為廣大投資者、合作伙伴以及關注區塊鏈發展的人士提供一個難得的交流平臺。 在SCF金融公鏈啟動會上&#xff0c; Wil…

級聯(數據字典)

二級級聯&#xff1a; 一&#xff1a;新建兩個Bean 父級&#xff1a; /*** Description 數據字典* Author WangKun* Date 2023/7/25 10:15* Version*/ Data AllArgsConstructor NoArgsConstructor TableName("HW_DICT_KEY") public class DictKey implements Seri…