mall整合RabbitMQ實現延遲消息

摘要

本文主要講解mall整合RabbitMQ實現延遲消息的過程,以發送延遲消息取消超時訂單為例。RabbitMQ是一個被廣泛使用的開源消息隊列。它是輕量級且易于部署的,它能支持多種消息協議。RabbitMQ可以部署在分布式和聯合配置中,以滿足高規模、高可用性的需求。

項目使用框架介紹

RabbitMQ

RabbitMQ是一個被廣泛使用的開源消息隊列。它是輕量級且易于部署的,它能支持多種消息協議。RabbitMQ可以部署在分布式和聯合配置中,以滿足高規模、高可用性的需求。

RabbitMQ的安裝和使用

  1. 安裝Erlang,下載地址:erlang.org/download/ot…

  1. 安裝RabbitMQ,下載地址:dl.bintray.com/rabbitmq/al…

  1. 安裝完成后,進入RabbitMQ安裝目錄下的sbin目錄

  1. 在地址欄輸入cmd并回車啟動命令行,然后輸入以下命令啟動管理功能:
rabbitmq-plugins enable rabbitmq_management
復制代碼

  1. 訪問地址查看是否安裝成功:http://localhost:15672/

  1. 輸入賬號密碼并登錄:guest guest

  2. 創建帳號并設置其角色為管理員:mall mall

  1. 創建一個新的虛擬host為:/mall

  1. 點擊mall用戶進入用戶配置頁面

  1. 給mall用戶配置該虛擬host的權限

  1. 至此,RabbitMQ的安裝和配置完成。

RabbitMQ的消息模型

標志中文名英文名描述
P生產者Producer消息的發送者,可以將消息發送到交換機
C消費者Consumer消息的接收者,從隊列中獲取消息進行消費
X交換機Exchange接收生產者發送的消息,并根據路由鍵發送給指定隊列
Q隊列Queue存儲從交換機發來的消息
type交換機類型typedirect表示直接根據路由鍵(orange/black)發送消息

Lombok

Lombok為Java語言添加了非常有趣的附加功能,你可以不用再為實體類手寫getter,setter等方法,通過一個注解即可擁有。

注意:需要安裝idea的Lombok插件,并在項目中的pom文件中添加依賴。

業務場景說明

用于解決用戶下單以后,訂單超時如何取消訂單的問題。

  • 用戶進行下單操作(會有鎖定商品庫存、使用優惠券、積分一系列的操作);
  • 生成訂單,獲取訂單的id;
  • 獲取到設置的訂單超時時間(假設設置的為60分鐘不支付取消訂單);
  • 按訂單超時時間發送一個延遲消息給RabbitMQ,讓它在訂單超時后觸發取消訂單的操作;
  • 如果用戶沒有支付,進行取消訂單操作(釋放鎖定商品庫存、返還優惠券、返回積分一系列操作)。

整合RabbitMQ實現延遲消息

在pom.xml中添加相關依賴

<!--消息隊列相關依賴-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok依賴-->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
復制代碼

修改SpringBoot配置文件

修改application.yml文件,在spring節點下添加Mongodb相關配置。

  rabbitmq:
    host: localhost # rabbitmq的連接地址
    port: 5672 # rabbitmq的連接端口號
    virtual-host: /mall # rabbitmq的虛擬host
    username: mall # rabbitmq的用戶名
    password: mall # rabbitmq的密碼
    publisher-confirms: true #如果對異步消息需要回調必須設置為true
復制代碼

添加消息隊列的枚舉配置類QueueEnum

用于延遲消息隊列及處理取消訂單消息隊列的常量定義,包括交換機名稱、隊列名稱、路由鍵名稱。

package com.macro.mall.tiny.dto;import lombok.Getter;/*** 消息隊列枚舉配置* Created by macro on 2018/9/14.*/
@Getter
public enum QueueEnum {/*** 消息通知隊列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl隊列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交換名稱*/private String exchange;/*** 隊列名稱*/private String name;/*** 路由鍵*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}復制代碼

添加RabbitMQ的配置

用于配置交換機、隊列及隊列與交換機的綁定關系。

package com.macro.mall.tiny.config;import com.macro.mall.tiny.dto.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息隊列配置* Created by macro on 2018/9/14.*/
@Configuration
public class RabbitMqConfig {/*** 訂單消息實際消費隊列所綁定的交換機*/@BeanDirectExchange orderDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 訂單延遲隊列隊列所綁定的交換機*/@BeanDirectExchange orderTtlDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 訂單實際消費隊列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 訂單延遲隊列(死信隊列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后轉發的交換機.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后轉發的路由鍵.build();}/*** 將訂單隊列綁定到交換機*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 將訂單延遲隊列綁定到交換機*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}
復制代碼

在RabbitMQ管理頁面可以看到以下交換機和隊列

交換機及隊列說明

  • mall.order.direct(取消訂單消息隊列所綁定的交換機):綁定的隊列為mall.order.cancel,一旦有消息以mall.order.cancel為路由鍵發過來,會發送到此隊列。
  • mall.order.direct.ttl(訂單延遲消息隊列所綁定的交換機):綁定的隊列為mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl為路由鍵發送過來,會轉發到此隊列,并在此隊列保存一定時間,等到超時后會自動將消息發送到mall.order.cancel(取消訂單消息消費隊列)。

添加延遲消息的發送者CancelOrderSender

用于向訂單延遲消息隊列(mall.order.cancel.ttl)里發送消息。

package com.macro.mall.tiny.component;import com.macro.mall.tiny.dto.QueueEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 取消訂單消息的發出者* Created by macro on 2018/9/14.*/
@Component
public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//給延遲隊列發送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//給消息設置延遲毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);}
}復制代碼

添加取消訂單消息的接收者CancelOrderReceiver

用于從取消訂單的消息隊列(mall.order.cancel)里接收消息。

package com.macro.mall.tiny.component;import com.macro.mall.tiny.service.OmsPortalOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 取消訂單消息的處理者* Created by macro on 2018/9/14.*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);}
}復制代碼

添加OmsPortalOrderService接口

package com.macro.mall.tiny.service;import com.macro.mall.tiny.common.api.CommonResult;
import com.macro.mall.tiny.dto.OrderParam;
import org.springframework.transaction.annotation.Transactional;/*** 前臺訂單管理Service* Created by macro on 2018/8/30.*/
public interface OmsPortalOrderService {/*** 根據提交信息生成訂單*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消單個超時訂單*/@Transactionalvoid cancelOrder(Long orderId);
}復制代碼

添加OmsPortalOrderService的實現類OmsPortalOrderServiceImpl

package com.macro.mall.tiny.service.impl;import com.macro.mall.tiny.common.api.CommonResult;
import com.macro.mall.tiny.component.CancelOrderSender;
import com.macro.mall.tiny.dto.OrderParam;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** 前臺訂單管理Service* Created by macro on 2018/8/30.*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 執行一系類下單操作,具體參考mall項目LOGGER.info("process generateOrder");//下單完成后開啟一個延遲消息,用于當用戶沒有付款時取消訂單(orderId應該在下單后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下單成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 執行一系類取消訂單操作,具體參考mall項目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//獲取訂單超時時間,假設為60分鐘long delayTimes = 30 * 1000;//發送延遲消息cancelOrderSender.sendMessage(orderId, delayTimes);}}復制代碼

添加OmsPortalOrderController定義接口

package com.macro.mall.tiny.controller;import com.macro.mall.tiny.dto.OrderParam;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;/*** 訂單管理Controller* Created by macro on 2018/8/30.*/
@Controller
@Api(tags = "OmsPortalOrderController", description = "訂單管理")
@RequestMapping("/order")
public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根據購物車信息生成訂單")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);}
}復制代碼

進行接口測試

調用下單接口

注意:已經將延遲消息時間設置為30秒

項目源碼地址

github.com/macrozheng/…

公眾號

mall項目全套學習教程連載中,關注公眾號第一時間獲取。

轉載于:https://juejin.im/post/5cff98986fb9a07ed36ea139

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/387614.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/387614.shtml
英文地址,請注明出處:http://en.pswp.cn/news/387614.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

競價打板的關鍵點

競價打板&#xff0c;主要是速度&#xff0c;其他不重要的&#xff0c;如果為了當天盈利大&#xff0c;失去競價打板的本質含義&#xff0c;因為競價可以買到&#xff0c;盤中買不到&#xff0c;才是競價打板的目的&#xff0c;也就是從競價打板的角度看&#xff0c;主要是看習…

Java常見的幾種內存溢出及解決方法

Java常見的幾種內存溢出及解決方法【情況一】&#xff1a;java.lang.OutOfMemoryError:Javaheapspace&#xff1a;這種是java堆內存不夠&#xff0c;一個原因是真不夠&#xff08;如遞歸的層數太多等&#xff09;&#xff0c;另一個原因是程序中有死循環&#xff1b;如果是java…

docker操作之mysql容器

1、創建宿主機器的掛載目錄 /opt/docker/mysql/conf /opt/docker/mysql/data /opt/docker/mysql/logs 2、創建【xxx.cnf】配置文件&#xff0c;內容如下所示&#xff1a; [mysqld]#服務唯一Idserver-id 1port 3306log-error /var/log/mysql/error.log #只能用IP地址skip_nam…

Windows10系統下wsappx占用CPU資源過高?wsappx是什么?如何關閉wsappx進程?

在Windows10系統開機的時候&#xff0c;wsappx進程占用的CPU資源非常高&#xff0c;導致電腦運行速度緩慢&#xff0c;那么我們如何關閉wsappx進程&#xff0c;讓電腦加快運行速度呢&#xff1f;下面就一起來看一下操作的方法吧。 【現象】 1、先來看一下電腦剛開機的時候&…

如何通過Windows Server 2008 R2建立NFS存儲

如何通過Windows Server 2008 R2建立NFS存儲在我們日常工作的某些實驗中&#xff0c;會需要使用存儲服務器。而硬件存儲成本高&#xff0c;如StarWind之類的iSCSI軟存儲解決方案需要單獨下載服務器端程序&#xff0c;且配置比較繁瑣&#xff0c;令很多新手們很是頭疼。事實上&a…

python-windows安裝相關問題

1.python的環境配置&#xff0c;有些時候是沒有配置的&#xff0c;需要在【系統環境】-【path】里添加。 2.安裝pip&#xff1a;從官網下載pip包&#xff0c;然后到包目錄》python setup.py install 安裝 3.安裝scrapyd&#xff1a;正常使用pip3 install scrapyd安裝不起&…

hdu 1542/1255 Atlantis/覆蓋的面積

1542 1255 兩道掃描線線段樹的入門題。 基本沒有什么區別&#xff0c;前者是模板&#xff0c;后者因為是求覆蓋次數至少在兩次以上的&#xff0c;這個同樣是具有并集性質的&#xff0c;所以把cover的判斷條件更改一下就可以了qwq。 hdu1542 代碼如下&#xff1a; #include<i…

使用了JDK自帶的jconsole查看Tomcat運行情況

最近對公司的項目進行JVM調優&#xff0c;使用了JDK自帶的jconsole查看Tomcat運行情況&#xff0c;記錄下配置以便以后參考&#xff1a;首先&#xff0c;修改Tomcat的bin目錄下的catalina.bat文件&#xff0c;在JAVA_OPTS變量中添加下面四行&#xff0c;即可set JAVA_OPTS %JAV…

jvm02

java虛擬機內存管理 每個線程就是一個順序的執行單元&#xff0c;線程共享區即多個線程共享同一塊區域&#xff0c;線程獨占區即每個線程都有自己的虛擬機棧&#xff0c;本地方法棧&#xff0c;程序計數器。 程序計數器是一個比較小的內存空間&#xff0c;可以看作是當前線程所…

搭建svn管理平臺

安裝svn服務器&#xff1a;yum -y install subversion創建svn的目錄&#xff1a;mkdir -p /data/svn初始化svn目錄&#xff1a;svnadmin create /data/svnconf下的三個目錄介紹&#xff1a;authz&#xff1a;控制權限,創建用戶。密碼在passwd創建 passwd&#xff1a;密碼文件&…

Oracle dataguard 正常切換和應急切換

Oracle dataguard 正常切換和應急切換oracle dataguard提供異地容災方案,能有效的防止單點故障和提供高可用技術,這里介紹dataguard正常主備切換和應急切換&#xff08;應急切換模擬主庫出現問題無法還原,備庫脫離dataguard接管主庫對外提供服務&#xff09;1&#xff09;Oracl…

好程序員web前端分享JS引擎的執行機制

好程序員web前端分享JS引擎的執行機制&#xff0c;請先著重牢記兩點&#xff01;JS是單線程語言。JS的EventLoop是JS的執行機制。深入了解JS的執行&#xff0c;就等于深入了解JS里的eventloop。1、靈魂三問&#xff1a;JS為什么是單線程的?為什么需要異步?單線程又是如何實現…

shutil模塊、json和pickle模塊

shutil模塊&#xff1a; 高級的文件、文件夾、壓縮包處理模塊 json和pickle模塊 之前學過eval內置方法可以將一個字符串轉化成Python對象&#xff0c;但eval方法是有局限性的&#xff0c;對于普通的數據類型&#xff0c;json.loads、eval都可以使用&#xff0c;但遇到特殊類型的…

每日一問:LayoutParams 你知道多少?

前面的文章中著重講解了 View 的測量流程。其中我提到了一句非常重要的話&#xff1a;**View 的測量匡高是由父控件的 MeasureSpec 和 View 自身的 LayoutParams 共同決定的。**我們在前面的 每日一問&#xff1a;談談對 MeasureSpec 的理解 把 MeasureSpec 的重點進行了講解&a…

kuangbin專題十六 KMP擴展KMP HDU2594 Simpsons’ Hidden Talents

Homer: Marge, I just figured out a way to discover some of the talents we weren’t aware we had. Marge: Yeah, what is it? Homer: Take me for example. I want to find out if I have a talent in politics, OK? Marge: OK. Homer: So I take some politician’s na…

SNI: 實現多域名虛擬主機的SSL/TLS認證

為什么80%的碼農都做不了架構師&#xff1f;>>> 一. 介紹 早期的SSLv2根據經典的公鑰基礎設施PKI(Public Key Infrastructure)設計&#xff0c;它默認認為&#xff1a;一臺服務器&#xff08;或者說一個IP&#xff09;只會提供一個服務&#xff0c;所以在SSL握手時…

echo(),print(),print_r(),var_dump()的區別

echo可以一次輸出多個值&#xff0c;多個值之間用逗號分隔。echo是語言結構(language construct)&#xff0c;而并不是真正的函數&#xff0c;因此不能作為表達式的一部分使用。echo是php的內部指令&#xff0c;不是函數&#xff0c;無返回值。 print()&#xff1a;函數print()…

我心目中的牛程序員、我們可以對比看看(人家還是看多年朋友面子上才肯幫忙1周,至少需支付1萬元辛苦費)...

為什么80%的碼農都做不了架構師&#xff1f;>>> 最近碰到客戶整個網站改版的需要&#xff0c;非常短的時間里只有1周時間里&#xff0c;需要把整個B2C網站徹底的進行版面&#xff0c;我自己估算了一下&#xff0c;就是往死里干一天工作48個小時&#xff0c;1周也干…

c#做端口轉發程序支持正向連接和反向鏈接

3389的時候 例子1&#xff1a;連接a機器的3389端口連不上&#xff0c;因為對方防火墻或者網關做了限制&#xff0c;只能訪問a機器的個別端口比如80。 例子2&#xff1a;連接a機器的幾乎所有端口都連不上&#xff08;對方乃內網或者防火墻網關做了限制&#xff09;&#xff0c…

Spring Boot(十四):spring boot整合shiro-登錄認證和權限管理

Spring Boot(十四)&#xff1a;spring boot整合shiro-登錄認證和權限管理 使用Spring Boot集成Apache Shiro。安全應該是互聯網公司的一道生命線&#xff0c;幾乎任何的公司都會涉及到這方面的需求。在Java領域一般有Spring Security、Apache Shiro等安全框架&#xff0c;但是由…