分布式 - 消息隊列Kafka:Kafka 消費者的消費位移

文章目錄

      • 01. Kafka 分區位移
      • 02. Kafka 消費位移
      • 03. kafka 消費位移的作用
      • 04. Kafka 消費位移的提交
      • 05. kafka 消費位移的存儲位置
      • 06. Kafka 消費位移與消費者提交的位移
      • 07. kafka 消費位移的提交時機
      • 08. Kafka 維護消費狀態跟蹤的方法

01. Kafka 分區位移

對于Kafka中的分區而言,它的每條消息都有唯一的offset,用來表示消息在分區中對應的位置。偏移量從0開始,每個新消息的偏移量比前一個消息的偏移量大1。

每條消息在分區中的位置信息由一個叫位移(Offset)的數據來表征。分區位移總是從 0 開始,假設一個生產者向一個空分區寫入了 10 條消息,那么這 10 條消息的位移依次是 0、1、2、…、9。

02. Kafka 消費位移

對于kafka中的消費者而言,也有一個offset的概念,消費者使用 offset 來表示消費到分區中某個消息所在的位置。

消費位移(偏移量)是指消費者在消費分區中的消息時,記錄的已經消費的消息的位移。消費者會定期地將已經消費的消息的位移提交到Kafka集群中,以便在下一次啟動時從上次消費的位置繼續消費。

每個消費者在消費消息的過程中必然需要有個字段記錄它當前消費到了分區的哪個位置上,這個字段就是消費者位移(Consumer Offset)。注意,這和分區位移完全不是一個概念。“分區位移”表征的是分區內的消息位置,它是不變的,即一旦消息被成功寫入到一個分區上,它的位移值就是固定的了。而消費者位移則不同,它可能是隨時變化的,畢竟它是消費者消費進度的指示器嘛。另外每個消費者有著自己的消費者位移。

03. kafka 消費位移的作用

消費者位移(偏移量)是指消費者在消費分區中的消息時,記錄的已經消費的消息的位移。它的作用主要有以下幾個方面:

① 消費者可以通過記錄偏移量來實現斷點續傳。當消費者下線或者重啟時,它可以通過記錄的偏移量來恢復之前的消費狀態,從而避免重復消費已經處理過的消息。

② Kafka 通過偏移量來保證消息的順序性。在同一個分區中,消息的順序是有序的,消費者可以通過記錄偏移量來保證消費的順序性。

③ Kafka 還可以通過偏移量來實現消息的回溯。消費者可以通過指定偏移量來重新消費之前的消息,這在某些場景下非常有用,比如重新處理之前出現的錯誤。

總之,Kafka 消息偏移量是非常重要的一個概念,它可以幫助消費者實現斷點續傳、保證消息的順序性以及實現消息的回溯等功能。

04. Kafka 消費位移的提交

消費者可以通過訂閱一個或多個主題來拉取消息。當消費者調用 poll() 方法時,它會從 Kafka 集群中拉取一批消息,這些消息會被緩存在消費者的本地緩存中,等待消費者進一步處理。在消費者處理完這批消息后,它可以再次調用 poll() 方法來拉取下一批消息。如果消費者在處理消息時發生了錯誤,那么這批消息將會被重新拉取,直到消費者成功地處理它們為止。

因此每次調用poll()方法,它總是會返回還沒有被消費者讀取過的記錄,這意味著我們可以追蹤哪些記錄是被群組里的哪個消費者讀取過的。要做到這一點,就需要記錄上一次消費時的消費位移。并且這個消費位移必須做持久化保存,而不是單單保存在內存中,否則消費者重啟之后就無法知曉之前的消費位移。再考慮一種情況,當有新的消費者加入時,那么必然會有再均衡的動作,對于同一分區而言,它可能在再均衡動作之后分配給新的消費者,如果不持久化保存消費位移,那么這個新的消費者也無法知曉之前的消費位移。

在舊消費者客戶端中,消費位移是存儲在 ZooKeeper 中的。而在新消費者客戶端中,消費位移存儲在Kafka內部的主題 __consumer_offsets 中。這里把將消費位移存儲起來(持久化)的動作稱為“提交”,消費者在消費完消息之后需要執行消費位移的提交。

05. kafka 消費位移的存儲位置

消費者默認將 offset 保存在Kafka一個內置的 topic 中,該 topic 為 __consumer_offsets。

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --list
__consumer_offsets

消費者會向一個叫作 __consumer_offset 的內置主題發送消息,消息里包含每個分區的偏移量。如果消費者一直處于運行狀態,那么偏移量就沒有什么實際作用。但是,如果消費者發生崩潰或有新的消費者加入群組,則會觸發再均衡。再均衡完成之后,每個消費者可能會被分配新的分區,而不是之前讀取的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最后一次提交的偏移量,然后從偏移量指定的位置繼續讀取消息。

消費 offset 案例:

① __consumer_offsets 為 Kafka 中的 topic,那就可以通過消費者進行消費。但是需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默認是 true,表示不能消費系統主題。為了查看該系統主題數據,所以該參數修改為 false。

② 創建主題 haha

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic haha
Created topic test1.

③ 啟動生產者生產數據:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic haha
>hello,haha!
>你好,haha!
>

④ 啟動消費者消費數據:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic haha --group group-haha --from-beginning
hello,haha!
你好,haha!

⑤ 查看消費者消費主題__consumer_offsets:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning# key是[消費者組,消費的主題,消費的分區],value中已經消費的消息在當前分區的offset+1
[group-haha,haha,2]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)

06. Kafka 消費位移與消費者提交的位移

如下圖,x 表示某一次拉取操作中此分區消息的最大偏移量,假設當前消費者已經消費了 x 位置的消息,那么我們就可以說消費者的消費位移為 x,不過當前消費者需要提交的消費位移并不是 x,而是 x+1,它表示下一條需要拉取的消息的位置。
在這里插入圖片描述
如果使用自動提交或不指定提交的偏移量,那么將默認提交poll()返回的最后一個位置之后的偏移量,即提交比客戶端從poll()返回的最后一個位置大1的偏移量。在進行手動提交或需要提交特定的偏移量時,一定要記住這一點。

07. kafka 消費位移的提交時機

當前一次 poll() 操作所拉取的消息集為[x+2,x+7],x+2代表上一次提交的消費位移,說明已經完成了x+1之前(包括x+1在內)的所有消息的消費,x+5表示當前正在處理的位置。
在這里插入圖片描述
① 如果最后一次提交的偏移量大于客戶端處理的最后一條消息的偏移量,那么處于兩個偏移量之間的消息就會丟失:

如圖,如果拉取到消息之后就進行了位移提交,即提交了x+8,那么當前消費x+5的時候遇到了異常,在故障恢復之后,我們重新拉取的消息是從x+8開始的。也就是說,x+5至x+7之間的消息并未能被消費,如此便發生了消息丟失的現象。

② 如果最后一次提交的偏移量小于客戶端處理的最后一條消息的偏移量,那么處于兩個偏移量之間的消息就會被重復處理:

如圖,如果消費完所有拉取到的消息之后才進行位移提交,那么當消費x+5的時候遇到了異常,在故障恢復之后,我們重新拉取的消息是從x+2開始的。也就是說,x+2至x+4之間的消息又重新消費了一遍,故而又發生了重復消費的現象。

08. Kafka 維護消費狀態跟蹤的方法

在Kafka中,消費者組可以通過消費者偏移量(consumer offset)來跟蹤它們在分區中消費的消息。消費者偏移量是一個整數,表示消費者已經成功讀取的消息的位置。當消費者讀取消息時,它會將偏移量保存在內存中,以便在下一次讀取消息時能夠從正確的位置開始讀取。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/39962.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/39962.shtml
英文地址,請注明出處:http://en.pswp.cn/news/39962.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

sql server 存儲過程 set ansi_nulls set quoted_identifier,out 、output

SQL-92 標準要求在對空值(NULL) 進行等于 () 或不等于 (<>) 比較時取值為 FALSE。 當 SET ANSI_NULLS 為 ON 時&#xff0c;即使 column_name 中包含空值&#xff0c;使用 WHERE column_name NULL 的 SELECT 語句仍返回零行。即使 column_name 中包含非空值&#xff0c…

5G無人露天礦山解決方案

1、5G無人露天礦山解決方案背景 ①2010.10&#xff0c;國家安監總局《金屬非金屬地下礦山安全避險“六大系統”安裝使用和監督檢查暫行規定》 ②2016.03&#xff0c;國家發改委《能源技術革命創新行動計劃&#xff08;2016-2030&#xff09;》&#xff0c;2025 年重點煤礦區采…

每天一道leetcode:1192. 查找集群內的關鍵連接(圖論困難tarjan算法)

今日份題目&#xff1a; 力扣數據中心有 n 臺服務器&#xff0c;分別按從 0 到 n-1 的方式進行了編號。它們之間以 服務器到服務器 的形式相互連接組成了一個內部集群&#xff0c;連接是無向的。用 connections 表示集群網絡&#xff0c;connections[i] [a, b] 表示服務器 a …

Quivr 基于GPT和開源LLMs構建本地知識庫 (更新篇)

一、前言 自從大模型被炒的越來越火之后&#xff0c;似乎國內涌現出很多希望基于大模型構建本地知識庫的需求&#xff0c;大概在5月底的時候&#xff0c;當時Quivr發布了第一個0.0.1版本&#xff0c;第一個版本僅僅只是使用LangChain技術結合OpenAI的GPT模型實現了一個最基本的…

升級STM32電機PID速度閉環編程:從F1到F4的移植技巧與實例解析

引言&#xff1a; 在嵌入式系統開發中&#xff0c;STM32系列微控制器廣泛應用于各種應用領域。而對于直流有刷電機的控制&#xff0c;PID速度閉環是一種常用的控制方式。本文將以此為例&#xff0c;探討如何從STM32F1系列移植到STM32F4系列&#xff0c;并詳細介紹HAL庫在不同型…

Python學習筆記_基礎篇(十)_socket編程

本章內容 1、socket 2、IO多路復用 3、socketserver Socket socket起源于Unix&#xff0c;而Unix/Linux基本哲學之一就是“一切皆文件”&#xff0c;對于文件用【打開】【讀寫】【關閉】模式來操作。socket就是該模式的一個實現&#xff0c;socket即是一種特殊的文件&…

spring boot 簡單整合 Redis

1.添加依賴<!-- redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- commons-pool2 --><dependency><groupId>org.ap…

Linux安裝Docker

一、Docker系統版本介紹 Docker 是一個開源的應用容器引擎&#xff0c;讓開發者可以打包他們的應用以及依賴包到一個可移植的容器中&#xff0c;然后發布到任何流行的 Linux 或 Windows 操作系統的機器上&#xff0c;也可以實現虛擬化。 容器是完全使用沙箱機制&#xff0c;相…

誠邁科技榮膺小米“最佳供應商獎”

近日&#xff0c;誠邁科技受邀參加小米戰略合作伙伴HBR總結會。誠邁科技以盡職盡責的合作態度、精益求精的交付質量榮膺小米公司頒發的最佳供應商獎&#xff0c;其性能測試團隊榮獲優秀團隊獎。 誠邁科技與小米在手機終端方向一直保持著密切的合作關系&#xff0c;涉及系統框架…

centOS 快速安裝和配置 NVIDIA docker Container Toolkit

要在 CentOS 上正確安裝和配置 NVIDIA Container Toolkit&#xff0c;您可以按照以下步驟進行操作&#xff0c;如果1和2都已經完成&#xff0c;可以直接進行第3步NVIDIA Container Toolkit安裝配置。 1. 安裝 NVIDIA GPU 驅動程序&#xff1a; 您可以從 NVIDIA 官方網站下載適…

【Java基礎】Java對象的生命周期

【Java基礎】Java對象的生命周期 一、概述 一個類通過編譯器將一個Java文件編譯為Class字節碼文件&#xff0c;然后通過JVM中的解釋器編譯成不同操作系統的機器碼。雖然操作系統不同&#xff0c;但是基于解釋器的虛擬機是相同的。java類的生命周期就是指一個class文件加載到類…

Ubuntu安裝MySQL Server提示Depends: mysql-server-5.5怎么解決

在 Ubuntu 安裝 MySQL Server 時出現 Depends: mysql-server-5.5 的錯誤通常是因為系統中沒有找到所需的軟件包版本。這可能是因為軟件包源中沒有對應的版本或者軟件包版本沖突等原因。解決這個問題的方法如下&#xff1a; 更新軟件包列表&#xff1a; 在終端中運行以下命令&a…

python控制obs實現無縫切換場景!obs-websocket-py

前言 最近一直在研究孿生數字人wav2lip。目前成果可直接輸入高清嘴型&#xff0c;2070顯卡1分鐘音頻2.6分鐘輸出。在直播邏輯上可以做到1比1.3這樣&#xff0c;所以現在開始研究直播。在邏輯上涉及到了無縫切換&#xff0c;看到csdn上有一篇文章還要vip解鎖。。。那自己研究吧…

臨時用工小程序:一款便捷的用工管理軟件

隨著企業對人力資源需求的不斷增長&#xff0c;臨時用工需求也日益旺盛。為了滿足這一需求&#xff0c;我們研發了一款名為“臨時用工小程序”的軟件系統&#xff0c;旨在幫助企業實現臨時用工的高效管理。 一、技術棧介紹 后端技術棧 本系統采用Java語言作為開發語言&#…

尚硅谷MySQL筆記 3-9

我不會記錄的特別詳細 大體框架 基本的Select語句運算符排序與分頁多表查詢單行函數聚合函數子查詢 第三章 基本的SELECT語句 SQL分類 這個分類有很多種&#xff0c;大致了解下即可 DDL&#xff08;Data Definition Languages、數據定義語言&#xff09;&#xff0c;定義了…

項目難點:解決IOS調用起軟鍵盤之后頁面樣式布局錯亂問題

需求背景 &#xff1a; 開發了一個問卷系統重構項目&#xff0c;剛開始開發的為 PC 端&#xff0c;其中最頭疼的一點無非就是 IE 瀏覽器的兼容適配性問題&#xff1b; 再之后項目經理要求開發移動端&#xff0c;簡單的說就是寫 H5 頁面&#xff0c;到時候會內嵌在 App 應用、辦…

multiple definition of......first defined here

一、背景 環境&#xff1a; 銀河麒麟–ARM–GCC7.4.0 寫了一個動態庫&#xff0c;依賴opencv和freeImage等第三方庫&#xff0c;用cmake進行編譯。原本在centos6-x86-gcc7.5.0上面進行編譯非常的順利&#xff0c;但是拿到麒麟arm上面編譯就提示了這個錯誤&#xff1a;這個報錯…

Python conda命令

Windows下 Anaconda Prompt 這個東西就是用來管理Anaconda的&#xff0c;使用的是conda這樣的一種命令 在Linux中&#xff0c;可以直接在終端中輸入conda 命令 可以使用conda命令創建新的python環境&#xff08;python版本&#xff0c;包&#xff09;&#xff0c;新的環境與原…

Ruby軟件外包開發語言特點

Ruby 是一種動態、開放源代碼的編程語言&#xff0c;它注重簡潔性和開發人員的幸福感。在許多方面都具有優點&#xff0c;但由于其動態類型和解釋執行的特性&#xff0c;它可能不適合某些對性能和類型安全性要求較高的場景。下面和大家分享 Ruby 語言的一些主要特點以及適用的場…

【C語言】動態通訊錄 -- 詳解

?前言 前面詳細介紹了靜態版通訊錄【C語言】靜態通訊錄 -- 詳解_炫酷的伊莉娜的博客-CSDN博客&#xff0c;但是靜態版通訊錄的空間是無法被改變的&#xff0c;而且空間利用率也不高。為了解決靜態通訊錄這一缺點&#xff0c;這時就要有一個能夠隨著存入聯系人數量的增加而增大…