spring中的@KafkaListener 注解詳解

@KafkaListener 是 Spring Kafka 提供的一個核心注解,用于標記一個方法作為 Kafka 消息的消費者。下面是對該注解的詳細解析:

基本用法

@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {System.out.println("Received Message: " + message);
}

主要屬性

1. 必需屬性

  • topics / topicPattern:指定監聽的 topic
    • topics:逗號分隔的 topic 列表
    • `topicPattern**:使用正則表達式匹配 topic
@KafkaListener(topics = "topic1,topic2")
// 或
@KafkaListener(topicPattern = "test.*")

2. 消費者配置

  • groupId:指定消費者組 ID
  • containerFactory:指定使用的 KafkaListenerContainerFactory
@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "myFactory")

3. 消息處理

  • id:為監聽器指定唯一 ID
  • concurrency:設置并發消費者數量
@KafkaListener(id = "myListener", topics = "myTopic", concurrency = "3")

4. 高級配置

  • containerGroup:指定容器組(Spring Kafka 2.5+)
  • errorHandler:指定錯誤處理器
  • idIsGroup:是否使用監聽器 ID 作為組 ID(默認 false)

消息處理方法簽名

監聽器方法可以接受多種形式的參數:

  1. 簡單消息處理

    @KafkaListener(topics = "myTopic")
    public void listen(String message) { ... }
    
  2. 帶元數據的消息處理

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> record) { ... }
    
  3. 批量消息處理

    @KafkaListener(topics = "myTopic")
    public void listen(List<String> messages) { ... }
    
  4. 帶確認的消息處理

    @KafkaListener(topics = "myTopic")
    public void listen(String message, Acknowledgment ack) {// 處理消息后手動確認ack.acknowledge();
    }
    

配置選項

可以通過 @KafkaListenercontainerFactory 屬性引用自定義配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;
}@KafkaListener(topics = "myTopic", containerFactory = "myFactory")
public void listen(String message) { ... }

錯誤處理

可以通過以下方式處理錯誤:

  1. 配置錯誤處理器

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setErrorHandler(new SeekToCurrentErrorHandler());return factory;
    }
    
  2. 使用 @SendTo 發送到死信隊列

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    @SendTo("myDltTopic")
    public String listen(String message) {// 處理失敗時返回錯誤消息return "error";
    }
    

注意事項

  1. 監聽器方法應該是 public 的
  2. 避免在監聽器方法中執行長時間運行的操作
  3. 考慮消息處理的冪等性
  4. 對于批量處理,確保方法參數是 List 類型
  5. 在 Spring Boot 中,許多配置可以通過 application.properties/yml 設置

完整示例

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}
}@Service
public class KafkaMessageListener {@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "kafkaListenerContainerFactory")public void listen(String message, Acknowledgment ack) {try {System.out.println("Received Message: " + message);// 業務處理邏輯ack.acknowledge();} catch (Exception e) {// 錯誤處理}}
}

@KafkaListener 注解提供了靈活的方式來消費 Kafka 消息,開發者可以根據具體需求進行配置和擴展。

ConcurrentKafkaListenerContainerFactory詳解

在Spring Kafka中,ConcurrentKafkaListenerContainerFactory是一個核心配置類,用于創建并發消息監聽容器,支持多線程消費Kafka消息,以下是其詳細介紹:

1、核心作用

  1. 并發消費支持:通過創建多個KafkaMessageListenerContainer實例(每個對應一個線程),實現多線程并發消費消息。例如設置concurrency=3會創建3個消費者線程,每個線程處理分配到的分區。
  2. 線程安全保障:生成的ConcurrentMessageListenerContainer內部委托給多個單線程的KafkaMessageListenerContainer實例,保證線程安全性(Kafka Consumer本身非線程安全)。

2、關鍵特性

  1. 并發度配置

    • 通過setConcurrency()方法設置并發消費者數量,可提高消息處理速度和吞吐量。
    • 配置規則為concurrency<=分區數/應用實例數,設置過多會導致線程閑置。
  2. 批量處理支持

    • 通過setBatchListener(true)啟用批量消費
    • 配合MAX_POLL_RECORDS_CONFIG參數控制單次poll最大返回記錄數
  3. 錯誤處理機制

    • 可配置自定義錯誤處理器(如SeekToCurrentErrorHandler
    • 支持重試策略集成
  4. 分區分配控制

    • 可自定義分區分配邏輯
    • 配合group.id實現消費者組協調

3、配置示例

@Configuration
@EnableKafka
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 設置并發消費者數量factory.setBatchListener(true); // 啟用批量消費factory.getContainerProperties().setPollTimeout(3000); // 設置輪詢超時factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 設置錯誤處理器return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量消費配置return new DefaultKafkaConsumerFactory<>(props);}
}

4、使用場景

  1. 高吞吐量需求:通過增加并發消費者數量提升處理能力
  2. 批量數據處理:需要批量處理消息的場景
  3. 復雜錯誤處理:需要自定義錯誤處理邏輯的場景
  4. 多主題監聽:需要同時監聽多個主題的場景

5、注意事項

  1. 順序性問題:并發消費可能導致消息順序混亂,需業務保證
  2. 重復處理問題:需實現冪等性處理機制
  3. 數據庫訪問:需注意并發訪問控制
  4. 資源限制:并發度設置需考慮系統資源限制

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/908728.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/908728.shtml
英文地址,請注明出處:http://en.pswp.cn/news/908728.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

多區域協同的異地多活AI推理服務架構

&#x1f310;多區域協同的異地多活AI推理服務架構 #mermaid-svg-TTnpRKKC7k3twxhE {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-TTnpRKKC7k3twxhE .error-icon{fill:#552222;}#mermaid-svg-TTnpRKKC7k3twxhE .er…

極客時間:在 Google Colab 上嘗試 Prefix Tuning

每周跟蹤AI熱點新聞動向和震撼發展 想要探索生成式人工智能的前沿進展嗎&#xff1f;訂閱我們的簡報&#xff0c;深入解析最新的技術突破、實際應用案例和未來的趨勢。與全球數同行一同&#xff0c;從行業內部的深度分析和實用指南中受益。不要錯過這個機會&#xff0c;成為AI領…

Android設備推送traceroute命令進行網絡診斷

文章目錄 工作原理下載traceroute for android推送到安卓設備執行traceroutetraceroute www.baidu.com Traceroute&#xff08;追蹤路由&#xff09; 是一個用于網絡診斷的工具&#xff0c;主要用于追蹤數據包從源主機到目標主機所經過的路由路徑&#xff0c;以及每一跳&#x…

【Linux應用】Linux系統日志上報服務,以及thttpd的配置、發送函數

【Linux應用】Linux系統日志上報服務&#xff0c;以及thttpd的配置、發送函數 文章目錄 thttpd服務安裝thttpd配置thttpd服務thttpd函數日志效果和文件附錄&#xff1a;開發板快速上手&#xff1a;鏡像燒錄、串口shell、外設掛載、WiFi配置、SSH連接、文件交互&#xff08;RADX…

Linux 內核內存管理子系統全面解析與體系構建

一、前言: 為什么內存管理是核心知識 內存管理是 Linux 內核最核心也最復雜的子系統之一&#xff0c;其作用包括&#xff1a; 為軟件提供獨立的虛擬內存空間&#xff0c;實現安全隔離分配/回收物理內存資源&#xff0c;維持系統穩定支持不同類型的內存分配器&#xff0c;最優…

鼠標的拖動效果

1、變量的設置 let isDragging false; let startX; let startY&#xff1b; let endX; let endY; let box null;isDragging : 表示是否推拽startX、startY&#xff1a;表示起始坐標&#xff0c;相對于元素endX、endY&#xff1a;表示結束坐標&#xff0c;相對于元素box&…

SwaggerFuzzer:一款自動化 OpenAPI/Swagger 接口未授權訪問測試工具

SwaggerFuzzer &#x1f310; 一款自動化 OpenAPI/Swagger 接口未授權訪問測試工具&#x1f680; 工具介紹&#xff1a;SwaggerFuzzer? 核心功能亮點&#x1f680; 快速使用&#x1f9f0; 支持參數 &#x1f4cc; 項目結構&#x1f4e5; 獲取與下載 &#x1f310; 一款自動化 …

文獻閱讀:Exploring Autoencoder-based Error-bounded Compression for Scientific Data

目錄 論文簡介動機&#xff1a;為什么作者想要解決這個問題&#xff1f;貢獻&#xff1a;作者在這篇論文中完成了什么工作(創新點)&#xff1f;規劃&#xff1a;他們如何完成工作&#xff1f;離線訓練階段&#xff1a;在線壓縮階段 理由&#xff1a;通過什么實驗驗證它們的工作…

【業務框架】3C-相機-Cinemachine

概述 插件&#xff0c;做相機需求&#xff0c;等于相機老師傅多年經驗總結的工具 Feature Transform&#xff1a;略Control Camera&#xff1a;控制相機參數Noise&#xff1a;增加隨機性Blend&#xff1a;CameraBrain的混合列表指定一個虛擬相機到另一個相機的過渡&#xff…

設計一個算法:刪除非空單鏈表L中結點值為x的第一個結點的前驅結點

目錄 單鏈表的存儲結構定義如下 快慢指針法 三指針法版本① 三指針法版本② 單鏈表的存儲結構定義如下 typedef struct{Elemtype data;struct Node* next; }LNode,*LinkList; 快慢指針法 void deleteprex(LinkList L, Elemtype e) {if (L NULL || L->next NULL ||…

【Qt】:設置新建類模板

完整的頭文件模板 #ifndef %FILENAME%_H #define %FILENAME%_H/*** brief The %CLASSNAME% class* author %USER%* date %DATE%*/ class %CLASSNAME% { public:%CLASSNAME%();~%CLASSNAME%();// 禁止拷貝構造和賦值%CLASSNAME%(const %CLASSNAME%&) delete;%CLASSNAME%&a…

?**?CID字體?**? 和 ?**?Simple字體?**?

在PDF中&#xff0c;字體類型主要分為 ??CID字體?? 和 ??Simple字體?? 兩大類&#xff0c;它們的主要區別在于編碼方式和適用場景。以下是它們的詳細對比&#xff1a; ??1. CID字體&#xff08;CID-keyed Fonts&#xff09;?? CID&#xff08;Character Identifie…

計組_導學

2025.05.31:老湯講408計組學習筆記 導學 第1章計算機系統概述:對計算機系統有全局的認識第2章總線系統:簡單且獨立,不會依賴其他內容,它是被依賴的第3章主存儲器:只有了解主存儲器的內部結構,才能理解在主存中是如何存儲二進制的第4章數據的表示與運算:各種編碼以及計算…

【GPT模型訓練】第二課:張量與秩:從數學本質到深度學習的基礎概念解析

這里寫自定義目錄標題 張量&#xff08;Tensor&#xff09;的定義關鍵特點&#xff1a;示例&#xff1a; 張量的秩&#xff08;Rank&#xff09;示例&#xff1a;“秩”的拼音常見混淆點 總結 張量&#xff08;Tensor&#xff09;的定義 在數學和物理學中&#xff0c;張量是一…

RabbitMQ work模型

Work 模型是 RabbitMQ 最基礎的消息處理模式&#xff0c;核心思想是 ??多個消費者競爭消費同一個隊列中的消息??&#xff0c;適用于任務分發和負載均衡場景。同一個消息只會被一個消費者處理。 當一個消息隊列綁定了多個消費者&#xff0c;每個消息消費的個數都是平攤的&a…

【Linux操作系統】基礎開發工具(yum、vim、gcc/g++)

文章目錄 Linux軟件包管理器 - yumLinux下的三種安裝方式什么是軟件包認識Yum與RPMyum常用指令更新軟件安裝與卸載查找與搜索清理緩存與重建元數據 yum源更新1. 備份現有的 yum 源配置2. 下載新的 repo 文件3. 清理并重建緩存 Linux編輯器 - vim啟動vimVim 的三種主要模式常用操…

73常用控件_QFormLayout的使用

目錄 代碼?例: 使? QFormLayout 創建表單. 除了上述的布局管理器之外, Qt 還提供了 QFormLayout , 屬于是 QGridLayout 的特殊情況, 專 ??于實現兩列表單的布局. 這種表單布局多?于讓??填寫信息的場景. 左側列為提?, 右側列為輸?框 代碼?例: 使? QFormLayout 創…

蘭亭妙微 | 醫療軟件的界面設計能有多專業?

從醫療影像系統到手術機器人控制界面&#xff0c;從便攜式病原體檢測設備到多平臺協同操作系統&#xff0c;蘭亭妙微為眾多醫療設備研發企業&#xff0c;打造了兼具專業性與可用性的交互界面方案。 我們不僅做設計&#xff0c;更深入理解醫療場景的實際需求&#xff1a; 對精…

鴻蒙開發修改版本幾個步驟

鴻蒙開發修改版本幾個步驟 比如&#xff1a;5.0.4&#xff08;16&#xff09;版本改為5.0.2&#xff08;14&#xff09;版本 一、項目下的build-profile.json5 "products": [{"name": "default","signingConfig": "default&qu…