基于redis的stream結構作為消息隊列,實現異步秒殺下單
需求:
-
創建一個Stream類型的消息隊列,名為stream.oreders
-
修改之前的秒殺下單Lua腳本,在認定有搶夠資格后,直接向stream.orders中添加消息,內容包括voucherId、userId、orderId
-
項目啟動后,開啟一個線程任務,嘗試獲取stream.orders中的消息,完成下單
首先,使用命令行來創建消息隊列:
其次,需要修改Lua腳本:
------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by 20893.--- DateTime: 2025/6/27 15:48----- 1.參數列表--1.1優惠券IDlocal voucherId = ARGV[1]--1.2用戶IDlocal userId = ARGV[2]-- 1.3 訂單IDlocal orderId = ARGV[3]--2.數據key--2.1庫存keylocal stockKey = 'seckill:stock:' .. voucherId--2.2訂單keylocal orderKey = 'seckill:order:' .. voucherId--3.業務邏輯--3.1判斷庫存是否充足-- redis.call('get',stockKey)中取出的值是String類型,需要將其轉化成int類型,調用tonumber()方法if (tonumber(redis.call('get', stockKey))<= 0) then--3.2. 庫存不足 返回1return 1end--3.3判斷用戶是否重復下單if (redis.call('sismember', orderKey, userId) == 1) then--3.4. 重復下單 返回2return 2end--3.5扣減庫存redis.call('incrby', stockKey, -1)--3.6記錄訂單redis.call('sadd', orderKey, userId)--3.7發送消息到隊列中,xadd stream.orders * k1 v1 ...redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)return 0
還需要修改Java業務代碼:
?public Result seckillVoucher(Long voucherId) {//獲取用戶Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");//1.執行Lua腳本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));//2.判斷結果是否為0int r = result.intValue();if (result != 0){//2.1.不為0,代表沒有購買資格return Result.fail(r == 1 ? "庫存不足" : "不能重復下單");}//獲取代理對象proxy = (IVoucherOrderService) AopContext.currentProxy();//3.返回訂單IDreturn Result.ok(orderId);
最終,根據我們的偽代碼進行對異步線程中業務流程的修改
while(true){//嘗試今天隊列,使用阻塞模式,最長等待2000毫秒Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 >");if(msg == null){//null說明沒有消息,繼續下一次continue;}try{//處理消息,完成后需要確認消息(ACK)handleMessage(msg);}catch(Exception e){while(true){Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 0");if(msg == null){//null說明沒有異常消息,所有消息已確認,結束循環break;}try{//說明有異常消息,再次處理handleMessage(msg);}catch(Exception e){//再次出現異常,記錄日志,繼續循環continue;}}}}
代碼展示:
?private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//該注解表示在類初始化之后執行@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private ?class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while ( true){try {//獲取消息隊列中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//判斷消息獲取是否成功if(list == null || list.isEmpty()){//2.1如果獲取失敗,說明沒有消息,繼續下一次循環continue;}//2.2.解析消息中的數據MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();//將map對象轉換成訂單對象VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.2.如果獲取成功,可以下單handleVoucherOrder(order);//3.ACK確認 sack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());?} catch (Exception e) {log.error("獲取訂單信息異常",e);handlePendingList();}}?}?private void handlePendingList() {while( true){try {//獲取pending-list中的訂單信息 xreadgroup group g1 c1 count 1 streams stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.from("0")));//判斷消息獲取是否成功if(list == null || list.isEmpty()){//2.1如果獲取失敗,說明pending-list沒有消息,繼續下一次循環break;}//2.2.解析消息中的數據MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();//將map對象轉換成訂單對象VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.2.如果獲取成功,可以下單handleVoucherOrder(order);//3.ACK確認 sack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());?} catch (Exception e) {log.error("獲取pending-list訂單信息異常",e);//如果害怕因為報錯導致陷入循環,可以設置休眠時間try {Thread.sleep(20);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}}
整體業務流程描述:
首先嘗試從消息隊列中讀取數據,如果數據獲取失敗,直接進行下一次循環,再來讀一次消息隊列,如果獲取成功,說明有訂單信息需要處理,就去解析訂單信息,完成下單,在進行ack確認。在進行下一次讀取,繼續循環,如果在處理消息的過程拋出異常,導致該消息沒有確認,那么該消息就會進入pending-list,就被catch到,在catch中執行handlePendinglist()函數,在該函數中,首先去pending-list獲取未確認消息,如果讀到,則解析消息,處理,下單,ack確認,如果沒有異常消息,就會直接跳出循環,異常處理結束,如果再拋異常,就再度循環。直到pending-list中所有異常全部處理完成為止。
進行測試:
再次測試:
至此優化秒殺下單的業務需求完成。
希望對大家有所幫助。