自定義kafka客戶端消費topic

文章目錄

  • 自定義kafka客戶端消費topic
    • 結論
    • 1 背景
    • 2 spring集成2.1.8.RELEASE版本不支持autoStartup屬性
    • 3 自定義kafka客戶端消費topic
      • 3.1 yml配置
      • 3.2 KafkaConfig客戶端配置
      • 3.3 手動啟動消費客戶端

自定義kafka客戶端消費topic

結論

使用自定義的KafkaConsumer給spring進行管理,之后在注入topic的set方法中,開單線程主動訂閱和讀取該topic的消息。

1 背景

后端服務不需要啟動時就開始監聽消費,而是根據啟動的模塊或者用戶自定義監聽需要監聽或者停止的topic

2 spring集成2.1.8.RELEASE版本不支持autoStartup屬性

使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中沒有找到可以直接配置屬性autoStartup = "false"來手動啟動topic,可能是版本低的原因,如果有可以支持的版本,也可以打在評論區,我去驗證一下。

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.8.RELEASE</version>
</dependency>
@KafkaListener(topics = "<Kafka主題>", autoStartup = "false") 
public void receive(String message) {    // 處理接收到的消息 
}

3 自定義kafka客戶端消費topic

3.1 yml配置

spring:kafka:bootstrap-servers: 19.125.105.6:9092,19.125.105.7,19.125.105.8:9092consumer:group-id: data-devenable-auto-commit: trueauto-offset-reset: latestauto-commit-interval: 1000topic:costomTopic: costomData

3.2 KafkaConfig客戶端配置

kafka其他配置項和原有的kafka客戶端配置一樣,只有額外增加了一個cutomConsumer讓spring來管理,方便手動啟動客戶端來使用

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;//    @Value("${spring.kafka.listener.concurrency}")
//    private Integer concurrency;@Value("${spring.kafka.consumer.auto-commit-interval}")private Integer autoCommitInterval;@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// concurrencyfactory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.ACKS_CONFIG, "1");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return props;}@Beanpublic KafkaConsumer cutomConsumer() {// 新建一個自定義啟動消費者KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs());return consumer;}
}

3.3 手動啟動消費客戶端

這里手動啟動消費客戶端只有在配置了costomTopic才開始啟動,如果需要動態指定啟停topic

@Component
public class CutomKafkaConsumer {// 使用cutomConsumer實例消費@Autowiredprivate KafkaConsumer cutomConsumer;@Value("${spring.kafka.topic.costomTopic:}")public void setCostomTopic(String costomTopic) {// 手動啟動消費類,防止下級模塊默認不配置costomTopic導致啟動報錯if (StringUtils.isEmpty(costomTopic)) {return;}// 使這個消費者訂閱對應話題cutomConsumer.subscribe(Collections.singleton(costomTopic));// 單線程拉取消息ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();consumerExecutor.submit(new Runnable() {@Overridepublic void run() {while (true) {ConsumerRecords<String, String> records = cutomConsumer.poll(3000);if (!records.iterator().hasNext()) {continue;}try {// 捕獲異常,防止頂級消費循環被異常中斷records.forEach(record -> operate(record));} catch (Exception e) {log.error("消費數據失敗,失敗原因: {}", e.getMessage(), e);}// 通過異步的方式提交位移cutomConsumer.commitAsync(((offsets, exception) -> {if (exception == null) {offsets.forEach((topicPartition, metadata) -> {System.out.println(topicPartition + " -> offset=" + metadata.offset());});} else {exception.printStackTrace();// 如果出錯了,同步提交位移cutomConsumer.commitSync(offsets);}}));}}});}
}    public void operate(ConsumerRecord<String, String> record) {log.info("kafkaTwoContainerFactory.operate start. key: {}, value : {}", record.key(), record.value());
}

參考:
Kafka消費者——API開發
Kafka Consumer如何實現精確一次消費數據
Apache Kafka - 靈活控制Kafka消費_動態開啟/關閉監聽實現
@KafkaListener 詳解及消息消費啟停控制
kafka多個消費者消費一個topic_kafka消費者組與重平衡機制,了解一下
kafka學習(五):消費者分區策略(再平衡機制)
Kafka 3.0 源碼筆記(3)-Kafka 消費者的核心流程源碼分析

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/215493.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/215493.shtml
英文地址,請注明出處:http://en.pswp.cn/news/215493.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

人體關鍵點檢測2:Pytorch實現人體關鍵點檢測(人體姿勢估計)含訓練代碼

人體關鍵點檢測2&#xff1a;Pytorch實現人體關鍵點檢測(人體姿勢估計)含訓練代碼 目錄 人體關鍵點檢測2&#xff1a;Pytorch實現人體關鍵點檢測(人體姿勢估計)含訓練代碼 1. 前言 2.人體關鍵點檢測方法 (1)Top-Down(自上而下)方法 (2)Bottom-Up(自下而上)方法&#xff1…

Android - 分區存儲 MediaStore、SAF

官方頁面 參考文章 一、概念 分區存儲&#xff08;Scoped Storage&#xff09;的推出是針對 APP 訪問外部存儲的行為&#xff08;亂建亂獲取文件和文件夾&#xff09;進行規范和限制&#xff0c;以減少混亂使得用戶能更好的控制自己的文件。 公有目錄被分為兩大類&#xff1a;…

會員運營常用的ChatGPT通用提示詞模板

會員體系&#xff1a;如何建立和完善會員體系&#xff1f; 會員等級&#xff1a;如何設定會員等級及權益&#xff1f; 會員留存&#xff1a;如何提高會員留存率&#xff1f; 會員活躍度&#xff1a;如何提高會員活躍度&#xff1f; 會員招募&#xff1a;如何招募新會員&…

ubuntu install sqlmap

refer: https://github.com/sqlmapproject/sqlmap 安裝sqlmap&#xff0c;可以直接使用git 克隆整個sqlmap項目&#xff1a; git clone --depth 1 https://github.com/sqlmapproject/sqlmap.git sqlmap-dev 2.然后進入sqlmap-dev&#xff0c;使用命令&#xff1a; python s…

靜態代理IP搭建步驟,靜態匿名在線代理IP如何使用?

靜態代理搭建步驟 1. 確定需求 在搭建靜態代理之前&#xff0c;需要明確自己的需求&#xff0c;包括代理服務器的位置、訪問速度、匿名性、安全性等方面的要求。 2. 選擇代理服務器提供商 可以選擇自己購買服務器搭建代理&#xff0c;也可以選擇使用云服務提供商的代理服務…

【Python百寶箱】探索強化學習算法的利器:航行在AI之海的羅盤指南

強化學習的工具寶盒&#xff1a;探索各色瑰寶&#xff0c;點亮智能之旅 前言 人工智能和強化學習正成為推動科技進步的重要力量。在這個領域中&#xff0c;使用適當的庫和工具可以加速算法研發和應用部署的過程。本文將深入探索一系列具有代表性的強化學習庫和工具&#xff0…

有趣的數學 用示例來闡述什么是初值問題二

一、示例 解決以下初值問題。 解決這個初始值問題的第一步是找到一個通用的解決方案。為此&#xff0c;我們找到微分方程兩邊的反導數。 即 我們能夠對兩邊進行積分&#xff0c;因為y項是單獨出現的。請注意&#xff0c;有兩個積分常數&#xff1a;C1和C2。求解前面的方程y給出…

電工--半導體器件

目錄 半導體的導電特性 PN結及其單向導電性 二極管 穩壓二極管 雙極型晶體管 半導體的導電特性 本征半導體&#xff1a;完全純凈的、晶格完整的半導體 載流子&#xff1a;自由電子和空穴 溫度愈高&#xff0c;載流子數目愈多&#xff0c;導電性能就愈好 型半導體&…

28. Python Web 編程:Django 基礎教程

目錄 安裝使用創建項目啟動服務器創建數據庫創建應用創建模型設計路由設計視圖設計模版 安裝使用 Django 項目主頁&#xff1a;https://www.djangoproject.com 訪問官網 https://www.djangoproject.com/download/ 或者 https://github.com/django/django Windows 按住winR 輸…

docker build構建報錯:shim error: docker-runc not installed on system

問題&#xff1a; docker構建鏡像時報錯&#xff1a;shim error: docker-runc not installed on system 解決&#xff1a; ln -s /usr/libexec/docker/docker-runc-current /usr/bin/docker-runc

MySQL數據庫——鎖-表級鎖(表鎖、元數據鎖、意向鎖)

目錄 介紹 表鎖 語法 特點 元數據鎖 介紹 演示 意向鎖 介紹 分類 演示 介紹 表級鎖&#xff0c;每次操作鎖住整張表。鎖定粒度大&#xff0c;發生鎖沖突的概率最高&#xff0c;并發度最低。應用在MyISAM、InnoDB、BDB等存儲引擎中。 對于表級鎖&#xff0c;主要…

選擇排序和堆排序

目錄 前言 一.選擇排序 1.思想 2.實現 3.特點 二.堆排序 1.思想 2.實現 3.特點 前言 排序算法是計算機科學中的基礎工具之一&#xff0c;對于數據處理和算法設計有著深遠的影響。了解不同排序算法的特性和適用場景&#xff0c;能夠幫助程序員在特定情況下…

【Go】基于GoFiber從零開始搭建一個GoWeb后臺管理系統(一)搭建項目

前言 最近兩個月一直在忙公司的項目&#xff0c;上班時間經常高強度寫代碼&#xff0c;下班了只想躺著&#xff0c;沒心思再學習、做自己的項目了。最近這幾天輕松一點了&#xff0c;終于有時間 摸魚了 做自己的事了&#xff0c;所以到現在我總算是搭起來一個比較完整的后臺管…

nrfutil工具安裝

準備工作&#xff0c;下載相關安裝包 鏈接&#xff1a;https://pan.baidu.com/s/1LWxhibf8LiP_Cq3sw0kALQ 提取碼&#xff1a;2dlc 解壓后&#xff0c;分別安裝以下安裝包 在C盤下創建目錄nordic_tools&#xff0c;并將nrfutil復制到剛創建的目錄下 環境變量path下添加C:\nor…

圖像采集卡 Xtium?2-XGV PX8支持高速 GigE Vision 工業相機

圖像采集卡&#xff08;Image Capture Card&#xff09;&#xff0c;又稱圖像捕捉卡&#xff0c;是一種可以獲取數字化視頻圖像信息&#xff0c;并將其存儲和播放出來的硬件設備。很多圖像采集卡能在捕捉視頻信息的同時獲得伴音&#xff0c;使音頻部分和視頻部分在數字化時同步…

python elasticsearch 日期聚合

索引以及數據如下 PUT dateagg {"mappings": {"properties": {"charge":{"type": "double"},"types":{"type": "keyword"},"create_date":{"type": "date",&…

裸機單片機適用的軟件架構

單片機通常分為三種工作模式&#xff0c;分別是 1、前后臺順序執行法 2、操作系統 3、時間片輪詢法 1、前后臺順序執行法 利用單片機的中斷進行前后臺切換&#xff0c;然后進行任務順序執行&#xff0c;但其實在…

Spring Boot Web

目錄 一. 概述 二. Spring Boot Web 1.2.1 創建SpringBoot工程&#xff08;需要聯網&#xff09; 1.2.2 定義請求處理類 1.2.3 運行測試 1.3 Web分析 三. Http協議 3.1 HTTP-概述 剛才提到HTTP協議是規定了請求和響應數據的格式&#xff0c;那具體的格式是什么呢? 3…

spring結合設計模式之策略模式

策略模式基本概念&#xff1a; 一個接口或者抽象類&#xff0c;里面兩個方法&#xff08;一個方法匹配類型&#xff0c;一個可替換的邏輯實現方法&#xff09;不同策略的差異化實現(就是說&#xff0c;不同策略的實現類) 使用策略模式替換判斷&#xff0c;使代碼更加優雅。 …

Swagger快速上手

快速開始&#xff1a; 導入maven包 <dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.7.0</version> </dependency><dependency><groupId>io.springfox<…