Kafka原生API使用Java代碼-消費者組-消費模式

文章目錄

  • 1、消費模式
    • 1.1、創建一個3分區1副本的 主題 my_topic1
    • 1.2、創建生產者 KafkaProducer1
    • 1.2、創建消費者
      • 1.2.1、創建消費者 KafkaConsumer1Group1 并指定組 my_group1
      • 1.2.3、創建消費者 KafkaConsumer2Group1 并指定組 my_group1
      • 1.2.3、創建消費者 KafkaConsumer3Group1 并指定組 my_group1
      • 1.2.4、創建消費者 KafkaConsumer1Group2 并指定組 my_group2
    • 1.3、eagle for apache kafka
      • 1.3.1、查看分區0的數據
      • 1.3.2、查看分區1的數據
      • 1.3.3、查看分區2的數據

1、消費模式

消費模式

  1. 點對點:一個組消費消息時,只能由組內的一個消費者消費一次 避免重復消費
  2. 發布訂閱:多個組消費消息時,每個組都可以消費一次消息

1.1、創建一個3分區1副本的 主題 my_topic1

在這里插入圖片描述

1.2、創建生產者 KafkaProducer1

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducer1 {/*** 主函數用于演示如何向Kafka的特定主題發送消息。** @param args 命令行參數(未使用)*/public static void main(String[] args) throws ExecutionException, InterruptedException {// 初始化Kafka生產者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口props.put("acks", "all"); // 確認消息寫入策略props.put("retries", 0); // 消息發送失敗時的重試次數props.put("linger.ms", 1); // 發送緩沖區等待時間props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定鍵的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 創建Kafka生產者實例Producer<String, String> producer = new KafkaProducer<>(props);// 發送消息到主題"my_topic3"// 異步發送消息:不接收kafka的響應//producer.send(new ProducerRecord<String, String>("my_topic3",  "hello,1,2,3"));// 注釋掉的循環代碼塊展示了如何批量發送消息//for (int i = 0; i < 100; i++)//    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));for (int i=0;i<20;i++) {producer.send(new ProducerRecord<String, String>("my_topic1",i%3,"null","我是"+i),new Callback() {//消息發送成功,kafka broker ack 以后回調@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {//exception:如果有異常代表消息未能正常發送到kafka,沒有異常代表消息發送成功://此時kafka的消息不一定持久化成功(需要kafka生產者加配置)//RecordMetadata代表發送成功的消息的元數據System.out.println("partition = " + recordMetadata.partition());}});}// 關閉生產者實例producer.close();}
}
partition = 2
partition = 2
partition = 2
partition = 2
partition = 2
partition = 2
partition = 1
partition = 1
partition = 1
partition = 1
partition = 1
partition = 1
partition = 1
partition = 0
partition = 0
partition = 0
partition = 0
partition = 0
partition = 0
partition = 0

1.2、創建消費者

1.2.1、創建消費者 KafkaConsumer1Group1 并指定組 my_group1

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer1Group1 {/*** 主函數入口,創建并運行一個Kafka消費者來消費主題"foo"和"bar"的消息。** @param args 命令行參數(未使用)*/public static void main(String[] args) {// 初始化Kafka消費者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.74.148:9092"); // Kafka broker的地址和端口props.setProperty("group.id", "my_group1"); // 消費者組IDprops.setProperty("enable.auto.commit", "true"); // 自動提交偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自動提交偏移量的時間間隔props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 鍵的反序列化器props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使用配置創建KafkaConsumer實例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱要消費的主題consumer.subscribe(Arrays.asList("my_topic1"));// 持續消費消息while (true) {// 從Kafka服務器拉取一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 遍歷并處理收到的消息記錄for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d,partition: %d,value = %s%n",record.offset(),record.partition(), record.value());}}}
offset = 0,partition: 2,value = 我是2
offset = 1,partition: 2,value = 我是5
offset = 2,partition: 2,value = 我是8
offset = 3,partition: 2,value = 我是11
offset = 4,partition: 2,value = 我是14
offset = 5,partition: 2,value = 我是17

1.2.3、創建消費者 KafkaConsumer2Group1 并指定組 my_group1

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer2Group1 {/*** 主函數入口,創建并運行一個Kafka消費者來消費主題"foo"和"bar"的消息。** @param args 命令行參數(未使用)*/public static void main(String[] args) {// 初始化Kafka消費者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.74.148:9092"); // Kafka broker的地址和端口props.setProperty("group.id", "my_group1"); // 消費者組IDprops.setProperty("enable.auto.commit", "true"); // 自動提交偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自動提交偏移量的時間間隔props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 鍵的反序列化器props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使用配置創建KafkaConsumer實例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱要消費的主題consumer.subscribe(Arrays.asList("my_topic1"));// 持續消費消息while (true) {// 從Kafka服務器拉取一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 遍歷并處理收到的消息記錄for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d,partition: %d,value = %s%n",record.offset(),record.partition(), record.value());}}}
offset = 0,partition: 1,value = 我是1
offset = 1,partition: 1,value = 我是4
offset = 2,partition: 1,value = 我是7
offset = 3,partition: 1,value = 我是10
offset = 4,partition: 1,value = 我是13
offset = 5,partition: 1,value = 我是16
offset = 6,partition: 1,value = 我是19

1.2.3、創建消費者 KafkaConsumer3Group1 并指定組 my_group1

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer3Group1 {/*** 主函數入口,創建并運行一個Kafka消費者來消費主題"foo"和"bar"的消息。** @param args 命令行參數(未使用)*/public static void main(String[] args) {// 初始化Kafka消費者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.74.148:9092"); // Kafka broker的地址和端口props.setProperty("group.id", "my_group1"); // 消費者組IDprops.setProperty("enable.auto.commit", "true"); // 自動提交偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自動提交偏移量的時間間隔props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 鍵的反序列化器props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使用配置創建KafkaConsumer實例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱要消費的主題consumer.subscribe(Arrays.asList("my_topic1"));// 持續消費消息while (true) {// 從Kafka服務器拉取一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 遍歷并處理收到的消息記錄for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d,partition: %d,value = %s%n",record.offset(),record.partition(), record.value());}}}
offset = 0,partition: 0,value = 我是0
offset = 1,partition: 0,value = 我是3
offset = 2,partition: 0,value = 我是6
offset = 3,partition: 0,value = 我是9
offset = 4,partition: 0,value = 我是12
offset = 5,partition: 0,value = 我是15
offset = 6,partition: 0,value = 我是18

1.2.4、創建消費者 KafkaConsumer1Group2 并指定組 my_group2

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer1Group2 {/*** 主函數入口,創建并運行一個Kafka消費者來消費主題"foo"和"bar"的消息。** @param args 命令行參數(未使用)*/public static void main(String[] args) {// 初始化Kafka消費者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.74.148:9092"); // Kafka broker的地址和端口props.setProperty("group.id", "my_group2"); // 消費者組IDprops.setProperty("enable.auto.commit", "true"); // 自動提交偏移量props.setProperty("auto.commit.interval.ms", "1000"); // 自動提交偏移量的時間間隔props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 鍵的反序列化器props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使用配置創建KafkaConsumer實例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 訂閱要消費的主題consumer.subscribe(Arrays.asList("my_topic1"));// 持續消費消息while (true) {// 從Kafka服務器拉取一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 遍歷并處理收到的消息記錄for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d,partition: %d,value = %s%n",record.offset(),record.partition(), record.value());}}}
offset = 0,partition: 2,value = 我是2
offset = 1,partition: 2,value = 我是5
offset = 2,partition: 2,value = 我是8
offset = 3,partition: 2,value = 我是11
offset = 4,partition: 2,value = 我是14
offset = 5,partition: 2,value = 我是17
offset = 0,partition: 1,value = 我是1
offset = 1,partition: 1,value = 我是4
offset = 2,partition: 1,value = 我是7
offset = 3,partition: 1,value = 我是10
offset = 4,partition: 1,value = 我是13
offset = 5,partition: 1,value = 我是16
offset = 6,partition: 1,value = 我是19
offset = 0,partition: 0,value = 我是0
offset = 1,partition: 0,value = 我是3
offset = 2,partition: 0,value = 我是6
offset = 3,partition: 0,value = 我是9
offset = 4,partition: 0,value = 我是12
offset = 5,partition: 0,value = 我是15
offset = 6,partition: 0,value = 我是18

在這里插入圖片描述

1.3、eagle for apache kafka

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

1.3.1、查看分區0的數據

在這里插入圖片描述

[[{"partition": 0,"offset": 0,"msg": "我是0","timespan": 1717226677707,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 1,"msg": "我是3","timespan": 1717226677720,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 2,"msg": "我是6","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 3,"msg": "我是9","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 4,"msg": "我是12","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 5,"msg": "我是15","timespan": 1717226677722,"date": "2024-06-01 07:24:37"},{"partition": 0,"offset": 6,"msg": "我是18","timespan": 1717226677722,"date": "2024-06-01 07:24:37"}]
]

1.3.2、查看分區1的數據

在這里插入圖片描述

[[{"partition": 1,"offset": 0,"msg": "我是1","timespan": 1717226677720,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 1,"msg": "我是4","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 2,"msg": "我是7","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 3,"msg": "我是10","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 4,"msg": "我是13","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 5,"msg": "我是16","timespan": 1717226677722,"date": "2024-06-01 07:24:37"},{"partition": 1,"offset": 6,"msg": "我是19","timespan": 1717226677722,"date": "2024-06-01 07:24:37"}]
]

1.3.3、查看分區2的數據

在這里插入圖片描述

[[{"partition": 2,"offset": 0,"msg": "我是2","timespan": 1717226677720,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 1,"msg": "我是5","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 2,"msg": "我是8","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 3,"msg": "我是11","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 4,"msg": "我是14","timespan": 1717226677721,"date": "2024-06-01 07:24:37"},{"partition": 2,"offset": 5,"msg": "我是17","timespan": 1717226677722,"date": "2024-06-01 07:24:37"}]
]

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/20486.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/20486.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/20486.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

算法練習第25天|491. 非遞減子序列

491. 非遞減子序列 491. 非遞減子序列https://leetcode.cn/problems/non-decreasing-subsequences/ 題目描述&#xff1a; 給你一個整數數組 nums &#xff0c;找出并返回所有該數組中不同的遞增子序列&#xff0c;遞增子序列中 至少有兩個元素 。你可以按 任意順序 返回答案…

Flutter 中的 ButtonTheme 小部件:全面指南

Flutter 中的 ButtonTheme 小部件&#xff1a;全面指南 Flutter 是一個由 Google 開發的跨平臺 UI 框架&#xff0c;它提供了一系列的組件來幫助開發者構建美觀且功能豐富的應用。在 Flutter 的組件庫中&#xff0c;ButtonTheme 是一個重要的小部件&#xff0c;它允許開發者統…

Linux、Windows安裝python環境(最新版及歷史版本指定版本)-python

目錄 一、Linux環境二、windows環境最新版本下載指定版本下載 python 官網地址&#xff1a; https://www.python.org/ 一、Linux環境 以openEuler/CentOS為例 查看可安裝python源版本 dnf provides python*默認安裝新版本 dnf install -y python3. 進入python python退出p…

電源小白入門學習8——電荷泵電路原理及使用注意事項

電源小白入門學習8——電荷泵電路原理及使用注意事項 電荷泵簡介電荷泵原理電荷泵設計過程中需要注意的點fly電容的安秒平衡DC/DC功率轉換技術對比 電荷泵簡介 電荷泵&#xff08;Charge Pump&#xff09;是一種電路拓撲結構&#xff0c;用于實現電壓升壓或降壓的功能。它通過…

Python自動化測試斷言詳細實戰代碼(建議收藏)

&#x1f345; 視頻學習&#xff1a;文末有免費的配套視頻可觀看 &#x1f345; 點擊文末小卡片 &#xff0c;免費獲取軟件測試全套資料&#xff0c;資料在手&#xff0c;漲薪更快 在測試用例中&#xff0c;執行完測試用例后&#xff0c;最后一步是判斷測試結果是 pass 還是 fa…

sh發送郵件如何通過配置SMTP服務器來實現?

sh發送郵件的操作方法&#xff1f;如何使用Shell腳本自動發信&#xff1f; 在Shell腳本中實現郵件發送功能是一項常見需求&#xff0c;特別是在自動化任務執行或系統監控中。AokSend將介紹如何通過配置SMTP服務器來實現sh發送郵件的方法和注意事項。 sh發送郵件&#xff1a;安…

Redash、Superset、DataEase、Metabase、FineBI 和 Power BI 報表系統的優缺點

最近在做報表系統的選型與調研&#xff0c;其中嘗試了Redash、Superset、DataEase、Metabase、FineBI 和 Power BI幾個報表系統&#xff0c;主要想使用開源免費的&#xff0c;如果大家有好用的報表系統推薦歡迎留言。 Redash 優點&#xff1a; 開源且免費&#xff1a;Redash…

【已解決】Error in the HTTP2 framing layer

1.問題描述 在使用git將代碼上傳github的時候在最后一部push的時候遇到這個fatal 2.解決方案 由于我原先設置的origin是http協議下的&#xff0c;如下 git remote add origin https://github.com/Charlesbibi/Simple_Cloud.githttp協議下行不通不妨試一試ssh協議下&#xff…

跟風報考PMP,我真的后悔了

真的太香吧&#xff01; 我一開始沒打算報考PMP證書的&#xff0c;但是我看身邊很多朋友都因為PMP證書得到了升職加薪&#xff0c;這讓我實在是一整個羨慕住了&#xff0c;所以我也去報考了PMP。 報考PMP前期我做了什么&#xff1f; 由于我是零基礎&#xff0c;沒有什么項目…

探索網格生成技術在AI去衣應用中的作用

引言&#xff1a; 隨著人工智能技術的飛速發展&#xff0c;其在圖像處理和計算機視覺領域的應用日益廣泛。其中&#xff0c;AI去衣技術作為一種新興的應用&#xff0c;引起了廣泛的關注和討論。然而&#xff0c;要實現這一功能并非易事&#xff0c;需要借助于先進的算法和技術。…

Mybatis第一講——你會Mybatis嗎?

文章目錄 什么是MybatisMybatis的作用是什么 Mybatis 怎么使用注解的方式注解的多種使用Options注解ResultType注解 XML的方式update標簽 #{} 和 ${}符號的區別#{}占位${}占位 ${}占位的危險性(SQL注入)數據庫連接池 什么是Mybatis 首先什么是Mybatis呢&#xff1f;Mybatis是一…

latex bib引參考文獻

1.bib內容 2.sn-mathphys-num是官方的參考文獻格式 3.不用導cite包&#xff0c;文中這么寫 4.end document前ckwx是自己命名的bib的名字

Ollama教程,本地部署大模型Ollama,docker安裝方法,僅供學習使用

不可商用&#xff01;&#xff01;僅僅提供學習使用&#xff01; 先上視頻教學&#xff1a; Ollama教程&#xff0c;本地部署大模型Ollama&#xff0c;docker安裝方法&#xff0c;僅供學習使用&#xff01; 資料獲取 &#xff1a; Ollama下載包和安裝文檔在這里&#xff1…

Web自動化測試-掌握selenium工具用法,使用WebDriver測試Chrome/FireFox網頁(Java

目錄 一、在Eclipse中構建Maven項目 1.全局配置Maven 2.配置JDK路徑 3.創建Maven項目 4.引入selenium-java依賴 二、Chrome自動化腳本編寫 1.創建一個ChromeTest類 2.測試ChromeDriver 3.下載chromedriver驅動 4.在腳本中通過System.setProperty方法指定chromedriver的…

vi和vim有什么不同?

vi 和 vim 都是流行的文本編輯器&#xff0c;它們之間有以下主要區別&#xff1a; 歷史&#xff1a; vi 是一個非常古老的文本編輯器&#xff0c;最初由 Bill Joy 在 1976 年為 Unix 系統編寫。vim&#xff08;Vi IMproved&#xff09;是 vi 的一個增強版&#xff0c;由 Bram M…

Ubuntu 20.04安裝CMake 3.22.6版本

Ubuntu 20.04通過apt安裝的cmake版本是3.16.3&#xff0c;默認安裝到/usr/bin/cmake路徑。 $ cmake Command cmake not found, but can be installed with:sudo snap install cmake # version 3.29.3, or sudo apt install cmake # version 3.16.3-1ubuntu1.20.04.1See sna…

Multer 文件上傳中間件 和 Busboy表單解析

Multer 是一個node.js中間件&#xff0c;用于處理 multipart/form-data類型的表單數據&#xff0c;主要用于上傳文件。只處理 multipart/form-data 類型的表單數據。 Multer是基于Busboy解析的文件參數信息&#xff0c;獲取fileStream&#xff0c;并通過storage轉存的file.str…

Unity + 雷達 粒子互動(待更新)

效果預覽: 花海(帶移動方向) VFX 實例 腳本示例 使用TouchScript,計算玩家是否移動,且計算移動方向 using System.Collections; using System.Collections.Generic; using TouchScript; using TouchScript.Pointers; using UnityEngine; using UnityEngine.VFX;public …

AI預測福彩3D采取888=3策略+和值012路一縮定乾坤測試6月1日預測第8彈

今天繼續基于8883的大底&#xff0c;使用盡可能少的條件進行縮號。好了&#xff0c;直接上結果吧~ 首先&#xff0c;888定位如下&#xff1a; 百位&#xff1a;6,5,4,7,8,9,1,0 十位&#xff1a;7,8,6,5,9,3,1,0 個位&#xff1a;5,7,6,4,2,…

看廣告賺金幣提現小游戲app開發源碼

開發一個看廣告賺金幣并可以提現的小游戲APP&#xff0c;源碼的搭建涉及到多個方面&#xff0c;包括前端界面設計、后端邏輯處理、數據庫管理以及廣告平臺的對接等。以下是一些建議的步驟和考慮因素&#xff1a; 前端界面設計&#xff1a; 使用HTML5、CSS3和JavaScript等技術…