上期寫到高并發下RabbitMq消息中間件你應該介么玩今天給小伙伴說說!有自己看法的也可以在評論區留言探討,也可以轉發關注下我以后會長期分享!
目錄:確保消息發送到RabbitMQ服務器
確保消息被正確的路由
確保消息在隊列正確地存儲
確保消息從隊列正確地投遞到消費者
消費者回調
補償機制
消息冪等性
消息的順序性
可靠性投遞
首先需要明確,效率和可靠性是無法兼得的,如果要保證每一個環節都成功,勢必會對消息的收發效率造成影響,如過是一些業務實時性要求不是特別高的場合,可以犧牲可靠性來換取效率。
①代表消息從生產者發送到Exchange
②代表消息從Exchange路由到Queue
③ 代表消息在Queue中存儲;
④ 代表消費者訂閱Queue并消費消息。
1.確保消息發送到RabbitMQ服務器
可能因為網絡或者Broker的問題導致①失敗,而生產者是無法得知消息是否正確發送到Broker的。
有兩種解決方案:
第一種是Transaction事務模式
第二種是Confirm確認模式1.在通過channel.txSelect方法開啟事務之后,我們便可以發布消息給RabbitMQ了,如果事務提交成功,則消息一定 到達了RabbitMQ中,如果在事務提交執行之前由于RabbitMQ異常崩潰或者其他原因拋出異常,這個時候我們便可以將其捕獲,進而通過執行channel.txRollback方法來實現事務回滾。使用事務機制的話會“吸干”RabbitMQ的性 能,一般不建議使用。
2.生產者通過調用channel.con?rmSelect方法(即Con?rm.Select命令)將信道設置為con?rm模式。一旦消息被投遞到所有匹配的隊列之后,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的唯一ID),這就使得生產者知曉消息已經正確到達了目的地了。
2.確保消息被正確的路由
可能因為路由關鍵字錯誤,或者隊列不存在,或者隊列名稱錯誤導致②失敗。使用mandatory參數和ReturnListener,可以實現消息無法路由的時候返回給生產者。
另一種方式就是使用備份交換機(alternate-exchange),無法路由的消息會發送到這個交換機上。
Map arguments = new HashMap();
// 指定交換機的備份交換機
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
3.確保消息在隊列正確地存儲
可能因為系統宕機、重啟、關閉等等情況導致存儲在隊列的消息丟失,即③出現問題。
解決方案:
1.隊列持久化
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
2.交換機持久化
// String exchange, boolean durable
channel.exchangeDeclare("MY_EXCHANGE","true");
3.消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties
.Builder()
// 2代表持久化,其他代表瞬態
.deliveryMode(2)
.build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
4.確保消息從隊列正確地投遞到消費者
如果消費者收到消息后未來得及處理即發生異常,或者處理過程中發生異常,會導致④失敗。
為了保證消息從隊列可靠性到達消費者,RabbitMQ提供了消息確認機制(message acknowledgement),消費者在訂閱隊列時,可以指定autoAck參數,當autoAck等于false時,RabbitMQ會等待消費者顯示地回復確認消息才從隊列中刪除該消息。
如果消息消費失敗,也可以調用Basic.Reject或者BasicNack來拒絕當前消息而不是確認,如果requere參數為true,可以把這條消息重新存入隊列,以便發送給下一個消費者。
5.消費者回調
消費者處理消息之后,可以再發送一條消息給生產者,或者調用生產者地API,告知消息處理完畢。
6.補償機制
對于一定時間沒有響應地消息,可以設置一個定時重發地機制,但是要控制次數,比如最多重復三次,否則會造成消息堆積。
7.消息冪等性
服務端是沒有這種控制的,只能在消費端控制。
如何避免消息的重復消費?消息重復消費可能會有兩個原因:生產者的問題。環節①重復發送消息,比如在開啟Confirm模式但未收到確認
環節④出了問題,由于消費者未發送ACK或者其它原因,消息重復投遞
對于重復發送的消息,可以對每一條消息生成一個唯一的業務id,通過日志或者建表來做重復控制。
8.消息的順序性
消息的順序性是指消費者消費消息的順序跟生產者投遞消息的順序是一致的。
在RabbitMQ中,一個隊列有多個消費者時,由于不同的消費者消費消息的速度是不一樣的,順序無法保證
學習分享對于高并發下RabbitMq消息中間件的使用,這位大咖也有講解的視頻,在這免費分享給大家,有需要的朋友可以幫忙點個贊關注下吧,分享不易,然后關注我的專欄(Java快速進階通道)看頂部提示,或直接(點我)領取哦!
視頻教學的內容包括:削峰限流、解耦作用
有限資源完成高并發購票
RabbitMq原理透析
購票完整閉環流程
單元測試與多線程高并發測試
不管多忙,每天給自己預留至少半小時的學習時間,拒絕做代碼垃圾的搬運工!
有不對的地方可以在評論區留言,覺得不錯的朋友希望能得到您的轉發支持,同時可以持續關注我,每周定期會分享3到4篇精選干貨!