詳解RabbitMQ高級特性之發送方確認機制

目錄

發送方確認

添加配置

常量類

聲明隊列和交換機并綁定二者關系

confirm確認模式?

編寫生產消息代碼

生產消息1

解決方法

多次生產消息2

解決方法

生產消息3

return 模式

編寫生產消息代碼(路由正確)

生產消息1

編寫生產消息代碼(路由錯誤)

生產消息2

面試題


發送方確認

在使? RabbitMQ的時候, 可以通過消息持久化來解決因為服務器的異常崩潰?導致的消息丟失, 但是還有?個問題, 當消息的?產者將消息發送出去之后, 消息到底有沒有正確地到達服務器呢? 如果在消息到達服務器之前已經丟失(?如RabbitMQ重啟, 那么RabbitMQ重啟期間?產者消息投遞失敗), 持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?

RabbitMQ為我們提供了兩種解決?案:

a. 通過事務機制實現
b. 通過發送?確認(publisher confirm) 機制實現

事務機制?較消耗性能, 在實際?作中使?也不多, 下面主要介紹confirm機制來實現發送?的確認.

RabbitMQ為我們提供了兩個?式來控制消息的可靠性投遞:

1. confirm確認模式
2. return退回模式

添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionpublisher-confirm-type: correlated   #消息發送確認
常量類
public class Constants {//發送方確認public static final String CONFIRM_QUEUE = "confirm.queue";public static final String CONFIRM_EXCHANGE = "confirm.exchange";
}
聲明隊列和交換機并綁定二者關系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class RabbitMQConfig {//發送方確認@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}
}
confirm確認模式?

Producer 在發送消息的時候, 對發送端設置?個ConfirmCallback的監聽, ?論消息是否到達
Exchange, 這個監聽都會被執?, 如果Exchange成功收到, ACK( Acknowledge character , 確認字符)為true, 如果沒收到消息, ACK就為false。

RabbitTemplate.ConfirmCallback 和 ConfirmListener 區別

在RabbitMQ中, ConfirmListener和ConfirmCallback都是?來處理消息確認的機制, 但它們屬于不同的客?端庫, 并且使?的場景和?式有所不同.
1. ConfirmListener 是 RabbitMQ Java Client 庫中的接?. 這個庫是 RabbitMQ 官?提供的?個直接與RabbitMQ服務器交互的客?端庫. ConfirmListener 接?提供了兩個?法: handleAck 和handleNack, ?于處理消息確認和否定確認的事件.
2. ConfirmCallback 是 Spring AMQP 框架中的?個接?. 專?為Spring環境設計. ?于簡化與
RabbitMQ交互的過程. 它只包含?個 confirm ?法,?于處理消息確認的回調.
在 Spring Boot 應?中, 通常會使? ConfirmCallback, 因為它與 Spring 框架的其他部分更加整合, 可以利? Spring 的配置和依賴注?功能. ?在使? RabbitMQ Java Client 庫時, 則可能會直接實現ConfirmListener 接?, 更直接的與RabbitMQ的Channel交互

編寫生產消息代碼
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息發送成功";}
}
public interface ConfirmCallback {
????????/**
????????* 確認回調
????????* @param correlationData: 發送消息時的附加信息 , 通常?于在確認回調中識別特定的消
????????* @param ack: 交換機是否收到消息 , 收到為 true, 未收到為 false
????????* @param cause: 當消息確認失敗時 , 這個字符串參數將提供失敗的原因 . 這個原因可以?于調 試和錯誤處理 .
????????* 成功時 , cause null
????????*/
????????void confirm ( @Nullable CorrelationData correlationData, boolean ack,
????????@Nullable String cause);
}

生產消息1

第一次生產消息

第二次生產消息

此時我們看到,第一次生產消息時能夠正常生產消息,但是當我們第二次生產消息時卻拋異常了,異常信息為:java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate

解決方法

是為什么呢?從異常信息中我們可以看到,ConfirmCallback只能被設置一次,但是從我們的代碼中可以看到,我們每次生產消息時都會設置一次ConfirmCallback,顯然這就是問題所在。

下面我們把剛剛的ConfirmCallback提取出來,重新設置RabbitTemplate。

RabbitTemplateConfig

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});return rabbitTemplate;}
}

ProducerController

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息發送成功";}@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息發送成功";}
}

生產消息

多次生產消息2

此時我們可以看到,我們解決了前面多次生產消息導致的ConfirmCallback被設置多次的問題,但是我們此時的代碼就真的沒有問題了嗎?

當我們生產其它消息時,發現我們并沒有給這個生產消息的方法設置ConfirmCallback啊,但是為什么在控制臺上看到執行了我們設置的ConfrimCallback,這是為什么呢?

是因為我們在前面設置了RabbitTemplate,而且使用了@Autowired注解注入了RabbitTemplate,雖然我們注入了兩個,一個是rabbitTemplate,一個是confirmRabbitTemplate,但是這兩個都是同一個RabbitTemplate。

解決方法

解決辦法:我們在RabbitTemplateConfig中設置兩個RabbitTemplate.

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});return rabbitTemplate;}
}

與此同時,我們修改注入方式:

此時,當再次使用/producer/pres來生產消息時,就沒問題了。

生產消息3

下面我們修改一下生產消息時給消息設置的路由規則:

    @RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);return "消息發送成功";}

生產消息

我們知道,上面生產消息時給消息設置的路由規則并不存在,按道理說,應該會打印“未收到消息”而非“收到消息”,原因是因為,上面的confirm確認模式是用來確定生產消息是否到達了交換機,而上面的路由規則是針對消息從交換機到隊列的,解決上面的路由問題使用到另一種確認模式。

return 模式

消息到達Exchange之后, 會根據路由規則匹配, 把消息放?Queue中. Exchange到Queue的過程, 如果?條消息?法被任何隊列消費(即沒有隊列與消息的路由鍵匹配或隊列不存在等), 可以選擇把消息退回給發送者. 消息退回給發送者時, 我們可以設置?個返回回調?法, 對消息進?處理。

修改RabbitTemplateConfig,設置消息退回的回調方法

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相應的業務處理}}});//消息被退回時, 回調方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}
}

使?RabbitTemplate的setMandatory?法設置消息的mandatory屬性為true(默認為false). 這個屬性
的作?是告訴RabbitMQ, 如果?條消息?法被任何隊列消費, RabbitMQ應該將消息返回給發送者, 此時 ReturnCallback 就會被觸發。

回調函數中有?個參數: ReturnedMessage, 包含以下屬性:

public class ReturnedMessage {
????????//返回的消息對象,包含了消息體和消息屬性
????????private final Message message;
????????//由 Broker 提供的回復碼 , 表?消息?法路由的原因 . 通常是?個數字代碼,每個數字代表不同 的含義 .
????????private final int replyCode;
????????//?個?本字符串 , 提供了?法路由消息的額外信息或錯誤描述 .
????????private final String replyText;
????????//消息被發送到的交換機名稱
????????private final String exchange;
????????//消息的路由鍵,即發送消息時指定的鍵
????????private final String routingKey;
}
編寫生產消息代碼(路由正確)
    @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "returns test...", correlationData);return "消息發送成功";}
生產消息1

編寫生產消息代碼(路由錯誤)
    @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "returns test...", correlationData);return "消息發送成功";}
生產消息2

此時我們可以看到,隊列中依舊是只有1條消息,而且代碼執行了消息退回,而且消息退回時打印了消息信息,顯然我們可以看到,消息的路由規則是錯誤的,不會入隊列。

面試題

如何保證RabbitMQ消息的可靠傳輸?

從這個圖中, 可以看出, 消息可能丟失的場景以及解決?案:

1. ?產者將消息發送到 RabbitMQ失敗
????????a. 可能原因: ?絡問題等
????????b. 解決辦法: [發送?確認-confirm確認模式]
2. 消息在交換機中?法路由到指定隊列:
????????a. 可能原因: 代碼或者配置層?錯誤, 導致消息路由失敗
????????b. 解決辦法: [發送?確認-return模式]
3. 消息隊列??數據丟失
????????a. 可能原因: 消息到達RabbitMQ之后, RabbitMQ Server 宕機導致消息丟失.
????????b. 解決辦法: [持久性]. 開啟 RabbitMQ持久化, 就是消息寫?之后會持久化到磁盤, 如果RabbitMQ 掛了, 恢復之后會?動讀取之前存儲的數據. (極端情況下, RabbitMQ還未持久化就掛了, 可能導致少量數據丟失, 這個概率極低, 也可以通過集群的?式提?可靠性)
4. 消費者異常, 導致消息丟失
????????a. 可能原因: 消息到達消費者, 還沒來得及消費, 消費者宕機. 消費者邏輯有問題.
????????b. 解決辦法: [消息確認]. RabbitMQ 提供了 消費者應答機制 來使 RabbitMQ 能夠感知到消費者是否消費成功消息. 默認情況下消費者應答機制是?動應答的, 可以開啟?動確認, 當消費者確認消費成功后才會刪除消息, 從?避免消息丟失. 除此之外, 也可以配置重試機制, 當消息消費異常時, 通過消息重試確保消息的可靠性。

歡迎大家來訪問我的主頁----》鏈接

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/909738.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/909738.shtml
英文地址,請注明出處:http://en.pswp.cn/news/909738.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Google Play開發者賬號8.3/10.3政策違規自救指南

最近,有一位開發者焦急地向我們訴說,其辛苦開發的多個應用,毫無征兆地全部下架,賬戶提示違反政策 8.3 和 10.3。經過連夜排查,原來是換皮應用與誤導性描述導致的問題。 這并非個例,在 2024 年,G…

pythonday50

作業: 1.好好理解下resnet18的模型結構 2.嘗試對vgg16cbam進行微調策略 import torch import torch.nn as nn import torch.optim as optim import torchvision import torchvision.transforms as transforms from torchvision import models from torch.utils.d…

天貓618高增長背后:電商邁入價值戰新周期

作者 | 曾響鈴 文 | 響鈴說 這次618,來“真”的了。 天貓618玩法變得極致簡單,只設了“官方立減”的85折的基礎優惠,再疊加行業品類券、國補等優惠,最高立減可達50%,十分直觀。 讓消費者省心的結果也是顯而易見的&…

tauri+vue自動更新客戶端打包配置

拉取最新代碼打開項目根目錄下"~.tauri\myapp.key"文件并復制內容 打開項目的powershell窗口,輸入如下內容并回車 $env:TAURI_SIGNING_PRIVATE_KEY"復制的myapp.key" $env:TAURI_SIGNING_PRIVATE_KEY_PASSWORD""然后修改tauri.conf.…

硬件------51單片機

一.基本概念 1.裸機程序 BSP BSP:bord suppord pack 板級支持包 就是程序編寫的內容是沒有操作系統的,直接通過代碼去控制寄存器,讓硬件按照要求去工作。 主要內容:51單片機 IMAX6ULL 2.linux驅動部分 在裸機BSP程序的基礎…

java 基礎方法 list分頁

新增一個list 泛型分類方法 hutools沒這個方法, mybatis 里面的方法不好用 故新增此方法 package com.common.base.util.page;import lombok.Data;import java.util.List;/*** className: VoPage* description: list分頁* author: chenyuanlong* date: 2025年6月16日 0016 上午…

操作系統期末復習--操作系統初識以及進程與線程

操作系統概念與主要功能 操作系統的概念 在信息化時代,軟件是計算機系統的靈魂,而作為軟件核心的操作系統,已與現代計算機系統密不可分、融為一體。計算機系統自下而上大致分為4部分:硬件、操作系統、應用程序和用戶 操作系統管…

使用jhat查看dump.hprof文件內具體對象的屬性值信息

jhat是JDK自帶的堆轉儲分析工具,可以用來查看.hprof文件中對象的具體內容。本文演示使用的是JKD8. 一、啟動jhat 執行啟動命令。 jhat -J-Xmx4g your_heap_dump.hprof -J-Xmx4g表示為jhat分配4GB內存,根據你自己情況調整大小。your_heap_dump.hprof是…

freeRTOS之隊列(queue)

一.概述 1.介紹 隊列(queue)可以用于"任務到任務"、“任務到中斷”、"中斷到任務"直接傳輸信息。 2.核心功能 線程安全:自動處理多任務訪問時的互斥問題。 數據復制:入隊時復制數據(而非引用),…

【python】typing用法

一、基礎類型提示 1. 基本類型注解 # 變量類型注解 age: int 30 name: str "Alice" is_student: bool False height: float 1.752. 函數注解 def greet(name: str, age: int) -> str:return f"Hello {name}, you are {age} years old!"二、組合類…

web前端開發核心基礎:Html結構分析,head,body,不同標簽的作用

前端技術協同關系 協作流程:HTML構建頁面框架—>css美化樣式(選擇器屬性)—>JavaScript實現交互(類似于python的腳本語言)擴展基礎:在上面三項基礎上學習Vue\React、構建工具WePack和瀏覽器工作原理…

精益數據分析(105/126):移動應用核心指標解析與用戶分層營收策略

精益數據分析(105/126):移動應用核心指標解析與用戶分層營收策略 在移動應用市場競爭白熱化的今天,單純追求下載量已無法保證商業成功,精細化運營核心指標成為盈利關鍵。本文將深入解析每日活躍用戶平均營收&#xff…

被CC攻擊了,對服務器有什么影響?

博客正文: 最近,不少網站管理員和運維人員反映遭遇了CC攻擊,導致服務器性能異常甚至癱瘓。那么,CC攻擊究竟會對服務器造成哪些影響?本文將為你簡要解析CC攻擊的原理及其帶來的危害,幫助你更好地理解并應對…

Tensorflow安裝出現dependency conflict錯誤

Python版本: 3.11.4 pip版本已升到最新 電腦上有mac的原裝Python2.x,我裝的3.11.4,還有個什么依賴的3.9 運行 pip3 install tensorflow 出現類似以下錯誤 (我報錯的是另一個不是tensorflow—estimator,但基本就是…

2025年HTTP半開與錯誤攻擊防御指南:原理拆解與實戰防護

你以為限流就能防住HTTP攻擊?黑客用協議畸形包AI調度正在撕裂傳統防線! 一、HTTP半開攻擊:慢速絞殺服務器資源 ? 攻擊原理剖析 HTTP半開攻擊(如Slowloris)是一種應用層DoS攻擊,通過建立大量半開連接耗盡…

Mybatis(XML映射文件、動態SQL)

目錄 基礎操作 準備: 刪除: 新增: 更新: 查詢: 條件查詢: XML映射文件 動態SQL if foreach sql&include 基礎操作 準備: 準備數據庫表 創建一個新的springboot工程&#xff0…

python校園拼團系統

目錄 技術棧介紹具體實現截圖系統設計研究方法:設計步驟設計流程核心代碼部分展示研究方法詳細視頻演示試驗方案論文大綱源碼獲取/詳細視頻演示 技術棧介紹 Django-SpringBoot-php-Node.js-flask 本課題的研究方法和研究步驟基本合理,難度適中&#xf…

多模態大語言模型arxiv論文略讀(127)

When SAM2 Meets Video Camouflaged Object Segmentation: A Comprehensive Evaluation and Adaptation ?? 論文標題:When SAM2 Meets Video Camouflaged Object Segmentation: A Comprehensive Evaluation and Adaptation ?? 論文作者:Yuli Zhou, …

劍指offer32_二叉搜索樹的后序遍歷序列

二叉搜索樹的后序遍歷序列 輸入一個整數數組,判斷該數組是不是某二叉搜索樹的后序遍歷的結果。 如果是則返回true,否則返回false。 假設輸入的數組的任意兩個數字都互不相同。 數據范圍 數組長度 [ 0 , 1000 ] [0,1000] [0,1000]。 樣例 輸入&…

《仿盒馬》app開發技術分享-- 訂單結合優惠券結算(端云一體)

技術棧 Appgallery connect 開發準備 上一節我們已經實現了優惠券的選擇,并且成功的把券后的價格也展示給用戶,不能使用的優惠券我們也用友好的方式告知用戶,這一節我們來實現優惠券內容的下一步,優惠券內容結合訂單進行結算提…