1.maven中引入rabbitmq的依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.application.yml中進行rabbitmq相關配置:
# rabbitmq空一格寫,內嵌在spring里邊
rabbitmq:host: 192.168.56.10port: 5672virtual-host: /#發送端確認機制 correlated:發布消息成功到交換器后會觸發回調方法publisher-confirm-type: correlatedtemplate:# 只要抵達隊列,以異步發送優先回調以異步發送優先回調我們這個returnconfirmmandatory: true# 開啟發送消息抵達隊列的確認publisher-returns: true
?在項目啟動類中添加開啟rabbitmq的注解@EnableRabbit
3.創建交換機,隊列,并將隊列綁定到指定交換機:
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;@Configuration
public class MyMQConfig {@Beanpublic Queue orderDelayQueue(){/* * 構造一個死信隊列* x-dead-letter-exchange: order-event-exchange* x-dead-letter-routing-key: order.release.order* x-message-ttl: 60000* @create 2025/1/1**/Map<String,Object>arguments=new HashMap<>();arguments.put("x-dead-letter-exchange","order-event-exchange");arguments.put("x-dead-letter-routing-key","order.release.order");arguments.put("x-message-ttl",60000);Queue queue = new Queue("order.delay.queue", true, false, false,arguments);return queue;}@Beanpublic Queue orderReleaseOrderQueue(){Queue queue = new Queue("order.release.order.queue", true, false, false);return queue;}@Beanpublic Exchange orderEventExchange(){return new TopicExchange("order-event-exchange",true,false);}@Beanpublic Binding orderCreateOrderBinding(){return new Binding("order.delay.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseOrderBinding(){return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}
}
?在上面的代碼中,創建了2個隊列:order.delay.queue和order.release.order.queue,其中order.delay.queue是延時隊列,為了方便演示,將延時時間設置為1分鐘(60000ms)。order.release.order.queue是普通隊列。創建了1個交換機:order-event-exchange。order.delay.queue隊列通過路由鍵order.create.order綁定到order-event-exchange交換機,order.release.order.queue隊列通過路由鍵order.release.order也綁定到order-event-exchange交換機。在交換機的選擇上,考慮到要綁定到不同的隊列和路由鍵,支持模糊匹配,這里使用Topic交換機。
4.在MyMQConfig 創建一個方法監聽隊列:
@RabbitListener(queues = "order.release.order.queue")public void listener(OrderEntity order, Channel channel, Message message) throws IOException {System.out.println("收到過期的訂單信息,準備關閉訂單"+order.getOrderSn());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
?添加上面的方法后,當我們的服務連上rabbitmq,rabbitmq就會創建上面@Bean標注的交換機和隊列了。
5.寫一個創建訂單的接口,簡單模擬下這個過程:
@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/test/createOrder")@ResponseBodypublic String creatOrder(){OrderEntity order=new OrderEntity();order.setOrderSn(UUID.randomUUID().toString());order.setCreateTime(new Date());rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);return "ok";}
使用rabbitTemplate將訂單數據通過交換機order-event-exchange和路由鍵order.create.order將訂單數據order發送給隊列order.delay.queue。啟動服務調用接口,來觀察rabbitmq后臺隊列的變化:
可以看到?order.delay.queue隊列中有1條消息等待消費,由于rabbitmq后臺每隔5秒刷新一次,過1分鐘再去看,隊列中已有消息了。