摘要:
????????本文介紹了基于Redis的秒殺系統優化方案,主要包含兩部分:1)通過Lua腳本校驗用戶秒殺資格,結合Java異步處理訂單提升性能;2)使用Redis Stream實現消息隊列處理訂單。方案采用Lua腳本保證庫存校驗和一人一單的原子性,通過阻塞隊列異步保存訂單,并引入Redisson分布式鎖防止重復下單。
????????Redis Stream實現消息隊列,支持消費組和ACK確認機制,確保訂單可靠處理。系統還設計了pending-list異常處理機制,保證訂單處理的最終一致性。這種架構顯著提升了秒殺系統的高并發性能和數據一致性。
一,秒殺優化
redis校驗用戶秒殺資格(庫存是否充足且保證一人一單),通過阻塞隊列異步處理保存訂單到數據庫的操作,提升秒殺性能。
1,編寫校驗秒殺資格lua腳本
庫存不足返回1,用戶重復下單返回2,資格校驗通過返回0。
local voucherId = ARGV[1]
local userId = ARGV[2]local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
---庫存不足
if tonumber(redis.call('get', 'stockKey'))<=0 thenreturn 1
end
---庫存充足,判斷用戶是否下單
if redis.call('sismember', orderKey, userId) ==1 then---重復下單return 2
end
---扣減庫存保存用戶id到set中
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
return 0
2,讀取lua腳本到Java程序
(1)lua文件讀取配置
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();//采用spring的讀取文件資源的ClassPathResourceSECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}
(2)調用stringRedisTemlate的execute方法讀取SECKILL_SCRIPT腳本,并傳入參數ARGV[..],因為腳本中不需要KEYS[..]變量,所以這里傳入空集合。
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString()
);
3,調用阻塞隊列BlockingQueue<VoucherOrder>的數組實現并指定大小,將創建好的訂單加入到阻塞隊列,方法即可結束,大大提升性能。
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
VoucherOrder voucherOrder = new VoucherOrder();
//訂單id,用戶id,優惠卷id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
orderTasks.add(voucherOrder);
4,開啟獨立線程處理將訂單寫入數據庫,加上@PostConstruct注解保證在類初始化之后立即執行
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(() -> {while (true) {try {//獲取隊列的頭部,如果需要則等待直到元素可用為止VoucherOrder order = orderTasks.take();handleVoucherOrder(order);} catch (Exception e) {log.error("處理訂單異常", e);}}});}
5,order傳入上鎖的方法handleVoucherOrder(),防止同一個用戶的多個請求并發產生的問題,保證每個請求(線程)單獨執行
private void handleVoucherOrder(VoucherOrder order) {RLock lock = redissonClient.getLock(LOCK_ORDER_KEY + order.getUserId());try {boolean isLock = lock.tryLock();if (!isLock) {log.error("操作頻繁,請稍后重試!");return;}proxy.createVoucherOrder(order);} finally {lock.unlock();}}
6,編寫訂單寫入數據庫的方法createVoucherOrder()并開啟事務,這樣保證鎖的范圍比事務范圍大,避免出現事務未提交鎖提前釋放的問題。
@Transactionalpublic void createVoucherOrder(VoucherOrder order) {//一人一單判斷Long userId = order.getUserId();Long voucherId = order.getVoucherId();long count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {log.error("不能重復下單!");return;}//扣減庫存//這個sql語句是原子性的操作,而LambdaUpdateWrapper表達式不是boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {log.error("庫存不足!");return;}save(order);}
?
二,redis實現消息隊列
(1)基于list模擬消息隊列
(2)基于pubsub的消息隊列
(3)基于stream類型的消息隊列
基于·redis的stream結構作為消息隊列,實現異步秒殺
1,創建STREAM數據stream.order作為阻塞隊列和消費組g1
xgroup create stream.order g1 0 mkstream
2,lua腳本增加發送消息到隊列中的操作和訂單id
local orderId = ARGV[3]
redis.call('xadd', 'stream.order', '*', 'userId', userId,'voucherId', voucherId, 'id', orderId)
3,java客戶端獲取消息隊列中的消息
(1)獲取消息隊列的訂單信息,redis命令:XREADGROUP GROUP g1 c1 count 1 block 2000 streams stream.order >(消費者c1,讀取數量1,阻塞時間2000毫秒,>表示從當前消費者組中未消費的最新的消息開始讀取)
(2)如果返回的結果為空或者list是空集合,說明沒有獲取到,continue后面操作重新獲取;獲取成功,取出消息MapRecord,取出訂單信息鍵值對record.getValue(),最后填入VoucherOrder即可
(3)ack確認消息xack stream.order g1 id
while (true) {try {//獲取消息隊列的訂單信息XREADGROUP GROUP g1 c1 count 1 block 2000 streams stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//獲取失敗,進行下一次獲取循環if (list == null || list.isEmpty()) {continue;}//獲取成功,處理消息隊列中的訂單信息MapRecord<String, Object, Object> record = list.getFirst();Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ack確認消息xack stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);//如果中間出現異常那么就在pending-list中保存訂單信息handlePendingList();}
}
4,如果中間出現異常那么就在pending-list中保存訂單信息
(1)獲取pending-list中的訂單信息,redis命令:XREADGROUP GROUP g1 c1 count 1 streams stream.order 0(0表示從開始位置讀取消息)
(2)獲取失敗,說明pending-list里面沒有異常消息,那么直接結束循環;如果再次出現異常,記錄日志,休眠線程一段時間 ,重新開始循環。? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
private void handlePendingList() {while (true) {try {List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if (list == null || list.isEmpty()) {break;}MapRecord<String, Object, Object> record = list.getFirst();//獲取訂單鍵值對Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("處理pending-list異常", e);try {Thread.sleep(20);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}