Kafka 的延遲隊列、死信隊列和重試隊列

總結一下實現的方法:
1、延遲隊列,首先kafka是沒有延遲隊列的,那要實現延遲隊列的話,就得使用其他方法。在發送消息的時候加上時間戳,再在時間戳上面加上延遲時間。消費的時候判斷一下,有沒有到達延遲時間,如果沒有到達的話,重新入隊,或啟用定時線程處理。
2、重試隊列,使用@RetryableTopic注解
3、死信隊列,使用@DltHandler 或 @KafkaListener監聽死信隊列

代碼非完整代碼,僅供參考

1. 添加依賴

確保你的 pom.xml 文件中包含 Spring Kafka 的依賴:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency>
</dependencies>

2. 配置 Kafka

application.properties 文件中配置 Kafka 的連接信息和消費者的基本配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.group-id=my-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false

3. 創建 Kafka 生產者

創建一個 Kafka 生產者服務,用于發送消息到指定的 Topic:

package com.example.demo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.time.Instant;
import java.util.Date;@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 發送延遲消息到指定的 Topic。* @param topic 目標 Topic 名稱(延遲隊列為delay-topic,其他為my-topic)* @param message 要發送的消息內容* @param delay 延遲時間(毫秒)*/public void sendDelayedMessage(String topic, String message, long delay) {long timestamp = Instant.now().toEpochMilli() + delay;kafkaTemplate.send(new ProducerRecord<>(topic, null, new Date(timestamp), null, message));}
}

4. 創建 Kafka 消費者

4.1 消費延遲隊列的消費者
@Service
public class KafkaConsumerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@KafkaListener(topics = "delay-topic", groupId = "delay-group")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {long currentTimestamp = System.currentTimeMillis();long messageTimestamp = record.timestamp();// 檢查是否到達延遲時間if (currentTimestamp < messageTimestamp) {// 未到達延遲時間,重新發送到延遲隊列long remainingDelay = messageTimestamp - currentTimestamp;sendDelayedMessage(record.topic(), record.value(), remainingDelay);} else {// 到達延遲時間,處理消息System.out.println("Processing message: " + record.value());}// 確認消息已處理acknowledgment.acknowledge();}private void sendDelayedMessage(String topic, String message, long delay) {long timestamp = System.currentTimeMillis() + delay;kafkaTemplate.send(new ProducerRecord<>(topic, null, new Date(timestamp), null, message));}
}
4.2 消費重試隊列,失敗放入死信隊列

創建一個 Kafka 消費者服務,用于監聽指定的 Topic 并處理消息。使用 @KafkaListener 注解來指定監聽的 Topic,并手動提交偏移量。

package com.example.demo;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.retry.annotation.Backoff;import java.time.Instant;@Service
public class KafkaConsumer {/*** 監聽指定的 Topic 并處理消息。* 使用 @RetryableTopic 注解實現重試機制,最多嘗試 3 次,每次重試間隔 2 秒,最大延遲 60 秒。* 如果所有重試都失敗,消息將發送到死信隊列。** @param record 消費的消息記錄* @param acknowledgment 用于手動提交偏移量*/@RetryableTopic(attempts = "3", // 最大重試次數backoff = @Backoff(delay = 2000, multiplier = 2, maxDelay = 60000), // 重試間隔和最大延遲dltStrategy = RetryableTopic.DltStrategy.FAIL_ON_ERROR, // 失敗后發送到死信隊列autoCreateTopics = "true" // 自動創建重試和死信隊列主題)@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {try {System.out.println("Received message: " + record.value());// 模擬異常if (shouldFail()) {throw new RuntimeException("Simulated failure");}acknowledgment.acknowledge(); // 提交偏移量} catch (Exception e) {throw e; // 拋出異常,觸發重試機制}}/*** 模擬處理失敗的條件。* @return 是否模擬失敗*/private boolean shouldFail() {// 模擬處理失敗的條件return true;}/*** @DltHandler 注解標記的方法用于處理死信隊列中的消息。* 當消息在重試隊列中多次重試失敗后,會被發送到死信隊列。* @DltHandler 注解的方法會監聽死信隊列,并對其中的消息進行處理。* @DltHandler 它與 @RetryableTopic 注解結合使用,用于處理重試失敗后的死信消息。* * 處理死信隊列中的消息。* @param record 死信隊列中的消息記錄*/@DltHandlerpublic void dltListen(ConsumerRecord<String, String> record) {String topic = record.topic(); // 獲取死信隊列的主題名稱System.out.println("Received message in DLT: " + record.value());System.out.println("Topic: " + topic); // 打印主題名稱// 處理死信消息, 可以在這里添加對死信消息的處理邏輯}
}

5. 配置 Kafka 消費者工廠

在 Spring Boot 中,可以通過配置 ConcurrentKafkaListenerContainerFactory 來設置重試機制和死信隊列處理策略。
@RetryableTopic 和 SeekToCurrentErrorHandler 的配置不會同時生效。Spring Kafka 會優先處理 @RetryableTopic 注解的配置,因為它是一個更高級的抽象,專門用于處理重試和死信隊列的邏輯。
為了避免配置沖突,建議選擇一種方式來實現重試和死信隊列的邏輯。如果你選擇使用 @RetryableTopic,則不需要再配置 SeekToCurrentErrorHandler,即這里就可以跳過。

package com.example.demo;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);// 設置錯誤處理器,最多重試 3 次,失敗后發送到死信隊列factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));return factory;}
}

6. 創建死信隊列消費者

創建一個消費者來監聽死信隊列主題,對死信消息進行后續處理(配置了@DltHandler 可以不用 @KafkaListener)。

package com.example.demo;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class DltConsumer {/*** 監聽死信隊列主題并處理消息。* @param record 死信隊列中的消息記錄*/@KafkaListener(topics = "my-topic.DLT", groupId = "dlt-group")public void listen(ConsumerRecord<String, String> record) {System.out.println("Received message in DLT: " + record.value());// 可以在這里添加對死信消息的處理邏輯}
}

6.1 @DltHandler 與 @KafkaListener 的區別和適用場景
6.1.1 @DltHandler 的特點
與重試機制緊密結合:@DltHandler 注解的方法與 @RetryableTopic 注解的重試機制緊密結合,自動處理重試失敗的消息。
自動發送到死信隊列:當消息在重試隊列中多次重試失敗后,Spring Kafka 會自動將消息發送到死信隊列。
簡化代碼:使用 @DltHandler 注解可以簡化代碼,減少手動處理死信消息的邏輯。
6.1.2 @KafkaListener 的特點
通用性:@KafkaListener 注解適用于任何 Kafka 主題,包括死信隊列主題。
靈活性:可以用于監聽任何主題,而不僅僅是死信隊列。這使得它更加靈活,可以用于多種場景。
手動處理:需要手動配置死信隊列主題,并在代碼中顯式處理死信消息。
6.2. @DltHandler 與 @KafkaListener 總結
**使用 @DltHandler:**如果你需要與 Spring Kafka 的重試機制緊密結合,并且希望自動處理重試失敗的消息,使用 @DltHandler 是一個更簡潔和方便的選擇。
**使用 @KafkaListener:**如果你需要監聽多個主題,或者需要更靈活地處理死信消息,使用 @KafkaListener 是一個更好的選擇。
注意:如果 @KafkaListener 監聽了死信隊列的主題(例如 my-topic.DLT),那么當消息被發送到死信隊列時,@KafkaListener 會先捕獲并處理這些消息。這可能導致 @DltHandler 方法無法接收到死信隊列中的消息。因此,兩個最好不要一塊用。

7. 啟動類

創建一個 Spring Boot 應用程序的啟動類:

package com.example.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);}
}

總結

通過以上步驟,你可以在 Spring Boot 中實現 Kafka 的延遲隊列、死信隊列和重試隊列。這些功能可以確保消息處理的可靠性和健壯性,避免消息丟失或重復處理。希望這些示例能幫助你更好地理解和使用 Kafka 的高級特性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/73640.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/73640.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/73640.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

DCAT模型:雙交叉注意力革新醫學影像診斷,AUC 99.75%

一、研究背景&#xff1a;醫學影像診斷的挑戰 在醫學影像領域&#xff08;如X光、OCT&#xff09;&#xff0c;精準分類疾病直接影響患者治療決策。傳統深度學習模型存在兩大痛點&#xff1a; 1.過度自信&#xff1a;即使圖像模糊或存在噪聲&#xff0c;模型仍可能給出高…

2.2.2 Spark單機版環境

本文介紹了如何搭建和使用Spark單機版環境。首先&#xff0c;確保安裝配置好JDK&#xff0c;然后從群共享下載Spark安裝包并上傳至云主機的/opt目錄。接著&#xff0c;解壓到/usr/local目錄并配置環境變量&#xff0c;通過spark-submit --version驗證安裝成功。在使用Spark單機…

AI音樂的革命:邁向格萊美級別的藝術表現力

摘要 近期&#xff0c;AI技術在音樂領域的突破性進展令人矚目。這項新技術賦予了AI格萊美級別的歌唱能力&#xff0c;使其不僅能夠進行寫作和繪畫創作&#xff0c;還能以接近人類的藝術表現力演繹音樂作品。這一成就標志著AI在藝術領域的技術進步達到了新的高度&#xff0c;為…

SAP消息號類型(E/I/W)的定制

比如這樣的M8088的標準的消息號&#xff0c;希望變更消息類型&#xff0c;查詢之后&#xff0c;網上提供的消息&#xff0c;都是SE91,OMRM&#xff0c;OBA5之類的消息。事實上&#xff0c;SE91是不能變更消息類型的。 而在OMRM界面&#xff0c;只看到有限的幾個消息號。 原來&a…

wazuh安全管理工具

Wazuh 通過監控操作系統和應用程序層面的終端設備&#xff0c;增強您基礎設施的安全可見性。其核心功能涵蓋日志分析、文件完整性監控、入侵檢測以及合規性監控。 一、介紹 1. 核心功能 1.1 主機入侵檢測&#xff08;HIDS&#xff09; 文件完整性監控&#xff08;FIM&#…

SAP-ABAP:OData 協議深度解析:架構、實踐與最佳應用

OData 協議深度解析:架構、實踐與最佳應用 一、協議基礎與核心特性 協議定義與目標 定位:基于REST的開放數據協議,標準化數據訪問接口,由OASIS組織維護,最新版本為OData v4.01。設計哲學:通過統一資源標識符(URI)和HTTP方法抽象數據操作,降低異構系統集成復雜度。核心…

MATLAB 控制系統設計與仿真 - 29

用極點配置設計伺服系統 方法1-前饋修正 對于一個可控的系統&#xff0c;我們知道可以用極點配置來得到系統的動態響應指標&#xff0c;但是系統有時會存在較大的靜態誤差。 例如&#xff1a; 系統的狀態矩陣如下&#xff0c;試求取其階躍響應。 MATLAB 代碼如下&#xff1…

編譯原理——自底向上語法優先分析

文章目錄 自底向上優先分析概述一、自底向上優先分析概述二、簡單優先分析法&#xff08;一&#xff09;優先關系定義&#xff08;二&#xff09;簡單優先文法的定義&#xff08;三&#xff09;簡單優先分析法的操作步驟 三、算法優先分析法&#xff08;一&#xff09;直觀算符…

Opencv計算機視覺編程攻略-第四節 圖直方圖統計像素

Opencv計算機視覺編程攻略-第四節 圖直方圖統計像素 1.計算圖像直方圖2.基于查找表修改圖像3.直方圖均衡化4.直方圖反向投影進行內容查找5.用均值平移法查找目標6.比較直方圖搜索相似圖像7.用積分圖統計圖像 1.計算圖像直方圖 圖像統計直方圖的概念 圖像統計直方圖是一種用于描…

5、vim編輯和shell編程【超詳細】

一、vim 1、了解 Vim (Vi IMproved) 是一款功能強大的文本編輯器。 正常模式&#xff1a;vim 文件&#xff0c;剛打開的樣子vim模式&#xff1a;輸入文本的地方命令模式&#xff1a;輸入 :wq等等的位置&#xff0c;可以對文本進行一些操作&#xff0c;比如&#xff1a;保存文…

《Robust Synthetic-to-Real Transfer for Stereo Matching》

論文地址&#xff1a;https://arxiv.org/pdf/2403.07705 源碼地址&#xff1a;https://github.com/jiaw-z/DKT-Stereo 概述 通過在合成數據上預訓練的模型在未見領域上表現出強大的魯棒性。然而&#xff0c;在現實世界場景中對這些模型進行微調時&#xff0c;其領域泛化能力可…

藍橋杯第10屆 后綴表達式

題目描述 給定 N 個加號、M 個減號以及 NM1 個整數 A1,A2,???,ANM1?&#xff0c;小明想知道在所有由這N 個加號、M 個減號以及 NM1 個整數湊出的合法的 后綴表達式中&#xff0c;結果最大的是哪一個&#xff1f; 請你輸出這個最大的結果。 例如使用 1 2 3 -&#xff0c…

C++前綴和

個人主頁&#xff1a;[PingdiGuo_guo] 收錄專欄&#xff1a;[C干貨專欄] 大家好&#xff0c;今天我們來了解一下C的一個重要概念&#xff1a;前綴和 目錄 1.什么是前綴和 2.前綴和的用法 1.前綴和的定義 2.預處理前綴和數組 3.查詢區間和 4.數組中某個區間的和是否為特定…

uni app跨端開發遇到的問題

技術棧 uni app&#xff0c;vue3&#xff0c;uview puls&#xff0c;map… nvue 因為項目中有地圖&#xff0c;要使用到map標簽&#xff0c;所以考慮用原生nvue開發&#xff0c;它是有痛點的&#xff0c;首先瀏覽器不支持&#xff0c;我是要開發ios和Android&#xff0c;所以…

SQL注入操作

sql注入 一&#xff0c;SQL注入分類按照注入的網頁功能類型分類按照注入點值的屬性分類基于從服務器返回內容按照注入的程度和順序 一&#xff0c;SQL注入分類 按照注入的網頁功能類型分類 登錄注入cms注入 cms邏輯&#xff1a;index.php首頁展示內容&#xff0c;具有文章列表…

微信 MMTLS 協議詳解(五):加密實現

常用的解密算法&#xff0c;對稱非對稱 加密&#xff0c;密鑰協商&#xff0c; 帶消息認證的加解密 #生成RSA 密鑰對 void GenerateRsaKeypair(std::string& public_key,std::string& private_key) {RSA* rsa RSA_new();BIGNUM* bn BN_new();// 生成 RSA 密鑰對BN_s…

ROS melodic 安裝 python3 cv_bridge

有時候&#xff0c;我們需要處理這些兼容性問題。此處列舉我的過程&#xff0c;以供參考 mkdir -p my_ws_py39/src cd my_ws_py39 catkin_make_isolated-DPYTHON_EXECUTABLE/usr/bin/python3 \-DPYTHON_INCLUDE_DIR/usr/include/python3.8 \-DPYTHON_LIBRARY/usr/lib/x86_64-l…

深入學習:SpringQuartz的配置方式!

全文目錄&#xff1a; 開篇語前言摘要概述1. 基于 XML 的傳統配置配置步驟1.1 Maven 依賴1.2 XML 配置文件1.3 實現 Job 類 2. 基于 Java Config 的現代配置方式配置步驟2.1 Maven 依賴2.2 配置類2.3 實現 Job 類 3. 動態任務調度動態添加任務動態刪除任務 4. Quartz 持久化配置…

ClickHouse與TiDB實操對比:從入門到實戰的深度剖析

ClickHouse與TiDB實操對比&#xff1a;從入門到實戰的深度剖析 寶子們&#xff0c;在當今數據驅動的時代&#xff0c;選擇合適的數據庫對于處理海量數據和支撐業務發展至關重要。ClickHouse和TiDB作為兩款備受關注的數據庫&#xff0c;各自有著獨特的優勢和適用場景。今天&…

element-ui messageBox 組件源碼分享

messageBox 彈框組件源碼分享&#xff0c;主要從以下兩個方面&#xff1a; 1、messageBox 組件頁面結構。 2、messageBox 組件屬性。 一、組件頁面結構。 二、組件屬性。 2.1 title 標題&#xff0c;類型為 string&#xff0c;無默認值。 2.2 message 消息正文內容&#xf…