ZooKeeper Java客戶端與分布式應用實戰

1. ZooKeeper Java客戶端實戰

ZooKeeper應用開發主要通過Java客戶端API連接和操作ZooKeeper集群,有官方和第三方兩種客戶端選擇。

1.1 ZooKeeper原生Java客戶端

依賴引入
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency>

注意:客戶端版本需與服務端保持一致,避免兼容性問題

基本使用
public class ZkClientDemo {private static final String CLUSTER_CONNECT_STR = "192.168.22.156:2181,192.168.22.190:2181,192.168.22.200:2181";public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 4000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState() && event.getType() == Event.EventType.None) {countDownLatch.countDown();System.out.println("連接建立");}}});countDownLatch.await();System.out.println(zooKeeper.getState()); // CONNECTED// 創建持久節點zooKeeper.create("/user", "fox".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
}
原生API的局限性
  • Watcher監測為一次性,需重復注冊
  • 無自動重連機制
  • 異常處理復雜
  • 僅提供byte[]接口,缺少POJO序列化支持
  • 需手動檢查節點存在性
  • 不支持級聯刪除
常用方法
  • create(path, data, acl, createMode):創建節點
  • delete(path, version):刪除節點
  • exists(path, watch):判斷節點存在性
  • getData(path, watch):獲取節點數據
  • setData(path, data, version):設置節點數據
  • getChildren(path, watch):獲取子節點列表
  • sync(path):同步客戶端與leader節點

所有方法都提供同步和異步兩個版本,且支持條件更新(通過version參數控制)。

同步創建節點
@Test
public void createTest() throws KeeperException, InterruptedException {String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("created path: {}", path);
}
異步創建節點
@Test
public void createAsyncTest() throws InterruptedException {zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) -> log.info("rc {}, path {}, ctx {}, name {}", rc, path, ctx, name),"context");
}
修改節點數據
@Test
public void setTest() throws KeeperException, InterruptedException {Stat stat = new Stat();byte[] data = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改前: {}", new String(data));zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改后: {}", new String(dataAfter));
}

1.2 Curator開源客戶端(常用)

依賴引入
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
客戶端創建
// 方式一:使用newClient方法
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();// 方式二:使用builder模式(推薦)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.128.129:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base") // 命名空間隔離.build();
client.start();
重試策略類型
  • ExponentialBackoffRetry:重試間隔按指數增長
  • RetryNTimes:最大重試次數
  • RetryOneTime:只重試一次
  • RetryUntilElapsed:在指定時間內重試
基本操作
// 創建節點
@Test
public void testCreate() throws Exception {String path = curatorFramework.create().forPath("/curator-node");curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node", "some-data".getBytes());log.info("curator create node :{} successfully.", path);
}// 創建層級節點
@Test
public void testCreateWithParent() throws Exception {String pathWithParent = "/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{} successfully.", path);
}// 獲取數據
@Test
public void testGetData() throws Exception {byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node :{} successfully.", new String(bytes));
}// 更新數據
@Test
public void testSetData() throws Exception {curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node /curator-node :{} successfully.", new String(bytes));
}// 刪除節點
@Test
public void testDelete() throws Exception {String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
異步接口
@Test
public void testAsync() throws Exception {// 默認在EventThread中執行curatorFramework.getData().inBackground((item1, item2) -> {log.info("background: {}", item2);}).forPath(ZK_NODE);// 指定自定義線程池ExecutorService executorService = Executors.newSingleThreadExecutor();curatorFramework.getData().inBackground((item1, item2) -> {log.info("background: {}", item2);}, executorService).forPath(ZK_NODE);
}
監聽器機制

Curator提供了三種Cache監聽模式:

  1. NodeCache - 監聽單個節點
public class NodeCacheTest {public static final String NODE_CACHE = "/node-cache";@Testpublic void testNodeCacheTest() throws Exception {createIfNeed(NODE_CACHE);NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);nodeCache.getListenable().addListener(() -> {log.info("{} path nodeChanged: ", NODE_CACHE);printNodeData();});nodeCache.start();}
}
  1. PathChildrenCache - 監聽子節點(不包含二級子節點)
public class PathCacheTest {public static final String PATH = "/path-cache";@Testpublic void testPathCache() throws Exception {createIfNeed(PATH);PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);pathChildrenCache.getListenable().addListener((client, event) -> {log.info("event: {}", event);});pathChildrenCache.start(true);}
}
  1. TreeCache - 監聽當前節點及所有遞歸子節點
public class TreeCacheTest {public static final String TREE_CACHE = "/tree-path";@Testpublic void testTreeCache() throws Exception {createIfNeed(TREE_CACHE);TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);treeCache.getListenable().addListener((client, event) -> {log.info("tree cache: {}", event);});treeCache.start();}
}

2. ZooKeeper在分布式命名服務中的實戰

2.1 分布式API目錄

Dubbo框架使用ZooKeeper實現分布式JNDI功能:

  • 服務提供者在啟動時向/dubbo/${serviceName}/providers節點寫入API地址
  • 服務消費者訂閱該節點下的URL地址,獲取所有服務提供者的API

2.2 分布式節點命名

動態節點命名方案:

  1. 使用數據庫自增ID特性
  2. 使用ZooKeeper持久順序節點的順序特性

ZooKeeper方案流程:

  • 啟動服務,連接ZooKeeper,檢查/創建根節點
  • 在根節點下創建臨時順序節點,取回編號作為NodeId
  • 根據需要刪除臨時順序節點

2.3 分布式ID生成器

方案對比
  1. Java UUID
  2. Redis INCR/INCRBY操作
  3. Twitter SnowFlake算法
  4. ZooKeeper順序節點
  5. MongoDB ObjectId
基于ZooKeeper的實現
public class IDMaker extends CuratorBaseOperations {private String createSeqNode(String pathPefix) throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String destPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}public String makeId(String path) throws Exception {String str = createSeqNode(path);if (null != str) {int index = str.lastIndexOf(path);if (index >= 0) {index += path.length();return index <= str.length() ? str.substring(index) : "";}}return str;}
}
基于SnowFlake算法的實現
public class SnowflakeIdGenerator {private static final long START_TIME = 1483200000000L;private static final int WORKER_ID_BITS = 13;private static final int SEQUENCE_BITS = 10;private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;private static final long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;private long workerId;private long lastTimestamp = -1L;private long sequence = 0L;public synchronized void init(long workerId) {if (workerId > MAX_WORKER_ID) {throw new IllegalArgumentException("worker Id wrong: " + workerId);}this.workerId = workerId;}private synchronized long generateId() {long current = System.currentTimeMillis();if (current < lastTimestamp) {return -1; // 時鐘回撥}if (current == lastTimestamp) {sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == MAX_SEQUENCE) {current = this.nextMs(lastTimestamp);}} else {sequence = 0L;}lastTimestamp = current;long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;long workerId = this.workerId << WORKER_ID_SHIFT;return time | workerId | sequence;}
}

3. ZooKeeper實現分布式隊列

3.1 設計思路

  1. 創建持久節點作為隊列根節點
  2. 入隊:在根節點下創建臨時有序節點
  3. 出隊:獲取最小序號節點,讀取數據后刪除

3.2 Curator實現

public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 序列化器QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 消費者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消費消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 創建隊列(可指定鎖路徑保證原子性)DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).lockPath("/orderlock") // 可選:分布式鎖路徑.buildQueue();queue.start();// 生產消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生產消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
}

3.3 注意事項

  • ZooKeeper不適合大數據量存儲,官方不推薦作為隊列使用
  • 在吞吐量不高的小型系統中較為適用
  • 使用鎖路徑(lockPath)可保證操作的原子性和順序性
  • 不指定鎖路徑可提高性能,但可能面臨并發問題

總結

ZooKeeper提供了強大的分布式協調能力,通過原生API或Curator客戶端可以實現多種分布式場景下的解決方案。在選擇方案時需要根據具體需求權衡性能、一致性和復雜性,特別是在高并發場景下需要考慮ZooKeeper的適用性和局限性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/96704.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/96704.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/96704.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

0303 【軟考高項】項目管理概述 - 組織系統(項目型組織、職能型組織、矩陣型組織)

0303 【軟考高項】項目管理概述 - 組織系統&#xff08;項目型組織、職能型組織、矩陣型組織&#xff09; 目錄0303 【軟考高項】項目管理概述 - 組織系統&#xff08;項目型組織、職能型組織、矩陣型組織&#xff09;一、基本概念二、職能型組織二、項目型組織三、矩陣型組織3…

計算機視覺與模式識別前沿一覽:2025年8月arXiv 熱點研究趨勢解析

本推文分析了arXiv中Computer Vision and Patteren Recognition(計算機視覺與模式識別)領域2025年8月發布的近50篇論文的研究熱點&#xff0c;旨在幫助讀者快速了解近期領域內的前沿技術與研究方向。arXiv是全球最具影響力的開放電子預印本平臺之一&#xff0c;由美國國家科學基…

vim復制本地到linux服務器上,換行縮進過大,不對的問題

所搜的試了:setlocal shiftwidth? :setlocal tabstop? :setlocal expandtab? :setlocal softtabstop?" 設置為 4 個空格縮進 :setlocal shiftwidth4" 通常你會希望 tabstop 和 softtabstop 也保持一致 :setlocal tabstop4 :setlocal softtabstop4嘗試完不起作用&…

【小程序】微信小程序九宮格抽獎動畫(完整版)

這是一個微信小程序九宮格抽獎頁面的完整代碼&#xff0c;包括 WXML、WXSS、JS 和 JSON。 效果 九宮格抽獎功能說明&#xff1a; 靜態頁面布局&#xff1a; 3x3 九宮格&#xff0c;中間是“立即抽獎”按鈕&#xff0c;周圍是獎品金額。抽獎動畫&#xff1a; 點擊“立即抽獎”…

java類沖突

一、為什么會發生類沖突&#xff1f; 在 Java 的類加載機制中&#xff0c;類的唯一性是由“類加載器類的全限定名”共同決定的。當你的項目依賴了多個 jar 包&#xff0c;這些 jar 包里有同名的類&#xff08;包名和類名完全一樣&#xff09;&#xff0c;但實現卻不同。類加載器…

GIT客戶端配置支持中文

環境&#xff1a;windows10、Git-2.42.0.2-64-bit.exe1. 問題描述客戶端安裝后&#xff0c;默認是不支持中文顯示的&#xff0c;中文名的文件顯示亂碼&#xff0c;提交時打的標簽內容也不支持中文顯示。2. 解決新建Git全局配置文件&#xff0c;文件名為.gitconfig&#xff0c;內…

Teable vs NocoDB 開源、在線協同 多維表格大PK

文章目錄 Teable 簡介 特性 docker-compose部署 功能截圖 NocoDB 簡介 docker-compose部署 功能截圖 總結 Teable 簡介 Teable 是一款企業級高性能多維表格解決方案,通過無代碼方式快速構建業務管理系統,支持私有部署和精細權限管理。 官方文檔 特性 ?? 卓越性能 輕松處…

SQL專家云能做哪些事兒?

背景數據庫是信息化的基石&#xff0c;支撐著整個業務系統&#xff0c;發揮著非常重要的作用&#xff0c;被喻為“IT的心臟”。因此&#xff0c;讓數據庫安全、穩定、高效地運行已經成為IT管理者必須要面對的問題。但是很多組織沒有專業的DBA&#xff0c;數據庫運維面臨著極大的…

Python 高效實現 Word 轉 PDF:告別 Office 依賴

在工作中&#xff0c;經常會遇到需要把 Word 文檔轉換成 PDF 的情況。比如生成報表、分發文檔、或者做歸檔保存&#xff0c;PDF 格式在排版和跨平臺顯示上更穩定。傳統的做法往往依賴 Microsoft Office 或 LibreOffice 等軟件來完成轉換&#xff0c;但在自動化環境&#xff08;…

SQL優化簡單思路

1. 背景 在實際生產中&#xff0c;因為SQL較慢、SQL關聯不合理、不了解索引的性質、不熟悉mysql執行計劃分析&#xff0c;可能會出現一些生產事故&#xff0c;本文會簡單說明SQL通常的優化分析思路。 基本的優化原則&#xff1a; 先優化SQL再優化mysql server最后優化硬件 2. 優…

軟考 系統架構設計師系列知識點之雜項集萃(144)

接前一篇文章:軟考 系統架構設計師系列知識點之雜項集萃(143) 第268題 甲、乙、丙、丁4人加工A、B、C、D四種工件所需工時如下表所示。指派每人加工一種工件,四人加工四種工件其總工時最短的最優方案中,工件B應由()加工。 A B C D 甲

P1168 中位數

題目描述給定一個長度為 N 的非負整數序列 A&#xff0c;對于前奇數項求中位數。輸入格式第一行一個正整數 N。第二行 N 個正整數 A1…N?。輸出格式共 ?2N1?? 行&#xff0c;第 i 行為 A1…2i?1? 的中位數。輸入輸出樣例輸入 #1復制7 1 3 5 7 9 11 6輸出 #11 3 5 6輸入 #…

【CE】圖形化CE游戲教程通關手冊

【CE】圖形化CE游戲教程通關手冊 文章目錄【CE】圖形化CE游戲教程通關手冊導讀需求1?? 第一關提示操作總結2?? 第二關&#xff08;代碼共享&#xff09;提示操作驗證3?? 第三關提示提示總結導讀 需求 除了Tutorial-x86_64.exe教程外&#xff0c;CE還提供了圖形化教程gtu…

leetcode 2785. 將字符串中的元音字母排序 中等

給你一個下標從 0 開始的字符串 s &#xff0c;將 s 中的元素重新 排列 得到新的字符串 t &#xff0c;它滿足&#xff1a;所有輔音字母都在原來的位置上。更正式的&#xff0c;如果滿足 0 < i < s.length 的下標 i 處的 s[i] 是個輔音字母&#xff0c;那么 t[i] s[i] 。…

支付子系統架構及常見問題

支付流程對于支付系統來說&#xff0c;它最重要的其實是安全&#xff0c;所以整個支付流程采用秘鑰加簽的方式進行操作&#xff0c;一共四對秘鑰&#xff0c;以支付寶在線支付為例子&#xff0c;首先通過RSA2算法生成商戶公鑰以及商戶私鑰&#xff0c;同時支付寶平臺會提供支付…

內存傳輸速率MT/s

1 0 0 0 0 0 0 0 0 010 9 8 7 6 5 4 3 2 1十 億 千 百 十 萬 千 百 十 個億 萬 萬 萬傳輸速率 …

.env文件的作用和使用方法

目錄 什么是 .env 文件&#xff1f; 為什么要使用 .env 文件&#xff1f;&#xff08;好處&#xff09; 如何使用 .env 文件&#xff1f; 通用步驟&#xff1a; 具體技術棧中的實現&#xff1a; 最佳實踐和注意事項 總結 什么是 .env 文件&#xff1f; .env 文件&#x…

深度拆解 Python 裝飾器參數傳遞:從裝飾器生效到參數轉交的每一步

在 Python 裝飾器的學習中&#xff0c;“被裝飾函數的參數如何傳遞到裝飾器內層函數”是一個高頻疑問點。很多開發者能寫出裝飾器的基本結構&#xff0c;卻對參數傳遞的底層邏輯一知半解。本文將以一段具體代碼為例&#xff0c;把參數傳遞過程拆成“裝飾器生效→調用觸發→參數…

【Vue2 ?】Vue2 入門之旅 · 進階篇(七):Vue Router 原理解析

在前幾篇文章中&#xff0c;我們介紹了 Vue 的性能優化機制、組件緩存等內容。本篇將深入解析 Vue Router 的原理&#xff0c;了解 Vue 如何管理路由并進行導航。 目錄 Vue Router 的基本概念路由模式&#xff1a;hash 和 history路由匹配原理導航守衛Vue Router 的路由過渡動…

Linux磁盤級文件/文件系統理解

Linux磁盤級文件/文件系統理解 1. 磁盤的物理結構 磁盤的核心是一個利用磁性介質和機械運動進行數據讀寫的、非易失性的存儲設備。 1.1 盤片 盤片是傳統機械硬盤中最核心的部件&#xff0c;它是數據存儲的物理載體。盤片是一個堅硬的、表面極度光滑的圓形碟片&#xff0c;被安裝…