現在讓你寫一個Redis分布式鎖
大概率你會先寫一個框架
public Boolean setIfAbsent(String key, Object value,Long timeout) {try {return Boolean.TRUE.equals(objectRedisTemplate.opsForValue().setIfAbsent(key, value,timeout,TimeUnit.SECONDS));} catch (Exception e) {log.error("", e);return false;}}
private void assessInstance(){InitThreadPoolUtil.execute(() -> {while (true) {try {Boolean isMaster = setIfAbsent(RedisKeyConstant.ASSESS_INSTANCE_ONE, "admin", 2 * 60L);logger.error("執行插入===" + isMaster);if (isMaster) {// ...業務代碼略if (redisService.hasKey(RedisKeyConstant.ASSESS_INSTANCE_ONE)) {redisService.del(RedisKeyConstant.ASSESS_INSTANCE_ONE);}}} catch (Exception e) {logger.error("評估規劃發送通知失敗:", e);}try {Thread.sleep(1000 * 60 * 1);} catch (InterruptedException e) {logger.error("線程休眠異常,異常信息為:", e);}}});}
但是這樣就完了嗎?
我們來評審一下此代碼健壯性:
可以看到這是從線程池中取一個線程去執行該業務代碼。那么我給你的場景是處理訂單業務,那么你就會面對高并發情況,若某一刻發起了10個訂單請求,那么就會有10個線程進入while循環。但是有且僅有一個線程會獲取鎖,并執行業務代碼。其他9個線程會一直等待,一旦有鎖釋放,這9個線程會立刻搶鎖。
我們給redis的鎖定義了一個超時時間,某線程獲取鎖后最多使用 10s,然后必須釋放鎖。
此外你還知道執行該業務代碼最多需要10s。等于你上網時間剛清零你本局游戲剛結束。
這樣其他9個線程最多需要10s就可以獲取到鎖。
所以會出現一種現象,A線程獲取到了鎖后,開始執行業務代碼。其他9個線程會一直重試嘗試獲取鎖,累計10s。為了避免頻繁嘗試獲取鎖消耗資源,我們暫時設置線程第一次未獲取鎖后,需要休眠2s才能重新請求獲取鎖。這樣就降低了這9個線程重試請求鎖的頻率。
對于用戶而言,一個用戶的訂單正在處理,其他9個用戶的訂單需要等待10s,推算下來,最后一個用戶的訂單被處理時,已經等待了90s。如果我是用戶,我可不希望等待這么長的時間且無法進行任何操作。
我更希望等待更少的時間,比如20s沒反應,我可以繼續提交訂單。像不像搶演唱會票的過程:進入訂單界面,提交的時候一直轉圈圈,等待5s后顯示訂單提交失敗,然后你會重新提交訂單。
此外,上述代碼還有個局限性:提交了10個訂單,將會有1個線程執行業務代碼,9個線程一直在等待。
執行業務代碼的線程生命周期如下:嘗試獲取鎖—>獲取鎖---->執行業務代碼----->等待被自動回收
等待的線程生命周期如下: 休眠—>嘗試獲取鎖—>休眠---->嘗試獲取鎖—>…
可以發現等待的線程是始終無法被自動回收,除非執行完業務代碼,操作系統才能判斷:該線程已經沒有被使用了,可以自動歸還到線程池。(線程池自動管理線程的生命周期)
對于用戶而言,他等待時間太久。對于系統而言,大量資源被此處占用、消耗。
所以我們必須優化。如何優化呢?
A線程會占用鎖10s,其余9個線程會一直等待。現在我要求,一旦發現6s后,鎖還沒被釋放,等待的線程就退出等待。而用戶就可以重新提交訂單了。
我們來捋一捋:A線程搶到了鎖后,(超時時間也就是等待時間未超過6s)B線程先睡眠2s,再重新獲取鎖失敗,(超時時間也就是等待時間未超過6s)再睡眠2s,重新獲取鎖失敗,(超時時間也就是等待時間未超過6s)再睡眠2s,重新獲取鎖失敗,(超時時間也就是等待時間超過6s),不再嘗試獲取鎖,返回信息:訂單提交失敗。
推理下來,一個用戶最多等待10s,變成了最多等待6s。那么10個訂單同時提交而最后一個用戶只需等待50s。想要再縮短等待時間,可以將超時時間從6s縮短到2s,這樣10個訂單同時提交而最后一個用戶只需等待18s。
當然你也可以將業務處理時間優化,這里不討論。
代碼如下
private void assessInstance(){// 初始時間long startTime = System.currentTimeMillis();InitThreadPoolUtil.execute(() -> {while (true) {try {Boolean isMaster = setIfAbsent(RedisKeyConstant.ASSESS_INSTANCE_ONE, "admin", 2 * 60L);logger.error("執行插入===" + isMaster);if (isMaster) {// ...業務代碼略// 嘗試超過了設定值之后直接跳出循環,避免上新鎖時間過長// 例如A線程上新鎖,花費了10s,這10s內B線程無法獲取鎖,就會一直在循環里重試,設置超時時間為2s,// 一旦B線程重試超過2s就退出循環且生命周期結束。if (System.currentTimeMillis() - startTime > timeout) {return false;}if (redisService.hasKey(RedisKeyConstant.ASSESS_INSTANCE_ONE)) {redisService.del(RedisKeyConstant.ASSESS_INSTANCE_ONE);}}} catch (Exception e) {logger.error("評估規劃發送通知失敗:", e);}try {Thread.sleep(1000 * 60 * 1);} catch (InterruptedException e) {logger.error("線程休眠異常,異常信息為:", e);}}});}
這是針對高并發場景下以上代碼實現Redis鎖的問題。有些場景下使用上述代碼完全沒問題。
例如服務啟動后,需要初始化一些數據。單機環境只會執行一次初始化數據,什么都不需要考慮。
若是集群模式,有三個機子。當然只能一臺leader機子執行一次初始化數據,其余2個機子不需要執行初始化數據,所以必須上分布式鎖,且不存在高并發場景。
上述的代碼直接使用了redis的一些原生api,我們嘗試將其封裝一層供自己使用
/*** 全局鎖,包括鎖的名稱*/
public class Lock {private String name;private String value;public Lock(String name, String value) {this.name = name;this.value = value;}public String getName() {return name;}public String getValue() {return value;}}
搞一個redis分布式鎖的工具類
import com.sun.org.slf4j.internal.Logger;
import com.sun.org.slf4j.internal.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.concurrent.TimeUnit;/*** 分布式鎖*/@Component
public class DistributedLockHandler {private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class);/*** 單個業務持有鎖的時間30s,防止死鎖*/private final static long LOCK_EXPIRE = 30 * 1000L;/*** 默認30ms嘗試一次*/private final static long LOCK_TRY_INTERVAL = 30L;/*** 默認嘗試20s*/private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;@Autowiredprivate StringRedisTemplate template;/*** 嘗試獲取全局鎖** @param lock 鎖的名稱* @return true 獲取成功,false獲取失敗*/public boolean tryLock(Lock lock){return getLock(lock, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);}/*** 嘗試獲取全局鎖** @param lock 鎖的名稱* @param timeout 獲取超時時間 單位ms* @return true 獲取成功,false獲取失敗*/public boolean tryLock(Lock lock, long timeout) {return getLock(lock, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);}/*** 嘗試獲取全局鎖** @param lock 鎖的名稱* @param timeout 獲取鎖的超時時間* @param tryInterval 多少毫秒嘗試獲取一次* @param lockExpireTime 鎖的過期* @return true 獲取成功,false獲取失敗*/public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {return getLock(lock, timeout, tryInterval, lockExpireTime);}/*** 操作redis獲取全局鎖** @param lock 鎖的名稱* @param timeout 獲取的超時時間* @param tryInterval 多少ms嘗試一次* @param lockExpireTime 獲取成功后鎖的過期時間* @return true 獲取成功,false獲取失敗*/public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime){// 1. 鎖名不為空if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) {return false;}// 2. 系統時間long startTime = System.currentTimeMillis();try{do{// 不存在鎖,上新鎖if (!template.hasKey(lock.getName())) {ValueOperations<String, String> ops = template.opsForValue();ops.setIfAbsent(lock.getName(), lock.getValue(), lockExpireTime, TimeUnit.MILLISECONDS);return true;} else {//已存在鎖logger.error("lock is exist!!!");}// 嘗試超過了設定值之后直接跳出循環,避免上新鎖時間過長// 例如A線程上新鎖,花費了10s,這10s內B線程無法獲取鎖,就會一直在循環里重試,設置超時時間為3s,一旦B線程重試超過3s就退出循環且生命周期結束。if (System.currentTimeMillis() - startTime > timeout) {return false;}// A線程剛獲取了鎖,B線程等待A線程釋放鎖Thread.sleep(tryInterval);}while(template.hasKey(lock.getName())); // 3. redis中是否存在鎖}catch (Exception e){logger.error(e.getMessage());return false;}return false;}/*** 釋放鎖*/public void releaseLock(Lock lock){if (!StringUtils.isEmpty(lock.getName())) {template.delete(lock.getName());}}}
測試代碼,可以看到這是我們自己封裝的最終效果
@RestController
public class testDemo {@Autowiredprivate DistributedLockHandler distributedLockHandler;@RequestMapping("/index")public void index(){Lock lock=new Lock("lynn","min");if (distributedLockHandler.tryLock(lock)) {// 1. 成功獲取鎖try {//為了演示鎖的效果,這里睡眠5000毫秒System.out.println("執行方法");Thread.sleep(5000);}catch (Exception e){e.printStackTrace();}// 2. 釋放鎖distributedLockHandler.releaseLock(lock);}}}
以上結合業務場景探討了實現Redis分布式鎖時,為何使用線程休眠,超時時間,以及針對超時時間的一些優化方案。
接下來引入一個新的問題:
若定義鎖的過期時間是10s,此時A線程獲取了鎖然后執行業務代碼,但是業務代碼消耗時間花費了15s。這就會導致A線程還沒有執行完業務代碼,A線程卻釋放了鎖(因為10s到了),第11s B線程發現鎖已經釋放,重新獲取鎖也開始執行業務代碼。
此時多個線程同時執行業務代碼,我們使用鎖就是為了保證僅有一個線程執行這一塊業務代碼,說明這個鎖是失效的!
如何處理這個情況,涉及到了鎖延期操作,下一篇文章指出!