前言
對于java的單進程應用來說,存在資源競爭的場景可以使用synchronized關鍵字和Lock來對資源進行加鎖,使整個操作具有原子性。但是對于多進程或者分布式的應用來說,上面提到的鎖不共享,做不到互相通訊,所以就需要分布式鎖來解決問題了。
廢話不多說,直接進入正題,下面結合AQS和Redis來實現分布式鎖。
代碼中大部分都是參考ReentrantLock來實現的,所以讀者可以先去了解一下ReentranLock和AQS
參閱:
http://www.importnew.com/27477.html
http://cmsblogs.com/?p=2210
加鎖
@Overrideprotected boolean tryAcquire(int acquires) throws AcquireLockTimeoutException {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, 1)) { // 標注1setExclusiveOwnerThread(current);// 如果是線程被中斷失敗的話,返回false,如果超時失敗的話,捕獲異常return tryAcquireRedisLock(TimeUnit.MILLISECONDS.toNanos(redisLockTimeout));}//可重入} else if (current == getExclusiveOwnerThread()) { //標注2int nextc = c + acquires;if (nextc < 0) {throw new Error("Maximum lock count exceeded");}setState(nextc);return true;}return false;}
- ?
下面會把進程內的鎖稱為進程鎖,如果有更專業的描述方法的話,歡迎指出。
對上面的步驟分析:
1. 首先看標注1,通過compareAndSetState獲取到進程鎖,只有獲取到進程鎖,才有資格去競爭redis鎖, 這樣的好處就是對于同一個進程里面的所有加鎖請求,在某一個時刻只有一個請求能去請求獲取redis鎖,有效降低redis的壓力,總的來說就是把部分競爭交給進程自己去解決了,也就是先競爭進程鎖。
2. 再看標注2,能進行到這一步,首先能確保已經獲取了進程鎖,但是是否一定獲取了redis鎖了呢,不一定,所以在tryAcquireRedisLock的過程中如果拋出異常,一定要保證使用finally代碼塊把進程鎖釋放掉,避免誤以為已經同時獲取了進程鎖和redis鎖。
獲取redis鎖
private final boolean tryAcquireRedisLock(long nanosTimeout) {if (nanosTimeout <= 0L) {return false;}final long deadline = System.nanoTime() + nanosTimeout;int count = 0;boolean interrupted = false;Jedis jedis = null;try {jedis = redisHelper.getJedisInstance();while (true) {nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L) {throw new AcquireLockTimeoutException();}String value = String.format(valueFormat, Thread.currentThread().getId());//避免系統宕機鎖不釋放,設置過期時間String response = jedis.set(lockKey, value, NX, PX, redisLockTimeout);if (OK.equals(response)) {//如果線程被中斷同時也是失敗的return !interrupted;}// 超過嘗試次數if (count > RETRY_TIMES && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD && parkAndCheckInterrupt()) {interrupted = true;}count++;}} finally {redisHelper.returnResouce(jedis);}}final boolean parkAndCheckInterrupt() {LockSupport.parkNanos(TimeUnit.NANOSECONDS.toNanos(PARK_TIME));return Thread.interrupted();
}
- ?
分析:
1. 為了避免獲取redis鎖的過程無休止的運行下去,使用超時策略,如果超時了,直接返回失敗
2. 如果還在有效時間內,則通過自旋不斷嘗試獲取鎖,如果超過了嘗試次數,暫時掛起,讓出時間片,但是不可以掛起太長的時間,幾個時間片內為好。
解鎖
//RedisDistributedLock.java
@Override
public void unlock() {sync.unlock();
}//Sync.java
public void unlock() {release(1);
}@Override
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {Jedis jedis = null;try {jedis = redisHelper.getJedisInstance();String value = String.format(valueFormat, Thread.currentThread().getId());jedis.eval(UNLOCK_SCRIPT, Arrays.asList(lockKey), Arrays.asList(value));} finally {redisHelper.returnResouce(jedis);}free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
- ?
分析:
1. 可以注意到value在加鎖和解鎖的過程都有,這個value是用來標識鎖的唯一性的,避免別的進程誤刪了該鎖。
private final UUID uuid = UUID.randomUUID();
private final String valueFormat = "%d:" + uuid.toString();
- ?
驗證
@Overridepublic void run() {SqlSession session = MybatisHelper.instance.openSession(true);try {KeyGeneratorMapper generatorMapper = session.getMapper(KeyGeneratorMapper.class);KeyFetchRecordMapper recordMapper = session.getMapper(KeyFetchRecordMapper.class);while (true) {try {lock.lock();KeyGenerator keyGenerator = generatorMapper.select(1);if (keyGenerator.getKey() >= MAX_KEY) {System.exit(0);}recordMapper.insert(new KeyFetchRecord(keyGenerator.getKey(), server));generatorMapper.increase(1, 1);session.commit();} catch (RuntimeException e) {e.printStackTrace();continue;} finally {lock.unlock();}}} finally {session.close();}}
- ?
開啟5個進程,每個進程5個線程,進行獲取一個key值,獲取到后加1,然后記錄到數據庫,這個過程不要是原子的,因為把沒有原子性的過程變成有原子性的過程,才證明了這個鎖的有效性。
結果如下
沒有重復的key,成功!