Kafka消費者相關

Kafka生產者相關-CSDN博客

消費者消費數據基本流程

package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消費者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消費者組consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-1");//創建消費者對象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//訂閱主題consumer.subscribe(Collections.singletonList("test"));//從Kafka主題中獲取數據while (true){ConsumerRecords<String, String> poll = consumer.poll(100);for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//關閉消費者對象 因上面在無線循環//consumer.close();}
}

消費數據偏移量問題

Kafka中的 偏移量(offset)是用于標識每個消費者在某個分區內消費到的位置。每個分區的消息都有一個唯一的偏移量,消費者會根據這個偏移量來讀取消息。

Kafka偏移量的管理

Kafka默認提供兩種方式來管理偏移量:

  1. 自動提交偏移量(默認方式)
  2. 手動提交偏移量(需要顯式配置)

consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");? //默認設置

consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); //默認設置 每5秒提交一次

也就是說默認情況下? ?消費者如果后啟動? 無法讀取到生產者已經發送的消息

偏移量的重置

如果需要重新消費數據,可以通過 auto.offset.reset 配置項來控制消費者的偏移量重置行為。這個配置項有幾個常用的值:

  • earliest:如果沒有找到偏移量(比如第一次消費),消費者會從最早的消息開始消費。
  • latest:如果沒有找到偏移量,消費者會從最新的消息開始消費。
  • none:如果沒有找到偏移量,消費者會拋出異常。

例如,設置為從最早的消息開始消費:

consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

如果想從指定偏移量獲取消息

解決消費者重復消費的問題(不能完全解決)

以上示例,偏移量默認都是5秒一次提交

例如先啟動消費者

然后生產者發送了10000條數據? 不好演示的話可以在生產者那邊每發送一條數據然后

Thread.sleep 1秒

如果消費者在消費到一定程度之后? 突然停止? ?觀察再次啟動消費者? 存在消費者重復消費的情況

原因就是消費者偏移量默認5秒提交一次的原因

那么可以將消費者默認5秒提交偏移量縮短為1秒? 但是這樣不能完全解決問題

示例

消費者

代碼

package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消費者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消費者組consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//設置從最早的消息讀取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//創建消費者對象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//訂閱主題consumer.subscribe(Collections.singletonList("test"));
//        boolean flg = true;
//        while (flg) {
//            // 拉取數據
//            consumer.poll(Duration.ofMillis(100));
//            final Set<TopicPartition> assignment = consumer.assignment();
//
//            if (assignment != null && !assignment.isEmpty()) {
//                // 檢查分配的分區
//                for (TopicPartition topicPartition : assignment) {
//                    if ("test".equals(topicPartition.topic())) {
//                        // 將偏移量設置為2
//                        consumer.seek(topicPartition, 2);
//                        // 停止循環
//                        flg = false;
//                    }
//                }
//            }
//        }//從Kafka主題中獲取數據while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//關閉消費者對象 因上面在無線循環//consumer.close();}
}

生產者?

代碼

package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//創建配置對象Map<String,Object> configMap=new HashMap<>();//如果是集群隨意指定一個configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//對Key Value進行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置冪等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重試次數configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超時configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事務 事務基于冪等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//創建生產者對象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事務kafkaProducer.initTransactions();try {//開啟事務kafkaProducer.beginTransaction();for(int i=0;i<10000;i++){//key的作用是通過某種算法,放到topic的某個分區中//可以不設置key 默認是按照輪詢的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//發送數據  send方法還可以接收一個參數,就是回調函數  kafkaProducer.send(record);是異步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 處理發送失敗的情況e.printStackTrace();} else {// 處理發送成功的情況System.out.println("發送成功:" + recordMetadata);}}});//這樣變成同步了send.get();Thread.sleep(1000);}//提交事務kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事務kafkaProducer.abortTransaction();}finally {//關閉生產者對象kafkaProducer.close();}}
}

先啟動消費者

然后啟動生產者

觀察消費者控制臺

過會我再次啟動消費者

1.縮短自動提交偏移量的時間

因為默認消費者每5秒自動提交提交

可以縮短自動提交偏移量的時間? ?但這樣只能減少重復消費的量? 并不能徹底解決重復消費的問題

2.手動提交偏移量

package com.hrui;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消費者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消費者組consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//默認自動提交  改成false 變成手動提交偏移量consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//設置從最早的消息讀取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//創建消費者對象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//訂閱主題consumer.subscribe(Collections.singletonList("test"));
//        boolean flg = true;
//        while (flg) {
//            // 拉取數據
//            consumer.poll(Duration.ofMillis(100));
//            final Set<TopicPartition> assignment = consumer.assignment();
//
//            if (assignment != null && !assignment.isEmpty()) {
//                // 檢查分配的分區
//                for (TopicPartition topicPartition : assignment) {
//                    if ("test".equals(topicPartition.topic())) {
//                        // 將偏移量設置為2
//                        consumer.seek(topicPartition, 2);
//                        // 停止循環
//                        flg = false;
//                    }
//                }
//            }
//        }//從Kafka主題中獲取數據while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}//設置自動提交偏移量之后   這里要手動去保存偏移量//這里有兩種方式  同步提交 和異步提交偏移量consumer.commitAsync();//異步consumer.commitSync();//同步}//關閉消費者對象 因上面在無線循環//consumer.close();}
}

說明以上方式都不能徹底解決重復消費問題

重復消費問題還是存在

如果要進行原子綁定? 并非做不到,Kafka本身沒有提供相關功能

例如把拉取到的數據全部處理完了,才進行事務提交

一旦出現意外,業務數據恢復? 但是Kafka本身沒有提供相關功能 和與其他支持事務處理的應用結合使用

消費數據-事務隔離級別

生產者事務圖

這個報錯 寫在提交之前即可

消費者組介紹

如果兩個應用都是同一個消費者組

生產者A生產消息? ? ? 消費者B和C在同一個消費者組? 那么A的消息如果被B消費過了那么C是消費不到的? B和C默認是競爭關系

如果生產者A生產消息??消費者B和C在不同消費者組? ?那么消息會被B和C都消費

第一個場景:消費者B和C在同一個消費者組

如果消費者B和消費者C在同一個消費者組內,消息會按照負載均衡的方式分配給它們。這意味著生產者A生產的消息會被消費者B或消費者C中的一個消費,而不是同時被兩個消費者消費。所以,如果B已經消費了某條消息,消費者C就無法再消費到這條消息。這種行為是消費者組的基本特性,主要用于確保每條消息只被某個消費者處理一次。

第二個場景:消費者B和C在不同的消費者組

如果消費者B和消費者C在不同的消費者組中,那么生產者A生產的消息會分別被B和C都消費到。因為每個消費者組有自己獨立的消費進度(每個組有獨立的偏移量),所以每個消費者組都能獨立消費該消息。

第三個場景:消費者B和C同時開啟從頭消費

如果B和C都在同一個消費者組,并且設置了從頭消費,那么它們將從消息隊列的最開始位置開始消費。這種情況下,B和C是共享消費隊列的,它們會根據負載均衡規則交替消費消息,而不是同時消費同一條消息。因此,A的消息仍然不會同時被B和C消費。每條消息仍然只會被消費者組內的某個消費者消費,并且消息的消費是共享的,但并不是同時共享。

總結來說,在同一個消費者組內,消息的消費是競爭式的。即使B和C同時開啟從頭消費,它們也不會同時消費同一條消息。每條消息只會由其中一個消費者處理。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/70910.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/70910.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/70910.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【軟考-架構】備戰2025軟考

新老教材對比 科目1&#xff08;信息系統綜合&#xff09;考點詳解 科目2&#xff08;系統架構設計案例&#xff09;考點詳解 科目3&#xff08;系統架構設計論文&#xff09;考點詳解 趨于越來越具體 學習方法推薦 第一階段 – 基礎知識階段 建議一個半月&#xff1b; 先過…

MMW-1碳棒磨損機設計

摘 要 為了更好的測量在一定壓力下碳棒的磨損量&#xff0c;提高碳棒磨損量的測量精度&#xff0c;本文設計了一種MMW-1碳棒磨損機&#xff0c;該碳棒磨損機屬于柱盤式摩擦磨損試驗機的一種。該機器主要用于做和碳棒有關的摩擦磨損試驗&#xff0c;可以更準確的獲得相關的參數…

網絡運維學習筆記(DeepSeek優化版)005網工初級(HCIA-Datacom與CCNA-EI)鏈路層發現協議與VLAN技術

文章目錄 一、鏈路層發現協議1.1 思科CDP協議1.2 華為LLDP協議 二、VLAN&#xff08;Virtual Local Area Network&#xff0c;虛擬局域網&#xff09;技術詳解2.1 基本概念2.2 技術特性2.3 接口工作原理2.3.1 Access模式2.3.2 Trunk模式 2.4 廠商配置對比思科配置華為配置 2.5 …

SOME/IP-SD -- 協議英文原文講解5

前言 SOME/IP協議越來越多的用于汽車電子行業中&#xff0c;關于協議詳細完全的中文資料卻沒有&#xff0c;所以我將結合工作經驗并對照英文原版協議做一系列的文章。基本分三大塊&#xff1a; 1. SOME/IP協議講解 2. SOME/IP-SD協議講解 3. python/C舉例調試講解 5.1.2.5 S…

APNG格式圖片文件大小優化方案 轉WEBP

文章目錄 原因過程相關下載相關文檔后記 原因 頁面上有個特效動畫&#xff0c;PNG文件&#xff0c;APNG格式&#xff0c;13M大小&#xff0c;太占用內容了&#xff0c;要優化一下。 過程 直接上命令吧 ffmpeg -i input.apng -vf "formatrgba" -loop 0 output.web…

個人電腦小參數GPT預訓練、SFT、RLHF、蒸餾、CoT、Lora過程實踐——MiniMind圖文版教程

最近看到Github上開源了一個小模型的repo&#xff0c;是真正拉低LLM的學習門檻&#xff0c;讓每個人都能從理解每一行代碼&#xff0c; 從零開始親手訓練一個極小的語言模型。開源地址&#xff1a; GitHub - jingyaogong/minimind: &#x1f680;&#x1f680; 「大模型」2小時…

PHP Zip 文件處理指南

PHP Zip 文件處理指南 引言 ZIP文件是一種流行的壓縮格式&#xff0c;廣泛用于文件壓縮和歸檔。PHP作為一門強大的服務器端腳本語言&#xff0c;提供了豐富的類和方法來處理ZIP文件。本文將詳細介紹PHP中ZIP文件的處理方法&#xff0c;包括創建、添加文件、提取文件以及壓縮和…

Java使用ZXing庫生成帶有Logo的二維碼圖片,并去除白邊動態伸縮上傳到阿里云OSS

文章目錄 引言二維碼基本原理1、二維碼概述2、QR Code結構3、錯誤糾正級別 QR Code生成技術1、ZXing庫2、生成二維碼的步驟 圖像處理技術1、嵌入Logo2. 去除白邊 阿里云OSS基本概念1、OSS概述2. 主要功能3. 基本概念 實戰演示1、依賴庫2、類結構3、生成普通二維碼4. 去除白邊5、…

AI工具箱最新使用教程

先克隆項目 電腦需要先安裝 git &#xff0c;安裝的畫看這個 Git安裝教程&#xff08;超詳細&#xff09;。 git鏡像 git clone https://github.com/Escaflowne1985/MyToolsWebBackendUser.gitgitee鏡像 git clone https://gitee.com/escaflowne/MyToolsWebBackendUser.git…

Android-創建mipmap-anydpi-v26的Logo

利用 Android Studio 自動創建 創建新項目&#xff1a;打開 Android Studio&#xff0c;點擊 “Start a new Android Studio project” 創建新項目。在創建項目的過程中&#xff0c;當設置Target SDK Version為 26 或更高版本時&#xff0c;Android Studio 會在項目的res目錄下…

SEO煉金術(4)| Next.js SEO 全攻略

在上一篇文章 SEO煉金術&#xff08;3&#xff09;| 深入解析 SEO 關鍵要素 中&#xff0c;我們深入解析了 SEO 關鍵要素&#xff0c;包括 meta 標簽、robots.txt、canonical、sitemap.xml 和 hreflang&#xff0c;并探討了它們在搜索引擎優化&#xff08;SEO&#xff09;中的作…

面試葵花寶典之React(持續更新中)

1.談談你對HOC的理解 定義&#xff1a; 高階組件是一個接收組件作為參數并返回新組件的函數&#xff0c;用于復用組件邏輯&#xff0c;遵循純函數特性&#xff08;無副作用&#xff0c;輸出僅依賴輸入&#xff09;。 組合性&#xff1a;可嵌套使用多個 HOC。 HOC&#xff08;…

數據安全管理的AI工具有哪些?

數據安全管理的AI工具在不斷演進&#xff0c;它們憑借強大的算法和學習能力&#xff0c;為企業筑起了一道堅不可摧的數字防線。 在數據安全管理領域&#xff0c;AI工具的應用日益廣泛&#xff0c;以下是一些常見的工具及其功能&#xff1a; AI驅動的數據分類與標記 工具: Micr…

Matplotlib 繪圖標記

Matplotlib 繪圖標記 引言 Matplotlib 是一個功能強大的 Python 繪圖庫&#xff0c;廣泛用于數據可視化。在 Matplotlib 中&#xff0c;繪圖標記&#xff08;markers&#xff09;是數據點在圖表中顯示的方式。正確的使用繪圖標記可以增強圖表的可讀性和美觀性。本文將詳細介紹…

【DeepSeek問答】QProcess::start是異步的嗎?會使UI卡頓嗎?

好的&#xff0c;我現在需要回答用戶關于QProcess::start是否是異步的&#xff0c;以及它是否會導致UI卡頓的問題。用戶之前已經詢問過如何開發一個調用7-Zip的Qt應用&#xff0c;并且我給出了示例代碼。現在用戶可能在使用過程中遇到了問題&#xff0c;或者想更深入了解QProce…

Android 常用命令和工具解析之存儲相關

1 基本概念 2 命令解讀 2.1 adb shell df df 命令主要用于需要檢查文件系統上已使用和可用的磁盤空間的數量。如果沒有指定文件名&#xff0c;則顯示在當前所有掛載的文件系統上可用的空間。其原理是從proc/mounts 或 /etc/mtab 中檢索磁盤信息。 注意&#xff1a;df命令并…

使用ZFile打造屬于自己的私有云系統結合內網穿透實現安全遠程訪問

文章目錄 前言1.關于ZFile2.本地部署ZFile3.ZFile本地訪問測試4.ZFile的配置5.cpolar內網穿透工具安裝6.創建遠程連接公網地址7.固定ZFile公網地址 前言 在數字化的今天&#xff0c;我們每個人都是信息的小能手。無論是職場高手、攝影達人還是學習狂人&#xff0c;每天都在創造…

HarmonyOS 5.0應用開發——鴻蒙接入高德地圖實現POI搜索

【高心星出品】 文章目錄 鴻蒙接入高德地圖實現POI搜索運行結果&#xff1a;準備地圖編寫ArkUI布局來加載HTML地圖 鴻蒙接入高德地圖實現POI搜索 在當今數字化時代&#xff0c;地圖應用已成為移動設備中不可或缺的一部分。隨著鴻蒙系統的日益普及&#xff0c;如何在鴻蒙應用中…

idea + Docker + 阿里鏡像服務打包部署

一、下載docker desktop軟件 官網下載docker desktop&#xff0c;需要結合wsl使用 啟動成功的畫面(如果不是這個畫面例如一直處理start或者是stop需要重新啟動&#xff0c;不行就重啟電腦) 打包成功的鏡像在這里&#xff0c;如果頻繁打包會導致磁盤空間被占滿&#xff0c;需…

IP---網絡類型

這只是IP的其中一塊內容-網絡類型&#xff0c;IP還有更多內容可以查看IP專欄&#xff0c;前一章內容為訪問服務器流程&#xff0c;可通過以下路徑查看IP----訪問服務器流程-CSDN博客&#xff0c;歡迎指正 2.網絡類型 網絡類型---根據二層&#xff08;數據鏈路層&#xff09;所…