文章目錄
- 事務消息
- 概念
- 配置
- 惰性隊列
- 概念
- 應用場景
- 優先隊列
- 概念
- 配置
事務消息
僅在生產者端有效,消費端無效
概念
總結:
- 在生產者端使用事務消息和消費端沒有關系
- 在生產者端使用事務消息僅僅是控制事務內的消息是否發送
- 提交事務就把事務內所有消息都發送到交換機
- 回滾事務則事務內任何消息都不會被發送
配置
// 配置
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Data
public class RabbitConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}
//測試@SpringBootTest
@Slf4j
public class RabbitMQTest {public static final String EXCHANGE_NAME = "exchange.tx.dragon";public static final String ROUTING_KEY = "routing.key.tx.dragon";@Autowiredprivate RabbitTemplate rabbitTemplate;@Test@Transactional@Rollback(value = true) // junit 默認回滾事務,所以想提交事務就設置為 falsepublic void testSendMessageInTx() {// 1、發送第一條消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01~~~)");// 2、拋出異常log.info("do bad:" + 10 / 0);// 3、發送第二條消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02~~~)");}}
惰性隊列
概念
創建隊列時,在Durability這里有兩個選項可以選擇
- Durable:持久化隊列,消息會持久化到硬盤上
- Transient:臨時隊列,不做持久化操作,broker重啟后消息會丟失
https://www.rabbitmq.com/docs/lazy-queues
經典隊列中的消息會盡可能早地移動到磁盤。 只有當使用者請求這些消息時,它們才會加載到 RAM 中
會從硬盤中讀取,只能作為權宜之計
應用場景
優先隊列
概念
默認情況:基于隊列先進先出的特性,通常來說,先入隊的先投遞
- 設置優先級之后:優先級高的消息更大幾率先投遞
- 關鍵參數:x-max-priority
RabbitMQ允許我們使用一個正整數給消息設定優先級
- 消息的優先級數值取值范圍:1~255
- RabbitMQ官網建議在1~5之間設置消息的優先級(優先級越高,占用CPU、
內存等資源越多)
隊列在聲明時可以指定參數:x-max-priority
- 默認值:0 此時消息即使設置優先級也無效
- 指定一個正整數值:消息的優先級數值不能超過這個值
配置
配置exchange:exchange.test.priority
配置隊列:queue.test.priority
x-max-priority
// 生產者:發三個從1開始,優先級越大消費的時候越在前面消費
// 下面是發一條消息的示例public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
@Test
public void test06SendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "message test proirity 3", message -> {// 消息本身的優先級數值// 切記:不能超過 x-max-priority: 10message.getMessageProperties().setPriority(3);return message;});
}
// 消費者
@Component
@Slf4j
public class MyMessageListener {public static final String QUEUE_PRIORITY = "queue.test.priority";@RabbitListener(queues = {QUEUE_PRIORITY})public void processMessagePriority(String dataString, Message message, Channel channel) throws IOException {log.info("[priority]" + dataString);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}