@KafkaListener
是 Spring Kafka 提供的一個核心注解,用于標記一個方法作為 Kafka 消息的消費者。下面是對該注解的詳細解析:
基本用法
@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {System.out.println("Received Message: " + message);
}
主要屬性
1. 必需屬性
- topics / topicPattern:指定監聽的 topic
topics
:逗號分隔的 topic 列表- `topicPattern**:使用正則表達式匹配 topic
@KafkaListener(topics = "topic1,topic2")
// 或
@KafkaListener(topicPattern = "test.*")
2. 消費者配置
- groupId:指定消費者組 ID
- containerFactory:指定使用的 KafkaListenerContainerFactory
@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "myFactory")
3. 消息處理
- id:為監聽器指定唯一 ID
- concurrency:設置并發消費者數量
@KafkaListener(id = "myListener", topics = "myTopic", concurrency = "3")
4. 高級配置
- containerGroup:指定容器組(Spring Kafka 2.5+)
- errorHandler:指定錯誤處理器
- idIsGroup:是否使用監聽器 ID 作為組 ID(默認 false)
消息處理方法簽名
監聽器方法可以接受多種形式的參數:
-
簡單消息處理:
@KafkaListener(topics = "myTopic") public void listen(String message) { ... }
-
帶元數據的消息處理:
@KafkaListener(topics = "myTopic") public void listen(ConsumerRecord<?, ?> record) { ... }
-
批量消息處理:
@KafkaListener(topics = "myTopic") public void listen(List<String> messages) { ... }
-
帶確認的消息處理:
@KafkaListener(topics = "myTopic") public void listen(String message, Acknowledgment ack) {// 處理消息后手動確認ack.acknowledge(); }
配置選項
可以通過 @KafkaListener
的 containerFactory
屬性引用自定義配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;
}@KafkaListener(topics = "myTopic", containerFactory = "myFactory")
public void listen(String message) { ... }
錯誤處理
可以通過以下方式處理錯誤:
-
配置錯誤處理器:
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setErrorHandler(new SeekToCurrentErrorHandler());return factory; }
-
使用 @SendTo 發送到死信隊列:
@KafkaListener(topics = "myTopic", groupId = "myGroup") @SendTo("myDltTopic") public String listen(String message) {// 處理失敗時返回錯誤消息return "error"; }
注意事項
- 監聽器方法應該是 public 的
- 避免在監聽器方法中執行長時間運行的操作
- 考慮消息處理的冪等性
- 對于批量處理,確保方法參數是 List 類型
- 在 Spring Boot 中,許多配置可以通過 application.properties/yml 設置
完整示例
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}
}@Service
public class KafkaMessageListener {@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "kafkaListenerContainerFactory")public void listen(String message, Acknowledgment ack) {try {System.out.println("Received Message: " + message);// 業務處理邏輯ack.acknowledge();} catch (Exception e) {// 錯誤處理}}
}
@KafkaListener
注解提供了靈活的方式來消費 Kafka 消息,開發者可以根據具體需求進行配置和擴展。
ConcurrentKafkaListenerContainerFactory詳解
在Spring Kafka中,ConcurrentKafkaListenerContainerFactory
是一個核心配置類,用于創建并發消息監聽容器,支持多線程消費Kafka消息,以下是其詳細介紹:
1、核心作用
- 并發消費支持:通過創建多個
KafkaMessageListenerContainer
實例(每個對應一個線程),實現多線程并發消費消息。例如設置concurrency=3
會創建3個消費者線程,每個線程處理分配到的分區。 - 線程安全保障:生成的
ConcurrentMessageListenerContainer
內部委托給多個單線程的KafkaMessageListenerContainer
實例,保證線程安全性(Kafka Consumer本身非線程安全)。
2、關鍵特性
-
并發度配置:
- 通過
setConcurrency()
方法設置并發消費者數量,可提高消息處理速度和吞吐量。 - 配置規則為
concurrency<=分區數/應用實例數
,設置過多會導致線程閑置。
- 通過
-
批量處理支持:
- 通過
setBatchListener(true)
啟用批量消費 - 配合
MAX_POLL_RECORDS_CONFIG
參數控制單次poll最大返回記錄數
- 通過
-
錯誤處理機制:
- 可配置自定義錯誤處理器(如
SeekToCurrentErrorHandler
) - 支持重試策略集成
- 可配置自定義錯誤處理器(如
-
分區分配控制:
- 可自定義分區分配邏輯
- 配合
group.id
實現消費者組協調
3、配置示例
@Configuration
@EnableKafka
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 設置并發消費者數量factory.setBatchListener(true); // 啟用批量消費factory.getContainerProperties().setPollTimeout(3000); // 設置輪詢超時factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 設置錯誤處理器return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量消費配置return new DefaultKafkaConsumerFactory<>(props);}
}
4、使用場景
- 高吞吐量需求:通過增加并發消費者數量提升處理能力
- 批量數據處理:需要批量處理消息的場景
- 復雜錯誤處理:需要自定義錯誤處理邏輯的場景
- 多主題監聽:需要同時監聽多個主題的場景
5、注意事項
- 順序性問題:并發消費可能導致消息順序混亂,需業務保證
- 重復處理問題:需實現冪等性處理機制
- 數據庫訪問:需注意并發訪問控制
- 資源限制:并發度設置需考慮系統資源限制