RabbitMQ高級特性1

RabbitMQ高級特性1

  • 一.消息確認
    • 1.消息確認機制
    • 2.手動確認代碼
      • 肯定確認
      • 否定確認1
      • 否定確認2
      • Spring中的代碼
  • 二.持久性
    • 1.交換機持久化
    • 2.隊列的持久化
    • 3.消息的持久化
    • 非持久化代碼實現
    • 三方面都持久化,數據也會丟失
  • 三.發送方確認
    • 1.Confirm確認模式
    • 2.return返回模式
  • 四.總結
    • RabbitMQ保證消息可靠傳輸

一.消息確認

1.消息確認機制

  1. 自動確認:當autoAck等于true時, RabbitMQ 會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正地消費到了這些消息。自動確認模式適合對于消息可靠性要求不高的場景。

  2. 手動確認:當autoAck等于false時,RabbitMQ會等待消費者顯式地調用Basic.Ack命令,回復確認信號后才從內存(或者磁盤)中移去消息。這種模式適合對消息可靠性要求比較高的場景.
    在這里插入圖片描述

2.手動確認代碼

肯定確認

Channel.basicAck(long deliveryTag, boolean multiple)

RabbitMQ已知道該消息并且成功的處理消息,可以將其丟棄了。

參數說明
1)deliveryTag:消息的唯?標識,它是?個單調遞增的64位的長整型值。 deliveryTag 是每個通道(Channel)獨立維護的,所以在每個通道上都是唯?的。當消費者確認(ack)?條消息時,必須使用對應的通道上進行確認。

2)multiple:是否批量確認。在某些情況下,為了減少網絡流量,可以對?系列連續的 deliveryTag 進行批量確認。值為true則會?次性把ack所有小于或等于指定deliveryTag的消息。值為false,則只確認當前指定deliveryTag的消息。

否定確認1

Channel.basicReject(long deliveryTag, boolean requeue)

RabbitMQ在2.0.0版本開始引?了 Basic.Reject 這個命令,消費者客戶端可以調用
channel.basicReject方法來告訴RabbitMQ拒絕這個消息。

參數說明

1)deliveryTag:參考channel.basicAck。

2)requeue:表示拒絕后,這條消息如何處理。如果requeue參數設置為true,則RabbitMQ會重新將這條消息存入隊列,以便可以發送給下?個訂閱的消費者。如果requeue參數設置為false,則RabbitMQ會把消息從隊列中移除,而不會把它發送給新的消費者。

否定確認2

Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)

Basic.Reject命令?次只能拒絕?條消息,如果想要批量拒絕消息,則可以使用Basic.Nack這個命令。消費者客戶端可以調用channel.basicNack方法來實現。

參數說明

前面的參數參考上述參數說明。

multiple的參數設置為true則接受deliveryTag編號之前所有未被當前消費者確認的消息,也就是批量處理未被確認的消息。

Spring中的代碼

  1. AcknowledgeMode.NONE
    這種模式下,消息?旦投遞給消費者,不管消費者是否成功處理了消息,RabbitMQ就會自動確認消息,從RabbitMQ隊列中移除消息,如果消費者處理消息失敗,消息可能會丟失。

ym配置

spring:application:name: rabbitmqdemorabbitmq:addresses: amqp://賬號:密碼@IP:端口號/虛擬機listener:simple:acknowledge-mode: none
  1. AcknowledgeMode.AUTO(默認)
    這種模式下,消費者在消息處理成功時會自動確認消息,但如果處理過程中拋出了異常,則不會確認消息,但是會一直嘗試重發消息。

將yml配置中的 acknowledge-mode改成auto。

上述兩種模式代碼相同

Configuration


package com.example.rabbitmqdemo.config;import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfiguration {//消息確認//隊列@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}//虛擬機@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}//隊列和虛擬機綁定@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("ack");}
}

** Constants**

package com.example.rabbitmqdemo.constant;
public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";
}

Controller

package com.example.rabbitmqdemo.controller;import com.example.rabbitmqdemo.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack is ok");return "ack is ok!";}
}

Listener

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消費者邏輯System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//不做具體實現的消費者業務邏輯}}
  1. AcknowledgeMode.MANUAL
    手動確認模式下,消費者必須在成功處理消息后顯式調用 basicAck 方法來確認消息。如果消息未被確認,RabbitMQ會認為消息尚未被成功處理,并且會在消費者可用時重新投遞該消息,這種模式提高了消息處理的可靠性,因為即使消費者處理消息后失敗,消息也不會丟失,而是可以被重新處理。

將yml配置中的 acknowledge-mode改成manual。

Listener

package com.example.rabbitmqdemo.listener;import com.example.rabbitmqdemo.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.io.UnsupportedEncodingException;/*** Created with IntelliJ IDEA.* Description:* User: hp* Date: 2025-04-03* Time: 9:26*/
@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws IOException {//消費者邏輯System.out.printf("接收到消息: %s , deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//不做具體實現的消費者業務邏輯try {//int sum = 3 / 0;//確認消息(肯定)channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e) {//否定確認//最后一個參數為true,則發生異常重新入隊,false,為不再入隊channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}}}

二.持久性

1.交換機持久化

交換器的持久化是通過在聲明交換機時是將durable參數置為true實現的。

相當于將交換機的屬性在服務器內部保存,當MQ的服務器發生意外或關閉之后,重啟 RabbitMQ 時不需要重新去建?交換機,交換機會自動建立,相當于?直存在。

如果交換器不設置持久化,那么在 RabbitMQ 服務重啟之后,相關的交換機元數據會丟失,對?個長期使用的交換器來說,建議將其置為持久化的。

設置交換機的持久化

在這里插入圖片描述

2.隊列的持久化

隊列的持久化是通過在聲明隊列時將 durable 參數置為true實現的。

如果隊列不設置持久化,那么在RabbitMQ服務重啟之后,該隊列就會被刪掉,此時數據也會丟失。(隊列沒有了,消息也無處可存了)

隊列的持久化能保證該隊列本?的元數據不會因異常情況而丟失,但是并不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,需要將消息設置為持久化。

咱們前面用的創建隊列的方式都是持久化的。

隊列持久化
在這里插入圖片描述
隊列非持久化

在這里插入圖片描述

3.消息的持久化

消息實現持久化,需要把消息的投遞模式( MessageProperties 中的 deliveryMode )設置為2,也就是MessageDeliveryMode.PERSISTENT

設置了隊列和消息的持久化,當 RabbitMQ 服務重啟之后,消息依舊存在。如果只設置隊列持久化,重啟之后消息會丟失。

如果只設置消息的持久化,重啟之后隊列消失,繼而消息也丟失。所以單單設置消息持久化而不設置隊列的持久化顯得毫無意義

非持久化代碼實現

交換機、隊列和綁定

//非持久化隊列@Bean("presQueue")public Queue presQueue() {return QueueBuilder.nonDurable(Constants.PRES_QUEUE).build();}//非持久化交換機@Bean("presExchagne")public DirectExchange presExchange() {return ExchangeBuilder.directExchange(Constants.PRES_EXCHANGE).durable(false).build();}@Bean("presBinding")public Binding presBinding(@Qualifier("presQueue") Queue queue,@Qualifier("presExchagne") Exchange exchange) {//如果參數傳遞的是Exchange類型而不是DirectExchang類型就需要使用noargs作為收尾return BindingBuilder.bind(queue).to(exchange).with("pres").noargs();}

Producer

@RequestMapping("/pres")public String pres() {Message message  = new Message("Presistent test...".getBytes(),new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE,"pres",message);return "pres is ok!";}

RabbitMQ服務器的虛擬機和隊列

在這里插入圖片描述
在這里插入圖片描述

三方面都持久化,數據也會丟失

  1. 從消費者來說,如果在訂閱消費隊列時將autoAck參數設置為true,那么當消費者接收到相關消息之后,還沒來得及處理就宕機了,這樣也算數據居丟失。這種情況很好解決,將autoAck參數設置為false,并進行手動確認。

  2. 在持久化的消息正確存入RabbitMQ之后,還需要有?段時間(雖然很短,但是不可忽視)才能存?磁盤中。RabbitMQ并不會為每條消息都進行同步存盤(調用內核的fsync方法)的處理,可能僅僅保存到操作系統緩存之中而不是物理磁盤之中。如果在這段時間內RabbitMQ服務節點發生了宕機、重啟等異常情況,消息保存還沒來得及落盤,那么這些消息將會丟失。

三.發送方確認

1.Confirm確認模式

Producer在發送消息的時候,對發送端設置?個ConfirmCallback的監聽,無論消息是否到達Exchange,這個監聽都會被執行,如果Exchange成功收到,ACK( Acknowledge character ,確認字符)為true,如果沒收到消息,ACK就為false。

yml配置

spring:application:name: rabbitmqdemorabbitmq:addresses: amqp://賬號:Miami@IP:端口號/虛擬機listener:simple:#acknowledge-mode: none#acknowledge-mode: autoacknowledge-mode: manualpublisher-confirm-type: correlated #消息發送確認

Configuration

package com.example.rabbitmqdemo.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調函數rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);//相應的業務處理}}});return rabbitTemplate;}
}

Producer

@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");rabbitTemplateConfig.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test...",correlationData);return "confirm is ok!";}

2.return返回模式

Configuration

@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調函數rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("執行了confirm方法");if (ack) {System.out.printf("接收到消息, 消息ID: %s \n",correlationData == null ? null : correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n",correlationData == null ? null : correlationData.getId(),cause);//相應的業務處理}}});//return模式rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息退回: " + returnedMessage);}});return rabbitTemplate;}

四.總結

RabbitMQ保證消息可靠傳輸

在這里插入圖片描述

Producer -> Broker:發送方確認

  1. Producer -> Exchange :Confirm模式(網絡問題)
  2. Exchange -> Queue : return模式(代碼或者配置層錯誤,導致消息路由失敗)
  3. 隊列移除:死信等

Broker:持久化(RabbitMQ服務器宕機導致消息丟失)

  1. 交換機持久化
  2. 隊列持久化
  3. 消息持久化

Broker -> Consumer 消息確認方式(消費者未來得及消費信息,就宕機了)

  1. 自動確認
  2. 手動確認

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/75551.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/75551.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/75551.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java網絡編程NIO

一、NIO是什么? NIO可以說是比BIO更強大的IO,可以設置非阻塞模式(通過事件的方式監聽數據的到來) BIO是基于socket通信,一個線程對應一個socket連接,讀取數據要一直等待 NIO是基于channel通信,一個線程管…

【動態規劃】二分優化最長上升子序列

最長上升子序列 II 題解 題目傳送門:AcWing 896. 最長上升子序列 II 一、題目描述 給定一個長度為 N 的數列,求數值嚴格單調遞增的子序列的長度最長是多少。 輸入格式: 第一行包含整數 N第二行包含 N 個整數,表示完整序列 輸…

Dify接口api對接,流式接收流式返回(.net)

試了好多種方法除了Console.WriteLine()能打印出來,試了好些方法都不行,不是報錯就是打印只有一行,要么就是接收完才返回...下面代碼實現調用api接收流式數據,并進行流式返回給前端: using Furion.HttpRemote; using …

19-元素顯示模式及浮動(CSS3)

知識目標 掌握標準文檔流的解析規則掌握元素的顯示模式掌握元素浮動屬性語法與使用掌握浮動塌陷解決方法 1. 標準文檔流 2. 元素顯示模式 元素顯示模式就是元素&#xff08;標簽&#xff09;以什么方式進行顯示&#xff0c;比如<div>獨占一行&#xff0c;一行可以放多…

HTML jQuery 項目 PDF 批注插件庫在線版 API 示例教程

本文章介紹 HTML && jQuery Web項目中 PDF 批注插件庫 ElasticPDF 在線版 API 示例教程&#xff0c;API 包含 ① 導出批注后PDF數據&#xff1b;② 導出純批注 json 數據&#xff1b;③ 加載舊批注&#xff1b;④ 切換文檔&#xff1b;⑤ 切換用戶&#xff1b;⑥ 清空批…

CATIA裝配體全自動存儲解決方案開發實戰——基于遞歸算法的產品結構樹批量處理技術

一、功能定位與技術架構 本工具針對CATIA V5裝配體文件管理場景&#xff0c;實現了一套全自動遞歸存儲系統&#xff0c;主要功能包括&#xff1a; ?智能路徑選擇&#xff1a;通過Tkinter目錄對話框實現可視化路徑選擇?產品結構遞歸解析&#xff1a;深度優先遍歷裝配體中的子…

C#:接口(interface)

目錄 接口的核心是什么&#xff1f; 1. 什么是接口&#xff08;Interface&#xff09;&#xff0c;為什么要用它&#xff1f; 2. 如何定義和使用接口&#xff1f; 3.什么是引用接口&#xff1f; 如何“引用接口”&#xff1f; “引用接口”的關鍵點 4. 接口與抽象類的區…

基于卷積神經網絡CNN實現電力負荷多變量時序預測(PyTorch版)

前言 系列專欄:【深度學習:算法項目實戰】?? 涉及醫療健康、財經金融、商業零售、食品飲料、運動健身、交通運輸、環境科學、社交媒體以及文本和圖像處理等諸多領域,討論了各種復雜的深度神經網絡思想,如卷積神經網絡、循環神經網絡、生成對抗網絡、門控循環單元、長短期記…

關于inode,dentry結合軟鏈接及硬鏈接的實驗

一、背景 在之前的博客 缺頁異常導致的iowait打印出相關文件的絕對路徑-CSDN博客 里 2.2.3 一節里&#xff0c;我們講到了file&#xff0c;fd&#xff0c;inode&#xff0c;dentry&#xff0c;super_block這幾個概念&#xff0c;在這篇博客里&#xff0c;我們針對inode和dentr…

游戲引擎學習第201天

倉庫:https://gitee.com/mrxiao_com/2d_game_5 回顧之前的內容&#xff0c;并遇到了一次一階異常&#xff08;First-Chance Exception&#xff09;。 歡迎來到新一期的開發過程&#xff0c;我們目前正在編寫調試接口代碼。 當前&#xff0c;我們已經在布局系統上進行了一些工…

計算機視覺算法實戰——基于YOLOv8的行人流量統計系統

?個人主頁歡迎您的訪問 ?期待您的三連 ? ?個人主頁歡迎您的訪問 ?期待您的三連 ? ?個人主頁歡迎您的訪問 ?期待您的三連? ??? ????????? ?? 引言:智能客流分析的市場需求 在零售、交通、安防等領域,準確的行人流量統計對于商業決策、公共安全管理…

Redis是什么?架構是怎么樣的?

目錄 前言 一,Redis架構 1.1 本地緩存 1.2 遠程緩存 二,強大的Redis優點 2.1 支持多種數據類型 2.2 內存過期策略 2.3 內存淘汰策略 2.4 持久化 三,Redis是什么 前言 我是一個程序員,維護了一個商品服務,它的背后直連Mysql數據庫,假設商品服務對外每秒需要提供1萬次…

藍橋杯真題——傳送陣

原題連接&#xff1a;藍橋杯2024年第十五屆省賽真題-傳送陣 - C語言網 知識點&#xff1a;并查集 題目描述 小藍在環球旅行時來到了一座古代遺跡&#xff0c;里面并排放置了 n 個傳送陣&#xff0c;進入第 i 個傳送陣會被傳送到第 ai 個傳送陣前&#xff0c;并且可以隨時選擇…

彩虹表攻擊

1. 引言 密碼安全一直是信息安全領域的重要課題。攻擊者可以利用**暴力破解(Brute-Force Attack)和字典攻擊(Dictionary Attack)等方式嘗試破解密碼。然而,計算機性能的提升使得這些方法的效率不斷提高,其中彩虹表攻擊(Rainbow Table Attack)**是一種極具威脅性的密碼…

Vue2 監聽器 watcher

文章目錄 前言監聽器的作用&#xff1a;工作流程&#xff1a;基本用法1. 簡單監聽2. 對象形式配置 使用場景1. 執行異步操作2. 監聽路由變化3. 復雜對象/數組變化 關鍵配置項與計算屬性的區別動態添加監聽器注意事項 前言 提示&#xff1a;這里可以添加本文要記錄的大概內容&a…

Linux系統程序設計:從入門到高級Day02

這一篇 我帶大家復習一下&#xff0c;C語言中的文件 那一部分 大家注意 這里的圖并非原創 是當時我老師的圖片 本片作用主要是 后續會有文件相關操作&#xff0c;這篇幫大家復習C語言文件中的內容 有助于大家后面的理解。 文章中代碼大多是圖片格式&#xff0c;是因為這是我…

N元語言模型的時間和空間復雜度計算

對于N元語言模型&#xff0c;時間復雜度是O(V ^ {N-1})&#xff0c;空間復雜度是O(V ^ {N})&#xff0c;N是詞匯表的大小。 空間復雜度&#xff1a;存儲所有可能的N-1元組及其對應的詞的頻次需要大量的存儲空間。例如&#xff0c;對于一個三元模型&#xff08;N3&#xff09;&…

Tmux 核心操作速查指南

Tmux 最常用操作筆記 1. 基本概念 會話&#xff08;Session&#xff09;&#xff1a;一個tmux會話可以包含多個窗口&#xff0c;適合長期任務管理。窗口&#xff08;Window&#xff09;&#xff1a;每個窗口是一個獨立的終端界面&#xff0c;可包含多個面板。面板&#xff08…

哈希表系列一>兩數之和

目錄 題目&#xff1a;方法&#xff1a;暴力代碼&#xff1a;優化后代碼&#xff1a; 題目&#xff1a; 鏈接: link 方法&#xff1a; 暴力代碼&#xff1a; public int[] twoSum(int[] nums, int target) {解法一&#xff1a;暴力解法&#xff1a;int n nums.length;for(int…

端到端機器學習流水線(MLflow跟蹤實驗)

目錄 端到端機器學習流水線(MLflow跟蹤實驗)1. 引言2. 項目背景與意義2.1 端到端機器學習流水線的重要性2.2 MLflow的作用2.3 工業級數據處理需求3. 數據集生成與介紹3.1 數據集構成3.2 數據生成方法4. 機器學習流水線與MLflow跟蹤4.1 端到端機器學習流水線4.2 MLflow跟蹤實驗…