Zookeeper:實現“分布式鎖”的 Demo

Zookeeper 能保證數據的強一致性,用戶任何時候都可以相信集群中每個節點的數據都是相同的。一個用戶創建一個節點作為鎖,另一個用戶檢測該節點,如果存在,代表別的用戶已經鎖住,如果不存在,則可以創建一個節點,代表擁有一個鎖。

本篇內容包括:Demo 概述、代碼實現、測試結果


文章目錄

    • 一、Demo 概述
        • 1、關于 zookeeper “命名服務協調”
        • 2、Demo 設計
        • 3、Demo 前提
    • 二、代碼實現
        • 1、引用 Maven 依賴
        • 2、ConnectionWatcher 類創建 Zookeeper 連接
        • 3、ActiveKeyValueStore 類讀寫 Zookeeper 數據
        • 4、ZkLock 類實現分布式鎖
    • 三、測試結果


一、Demo 概述

1、關于 zookeeper “命名服務協調”

Zookeeper 能保證數據的強一致性,用戶任何時候都可以相信集群中每個節點的數據都是相同的。一個用戶創建一個節點作為鎖,另一個用戶檢測該節點,如果存在,代表別的用戶已經鎖住,如果不存在,則可以創建一個節點,代表擁有一個鎖。

2、Demo 設計

分布式鎖本質,就是多個資源競爭者對一份資源的排他占有

  • 我們設置多個線程,分別在同一 path 下創建節點
  • 沒個線程獲取當前 path 下子節點,看最小子節點是否為自身,是則加鎖成功(更好的方式是用 Watcher 對前一個地址監控,這里圖方便用子節點排序取最小的方式 )
  • 線程加鎖成功后,執行任務,執行完畢后解鎖

3、Demo 前提

參考:Mac通過Docker安裝Zookeeper集群


二、代碼實現

1、引用 Maven 依賴

        <!--    選擇對應的Zookeeper版本    --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version></dependency>

2、ConnectionWatcher 類創建 Zookeeper 連接

import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ConnectionWatcher implements Watcher {private final CountDownLatch connectedSignal = new CountDownLatch(1);private static final int SESSION_TIMEOUT = 5000;protected ZooKeeper zk;public void connect(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);connectedSignal.await();}@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {connectedSignal.countDown();}}public void close() throws InterruptedException {zk.close();}}

3、ActiveKeyValueStore 類讀寫 Zookeeper 數據

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;public class ActiveKeyValueStore extends ConnectionWatcher {private static final Charset CHARSET = StandardCharsets.UTF_8;int state = 0;/*** 寫入節點數據** @param path  節點地址* @param value 數據值* @throws InterruptedException 中斷異常* @throws KeeperException      ZooKeeper異常*/public void write(String path, String value) throws InterruptedException, KeeperException {Stat stat = zk.exists(path, false);if (stat == null) {if (value == null) {zk.create(path, null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} else {zk.create(path, value.getBytes(CHARSET),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} else {if (value == null) {zk.setData(path, null, -1);} else {zk.setData(path, value.getBytes(CHARSET), -1);}}}public boolean lock(String path, String name) throws InterruptedException, KeeperException {boolean flag = tryLock(path, name);if (flag) {state++;}return flag;}public boolean tryLock(String path, String name) throws InterruptedException, KeeperException {String lockPath = path + "/" + name;zk.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);List<String> waits = readChildren(path, null);Collections.sort(waits);if (waits.get(0).equals(name)) {return true;}CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < waits.size(); i++) {String cur = waits.get(i);if (!cur.equalsIgnoreCase(name)) {continue;}String prePath = path + "/" + waits.get(i - 1);zk.exists(prePath, new Watcher() {@Overridepublic void process(WatchedEvent event) {latch.countDown();}});break;}latch.await();return true;}public boolean unlock(String path, String name) {if (state > 1) {state--;return true;}String lockPath = path + "/" + name;try {Stat stat = zk.exists(lockPath, false);int version = stat.getVersion();zk.delete(lockPath, version);state--;return true;} catch (Exception e) {System.out.println("unlock:" + lockPath + " ,exception,");}return false;}/*** 獲取所有子節點** @param path    節點地址* @param watcher watcher* @return 所有子節點* @throws InterruptedException 中斷異常* @throws KeeperException      ZooKeeper異常*/public List<String> readChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {List<String> childrens = null;if (watcher == null) {childrens = zk.getChildren(path, false);} else {childrens = zk.getChildren(path, watcher, null);}return childrens;}
}

4、ZkLock 類實現分布式鎖

import lombok.SneakyThrows;
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class ZkLock {/*** 開啟的線程數,模擬多客戶端操作*/private static final int CLIENTS_NUM = 3;private final ActiveKeyValueStore store;public ZkLock(String hosts) throws IOException, InterruptedException {//定義一個類store = new ActiveKeyValueStore();//連接Zookeeperstore.connect(hosts);}public static void testLock() {//線程計數器控制業務的執行final CountDownLatch countDownLatch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {new Thread() {@Overridepublic void run() {}}.start();}try {// 堵塞線程,任務執行完后釋放countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws IOException, InterruptedException, KeeperException {String hosts = "localhost:2181";ZkLock zkLock = new ZkLock(hosts);// 創建父節點zkLock.store.write("/lock4", "父親節點");//CountDownLatch latch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {int finalI = i;new Thread() {@SneakyThrows@Overridepublic void run() {String name = "Thread-" + String.valueOf(finalI);zkLock.store.lock("/lock4", name);TimeUnit.SECONDS.sleep(2);System.out.println("線程-" + name + "執行完畢");latch.countDown();zkLock.store.unlock("/lock4", name);}}.start();}latch.await();System.out.println("end ...");}}

三、測試結果

ZkLock 代碼測試結果如下:

線程-Thread-0執行完畢
線程-Thread-1執行完畢
線程-Thread-2執行完畢
end ...

通過 ZkLock 打印的信息可以看出,已經成功模擬實現分布式鎖

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/535441.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/535441.shtml
英文地址,請注明出處:http://en.pswp.cn/news/535441.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

JavaIO流:案例

java.io 包下需要掌握的流有 16 個&#xff0c;本篇內容包括&#xff1a;java.io包下需要掌握的流、Java IO 案例。 文章目錄一、java.io包下需要掌握的流二、Java IO 案例1、Demo 1&#xff08;FileInputStream&#xff09;2、Demo 2&#xff08;FileInputStream&#xff09;3…

比對excel數據

#!/usr/bin/env pythonimport openpyxl from openpyxl.styles import PatternFill from openpyxl.styles import colors from openpyxl.styles import Font, Color aD:/測算單位設置/比對/吉林/tmp001.xlsx bD:/測算單位設置/比對/吉林/國網吉林電力.xlsx cD:/測算單位設置/比對…

CPU 是如何執行任務的

前言 你清楚下面這幾個問題嗎&#xff1f; 有了內存&#xff0c;為什么還需要 CPU Cache&#xff1f; CPU 是怎么讀寫數據的&#xff1f; 如何讓 CPU 能讀取數據更快一些&#xff1f; CPU 偽共享是如何發生的&#xff1f;又該如何避免&#xff1f; CPU 是如何調度任務的&a…

Ansible 的自動化運維

1、Ansible 特點 Ansible 自 2012 年發布以來&#xff0c;很快在全球流行&#xff0c;其特點如下&#xff1a; Ansible 基于 Python 開發&#xff0c;運維工程師對其二次開發相對比較容易&#xff1b; Ansible 豐富的內置模塊&#xff0c;幾乎可以滿足一切要求&#xff1b; …

Shell 信號發送與捕捉

1、Linux信號類型 信號&#xff08;Signal&#xff09;&#xff1a;信號是在軟件層次上對中斷機制的一種模擬&#xff0c;通過給一個進程發送信號&#xff0c;執行相應的處理函數。 進程可以通過三種方式來響應一個信號&#xff1a; 1&#xff09;忽略信號&#xff0c;即對信…

運維面試題總結

集群相關 簡述 ETCD 及其特點&#xff1f; etcd 是 CoreOS 團隊發起的開源項目&#xff0c;是一個管理配置信息和服務發現&#xff08;service discovery&#xff09;的項目&#xff0c;它的目標是構建一個高可用的分布式鍵值&#xff08;key-value&#xff09;數據庫&#x…

詳解設計模式:建造者模式

建造者模式&#xff08;Builder Pattern&#xff09;也叫做生成器模式&#xff0c;是 GoF 的 23 種設計模式的一種&#xff0c;它將一個復雜對象的構建與它的表示分離&#xff0c;使得同樣的構建過程可以創建不同的表示。 當我們需要實列化一個復雜的類&#xff0c;以得到不同結…

圖文并茂 VLAN 詳解,讓你看一遍就理解 VLAN

一、為什么需要VLAN 1.1、什么是VLAN? VLAN(Virtual LAN)&#xff0c;翻譯成中文是“虛擬局域網”。LAN可以是由少數幾臺家用計算機構成的網絡&#xff0c;也可以是數以百計的計算機構成的企業網絡。VLAN所指的LAN特指使用路由器分割的網絡——也就是廣播域。 在此讓我們先復習…

認識VLAN,并學會VLAN的劃分和網絡配置實例

VLAN的劃分和網絡的配置實例 1、VLAN基礎知識 VLAN&#xff08;Virtual Local Area Network&#xff09;的中文名為&#xff1a;“虛擬局域網”&#xff0c;注意和VPN&#xff08;虛擬專用網&#xff09;進行區分。 VLAN是一種將局域網設備從邏輯上劃分&#xff08;不是從物…

VLAN劃分及配置注意事項

VLAN&#xff08;Virtual Local Area Network&#xff09;即虛擬局域網&#xff0c;是將一個物理的LAN在邏輯上劃分成多個廣播域的通信技術。VLAN內的主機間可以直接通信&#xff0c;而VLAN間不能直接通信&#xff0c;從而將廣播報文限制在一個VLAN內。VLAN之間的通信是通過第3…

Docker原理剖析

一、簡介 1、了解Docker的前生LXC LXC為Linux Container的簡寫。可以提供輕量級的虛擬化&#xff0c;以便隔離進程和資源&#xff0c;而且不需要提供指令解釋機制以及全虛擬化的其他復雜性。相當于C中的NameSpace。容器有效地將由單個操作系統管理的資源劃分到孤立的組中&#…

獲取Linux內存、cpu、磁盤IO等信息

#!/bin/bash # 獲取要監控的本地服務器IP地址 IPifconfig | grep inet | grep -vE inet6|127.0.0.1 | awk {print $2} echo "IP地址&#xff1a;"$IP# 獲取cpu總核數 cpu_numgrep -c "model name" /proc/cpuinfo echo "cpu總核數&#xff1a;"$c…

Docker容器網絡解析

Docker 容器網絡的發展歷史 在 Dokcer 發布之初&#xff0c;Docker 是將網絡、管理、安全等集成在一起的&#xff0c;其中網絡模塊可以為容器提供橋接網絡、主機網絡等簡單的網絡功能。 從 1.7 版本開始&#xff0c;Docker正是把網絡和存儲這兩部分的功能都以插件化形式剝離出來…

將指定excel的一列數據提取到另一個excel的指定列

#!/usr/bin/env python import openpyxl bjD:/地市縣公司/西藏臺賬數據分析-設備臺帳分析.xlsx wb openpyxl.load_workbook (bj) get_sheets wb.sheetnames #print(get_sheets) TA01TA01 TA02TA02 TA03TA03 TE01TE01 YG201YG201 YG202YG202 YG203YG203 YG204YG204 YG205YG205…

Docker 數據管理介紹

默認容器的數據是保存在容器的可讀寫層&#xff0c;當容器被刪除時其上的數據也會丟失&#xff0c;所以為了實現數據的持久性則需要選擇一種數據持久技術來保存數據。官方提供了三種存儲方式&#xff1a;Volumes、Bind mounts和tmpfs。前面還介紹了&#xff1a;Docker 服務終端…

Docker 數據持久化的三種方案

容器中的數據可以存儲在容器層。但是將數據存放在容器層存在以下問題&#xff1a; 數據不是持久化。意思是如果容器刪除了&#xff0c;這些數據也就沒了 主機上的其它進程不方便訪問這些數據 對這些數據的I/O會經過存儲驅動&#xff0c;然后到達主機&#xff0c;引入了一層間…

Git 存儲原理及相關實現

Git 是目前最流行的版本控制系統&#xff0c;從本地開發到生產部署&#xff0c;我們每天都在使用 Git 進行我們的版本控制&#xff0c;除了日常使用的命令之外&#xff0c;如果想要對 Git 有更深一步的了解&#xff0c;那么研究下 Git 的底層存儲原理將會對理解 Git 及其使用非…

Git內部原理

Git有什么特點&#xff1f; fast&#xff0c;scalable&#xff0c;distributed revision control system&#xff08;快速&#xff0c;可擴展的分布式版本控制系統&#xff09; 幾乎所有操作都是本地執行 每一個clone都是整個生命周期的完整副本 the stupid content tracker&a…

git存儲原理

四種數據類型 實際上Git基于數據類型的不同&#xff0c;把對象分為四種&#xff1a;數據對象、樹對象、提交對象、標簽對象。Git文件系統的設計思路與linux文件系統相似&#xff0c;即將文件的內容與文件的屬性分開存儲&#xff0c;文件內容以“裝滿字節的袋子”存儲在文件系統…

詳解設計模式:中介者模式

中介者模式&#xff08;Mediator Pattern&#xff09;也被稱為調停者模式&#xff0c;是在 GoF 23 種設計模式中定義了的行為型模式。 中介者模式 是用來降低多個對象和類之間的通信復雜性。這種模式提供了一個中介類&#xff0c;該類通常處理不同類之間的通信&#xff0c;并支…