生產者將信道設置成 confirm 模式,一旦信道進入 confirm 模式, 所有在該信道上面發布的消息都將會被指派一個唯一的 ID (從 1 開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一 ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker 回傳給生產者的確認消息中 delivery-tag 域包含了確認消息的序列號,此外 broker 也可以設置basic.ack 的 multiple 域,表示到這個序列號之前的所有消息都已經得到了處理。
單個確認發布 ?
這是一種簡單的確認方式,它是一種 同步確認發布 的方式,也就是發布一個消息之后只有它
被確認發布,后續的消息才能繼續發布,waitForConfirmsOrDie(long)這個方法只有在消息被確認
的時候才返回,如果在指定時間范圍內這個消息沒有被確認那么它將拋出異常。
這種確認方式有一個最大的缺點就是: 發布速度特別的慢, 因為如果沒有確認發布的消息就會
阻塞所有后續消息的發布,這種方式最多提供每秒不超過數百條發布消息的吞吐量。當然對于某
些應用程序來說這可能已經足夠了。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;public class publishMessageIndividually {private static final int MESSAGE_COUNT = 5;public static void publishMessageIndividually() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發布確認channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());//服務端返回 false 或超時時間內未返回,生產者可以消息重發boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息發送成功");}}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) +"ms");}}
}
耗時
?批量確認發布
上面那種方式非常慢,與單個等待確認消息相比,先發布一批消息然后一起確認可以極大地
提高吞吐量,當然這種方式的缺點就是:當發生故障導致發布出現問題時,不知道是哪個消息出現
問題了,我們必須將整個批處理保存在內存中,以記錄重要的信息而后重新發布消息。當然這種
方案仍然是同步的,也一樣阻塞消息的發布。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;public class publishMessageBatch {private static final int MESSAGE_COUNT = 5;public static void publishMessageBatch() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發布確認channel.confirmSelect();//批量確認消息大小int batchSize = 100;//未確認消息個數int outstandingMessageCount = 0;long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {channel.waitForConfirms();outstandingMessageCount = 0;}}//為了確保還有剩余沒有確認消息 再次確認if (outstandingMessageCount > 0) {channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) +"ms");}}public static void main(String[] args) throws Exception {publishMessageBatch.publishMessageBatch();}
}
?耗時
?異步確認發布
異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,
他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功,
下面就讓我們來詳細講解異步確認是怎么實現的。
import cn.hutool.core.lang.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;public class publishMessageAsync {private static final int MESSAGE_COUNT = 5;public static void publishMessageAsync() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);//開啟發布確認channel.confirmSelect();/*** 線程安全有序的一個哈希表,適用于高并發的情況* 1.輕松的將序號與消息進行關聯* 2.輕松批量刪除條目 只要給到序列號* 3.支持并發訪問*/ConcurrentSkipListMap<Long, String> outstandingConfirms = newConcurrentSkipListMap<>();/*** 確認收到消息的一個回調* 1.消息序列號* 2.true 可以確認小于等于當前序列號的消息* false 確認當前序列號消息*/ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {if (multiple) {//返回的是小于等于當前序列號的未確認消息集合 是一個 mapConcurrentNavigableMap<Long, String> confirmed =outstandingConfirms.headMap(sequenceNumber, true);//清除該部分未確認消息集合confirmed.clear();}else{//只清除當前序列號的消息outstandingConfirms.remove(sequenceNumber);}};ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {String message = outstandingConfirms.get(sequenceNumber);System.out.println("發布的消息"+message+"未被確認,序列號"+sequenceNumber);};/*** 添加一個異步確認的監聽器* 1.確認收到消息的回調* 2.未收到消息的回調*/channel.addConfirmListener(ackCallback, nackCallback);long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;/*** channel.getNextPublishSeqNo()獲取下一個消息的序列號* 通過序列號與消息體進行一個關聯* 全部都是未確認的消息體*/outstandingConfirms.put(channel.getNextPublishSeqNo(), message);channel.basicPublish("", queueName, null, message.getBytes());}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) +"ms");}}public static void main(String[] args) throws Exception {publishMessageAsync.publishMessageAsync();}
}
耗時
?以上 3 種發布確認速度對比
單獨發布消息
同步等待確認,簡單,但吞吐量非常有限。
批量發布消息
批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是那條
消息出現了問題。
異步處理: 最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實現起來稍微難些