kafka的位移

文章目錄

    • 概要
    • 消費位移
    • __consumer_offsets主題
    • 位移提交

概要

本文主要總結kafka的位移是如何管理的,在broker端如何通過命令行查看到位移信息,并從代碼層面總結了位移的提交方式。

消費位移

對于 Kafka 中的分區而言,它的每條消息都有唯一offset ,用來表示消息在分區中對應位置;對于消費者來說,它也有 offset 的概念,消費者使用 offse 來表示消費到分區中某個消息所在的位置。可通過命令行在查看到一個群組,在topic中兩者當前的位置
bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group kafka-boot

[root@node1 kafka_2.13-3.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe  --group kafka-bootConsumer group 'kafka-boot' has no active members.GROUP           TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
kafka-boot      test-error-topic 0          26              26              0               -               -               -
kafka-boot      normal-test      0          23              24              1               -               -               -

這里對offse 做些區分 對于消息在分區中的位置 CURRENT-OFFSET稱為“偏移量” 或消息位移;對于消費者消費到的位置,LOG-END-OFFSET稱為“位移 ,有時候也會更明確地稱之為“消費位移“。

生產者位移跟消費者位移的關系可以用下圖來說明:
在這里插入圖片描述

總結幾個需要注意的點:

  • 分區副本有兩種類型
    領導者副本:生產者跟消費者的請求都只會經過領導者副本
    跟隨者副本:首領之外的副本,不處理客戶端請求,從領導者副本那里通過拉取的方式同步消息
  • 消費位移存儲在Zookeeper或Kafka中,新消費者客戶端,偏移量存儲咋Kafka內部主題 __consumer_offsets
  • 消費者提交的位移是當前消費消息位移的下一個位置,即:lastConsumeedOffset+1

__consumer_offsets主題

Consumer需要向Kafka記錄自己的位移數據,這個匯報過程稱為提交位移(Committing Offsets)。

老版本 Consumer 的位移是提交到 ZooKeeper 中保存的。當 Consumer 重啟后,它能自動從 ZooKeeper 中讀取位移數據,從而在上次消費截止的地方繼續消費。這種設計使得Kafka Broker 不需要保存位移數據,減少了 Broker 端需要持有的狀態空間,因而有利于實現高伸縮性。

但是,ZooKeeper 其實并不適用于這種高頻的寫操作,Kafka 社區自 0.8.2.x 版本開始推出了全新的位移管理
機制,將 Consumer 的位移數據作為一條條普通的 Kafka 消息,提交到 __consumer_offsets 中。可以這么說,
__consumer_offsets 的主要作用是保存 Kafka 消費者的位移信息。這種方式能夠滿足高頻的寫操作。

兩個相關參數:
offsets.topic.num.partitions : 設置 __consumer_offsets主題的分區數,默認是50個分區
offsets.topic.replication.factor : 設置__consumer_offsets主題的副本數,默認是3(下載安裝的包中此值可能為1 )

當Kafka 集群中的第一個 Consumer 程序啟動時,Kafka 會自動創建位移主題

一共有50個分區,那么消費者將位移提交到了哪個分區呢?

通過如下公式可以選出consumer消費的offset要提交到__consumer_offsets的哪個分區,這個分區leader對應的broker
就是這個consumer group的coordinator
公式:Math.abs(groupID.hashCode()) % numPartitions

Kafka 1.0.2及以后提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息

1. 創建一個topic,分區數設置為1,副本數設置為1

[root@node1 kafka_2.13-3.2.1]# bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic test-offset --partitions 1 --replication-factor 1
Created topic test-offset.[root@node1 kafka_2.13-3.2.1]# bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic test-offset
Topic: test-offset      TopicId: in6gxQ5OQS6x9R8V3oJ7AQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824Topic: test-offset      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

2. 向主題test-offset中發送消息

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-producer.sh --broker-list node1:9092 --topic test-offset
>hello

3. 創建一個消費組,并從頭開始消費

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092  --from-beginning --consumer-property group.id=testOffsetGroup   --topic test-offset
hello

4. 用代碼根據上面的公式計算消費組testOffsetGroup提交位移的分區數

@Test
void getCommitOffsetPartitionTest() {String groupId = "testOffsetGroup";// 運行結果為16System.out.println(Math.abs(groupId.hashCode() % 50));
}
  1. 將kafka配置文件consumer.properties中設置exclude.internal.topics=false,并重啟服務
    6. 查看主題__consumer_offsets第16分區上的信息,可以看到消費組testOffsetGroup提交的位移確實保存在了16分區上

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 16 --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896116191, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896121189, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896126188, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896131188, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896133573, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896162124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896167124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896172123, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896177124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896178781, expireTimestamp=None)

從上面也可看出__consumer_offsets topic的每一日志項的格式都是:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

客戶端提交消費位移是使用OffsetCommitRequest 請求實現的,其結構如下
在這里插入圖片描述

__consumer_offsets這個主題中的消息格式為KV對,key為[Group, Topic, Partition],value可以簡單理解為記錄了偏移量;這樣的記錄方式,使得broker端不需要關系group下有多少個消費者,新增消費者或者減少消費者發生重平衡時,都能準確地定位到對應地分區應該從哪個位置開始消費。

位移提交

鑒于位移提交甚至是位移管理對 Consumer 端的巨大影響,Kafka,特別是KafkaConsumer API,提供了多種提交位移的方法。從用戶的角度來說,位移提交分為自動提交和手動提交;從 Consumer 端的角度來說,位移提交分為同步提交和異步提交。

自動提交

自動提交,就是指 Kafka Consumer 在后臺默默地為你提交位移
兩個重要的參數

  • enable.auto.commit設置是否自動提交位移,默認是true
  • auto.commit.interval.ms:設置自動提交為true時,該參數生效,標識多久提交一次位移,默認5s,
public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");// 設置偏移量自動提交。自動提交是默認值。這里做示例。configs.put("enable.auto.commit", "true");// 偏移量自動提交的時間間隔configs.put("auto.commit.interval.ms", "2000");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);consumer.subscribe(Collections.singleton("tp_demo_01"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println(record.topic()+ "\t" + record.partition()+ "\t" + record.offset()+ "\t" + record.key()+ "\t" + record.value());}}}

設置了 enable.auto.commit 為 true,Kafka 會保證在開始調用 poll 方法時,提交上次 poll 返回的所有消息。從順序上來說,poll 方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現消費丟失的情況。但是會出現消息重復消費

在默認情況下,Consumer 每 5 秒自動提交一次位移。現在,我們假設提交位移之后的 3秒發生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 從上一次提交的位移處繼續消費,但該位移已經是 3 秒前的位移數據了,故在Rebalance 發生前 3 秒消費的所有數據都要重新再消費一次。雖然你能夠通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這么做只能縮小重復消費的時間窗口,不可能完全消除它。這是自動提交機制的一個缺陷。

手動同步提交

開啟手動提交位移的方法就是設置enable.auto.commit 為 false。但是,僅僅設置它為 false 還不夠,因為你只是告訴
Kafka Consumer 不要自動提交位移而已,你還需要調用相應的 API 手動提交位移。

public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);consumer.subscribe(Collections.singleton("tp_demo_01"));while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records); // 處理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 處理提交失敗異常}}
}

調用 commitSync() 時,Consumer 程序會處于阻塞狀態,直到遠端的 Broker 返回提交結果,這個狀態才會結束,這樣就會影響TPS。

鑒于此問題,還有另外一個提交方式

手動異步提交

public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));process(records); // 處理消息consumer.commitAsync((offsets, exception) -> {if (exception != null) {handle(exception);}});}
}

commitAsync 是否能夠替代 commitSync 呢?答案是不能。commitAsync 的問題在于,出現問題時它不會自動重試。因為它是異步操作,倘若提交失敗后自動重試,那么它重試時提交的位移值可能早已經“過期”或不是最新值了。因此,異步提交的重試其實沒有意義,所以 commitAsync 是不會重試的。

是手動提交,需要將 commitSync 和 commitAsync 組合使用才能到達最理想的效果,原因有兩個:

  1. 利用 commitSync 的自動重試來規避那些瞬時錯誤,比如網絡的瞬時抖動,Broker 端 GC 等。這些問題都是短暫的,自動重試通常都會成功。
  2. 不希望程序總處于阻塞狀態,影響 TPS。
public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);consumer.subscribe(Collections.singleton("tp_demo_01"));try {while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));consumer.commitAsync();process(records); // 處理消息consumer.commitAsync(); // 異步提交}} catch (Exception e) {handle(e); // 處理異常} finally {try {consumer.commitSync();// 最后一次提交使用同步阻塞式提交} finally {consumer.close();}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/41630.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/41630.shtml
英文地址,請注明出處:http://en.pswp.cn/news/41630.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

0基礎學習VR全景平臺篇 第86篇:智慧眼-為什么要設置分組選擇?

一、功能說明 分組選擇&#xff0c;也就是給全景的每個分組去設置其所屬的行政區劃&#xff0c;設置后只有屬于同行政區劃的成員才可進入其場景進行相關操作&#xff0c;更便于實現城市的精細化管理。 二、后臺編輯界面 分組名稱&#xff1a;場景的分組名稱。 對應分類&…

網絡安全--linux下Nginx安裝以及docker驗證標簽漏洞

目錄 一、Nginx安裝 二、docker驗證標簽漏洞 一、Nginx安裝 1.首先創建Nginx的目錄并進入&#xff1a; mkdir /soft && mkdir /soft/nginx/cd /soft/nginx/ 2.下載Nginx的安裝包&#xff0c;可以通過FTP工具上傳離線環境包&#xff0c;也可通過wget命令在線獲取安裝包…

【數據結構與算法】隊列

文章目錄 一&#xff1a;隊列1.1 隊列的概念1.2 隊列的介紹1.3 隊列示意圖 二&#xff1a;數組模擬隊列2.1 介紹2.2 思路2.3 代碼實現2.3.1 定義隊列基本信息2.3.2 初始化隊列2.3.3 判斷隊列是否滿&#xff0c;是否為空2.3.4 添加數據到隊列2.3.5 獲取隊列數據&#xff0c;出隊…

垃圾回收機制

什么是內存泄漏&#xff1f; 內存泄漏是指程序中已經不再使用的內存卻沒有被正確釋放或回收的情況。在編程中&#xff0c;當對象或數據不再被程序使用&#xff0c;但其所占用的內存空間沒有被垃圾回收機制回收&#xff0c;就會導致內存泄漏。 內存泄漏可能會導致程序的內存消…

圖數據庫_Neo4j和SpringBoot整合使用_創建節點_刪除節點_創建關系_使用CQL操作圖譜---Neo4j圖數據庫工作筆記0009

首先需要引入依賴 springboot提供了一個spring data neo4j來操作 neo4j 可以看到它的架構 這個是下載下來的jar包來看看 有很多cypher對吧 可以看到就是通過封裝的驅動來操作graph database 然后開始弄一下 首先添加依賴

【實用黑科技】如何 把b站的緩存視頻弄到本地——數據恢復軟件WinHex 和 音視頻轉碼程序FFmpeg

&#x1f468;?&#x1f4bb;個人主頁&#xff1a;元宇宙-秩沅 &#x1f468;?&#x1f4bb; hallo 歡迎 點贊&#x1f44d; 收藏? 留言&#x1f4dd; 加關注?! &#x1f468;?&#x1f4bb; 本文由 秩沅 原創 &#x1f468;?&#x1f4bb; 收錄于專欄&#xff1a;效率…

onnxruntime 支持的所有后端

1 代碼導出 import onnxruntime as ort aaa ort.get_all_providers() print(aaa)1. 1 下面是ort支持的所有后端 TensorrtExecutionProvider, CUDAExecutionProvider, MIGraphXExecutionProvider, ROCMExecutionProvider, OpenVINOExecutionProvider, DnnlExecutionProvider…

Baumer工業相機堡盟工業相機如何通過BGAPISDK設置相機的固定幀率(C#)

Baumer工業相機堡盟工業相機如何通過BGAPI SDK設置相機的固定幀率&#xff08;C#&#xff09; Baumer工業相機Baumer工業相機的固定幀率功能的技術背景CameraExplorer如何查看相機固定幀率功能在BGAPI SDK里通過函數設置相機固定幀率 Baumer工業相機通過BGAPI SDK設置相機固定幀…

藍牙資訊|中國智能家居前景廣闊,藍牙Mesh照明持續火爆

據俄羅斯衛星通訊社報道&#xff0c;中國已成為全球最大的智能家居消費國&#xff0c;占全球50%—60%的市場份額。未來&#xff0c;隨著人工智能技術的發展以及智能家居生態的不斷進步&#xff0c;智能家居在中國的滲透率將加速提升。德國斯塔蒂斯塔調查公司數據顯示&#xff0…

win10系統docker創建ubuntu容器解決開發環境問題

一、win10系統使用docker的原因 最近啊&#xff0c;在學習人工智能-深度學習&#xff0c;用的win10系統進行開發&#xff0c;老是出現一些莫名其妙的問題&#xff0c;無法解決&#xff0c;每天都在為環境問題搞得傷透了腦筋。 說到底還是要使用Linux系統進行開發比較合適。 …

【MT32F006】MT32F006之HT1628驅動LED

本文最后修改時間&#xff1a;2023年03月30日 一、本節簡介 本文介紹如何使用MT32F006連接HT1628芯片驅動LED。 二、實驗平臺 庫版本&#xff1a;V1.0.0 編譯軟件&#xff1a;MDK5.37 硬件平臺&#xff1a;MT32F006開發板&#xff08;主芯片MT32F006&#xff09; 仿真器&a…

LeetCode算法心得——限制條件下元素之間的最小絕對差(TreeSet)

大家好&#xff0c;我是晴天學長&#xff0c;今天用到了Java一個非常實用的類TreeSet&#xff0c;能解決一些看起來棘手的問題。 1 &#xff09;限制條件下元素之間的最小絕對差 2) .算法思路 初始化變量&#xff1a;n為列表nums的大小。 min為整型最大值&#xff0c;用于記錄…

python3 0學習筆記之基本知識

0基礎學習筆記之基礎知識 &#x1f4da; 基礎內容1. 條件語句 if - elif - else2. 錯誤鋪捉try - except(一種保險策略&#xff09;3. 四種開發模式4. 函數&#xff1a;def用來定義函數的5. 最大值最小值函數&#xff0c;max &#xff0c;min6. is 嚴格的相等&#xff0c;is no…

機器學習:基本介紹

機器學習介紹 Hnad-crafted rules Hand-crafted rules&#xff0c;叫做人設定的規則。那假設今天要設計一個機器人&#xff0c;可以幫忙打開或關掉音樂&#xff0c;那做法可能是這樣&#xff1a; 設立一條規則&#xff0c;就是寫一段程序。如果輸入的句子里面看到**“turn of…

C#__使用Type類反射數據的基本用法

// 簡單介紹 // 元數據&#xff08;metadata&#xff09;&#xff1a;與程序及其類型有關的數據。 // 反射&#xff1a;一個運行的程序查看本身元數據或其他程序集中的元數據的行為 // Assembly類&#xff1a;允許訪問給定程序集的元數據&#xff0c;包含了可以加載和執行程序…

Maven框架SpringBootWeb簡單入門

一、Maven ★ Maven:是Apache旗下的一個開源項目,是一款用于管理和構建java項目的工具。 官網:https://maven.apache.org/ ★ Maven的作用: 1. 依賴管理:方便快捷的管理項目依賴的資源(jar包),避免版本沖突問題。 2. 統一項目結構:提供標準、統一的項目結構。 …

LightDB 23.3 plorasql 函數支持inout參數輸出

開篇立意 oracle PLSQL函數中返回值有兩種情況&#xff1a; &#xff08;1&#xff09;使用return返回值&#xff1b; &#xff08;2&#xff09;使用out修飾的參數&#xff08;oracle不支持inout&#xff09; SQL> create function yu(id inout int) return int asbeginn…

【C# 基礎精講】文件讀取和寫入

文件讀取和寫入是計算機程序中常見的操作&#xff0c;用于從文件中讀取數據或將數據寫入文件。在C#中&#xff0c;使用System.IO命名空間中的類來進行文件讀寫操作。本文將詳細介紹如何在C#中進行文件讀取和寫入&#xff0c;包括讀取文本文件、寫入文本文件、讀取二進制文件和寫…

選擇大型語言模型自定義技術

推薦&#xff1a;使用 NSDT場景編輯器 助你快速搭建可二次編輯器的3D應用場景 企業需要自定義模型來根據其特定用例和領域知識定制語言處理功能。自定義LLM使企業能夠在特定的行業或組織環境中更高效&#xff0c;更準確地生成和理解文本。 自定義模型使企業能夠創建符合其品牌…

PAT 1013 Battle Over Cities

個人學習記錄&#xff0c;代碼難免不盡人意。 It is vitally important to have all the cities connected by highways in a war. If a city is occupied by the enemy, all the highways from/toward that city are closed. We must know immediately if we need to repair a…