Zookeeper分布式鎖實現

? ? ? ? zookeeper最初設計的初衷就是為了保證分布式系統的一致性。本文將講解如何利用zookeeper的臨時順序結點,實現分布式鎖。

目錄

1. 理論分析

? ? ? ? 1.1 結點類型

? ? ? ? 1.2 監聽器

? ? ? ? 1.3 實現原理

2. 手寫實現簡易zookeeper分布式鎖

? ? ? ? 1.1 依賴

?? ? ? ? 1.2 常量定義

????????1.3 實現zookeeper分布式鎖

????????1.4 使用方式

3. 引入Curator框架實現zookeeper分布式鎖

????????2.1 框架依賴

????????2.2 使用方式


1. 理論分析

? ? ? ? zookeeper?和Linux一樣,采用目錄樹的方式管理結點,目錄層級間以 / 區分

????????每個數據節點在 ZooKeeper 中被稱為?znode,它是 ZooKeeper 中數據的最小單元。由于ZooKeeper 主要用于協調服務,出于性能和一致性考慮,每個節點的存放數據上限為1M

? ? ?

? ? ? ? 1.1 結點類型

? ? ? ? znode有四種類型:

? ? ? ? 1.持久化結點? (PERSISTENT): 創建節點后一直存在

? ? ? ? 2. 持久化有序結點(PERSISTENT_SEQUENTIAL):在持久化結點的基礎上,zookeeper會自動根據創建順序,在結點名稱后面加上一串序號

? ? ? ? 3. 臨時結點(EPHEMERAL):在zookeeper與客戶端失去連接后自動刪除

? ? ? ? 4. 臨時有序結點(EPHEMERAL_SEQUENTIAL):在臨時結點的基礎上,zookeeper會自動根據創建順序,在結點名稱后面加上一串序號

? ? ? ? 1.2 監聽器

? ? ? ? ? ? ? ??Watcher?監聽機制是?Zookeeper?中非常重要的特性。結點可以綁定監聽事件,當監聽事件發生的時候,Zookeeper會向客戶端發送通知事件,執行監聽器的回調方法。

????????

? ? ? ? 1.3 實現原理

? ? ? ? 我們首先新建一個"/locks"的持久化結點,用來管理表示鎖的子節點。(實際場景使用可以根據不同鎖對象劃分成更細致的持久化結點,比如"/locks/bilibili/comment/publish")

? ? ? ? 當用戶嘗試獲取鎖的時候,在"locks"結點下新建一個臨時有序結點,例如"seq-00001"

? ? ? ? 新建結點成功后,系統進行檢查,建立的結點是否是當前所有子節點中序號最小的一個

? ? ? ? 如果是最小的一個,說明用戶是當前鎖的持有者,往下執行業務邏輯,執行完成后摧毀臨時結點

???????????????

? ? ? ? 如果不是最小的一個,為了避免不斷地自旋檢查空耗性能,一般采用注冊監聽器的方式減少性能消耗:監聽前一個結點的摧毀事件。如果用戶持有的結點前面還有其他結點,說明用戶不是持有的人,不能執行業務邏輯,應當阻塞等待;直到用戶前一個結點被摧毀,說明輪到用戶持有鎖了,可以繼續往下執行業務邏輯。

? ? ? ??

2. 手寫實現簡易zookeeper分布式鎖

? ? ? ? 1.1 依賴

        <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version><scope>test</scope></dependency><!--日志--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--zookeeper--><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.6</version></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>RELEASE</version><scope>compile</scope></dependency>

?? ? ? ? 1.2 常量定義

public interface ZkConstants {//連接地址String connectString = "127.0.0.1:2181";// 連接超時時間int sessionTimeout = 2000;
}

????????1.3 實現zookeeper分布式鎖

public class DistributedLock {// zk客戶端連接private ZooKeeper zkClient;// 連接成功等待private CountDownLatch connectLatch = new CountDownLatch(1);// 前一個結點(鎖)private String waitPath;// 結點刪除等待private CountDownLatch waitLatch = new CountDownLatch(1);// 當前創建的結點(鎖)private String createNode;/*** 構造方法:初始化客戶端連接** @throws IOException* @throws InterruptedException* @throws KeeperException*/public DistributedLock() throws IOException, InterruptedException, KeeperException {//獲取連接zkClient = new ZooKeeper(ZkConstants.connectString, ZkConstants.sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//連接成功,釋放countDownLatchif (watchedEvent.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}//前一個結點刪除if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {//解鎖下一個結點waitLatch.countDown();}}});//等待zk正常連接后,再往下執行connectLatch.await();//判斷根節點/locks是否存在Stat exists = zkClient.exists("/locks", false);if (exists == null) {//創建根節點 -- 持久結點zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}/*** 加鎖** @throws InterruptedException* @throws KeeperException*/public void zkLock() throws InterruptedException, KeeperException {//創建對應的臨時帶序號結點createNode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//判斷創建節點是否是序號最小的結點List<String> children = zkClient.getChildren("/locks", false);if (children.size() == 1) {return;} else {//排序結點以得到當前創建結點的序號(等待鎖的序位)Collections.sort(children);//獲取生成的臨時結點序號String thisNode = createNode.substring("/locks/".length());//獲得排序int index = children.indexOf(thisNode);if (index == -1) {System.out.println("數據異常");} else if (index == 0) {//最小序號結點,直接獲取鎖return;} else {//監聽序號前一個結點waitPath = "/locks/" + children.get(index - 1);//true代表使用創建zkClient時初始化的監聽器zkClient.getData(waitPath, true, null);waitLatch.await();}}}/*** 解鎖** @throws InterruptedException* @throws KeeperException*/public void zkUnLock() throws InterruptedException, KeeperException {//刪除臨時帶序號結點zkClient.delete(createNode, -1);}}

????????1.4 使用方式

public class DistributedLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {ExecutorService executorService = Executors.newFixedThreadPool(2);DistributedLock lock1 = new DistributedLock();DistributedLock lock2 = new DistributedLock();//多線程獲取鎖1CompletableFuture.supplyAsync(() -> {try {lock1.zkLock();System.out.println("線程" + Thread.currentThread().getName() + "獲取到鎖......");Thread.sleep(5000);lock1.zkUnLock();System.out.println("線程" + Thread.currentThread().getName() + "釋放鎖......");} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}return true;}, executorService);//多線程獲取鎖2CompletableFuture.supplyAsync(() -> {try {lock2.zkLock();System.out.println("線程" + Thread.currentThread().getName() + "獲取到鎖......");Thread.sleep(5000);lock2.zkUnLock();System.out.println("線程" + Thread.currentThread().getName() + "釋放鎖......");} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}return true;}, executorService);executorService.shutdown();}
}

3. 引入Curator框架實現zookeeper分布式鎖

? ? ? ? 實際生產環境下,自然不可能手寫這么多代碼處理分布式鎖,且不提很多地方的代碼可復用,CountDownLatch反復處理帶來的代碼復雜性高,并且一些可重入鎖、異常處理等邏輯上文也并沒有完善。

? ? ? ? 生產場景中被廣泛使用的zookeeper分布式鎖的框架便是Curator

????????2.1 框架依賴

        /..省略../<!--Curator--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version></dependency>

????????2.2 使用方式

public class CuratorLockTest {public static void main(String[] args) {//創建分布式鎖1InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.execute(() -> {try {lock1.acquire();System.out.println("線程1獲取到鎖");//curator支持可重入鎖lock1.acquire();System.out.println("線程1 再次獲取到鎖");Thread.sleep(5000);lock1.release();System.out.println("線程1 釋放鎖");lock1.release();System.out.println("線程1 再次釋放鎖");} catch (Exception e) {throw new RuntimeException(e);}});executorService.execute(() -> {try {lock2.acquire();System.out.println("線程2獲取到鎖");lock2.acquire();System.out.println("線程2 再次獲取到鎖");Thread.sleep(5000);lock2.release();System.out.println("線程2 釋放鎖");lock2.release();System.out.println("線程2 再次釋放鎖");} catch (Exception e) {throw new RuntimeException(e);}});executorService.shutdown();}public static CuratorFramework getCuratorFramework() {//4秒超時,重試3次ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(4000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString(ZkConstants.connectString).connectionTimeoutMs(ZkConstants.sessionTimeout).sessionTimeoutMs(ZkConstants.sessionTimeout).retryPolicy(exponentialBackoffRetry).build();client.start();System.out.println("zookeeper 啟動成功...");return client;}
}

? ? ? ? 希望能對大家理解zookeeper分布式鎖有所幫助

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/71481.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/71481.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/71481.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Git是什么

簡單介紹&#xff1a; Git是一個分布式版本控制系統&#xff0c;用于跟蹤文件的更改&#xff0c;特別是在多人協作開發的環境中。 Key: 分布式 版本控制 系統 最常用于軟件開發&#xff0c;但也可以用于管理任何類型的文件和文件夾。 Git幫助團隊跟蹤和管理文件的歷史版本&a…

Pycharm 2024在解釋器提供的python控制臺中運行py文件

2024版的界面發生了變化, run with python console搬到了這里:

【分布式理論12】事務協調者高可用:分布式選舉算法

文章目錄 一、分布式系統中事務協調的問題二、分布式選舉算法1. Bully算法2. Raft算法3. ZAB算法 三、小結與比較 一、分布式系統中事務協調的問題 在分布式系統中&#xff0c;常常有多個節點&#xff08;應用&#xff09;共同處理不同的事務和資源。前文 【分布式理論9】分布式…

免費deepseek的API獲取教程及將API接入word或WPS中

免費deepseek的API獲取教程: 1 https://cloud.siliconflow.cn/中注冊時填寫邀請碼&#xff1a;GAejkK6X即可獲取2000 萬 Tokens; 2 按照圖中步驟進行操作 將API接入word或WPS中 1 打開一個word&#xff0c;文件-選項-自定義功能區-勾選開發工具-左側的信任中心-信任中心設置…

【SFRA】筆記

GK_SFRA_INJECT(x) SFRA小信號注入函數,向控制環路注入一個小信號。如下圖所示,當前程序,小信號注入是在固定占空比的基礎疊加小信號,得到新的占空比,使用該占空比控制環路。 1.2 GK_SFRA_COLLECT(x, y) SFRA數據收集函數,將小信號注入環路后,該函數收集環路的數據,以…

論文筆記-WSDM2024-LLMRec

論文筆記-WSDM2024-LLMRec: Large Language Models with Graph Augmentation for Recommendation LLMRec: 基于圖增強的大模型推薦摘要1.引言2.前言2.1使用圖嵌入推薦2.2使用輔助信息推薦2.3使用數據增強推薦 3.方法3.1LLM作為隱式反饋增強器3.2基于LLM的輔助信息增強3.2.1用戶…

Ubuntu 系統 cuda12.2 安裝 MMDetection3D

DataBall 助力快速掌握數據集的信息和使用方式&#xff0c;會員享有 百種數據集&#xff0c;持續增加中。 需要更多數據資源和技術解決方案&#xff0c;知識星球&#xff1a; “DataBall - X 數據球(free)” 貴在堅持&#xff01; ---------------------------------------…

Tomcat的升級

Tomcat 是一個開源的 Java Servlet 容器&#xff0c;用于部署 Java Servlet 和 JavaServer Pages&#xff08;JSP&#xff09;。隨著新版本的發布&#xff0c;Tomcat 通常會帶來性能改進、安全增強、新特性和對最新 Java 版本的更好支持。升級 Tomcat 服務器通常涉及到以下幾個…

Python常見面試題的詳解10

1. 哪些操作會導致 Python 內存溢出&#xff0c;怎么處理&#xff1f; 要點 1. 創建超大列表或字典&#xff1a;當我們一次性創建規模極為龐大的列表或字典時&#xff0c;會瞬間占用大量的內存資源。例如&#xff0c;以下代碼試圖創建一個包含 10 億個元素的列表&#xff0c;在…

多個用戶如何共用一根網線傳輸數據

前置知識 一、電信號 網線&#xff08;如以太網線&#xff09;中傳輸的信號主要是 電信號&#xff0c;它攜帶著數字信息。這些信號用于在計算機和其他網絡設備之間傳輸數據。下面是一些關于網線傳輸信號的詳細信息&#xff1a; 1. 電信號傳輸 在以太網中&#xff0c;數據是…

華為昇騰 910B 部署 DeepSeek-R1 蒸餾系列模型詳細指南

本文記錄 在 華為昇騰 910B(65GB) * 8 上 部署 DeepSeekR1 蒸餾系列模型&#xff08;14B、32B&#xff09;全過程與測試結果。 NPU&#xff1a;910B3 (65GB) * 8 &#xff08;910B 有三個版本 910B1、2、3&#xff09; 模型&#xff1a;DeepSeek-R1-Distill-Qwen-14B、DeepSeek…

【前端】Vue組件庫之Element: 一個現代化的 UI 組件庫

文章目錄 前言一、官網1、官網主頁2、設計原則3、導航4、組件 二、核心功能&#xff1a;開箱即用的組件生態1、豐富的組件體系2、特色功能亮點 三、快速上手&#xff1a;三步開啟組件化開發1、安裝&#xff08;使用Vue 3&#xff09;2、全局引入3、按需導入&#xff08;推薦&am…

關于uniApp的面試題及其答案解析

我的血液里流淌著戰意&#xff01;力量與智慧指引著我&#xff01; 文章目錄 1. 什么是uniApp&#xff1f;2. uniApp與原生小程序開發有什么區別&#xff1f;3. 如何使用uniApp實現條件編譯&#xff1f;4. uniApp支持哪些平臺&#xff0c;各有什么特點&#xff1f;5. 在uniApp中…

Ubuntu 下 nginx-1.24.0 源碼分析 - ngx_pool_t 類型

ngx_pool_t 定義在 src/core/ngx_core.h typedef struct ngx_pool_s ngx_pool_t; ngx_pool_s 定義在 src/core/ngx_palloc.h struct ngx_pool_s {ngx_pool_data_t d;size_t max;ngx_pool_t *current;ngx_chain_t *chain;ng…

力扣 最長遞增子序列

動態規劃&#xff0c;二分查找。 題目 由題&#xff0c;從數組中找一個最長子序列&#xff0c;不難想到&#xff0c;當這個子序列遞增子序列的數越接近時是越容易拉長的。從dp上看&#xff0c;當遍歷到這個數&#xff0c;會從前面的dp選一個最大的數加上當前數&#xff0c;注意…

Linux | 進程控制(進程終止與進程等待)

文章目錄 Linux | 進程控制 — 進程終止 & 進程等待1、進程終止進程常見退出方法1.1退出碼基本概念獲取退出碼的方式常見退出碼約定使用場景 1.2 strerror函數 & errno宏1.3 _exit函數1.4_exit和exit的區別1.4.1 所屬頭文件與函數原型1.4.2 執行過程差異**結合現象分析…

Android - Handler使用post之后,Runnable沒有執行

問題&#xff1a;子線程創建的Handler。如果 post 之后&#xff0c;在Handler.removeCallbacks(run)移除了&#xff0c;下次再使用Handler.postDelayed(Runnable)接口或者使用post時&#xff0c;Runnable是沒有執行。導致沒有收到消息。 解決辦法&#xff1a;只有主線程創建的…

魚皮面試鴨30天后端面試營

day1 1. MySQL的索引類型有哪些? MySQL里的索引就像是書的目錄&#xff0c;能幫數據庫快速找到你要的數據。以下是各種索引類型的通俗解釋&#xff1a; 按數據結構分 B樹索引&#xff1a;最常用的一種&#xff0c;數據像在一棵樹上分層存放&#xff0c;能快速定位范圍數據…

【核心算法篇十二】《深入解剖DeepSeek多任務學習:共享表示層的24個設計細節與實戰密碼 》

引言:為什么你的模型總在"精神分裂"? 想象你訓練了一個AI實習生: 早上做文本分類時準確率90%下午做實體識別卻把"蘋果"都識別成水果公司晚上做情感分析突然開始輸出亂碼這就是典型的任務沖突災難——模型像被不同任務"五馬分尸"。DeepSeek通…

DeepSeek應用——與PyCharm的配套使用

目錄 一、配置方法 二、使用方法 三、注意事項 1、插件市場無continue插件 2、無結果返回&#xff0c;且在本地模型報錯 記錄自己學習應用DeepSeek的過程&#xff0c;使用的是自己電腦本地部署的私有化蒸餾模型...... &#xff08;舉一反三&#xff0c;這個不單單是可以用…