RabbitMQ 延遲隊列,消息延遲推送

應用場景

目前常見的應用軟件都有消息的延遲推送的影子,應用也極為廣泛,例如:

  • 淘寶七天自動確認收貨。在我們簽收商品后,物流系統會在七天后延時發送一個消息給支付系統,通知支付系統將款打給商家,這個過程持續七天,就是使用了消息中間件的延遲推送功能。
  • 12306 購票支付確認頁面。我們在選好票點擊確定跳轉的頁面中往往都會有倒計時,代表著 30 分鐘內訂單不確認的話將會自動取消訂單。其實在下訂單那一刻開始購票業務系統就會發送一個延時消息給訂單系統,延時30分鐘,告訴訂單系統訂單未完成,如果我們在30分鐘內完成了訂單,則可以通過邏輯代碼判斷來忽略掉收到的消息。

在上面兩種場景中,如果我們使用下面兩種傳統解決方案無疑大大降低了系統的整體性能和吞吐量:

  • 使用 redis 給訂單設置過期時間,最后通過判斷 redis 中是否還有該訂單來決定訂單是否已經完成。這種解決方案相較于消息的延遲推送性能較低,因為我們知道 redis 都是存儲于內存中,我們遇到惡意下單或者刷單的將會給內存帶來巨大壓力。
  • 使用傳統的數據庫輪詢來判斷數據庫表中訂單的狀態,這無疑增加了IO次數,性能極低。
  • 使用 jvm 原生的 DelayQueue ,也是大量占用內存,而且沒有持久化策略,系統宕機或者重啟都會丟失訂單信息。

消息延遲推送的實現

首先我們創建交換機和消息隊列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class MQConfig {public static final String LAZY_EXCHANGE = "Ex.LazyExchange";public static final String LAZY_QUEUE = "MQ.LazyQueue";public static final String LAZY_KEY = "lazy.#";@Beanpublic TopicExchange lazyExchange(){//Map<String, Object> pros = new HashMap<>();//設置交換機支持延遲消息推送//pros.put("x-delayed-message", "topic");TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);exchange.setDelayed(true);return exchange;}@Beanpublic Queue lazyQueue(){return new Queue(LAZY_QUEUE, true);}@Beanpublic Binding lazyBinding(){return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);}
}復制代碼

我們在 Exchange 的聲明中可以設置exchange.setDelayed(true)來開啟延遲隊列,也可以設置為以下內容傳入交換機聲明的方法中,因為第一種方式的底層就是通過這種方式來實現的。

 //Map<String, Object> pros = new HashMap<>();//設置交換機支持延遲消息推送//pros.put("x-delayed-message", "topic");TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);復制代碼

發送消息時我們需要指定延遲推送的時間,我們這里在發送消息的方法中傳入參數 new MessagePostProcessor() 是為了獲得 Message對象,因為需要借助 Message對象的api 來設置延遲時間。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class MQSender {@Autowiredprivate RabbitTemplate rabbitTemplate;//confirmCallback returnCallback 代碼省略,請參照上一篇public void sendLazy(Object message){rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id + 時間戳 全局唯一CorrelationData correlationData = new CorrelationData("12345678909"+new Date());//發送消息時指定 header 延遲時間rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//設置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//message.getMessageProperties().setHeader("x-delay", "6000");message.getMessageProperties().setDelay(6000);return message;}}, correlationData);}
}復制代碼

我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設置 x-delay。等同于我們手動設置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/*** Set the x-delay header.* @param delay the delay.* @since 1.6*/
public void setDelay(Integer delay) {if (delay == null || delay < 0) {this.headers.remove(X_DELAY);}else {this.headers.put(X_DELAY, delay);}
}復制代碼

消費端進行消費

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
public class MQReceiver {@RabbitListener(queues = "MQ.LazyQueue")@RabbitHandlerpublic void onLazyMessage(Message msg, Channel channel) throws IOException{long deliveryTag = msg.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, true);System.out.println("lazy receive " + new String(msg.getBody()));}復制代碼

測試結果

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {@Autowiredprivate MQSender mqSender;@Testpublic void sendLazy() throws Exception {String msg = "hello spring boot";mqSender.sendLazy(msg + ":");}
}復制代碼

果然在 6 秒后收到了消息 lazy receive hello spring boot:

Java學習、面試;文檔、視頻資源免費獲取

轉載于:https://juejin.im/post/5cf7c2cd51882537465f29bf

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/251693.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/251693.shtml
英文地址,請注明出處:http://en.pswp.cn/news/251693.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

windows Navicat Premium連接oracle

需要下載并指定Instant Client 下載地址&#xff1a;在oracle官網搜索Instant Client Downloads選擇自己需要的客戶端 //說明 //Navicat 版本 9 或以上捆綁了 instant client&#xff0c;但是捆綁的用不了&#xff0c;捆綁的10.2。因此下載高版本替換之 //版本有要求&#xff0…

關于arraylist的擴容機制

ArrayList會自動改變size的長度&#xff1a; 首先&#xff0c;ArrayList定義了一個私有的未被序列化的數組elementData&#xff0c;用來存儲ArrayList的對象列表&#xff08;注意只定義未初始&#xff09;&#xff1a;private transient Object[] elementData;   其次&am…

不同級別UI設計師的區別有哪些?

不同等級的UI設計師在工作內容以及基本的薪資待遇方面也是有區別的&#xff0c;很多在UI培訓學校學習的小伙伴們并不知道各個等級的UI設計師工作內容有什么差別&#xff0c;那么合肥學碼思小編就給大家總結一下這些區別分別表現在哪些方面。 一、初級UI設計師 初級UI設計師的主…

Linux命令行參數前加--,-和不加杠

參數前“-”的表明后面的參數是字符形式。參數前“--”的則表明后面的參數是單詞形式。參數前有橫的是System V風格。 參數前沒有橫的是BSD風格。 轉載于:https://www.cnblogs.com/YYRise/p/9090476.html

反射筆記-----------------------------

1.反射基本概念&#xff1a; 01.定義&#xff1a; 反射是指在程序運行期間&#xff0c;能夠觀察和修改類或者類的對象的屬性和行為的特性&#xff01; 02.作用&#xff1a; 001.在運行期間獲取類的修飾符&#xff0c;包名&#xff0c;類名&#xff0c;實現的接口&#xff0c;繼…

kubernetes 集群部署

kubernetes 集群部署 環境JiaoJiao_Centos7-1(152.112) 192.168.152.112JiaoJiao_Centos7-2(152.113) 192.168.152.113JiaoJiao_Centos7-3(152.114) 192.168.152.114已開通 4C8G80G 集群規劃 部署方式 環境準備&#xff1a;基于主機名稱通信&#xff0c;時間同步&#xff0c;關…

PHP學習筆記--抽象類和抽象方法的應用

抽象類** 什么是抽象方法&#xff1f;** 定義&#xff1a;如果一個類中的方法&#xff0c;沒有方法體的方法就是抽象方法(就是一個方法沒有使用{}而直接使用分號結束)* * abstract function test(); //抽象方法* * function test(){ //有方法體…

wordpress 顯示數學公式 (MathJax-LaTeX)

blog 不放一堆數學公式怎么能顯得高大上&#xff0c;所以 MathJax-LaTeX 也是必裝的插件之一了。 一、安裝 MathJax-LaTex 插件 直接在 wordpress 插件中&#xff0c;搜索并安裝 MathJax-LaTeX 二、安裝本地 MathJax 服務 不過由默認的 MathJax cdn 服務經常被墻&#xff0c;所…

長春理工大學第十四屆程序設計競賽(重現賽)F.Successione di Fixoracci

鏈接&#xff1a;https://ac.nowcoder.com/acm/contest/912/F 題意&#xff1a; 動態規劃(Dynamic programming&#xff0c;簡稱dp)是一種通過把原問題分解為相對簡單的子問題的方式求解復雜問題的方法。例如&#xff0c;假設小x一步能爬1層或2層臺階&#xff0c;求小x爬n層臺階…

ConstraintLayout

ConstraintLayout使用筆記 具體使用參考&#xff1a;http://blog.csdn.net/guolin_blog/article/details/53122387 ConstraintLayout 好處還是很明顯&#xff0c;確實可以減少嵌套。性能對比參閱&#xff1a;http://www.cnblogs.com/liujingg/p/7161319.html 簡單嵌套Constrain…

css權重

權重大小 內嵌權重為1000 <p style"color: yellow;">ALEX</p> id選擇器的權重為100&#xff0c;類選擇器的權重為10&#xff0c;標簽選擇器的權重為1. /*1 1 1*/ #box1 .wrap2 p{color: red; }當權重一樣的時候&#xff0c;是以后設置的屬性為準&#xf…

手機兩列布局,正方形

手機兩列布局&#xff0c;正方形。 直接貼出調試網站的結果&#xff0c;閱讀效果還不錯。 轉載于:https://www.cnblogs.com/blogzhang/p/11002428.html

python(5)- 基礎數據類型

一 int 數字類型 #abs(x)      返回數字的絕對值&#xff0c;如abs(-10) 返回 10 # ceil(x)    返回數字的上入整數&#xff0c;如math.ceil(4.1) 返回 5 # cmp(x, y)    如果 x < y 返回 -1, 如果 x y 返回 0, 如果 x > y 返回 1 # exp(x)…

B s

666 轉載于:https://www.cnblogs.com/lovelgx/articles/9099239.html

基于HTK的語音撥號系統

為什么80%的碼農都做不了架構師&#xff1f;>>> 基于 HTK 的語音撥號系統 Veket NWPU 2011-6-22 目標&#xff1a; 該系統能夠識別連續說出的數字串和若干組姓名。建模是針對子詞&#xff08; sub-word,eg.. 音素&#xff09;&#xff0c;具有一定的…

MySQL無法重啟問題解決Warning: World-writable config file '/etc/my.cnf' is ignored

為什么80%的碼農都做不了架構師&#xff1f;>>> 今天幫朋友維護服務器&#xff0c;在關閉數據庫的命令發現mysql關不了&#xff0c;提示Warning: World-writable config file /etc/my.cnf is ignored &#xff0c;大概意思是權限全局可寫&#xff0c;任何一個用戶都…

用戶體驗分析: 以 “南通大學教務管理系統微信公眾號” 為例

基于實例分析&#xff0c;體會用戶體驗設計的 7 條準則&#xff0c;分析“南通大學教務管理系統微信公眾號” 在用戶體驗設計方面讓你覺得滿意的地方&#xff08;不少于2點&#xff09;&#xff1b;&#xff08;20分&#xff09;&#xff0c;請陳述理由。 同樣&#xff0c;分析…

JVM學習筆記(一):Java內存區域

由于Java程序是交由JVM執行的&#xff0c;所以我們在談Java內存區域劃分的時候事實上是指JVM內存區域劃分。在討論JVM內存區域劃分之前&#xff0c;先來看一下Java程序具體執行的過程&#xff1a; 首先Java源代碼文件(.java后綴)會被Java編譯器編譯為字節碼文件(.class后綴)&am…

EdgeRouter X設置外網遠程訪問和HTTPS連接指定出口網關

EdgeRouter X雖然小巧&#xff0c;但功能強大&#xff0c;為方便遠程管理&#xff0c;必須對防火墻進行設置&#xff0c;允許從外部進行訪問&#xff0c;由于公網的80、443端口都已被運營商關閉&#xff0c;必須設置端口轉發才能從外部訪問。一、設置外網遠程訪問通過瀏覽器進入…

overflow妙用--去除默認滾動條,內容仍可滾動

在開發中我們往往要去除默認滾動條&#xff0c;但是其在豎直方向的滾動效果仍然需要。 <div id"parent"><div id"child"><h1>文本區</h1><h1>文本區</h1><h1>文本區</h1></div> </div> #pare…