黑馬點評redis改 part 3

優惠券秒殺

全局唯一id

每個店鋪都可以發布優惠券:
當用戶搶購時,就會生成訂單并保存到tb_voucher_order這張表中,而訂單表如果使用數據庫自增ID就存在一些問題:實際開發中數據庫ID一般不會參與業務邏輯 增加一個訂單號字段就好

  • id的規律性太明顯
  • 受單表數據量的限制

全局ID生成器,是一種在分布式系統下用來生成全局唯一ID的工具,一般要滿足下列特性:

  • 唯一性?時間戳 uuid雪花算法
  • 高可用
  • 高性能
  • 遞增性 不是遞增的話每次插入數據,都會重建索引,當數據量大的時候重建索引的時候比較耗時
  • 安全性

我們redis里string數據結構是自增特性的,有一個increase的命令,而這個首先可以確保唯一,因為什么redis是獨立于數據庫之外的,第二高可用是redis將來有集群方案和主從方案哨兵方案

全局id生成器里,為了增加ID的安全性,我們可以不直接使用Redis自增的數值,而是拼接一些其它信息:
ID的組成部分:
符號位:1bit,永遠為0
時間戳:31bit,以秒為單位,可以使用69年
序列號:32bit,秒內的計數器,支持每秒產生2^32個不同ID

?我們在utils中新建一個RedisIdWorker,實現一個基于redis的id生成器,

package com.hmdp.utils;import org.springframework.data.redis.core.StringRedisTemplate;import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;public class RedisIdWorker {//開始時間戳private static final long BEGIN_TIMESTAMP = 1640995200L;private static final int COUNT_BITS=32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {//1.生成時間戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;//2.生成序列號//2.1.獲取當前日期,精確到天String date =now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));//2.2.白增長long count =stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix +":"+ date);//這里講的應該是序列號會一直增而不會隨著時間戳變化而刷新回1的意思吧,所有的業務生成id次數超過也是可能的//3.拼接并返回//直接加也行,不過沒有或運算效率高return timestamp << COUNT_BITS | count;}
}

?HmDianPingApplicationTests.java如下

package com.hmdp;import com.hmdp.entity.Shop;
import com.hmdp.service.impl.ShopServiceImpl;
import com.hmdp.utils.CacheClient;
import com.hmdp.utils.RedisIdWorker;import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cache.CacheManager;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;@SpringBootTest
class HmDianPingApplicationTests {@Resourceprivate ShopServiceImpl shopService;@Resourceprivate CacheClient cacheClient;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate ExecutorService es=Executors.newFixedThreadPool(500);// CountDownLatch大致的原理是將任務切分為N個,讓N個子線程執行,并且有一個計數器也設置為N,哪個子線程完成了就N-1@Testvoid testIdWorker() throws InterruptedException {CountDownLatch latch = new CountDownLatch(300);Runnable task=()->{for(int i =0;i<100;i++) {long id = redisIdWorker.nextId("order");System.out.println("id =" + id);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {es.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("time =" + (end - begin));}@Testvoid testSaveShop()throws InterruptedException {Shop shop= shopService.getById(1L);cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY+1L,shop, 10L,TimeUnit.SECONDS);}}

死了媽一樣沒有任何引用問題為什么報錯????

這一節課我們其實講的是基于redis自增的策略,對吧?事實上除了這種策略以外,全球eid的策略還有很多種別的,比方說UUID,UID是大家非常熟悉的一種方案了,對吧?直接利用吉利可以自帶的uid工具類就能夠生成了,這種生成策略它生成的其實是一個16進制的一長串的數值,這時候因為是16進制,所以說它反饋的結果其實是個字符串結構,并且的話也不是這種單調遞增的一種特性,因此雖然可以作為ead,但是不夠友好,沒有滿足我們這些所說的那些個特性,所以說這種用的比較少,我們采用是redis的自增方案,這種方案相對來講各種特性都能夠滿足,而且整體是單調遞增的,數值的長度也不大,總共也不超過了。

而且它是一個數字類型,它存儲在數據庫里占的空間相對來講也比較小,也比較友好一些。然后其實除了這些以外,還會有這種思路,福利卡學生算法,什么福利和算法的話,也是世界知名的一種全局VIP的一種生成策略,它也采用的是一個浪費型的64位數字,有興趣同學可以去百度一下看一看它的一個原理,你會發現跟我們還是有一些接近的,只不過它的自動采購是當前機器的這種自增內部維護的,所以說它需要維護一個機器ID,因為我們是用的race都是不管你是任何的分布式系統,它都是用redis作為,所以它不需要維護機械維,相對來講結構更簡單一點。

那么雪崩算法也是一個非常不錯的算法,它不依賴于類似,所以說它的性能來講可能理論上講會比redis要更好一點。但是它有自己的缺點,就是對于時鐘依賴也是比較高的,如果時間不準確,可能會出現一些異常的問題。

最后一種方法是利用數據庫自增的,我們剛才不是說數據庫自動不行嗎?這怎么還說還能使用數據庫字等等。這其實是,因為我們在這數據庫的增長,不是說我們在新增訂單表的時候把訂單而是單獨整一張表,這張表專門用來做子燈,這樣一來不管你的訂單表是10張表還是8張表,他們的ID其實還不是自動的,他們的ID從哪來?從專門用來做自動的那張表去獲取,等于是這N張表用的是不是同一個表的自動ID,這樣的話其實就可以實現這樣的一個唯一效果了,其實你可以把它理解成是什么?就是redis自增的數據庫版,也就是因為它不再使用類似的,而是用數據庫的。

所以說原理上來講也可能很像,但是從性能來角度來考慮,數據庫的性能肯定不如redis智能性能更好,對不對?所以說企業里頭去使用稅務自動的時候往往會采取一些方案,比如說批量的去獲取ID,然后在內存的緩存起來,然后再去使用這樣一種方案,我覺得也可以一定程度上提高它的性能,所以這些方案的話,后面這三種方案在企業里都有去應用,同學們可以自己去百度一下,研究一下這些方案它的一些不同點。

那么我們的認識之中的策略核心,大家需要注意的,首先它的整體結構時間戳加自增ID,對吧?首先說加總裁也是整體結構,然后在redis保存 key的時候有一個需要注意的事項,就是我們是每天一個key,還有兩個優勢,第一是方便統計每天的訂單量,每月訂單量和每年訂單量。第二,他還可以去限定什么 T字,中的一個值不會讓它太大,以至于超過了我們存儲的上限。另外就是它的結構,剛才說了時間出現計數器的方式

實現優惠券秒殺下單

每個店鋪都可以發布優惠券,分為平價券和特價券。平價券可以任意購買,而特價券需要秒殺搶購

表關系如下:
●tb_voucher:優惠券的基本信息,優惠金額、使用規則等
●tb_seckilL_voucher:優惠券的庫存、開始搶購時間,結束搶購時間。特價優惠券才需要填寫這些信息

為http://localhost:8081/voucher/seckill在postman里面post信息

{"shopId":1,"title":"100元代金券","subTitle":"周一至周五均可使用","rules":"全場通用\\n無需預約\\n可無限疊加\\不兌現、不找零\\n僅限堂食","payValue":8000,"actualValue":10000,"type": 1,"stock":100,"beginTime":"2024-08-24T11:45:14","endTime":"2024-08-24T19:19:10"
}

?可以看到sql

?這里如果餐廳名字和圖片不顯示的因為前面解決緩存擊穿使用了邏輯過期時間,人需要數據預熱,如果你把redis數據清除了就沒預熱的數據了,可以把解決緩存擊穿那個改成用緩存穿透的,就不用添加熱點數據了;?如果出現新添加的秒殺券不顯示的,去數據庫該過期時間往后延一延就ok

下單時需要判斷兩點:
秒殺是否開始或結束,如果尚未開始或已經結束則無法下單
庫存是否充足,不足則無法下單

?

?VoucherOrderController.java

package com.hmdp.controller;import com.hmdp.dto.Result;
import com.hmdp.service.IVoucherOrderService;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {@Resourceprivate IVoucherOrderService voucherOrderService;@PostMapping("seckill/{id}")public Result seckillVoucher(@PathVariable("id") Long voucherId) {return voucherOrderService.seckillVoucher(voucherId);}
}

IVoucherOrderService.java

package com.hmdp.service;import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.baomidou.mybatisplus.extension.service.IService;public interface IVoucherOrderService extends IService<VoucherOrder> {Result seckillVoucher(Long voucherId);
}

VoucherOrderServicelmpl.java(可能有nullntr的問題)

package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {//1.查詢優惠券拿到信息;其實后續Redission可以直接用信號量來鎖庫存SeckillVoucher voucher =seckillVoucherService.getById(voucherId);//2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未開始return Result.fail("秒殺尚未開始!");}//3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//尚未開始return Result.fail("秒殺已經結束!");}//4.判斷庫存是否充足if (voucher.getStock()<1) {//庫存不足return Result.fail("庫存不足!");}//5.扣減庫存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id",voucherId).update();if (!success) {//扣減失敗return Result.fail("庫存不足!");}//6.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 6.1. 訂單idlong orderId = redisIdWorker.nextId( "order");voucherOrder.setId(orderId);// 6.2. 用戶idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3. 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回訂單idreturn Result.ok(orderId);}
}

超賣問題

線程數:200
Ramp-Up時間(秒):1

去RefreshTokenInterceptor類中獲取對應的authorization值(可用redis題目的方式獲取),再將值填入JMeter的登錄狀態頭中;以新增一個信息頭管理器 然后把authorization參數值帶上就可以了

發給你的文件里有這個jmx可以直接用,刪除剛剛你那條記錄,你發現100張卷賣出109張,明顯的超賣,居然是-9張

?為什么?實際上就是在庫存為1的時候 有多個線程都通過了if判斷

超賣問題是典型的多線程安全問題,針對這一問題的常見解決方案就是加鎖:我們之前在解決緩存擊穿的時候,所采用的鎖就是悲觀鎖

  1. 悲觀鎖
    認為線程安全問題一定會發生,因此在操作數據之前先獲取鎖,確保線程串行執行。例如Synchronized、Lock都屬于悲觀鎖?。當然性能不咋地
  2. 樂觀鎖
    認為線程安全問題不一定會發生,因此不加鎖,只是在更新數據時去判斷有沒有其它線程對數據做了修改。
    如果沒有修改則認為是安全的,自己才更新數據。
    如果已經被其它線程修改說明發生了安全問題,此時可以重試或異常。

?VoucherOrderServicelmpl.java修改一下

        //5.扣減庫存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//這里一定是相等的吧,應該是之前查到的,你現在現查,肯定相同.eq("voucher_id",voucherId).eq("stock",voucher.getStock()).update();if (!success) {//扣減失敗return Result.fail("庫存不足!");}

以后對庫存去做一個判斷,完了以后去扣減庫存。那么按照我們之前分析的所謂的樂觀鎖方案,其實就是在執行扣減的這一刻去加上一個條件,判斷什么?我更新時的這一刻,庫存值與我查詢到的庫存值是不是同一個?如果是證明在我之前沒人修改過,我是不是就可以放心大膽修改了。所以說我需要在word條件里去添加一個對stock值的判斷,它的值就是我查到的值,我查到的值是不是就是stock值
這個代碼給大家解釋一下,這里的set其實等于set條件,也就是setstock=stock-1,下面這兩個是where條件?它們等于?Where?Id等于一個?Stock等于一個?好,所以其實對羅伯斯的判
斷,只要我在更新時where條件庫存值,等于我查詢到的庫存值證明我上面這個判斷是沒有問題的,證明在我之前沒有人修改過,我放心大膽去扣是不是就沒問題?

為什么不是把一開始查到的庫存用變量存起來去做where判斷?而是再調用一次,調用的不就是此刻庫存的嗎?那不就一定成立嗎??

?tb voucher order數據全部刪除,優惠卷重新改成100張余額

結果只賣出去20張,這是為什么呢????

?現在假設說我的庫存還剩100,然后有無數的線程都涌入進來了,沒加鎖那這些線程是不是就并行執行,比如說100個線程,都查到了100,然后我們接下來只有一個線程會去執行這個更新的動作,判斷是否大于0,然后就是那樣,現在變成99了,那這剩下的99個線程都會認為什么在我之前有人改了,不能去犯錯誤就取消;失敗率大大提高

?我們不再判斷庫存是否與我查到的相等了,我們只要判斷庫存大于零,

        //5.扣減庫存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//這里一定是相等的吧,應該是之前查到的,你現在現查,肯定相同.eq("voucher_id",voucherId).gt("stock",0).update();if (!success) {//扣減失敗return Result.fail("庫存不足!");}

刪掉數據庫的內容, 重新設置余額,我們可以看到數據庫100張正好賣完(這其實也是悲觀鎖,數據庫的悲觀

update語句有排它鎖,不可能同時兩個事務修改該數據

一人一單

修改秒殺業務,要求同一個優惠券,一個用戶只能下一單,在當前情況下我們在這個訂單表里面的所有的,這100張訂單竟然都是同一個用戶買

能否 給用戶id和優惠券id建立聯合的唯一索引???

?修改秒殺邏輯

package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {//1.查詢優惠券拿到信息;其實后續Redission可以直接用信號量來鎖庫存SeckillVoucher voucher =seckillVoucherService.getById(voucherId);//2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未開始return Result.fail("秒殺尚未開始!");}//3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//尚未開始return Result.fail("秒殺已經結束!");}//4.判斷庫存是否充足if (voucher.getStock()<1) {//庫存不足return Result.fail("庫存不足!");}//6.一人一單Long userId = UserHolder.getUser().getId();//6.1.查詢訂單int count = query().eq("user_id",userId).eq("voucher_id", voucherId).count();//6.2.判斷是否存在if(count>0) {//用戶已經購買過了return Result.fail("用戶已經購買過一次!");}//5.扣減庫存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//這里一定是相等的吧,應該是之前查到的,你現在現查,肯定相同.eq("voucher_id",voucherId).gt("stock",0).update();if (!success) {//扣減失敗return Result.fail("庫存不足!");}//7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1. 訂單idlong orderId = redisIdWorker.nextId( "order");voucherOrder.setId(orderId);// 7.2. 用戶idvoucherOrder.setUserId(userId);// 7.3. 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回訂單idreturn Result.ok(orderId);}
}

?該代碼在一人一單校驗 環節存在明顯的線程安全問題,可能導致并發下單。具體分析如下:

是的,這句話是正確的。該代碼在**一人一單校驗**環節存在明顯的線程安全問題,可能導致并發下單。

**問題根源**
1. **競態條件(Race Condition)** ?
? ?代碼中的 `查詢訂單` 和 `創建訂單` 是兩個獨立的操作,不具有原子性。當多個請求同時到達時:
? ?- 線程A查詢訂單,發現用戶未下單(`count=0`)。
? ?- 線程B幾乎同時查詢訂單,也得到 `count=0`。
? ?- 兩個線程都會通過校驗,繼續執行后續的扣庫存和下單邏輯,導致同一用戶生成多個訂單。

2. **事務邊界問題** ?
? ?雖然方法標注了 `@Transactional`,但事務的生效范圍是**方法內整體**,而數據庫的隔離級別(如 `READ COMMITTED`)無法保證 `query().count()` 的實時可見性。在并發場景下,事務內的中間狀態可能對其他事務不可見。

**解決方案**
需通過以下方式保證原子性:

?1. **數據庫唯一約束**
? ?- 在 `voucher_order` 表的 `(user_id, voucher_id)` 字段上添加**唯一索引**。
? ?- 捕獲數據庫插入異常(如 `DuplicateKeyException`),防止重復下單。
? ?```java
? ?// 創建訂單時捕獲異常
? ?try {
? ? ? ?save(voucherOrder);
? ?} catch (DuplicateKeyException e) {
? ? ? ?return Result.fail("用戶已下單");
? ?}
? ?```
? ?**優點**:簡單可靠,直接依賴數據庫約束。 ?
? ?**缺點**:需處理異常,可能影響性能。

?2. **分布式鎖**
? ?- 使用 Redis 或其他分布式鎖,確保同一用戶同一優惠券的下單操作串行化。
? ?```java
? ?// 示例:使用Redisson的RLock
? ?RLock lock = redissonClient.getLock("lock:order:user:" + userId + ":voucher:" + voucherId);
? ?try {
? ? ? ?lock.lock();
? ? ? ?// 執行原有邏輯
? ?} finally {
? ? ? ?lock.unlock();
? ?}
? ?```
? ?**優點**:完全避免并發沖突。 ?
? ?**缺點**:增加系統復雜度,需處理鎖競爭和死鎖。

3. **樂觀鎖(CAS)**
? ?- 在扣減庫存時,結合版本號或條件更新(如 `stock=stock-1 WHERE stock>0`)保證庫存扣減的原子性。
? ?- 但需注意:這只能解決庫存超賣問題,無法直接解決一人多單問題。

*代碼改進建議**
結合唯一索引和事務回滾:

@Transactional
@Override
public Result seckillVoucher(Long voucherId) {// ...其他校驗邏輯(時間、庫存)...// 一人一單校驗(依賴數據庫唯一索引)Long userId = UserHolder.getUser().getId();try {// 直接嘗試插入訂單,依賴唯一索引拋出異常VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(redisIdWorker.nextId("order"));voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);save(voucherOrder); // 若重復插入,拋出DuplicateKeyException} catch (DuplicateKeyException e) {return Result.fail("用戶已下單");}// 扣減庫存(需保證原子性)boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {// 扣減失敗,回滾訂單(需手動處理或依賴事務)throw new RuntimeException("庫存扣減失敗");}return Result.ok(orderId);
}


原代碼的線程安全問題確實存在,必須通過**數據庫約束**或**分布式鎖**確保原子性。推薦優先使用數據庫唯一索引,因其簡單且可靠。此處的問題依然是由于隔離級別的問題,默認RR

對于一人一單的判斷是在這里 ,?先查詢出訂單,根據用戶id和vouchar id。那我拿這個查結果一定是0.那么

if(count>0) {//用戶已經購買過了return Result.fail("用戶已經購買過一次!");
}

就一定是不是就往下走直接下單了,那因為呢你是一個多線程并發操作,你就會出現多個線程穿插執行的情況,那個一人一單也是如此;現在呢假設說我們的數據庫里根本就沒有訂單,有100個線程都來執行這個邏輯,大家都來做查詢查到都是0,也就是說現在沒有任何一個人去做插入,要并發的情況下都是0,都不成立,都往下走,于是就連續插入了n多條訂單;如果不允許多平臺多端登錄的話這里確實已經解決了,后續講的是一個用戶瞬間并發的解決方案,涉及的知識點有點多

我們需要把這一段邏輯啊給它做一個悲觀鎖封裝

?而是當前用戶我們以用戶id再去加鎖,這樣的話我們是不是可以去把鎖的范圍縮小了,也就是說同一個用戶我去加一把鎖,不同用戶加不同鎖。因此在這里不用synchronize的這種方法,用關鍵字

?userId.toString,我們期望的是id值一樣的作為一把鎖,但是每一個請求來這個id對象是不是都是一個全新的id對象,因此的話呢這個對象變了鎖也變了,這不行,我們要求的是值一樣所有使用tostring。但是tostring就能保證它是按照值來加鎖的嗎?其實在這個tostring方法里面底層調用的啊是這個long的一個靜態的特征函數,在它的內部其實是new了一個字符串,那么每調用一次tostring都是一個全新的字符串對象,也就是這個鎖對象是還會再變,即使你id一樣(常量池才會復用,new的話是每次在堆里開辟一個新對象,因此值一樣對象地址也不同)JVM學的

package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Overridepublic Result seckillVoucher(Long voucherId) {//1.查詢優惠券拿到信息;其實后續Redission可以直接用信號量來鎖庫存SeckillVoucher voucher =seckillVoucherService.getById(voucherId);//2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未開始return Result.fail("秒殺尚未開始!");}//3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//尚未開始return Result.fail("秒殺已經結束!");}//4.判斷庫存是否充足if (voucher.getStock()<1) {//庫存不足return Result.fail("庫存不足!");}return createVoucherOrder(voucherId);}//你會注意到上面那個標黃了,這里事務是沒用的,事務注解的本質是一個切面類,這里只有采用代理對象調用才會生效,直接在本類調用是不經過切面的@Transactionalpublic Result createVoucherOrder(Long voucherId) {//我們的同步鎖就是this,是當前對象//另外呢這個事務的范圍其實是更新數據庫的一個范圍:也就是說做減庫存操作和創建電子操作而不是整個操作//6.一人一單Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {//實現同步代碼塊//6.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//6.2.判斷是否存在if (count > 0) {//用戶已經購買過了return Result.fail("用戶已經購買過一次!");}//5.扣減庫存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//這里一定是相等的吧,應該是之前查到的,你現在現查,肯定相同.eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {//扣減失敗return Result.fail("庫存不足!");}//7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1. 訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2. 用戶idvoucherOrder.setUserId(userId);// 7.3. 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回訂單idreturn Result.ok(orderId);}}
}

我們現在呢是在方法內部來去加的鎖?,那之前我們synchronized加在方法上的話,是不是對整個方法加鎖那現在在方法內部加鎖就會存在一個問題啊,比方說我們現在這里開啟事務開始執行,執行了之后我們獲取鎖或鎖了以后,我開始做這個查詢對吧,那然后查詢完了以后,我就減庫存,提交訂單,提交訂單完了以后,我先釋放鎖才會提交事務,對不對啊,因為這個事務是被我們Spring管理了,所以這個事務的提交,它是在我們方法函數執行完以后,由Spring做的提交,那么這個時候其實鎖在這個大括號結束了以后,已經釋放了,那同學們如果鎖釋放了,意味著其他線程是不是已經可以進來了,而此時因為事務尚未提交啊,如果有其他線程進來去查詢訂單的話,那我們剛剛新增的這個訂單很有可能還沒有寫入數據庫,對不對,因為你沒提交,所以說他再去查詢的時候依然不存在,是不是有可能出現并發安全問題,那么因此我們這個鎖它鎖定的范圍有點小了,他應該是把這整個函數鎖起來,那這樣一來應該是事務提交之后,我們再去釋放鎖,對不對啊,那你說那怎么辦呢,那你這個synchronized加在這里其實就不合適了啊,大家要注意這個范圍了啊,那要加在哪里呢,我們把這個刪掉,我們應該是加在這個函數到外邊對吧,把整個函數鎖起來啊,在這個地方去加鎖,那當然了,你在這個地方加鎖的對象是用戶的id嗎,所以說你需要在外邊去獲取這個用戶id,然后上鎖對吧,哎,這么來做才是ok的。

    //4.判斷庫存是否充足if (voucher.getStock() < 1) {//庫存不足return Result.fail("庫存不足!");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {return createVoucherOrder(voucherId);}
}//你會注意到上面那個標黃了,這里事務是沒用的,事務注解的本質是一個切面類,這里只有采用代理對象調用才會生效,直接在本類調用是不經過切面的
@Transactional
public Result createVoucherOrder(Long voucherId) {//我們的同步鎖就是this,是當前對象//另外呢這個事務的范圍其實是更新數據庫的一個范圍:也就是說做減庫存操作和創建電子操作而不是整個操作//6.一人一單Long userId = UserHolder.getUser().getId();//6.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

?注意此時的鎖跟方法的鎖有很大區別:方法的鎖是this對象,而現在的鎖還是user對象??用 Lock 鎖就可以手動解鎖了,比較方便

?直接加載方法上是對方法加鎖,所有線程執行該方法都是單線程的,但是synchronized關鍵字是對相同的對象進行加鎖,不同的對象互不影響,相同的對象為單線程(實際上這里是錯誤的 事務底層是aop實現的 ?因此 調用被包裝方法應該采用aopcontext)

這里還存在一個事務的問題啊,什么事務問題呢?我們在這里是對當前的`createVoucherOrder`函數加了事務,但沒有給外面這個函數加。而外面這個函數在調用的時候是這樣調用的,這種調用其實是用`this`調的,對不對?就省去了一個`this`。那么這種調用的話,其實這個`this`拿的是誰呢?是當前的VoucherOrderServiceImpl`對象,而不是它的代理對象。我們知道,事務要想生效,是因為Spring對當前這個類做了動態代理,拿到了它的代理對象,用它來做的事務處理。那現在你這個`this`其實指的是非代理對象,也就是目標對象,所以它是沒有事務功能的,對不對?這個是我們講Spring事務失效的幾種可能性之一嘛,那這就是一種可能性啊。那么大家在網上應該也都看到過對應問題的解決方案吧,怎么解決呢?其實,在這個地方的話,我們也可以借助一個API,叫做`AopContext`,利用它里面的`currentProxy`方法,那么就能夠拿到當前對象的代理對象了,我們稱之為`proxy`。那當前對象是誰呢?其實就是這個接口`IVoucherOrderService`的代理對象對吧?所以這里就用這個來接收就可以了啊,然后給它做個強轉,它返回的是`Object`,但我們知道它肯定是`IVoucherOrderService`,那我們就拿到了當前代理對象了。這個時候呢,我們用代理對象來調用這個`createVoucherOrder`函數,而不是用`this`,那么這樣的話,事務就能生效了,因為這個代理對象是Spring創建的嘛,所以它是帶有事務的這樣一個函數啊。那現在不存在的原因是因為什么呢?因為這個接口里是不存在這個函數的,我們現在是在實現類里做的對吧?所以,你把這個函數也給它創建一下,創建到我們的`IVoucherOrderService`接口里,那現在接口里有了,我們才能夠基于接口的調用嘛,那現在這個事務就能生效了。

?黑馬SpringWeb的AOP:事務是基于AOP實現 AOP會將目標對象方法增強(在原方法前面加上開啟事務 后面加上提交)生成一個新的代理對象放到bean中 所以只有代理對象有事務功能 而原對象沒有

        Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}//記得在service里加一下

?pom中添加依賴

<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId>
</dependency>

?為啟動類暴露這個代理對象,默認是不暴露的

@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy=true)
public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);}
}

?或者在成員變量中直接注入自己,通過注入自己的對象去調用事務

?重新運行成功只賣出一張票

?ctrl+D復制一份,?選擇添加虛擬機選項,就是jvm,是視頻里environment的vm options

可以改下名字,實現兩個兩個

?修改nginx下面的nginx.conf

            proxy_pass_request_headers on;#more_clear_input_headers Accept-Encoding;  proxy_next_upstream error timeout;  #proxy_pass http://127.0.0.1:8081;proxy_pass http://backend;}}upstream backend {server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;}  
}

需要把這里的43行打開,42行注釋掉,然后去把任務管理器的所有ngix的進程通通殺死,然后再重啟服務。

nginx.exe -s reload

是的,不同端口不斷發送,sql中優惠卷恢復原貌,打上斷點

?同一個用戶這里有一個并發鎖的控制,只能一個用戶進來,現在我們打開我們的postman

兩臺tomcat,兩個jvm,兩個字符串常量池,兩把鎖,當然都進來了?

現在我們先來看一下,在正常情況下,我們之前是怎么做的。之前我們是不是單體項目?我們假設把這個灰色的框當成我們的一個單體項目。然后我們啟動以后,其中有一個線程來查詢訂單,然后這個時候查完了以后,他去判斷一下這個訂單是否存在。如果說不存在,他就會去插入這個新訂單。他執行完了以后,另外一個線程來查詢,查詢完了以后,因為這邊已經插入了,所以說這個查詢他再去判斷的時候是否存在,肯定是存在的,是不是就報錯啊,無法插入。這是正常串行執行的情況。

那現在呢,在多線程并發的情況下,在沒有加鎖的情況下,它不可能每一次都是這么正常的串行執行,它可能會出現交叉執行的情況。一旦出現交叉執行,線程一查詢訂單,現在發現不存在,線程二也來查詢,也是不存在。好,緊接著線程一和線程二都去插入新訂單,那么這個訂單是不是就被插入了兩次?這就是線程安全問題產生的原因。

后來我們干了什么事?后來我們加了鎖,對不對?我們加了鎖以后,也就是說一個線程來了以后,必須先獲取鎖,拿到鎖了以后才可以去執行查詢訂單的這個動作,還有判斷的動作。那假如說他拿到以后,他去執行插入新訂單,然后去釋放鎖。此時另外一個線程,他也想要來獲取鎖,但是因為這把鎖在這一時刻已經被線程一拿到了,所以線程二是拿不到的。他會失敗。失敗會干什么?根據synchronized的原理,失敗了以后,他是不會等待的,那它會等待鎖釋放。等了很長時間,這邊釋放了,他終于可以獲取成功了,然后才能執行。那你這個時候再去執行查詢,那邊已經插入了,你再來查是不是已經存在了,那么這個地方就會報錯,不會插入,是不是就確保了串行執行了啊?這是我們理想的情況啊。

那現在發生了什么呢?現在我們不再是一臺機器了,是多臺了。那大家要知道的是,在當前這一個JVM內部鎖的原理是什么?其實是在我們JVM內部維護了一個鎖的監視器對象。這個監視器對象,我們用的是什么?用的是`id`,是不是在我們的常量池里邊?那么在這個JVM內部,它維護了這一個池子。當`id`相同的情況下,他們是不是永遠是同一個鎖?也就是鎖的監視器是同一個。所以說無論是線程一也好,還是線程二也好,他們來獲取鎖的時候,當線程一來獲取鎖的時候,那么鎖監視器就會記錄線程一的這個名稱。那么當線程二再來獲取的時候,他一看說不行,這已經有了,那他還能獲取嗎?不能,就不能了,這就是它的一個原理。

但是呢,當我們做集群部署的時候,一個新的部署就意味著這是一個全新的Tomcat,那就意味著這是一個全新的JVM。也就是說有兩套JVM。那兩個JVM是不是有各自的堆棧、各種方法區之類的?所以說呢,我們的JVM2,它也會有自己的常量池。所以呢,他用`id`作為鎖的時候,它的監視器對象就會有一個全新的鎖監視器了,跟JVM1的那個監視器是不是同一個呀?那現在當我們的線程三來獲取鎖的時候,他走的是自己的這個監視器。那這個監視器顯然是空的呀,他那邊已經有了,在這邊沒有,所以呢他也能獲取鎖成功,對不對?好,當然了,線程四肯定還是失敗的,這沒問題。也就是說我們這個鎖監視器在當前JVM內部可以監視到這些線程實現互斥,但是呢,如果你有多個JVM,就會有多個鎖監視器,是不是每一個JVM內部都會有一個線程是成功的?如果我們將來集群比如說部署了十臺、20臺,也就意味著并行的,至少會有十個線程是同時在運行的,那不就又一次出現了線程安全問題了?明白了嗎?所以產生安全問題的原因是什么?在集群模式下,或者有一些是在分布式系統下,有多個JVM的存在,每個JVM里都有自己的鎖,導致了每一個鎖都可以有一個線程獲取,于是就出現了并行運行,那么就可能出現安全問題。

好,那么要想解決這個問題,我們必須得想辦法讓多個JVM只能使用同一把鎖,對不對?那么這樣的鎖可不是我們JVM里面提供的,需要我們自己去實現。在下節課我們就來講一講這種跨JVM的,或者講跨進程的鎖。

分布式鎖

我們已經發現在集群模式下,`synchronized`的鎖失效了。`synchronized`啊,它只能夠保證單個JVM內部的多個線程之間的互斥,而沒有辦法讓我們集群下的多個JVM進程之間互斥。那要想解決這個問題,我們必須使用分布式鎖。所以這節課呢,來分析一下分布式鎖的工作原理,當然也會動手去實踐一個分布式鎖。

好,下面呢我們先來分析一下分布式鎖的工作原理。那這里呢有兩個JVM。`synchronized`啊,就是利用JVM內部的鎖監視器來控制線程的。在JVM的內部,因為只有一個鎖監視器,所以只會有一個線程獲取鎖,可以實現線程間的互斥。但是當有多個JVM的時候呢,就會有多個鎖監視器,那么就會有多個線程獲取到鎖,那這樣呢就沒有辦法實現多JVM進程之間的互斥了。那要想解決這個問題,我們肯定不能再去使用JVM內部的鎖監視器了吧,我們必須讓多個JVM都去使用同一個鎖監視器。那因此呢,它一定是一個在JVM外部的、多個JVM進程都可以看到的這樣一個鎖監視器。這個時候呢,無論是單個JVM內部的線程,還是多個JVM的線程,都應該來找這個監視器來獲取鎖。那這樣呢也就只會有一個線程獲取到就能夠實現多進程之間的互斥了。

好,那我們來分析一下這個過程。現在呢假設我們的這個JVM1里邊的線程一來去執行業務,那肯定要去獲取鎖。這時候他來獲取呢,就是找我們這個外部的鎖監視器。一旦獲取成功,就會去記錄當前獲取的是這個線程一。那緊接著呢,此時如果有其他線程也來獲取鎖,比方說是在JVM2內部的一個線程三,那么它線程三來獲取鎖。那么這個時候呢,因為鎖監視器里已經有線程一持有了,所以線程三獲取一定會失敗。那失敗了他要怎么辦呢?那它一旦失敗,肯定要去等待鎖釋放,是不是啊?所以呢他就不停地等待。那這個時候呢,我們的線程一獲取鎖成功了,他就可以去執行自己的業務代碼了。比方說,他下面要去查詢訂單,因為一人一單要做判斷,判斷一下這個訂單是否存在。如果說它不存在,那現在他是第一個來的,肯定不存在。那這個時候他就可以去插入新訂單了。好,那么插入之后他的業務完成,他就可以去釋放鎖了。當然了,此時用戶已經有這個訂單了。所以他釋放完了以后呢,這邊的線程三他拿到鎖了。這個時候他去執行業務,獲取鎖成功,執行業務,查詢。那他查詢的時候呢,因為對方已經插入了,他一定能夠查詢到。查詢到代表存在,存在就不能去插入了,所以就報錯了。所以這樣是不是就避免了這種安全問題的發生呢?而且呢,這個時候不僅僅是線程三,那么不管是我們JVM內部的線程,還是多個JVM的線程,比如說新增的線程四其實都是一樣的。只要在線程一獲取鎖成功以后,他們來獲取肯定都是失敗的,因為他們都是找同一個鎖監視器嘛,所以他們都會獲取失敗,那么都會出現一個阻塞等待,就跟線程三一樣。所以呢,這個時候就能夠實現什么呢?無論是單個JVM內部的線程,還是跨JVM的線程,都可以達到一個互斥效果。好,這就是分布式鎖的一個工作原理,其實還是非常簡單的吧。所以它最關鍵的點就是什么?一定要讓多個JVM進程都能夠看到同一個鎖監視器,也就是多進程可見,最后呢還要互斥,也就是只有一個線程能拿到鎖。

?mysql zookeeper 都可以做分布式鎖 只要是獨立于jvm的 都可以

分布式鎖?

分布式鎖:滿足分布式系統或集群模式下多進程可見并且互斥的鎖。分布式鎖的核心是實現多進程之間互斥,而滿足這一點的方式有很多,常見的有三種:

MySQL?RedisZookeeper
互斥利用mysql本身的互斥鎖機制利用setnx這樣的互斥命令利用節點的唯一性和有序性實現互斥
高可用
高性能一般一般
安全性斷開連接,自動釋放鎖利用鎖超時時間,到期釋放臨時節點,斷開連接自動釋放

好的同學們,那比較常見的分布式鎖實現方式主要有這么三大類吧。第一類是使用MySQL,或者說使用數據庫來實現;第二類是使用Redis來實現,也就是類似于這種緩存的;第三類就是使用ZooKeeper這樣的東西來實現。這個東西有些同學可能沒學過,但不重要,因為今天我們的重點是基于Redis來實現。我們先來分析一下這三種實現方式在互斥、高可用、高性能和安全性上的一些差異。

首先,互斥。MySQL怎么來實現互斥呢?我們都知道MySQL數據庫或者其他數據庫都具備一個事務機制,對不對?那么在事務執行的時候,或者說我們在執行寫操作的時候,MySQL是不是自動會給你分配一個互斥的鎖?這樣一來,在多個事務之間是不是就是互斥的,只有一個人能去執行?我們完全可以利用這個原理來實現一個鎖。比方說我們現在有一段業務需要去實現這樣的分布式互斥鎖,那么我們就可以在業務執行前先去MySQL里申請一個鎖,然后去執行我們的業務。當業務執行完了以后,我們去提交事務,那這樣一來鎖不就釋放了嗎?那當我們的業務拋異常的時候,它會自動地觸發回滾,鎖是不是也釋放了呀?這樣一來,我們這個互斥效果還有鎖的釋放和獲取是不是都很容易實現了對吧?這是MySQL或者說數據庫的這種方式,怎么去實現鎖呢?利用數據庫本身的事務鎖去做。

而它的可用性就依賴于MySQL本身的可用性能。我們知道MySQL是可以支持主從模式的,所以說它的可用性應該說還是不錯的。那至于性能,我們所有的性能就會受限于MySQL的性能。我們知道像MySQL它的性能跟Redis相比肯定還是有些差距的,所以它的性能屬于是比較一般,不能說好,但也不是很差。

那它的安全性呢?也就是說一旦出現異常情況,這個鎖能不能及時地釋放呢?其實是可以的。因為我們知道在MySQL里面,你利用這種事務機制去獲取了鎖,但是一旦你系統崩潰了,其實連接斷開以后,鎖是會自動去釋放的,數據也會回滾,對不對?所以說這個問題不用考慮,非常好。這是基于MySQL這樣的數據庫去實現分布式鎖。

那么用Redis去實現呢?這種方式我們其實以前介紹過,它利用的就是`SETNX`這樣的一個互斥命令。還記得嗎?我們是不是說過`SETNX`呢?是指我們去往Redis里`SET`一個數據的時候,只有數據不存在時才能`SET`成功,如果已經存在了,那就會`SET`失敗。那么你想想看,如果我們有無數的線程,大家都來執行`SETNX`,并且`key`是同一個`key`,那肯定只能有一個人成功,其他人都會失敗。那這樣不就實現互斥了嗎?這是它的互斥原理。那將來要釋放鎖也很簡單,我只要把這個`key`刪了,那么其他人是不是就能夠去獲取這個鎖了,就能夠`SETNX`成功了嘛?就這個意思。所以這是它獲取鎖和釋放鎖的原理,我們之前是不是講過的。

當然了,它還有很多一些細節需要我們去考慮。就目前來講,就是基于這個簡單的這樣一個`SETNX`效果就行了。那么它的可用性就不用多說了吧?我們知道Redis不僅僅只是主從,它還支持這種集群模式,所以它的可用性是能夠得到保障的。而性能就更不用講了,Redis性能是遠遠好于我們的MySQL的,所以說它實現鎖的性能也是非常好的。

而安全性上呢?那我們獲取鎖成功,也就是我們`SETNX`執行完了,一旦我的服務出現故障了,那鎖能不能得到釋放呢?這個就不行了啊。因為我們利用`SETNX`是設置了一個`key`到Redis當中,那如果你服務宕機了,將來沒有人去執行這個刪除的動作,那么這個`key`就會一直在那里,那鎖是不是就得不到釋放?那這樣一來其他人也拿不到鎖,是不是就產生這種類似于死鎖的效果了?那么就出現問題了。

所以為了應對這個問題呢,我們在利用`SETNX`獲取鎖的時候,必須想一個辦法,將來一旦出現故障,鎖也能釋放。有什么辦法呢?這里可以直接告訴大家,就是利用Redis當中的`key`的過期機制。我們知道Redis的`key`是可以設置過期時間的。那大家想一下,如果我在`SETNX`獲取鎖的時候,同時給它加上一個過期的時間,那么將來一旦我的服務出現了故障,沒有人去手動釋放,但是一旦到期以后,這個鎖是不是也會自動釋放呀?所以可以利用這個超時時間來解決這個安全性的問題。

不過呢,這個鎖的釋放時間設置多長呢?如果太長了,那這個鎖呀它的無效等待時間就會比較多;那如果時間太短了,那萬一我的業務還沒執行完呢?所以說呢這種方式去做安全性的一個保證是可以的,但是還需要去完善。具體怎么解決呢,我們在后續會給大家去講。

這是Redis,那么還有最后一種就是ZooKeeper了。ZooKeeper呢,它這個鎖的原理其實是利用了它內部的這種節點機制。ZooKeeper內部是可以去創建這種數據節點的,而節點具備這種唯一性、有序性,另外還可以創建這種臨時節點。所謂的唯一性就是我們去創建節點的時候,節點不能重復。所以它有序性就是每創建一個節點,節點的ID就是遞增的。那利用這個特性怎么去獲取鎖實現互斥呢?這里我們可以利用這個有序性來實現互斥。比如說現在我們有很多線程都來在ZooKeeper里創建節點,然后這樣一來,每一個節點它的ID是不是就是單調遞增的?那如果我們約定,ID最小的那個算是獲取鎖成功,那這樣一來是不是就實現了互斥?因為最小的是不是只有一個?當然你也可以利用唯一性,比如說大家都去創建節點,并且這個節點名稱大家都是一樣的,這樣來是不是只有一個人成功?所以也可以,但一般情況下,我們都會利用這個有序性去實現這種互斥。

那這是獲取鎖,釋放鎖也很簡單。將來呢你把節點刪除,是不是你就不是最小的,另外一個人就變成最小的。所以這是釋放鎖。所以ZooKeeper里面,鎖釋放鎖還是比較簡單的。

那從可用性來考慮呢,它也是不錯的,因為ZooKeeper天生也是支持集群的,所以它的可用性很好。而性能上呢,因為ZooKeeper它的集群強調的是強的一致性,而這種強的一致性都會導致它的主從之間做數據同步,去消耗一定的時間,所以它的性能相對來講是比Redis要差一些的。

而安全性來講,它做得也很好。因為它的這些節點往往創建的是臨時節點,這個時候一旦出現故障,比如說服務宕機了,那么斷開連接以后,它的節點會自動釋放掉,所以鎖就等于釋放了。所以這塊兒也不用去考慮。

所以其實從安全性的角度來講,ZooKeeper和MySQL它們兩個原理基本上類似,安全性是比較好的,比Redis來講要稍微好一些。Redis只能利用這種超時機制去做。那么但是呢從可用性和性能上來講,Redis是非常好的。那么因此呢,接下來我們就會帶著大家去學習一下如何基于Redis來實現分布式鎖。那至于MySQL和ZooKeeper的這種方式,我們在這里就不多講了,因為我們整個教程主要是來學習Redis的。那么大家如果有興趣,也可以去網上看一看其他兩種方式實現的一個原理。

實現分布式鎖時需要實現的兩個基本方法:
獲取鎖:
互斥:確保只能有一個線程獲取鎖

#添加鎖,利用setnx的互斥特性
SETNx lock thread1
#添加鎖過期時間,避免服務巖機引起的死鎖
EXPIRE lock 10

釋放鎖:
手動釋放
超時釋放:獲取鎖時添加一個超時時間

#釋放鎖,刪除即可
DEL key
實現真正的原子性
#添加鎖,NX是互斥、EX是設置超時時間
SET lock thread1 NX EX 10

?非阻塞:嘗試一次,成功返回true,失敗返回false

Redis優化秒殺

基于Redis實現分布式鎖初級版本

需求:定義一個類,實現下面接口,利用Redis實現分布式鎖功能。

?在utils定義Ilock接口和SimpleRedisLock類

package com.hmdp.utils;public interface ILock {/*** 嘗試獲取鎖* @param timeoutSec 鎖持有的超時時間,過期后自動釋放* @return true代表獲取鎖成功; false代表獲取鎖失敗*/boolean tryLock(long timeoutSec);/*** 釋放鎖*/void unlock();
}
package com.hmdp.utils;import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;//鎖的名稱,你可以給鎖加一個統一前綴private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {//獲取線程標識long threadId = Thread.currentThread().getId();//值不能隨便設置,因為會有特殊情況,第一個線程執行到刪除鎖之前,key過期了,//其他線程就進來執行操作,這樣就會把其他線程的鎖給刪了,這個值會用來比較是否是該線程的鎖。具體可以看谷粒商城p158//獲取鎖Boolean success =stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);//由于拆箱不能直接returnreturn Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//釋放鎖stringRedisTemplate.delete(KEY_PREFIX+name);}
}

修改?VoucherOrderServicelmpl.java中機制改為自己實現的鎖

/***************************************************///4.判斷庫存是否充足if (voucher.getStock() < 1) {//庫存不足return Result.fail("庫存不足!");}Long userId = UserHolder.getUser().getId();//創建鎖對象SimpleRedisLock lock=new SimpleRedisLock("ordere:" +userId,stringRedisTemplate);//獲取鎖boolean isLock=lock.tryLock(1200);if(!isLock){return Result.fail("不能重復下單");}try {//獲取代理對象(事務)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally {//釋放鎖lock.unlock();}//try finally 事務可以生效,因為沒有捕獲異常。如果catch捕獲了異常,需要拋出RuntimeException類型異常,不然事務失效。}
/*************************************************/

要每個用戶都能拿到自己的鎖,因為這里加鎖的目的就是為了實現一人一單,超賣是通過update自帶的行鎖解決的,不是分布式鎖解決的;如果 同一個人在鎖過期了之后再來下單不是又能重復下單了嘛? 所以鎖的過期時間要久一點

這種情況是同一個用戶發了多次請求的情況吧,如果是不同的用戶,那么userId不一樣的,那么key就不一樣(最主要是因為線程無法分清楚自己的鎖)

好的同學們,在剛剛我們已經實現了基于Redis分布式鎖的一個初級版本。在這個版本當中,獲取鎖的時候,我們采用了`SETNX`和`EX`的這種方式來實現這個互斥。在釋放鎖的時候呢,我們采用的是`DEL`直接把鎖刪掉,那么其他人就能夠去獲取鎖了,實現方式非常的簡單。在大多數情況下,這個鎖都能夠正常工作,但是,在一些極端情況下,它依然會存在一些問題。所以這節課我們就來分析一下,它可能存在什么樣的問題。

在這兒我首先準備了這樣一個時間線,我們可以把它理解成Redis鎖的一個持有周期。然后呢,在這里我們假設說來了一個線程,這是一個線程1。那么這個線程1他在執行的過程中,首先要嘗試去獲取鎖對吧,那么他就會向Redis去發起一個請求,要獲取鎖。那因為他是第一個來的,所以說他能夠正確的獲取鎖,沒有任何人去阻攔他,對吧。那么這個地方我用藍色的線來標識,就是當前這個線程1拿到了鎖。然后拿鎖了以后呢,他要開始去執行自己的業務了,對不對?但是因為某種原因,他的業務產生了阻塞,那么這樣一來,他的鎖的持有周期是不是就會變長?哎,到什么時候為止呢?兩種情況,一種呢就是他執行完了,他去釋放,還有一種啊就是他阻塞時間太長了,甚至于超過了我們設置的那個超時時間。大家別忘了,我們獲取鎖的時候,是不是有一個`EX`超時時間,它就是一個兜底方案啊,避免就是因為沒有人釋放鎖。雖然說我們設了個超時時間,但是如果這個業務阻塞的太久了,以至于都超過了這個超時時間了,同學們仔細看,那么這個時候是不是也會觸發這個鎖的一個超時釋放啊?那么也就是說業務還沒完呢,他提前釋放了。那么它一旦提前釋放,同學們先來看,這個時候其他線程再來獲取鎖的時候,是不是就能趁虛而入,獲取成功了?如果說這個線程2此時來獲取啊,他就成功了,這里是用紫色標識的,對吧。

那他成功以后是不是要去執行自己的業務了?沒錯。而就在線程2剛剛獲取鎖了以后,假設說線程1醒了,那么他的業務完成了。同學們,他的業務完成了以后,他要干什么?沒錯,業務完了肯定要釋放鎖,對吧?那我們釋放鎖的動作是怎么做的?我們是不是直接`DEL`,也就是說我們的線程1二話不說,上來直接來了個`DEL`刪除,于是呢鎖被釋放了。誰的所謂什么了?誒,紫色的這個,這是線程2的鎖被釋放了。同學們,線程1把別人的鎖是不是給釋放了呀?那這個時候我們信號知不知道?唉,不知道,現在還在去執行自己的業務呢。就在這時,線程3來了,他趁虛而入,他說哎我也來獲取一下,結果是因為鎖被刪了,他也能獲取成功,開始執行自己的業務。同學們想要看,此時此刻是不是就同時有兩個線程都拿到了鎖,都在執行業務。所以又一次出現了什么呀?這種并行執行的情況,那么線程安全的問題就有可能再次發生。唉,這就是所謂的極端情況啊,同學們。我們分析一下這種極端情況產生的原因是什么?首先呢是因為業務阻塞導致了我們這個鎖提前釋放了,好,這是第一。其二呢,當這個哥們兒醒過來以后,這個時候的鎖已經不是線程1的鎖,而是線程2的鎖,但是線程1二話不說,上來就把別人的鎖給刪了。這就像什么呢?啊,你那個放了學了,去這個自行車棚里啊,想去推自己的自行車,當然了,你得先開鎖,你在這開了半天發現打不開是吧?啊,于是你一氣之下拿了一個大鉗子,直接把鎖給剪斷了,剪斷了以后抬頭一看不是自己的車,是不是啊?你這個鎖就給弄錯了,你把別人的鎖給剪了。好,就是這個問題。所以這里歸根結底要發生安全問題的最重要的原因就是線程1在釋放鎖的時候,他把別人的鎖給刪了,對吧?如果說你在釋放鎖的時候,不要那么著急,你先看一眼這個鎖是不是你的了,你有沒有資格去釋放沒有?所以說要想解決這個問題,關鍵在于什么?是不是在釋放鎖的時候要去做一個什么判斷,對吧?判斷什么?判斷一下鎖的標識是否一致,什么意思呢?

就是說當你醒了以后,你要去釋放鎖的時候,你要看一看這個鎖里面存的這個標識啊,跟你當前線程是否一樣的。大家別忘了啊,我們之前在去獲取鎖的時候,是不是存了一個線程標識進去。你看一下我們之前獲取鎖的時候,存在這個`33`是不是就成了個線程的標識,還記得吧。好,那如果說我在釋放鎖的時候,我把這個線程標識啊取出來,判斷一下跟我當前這個線程是否一樣,是不是就可以避免這個問題了?如果發現不一樣,比如說當前是紫色的啊,我是藍的,那肯定不行,那么我就什么都不做,它表示內容代表不一致,我就什么都不做。這樣一來是不是就會避免誤刪別人的鎖啊?那同樣道理的線程二也是,線程二呢,他去執行自己的業務的時候,但因為線程一沒有刪他的鎖,這樣來的鎖依然存在,因此線程二是不是就可以正常的執行,不受干擾的去執行了,就不會存在剛才那種情況。

那么當線程二執行業務完了以后,他卻釋放鎖的時候,他也要做這個動作,任何一次釋放鎖,咱們以后都要做這個動作判斷鎖標識。那因為這個地方鎖呢依然是紫色跟它一致,所以標識一樣,跟自己返回,ok,那沒事呢,他是不是就可以正常的去釋放鎖了啊?于是再發一次請求去做釋放鎖,那么它的鎖就被釋放掉了。那么此時線程三才能夠去獲取鎖,去執行自己的業務。那這樣一來是不是就避免了這個問題的發生?

好,那我們總結一下,其實要解決這個問題的關鍵就是什么呀?是不是在釋放鎖時做一個判斷啊。那因此我們這個業務流程與原來相比,就會有一些變化。以前呢我們這里上來就去釋放鎖,現在不行了。現在第一,在獲取鎖的時候,你要把線程的這個標識給他存進去啊,我們之前存的是不是線程id,反正不管怎樣,你要存一個標識進去。第二,在執行完了業務要釋放鎖的時候,你一定要判斷一下這個標識是不是自己的,如果是你才能釋放,如果不是,你是不是就不能釋放呀?其實這個時候不是就代表你的鎖早就沒了,你就不用管就行了嗎?也就是說我們業務就變成這樣一個樣子。ok,這就是我們剛才那個分布式鎖它所存在的一個問題,以及對應的這個解決方案了。

?改進Redis的分布式鎖

需求:修改之前的分布式鎖實現,滿足:
1.在獲取鎖時存入線程標示(可以用UUID表示)
2.? 在釋放鎖時先獲取鎖中的線程標示,判斷是否與當前線程標示一致
如果一致則釋放鎖
如果不一致則不釋放鎖

?SimpleRedisLock.java

    private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";@Overridepublic boolean tryLock(long timeoutSec) {//獲取線程標識String threadId = ID_PREFIX + Thread.currentThread().getId();//值不能隨便設置,因為會有特殊情況,第一個線程執行到刪除鎖之前,key過期了,//其他線程就進來執行操作,這樣就會把其他線程的鎖給刪了,這個值會用來比較是否是該線程的鎖。具體可以看谷粒商城p158//獲取鎖Boolean success =stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId,timeoutSec, TimeUnit.SECONDS);//由于拆箱不能直接returnreturn Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//釋放鎖//這里static會使同一個jvm的所有線程的uuid一樣,但是線程id不一樣,而兩個jvm的線程id有可能相同,但是uuid不一樣//相同jvm有線程號id區分,不同jvm有uuid區分//獲取線程標示String threadId=ID_PREFIX+Thread.currentThread().getId();//獲取鎖中的標示IString id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//判斷標示是否一致if(threadId.equals(id)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}

uuid是區分不同服務器的,線程id是區分相同服務器的,static final的功能就是讓同一個JVM進程只有一個UUID,這樣就可以通過UUID區分不同的JVM進程,用線程id區分相同JVM進程下的線程

那同樣呢,是前面這個Redis鎖的一個周期,下邊呢是一個線程。我們假設線程一上來去申請這個鎖,因為就他自己嘛,所以這個獲取鎖肯定能夠成功。于是他開始執行自己的業務。現在我們假設這個業務并沒有阻塞啊,他成功執行完了。緊接著他要去釋放鎖了,而且釋放鎖是不是要先判斷鎖標識?于是他去判斷鎖標識,那么這個判斷,因為鎖是他自己,是不是一定是成功的,所以返回的是ok啊。

好,那么緊接著他要干什么了,同學們?緊接著是不是要執行釋放鎖的動作了?同學們注意了,判斷鎖標識和釋放鎖是兩個動作啊,所以說呢判斷是成功了,緊接著要釋放。但是就在要釋放時產生了阻塞,有這種可能性吧?唉,有的人會說了,這怎么會阻塞呢?判斷完了直接就是釋放了,中間又沒有別的代碼,為什么會堵塞呢?啊,同學們想想看啊,其實在JVM里有一個東西啊,叫做垃圾回收,對不對?所以當我們的JVM去做這種服務,那么它其實是會阻塞我們的所有的代碼,所以這個時候就會產生阻塞。不是因為你的業務阻塞,是因為這邊我們本身產生阻塞,是有這種可能性的啊。

那一旦發生了阻塞,那也就是說我輪到我去釋放了啊,但是我被阻塞了,是不是就沒有釋放呀?而這個阻塞的時間如果足夠長,很有可能就觸發了我們那個鎖的一個超時釋放。那么鎖一旦超時釋放,其他的線程是不是就又可以趁虛而入了?比如說線程二,他呢就來獲取鎖,那因為這個鎖被超時釋放掉了,他是不是可以成功獲取?于是他開始執行自己的業務。

而就在他獲取鎖成功的那一刻,如果說現在GC結束了,那么阻塞結束我們的線程恢復運行,而此時他去執行什么動作?哎,沒錯,要執行釋放鎖的動作了。因為判斷是不是已經執行過了,他認為鎖還是自己的,但其實現在鎖是自己的嗎?不是,已經是線程二的了,對不對?但是呢它不判斷了,因為已經判斷過了,所以直接執行釋放鎖,于是就把線程二的鎖給刪掉了,又一次發生了誤刪。那么此時又來了個線程三,趁虛而入,獲取鎖成功執行自己的業務。你看這種并發的問題是不是又一次發生了?

所以同學們這一次產生的原因是什么?我們做了判斷了,對不對?那為什么又出問題了?唉,就是因為判斷鎖標識和釋放是兩個動作,這兩個動作之間產生了阻塞,最后出現了問題。那因此要想避免這個問題的發生,我們必須確保什么?必須確保判斷鎖標識的動作和釋放鎖的動作這兩個得成一個原子性的操作,也就是說一起執行不能出現間隔。那我們怎么樣保證兩個動作的原子性呢?是鎖的值不一樣啊,不是鎖的鍵不一樣啊,鎖名稱一樣,但是鎖的線程標識不一樣。

實際上就是線程1判斷后發現鎖的線程標識和當前線程一樣,于是根據鎖名釋放鎖,但是業務阻塞,鎖超時釋放,線程2拿到同樣名稱的鎖,線程1根據鎖名稱把線程2的鎖誤刪了。

lua腳本

Redis提供了Lua腳本功能,在一個腳本中編寫多條Redis命令,確保多條命令執行時的原子性。Lua是一種編程語言,它的基本語法大家可以參考網站:https://www.runoob.com/lua/lua-tutorial.html

這里重點介紹Redis提供的調用函數,語法如下:#執行redis命令
redis.call('命令名稱','key','其它參數', ...)例如,我們要執行setnamejack,則腳本是這樣:#執行 setname jack
redis.call('set','name','jack')例如,我們要先執行setnameRose,再執行getname,則腳本如下:#先執行 setname jack
redis.call('set','name','jack')
#再執行getname
local name = redis.call('get', 'name')
#返回
return name

理解成java的面向對象,前面那個redis就是提供了一個對象

寫好腳本以后,需要用Redis命令來調用腳本,調用腳本的常見命令如下:

127.0.0.1:6379>help @scriptingEVAL script numkeys key [key...] arg [arg...]summary:Execute a Lua script server sidesince: 2.6.0

例如,我們要執行redis.call('set','name',jack")這個腳本,語法如下:
#調用腳本
EVAL "return redis.call('set','name','jack')" 0
前面是腳本內容? ? ? ? ? ? ? ? ? ? ? ? 0是腳本需要的key類型的參數個數?

進入centos7,依次輸入以下操作:

redis-cli
AUTH 123321  #你的密碼
EVAL "return redis.call('set','name','Rose')" 0keys * # 可以通過這個看一下是否錄入

?如果腳本中的key、Value不想寫死,可以作為參數傳遞。key類型參數會放入KEYS數組,其它參數會放入ARGV數組,在腳本中可以從KEYS和ARGV數組獲取這些參數:

?釋放鎖的業務流程是這樣的:
1.獲取鎖中的線程標示
2.判斷是否與指定的標示(當前線程標示)一致
3.如果一致則釋放鎖(刪除)
4.如果不一致則什么都不做

-- 獲取鎖中的線程標示getkey
local id =redis.call('get',KEYS[1])
--比較線程標示與鎖中的標示是否一致
if(id ==ARGV[1]) then
--釋放鎖delkeyreturn redis.call('del', KEYS[1])
end
return 0

?lua再次改進Redis的分布式鎖

Lua本身不具備原子性的,是因為redis執行命名是單線程執行的,它會把Lua腳本作為一個命令執行,會阻塞期間接收到其他1線程命令,這就保證了Lua腳本的原子性

提示:RedisTemplate調用Lua腳本的APl如下:

在resource文件夾下設置unlock.lua文件,

--比較線程標式與鎖中的標識是否一致
if(redis.call('get',KEYS[1])==ARGV[1])then--釋放鎖 del keyreturn redis.call('del',KEYS[1])
end
return 0

?simpleredislock如下

package com.hmdp.utils;import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.yaml.snakeyaml.events.Event;import java.util.Collections;
import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;//鎖的名稱,你可以給鎖加一個統一前綴private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript <Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {//獲取線程標識String threadId = ID_PREFIX + Thread.currentThread().getId();//值不能隨便設置,因為會有特殊情況,第一個線程執行到刪除鎖之前,key過期了,//其他線程就進來執行操作,這樣就會把其他線程的鎖給刪了,這個值會用來比較是否是該線程的鎖。具體可以看谷粒商城p158//獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);//由于拆箱不能直接returnreturn Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX + Thread.currentThread().getId());}/*@Overridepublic void unlock() {//釋放鎖//這里static會使同一個jvm的所有線程的uuid一樣,但是線程id不一樣,而兩個jvm的線程id有可能相同,但是uuid不一樣//相同jvm有線程號id區分,不同jvm有uuid區分//獲取線程標示String threadId=ID_PREFIX+Thread.currentThread().getId();//獲取鎖中的標示IString id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//判斷標示是否一致if(threadId.equals(id)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}*/}

基于Redis的分布式鎖實現思路:
利用setnxex獲取鎖,并設置過期時間,保存線程標示
釋放鎖時先判斷線程標示是否與自己一致,一致則刪除鎖
特性:
利用setnx滿足互斥性
利用setex保證故障時鎖依然能釋放,避免死鎖,提高安全性
利用Redis集群保證高可用和高并發特性?

?只是解決了誤刪鎖,但是并沒有完全解決一人多單的可能,你把兩個的斷點都打到查詢用戶是否下單的那個count的那里,可能兩個線程得到的都是0,這樣還是會一人多單(業務沒走完鎖唄釋放了,就出現一人多單的問題了,這里并沒有解決一人多單問題)

ReentrantLock不是分布式鎖吧?

Redisson

Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-MemoryDataGrid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務,其中就包含了各種分布式鎖的實現。

  • 8.分布式鎖(Lock)和同步器(Synchronizer)
  • 8.1.可重入鎖(Reentrant Lock)
  • 8.2.公平鎖(Fair Lock)
  • 8.3.聯鎖(MultiLock)
  • 8.4.紅鎖 (RedLock)
  • 8.5.讀寫鎖(ReadWriteLock)
  • 8.6.信號量(Semaphore)
  • 8.7.可過期性信號量(PermitExpirableSemaphore)
  • 8.8.閉鎖(CountDownLatch)

官網地址:https://redisson.org
GitHub地址:https://github.com/redisson/redisson

在pom.xml中添加依賴

        <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>

在config中新建RedissonConfig,按照自己的redis配置輸入

package com.hmdp.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redisson() {//配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.111.130:6379").setPassword("123456");//創建RedissonClient對象return Redisson.create(config);}
}

voucherOrderserviceimpl中的鎖對象修改

        Long userId = UserHolder.getUser().getId();//創建鎖對象//SimpleRedisLock lock=new SimpleRedisLock("ordere:" +userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:"+userId);//獲取鎖boolean isLock=lock.tryLock();//其實是3個參數,也可以選擇無參if(!isLock){return Result.fail("不能重復下單");}

RedissonTest.java

package com.hmdp;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;@Slf4j
@SpringBootTest
class RedissonTest {@Resourceprivate RedissonClient redissonClient;private RLock lock;@BeforeEachvoid setUp() {lock = redissonClient.getLock("order");}@Testvoid method1() throws InterruptedException {// 嘗試獲取鎖boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);if (!isLock) {log.error("獲取鎖失敗 .... 1");return;}try {log.info("獲取鎖成功 .... 1");method2();log.info("開始執行業務 ... 1");} finally {log.warn("準備釋放鎖 .... 1");lock.unlock();}}void method2() {// 嘗試獲取鎖boolean isLock = lock.tryLock();if (!isLock) {log.error("獲取鎖失敗 .... 2");return;}try {log.info("獲取鎖成功 .... 2");log.info("開始執行業務 ... 2");} finally {log.warn("準備釋放鎖 .... 2");lock.unlock();}}
}

Redisson可重入鎖原理

自定義的分布式鎖有一個缺陷,就是無法實現可重入,而Redis它就可以做到。那這是為什么呢?這節課我們就一起來研究一下。咱們自定義的分布式鎖采用的是`SETNX`和`EXPIRE`命令,也就是簡單的key-value,整個鎖的流程是這樣子的:

在一開始嘗試獲取鎖,其實就是執行這個`SET`命令,當然要加上`NX`和`EX`參數。那`NX`的目的就是為了實現一個互斥,滿足分布式鎖的基本要求。同時呢,在獲取鎖的時候,我們要存入這個線程的標識,其目的就是將來在釋放鎖的時候做判斷,避免誤刪,只有鎖是自己的才去做這個刪除。

那么這樣的一個流程為什么不能重入呢?我們一起來看一下這樣一個demo。

// 創建鎖對象
RLock lock = redissonClient.getLock("lock");@Test
void method1() {boolean isLock = lock.tryLock();if(!isLock){log.error("獲取鎖失敗,1");return;}try {log.info("獲取鎖成功,1");method2();} finally {log.info("釋放鎖,1");lock.unlock();}
}void method2(){boolean isLock = lock.tryLock();if(!isLock){log.error("獲取鎖失敗,2");return;}try {log.info("獲取鎖成功,2");} finally {log.info("釋放鎖,2");lock.unlock();}
}

首先在這里我們會去創建一個鎖的對象,接下來呢有一個測試方法`method1`,在`method1`里會首先嘗試獲取鎖,獲取鎖了以后,如果失敗,他就會報錯,而如果成功,那么他就會去調用一個方法`method2`,而在`method2`里又一次嘗試獲取鎖。那么`method1`和`method2`啊,他們兩個是在一個線程里的,那一個線程連續兩次去獲取鎖,這其實就是鎖的重入了。

那我們來看一下,如果按照我們這個流程,它能不能實現重入。首先`method1`嘗試獲取鎖,那按照我們這里就會去`SET`這個鎖名稱以及鎖標識進去,那在這就是`lock`以及比如說這個線程名叫`t1`,我們把它存進去了。接下來呢往下執行調用這個`method2`,那么`method2`又一次嘗試獲取鎖,那么又要執行這個`SET lock t1`的命令,那因為這里加了`NX`的一個參數,也就是說只有第一個能成功,那這里已經有值了,所以說`method2`在執行的時候一定是失敗。所以說我們就沒有辦法實現重入了。

那怎么樣解決這個問題呢?其實啊,要想實現可重入鎖,我們可以參考JDK里面提供的`ReentrantLock`,它的原理啊。那如果大家不知道我們JDK的`ReentrantLock`它的原理的話,建議大家也去B站上搜索,我們黑馬程序員的一個課程啊,叫做Java并發編程(JUC)。你在這搜Java并發編程排第一的,這個up主是黑馬程序員的,那就是我們出品的一個課程了。在這個并發課程里邊,其實就有講到`ReentrantLock`它的原理。

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已經獲得了鎖,線程還是當前線程,表示發生了鎖重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}// sync 繼承過來的方法,方便閱讀,放在此處
protected final boolean tryRelease(int releases) {int c = getState() - releases;if ((Thread.currentThread() != getExclusiveOwnerThread()))throw new IllegalMonitorStateException();boolean free = false;// 只有鎖重入,只有 state 減為 0,才釋放成功if (c == 0) {
/**************************************************************/free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}

那他為什么能實現可重入呢?哎,在這簡單給大家說一下啊,其實所謂的可重入無非就是在獲取鎖的時候,當我判斷這個鎖已經有人的情況下,那我會看一下是不是我自己,也就是說是不是同一個線程。如果是同一個線程的話,我也會讓他獲取鎖,但是要多做一件事,就是有一個計數器,去記錄你重入的次數,就是說你總共獲取了幾次。那么你在`tryAcquire`,就是獲取鎖的時候,我們這個次數會去累加,不斷加一,不斷加一。而將來你再去`release`,釋放鎖的時候,這個次數會減一。哎,這就是可重入的一個基本的一個原理啊。

因此再去實現時,也可以參考它。那也就是說我們其實不僅僅要在鎖里邊要去記錄獲取鎖的線程,還要去記錄這個線程它的重復的次數,總共拿了幾次。每拿一次次數就加一,讓同學們思考一下,現在呢我們既要記錄線程標識,又要記錄重復的次數,那這樣一來,我們的這個`String`類型的結構顯然就不行了吧。什么樣的數據結構可以滿足,在一個key里存儲兩個東西?其實可以使用它里面的哈希結構,因為哈希結構,它的value里面是不是可以又分成兩個,一個field的一個value。那這個時候呢,我們就可以在key的位置記錄鎖的名稱,然后在field的位置去記錄線程標識,在value位置記錄這個鎖的重復次數

KEYVALUE
fieldvalue
lockthread11

比方說在這里,我們創建鎖對象,然后去`tryLock`,那么第一次來的時候,在這里我們會記錄這個線程標識,并且把這個次數重置為1,因為他是第一個來的嘛。好,然后往下走,我們認為它會鎖成功了嗎?那接下來調用`method2`,而在`method2`的又一次嘗試過去鎖,那么他首先要做的肯定是看一看這個鎖是不是有人了。那這個鎖已經有人了,這個時候不代表失敗,還要再判斷一下獲取鎖這個線程是不是我自己呀。哎,那因為`method1`是調`method2`,所以他倆是在一個線程里的,所以這個標識肯定是一樣的呀。那標識一樣怎么辦,ok,只需要把重復的次數怎么樣,哎,加一就可以了,代表呢我是第二次獲取鎖了。以此類推啊,如果說再有重復,那就繼續累加三、四,這樣子好了。這個時候`method2`拿到鎖了,已經重入了,是不是可以執行自己的業務了呀?好,執行完了自己的業務最終要干什么,要釋放鎖。那么釋放鎖能不能像以前一樣啊,首先判斷一下標識是不是自己哦,一判斷是的,就是我,然后接下來直接刪除,能不能這么做?哎,答案是不能啊。同學們思考一下,如果我在這啊,在`method2`里直接把這個鎖刪了,你要知道,`method2`結束并不代表整個業務結束吧,因為`method2`出來以后,還要去執行`method1`的剩余業務。雖然我們這沒寫,但是很有可能在真實場景下是有其他業務的。那也就是說在`method1`業務尚未執行完的時候,`method2`直接就把鎖給刪了,那么此時是不是就有其他線程可以進來了,是不是就有可能發生一些并發的安全問題了?

所以說啊,對于這種可重入鎖來講,在內部被調用的方法里邊,釋放鎖的時候是不能直接刪除鎖的。那么他采取的措施是什么,是把重入次數減一。也就是說每釋放一次我就減一就行了。那么問題來了,我們什么時候才能釋放鎖,真的把這個鎖給他刪除呢?哎,那肯定是當我們所有的業務都執行完,走到最外層的這個方法,它在結束釋放時才能刪除。但問題來了,我們怎么知道這個業務釋放的時候,是不是已經到最外層了呢?讓大家思考一下啊,我們每次獲取鎖這個重復次數就會加一,而每次釋放鎖這個值會減一。但是在方法當中,獲取鎖和釋放鎖都是成對出現的,也就是說你加了多少次,將來釋放鎖的時候就一定會減多少次。那同學們當方法走到最外層時,這個重復次數的值一定會被減成零,對不對?比方說現在我`method2`,他在釋放鎖時,這個值已經減到了一好,他業務結束了,走到了`method1`,`method1`就是最外層。那此時又一次釋放鎖,那么又減一次,它的值就變成了零。所以說呢,我們每一次在釋放鎖的時候,除了要去把重復次數減一以外,還應該判斷一下這個值是不是已經為零了。如果已經是零了,那就證明我已經到了最外層的這個方法了,也就是說沒有其他業務需要執行了,此時你就可以放心大膽地把這個鎖刪除了。如果說不是零,證明啊還有別的業務,那么你就不要管了,交給其他業務去處理就行了。這就是我們可重入鎖的最終原理了,也就是說我們利用哈希結構代替了字符串結構,不僅僅存儲線程標識,還有存儲可重入的次數。那因此呢,獲取鎖和釋放鎖的操作啊,跟以前就會有很大的差別了。

那以前我們采用的是`String`類型的結構,采用的命令呢是`SET NX EX`,其中`SET NX`啊是來判斷鎖是否存在實現互斥,而`EX`是設置過期時間。但是現在呢我們用的是哈希結構,哈希結構里可沒有這樣的組合命令啊,沒有`NX`這樣的東西。所以說呀,我們只能是把之前那個邏輯給它拆開,先判斷鎖是否存在手動判斷啊,沒有`NX`怎么判斷呀?其實就是用一個`EXISTS`判斷一下這個key啊,是不是存在。那結果有兩種啊,要么不存在,那么存在。如果不存在,那就好辦了呀,證明我是第一個來的呀,就像這個`method1`對吧?那怎么辦呀,肯定是來去獲取鎖標識啊,添加進去,并且這個值要改成一對不對?唉,這是第一次獲取啊。好了,那獲取完了以后沒結束啊,因為在之前除了`NX`,還有`EX`,也就設置過期時間是個兜底方案。那在這兒呢我們同樣要收入過幾時間,那只不過改成了手動去設置,改成了`EXPIRE`的命令好。那只有這兩步都走完了,才等于以前那個`NX`和`EX`,這個時候獲取鎖才算成功,你才可以去執行自己的業務啊。

那如果說我們判斷的時候,這個鎖它已經存在了呢,就相當于是現在掉了`method2`,他又來獲取了。那如果已經存在,是不是我就直接失敗了呢?如果是以前是這樣子啊,但這不是因為我們是允許重入,所以說呢我們在這判斷鎖已經存在的情況下,我們還要看一下這個標識啊,是不是自己是不是同一個線程。所以說有一個判斷線程標識的一個業務邏輯啊。如果說判斷完了以后發現不是自己,那證明這個鎖根本就不是自己拿的,那這個時候就真的是失敗了。但是如果我們判斷完了,發現這個標識就是自己啊,同一個線程,那證明是重入。那重入我們之前說了,只需要把次數怎么樣,哎,加一是不是就可以了啊?那次數加一了以后就可以去執行`method2`了。當然了,在執行`method2`的時候,我們還要先重置一下有效期,因為啊在`method1`執行的過程中,可能已經消耗了一定的鎖的時間了,所以在這最好重置一下,讓`method2`有充足的時間去執行自己的業務。那到這兒啊,其實就完成了一次鎖的重入了。

好,那等`method2`執行完自己的業務,是不是就可以去釋放鎖了呀?那釋放鎖的時候,以前我們是先判斷鎖是不是自己的,如果是我們再去刪啊,避免誤刪了。這里呢其實也是類似,首先呢肯定也得先判斷鎖是不是自己的。如果說這個鎖根本就不是你的,那證明什么,證明你這個鎖啊已經釋放了,可能是超時了,對不對?所以說呢這種情況下你就不要管了。但是如果這個鎖是自己的,那么我們就要去釋放了。當然這個釋放不是上來就刪,我們講了,我們要先把鎖的重復次數減一啊,然后呢才能去判斷一下什么呢,這個值是不是已經到零了啊。如果說他不是零,那證明啊這個時候我們根本就不是最外層,還有別的業務,所以說我們不能刪,而是干什么呢,哎,重置有效期啊。然后去執行其他業務去。需要注意的是呢,這里同樣需要去重置鎖有效期啊,給后續業務執行啊留夠充足的執行時間啊。比方說這里`method1`,當`method1`業務執行完畢,執行到釋放鎖的時候,同樣還會走這個邏輯。首先呢重復次數減一啊,然后呢判斷鎖標識看它是否為零,結果一判斷呀,哎,現在真的是零,那就說明已經到了最外層了。那這個時候就可以放心大膽的去釋放鎖了。哎,這個呢就是我們整個釋放鎖的一個流程了。

那大家可以看到啊,無論是獲取鎖還是釋放鎖,跟以前相比啊,是不是就復雜很多了?而且呢這個地方啊代碼有多個步驟,所以說呀,像這樣的邏輯,咱不可能啊再用Java代碼去實現了。那這套東西一定要采用什么哎,Lua腳本來確保獲取鎖和釋放鎖的原子性。那這個Lua腳本又該怎么寫呢?

local key = KEYS[1]; -- 鎖的key
local threadId = ARGV[1]; -- 線程唯一標識
local releaseTime = ARGV[2]; -- 鎖的自動釋放時間
-- 判斷是否存在
if(redis.call('exists', key) == 0) then-- 不存在,獲取鎖redis.call('hset', key, threadId, '1');-- 設置有效期redis.call('expire', key, releaseTime);return 1; -- 返回結果
end;
-- 鎖已經存在,判斷threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then-- 不存在,獲取鎖,重入次數+1redis.call('hincrby', key, threadId, '1');-- 設置有效期redis.call('expire', key, releaseTime);return 1; -- 返回結果
end;
return 0; -- 代碼走到這里,說明獲取鎖的不是自己,獲取鎖失敗

在這呢我就帶著大家簡單來看一看啊,我們就不一個一個去寫了啊。其實流程跟這些差不多嘛。首先呢在這里有三個參數啊,這里的key啊就是我們鎖的這個名稱就是個`lock`,然后這里的`t1`啊就是線程的標識,`releaseTime`是鎖的釋放時間。與之前相比啊,要多了這樣一個參數,為什么需要釋放時間呢?因為在獲取鎖的過程當中,我們要去設置有效期,所以說呢這個也作為參數之一了。然后呢腳本就開始了啊,那腳本開始以后,首先要判斷的就是鎖是否存在,我們講的就是`lock`是否存在,不是用`NX`啊,是手動判斷的,所以在這里用了`EXISTS`命令,那這個key呢比如說是`lock`,這里就是`EXISTS lock`,看是否存在。那返回零或者一,如果是零,就證明是不存在好。那不存在,說明你是第一個,這個時候就可以放心大膽來獲取鎖,就是存一個值進去存儲,因為是哈希結構,所以用的是`HSET`好,指定key,在指定線程標識,在指定這個一,也就是第一次來的時候做這個初始化的動作。這個工作做完了以后呢,不要忘了還要給鎖設置有效期,也就是執行這個`EXPIRE`命令,然后呢設置這個釋放時間,也就是`releaseTime`。到這兒呢啊,獲取鎖其實就算是成功了。那這一部分啊其實對應的就是我們會為鎖離啊,這一部分的一個流程啊,也就是說是鎖不存在的情況。那如果說鎖已經存在了呢,那鎖如果存在,我們就要去判斷鎖標識啊是不是自己的。那怎么判斷呢?唉,這里我們傳了一個線程的標識threadId,你就判斷一下這個標識啊,在我們這個哈希里邊存不存在就行了。所以這里用了`HEXISTS`命令,傳這個key,傳這個線程標識。如果說它存在,那么結果就是一,這個時候就證明這個鎖是自己的好。那鎖如果是自己的干什么呀?我們是不是鎖的這個重復的次數要去加一啊,對不對?重復次數加一,所以說在這執行了個`HINCRBY`,就是自增的意思啊。那也就是說對應的這個key對應這個標識,然后加一啊。也就是執行這個加完了以后重置有效期,所以大家看又一次`EXPIRE`啊,所以呢這一段邏輯其實也就是我們剛換的啊,這一條線就這條線這個分支。那也是一種獲取鎖成功的情況啊。那最后一種情況呢,也就是說鎖標識不是自己的,跟你沒有任何關系,也就鎖就失敗了。那也就是做到這`return 0`,所以在這整個流程里邊啊,返回一的就是成功了,零就是失敗的好。這是獲取鎖的一個流程了。

那我們再來看一下釋放鎖。釋放鎖呢也就是我們的下半部分啊,首先同樣是這三個參數啊,key數字名稱,線程標識鎖的釋放時間。一上來以后啊,同樣我們要先去做這個判斷,判斷什么呢?判斷這個鎖的標識是不是自己啊。那同樣呢就是拿著線程id留`EXISTS`去判斷啊,放完了后,如果是一就是自己,如果是零,那就不是自己。不是自己就走在這條路,邏輯吧,代表鎖怎么了,是不是已經被釋放掉了?唉,沒錯啊。好然后往下走,這個時候呢那就證明鎖是自己的啊,因為上面不成立,往下走是不是鎖是自己。如果鎖是自己怎么辦,能做出簡易除數減一怎么減,還是`HINCRBY`,只不過這里改成-1啊,`HINCRBY`是自增嗎,增減增-1不就是減一嗎?哎,重復次數減一完了去判斷標識啊,是不是零?那這個時候就是看它是否大于零啊。但如果大于零,證明什么,證明現在還沒有到最外層,是不是應該去重置有效期啊?然后去執行業務呀。所以在這`EXPIRE`重置有效期。那這一部分邏輯,同學們其實就對應了這里開始啊,這一條分支對吧。那如果說我們最終的判斷它是零呢?是零是不是就可以去釋放鎖做這個`DEL`的操作了?所以啊這一段邏輯對應的,其實就是從這開始的這部分。ok,那整個腳本我們就看完了。

那么我們的Redisson啊是不是這么實現的呢?下面呢我們就一起跟蹤一下源碼來看一下。好,我們打開IDEA,在這呢我提前準備好了一個單元測試啊,這里是Redisson提供的一個鎖,然后呢在`method1`方法當中啊,我會嘗試獲取鎖,然后會去失敗了,就結束,會成功了,我就叫`method2`,在`method2`里再次獲取鎖,然后呢去執行業務,最后釋放鎖。這就是一個可重入的一個demo了對吧。下面呢我們在這打上一個斷點啊,去運行一下,看一下啊。在這呢我們走debug,好進入的點,然后這里呢我們去放行一下,走好回鎖成功是個`true`對吧。我們打開這個桌面客戶端刷新一下,可以看到呢這里有個`order`的,因為我們鎖名稱起的是`order`,可以看到這里邊有一個key,這個就是那個鎖的標識啊,后面呢是一個value值為一,就是重復次數為一嘛。這樣呢我們往下走,然后走走進入`method2`,進入`method2`,又一次會鎖,你看又是`true`,對不對?那這個時候我們再來看一下Redis,好,來在這刷新一下啊。好大家看看到值是不是變成二了,也就是說重入次數是不是加了一哎,沒問題啊。好往下走,然后拿走走到這里往下釋放鎖,那么釋放鎖以后重置次數應該會減一,對不對?我們再來看一下,在這里啊,再去重置一下,看是不是減一了?唉,沒有真的刪除啊,是減一,然后出來出來以后又回到了誰,回到了`method1`,在`method1`里,他往下走也會去釋放鎖,那么這次釋放鎖再看好,回來這里再點一次是不是就沒有了?說明這個鎖被刪除了。所以這就是我們剛剛分析的那個邏輯啊,通過重復次數來去記錄這個可重入鎖好,整個流程呢就分析完畢了啊。

來看一下整個這里也是按照我們剛才分析的,先是鎖`method1`,然后呢鎖`method2`執行業務二,世貿鎖`method2`執行業務一,世貿鎖`method1`,就是按照一個重復的一個流程吧,沒有問題。那么我們也可以去跟蹤一下我們Redisson的源碼,來看一下哈。`tryLock`,那么在`tryLock`里,我們往下跟找到它的實現類啊,這里我們采用的是一個最普通的`RLock`對吧。好在這呢掉了一個`getLock`,里邊有個`tryLockInner`,我們根據這個在這個方法里面繼續融入進去,這里呢`tryAcquireOnce`走,那在這里呢他會先判斷這個`leaseTime`是否等于-1,這`leaseTime`呀,就是我們之前講過的鎖自動釋放的一個時間。這里其實我們是沒有傳的,那在沒傳的情況下,大家可以看到他給的是一個-1啊。所以說呢在進入這個方法以后啊,他去判斷的時候,這個地方肯定不成立,那于是呢就會往下走,走到這行,那這一行呢他會去`tryLock`,也就是嘗試獲取鎖,所其實啊跟這塊兩個邏輯是一樣的,只不過呢如果說你給了釋放時間,他就會用你給的那個值啊。如果你沒給,他會走一個默認值。所以最終呢都是會進到這個方法啊,那我們跟進去好,還是這個`releaseLock`。進來了,進來以后呢,大家就會看到這里有一段邏輯,這段邏輯啊就是一個Lua腳本,我們可以看到Redisson呢是把這個腳本啊通過字符串也是直接寫死的,不需要我們之前是通過一個文件啊,啊這種方式其實也可以。那這段腳本呢大家如果仔細去看啊,跟我們剛剛PPT上的腳本就是一樣的。首先判斷鎖是否存在,如果等于零,代表不存在,不存在怎么辦呀?哎,當然是去獲取鎖了。他在這個地方是直接`HINCRBY`了,也就是自增一自增一啊,如果你不存在,它會自動幫你創建啊,所以跟那個`HSET`的效果是一樣的。緊接著設置過期時間,然后呢結束好,這是不存在,直接回溯。那如果存在呢,存在也不要緊,我判斷一下這個鎖是不是自己等于一,代表是自己是自己怎么辦,同樣是加一,然后試著過期時間,然后結束。所以這兩個就是獲取鎖成功的情況。那其他情況就是失敗了。

好,那么這個流程就看完了。那然后呢我們再來看一下釋放鎖啊,我再回來。那釋放鎖上走的是這個`unlock`,我們跟進去。那么`unlock`能組的`RLock`啊,首先在這里調用這個`unlockSync`,繼續跟入。在這里我們繼續跟這個`unlockEnterSync`,好釋放,同樣是找到這個`RLock`里面。大家可以看到同樣有一段Lua腳本,

那么這段Lua腳本啊,就是我們剛剛分析的一段啊。首先一樣先判斷鎖是不是自己的啊,是否存在好。如果說呢它等于零,就是不存在,那就直接結束了啊。因為這種就是說鎖早就已經釋放了,你就不用管了。然后呢哎讓他鎖抽的次數減一,你看`HINCRBY`,然后-1,這就是減一的意思。解完了以后啊,判斷一下鎖重入次數是不是零。如果大于零啊,證明好我們現在還不在最下層,怎么辦,重置有效期`EXPIRE`,那這一部分邏輯,同學們其實就對應了這里開始啊,這一條分支對吧。那如果說我們最終的判斷它是零呢,是零是不是就可以去釋放鎖做這個`DEL`刪除他。這里還執行了一個`PUBLISH`,這個是干什么的?后面我們會講他其實是發了一條消息啊,也就是去通知大家這個鎖我釋放了。那有什么用呢?后邊我們再去給大家講好了。

我們回到PPT,到這兒呢啊Redisson可重入鎖的這個原理,我們就分析完畢了。獲取鎖和釋放鎖的整個流程啊,在這里通過這幅圖啊就給大家講述完畢了。其核心就是利用一個哈希結構啊,然后去記錄獲取鎖的線程以及重入的次數,與我們啊`ReentrantLock` JDK里面提供哪個,它的原理是一致的。好了,那我們這節課的內容啊,就到這里。下節課呢我們繼續去分析啊Redisson它提供的其他的一些功能。

redisson如何解決三大問題?

01
不可重入
同一個線程無法多次獲取同把鎖

02
不可重試
獲取鎖只嘗試一次就返回false,沒有重試機制

03
超時釋放
鎖超時釋放雖然可以避免死鎖,但如果是業務執行耗時較長,也會導致鎖釋放,存在安全隱患

04
主從一致性
如果Redis提供了主從集群,主從同步存在延遲,當主巖機時,如果從并同步主中的鎖數據,則會出現鎖實現

?實戰篇-20.分布式鎖-Redisson的鎖重試和WatchDog機制_嗶哩嗶哩_bilibili詳細闡述的RedissonLock.java的完整流程的全部代碼,哎呀感慨前人的智慧,聽說代碼隨想錄星球里面還有什么手搓spring,RPC哎呀我操,這就是cpp選手的高深,重視底層代碼

實戰篇-21.分布式鎖-Redisson的multiLock原理_嗶哩嗶哩_bilibili

好同學們,經過前面幾節課的學習啊,我們已經分析了Redisson分布式鎖如何實現啊鎖的可重入啊,還有鎖獲取時的自動續期,以及啊鎖釋放時間的一個自動續約。解決了前面的這三個問題哦,但是現在啊還有最后的一個問題,就是主從一致性的問題還沒有得到解決。所以呢這節課我們就一起來分析一下啊,Redisson是怎么解決主從一致性問題的。

首先呢我們先來分析一下主從一致性問題產生的原因。在之前所有的案例當中啊,我們采用的都是單節點的Redis。大家不妨思考一下,如果這一臺Redis發生了故障啊,那么會引發什么樣的問題?那可以想象的是,我們所有依賴于Redis的業務是不是都會出現問題,包括我們的分布式鎖。那在一些核心業務當中,那這樣的情況肯定是不允許發生的。所以說啊為了解決這個問題啊,提高Redis的可用性,往往在實際的應用當中,我們會覺得搭建Redis的主從模式。

那什么叫做主從呢?其實呢就是由多臺Redis,只不過呢他們的角色不同,有一臺呢我們可以把它作為主節點,而剩下的呢作為從節點。而主從他們的職責也不一樣,往往會做讀寫的分離。也就是說呀,在主節點里用來處理所有Redis的寫的操作,比如說增刪改,而從節點只負責處理讀的一些操作。那這樣就有問題了呀,既然主節點處理寫操作,那所有的數據都是在主節點里存在的,Slave節點,也就是從節點沒有數據,怎么來處理讀的請求呢?所以呢主和從之間啊需要做數據的同步,主節點啊會不斷的把自己的數據同步給從節點,確保主從之間數據是一致的。但是呢畢竟啊不是在同一臺機器,它主從之間會有一定的延遲。所以呢這個數據的同步啊也會存在一定的延時,盡管這個延時很短,但是它也存在啊。我們所說的主從一致性問題啊,正是因為這樣的延時而導致的。

比方說啊現在我們有一個Java應用啊,他現在要來獲取鎖,要執行一個`SET lock t1 1 NX EX 10`這樣一個命令,這不就是一個寫操作嗎?那這個操作執行到主節點的時候,主節點上就會去保存這樣的一個鎖的標識,`t1`。而后啊,主節點就會向我們的從節點進行同步。但是就在此時主節點發生了故障,也就是說同步尚未完成,這個時候我們Redis里會有哨兵啊,去監控集群狀態。當他發現主節點當機的以后,那首先客戶端連接會斷開,而后啊,它會從Slave當中那兩個從節點點去選出一個,作為新的主節點。但是因為之前主從同步未完成啊,也就是說鎖已經丟失了,對不對?所以此時我們的Java應用,再來訪問這個新的主節點時就會發現,鎖就沒有了。也就是說鎖失效了,此時啊再有其他線程來獲取鎖也能獲取成功,是不是就會出現并發的安全問題了啊?這就是主從一致性導致的鎖失效問題。

那么Redisson又是怎么解決這個問題的呢?唉,他的思路啊非常簡單粗暴,既然主從關系是導致一致性問題發生的原因,那干脆我就不要主從了唄。我的所有的節點都變成了獨立的Redis節點,相互之間沒有任何關系,沒有主從啊,都可以去做讀寫。那么此時我們獲取鎖的方式就變了,以前我們獲取鎖只需要找到Master節點,然后在它里面獲取鎖就行了。但是現在我們必須依次的向多個Redis節點,都去獲取鎖啊,不管你是三個Redis節點還是五個Redis節點,必須依次都獲取鎖,都保存了這個鎖的標識,才算是你獲取鎖成功。

那此時會不會出現安全問題呢?首先啊因為我們沒有主從,所以是不是不會有主從一致性問題。其次可用性,現在真的有一個節點宕機了,我們的Redis可不可以用呢?哎,還是可以用的呀,因為你宕機了,我們還有倆節點的嗎。那只要這兩個節點在存活著,我的鎖是不是依然有效啊。而這個可用性是不會隨著節點的增多,而越來越高啊。所以如果你覺得三個不夠,你是不是可以加五個節點。所以我們不僅僅解決了主從一致性的問題,是不是也保證了它的可用性啊。

當然如果你覺得這樣還不夠,你還想讓它的可用性更強一點,說如果現在掛一個,我就少一臺啊,那怎么辦?沒關系,我們也可以給這里的節點去建立主從關系,讓他們去做主從同步。那加了主從同步以后會不會出現安全問題呢?其實不會,讓大家思考一下,現在假設說真的有一個節點發生了宕機,就他那他宕機的時候剛好沒有完成同步啊,那也就是說現在這個Slave上是不是沒有這個鎖的標識。現在呢利用我們之前所說的原理啊,那么這個Slave點就會變成新的主節點,對不對?那它下面沒有鎖標識,此時有一個線程趁虛而入,想要來獲取鎖,能不能獲取成功?答案是不能,因為我們說了,只有在每一個節點都拿到鎖才算成功。現在盡管有一個節點是空的,你能在他這獲取鎖,但是其他兩個節點能不能拿到,還不能吧。所以說呢,你最終還是失敗的。那也就是說只要沒有任意一個節點在存活著,那么其他線程就不可能拿到鎖,就不會出現鎖失效的問題。

那這樣一套方案呢,它保留了這種主從同步機制,確保了整個Redis集群的這種高可用的特性,同時也避免了主從一致引發的鎖失效問題。那這套方案呀,在Redis里有一個名字叫`Multi Lock`啊,也就是連鎖,把多個獨立的鎖聯合在一起,變成一個聯合起來的鎖。而我們在使用這樣的鎖的時候啊,也是比較靈活的啊,你可以就弄幾個獨立節點啊,不建主從關系也是可以的,你也可以建立主從關系啊,讓它的可能性變得更強。

那接下來呢我們在測試的時候就不去做主從了,因為這樣太復雜了,需要的節點太多了啊,就需要六個了對吧。在這兒呢我們就去整三個獨立節點來去測試這個連鎖就可以了。那我提前呢已經準備好了,我利用虛擬機已經搭建好了三個Redis節點啊,可以看到呢這里我提前已經建立好了連接了啊,第一個是我們最早搭建的那個6379的啊,然后呢再往下,這里有一個6380和16381啊,這兩個是新的啊,提前準備好的。那將來呢我們就利用這三個節點啊,三個獨立的節點形成一個連鎖好。

那下邊呢我們就去編寫這個代碼,我們打開IDEA,首先呢我們需要去配置這個Redis客戶端啊,來找到這個`RedisConfig`,因為我們有三個獨立的Redis客戶端的節點啊,那現在呢其實只配了一個,對不對,端口是6379。所以說呢我們需要把這個復制一下,還是C啊,然后粘貼兩個啊,然后名字也不一樣啊,這三個bean它們的類型是一樣,但名字必須不一樣,分別叫二和三吧。然后呢端口是6379,這塊呢是6380,還有一個呢是6381。然后呢這塊密碼就不需要了,因為我們這塊新安裝的這兩個Redis節點,我沒有配密碼啊。所以這樣我們就把三個獨立的Redis客戶端給配好了,那么將來呢我們就利用這三個,分別獲取三個獨立的鎖,把這三個獨立鎖合在一起變成連鎖。

RedissonConfig/*********************************************************/
@Bean
public RedissonClient redissonClient2(){Config config =new Config();config.useSingleServer().setAddress("redis://198.168.111.130:6380")return Redisson.create(config);
}@Bean
public RedissonClient redissonClient3(){Config config =new Config();config.useSingleServer().setAddress("redis://198.168.111.130:6381")return Redisson.create(config);
}

那怎么做呢,來我們的單元測試好,在這個地方我們去做這個連鎖啊。那怎么去創建連鎖呢,首先大家可以看到,在這我們只注入了一個Redis客戶端,現在不行,需要有三個啊,分別是這個二和三啊。因為你在這邊配的名字是叫二和三,對不對。好,有三個client,那將來就可以獲取三個獨立的鎖。在這呢我們給它的名字也都改了,分別叫lock1,然后lock 2和lock 3。

RedissonTest@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;/********************************/
void setUp(){RLock lock1 = redissonClient.getLock("order");RLock lock2 = redissonClient2.getLock("order");RLock lock3 = redissonClient3.getLock("order");//創建聯鎖multilocklock = redissonClient.getMultiLoc(lock1,lock2,lock3);
}

那對應到這邊啊,分別就用二和三啊來去獲取。這樣呢,這也就是三個獨立節點對應的三個獨立鎖,對不對。好,那接下來才是真正去創建連鎖好,那么這個連鎖叫`MultiLock`,怎么創建的呢,有多種方式啊。那給大家來個最簡單的`lock`,等于啊`RedissonClient`點啊,`getMultiLock`,可以看到它的參數列表里邊是個可變參數,接受了多個lock,對不對。那我只需要把這三個lock,依次啊給它放進去就可以了。那這樣一來呢,它就自動的幫我們把這三個連在一起,變成連鎖了,非常的簡單吧。

那有同學會說了,你在這里為什么用了`RedissonClient`來干,而不是用這兩個去get呢?哎大家來看啊,其實你用哪個都一樣跟進去啊,`ctrl`加`b`進入Redisson的getmultilock函數,進來以后你會發現這里是new的,看到沒有。那我去調的時候,不管用誰調,最終是不是都是牛啊,甚至于我將來我自己扭行不行,沒問題。所以說呢其實用哪個client來調,沒有區別好。那傳進來的是一個可變參數,我們知道可變參數是數組,數組傳進來以后干了什么事,可以發現啊,這個可變參數啊被它轉變成了一個集合,然后一起添加到成員變量`locks`里去了。那么這個`locks`就是個集合唄,也就是說我們的多個獨立的鎖,是不是都知道這個集合里了。那按照連鎖的原理,將來啊他在獲取鎖時,是不是應該依次啊把這個集合里邊的每一個鎖,都嘗試去獲取一遍,都成功了才算成功呀。是這樣子的啊。

好啊,那么這塊看完了,我們回去啊。那拿到鎖了以后,它的使用方式上跟以前沒有區別,這代碼都不用動啊,直接就可以,這樣就行了。那下面呢我們就先測試一下,看看是不是真的能獲取鎖啊,然后我們再去分析它的原理,看看跟我們想的是否一樣。那我們第八個運行走,好那現在進入斷點了啊,那我們先放行一下走,那大家可以看到這個地方是true,那證明獲取鎖是不是成功了。那我們呢打開這個Redis端看一下啊,首先在這刷新,那我們可以看到這多了一個`multiLock`嘛,啊前面標識后面是不是那個鎖的重置次數。大家看這個啊,同樣這里會多出來一個,你看是不是在這三個節點上都獲取到鎖了,哎沒問題哦。好然后我們放心啊,放行,其實可以進到`method2`,然后進到這邊去獲取鎖,再來一次好,我們再走又成功,此時我們再來看一下好,這邊是不是變成二了,說明是第二次重入嘛,對不對,這里也是二啊,可以看出來連鎖中的每一個獨立的鎖啊,都是一個可重入鎖啊,與之前的實現啊沒什么差別啊,只不過把三個聯合起來分別去獲取而已啊。再回來,那我們去釋放鎖,那釋放鎖啊,第一次釋放鎖重入次數是減一,對不對,回到Redis里看一下,好變成一變成一,你看是不是都剪掉了,然后再回答IDEA,再釋放一次,到這邊直接放行吧,好全部釋放完成,那么這個時候我們回到Redis里發現沒有了,對不對,說明這個鎖是不是被刪掉了啊,真的釋放掉了。

看到這兒大家就發現了啊,所謂的連鎖就是多個獨立的鎖,而每一個獨立的鎖,就跟我們之前講的原理是不是一模一樣的,哎沒錯,正是如此。那我們接下來呢回答IDEA,通過源碼的跟蹤啊,我們再來看一下它的一個實現方式,是不是我們所想的那樣。我們跟入這個`chooseLock`,這次就不要跟這個了,跟下邊這個`multiLock`了。跟進來以后呢,這里有一個`tryLock`,你可以看到第一個是等待時間,第二個時間單位,而這個釋放時間咱沒傳,沒傳,就是-1唄。跟進去根據以后呢,哎進到這里了,這就跟以前有一些差別了,同學們一上來這一段是在干什么呢?這段其實是在判斷你有沒有傳釋放時間,這個釋放時間啊,如果不等于-1,代表你除了對不對。好,那我們其實有沒有傳,我們沒傳沒傳,是不是就往下走了,哎沒錯啊。那如果你傳了呢,如果你傳了的話,那么他進來以后會判斷一下`waitTime`是否是-1,如果`waitTime`是否一說明什么,說明你只想獲取一次,也就是說不重視好。那你的釋放時間是多久,我就用多久。但是如果`waitTime`不等于-1,說明什么,說明你想要去做重視啊,所獲取失敗我要重視。那要重視,這個時候他就不會用你原來的釋放時間了,他會用你`waitTime`乘以二。為什么呢?因為重視可能耗時較久,萬一你的生長時間小于等待時間的話,那我還沒重視完了,你就是忘了,這不有問題了嗎?所以說呢,他在這里會放棄你這個類似`time`,而是用`waitTime`代替啊。好,這是對于一個釋放時間的處理這塊,我們不管啊,因為我們傳的是-1乘-1,這就不成立,對不對,那就往下走。

那么往下走的話呢,大家會發現在這里啊,他會先獲取當前時間,接下來呢這里有個`remainTime`,`remainTime`就是剩余時間初始化是-1,但是呢如果`waitTime`不等于-1,它其實就會用`waitTime`代替這個`remainTime`,也就是說這個`remainTime`就是剩余的等待時間。緊接著又來了一個叫`calc`,`calc`計算鎖的等待時間,這又是怎么回事呢?你跟著這個方法,你會發現啊,在`math`里它原封不動返回了,也就是說其實這個鎖等待時間和剩余等待時間,他倆是不是一樣的呀?啊沒錯啊。那接下來再往下有一個叫`fieldLocksLimit`,失敗的鎖的一個限制,那這個值是多少呢?跟進去一看就知道了,好是零,也就是說鎖失敗的限制是零。這東西有什么用,等一會兒你就知道了。往下接著看,這個地方有一個集合名字叫`acquiredLocks`,`acquire`是獲取的啊,過去是什么,代表已經獲取的,所以這個集合里保存的呀是獲取成功的。那他剛創建出來肯定是零,對不對,那么接下來開始for循環遍歷遍歷遍歷`locks`,點`iterator`,這不疊在一起嗎,`locks`是我們三個獨立的鎖,對不對,那這里其實是在便利我們的三個獨立的鎖,便利完拿到每一個鎖,那接下來該干嘛了,同學們按照我們說的,是不是應該依次去獲取每一把鎖呀?所以進來以后他就開始去獲取了啊,那或許呢因為有兩種啊,第一種呢就是空才沒有傳播的`time`,沒傳位的`time`呢,那它其實就嘗試一次,所以說呢他在獲取鎖時候是空倉的,`tryLock`,這個咱們以前是不是見過空三,就是指一次啊不從事,但是如果你傳了`waitTime`呢,那就是走下邊就是帶有重視的`tryLock`,傳等待時間,還有釋放時間的啊,這兩個源碼咱們之前是不是都跟蹤過的,所以這里就不再贅述了啊。那不管怎么樣,最終你踹了以后會得到一個結果,那么這個結果啊他這里是布爾值啊,也就是true或false代表會有所成功,或者獲取鎖怎么了,失敗好。那拿到這個結果了以后呢,往下走,他會做一個判斷判斷獲取所有沒有成功,那這個地方如果是true,那不就成功了嗎?成功了就把當前鎖放到這個`acquiredLocks`里,就是什么,就是啊已經成功的鎖的集合里邊,沒錯吧,已經成功的鎖在集合里啊。然后那`else`是不是會有失敗,`else`咱們先不管了,咱們先管成功,那我鎖成功了,我就添加進去,添加進去,完了往下走,這塊是判斷這個`remainTime`,剩余等待時間是不是-1,那如果不等于-1,說明啊現在剩余時間是不是很充足,那么我就會去做一個計算啊,我用當前時間減開始的那個時間,這以前咱們是不是見過呀,它是計算獲取鎖的一個耗時,那我用剩余的時間減去耗時,得到的是不是就是現在剩的時間呀,那現在剩的時間是不是小于零啊,如果小于零,證明啊剛才獲取鎖已經把等待時間給耗盡了,那我們知道啊,等待時間耗盡了,代表是不是鎖超時了,那因此只能`return false`代表失敗,但是在`return false`之前,他干了一件事啊,他會先去把`acquiredLocks`啊,也就是已經獲取到的這些鎖給他干嘛呢?`unlock`釋放掉,因為你已經失敗了呀,你前面拿到的鎖就不能不能再拿著了,因為你拿著別人是不是拿不了了,所以給他干脆釋放掉就算了,那如果時間還很充足呢,哎那沒關系,我是不是可以繼續去拿下一把鎖呀,那到這剛好for循環結束了,來往上哎,是不是剛好進行下一次for循環了,好,那下一次for循環來又干嘛,再拿到鎖,再嘗試獲取鎖好,再判斷鎖有沒有成功,如果成功添加了這個已獲取的鎖里,然后再判斷一下剩余時間,剩余時間還充足嗎,哦充足充足,我再進行下一次for循環好,再來獲取下一把鎖,這樣直到我把所有的鎖都拿到為止,那么這樣一來,我們的這個集合里是不是就拿到所有的鎖了,然后等for循環結束,`return true`,是不是就結束了?當然這是鎖成功啊。

那么有沒有可能說我第一次來拿鎖就失敗了呢?上來第一個啊,便利的第一個`lock`的數直接失敗了,這個值是個`false`,有這種可能性吧。好,那如果他是`false`,走的是什么,走的是`else`,他進了`else`以后,他會做什么呢?首先做個判斷,判斷一下`lock.size`,這個是鎖的總的數量,對不對,比如說我們這里是三再去減去啊,已經獲取的鎖的數量,那因為你前面獲取都失敗了呀,所以這個是不是零啊,那3-0的是不是就是三,它會判斷它等不等于這個東西,這個東西咱們以前看過這是什么呀,是不是鎖失敗的一個上限,失敗的上限是幾呢?我們之前看到過是零,對不對,那也就是說這個地方啊,因為我們一個都沒拿到,這里是零,前面是三三減0=3等不等于零,不成立啊。什么時候成立,只有已經獲取的鎖的數量等于鎖的總數量,這個減完是不是才是零,才能`break`跳出循環。那也換言之,只有你把所有的鎖都拿到了,是不是才能結束,否則能不能結束啊,就不能,那不能就干嘛往下走,往下走了以后判斷他是不是零啊,是零是零的話啊,假設說你已經拿到鎖了,這里是不是把已經拿到鎖給釋放掉啊,然后判斷一下`waitTime`是否是-1,`waitTime`,如果是-1,證明什么,證明你是不想做重試的,不想重試那一次失敗了,直接失敗結束了好。那如果你想重試呢,想重試好辦呀,我先把你現在已經拿到鎖的清空啊,不管你已經拿到幾把了啊,你現在假如說你一把都沒拿到,反正我也給你清空,然后把迭代器往前迭代,其實就是把這個指針是不是指的第一個,為什么你要重試啊,重試是不是要從頭再來,從第一把鎖開始,所以這里把指針重置,然后好結束啊,又開始for循環重來,這樣呢你是不是可以重頭再試了,要么呢我們把鎖的鎖都拿到結束,要么就是失敗或重試,知道什么呢,就是這個`waitTime`小于零,超時為止,這就是啊整個我們獲取鎖的一個邏輯了,跟我們之前分析的是不是差不多好。

那等所有的鎖都拿到了,最終肯定要`return true`嘛,對不對,但是在`return true`之前啊,他還有一段業務邏輯,我們可以看到在這兒他會判斷啊,這個`leaseTime`是否等于-1,那這個又是干什么呢?這個`leaseTime`啊是鎖的釋放時間啊,我們之前是沒傳的,是-1,所以`if`不成立,是不是直接往下走了,但是假設說你傳了`time`呢,那就意味著你需要自己指定鎖的釋放時間,在這種情況下,他會干什么事呢?我們進來,你會發現他會去便利已經拿到的每一把鎖,然后給他執行`EXPIRE`命令,我們知道`EXPIRE`是不是設置鎖的有效期的意思,那也就是說它會給每一個鎖都重新設置一下有效期。為什么要做這件事呢?同學們,我們獲取鎖的時候,會有多個Redis所需要依次獲取第一把獲取的鎖,在獲取之后立即就開始倒計時了,而最后一把鎖是剛開始倒計時,也就是說在這個集合內的多把鎖啊,其中第一把他的剩余有效期,一定會比最后一把的剩余有效期要短一些,對不對,這樣呢就有可能會出現一個問題,就是有些釋放了,有些沒釋放的情況。那么為了避免這個問題的發生,他在這干了一件什么事情呢,等所有鎖都拿完了,我再重新給每一把鎖都配一下有效期,確保大家的有效期是一樣的。那為什么只有在不等于-1的時候,才需要做這些事呢?同學們,當`leaseTime`等于-1的時候會觸發什么呀?我們之前講過是不是會觸發看門狗機制啊,那有了看門狗機制,所有鎖的有效期都會自動去續,因此需不需要你這里就處理就不需要了。所以說啊我們建議大家一般這個釋放時間啊,就不要設置好,讓他走看門狗,它們最終呢`return true`結束。

那么我們獲取鎖的邏輯啊,也就分析完畢了。那我們回到PPT啊,最后呢我們來做一個總結啊,在分布式鎖這個章節,我們其實啊總共講了三大類鎖啊。那第一類呢就是不可重入的Redis分布鎖,這種其實就是我們最早自定義的那種嘛,那它的原理是利用了`SETNX`的互斥性,然后利用`EX`設置過期時間,避免死鎖。在釋放鎖的時候啊,去判斷線程標識啊,避免誤刪。那它的缺陷啊就是不可重入啊,而且呢無法實現鎖的自動續期機制啊,并且呢鎖超時啊,可能會有這樣的一個自動釋放的一個風險啊。但是它的實現方式非常簡單啊,在大多數場景下還是比較實用的啊。只不過呢對于有可重復性需求,重置需求,或者是安全性要求較高的業務場景,就不太適合了,此時我們可以選擇可重入的Redisson鎖。

它的原理是利用了哈希結構記錄線程標識和重復次數啊,這樣就可以實現可重入。再利用watch dog延續這個鎖的一個剩余時間,不斷的去重新續約,確保這個鎖不會因為自動超時而釋放,除非是服務宕機。最后呢,再利用這個信號量去控制這個鎖的一個重試機制啊,利用發布訂閱這樣一個方案。那這種方案的話呢,對于CPU的利用率也比較高,不會無效的這種等待啊,所以呢它的性能啊各方面都還是不錯的。但它的缺陷啊就是Redis宕機引起鎖失效的問題啊,就是主從的一致性問題。那要解決這個問題呢,最后我們又給大家講了Redis里的`MultiLock`,也就是連鎖。那連鎖機制呢它是利用多個獨立的Redis節點啊,那這樣一來節點之間沒有主從關系就不會啊,因為主從一致導致鎖失效了。但是在獲取鎖的時候,就不是說只獲取一次了,而是要在所有節點都獲取成功才算成功。你可以把`MultiLock`看成多個啊,可重入鎖的一個集合。那么它的缺點呢就比較明顯了,因為成本比較高。當然你可以建立三個獨立節點,那最少得三個啊,啊當然大于三個,比如說五個以上會更好。而且呢你也可以給節點再去建立主從關系啊,這樣可以提高可用性,只不過就是成本會更高一點,實現起來會更加復雜一點。但是呢是所有里邊最安全的一種方案了。

好了到這呢分布式鎖的所有知識啊,我們就講完了。之前呢我們也用Redis的分布式鎖,代替自定義分布式鎖,實現了一人一單,測試呢也沒有什么問題。那到這為止啊,整個秒殺相關的業務我們就做完了。但是啊正是因為我們加入了各種各樣的鎖,秒殺業務的性能呀也會受到巨大的影響。所以說啊,我們后續還需要對秒殺的業務,去做進一步的優化,從而提升它的性能。那具體該怎么去做呢?

實現異步秒殺

視頻里的代碼如下,沒有換源,實際上還是存在多線程問題的

package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.SimpleRedisLock;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate RedissonClient redissonClient;@Overridepublic Result seckillVoucher(Long voucherId) {//1.查詢優惠券拿到信息;其實后續Redission可以直接用信號量來鎖庫存SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未開始return Result.fail("秒殺尚未開始!");}//3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {//尚未開始return Result.fail("秒殺已經結束!");}//4.判斷庫存是否充足if (voucher.getStock() < 1) {//庫存不足return Result.fail("庫存不足!");}return createVoucherOrder(voucherId);}@Resourceprivate RedissonClient redissonClient;@Transactionalpublic Result createVoucherOrder(Long voucherId) {//6.一人一單Long userId = UserHolder.getUser().getId();//分布式鎖創建鎖對象RLock redisLock =redissonClient.getLock("lock:order:"+userId);//嘗試獲取鎖boolean isLock =redisLock.tryLock();if(!isLock){//獲取鎖失敗,直接返回失敗或者重試return Result.fail("不允許重復下單");}try{//6.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//6.2.判斷是否存在if (count > 0) {//用戶已經購買過了return Result.fail("用戶已經購買過一次!");}//5.扣減庫存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//這里一定是相等的吧,應該是之前查到的,你現在現查,肯定相同.eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {//扣減失敗return Result.fail("庫存不足!");}//7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1. 訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2. 用戶idvoucherOrder.setUserId(userId);// 7.3. 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回訂單idreturn Result.ok(orderId);}finally{//釋放鎖redisLock.unlock();}}}

?他準備了1000個用戶(在sql里面已經保存了)并且登錄在redis中

先回顧一下咱們秒殺的業務流程

我們的秒殺業務能接收一個`voucherId`,就是當前這個用戶正在搶購的優惠券的ID。而業務功能呢核心是兩個:第一呢我們要去扣減這個優惠券的庫存,而第二呢就是將用戶搶購的優惠券信息寫入訂單,完成訂單的創建。只不過因為是秒殺業務,所以呢他需要加入一些業務的限制啊。首先就是庫存絕對不能出現超賣現象,必須判斷庫存啊是否充足;第二呢還要限制呀,一個用戶對于一個優惠券只能下一單,也就是一人一單的功能啊。但因此整個業務啊就變得復雜了。

首先啊,要想判斷優惠券的庫存,你得先查詢優惠券吧?那這個呢肯定是訪問數據庫的啊。查到了以后啊,去判斷一下秒殺的時間,判斷一下庫存是否充足,如果不足,直接報錯啊,避免超賣。那如果充足的情況下,我們再去嘗試下單。那接下來呢其實就是對一人一單的這個判斷了。那一人一單怎么去判斷呢?啊,我們其實就是看一看,在數據庫里有沒有這個訂單。但是啊,可能存在并發安全問題,所以在這我們利用了分布式鎖,首先嘗試去獲取鎖,獲取成功再去做這個判斷。而判斷的時候,就是看一下當前用戶有沒有買過這個券啊,拿著用戶ID和券的ID啊,去查詢用戶訂單表,查完以后看一看存不存在。如果說呀已經存在了,那就證明他買過了,那肯定拒絕他呀。而如果沒買過,那才可以繼續。這個時候放心大膽的去扣減庫存和創建訂單,就可以了啊。最終業務結束,釋放鎖啊,整個流程也就完成了。

那大家可以看到啊,其實雖然整體來看業務非常的復雜,但是核心就是兩點啊,哪兩點呢?第一是對于購買資格的一個判斷啊,就是這個用戶他能不能買啊,能不能買依據什么依據是庫存夠不夠,還有這個人買沒買過。第二才是真正下單啊,就是學扣庫存和創建訂單。但是呢,因為那這里邊呢有大量的數據庫的操作啊,所以說呢整個業務它的性能并不是很好。那么我們做一個測試看一看啊。

嗯,在這里啊,因為我們要測試高的一個并發,所以說呢我們不能用一個用戶或者兩個用戶去測啊。在這兒呢我們打開數據庫看一下,那我提前準備了1000個用戶,可以看到這里有1000條用戶。而且呢這一新用戶我提前已經讓他們登錄了啊,我們看一下Redis。在這里我提前準備好了這些用戶的登錄token啊,提前就創建好了。等一會兒呢,我們就會用這1000個token去發起請求啊。

好我們再回到數據庫,在數據庫里啊,秒殺的庫存我已經提前改成了200,然后呢訂單表里面也是空的啊。那接下來呢我們就去利用JMeter進行一個壓測,看看目前這個接口的性能怎么樣。打開JMeter,在這個地方啊,我準備了1000個請求。那我們怎么樣去得到這1000個用戶的token呢?大家應該知道啊,我們是把這個登錄token放在請求頭里的,這個請求頭啊,之前我們是寫死的,你發現現在我沒有寫死了啊。他從哪來的呢?這里寫了個`${token}`的,這里是引用了一個名為token的變量,這個變量從哪來的呢?在這tokens,那么這個tokens里,大家可以看到它會去讀取這個文件啊,這個文件在我們課件資料里叫`token4.txt`。打開看一下,好在這里啊,好,大家可以看到這個里邊,就是我提前準備好了1000個token啊。

那現在呢我們把這個文件準備好,告訴了JMeter,那么JMeter就會去讀取這里面的每一行數據啊,然后把它復制給一個變量,變量名就叫token。那這樣一來呢,我們在請求頭中就可以利用美元符大括號引用這個變量了。那這樣他發起的請求啊,每一次請求就可以使用不同的token了啊。我們有1000個請求,然后呢是200個庫存啊,我們來發起一下走。我們可以看到很快業務就結束了,在這個地方呢可以看到整個這個業務啊,它的一個情況。首先這里的最小值,最大值是我們業務的顯示時間啊,可以看到最小的時候顯示時間126ms,最大值800多毫秒。那為什么這里的波動這么大呢?哎,這就是因為我們這個并發利用JMeter模擬的,它不是真的一次性發了1000個請求啊,它會有一個時間一開始少,后來越來越多啊。所以說呢,他在隨著這個并發的量越來越高的情況下,他的這個請求的響應時間會越來越長。因為并發越高,CPU呢就需要在多個線程之間進行來回的切換,所以這個時候呢會導致某一個請求,他處理的耗時就會增加。你可以看到平均耗時竟然達到了400多毫秒,所以說我們這個業務其實還是比較慢的啊。那這里的吞吐量是1000每秒,這里看好像還可以啊,其實隨著并發越來越高,這個值只會越來越低啊。

好,這是我們現在做的一個測試。那接下來呢我們就要對它做優化了,那么具體該怎么樣去做優化呢?我們回到PPT啊來去做一個分析啊。首先先回顧一下我們整個秒殺業務的流程啊,那我們前端發起請求到達我們的Index,我們的Index會把請求啊負載均衡到我們的Tomcat,而在他們干的內部啊,我們的業務流程是這樣子的:查詢優惠券啊,查的目的為了做庫存的判斷,庫存判斷如果沒問題再去查訂單,這個呢是為了做一人一單的檢驗。那如果這個卷還沒有問題,就可以真正的去扣減庫存,創建訂單了。而在整個這個業務流程當中啊,它是串行執行的啊,也就是說一個執行完再往下執行,是這樣子的。所以呢整個業務耗時啊,其實就是每一步的耗時之和。但是在整個這個流程里邊,其中查詢優惠券、查詢訂單啊,還有減庫存、創建訂單,這四步都是走的數據庫。而我們知道啊,數據庫的并發能力本身就是比較差的,更何況這里減庫存和創建訂單還是對數據庫的寫操作。另外呢,為了避免安全問題,我們這里還加了分布式鎖。那整個業務的性能可想而知啊,所以呢我們整個業務的耗時就變得比較長。那并發的能力呢就比較弱了。

那現在我們該怎么樣去優化它,提高我們這個業務的并發能力呢?哎,在這呢我給大家舉一個例子啊,這個例子呢能給我們一個很好的提示啊。比如說我現在呢開了一個飯店啊,然后呢成本有限,我就請了一個人啊,一個小姐姐。那她既負責去接待顧客,還得去做飯啊。現在我們的業務流程是這樣子的啊,有一個顧客來了啊,他首先呢去接待啊,你點餐啊,是你要吃什么呀,我得記錄一下。然后呢收錢,那接下來他做到后處理去幫這個顧客做飯啊。那做好了以后呢,哎再給這個顧客好,流程結束了。這個流程呢就跟我們現在這個業務啊有點像啊,這個小姐姐上來要負責接待顧客點餐下單,然后呢去做飯,做完最后把這個餐給了顧客才算結束。你看他一個人負責了一條龍的完整服務,所以說呢他接待顧客的業務流程啊,就是整個流程的所有耗時之和。那因為啊做飯其實是比較耗時的,所以就導致了接待顧客的總的這個時間呀,就變長了。那處理一個顧客就要耗這么久的時間,那么他在單位時間內,能夠接待的顧客的數量是不是就變少了,你的這個工作效率就變得極低。而且呢后邊的顧客可能等不及人家就走了,這些人啊不是我們希望看到的。那該怎么解決呢,哎大家都能想到了,我是不是得再請一個人啊。哎,小姐姐在前面負責接待顧客,后邊再請一個后廚來做飯。那這個時候的流程變成什么樣子的呢?

好,顧客來了,那小姐姐呢首先去接待一下,然后點餐啊,付錢把這個信息記錄一下,你要吃什么啊,交給后廚。后廚呢慢慢去做,到這小姐姐的工作已經結束了,來您旁邊請啊,旁邊等著來,下一位是不是還繼續重復這個過程就行了。那當然了,顧客越來越多啊,我們怎么去分得清楚誰點了什么東西,誰先到誰后到呢?所以呢我們在點餐以后啊,其實要給顧客一個小票上面記錄的誒,你是幾號,然后你吃了什么,點了什么對吧?但這個信息不僅僅給顧客一份啊,給后廚也一份。后廚那里呢其實就會有一個點餐的列表,貼在墻上。那這個廚師呢就會按照點餐的順序,依次去做這些東西,然后呢將來做完了以后叫號哎,是誰的,你來拿就完了。那大家來看,因為我們要把這個點餐收錢這一塊業務啊,他耗時較短,分給了小姐姐去做。然后呢做飯的義務啊,他是叫九分給了另外一個人啊,后廚去做。因此呢小姐姐的工作效率是不是大大提高。那這個飯店啊它能夠處理顧客的速度,是不是也提升了,單位時間內能夠接待是不是更多的顧客。那同理啊,咱們這個業務是不是也可以參考這種模式啊,把它分成兩部分,第一部分啊就是對于秒殺資格的判斷,也就是說判斷一下庫存是否充足啊,判斷一下一人一單這部分的耗時啊其實比較短,就相當于是小姐姐接待顧客的那部分。第二塊呢是減庫存下單,因為他們是對數據庫的寫操作,所以號是叫九,就相當于是后廚做飯的那一塊。而我們現在要做的事情,就是把這兩部分交給兩個人去做,那就別說了,那java代碼里怎么要交給兩個人呢?唉我們知道執行代碼的不是人啊,是線程。也就是說,我們只需要把這兩塊交給兩個線程就行了。那請求進來以后,主線程要做的事情啊,就是去判斷用戶他的一個購買資格,然后如果他有購買資格,我們還可以開啟一個獨立的線程來處理耗時較久的減庫存和下單操作,相當于是后廚。這樣咱們這個業務的效率,是不是也就能大大提升了。

當然為了更進一步的提高咱們這個項目的性能,除了要把它分離成兩塊以外,我們還要盡可能的提高對于秒殺資格判斷這部分業務的效率。那現在呢這部分對于秒殺資格判斷呀,依然要去查詢數據庫,所以它的性能依然是會受到數據庫的一個限制的。那我們知道相比于數據庫來講,什么東西的性能更好啊,沒錯就是Redis。所以說呢在這我們完全可以將優惠券信息、訂單信息啊緩存在Redis當中,然后把對于秒殺資格的判斷啊放到Redis里去做。主線程進來以后,首先呢就去找我們的Redis,完成對于秒殺資格的判斷,這部分判斷完啊,代表小姐姐的工作都完成了,其實已經可以結束了啊。那如果發現他有資格,我們再去執行后續的減庫存下單操作就ok了。

但是就像剛才說的那樣,我們要把這塊分離要交給兩個人去做,你不能說我調完Redis,再調他們蓋后邊這部分業務,那這樣一來不就又變成串聯成一條龍的執行了,那它的性能其實沒有增加,反而降低了。所以說呢在這里我們不能出去調用它,而是干什么的,我們將開啟獨立線程去執行。那問題來了,我怎么知道該執行誰呢?給誰哪個用戶去創建訂單的?所以說我們之前講過,是不是得有一個小票啊對吧?人家點餐完了得有小票,記住是誰啊,點了什么。那我們在這也是我們要記錄什么呢?誰買了什么東西,并且呢還要返回給用戶一個編號,將來的話,他是不是可以依據這個去完成付款的操作,后續的操作。哎,沒錯,所以在這里呢我們要做的第一件事情,其實就是記錄一下啊,優惠券信息、用戶信息,這里呢就是說誰買了什么,然后再去記錄訂單的ID,就相當于是那個叫號是吧,你是幾號,然后把這封信息存儲到一個隊列里。那這個存儲在一個隊列以后,我們將來就可以開啟一個獨立的線程啊,去讀取這個隊列里面的信息,完成下單了,就相當于是后廚里邊的那個什么了,哎,那個點餐的列表。點餐完成以后,用戶可以得到一個參號,那將來呢可以用來叫號。那我們這里返回的則是一個訂單的一方面啊,這個用戶可以拿這個訂單ID去付款,另一方面呢,他拿到ID就代表著他真的是搶單成功了。當然雖然此時啊這個真正的訂單還沒創建,但是我們將來會確保它一定會創建啊。

將來呢我們會一步的開啟一個獨立的線程,也就是我們這個后廚是吧,去讀取這個隊列中的優惠券信息、用戶信息啊,完成下單操作。那這樣同學們可以看到啊,我們其實接受用戶請求以后的業務流程就變成什么了,進來直接判斷啊,我們的秒殺資格,而且這個判斷是在Redis里做的。做完以后啊,只需要把它保存到隊列里,直接就結束了。那整個業務的流程是不是變短了,而且因為是基于Redis,所以性能可以得到極大的提升。整個業務的吞吐能力、并發能力是不是可以大大提高了。

不過呢這里就有一個難點的問題了,就是如何在Redis里完成對于秒殺庫存的判斷和一人一單的判斷。好,那在這里呢我們一起來分析一下啊。要想在Redis里判斷庫存是否充足以及一人一單,我們肯定需要把優惠券的庫存信息,以及有關的訂單信息啊,給它緩存在Redis當中。那這里就會有一個問題了,我們應該選擇什么樣的數據結構,來保存這兩個信息。因為是庫存比較簡單啊,庫存就是一個數值嘛,所以說我們只需要一個普通的String結構就行了,他的Key就是優惠券的ID,值呢就是庫存的值了。將來去做庫存判斷的時候,其實就是看一下這個值是不是大于零。如果大于零,那不就代表庫存充足嗎,我們就可以去做后續的業務了。但是呢大家一定要注意哦,當我們判斷用戶確實有購買資格之后啊,這個庫存值一定要減一,相當于我們要在Redis里提前預減庫存啊,否則的話這個值永遠不變,那豈不是永遠都是庫存充足了,那這樣呢庫存判斷就實現了啊。

然后我們再來說一下一人一單,要實現一人一單功能啊,我們就需要在Redis里去記錄當前這個優惠券被哪些用戶購買過。那以后再有用戶來的時候,只需要判斷它是否存在,存在證明他購買過,那就不能再買了。那這樣功能是不是就實現了。不過呢大家也可以再思考一下啊,什么樣的數據結構比較適合用來保存啊,購買過這個優惠券的用戶的信息。大家可以把答案打在彈幕上。其實這里呢需要滿足的就是兩點啊,第一我們知道一個優惠券他庫存有很多,所以呢將來購買的用戶是不是也會有很多呀,所以要滿足第一個特點,就是能夠在一個Key里保存很多值,也就是一個列表。第二呢我們講一人一單,那也就是說在這個優惠券里面保存的這些用戶的ID是肯定不能重復的,他要保證唯一性。那什么樣的列表具有唯一性呢?哎,這個時候啊答案是不是就很明顯了,就是我們的Set集合。我們知道Set集合是可以確保元素的唯一性的,并且可以在一個Key里保存多個值。雖然將來當有用戶來搶購的時候,我們只需要在這個Set里記錄用戶的ID,再有更多用戶來啊,也是依次去記錄就行了。當然如果有用戶重復的來購買,比如說2號用戶啊,我們以判斷發現在這個集合里已經存在了,那肯定就不允許的購買,他代表他是重復購買。這樣一人一單的功能是不是也就實現了。

那這個思路還是挺簡單的吧,下面呢我們就一起來梳理一下整個流程啊。首先一上來呢,我們就要先去判斷一下庫存是否充足,就是看一下這個值是否大于零。如果說庫存不足,我們肯定是返回一個錯誤的信息,比如說在這我們返回一啊,代表一個標識就是庫存不足。然后呢業務其實就結束了。但如果庫存充足呢,我們還需要去判斷一人一單,其實就是看一下這個用戶在這里面是否存在,是否下過單。那如果說他下過單啊,也就是存在,那這個時候代表什么是重復下單,同樣我們是不是也結束返回一個二標識,是重復下單。那如果說這里不存在呢,那代表他沒下過單,那代表有購買資格,我們是不是就可以去扣減庫存了啊,這個地方扣減庫存不是真正去數據庫扣啊,而是把Redis這個值啊扣一下,減一就行了。我們講過啊,是預減。接下來不要忘了,我們還需要把用戶ID是不是保存到這個集合里,這樣是不是可以作為下一次再來判斷時的一個依據啊。所以說還要做一件事情,就是將用戶的ID存入這個優惠券的在Set集合里。到這兒呢我們整個流程才算結束,我們就可以去返回結果了,比如說返回零。那這樣來零代表的就是什么,哎,是有購買資格,一和二代表的是不是沒有購買資格在這里啊,因為對Redis的判斷啊,有很多個判斷,業務流程比較長,而我們呢必須確保這一段流程執行時的一個原子性啊。那大家就能夠想到了,用什么東西可以確保代碼執行的原子性啊。哎沒錯,就是我們的Lua腳本啊。所以說呢我們將來啊,這部分內容直接用Lua腳本來實現

那么將來我們在Tomcat里面,我們進入到用戶請求以后,要干的事情是什么?唉,上來呢就先去調用這個Lua的腳本,根據Lua腳本執行的結果是零一或者是二,來去做一個后續的判斷。那如果說你不是零啊,那么代表就是失敗的情況了,可能是一,可能是二對吧。那這個時候我們肯定就返回一個錯誤信息,就行了啊,然后就結束了。但如果說這里是零,那證明什么啊,如果是零,證明是不是你有購買資格。那有購買資格,我們就需要去按照之前說的,把這些信息首先保存在一個堆里面,什么信息呢,優惠券ID,用戶ID,訂單ID啊,這相當于是小票嗎,誰購買了什么東西,還有你的編號是什么。保存下來,方便將來我們的異步線程去執行它,完成真正的下單。其次呢,我們還需要返回一個訂單的ID,也就是那個號啊,給用戶好了。那到這里呢,基于Redis的秒殺業務的流程就分析完畢了。

可以看到在這個版本當中,我們接收到用戶請求以后,核心要做的事情啊,僅僅是判斷一下用戶有沒有購買資格,沒資格就報錯,有資格呢就返回一個ID給他,而耗時較久的秒殺下單減庫存等等,核心的這種數據庫寫操作,并沒有在我們這個流程當中出現啊。更何況呀,我們這里判斷用戶購買資格,也是在Redis里通過腳本來執行的,可以說整個業務流程變得非常的短,而且呢執行的性能又非常的好。因此整個業務的耗時就非常非常短了,可以想象的是,我們這套實現方案當中,它的并發能力就會變得非常的高。

那什么時候再去做下單和減庫存的邏輯呢?哎,我們之前也說過了,將來呢,我們只需要開啟一個獨立的線程來讀取啊,提前保存好的這些用戶信息啊,優惠券信息就可以完成異步的啊,數據庫的這種寫的操作了。而且呢在我們返回訂單ID給用戶的那一刻,其實秒殺業務已經結束,用戶已經可以拿著這個ID去付款了。所以呢我們什么時候將優惠券信息,用戶信息寫入數據庫里,完成下單減庫存的操作,其實就沒有那么重要了啊,時效性上要求就沒那么高了。我們完全可以按照數據庫能夠承受的頻率啊,去將數據寫入數據庫。當然如果你想提高寫入數據庫時的性能,我們不妨多開幾個線程,甚至是把這種單個的寫變成批量的寫,從而提高一下這個異步操作的一個效率啊。

好吧,那到這為止呢,基于Redis啊這樣的異步秒殺的業務流程,我們就分析完畢了。正是因為啊,我們將這種同步的寫數據庫操作,變成了異步操作啊,他一方面呢縮短了秒殺業務的流程,從而大大提高了秒殺業務的并發,另一方面呢還減輕了數據庫的壓力啊,可以說是一舉多得啊,是一種非常好的方案啊。那下節課呢我們就去嘗試著來實現一下,這套異步秒殺的方案。

改進秒殺業務,提提高并發性能

需求:
①新增秒殺優惠券的同時,將優惠券信息保存到Redis中
②基于Lua腳本,判斷秒殺庫存、一人一單,決定用戶是否搶購成功
③如果搶購成功,將優惠券id和用戶id封裝后存入阻塞隊列
④開啟線程任務,不斷從阻塞隊列中獲取信息,實現異步下單功能

?VoucherServicelmpl.java修改

前面加一個 @Resourceprivate StringRedisTemplate stringRedisTemplate;@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存優惠券save(voucher);// 保存秒殺信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);//報保存秒殺庫存到redis中//秒殺時間在前端已經判斷了,不在時間內是不顯示的stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());}
}

postman給他發一個這個?http://localhost:8081/voucher/seckill

{"shopId":1,"title":"100元代金券","subTitle":"周一至周五均可使用","rules":"全場通用\\n無需預約\\n可無限疊加\\不兌現、不找零\\n僅限堂食","payValue":8000,"actualValue":10000,"type": 1,"stock":100,"beginTime":"2024-08-24T12:13:14","endTime":"2025-09-17T11:11:11"
}?

?寫一個seckill.lua腳本

--首先要判斷的就是庫存是否重組,得去讀取redis當中的這個key(比如seckill:stock:9)的值
--1.參數列表
--1.1.優惠券id
local voucherId =ARGV[1]
--1.2.用戶id
local userId =ARGV[2]--2.數據key
--2.1.庫存key
local stockKey= 'seckill:stock:'..voucherId
--2.2.訂單key
local orderKey= 'seckill:order:'..voucherId--3.腳本業務
--3.1.判斷庫存是否充足getstockKey
if (tonumber(redis.call('get',stockKey))<=0) then
--3.2,庫存不足,返回1
return 1
end
--3.2.判斷用戶是否下單SISMEMBERorderKeyuserId
if(redis.call('sismember',orderKey,userId) == 1) then
--3.3.存在,說明是重復下單,返回2
return 2
end
--3.4.扣庫存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5.下單(保存用戶)sadd orderKey userId
redis.call('sadd',orderKey,userId)

對voucherorderserviceimpl的seckillvoucher重構

 @Resourceprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {//獲取用戶Long userId =UserHolder.getUser().getId();//1.執行lua腳本Long result =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());//2. 判斷結果是0int r=result.intValue();if(r!=0){//2.1 不是0就沒有購買資格return Result.fail( r==1?"庫存不足":"");}//2.2  為0 有購買資格 ,把下單信息保存到阻塞隊列long orderId = redisIdWorker.nextId("order");//返回訂單idreturn Result.ok(0);}

VoucherOrderServiceImpl.java修改如下?

package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import static com.baomidou.mybatisplus.core.toolkit.Wrappers.query;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024*1024);//阻塞隊列,阻塞隊列特點:當一個線程嘗試從隊列中獲取元素,沒有元素,線程就會被阻塞,直到隊列中有元素,線程才會被喚醒,并去獲取元素private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {//1. 獲取隊列中的訂單信息VoucherOrder voucherOrder= orderTasks.take();//2.創建訂單handlerVoucherOrder(voucherOrder);}catch (Exception e) {log.error("處理訂單異常",e);}}}}private void handlerVoucherOrder(VoucherOrder voucherOrder) {//1.獲取用戶Long userId = voucherOrder.getUserId();//2.創建鎖對象RLock lock = redissonClient.getLock("lock:order:" + userId);//3.獲取鎖boolean isLock = lock.tryLock();//4.判斷是否獲取鎖成功if (!isLock){///獲取鎖失敗,返回錯誤或重試log.error("不允許重復下單");return;}try{proxy.createVoucherOrder(voucherOrder);} finally {//釋放鎖lock.unlock();}}private  IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {//獲取用戶Long userId =UserHolder.getUser().getId();//1.執行lua腳本Long result =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());//2. 判斷結果是0int r=result.intValue();if(r!=0){//2.1 不是0就沒有購買資格return Result.fail( r==1?"庫存不足":"不能重復下單");}//2.2  為0 有購買資格 ,把下單信息保存到阻塞隊列VoucherOrder voucherOrder = new VoucherOrder();//訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//2.4.用戶idvoucherOrder.setUserId(userId);//2.5.代金券idvoucherOrder.setVoucherId(voucherId);//2.6.放入阻塞隊列orderTasks.add(voucherOrder);//3.獲取代理對象proxy= (IVoucherOrderService)AopContext.currentProxy();// proxy作為實例變量在多線程環境下可能被覆蓋,導致數據不一致。//修復: 在需要時直接獲取代理,避免使用實例變量://IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//proxy.createVoucherOrder(voucherOrder);//返回訂單idreturn Result.ok(orderId);}/*@Overridepublic Result seckillVoucher(Long voucherId) {//1.查詢優惠券拿到信息;其實后續Redission可以直接用信號量來鎖庫存SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未開始return Result.fail("秒殺尚未開始!");}//3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {//尚未開始return Result.fail("秒殺已經結束!");}//4.判斷庫存是否充足if (voucher.getStock() < 1) {//庫存不足return Result.fail("庫存不足!");}Long userId = UserHolder.getUser().getId();//創建鎖對象//SimpleRedisLock lock=new SimpleRedisLock("ordere:" +userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:"+userId);//獲取鎖boolean isLock=lock.tryLock();//其實是3個參數,也可以選擇無參if(!isLock){return Result.fail("不能重復下單");}try {//獲取代理對象(事務)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally {//釋放鎖lock.unlock();}//try finally 事務可以生效,因為沒有捕獲異常。如果catch捕獲了異常,需要拋出RuntimeException類型異常,不然事務失效。}
*/@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {//我們的同步鎖就是this,是當前對象//另外呢這個事務的范圍其實是更新數據庫的一個范圍:也就是說做減庫存操作和創建電子操作而不是整個操作//6.一人一單Long userId = UserHolder.getUser().getId();//6.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//6.2.判斷是否存在if (count > 0) {//用戶已經購買過了log.error("用戶已經購買過一次!");return;}//5.扣減庫存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//這里一定是相等的吧,應該是之前查到的,你現在現查,肯定相同.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {//扣減失敗log.error("庫存不足!");return;}/*//7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1. 訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2. 用戶idvoucherOrder.setUserId(userId);// 7.3. 代金券idvoucherOrder.setVoucherId(voucherId);*/save(voucherOrder);//異步執行無需返回}}

lua表示在執行的時候也是一把鎖;哪怕是多實例部署,這里都不需要加鎖,因為Redis是獨立于服務之外的,多實例可見的;但是你這里加鎖,不還是通過redis,如果前面redis壞了,你這里大概率也不起作用;

報腳本錯誤應該是因為tonumber函數沒辦法轉成數字,找了源碼看了原因是說函數里的類型沒辦法轉為數字,應該是因為Redis存儲的鍵值中含有引號,需要去改一下Redistribution的默認序列化器;報錯attempt to compare nil with number的,可能是優惠券信息沒有保存到redis中,也就是第一步

IVoucherOrderService修改void createVoucherOrder(VoucherOrder voucherOrder);

秒殺業務的優化思路是什么?
1. 先利用Redis完成庫存余量、一人一單判斷,完成搶單業務
2. 再將下單業務放入阻塞隊列,利用獨立線程異步下單

基于阻塞隊列的異步秒殺存在哪些問題?
內存限制問題
數據安全問題

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/79757.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/79757.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/79757.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

低代碼開發平臺:企業數字化轉型的加速器

一、引言 在數字化時代&#xff0c;企業的轉型需求日益迫切。為了在激烈的市場競爭中保持領先地位&#xff0c;企業需要快速響應市場變化、優化業務流程、提升運營效率。然而&#xff0c;傳統的軟件開發模式往往面臨開發周期長、成本高、靈活性差等問題&#xff0c;難以滿足企業…

個人所得稅

文章目錄 一、名詞解釋二、個人所得稅計算方法 (舉例)1.累計預扣預繳應納稅所得額、本期應預扣預繳稅額2.個人所得稅預扣率表一3.個人所得稅計算舉例 三、專項附加扣除政策介紹四、年度匯算清繳政策介紹五、常見問答 一、名詞解釋 累計預扣法是指扣繳義務人在一個納稅年度內預…

二進制和docker兩種方式部署Apache pulsar(standalone)

#作者&#xff1a;閆乾苓 文章目錄 1、二進制安裝部署Pulsar(standalone)1.1 安裝配置JDK1.2 下載解壓pulsar安裝包1.3 啟動獨立模式的Pulsar 集群1.4 創建主題測試1.5 向主題寫入消息測試1.6 從主題中讀取消息測試 2.docker安裝部署Pulsar(standalone)2.1 使用docker 啟動Pul…

如何在 Go 中創建和部署 AWS Lambda 函數

AWS Lambda 是一個無服務器計算平臺&#xff0c;您可以使用自己喜歡的編程語言編寫代碼&#xff0c;無需擔心設置虛擬機。 您只需為 Lambda 函數的調用次數和運行時間&#xff08;毫秒&#xff09;付費。 我們大多數人都了解 JavaScript 和 Python&#xff0c;但它們的內存效率…

STM32配置系統時鐘

1、STM32配置系統時鐘的步驟 1、系統時鐘配置步驟 先配置系統時鐘&#xff0c;后面的總線才能使用時鐘頻率 2、外設時鐘使能和失能 STM32為了低功耗&#xff0c;一開始是關閉了所有的外設的時鐘&#xff0c;所以外設想要工作&#xff0c;首先就要打開時鐘&#xff0c;所以后面…

[安全實戰]逆向工程核心名詞詳解

逆向工程核心名詞詳解 一、調試與執行類 1. 斷點&#xff08;Breakpoint&#xff09; 定義&#xff1a;在代碼中設置標記&#xff0c;使程序執行到此處時暫停類型&#xff1a; 普通斷點&#xff1a;通過INT3指令實現條件斷點&#xff1a;滿足特定條件時觸發內存斷點&#xf…

Mac mini 安裝mysql數據庫以及出現的一些問題的解決方案

首先先去官網安裝一下mysql數據庫&#xff0c;基本上都是傻瓜式安裝的流程&#xff0c;我也就不詳細說了。 接下來就是最新版的mysql安裝的時候&#xff0c;他就會直接讓你設置一個新的密碼。 打開設置&#xff0c;拉到最下面就會看到一個mysql的圖標&#xff1a; 我設置的就是…

聚寬策略----國九條后中小板微盤小改,年化135.40%

最近在研究的聚寬策略&#xff0c;一般技術分析的我直接轉qmt了&#xff0c;財務因子有一點麻煩&#xff0c;我直接利用我開發強大的服務器系統&#xff0c;直接讀取信號&#xff0c;最近在優化一下系統&#xff0c;最近在開發對接bigquant的交易系統&#xff0c;完成了api數據…

C語言狀態字與庫函數詳解:概念辨析與應用實踐

C語言狀態字與庫函數詳解&#xff1a;概念辨析與應用實踐 一、狀態字與庫函數的核心概念區分 在C語言系統編程中&#xff0c;"狀態字"和"庫函數"是兩個經常被混淆但本質完全不同的概念&#xff0c;理解它們的區別是掌握系統編程的基礎。 1. 狀態字&…

End-to-End從混沌到秩序:基于LLM的Pipeline將非結構化數據轉化為知識圖譜

摘要:本文介紹了一種將非結構化數據轉換為知識圖譜的端到端方法。通過使用大型語言模型(LLM)和一系列數據處理技術,我們能夠從原始文本中自動提取結構化的知識。這一過程包括文本分塊、LLM 提示設計、三元組提取、歸一化與去重,最終利用 NetworkX 和 ipycytoscape 構建并可…

Leetcode 3523. Make Array Non-decreasing

Leetcode 3523. Make Array Non-decreasing 1. 解題思路2. 代碼實現 題目鏈接&#xff1a;3523. Make Array Non-decreasing 1. 解題思路 這一題思路上來說就是一個棧的問題&#xff0c;就是從后往前依次考察每一個元素&#xff0c;顯然&#xff0c;當前位置要么被舍棄&…

探秘STM32如何成為現代科技的隱形引擎

STM32單片機原理與應用 前言&#xff1a;微型計算機的硅腦 在我們身邊的每一個智能設備中&#xff0c;都隱藏著一個小小的"硅腦"——單片機。它們體積微小&#xff0c;卻能執行復雜的運算和控制功能&#xff0c;就像是現代科技世界的"神經元"。STM32系列…

機制的作用

“機制”是一個廣泛使用的概念&#xff0c;其含義和應用范圍因領域而異。在不同的學科和實際應用中&#xff0c;機制有著不同的定義和功能。以下從幾個主要領域對“機制”進行詳細解釋&#xff1a; 一、自然科學中的機制 &#xff08;一&#xff09;物理學 定義 在物理學中&…

prim最小生成樹+最大生成樹【C++】板子題

什么是最小生成樹&#xff1f; 在一給定的無向圖G (V, E) 中&#xff0c;(u, v) 代表連接頂點 u 與頂點 v 的邊&#xff0c;而 w(u, v) 代表此的邊權重&#xff0c;若存在 T 為 E 的子集&#xff08;即&#xff09;且為無循環圖&#xff0c;使得的 w(T) 最小&#xff0c;則此 …

讀書筆記--MySQL索引

索引(在 MySQL 中也叫做“鍵(key)”)是存儲引擎用于快速找到記錄的一種數據結構。 索引對于良好的性能非常關鍵。尤其是當表中的數據量越來越大時&#xff0c;索引對性能的影響愈發重要。在數據量較小且負載較低時&#xff0c;不恰當的索引對性能的影響可能還不明顯&#xff0c…

VS Code 遠程連接服務器:Anaconda 環境與 Python/Jupyter 運行全指南。研0大模型學習(第六、第七天)

VS Code 遠程連接服務器&#xff1a;Anaconda 環境與 Python/Jupyter 運行全指南 在使用 VS Code 通過 SSH 遠程連接到服務器進行開發時&#xff0c;尤其是在進行深度學習等需要特定環境的工作時&#xff0c;正確配置和使用 Anaconda 環境以及理解不同的代碼運行方式非常關鍵。…

字節頭條golang二面

docker和云服務的區別 首先明確Docker的核心功能是容器化&#xff0c;它通過容器技術將應用程序及其依賴項打包在一起&#xff0c;確保應用在不同環境中能夠一致地運行。而云服務則是由第三方提供商通過互聯網提供的計算資源&#xff0c;例如計算能力、存儲、數據庫等。云服務…

數據結構和算法(七)--樹

一、樹 樹是我們計算機中非常重要的一種數據結構&#xff0c;同時使用樹這種數據結構&#xff0c;可以描述現實生活中的很多事物&#xff0c;例如家譜、單位的組織架構、等等。 樹是由n(n>1)個有限結點組成一個具有層次關系的集合。把它叫做"樹"是因為它看起來像一…

狀態管理最佳實踐:Provider使用技巧與源碼分析

狀態管理最佳實踐&#xff1a;Provider使用技巧與源碼分析 前言 Provider是Flutter官方推薦的狀態管理解決方案&#xff0c;它簡單易用且功能強大。本文將從實戰角度深入講解Provider的使用技巧和源碼實現原理&#xff0c;幫助你更好地在項目中應用Provider進行狀態管理。 基…

使用 NEAT 進化智能體解決 Gymnasium 強化學習環境

使用 NEAT 進化智能體解決 Gymnasium 強化學習環境 0. 前言1. 環境定義2. 配置 NEAT3. 解決強化學習問題小結系列鏈接0. 前言 在本節中,我們使用 NEAT 解決經典強化學習 (reinforcement learning, RL) Gym 問題。但需要注意的是,我們用于推導網絡和解決方程的方法不是 RL,而…