總結一下實現的方法:
1、延遲隊列,首先kafka是沒有延遲隊列的,那要實現延遲隊列的話,就得使用其他方法。在發送消息的時候加上時間戳,再在時間戳上面加上延遲時間。消費的時候判斷一下,有沒有到達延遲時間,如果沒有到達的話,重新入隊,或啟用定時線程處理。
2、重試隊列,使用@RetryableTopic注解
3、死信隊列,使用@DltHandler 或 @KafkaListener監聽死信隊列
代碼非完整代碼,僅供參考
1. 添加依賴
確保你的 pom.xml
文件中包含 Spring Kafka 的依賴:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency>
</dependencies>
2. 配置 Kafka
在 application.properties
文件中配置 Kafka 的連接信息和消費者的基本配置:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.group-id=my-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
3. 創建 Kafka 生產者
創建一個 Kafka 生產者服務,用于發送消息到指定的 Topic:
package com.example.demo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.time.Instant;
import java.util.Date;@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 發送延遲消息到指定的 Topic。* @param topic 目標 Topic 名稱(延遲隊列為delay-topic,其他為my-topic)* @param message 要發送的消息內容* @param delay 延遲時間(毫秒)*/public void sendDelayedMessage(String topic, String message, long delay) {long timestamp = Instant.now().toEpochMilli() + delay;kafkaTemplate.send(new ProducerRecord<>(topic, null, new Date(timestamp), null, message));}
}
4. 創建 Kafka 消費者
4.1 消費延遲隊列的消費者
@Service
public class KafkaConsumerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@KafkaListener(topics = "delay-topic", groupId = "delay-group")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {long currentTimestamp = System.currentTimeMillis();long messageTimestamp = record.timestamp();// 檢查是否到達延遲時間if (currentTimestamp < messageTimestamp) {// 未到達延遲時間,重新發送到延遲隊列long remainingDelay = messageTimestamp - currentTimestamp;sendDelayedMessage(record.topic(), record.value(), remainingDelay);} else {// 到達延遲時間,處理消息System.out.println("Processing message: " + record.value());}// 確認消息已處理acknowledgment.acknowledge();}private void sendDelayedMessage(String topic, String message, long delay) {long timestamp = System.currentTimeMillis() + delay;kafkaTemplate.send(new ProducerRecord<>(topic, null, new Date(timestamp), null, message));}
}
4.2 消費重試隊列,失敗放入死信隊列
創建一個 Kafka 消費者服務,用于監聽指定的 Topic 并處理消息。使用 @KafkaListener
注解來指定監聽的 Topic,并手動提交偏移量。
package com.example.demo;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.retry.annotation.Backoff;import java.time.Instant;@Service
public class KafkaConsumer {/*** 監聽指定的 Topic 并處理消息。* 使用 @RetryableTopic 注解實現重試機制,最多嘗試 3 次,每次重試間隔 2 秒,最大延遲 60 秒。* 如果所有重試都失敗,消息將發送到死信隊列。** @param record 消費的消息記錄* @param acknowledgment 用于手動提交偏移量*/@RetryableTopic(attempts = "3", // 最大重試次數backoff = @Backoff(delay = 2000, multiplier = 2, maxDelay = 60000), // 重試間隔和最大延遲dltStrategy = RetryableTopic.DltStrategy.FAIL_ON_ERROR, // 失敗后發送到死信隊列autoCreateTopics = "true" // 自動創建重試和死信隊列主題)@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {try {System.out.println("Received message: " + record.value());// 模擬異常if (shouldFail()) {throw new RuntimeException("Simulated failure");}acknowledgment.acknowledge(); // 提交偏移量} catch (Exception e) {throw e; // 拋出異常,觸發重試機制}}/*** 模擬處理失敗的條件。* @return 是否模擬失敗*/private boolean shouldFail() {// 模擬處理失敗的條件return true;}/*** @DltHandler 注解標記的方法用于處理死信隊列中的消息。* 當消息在重試隊列中多次重試失敗后,會被發送到死信隊列。* @DltHandler 注解的方法會監聽死信隊列,并對其中的消息進行處理。* @DltHandler 它與 @RetryableTopic 注解結合使用,用于處理重試失敗后的死信消息。* * 處理死信隊列中的消息。* @param record 死信隊列中的消息記錄*/@DltHandlerpublic void dltListen(ConsumerRecord<String, String> record) {String topic = record.topic(); // 獲取死信隊列的主題名稱System.out.println("Received message in DLT: " + record.value());System.out.println("Topic: " + topic); // 打印主題名稱// 處理死信消息, 可以在這里添加對死信消息的處理邏輯}
}
5. 配置 Kafka 消費者工廠
在 Spring Boot 中,可以通過配置 ConcurrentKafkaListenerContainerFactory
來設置重試機制和死信隊列處理策略。
@RetryableTopic 和 SeekToCurrentErrorHandler 的配置不會同時生效。Spring Kafka 會優先處理 @RetryableTopic 注解的配置,因為它是一個更高級的抽象,專門用于處理重試和死信隊列的邏輯。
為了避免配置沖突,建議選擇一種方式來實現重試和死信隊列的邏輯。如果你選擇使用 @RetryableTopic,則不需要再配置 SeekToCurrentErrorHandler,即這里就可以跳過。
package com.example.demo;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);// 設置錯誤處理器,最多重試 3 次,失敗后發送到死信隊列factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));return factory;}
}
6. 創建死信隊列消費者
創建一個消費者來監聽死信隊列主題,對死信消息進行后續處理(配置了@DltHandler 可以不用 @KafkaListener)。
package com.example.demo;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class DltConsumer {/*** 監聽死信隊列主題并處理消息。* @param record 死信隊列中的消息記錄*/@KafkaListener(topics = "my-topic.DLT", groupId = "dlt-group")public void listen(ConsumerRecord<String, String> record) {System.out.println("Received message in DLT: " + record.value());// 可以在這里添加對死信消息的處理邏輯}
}
6.1 @DltHandler 與 @KafkaListener 的區別和適用場景
6.1.1 @DltHandler 的特點
與重試機制緊密結合:@DltHandler 注解的方法與 @RetryableTopic 注解的重試機制緊密結合,自動處理重試失敗的消息。
自動發送到死信隊列:當消息在重試隊列中多次重試失敗后,Spring Kafka 會自動將消息發送到死信隊列。
簡化代碼:使用 @DltHandler 注解可以簡化代碼,減少手動處理死信消息的邏輯。
6.1.2 @KafkaListener 的特點
通用性:@KafkaListener 注解適用于任何 Kafka 主題,包括死信隊列主題。
靈活性:可以用于監聽任何主題,而不僅僅是死信隊列。這使得它更加靈活,可以用于多種場景。
手動處理:需要手動配置死信隊列主題,并在代碼中顯式處理死信消息。
6.2. @DltHandler 與 @KafkaListener 總結
**使用 @DltHandler:**如果你需要與 Spring Kafka 的重試機制緊密結合,并且希望自動處理重試失敗的消息,使用 @DltHandler 是一個更簡潔和方便的選擇。
**使用 @KafkaListener:**如果你需要監聽多個主題,或者需要更靈活地處理死信消息,使用 @KafkaListener 是一個更好的選擇。
注意:如果 @KafkaListener 監聽了死信隊列的主題(例如 my-topic.DLT),那么當消息被發送到死信隊列時,@KafkaListener 會先捕獲并處理這些消息。這可能導致 @DltHandler 方法無法接收到死信隊列中的消息。因此,兩個最好不要一塊用。
7. 啟動類
創建一個 Spring Boot 應用程序的啟動類:
package com.example.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);}
}
總結
通過以上步驟,你可以在 Spring Boot 中實現 Kafka 的延遲隊列、死信隊列和重試隊列。這些功能可以確保消息處理的可靠性和健壯性,避免消息丟失或重復處理。希望這些示例能幫助你更好地理解和使用 Kafka 的高級特性。