分布式鎖之基于zookeeper實現分布式鎖(三)

3. 基于zookeeper實現分布式鎖

????????實現分布式鎖目前有三種流行方案,分別為基于數據庫、Redis、Zookeeper的方案。這里主要介紹基于zk怎么實現分布式鎖。在實現分布式鎖之前,先回顧zookeeper的相關知識點

3.1. 知識點回顧

3.1.1. 安裝啟動

安裝:把zk安裝包上傳到/opt目錄下,并切換到/opt目錄下,執行以下指令

# 解壓
tar -zxvf zookeeper-3.7.0-bin.tar.gz
# 重命名
mv apache-zookeeper-3.7.0-bin/ zookeeper
# 打開zookeeper根目錄
cd /opt/zookeeper
# 創建一個數據目錄,備用
mkdir data
# 打開zk的配置目錄
cd /opt/zookeeper/conf
# copy配置文件,zk啟動時會加載zoo.cfg文件
cp zoo_sample.cfg zoo.cfg
# 編輯配置文件
vim zoo.cfg
# 修改dataDir參數為之前創建的數據目錄:/opt/zookeeper/data
# 切換到bin目錄
cd /opt/zookeeper/bin
# 啟動 
./zkServer.sh start
./zkServer.sh status # 查看啟動狀態
./zkServer.sh stop # 停止
./zkServer.sh restart # 重啟
./zkCli.sh # 查看zk客戶端

如下,說明啟動成功:

3.1.2. 相關概念

Zookeeper提供一個多層級的節點命名空間(節點稱為znode),每個節點都用一個以斜杠(/)分隔的路徑表示,而且每個節點都有父節點(根節點除外),非常類似于文件系統。并且每個節點都是唯一的。

znode節點有四種類型:

  • PERSISTENT:永久節點。客戶端與zookeeper斷開連接后,該節點依舊存在

  • EPHEMERAL:臨時節點。客戶端與zookeeper斷開連接后,該節點被刪除

  • PERSISTENT_SEQUENTIAL:永久節點、序列化。客戶端與zookeeper斷開連接后,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號

  • EPHEMERAL_SEQUENTIAL:臨時節點、序列化。客戶端與zookeeper斷開連接后,該節點被刪除,只是Zookeeper給該節點名稱進行順序編號

創建這四種節點:

[zk: localhost:2181(CONNECTED) 0] create /aa test  # 創建持久化節點
Created /aa
[zk: localhost:2181(CONNECTED) 1] create -s /bb test  # 創建持久序列化節點
Created /bb0000000001
[zk: localhost:2181(CONNECTED) 2] create -e /cc test  # 創建臨時節點
Created /cc
[zk: localhost:2181(CONNECTED) 3] create -e -s /dd test  # 創建臨時序列化節點
Created /dd0000000003
[zk: localhost:2181(CONNECTED) 4] ls /   # 查看某個節點下的子節點
[aa, bb0000000001, cc, dd0000000003, zookeeper]
[zk: localhost:2181(CONNECTED) 5] stat /  # 查看某個節點的狀態
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x5
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 5
[zk: localhost:2181(CONNECTED) 6] get /aa  # 查看某個節點的內容
test
[zk: localhost:2181(CONNECTED) 11] delete /aa  # 刪除某個節點
[zk: localhost:2181(CONNECTED) 7] ls /  # 再次查看
[bb0000000001, cc, dd0000000003, zookeeper]

????????事件監聽:在讀取數據時,我們可以同時對節點設置事件監聽,當節點數據或結構變化時,zookeeper會通知客戶端。當前zookeeper針對節點的監聽有如下四種事件:

  1. 節點創建:stat -w /xx

    當/xx節點創建時:NodeCreated

  2. 節點刪除:stat -w /xx

    當/xx節點刪除時:NodeDeleted

  3. 節點數據修改:get -w /xx

    當/xx節點數據發生變化時:NodeDataChanged

  4. 子節點變更:ls -w /xx

    當/xx節點的子節點創建或者刪除時:NodeChildChanged

3.1.3. java客戶端

????????ZooKeeper的java客戶端有:原生客戶端、ZkClient、Curator框架(類似于redisson,有很多功能性封裝)。

  • 引入依賴

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version>
</dependency>
  • 常用api及其方法

public class ZkTest {public static void main(String[] args) throws KeeperException, InterruptedException {// 獲取zookeeper鏈接CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = null;try {zooKeeper = new ZooKeeper("172.16.116.100:2181", 30000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected.equals(event.getState()) && Event.EventType.None.equals(event.getType())) {System.out.println("獲取鏈接成功。。。。。。" + event);countDownLatch.countDown();}}});countDownLatch.await();} catch (Exception e) {e.printStackTrace();}// 創建一個節點,1-節點路徑 2-節點內容 3-節點的訪問權限 4-節點類型// OPEN_ACL_UNSAFE:任何人可以操作該節點// CREATOR_ALL_ACL:創建者擁有所有訪問權限// READ_ACL_UNSAFE: 任何人都可以讀取該節點// zooKeeper.create("/atguigu/aa", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);zooKeeper.create("/test", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);// zooKeeper.create("/atguigu/cc", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);// zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// zooKeeper.create("/atguigu/dd", "haha~~".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判斷節點是否存在Stat stat = zooKeeper.exists("/test", true);if (stat != null){System.out.println("當前節點存在!" + stat.getVersion());} else {System.out.println("當前節點不存在!");}// 判斷節點是否存在,同時添加監聽zooKeeper.exists("/test", event -> {});// 獲取一個節點的數據byte[] data = zooKeeper.getData("/atguigu/ss0000000001", false, null);System.out.println(new String(data));// 查詢一個節點的所有子節點List<String> children = zooKeeper.getChildren("/test", false);System.out.println(children);// 更新zooKeeper.setData("/test", "wawa...".getBytes(), stat.getVersion());// 刪除一個節點//zooKeeper.delete("/test", -1);if (zooKeeper != null){zooKeeper.close();}}
}

3.2. 思路分析

分布式鎖的步驟:

  1. 獲取鎖:create一個節點

  2. 刪除鎖:delete一個節點

  3. 重試:沒有獲取到鎖的請求重試

參照redis分布式鎖的特點:

  1. 互斥 排他

  2. 防死鎖:

    1. 可自動釋放鎖(臨時節點) :獲得鎖之后客戶端所在機器宕機了,客戶端沒有主動刪除子節點;如果創建的是永久的節點,那么這個鎖永遠不會釋放,導致死鎖;由于創建的是臨時節點,客戶端宕機后,過了一定時間zookeeper沒有收到客戶端的心跳包判斷會話失效,將臨時節點刪除從而釋放鎖。

    2. 可重入鎖:借助于ThreadLocal

  3. 防誤刪:宕機自動釋放臨時節點,不需要設置過期時間,也就不存在誤刪問題。

  4. 加鎖/解鎖要具備原子性

  5. 單點問題:使用Zookeeper可以有效的解決單點問題,ZK一般是集群部署的。

  6. 集群問題:zookeeper集群是強一致性的,只要集群中有半數以上的機器存活,就可以對外提供服務。

3.3. 基本實現

實現思路:

  1. 多個請求同時添加一個相同的臨時節點,只有一個可以添加成功。添加成功的獲取到鎖

  2. 執行業務邏輯

  3. 完成業務流程后,刪除節點釋放鎖。

????????由于zookeeper獲取鏈接是一個耗時過程,這里可以在項目啟動時,初始化鏈接,并且只初始化一次。借助于spring特性,代碼實現如下:

@Component
public class ZkClient {private static final String connectString = "172.16.116.100:2181";private static final String ROOT_PATH = "/distributed";private ZooKeeper zooKeeper;@PostConstructpublic void init(){try {// 連接zookeeper服務器this.zooKeeper = new ZooKeeper(connectString, 30000, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("獲取鏈接成功!!");}});// 創建分布式鎖根節點if (this.zooKeeper.exists(ROOT_PATH, false) == null){this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {System.out.println("獲取鏈接失敗!");e.printStackTrace();}}@PreDestroypublic void destroy(){try {if (zooKeeper != null){zooKeeper.close();}} catch (InterruptedException e) {e.printStackTrace();}}/*** 初始化zk分布式鎖對象方法* @param lockName* @return*/public ZkDistributedLock getZkDistributedLock(String lockName){return new ZkDistributedLock(zooKeeper, lockName);}
}

zk分布式鎖具體實現:

public class ZkDistributedLock {private static final String ROOT_PATH = "/distributed";private String path;private ZooKeeper zooKeeper;public ZkDistributedLock(ZooKeeper zooKeeper, String lockName){this.zooKeeper = zooKeeper;this.path = ROOT_PATH + "/" + lockName;}public void lock(){try {zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (Exception e) {// 重試try {Thread.sleep(200);lock();} catch (InterruptedException ex) {ex.printStackTrace();}}}public void unlock(){try {this.zooKeeper.delete(path, 0);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}
}

改造StockService的checkAndLock方法:

@Autowired
private ZkClient client;public void checkAndLock() {// 加鎖,獲取鎖失敗重試ZkDistributedLock lock = this.client.getZkDistributedLock("lock");lock.lock();// 先查詢庫存是否充足Stock stock = this.stockMapper.selectById(1L);// 再減庫存if (stock != null && stock.getCount() > 0){stock.setCount(stock.getCount() - 1);this.stockMapper.updateById(stock);}// 釋放鎖lock.unlock();
}

Jmeter壓力測試:

????????性能一般,mysql數據庫的庫存余量為0(注意:所有測試之前都要先修改庫存量為5000)

基本實現存在的問題:

  1. 性能一般(比mysql分布式鎖略好)

  2. 不可重入

接下來首先來提高性能

3.4. 優化:性能優化

基本實現中由于無限自旋影響性能:

????????試想:每個請求要想正常的執行完成,最終都是要創建節點,如果能夠避免爭搶必然可以提高性能。

這里借助于zk的臨時序列化節點,實現分布式鎖:

3.4.1. 實現阻塞鎖

代碼實現:

public class ZkDistributedLock {private static final String ROOT_PATH = "/distributed";private String path;private ZooKeeper zooKeeper;public ZkDistributedLock(ZooKeeper zooKeeper, String lockName){try {this.zooKeeper = zooKeeper;this.path = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public void lock(){String preNode = getPreNode(path);// 如果該節點沒有前一個節點,說明該節點時最小節點,放行執行業務邏輯if (StringUtils.isEmpty(preNode)){return ;}// 重新檢查。是否獲取到鎖try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}lock();}public void unlock(){try {this.zooKeeper.delete(path, 0);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}/*** 獲取指定節點的前節點* @param path* @return*/private String getPreNode(String path){try {// 獲取當前節點的序列化號Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));// 獲取根路徑下的所有序列化子節點List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);// 判空if (CollectionUtils.isEmpty(nodes)){return null;}// 獲取前一個節點Long flag = 0L;String preNode = null;for (String node : nodes) {// 獲取每個節點的序列化號Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));if (serial < curSerial && serial > flag){flag = serial;preNode = node;}}return preNode;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return null;}
}

主要修改了構造方法和lock方法:

并添加了getPreNode獲取前置節點的方法。

測試結果如下:

性能反而更弱了。

????????原因:雖然不用反復爭搶創建節點了,但是會自旋判斷自己是最小的節點,這個判斷邏輯反而更復雜更耗時。

解決方案:監聽。

3.4.2. 監聽實現阻塞鎖

????????對于這個算法有個極大的優化點:假如當前有1000個節點在等待鎖,如果獲得鎖的客戶端釋放鎖時,這1000個客戶端都會被喚醒,這種情況稱為“羊群效應”;在這種羊群效應中,zookeeper需要通知1000個客戶端,這會阻塞其他的操作,最好的情況應該只喚醒新的最小節點對應的客戶端。應該怎么做呢?在設置事件監聽時,每個客戶端應該對剛好在它之前的子節點設置事件監聽,例如子節點列表為/locks/lock-0000000000、/locks/lock-0000000001、/locks/lock-0000000002,序號為1的客戶端監聽序號為0的子節點刪除消息,序號為2的監聽序號為1的子節點刪除消息。

所以調整后的分布式鎖算法流程如下:

  • 客戶端連接zookeeper,并在/lock下創建臨時的且有序的子節點,第一個客戶端對應的子節點為/locks/lock-0000000000,第二個為/locks/lock-0000000001,以此類推;

  • 客戶端獲取/lock下的子節點列表,判斷自己創建的子節點是否為當前子節點列表中序號最小的子節點,如果是則認為獲得鎖,否則監聽剛好在自己之前一位的子節點刪除消息,獲得子節點變更通知后重復此步驟直至獲得鎖;

  • 執行業務代碼;

  • 完成業務流程后,刪除對應的子節點釋放鎖。

改造ZkDistributedLock的lock方法:

public void lock(){try {String preNode = getPreNode(path);// 如果該節點沒有前一個節點,說明該節點時最小節點,放行執行業務邏輯if (StringUtils.isEmpty(preNode)){return ;} else {CountDownLatch countDownLatch = new CountDownLatch(1);if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher(){@Overridepublic void process(WatchedEvent event) {countDownLatch.countDown();}}) == null) {return;}// 阻塞。。。。countDownLatch.await();return;}} catch (Exception e) {e.printStackTrace();// 重新檢查。是否獲取到鎖try {Thread.sleep(200);} catch (InterruptedException ex) {ex.printStackTrace();}lock();}
}

壓力測試效果如下:

由此可見性能提高不少,接近于redis的分布式鎖

3.5. 優化:可重入鎖

引入ThreadLocal線程局部變量保證zk分布式鎖的可重入性。

public class ZkDistributedLock {private static final String ROOT_PATH = "/distributed";private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();private String path;private ZooKeeper zooKeeper;public ZkDistributedLock(ZooKeeper zooKeeper, String lockName){try {this.zooKeeper = zooKeeper;if (THREAD_LOCAL.get() == null || THREAD_LOCAL.get() == 0){this.path = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public void lock(){Integer flag = THREAD_LOCAL.get();if (flag != null && flag > 0) {THREAD_LOCAL.set(flag + 1);return;}try {String preNode = getPreNode(path);// 如果該節點沒有前一個節點,說明該節點時最小節點,放行執行業務邏輯if (StringUtils.isEmpty(preNode)){THREAD_LOCAL.set(1);return ;} else {CountDownLatch countDownLatch = new CountDownLatch(1);if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher(){@Overridepublic void process(WatchedEvent event) {countDownLatch.countDown();}}) == null) {THREAD_LOCAL.set(1);return;}// 阻塞。。。。countDownLatch.await();THREAD_LOCAL.set(1);return;}} catch (Exception e) {e.printStackTrace();// 重新檢查。是否獲取到鎖try {Thread.sleep(200);} catch (InterruptedException ex) {ex.printStackTrace();}lock();}}public void unlock(){try {THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);if (THREAD_LOCAL.get() == 0) {this.zooKeeper.delete(path, 0);THREAD_LOCAL.remove();}} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}/*** 獲取指定節點的前節點* @param path* @return*/private String getPreNode(String path){try {// 獲取當前節點的序列化號Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));// 獲取根路徑下的所有序列化子節點List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);// 判空if (CollectionUtils.isEmpty(nodes)){return null;}// 獲取前一個節點Long flag = 0L;String preNode = null;for (String node : nodes) {// 獲取每個節點的序列化號Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));if (serial < curSerial && serial > flag){flag = serial;preNode = node;}}return preNode;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return null;}
}

3.6. zk分布式鎖小結

參照redis分布式鎖的特點:

  1. 互斥 排他:zk節點的不可重復性,以及序列化節點的有序性

  2. 防死鎖:

    1. 可自動釋放鎖:臨時節點

    2. 可重入鎖:借助于ThreadLocal

  3. 防誤刪:臨時節點

  4. 加鎖/解鎖要具備原子性

  5. 單點問題:使用Zookeeper可以有效的解決單點問題,ZK一般是集群部署的。

  6. 集群問題:zookeeper集群是強一致性的,只要集群中有半數以上的機器存活,就可以對外提供服務。

  7. 公平鎖:有序性節點

3.7. Curator中的分布式鎖

????????Curator是netflix公司開源的一套zookeeper客戶端,目前是Apache的頂級項目。與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。Curator解決了很多zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊wathcer和NodeExistsException 異常等。

通過查看官方文檔,可以發現Curator主要解決了三類問題:

  • 封裝ZooKeeper client與ZooKeeper server之間的連接處理

  • 提供了一套Fluent風格的操作API

  • 提供ZooKeeper各種應用場景(recipe, 比如:分布式鎖服務、集群領導選舉、共享計數器、緩存機制、分布式隊列等)的抽象封裝,這些實現都遵循了zk的最佳實踐,并考慮了各種極端情況

Curator由一系列的模塊構成,對于一般開發者而言,常用的是curator-framework和curator-recipes:

  • curator-framework:提供了常見的zk相關的底層操作

  • curator-recipes:提供了一些zk的典型使用場景的參考。本節重點關注的分布式鎖就是該包提供的

引入依賴:

????????最新版本的curator 4.3.0支持zookeeper 3.4.x和3.5,但是需要注意curator傳遞進來的依賴,需要和實際服務器端使用的版本相符,以我們目前使用的zookeeper 3.4.14為例。

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.14</version>
</dependency>

添加curator客戶端配置:

@Configuration
public class CuratorConfig {@Beanpublic CuratorFramework curatorFramework(){// 重試策略,這里使用的是指數補償重試策略,重試3次,初始重試間隔1000ms,每次重試之后重試間隔遞增。RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);// 初始化Curator客戶端:指定鏈接信息 及 重試策略CuratorFramework client = CuratorFrameworkFactory.newClient("172.16.116.100:2181", retry);client.start(); // 開始鏈接,如果不調用該方法,很多方法無法工作return client;}
}

3.7.1. 可重入鎖InterProcessMutex

????????Reentrant和JDK的ReentrantLock類似, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。它是由類InterProcessMutex來實現。

// 常用構造方法
public InterProcessMutex(CuratorFramework client, String path)
// 獲取鎖
public void acquire();
// 帶超時時間的可重入鎖
public boolean acquire(long time, TimeUnit unit);
// 釋放鎖
public void release();
3.7.1.1. 使用案例

改造service測試方法:

@Autowired
private CuratorFramework curatorFramework;public void checkAndLock() {InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/curator/lock");try {// 加鎖mutex.acquire();// 先查詢庫存是否充足Stock stock = this.stockMapper.selectById(1L);// 再減庫存if (stock != null && stock.getCount() > 0){stock.setCount(stock.getCount() - 1);this.stockMapper.updateById(stock);}// this.testSub(mutex);// 釋放鎖mutex.release();} catch (Exception e) {e.printStackTrace();}
}public void testSub(InterProcessMutex mutex) {try {mutex.acquire();System.out.println("測試可重入鎖。。。。");mutex.release();} catch (Exception e) {e.printStackTrace();}
}

注意:如想重入,則需要使用同一個InterProcessMutex對象。

壓力測試結果:

3.7.1.2. 底層原理

3.7.2. 不可重入鎖InterProcessSemaphoreMutex

????????具體實現:InterProcessSemaphoreMutex。與InterProcessMutex調用方法類似,區別在于該鎖是不可重入的,在同一個線程中不可重入。

public InterProcessSemaphoreMutex(CuratorFramework client, String path);
public void acquire();
public boolean acquire(long time, TimeUnit unit);
public void release();

案例:

@Autowired
private CuratorFramework curatorFramework;public void deduct() {InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curatorFramework, "/curator/lock");try {mutex.acquire();// 1. 查詢庫存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判斷庫存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣減庫存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} catch (Exception e) {e.printStackTrace();} finally {try {mutex.release();} catch (Exception e) {e.printStackTrace();}}
}

3.7.3. 可重入讀寫鎖InterProcessReadWriteLock

????????類似JDK的ReentrantReadWriteLock。一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖。這也意味著寫鎖可以降級成讀鎖。從讀鎖升級成寫鎖是不成的。主要實現類InterProcessReadWriteLock:

// 構造方法
public InterProcessReadWriteLock(CuratorFramework client, String basePath);
// 獲取讀鎖對象
InterProcessMutex readLock();
// 獲取寫鎖對象
InterProcessMutex writeLock();

注意:寫鎖在釋放之前會一直阻塞請求線程,而讀鎖不會

public void testZkReadLock() {try {InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwlock");rwlock.readLock().acquire(10, TimeUnit.SECONDS);// TODO:一頓讀的操作。。。。//rwlock.readLock().unlock();} catch (Exception e) {e.printStackTrace();}
}public void testZkWriteLock() {try {InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwlock");rwlock.writeLock().acquire(10, TimeUnit.SECONDS);// TODO:一頓寫的操作。。。。//rwlock.writeLock().unlock();} catch (Exception e) {e.printStackTrace();}
}

3.7.4. 聯鎖InterProcessMultiLock

????????Multi Shared Lock是一個鎖的容器。當調用acquire, 所有的鎖都會被acquire,如果請求失敗,所有的鎖都會被release。同樣調用release時所有的鎖都被release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。實現類InterProcessMultiLock:

// 構造函數需要包含的鎖的集合,或者一組ZooKeeper的path
public InterProcessMultiLock(List<InterProcessLock> locks);
public InterProcessMultiLock(CuratorFramework client, List<String> paths);// 獲取鎖
public void acquire();
public boolean acquire(long time, TimeUnit unit);// 釋放鎖
public synchronized void release();

3.7.5. 信號量InterProcessSemaphoreV2

????????一個計數的信號量類似JDK的Semaphore。JDK中Semaphore維護的一組許可(permits),而Cubator中稱之為租約(Lease)。注意,所有的實例必須使用相同的numberOfLeases值。調用acquire會返回一個租約對象。客戶端必須在finally中close這些租約對象,否則這些租約會丟失掉。但是,如果客戶端session由于某種原因比如crash丟掉, 那么這些客戶端持有的租約會自動close, 這樣其它客戶端可以繼續使用這些租約。主要實現類InterProcessSemaphoreV2:

// 構造方法
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);// 注意一次你可以請求多個租約,如果Semaphore當前的租約不夠,則請求線程會被阻塞。
// 同時還提供了超時的重載方法
public Lease acquire();
public Collection<Lease> acquire(int qty);
public Lease acquire(long time, TimeUnit unit);
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)// 租約還可以通過下面的方式返還
public void returnAll(Collection<Lease> leases);
public void returnLease(Lease lease);

案例代碼:

StockController中添加方法:

@GetMapping("test/semaphore")
public String testSemaphore(){this.stockService.testSemaphore();return "hello Semaphore";
}

StockService中添加方法:

public void testSemaphore() {// 設置資源量 限流的線程數InterProcessSemaphoreV2 semaphoreV2 = new InterProcessSemaphoreV2(curatorFramework, "/locks/semaphore", 5);try {Lease acquire = semaphoreV2.acquire();// 獲取資源,獲取資源成功的線程可以繼續處理業務操作。否則會被阻塞住this.redisTemplate.opsForList().rightPush("log", "10010獲取了資源,開始處理業務邏輯。" + Thread.currentThread().getName());TimeUnit.SECONDS.sleep(10 + new Random().nextInt(10));this.redisTemplate.opsForList().rightPush("log", "10010處理完業務邏輯,釋放資源=====================" + Thread.currentThread().getName());semaphoreV2.returnLease(acquire); // 手動釋放資源,后續請求線程就可以獲取該資源} catch (Exception e) {e.printStackTrace();}
}

3.7.6. 柵欄barrier

  1. DistributedBarrier構造函數中barrierPath參數用來確定一個柵欄,只要barrierPath參數相同(路徑相同)就是同一個柵欄。通常情況下柵欄的使用如下:

    1. 主client設置一個柵欄

    2. 其他客戶端就會調用waitOnBarrier()等待柵欄移除,程序處理線程阻塞

    3. 主client移除柵欄,其他客戶端的處理程序就會同時繼續運行。

    DistributedBarrier類的主要方法如下:

    setBarrier() - 設置柵欄
    waitOnBarrier() - 等待柵欄移除
    removeBarrier() - 移除柵欄
  2. DistributedDoubleBarrier雙柵欄,允許客戶端在計算的開始和結束時同步。當足夠的進程加入到雙柵欄時,進程開始計算,當計算完成時,離開柵欄。DistributedDoubleBarrier實現了雙柵欄的功能。構造函數如下:

    // client - the client
    // barrierPath - path to use
    // memberQty - the number of members in the barrier
    public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty);
    ?
    enter()、enter(long maxWait, TimeUnit unit) - 等待同時進入柵欄
    leave()、leave(long maxWait, TimeUnit unit) - 等待同時離開柵欄

    memberQty是成員數量,當enter方法被調用時,成員被阻塞,直到所有的成員都調用了enter。當leave方法被調用時,它也阻塞調用線程,直到所有的成員都調用了leave。

    注意:參數memberQty的值只是一個閾值,而不是一個限制值。當等待柵欄的數量大于或等于這個值柵欄就會打開!

    與柵欄(DistributedBarrier)一樣,雙柵欄的barrierPath參數也是用來確定是否是同一個柵欄的,雙柵欄的使用情況如下:

    1. 從多個客戶端在同一個路徑上創建雙柵欄(DistributedDoubleBarrier),然后調用enter()方法,等待柵欄數量達到memberQty時就可以進入柵欄。

    2. 柵欄數量達到memberQty,多個客戶端同時停止阻塞繼續運行,直到執行leave()方法,等待memberQty個數量的柵欄同時阻塞到leave()方法中。

    3. memberQty個數量的柵欄同時阻塞到leave()方法中,多個客戶端的leave()方法停止阻塞,繼續運行。

3.7.7. 共享計數器

????????利用ZooKeeper可以實現一個集群共享的計數器。只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器, 一個是用int來計數,一個用long來計數。

3.7.7.1. SharedCount

共享計數器SharedCount相關方法如下:

// 構造方法
public SharedCount(CuratorFramework client, String path, int seedValue);
// 獲取共享計數的值
public int getCount();
// 設置共享計數的值
public void setCount(int newCount) throws Exception;
// 當版本號沒有變化時,才會更新共享變量的值
public boolean  trySetCount(VersionedValue<Integer> previous, int newCount);
// 通過監聽器監聽共享計數的變化
public void addListener(SharedCountListener listener);
public void addListener(final SharedCountListener listener, Executor executor);
// 共享計數在使用之前必須開啟
public void start() throws Exception;
// 關閉共享計數
public void close() throws IOException;

使用案例:

StockController:

@GetMapping("test/zk/share/count")
public String testZkShareCount(){this.stockService.testZkShareCount();return "hello shareData";
}

StockService:

public void testZkShareCount() {try {// 第三個參數是共享計數的初始值SharedCount sharedCount = new SharedCount(curatorFramework, "/curator/count", 0);// 啟動共享計數器sharedCount.start();// 獲取共享計數的值int count = sharedCount.getCount();// 修改共享計數的值int random = new Random().nextInt(1000);sharedCount.setCount(random);System.out.println("我獲取了共享計數的初始值:" + count + ",并把計數器的值改為:" + random);sharedCount.close();} catch (Exception e) {e.printStackTrace();}
}
3.7.7.2. DistributedAtomicNumber

????????DistributedAtomicNumber接口是分布式原子數值類型的抽象,定義了分布式原子數值類型需要提供的方法。

????????DistributedAtomicNumber接口有兩個實現:DistributedAtomicLongDistributedAtomicInteger

????????這兩個實現將各種原子操作的執行委托給了DistributedAtomicValue,所以這兩種實現是類似的,只不過表示的數值類型不同而已。這里以DistributedAtomicLong 為例進行演示

????????DistributedAtomicLong除了計數的范圍比SharedCount大了之外,比SharedCount更簡單易用。它首先嘗試使用樂觀鎖的方式設置計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex方式來更新計數值。此計數器有一系列的操作:

  • get(): 獲取當前值

  • increment():加一

  • decrement(): 減一

  • add():增加特定的值

  • subtract(): 減去特定的值

  • trySet(): 嘗試設置計數值

  • forceSet(): 強制設置計數值

????????你必須檢查返回結果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

????????

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/165427.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/165427.shtml
英文地址,請注明出處:http://en.pswp.cn/news/165427.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

『亞馬遜云科技產品測評』活動征文|搭建圖床chevereto

『亞馬遜云科技產品測評』活動征文&#xff5c;搭建圖床chevereto 提示&#xff1a;本篇文章授權活動官方亞馬遜云科技文章轉發、改寫權&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒體平臺&#xff0c;第三方開發者媒體等亞馬遜云科技官方渠道 文章目錄 『…

Python 獲取本地和廣域網 IP

Python 獲取本地IP &#xff0c;使用第三方庫&#xff0c;比如 netifaces import netifaces as nidef get_ip_address():try:# 獲取默認網絡接口&#xff08;通常是 eth0 或 en0&#xff09;default_interface ni.gateways()[default][ni.AF_INET][1]# 獲取指定網絡接口的IP地…

字符串相加

題意&#xff1a; 給定兩個字符串形式的非負整數 num1 和num2 &#xff0c;計算它們的和并同樣以字符串形式返回。 你不能使用任何內建的用于處理大整數的庫&#xff08;比如 BigInteger&#xff09;&#xff0c; 也不能直接將輸入的字符串轉換為整數形式。 示例 1&#xff…

利用STM32CubeMX解讀時鐘樹

1&#xff0c;低速時鐘 LSE是外部晶振作時鐘源&#xff0c;主要提供給實時時鐘模塊&#xff0c;所以一般采用32.768KHz。LSI是由內部RC振蕩器產生&#xff0c;也主要提供給實時時鐘模塊&#xff0c;頻率大約為40KHz。(LSE和LSI)只是提供給芯片中的RTC(實時時鐘)及IWDG(獨立看門…

【c++】棧教程

今天來講講棧 棧是什么&#xff1f; 老樣子&#xff0c;先來看一道題&#xff1a; 【棧】棧的基本操作 描述 棧的定義&#xff1a;棧是一種特殊的表這種表只在表頭進行插入和刪除操作。因此&#xff0c;表頭對于棧來說具有特殊的意義&#xff0c;稱為棧頂。相應地&#xff0…

佳易王羽毛球館計時計費軟件燈控系統安裝教程

佳易王羽毛球館計時計費軟件燈控系統安裝教程 佳易王羽毛球館計時計費軟件&#xff0c;點擊開始計時的時候&#xff0c;自動打開燈&#xff0c;結賬后自動關閉燈。 因為場館每一場地的燈功率都很大&#xff0c;需要加裝交流接觸器。這個由專業電工施工。 1、計時計費功能 &…

docker安裝mysql8

之前自己在網上找了一些docker安裝mysql8的方法&#xff0c;結果都不行&#xff0c;于是自己根據自己遇到的情況再結合網上搜索到的安裝方式調整了一下&#xff0c;成功執行安裝。以下是我自己的執行命令 先拉取docekr鏡像 docker pull mysql:8.0.20啟動鏡像 docker run -p 3…

使用Git bash切換Gitee、GitHub多個Git賬號

Git是分布式代碼管理工具&#xff0c;使用命令行的方式提交commit、revert回滾代碼。這里介紹使用Git bash軟件來切換Gitee、GitHub賬號。 ? ? 假設在gitee.com上的郵箱是alicefoxmail.com 、用戶名為alice&#xff1b;在github上的郵箱是bobfoxmail.com、用戶名為bob。 賬號…

tcp/ip協議2實現的插圖,數據結構2 (19 - 章)

(68) 68 十九1 選路請求與消息 函rtalloc,rtalloc1,rtfree (69)

HarmonyOS ArkTS 保存應用數據(十)

1 概述 在移動互聯網蓬勃發展的今天&#xff0c;移動應用給我們生活帶來了極大的便利&#xff0c;這些便利的本質在于數據的互聯互通。因此在應用的開發中數據存儲占據了非常重要的位置&#xff0c;HarmonyOS應用開發也不例外。 2 什么是首選項 首選項為應用提供Key-Value鍵…

Java面向對象第2天

精華筆記&#xff1a; 構造方法&#xff1a;構造函數、構造器、構建器---------------復用給成員變量賦初始值代碼 作用&#xff1a;給成員變量賦初始值 與類同名&#xff0c;沒有返回值類型(連void都沒有) 在創建(new)對象時被自動調用 若自己不寫構造方法&#xff0c;則編…

Electron+VUE3開發簡版的編輯器【文件預覽】

簡版編輯器的功能主要是: 打開對話框,選擇文件后臺讀取文件文件前端展示文件內容。主要技術棧是VUE3、Electron和Nodejs,VUE3做頁面交互,Electron提供一個可執行Nodejs的環境以及支撐整個應用的環境,nodeJS負責讀取文件內容。 環境配置、安裝依賴這些步驟就不再敘述了。 …

SQL Server 百萬數據查詢優化技巧三十則

點擊上方藍字關注我 互聯網時代的進程越走越深&#xff0c;使用MySQL的人也越來越多&#xff0c;關于MySQL的數據庫優化指南很多&#xff0c;而關于SQL SERVER的T-SQL優化指南看上去比較少&#xff0c;近期有學習SQLSERVER的同學問到SQL SERVER數據庫有哪些優化建議&#xff1f…

Linux進程通信——信號(一)

原理 對于 Linux來說&#xff0c;實際信號是軟中斷&#xff0c;許多重要的程序都需要處理信號。 信號&#xff0c;為 Linux 提供了一種處理異步事件的方法。比如&#xff0c;終端用戶輸入了ctrlc來中斷程序&#xff0c;會通過信號機制停止一個程序。 概述 信號的名字和編號 …

【Docker】從零開始:8.Docker命令:Commit提交命令

【Docker】從零開始&#xff1a;8.Docker命令:Commit命令 基本概念鏡像鏡像分層什么是鏡像分層為什么 Docker 鏡像要采用這種分層結構 本章要點commit 命令命令格式docker commit 操作參數實例演示1.下載一個新的ubuntu鏡像2.運行容器3.查看并安裝vim4.退出容器5提交自己的鏡像…

【數據結構/C++】線性表_雙鏈表基本操作

#include <iostream> using namespace std; typedef int ElemType; // 3. 雙鏈表 typedef struct DNode {ElemType data;struct DNode *prior, *next; } DNode, *DLinkList; // 初始化帶頭結點 bool InitDNodeList(DLinkList &L) {L (DNode *)malloc(sizeof(DNode))…

成為AI產品經理——模型評估概述

目錄 一、模型宣講和評估的原因 二、模型宣講 三、模型評估 1. 重要特征 ① 特征來源 ②特征意義 2.選擇測試樣本 3.模型性能和穩定性 一、模型宣講和評估的原因 劉海豐老師提到他們在做一個金融AI產品未注重模型指標&#xff0c;過于注重業務指標&#xff0c;導致產生…

解決:ImportError: cannot import name ‘Adam‘ from ‘keras.optimizers‘

解決&#xff1a;ImportError: cannot import name ‘Adam‘ from ‘keras.optimizers‘ 背景 在使用之前的代碼時&#xff0c;報錯&#xff1a; from keras.optimizers import Adam ImportError: cannot import name ‘Adam’ 報錯問題 from keras.optimizers import Adam I…

2023年亞太數學建模大賽--A題(水果采摘機器人的圖像識別功能)

中國是世界上最大的蘋果生產國&#xff0c;年產量約為 3500 萬噸。同時&#xff0c;中國也是世界上最大的蘋果出口國&#xff0c;世界上每兩個蘋果中就有一個出口到國。世界上每兩個蘋果中就有一個來自中國&#xff0c;中國出口的蘋果占全球出口量的六分之一以上。來自中國。中…

Vue CLI的介紹【vue利器之一】

文章目錄 前言Vue CLI 介紹CLICLI 服務CLI 插件后言 前言 hello world歡迎來到前端的新世界 &#x1f61c;當前文章系列專欄&#xff1a;vue.js &#x1f431;?&#x1f453;博主在前端領域還有很多知識和技術需要掌握&#xff0c;正在不斷努力填補技術短板。(如果出現錯誤&am…