1.RabbitMQ安裝
RabbitMQ Windows 安裝、配置、使用 - 小白教程-騰訊云開發者社區-騰訊云下載erlang:http://www.erlang.org/downloads/https://cloud.tencent.com/developer/article/2192340
Windows 10安裝RabbitMQ及延時消息插件rabbitmq_delayed_message_exchange - 民工黑貓 - 博客園安裝RabbitMQ服務器 第一步:下載erlang 原因:RabbitMQ服務端代碼是使用并發式語言Erlang編寫的,安裝Rabbit MQ的前提是安裝Erlang。下載地址:http://www.erlang.org/downloads 第二步:下載RabbitMQ 下載地址:https://https://www.cnblogs.com/yyee/p/14281111.html
2.生產端確保消息發送到交換機和路由鍵
application.yml
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: CORRELATED # 交換機的確認publisher-returns: true # 隊列的確認
logging:level:com.atguigu.mq.config: info
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}// 發送到交換機-成功或失敗@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("confirm() 回調函數打印 correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);}// 發送到隊列-失敗@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("returnedMessage() 回調函數 msg:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",new String(returned.getMessage().getBody()), returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());}
}
RabbitMQTest
public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowiredprivate RabbitTemplate rabbitTemplate;//發消息-生產端確保消息發到交換機和路由鍵rabbitTemplate.convertAndSend(EXCHANGE_DIRECT + "~", ROUTING_KEY + "~", "Message Test Confirm~~~ ~~~");
3.消費端手動ack
application.yml
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /listener:simple:acknowledge-mode: manual # 把消息確認模式改為手動確認prefetch: 1 # 每次從隊列中取回消息的數量
MyMessageListener
public static final String QUEUE_NAME = "queue.order";//接消息-消費端手動ack//@RabbitListener(queues = {QUEUE_NAME})public void processMessage(String dataString, Message message, Channel channel) throws IOException {//獲取消息的deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();try {//核心操作log.info("消費端 消息內容:" + dataString);System.out.println(10 / 0);//核心操作成功返ACKchannel.basicAck(deliveryTag, false);} catch (Exception e) {//獲取消息是否重復投遞Boolean redelivered = message.getMessageProperties().getRedelivered();//核心操作失敗返NACKif (!redelivered) {//第一次投遞,重新放回隊列 (requeue:是否重新放回隊列)channel.basicNack(deliveryTag, false, true);//channel.basicReject(deliveryTag, true);} else {//重復投遞,不重新放回隊列 (requeue:是否重新放回隊列)channel.basicNack(deliveryTag, false, false);//channel.basicReject(deliveryTag, false);}}}
4.消息設置超時時間(通過消息后置處理器)
rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout", message -> {message.getMessageProperties().setExpiration("4000");return message;});
5.延遲消息(延遲插件)
Windows 10安裝RabbitMQ及延時消息插件rabbitmq_delayed_message_exchange - 民工黑貓 - 博客園安裝RabbitMQ服務器 第一步:下載erlang 原因:RabbitMQ服務端代碼是使用并發式語言Erlang編寫的,安裝Rabbit MQ的前提是安裝Erlang。下載地址:http://www.erlang.org/downloads 第二步:下載RabbitMQ 下載地址:https://https://www.cnblogs.com/yyee/p/14281111.htmlRabbitMQ延遲插件下載地址
Community Plugins | RabbitMQ RabbitMQ設置延遲消息的交換機
生產端發送延時消息(通過消息后置處理器)
//延遲消息-延時插件(最多倆天內)String msg = "Test delay message by plugin " + new SimpleDateFormat("HH:mm:ss").format(new Date());rabbitTemplate.convertAndSend(EXCHANGE_DELAY, ROUTING_KEY_DELAY, msg, message -> {//x-delay參數必須基于x-delayed-message-exchange插件才能生效message.getMessageProperties().setHeader("x-delay", "10000");return message;});
消費端消費延時消息
@RabbitListener(queues = {QUEUE_DELAY})public void processMessageDelay(String dataString, Message message, Channel channel) throws IOException {log.info("[delay message][消息本身]" + dataString);log.info("[delay message][當前時間]" + new SimpleDateFormat("HH:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}