消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成
了部分突然它掛掉了,會發生什么情況。RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消
息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續
發送給該消費這的消息,因為它無法接收到。
為了保證消息在發送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是: 消費者在接
收到消息并且處理該消息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息刪除了。
消息應答的方法?
Channel.basicAck(用于肯定確認)
RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
Channel.basicNack(用于否定確認)
Channel.basicReject(用于否定確認)
與 Channel.basicNack 相比少一個參數
不處理該消息了直接拒絕,可以將其丟棄了
自動應答?
消息發送后立即被認為已經傳送成功,這種模式需要在 高吞吐量和數據傳輸安全性方面做權
衡 ,因為這種模式如果消息在接收到之前,消費者那邊出現連接或者 channel 關閉,那么消息就丟
失了,當然另一方面這種模式消費者那邊可以傳遞過載的消息, 沒有對傳遞的消息數量進行限制 ,
當然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,最終
使得內存耗盡,最終這些消費者線程被操作系統殺死, 所以這種模式僅適用在消費者可以高效并
以某種速率能夠處理這些消息的情況下使用。默認消息采用的是自動應答.
手動應答
手動應答的好處是可以批量應答并且減少網絡擁堵multiple 的 true 和 false 代表不同意思
true 代表批量應答 channel 上未應答的消息
比如說 channel 上有傳送 tag 的消息 5,6,7,8 當前 tag 是 8 那么此時
5-8 的這些還未應答的消息都會被確認收到消息應答
false 同上面相比
只會應答 tag=8 的消息 5,6,7 這三個消息依然不會被確認收到消息應答
消費者1?
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1 等待接收消息處理時間較短");DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);try {// 睡眠1秒Thread.sleep(1000*1);} catch (InterruptedException e) {throw new RuntimeException(e);}// 消息標記tag,2.false代表只應答接收到的那個傳遞的消息,true為應答所有消息包括傳遞過來的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C1 消費者啟動等待消費......");boolean autoAck=false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}
?消費者2
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker02 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2 等待接收消息處理時間較長");DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);try {// 睡眠30秒Thread.sleep(1000*30);} catch (InterruptedException e) {throw new RuntimeException(e);}// 消息標記tag,2.false代表只應答接收到的那個傳遞的消息,true為應答所有消息包括傳遞過來的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C2 消費者啟動等待消費......");boolean autoAck=false;channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}
生產者?
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");//channel 實現了自動 close 接口 自動關閉 不需要顯示關閉try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的消息是否持久化 默認消息存儲在內存中* 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費* 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world!!!!";/*** 發送一個消息* 1.發送到那個交換機* 2.路由的 key 是哪個* 3.其他的參數信息* 4.發送消息的消息體*/for (int i = 0; i < 10; i++) {channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}System.out.println("消息發送完畢");}}
}
結果
在發送者發送消息 ,發出消息之后的把 C2 消費者停掉,按理說該 C2 來處理該消息,但是
由于它處理時間較長,在還未處理完,也就是說 C2 還沒有執行 ack 代碼的時候,C2 被停掉了,
此時會看到消息被 C1 接收到了,說明消息? 被重新入隊,然后分配給能處理消息的 C1 處理了?

消息自動重新入隊
如果消費者由于某些原因失去連接(其通道已關閉,連接已關閉或 TCP 連接丟失),導致消息
未發送 ACK 確認,RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費者
可以處理,它將很快將其重新分發給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確
保不會丟失任何消息。
?