如何使用@KafkaListener實現從nacos中動態獲取監聽的topic

1、簡介

? ? ? ? 對于經常需要變更kafka主題的場景,為了實現動態監聽topic的功能,可以使用以下方式。

2、使用步驟
2.1、添加依賴
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.1</version>
</dependency>
2.2、nacos中配置
    # kafka 配置
spring:kafka:bootstrap-servers: ip地址:9092topics: topic1,tpic2producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerenable-idempotence: trueacks: alltransactional-id: kafka-groupconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: kafka-clickhouse-groupauto-offset-reset: latestenable-auto-commit: falseisolation-level: read_committedallow-auto-create-topics: truelistener:ack-mode: MANUAL_IMMEDIATEconcurrency: 3
2.3、配置類
package org.aecsi.kafkadatatock.config;import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;@Configuration
@RequiredArgsConstructor
@EnableKafka
@RefreshScope
public class KafkaConfig {private final KafkaProperties kafkaProperties;@Beanpublic KafkaAdmin kafkaAdmin() {return new KafkaAdmin(kafkaProperties.buildAdminProperties());}@Beanpublic AdminClient adminClient(KafkaAdmin kafkaAdmin) {return AdminClient.create(kafkaAdmin.getConfigurationProperties());}@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildProducerProperties());configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-clickhouse-producer");DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configProps);factory.setTransactionIdPrefix("kafka-clickhouse-producer-");return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {return new KafkaTemplate<>(producerFactory);}@Bean@RefreshScopepublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildConsumerProperties());configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, true);configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,KafkaTransactionManager<String, String> transactionManager) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setTransactionManager(transactionManager);return factory;}@Bean@RefreshScopepublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setAutoStartup(true);return factory;}@Beanpublic ApplicationRunner kafkaListenerStarter(KafkaListenerEndpointRegistry registry) {return args -> {// 啟動所有 Kafka 監聽器registry.start();};}
}

接收消息類

@KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}", autoStartup = "false")@Transactional(transactionManager = "transactionManager")public void processMessage(ConsumerRecord<String, String> record,Acknowledgment acknowledgment,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {try {log.info("kafka 接受 topic: {} 消息", topic);
//          處理消息acknowledgment.acknowledge();} catch (Exception e) {log.error("Error processing message for topic {}: {}", topic, e.getMessage());throw e;}}

?主啟動類添加一個注解

@EnableConfigurationProperties(KafkaProperties.class)
3、總結

? ? ? 實現kafka動態獲取topic還有其他方式,博主目前只驗證這一種,其他方式待更新。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/903014.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/903014.shtml
英文地址,請注明出處:http://en.pswp.cn/news/903014.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《TCP/IP詳解 卷1:協議》之第七、八章:Ping Traceroute

目錄 一、ICMP回顯請求和回顯應答 1、ICMP回顯請求 2、ICMP回顯應答 二、ARP高速緩存 三、IP記錄路由選項&#xff08;Record Route&#xff0c;RR&#xff09; 1、記錄路由選項的工作過程 2、RR 選項的 IP 頭部格式 2.1、RR 請求 2.2、RR響應 四、ping 的去返路徑 五…

30天通過軟考高項-第四天

30天通過軟考高項-第四天 任務&#xff1a;項目進度管理 思維導圖閱讀 知識點集錦閱讀 知識點記憶 章節習題練習 知識點練習 手寫回憶ITTO 聽一遍喜馬拉雅關于范圍的內容 進度管理-背 1. 過程定義 龜腚排池至控 規劃進度管理&#xff1a;為了規劃、編制、管理…

根據JSON動態生成表單表格

根據JSON動態生成表單表格 一. 子組件 DynamicFormTable.vue1,根據JSON數據動態生成表單表格,支持表單驗證JS部分1.1,props數據1.2,表單數據和數據監聽1.3,自動驗證1.4,表單驗證1.5,獲取表單數據1.6,事件處理1.7,暴露方法給父組件2,HTML部分二,父組件1, 模擬數據2,…

【趙渝強老師】快速上手TiDB數據庫

從TiDBv4.0起&#xff0c;提供了包管理工具TiUP&#xff0c;負責管理TiDB、PD、TiKV等組件。用戶只需通過TiUP命令即可運行這些組件&#xff0c;顯著降低了管理難度。TiUP程序只包含少數幾個命令&#xff0c;用來下載、更新、卸載組件。TiUP通過各種組件來擴展其功能。組件是一…

springboot入門-DTO數據傳輸層

在 Spring Boot 應用中&#xff0c;DTO&#xff08;Data Transfer Object&#xff0c;數據傳輸對象&#xff09; 是專門用于在不同層&#xff08;如 Controller 層、Service 層、外部系統&#xff09;之間傳輸數據的對象。它的核心目的是解耦數據模型和業務邏輯&#xff0c;避免…

安裝docker,在docker上安裝mysql,docker上安裝nginx

目錄 一.安裝docker 1.1查看Linux版本的命令這里推薦兩種&#xff1a; 1.2查看內核版本有三種方式&#xff1a; 2.安裝 2.1 如果之前安裝了docker&#xff0c;先刪除舊版本的doker 2.2 安裝需要的軟件包&#xff0c;yum-util提供yum-config-manager功能&#xff0c;另外兩…

Android killPackageProcessesLSP 源碼分析

該方法用于終止指定包名/用戶ID/應用ID下符合條件的應用進程&#xff0c;涉及多進程管理、資源凍結、進程清理及優先級更新等操作。核心流程分為進程篩選、資源凍結、進程終止與資源恢復三個階段。 /*** 從已排序的進程列表中&#xff0c;提取從指定起始索引 startIdx 開始的連…

openAICEO山姆奧特曼未來預測雄文之三個觀察

《三個觀察》 山姆奧特曼 這篇文章主要講的是關于AGI&#xff08;人工通用智能&#xff09;的未來發展及其對社會的影響&#xff0c;用大白話總結如下&#xff1a; 核心觀點&#xff1a; AGI是什么&#xff1f; AGI是一種能像人類一樣解決各種復雜問題的智能系統&#xff0c;比…

部署yolo到k230教程

訓練&#xff1a;K230 借助 AICube部署AI 視覺模型 YOLO等教程_嘉楠 ai cube多標簽分類-CSDN博客K230模型訓練ai cube報錯生成部署文件異常_aicube部署模型顯示生成部署文件異常-CSDN博客 部署&#xff1a; # 導入必要的庫和模塊 import os import ujson # 超快的JS…

Flask 應用封裝成 Docker 服務的完整技術指南

一、實現原理 容器化核心邏輯 Docker 通過將應用代碼、運行環境和依賴項打包成鏡像&#xff0c;實現環境一致性。Flask 應用容器化需包含&#xff1a; Python 基礎運行環境項目代碼及依賴庫&#xff08;requirements.txt&#xff09;WSGI服務器&#xff08;如 Gunicorn&#xf…

windows上的 Vmware Workstation 環境搭建

本文的視頻版本&#xff1a;https://www.bilibili.com/video/BV1JhLRzyESh Vmware Workstation 是一款跨平臺的桌面級虛擬化軟件&#xff0c;可以使用 Vmware 創建虛擬機&#xff0c;我們一般使用 Linux 虛擬機&#xff08;目前主流的 Linux 發行版是 Ubuntu&#xff09;&…

Linux下終端命令行安裝常見字體示例

一、準備工作&#xff1a; 準備好要安裝的字體文件&#xff0c;如宋體、微軟雅黑&#xff08;simsun.ttc、msyh.ttc)。進入字體路徑&#xff1a; /usr/share/fonts&#xff0c;使用root權限&#xff0c;新建一個目錄shell_fonts。 二、命令行安裝字體&#xff1a; 將要安裝…

CentOS中在線安裝Docker(超詳細)

1&#xff09;檢查安裝docker的基本要求&#xff1a; 64位CPU架構的計算機&#xff0c;目前不支持32為CPU架構的計算機 系統的Linux內核版本為3.10及以上 開啟CGroups和namespace功能 2&#xff09;使用命令查看當前系統的內核版本 [rootlocalhost ~]# uname -r 3.10.0-862…

武漢昊衡科技OLI光纖微裂紋檢測儀:高密度光器件的精準守護者

隨著AI技術應用越來越廣&#xff0c;算力需求激增&#xff0c;光通信系統正加速向小型化、高密度、多通道方向演進。硅光芯片、高速光模塊等核心器件內部的光纖通道數量成倍增加&#xff0c;波導結構愈發精細&#xff0c;傳統檢測手段因分辨率不足、效率低下&#xff0c;難以精…

Java數據結構——Stack

Stack 棧的概念和使用棧的概念棧的使用 棧的應用出棧元素序列有效的括號棧的壓入、彈出序列逆波蘭表達式最小棧 棧的概念和使用 棧的概念 棧(Stack)&#xff1a;一種特殊的線性表&#xff0c;只允許再棧的一端進行插入和刪除元素&#xff0c;這一端點被稱為棧頂&#xff0c;另…

神經網絡與計算機視覺

2016 年,隨著 AlphaGo 在圍棋比賽中擊敗李世石,“人工智能”、“神經網絡”、“深度 學習”等字眼便越來越多的出現在大眾眼前,智能化好像成為一種不可逆轉的趨勢,帶給大家新奇感的同時也帶來了一絲憂懼:在不遠的未來,機器是否真的擁有思維和情感?《終結者》中天網大戰人…

VS2019 與gitcode團隊管理

1、安裝git 點擊下一步安裝即可 2、vs2019連接gitcode 然后更改本地的代碼添加文件等都可以進行遠程同步操作了

Python類和對象四(十三)

魔法方法&#xff1a; 按位運算 按位于運算 只要相同才是1 或運算&#xff1a; 只要某個位是1結果就是1 、 按位非 將結果取反 按位異或&#xff1a; 左移和右移運算符&#xff1a; 右移兩位 右移動n位&#xff0c;就是除以2的n次方 左移兩位&#xff1a; 左移n位就是乘…

如何設置極狐GitLab 議題截止日?

極狐GitLab 是 GitLab 在中國的發行版&#xff0c;關于中文參考文檔和資料有&#xff1a; 極狐GitLab 中文文檔極狐GitLab 中文論壇極狐GitLab 官網 截止日期 (BASIC ALL) 可以在議題中使用截止日期&#xff0c;來跟蹤截止日期并確保功能按時交付。用戶至少需要報告者權限才…

如何在 Conda 環境中降級 Python 版本:詳細指南

如何在 Conda 環境中降級 Python 版本&#xff1a;詳細指南 Python 版本的管理在開發過程中至關重要&#xff0c;特別是在處理不同項目需求時。對于使用 Conda 環境的 Python 程序員來說&#xff0c;版本管理不僅僅是安裝不同的 Python 版本&#xff0c;還涉及到依賴關系的兼容…