Acknowledgment.nack方法重試消費kafka消息異常

文章目錄

  • 問題
    • 示例
    • 異常
  • 原因
    • nack方法
    • Acknowledgment接口
    • 實現類:ConsumerAcknowledgment
    • 實現類:ConsumerBatchAcknowledgment
  • 解決方案
    • 1 批量消費指定index
      • 示例
    • 2 單條消費
      • 示例

問題

使用BatchAcknowledgingMessageListener 批量消費Kafka消息,成功則手動提交offset,失敗則重試。消費成功的情況下沒有問題,但消費失敗情況下,調用nack方法重試時則報異常。

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {private MessageHandler messageHandler;public BatchCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(List data, Acknowledgment acknowledgment) {try {messageHandler.handle(data); // 處理多條消息acknowledgment.acknowledge(); // 成功處理后提交偏移量} catch (Exception e) {// 消息處理失敗,30min后重試// nack作用:將會在指定sleep時間后,重新消費消息。在sleep期間內,不會消費新消息。acknowledgment.nack(30 * 60 * 1000); // 這里報了異常}}
}

上邊的代碼乍一看沒啥問題,編譯,啟動也都沒報錯。但是在執行nack的時候進到了Acknowledgment接口默認nack(sleep) 方法里邊,并拋出異常。

nack(sleep) is not supported by this Acknowledgment

異常

在這里插入圖片描述
![在這里插入圖片描述](https://i-blog.csdnimg.cn/direct/8bbf4f37af134374bf1f4e34c2687dbe.png

原因

Acknowledgment接口有兩個nack方法:nack(long sleep)nack(int index, long sleep), 以及兩個實現類ConsumerAcknowledgmentConsumerBatchAcknowledgment。ConsumerAcknowledgment僅實現了nack(long sleep),而ConsumerBatchAcknowledgment僅實現了nack(int index, long sleep)。

nack方法

注意:調用nack方法后,將會在指定sleep時間后,重新消費消息。在sleep期間內,不會消費新消息。

Acknowledgment接口

在這里插入圖片描述

實現類:ConsumerAcknowledgment

在這里插入圖片描述

實現類:ConsumerBatchAcknowledgment

在這里插入圖片描述

這樣的設計也很好理解。。
當BatchAcknowledgingMessageListener批量消費消息時, 使用的是ConsumerBatchAcknowledgment,重試時需要告訴ConsumerBatchAcknowledgment要從這批量消息中的哪條開始重試消費,即要指定index值。我的例子中調用的是nack(long sleep),沒有指定index,所以進到了默認方法里,拋了異常。

而使用AcknowledgingMessageListener消費單條消息時,使用的是ConsumerAcknowledgment,重試時它知道重試當前的消息,因為就這一條,所以只需要指定重試時間就可以了。

也就是說批量消費時,重試要調用nack(int index, long sleep),單條消費時,重試要調用nack(long sleep),二者不搭配,就會拋不支持該方法的異常。

解決方案

1 批量消費指定index

示例

public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {private MessageHandler messageHandler;public BatchCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(List data, Acknowledgment acknowledgment) {int index = 0;try {for (; index < data.size(); index++) {messageHandler.handle(data.get(index)); // 處理單條消息}// 成功處理后提交偏移量acknowledgment.acknowledge();} catch (Exception e) {// 消息處理失敗,30min后重試index及index之后的消息acknowledgment.nack(index, 30 * 60 * 1000);}}}

2 單條消費

改成單條消費消息,調用nack(long sleep)

示例

public class SingleCustomMessageListener implements AcknowledgingMessageListener {private MessageHandler messageHandler;public SingleCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {try {messageHandler.handle(data); // 處理單條消息acknowledgment.acknowledge();} catch (Exception e) {acknowledgment.nack(30 * 60 * 1000);}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/72083.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/72083.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/72083.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java 反序列化 - commons collection 之困(一)

#01多余的碎碎念 說到 java 反序列化&#xff0c;去搜索的話能看到網上有很多分析關于 commons collection 利用鏈的文章&#xff0c;emm 我一開始看不懂&#xff0c;看到很多代碼的圖頭暈。 這篇文章的話其實是我跟著 p 神的文章一路走下來的&#xff0c;所以整個邏輯會按照…

python LLM工具包

阿里云鏡像pypi http://mirrors.aliyun.com/pypi/simple/ modelscope魔塔 pip install modelscope https://modelscope.cn/docs/models/download Sentence-transformers pip install -U sentence-transformers pip3 install torch -i https://pypi.tuna.tsinghua.edu.cn/sim…

Linux賬號和權限管理

用戶賬戶管理 理論 /etc/passwd 該目錄用于保存用戶名&#xff0c;宿主目錄&#xff0c;登錄shel等基本信息 /etc/shadow 該目錄用于保存 用戶密碼&#xff0c;賬戶有效期等信息 圖上每一行中都有用“&#xff1a;”隔斷的字段 字段含義&#xff1a; 第1字段:用戶賬號的名…

晉升系列4:學習方法

每一個成功的人&#xff0c;都是從底層開始打怪&#xff0c;不斷的總結經驗&#xff0c;一步一步打上來的。在這個過程中需要堅持、總結方法論。 對一件事情長久堅持的人其實比較少&#xff0c;在堅持的人中&#xff0c;不斷的總結優化的更少&#xff0c;所以最終達到高級別的…

win32匯編環境,對話框中使用樹形視圖示例四

;運行效果,當點擊張遼時,展示張遼的圖像 ;當點擊曹仁時,展示曹仁的圖像 ;win32匯編環境,對話框中使用樹形視圖示例四 ;當點擊樹形視圖treeview控件中的某項時,展示某些功能。這里展示的是當點到某個將領時,顯示某個將領的圖像 ;直接抄進RadAsm可編譯運行。重要部分加備注。…

智慧停車小程序:實時車位查詢、導航與費用結算一體化

智慧停車小程序:實時車位查詢、導航與費用結算一體化 一、城市停車困境的數字化突圍 中國機動車保有量突破4.3億輛,但車位供給缺口達8000萬。傳統停車管理模式存在三大致命傷: 盲盒式尋位:62%的車主遭遇"地圖顯示有位,到場已滿員"的窘境迷宮式導航:商場停車場…

Windows server網絡安全

摘要 安全策略 IP安全策略&#xff0c;簡單的來說就是可以通過做相應的策略來達到放行、阻止相關的端口&#xff1b;放行、阻止相關的IP&#xff0c;如何做安全策略&#xff0c;小編為大家詳細的寫了相關的步驟&#xff1a; 解說步驟&#xff1a; 阻止所有&#xff1a; 打…

充電樁快速搭建springcloud(微服務)+前后端分離(vue),客戶端實現微信小程序+ios+app使用uniapp(一處編寫,處處編譯)

充電樁管理系統是專為中小型充電樁運營商、企業和個人開發者設計的一套高效、靈活的管理平臺。系統基于Spring Cloud微服務架構開發&#xff0c;采用模塊化設計&#xff0c;支持單機部署與集群部署&#xff0c;能夠根據業務需求動態擴展。系統前端使用uniapp框架&#xff0c;可…

小肥柴慢慢手寫數據結構(C篇)(4-3 關于棧和隊列的討論)

小肥柴慢慢學習數據結構筆記&#xff08;C篇&#xff09;&#xff08;4-3 關于棧和隊列的討論&#xff09; 目錄1 雙端棧/隊列2 棧與隊列的相互轉化2-1 棧轉化成隊列2-2 隊列轉化成棧 3 經典工程案例3-1 生產者和消費者模型&#xff08;再次重溫環形緩沖區&#xff09;3-2 MapR…

labview實現大小端交換移位

在解碼時遇到了大小端交換的問題&#xff0c;需要把高低字節的16進制值進行互換&#xff0c;這里一時間不知道怎么操作&#xff0c;本來打算先把16進制轉字節數組&#xff0c;算出字節數組的大小&#xff0c;然后通過模2得到0&#xff0c;1&#xff0c;來判斷是否為奇數位和偶數…

在Windows系統上安裝和配置Redis服務

&#x1f31f; 在Windows系統上安裝和配置Redis服務 Redis是一個高性能的鍵值存儲數據庫&#xff0c;廣泛用于緩存、消息隊列和實時分析等場景。雖然Redis最初是為Linux設計的&#xff0c;但也有Windows版本可供使用。今天&#xff0c;我將詳細介紹如何在Windows系統上安裝Red…

Ateme在云端構建可擴展視頻流播平臺

Akamai Connected Cloud幫助Ateme客戶向全球觀眾分發最高質量視頻內容。 “付費電視運營商和內容提供商現在可以在Akamai Connected Cloud上通過高質量視頻吸引觀眾&#xff0c;并輕松擴展。”── Ateme首席戰略官Rmi Beaudouin ? Ateme是全球領先的視頻壓縮和傳輸解決方案提…

DeepSeek進階應用(一):結合Mermaid繪圖(流程圖、時序圖、類圖、狀態圖、甘特圖、餅圖)

&#x1f31f;前言: 在軟件開發、項目管理和系統設計等領域&#xff0c;圖表是表達復雜信息的有效工具。隨著AI助手如DeepSeek的普及&#xff0c;我們現在可以更輕松地創建各種專業圖表。 名人說&#xff1a;博觀而約取&#xff0c;厚積而薄發。——蘇軾《稼說送張琥》 創作者&…

deepseek R1提供的3d迷宮設計方案

一、技術選型方案 核心渲染技術 &#x1f3a8; 采用Raycasting算法模擬3D透視效果使用Canvas 2D上下文進行逐像素繪制材質貼圖系統實現墻面差異化表現 迷宮數據結構 &#x1f5fa;? 二維數組存儲迷宮布局&#xff08;0:通路&#xff0c;1:墻體&#xff09;遞歸回溯算法生成隨…

時序數據庫TimescaleDB基本操作示例

好的&#xff01;以下是使用 TimescaleDB 的 Java 示例&#xff08;基于 JDBC&#xff0c;因為 TimescaleDB 是 PostgreSQL 的擴展&#xff0c;官方未提供獨立的 Java SDK&#xff09;&#xff1a; 1. 添加依賴&#xff08;Maven&#xff09; <dependency><groupId&g…

linux下的網絡抓包(tcpdump)介紹

linux下的網絡抓包[tcpdump]介紹 前言tcpdump1. 安裝 tcpdump2. 基本抓包命令3. 過濾器使用4. 保存捕獲的數據包 異常指標1. 連接建立與斷開相關指標異常 SYN 包異常 FIN 或 RST 包 2. 流量相關指標異常流量峰值異常源或目的 IP 流量 3. 端口相關指標異常端口使用端口掃描 4. 數…

C/C++中使用CopyFile、CopyFileEx原理、用法、區別及分別在哪些場景使用

文章目錄 1. CopyFile原理函數原型返回值用法示例適用場景 2. CopyFileEx原理函數原型返回值用法示例適用場景 3. 核心區別4. 選擇建議5. 常見問題6.區別 在Windows系統編程中&#xff0c;CopyFile和CopyFileEx是用于文件復制的兩個API函數。它們的核心區別在于功能擴展性和控制…

Bash和Zsh在處理大文件時差異

在處理大文件時&#xff0c;Bash 和 Zsh 的差異主要體現在幾個方面&#xff1a; 1. 腳本執行速度 Bash: 性能: Bash在執行腳本時通常表現良好&#xff0c;尤其是在處理大量數據或大文件時。Bash的腳本執行速度相對較快&#xff0c;適合大多數日常使用場景。優化: Bash在處理大…

不同AI生成的PHP版雪花算法

OpenAI <?php /*** Snowflake 雪花算法生成器* 生成的 64 位 ID 結構&#xff1a;* 1 位 保留位&#xff08;始終為0&#xff0c;防止負數&#xff09;* 41 位 時間戳&#xff08;毫秒級&#xff0c;當前時間減去自定義紀元&#xff09;* 5 位 數據中心ID* 5 …

Android Telephony 四大服務和數據網絡控制面數據面介紹

在移動通信和Android系統中,涉及的關鍵概念和服務以及場景案例說明如下: 一、概念 (一)Android Telephony 的四大服務 介紹Telephony Data 與 Android Data 的四大服務在Android系統中,與電話(Telephony)和移動數據(Data)相關的核心服務主要包括以下四類: 1. Tele…