還在重啟應用改 Topic?Spring Boot 動態 Kafka 消費的“終極形態”

圖片

場景描述:
你的一個微服務正在穩定地消費 Kafka 的?order_topic。現在,上游系統為了做業務隔離,新增加了一個?order_topic_vip,并開始向其中投遞 VIP 用戶的訂單。你需要在不重啟、不發布新版本的情況下,讓你現有的消費者同時開始消費?order_topic_vip?的消息。

這是一個典型的動態運維需求。靜態的?@KafkaListener(topics = "order_topic")?注解無法滿足這個要求。本文將提供一套完整的解決方案,教你如何利用配置中心(以 Nacos 為例)和 Spring Kafka 的底層 API,實現消費者 Topic 列表的“熱更新”。

1. 核心原理:銷毀并重建 (Destroy and Rebuild)

Spring Kafka 的消費者容器 (MessageListenerContainer) 在創建時,其核心配置(如監聽的 Topic)就已經確定。在運行時直接修改一個正在運行的容器的 Topic 列表,是一種不被推薦且存在風險的操作。

最穩健、最可靠的方案是:

  1. 1.?停止注銷監聽舊 Topic 的消費者容器。

  2. 2. 根據原始的消費者配置和新傳入的 Topic 列表,以編程方式創建一個全新的消費者容器。

  3. 3.?啟動這個新的容器。

整個過程對外界來說是“無感”的,最終效果就是消費者監聽的 Topic 列表發生了變化。

2. 方案架構

要實現上述流程,我們需要三個關鍵組件:

  1. 1.?元數據采集器 (BeanPostProcessor):?在應用啟動時,掃描并緩存所有?@KafkaListener?的“配置藍圖”(包括?id,?groupId, 原始?topics?等)。

  2. 2.?配置中心 (Nacos):?作為動態 Topic 配置的“真理之源”。

  3. 3.?動態刷新服務:?監聽 Nacos 的配置變更,并調用 Spring Kafka 的?KafkaListenerEndpointRegistry?API 來完成“銷毀并重建”的操作。

3. 完整代碼實現

這是一個可以直接集成的、完整的解決方案代碼。

步驟 3.1: 定義元數據存儲

EndpointMetadata.java

package?com.example.kafka.dynamic.core;import?java.io.Serializable;
import?java.lang.reflect.Method;// 用于存儲 @KafkaListener 的“藍圖”
public?class?EndpointMetadata?implements?Serializable?{private?String id;private?String groupId;private?String[] topics;private?Object bean;private?Method method;// ... 可按需添加 concurrency, autoStartup 等其他屬性// Getters and Setters...public?String?getId()?{?return?id; }public?void?setId(String id)?{?this.id = id; }public?String?getGroupId()?{?return?groupId; }public?void?setGroupId(String groupId)?{?this.groupId = groupId; }public?String[] getTopics() {?return?topics; }public?void?setTopics(String[] topics)?{?this.topics = topics; }public?Object?getBean()?{?return?bean; }public?void?setBean(Object bean)?{?this.bean = bean; }public?Method?getMethod()?{?return?method; }public?void?setMethod(Method method)?{?this.method = method; }
}

KafkaListenerMetadataRegistry.java?(元數據采集與注冊)

package?com.example.kafka.dynamic.processor;import?com.example.kafka.dynamic.core.EndpointMetadata;
import?org.springframework.aop.support.AopUtils;
import?org.springframework.beans.BeansException;
import?org.springframework.beans.factory.config.BeanPostProcessor;
import?org.springframework.core.annotation.AnnotationUtils;
import?org.springframework.kafka.annotation.KafkaListener;
import?org.springframework.stereotype.Component;import?java.lang.reflect.Method;
import?java.util.Map;
import?java.util.concurrent.ConcurrentHashMap;@Component
public?class?KafkaListenerMetadataRegistry?implements?BeanPostProcessor?{private?final?Map<String, EndpointMetadata> metadataStore =?new?ConcurrentHashMap<>();@Overridepublic?Object?postProcessAfterInitialization(Object bean, String beanName)?throws?BeansException {Class<?> targetClass = AopUtils.getTargetClass(bean);for?(Method method : targetClass.getMethods()) {KafkaListener?kafkaListener?=?AnnotationUtils.findAnnotation(method, KafkaListener.class);if?(kafkaListener !=?null?&& kafkaListener.id() !=?null?&& !kafkaListener.id().isEmpty()) {EndpointMetadata?metadata?=?new?EndpointMetadata();metadata.setId(kafkaListener.id());metadata.setTopics(kafkaListener.topics());metadata.setGroupId(kafkaListener.groupId());metadata.setBean(bean);metadata.setMethod(method);metadataStore.put(kafkaListener.id(), metadata);}}return?bean;}public?EndpointMetadata?getMetadata(String listenerId)?{return?metadataStore.get(listenerId);}
}
步驟 3.2: 核心實現:動態刷新服務

DynamicKafkaConsumerService.java

package?com.example.kafka.dynamic.service;import?com.alibaba.nacos.api.config.ConfigService;
import?com.alibaba.nacos.api.config.listener.Listener;
import?com.example.kafka.dynamic.core.EndpointMetadata;
import?com.example.kafka.dynamic.processor.KafkaListenerMetadataRegistry;
import?com.fasterxml.jackson.core.type.TypeReference;
import?com.fasterxml.jackson.databind.ObjectMapper;
import?jakarta.annotation.PostConstruct;
import?org.slf4j.Logger;
import?org.slf4j.LoggerFactory;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.kafka.config.KafkaListenerContainerFactory;
import?org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import?org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import?org.springframework.kafka.listener.MessageListenerContainer;
import?org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import?org.springframework.stereotype.Service;
import?org.springframework.util.StringUtils;import?java.util.Arrays;
import?java.util.Map;
import?java.util.Objects;
import?java.util.concurrent.Executor;@Service
public?class?DynamicKafkaConsumerService?{private?static?final?Logger?log?=?LoggerFactory.getLogger(DynamicKafkaConsumerService.class);@Autowiredprivate?KafkaListenerEndpointRegistry listenerRegistry;@Autowiredprivate?KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;@Autowiredprivate?KafkaListenerMetadataRegistry metadataRegistry;@Autowiredprivate?ConfigService configService;?// Nacos Config Serviceprivate?final?ObjectMapper?objectMapper?=?new?ObjectMapper();private?final?String?DATA_ID?=?"dynamic-kafka-topics.json";private?final?String?GROUP?=?"DEFAULT_GROUP";@PostConstructpublic?void?init()?throws?Exception {// 1. 應用啟動時,先拉取一次配置String?initialConfig?=?configService.getConfig(DATA_ID, GROUP,?5000);if?(StringUtils.hasText(initialConfig)) {refreshListeners(initialConfig);}// 2. 注冊 Nacos 監聽器configService.addListener(DATA_ID, GROUP,?new?Listener() {@Overridepublic?Executor?getExecutor()?{?return?null; }@Overridepublic?void?receiveConfigInfo(String configInfo)?{log.info("接收到 Kafka Topic 配置變更:\n{}", configInfo);refreshListeners(configInfo);}});}public?synchronized?void?refreshListeners(String configInfo)?{try?{Map<String, String> configMap = objectMapper.readValue(configInfo,?new?TypeReference<>() {});configMap.forEach((listenerId, topics) -> {log.info("準備刷新 Listener ID '{}' 的 Topics 為 '{}'", listenerId, topics);MessageListenerContainer?container?=?listenerRegistry.getListenerContainer(listenerId);String[] newTopics = topics.split(",");// 如果容器存在,且 Topic 列表發生了變化if?(container !=?null) {if?(!Arrays.equals(container.getContainerProperties().getTopics(), newTopics)) {recreateAndRegisterContainer(listenerId, newTopics);}}?else?{// 如果容器不存在 (可能被手動停止或首次創建),也進行創建recreateAndRegisterContainer(listenerId, newTopics);}});}?catch?(Exception e) {log.error("動態刷新 Kafka 消費者配置失敗", e);}}private?void?recreateAndRegisterContainer(String listenerId, String[] topics)?{log.info("開始重建并注冊 Listener ID '{}'", listenerId);// 1. 停止并銷毀舊容器MessageListenerContainer?container?=?listenerRegistry.getListenerContainer(listenerId);if?(container !=?null) {container.stop();// 在 Spring Kafka 2.8+ 中,注銷是內部操作,我們只需創建并注冊新的即可。}// 2. 從我們的“藍圖”中獲取元數據EndpointMetadata?metadata?=?metadataRegistry.getMetadata(listenerId);if?(metadata ==?null) {log.error("找不到 Listener ID '{}' 的元數據,無法重建。", listenerId);return;}// 3. 創建一個全新的 EndpointMethodKafkaListenerEndpoint<String, String> newEndpoint =?new?MethodKafkaListenerEndpoint<>();newEndpoint.setId(metadata.getId());newEndpoint.setGroupId(metadata.getGroupId());newEndpoint.setTopics(topics);?// <-- 核心:使用新 TopicnewEndpoint.setBean(metadata.getBean());newEndpoint.setMethod(metadata.getMethod());newEndpoint.setMessageHandlerMethodFactory(new?DefaultMessageHandlerMethodFactory());// 4. 注冊新的 EndpointlistenerRegistry.registerListenerContainer(newEndpoint, kafkaListenerContainerFactory,?true);log.info("成功重建并啟動 Listener ID '{}',現在監聽 Topics: {}", listenerId, Arrays.toString(topics));}
}

4. 實踐演練

步驟 4.1: 業務代碼

在你的 Spring Boot 應用中,正常定義你的消費者,但務必提供唯一的?id

@Service
public?class?OrderEventListener?{@KafkaListener(id = "order-listener", topics = "order_topic", groupId = "my-group")public?void?handleOrderEvent(String message)?{System.out.println("收到訂單消息: "?+ message);}
}
步驟 4.2:?application.yml?配置

確保你的應用連接到了 Nacos。

spring:cloud:nacos:config:server-addr:?127.0.0.1:8848
# ... kafka server acls
步驟 4.3: Nacos 配置

在 Nacos 中,創建一個?Data ID?為?dynamic-kafka-topics.jsonGroup?為?DEFAULT_GROUP?的配置,內容為 JSON 格式:

{"order-listener":?"order_topic"
}

Key (order-listener) 必須與?@KafkaListener?的?id?完全一致。

步驟 4.4: 啟動與驗證
  1. 1. 啟動應用。此時,order-listener?消費者會正常啟動,并開始消費?order_topic?的消息。

  2. 2.?動態變更!?去 Nacos 控制臺,將配置修改為:
    {"order-listener":?"order_topic,order_topic_vip"
    }
  3. 3. 點擊“發布”。

  4. 4.?觀察應用日志。?你會看到類似下面的日志:
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService ?: 接收到 Kafka Topic 配置變更: ...
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService ?: 準備刷新 Listener ID 'order-listener' 的 Topics 為 'order_topic,order_topic_vip'
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService ?: 開始重建并注冊 Listener ID 'order-listener'
    ... (舊容器停止的日志) ...
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService ?: 成功重建并啟動 Listener ID 'order-listener',現在監聽 Topics: [order_topic, order_topic_vip]
  5. 5.?驗證結果。?現在,你的?order-listener?已經開始同時消費?order_topic?和?order_topic_vip?兩個 Topic 的消息了,整個過程應用沒有重啟

總結

通過巧妙地結合?BeanPostProcessorKafkaListenerEndpointRegistry?和動態配置中心,我們實現了一個功能極其強大的動態 Kafka 消費管理方案。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/96767.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/96767.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/96767.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

使用vllm部署neo4j的text2cypher-gemma-2-9b-it-finetuned-2024v1模型

使用vllm部署neo4j的text2cypher-gemma-2-9b-it-finetuned-2024v1模型 系統環境準備 由于使用的基于 nvcr.io/nvidia/cuda:12.1.1-cudnn8-runtime-ubuntu22.04 的 workbench,需要進行以下準備(其他系統環境可忽略) ldconfig -p | grep libcudnn 找到 libcudnn 的so庫,然…

Coze源碼分析-資源庫-創建知識庫-前端源碼-核心組件

概述 本文深入分析Coze Studio中用戶創建知識庫功能的前端實現。該功能允許用戶在資源庫中創建、編輯和管理知識庫資源&#xff0c;為開發者提供了強大的知識管理和數據處理能力。通過對源碼的詳細解析&#xff0c;我們將了解從資源庫入口到知識庫配置彈窗的完整架構設計、組件…

基于時空數據的網約車訂單需求預測與調度優化

一、引言隨著共享出行行業的蓬勃發展&#xff0c;網約車已成為城市交通的重要組成部分。如何精準預測訂單需求并優化車輛調度&#xff0c;是提升平臺運營效率、改善用戶體驗的關鍵。本文提出一種基于時空數據的網約車訂單需求預測與調度優化方案&#xff0c;通過網格化城市空間…

數據結構 Java對象的比較

在Java中&#xff0c;凡是涉及到比較的&#xff0c;可以分為兩類情況&#xff1a;一類是基本數據類型的比較&#xff0c;另一類是引用數據類型的比較。對于基本數據類型的比較&#xff0c;我們通過關系運算符&#xff08;、>、<、!、>、<&#xff09;進行它們之間的…

企智匯建筑施工項目管理系統:全周期數字化管控,賦能工程企業降本增效!?建筑工程項目管理軟件!建筑工程項目管理系統!建筑項目管理軟件企智匯軟件

在建筑施工行業&#xff0c;項目進度滯后、成本超支、質量安全隱患頻發、多方協同不暢等問題&#xff0c;一直是制約企業發展的痛點。傳統依賴人工記錄、Excel 統計的管理模式&#xff0c;不僅效率低下&#xff0c;更易因信息斷層導致決策失誤。企智匯建筑施工項目管理系統憑借…

k8s-臨時容器學習

臨時容器學習1. 什么是臨時容器2. 實驗1. 什么是臨時容器 在官網&#xff1a;https://kubernetes.io/zh-cn/docs/concepts/workloads/pods/ephemeral-containers/ 中有介紹 臨時容器是用于調試Pod中崩潰的容器或者不具備調試工具&#xff0c;比如在一個運行著業務的容器中&am…

Python 2025:低代碼開發與自動化運維的新紀元

從智能運維到無代碼應用&#xff0c;Python正在重新定義企業級應用開發范式在2025年的企業技術棧中&#xff0c;Python已經從一個"開發工具"演變為業務自動化的核心平臺。根據Gartner 2025年度報告&#xff0c;68%的企業在自動化項目中使用Python作為主要開發語言&am…

Netty 在 API 網關中的應用篇(請求轉發、限流、路由、負載均衡)

Netty 在 API 網關中的應用篇&#xff08;請求轉發、限流、路由、負載均衡&#xff09;隨著微服務架構的普及&#xff0c;API 網關成為服務之間通信和安全控制的核心組件。在構建高性能網關時&#xff0c;Netty 因其高吞吐、低延遲和異步非阻塞 IO 的特性&#xff0c;成為不少開…

基于STM32設計的青少年學習監控系統(華為云IOT)_282

文章目錄 一、前言 1.1 項目介紹 【1】項目開發背景 【2】設計實現的功能 【3】項目硬件模塊組成 【4】設計意義 【5】國內外研究現狀 【6】摘要 1.2 設計思路 1.3 系統功能總結 1.4 開發工具的選擇 【1】設備端開發 【2】上位機開發 1.5 參考文獻 1.6 系統框架圖 1.7 系統原理…

手寫Spring底層機制的實現【初始化IOC容器+依賴注入+BeanPostProcesson機制+AOP】

摘要&#xff1a;建議先看“JAVA----Spring的AOP和動態代理”這個文章&#xff0c;解釋都在代碼中&#xff01;一&#xff1a;提出問題依賴注入1.單例beans.xml<?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframe…

5G NR-NTN協議學習系列:NR-NTN介紹(2)

NTN網絡作為依賴衛星的通信方式&#xff0c;需要面對的通信距離&#xff0c;通信雙方的移動速度都和之前TN網絡存在巨大差異。在距離方面相比蜂窩地面網絡Terrestrial Network通信距離從最小幾百米到最大幾十km的情況&#xff0c;NTN非地面網絡的通信距離即使是近地軌道的LEO衛…

線掃相機采集圖像起始位置不正確原因總結

1、幀觸發開始時間問題 問題描述: 由于幀觸發決定了線掃相機的開始采集圖像位置,比如正確的位置是A點開始采集,結果你從B點開始觸發幀信號,這樣出來的圖像起始位置就不對 解決手段: 軟件需要記錄幀觸發時軸的位置 1)控制卡控制軸 一般使用位置比較觸發,我們可以通過監…

校園管理系統練習項目源碼-前后端分離-【node版】

今天給大家分享一個校園管理系統&#xff0c;前后端分離項目。這是最近在練習前端編程&#xff0c;結合 node 寫的一個完整的項目。 使用的技術&#xff1a; Node.js&#xff1a;版本要求16.20以上。 后端框架&#xff1a;Express框架。 數據庫&#xff1a; MySQL 8.0。 Vue2&a…

【項目】 :C++ - 仿mudou庫one thread one loop式并發服務器實現(模塊劃分)

【項目】 &#xff1a;C - 仿mudou庫one thread one loop式并發服務器實現一、HTTP 服務器與 Reactor 模型1.1、HTTP 服務器概念實現步驟難點1.2、Reactor 模型概念分類1. 單 Reactor 單線程2. 單 Reactor 多線程3. 多 Reactor 多線程目標定位總結二、功能模塊劃分2.1、SERVER …

浴室柜市占率第一,九牧重構數智衛浴新生態

作者 | 曾響鈴文 | 響鈴說2025年上半年&#xff0c;家居市場在政策的推動下展現出獨特的發展態勢。國家出臺的一系列鼓勵家居消費的政策&#xff0c;如“以舊換新”國補政策帶動超6000萬件廚衛產品煥新&#xff0c;以及我國超2.7億套房齡超20年的住宅進入改造周期&#xff0c;都…

源碼分析之Leaflet中TileLayer

概述 TileLayer 是 Layer 的子類&#xff0c;繼承自GridLayer基類&#xff0c;用于加載和顯示瓦片地圖。它提供了加載和顯示瓦片地圖的功能&#xff0c;支持自定義瓦片的 URL 格式和參數。 源碼分析 源碼實現 TileLayer的源碼實現如下&#xff1a; export var TileLayer GridL…

php學習(第二天)

一.網站基本概念-服務器 1.什么是服務器? 1.1定義 服務器&#xff08;server&#xff09;,也稱伺服器&#xff0c;是提供計算服務的設備。 供計算服務的設備” 這里的“設備”不僅指物理機器&#xff08;如一臺配有 CPU、內存、硬盤的計算機&#xff09;&#xff0c;也可以指…

C++(友元和運算符重載)

目錄 友元&#xff1a; 友元函數&#xff1a; 示例&#xff1a; 友元類&#xff1a; 示例&#xff1a; 優點&#xff1a; 注意事項&#xff1a; 運算符重載&#xff1a; 注意&#xff1a; 示例&#xff1a; 友元&#xff1a; C中如果想要外部函數或者類對一個類的pr…

和平精英風格射擊游戲開發指南

本教程將完整講解如何開發一款和平精英風格的HTML射擊游戲&#xff0c;涵蓋核心設計理念、代碼架構與關鍵實現細節。 核心設計架構 游戲機制系統 角色控制系統&#xff1a;通過鍵盤實現玩家移動戰斗系統&#xff1a;子彈發射與碰撞檢測道具系統&#xff1a;武器、彈藥和醫療包收…

21.1 《24GB顯存搞定LLaMA2-7B指令微調:QLoRA+Flash Attention2.0全流程實戰》

24GB顯存搞定LLaMA2-7B指令微調:QLoRA+Flash Attention2.0全流程實戰 實戰 LLaMA2-7B 指令微調 一、指令微調技術背景 指令微調(Instruction Tuning)是大模型訓練中的關鍵技術突破點。與傳統全量微調(Full Fine-Tuning)相比,指令微調通過特定格式的指令-響應數據訓練,…