Zookeeper 能保證數據的強一致性,用戶任何時候都可以相信集群中每個節點的數據都是相同的。一個用戶創建一個節點作為鎖,另一個用戶檢測該節點,如果存在,代表別的用戶已經鎖住,如果不存在,則可以創建一個節點,代表擁有一個鎖。
~
本篇內容包括:Demo 概述、代碼實現、測試結果
文章目錄
- 一、Demo 概述
- 1、關于 zookeeper “命名服務協調”
- 2、Demo 設計
- 3、Demo 前提
- 二、代碼實現
- 1、引用 Maven 依賴
- 2、ConnectionWatcher 類創建 Zookeeper 連接
- 3、ActiveKeyValueStore 類讀寫 Zookeeper 數據
- 4、ZkLock 類實現分布式鎖
- 三、測試結果
一、Demo 概述
1、關于 zookeeper “命名服務協調”
Zookeeper 能保證數據的強一致性,用戶任何時候都可以相信集群中每個節點的數據都是相同的。一個用戶創建一個節點作為鎖,另一個用戶檢測該節點,如果存在,代表別的用戶已經鎖住,如果不存在,則可以創建一個節點,代表擁有一個鎖。
2、Demo 設計
分布式鎖本質,就是多個資源競爭者對一份資源的排他占有
- 我們設置多個線程,分別在同一 path 下創建節點
- 沒個線程獲取當前 path 下子節點,看最小子節點是否為自身,是則加鎖成功(更好的方式是用 Watcher 對前一個地址監控,這里圖方便用子節點排序取最小的方式 )
- 線程加鎖成功后,執行任務,執行完畢后解鎖
3、Demo 前提
參考:Mac通過Docker安裝Zookeeper集群
二、代碼實現
1、引用 Maven 依賴
<!-- 選擇對應的Zookeeper版本 --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version></dependency>
2、ConnectionWatcher 類創建 Zookeeper 連接
import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ConnectionWatcher implements Watcher {private final CountDownLatch connectedSignal = new CountDownLatch(1);private static final int SESSION_TIMEOUT = 5000;protected ZooKeeper zk;public void connect(String hosts) throws IOException, InterruptedException {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);connectedSignal.await();}@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {connectedSignal.countDown();}}public void close() throws InterruptedException {zk.close();}}
3、ActiveKeyValueStore 類讀寫 Zookeeper 數據
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;public class ActiveKeyValueStore extends ConnectionWatcher {private static final Charset CHARSET = StandardCharsets.UTF_8;int state = 0;/*** 寫入節點數據** @param path 節點地址* @param value 數據值* @throws InterruptedException 中斷異常* @throws KeeperException ZooKeeper異常*/public void write(String path, String value) throws InterruptedException, KeeperException {Stat stat = zk.exists(path, false);if (stat == null) {if (value == null) {zk.create(path, null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);} else {zk.create(path, value.getBytes(CHARSET),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} else {if (value == null) {zk.setData(path, null, -1);} else {zk.setData(path, value.getBytes(CHARSET), -1);}}}public boolean lock(String path, String name) throws InterruptedException, KeeperException {boolean flag = tryLock(path, name);if (flag) {state++;}return flag;}public boolean tryLock(String path, String name) throws InterruptedException, KeeperException {String lockPath = path + "/" + name;zk.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);List<String> waits = readChildren(path, null);Collections.sort(waits);if (waits.get(0).equals(name)) {return true;}CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < waits.size(); i++) {String cur = waits.get(i);if (!cur.equalsIgnoreCase(name)) {continue;}String prePath = path + "/" + waits.get(i - 1);zk.exists(prePath, new Watcher() {@Overridepublic void process(WatchedEvent event) {latch.countDown();}});break;}latch.await();return true;}public boolean unlock(String path, String name) {if (state > 1) {state--;return true;}String lockPath = path + "/" + name;try {Stat stat = zk.exists(lockPath, false);int version = stat.getVersion();zk.delete(lockPath, version);state--;return true;} catch (Exception e) {System.out.println("unlock:" + lockPath + " ,exception,");}return false;}/*** 獲取所有子節點** @param path 節點地址* @param watcher watcher* @return 所有子節點* @throws InterruptedException 中斷異常* @throws KeeperException ZooKeeper異常*/public List<String> readChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {List<String> childrens = null;if (watcher == null) {childrens = zk.getChildren(path, false);} else {childrens = zk.getChildren(path, watcher, null);}return childrens;}
}
4、ZkLock 類實現分布式鎖
import lombok.SneakyThrows;
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class ZkLock {/*** 開啟的線程數,模擬多客戶端操作*/private static final int CLIENTS_NUM = 3;private final ActiveKeyValueStore store;public ZkLock(String hosts) throws IOException, InterruptedException {//定義一個類store = new ActiveKeyValueStore();//連接Zookeeperstore.connect(hosts);}public static void testLock() {//線程計數器控制業務的執行final CountDownLatch countDownLatch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {new Thread() {@Overridepublic void run() {}}.start();}try {// 堵塞線程,任務執行完后釋放countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws IOException, InterruptedException, KeeperException {String hosts = "localhost:2181";ZkLock zkLock = new ZkLock(hosts);// 創建父節點zkLock.store.write("/lock4", "父親節點");//CountDownLatch latch = new CountDownLatch(CLIENTS_NUM);for (int i = 0; i < CLIENTS_NUM; i++) {int finalI = i;new Thread() {@SneakyThrows@Overridepublic void run() {String name = "Thread-" + String.valueOf(finalI);zkLock.store.lock("/lock4", name);TimeUnit.SECONDS.sleep(2);System.out.println("線程-" + name + "執行完畢");latch.countDown();zkLock.store.unlock("/lock4", name);}}.start();}latch.await();System.out.println("end ...");}}
三、測試結果
ZkLock 代碼測試結果如下:
線程-Thread-0執行完畢
線程-Thread-1執行完畢
線程-Thread-2執行完畢
end ...
通過 ZkLock 打印的信息可以看出,已經成功模擬實現分布式鎖