1.Redisson公平鎖RedissonFairLock概述
(1)非公平和公平的可重入鎖
一.非公平可重入鎖
鎖被釋放后,排隊獲取鎖的線程會重新無序獲取鎖,沒有任何順序性可言。
二.公平可重入鎖
鎖被釋放后,排隊獲取鎖的線程會按照請求獲取鎖時候的順序去獲取鎖。公平鎖可以保證線程獲取鎖的順序,與其請求獲取鎖的順序是一樣的。也就是誰先申請獲取到這把鎖,誰就可以先獲取到這把鎖。公平可重入鎖會把各個線程的加鎖請求進行排隊處理,保證先申請獲取鎖的線程,可以優先獲取鎖,從而實現所謂的公平性。
三.可重入的非公平鎖和公平鎖不同點
可重入的非公平鎖和公平鎖,在整體的技術實現框架上都是一樣的。唯一的不同點就是加鎖和解鎖的邏輯不一樣。非公平鎖的加鎖邏輯,比較簡單。公平鎖的加鎖邏輯,要加入排隊機制,保證各個線程排隊能按順序獲取鎖。
(2)Redisson公平鎖的簡單使用
Redisson的可重入鎖RedissonLock指的是非公平可重入鎖,Redisson的公平鎖RedissonFairLock指的是公平可重入鎖。
Redisson的公平可重入鎖實現了java.util.concurrent.locks.Lock接口,保證了當多個線程同時請求加鎖時,優先分配給先發出請求的線程。所有請求線程會在一個隊列中排隊,當某個線程出現宕機時,Redisson會等待5秒之后才會繼續分配下一個線程。
RedissonFairLock是RedissonLock的子類。RedissonFairLock的鎖實現框架,和RedissonLock基本一樣。而在獲取鎖和釋放鎖的lua腳本中,RedissonFairLock的邏輯才有所區別。
//1.最常見的使用方法
RedissonClient redisson = Redisson.create(config);
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();//2.10秒鐘以后自動解鎖,無需調用unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);//3.嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();//4.Redisson為公平的可重入鎖提供了異步執行的相關方法
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
(3)Redisson公平鎖的初始化
public class RedissonDemo {public static void main(String[] args) throws Exception {...//創建RedissonClient實例RedissonClient redisson = Redisson.create(config);//獲取公平的可重入鎖RLock fairLock = redisson.getFairLock("myLock");fairLock.lock();//加鎖fairLock.unlock();//釋放鎖}
}public class Redisson implements RedissonClient {//Redis的連接管理器,封裝了一個Config實例protected final ConnectionManager connectionManager;//Redis的命令執行器,封裝了一個ConnectionManager實例protected final CommandAsyncExecutor commandExecutor;...protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);//初始化Redis的連接管理器connectionManager = ConfigSupport.createConnectionManager(configCopy);... //初始化Redis的命令執行器commandExecutor = new CommandSyncService(connectionManager, objectBuilder);...}public RLock getFairLock(String name) {return new RedissonFairLock(commandExecutor, name);}...
}public class RedissonFairLock extends RedissonLock implements RLock {private final long threadWaitTime;private final CommandAsyncExecutor commandExecutor;...public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor, name, 60000*5);}public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.threadWaitTime = threadWaitTime;...}...
}public class RedissonLock extends RedissonBaseLock {protected long internalLockLeaseTime;final CommandAsyncExecutor commandExecutor;...public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;//與WatchDog有關的internalLockLeaseTime//通過命令執行器CommandExecutor可以獲取連接管理器ConnectionManager//通過連接管理器ConnectionManager可以獲取Redis的配置信息類Config//通過Redis的配置信息類Config可以獲取lockWatchdogTimeout超時時間this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();...}...
}public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {...protected long internalLockLeaseTime;final String id;final String entryName;final CommandAsyncExecutor commandExecutor;public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();//獲取UUIDthis.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name;}...
}abstract class RedissonExpirable extends RedissonObject implements RExpirable {RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {super(connectionManager, name);}...
}public abstract class RedissonObject implements RObject {protected final CommandAsyncExecutor commandExecutor;protected String name;protected final Codec codec;public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);}public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {this.codec = codec;this.commandExecutor = commandExecutor;if (name == null) {throw new NullPointerException("name can't be null");}setName(name);}...
}public class ConfigSupport {...//創建Redis的連接管理器public static ConnectionManager createConnectionManager(Config configCopy) {//生成UUIDUUID id = UUID.randomUUID();...if (configCopy.getClusterServersConfig() != null) {validate(configCopy.getClusterServersConfig());//返回ClusterConnectionManager實例return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);}...}...
}public class ClusterConnectionManager extends MasterSlaveConnectionManager {public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {super(config, id);...}...
}public class MasterSlaveConnectionManager implements ConnectionManager {protected final String id;//初始化時為UUIDprivate final Config cfg;protected Codec codec;...protected MasterSlaveConnectionManager(Config cfg, UUID id) {this.id = id.toString();//傳入的是UUID...this.cfg = cfg;this.codec = cfg.getCodec();...}public String getId() {return id;}public Codec getCodec() {return codec;}...
}
2.公平鎖源碼之加鎖和排隊
(1)加鎖時的執行流程
使用Redisson的公平鎖RedissonFairLock進行加鎖時:首先調用的是RedissonLock的lock()方法,然后會調用RedissonLock的tryAcquire()方法,接著會調用RedissonLock的tryAcquireAsync()方法。
在RedissonLock的tryAcquireAsync()方法中,會調用一個可以被RedissonLock子類重載的tryLockInnerAsync()方法。對于非公平鎖,執行到這會調用RedissonLock的tryLockInnerAsync()方法。對于公平鎖,執行到這會調用RedissonFairLock的tryLockInnerAsync()方法。
在RedissonFairLock的tryLockInnerAsync()方法中,便執行具體的lua腳本。
public class RedissonDemo {public static void main(String[] args) throws Exception {...//創建RedissonClient實例RedissonClient redisson = Redisson.create(config);//獲取公平的可重入鎖RLock fairLock = redisson.getFairLock("myLock");fairLock.lock();//加鎖fairLock.unlock();//釋放鎖}
}public class RedissonLock extends RedissonBaseLock {...//不帶參數的加鎖public void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}//帶參數的加鎖public void lock(long leaseTime, TimeUnit unit) {try {lock(leaseTime, unit, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//加鎖成功if (ttl == null) {return;}//加鎖失敗...}private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime != -1) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//非公平鎖,接下來調用的是RedissonLock.tryLockInnerAsync()方法//公平鎖,接下來調用的是RedissonFairLock.tryLockInnerAsync()方法ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}//對RFuture<Long>類型的ttlRemainingFuture添加回調監聽CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {//tryLockInnerAsync()里的加鎖lua腳本異步執行完畢,會回調如下方法邏輯://加鎖成功if (ttlRemaining == null) {if (leaseTime != -1) {//如果傳入的leaseTime不是-1,也就是指定鎖的過期時間,那么就不創建定時調度任務internalLockLeaseTime = unit.toMillis(leaseTime);} else {//創建定時調度任務scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}...
}public class RedissonFairLock extends RedissonLock implements RLock {private final long threadWaitTime;//線程可以等待鎖的時間private final CommandAsyncExecutor commandExecutor;private final String threadsQueueName;private final String timeoutSetName;public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor, name, 60000*5);//傳入60秒*5=5分鐘}public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.threadWaitTime = threadWaitTime;threadsQueueName = prefixName("redisson_lock_queue", name);timeoutSetName = prefixName("redisson_lock_timeout", name);}...@Override<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {long wait = threadWaitTime;if (waitTime != -1) {//將傳入的指定的獲取鎖等待時間賦值給wait變量wait = unit.toMillis(waitTime);} ...if (command == RedisCommands.EVAL_LONG) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//步驟一:remove stale threads,移除等待超時的線程"while true do " +//獲取隊列中的第一個元素//KEYS[2]是一個用來對線程排隊的隊列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +//獲取隊列中第一個元素對應的分數,也就是排第一的線程的過期時間//KEYS[3]是一個用來對線程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的線程的過期時間小于當前時間,說明該線程等待超時了都還沒獲取到鎖,所以要移除//ARGV[4]是當前時間"if timeout <= tonumber(ARGV[4]) then " +//remove the item from the queue and timeout set NOTE we do not alter any other timeout//從有序集合 + 隊列中移除這個線程"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +"end;" +//check if the lock can be acquired now//步驟二:判斷當前線程現在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進行嘗試獲取鎖//情況一:鎖不存在 + 隊列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對線程排隊的隊列;//情況二:鎖不存在 + 隊列存在 + 隊列的第一個元素就是當前線程;ARGV[2]是當前線程的UUID + ThreadID;"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步驟三:當前線程執行獲取鎖的操作//remove this thread from the queue and timeout set//彈出隊列的第一個元素 + 從有序集合中刪除UUID:ThreadID對應的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//decrease timeouts for all waiting in the queue//遞減有序集合中每個線程的分數,也就是遞減每個線程獲取鎖時的已經等待時間//zrange返回有序集合KEYS[3]中指定區間內(0,-1)的成員,也就是全部成員"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//對有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])//ARGV[3]就是線程獲取鎖時可以等待的時間,默認是5分鐘"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//acquire the lock and set the TTL for the lease//hset設置Hash值進行加鎖操作 + pexpire設置鎖key的過期時間 + 最后返回nil表示加鎖成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//check if the lock is already held, and this is a re-entry(可重入鎖)//步驟四:判斷鎖是否已經被當前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當前線程的UUID + ThreadID;"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2],1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//the lock cannot be acquired, check if the thread is already in the queue//步驟五:判斷當前獲取鎖失敗的線程是否已經在隊列中排隊//KEYS[3]是對線程排序的有序集合,ARGV[2]是當前線程的UUID + ThreadID;"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +"if timeout ~= false then " +//the real timeout is the timeout of the prior thread in the queue, //but this is approximately correct, and avoids having to traverse the queue//如果當前獲取鎖失敗的線程已經在隊列中排隊//那么就返回該線程等待獲取鎖時,還剩多少時間就超時了,外部代碼拿到這個時間會阻塞等待這個時間//ARGV[3]是當前線程獲取鎖時可以等待的時間,ARGV[4]是當前時間"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +"end;" +//add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of//the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the threadWaitTime//步驟六:對獲取鎖失敗的線程進行排隊處理"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +"local ttl;" +//如果在隊列中排隊的最后一個元素不是當前線程"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在隊列中排最后的線程,ARGV[2]是當前線程的UUID+線程ID,ARGV[4]是當前時間//因為擁有最大過期時間的線程在隊列中是排最后的//所以可通過隊列中的最后一個元素的過期時間,計算當前線程的過期時間//從而保證新加入隊列和有序集合的線程的過期時間是最大的//下面這一行會計算出:還有多少時間,當前隊列中排最后的線程就會過期,外部代碼拿到這個時間會阻塞等待這個時間//這樣后一個加入隊列的線程,會阻塞等待前一個加入隊列的線程的過期時間"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +"else " +//下面這一行會計算出:還有多少時間,鎖就會過期,外部代碼拿到這個時間會阻塞等待這個時間"ttl = redis.call('pttl', KEYS[1]);" +"end;" +//計算當前線程在排隊等待鎖時的過期時間"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +//把當前線程作為一個元素插入有序集合,并設置元素分數為該線程在排隊等待鎖時的過期時間//然后再把當前線程作為一個元素插入隊列尾部"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end;" +"return ttl;",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),unit.toMillis(leaseTime),getLockName(threadId),wait,currentTime);}...}...
}
(2)獲取公平鎖的lua腳本相關參數說明
KEYS[1]是getRawName(),它是一個Hash數據結構的key,也就是鎖的名字,比如"myLock"。
KEYS[2]是threadsQueueName,它是一個用來對線程排隊的隊列的名字,多個客戶端線程申請獲取鎖時,會到這個隊列里進行排隊。比如"redisson_lock_queue:{myLock}"。
KEYS[3]是timeoutSetName,它是一個用來對線程排序的有序集合的名字,這個有序集合可以自動按照每個數據指定的分數進行排序。比如"redisson_lock_timeout:{myLock}"。
ARGV[1]是leaseTime,代表鎖的過期時間。如果leaseTime沒有指定,默認就是internalLockLeaseTime = 30秒。
ARGV[2]是getLockName(threadId),代表客戶端UUID + 線程ID。
ARGV[3]是threadWaitTime,代表線程可以等待的時間(默認5分鐘)。
ARGV[4]是currentTime,代表當前時間。
(3)lua腳本步驟一:進入while循環移除隊列和有序集合中等待超時的線程
while循環中首先執行命令:"lindex redisson_lock_queue:{myLock} 0",也就是獲取"redisson_lock_queue:{myLock}"這個隊列中的第一個元素。一開始該隊列是空的,所以什么都獲取不到,firstThreadId2為false。此時就會break掉,退出while循環。
如果獲取到隊列中的第一個元素,那么就會執行zscore命令:從有序集合中獲取該元素對應的分數,也就是該元素對應線程的過期時間。如果過期時間比當前時間小,那么就要從隊列和有序集合中移除該元素。否則,也會break掉,退出while循環。
//步驟一:remove stale threads,移除等待超時的線程
"while true do " +//獲取隊列中的第一個元素//KEYS[2]是一個用來對線程排隊的隊列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +//獲取隊列中第一個元素對應的分數,也就是排第一的線程的過期時間//KEYS[3]是一個用來對線程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的線程的過期時間小于當前時間,說明該線程等待鎖超時了都還沒獲取到鎖,所以要移除//ARGV[4]是當前時間"if timeout <= tonumber(ARGV[4]) then " +//remove the item from the queue and timeout set NOTE we do not alter any other timeout//從有序集合+隊列中移除這個線程"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +
"end;" +
(4)lua腳本步驟二:判斷當前線程能否獲取鎖
判斷條件一:
首先執行命令"exists myLock",判斷鎖是否存在。一開始沒有線程加過鎖,所以判斷條件肯定是成立的,該條件為true。
判斷條件二:
接著執行命令"exists redisson_lock_queue:{myLock}",看隊列是否存在。一開始也沒有這個隊列,所以這個條件也肯定成立,該條件為true。
判斷條件三:
如果有這個隊列,則判斷隊列存在的條件不成立,執行"或"后面的判斷。也就是執行命令"lindex redisson_lock_queue:{myLock} 0",判斷隊列的第一個元素是否是當前線程的UUID + ThreadID。
//check if the lock can be acquired now
//步驟二:判斷當前線程現在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進行嘗試獲取鎖
//情況一:鎖不存在 + 隊列也不存在;KEYS[1]是鎖的名字;
//情況二:鎖不存在 + 隊列存在 + 隊列的第一個元素就是當前線程;ARGV[2]是當前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +...
"end;" +
總結當前線程現在可以嘗試獲取鎖的情況如下:
情況一:鎖不存在 + 隊列也不存在
情況二:鎖不存在 + 隊列存在 + 隊列的第一個元素就是當前線程
(5)lua腳本步驟三:執行獲取鎖的操作
當判斷現在能否嘗試獲取鎖的條件通過后,便會執行如下操作:
步驟一:執行命令"lpop redisson_lock_queue:{myLock}",彈出隊列第一個元素。一開始該隊列是空的,所以該命令不會進行處理。接著執行命令"zrem redisson_lock_timeout:{myLock} UUID1:ThreadID1",也就是從有序集合中刪除UUID1:ThreadID1對應的元素。一開始該有序集合也是空的,所以該命令不會進行處理。
步驟二:執行命令"hset myLock UUID1:ThreadID1 1",進行加鎖操作。在設置key為myLock的Hash值中,field為UUID1:ThreadID1的value值為1。接著執行命令"pexpire myLock 30000",設置鎖key的過期時間為30秒。
最后返回nil,這樣在外層代碼中,就會認為加鎖成功。于是就會創建一個WatchDog看門狗定時調度任務,10秒后對鎖進行檢查。如果檢查發現當前線程還持有這個鎖,那么就重置鎖key的過期時間為30秒,并且重新創建一個WatchDog看門狗定時調度任務在10秒后繼續進行檢查。
//check if the lock can be acquired now
//步驟二:判斷當前線程現在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進行嘗試獲取鎖
//情況一:鎖不存在 + 隊列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對線程排隊的隊列;
//情況二:鎖不存在 + 隊列存在 + 隊列的第一個元素就是當前線程;ARGV[2]是當前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步驟三:當前線程執行獲取鎖的操作//remove this thread from the queue and timeout set//彈出隊列的第一個元素 + 從有序集合中刪除UUID:ThreadID對應的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//decrease timeouts for all waiting in the queue//遞減有序集合中每個線程的分數,也就是遞減每個線程獲取鎖時的已經等待時間//zrange返回有序集合KEYS[3]中指定區間內(0,-1)的成員,也就是全部成員"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//對有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])//ARGV[3]就是線程獲取鎖時可以等待的時間,默認是5分鐘"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//acquire the lock and set the TTL for the lease//hset設置Hash值進行加鎖操作 + pexpire設置鎖key的過期時間 + 最后返回nil表示加鎖成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;" +
(6)lua腳本步驟四:判斷鎖是否已經被當前線程持有(可重入鎖)
此時會執行命令"hexists myLock UUID:ThreadID"。如果判斷條件通過,則說明是持有鎖的線程對鎖進行了重入。于是會執行命令"hincrby myLock UUID:ThreadID 1",對key為鎖名的Hash值中,field為UUID + 線程ID的value值累加1。并且執行命令"pexpire myLock 300000"重置鎖key的過期時間。最后返回nil,表示重入加鎖成功。
//check if the lock is already held, and this is a re-entry(可重入鎖)
//步驟四:判斷鎖是否已經被當前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當前線程的UUID + ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;" +
(7)lua腳本步驟五:判斷當前獲取鎖失敗的線程是否已經在隊列中排隊
通過執行命令"zscore redisson_lock_timeout:{myLock} UUID:ThreadID",獲取當前線程在有序集合中的對應的分數,也就是過期時間。如果獲取成功則返回:當前線程等待獲取鎖的超時時間還剩多少,外部代碼拿到這個時間會阻塞等待這個時間。
//the lock cannot be acquired, check if the thread is already in the queue
//步驟五:判斷當前獲取鎖失敗的線程是否已經在隊列中排隊
//KEYS[3]是對線程排序的有序集合,ARGV[2]是當前線程的UUID+ThreadID;
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +//the real timeout is the timeout of the prior thread in the queue, //but this is approximately correct, and avoids having to traverse the queue//如果當前獲取鎖失敗的線程已經在隊列中排隊//那么就返回該線程等待獲取鎖時,還剩多少時間就超時了,外部代碼拿到這個時間會阻塞等待這個時間//ARGV[3]是當前線程獲取鎖時可以等待的時間,ARGV[4]是當前時間"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
(8)lua腳本步驟六:對獲取鎖失敗的線程進行排隊
首先獲取隊列中的最后一個元素。因為擁有最大過期時間的線程在隊列中是排最后的,所以可通過隊列中的最后一個元素的過期時間,計算當前線程的過期時間。從而保證新加入隊列和有序集合的線程的過期時間是最大的。然后獲取鎖或者隊列中排最后的線程剩余的存活時間,接著計算當前線程在排隊等待鎖時的過期時間。
然后把當前線程作為一個元素插入有序集合,并設置有序集合中該元素的分數為該線程在排隊等待鎖時的過期時間,接著再把當前線程作為一個元素插入隊列尾部。
最后返回鎖或者隊列中排第一的線程剩余的存活時間ttl給外層代碼。如果外層代碼拿到的返回值是非null,那么客戶端會進入一個while循環。在while循環會每阻塞等待ttl時間再嘗試去進行加鎖,重新執行lua腳本。
如果隊列里沒有元素,那么第一個加入隊列的線程,會阻塞等待鎖的過期時間。如果隊列里有元素,那么后一個加入隊列的線程,會阻塞等待前一個加入隊列的線程的過期時間。
//步驟六:對獲取鎖失敗的線程進行排隊處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊列中排隊的最后一個元素不是當前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在隊列中排最后的線程,ARGV[2]是當前線程的UUID + 線程ID,ARGV[4]是當前時間//因為擁有最大過期時間的線程在隊列中是排最后的//所以可通過隊列中的最后一個元素的過期時間,計算當前線程的過期時間//從而保證新加入隊列和有序集合的線程的過期時間是最大的//下面這一行會計算出:還有多少時間,當前隊列中排最后的線程就會過期,外部代碼拿到這個時間會阻塞等待這個時間//這樣后一個加入隊列的線程,會阻塞等待前一個加入隊列的線程的過期時間"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +//下面這一行會計算出:還有多少時間,鎖就會過期,外部代碼拿到這個時間會阻塞等待這個時間"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計算當前線程在排隊等待鎖時的過期時間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當前線程作為一個元素插入有序集合,并設置元素分數為該線程在排隊等待鎖時的過期時間
//然后再把當前線程作為一個元素插入隊列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
(9)獲取鎖失敗的第一個線程執行lua腳本的流程
公平鎖的核心在于申請加鎖時,加鎖失敗的各個客戶端會排隊。之后鎖被釋放時,會依次獲取鎖,從而實現公平性。
假設此時第一個客戶端線程已加鎖成功,第二個客戶端線程也來嘗試加鎖,那么會進行如下排隊處理。
步驟一:進入while循環,移除等待超時的線程。執行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊列排第一元素。由于此時隊列還是空的,所以獲取到的是false,于是退出while循環。
步驟二:判斷當前線程現在能否嘗試獲取鎖。因為執行命令"exists myLock",發現鎖已經存在了,于是判斷不通過。
步驟三:判斷鎖是否已經被當前線程持有,由于第二個客戶端線程的UUID + 線程ID必然不等于第一個客戶端線程。所以此時執行命令"hexists myLock UUID2:ThreadID2",發現不存在。所以此處的可重入鎖的判斷條件也不成立。
步驟四:判斷當前獲取鎖失敗的線程是否已經在隊列中排隊。由于當前線程是第一個獲取鎖失敗的線程,所以判斷不通過。
步驟五:接下來進行排隊處理。
//對獲取鎖失敗的線程進行排隊處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊列中排隊的最后一個元素不是當前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在隊列中排最后的線程,ARGV[2]是當前線程的UUID+線程ID,ARGV[4]是當前時間//因為擁有最大過期時間的線程在隊列中是排最后的//所以可通過隊列中的最后一個元素的過期時間,計算當前線程的過期時間//從而保證新加入隊列和有序集合的線程的過期時間是最大的//下面這一行會計算出:還有多少時間,當前隊列中排最后的線程就會過期,外部代碼拿到這個時間會阻塞等待這個時間 //這樣后一個加入隊列的線程,會阻塞等待前一個加入隊列的線程的過期時間"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +//下面這一行會計算出:還有多少時間,鎖就會過期,外部代碼拿到這個時間會阻塞等待這個時間"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計算當前線程在排隊等待鎖時的過期時間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當前線程作為一個元素插入有序集合,并設置元素分數為該線程在排隊等待鎖時的過期時間
//然后再把當前線程作為一個元素插入隊列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"
首先執行命令"lindex redisson_lock_queue:{myLock} 0"。也就是從隊列中獲取最后一個元素,由于此時隊列是空,所以獲取不到元素。然后執行命令"ttl = pttl myLock",獲取鎖剩余的存活時間。
接著計算當前線程在排隊等待鎖時的過期時間。假設myLock剩余的存活時間ttl為20秒,那么timeout = ttl + 5分鐘 + 當前時間 = 20秒 + 5分鐘 + 10:00:00 = 10:05:20;
然后執行命令"zadd redisson_lock_timeout:{myLock} 10:05:20 UUID2:ThreadID2",這行命令的意思是,在有序集合中插入一個元素。元素值是UUID2:ThreadID2,元素對應的分數是10:05:20。分數會用時間的Long型時間戳來表示,時間越靠后,時間戳就越大。有序集合Sorted Set會自動根據插入的元素分數從小到大進行排序。
接著執行命令"rpush redisson_lock_queue:{myLock} UUID2:TheadID2",這行命令的意思是,將UUID2:ThreadID2插入到隊列的尾部。
最后返回ttl給外層代碼,也就是返回myLock剩余的存活時間。如果外層代碼拿到的ttl是非null,那么客戶端會進入一個while循環。在while循環會每阻塞等待ttl時間就嘗試進行加鎖,重新執行lua腳本。
(10)獲取鎖失敗的第二個線程執行lua腳本的流程
如果此時有第三個客戶端線程也來嘗試加鎖,那么會進行如下排隊處理。
步驟一:進入while循環,移除等待超時的線程。執行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊列排第一元素。此時獲取到UUID2:ThreadID2,代表著第二個客戶端線程正在隊列里排隊。
繼續執行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對應的分數,timeout = 10:05:20。
假設當前時間是10:00:25,那么timeout <= 10:00:25的這個條件不成立,于是退出while循環。
步驟二:判斷當前線程現在能否嘗試獲取鎖,發現不能通過。因為執行命令"exists myLock"時,發現鎖已經存在。
步驟三:判斷鎖是否已經被當前線程持有。由于第三個客戶端線程的UUID + 線程ID必然不等于第一個客戶端線程。所以此時執行命令"hexists myLock UUID3:ThreadID3",發現不存在。所以此處的可重入鎖的判斷條件也不成立。
步驟四:判斷當前獲取鎖失敗的線程是否已經在隊列中排隊。由于當前線程是第二個獲取鎖失敗的線程,所以判斷不通過。
步驟五:接下來進行排隊處理。
//對獲取鎖失敗的線程進行排隊處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊列中排隊的最后一個元素不是當前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在隊列中排最后的線程,ARGV[2]是當前線程的UUID + 線程ID,ARGV[4]是當前時間//因為擁有最大過期時間的線程在隊列中是排最后的//所以可通過隊列中的最后一個元素的過期時間,計算當前線程的過期時間//從而保證新加入隊列和有序集合的線程的過期時間是最大的//下面這一行會計算出:還有多少時間,當前隊列中排最后的線程就會過期,外部代碼拿到這個時間會阻塞等待這個時間 //這樣后一個加入隊列的線程,會阻塞等待前一個加入隊列的線程的過期時間"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +//下面這一行會計算出:還有多少時間,鎖就會過期,外部代碼拿到這個時間會阻塞等待這個時間"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計算當前線程在排隊等待鎖時的過期時間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當前線程作為一個元素插入有序集合,并設置元素分數為該線程在排隊等待鎖時的過期時間
//然后再把當前線程作為一個元素插入隊列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"
首先執行命令"lindex redisson_lock_queue:{myLock} 0",獲取到隊列中的最后一個元素UUID2:ThreadID2。
然后判斷條件是否成立:lastThreadId不為false + lastThreadId不是自己。由于此時的ARGV[2] = UUID3:ThreadID3,所以判斷條件成立。即在隊列里排隊的最后一個元素并不是當前嘗試獲取鎖的客戶端線程。
于是執行:"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2" - 當前時間,也就是獲取在隊列中排最后的線程還有多少時間就會過期,從而得到ttl。
接著根據ttl計算當前線程在排隊等待鎖時的過期時間timeout,然后執行zadd和rpush命令對當前線程進行入隊和排隊,最后返回ttl。
3.公平鎖源碼之可重入加鎖
持有公平鎖的客戶端重復進行lock.lock(),執行加鎖lua腳本的流程如下:
步驟一:進入while循環,移除等待超時的線程。執行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊列排第一元素。此時獲取到UUID2:ThreadID2,代表著第二個客戶端線程正在隊列里排隊。
繼續執行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對應的分數,timeout = 10:05:20。
假設當前時間是10:00:25,那么timeout <= 10:00:25的這個條件不成立,于是退出while循環。
步驟二:判斷當前線程現在能否嘗試獲取鎖,發現不能通過。因為執行命令"exists myLock"時,發現鎖已經存在。
步驟三:判斷鎖是否已經被當前線程持有。由于當前線程的UUID + 線程ID等于持有鎖的線程。即此時執行命令"hexists myLock UUID:ThreadID"發現key是存在的,所以此處的可重入鎖的判斷條件成立。
于是會執行命令"hincrby myLock UUID:ThreadID 1",對key為鎖名的Hash值中,key為UUID + 線程ID的Hash值累加1。并且執行命令"pexpire myLock 300000"重置鎖key的過期時間。最后返回nil,表示重入加鎖成功。
//check if the lock is already held, and this is a re-entry(可重入鎖)
//步驟四:判斷鎖是否已經被當前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當前線程的UUID+ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;" +
4.公平鎖源碼之新舊版本對比
當客戶端線程嘗試加公平鎖失敗處于排隊狀態時,會進入while循環。在while循環中,每次都會等待一段時間,再重新進行嘗試加公平鎖。
public class RedissonLock extends RedissonBaseLock {...//加鎖@Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//線程ID,用來生成設置Hash的值long threadId = Thread.currentThread().getId();//嘗試加鎖,此時執行RedissonLock.lock()方法默認傳入的leaseTime=-1Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//ttl為null說明加鎖成功if (ttl == null) {return;}//加鎖失敗時的處理CompletableFuture<RedissonLockEntry> future = subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {while (true) {//再次嘗試獲取鎖ttl = tryAcquire(-1, leaseTime, unit, threadId);//返回的ttl為null,獲取到鎖,就退出while循環if (ttl == null) {break;}//返回的ttl不為null,則說明其他客戶端或線程還持有鎖//那么就利用同步組件Semaphore進行阻塞等待一段ttl的時間if (ttl >= 0) {try {commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {commandExecutor.getNow(future).getLatch().acquire();} else {commandExecutor.getNow(future).getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(commandExecutor.getNow(future), threadId);}}...
}
假設第二個客戶端線程第一次加鎖是在10:00:00,然后在10:00:15該客戶端線程再次發起請求嘗試進行加鎖,但第一個客戶端線程在10:00:00~10:00:15之間一直持有這把鎖,此時第二個客戶端線程的再次加鎖流程如下:
(1)新版本再次加鎖失敗不會刷新排隊分數(等待超時的時間點timeout)
步驟一:進入while循環,移除等待超時的線程。執行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊列排第一元素。此時獲取到UUID2:ThreadID2,代表著第二個客戶端線程正在隊列里排隊。
繼續執行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對應的分數,比如獲取到的timeout = 10:05:20。根據當前時間是10:00:15,那么timeout <= 10:00:15的這個條件不成立,于是退出while循環。
步驟二:判斷當前線程現在能否嘗試獲取鎖,發現不能通過。因為執行命令"exists myLock"時,發現鎖已經存在。
步驟三:判斷鎖是否已經被當前線程持有。由于第二個客戶端線程的UUID + 線程ID必然不等于第一個客戶端線程,所以此時執行命令"hexists myLock UUID2:ThreadID2",發現不存在,所以此處的可重入鎖的判斷條件也不成立。
步驟四:判斷當前獲取鎖失敗的線程是否已經在隊列中排隊。由于當前線程是第二次嘗試獲取鎖,所以判斷通過。然后返回第二個客戶端線程等待獲取鎖時,還剩多少時間就超時,不會刷新排隊分數。
//Redisson的3.16.8版本
if (command == RedisCommands.EVAL_LONG) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,//步驟一:remove stale threads,移除等待超時的線程"while true do " +//獲取隊列中的第一個元素//KEYS[2]是一個用來對線程排隊的隊列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end;" +//獲取隊列中第一個元素對應的分數,也就是排第一的線程的過期時間//KEYS[3]是一個用來對線程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的線程的過期時間小于當前時間,說明該線程等待超時了都還沒獲取到鎖,所以要移除//ARGV[4]是當前時間"if timeout <= tonumber(ARGV[4]) then " +//從有序集合 + 隊列中移除這個線程"redis.call('zrem', KEYS[3], firstThreadId2);" +"redis.call('lpop', KEYS[2]);" +"else " +"break;" +"end;" +"end;" +//步驟二:判斷當前線程現在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進行嘗試獲取鎖//情況一:鎖不存在 + 隊列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對線程排隊的隊列;//情況二:鎖不存在 + 隊列存在 + 隊列的第一個元素就是當前線程;ARGV[2]是當前線程的UUID + ThreadID;"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步驟三:當前線程執行獲取鎖的操作//彈出隊列的第一個元素 + 從有序集合中刪除UUID:ThreadID對應的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//遞減有序集合中每個線程的分數,也就是遞減每個線程獲取鎖時的已經等待時間//zrange返回有序集合KEYS[3]中指定區間內(0,-1)的成員,也就是全部成員"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//對有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])//ARGV[3]就是線程獲取鎖時可以等待的時間,默認是5分鐘"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//hset設置Hash值進行加鎖操作 + pexpire設置鎖key的過期時間 + 最后返回nil表示加鎖成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//步驟四:判斷鎖是否已經被當前線程持有(可重入鎖),KEYS[1]是鎖的名字,ARGV[2]是當前線程的UUID+ThreadID;"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +"redis.call('hincrby', KEYS[1], ARGV[2],1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +"end;" +//步驟五:判斷當前獲取鎖失敗的線程是否已經在隊列中排隊//KEYS[3]是對線程排序的有序集合,ARGV[2]是當前線程的UUID + ThreadID;"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +"if timeout ~= false then " +//如果當前獲取鎖失敗的線程已經在隊列中排隊//那么就返回該線程等待獲取鎖時,還剩多少時間就超時了,外部代碼拿到這個時間會阻塞等待這個時間//ARGV[3]是當前線程獲取鎖時可以等待的時間,ARGV[4]是當前時間"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +"end;" +//步驟六:對獲取鎖失敗的線程進行排隊處理"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +"local ttl;" +//如果在隊列中排隊的最后一個元素不是當前線程"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +//lastThreadId是在隊列中排最后的線程,ARGV[2]是當前線程的UUID + 線程ID,ARGV[4]是當前時間//因為擁有最大過期時間的線程在隊列中是排最后的//所以可通過隊列中的最后一個元素的過期時間,計算當前線程的過期時間//從而保證新加入隊列和有序集合的線程的過期時間是最大的//下面這一行會計算出:還有多少時間,當前隊列中排最后的線程就會過期,外部代碼拿到這個時間會阻塞等待這個時間 //這樣后一個加入隊列的線程,會阻塞等待前一個加入隊列的線程的過期時間"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +"else " +//下面這一行會計算出:還有多少時間,鎖就會過期,外部代碼拿到這個時間會阻塞等待這個時間"ttl = redis.call('pttl', KEYS[1]);" +"end;" +//計算當前線程在排隊等待鎖時的過期時間"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +//把當前線程作為一個元素插入有序集合,并設置元素分數為該線程在排隊等待鎖時的過期時間//然后再把當前線程作為一個元素插入隊列尾部"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end;" +"return ttl;",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),unit.toMillis(leaseTime),getLockName(threadId),wait,//默認是5分鐘currentTime);
}
(2)舊版本再次加鎖失敗會刷新排隊分數(等待超時的時間點timeout)
舊版本公平鎖的lua腳本如下所示,當第二個客戶端線程再次加鎖時會再次進入排隊邏輯。
首先會出計算隊列中的第一個元素還有多少時間就超時,即ttl。然后根據ttl + 傳入的等待時間,計算當前線程等待鎖的超時時間timeout。
接著執行命令"zadd redisson_lock_timeout:{myLock} timeout UUID2:ThreadID2",刷新有序集合中的同名元素的分數為timeout。客戶端線程每次重復嘗試加鎖,都會將其對應的過期時間往后延長,也就是刷新了排隊的分數。
zadd命令在添加存在的元素時,會返回0,但會更新該元素的分數。
//Redisson的3.8.1版本
if (command == RedisCommands.EVAL_LONG) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,//步驟一:移除等待超時的線程"while true do " +//獲取隊列中的第一個元素//KEYS[2]是一個用來對線程排隊的隊列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//獲取隊列中第一個元素對應的分數,也就是排第一的線程的過期時間//KEYS[3]是一個用來對線程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的線程的過期時間小于當前時間,說明該線程等待超時了都還沒獲取到鎖,所以要移除//ARGV[4]是當前時間"if timeout <= tonumber(ARGV[4]) then " +//從有序集合 + 隊列中移除這個線程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +"end;" +//步驟二:判斷當前線程現在能否嘗試獲取鎖,以下兩種情況可以通過判斷//情況一:鎖不存在 + 隊列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對線程排隊的隊列;//情況二:鎖不存在 + 隊列存在 + 隊列的第一個元素就是當前線程;ARGV[2]是當前線程的UUID+ThreadID;"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步驟三:當前線程執行獲取鎖的操作//彈出隊列的第一個元素 + 從有序集合中刪除UUID:ThreadID對應的元素"redis.call('lpop', KEYS[2]); " +"redis.call('zrem', KEYS[3], ARGV[2]); " +//hset設置Hash值進行加鎖操作 + pexpire設置鎖key的過期時間 + 最后返回nil表示加鎖成功"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//步驟四:判斷鎖是否已經被當前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當前線程的UUID+ThreadID;"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +//步驟五:對獲取鎖失敗的線程進行排隊處理"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +"local ttl; " +//如果在隊列中排隊的第一個元素不是當前線程"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +//計算隊列中第一個元素還有多少時間就超時了"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +"else " +"ttl = redis.call('pttl', KEYS[1]);" +"end; " +//計算當前線程等待鎖的超時時間"local timeout = ttl + tonumber(ARGV[3]);" +//把當前線程作為一個元素插入有序集合,并設置元素分數為該線程在排隊等待鎖時的過期時間//然后再把當前線程作為一個元素插入隊列尾部"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +"redis.call('rpush', KEYS[2], ARGV[2]);" +"end; " +"return ttl;",Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),//KEYS[1]、KEYS[2]、KEYS[3]internalLockLeaseTime,//ARGV[1]getLockName(threadId),//ARGV[2]currentTime + threadWaitTime,//ARGV[3] = 當前時間 + 5秒currentTime//ARGV[4]);
}
注意:如果僅僅使用有序集合是不行的,因為有序集合的分數在lua腳本執行過程中也會發生變化。舊版本中,客戶端線程每次嘗試加鎖,有序集合中的分數會更新。新版本中,當前線程可以嘗試獲取鎖時,也會遍歷更新有序集合中的分數。
此外,有序集合獲取第一個元素的時間復雜度比隊列要高。如果僅僅使用隊列也是不行的,因為需要管理排隊線程的等待超時時間。如果沒有有序集合,那么就不能移除在隊列中排隊已超時的線程。當然,為了管理線程的等待超時時間,將有序集合換成兩層Hash值也可以。
5.公平鎖源碼之隊列重排
(1)新版本在5分鐘后嘗試再次加鎖才會隊列重排
新版本的公平鎖中,獲取鎖失敗的線程默認會進入隊列最多等待5分鐘。
在這5分鐘內,該線程不管再次加鎖多少次,都不會刷新隊列排序和分數。
在這5分鐘內,該線程沒有進行再次加鎖嘗試,就會被移出隊列和有序集合。所以5分鐘后,該線程才嘗試再次加鎖,那么會重新入隊,導致隊列重排。
(2)舊版本在5秒后嘗試再次加鎖就會隊列重排
舊版本的公平鎖中,獲取鎖失敗的線程默認會進入隊列最多等待5秒鐘。
在這5秒鐘內,該線程只要重新嘗試進行加鎖,那么就會延長其最多等待時間,也就是刷新有序集合中的排隊分數。
在這5秒鐘內,該線程沒有進行再次加鎖嘗試,就會被移出隊列和有序集合。所以5秒鐘后,該線程才嘗試再次加鎖,那么會重新入隊,導致隊列重排。
(3)導致隊列重排的是lua腳本的步驟一(移除等待超時的線程)
也就是公平鎖lua腳本中while循環的作用。
當客戶端線程使用RedissonLock的tryAcquire()方法嘗試獲取公平鎖,并且指定了一個獲取鎖的超時時間時。比如指定客戶端線程在隊列里排隊超過了20秒,就不再嘗試獲取鎖了。如果獲取鎖的超時時間沒有指定,新版本是默認5分鐘超時,舊版本是默認5秒后超時。
此時由于這些等待獲取鎖已超時的線程元素還存在隊列和有序集合里,所以可以通過while循環的邏輯來清除這些不再嘗試獲取鎖的客戶端線程。
在新版本,隨著時間推移,這些等待獲取鎖超時的線程就會被移出隊列。在舊版本,隨著時間推移,這些等待獲取鎖超時的線程只要不再嘗試加鎖,那么其等待獲取鎖的超時時間就不會更新被不斷延長,就會被移除隊列。
如果客戶端宕機了,那么客戶端就不會重新嘗試獲取鎖。在新版本中,隨著時間推移,宕機的客戶端線程就會被移出隊列。在舊版本中,就不會刷新和延長有序集合中的超時時間分數,這樣while循環的邏輯就會將這些宕機的客戶端線程從隊列中移出。
在新版本中,最多5分鐘后,宕機的客戶端線程會被移出隊列。在舊版本中,最多5秒鐘后,宕機的客戶端線程就會被移出隊列。
因為網絡延遲等原因,可能會導致客戶端線程等待鎖時間過長,從而觸發各個客戶端線程的排隊順序的重排序。有的客戶端如果在隊列里等待時間過長,可能就會觸發一次隊列的重排序。新版本觸發重排序的頻率是每5分鐘,舊版本觸發重排序的頻率是每5秒。
//步驟一:移除等待超時的線程
"while true do " +//獲取隊列中的第一個元素//KEYS[2]是一個用來對線程排隊的隊列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//獲取隊列中第一個元素對應的分數,也就是排第一的線程的過期時間//KEYS[3]是一個用來對線程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的線程的過期時間小于當前時間,說明該線程等待超時了都還沒獲取到鎖,所以要移除//ARGV[4]是當前時間"if timeout <= tonumber(ARGV[4]) then " +//從有序集合 + 隊列中移除這個線程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +
"end;" +
6.公平鎖源碼之釋放鎖
(1)釋放公平鎖的流程
釋放公平鎖首先調用的還是RedissonLock的unlock()方法。
在RedissonLock的unlock()方法中,會調用get(unlockAsync())。也就是首先調用RedissonBaseLock的unlockAsync()方法,然后調用RedissonObject的get()方法。
其中個RedissonBaseLock的unlockAsync()方法是異步化執行的方法,釋放鎖的操作是異步執行的。而RedisObject的get()方法會通過RFuture同步等待獲取異步執行的結果。所以,可以將get(unlockAsync())理解為異步轉同步。
在RedissonBaseLock的unlockAsync()方法中,就會調用公平鎖RedissonFairLock的unlockInnerAsync()方法進行釋放鎖。然后當完成釋放鎖的處理后,會通過異步去取消定時調度任務。
public class Application {public static void main(String[] args) throws Exception {Config config = new Config();config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");//創建RedissonClient實例RedissonClient redisson = Redisson.create(config);//獲取公平的可重入鎖RLock fairLock = redisson.getFairLock("myLock");fairLock.lock();fairLock.unlock();...}
}public class RedissonLock extends RedissonBaseLock {...@Overridepublic void unlock() {...//異步轉同步//首先調用的是RedissonBaseLock的unlockAsync()方法//然后調用的是RedissonObject的get()方法get(unlockAsync(Thread.currentThread().getId()));...}...
}public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {...@Overridepublic RFuture<Void> unlockAsync(long threadId) {//異步執行釋放鎖的lua腳本RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {//取消定時調度任務cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);...
}public class RedissonFairLock extends RedissonLock implements RLock {private final long threadWaitTime;private final CommandAsyncExecutor commandExecutor;private final String threadsQueueName;private final String timeoutSetName;public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {this(commandExecutor, name, 60000*5);}public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.threadWaitTime = threadWaitTime;threadsQueueName = prefixName("redisson_lock_queue", name);timeoutSetName = prefixName("redisson_lock_timeout", name);}@Overrideprotected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//步驟一:移除等待超時的線程"while true do " +//獲取隊列中的第一個元素//KEYS[2]是一個用來對線程排隊的隊列的名字"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +"if firstThreadId2 == false then " +"break;" +"end; " +//獲取隊列中第一個元素對應的分數,也就是排第一的線程的過期時間//KEYS[3]是一個用來對線程排序的有序集合的名字"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +//如果排第一的線程的過期時間小于當前時間,說明該線程等待超時了都還沒獲取到鎖,所以要移除//ARGV[4]是當前時間"if timeout <= tonumber(ARGV[4]) then " +//從有序集合 + 隊列中移除這個線程"redis.call('zrem', KEYS[3], firstThreadId2); " +"redis.call('lpop', KEYS[2]); " +"else " +"break;" +"end; " +"end;" +//步驟二:判斷鎖是否還存在,判斷key為鎖名的Hash值是否存在"if (redis.call('exists', KEYS[1]) == 0) then " +//獲取隊列中排第一的線程"local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " +//ARGV[1]為通知事件的類型"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +"end; " +"return 1; " +"end;" +//步驟二:判斷鎖是否還存在,判斷key為UUID+線程ID的Hash值是否存在"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +//對key為UUID+線程ID的Hash值還存遞減1"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"end; " +"redis.call('del', KEYS[1]); " +"local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " +//發布一個事件給在隊列中排第一的線程"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +"end; " +"return 1; ",Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),LockPubSub.UNLOCK_MESSAGE,//ARGV[1]internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());}...
}
(2)釋放公平鎖的lua腳本分析
步驟一:移除等待超時的線程
首先也會進入while循環,移除等待超時的線程。即獲取隊列中排第一的線程,判斷該線程的過期時間是否已小于當前時間。如果小于當前時間,那么就說明該線程在隊列中的排隊已經過期,于是便將該線程從有序集合 + 隊列中移除。后續如果該線程再次嘗試加鎖,那么會重新排序 + 重新入隊。
步驟二:判斷鎖是否還存在
如果key為鎖名的Hash值已不存在,那么先獲取隊列中排第一的線程,然后發布一個事件給該線程對應的客戶端讓其獲取鎖。
如果key為鎖名的Hash值還存在,那么判斷field為UUID + 線程ID的映射是否存在。如果field為UUID + 線程ID的映射不存在,那么表示鎖已經被釋放了,直接返回nil。如果field為UUID + 線程ID的映射存在,那么在key為鎖名的Hash值中,對field為UUID + 線程ID的value值遞減1。也就是調用Redis的hincrby命令,進行遞減1處理。
步驟三:對遞減1后的結果進行如下判斷處理
如果遞減1后的結果大于0,表示線程還在持有鎖。對應于持有鎖的線程多次重入鎖,此時需要重置鎖的過期時間。
如果遞減1后的結果小于0,表示線程不再持有鎖,則刪除鎖對應的key,并且發布一個事件給在隊列中排第一的線程所對應的客戶端。
7.公平鎖源碼之按順序依次加鎖
假設客戶端A先持有鎖,而客戶端B在隊列里面是排在客戶端C的后面。那么如果客戶端A釋放了鎖后,客戶端B和C是如何按順序加鎖的。
(1)鎖被釋放后,排第二的客戶端線程先來加鎖
鎖被客戶端A釋放掉,鎖key被刪除之后,客戶端B先來進行嘗試加鎖。此時客戶端B執行的lua腳本步驟二的邏輯:
//check if the lock can be acquired now
//步驟二:判斷當前線程現在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進行嘗試獲取鎖
//情況一:鎖不存在 + 隊列也不存在;KEYS[1]是鎖的名字;
//情況二:鎖不存在 + 隊列存在 + 隊列的第一個元素就是當前線程;ARGV[2]是當前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +...
"end;"
首先,執行判斷"exists myLock = 0",由于當前鎖存在,所以條件不成立。
然后,執行判斷"exists redisson_lock_queue:{myLock} = 0",由于隊列存在,所以條件不成立。
接著,執行判斷"lindex redisson_lock_queue:{myLock} 0 == UUID2:ThreadID2",由于隊列存在,但是在隊列中排第一的不是客戶端B而是客戶端C,所以條件不成立,客戶端B無法加鎖。
由此可見:即使鎖釋放掉后,多個客戶端來嘗試加鎖也只認隊列中排第一的客戶端。從而實現按隊列的順序依次獲取鎖,保證了公平性。
(2)鎖被釋放后,排第一的客戶端線程再來加鎖
當在隊列中排第一的客戶端C此時過來嘗試加鎖時,就會執行如下步驟三的嘗試加鎖邏輯:
//check if the lock can be acquired now
//步驟二:判斷當前線程現在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進行嘗試獲取鎖
//情況一:鎖不存在 + 隊列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對線程排隊的隊列;
//情況二:鎖不存在 + 隊列存在 + 隊列的第一個元素就是當前線程;ARGV[2]是當前線程的UUID+ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +"and ((redis.call('exists', KEYS[2]) == 0) " +"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +//步驟三:當前線程執行獲取鎖的操作//remove this thread from the queue and timeout set//彈出隊列的第一個元素 + 從有序集合中刪除UUID:ThreadID對應的元素"redis.call('lpop', KEYS[2]);" +"redis.call('zrem', KEYS[3], ARGV[2]);" +//decrease timeouts for all waiting in the queue//遞減有序集合中每個線程的分數,也就是遞減每個線程獲取鎖時的已經等待時間//zrange返回有序集合KEYS[3]中指定區間內(0,-1)的成員,也就是全部成員"local keys = redis.call('zrange', KEYS[3], 0, -1);" +"for i = 1, #keys, 1 do " +//對有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])//ARGV[3]就是線程獲取鎖時可以等待的時間,默認是5分鐘"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +"end;" +//acquire the lock and set the TTL for the lease//hset設置Hash值進行加鎖操作 + pexpire設置鎖key的過期時間 + 最后返回nil表示加鎖成功"redis.call('hset', KEYS[1], ARGV[2], 1);" +"redis.call('pexpire', KEYS[1], ARGV[1]);" +"return nil;" +
"end;"
首先,執行命令"lpop redisson_lock_queue:{myLock}",將隊列中的第一個元素彈出來。
然后,執行命令"zrem redisson_lock_timeout:{myLock} UUID3:ThreadID3",將有序集合中客戶端C的線程對應的元素給刪除掉。
接著,執行"hset myLock UUID3:ThreadID3 1"進行加鎖,設置field為UUID + 線程ID的value值為1。
最后,執行命令"pexpire myLock 30000",設置key為鎖名的Hash值的過期時間為30000毫秒。
客戶端C完成加鎖后,客戶端C就會從隊列中出隊,此時排在隊頭的就是客戶端B。
文章轉載自:東陽馬生架構
原文鏈接:分布式鎖—3.Redisson的公平鎖 - 東陽馬生架構 - 博客園
體驗地址:JNPF快速開發平臺