Java異步編程之消息隊列疑難問題拆解

前言

在Java里運用消息隊列實現異步通信時,會面臨諸多疑難問題。這里對實際開發中碰到的疑難為題進行匯總及拆解,使用RabbitMQ和Kafka兩種常見的消息隊列中間件來作為示例,給出相應的解決方案:

一、消息丟失問題

消息在傳輸過程中可能會丟失,這可能發生在生產者發送消息時、消息隊列存儲消息時,或者消費者接收消息時。

解決方案
  1. 生產者確認機制
    • 使用RabbitMQ的發布確認(Publisher Confirms):
channel.confirmSelect(); // 啟用發布確認
channel.basicPublish(exchange, routingKey, null, message.getBytes());
if (!channel.waitForConfirms()) {// 處理發送失敗的情況
}
- Kafka的acks參數設置:
// acks=all表示所有副本都確認后才算發送成功
props.put("acks", "all");
  1. 消息持久化
    • RabbitMQ:
// 聲明隊列時設置持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 發送消息時設置持久化
channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());
- Kafka:消息默認持久化到磁盤。
  1. 消費者確認
    • RabbitMQ手動ACK:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {// 處理消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });

二、消息重復消費問題

由于網絡波動或重試機制,可能會導致消息被重復消費。

解決方案
  1. 冪等設計
    • 數據庫唯一索引:
try {// 插入操作,利用唯一索引避免重復sql = "INSERT INTO orders (order_id, amount) VALUES (?, ?)";
} catch (DuplicateKeyException e) {// 處理重復插入的情況
}
- 狀態機:
public void processOrder(Order order) {if (order.getStatus() == Status.PROCESSED) {return; // 已處理,直接返回}// 處理訂單order.setStatus(Status.PROCESSED);orderRepository.save(order);
}
  1. 全局唯一ID
// 生成唯一ID
String messageId = UUID.randomUUID().toString();
// 發送消息時攜帶ID
channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().messageId(messageId).build(), message.getBytes());// 消費時檢查ID
Set<String> processedIds = new ConcurrentHashMap().newKeySet();
if (processedIds.contains(messageId)) {return; // 已處理,跳過
}
processedIds.add(messageId);

三、消息順序性問題

在某些業務場景下,需要保證消息的順序,比如訂單狀態的變更。

解決方案
  1. 單隊列單消費者
// 創建一個專用隊列處理順序消息
channel.queueDeclare("order_status_queue", true, false, false, null);
// 單個消費者處理該隊列
  1. 分區策略(Kafka)
// 自定義分區器,確保同一訂單的消息發到同一分區
public class OrderPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {Order order = (Order) value;return order.getOrderId().hashCode() % cluster.partitionsForTopic(topic).size();}
}

四、消息積壓問題

當消費者處理速度跟不上生產者發送速度時,會導致消息在隊列中積壓。

解決方案
  1. 水平擴展消費者
    • RabbitMQ:增加消費者實例,利用競爭消費機制。
    • Kafka:增加消費者組中的消費者數量,每個消費者處理一個分區。
  2. 優化消費邏輯
// 使用異步處理提高消費速度
CompletableFuture.runAsync(() -> {// 處理耗時操作
});
  1. 拆分隊列
// 根據業務類型拆分隊列
channel.queueDeclare("order_create_queue", true, false, false, null);
channel.queueDeclare("order_pay_queue", true, false, false, null);

五、事務一致性問題

消息隊列的異步特性與數據庫事務的原子性存在沖突。

解決方案
  1. 本地事務 + 消息表
@Transactional
public void createOrder(Order order) {// 1. 插入訂單orderRepository.save(order);// 2. 插入消息表messageRepository.save(new Message(order.getId(), "order_created"));
}// 消息發送服務
@Scheduled(fixedDelay = 1000)
public void sendPendingMessages() {List<Message> pendingMessages = messageRepository.findByStatus(PENDING);for (Message message : pendingMessages) {try {rabbitTemplate.convertAndSend("order_exchange", "order.created", message);message.setStatus(SENT);messageRepository.save(message);} catch (Exception e) {// 記錄日志,后續重試}}
}
  1. 最終一致性模式
// TCC補償模式
public void processOrder(Order order) {// Try階段:預留資源boolean reserved = resourceService.reserve(order);if (reserved) {// 發送確認消息rabbitTemplate.convertAndSend("order_confirm_exchange", "", order);} else {// 發送取消消息rabbitTemplate.convertAndSend("order_cancel_exchange", "", order);}
}

六、分布式事務問題

跨服務的事務一致性是一個復雜問題。

解決方案
  1. 最大努力通知模式
// 訂單服務
@Transactional
public void createOrder(Order order) {// 創建訂單orderRepository.save(order);// 發送消息通知庫存服務rabbitTemplate.convertAndSend("inventory_exchange", "order.created", order.getId());
}// 庫存服務
@RabbitListener(queues = "inventory_queue")
public void handleOrderCreated(Long orderId) {try {// 扣減庫存inventoryService.decrease(orderId);} catch (Exception e) {// 記錄失敗,后續通過定時任務重試}
}
  1. Seata框架
// 使用Seata的@GlobalTransactional注解
@GlobalTransactional
public void placeOrder(Order order) {// 訂單服務操作orderService.createOrder(order);// 庫存服務操作inventoryService.decrease(order.getProductId(), order.getQuantity());// 賬戶服務操作accountService.debit(order.getUserId(), order.getTotalAmount());
}

七、高可用與容災問題

確保消息隊列在故障時能正常工作。

解決方案
  1. 集群部署
    • RabbitMQ:鏡像隊列 + HAProxy/LB。
    • Kafka:多副本 + ISR(In-Sync Replicas)機制。
  2. 自動故障轉移
    • 配置自動重啟和健康檢查:
// Kafka消費者配置
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("connections.max.idle.ms", 540000); // 9分鐘無連接則關閉

八、性能調優問題

優化消息隊列的性能。

優化方向
  1. 生產者參數
    • Kafka:
props.put("batch.size", 16384); // 批處理大小
props.put("linger.ms", 1); // 延遲發送
props.put("compression.type", "snappy"); // 壓縮類型
  1. 消費者參數
    • Kafka:
props.put("fetch.min.bytes", 1024 * 1024); // 最小拉取數據量
props.put("max.poll.records", 500); // 每次拉取的最大記錄數
  1. Broker配置
    • Kafka:
num.network.threads=8  # 網絡線程數
num.io.threads=16      # IO線程數
log.flush.interval.messages=10000  # 消息刷盤間隔

總結

Java中使用消息隊列實現異步通信時,需要從多個方面進行考量和處理,包括可靠性、順序性、冪等性、事務一致性等。通過合理的架構設計、技術選型以及優化配置,可以有效解決這些難題,構建出高效、穩定的異步通信系統。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/909078.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/909078.shtml
英文地址,請注明出處:http://en.pswp.cn/news/909078.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

香橙派3B學習筆記10:snap打包C/C++程序與動態鏈接庫(.so)

esnap打包C/C程序與動態鏈接庫&#xff08;.so&#xff09; 之前已經學會了snap基本的打包程序&#xff0c;現在試試打包C/C程序與動態鏈接庫&#xff08;.so&#xff09; ssh &#xff1a; orangepi本地ip 密碼 &#xff1a; orangepi 操作系統發行版&#xff1a; 基于 Ubun…

【Python工具開發】k3q_arxml 簡單但是非常好用的arxml編輯器,可以稱為arxml殺手包

k3q_arxml 介紹 倉庫地址1 倉庫地址2 極簡的arxml編輯庫&#xff0c;純python實現 用法 from pprint import pp # 可以美化打印對象&#xff0c;不然全打印在一行 import k3q_arxml # 加載arxml文件 io_arxml k3q_arxml.IOArxml(filepaths[test/model_merge.arxml])# 打印…

【CSS-8】深入理解CSS選擇器權重:掌握樣式優先級的關鍵

CSS選擇器權重是前端開發中一個基礎但極其重要的概念&#xff0c;它決定了當多個CSS規則應用于同一個元素時&#xff0c;哪條規則最終會被瀏覽器采用。理解權重機制可以幫助開發者更高效地編寫和維護CSS代碼&#xff0c;避免樣式沖突帶來的困擾。 1. 什么是CSS選擇器權重&…

大語言模型原理與書生大模型提示詞工程實踐-學習筆記

&#x1f4d8; 第五期書生葡語實戰營講座總結 &#x1f399; 主講人&#xff1a;王明&#xff08;東部大學 數據挖掘實驗室 博士生&#xff09; 一、大語言模型的生成原理 架構基礎&#xff1a;采用 Transformer&#xff08;Decoder-only&#xff09;架構&#xff0c;如 GPT …

李沐 《動手學深度學習》 | 實戰Kaggle比賽:預測房價

文章目錄 1.下載和緩存數據集2.數據預處理讀取樣本預處理樣本數值型特征處理特征標準化的好處離散值處理轉換為張量表示 訓練K折交叉驗證模型選擇最終模型確認及結果預測代碼總結提交到Kaggle 房價預測比賽鏈接&#xff1a;https://www.kaggle.com/c/house-prices-advanced-reg…

一鍵部署Prometheus+Grafana+alertmanager對網站狀態進行監控

在建設監控體系的過程中&#xff0c;針對一個系統的監控是多維度的&#xff0c;除了服務器資源狀態、中間件狀態、應用狀態外&#xff0c;對系統訪問狀態的監控也是很有必要&#xff0c;可以在系統訪問出現異常時第一時間通知到我們。本文介紹使用 Docker-compose 方式一鍵部署…

康謀方案 | 高精LiDAR+神經渲染3DGS的完美融合實踐

目錄 一、從點云到高精地圖的重建 1、數據采集 2、點云聚合 3、高精地圖建模 4、三維建模與裝飾 二、顛覆性革新&#xff1a;NeRF 與 3DGS 重建 1、僅需數日&#xff0c;完成街景重建 2、進一步消除 Domain gap&#xff0c;場景逼真如實地拍攝 3、降本增效&#xff0c…

MySQL-事務(TRANSACTION-ACID)管理

目錄 一、什么是事務&#xff1f; 1.1.事務的定義 1.2.事務的基本語句 1.3.事務的四大特性&#xff08;ACID&#xff09; 二、數據庫的并發控制 2.1.什么是并發及并發操作帶來的影響&#xff1f; 2.2.并發操作帶來的隔離級別 三、使用事務的場景 3.1.銀行轉賬場景示例 3.2.模擬…

centos系統docker配置milvus教程

本人使用的是京東云服務器配置milvus 參考教程&#xff1a;https://blog.csdn.net/withme977/article/details/137270087 首先確保安裝了docker 、docker compose docker -- version docker-compose --version創建milvus工作目錄 mkdir milvus # 進入到新建的目錄 cd milvu…

什么是JSON ?從核心語法到編輯器

一、什么是JSON &#xff1f; JSON&#xff0c;即 JavaScript 對象表示法&#xff0c;是一種輕量級、跨語言、純文本的數據交換格式 。它誕生于 JavaScript 生態&#xff0c;但如今已成為所有編程語言通用的 “數據普通話”—— 無論前端、后端&#xff0c;還是 Python、Java&…

計算機網絡(7)——物理層

1.數據通信基礎 1.1 物理層基本概念 物理層(Physical Layer)是所有網絡通信的物理基礎&#xff0c;它定義了在物理介質上傳輸原始比特流(0和1)所需的機械、電氣、功能、過程和規程特性 1.2 數據通信系統模型 信源&#xff1a;生成原始數據的終端設備&#xff0c;常見形態包括…

深度學習基礎知識總結

1.BatchNorm2d 加速收斂&#xff1a;Batch Normalization 可以使每層的輸入保持較穩定的分布&#xff08;接近標準正態分布&#xff09;&#xff0c;減少梯度更新時的震蕩問題&#xff0c;從而加快模型訓練速度。 減輕過擬合&#xff1a;批歸一化引入了輕微的正則化效果&#…

iOS 抖音首頁頭部滑動標簽的實現

抖音首頁的頭部滑動標簽(通常稱為"Segmented Control"或"Tab Bar")是一個常見的UI組件&#xff0c;可以通過以下幾種方式實現&#xff1a; 1. 使用UISegmentedControl 最簡單的實現方式是使用系統自帶的UISegmentedControl&#xff1a; let segmentedCo…

ThreadLocal實現原理

ThreadLocal 是 Java 中實現線程封閉&#xff08;Thread Confinement&#xff09;的核心機制&#xff0c;它通過為每個線程創建變量的獨立副本來解決多線程環境下的線程安全問題。 Thread └── ThreadLocalMap (threadLocals) // 每個線程持有的專屬Map├── Entry[] tab…

【筆記】結合 Conda任意創建和配置不同 Python 版本的雙軌隔離的 Poetry 虛擬環境

如何結合 Conda 任意創建和配置不同 Python 版本的雙軌隔離的Poetry 虛擬環境&#xff1f; 在 Python 開發中&#xff0c;為不同項目配置獨立且適配的虛擬環境至關重要。結合 Conda 和 Poetry 工具&#xff0c;能高效創建不同 Python 版本的 Poetry 虛擬環境&#xff0c;接下來…

defineAsyncComponent

下面,我們來系統的梳理關于 defineAsyncComponent 懶加載 的基本知識點: 一、異步組件核心概念 1.1 什么是異步組件? 異步組件是 Vue 中一種按需加載組件的機制,允許將組件代碼拆分為獨立的 chunk,在需要時再從服務器加載。這種技術能顯著提升應用初始加載速度。 1.2 為…

ANeko v1.0.3 | 在手機里養只寵物貓 實時互動 動畫細膩

ANeko是一款專為喜歡貓咪的用戶設計的互動養寵應用。它讓你在手機屏幕上擁有一只可愛的貓咪動畫&#xff0c;這只貓咪會實時跟隨你的手指觸摸軌跡&#xff0c;帶來生動有趣的互動體驗。該應用不僅保留了用戶熟悉的交互式貓動畫&#xff0c;還結合了現代高清圖形技術&#xff0c…

人工智能AI

AI 簡介 AI 使我們能夠生成可以改進衛生保健的出色軟件,讓人能夠克服生理上的不便,改進智能基礎結構,創造令人驚嘆的娛樂體驗,甚至拯救地球! 什么是 AI? 簡而言之,AI 就是一種模仿人類行為和能力的軟件。 關鍵工作負載包括: 機器學習 - 它通常是 AI 系統的基礎,也是…

Vue 中 data 選項:對象 vs 函數

Vue 中 data 選項&#xff1a;對象 vs 函數 在 Vue 開發中&#xff0c;data 選項可以使用對象或函數形式&#xff0c;了解它們的使用場景非常重要。下面我將通過一個直觀的示例來展示兩者的區別和適用場景。 <!DOCTYPE html> <html lang"zh-CN"> <h…

python打卡第49天

知識點回顧&#xff1a; 通道注意力模塊復習空間注意力模塊CBAM的定義 CBAM 注意力模塊介紹 從 SE 到 CBAM&#xff1a;注意力機制的演進 之前我們介紹了 SE&#xff08;Squeeze-and-Excitation&#xff09;通道注意力模塊&#xff0c;其本質是對特征進行增強處理。現在&#…