kafka入門(二)

Java客戶端訪問Kafka

引入maven依賴

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka‐clients</artifactId>
<version>2.4.1</version>
</dependency>

消息發送端代碼

package com.tuling.kafka.kafkaDemo;import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class MsgProducer {private final static String TOPIC_NAME = "my‐replicated‐topic";public static void main(String[] args) throws InterruptedException, ExecutionException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// props.put(ProducerConfig.ACKS_CONFIG, "1");// props.put(ProducerConfig.RETRIES_CONFIG, 3);// props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// props.put(ProducerConfig.LINGER_MS_CONFIG, 10);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(props);int msgNum = 5;final CountDownLatch countDownLatch = new CountDownLatch(msgNum);for (int i = 1; i <= msgNum; i++) {Order order = new Order(i, 100 + i, 1, 1000.00);// 指定發送分區// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0, order.getOrderId().toString(), JSON.toJSONString(order));// 未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNumProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));// 等待消息發送成功的同步阻塞方法// RecordMetadata metadata = producer.send(producerRecord).get();// System.out.println("同步方式發送消息結果:" + "topic-" + metadata.topic() + "|partition-"//         + metadata.partition() + "|offset-" + metadata.offset());// 異步回調方式發送消息producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("發送消息失敗:" + exception.getStackTrace());}if (metadata != null) {System.out.println("異步方式發送消息結果:" + "topic-" + metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}countDownLatch.countDown();}});// 送積分 TODO}countDownLatch.await(5, TimeUnit.SECONDS);producer.close();}
}

消息接收端代碼

package com.tuling.kafka.kafkaDemo;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class MsgConsumer {private final static String TOPIC_NAME = "my‐replicated‐topic";private final static String CONSUMER_GROUP_NAME = "testGroup";public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC_NAME));// 消費指定分區// consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 消息回溯消費/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/// 指定offset消費/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/// 從指定時間點開始消費/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);// 從1小時前開始消費long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;Map<TopicPartition, Long> map = new HashMap<>();for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);}Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;Long offset = value.offset();System.out.println("partition-" + key.partition() + "|offset-" + offset);System.out.println();// 根據消費里的timestamp確定offsetif (value != null) {consumer.assign(Arrays.asList(key));consumer.seek(key, offset);}}*/while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());}/*if (records.count() > 0) {// 手動同步提交offset,當前線程會阻塞直到offset提交成功// 一般使用同步提交,因為提交之后一般也沒有什么邏輯代碼了consumer.commitSync();// 手動異步提交offset,當前線程提交offset不會阻塞,可以繼續處理后面的程序邏輯consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.printl

Spring Boot整合Kafka

引入spring boot kafka依賴

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring‐kafka</artifactId></dependency>

application.yml配置如下:

server:port: 8080spring:kafka:bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer:retries: 3batch-size: 16384buffer-memory: 33554432acks: 1key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manual_immediate

發送者代碼:

package com.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final static String TOPIC_NAME = "my‐replicated‐topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}
}

消費者代碼:

package com.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class MyConsumer {/*** @KafkaListener(groupId = "testGroup", topicPartitions = {* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),* @TopicPartition(topic = "topic2", partitions = "0",* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))* }, concurrency = "6")* // concurrency 就是同組下的消費者個數,就是并發消費數,必須小于等于分區總數* @param record*/@KafkaListener(topics = "my‐replicated‐topic", groupId = "zhugeGroup")public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);// 手動提交offsetack.acknowledge();}/* // 配置多個消費組@KafkaListener(topics = "my‐replicated‐topic", groupId = "tulingGroup")public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);ack.acknowledge();} */
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/81760.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/81760.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/81760.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python----目標檢測(PASCAL VOC數據集)

一、PASCAL VOC數據集 PASCAL VOC&#xff08;Visual Object Classes&#xff09;數據集是計算機視覺領域中廣泛使用的一個 標準數據集&#xff0c;用于目標檢測、圖像分割、圖像分類、動作識別等任務。該數據集由 PASCAL&#xff08;Pattern Analysis, Statistical Modelling …

mariadb 升級 (通過yum)

* 注意下 服務名, 有的服務器上是mysql,有的叫mariadb,mysqld的 #停止 systemctl stop mysql #修改源 vi /etc/yum.repos.d/MariaDB.repo baseurl http://yum.mariadb.org/11.4/centos7-amd64 #卸載 yum remove mysql #安裝 yum install MariaDB-server galera-4 MariaDB-…

vuejs處理后端返回數字類型精度丟失問題

標題問題描述 后端返回數據有5.00和3.30這種數據&#xff0c;但是前端展示的時候返回對應分別為5和3.0&#xff0c;小數點后0都丟失了。 接口返回數據展示network-Response&#xff1a; 接口返回數據展示network-Preview&#xff1a; 錯誤數據效果展示 發現問題 瀏覽器接口…

ubuntu kubeasz 部署高可用k8s 集群

ubuntu kubeasz 部署高可用k8s 集群 測試環境主機列表軟件清單kubeasz 部署高可用 kubernetes配置源配置host文件安裝 ansible 并進行 ssh 免密登錄:下載 kubeasz 項?及組件部署集群部署各組件開始安裝修改 config 配置文件增加 master 節點增加 kube_node 節點登錄dashboard…

IDEA2025版本使用Big Data Tools連接Linux上Hadoop的HDFS

目錄 Windows的準備 1. 將與Linux上版本相同的hadoop壓縮包解壓到本地 ?編輯2.設置$HADOOP HOME環境變量指向:E:\hadoop-3.3.4 3.下載hadoop.dll和winutils.exe文件 4.將hadoop.dll和winutils.exe放入$HADOOP HOME/bin中 IDEA中操作 1.下載Big Data Tools插件 2.添加并連…

Java轉Go日記(三十九):Gorm查詢

1.1.1. 查詢 // 獲取第一條記錄&#xff0c;按主鍵排序db.First(&user)SELECT * FROM users ORDER BY id LIMIT 1;// 獲取最后一條記錄&#xff0c;按主鍵排序db.Last(&user)SELECT * FROM users ORDER BY id DESC LIMIT 1;// 獲取所有記錄db.Find(&users)SELECT *…

bisheng系列(二)- 本地部署(前后端)

一、導讀 環境&#xff1a;Ubuntu 24.04、open Euler 23.03、Windows 11、WSL 2、Python 3.10 、bisheng 1.1.1 背景&#xff1a;需要bisheng二開商用&#xff0c;故而此處進行本地部署&#xff0c;便于后期調試開發 時間&#xff1a;20250519 說明&#xff1a;bisheng前后…

5G金融互聯:邁向未來金融服務的極速與智能新時代

5G金融互聯:邁向未來金融服務的極速與智能新時代 大家好,我是Echo_Wish,今天咱們聊聊一個大家都十分關心的話題:5G網絡在金融服務中的應用。咱們平時可能覺得5G只是打個電話、刷個視頻更流暢了,但在金融服務領域,5G的低延時、大帶寬和高可靠性正在悄然改變整個游戲規則。…

UE5 GAS框架解析內部數據處理機制——服務器與客戶端

當&#xff0c; gas通過點擊鼠標光標觸發事件時&#xff0c;內部的處理機制。 當通過點擊事件&#xff0c;命中中目標時&#xff0c; 可獲取到對應的TargetData 目標數據。處理相應的操作。 僅有本地的客戶端的情況下。命中并不會有什么異常。 當存在服務器時&#xff0c; 服…

Golang的Web應用架構設計

# Golang的Web應用架構設計 介紹 是一種快速、高效、可靠的編程語言&#xff0c;它在Web應用開發中越來越受歡迎。Golang的Web應用架構設計通常包括前端、后端和數據庫三個部分。在本篇文章中&#xff0c;我們將詳細介紹Golang的Web應用架構設計及其組成部分。 前端 在Golang的…

對比 HTTP-REST 與 gRPC:各自的優缺點以及適用的場景

文章目錄 對比 HTTP-REST 與 gRPC&#xff1a;各自的優缺點以及適用的場景HTTP-REST 與 gRPC 的核心區別gRPC 的優缺點HTTP-REST 的優缺點適用場景 模糊點什么是 Protobuf&#xff1f;HTTP/2 會將 HTTP 消息拆分并封裝為二進制幀&#xff0c;那還能過使用 HTTP/2 構建 RESTful …

現代健康生活養生指南

現代社會中&#xff0c;熬夜加班、久坐不動、飲食不規律成為許多人的生活常態&#xff0c;由此引發的健康問題也日益增多。想要擺脫亞健康&#xff0c;不必依賴中醫理念&#xff0c;從以下這些現代科學養生方法入手&#xff0c;就能逐步改善身體狀況。? 飲食上&#xff0c;注…

Go語言數組的定義與操作 - 《Go語言實戰指南》

在 Go 語言中&#xff0c;數組&#xff08;Array&#xff09; 是一種定長、同類型的集合。它在內存中是連續分布的&#xff0c;適合用于性能敏感的場景。 一、數組的定義 數組的基本語法如下&#xff1a; var 數組名 [長度]元素類型 示例&#xff1a; var nums [5]int …

Helm Chart 中配置多個 Docker Registry 地址以實現備用訪問

在 Helm Chart 中配置多個 Docker Registry 地址以實現備用訪問&#xff0c;可以通過以下幾種方式實現&#xff1a; 1. 在 values.yaml 中定義多個 Registry 在 values.yaml 中定義主 Registry 和備用 Registry&#xff0c;以便在部署時靈活切換&#xff1a; # values.yaml …

云原生安全:錯誤策略S3存儲桶ACL設置為Everyone:FullControl

??「炎碼工坊」技術彈藥已裝填! 點擊關注 → 解鎖工業級干貨【工具實測|項目避坑|源碼燃燒指南】 ——從基礎到實踐的深度解析 1. 基礎概念 S3存儲桶與ACL Amazon S3(Simple Storage Service)是AWS提供的對象存儲服務,支持存儲和檢索任意規模的數據。ACL(訪問控制列表…

.NET 8 kestrel 配置PEM,實現內網https

一、生成證書 mkcert 是一個簡單的工具&#xff0c;用于制作本地信任的開發證書。它不需要配置。 mkcert官方倉庫地址&#xff1a;GitHub - FiloSottile/mkcert: A simple zero-config tool to make locally trusted development certificates with any names youd like. 簡…

nodejs快速入門到精通1

參考 nodejs快速入門到精通 菜鳥教程-nodejs nodejs官方文檔 原因 視頻免費 資料收費 筆記還是自己寫吧 安裝 nodejs官網 windows下&#xff1a; #查看nodejs版本 node -v #查看npm版本 npm -v #設置npm為淘寶鏡像源 npm config set registry https://registry.npmmirror.…

nginx負載均衡及keepalive高可用

實驗前期準備&#xff1a; 5臺虛擬機&#xff1a;4臺當做服務器&#xff0c;1臺當做客戶機&#xff08;當然&#xff0c;也可以使用主機的瀏覽器&#xff09;&#xff0c;4臺服務器中&#xff0c;2臺服務器當做后端真實訪問服務器&#xff1b;另外2臺服務器當做負載均衡服務器…

go語法大賞

前些日子單機房穩定性下降&#xff0c;找了好一會才找到真正的原因。這里面涉及到不少go語法細節&#xff0c;正好大家一起看一下。 一、仿真代碼 這是仿真之后的代碼 package mainimport ("fmt""go.uber.org/atomic""time" )type StopSignal…

Android 14 解決打開app出現不兼容彈窗的問題

應用安裝到 Android 14 上&#xff0c;出現如下提示 This app isn’t compatible with the latest version of Android. Check for an update or contact the app’s developer. 通過源碼找原因。 提示的字符 根據字符找到 ./frameworks/base/core/res/res/values/strings.xm…