基于Kafka實現動態監聽topic功能

生命無罪,健康萬歲,我是laity。

我曾七次鄙視自己的靈魂:

第一次,當它本可進取時,卻故作謙卑;

第二次,當它在空虛時,用愛欲來填充;

第三次,在困難和容易之間,它選擇了容易;

第四次,它犯了錯,卻借由別人也會犯錯來寬慰自己;

第五次,它自由軟弱,卻把它認為是生命的堅韌;

第六次,當它鄙夷一張丑惡的嘴臉時,卻不知那正是自己面具中的一副;

第七次,它側身于生活的污泥中,雖不甘心,卻又畏首畏尾。

基于Kafka實現動態監聽topic功能

業務場景:導條根據各家接口進行數據分發其中包含動態kafka-topic,各家通過監聽topic實現獲取數據從而實現后續業務。

實現邏輯

pom

yaml 方案1 接收的是String

  kafka:bootstrap-servers: youKafkaIp:9092 # 指定 Kafka Broker 地址,可以設置多個,以逗號分隔listener:type: batchconsumer:enable-auto-commit: falsevalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliestgroup-id: consumer-sbproducer:value-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializer

yaml 方案2 接收的是Byte

  kafka:bootstrap-servers: youKafkaIp:9092 # 指定 Kafka Broker 地址,可以設置多個,以逗號分隔listener:type: batchconsumer:enable-auto-commit: falsevalue-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializerkey-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializerauto-offset-reset: earliestgroup-id: consumer-sbproducer:value-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializer

收消息CODE

KafkaConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;import java.util.HashMap;
import java.util.Map;/*** @author laity*/
@EnableKafka
@Configuration
public class KafkaConfig {// 解決 Could not create message listener - MessageHandlerMethodFactory not set  TODO:WWS 不好使/*@Beanpublic KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationBeanPostProcessor() {KafkaListenerAnnotationBeanPostProcessor processor = new KafkaListenerAnnotationBeanPostProcessor();processor.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());return processor;}*/@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> map = new HashMap<>();map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "youKafkaIp:9092");map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-laity");map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());return new DefaultKafkaConsumerFactory<String, String>(map);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(5);// new DefaultMessageHandlerMethodFactory()return factory;}// implements KafkaListenerConfigurer + 解決 Could not create message listener - MessageHandlerMethodFactory not set/*@Overridepublic void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());}*/
}

KafkaListenerController.java

package cn.iocoder.yudao.server.controller.admin.szbl;import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.server.controller.admin.szbl.common.config.kafka.MyComponent;
import cn.iocoder.yudao.server.controller.admin.szbl.vo.InitSceneVO;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.security.PermitAll;/*** @author laity*/
@RestController
@RequestMapping("/kafka")
public class KafkaListenerController {private final MyComponent component;public KafkaListenerController(MyComponent component) {this.component = component;}private String topic;// 用于接收導條分發數據接口@PostMapping("/reception")@PermitAllpublic CommonResult<Boolean> putAwayL(@RequestBody InitSceneVO vo) {// …… 業務邏輯// 去執行 監聽固定的topiccomponent.startListening(vo.getGzTopicName());return CommonResult.success(true);}
}

DynamicKafkaListenerService.java

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.Objects;/*** @author laity 動態管理Kafka監聽器*/
@Service
public class DynamicKafkaListenerService {private final KafkaListenerEndpointRegistry registry;private final ConcurrentKafkaListenerContainerFactory<String, String> factory;@Autowiredpublic DynamicKafkaListenerService(KafkaListenerEndpointRegistry registry, ConcurrentKafkaListenerContainerFactory<String, String> factory) {this.registry = registry;this.factory = factory;}public void addListener(String topic, String groupId, Object bean, Method method) {if (AopUtils.isAopProxy(bean)) {try {bean = ((Advised) bean).getTargetSource().getTarget();} catch (Exception e) {throw new RuntimeException(e);}}MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();assert bean != null;endpoint.setBean(bean);endpoint.setMethod(method);endpoint.setTopics(topic);endpoint.setGroup(groupId);endpoint.setId(method.getName() + "_" + LocalDateTime.now());endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); // 之前怎么點都點不出來這個屬性 突然又出來了……無語registry.registerListenerContainer(endpoint, factory, true); // 指定容器工廠}public void removeListener(String beanName) {// 斷言Objects.requireNonNull(registry.getListenerContainer(beanName)).stop();registry.unregisterListenerContainer(beanName);}
}

BlueKafkaConsumer.java

import org.springframework.stereotype.Component;/*** @author laity*/
@Component
public class BlueKafkaConsumer {// @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")public void listen(Object record) {System.out.println("======================= 接收動態KafkaTopics Received message ========================");System.out.println(record.toString());}}

MyComponent.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.lang.reflect.Method;/*** @author laity*/
@Component
public class MyComponent {private final DynamicKafkaListenerService kafkaListenerService;private final BlueKafkaConsumer blueKafkaConsumer;@Autowiredpublic MyComponent(DynamicKafkaListenerService kafkaListenerService, BlueKafkaConsumer blueKafkaConsumer) {this.kafkaListenerService = kafkaListenerService;this.blueKafkaConsumer = blueKafkaConsumer;}public void startListening(String topic) {try {Method blueMethod = BlueKafkaConsumer.class.getMethod("listen", Object.class);kafkaListenerService.addListener(topic, "consumer-laity", blueKafkaConsumer, blueMethod);} catch (NoSuchMethodException e) {throw new RuntimeException(e);}}public void stopListening(String beanName) {kafkaListenerService.removeListener(beanName);}// init@PostConstruct // 這個是服務啟動時調用 但我想要的時實時可變的public void init() {}}

世界上最可貴的兩個詞,一個叫認真,一個叫堅持,認真的人改變自己,堅持的人改變命運,有些事情不是看到了希望才去堅持,而是堅持了才有希望。我是Laity,正在前進的Laity。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/90784.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/90784.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/90784.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

機械學習初識--什么是機械學習--機械學習有什么重要算法

一、什么是機械學習機器學習&#xff08;Machine Learning&#xff09;是人工智能&#xff08;AI&#xff09;的一個重要分支&#xff0c;它使計算機能夠通過數據自動學習規律、改進性能&#xff0c;并在沒有明確編程的情況下完成特定任務。其核心思想是讓機器從數據中 “學習”…

普通大學生大三這一年的想法

目錄 大三期間的經歷與反思 公益活動&#xff1a;社會責任感的體現 比賽&#xff1a;個人成長的助推器 培訓與思想提升 大學教育的本質與人才培養 構建自我的道與未來規劃 大學教育的未來與個人定位 結語 大三期間的經歷與反思 大三&#xff0c;大學生活的分水嶺&#…

Python——入門

目錄 變量 變量類型 動態類型 注釋 輸出輸入 運算符 算術運算符 關系運算符 邏輯運算符 賦值運算符 條件語句 循環語句 函數 函數作用域 函數嵌套調用 函數默認參數 關鍵字參數 列表 切片 列表遍歷 新增元素 查找元素 刪除元素 列表拼接 元組…

華為榮耀部分機型從鴻蒙降回EMUI的一種方法

一、準備說明 1、這里介紹使用華為手機助手、海外代理軟件結合固件將部分華為榮耀手機鴻蒙系統降級回EMUI系 統的一種方式&#xff1b; 2、需要降級的手機需要再出廠時內置系統為EMUI&#xff0c;出廠時為鴻蒙系統的無法進行降級操作&#xff1b; 3、降級有風險&#xff0…

maven <dependencyManagement>標簽的作用

作用 dependencyManagement標簽的作用&#xff1a;在父工程pom文件中聲明依賴&#xff0c;但不引入&#xff1b;在子工程中用到聲明的依賴時&#xff0c;可以不加依賴的版本號&#xff0c;這樣可以統一管理工程中用到的依賴版本。 示例 先創建一個項目 dependencyManagement-de…

JSON格式化與結構對比

說明 功能格式化json字符串為最簡格式&#xff0c;并標識值類型&#xff1b;比對json字符串結構。第三方依賴fastjson: 用于解析json、判斷json值類型&#xff1b;springframework自帶的字符串判斷&#xff0c;可以不依賴該方法&#xff0c;改為自行實現&#xff1b;slf4j: 用于…

編程與數學 03-002 計算機網絡 03_物理層基礎

編程與數學 03-002 計算機網絡 03_物理層基礎一、物理層的作用與任務&#xff08;一&#xff09;傳輸媒體的類型&#xff08;二&#xff09;信號的傳輸方式二、數據編碼技術&#xff08;一&#xff09;數字數據的數字信號編碼&#xff08;二&#xff09;模擬數據的數字信號編碼…

c語言--文件操作

思維導圖:1. 為什么使用文件&#xff1f; 如果沒有文件&#xff0c;我們寫的程序的數據是存儲在電腦的內存中&#xff0c;如果程序退出&#xff0c;內存回收&#xff0c;數據就丟失了&#xff0c;等再次運?程序&#xff0c;是看不到上次程序的數據的&#xff0c;如果要將數據進…

SQL中的占位符、@Param注解和方法參數

代碼中出現的多個 username 和 password 代表不同層面的變量&#xff0c;具體含義如下&#xff08;按執行順序&#xff09;&#xff1a;### 1. Param("username") String username - 位置 &#xff1a;方法參數前的注解 - 作用 &#xff1a;- Param("username&q…

【SpringAI實戰】FunctionCalling實現企業級自定義智能客服

一、前言 二、實現效果 三、代碼實現 3.1 后端實現 3.2 前端實現 一、前言 Spring AI詳解&#xff1a;【Spring AI詳解】開啟Java生態的智能應用開發新時代(附不同功能的Spring AI實戰項目)-CSDN博客 二、實現效果 一個24小時在線的AI智能客服&#xff0c;可以給用戶提供培…

kotlin基礎【2】

變量類型var 和 val 的核心區別&#xff1a;關鍵字含義能否重新賦值類似概念&#xff08;Java&#xff09;varvariable&#xff08;可變變量&#xff09;可以普通變量&#xff08;無 final&#xff09;valvalue&#xff08;不可變變量&#xff09;不可以被 final 修飾的變量var…

【Spring AI】阿里云DashScope靈積模型

DashScope&#xff08;靈積模型&#xff09;是阿里云提供的大模型服務平臺&#xff0c;集成了阿里自研的 通義千問&#xff08;Qwen&#xff09;系列大語言模型&#xff08;LLM&#xff09;以及多模態模型&#xff0c;為企業與開發者提供開箱即用的 AI 能力。官網地址 https://…

Rust Web框架性能對比與實戰指南

Rust Actix Web Rust Web 框架的實用對比分析 以下是 Rust Web 框架的實用對比分析,涵蓋主要框架(如 Actix-web、Rocket、Warp、Axum 等)的常見使用場景示例,按功能分類整理: 基礎路由設置 Actix-web use actix_web::{get, App, HttpResponse, HttpServer, Responder}…

【解決vmware ubuntu不小心刪boot分區,進不去系統】

如果仍然提示 Unable to locate package testdisk&#xff0c;有可能是源中不包含該工具&#xff08;LiveCD 使用的是“最小環境”&#xff09;。 &#x1fa9b; 解決方法&#xff1a;切換到國內完整軟件源&#xff08;推薦&#xff09; 編輯 sources.list&#xff1a; sudo na…

04-netty基礎-Reactor三種模型

1 基本概念Reactor模型是一種事件驅動&#xff08;Event-Driven&#xff09;的設計模式&#xff0c;主要用于高效處理高并發、I/O密集型場景&#xff08;如網絡、服務器、分布式等&#xff09;。其核心思想就是集中管理事件&#xff0c;將I/O操作與業務邏輯解耦&#xff0c;避免…

踩坑無數!NFS服務從入門到放棄再到真香的血淚史

前言 說起NFS&#xff0c;我估計很多搞運維的兄弟都有一肚子話要說。這玩意兒吧&#xff0c;看起來簡單&#xff0c;用起來坑多&#xff0c;但是真正搞明白了又覺得挺香的。 前幾天有個朋友問我&#xff0c;說他們公司要搭建一個文件共享系統&#xff0c;問我推薦什么方案。我…

矩陣譜分解的證明及計算示例

1. 矩陣譜分解的條件矩陣的譜分解&#xff08;也稱為特征分解&#xff09;是將一個矩陣分解為一系列由其特征向量和特征值構成的矩陣乘積的過程。進行譜分解的前提條件包括&#xff1a;<1.> 矩陣是可對角化的&#xff08;Diagonalizable&#xff09;&#xff0c;即矩陣存…

Leetcode 07 java

169. 多數元素 給定一個大小為 n 的數組 nums &#xff0c;返回其中的多數元素。 多數元素是指在數組中出現次數 大于 ? n/2 ? 的元素。 你可以假設數組是非空的&#xff0c;并且給定的數組總是存在多數元素。 示例 1&#xff1a; 輸入&#xff1a;nums [3,2,3] 輸出&a…

CS231n-2017 Lecture6訓練神經網絡(一)筆記

本節主要講的是模型訓練時的算法設計數據預處理&#xff1a;關于數據預處理&#xff0c;我們有常用的3個符號&#xff0c;數據矩陣X&#xff0c;假設其尺寸是&#xff0c;N是數據樣本的數量&#xff0c;D是數據的維度均值減法(Mean subtraction)&#xff1a;是預處理最常用的形…

C++ 中實現 `Task::WhenAll` 和 `Task::WhenAny` 的兩種方案

&#x1f4da; C 中實現 Task::WhenAll 和 Task::WhenAny 的兩種方案 引用&#xff1a; 拈朵微笑的花 想一番人世變換 到頭來輸贏又何妨日與夜互消長 富與貴難久長 今早的容顏老於昨晚C 標準庫異步編程示例&#xff08;一&#xff09;C TAP&#xff08;基于任務的異步編程…