一、使用Redisson步驟
Redisson各個鎖基本所用Redisson各個鎖基本所用Redisson各個鎖基本所用
二、源碼解析
lock鎖
1) 基本思想:
lock有兩種方法 一種是空參 ?另一種是帶參
? ? ? ? ?* 空參方法:會默認調用看門狗的過期時間30*1000(30秒)
? ? ? ? ?* 然后在正常運行的時候,會啟用定時任務調用重置時間的方法(間隔為開門看配置的默認過期時間的三分之一,也就是10秒)
? ? ? ? ?* 當出現錯誤的時候就會停止續期,直到到期釋放鎖或手動釋放鎖
? ? ? ? ?* 帶參方法:手動設置解鎖時間,到期后自動解鎖,或者業務完成后手動解鎖,不會自動續期
源碼:
Lock
調用lockInterruptibly()方法會默認傳入lease 為-1,該值再后面起作用
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();//獲取該鎖的過期時間,如果該鎖沒被持有,會返回一個null,如果被持有 會返回一個過期時間Long ttl = this.tryAcquire(leaseTime, unit, threadId);if (ttl != null) {//ttl不為null,說明鎖已經被搶占了RFuture<RedissonLockEntry> future = this.subscribe(threadId);this.commandExecutor.syncSubscription(future);try {//開始循環獲取鎖while(true) {//剛進如循環先嘗試獲取鎖,獲取成功返回null,跳出循環,獲取失敗,則繼續往下走ttl = this.tryAcquire(leaseTime, unit, threadId);if (ttl == null) {return;}if (ttl >= 0L) {//如果過期時間大于0,則調用getLatch// 返回一個信號量,開始進入阻塞,阻塞時長為上一次鎖的剩余過期時長,并且讓出cup//有阻塞必然有喚醒,位于解鎖操作中this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {this.getEntry(threadId).getLatch().acquire();}}} finally {this.unsubscribe(future, threadId);}}}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {//如果leaseTime != -1,即不等于默認值,則表示手動設置了過期時間if (leaseTime != -1L) {return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//如果leaseTime = -1,表示使用默認方式,即使用看門狗默認實現自動續期RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.addListener(new FutureListener<Long>() {public void operationComplete(Future<Long> future) throws Exception {//如果tryLockInnerAsync執行成功if (future.isSuccess()) {//獲取過期時間Long ttlRemaining = (Long)future.getNow();//過期時間為空,表示加鎖成功if (ttlRemaining == null) {//開啟刷新重置過期時間步驟RedissonLock.this.scheduleExpirationRenewal(threadId);}}}});return ttlRemainingFuture;}}
// lua腳本嘗試搶占鎖,失敗返回鎖過期時間<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);//直接使用lua腳本發起命令//通過lua腳本可以看出,redisson加鎖除了使用自定義的名字以外,還要使用uuid// 加上當前線程的threadId組合,以自定義名字作hash的key,使用return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,//如果該鎖未被占有,則設置鎖,設置過期時間,過期時間為 internalLockLeaseTime ,然后返回null"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]);return nil; end; " +//如果鎖已經被占有,判斷是否是重入鎖,如果是重入鎖,則將value增加1 ,代表重入,并且設置過期時間,返回null。"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; " +//如果已經被站有所,且不是重入鎖,則返回過期時間"return redis.call('pttl', KEYS[1]);",Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});}
看門狗續命
//看門狗續命機制private void scheduleExpirationRenewal(final long threadId) {//首先會判斷該線程是否已經再重置時間的map中,僅僅第一次進來是空的。if (!expirationRenewalMap.containsKey(this.getEntryName())) {//使用了看門狗默認的時間(30秒) 除以3 ,也就是延遲10秒后執行Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {//判斷是否該線程是否還持有鎖,如果持有,返回1,并且設置過期時間,如果沒持有,返回0RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});future.addListener(new FutureListener<Boolean>() {public void operationComplete(Future<Boolean> future) throws Exception {//從map中移除該線程,這樣下次再調用該方法仍然可以執行RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());if (!future.isSuccess()) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());} else {if ((Boolean)future.getNow()) {//當lua腳本返回1表是true,也就是仍然持有鎖,則遞歸調用該方法,RedissonLock.this.scheduleExpirationRenewal(threadId);}}}});}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {task.cancel();}}}
2、unlock
源碼
public RFuture<Void> unlockAsync(final long threadId) {final RPromise<Void> result = new RedissonPromise();//調用lua腳本釋放鎖RFuture<Boolean> future = this.unlockInnerAsync(threadId);future.addListener(new FutureListener<Boolean>() {public void operationComplete(Future<Boolean> future) throws Exception {if (!future.isSuccess()) {result.tryFailure(future.cause());} else {Boolean opStatus = (Boolean)future.getNow();//如果鎖狀態為null,表示存在異常,為正常釋放鎖之前,被別人占領鎖了if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + RedissonLock.this.id + " thread-id: " + threadId);result.tryFailure(cause);} else {//如果返回0.為false 表示可重入鎖,不取消重置過期時間,//返回1 為true,表示已解鎖,取消重置過期時間if (opStatus) {RedissonLock.this.cancelExpirationRenewal();}//解鎖result.trySuccess((Object)null);}}}});return result;}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//當key不存在,表示鎖未被持有,說明不用解鎖了,返回1 ,1在后續表示取消重置過期時間"if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;" +//key存在,但是持有鎖的線程不是當前線程,返回null,后面會提出一個異常"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; " +//鎖狀態-1后仍然大于0,表示可重入鎖,仍處于鎖定狀態,返回0,0在后續表示 不做處理,仍然重置過期時間"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; " +//返回鎖狀態不大于0,正常解鎖,返回1,1在后續表示取消重置過期時間"else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; " +"return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});}
三、集群環境下潛在問題
在Redis主從架構+哨兵模式的環境下,業務系統已經成功獲取了鎖,redis寫入數據,但是正要往從庫上存數據時,發生主庫宕機的情況,從庫在哨兵的選舉下成為了主庫,而另外一個業務請求再次需要獲取鎖,會直接訪問到新的主庫,而此時新主庫是沒有鎖信息的,此時就會出現業務重復的情況。