即使不是做電商業務的同學,也一定知道訂單超時關閉這種業務場景,這個場景大致就是用戶下單后,如果在一定時間內未支付(比如15分鐘、半小時),那么系統就會把這筆訂單給關閉掉。這個功能實現的方式有很多種,比如JDK中自帶的DelayQueue延遲隊列、Timer、ScheduledThreadPoolExecutor,強烈推薦的RocketMQ、RabblitMQ及Kafka等消息隊列,還有就是Hutool的SystemTimer、Netty的HashedWheelTimer等等,感興趣的可以去了解一下。今天我們就先看看Redis和Redisson是如何實現延遲消息的。
一、Redis如何實現延遲消息
1.1 過期key監聽
很多人都知道Redis有一個過期監聽的功能,在redis.conf中加一條notify-keyspace-events開啟過期監聽,然后在代碼中實現KeyExpirationMessageListener就可以監聽key的過期消息了。
這樣就可以在接收到過期消息的時候進行關單操作了,但是這個方案并不推薦,Redis官方明確說過Redis并不保證key在過期的時候就能被立即刪除,更不保證這個消息能被立即發出,所以,消息延遲是必然的,數據量越大延遲的時間越長。
而且,在Redis5.0之前,這個消息是通過PUB/SUB模式發出的,不會做持久化,至于你有沒有接收到,有沒有消費成功,它不管,所以,如果發消息的時候客戶端掛了,之后再恢復的話,這個消息也就徹底丟了。
1.2 Zset
我們可以借助Redis中的有序集合——zset來實現這個功能,zset是一個有序集合,每一個元素(member)都關聯了一個score,可以通過score排序來取集合中的值。
我們可以將訂單超時時間的時間戳(下單時間+超時時間)作為score,訂單號orderId作為成員(member),這樣redis會對zset按照score進行排序,再通過定時任務獲取“當前時間>=score"的延遲任務,獲取到之后就可以根據訂單號(member)進行關單操作。
這么實現的優點就是可以借助redis持久化和高可用機制,避免數據丟失。
1.3 zset實現超時關單Demo
步驟:
用戶下單后,將信息寫入redis zset緩存中,score為當前時間+延遲時間的時間戳,member為訂單號。
使用ZRangeByScore和WithScores命令,獲取當前時間戳之前的所有任務,并通過score判斷哪些任務已到期,進行關單處理。
啟動一個額外的定時任務周期性檢查并處理已到期的訂單。
導入依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Demo:
@Service
public class OrderDelayService {@Autowiredprivate RedisTemplate<String,String> redisTemplate;//延遲15分鐘private final long delayTime = 15*60*1000;//訂單號key前綴private static final String ORDER_KEY="order:";//延遲關單keyprivate static final String DELAY_KEY="close_orders";/*** 創建訂單* @param orderId*/public void createOrder(String orderId){//...//創建訂單成功//1.獲取當前時間戳long currentTime = System.currentTimeMillis();//2.score:當前時間+延遲時間long score=currentTime+delayTime;//3.加入redis zset集合redisTemplate.opsForZSet().add(DELAY_KEY, ?//redis keyORDER_KEY+orderId, //memberscore //score);}
?/*** 任務間隔一秒執行一次*/@Scheduled(fixedDelay = 1000)public void closeExpiredOrders(){//當前時間戳long currentTime = System.currentTimeMillis();ZSetOperations<String, String> zSetOps = redisTemplate.opsForZSet();//取出所有數據(已排好序)Set<String> orderKeys = zSetOps.range(DELAY_KEY, 0, -1);for (String orderKey : orderKeys) {//獲取score,double score = zSetOps.score(DELAY_KEY, orderKey);if(currentTime>=score){String orderId = orderKey.substring(ORDER_KEY.length());//進行關單操作...closeOrder(orderId)//從zset里移除該訂單zSetOps.remove(DELAY_KEY,orderKey);}}}
}
二、Redission如何實現延遲消息
2.1 實現原理
Redission中定義了分布式延遲隊列DelayedQueue,其實就是在zset的基礎上增加了一個基于內存的延遲隊列。
大致的流程如下:
當我們要添加一個數據到延遲隊列的時候,redission會把數據+超時時間放到zset中,并且起一個延時任務,當任務到期的時候,再去zset中把數據取出來,返回給客戶端使用。
2.2 案例
導入依賴:
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.13.1</version>
</dependency>
定義一個客戶端:
@Component
public class RedissionConfig {@Bean(destroyMethod = "shutdown")public RedissonClient redssion(){Config config=new Config();config.useSingleServer().setAddress("redis://127.0.0.1");return Redisson.create(config);}
}
實現:
@Component
public class RedissionDelay {
?@Autowiredprivate RedissonClient client;/*** 創建訂單,并設置過期時間* @param orderId*/public void createOrder(String orderId){//...//創建訂單成功RBlockingDeque<Object> blockingDeque = client.getBlockingDeque("orderQueue");RDelayedQueue<Object> delayedQueue = client.getDelayedQueue(blockingDeque);//將訂單加入延遲隊列,延遲時間為15分鐘delayedQueue.offer(orderId,15, TimeUnit.MINUTES);}
?/*** 關單操作*/public void closeOrder(){RBlockingDeque<Object> orderQueue = client.getBlockingDeque("orderQueue");new Thread(() -> {while (true){try {String orderId = (String) orderQueue.take();//進行關單操作,closeOrder(orderId)} catch (InterruptedException e) {e.printStackTrace();}}});}
}
上述例子,我們用RDelayedQueue的offer方法將訂單添加到了延遲隊列,并指定了延遲時間,當元素的延遲時間到達時,Redission會將元素從RDelayedQueue轉移到與之關聯的RBlockingDeque。
然后在檢查是否要關單的時候,另起了一個線程,不斷循環讀取到期的訂單。值得注意的是 take方法從RBlockingDeque中獲取元素,這是一個阻塞操作,如果沒有元素,它會一直等到,直到有元素。
End:希望對大家有所幫助,如果有紕漏或者更好的想法,請您一定不要吝嗇你的賜教🙋。