ZooKeeper學習專欄(五):Java客戶端開發(原生API)詳解

文章目錄

  • 前言
  • 一、核心類解析
    • 1.1 ZooKeeper類 - 連接管理核心
    • 1.2 Watcher接口 - 事件處理核心
  • 二、原生API實踐
    • 2.1 創建會話(連接管理)
    • 2.2 創建節點(支持多種類型)
    • 2.3 獲取節點數據和狀態信息
    • 2.4 修改節點數據(版本控制)
    • 2.5 刪除節點(版本控制)
    • 2.6 注冊Watcher監聽節點變化
    • 2.7 處理連接狀態變化事件
  • 三、最佳實踐與注意事項
  • 總結


前言

本文是Zookeeper第五個學習專欄,將深入探討如何使用原生Java API進行Zookeeper客戶端開發。通過詳細的代碼示例和注釋,幫助開發者掌握核心API的使用方法


一、核心類解析

前置條件先引入Zookeeper客戶端依賴,在Maven項目中添加以下依賴:

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.1</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions>
</dependency>

注意事項:
客戶端版本應與服務端版本匹配。
建議排除沖突的日志依賴,使用項目統一的日志框架。

在ZooKeeper的Java客戶端開發中,有兩個核心類構成了整個API的基礎框架:ZooKeeper類負責連接管理和基礎操作,Watcher接口負責事件處理機制。下面我們將深入剖析這兩個核心組件。

1.1 ZooKeeper類 - 連接管理核心

ZooKeeper類是客戶端與ZooKeeper服務交互的主要入口,負責:

  • 建立和維護與ZooKeeper集群的連接。
  • 管理客戶端會話生命周期。
  • 提供節點操作API(CRUD)。
  • 處理請求響應和序列化。

1. 構造方法:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException

參數解析:

參數類型說明示例值
connectStringString集群連接字符串 格式:host1:port1,host2:port2“zk1:2181,zk2:2181,zk3:2181”
sessionTimeoutint會話超時時間(毫秒) 服務器端最小會話超時為tickTime*23000
watcherWatcher全局事件處理器 處理連接狀態變化new MyWatcher()

2. 核心方法詳解:
節點操作API

// 創建節點
String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)// 刪除節點
void delete(String path, int version)// 獲取節點數據
byte[] getData(String path, boolean watch, Stat stat)// 設置節點數據
Stat setData(String path, byte[] data, int version)// 檢查節點是否存在
Stat exists(String path, boolean watch)// 獲取子節點列表
List<String> getChildren(String path, boolean watch)

連接管理

// 獲取當前會話ID
long getSessionId()// 獲取會話密碼(用于重連)
byte[] getSessionPasswd()// 獲取連接狀態
States getState()// 關閉連接
void close()

4. 連接狀態枚舉(States)

public enum States {CONNECTING,     // 連接建立中ASSOCIATING,    // 關聯中CONNECTED,      // 已連接CONNECTEDREADONLY, // 只讀連接CLOSED,         // 已關閉AUTH_FAILED,    // 認證失敗NOT_CONNECTED;  // 未連接
}

1.2 Watcher接口 - 事件處理核心

1. 接口定義與事件模型

public interface Watcher {void process(WatchedEvent event);
}

Watcher采用觀察者模式,當ZooKeeper狀態變化或節點變更時,會通過process()方法回調通知客戶端。
2. WatchedEvent結構分析
WatchedEvent包含三個關鍵信息:

public class WatchedEvent {private final KeeperState keeperState; // 連接狀態private final EventType eventType;     // 事件類型private final String path;            // 事件路徑
}

3. 連接狀態(KeeperState)

狀態觸發條件處理建議
SyncConnected成功連接到集群恢復正常操作
Disconnected與集群斷開連接暫停寫操作,嘗試重連
Expired會話超時重建連接,恢復臨時節點
AuthFailed認證失敗檢查ACL配置
ConnectedReadOnly連接到只讀服務器避免寫操作

4. 節點事件類型(EventType)

事件類型觸發條件注冊方式
NodeCreated節點被創建exists()
NodeDeleted節點被刪除exists()/getData()
NodeDataChanged節點數據變更getData()
NodeChildrenChanged子節點變化getChildren()
DataWatchRemoved數據監視移除系統自動
ChildWatchRemoved子節點監視移除系統自動

5. Watcher特性深度解析
(1) 一次性觸發機制
特性:Watcher在觸發后會自動失效
影響:需要重新注冊才能繼續監聽
解決方案

@Override
public void process(WatchedEvent event) {if (event.getType() == EventType.NodeDataChanged) {try {// 重新注冊WatcherzooKeeper.getData(event.getPath(), this, null);} catch (Exception e) {// 處理異常}}
}

(2) 輕量級通知
特性:事件通知不包含具體變更內容
優勢:減少網絡傳輸開銷
處理流程
輕量級通知
(3) 順序保證
特性:客戶端按事件發生的順序接收通知
重要性:確保狀態一致性
示例場景
節點數據變更(setData)
節點刪除(delete)
客戶端將按此順序收到NodeDataChanged和NodeDeleted事件

(4) 會話事件優先級
特性:連接狀態事件優先于節點事件
影響:當連接斷開時,節點事件可能丟失
處理方案

public void process(WatchedEvent event) {// 優先處理連接狀態事件if (event.getState() != KeeperState.SyncConnected) {handleSessionEvent(event.getState());return;}// 處理節點事件handleNodeEvent(event.getType(), event.getPath());
}

6. Watcher注冊機制
下面給出三種注冊方式:
構造方法注冊:全局連接狀態Watcher

ZooKeeper zk = new ZooKeeper(connectString, timeout, globalWatcher);

API調用注冊:操作時指定Watcher

zk.getData("/node", specificWatcher, null);

默認Watcher:使用構造方法的Watcher

zk.exists("/node", true); // true表示使用默認Watcher

核心類協作流程:
協作流程

二、原生API實踐

2.1 創建會話(連接管理)

public class ZookeeperConnector implements Watcher {private static final CountDownLatch connectedLatch = new CountDownLatch(1);private ZooKeeper zooKeeper;public ZooKeeper connect(String hosts, int timeout) throws Exception {zooKeeper = new ZooKeeper(hosts, timeout, this);connectedLatch.await(); // 等待連接建立return zooKeeper;}@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {connectedLatch.countDown(); // 連接建立時釋放鎖System.out.println("Successfully connected to ZooKeeper!");}}public static void main(String[] args) throws Exception {ZookeeperConnector connector = new ZookeeperConnector();ZooKeeper zk = connector.connect("localhost:2181", 3000);// 執行后續操作...zk.close();}
}

2.2 創建節點(支持多種類型)

// 創建持久節點
String persistentPath = zk.create("/test-persistent",        // 節點路徑"persistent data".getBytes(), // 節點數據ZooDefs.Ids.OPEN_ACL_UNSAFE, // ACL權限控制CreateMode.PERSISTENT       // 節點類型
);
System.out.println("Created persistent node: " + persistentPath);// 創建臨時順序節點
String ephemeralPath = zk.create("/test-ephemeral-",        // 注意結尾的破折號"ephemeral data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL // 臨時順序節點
);
System.out.println("Created ephemeral node: " + ephemeralPath);

2.3 獲取節點數據和狀態信息

// 獲取節點數據(不注冊Watcher)
byte[] data = zk.getData("/test-persistent", false, null);
System.out.println("Node data: " + new String(data));// 獲取節點狀態信息(Stat對象)
Stat stat = new Stat();
byte[] dataWithStat = zk.getData("/test-persistent", false, stat);// 輸出節點狀態信息
System.out.println("Version: " + stat.getVersion()); // 數據版本
System.out.println("Ctime: " + new Date(stat.getCtime())); // 創建時間
System.out.println("Mtime: " + new Date(stat.getMtime())); // 修改時間
System.out.println("Num children: " + stat.getNumChildren()); // 子節點數

2.4 修改節點數據(版本控制)

// 先獲取當前版本
Stat currentStat = zk.exists("/test-persistent", false);
int currentVersion = currentStat.getVersion();// 更新數據(指定版本)
Stat newStat = zk.setData("/test-persistent","updated data".getBytes(),currentVersion // 指定版本確保原子操作
);
System.out.println("New version: " + newStat.getVersion());// 錯誤示例:使用過期版本
try {zk.setData("/test-persistent", "wrong data".getBytes(), currentVersion);
} catch (KeeperException.BadVersionException e) {System.err.println("Version conflict: " + e.getMessage());
}

2.5 刪除節點(版本控制)

// 獲取當前版本
Stat delStat = zk.exists("/test-to-delete", false);
if (delStat != null) {zk.delete("/test-to-delete", delStat.getVersion());System.out.println("Node deleted successfully");
}// 遞歸刪除非空節點(原生API需自行實現遞歸)
deleteRecursive(zk, "/parent-node");private void deleteRecursive(ZooKeeper zk, String path) throws Exception {List<String> children = zk.getChildren(path, false);for (String child : children) {deleteRecursive(zk, path + "/" + child);}zk.delete(path, -1); // -1 忽略版本檢查
}

2.6 注冊Watcher監聽節點變化

public class NodeWatcher implements Watcher {private final ZooKeeper zk;public NodeWatcher(ZooKeeper zk) {this.zk = zk;}@Overridepublic void process(WatchedEvent event) {try {if (event.getType() == Event.EventType.NodeDataChanged) {System.out.println("Node data changed: " + event.getPath());// 重新注冊Watcher(Watcher是單次的)zk.getData(event.getPath(), this, null);} else if (event.getType() == Event.EventType.NodeChildrenChanged) {System.out.println("Node children changed: " + event.getPath());// 重新注冊子節點Watcherzk.getChildren(event.getPath(), this);}} catch (Exception e) {e.printStackTrace();}}public void watchNode(String path) throws Exception {// 注冊數據變更Watcherzk.getData(path, this, null);// 注冊子節點變更Watcherzk.getChildren(path, this);}
}// 使用示例
NodeWatcher watcher = new NodeWatcher(zk);
watcher.watchNode("/test-watch");

2.7 處理連接狀態變化事件

public class ConnectionWatcher implements Watcher {private ZooKeeper zk;private volatile boolean connected = false;private volatile boolean expired = false;public ZooKeeper connect(String hosts) throws Exception {zk = new ZooKeeper(hosts, 3000, this);while (!connected) {Thread.sleep(100);}return zk;}@Overridepublic void process(WatchedEvent event) {switch (event.getState()) {case SyncConnected:connected = true;System.out.println("Connected to ZooKeeper cluster");break;case Disconnected:connected = false;System.out.warn("Disconnected from ZooKeeper cluster");break;case Expired:expired = true;connected = false;System.err.println("Session expired. Need to reinitialize.");break;case AuthFailed:System.err.println("Authentication failed");break;}}public void close() throws InterruptedException {zk.close();}public boolean isConnected() {return connected;}public boolean isExpired() {return expired;}
}

三、最佳實踐與注意事項

  1. 連接管理:
    • 使用CountDownLatch確保連接建立后再執行操作。
    • 實現自動重連機制處理Disconnected狀態。
    • 會話過期后需要重建所有臨時節點和Watcher。
  2. Watcher使用要點:
    • Watcher是單次觸發的,事件處理后需重新注冊。
    • 在連接斷開期間發生的事件不會觸發Watcher。
    • 避免在Watcher中進行長時間阻塞操作。
  3. 版本控制:
    • 使用版本號實現樂觀鎖控制
    • 在并發更新場景中必須處理BadVersionException
    • -1表示忽略版本檢查(慎用)
  4. 異常處理:
try {// Zookeeper操作
} catch (KeeperException e) {switch (e.code()) {case NONODE:// 節點不存在處理break;case NODEEXISTS:// 節點已存在處理break;// 其他錯誤碼處理...}
} catch (InterruptedException e) {Thread.currentThread().interrupt();
}

總結

本文系統介紹了使用ZooKeeper原生Java API進行客戶端開發的核心技術:通過ZooKeeper類管理集群連接和會話生命周期,利用Watcher接口處理連接狀態變化(SyncConnected/Disconnected/Expired)和節點事件(數據變更/子節點變化);詳細演示了節點CRUD操作(含版本控制機制)、Watcher注冊策略及一次性觸發特性;強調連接管理的最佳實踐(CountDownLatch同步、會話恢復)、異常處理方案(KeeperException錯誤碼解析)和高效監聽模式設計,為構建分布式協調服務提供堅實基礎。

完整流程示意圖:
完整流程

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/91516.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/91516.shtml
英文地址,請注明出處:http://en.pswp.cn/web/91516.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

卸油管鏈接檢測誤報率↓76%:陌訊多模態融合算法實戰解析

原創聲明本文為原創技術解析&#xff0c;核心技術參數與架構設計引用自《陌訊技術白皮書》&#xff0c;禁止未經授權的轉載與商用。一、行業痛點&#xff1a;卸油管鏈接檢測的三大技術瓶頸在石化倉儲與運輸場景中&#xff0c;卸油管鏈接的密封性檢測是保障安全生產的關鍵環節。…

MongoDB用戶認證authSource

文章目錄authSource遇到的問題authSource MongoDB用戶認證邏輯與以往我認知的關系型數據庫邏輯不太一樣&#xff0c;多了一層用戶與數據庫關系的綁定。 在建立用戶時&#xff0c;需要先指定數據庫&#xff0c;則存在一個概念&#xff1a;用戶歸屬于數據庫。額外&#xff0c;依…

插件升級:Chat/Builder 合并,支持自定義 Agent、MCP、Rules

TRAE 插件全新升級&#xff0c;Chat、Builder 合并&#xff0c;支持自定義智能體、MCP 及自定義規則&#xff0c;體驗對齊 IDE&#xff0c;現已上線 JetBrains 和 VSCode。 1. Chat/Builder 合并&#xff0c;一個對話框即可智能協作 在 TRAE 插件的 Chat 對話框中&#xff0…

【歷史人物】【王安石】簡歷與生平

目錄 一、王安石個人簡歷 二、個人主要經歷 三、個人成就及影響 1、散文 2、詩歌 3、詞 四、經典評價摘錄 一、王安石個人簡歷 基本信息? 姓名&#xff1a;王安石&#xff0c;字介甫&#xff0c;號半山。小名獾郎 性別&#xff1a;男 年齡&#xff1a;1021年-1086年…

Codeforces Round 1040 (Div. 2) A - D題詳細題解

本文為Codeforces Round 1040 (Div. 2) A - D題的詳細題解, 覺得有幫助或者寫的不錯可以點個贊&#xff01; 目錄 題目A: 題目大意: 解題思路: 代碼(C): 題目B: 題目大意: 解題思路: 代碼(C): 題目C: 題目大意: 解題思路: 代碼(C): 題目D: 題目大意: 解題思路:…

數據結構 之 【排序】(計數排序)

目錄 1.計數排序的思想 2.計數排序圖解 3.計數排序代碼邏輯 3.1求原數組最大最小值及計數數組的創建 3.2計數 3.3覆蓋寫 3.4釋放資源 4.計數排序的注意事項 5.計數排序的時間復雜度與空間復雜度 以升序為例 1.計數排序的思想 前面我們學習的快排、歸并排序、希爾排序.…

Ascend CANN/ACL API 模型部署加速最佳實踐

1. 模型輸入相關問題 圖像尺寸信息 模型輸入尺寸由原始模型決定,在轉換時固定 圖像尺寸信息是模型固有屬性,不是轉換時添加的 對于使用動態尺寸,可以在推理時自動根據當前的輸入尺寸推導輸出尺寸。 輸入格式(NCHW/NHWC) --input_format 不同框架默認格式不同: Caffe: 支持…

QT信號和槽怎么傳輸自己定義的數據結構

在 Qt 中&#xff0c;信號&#xff08;Signal&#xff09;和槽&#xff08;Slot&#xff09;機制默認支持許多內置類型&#xff08;如 int、QString、QList 等&#xff09;&#xff0c;但如果要傳輸 自定義數據結構&#xff08;如結構體、類對象&#xff09;&#xff0c;需要額…

借助于llm將pdf轉化為md文本

pdf轉化為md格式后&#xff0c;意味著非結構化文本轉為結構化文本&#xff0c;能清晰定位大標題、子標題&#xff0c;圖表。 方便后續處理&#xff0c;因為llamaindex和langchain能更有效切分md類文本&#xff0c;避免信息丟失。 1&#xff09;讀取pdf為txt 讀取pdf&#xf…

設計模式:中介者模式 Mediator

目錄前言問題解決方案結構代碼前言 中介者是一種行為設計模式&#xff0c;能讓你減少對象之間混亂無序的依賴關系。該模式會限制對象之間的直接交互&#xff0c;迫使它們通過一個中介者對象進行合作。 問題 假如你有一個創建和修改客戶資料的對話框&#xff0c; 它由各種控件…

計算機基礎速通--數據結構·線性表應用

如有問題大概率是我的理解比較片面&#xff0c;歡迎評論區或者私信指正。 考察線性表&#xff0c;核心圍繞其存儲結構特性、核心操作實現、場景應用選型三大維度&#xff0c;重點檢驗對基礎概念的理解、代碼實現能力及問題分析能力&#xff0c;通常會結合算法設計、復雜度分析和…

LeetCode Hot 100:42. 接雨水

題目 給定 n 個非負整數表示每個寬度為 1 的柱子的高度圖&#xff0c;計算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 解析 和題目 盛水最多的容器 類似&#xff0c; LeetCode Hot 100&#xff1a;11. 盛最多水的容器-CSDN博客 只是這里將每一個柱子視為一個寬度為…

【C語言入門級教學】字符指針變量

文章目錄1.字符指針變量2. 數組指針變量2.1 數組指針變量初始化3.?維數組傳參的本質1.字符指針變量 在指針的類型中我們知道有?種指針類型為字符指針 char* ; ?般使?: int main() { char ch w; char* pc &ch;//pc的類型是char**pcw;//對pc解引用 修改ch存放的內容…

【Shell腳本自動化編寫——報警郵件,檢查磁盤,web服務檢測】

Shell腳本自動化編寫Shell腳本自動化編寫一、判斷當前磁盤剩余空間是否有20G&#xff0c;如果小于20G&#xff0c;則將報警郵件發送給管理員&#xff0c;每天檢查一次磁盤剩余空間。第一步&#xff1a;準備工作第二步&#xff1a;配置郵件信息第三步&#xff1a;檢查磁盤的自動…

Java 接口(下)

三、接口的繼承性【基礎重點】 1. Java中的接口之間的繼承關系是多繼承&#xff0c;一個接口可以有多個父接口(1) 語法&#xff1a;interface 接口名 extends 父接口1,父接口2{} 2. 類和接口之間是多實現的關系&#xff1a;一個類可以同時實現多個接口(1) 語法&#xff1a;clas…

學習游戲制作記錄(各種水晶能力以及多晶體)8.1

1.實現創建水晶并且能與水晶進行交換位置的能力創建好水晶的預制體&#xff0c;添加動畫控制器&#xff0c;傳入待機和爆炸的動畫創建Crystal_Skill_Control腳本&#xff1a;掛載在水晶預制體上private float crystalExstTime;//水晶存在時間public void SetupCrystal(float _c…

在vscode 如何運行a.nut 程序(Squirrel語言)

在 VS Code 中運行 Squirrel 語言編寫的 .nut 程序&#xff0c;需要先配置 Squirrel 運行環境并安裝相關插件&#xff0c;具體步驟如下&#xff1a; 一、安裝 Squirrel 解釋器 Squirrel 程序需要通過其官方解釋器 squirrel 或 sq 執行&#xff0c;首先需要安裝解釋器&#xf…

【數據結構】生活中的數據結構:從吃飯與編程看棧與隊列思維

生活中的數據結構&#xff1a;從吃飯與編程看棧與隊列思維 在軟件開發的世界里&#xff0c;棧&#xff08;Stack&#xff09;和隊列&#xff08;Queue&#xff09;是兩種基礎的數據結構&#xff0c;它們以不同的順序管理數據&#xff1a;棧遵循后進先出&#xff08;LIFO&#x…

牛客——接頭密匙

題目鏈接&#xff1a;牛客--接頭密匙 該題是一個很顯然的前綴樹問題&#xff0c;只需要構建a中所有數組對應的前綴樹&#xff0c;之后求b所處前綴個數即可。關于前綴樹的構建&#xff0c;可以觀看左老師算法講解045的視頻&#xff0c;簡單來講就是用特殊字符將實際數據隔開&…

【Linux基礎知識系列】第六十三篇 - 文件編輯器基礎:vim

在 Linux 系統中&#xff0c;文本編輯器是系統管理員和開發人員不可或缺的工具。vim 是一個功能強大的文本編輯器&#xff0c;廣泛應用于 Linux 系統中。它支持多種編輯模式&#xff0c;提供了豐富的文本編輯功能&#xff0c;適用于編寫代碼、配置文件和文檔。掌握 vim 的基本使…