? ? ? ? zookeeper最初設計的初衷就是為了保證分布式系統的一致性。本文將講解如何利用zookeeper的臨時順序結點,實現分布式鎖。
目錄
1. 理論分析
? ? ? ? 1.1 結點類型
? ? ? ? 1.2 監聽器
? ? ? ? 1.3 實現原理
2. 手寫實現簡易zookeeper分布式鎖
? ? ? ? 1.1 依賴
?? ? ? ? 1.2 常量定義
????????1.3 實現zookeeper分布式鎖
????????1.4 使用方式
3. 引入Curator框架實現zookeeper分布式鎖
????????2.1 框架依賴
????????2.2 使用方式
1. 理論分析
? ? ? ? zookeeper?和Linux一樣,采用目錄樹的方式管理結點,目錄層級間以 / 區分
????????每個數據節點在 ZooKeeper 中被稱為?znode,它是 ZooKeeper 中數據的最小單元。由于ZooKeeper 主要用于協調服務,出于性能和一致性考慮,每個節點的存放數據上限為1M
? ? ?
? ? ? ? 1.1 結點類型
? ? ? ? znode有四種類型:
? ? ? ? 1.持久化結點? (PERSISTENT): 創建節點后一直存在
? ? ? ? 2. 持久化有序結點(PERSISTENT_SEQUENTIAL):在持久化結點的基礎上,zookeeper會自動根據創建順序,在結點名稱后面加上一串序號
? ? ? ? 3. 臨時結點(EPHEMERAL):在zookeeper與客戶端失去連接后自動刪除
? ? ? ? 4. 臨時有序結點(EPHEMERAL_SEQUENTIAL):在臨時結點的基礎上,zookeeper會自動根據創建順序,在結點名稱后面加上一串序號
? ? ? ? 1.2 監聽器
? ? ? ? ? ? ? ??Watcher?監聽機制是?Zookeeper?中非常重要的特性。結點可以綁定監聽事件,當監聽事件發生的時候,Zookeeper會向客戶端發送通知事件,執行監聽器的回調方法。
????????
? ? ? ? 1.3 實現原理
? ? ? ? 我們首先新建一個"/locks"的持久化結點,用來管理表示鎖的子節點。(實際場景使用可以根據不同鎖對象劃分成更細致的持久化結點,比如"/locks/bilibili/comment/publish")
? ? ? ? 當用戶嘗試獲取鎖的時候,在"locks"結點下新建一個臨時有序結點,例如"seq-00001"
? ? ? ? 新建結點成功后,系統進行檢查,建立的結點是否是當前所有子節點中序號最小的一個
? ? ? ? 如果是最小的一個,說明用戶是當前鎖的持有者,往下執行業務邏輯,執行完成后摧毀臨時結點
???????????????
? ? ? ? 如果不是最小的一個,為了避免不斷地自旋檢查空耗性能,一般采用注冊監聽器的方式減少性能消耗:監聽前一個結點的摧毀事件。如果用戶持有的結點前面還有其他結點,說明用戶不是持有的人,不能執行業務邏輯,應當阻塞等待;直到用戶前一個結點被摧毀,說明輪到用戶持有鎖了,可以繼續往下執行業務邏輯。
? ? ? ??
2. 手寫實現簡易zookeeper分布式鎖
? ? ? ? 1.1 依賴
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version><scope>test</scope></dependency><!--日志--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--zookeeper--><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.6</version></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>RELEASE</version><scope>compile</scope></dependency>
?? ? ? ? 1.2 常量定義
public interface ZkConstants {//連接地址String connectString = "127.0.0.1:2181";// 連接超時時間int sessionTimeout = 2000;
}
????????1.3 實現zookeeper分布式鎖
public class DistributedLock {// zk客戶端連接private ZooKeeper zkClient;// 連接成功等待private CountDownLatch connectLatch = new CountDownLatch(1);// 前一個結點(鎖)private String waitPath;// 結點刪除等待private CountDownLatch waitLatch = new CountDownLatch(1);// 當前創建的結點(鎖)private String createNode;/*** 構造方法:初始化客戶端連接** @throws IOException* @throws InterruptedException* @throws KeeperException*/public DistributedLock() throws IOException, InterruptedException, KeeperException {//獲取連接zkClient = new ZooKeeper(ZkConstants.connectString, ZkConstants.sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//連接成功,釋放countDownLatchif (watchedEvent.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}//前一個結點刪除if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {//解鎖下一個結點waitLatch.countDown();}}});//等待zk正常連接后,再往下執行connectLatch.await();//判斷根節點/locks是否存在Stat exists = zkClient.exists("/locks", false);if (exists == null) {//創建根節點 -- 持久結點zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}/*** 加鎖** @throws InterruptedException* @throws KeeperException*/public void zkLock() throws InterruptedException, KeeperException {//創建對應的臨時帶序號結點createNode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//判斷創建節點是否是序號最小的結點List<String> children = zkClient.getChildren("/locks", false);if (children.size() == 1) {return;} else {//排序結點以得到當前創建結點的序號(等待鎖的序位)Collections.sort(children);//獲取生成的臨時結點序號String thisNode = createNode.substring("/locks/".length());//獲得排序int index = children.indexOf(thisNode);if (index == -1) {System.out.println("數據異常");} else if (index == 0) {//最小序號結點,直接獲取鎖return;} else {//監聽序號前一個結點waitPath = "/locks/" + children.get(index - 1);//true代表使用創建zkClient時初始化的監聽器zkClient.getData(waitPath, true, null);waitLatch.await();}}}/*** 解鎖** @throws InterruptedException* @throws KeeperException*/public void zkUnLock() throws InterruptedException, KeeperException {//刪除臨時帶序號結點zkClient.delete(createNode, -1);}}
????????1.4 使用方式
public class DistributedLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {ExecutorService executorService = Executors.newFixedThreadPool(2);DistributedLock lock1 = new DistributedLock();DistributedLock lock2 = new DistributedLock();//多線程獲取鎖1CompletableFuture.supplyAsync(() -> {try {lock1.zkLock();System.out.println("線程" + Thread.currentThread().getName() + "獲取到鎖......");Thread.sleep(5000);lock1.zkUnLock();System.out.println("線程" + Thread.currentThread().getName() + "釋放鎖......");} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}return true;}, executorService);//多線程獲取鎖2CompletableFuture.supplyAsync(() -> {try {lock2.zkLock();System.out.println("線程" + Thread.currentThread().getName() + "獲取到鎖......");Thread.sleep(5000);lock2.zkUnLock();System.out.println("線程" + Thread.currentThread().getName() + "釋放鎖......");} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}return true;}, executorService);executorService.shutdown();}
}
3. 引入Curator框架實現zookeeper分布式鎖
? ? ? ? 實際生產環境下,自然不可能手寫這么多代碼處理分布式鎖,且不提很多地方的代碼可復用,CountDownLatch反復處理帶來的代碼復雜性高,并且一些可重入鎖、異常處理等邏輯上文也并沒有完善。
? ? ? ? 生產場景中被廣泛使用的zookeeper分布式鎖的框架便是Curator
????????2.1 框架依賴
/..省略../<!--Curator--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version></dependency>
????????2.2 使用方式
public class CuratorLockTest {public static void main(String[] args) {//創建分布式鎖1InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.execute(() -> {try {lock1.acquire();System.out.println("線程1獲取到鎖");//curator支持可重入鎖lock1.acquire();System.out.println("線程1 再次獲取到鎖");Thread.sleep(5000);lock1.release();System.out.println("線程1 釋放鎖");lock1.release();System.out.println("線程1 再次釋放鎖");} catch (Exception e) {throw new RuntimeException(e);}});executorService.execute(() -> {try {lock2.acquire();System.out.println("線程2獲取到鎖");lock2.acquire();System.out.println("線程2 再次獲取到鎖");Thread.sleep(5000);lock2.release();System.out.println("線程2 釋放鎖");lock2.release();System.out.println("線程2 再次釋放鎖");} catch (Exception e) {throw new RuntimeException(e);}});executorService.shutdown();}public static CuratorFramework getCuratorFramework() {//4秒超時,重試3次ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(4000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString(ZkConstants.connectString).connectionTimeoutMs(ZkConstants.sessionTimeout).sessionTimeoutMs(ZkConstants.sessionTimeout).retryPolicy(exponentialBackoffRetry).build();client.start();System.out.println("zookeeper 啟動成功...");return client;}
}
? ? ? ? 希望能對大家理解zookeeper分布式鎖有所幫助