事務
RabbitMQ是基于AMQP協議實現的,該協議實現了事務機制,因此RabbitMQ也支持事務機制.
SpringAMQP也提供了對事務相關的操作,RabbitMQ事務允許開發者確保消息的發送和接收是原子性的,要么
全部成功,要么全部失敗.|
前期準備工作:
//事務public static final String TRANS_QUEUE = "TRANS_QUEUE";public static final String TRANS_EXCHANGE = "TRANS_EXCHANGE";public static final String TRANS_KEY = "TRANS_KEY";
//事務@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable(MQConstants.TRANS_QUEUE).build();}@Bean("transExchange")public Exchange transExchange() {return ExchangeBuilder.directExchange(MQConstants.TRANS_EXCHANGE).build();}@Bean("transBinding")public Binding transBinding(@Qualifier("transExchange") Exchange exchange, @Qualifier("transQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.TRANS_KEY).noargs();}
配置事務管理器
@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}
添加 @Transactional
如果不添加 @Transactional,我們的事務管理器是不會在這個代碼上生效的
@Transactional@RequestMapping("/trans")public String trans() {rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans1 ");int n = 10 / 0;rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans2 ");return "消息發送成功";}
經過觀察,我們可以看到這兩條消息要么同時發送成功,要么同時發送失敗
消息分發
RabbitMQ隊列擁有多個消費者時,隊列會把收到的消息分派給不同的消費者,每條消息只會發送給訂閱列表里的一個消費者,這種方式非常適合擴展,如果現在負載加重,那么只需要創建更多的消費者來消費處理消息即可。
默認情況下,RabbitMQ是以輪詢的方法進行分發的,而不管消費者是否已經收到消費并已經確認了消息,這種方式是不太合理的,試想一下,如果某些消費者消費速度慢,而某些消費者消費速度快,就可能會導致某些消費者消息積壓,某些消費者空閑,進而應用整體的吞吐量下降。
如何處理呢?我們可以使用前面章節講到的**channel.basicQos(intprefetchCount)**方法,來限制當前信道上的消費者所能保持的最大未確認消息的數量比如:消費端調用了channelbasicQos(5),RabbitMQ會為該消費者數,發送一條消息計數+1,消費一條消息計數-1,當達到了設定的上限,RabbitMQ就不會再向它發送消息了,直到消費者確認了某條消息。
類似TCP/IP中的"滑動窗口".
prefetchCount設置為0時表示沒有上限。
basicQos對拉模式的消費無效
限流和負載均衡
配置信息:
listener:simple:acknowledge-mode: manual # 設置確認模式prefetch: 5
前期準備:
常量類:
//限流public static final String QOS_QUEUE = "QOS_QUEUE";public static final String QOS_EXCHANGE = "QOS_EXCHANGE";public static final String QOS_KEY = "QOS_KEY";
聲明:
//限流@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(MQConstants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(MQConstants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.QOS_KEY).noargs();}
生產者:
@RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.QOS_EXCHANGE, MQConstants.QOS_KEY, "qos");}return "消息發送成功";}
如果我們不進行手動確認,觀察最大未確認的消息接收量:
可以得知消費者最大接收到的未確認消息數量為 我們設置的 prefetch 值
我們可以通過限流這種方式實現負載均衡
@Component
public class QosListener {@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle1(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費消息:" + messageContent);Thread.sleep(10000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle2(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費消息:" + messageContent);Thread.sleep(5000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}
可以觀察到消費能力強的隊列會持續消費消息,消費能力弱的隊列消費的消息會相對較少