消費者API

目錄

  • 獨立消費者案例(訂閱主題)
  • 獨立消費者案例(訂閱分區)
  • 消費者組案例

獨立消費者案例(訂閱主題)

在這里插入圖片描述

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("first");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

獨立消費者案例(訂閱分區)

在這里插入圖片描述

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消費者組案例

測試同一個主題的分區數據,只能由于一個消費者組中的一個一個消費

消費者1

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消費者2

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消費者3

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer2 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

三個消費者的組ID相同,會形成消費者組,每個消費者消費一個分區數據

生產者發送數據

package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 連接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定對應的key和value的序列化類型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 設置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重試次數retries,默認是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 創建kafka生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 發送數據for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",2,"","分區2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主題:" +metadata.topic() + " 分區: " +metadata.partition());}}});}// 關閉資源kafkaProducer.close();}
}
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 連接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定對應的key和value的序列化類型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 設置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重試次數retries,默認是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 創建kafka生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 發送數據for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",1,"","分區2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主題:" +metadata.topic() + " 分區: " +metadata.partition());}}});}// 關閉資源kafkaProducer.close();}
}

```c
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 連接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定對應的key和value的序列化類型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 設置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重試次數retries,默認是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 創建kafka生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 發送數據for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",0,"","分區2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主題:" +metadata.topic() + " 分區: " +metadata.partition());}}});}// 關閉資源kafkaProducer.close();}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/93693.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/93693.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/93693.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C# NX二次開發:操作按鈕控件Button和標簽控件Label詳解

大家好&#xff0c;今天介紹ug二次開發過程中的一個叫操作按鈕的控件&#xff0c;這個控件在塊UI編輯器中可以使用。 ? Button這個控件的屬性和方法如下所示&#xff1a; namespace NXOpen.BlockStyler { public class Label : UIBlock { protected intern…

Vue.prototype 的作用

在 Vue.js 中&#xff0c;Vue.prototype 是用來向所有 Vue 實例添加屬性或方法的機制。通過它添加的屬性或方法可以在所有 Vue 組件實例中通過 this 訪問。主要作用添加全局方法或屬性&#xff1a;可以在所有組件中使用的工具方法或常量擴展 Vue 功能&#xff1a;添加 Vue 本身…

Javaee 多線程 --進程和線程之間的區別和聯系

文章目錄進程和線程進程線程進程和線程的區別創建線程的五種寫法繼承Thread,重寫run實現Runnable(接口)&#xff0c;重寫run繼承Thread,重寫run,但是使用匿名內部類實現Runnable(接口)&#xff0c;重寫run&#xff0c;但是使用匿名內部類使用lambda表達式請說明Thread類中run和…

企業如何讓內部視頻僅限指定域名播放,確保視頻不被泄露?

在數字化辦公時代&#xff0c;企業內部的培訓視頻、產品演示或機密會議錄像等敏感內容&#xff0c;一旦被非法傳播或泄露&#xff0c;可能帶來嚴重的商業風險。如何確保這些視頻只能在公司官網或指定域名播放&#xff0c;防止被惡意下載、盜鏈或二次傳播&#xff1f;今天介紹一…

端口映射原理操作詳解教程:實現外網訪問內網服務,本地路由器端口映射公網ip和軟件端口映射域名2種方法

端口映射作為一種不同網絡間通信的關鍵網絡技術&#xff0c;在遠程訪問和內外網連接服務需求日益增長的如今&#xff0c;理解端口映射的原理和設置方法是確保網絡服務可用性的必要技能。本文將深入探討端口映射的基本概念、路由器端口映射設置步驟以及無公網IP用端口映射軟件映…

【PyTorch】多對象分割項目

對象分割任務的目標是找到圖像中目標對象的邊界。實際應用例如自動駕駛汽車和醫學成像分析。這里將使用PyTorch開發一個深度學習模型來完成多對象分割任務。多對象分割的主要目標是自動勾勒出圖像中多個目標對象的邊界。 對象的邊界通常由與圖像大小相同的分割掩碼定義&#xf…

SSH 使用密鑰登錄服務器

用這種方法遠程登陸服務器的時候無需手動輸入密碼 具體步驟 客戶端通過 ssh-keygen 生成公鑰和私鑰 ssh-keygen -t rsa 生成的時候會有一系列問題&#xff0c;根據自己的需要選擇就行。生成的結果為兩個文件&#xff1a; 上傳公鑰至服務器&#xff0c;上述兩個文件一般在客戶…

MySQL 8.4 企業版啟用TDE功能和表加密

一、系統環境操作系統&#xff1a;Ubuntu 24.04 數據庫:8.4.4-commercial for Linux on x86_64 (MySQL Enterprise Server - Commercial)二、安裝TDE組件前提&#xff1a;檢查組件文件是否存在ls /usr/lib/mysql/plugin/component_keyring_encrypted_file.so1.配置全局清單文件…

【Altium designer】導出的原理圖PDF亂碼異常的解決方法

一、有些電源名字無法顯示或器件丟失 解決辦法 (1)首先AD18以及以上的新版本AD不存在該問題。 (2)其次AD17以及更舊版本的AD很可能遇到該問題,參考如下博客筆記進行操作即可: 大致的操作如下:DXP → Preferences → Schematic → Options里面“Render Text with GDI+”…

4.Ansible自動化之-部署文件到主機

4 - 部署文件到受管主機 實驗環境 先通過以下命令搭建基礎環境&#xff08;創建工作目錄、配置 Ansible 環境和主機清單&#xff09;&#xff1a; # 在控制節點&#xff08;controller&#xff09;上創建web目錄并進入&#xff0c;作為工作目錄 [bqcontroller ~]$ mkdir web &a…

Vuex的使用

Vuex 超詳細使用教程&#xff08;從入門到精通&#xff09;一、Vuex 是什么&#xff1f;Vuex 是專門為 Vue.js 設計的狀態管理庫&#xff0c;它采用集中式存儲管理應用的所有組件的狀態。簡單來說&#xff0c;Vuex 就是一個"全局變量倉庫"&#xff0c;所有組件都可以…

pytorch 數據預處理,加載,訓練,可視化流程

流程定義自定義數據集類定義訓練和驗證的數據增強定義模型、損失函數和優化器訓練循環&#xff0c;包括驗證訓練可視化整個流程模型評估高級功能擴展混合精度訓練?分布式訓練?{:width“50%” height“50%”} 定義自定義數據集類 # #1. 自定義數據集類 # class CustomImageD…

Prompt工程:OCR+LLM文檔處理的精準制導系統

在PDF OCR與大模型結合的實際應用中&#xff0c;很多團隊會發現一個現象&#xff1a;同樣的OCR文本&#xff0c;不同的Prompt設計會產生截然不同的提取效果。有時候準確率能達到95%&#xff0c;有時候卻只有60%。這背后的關鍵就在于Prompt工程的精細化程度。 &#x1f3af; 為什…

RecSys:粗排模型和精排特征體系

粗排 在推薦系統鏈路中&#xff0c;排序階段至關重要&#xff0c;通常分為召回、粗排和精排三個環節。粗排作為精排前的預處理階段&#xff0c;需要在效果和性能之間取得平衡。 雙塔模型 后期融合&#xff1a;把用戶、物品特征分別輸入不同的神經網絡&#xff0c;不對用戶、…

spring聲明式事務,finally 中return對事務回滾的影響

finally 塊中使用 return 是一個常見的編程錯誤&#xff0c;它會&#xff1a; 跳過正常的事務提交流程。吞掉異常&#xff0c;使錯誤處理失效 導致不可預測的事務行為Java 中 finally 和 return 的執行機制&#xff1a;1. finally 塊的基本特性 在 Java 中&#xff0c;finally …

WPF 打印報告圖片大小的自適應(含完整示例與詳解)

目標&#xff1a;在 FlowDocument 報告里&#xff0c;根據 1~6 張圖片的數量&#xff0c; 自動選擇 2 行 3 列 的最佳布局&#xff1b;在只有 1、2、4 張時保持“占滿感”&#xff0c;打印清晰且不變形。規則一覽&#xff1a;1 張 → 占滿 23&#xff08;大圖居中&#xff09;…

【AI大模型前沿】百度飛槳PaddleOCR 3.0開源發布,支持多語言、手寫體識別,賦能智能文檔處理

系列篇章&#x1f4a5; No.文章1【AI大模型前沿】深度剖析瑞智病理大模型 RuiPath&#xff1a;如何革新癌癥病理診斷技術2【AI大模型前沿】清華大學 CLAMP-3&#xff1a;多模態技術引領音樂檢索新潮流3【AI大模型前沿】浙大攜手阿里推出HealthGPT&#xff1a;醫學視覺語言大模…

迅為RK3588開發板Android12 制作使用系統簽名

在 Android 源碼 build/make/target/product/security/下存放著簽名文件&#xff0c;如下所示&#xff1a;將北京迅為提供的 keytool 工具拷貝到 ubuntu 中&#xff0c;然后將 Android11 或 Android12 源碼build/make/target/product/security/下的 platform.pk8 platform.x509…

Day08 Go語言學習

1.安裝Go和Goland 2.新建demo項目實踐語法并使用git實踐版本控制操作 2.1 Goland配置 路徑**&#xff1a;** GOPATH workspace GOROOT golang 文件夾&#xff1a; bin 編譯后的可執行文件 pkg 編譯后的包文件 src 源文件 遇到問題1&#xff1a;運行 ‘go build awesomeProject…

Linux-文件創建拷貝刪除剪切

文章目錄Linux文件相關命令ls通配符含義touch 創建文件命令示例cp 拷貝文件rm 刪除文件mv剪切文件Linux文件相關命令 ls ls是英文單詞list的簡寫&#xff0c;其功能為列出目錄的內容&#xff0c;是用戶最常用的命令之一&#xff0c;它類似于DOS下的dir命令。 Linux文件或者目…