/*** 搶購秒殺券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 1、查詢秒殺券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判斷秒殺券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒殺券的開始時間在當前時間之后return Result.fail("秒殺尚未開始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒殺券的結束時間在當前時間之前return Result.fail("秒殺已結束");}if (voucher.getStock() < 1) {return Result.fail("秒殺券已搶空");}// 5、秒殺券合法,則秒殺券搶購成功,秒殺券庫存數量減一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).setSql("stock = stock -1"));if (!flag){throw new RuntimeException("秒殺券扣減失敗");}// 6、秒殺成功,創建對應的訂單,并保存到數據庫VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag){throw new RuntimeException("創建秒殺券訂單失敗");}// 返回訂單idreturn Result.ok(orderId);}
這個代碼的邏輯:
先是查詢優惠卷 判斷是否開秒殺 沒有 返回異常結果
如果已經開始,判斷是否剩余優惠卷 沒有返回異常?
有則扣減庫存 創建訂單 返回訂單id
出現問題
假設線程1過來查詢庫存,判斷出來庫存大于1,正準備去扣減庫存,但是還沒有來得及去扣減,此時線程2過來,線程2也去查詢庫存,發現這個數量一定也大于1,那么這兩個線程都會去扣減庫存,最終多個線程相當于一起去扣減庫存,此時就會出現庫存的超賣問題。超賣問題是典型的多線程安全問題,針對這一問題的常見解決方案就是加鎖
鎖有兩種:樂觀鎖,悲觀鎖
悲觀鎖:認為線程安全問題一定會發生,因此操作數據庫之前都需要先獲取鎖,確保線程串行執行。?synchronized
、lock
樂觀鎖:認為線程安全問題不一定發生,因此不加鎖,只會在更新數據庫的時候去判斷有沒有其它線程對數據進行修改,如果沒有修改則認為是安全的,直接更新數據庫中的數據即可,如果修改了則說明不安全,直接拋異常或者等待重試。
樂觀鎖之版本號法:會有一個版本號,每次操作數據會對版本號+1,再提交回數據時,會去校驗是否比之前的版本大1 ,如果大1 ,則進行操作成功,這套機制的核心邏輯在于,如果在操作過程中,版本號只比原來大1 ,那么就意味著操作過程中沒有人對他進行過修改,他的操作就是安全的,如果不大1,則數據被修改過,當然樂觀鎖還有一些變種的處理方式比如cas
樂觀鎖解決超賣問題:一人一單邏輯
@Override
public Result seckillVoucher(Long voucherId) {// 1.查詢優惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺尚未開始!");}// 3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺已經結束!");}// 4.判斷庫存是否充足if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}// 5.一人一單邏輯// 5.1.用戶idLong userId = UserHolder.getUser().getId();int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}//6,扣減庫存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣減庫存return Result.fail("庫存不足!");}//7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}
加入一人一單的邏輯,還是會有錯誤,并發過來,查詢數據庫,都不存在訂單。在高并發場景下,多個請求可能同時執行數據庫查詢操作,此時它們都會發現?count == 0
(即沒有訂單)。然后這些請求都會繼續執行扣減庫存和創建訂單的邏輯,導致多個訂單被創建。“一人一單”邏輯是通過查詢數據庫來判斷是否已經存在訂單,但查詢操作和插入操作之間沒有加鎖,導致多個請求可以同時通過檢查。
解決辦法:加鎖。樂觀鎖比較適合更新數據,而現在是插入數據,所以我們需要使用悲觀鎖操作
超賣問題:synchronized 不斷完善
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);
}
在方法上加一個悲觀鎖 ,但是這樣添加鎖,鎖的粒度太粗了,在使用鎖過程中,控制鎖粒度 是一個非常重要的事情,因為如果鎖的粒度太大,會導致每個線程進來都會鎖住,所以我們需要去控制鎖的粒度。
intern() 這個方法是從常量池中拿到數據,如果我們直接使用userId.toString() 他拿到的對象實際上是不同的對象,new出來的對象,我們使用鎖必須保證鎖必須是同一把,所以我們需要使用intern()方法
@Transactional
public Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()){// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);}
}
詳細寫一下這個問題: 這個方法當前被spring的事務管理控制,在事務提交之前,所有的操作都是在內存中進行的臨時操作,數據庫的實際數據并沒有被修改。
如果在方法內部加鎖,鎖的釋放是在方法執行結束時(即?synchronized
?代碼塊結束時),而事務的提交是在鎖釋放之后。如果鎖釋放后,事務還未提交,其他線程可能會進入?synchronized
?代碼塊,從而導致并發問題
如何解決這個問題?
我們選擇將當前方法整體包裹起來,確保事務不會出現問題
在seckillVoucher方法中,添加以下邏輯,包裝事務特性,控制粒度
public Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();// 在方法外部加鎖synchronized (userId.toString().intern()) {return createVoucherOrderTransactional(userId, voucherId);}
}@Transactional
public Result createVoucherOrderTransactional(Long userId, Long voucherId) {// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);
}
但是以上做法依然有問題,因為你調用的方法,其實是this.的方式調用的,事務想要生效,還得利用代理來生效
在 Spring 中,事務管理是通過?AOP(面向切面編程)?實現的。具體來說:
-
Spring 會為被?
@Transactional
?注解標記的方法生成一個代理對象。 -
當調用被?
@Transactional
?注解的方法時,實際上是通過代理對象調用的,代理對象會在方法執行前后添加事務管理的邏輯(如開啟事務、提交事務、回滾事務等)。
然而,如果你在同一個類中直接調用被?@Transactional
?注解的方法(例如通過?this.method()
?調用),那么 Spring 的代理機制會失效,事務也不會生效。
為了解決這個問題,需要確保被?@Transactional
?注解的方法是通過代理對象調用的,而不是直接調用。
@Service
public class VoucherOrderService extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService seckillVoucherService;@Autowiredprivate RedisIdWorker redisIdWorker;public Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();// 在方法外部加鎖synchronized (userId.toString().intern()) {// 獲取當前對象的代理對象IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();// 通過代理對象調用事務方法return proxy.createVoucherOrderTransactional(userId, voucherId);}}@Transactionalpublic Result createVoucherOrderTransactional(Long userId, Long voucherId) {// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);}
}
這樣就解決了當你在同一個類中直接調用方法時,調用是通過當前對象(this
)進行的,而不是通過代理對象的這個問題了。
集群下的超賣問題:分布式鎖(redis)
由于現在我們部署了多個tomcat,每個tomcat都有一個屬于自己的jvm,那么假設在服務器A的tomcat內部,有兩個線程,這兩個線程由于使用的是同一份代碼,那么他們的鎖對象是同一個,是可以實現互斥的,但是如果現在是服務器B的tomcat內部,又有兩個線程,但是他們的鎖對象寫的雖然和服務器A一樣,但是鎖對象卻不是同一個,所以線程3和線程4可以實現互斥,但是卻無法和線程1和線程2實現互斥,這就是 集群環境下,syn鎖失效的原因,在這種情況下,我們就需要使用分布式鎖來解決這個問題。
分布式鎖他應該滿足一些什么樣的條件
可見性:多個線程都能看到相同的結果,注意:這個地方說的可見性并不是并發編程中指的內存可見性,只是說多個進程之間都能感知到變化的意思
互斥:互斥是分布式鎖的最基本的條件,使得程序串行執行
高可用:程序不易崩潰,時時刻刻都保證較高的可用性
高性能:由于加鎖本身就讓性能降低,所有對于分布式鎖本身需要他就較高的加鎖性能和釋放鎖性能
安全性:安全也是程序中必不可少的一環
常見的分布式鎖有三種
Mysql:mysql本身就帶有鎖機制,但是由于mysql性能本身一般,所以采用分布式鎖的情況下,其實使用mysql作為分布式鎖比較少見
Redis:redis作為分布式鎖是非常常見的一種使用方式,現在企業級開發中基本都使用redis或者zookeeper作為分布式鎖,利用setnx這個方法,如果插入key成功,則表示獲得到了鎖,如果有人插入成功,其他人插入失敗則表示無法獲得到鎖,利用這套邏輯來實現分布式鎖
Zookeeper:zookeeper也是企業級開發中較好的一個實現分布式鎖的方案,由于本套視頻并不講解zookeeper的原理和分布式鎖的實現,所以不過多闡述
實現分布式鎖時需要實現的兩個基本方法:
-
獲取鎖:
-
互斥:確保只能有一個線程獲取鎖
-
非阻塞:嘗試一次,成功返回true,失敗返回false
-
-
釋放鎖:
-
手動釋放
-
超時釋放:獲取鎖時添加一個超時時間?
-
分布式鎖版本一?
利用setnx方法進行加鎖,同時增加過期時間,防止死鎖,此方法可以保證加鎖和增加過期時間具有原子性
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {// 獲取線程標示String threadId = Thread.currentThread().getId()// 獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);
}
釋放鎖?
public void unlock() {//通過del刪除鎖stringRedisTemplate.delete(KEY_PREFIX + name);
}
修改業務代碼
@Overridepublic Result seckillVoucher(Long voucherId) {// 1.查詢優惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判斷秒殺是否開始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺尚未開始!");}// 3.判斷秒殺是否已經結束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未開始return Result.fail("秒殺已經結束!");}// 4.判斷庫存是否充足if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}Long userId = UserHolder.getUser().getId();//創建鎖對象SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if (!isLock) {//獲取鎖失敗 返回錯誤return Result.fail("禁止重復下單");}try {//獲取代理對象(事務)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} catch (IllegalStateException e) {throw new RuntimeException(e);} finally {//解鎖lock.unlock();}}@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {//一人一單Long userId = UserHolder.getUser().getId();//查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder).count();//if (count > 0) {log.error("用戶已經購買過該優惠券");}//5,扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock -1") //set stock = stock - 1.eq("voucher_id", voucherOrder).gt("stock", 0)//where id= ? and stock > 0;.update();if (!success) {//扣減庫存log.info("庫存不足!");}//6.創建訂單save(voucherOrder);}
出現問題:誤刪鎖
持有鎖的線程在鎖的內部出現了阻塞,導致他的鎖自動釋放,這時其他線程,線程2來嘗試獲得鎖,就拿到了這把鎖,然后線程2在持有鎖執行過程中,線程1反應過來,繼續執行,而線程1執行過程中,走到了刪除鎖邏輯,此時就會把本應該屬于線程2的鎖進行刪除,這就是誤刪別人鎖的情況。
解決方案:解決方案就是在每個線程釋放鎖的時候,去判斷一下當前這把鎖是否屬于自己,如果屬于自己,則不進行鎖的刪除,假設還是上邊的情況,線程1卡頓,鎖自動釋放,線程2進入到鎖的內部執行邏輯,此時線程1反應過來,然后刪除鎖,但是線程1,一看當前這把鎖不是屬于自己,于是不進行刪除鎖邏輯,當線程2走到刪除鎖邏輯時,如果沒有卡過自動釋放鎖的時間點,則判斷當前這把鎖是屬于自己的,于是刪除這把鎖。
分布式鎖版本二:防止誤刪鎖
添加鎖
@Overridepublic boolean tryLock(long timeOutSec) {//獲取線程標示String threadId = ID_PREFIX + Thread.currentThread().getId();//獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);//自動拆箱 可能有空指針的安全風險return Boolean.TRUE.equals(success);}
釋放鎖
public void unlock() {//獲取線程標識String threadID = ID_PREFIX + Thread.currentThread().getName();//判斷標示是否一致String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if (threadID.equals(id)) {//釋放鎖stringRedisTemplate.delete(KEY_PREFIX + name);}}
這個優化主要解決了釋放鎖時的原子性問題。說到底也是鎖超時釋放的問題(業務代碼不變)
分布式鎖的原子性問題
線程1現在持有鎖之后,在執行業務邏輯過程中,他正準備刪除鎖,而且已經走到了條件判斷的過程中,比如他已經拿到了當前這把鎖確實是屬于他自己的,正準備刪除鎖,但是此時他的鎖到期了,那么此時線程2進來,但是線程1他會接著往后執行,當他卡頓結束后,他直接就會執行刪除鎖那行代碼,相當于條件判斷并沒有起到作用(已經確定鎖是自己的鎖了,于是直接就刪除了鎖,結果刪的是線程2的鎖),這就是刪鎖時的原子性問題,之所以有這個問題,是因為線程1的拿鎖,比鎖,刪鎖,實際上并不是原子性的,我們要防止剛才的情況發生
根本原因是?判斷鎖和刪除鎖的操作不是原子性的。
分布式鎖版本三:Lua腳本
Redis提供了Lua腳本功能,在一個腳本中編寫多條Redis命令,確保多條命令執行時的原子性。
釋放鎖的業務流程是這樣的
? 1、獲取鎖中的線程標示
? 2、判斷是否與指定的標示(當前線程標示)一致
? 3、如果一致則釋放鎖(刪除)
? 4、如果不一致則什么都不做
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;// 加載lua腳本static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeOutSec) {//獲取線程標示String threadId = ID_PREFIX + Thread.currentThread().getId();//獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeOutSec, TimeUnit.SECONDS);//自動拆箱 可能有空指針的安全風險return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//基于lua腳本 提前讀取腳本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId());}
}
-- 創建線程標示與鎖中的標示是否一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 釋放鎖del keyreturn redis.call('DEL', KEYS[1])
end
return 0
業務代碼還是不用動 只是修改了刪除鎖的部分
Redission? ? 《白雪》
基于setnx實現的分布式鎖存在下面的問題:
重入問題:重入問題是指 獲得鎖的線程可以再次進入到相同的鎖的代碼塊中,可重入鎖的意義在于防止死鎖,比如HashTable這樣的代碼中,他的方法都是使用synchronized修飾的,假如他在一個方法內,調用另一個方法,那么此時如果是不可重入的,不就死鎖了嗎?所以可重入鎖他的主要意義是防止死鎖,我們的synchronized和Lock鎖都是可重入的。
不可重試:是指目前的分布式只能嘗試一次,我們認為合理的情況是:當線程在獲得鎖失敗后,他應該能再次嘗試獲得鎖。
超時釋放:我們在加鎖時增加了過期時間,這樣的我們可以防止死鎖,但是如果卡頓的時間超長,雖然我們采用了lua表達式防止刪鎖的時候,誤刪別人的鎖,但是畢竟沒有鎖住,有安全隱患
主從一致性: 如果Redis提供了主從集群,當我們向集群寫數據時,主機需要異步的將數據同步給從機,而萬一在同步過去之前,主機宕機了,就會出現死鎖問題。
Redisson實現分布式鎖
(1)依賴
(2)配置
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.100.128:6379").setPassword("123456");// 創建RedissonClient對象return Redisson.create(config);}
}
?此外還有一種引入方式,可以引入 redission 的 starter 依賴,然后在yml文件中配置Redisson,但是不推薦這種方式,因為他會替換掉 Spring官方 提供的這套對 Redisson 的配置
業務
tryLock():它會使用默認的超時時間和等待機制。具體的超時時間是由 Redisson 配置文件或者自定義配置決定的。
tryLock(long time, TimeUnit unit):它會在指定的時間內嘗試獲取鎖(等待time后重試),如果獲取成功則返回 true,表示獲取到了鎖;如果在指定時間內(Redisson內部默認指定的)未能獲取到鎖,則返回 false。
tryLock(long waitTime, long leaseTime, TimeUnit unit):指定等待時間為watiTime,如果超過 leaseTime 后還沒有獲取鎖就直接返回失敗