Kafka入門-消費者

消費者

Kafka消費方式:采用pull(拉)的方式,消費者從broker中主動拉去數據。使用pull的好處就是消費者可以根據自身需求,進行拉取數據,但是壞處就是如果Kafka沒有數據,那么消費者可能會陷入循環中,一直返回空數據。

消費者與消費者之間是獨立的,一個消費者可以消費多個分區數據。但是消費組不同,每個分區的數據只能由消費者組中的一個消費者消費,避免重復消費導致數據重復。

消費者組:

  • 消費者組由多個consumer組成,形成一個消費者組的條件,是所有消費者的groupid相同。消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個組內消費者消費。
  • 消費者組之間互不影響,所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。

消費者組初始化流程:

在這里插入圖片描述

消費者組詳細消費流程

在這里插入圖片描述

Java創建消費者

注意:在消費者API代碼中必須配置消費者組id。命令行啟動消費者不需要填寫是因為id被自動填寫為隨機的消費者組id。

通過API消費一個主題的數據
//配置Properties properties = new Properties();//連接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.27.101:9092,192.168.27.102:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消費者組idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//1.創建一個消費者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2.定義主題ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//3.消費數據while (true){//拉取的間隔時間ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.println(record);}}
消費者消費指定分區

需要指定分區只需要在定義主題時,使用定義主題以及分區方法

//2.定義主題以及分區ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));kafkaConsumer.assign(topicPartitions);
消費者組案例

創建三個消費者,進行消費不同分區

直接復制上面消費主題的代碼,因為設置的groupid都是test,因此會自動成為一個消費者組。運行消費者組test內的三個消費者,然后運行生產者對每個分區進行發送消息,可以看到每個消費者都只消費了一個分區的消息。

注意:消費者組內的消費者在底層進行了編號,跟java類取名無關。

分區的分配以及再平衡

消費者組有多個消費者,而一個topic又有多個分區,那么應該由哪個消費者消費哪個分區呢?

Kafka有三種主流的分區分配策略,可以通過配置參數partiton.assignment.strategy修改分配策略,默認的策略是Range+CooperativeSticky。Kafka可以同時使用多個分區分配策略。

//設置分區分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

再平衡:相當于原有分區的消費者突然發送意外,不能再進行消費,重新分配該分區給其他消費者;或者消費者組中新增了消費組,需要重新分配分區。

在這里插入圖片描述

  • Range

    在這里插入圖片描述

    Range的再平衡,會將原消費者負責的分區一次性全部交給剩下的某一個消費者

  • RoundRobin

    在這里插入圖片描述

    當觸發再分配時,會將原消費者負責的分區按照RoundRobin一樣進行重新分發

  • Sticky

    Sticky也是針對所有topic的策略,黏性分區是一種均勻隨機的分配策略,會在執行一次新的分配之前,考慮上一次的分配結果,盡量少的調整分配的變動,可以節省開銷。首先會盡量均衡的分配分區給消費者,在同一組內的消費者出現問題,也會盡量保持原有分配的分區不發送變化。但是在發生再平衡時,所有的消費者需要先放棄當前持有的分區資源,等待重新分配。

  • CooperativeSticky

    CooperativeSticky是2.4版本新增的策略,在原有Sticky策略上,將原本大規模的再平衡操作,拆分成了多次小規模的再平衡,直到平衡完成。

Offset位移

offset的默認維護位置

Offset,消息位移,它表示分區中每條消息的位置信息,是一個單調遞增且不變的值。換句話說,offset可以用來唯一的標識分區中每一條記錄。消費者消費完一條消息記錄之后,需要提交offset來告訴Kafka Broker自己消費到哪里了。

在這里插入圖片描述

_consumer_offsets主題采用key和value的方式存儲數據,key是group.id+topic+分區號,value就是當前offset的值,每隔一段時間,kafka內部會對這個topic進行compact,也就是每個key都保留最新數據。

默認情況下,是不允許查看消費系統主題數據的,如果需要查看該系統主題數據,要設置config/consumer.properties中添加配置exclude.internal.topics=false。默認是true,表示不能看系統相關信息。

自動提交offset

為了讓用戶更專注于自己的業務邏輯,Kafka提供了自動提交offset的功能,一段時間后自動提交offset。相關參數:

enable.auto.commit 是否開啟自動提交offset功能,默認為true

auto.commmit.intervalms 自動提交offset分時間間隔,默認是5s。

//自動提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交時間間隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
手動提交offset

自動提交是基于時間提交,開發人員很難把握提交的時機,因此Kafka還提供了手動提交offset的API。

//設置手動提交(關閉自動)properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

手動提交分為同步提交和異步提交。兩者的共同點是都會將本次提交的一批數據最高的偏移量提交,不同的是同步提交阻塞當前線程,一直到提交成功,并且會自動失敗重試,而異步提交則沒有失敗重試機制,所有有可能提交失敗。

同步提交:必須等待offset提交完畢,再去消費下一批數據。

//手動提交(同步)kafkaConsumer.commitSync();

異步提交:發送完offset請求后,就開始消費下一批數據了。

//手動提交(異步)kafkaConsumer.commitAsync();
指定Offset消費
//指定位置進行消費,先獲取分區信息Set<TopicPartition> assignment = kafkaConsumer.assignment();//保證分區分配方案已經指定完畢,剛訂閱主題時不能立即獲取到分區信息while (assignment.size()==0){kafkaConsumer.poll(Duration.ofSeconds(1));assignment = kafkaConsumer.assignment();}for (TopicPartition topicPartition : assignment) {//                   分區           指定offsetkafkaConsumer.seek(topicPartition,100);}
指定時間消費
//指定位置進行消費,先獲取分區信息Set<TopicPartition> assignment = kafkaConsumer.assignment();//保證分區分配方案已經指定完畢,剛訂閱主題時不能立即獲取到分區信息while (assignment.size()==0){kafkaConsumer.poll(Duration.ofSeconds(1));assignment = kafkaConsumer.assignment();}HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();for (TopicPartition topicPartition : assignment) {//如果希望是一天前的當前時刻,那么就用當前時間減去一天間隔,單位為mstopicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis()- 1 * 24 * 3600 * 1000);}//將時間轉換為對應的offsetMap<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());}
消費者事務

在消費者消費的過程中會遇到重復消費和漏消費的情況發生。

漏消費:先提交offset后進行消費,有可能造成數據的漏消費

重復消費:已經消費數據,但是offset沒有提交

在這里插入圖片描述

如果想精準的進行一次性消費,那么需要Kafka消費端將消費過程和提交offset過程做原子綁定。可以將Kafka的offset保存到支持事務的工具中。(比如MySQL)

數據積壓

默認日志存儲時間為7天,如果當消費速度低于消息的發送速度,那么就很可能造成數據積壓。

如果Kafka消費能力不足,那么可以增加Topic的分區數,并且同時提升消費者組的消費者數量,消費者數=分區數。

如果下游數據處理不及時,那么提高每批次拉取的數據量。批次拉取數據過少,使得處理的數據小于生產的數據,也會造成數據積壓。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/85917.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/85917.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/85917.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

SpringBoot自動化部署實戰技術文章大綱

技術背景與目標 介紹SpringBoot在現代開發中的重要性自動化部署的價值&#xff1a;提升效率、減少人為錯誤、實現CI/CD適用場景&#xff1a;中小型Web應用、微服務架構 自動化部署核心方案 基于Docker的容器化部署 SpringBoot應用打包為Docker鏡像使用Docker Compose編排多容…

TDengine 集群運行監控

簡介 為了確保集群穩定運行&#xff0c;TDengine 集成了多種監控指標收集機制&#xff0c;并通過 taosKeeper 進行匯總。taosKeeper 負責接收這些數據&#xff0c;并將其寫入一個獨立的 TDengine 實例中&#xff0c;該實例可以與被監控的 TDengine 集群保持獨立。TDengine 中的…

C# 委托UI控件更新例子,何時需要使用委托

1. 例子1 private void UdpRxCallBackFunc(UdpDataStruct info) {// 1. 前置檢查防止無效調用if (textBoxOutput2.IsDisposed || !textBoxOutput2.IsHandleCreated)return;// 2. 使用正確的委托類型Invoke(new Action(() >{// 3. 雙重檢查確保安全if (textBoxOutput2.IsDis…

[10-2]MPU6050簡介 江協科技學習筆記(22個知識點)

1 2 3 歐拉角是描述三維空間中剛體或坐標系之間相對旋轉的一種方法。它們由三個角度組成&#xff0c;通常表示為&#xff1a; ? 偏航角&#xff08;Yaw&#xff09;&#xff1a;繞垂直軸&#xff08;通常是z軸&#xff09;的旋轉&#xff0c;表示偏航方向的變化。 ? 俯仰角&a…

虛擬環境共享系統包

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 虛擬環境共享系統包 python basic_pipelines/detection.py如果報錯顯示如下&#xff1a; Traceback (most recent call last):File "/home/ai/hailort/hailo-rpi5-exam…

Java求職者面試題解析:基礎概念、計算機基礎與源碼原理

Java求職者面試題解析&#xff1a;基礎概念、計算機基礎與源碼原理 第一輪&#xff1a;基礎概念問題 1. 什么是Java的跨平臺特性&#xff1f; Java的跨平臺特性是指Java程序可以在任何支持Java虛擬機&#xff08;JVM&#xff09;的設備上運行&#xff0c;而無需重新編譯。這…

網頁前端開發(基礎進階4--axios)

Ajax Ajax(異步的JavaScript和XML) 。 XML是可擴展標記語言&#xff0c;本質上是一種數據格式&#xff0c;可以用來存儲復雜的數據結構。 可以通過Ajax給服務器發送請求&#xff0c;并獲取服務器響應的數據。 Ajax采用異步交互&#xff1a;可以在不重新加載整個頁面的情況下&am…

設計模式-迪米特法則

迪米特法則 迪米特法則 (Law of Demeter, LoD)&#xff0c;也被稱為“最少知識原則 (Principle of Least Knowledge)”&#xff0c;是面向對象設計中的一個重要原則。 核心思想&#xff1a;一個對象應該對其他對象有盡可能少的了解。 更具體地說&#xff0c;它規定了一個對象…

結構性-代理模式

動態代理主要是為了處理重復創建模板代碼的場景。 使用示例 public interface MyInterface {String doSomething(); }public class MyInterfaceImpl implements MyInterface{Overridepublic String doSomething() {return "接口方法dosomething";} }public class M…

Unity大型項目資源框架

?? Unity大型項目資源管理:低端機檢測后自動切換資源框架(大廠風格) ?? 框架目標 ? 啟動時檢測機型性能,判定設備等級 ? 同一資源有高配/中配/低配不同壓縮格式 ? 根據設備等級,加載對應資源包(AB) ? 支持動態切換(可用來切換特效/貼圖分辨率/模型LOD) ? 保證…

MATLAB仿真:偏振光在光纖通信中的應用研究_可復現,有問題請聯系博主

MATLAB仿真:偏振光在光纖通信中的應用研究 1. 研究概述 本文通過MATLAB仿真研究偏振光在光纖通信中的關鍵技術,包括偏振態生成、傳輸特性和檢測方法,重點分析偏振模色散(PMD)的影響機制,并設計偏振控制優化方案。 %% 主程序框架 clc; clear; close all; addpath(Polar…

CTA-861-G-2017中文pdf版

CTA-861-G標準&#xff08;2016年11月發布&#xff09;規范未壓縮高速數字接口的DTV配置&#xff0c;涵蓋視頻格式、色彩編碼、輔助信息傳輸等&#xff0c;適用于DVI、HDMI等接口&#xff0c;還涉及EDID數據結構及HDR元數據等內容。

C++核心編程_繼承方式

繼承的語法&#xff1a;class 子類 : 繼承方式 父類 繼承降屬性權限&#xff0c;不可升屬性權限 繼承方式一共有三種&#xff1a; 公共繼承 保護繼承 私有繼承 #include <iostream> #include <string> using namespace std;class Base1 { public:int m_A; p…

Dockerfile常用指令介紹

Dockerfile常用指令介紹 Dockerfile是一個文本文件&#xff0c;用于定義Docker鏡像的構建過程。下面介紹一些最常用的Dockerfile指令及其用法&#xff1a; 基礎指令 FROM - 指定基礎鏡像 FROM python:3.9-slim這是Dockerfile的第一個指令&#xff0c;用于指定構建鏡像的基礎鏡…

Spring中@Primary注解的作用與使用

在 Spring 框架中&#xff0c;Primary 注解用于解決依賴注入時的歧義性&#xff08;Ambiguity&#xff09;問題。當 Spring 容器中存在多個相同類型的 Bean 時&#xff0c;通過 Primary 標記其中一個 Bean 作為默認的首選注入對象。 核心作用&#xff1a; 解決多個同類型 Bean …

本地優先的狀態管理與工具選型策略

本地優先&#xff1a;合理把控狀態共享邊界 在 React 應用開發過程中&#xff0c;開發者容易陷入一個認知誤區——過度追求狀態的全局化。許多新手開發者在項目初期就急于引入 Redux、Zustand 或 Jotai 等狀態管理工具&#xff0c;將一些本應屬于組件內部的瑣碎狀態&#xff0…

OpenCV CUDA模塊圖像處理-----對圖像執行 均值漂移過程(Mean Shift Procedure)函數meanShiftProc()

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 執行一個均值漂移過程&#xff08;mean-shift procedure&#xff09;&#xff0c;并將處理后的點的信息&#xff08;它們的顏色和位置&#xff0…

硬件I2C和軟件I2C的區別

硬件I2C和軟件I2C的區別 一、硬件I2C 1、硬件IC的局限性及學習意義 盡管硬件IC外設在STM32等微控制器中提供了標準化的通信支持&#xff0c;但在實際應用中&#xff0c;其穩定性可能存在問題。例如&#xff0c;某些情況下外設會因事件檢測異常而進入死鎖狀態&#xff0c;僅能…

推薦12個wordpress企業網站模板

WordPress企業網站模板是一種專為企業網站設計的WordPress主題&#xff0c;旨在幫助企業創建專業、美觀且易于管理的網站。這些模板通常具備響應式設計、SEO優化、多語言支持等功能&#xff0c;能夠滿足不同行業和企業的需求。 WordPress企業網站模板的適用場景 企業官網&…

68道Hbase高頻題整理(附答案背誦版)

簡述什么是Hbase數據庫&#xff1f; Hbase是一個高可靠性、高性能、面向列、可伸縮的分布式存儲系統&#xff0c;它利用HBase技術在HDFS上提供了類似于Bigtable的能力。換句話說&#xff0c;Hbase是Apache Hadoop生態系統中的一部分&#xff0c;可以為大數據應用提供快速的隨機…