一、rabbitmq發送消息
一、簡單模式
概述
一個生產者一個消費者
模型
代碼
//沒有交換機,兩個參數為routingKey和消息內容
rabbitTemplate.convertAndSend("test1_Queue","haha");
二、工作隊列模式
概述
一個生產者,多個消費者,消費者之間負載均衡
模型
代碼
//沒有交換機,兩個參數為routingKey和消息內容rabbitTemplate.convertAndSend("test1_Queue","haha");
三、發布訂閱模式
概述
生產者把消息給交換機,交換機把消息推送給與它綁定的所有隊列,消費者監聽自己的隊列
模型
代碼
//該模式下,交換機與隊列綁定無需routingkey,因此效率最高
rabbitTemplate.convertAndSend("fanout_Exchange","","lala");
四、路由模式
概述
交換機與隊列由routing key綁定,生產者發送消息時指定交換機和routing key,則對應的隊列便會收到消息
模型
代碼
rabbitTemplate.convertAndSend("direct_Exchange","test1_Queue","lala");
五、主題模式(通配符模式)
概述
交換機與隊列由routing key綁定,但routing key由通配符和具體的字符組成,生產者輸入具體的字符,交換機根據routing key的規則模糊匹配到對應的隊列,則對應的隊列會收到消息
模型
代碼
/*** 交換機與隊列綁定* @return*/
@Bean
Binding truckHistoryBinding(){return BindingBuilder.bind(test1Queue()).to(topicExchange()).with("*.test1.*");
}@GetMapping("/sendMessage")
public void sendMessage() {//需要字符串的模糊匹配,效率最低rabbitTemplate.convertAndSend("topic_Exchange","aa.test1.cc","lala");
}
二、rabbitmq接收消息
一、拉模式
概述
消費者可以主動拉取隊列里的消息
代碼
rabbitTemplate.execute(channel->{//通過channel.basicGet方法可以單條獲取消息,其返回值時GetReponseGetResponse response = channel.basicGet("my_queue",false);String message = new String(response.getBody());}
)
二、推模式
概述
通過發布訂閱模式,訂閱隊列里的消息
代碼
@RabbitListener(queues="my_queue")public void onMessage(Message messge,Channel channel){String msg = new String (message.getBody());}
三、消息的手動確認
注意:
手動確認需要先將自動確認的配置注釋掉;
消息確認模式有:
AcknowledgeMode.NONE:自動確認
AcknowledgeMode.AUTO:根據情況確認
AcknowledgeMode.MANUAL:手動確認
默認情況下消息消費者是自動 ack (確認)消息的,如果要手動 ack(確認)則需要修改確認模式為 manual
spring:rabbitmq:listener:simple:acknowledge-mode: manual
或在 RabbitListenerContainerFactory 中進行開啟手動 ack
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟手動 ackreturn factory;
}
消費消息手動確認的監聽器
獲取消息消費的唯一標識
message.getMessageProperties().getDeliveryTag();
執行業務處理
消息確認
//消費消息的手動確認,消息確認成功-basicAck//第一個參數deliveryTag,消息的唯一標識//第二個參數multiple,消息是否支持批量確認,如果是true,代表可以一次性確認標識小于等于當前標識的所有消息//如果是false,只會確認當前消息channel.basicAck(deliveryTag,false);
消息確認失敗處理,根據條件判斷設置是否重回隊列 ,是否支持批量處理
//說明消費消息處理失敗,如果不進行確認(自動確認,投遞成功即確認,消費是否正常,不關心),消息就會丟失//消息處理失敗確認,代表消息沒有正確消費,注意:此種方式一次只能確認一個消息//第一給參數是消息的唯一標識,//第二個參數是代表是否重回隊列,如果是true,重新將該消息放入隊列,再次消費//注意:第二個參數要謹慎,必須要結合具體業務場景,根據業務判斷是否需要重回隊列,一旦處理不當,機會導致消息循環入隊,消息擠壓//不重回隊列 require = false
// channel.basicReject(deliveryTag,false);//重回隊列 require = truechannel.basicReject(deliveryTag,true);//消息處理失敗確認,代表消息沒有正確消費,注意,此種方式支持批量//第一個參數是消息的唯一標識,//第二個參數是代表是否支持批量確認//第三給參數代表是否重回隊列//不重回隊列 require = falsechannel.basicNack(deliveryTag,true,false);//重回隊列 require = truechannel.basicNack(deliveryTag,false,true);