Java學習--------消息隊列的重復消費、消失與順序性的深度解析?

????????在 Java 分布式系統開發中,消息隊列的應用已十分普遍。但隨著業務規模擴大,消息的重復消費、意外消失、順序錯亂等問題逐漸成為系統穩定性的隱患。本文將從 Java 開發者的視角,深入分析這三大問題的產生原因、業務后果,并結合具體代碼示例給出可落地的解決方案。?
消息重復消費是 Java 開發中最易遇到的問題。例如,使用 Spring Kafka 消費支付消息時,若處理邏輯未做防護,可能導致用戶賬戶被重復扣款,這在金融場景中是致命的。?
產生原因通常有以下幾點
生產者重試配置不當:例如:生產者使用RetryTemplate時,若未設置合理的重試條件,網絡波動時會重復發送消息。?
消費者 ACK 機制誤用:例如:Spring Kafka 默認ack-mode=BATCH,若消費者處理消息后未及時提交 offset,重啟后會重復消費批次內消息。?
分布式事務補償:例如:在 Seata 等分布式事務框架中,回滾后觸發的消息重發可能導致重復。?
在不同場景下,帶來的后果也十分嚴重,如在金融領域會導致出現重復扣款、重復轉賬的問題,從而引發資損和用戶投訴。?在庫存管理系統中會導致重復扣減庫存,從而出現超賣或負庫存。為了解決這個? ? ?

? ? ? ? 通常采用多種方法解決,如:

? ? ? ? (1)冪等性設計

? ? ? ? (2全局 ID+Redis 去重

@Service
public class OrderConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate OrderService orderService;@KafkaListener(topics = "order_pay")public void handlePayMessage(ConsumerRecord<String, String> record) {String messageId = record.headers().lastHeader("messageId").value().toString();// 利用Redis的SETNX判斷是否已處理Boolean isFirstHandle = redisTemplate.opsForValue().setIfAbsent("msg:processed:" + messageId, "1", 24, TimeUnit.HOURS);if (Boolean.TRUE.equals(isFirstHandle)) {// 首次處理:執行業務邏輯orderService.processPayment(record.value());} else {// 重復消息:直接返回log.info("重復消息,messageId:{}", messageId);}}
}

? ? ? ? (3)數據庫唯一約束

@Transactional
public void processPayment(String orderJson) {OrderDTO order = JSON.parseObject(orderJson, OrderDTO.class);// 插入時若messageId重復,會拋出DuplicateKeyExceptionorderMapper.insert(new OrderPO().setOrderId(order.getOrderId()).setMessageId(order.getMessageId()).setStatus("PAID"));
}


消息消失指消息未被消費卻永久丟失,例如用戶下單消息消失會導致訂單 “幽靈下單”,用戶已付款但系統無記錄。?
產生原因通常有以下幾點
生產者未開啟確認機制:如Kafka 生產者未設置acks=all,消息未寫入分區副本即返回成功。?
Spring 容器關閉導致丟失:消費者在@PreDestroy階段被強制關閉,未處理的消息被丟棄。?
中間件配置疏漏:如RabbitMQ 隊列未設置durable=true,重啟后隊列消失;Kafka 未設置log.retention.hours,消息提前過期。?
其也會導致許多嚴重的后果,如:
交易鏈路斷裂:支付成功但訂單狀態未更新,用戶投訴。?
數據同步失敗:跨系統數據未同步,導致庫存、會員信息不一致。?
任務調度失效:定時任務觸發消息丟失,導致任務漏執行。?
通常采用如下的解決方案:全鏈路可靠性保障?
1. 生產者確認機制(

@Configuration?
public class KafkaProducerConfig {?@Bean?public ProducerFactory<String, String> producerFactory() {?Map<String, Object> config = new HashMap<>();?config.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本確認?config.put(ProducerConfig.RETRIES_CONFIG, 3); // 重試3次?config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 開啟冪等性生產?return new DefaultKafkaProducerFactory<>(config);?}?
?@Bean?public KafkaTemplate<String, String> kafkaTemplate() {?KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());?// 發送結果回調?template.setProducerListener(new ProducerListener<>() {?@Override?public void onError(ProducerRecord<String, String> record, Exception exception) {?log.error("消息發送失敗,topic:{}, msg:{}", record.topic(), record.value(), exception);?// 失敗后可存入本地消息表,定時重試?}?});?sdareturn template;?}?
}?


?
2. 消費者手動確認

?
@Configuration?
public class KafkaConsumerConfig {?@Bean?public ConsumerFactory<String, String> consumerFactory() {?Map<String, Object> config = new HashMap<>();?config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 關閉自動提交?return new DefaultKafkaConsumerFactory<>(config);?}?
?@Bean?public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {?ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();?factory.setConsumerFactory(consumerFactory());?factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 手動確認?return factory;?}?
}?
?
// 消費者代碼?
@KafkaListener(topics = "order_create")?
public void handleCreateMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {?try {?orderService.createOrder(record.value());?ack.acknowledge(); // 處理成功:手動提交offset?} catch (Exception e) {?log.error("處理失敗,暫不確認", e);?// 可將消息轉發到死信隊列?}?
}?


消息順序性指消息處理順序與發送順序一致。例如,訂單的 “創建→支付→發貨” 消息若順序錯亂,會導致 “未支付就發貨” 的邏輯錯誤。?產生原因通常有以下幾點:
多線程消費:Spring Kafka 的concurrency>1時,多個線程并行處理同一分區消息。?
分區路由錯誤:Kafka 生產者未指定partitioner.class,導致同一訂單的消息被分配到不同分區。?
重試機制打亂順序:失敗消息進入重試隊列后,后續消息先被處理。?
其可能會導致許多后果,如:
狀態機異常:訂單從 “待支付” 直接跳至 “已發貨”,狀態流轉斷裂。?
數據計算錯誤:賬戶先扣款后充值,導致余額計算錯誤。?
日志時序混亂:分布式追蹤日志順序錯亂,難以排查問題。?
通常開發人員采用以下方案解決
1. 單分區 + 單線程消費
2. 按業務 ID 路由分區?
通過訂單 ID 哈希到固定分區,確保同一訂單的消息在同一分區:?
?

public class OrderPartitioner implements Partitioner {?@Override?public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {?// 訂單ID作為key,哈希后取模分區數?String orderId = (String) key;?List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);?return Math.abs(orderId.hashCode()) % partitions.size();?}?
}?



本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/89615.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/89615.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/89615.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Oracle】centos7離線靜默安裝oracle11g(p13390677_112040)

博文地址&#xff1a;https://blog.csdn.net/gitblog_06670/article/details/142569814 倉庫地址&#xff1a;https://gitcode.com/Open-source-documentation-tutorial/31eb1/?utm_sourcedocument_gitcode&indexbottom&typecard 參考安裝地址&#xff1a; 收費版&…

智能設備暢想

### 智能設備暢想 突然想到了一個好主意 因為最近在查無人機的相關資料&#xff08;很早之前就想搞個無人機玩玩但始終沒有買&#xff09; 在了解自組裝方面的內容時&#xff0c;和AI溝通了下 正好之前組裝的 小智AI 基本上已經完善了&#xff0c;也正在考慮其在其他方向拓展的…

SpringAI——ChatModel

我的前面一篇文章&#xff08;SpringAI——ChatClient配置與使用&#xff09;中講了ChatClient&#xff0c;它是一個構建于 ChatModel 之上的高層封裝&#xff0c;它提供了更豐富的對話交互能力。可以這么說ChatModel相當于發動機&#xff0c;ChatClient相當于一臺含有發動機、…

Zabbix監控K8S的PV信息詳細教程!

文將介紹如何使用Zabbix自定義鍵值腳本方式監控K8S的PV卷狀態等信息。 在Kubernetes (K8S) 中&#xff0c;PersistentVolume (PV) 是集群中的一個抽象層&#xff0c;它代表了底層存儲資源&#xff0c;例如網絡存儲系統&#xff08;如NFS、Ceph、GlusterFS等&#xff09;或本地存…

wx小程序原生開發使用高德地圖api

第一步&#xff1a;注冊高德地圖api的key第二步&#xff1a;下載amap-wx.js 放到項目的某個目錄第三步&#xff1a;配置app.json&#xff08;必須&#xff0c;因為需要定位功能&#xff0c;&#xff09;"requiredPrivateInfos": ["getLocation"],"per…

如何通過mac的前24bit,模糊確認是那一臺什么樣的設備

MAC Address Lookup - MAC/OUI/IAB/IEEE Vendor Manufacturer Search Wireshark ? Go Deep 上面這兩個網址提供了&#xff0c;正對mac 的前24位&#xff0c;查找對應的網絡設備廠商信息&#xff0c;可以讓我們在運維過程中模糊的判斷大約是什么型號的設備 使用macvendorloo…

【爬蟲】04 - 高級數據存儲

爬蟲04 - 高級數據存儲 文章目錄爬蟲04 - 高級數據存儲一&#xff1a;加密數據的存儲二&#xff1a;JSON Schema校驗三&#xff1a;云原生NoSQL(了解)四&#xff1a;Redis Edge近端計算(了解)五&#xff1a;二進制存儲1&#xff1a;Pickle2&#xff1a;Parquet一&#xff1a;加…

UDP和TCP的主要區別是什么?

在網絡通信中&#xff0c;TCP&#xff08;傳輸控制協議&#xff09;和UDP&#xff08;用戶數據報協議&#xff09;是兩種核心的傳輸層協議。它們各自的特點和應用場景截然不同&#xff0c;理解兩者的區別對于選擇合適的通信方式至關重要。本文將通過幾個關鍵點&#xff0c;用簡…

Softhub軟件下載站實戰開發(十八):軟件分類展示

Softhub軟件下載站實戰開發&#xff08;十八&#xff09;&#xff1a;軟件分類展示 &#x1f5a5;? 在之前文章中&#xff0c;我們實現了后臺管理相關部分&#xff0c;本篇文章開始我們來實現用戶端頁面&#xff0c;由于內網使用&#xff0c;不需要sso優化等特性&#xff0c;我…

linux--------------------BlockQueue的生產者消費模型

1.基礎BlockingQueue的生產者消費模型 1.1 BlockQueue 在多線程編程中阻塞隊列是一種常用于實現生產者和消費者模型的數據結構&#xff0c;它與普通的隊列區別在于&#xff0c;當隊列為空時&#xff0c;從隊列獲取元素的操作將被阻塞&#xff0c;直到隊列中放入了新的數據。當…

堆排序算法詳解:原理、實現與C語言代碼

堆排序&#xff08;Heap Sort&#xff09;是一種高效的排序算法&#xff0c;利用二叉堆數據結構實現。其核心思想是將待排序序列構造成一個大頂堆&#xff08;或小頂堆&#xff09;&#xff0c;通過反復調整堆結構完成排序。下面從原理到實現進行詳細解析。一、核心概念&#x…

SSM框架——注入類型

引用類型的注入&#xff1a;Setter方法簡單類型的注入&#xff1a;定義簡單實例和方法在配置文件中對bean進行配置&#xff0c;使用porperty屬性 值用value&#xff08;ref是用來獲取bean的&#xff09;構造器方法&#xff1a;構造器方法中需要寫name&#xff0c;這樣程序就會耦…

信息學奧賽一本通 1552:【例 1】點的距離

【題目鏈接】 ybt 1552&#xff1a;【例 1】點的距離 【題目考點】 1. 最近公共祖先&#xff08;LCA&#xff09;&#xff1a;倍增求LCA 知識點講解見&#xff1a;洛谷 P3379 【模板】最近公共祖先&#xff08;LCA&#xff09; 【解題思路】 首先用鄰接表保存輸入的無權圖…

1Panel中的OpenResty使用alias

問題 在服務器上使用了1Panel的OpenResty來管理網站服務&#xff0c;當作是一個Nginx用&#xff0c;想做一個alias來直接管理某個文件夾的文件&#xff0c;于是直接在其中一個網站中使用了alias配置。 location /upload {alias /root/upload;autoindex on;charset utf-8;charse…

小明記賬簿煥新記:從單色到多彩的主題進化之路

【從冷靜藍到多彩世界&#xff0c;這一次我們重新定義記賬美學】 曾經&#xff0c;打開“小明記賬簿”是一片沉穩的藍色海洋&#xff0c;它像一位理性的財務管家&#xff0c;默默守護著你的每一筆收支。但總有人悄悄問&#xff1a;“能不能多一些顏色&#xff1f;”今天&#x…

Apache IoTDB(1):時序數據庫介紹與單機版安裝部署指南

目錄一、Apache IoTDB 是什么&#xff1f;1.1 產品介紹1.2 產品體系1.3 產品架構二、IoTDB 環境配置2.1 Linux系統需準備環境2.2 Windows系統需準備環境2.3 網絡配置2.3.1 關閉防火墻2.3.2 查看端口是否占用2.3.3 避雷經驗三、IoTDB 單機版系統部署安裝指南3.1 產品下載3.2 注意…

Python 圖片爬取入門:從手動下載到自動批量獲取

前言 想批量下載網頁圖片卻嫌手動保存太麻煩&#xff1f;本文用 Python 帶你實現自動爬取&#xff0c;從分析網站到代碼運行&#xff0c;步驟清晰&#xff0c;新手也能快速上手&#xff0c;輕松搞定圖片批量獲取。 1.安裝模塊 在開始爬取圖片前&#xff0c;我們需要準備好工具…

aspect-ratio: 1 / 1樣式在部分手機瀏覽器中失效的問題怎么解決?

最近在uniapp開發時又遇到了安卓手機不兼容問題&#xff0c;ios系統無影響。開發背景&#xff1a;小編想通過網格布局來實現答題卡的布局&#xff0c;實現五列多行的形式。代碼片段&#xff1a;<view class"question-grid"><viewv-for"(question, inde…

RecyclerView與ListView深度對比分析

1. 使用流程對比ListView: 布局XML&#xff1a; 在布局文件中放置 <ListView> 控件&#xff0c;指定 id (如 android:id"id/listView")。數據適配器 (Adapter)&#xff1a; 繼承 BaseAdapter 或 ArrayAdapter / CursorAdapter / SimpleAdapter。 重寫 getCount…

deepseekAI對接大模型的網頁PHP源碼帶管理后臺(可實現上傳分析文件)

前端后端都已進行優化&#xff0c;新增可上傳文件功能&#xff08;拖拽進去也可以&#xff09;&#xff0c;后端進行風格主題設置&#xff0c;優化數據結構&#xff01;依舊測試網站&#xff1a;iEPMS我的工具箱&#xff0c;你的智慧助手&#xff01;還是那句話兄弟們輕點搞我的…