3. 基于zookeeper實現分布式鎖
????????實現分布式鎖目前有三種流行方案,分別為基于數據庫、Redis、Zookeeper的方案。這里主要介紹基于zk怎么實現分布式鎖。在實現分布式鎖之前,先回顧zookeeper的相關知識點
3.1. 知識點回顧
3.1.1. 安裝啟動
安裝:把zk安裝包上傳到/opt目錄下,并切換到/opt目錄下,執行以下指令
# 解壓
tar -zxvf zookeeper-3.7.0-bin.tar.gz
# 重命名
mv apache-zookeeper-3.7.0-bin/ zookeeper
# 打開zookeeper根目錄
cd /opt/zookeeper
# 創建一個數據目錄,備用
mkdir data
# 打開zk的配置目錄
cd /opt/zookeeper/conf
# copy配置文件,zk啟動時會加載zoo.cfg文件
cp zoo_sample.cfg zoo.cfg
# 編輯配置文件
vim zoo.cfg
# 修改dataDir參數為之前創建的數據目錄:/opt/zookeeper/data
# 切換到bin目錄
cd /opt/zookeeper/bin
# 啟動
./zkServer.sh start
./zkServer.sh status # 查看啟動狀態
./zkServer.sh stop # 停止
./zkServer.sh restart # 重啟
./zkCli.sh # 查看zk客戶端
如下,說明啟動成功:
3.1.2. 相關概念
Zookeeper提供一個多層級的節點命名空間(節點稱為znode),每個節點都用一個以斜杠(/)分隔的路徑表示,而且每個節點都有父節點(根節點除外),非常類似于文件系統。并且每個節點都是唯一的。
znode節點有四種類型:
-
PERSISTENT:永久節點。客戶端與zookeeper斷開連接后,該節點依舊存在
-
EPHEMERAL:臨時節點。客戶端與zookeeper斷開連接后,該節點被刪除
-
PERSISTENT_SEQUENTIAL:永久節點、序列化。客戶端與zookeeper斷開連接后,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號
-
EPHEMERAL_SEQUENTIAL:臨時節點、序列化。客戶端與zookeeper斷開連接后,該節點被刪除,只是Zookeeper給該節點名稱進行順序編號
創建這四種節點:
[zk: localhost:2181(CONNECTED) 0] create /aa test # 創建持久化節點
Created /aa
[zk: localhost:2181(CONNECTED) 1] create -s /bb test # 創建持久序列化節點
Created /bb0000000001
[zk: localhost:2181(CONNECTED) 2] create -e /cc test # 創建臨時節點
Created /cc
[zk: localhost:2181(CONNECTED) 3] create -e -s /dd test # 創建臨時序列化節點
Created /dd0000000003
[zk: localhost:2181(CONNECTED) 4] ls / # 查看某個節點下的子節點
[aa, bb0000000001, cc, dd0000000003, zookeeper]
[zk: localhost:2181(CONNECTED) 5] stat / # 查看某個節點的狀態
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x5
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 5
[zk: localhost:2181(CONNECTED) 6] get /aa # 查看某個節點的內容
test
[zk: localhost:2181(CONNECTED) 11] delete /aa # 刪除某個節點
[zk: localhost:2181(CONNECTED) 7] ls / # 再次查看
[bb0000000001, cc, dd0000000003, zookeeper]
????????事件監聽:在讀取數據時,我們可以同時對節點設置事件監聽,當節點數據或結構變化時,zookeeper會通知客戶端。當前zookeeper針對節點的監聽有如下四種事件:
-
節點創建:stat -w /xx
當/xx節點創建時:NodeCreated
-
節點刪除:stat -w /xx
當/xx節點刪除時:NodeDeleted
-
節點數據修改:get -w /xx
當/xx節點數據發生變化時:NodeDataChanged
-
子節點變更:ls -w /xx
當/xx節點的子節點創建或者刪除時:NodeChildChanged
3.1.3. java客戶端
????????ZooKeeper的java客戶端有:原生客戶端、ZkClient、Curator框架(類似于redisson,有很多功能性封裝)。
-
引入依賴
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version>
</dependency>
-
常用api及其方法
public class ZkTest {public static void main(String[] args) throws KeeperException, InterruptedException {// 獲取zookeeper鏈接CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = null;try {zooKeeper = new ZooKeeper("172.16.116.100:2181", 30000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected.equals(event.getState()) && Event.EventType.None.equals(event.getType())) {System.out.println("獲取鏈接成功。。。。。。" + event);countDownLatch.countDown();}}});countDownLatch.await();} catch (Exception e) {e.printStackTrace();}// 創建一個節點,1-節點路徑 2-節點內容 3-節點的訪問權限 4-節點類型// OPEN_ACL_UNSAFE:任何人可以操作該節點// CREATOR_ALL_ACL:創建者擁有所有訪問權限// READ_ACL_UNSAFE: 任何人都可以讀取該節點// zooKeeper.create("/atguigu/aa", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);zooKeeper.create("/test", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);// zooKeeper.create("/atguigu/cc", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);// zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判斷節點是否存在Stat stat = zooKeeper.exists("/test", true);if (stat != null){System.out.println("當前節點存在!" + stat.getVersion());} else {System.out.println("當前節點不存在!");}// 判斷節點是否存在,同時添加監聽zooKeeper.exists("/test", event -> {});// 獲取一個節點的數據byte[] data = zooKeeper.getData("/atguigu/ss0000000001", false, null);System.out.println(new String(data));// 查詢一個節點的所有子節點List<String> children = zooKeeper.getChildren("/test", false);System.out.println(children);// 更新zooKeeper.setData("/test", "wawa...".getBytes(), stat.getVersion());// 刪除一個節點//zooKeeper.delete("/test", -1);if (zooKeeper != null){zooKeeper.close();}}
}
3.2. 思路分析
分布式鎖的步驟:
-
獲取鎖:create一個節點
-
刪除鎖:delete一個節點
-
重試:沒有獲取到鎖的請求重試
參照redis分布式鎖的特點:
-
互斥 排他
-
防死鎖:
-
可自動釋放鎖(臨時節點) :獲得鎖之后客戶端所在機器宕機了,客戶端沒有主動刪除子節點;如果創建的是永久的節點,那么這個鎖永遠不會釋放,導致死鎖;由于創建的是臨時節點,客戶端宕機后,過了一定時間zookeeper沒有收到客戶端的心跳包判斷會話失效,將臨時節點刪除從而釋放鎖。
-
可重入鎖:借助于ThreadLocal
-
-
防誤刪:宕機自動釋放臨時節點,不需要設置過期時間,也就不存在誤刪問題。
-
加鎖/解鎖要具備原子性
-
單點問題:使用Zookeeper可以有效的解決單點問題,ZK一般是集群部署的。
-
集群問題:zookeeper集群是強一致性的,只要集群中有半數以上的機器存活,就可以對外提供服務。
3.3. 基本實現
實現思路:
-
多個請求同時添加一個相同的臨時節點,只有一個可以添加成功。添加成功的獲取到鎖
-
執行業務邏輯
-
完成業務流程后,刪除節點釋放鎖。
????????由于zookeeper獲取鏈接是一個耗時過程,這里可以在項目啟動時,初始化鏈接,并且只初始化一次。借助于spring特性,代碼實現如下:
@Component
public class ZkClient {private static final String connectString = "172.16.116.100:2181";private static final String ROOT_PATH = "/distributed";private ZooKeeper zooKeeper;@PostConstructpublic void init(){try {// 連接zookeeper服務器this.zooKeeper = new ZooKeeper(connectString, 30000, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("獲取鏈接成功!!");}});// 創建分布式鎖根節點if (this.zooKeeper.exists(ROOT_PATH, false) == null){this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {System.out.println("獲取鏈接失敗!");e.printStackTrace();}}@PreDestroypublic void destroy(){try {if (zooKeeper != null){zooKeeper.close();}} catch (InterruptedException e) {e.printStackTrace();}}/*** 初始化zk分布式鎖對象方法* @param lockName* @return*/public ZkDistributedLock getZkDistributedLock(String lockName){return new ZkDistributedLock(zooKeeper, lockName);}
}
zk分布式鎖具體實現:
public class ZkDistributedLock {private static final String ROOT_PATH = "/distributed";private String path;private ZooKeeper zooKeeper;public ZkDistributedLock(ZooKeeper zooKeeper, String lockName){this.zooKeeper = zooKeeper;this.path = ROOT_PATH + "/" + lockName;}public void lock(){try {zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (Exception e) {// 重試try {Thread.sleep(200);lock();} catch (InterruptedException ex) {ex.printStackTrace();}}}public void unlock(){try {this.zooKeeper.delete(path, 0);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}
}
改造StockService的checkAndLock方法:
@Autowired
private ZkClient client;public void checkAndLock() {// 加鎖,獲取鎖失敗重試ZkDistributedLock lock = this.client.getZkDistributedLock("lock");lock.lock();// 先查詢庫存是否充足Stock stock = this.stockMapper.selectById(1L);// 再減庫存if (stock != null && stock.getCount() > 0){stock.setCount(stock.getCount() - 1);this.stockMapper.updateById(stock);}// 釋放鎖lock.unlock();
}
Jmeter壓力測試:
????????性能一般,mysql數據庫的庫存余量為0(注意:所有測試之前都要先修改庫存量為5000)
基本實現存在的問題:
-
性能一般(比mysql分布式鎖略好)
-
不可重入
接下來首先來提高性能
3.4. 優化:性能優化
基本實現中由于無限自旋影響性能:
????????試想:每個請求要想正常的執行完成,最終都是要創建節點,如果能夠避免爭搶必然可以提高性能。
這里借助于zk的臨時序列化節點,實現分布式鎖:
3.4.1. 實現阻塞鎖
代碼實現:
public class ZkDistributedLock {private static final String ROOT_PATH = "/distributed";private String path;private ZooKeeper zooKeeper;public ZkDistributedLock(ZooKeeper zooKeeper, String lockName){try {this.zooKeeper = zooKeeper;this.path = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public void lock(){String preNode = getPreNode(path);// 如果該節點沒有前一個節點,說明該節點時最小節點,放行執行業務邏輯if (StringUtils.isEmpty(preNode)){return ;}// 重新檢查。是否獲取到鎖try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}lock();}public void unlock(){try {this.zooKeeper.delete(path, 0);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}/*** 獲取指定節點的前節點* @param path* @return*/private String getPreNode(String path){try {// 獲取當前節點的序列化號Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));// 獲取根路徑下的所有序列化子節點List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);// 判空if (CollectionUtils.isEmpty(nodes)){return null;}// 獲取前一個節點Long flag = 0L;String preNode = null;for (String node : nodes) {// 獲取每個節點的序列化號Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));if (serial < curSerial && serial > flag){flag = serial;preNode = node;}}return preNode;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return null;}
}
主要修改了構造方法和lock方法:
并添加了getPreNode獲取前置節點的方法。
測試結果如下:
性能反而更弱了。
????????原因:雖然不用反復爭搶創建節點了,但是會自旋判斷自己是最小的節點,這個判斷邏輯反而更復雜更耗時。
解決方案:監聽。
3.4.2. 監聽實現阻塞鎖
????????對于這個算法有個極大的優化點:假如當前有1000個節點在等待鎖,如果獲得鎖的客戶端釋放鎖時,這1000個客戶端都會被喚醒,這種情況稱為“羊群效應”;在這種羊群效應中,zookeeper需要通知1000個客戶端,這會阻塞其他的操作,最好的情況應該只喚醒新的最小節點對應的客戶端。應該怎么做呢?在設置事件監聽時,每個客戶端應該對剛好在它之前的子節點設置事件監聽,例如子節點列表為/locks/lock-0000000000、/locks/lock-0000000001、/locks/lock-0000000002,序號為1的客戶端監聽序號為0的子節點刪除消息,序號為2的監聽序號為1的子節點刪除消息。
所以調整后的分布式鎖算法流程如下:
-
客戶端連接zookeeper,并在/lock下創建臨時的且有序的子節點,第一個客戶端對應的子節點為/locks/lock-0000000000,第二個為/locks/lock-0000000001,以此類推;
-
客戶端獲取/lock下的子節點列表,判斷自己創建的子節點是否為當前子節點列表中序號最小的子節點,如果是則認為獲得鎖,否則監聽剛好在自己之前一位的子節點刪除消息,獲得子節點變更通知后重復此步驟直至獲得鎖;
-
執行業務代碼;
-
完成業務流程后,刪除對應的子節點釋放鎖。
改造ZkDistributedLock的lock方法:
public void lock(){try {String preNode = getPreNode(path);// 如果該節點沒有前一個節點,說明該節點時最小節點,放行執行業務邏輯if (StringUtils.isEmpty(preNode)){return ;} else {CountDownLatch countDownLatch = new CountDownLatch(1);if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher(){@Overridepublic void process(WatchedEvent event) {countDownLatch.countDown();}}) == null) {return;}// 阻塞。。。。countDownLatch.await();return;}} catch (Exception e) {e.printStackTrace();// 重新檢查。是否獲取到鎖try {Thread.sleep(200);} catch (InterruptedException ex) {ex.printStackTrace();}lock();}
}
壓力測試效果如下:
由此可見性能提高不少,接近于redis的分布式鎖
3.5. 優化:可重入鎖
引入ThreadLocal線程局部變量保證zk分布式鎖的可重入性。
public class ZkDistributedLock {private static final String ROOT_PATH = "/distributed";private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();private String path;private ZooKeeper zooKeeper;public ZkDistributedLock(ZooKeeper zooKeeper, String lockName){try {this.zooKeeper = zooKeeper;if (THREAD_LOCAL.get() == null || THREAD_LOCAL.get() == 0){this.path = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public void lock(){Integer flag = THREAD_LOCAL.get();if (flag != null && flag > 0) {THREAD_LOCAL.set(flag + 1);return;}try {String preNode = getPreNode(path);// 如果該節點沒有前一個節點,說明該節點時最小節點,放行執行業務邏輯if (StringUtils.isEmpty(preNode)){THREAD_LOCAL.set(1);return ;} else {CountDownLatch countDownLatch = new CountDownLatch(1);if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher(){@Overridepublic void process(WatchedEvent event) {countDownLatch.countDown();}}) == null) {THREAD_LOCAL.set(1);return;}// 阻塞。。。。countDownLatch.await();THREAD_LOCAL.set(1);return;}} catch (Exception e) {e.printStackTrace();// 重新檢查。是否獲取到鎖try {Thread.sleep(200);} catch (InterruptedException ex) {ex.printStackTrace();}lock();}}public void unlock(){try {THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);if (THREAD_LOCAL.get() == 0) {this.zooKeeper.delete(path, 0);THREAD_LOCAL.remove();}} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}/*** 獲取指定節點的前節點* @param path* @return*/private String getPreNode(String path){try {// 獲取當前節點的序列化號Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));// 獲取根路徑下的所有序列化子節點List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);// 判空if (CollectionUtils.isEmpty(nodes)){return null;}// 獲取前一個節點Long flag = 0L;String preNode = null;for (String node : nodes) {// 獲取每個節點的序列化號Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));if (serial < curSerial && serial > flag){flag = serial;preNode = node;}}return preNode;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return null;}
}
3.6. zk分布式鎖小結
參照redis分布式鎖的特點:
-
互斥 排他:zk節點的不可重復性,以及序列化節點的有序性
-
防死鎖:
-
可自動釋放鎖:臨時節點
-
可重入鎖:借助于ThreadLocal
-
-
防誤刪:臨時節點
-
加鎖/解鎖要具備原子性
-
單點問題:使用Zookeeper可以有效的解決單點問題,ZK一般是集群部署的。
-
集群問題:zookeeper集群是強一致性的,只要集群中有半數以上的機器存活,就可以對外提供服務。
-
公平鎖:有序性節點
3.7. Curator中的分布式鎖
????????Curator是netflix公司開源的一套zookeeper客戶端,目前是Apache的頂級項目。與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。Curator解決了很多zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊wathcer和NodeExistsException 異常等。
通過查看官方文檔,可以發現Curator主要解決了三類問題:
-
封裝ZooKeeper client與ZooKeeper server之間的連接處理
-
提供了一套Fluent風格的操作API
-
提供ZooKeeper各種應用場景(recipe, 比如:分布式鎖服務、集群領導選舉、共享計數器、緩存機制、分布式隊列等)的抽象封裝,這些實現都遵循了zk的最佳實踐,并考慮了各種極端情況
Curator由一系列的模塊構成,對于一般開發者而言,常用的是curator-framework和curator-recipes:
-
curator-framework:提供了常見的zk相關的底層操作
-
curator-recipes:提供了一些zk的典型使用場景的參考。本節重點關注的分布式鎖就是該包提供的
引入依賴:
????????最新版本的curator 4.3.0支持zookeeper 3.4.x和3.5,但是需要注意curator傳遞進來的依賴,需要和實際服務器端使用的版本相符,以我們目前使用的zookeeper 3.4.14為例。
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.14</version>
</dependency>
添加curator客戶端配置:
@Configuration
public class CuratorConfig {@Beanpublic CuratorFramework curatorFramework(){// 重試策略,這里使用的是指數補償重試策略,重試3次,初始重試間隔1000ms,每次重試之后重試間隔遞增。RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);// 初始化Curator客戶端:指定鏈接信息 及 重試策略CuratorFramework client = CuratorFrameworkFactory.newClient("172.16.116.100:2181", retry);client.start(); // 開始鏈接,如果不調用該方法,很多方法無法工作return client;}
}
3.7.1. 可重入鎖InterProcessMutex
????????Reentrant和JDK的ReentrantLock類似, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。它是由類InterProcessMutex來實現。
// 常用構造方法
public InterProcessMutex(CuratorFramework client, String path)
// 獲取鎖
public void acquire();
// 帶超時時間的可重入鎖
public boolean acquire(long time, TimeUnit unit);
// 釋放鎖
public void release();
3.7.1.1. 使用案例
改造service測試方法:
@Autowired
private CuratorFramework curatorFramework;public void checkAndLock() {InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/curator/lock");try {// 加鎖mutex.acquire();// 先查詢庫存是否充足Stock stock = this.stockMapper.selectById(1L);// 再減庫存if (stock != null && stock.getCount() > 0){stock.setCount(stock.getCount() - 1);this.stockMapper.updateById(stock);}// this.testSub(mutex);// 釋放鎖mutex.release();} catch (Exception e) {e.printStackTrace();}
}public void testSub(InterProcessMutex mutex) {try {mutex.acquire();System.out.println("測試可重入鎖。。。。");mutex.release();} catch (Exception e) {e.printStackTrace();}
}
注意:如想重入,則需要使用同一個InterProcessMutex對象。
壓力測試結果:
3.7.1.2. 底層原理
3.7.2. 不可重入鎖InterProcessSemaphoreMutex
????????具體實現:InterProcessSemaphoreMutex。與InterProcessMutex調用方法類似,區別在于該鎖是不可重入的,在同一個線程中不可重入。
public InterProcessSemaphoreMutex(CuratorFramework client, String path);
public void acquire();
public boolean acquire(long time, TimeUnit unit);
public void release();
案例:
@Autowired
private CuratorFramework curatorFramework;public void deduct() {InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curatorFramework, "/curator/lock");try {mutex.acquire();// 1. 查詢庫存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判斷庫存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣減庫存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} catch (Exception e) {e.printStackTrace();} finally {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}
}
3.7.3. 可重入讀寫鎖InterProcessReadWriteLock
????????類似JDK的ReentrantReadWriteLock。一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖。這也意味著寫鎖可以降級成讀鎖。從讀鎖升級成寫鎖是不成的。主要實現類InterProcessReadWriteLock:
// 構造方法
public InterProcessReadWriteLock(CuratorFramework client, String basePath);
// 獲取讀鎖對象
InterProcessMutex readLock();
// 獲取寫鎖對象
InterProcessMutex writeLock();
注意:寫鎖在釋放之前會一直阻塞請求線程,而讀鎖不會
public void testZkReadLock() {try {InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwlock");rwlock.readLock().acquire(10, TimeUnit.SECONDS);// TODO:一頓讀的操作。。。。//rwlock.readLock().unlock();} catch (Exception e) {e.printStackTrace();}
}public void testZkWriteLock() {try {InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwlock");rwlock.writeLock().acquire(10, TimeUnit.SECONDS);// TODO:一頓寫的操作。。。。//rwlock.writeLock().unlock();} catch (Exception e) {e.printStackTrace();}
}
3.7.4. 聯鎖InterProcessMultiLock
????????Multi Shared Lock是一個鎖的容器。當調用acquire, 所有的鎖都會被acquire,如果請求失敗,所有的鎖都會被release。同樣調用release時所有的鎖都被release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。實現類InterProcessMultiLock:
// 構造函數需要包含的鎖的集合,或者一組ZooKeeper的path
public InterProcessMultiLock(List<InterProcessLock> locks);
public InterProcessMultiLock(CuratorFramework client, List<String> paths);// 獲取鎖
public void acquire();
public boolean acquire(long time, TimeUnit unit);// 釋放鎖
public synchronized void release();
3.7.5. 信號量InterProcessSemaphoreV2
????????一個計數的信號量類似JDK的Semaphore。JDK中Semaphore維護的一組許可(permits),而Cubator中稱之為租約(Lease)。注意,所有的實例必須使用相同的numberOfLeases值。調用acquire會返回一個租約對象。客戶端必須在finally中close這些租約對象,否則這些租約會丟失掉。但是,如果客戶端session由于某種原因比如crash丟掉, 那么這些客戶端持有的租約會自動close, 這樣其它客戶端可以繼續使用這些租約。主要實現類InterProcessSemaphoreV2:
// 構造方法
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);// 注意一次你可以請求多個租約,如果Semaphore當前的租約不夠,則請求線程會被阻塞。
// 同時還提供了超時的重載方法
public Lease acquire();
public Collection<Lease> acquire(int qty);
public Lease acquire(long time, TimeUnit unit);
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)// 租約還可以通過下面的方式返還
public void returnAll(Collection<Lease> leases);
public void returnLease(Lease lease);
案例代碼:
StockController中添加方法:
@GetMapping("test/semaphore")
public String testSemaphore(){this.stockService.testSemaphore();return "hello Semaphore";
}
StockService中添加方法:
public void testSemaphore() {// 設置資源量 限流的線程數InterProcessSemaphoreV2 semaphoreV2 = new InterProcessSemaphoreV2(curatorFramework, "/locks/semaphore", 5);try {Lease acquire = semaphoreV2.acquire();// 獲取資源,獲取資源成功的線程可以繼續處理業務操作。否則會被阻塞住this.redisTemplate.opsForList().rightPush("log", "10010獲取了資源,開始處理業務邏輯。" + Thread.currentThread().getName());TimeUnit.SECONDS.sleep(10 + new Random().nextInt(10));this.redisTemplate.opsForList().rightPush("log", "10010處理完業務邏輯,釋放資源=====================" + Thread.currentThread().getName());semaphoreV2.returnLease(acquire); // 手動釋放資源,后續請求線程就可以獲取該資源} catch (Exception e) {e.printStackTrace();}
}
3.7.6. 柵欄barrier
-
DistributedBarrier構造函數中barrierPath參數用來確定一個柵欄,只要barrierPath參數相同(路徑相同)就是同一個柵欄。通常情況下柵欄的使用如下:
-
主client設置一個柵欄
-
其他客戶端就會調用waitOnBarrier()等待柵欄移除,程序處理線程阻塞
-
主client移除柵欄,其他客戶端的處理程序就會同時繼續運行。
DistributedBarrier類的主要方法如下:
setBarrier() - 設置柵欄 waitOnBarrier() - 等待柵欄移除 removeBarrier() - 移除柵欄
-
-
DistributedDoubleBarrier雙柵欄,允許客戶端在計算的開始和結束時同步。當足夠的進程加入到雙柵欄時,進程開始計算,當計算完成時,離開柵欄。DistributedDoubleBarrier實現了雙柵欄的功能。構造函數如下:
// client - the client // barrierPath - path to use // memberQty - the number of members in the barrier public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty); ? enter()、enter(long maxWait, TimeUnit unit) - 等待同時進入柵欄 leave()、leave(long maxWait, TimeUnit unit) - 等待同時離開柵欄
memberQty是成員數量,當enter方法被調用時,成員被阻塞,直到所有的成員都調用了enter。當leave方法被調用時,它也阻塞調用線程,直到所有的成員都調用了leave。
注意:參數memberQty的值只是一個閾值,而不是一個限制值。當等待柵欄的數量大于或等于這個值柵欄就會打開!
與柵欄(DistributedBarrier)一樣,雙柵欄的barrierPath參數也是用來確定是否是同一個柵欄的,雙柵欄的使用情況如下:
-
從多個客戶端在同一個路徑上創建雙柵欄(DistributedDoubleBarrier),然后調用enter()方法,等待柵欄數量達到memberQty時就可以進入柵欄。
-
柵欄數量達到memberQty,多個客戶端同時停止阻塞繼續運行,直到執行leave()方法,等待memberQty個數量的柵欄同時阻塞到leave()方法中。
-
memberQty個數量的柵欄同時阻塞到leave()方法中,多個客戶端的leave()方法停止阻塞,繼續運行。
-
3.7.7. 共享計數器
????????利用ZooKeeper可以實現一個集群共享的計數器。只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器, 一個是用int來計數,一個用long來計數。
3.7.7.1. SharedCount
共享計數器SharedCount相關方法如下:
// 構造方法
public SharedCount(CuratorFramework client, String path, int seedValue);
// 獲取共享計數的值
public int getCount();
// 設置共享計數的值
public void setCount(int newCount) throws Exception;
// 當版本號沒有變化時,才會更新共享變量的值
public boolean trySetCount(VersionedValue<Integer> previous, int newCount);
// 通過監聽器監聽共享計數的變化
public void addListener(SharedCountListener listener);
public void addListener(final SharedCountListener listener, Executor executor);
// 共享計數在使用之前必須開啟
public void start() throws Exception;
// 關閉共享計數
public void close() throws IOException;
使用案例:
StockController:
@GetMapping("test/zk/share/count")
public String testZkShareCount(){this.stockService.testZkShareCount();return "hello shareData";
}
StockService:
public void testZkShareCount() {try {// 第三個參數是共享計數的初始值SharedCount sharedCount = new SharedCount(curatorFramework, "/curator/count", 0);// 啟動共享計數器sharedCount.start();// 獲取共享計數的值int count = sharedCount.getCount();// 修改共享計數的值int random = new Random().nextInt(1000);sharedCount.setCount(random);System.out.println("我獲取了共享計數的初始值:" + count + ",并把計數器的值改為:" + random);sharedCount.close();} catch (Exception e) {e.printStackTrace();}
}
3.7.7.2. DistributedAtomicNumber
????????DistributedAtomicNumber接口是分布式原子數值類型的抽象,定義了分布式原子數值類型需要提供的方法。
????????DistributedAtomicNumber接口有兩個實現:DistributedAtomicLong
和 DistributedAtomicInteger
????????這兩個實現將各種原子操作的執行委托給了DistributedAtomicValue
,所以這兩種實現是類似的,只不過表示的數值類型不同而已。這里以DistributedAtomicLong
為例進行演示
????????DistributedAtomicLong除了計數的范圍比SharedCount大了之外,比SharedCount更簡單易用。它首先嘗試使用樂觀鎖的方式設置計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex方式來更新計數值。此計數器有一系列的操作:
-
get(): 獲取當前值
-
increment():加一
-
decrement(): 減一
-
add():增加特定的值
-
subtract(): 減去特定的值
-
trySet(): 嘗試設置計數值
-
forceSet(): 強制設置計數值
????????你必須檢查返回結果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
????????