RabbitMQ的消息消費是有確認機制的,正常情況下,消費者在消費消息成功后,會發送一個確認消息,消息隊列接收到之后,就會將該消息從消息隊列中刪除,下次也就不會再投遞了。
但是如果存在網絡延遲的問題,導致確認消息沒有發送到消息隊列,導致消息重投了,是有肯呢個,所以當我們使用MQ的時候,消費者端自己也需要做好冪等控制來防止消息被重復消費。
那么問題來了,什么是冪等性?如何解決接口冪等的問題呢?
解決接口冪等問題,只需要記住一句口令“一鎖、二判、三更新”,只要嚴格遵守這個過程,那么就可以解決并發問題。
一鎖:第一步,先加鎖。可以加分布式鎖、或者悲觀鎖都可以。但是一定要是一個互斥鎖!
二判:第二步,進行冪等性判斷。可以基于狀態機、流水表、唯一性索引等等進行重復操作的判斷。
三更新:第三步,進行數據的更新,將數據進行持久化。
// 一鎖:先加一個分布式鎖
@DistributeLock(scene = "ORDER", keyExpression = "#request.identifier", expire = 3000)
public OrderResponse apply(OrderRequest request) {OrderResponse response = new OrderResponse();// 二判:判斷請求是否執行成功過OrderDTO orderDTO = orderService.queryOrder(request.getProduct(), request.getIdentifier());if (orderDTO != null) {response.setSuccess(true);response.setResponseCode("DUPLICATED");return response;}// 三更新:執行更新的業務邏輯return orderService.order(request);
}
三步需要嚴格控制順序,確保加鎖成功后進行數據查詢和判斷,冪等性判斷通過后再更新,更新結束后釋放鎖。
以上操作需要有一個前提,那就是第一步加鎖和第二步判斷的時候,需要有一個依據,這個就是冪等號了,通常需要和上游約定一個唯一 ID 作為冪等號。然后通過對冪等號加鎖,再通過對冪等號進行冪等判斷即可。
一鎖這個過程,建議使用 Redis 實現分布式鎖,因為他是非阻塞的高效率的互斥鎖。非常適合在冪等控制場景中。
二判這個過程,如果有操作流水,建議基于操作流水做冪等,并將冪等號作為唯一性約束,確保唯一性。如果沒有流水,那么基于狀態機也是可以的。
但是不管怎么樣,數據庫的唯一性約束都要加好,這是系統的最后一道防線了。萬一前面的鎖失效了,這里也能控制得住不會產生臟數據。
最后也就是說,我們在發送消息是需要生成一個唯一的標識并且把它放到消息體中,根據這個標識就可以判斷兩次消息是不是同一條。這樣我們在消費者端,接收到消息以后,只需要解析出消息體重的這個唯一標識,就可以通過“一鎖、二判、三更新”的方式來判斷是否消費成功過了。