1.Redisson的分布式鎖簡單總結
Redisson分布式鎖包括:可重入鎖、公平鎖、聯鎖、紅鎖、讀寫鎖。
(1)可重入鎖RedissonLock
非公平鎖,最基礎的分布式鎖,最常用的鎖。
(2)公平鎖RedissonFairLock
各個客戶端嘗試獲取鎖時會排隊,按照隊列的順序先后獲取鎖。
(3)聯鎖MultiLock
可以一次性加多把鎖,從而實現一次性鎖多個資源。
(4)紅鎖RedLock
RedLock相當于一把鎖。雖然利用了MultiLock包裹了多個小鎖,但這些小鎖并不對應多個資源,而是每個小鎖的key對應一個Redis實例。只要大多數的Redis實例加鎖成功,就可以認為RedLock加鎖成功。RedLock的健壯性要比其他普通鎖要好。
但是RedLock也有一些場景無法保證正確性,當然RedLock只要求部署主庫。比如客戶端A嘗試向5個Master實例加鎖,但僅僅在3個Maste中加鎖成功。不幸的是此時3個Master中有1個Master突然宕機了,而且鎖key還沒同步到該宕機Master的Slave上,此時Salve切換為Master。于是在這5個Master中,由于其中有一個是新切換過來的Master,所以只有2個Master是有客戶端A加鎖的數據,另外3個Master是沒有鎖的。但繼續不幸的是,此時客戶端B來加鎖,那么客戶端B就很有可能成功在沒有鎖數據的3個Master上加到鎖,從而滿足了過半數加鎖的要求,最后也完成了加鎖,依然發生重復加鎖。
(5)讀寫鎖之讀鎖RedissonReadLock和寫鎖RedissonWriteLock
不同客戶端線程的四種加鎖情況:
情況一:先加讀鎖再加讀鎖,不互斥
情況二:先加讀鎖再加寫鎖,互斥
情況三:先加寫鎖再加讀鎖,互斥
情況四:先加寫鎖再加寫鎖,互斥
同一個客戶端線程的四種加鎖情況:
情況一:先加讀鎖再加讀鎖,不互斥
情況二:先加讀鎖再加寫鎖,互斥
情況三:先加寫鎖再加讀鎖,不互斥
情況四:先加寫鎖再加寫鎖,不互斥
2.Redisson的Semaphore簡介
(1)Redisson的Semaphore原理圖
Semaphore也是Redisson支持的一種同步組件。Semaphore作為一個鎖機制,可以允許多個線程同時獲取一把鎖。任何一個線程釋放鎖之后,其他等待的線程就可以嘗試繼續獲取鎖。
(2)Redisson的Semaphore使用演示
public class RedissonDemo {public static void main(String[] args) throws Exception {//連接3主3從的Redis CLusterConfig config = new Config();...//SemaphoreRedissonClient redisson = Redisson.create(config);final RSemaphore semaphore = redisson.getSemaphore("semaphore");semaphore.trySetPermits(3);for (int i = 0; i < 10; i++) {new Thread(new Runnable() {public void run() {try {System.out.println(new Date() + ":線程[" + Thread.currentThread().getName() + "]嘗試獲取Semaphore鎖");semaphore.acquire();System.out.println(new Date() + ":線程[" + Thread.currentThread().getName() + "]成功獲取到了Semaphore鎖,開始工作");Thread.sleep(3000);semaphore.release();System.out.println(new Date() + ":線程[" + Thread.currentThread().getName() + "]釋放Semaphore鎖");} catch (Exception e) {e.printStackTrace();}}}).start();}}
}
3.Redisson的Semaphore源碼剖析
(1)Semaphore的初始化
public class Redisson implements RedissonClient {//Redis的連接管理器,封裝了一個Config實例protected final ConnectionManager connectionManager;//Redis的命令執行器,封裝了一個ConnectionManager實例protected final CommandAsyncExecutor commandExecutor;...protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);//初始化Redis的連接管理器connectionManager = ConfigSupport.createConnectionManager(configCopy);... //初始化Redis的命令執行器commandExecutor = new CommandSyncService(connectionManager, objectBuilder);...}@Overridepublic RSemaphore getSemaphore(String name) {return new RedissonSemaphore(commandExecutor, name);}...
}public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {private final SemaphorePubSub semaphorePubSub;final CommandAsyncExecutor commandExecutor;public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.semaphorePubSub = commandExecutor.getConnectionManager().getSubscribeService().getSemaphorePubSub();}...
}
(2)Semaphore設置允許獲取的鎖數量
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {...@Overridepublic boolean trySetPermits(int permits) {return get(trySetPermitsAsync(permits));}@Overridepublic RFuture<Boolean> trySetPermitsAsync(int permits) {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//執行命令"get semaphore",獲取到當前的數值"local value = redis.call('get', KEYS[1]); " +"if (value == false) then " +//然后執行命令"set semaphore 3"//設置這個信號量允許客戶端同時獲取鎖的總數量為3"redis.call('set', KEYS[1], ARGV[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1;" +"end;" +"return 0;",Arrays.asList(getRawName(), getChannelName()),permits);if (log.isDebugEnabled()) {future.onComplete((r, e) -> {if (r) {log.debug("permits set, permits: {}, name: {}", permits, getName());} else {log.debug("unable to set permits, permits: {}, name: {}", permits, getName());}});}return future;}...
}
首先執行命令"get semaphore",獲取到當前的數值。然后執行命令"set semaphore 3",也就是設置這個信號量允許客戶端同時獲取鎖的總數量為3。
(3)客戶端嘗試獲取Semaphore的鎖
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {...private final SemaphorePubSub semaphorePubSub;final CommandAsyncExecutor commandExecutor;public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.semaphorePubSub = commandExecutor.getConnectionManager().getSubscribeService().getSemaphorePubSub();}@Overridepublic void acquire() throws InterruptedException {acquire(1);}@Overridepublic void acquire(int permits) throws InterruptedException {if (tryAcquire(permits)) {return;}CompletableFuture<RedissonLockEntry> future = subscribe();commandExecutor.syncSubscriptionInterrupted(future);try {while (true) {if (tryAcquire(permits)) {return;}//獲取Redisson的Semaphore失敗,于是便調用本地JDK的Semaphore的acquire()方法,此時當前線程會被阻塞//之后如果Redisson的Semaphore釋放了鎖,那么當前客戶端便會通過監聽訂閱事件釋放本地JDK的Semaphore,喚醒被阻塞的線程,繼續執行while循環//注意:getLatch()返回的是JDK的Semaphore = "new Semaphore(0)" ==> (state - permits)//首先調用CommandAsyncService.getNow()方法//然后調用RedissonLockEntry.getLatch()方法//接著調用JDK的Semaphore的acquire()方法commandExecutor.getNow(future).getLatch().acquire();}} finally {unsubscribe(commandExecutor.getNow(future));}}@Overridepublic boolean tryAcquire(int permits) {//異步轉同步return get(tryAcquireAsync(permits));}@Overridepublic RFuture<Boolean> tryAcquireAsync(int permits) {if (permits < 0) {throw new IllegalArgumentException("Permits amount can't be negative");}if (permits == 0) {return RedissonPromise.newSucceededFuture(true);}return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//執行命令"get semaphore",獲取到當前值"local value = redis.call('get', KEYS[1]); "+//如果semaphore的當前值不是false,且大于客戶端線程申請獲取鎖的數量"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +//執行"decrby semaphore 1",將信號量允許獲取鎖的總數量遞減1"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +//如果semaphore的值變為0,那么客戶端就無法獲取鎖了,此時返回false"return 0;",Collections.<Object>singletonList(getRawName()),permits//ARGV[1]默認是1);}...
}public class CommandAsyncService implements CommandAsyncExecutor {...@Overridepublic <V> V getNow(CompletableFuture<V> future) {try {return future.getNow(null);} catch (Exception e) {return null;}}...
}public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {private final Semaphore latch;...public RedissonLockEntry(CompletableFuture<RedissonLockEntry> promise) {super();this.latch = new Semaphore(0);this.promise = promise;}public Semaphore getLatch() {return latch;}...
}
執行命令"get semaphore",獲取到semaphore的當前值。如果semaphore的當前值不是false,且大于客戶端線程申請獲取鎖的數量。那么就執行"decrby semaphore 1",將信號量允許獲取鎖的總數量遞減1。
如果semaphore的值變為0,那么客戶端就無法獲取鎖了,此時tryAcquire()方法返回false。表示獲取semaphore的鎖失敗了,于是當前客戶端線程便會通過本地JDK的Semaphore進行阻塞。
當客戶端后續收到一個訂閱事件把本地JDK的Semaphore進行釋放后,便會喚醒阻塞線程繼續while循環。在while循環中,會不斷嘗試獲取這個semaphore的鎖,如此循環往復,直到成功獲取。
(4)客戶端釋放Semaphore的鎖
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {...@Overridepublic void release() {release(1);}@Overridepublic void release(int permits) {get(releaseAsync(permits));}@Overridepublic RFuture<Void> releaseAsync(int permits) {if (permits < 0) {throw new IllegalArgumentException("Permits amount can't be negative");}if (permits == 0) {return RedissonPromise.newSucceededFuture(null);}RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,//執行命令"incrby semaphore 1""local value = redis.call('incrby', KEYS[1], ARGV[1]); " +"redis.call('publish', KEYS[2], value); ",Arrays.asList(getRawName(), getChannelName()),permits);if (log.isDebugEnabled()) {future.onComplete((o, e) -> {if (e == null) {log.debug("released, permits: {}, name: {}", permits, getName());}});}return future;}...
}//訂閱semaphore不為0的事件,semaphore不為0時會觸發執行這里的監聽回調
public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {public SemaphorePubSub(PublishSubscribeService service) {super(service);}@Overrideprotected RedissonLockEntry createEntry(CompletableFuture<RedissonLockEntry> newPromise) {return new RedissonLockEntry(newPromise);}@Overrideprotected void onMessage(RedissonLockEntry value, Long message) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}//將客戶端本地JDK的Semaphore進行釋放value.getLatch().release(Math.min(value.acquired(), message.intValue()));}
}//訂閱鎖被釋放的事件,鎖被釋放為0時會觸發執行這里的監聽回調
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {public static final Long UNLOCK_MESSAGE = 0L;public static final Long READ_UNLOCK_MESSAGE = 1L;public LockPubSub(PublishSubscribeService service) {super(service);} @Overrideprotected RedissonLockEntry createEntry(CompletableFuture<RedissonLockEntry> newPromise) {return new RedissonLockEntry(newPromise);}@Overrideprotected void onMessage(RedissonLockEntry value, Long message) {if (message.equals(UNLOCK_MESSAGE)) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}value.getLatch().release();} else if (message.equals(READ_UNLOCK_MESSAGE)) {while (true) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}//將客戶端本地JDK的Semaphore進行釋放value.getLatch().release(value.getLatch().getQueueLength());}}
}
客戶端釋放Semaphore的鎖時,會執行命令"incrby semaphore 1"。每當客戶端釋放掉permits個鎖,就會將信號量的值累加permits,這樣Semaphore信號量的值就不再是0了。然后通過publish命令發布一個事件,之后訂閱了該事件的其他客戶端都會對getLatch()返回的本地JDK的Semaphore進行加1。于是其他客戶端正在被本地JDK的Semaphore進行阻塞的線程,就會被喚醒繼續執行。此時其他客戶端就可以嘗試獲取到這個信號量的鎖,然后再次將這個Semaphore的值遞減1。
4.Redisson的CountDownLatch簡介
(1)Redisson的CountDownLatch原理圖解
CountDownLatch的基本原理:要求必須有n個線程來進行countDown,才能讓執行await的線程繼續執行。如果沒有達到指定數量的線程來countDown,會導致執行await的線程阻塞。
(2)Redisson的CountDownLatch使用演示
public class RedissonDemo {public static void main(String[] args) throws Exception {//連接3主3從的Redis CLusterConfig config = new Config();...//CountDownLatchfinal RedissonClient redisson = Redisson.create(config);RCountDownLatch latch = redisson.getCountDownLatch("myCountDownLatch");//1.設置可以countDown的數量為3latch.trySetCount(3);System.out.println(new Date() + ":線程[" + Thread.currentThread().getName() + "]設置了必須有3個線程執行countDown,進入等待中。。。");for (int i = 0; i < 3; i++) {new Thread(new Runnable() {public void run() {try {System.out.println(new Date() + ":線程[" + Thread.currentThread().getName() + "]在做一些操作,請耐心等待。。。。。。");Thread.sleep(3000);RCountDownLatch localLatch = redisson.getCountDownLatch("myCountDownLatch");localLatch.countDown();System.out.println(new Date() + ":線程[" + Thread.currentThread().getName() + "]執行countDown操作");} catch (Exception e) {e.printStackTrace();}}}).start();}latch.await();System.out.println(new Date() + ":線程[" + Thread.currentThread().getName() + "]收到通知,有3個線程都執行了countDown操作,可以繼續往下執行");}
}
5.Redisson的CountDownLatch源碼剖析
(1)CountDownLatch的初始
public class Redisson implements RedissonClient {//Redis的連接管理器,封裝了一個Config實例protected final ConnectionManager connectionManager;//Redis的命令執行器,封裝了一個ConnectionManager實例protected final CommandAsyncExecutor commandExecutor;...protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);//初始化Redis的連接管理器connectionManager = ConfigSupport.createConnectionManager(configCopy);... //初始化Redis的命令執行器commandExecutor = new CommandSyncService(connectionManager, objectBuilder);...}@Overridepublic RCountDownLatch getCountDownLatch(String name) {return new RedissonCountDownLatch(commandExecutor, name);}...
}public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {...private final CountDownLatchPubSub pubSub;private final String id;protected RedissonCountDownLatch(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.id = commandExecutor.getConnectionManager().getId();this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getCountDownLatchPubSub();}...
}
(2)trySetCount()方法設置countDown的數量
trySetCount()方法的工作就是執行命令"set myCountDownLatch 3"。
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {...@Overridepublic boolean trySetCount(long count) {return get(trySetCountAsync(count));}@Overridepublic RFuture<Boolean> trySetCountAsync(long count) {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if redis.call('exists', KEYS[1]) == 0 then " +"redis.call('set', KEYS[1], ARGV[2]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1 " +"else " +"return 0 " +"end",Arrays.asList(getRawName(), getChannelName()),CountDownLatchPubSub.NEW_COUNT_MESSAGE,count);}...
}
(3)awati()方法進行阻塞等待
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {...@Overridepublic void await() throws InterruptedException {if (getCount() == 0) {return;}CompletableFuture<RedissonCountDownLatchEntry> future = subscribe();try {commandExecutor.syncSubscriptionInterrupted(future);while (getCount() > 0) {// waiting for open state//獲取countDown的數量還大于0,就先阻塞線程,然后再等待喚醒,執行while循環//其中getLatch()返回的是JDK的semaphore = "new Semaphore(0)" ==> (state - permits)commandExecutor.getNow(future).getLatch().await();}} finally {unsubscribe(commandExecutor.getNow(future));}}@Overridepublic long getCount() {return get(getCountAsync());}@Overridepublic RFuture<Long> getCountAsync() {//執行命令"get myCountDownLatch"return commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getRawName());}...
}
在while循環中,首先會執行命令"get myCountDownLatch"去獲取countDown值。如果該值不大于0,就退出循環不阻塞線程。如果該值大于0,則說明還沒有指定數量的線程去執行countDown操作,于是就會先阻塞線程,然后再等待喚醒來繼續循環。
(4)countDown()方法對countDown的數量遞減
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {...@Overridepublic void countDown() {get(countDownAsync());}@Overridepublic RFuture<Void> countDownAsync() {return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local v = redis.call('decr', KEYS[1]);" +"if v <= 0 then redis.call('del', KEYS[1]) end;" +"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",Arrays.<Object>asList(getRawName(), getChannelName()),CountDownLatchPubSub.ZERO_COUNT_MESSAGE);}...
}
countDownAsync()方法會執行decr命令,將countDown的數量進行遞減1。如果這個值已經小于等于0,就執行del命令刪除掉該CoutDownLatch。如果是這個值為0,還會發布一條消息:
publish redisson_countdownlatch__channel__{anyCountDownLatch} 0
文章轉載自:東陽馬生架構
原文鏈接:分布式鎖—6.Redisson的同步器組件 - 東陽馬生架構 - 博客園
體驗地址:引邁 - JNPF快速開發平臺_低代碼開發平臺_零代碼開發平臺_流程設計器_表單引擎_工作流引擎_軟件架構