面試重點:本篇包含悲觀鎖,樂觀鎖,多線程以及分布式鎖的知識
目錄
3.優惠卷秒殺
3.1 -全局唯一ID
3.2 -Redis實現全局唯一Id
3.3 添加優惠卷
3.4 實現秒殺下單
3.5 庫存超賣問題分析
3.6 樂觀鎖解決超賣問題
3.7?優惠券秒殺-一人一單
3.8 集群環境下的并發問題
4、分布式鎖
4.1 、基本原理和實現方式對比
4.2 、Redis分布式鎖的實現核心思路
4.3 實現分布式鎖初級版本
4.4 Redis分布式鎖誤刪情況說明
4.5 解決Redis分布式鎖誤刪問題
4.6 分布式鎖的原子性問題
4.7 Lua腳本解決多條命令原子性問題
4.8 調用Lua腳本改造分布式鎖
5、分布式鎖-redission
5.1 redission功能
5.2 Redission入門
5.3 redission可重入鎖原理
5.4 redission鎖重試和WatchDog機制
5.5 redission鎖的MutiLock原理
3.優惠卷秒殺
3.1 -全局唯一ID
每個店鋪都可以發布優惠券:
當用戶搶購時,就會生成訂單并保存到tb_voucher_order這張表中,而訂單表如果使用數據庫自增ID就存在一些問題:
-
id的規律性太明顯
-
受單表數據量的限制
場景分析:如果我們的id具有太明顯的規則,用戶或者說商業對手很容易猜測出來我們的一些敏感信息,比如商城在一天時間內,賣出了多少單,這明顯不合適。
場景分析二:隨著我們商城規模越來越大,mysql的單表的容量不宜超過500W,數據量過大之后,我們要進行拆庫拆表,但拆分表了之后,他們從邏輯上講他們是同一張表,所以他們的id是不能一樣的, 于是乎我們需要保證id的唯一性。
全局ID生成器,是一種在分布式系統下用來生成全局唯一ID的工具,一般要滿足下列特性:
為了增加ID的安全性,我們可以不直接使用Redis自增的數值,而是拼接一些其它信息:
成部分:符號位:1bit,永遠為0
時間戳:31bit,以秒為單位,可以使用69年
序列號:32bit,秒內的計數器,支持每秒產生2^32個不同ID
3.2 -Redis實現全局唯一Id
@Component
public class RedisIdWorker {/*** 開始時間戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 序列號的位數*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1.生成時間戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列號// 2.1.獲取當前日期,精確到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 2.2.自增長long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接并返回return timestamp << COUNT_BITS | count;}
}
測試類
知識小貼士:countdownlatch
countdownlatch名為信號槍:主要的作用是同步協調在多線程的等待于喚醒問題
我們如果沒有CountDownLatch ,那么由于程序是異步的,當異步程序沒有執行完時,主線程就已經執行完了,然后我們期望的是分線程全部走完之后,主線程再走,所以我們此時需要使用到CountDownLatch
兩個最重要的方法:
1、countDown
2、await
await 方法 是阻塞方法,我們擔心分線程沒有執行完時,main線程就先執行,所以使用await可以讓main線程阻塞,那么什么時候main線程不再阻塞呢?當CountDownLatch 內部維護的 變量變為0時,就不再阻塞,直接放行,那么什么時候CountDownLatch 維護的變量變為0 呢,我們只需要調用一次countDown ,內部變量就減少1,我們讓分線程和變量綁定, 執行完一個分線程就減少一個變量,當分線程全部走完,CountDownLatch 維護的變量就是0,此時await就不再阻塞,統計出來的時間也就是所有分線程執行完后的時間。
@Test
void testIdWorker() throws InterruptedException {CountDownLatch latch = new CountDownLatch(300);Runnable task = () -> {for (int i = 0; i < 100; i++) {long id = redisIdWorker.nextId("order");System.out.println("id = " + id);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {es.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("time = " + (end - begin));
}
@Test導的是? import?org.junit.jupiter.api.Test包
在執行測試類時需要打開虛擬機,在虛擬機上關閉防火墻,然后啟動redis才可運行
systemctl stop firewalld
cd /usr/local/src/redis-6.2.6
redis-server redis.conf
3.3 添加優惠卷
每個店鋪都可以發布優惠券,分為平價券和特價券。平價券可以任意購買,而特價券需要秒殺搶購:
tb_voucher:優惠券的基本信息,優惠金額、使用規則等
tb_seckill_voucher:優惠券的庫存、開始搶購時間,結束搶購時間。特價優惠券才需要填寫這些信息
平價卷由于優惠力度并不是很大,所以是可以任意領取
而代金券由于優惠力度大,所以像第二種卷,就得限制數量,從表結構上也能看出,特價卷除了具有優惠卷的基本信息以外,還具有庫存,搶購時間,結束時間等等字段
新增普通卷代碼: VoucherController
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {voucherService.save(voucher);return Result.ok(voucher.getId());
}
新增秒殺卷代碼:
VoucherController
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {voucherService.addSeckillVoucher(voucher);return Result.ok(voucher.getId());
}
VoucherServiceImpl
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {// 保存優惠券save(voucher);// 保存秒殺信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒殺庫存到Redis中stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
3.4 實現秒殺下單
下單核心思路:當我們點擊搶購時,會觸發右側的請求,我們只需要編寫對應的controller即可
秒殺下單應該思考的內容:
下單時需要判斷兩點:
-
秒殺是否開始或結束,如果尚未開始或已經結束則無法下單
-
庫存是否充足,不足則無法下單
下單核心邏輯分析:
當用戶開始進行下單,我們應當去查詢優惠卷信息,查詢到優惠卷信息,判斷是否滿足秒殺條件
比如時間是否充足,如果時間充足,則進一步判斷庫存是否足夠,如果兩者都滿足,則扣減庫存,創建訂單,然后返回訂單id,如果有一個條件不滿足則直接結束。
VoucherOrderServiceImpl
@Override
public Result seckillVoucher(Long voucherId) {// 1.查詢優惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺尚未開始!");}// 3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺已經結束!");}// 4.判斷庫存是否充足if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}//5,扣減庫存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣減庫存return Result.fail("庫存不足!");}//6.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 6.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2.用戶idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}
3.5 庫存超賣問題分析
有關超賣問題分析:在我們原有代碼中是這么寫的
if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}//5,扣減庫存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣減庫存return Result.fail("庫存不足!");}
假設線程1過來查詢庫存,判斷出來庫存大于1,正準備去扣減庫存,但是還沒有來得及去扣減,此時線程2過來,線程2也去查詢庫存,發現這個數量一定也大于1,那么這兩個線程都會去扣減庫存,最終多個線程相當于一起去扣減庫存,此時就會出現庫存的超賣問題。
超賣問題是典型的多線程安全問題,針對這一問題的常見解決方案就是加鎖:而對于加鎖,我們通常有兩種解決方案:見下圖:
加鎖
悲觀鎖:
悲觀鎖可以實現對于數據的串行化執行,比如syn,和lock都是悲觀鎖的代表,同時,悲觀鎖中又可以再細分為公平鎖,非公平鎖,可重入鎖,等等
樂觀鎖:
樂觀鎖:會有一個版本號,每次操作數據會對版本號+1,再提交回數據時,會去校驗是否比之前的版本大1 ,如果大1 ,則進行操作成功,這套機制的核心邏輯在于,如果在操作過程中,版本號只比原來大1 ,那么就意味著操作過程中沒有人對他進行過修改,他的操作就是安全的,如果不大1,則數據被修改過,當然樂觀鎖還有一些變種的處理方式比如cas
樂觀鎖的典型代表:就是cas,利用cas進行無鎖化機制加鎖,var5 是操作前讀取的內存值,while中的var1+var2 是預估值,如果預估值 == 內存值,則代表中間沒有被人修改過,此時就將新值去替換 內存值
其中do while 是為了在操作失敗時,再次進行自旋操作,即把之前的邏輯再操作一次。
int var5;
do {var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;
課程中的使用方式:
課程中的使用方式是沒有像cas一樣帶自旋的操作,也沒有對version的版本號+1 ,他的操作邏輯是在操作時,對版本號進行+1 操作,然后要求version 如果是1 的情況下,才能操作,那么第一個線程在操作后,數據庫中的version變成了2,但是他自己滿足version=1 ,所以沒有問題,此時線程2執行,線程2 最后也需要加上條件version =1 ,但是現在由于線程1已經操作過了,所以線程2,操作時就不滿足version=1 的條件了,所以線程2無法執行成功
3.6 樂觀鎖解決超賣問題
修改代碼方案一、
VoucherOrderServiceImpl 在扣減庫存時,改為:
以上邏輯的核心含義是:只要我扣減庫存時的庫存和之前我查詢到的庫存是一樣的,就意味著沒有人在中間修改過庫存,那么此時就是安全的,但是以上這種方式通過測試發現會有很多失敗的情況,失敗的原因在于:在使用樂觀鎖過程中假設100個線程同時都拿到了100的庫存,然后大家一起去進行扣減,但是100個人中只有1個人能扣減成功,其他的人在處理時,他們在扣減時,庫存已經被修改過了,所以此時其他線程都會失敗
修改代碼方案二、
之前的方式要修改前后都保持一致,但是這樣我們分析過,成功的概率太低,所以我們的樂觀鎖需要變一下,改成stock大于0 即可
知識小擴展:
針對cas中的自旋壓力過大,我們可以使用Longaddr這個類去解決
Java8 提供的一個對AtomicLong改進后的一個類,LongAdder
大量線程并發更新一個原子性的時候,天然的問題就是自旋,會導致并發性問題,當然這也比我們直接使用syn來的好
所以利用這么一個類,LongAdder來進行優化
如果獲取某個值,則會對cell和base的值進行遞增,最后返回一個完整的值
3.7?優惠券秒殺-一人一單
需求:修改秒殺業務,要求同一個優惠券,一個用戶只能下一單
現在的問題在于:
優惠卷是為了引流,但是目前的情況是,一個人可以無限制的搶這個優惠卷,所以我們應當增加一層邏輯,讓一個用戶只能下一個單,而不是讓一個用戶下多個單
具體操作邏輯如下:比如時間是否充足,如果時間充足,則進一步判斷庫存是否足夠,然后再根據優惠卷id和用戶id查詢是否已經下過這個訂單,如果下過這個訂單,則不再下單,否則進行下單
VoucherOrderServiceImpl
初步代碼:增加一人一單邏輯
@Override
public Result seckillVoucher(Long voucherId) {// 1.查詢優惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺尚未開始!");}// 3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺已經結束!");}// 4.判斷庫存是否充足if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}// 5.一人一單邏輯// 5.1.用戶idLong userId = UserHolder.getUser().getId();int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}//6,扣減庫存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣減庫存return Result.fail("庫存不足!");}//7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}
存在問題:現在的問題還是和之前一樣,并發過來,查詢數據庫,都不存在訂單,所以我們還是需要加鎖,但是樂觀鎖比較適合更新數據,而現在是插入數據,所以我們需要使用悲觀鎖操作
注意:在這里提到了非常多的問題,我們需要慢慢的來思考,首先我們的初始方案是封裝了一個createVoucherOrder方法,同時為了確保他線程安全,在方法上添加了一把synchronized 鎖
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);
}
但是這樣添加鎖,鎖的粒度太粗了,在使用鎖過程中,控制鎖粒度 是一個非常重要的事情,因為如果鎖的粒度太大,會導致每個線程進來都會鎖住,所以我們需要去控制鎖的粒度,以下這段代碼需要修改為: intern() 這個方法是從常量池中拿到數據,如果我們直接使用userId.toString() 他拿到的對象實際上是不同的對象,new出來的對象,我們使用鎖必須保證鎖必須是同一把,所以我們需要使用intern()方法
@Transactional
public Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()){// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);}
}
但是以上代碼還是存在問題,問題的原因在于當前方法被spring的事務控制,如果你在方法內部加鎖,可能會導致當前方法事務還沒有提交,但是鎖已經釋放也會導致問題,所以我們選擇將當前方法整體包裹起來,確保事務不會出現問題:如下:
在seckillVoucher 方法中,添加以下邏輯,這樣就能保證事務的特性,同時也控制了鎖的粒度
但是以上做法依然有問題,因為你調用的方法,其實是this.的方式調用的,事務想要生效,還得利用代理來生效,所以這個地方,我們需要獲得原始的事務對象, 來操作事務
最終代碼
VoucherOrderServiceImpl
package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;/*** <p>* 服務實現類* </p>** @author chan* @since 2025-5-2*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Overridepublic Result seckillVoucher(Long voucherId) {//1.查詢優惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判斷秒殺是否開始if(voucher.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒殺尚未開始!");}//3.判斷秒殺是否結束if(voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒殺已經結束!");}//4.判斷庫存是否充足if (voucher.getStock()<1) {return Result.fail("庫存不足!");}Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()){IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}@Transactionalpublic Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);}
}
pom.xml
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjrt</artifactId></dependency>
啟動類加注解
@EnableAspectJAutoProxy(exposeProxy = true)
3.8 集群環境下的并發問題
有關鎖失效原因分析
由于現在我們部署了多個tomcat,每個tomcat都有一個屬于自己的jvm,那么假設在服務器A的tomcat內部,有兩個線程,這兩個線程由于使用的是同一份代碼,那么他們的鎖對象是同一個,是可以實現互斥的,但是如果現在是服務器B的tomcat內部,又有兩個線程,但是他們的鎖對象寫的雖然和服務器A一樣,但是鎖對象卻不是同一個,所以線程3和線程4可以實現互斥,但是卻無法和線程1和線程2實現互斥,這就是 集群環境下,syn鎖失效的原因,在這種情況下,我們就需要使用分布式鎖來解決這個問題。
4、分布式鎖
4.1 、基本原理和實現方式對比
分布式鎖:滿足分布式系統或集群模式下多進程可見并且互斥的鎖。
分布式鎖的核心思想就是讓大家都使用同一把鎖,只要大家使用的是同一把鎖,那么我們就能鎖住線程,不讓線程進行,讓程序串行執行,這就是分布式鎖的核心思路
那么分布式鎖應該滿足一些什么樣的條件呢?
可見性:多個線程都能看到相同的結果,注意:這個地方說的可見性并不是并發編程中指的內存可見性,只是說多個進程之間都能感知到變化的意思
互斥:互斥是分布式鎖的最基本的條件,使得程序串行執行
高可用:程序不易崩潰,時時刻刻都保證較高的可用性
高性能:由于加鎖本身就讓性能降低,所有對于分布式鎖本身需要他就較高的加鎖性能和釋放鎖性能
安全性:安全也是程序中必不可少的一環
常見的分布式鎖有三種
Mysql:mysql本身就帶有鎖機制,但是由于mysql性能本身一般,所以采用分布式鎖的情況下,其實使用mysql作為分布式鎖比較少見
Redis:redis作為分布式鎖是非常常見的一種使用方式,現在企業級開發中基本都使用redis或者zookeeper作為分布式鎖,利用setnx這個方法,如果插入key成功,則表示獲得到了鎖,如果有人插入成功,其他人插入失敗則表示無法獲得到鎖,利用這套邏輯來實現分布式鎖
Zookeeper:zookeeper也是企業級開發中較好的一個實現分布式鎖的方案,由于本套視頻并不講解zookeeper的原理和分布式鎖的實現,所以不過多闡述
4.2 、Redis分布式鎖的實現核心思路
實現分布式鎖時需要實現的兩個基本方法:
-
獲取鎖:
-
互斥:確保只能有一個線程獲取鎖
-
非阻塞:嘗試一次,成功返回true,失敗返回false
-
-
釋放鎖:
-
手動釋放
-
超時釋放:獲取鎖時添加一個超時時間
-
核心思路:
我們利用redis 的setNx 方法,當有多個線程進入時,我們就利用該方法,第一個線程進入時,redis 中就有這個key 了,返回了1,如果結果是1,則表示他搶到了鎖,那么他去執行業務,然后再刪除鎖,退出鎖邏輯,沒有搶到鎖的,等待一定時間后重試即可
4.3 實現分布式鎖初級版本
-
加鎖邏輯
鎖的基本接口
SimpleRedisLock
利用setnx方法進行加鎖,同時增加過期時間,防止死鎖,此方法可以保證加鎖和增加過期時間具有原子性
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {// 獲取線程標示String threadId = Thread.currentThread().getId()// 獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);
}
-
釋放鎖邏輯
SimpleRedisLock
釋放鎖,防止刪除別人的鎖
public void unlock() {//通過del刪除鎖stringRedisTemplate.delete(KEY_PREFIX + name);
}
修改業務代碼
@Overridepublic Result seckillVoucher(Long voucherId) {// 1.查詢優惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺尚未開始!");}// 3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺已經結束!");}// 4.判斷庫存是否充足if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}Long userId = UserHolder.getUser().getId();//創建鎖對象(新增代碼)SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//獲取鎖對象boolean isLock = lock.tryLock(1200);//加鎖失敗if (!isLock) {return Result.fail("不允許重復下單");}try {//獲取代理對象(事務)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//釋放鎖lock.unlock();}}
4.4 Redis分布式鎖誤刪情況說明
邏輯說明:
持有鎖的線程在鎖的內部出現了阻塞,導致他的鎖自動釋放,這時其他線程,線程2來嘗試獲得鎖,就拿到了這把鎖,然后線程2在持有鎖執行過程中,線程1反應過來,繼續執行,而線程1執行過程中,走到了刪除鎖邏輯,此時就會把本應該屬于線程2的鎖進行刪除,這就是誤刪別人鎖的情況說明
解決方案:解決方案就是在每個線程釋放鎖的時候,去判斷一下當前這把鎖是否屬于自己,如果屬于自己,則不進行鎖的刪除,假設還是上邊的情況,線程1卡頓,鎖自動釋放,線程2進入到鎖的內部執行邏輯,此時線程1反應過來,然后刪除鎖,但是線程1,一看當前這把鎖不是屬于自己,于是不進行刪除鎖邏輯,當線程2走到刪除鎖邏輯時,如果沒有卡過自動釋放鎖的時間點,則判斷當前這把鎖是屬于自己的,于是刪除這把鎖。
解決方案:
此時這個鎖已經變成了線程二的鎖,線程一完成業務后,無法釋放不屬于自己的鎖,此時線程二完成之前,鎖一直沒有釋放
具體代碼如下:
package com.hmdp.utils;import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(Long timeoutSec) {//獲取當前創建所的線程標識(value)String threadId = ID_PREFIX + Thread.currentThread().getId();//獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//獲取當前需要釋放鎖的線程標識String threadId = ID_PREFIX + Thread.currentThread().getId();//獲取鎖中標識String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//判斷當前線程是否為獲取鎖時的線程if (id.equals(threadId)) {//釋放鎖stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
此處的UUID是用來區別不同的JVM的,而線程ID是用來區別同一個JVM中的不同線程的
4.5 解決Redis分布式鎖誤刪問題
需求:修改之前的分布式鎖實現,滿足:在獲取鎖時存入線程標示(可以用UUID表示) 在釋放鎖時先獲取鎖中的線程標示,判斷是否與當前線程標示一致
-
如果一致則釋放鎖
-
如果不一致則不釋放鎖
核心邏輯:在存入鎖時,放入自己線程的標識,在刪除鎖時,判斷當前這把鎖的標識是不是自己存入的,如果是,則進行刪除,如果不是,則不進行刪除。
具體代碼如下:加鎖
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {// 獲取線程標示String threadId = ID_PREFIX + Thread.currentThread().getId();// 獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);
}
釋放鎖
public void unlock() {// 獲取線程標示String threadId = ID_PREFIX + Thread.currentThread().getId();// 獲取鎖中的標示String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判斷標示是否一致if(threadId.equals(id)) {// 釋放鎖stringRedisTemplate.delete(KEY_PREFIX + name);}
}
有關代碼實操說明:
在我們修改完此處代碼后,我們重啟工程,然后啟動兩個線程,第一個線程持有鎖后,手動釋放鎖,第二個線程 此時進入到鎖內部,再放行第一個線程,此時第一個線程由于鎖的value值并非是自己,所以不能釋放鎖,也就無法刪除別人的鎖,此時第二個線程能夠正確釋放鎖,通過這個案例初步說明我們解決了鎖誤刪的問題。
4.6 分布式鎖的原子性問題
更為極端的誤刪邏輯說明:
線程1現在持有鎖之后,在執行業務邏輯過程中,他正準備刪除鎖,而且已經走到了條件判斷的過程中,比如他已經拿到了當前這把鎖確實是屬于他自己的,正準備刪除鎖,但是此時他的鎖到期了,那么此時線程2進來,但是線程1他會接著往后執行,當他卡頓結束后,他直接就會執行刪除鎖那行代碼,相當于條件判斷并沒有起到作用,這就是刪鎖時的原子性問題,之所以有這個問題,是因為線程1的拿鎖,比鎖,刪鎖,實際上并不是原子性的。
鎖名稱一樣,但是鎖的線程標識不一樣。線程1判斷后發現鎖的線程標識和當前線程一樣,于是根據鎖名釋放鎖,但是業務阻塞,導致自己的鎖超時釋放,此時線程二開始執行,線程2拿到同樣名稱的鎖,開始執行業務,此時線程1的阻塞解決后,立刻根據鎖名稱把線程2的鎖誤刪了(可以刪的原因是因為已經判斷過判斷一致后,未能來的及釋放鎖,就遭遇了阻塞,所以在阻塞立刻解決,就會滯后地執行釋放鎖的行為)
4.7 Lua腳本解決多條命令原子性問題
Redis提供了Lua腳本功能,在一個腳本中編寫多條Redis命令,確保多條命令執行時的原子性。Lua是一種編程語言,它的基本語法大家可以參考網站:Lua 教程 | 菜鳥教程,這里重點介紹Redis提供的調用函數,我們可以使用lua去操作redis,又能保證他的原子性,這樣就可以實現拿鎖比鎖刪鎖是一個原子性動作了,作為Java程序員這一塊并不作一個簡單要求,并不需要大家過于精通,只需要知道他有什么作用即可。
這里重點介紹Redis提供的調用函數,語法如下:
寫好腳本以后,需要用Redis命令來調用腳本,調用腳本的常見命令如下:
接下來我們來回一下我們釋放鎖的邏輯:
釋放鎖的業務流程是這樣的
1、獲取鎖中的線程標示
2、判斷是否與指定的標示(當前線程標示)一致
3、如果一致則釋放鎖(刪除)
4、如果不一致則什么都不做
如果用Lua腳本來表示則是這樣的:
最終我們操作redis的拿鎖比鎖刪鎖的lua腳本就會變成這樣
-- 這里的 KEYS[1] 就是鎖的key,這里的ARGV[1] 就是當前線程標示
-- 獲取鎖中的標示,判斷是否與當前線程標示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 一致,則刪除鎖return redis.call('DEL', KEYS[1])
end
-- 不一致,則直接返回
return 0
4.8 調用Lua腳本改造分布式鎖
lua腳本本身并不需要大家花費太多時間去研究,只需要知道如何調用,大致是什么意思即可,所以在筆記中并不會詳細的去解釋這些lua表達式的含義。
我們的RedisTemplate中,可以利用execute方法去執行lua腳本,參數對應關系就如下圖股
Java代碼
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}public void unlock() {// 調用lua腳本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}
經過以上代碼改造后,我們就能夠實現 拿鎖比鎖刪鎖的原子性動作了~
小總結:
基于Redis的分布式鎖實現思路:
-
利用set nx ex獲取鎖,并設置過期時間,保存線程標示
-
釋放鎖時先判斷線程標示是否與自己一致,一致則刪除鎖
-
特性:
-
利用set nx滿足互斥性
-
利用set ex保證故障時鎖依然能釋放,避免死鎖,提高安全性
-
利用Redis集群保證高可用和高并發特性
-
-
老師總結:我們一路走來,利用添加過期時間,防止死鎖問題的發生,但是有了過期時間之后,可能出現誤刪別人鎖的問題,這個問題我們開始是利用刪之前 通過拿鎖,比鎖,刪鎖這個邏輯來解決的,也就是刪之前判斷一下當前這把鎖是否是屬于自己的,但是現在還有原子性問題,也就是我們沒法保證拿鎖比鎖刪鎖是一個原子性的動作,最后通過lua表達式來解決這個問題
但是目前還剩下一個問題鎖不住,什么是鎖不住呢,你想一想,如果當過期時間到了之后,我們可以給他續期一下,比如續個30s,就好像是網吧上網, 網費到了之后,然后說,來,網管,再給我來10塊的,是不是后邊的問題都不會發生了,那么續期問題怎么解決呢,可以依賴于我們接下來要學習redission啦
測試邏輯:
第一個線程進來,得到了鎖,手動刪除鎖,模擬鎖超時了,其他線程會執行lua來搶鎖,當第一天線程利用lua刪除鎖時,lua能保證他不能刪除他的鎖,第二個線程刪除鎖時,利用lua同樣可以保證不會刪除別人的鎖,同時還能保證原子性。
5、分布式鎖-redission
5.1 redission功能
基于setnx實現的分布式鎖存在下面的問題:
重入問題:重入問題是指 獲得鎖的線程可以再次進入到相同的鎖的代碼塊中,可重入鎖的意義在于防止死鎖,比如HashTable這樣的代碼中,他的方法都是使用synchronized修飾的,假如他在一個方法內,調用另一個方法,那么此時如果是不可重入的,不就死鎖了嗎?所以可重入鎖他的主要意義是防止死鎖,我們的synchronized和Lock鎖都是可重入的。
不可重試:是指目前的分布式只能嘗試一次,我們認為合理的情況是:當線程在獲得鎖失敗后,他應該能再次嘗試獲得鎖。
超時釋放:我們在加鎖時增加了過期時間,這樣的我們可以防止死鎖,但是如果卡頓的時間超長,雖然我們采用了lua表達式防止刪鎖的時候,誤刪別人的鎖,但是畢竟沒有鎖住,有安全隱患
主從一致性: 如果Redis提供了主從集群,當我們向集群寫數據時,主機需要異步的將數據同步給從機,而萬一在同步過去之前,主機宕機了,就會出現死鎖問題。
Redission:
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務,其中就包含了各種分布式鎖的實現。
5.2 Redission入門
引入依賴:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
配置Redisson客戶端:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");// 創建RedissonClient對象return Redisson.create(config);}
}
如何使用Redission的分布式鎖
@Resource
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//獲取鎖(可重入),指定鎖的名稱RLock lock = redissonClient.getLock("anyLock");//嘗試獲取鎖,參數分別是:獲取鎖的最大等待時間(期間會重試),鎖自動釋放時間,時間單位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判斷獲取鎖成功if(isLock){try{System.out.println("執行業務"); }finally{//釋放鎖lock.unlock();}}}
在 VoucherOrderServiceImpl
注入RedissonClient
@Resource
private RedissonClient redissonClient;@Override
public Result seckillVoucher(Long voucherId) {// 1.查詢優惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺尚未開始!");}// 3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺已經結束!");}// 4.判斷庫存是否充足if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}Long userId = UserHolder.getUser().getId();//創建鎖對象 這個代碼不用了,因為我們現在要使用分布式鎖//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//獲取鎖對象boolean isLock = lock.tryLock();//加鎖失敗if (!isLock) {return Result.fail("不允許重復下單");}try {//獲取代理對象(事務)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//釋放鎖lock.unlock();}}
注意:下面皆為Redisson原理講解部分
5.3 redission可重入鎖原理
在Lock鎖中,他是借助于底層的一個voaltile的一個state變量來記錄重入的狀態的,比如當前沒有人持有這把鎖,那么state=0,假如有人持有這把鎖,那么state=1,如果持有這把鎖的人再次持有這把鎖,那么state就會+1 ,如果是對于synchronized而言,他在c語言代碼中會有一個count,原理和state類似,也是重入一次就加一,釋放一次就-1 ,直到減少成0 時,表示當前這把鎖沒有被人持有。
在redission中,我們的也支持支持可重入鎖
在分布式鎖中,他采用hash結構用來存儲鎖,其中大key表示表示這把鎖是否存在,用小key表示當前這把鎖被哪個線程持有,所以接下來我們一起分析一下當前的這個lua表達式
這個地方一共有3個參數
KEYS[1] : 鎖名稱
ARGV[1]: 鎖失效時間
ARGV[2]: id + ":" + threadId; 鎖的小key
exists: 判斷數據是否存在 name:是lock是否存在,如果==0,就表示當前這把鎖不存在
redis.call('hset', KEYS[1], ARGV[2], 1);
此時他就開始往redis里邊去寫數據 ,寫成一個hash結構
Lock{id + ":" + threadId : 1}
如果當前這把鎖存在,則第一個條件不滿足,再判斷
redis.call('hexists', KEYS[1], ARGV[2]) == 1
此時需要通過大key+小key判斷當前這把鎖是否是屬于自己的,如果是自己的,則進行
redis.call('hincrby', KEYS[1], ARGV[2], 1)
將當前這個鎖的value進行+1 ,redis.call('pexpire', KEYS[1], ARGV[1]); 然后再對其設置過期時間,如果以上兩個條件都不滿足,則表示當前這把鎖搶鎖失敗,最后返回pttl,即為當前這把鎖的失效時間
如果小伙幫們看了前邊的源碼, 你會發現他會去判斷當前這個方法的返回值是否為null,如果是null,則對應則前兩個if對應的條件,退出搶鎖邏輯,如果返回的不是null,即走了第三個分支,在源碼處會進行while(true)的自旋搶鎖。
"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);"
5.4 redission鎖重試和WatchDog機制
說明:由于課程中已經說明了有關tryLock的源碼解析以及其看門狗原理,所以筆者在這里給大家分析lock()方法的源碼解析,希望大家在學習過程中,能夠掌握更多的知識
搶鎖過程中,獲得當前線程,通過tryAcquire進行搶鎖,該搶鎖邏輯和之前邏輯相同
1、先判斷當前這把鎖是否存在,如果不存在,插入一把鎖,返回null
2、判斷當前這把鎖是否是屬于當前線程,如果是,則返回null
所以如果返回是null,則代表著當前這哥們已經搶鎖完畢,或者可重入完畢,但是如果以上兩個條件都不滿足,則進入到第三個條件,返回的是鎖的失效時間,同學們可以自行往下翻一點點,你能發現有個while( true) 再次進行tryAcquire進行搶鎖
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {return;
}
接下來會有一個條件分支,因為lock方法有重載方法,一個是帶參數,一個是不帶參數,如果帶帶參數傳入的值是-1,如果傳入參數,則leaseTime是他本身,所以如果傳入了參數,此時leaseTime != -1 則會進去搶鎖,搶鎖的邏輯就是之前說的那三個邏輯
if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
如果是沒有傳入時間,則此時也會進行搶鎖, 而且搶鎖時間是默認看門狗時間 commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()
ttlRemainingFuture.onComplete((ttlRemaining, e) 這句話相當于對以上搶鎖進行了監聽,也就是說當上邊搶鎖完畢后,此方法會被調用,具體調用的邏輯就是去后臺開啟一個線程,進行續約邏輯,也就是看門狗線程
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}
});
return ttlRemainingFuture;
此邏輯就是續約邏輯,注意看commandExecutor.getConnectionManager().newTimeout() 此方法
Method( new TimerTask() {},參數2 ,參數3 )
指的是:通過參數2,參數3 去描述什么時候去做參數1的事情,現在的情況是:10s之后去做參數一的事情
因為鎖的失效時間是30s,當10s之后,此時這個timeTask 就觸發了,他就去進行續約,把當前這把鎖續約成30s,如果操作成功,那么此時就會遞歸調用自己,再重新設置一個timeTask(),于是再過10s后又再設置一個timerTask,完成不停的續約
那么大家可以想一想,假設我們的線程出現了宕機他還會續約嗎?當然不會,因為沒有人再去調用renewExpiration這個方法,所以等到時間之后自然就釋放了。
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
5.5 redission鎖的MutiLock原理
為了提高redis的可用性,我們會搭建集群或者主從,現在以主從為例
此時我們去寫命令,寫在主機上, 主機會將數據同步給從機,但是假設在主機還沒有來得及把數據寫入到從機去的時候,此時主機宕機,哨兵會發現主機宕機,并且選舉一個slave變成master,而此時新的master中實際上并沒有鎖信息,此時鎖信息就已經丟掉了。
為了解決這個問題,redission提出來了MutiLock鎖,使用這把鎖咱們就不使用主從了,每個節點的地位都是一樣的, 這把鎖加鎖的邏輯需要寫入到每一個主叢節點上,只有所有的服務器都寫入成功,此時才是加鎖成功,假設現在某個節點掛了,那么他去獲得鎖的時候,只要有一個節點拿不到,都不能算是加鎖成功,就保證了加鎖的可靠性。
那么MutiLock 加鎖原理是什么呢?筆者畫了一幅圖來說明
當我們去設置了多個鎖時,redission會將多個鎖添加到一個集合中,然后用while循環去不停去嘗試拿鎖,但是會有一個總共的加鎖時間,這個時間是用需要加鎖的個數 * 1500ms ,假設有3個鎖,那么時間就是4500ms,假設在這4500ms內,所有的鎖都加鎖成功, 那么此時才算是加鎖成功,如果在4500ms有線程加鎖失敗,則會再次去進行重試.
大功告成!