Linux下的C/C++開發之操作Zookeeper

ZooKeeper C 客戶端簡介與安裝

ZooKeeper C API 簡介

ZooKeeper 官方提供了多語言客戶端,C 語言客戶端是最底層的實現之一,功能全面且穩定,適合嵌入式開發、系統級組件、C++ 項目集成等場景。

  • zookeeper.h 是 ZooKeeper 提供的 C 語言客戶端頭文件,包含了所有 API 函數、常量定義與結構體聲明。

  • libzookeeper_mt.so 是其線程安全版本的動態鏈接庫,支持多線程調用。

C API 底層通過 異步非阻塞機制 + 回調函數 + Watcher 模型 與服務端交互,結構靈活但稍顯底層,因此常用于構建:

  • 高性能系統組件(如 RPC 框架、服務注冊中心)

  • 嵌入式服務發現模塊

  • 自定義封裝(例如封裝為 C++ 類)

文件說明
zookeeper.hC API 頭文件(函數聲明、狀態碼、結構體)
libzookeeper_mt.so多線程安全版動態鏈接庫
libzookeeper_st.so單線程版本(適用于無并發環境)

?安裝 ZooKeeper C 客戶端開發庫

方法一:通過包管理器安裝

sudo apt update
sudo apt install libzookeeper-mt-dev

安裝完成后,關鍵文件位置如下:

文件路徑
頭文件/usr/include/zookeeper/zookeeper.h
動態庫/usr/lib/x86_64-linux-gnu/libzookeeper_mt.so
pkg-config 支持提供 .pc 文件可用于 CMake 自動識別

方法二:從源碼編譯安裝

下載安裝包
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4.tar.gz
tar -zxvf apache-zookeeper-3.8.4.tar.gz
cd apache-zookeeper-3.8.4編譯 C 客戶端庫
cd zookeeper-client/zookeeper-client-c
mkdir build && cd build
cmake ..
make -j4
sudo make install

驗證安裝成功:

pkg-config --cflags --libs zookeeper輸出:
-I/usr/include/zookeeper -lzookeeper_mt

安裝完成后,你就可以在 CMakeLists.txt 中這樣鏈接 ZooKeeper:

# 查找并加載 PkgConfig 模塊,這是使用 pkg-config 工具的前提條件。
# 如果系統中沒有找到 pkg-config,CMake 將會報錯并停止配置。
find_package(PkgConfig REQUIRED)# 使用 pkg_check_modules 函數通過 pkg-config 查找名為 "zookeeper" 的庫。
# ZK 是變量前綴,相關的信息會被存儲在以 ZK_ 開頭的變量中:
# - ZK_INCLUDE_DIRS:頭文件目錄
# - ZK_LIBRARY_DIRS:庫文件目錄
# - ZK_LIBRARIES:需要鏈接的庫名稱列表
# REQUIRED 表示這是一個必須存在的庫,如果找不到會報錯。
pkg_check_modules(ZK REQUIRED zookeeper)# 將 ZooKeeper 的頭文件路徑添加到編譯器的包含路徑中,
# 這樣在編譯時就可以找到 #include <zookeeper/zookeeper.h> 等頭文件。
include_directories(${ZK_INCLUDE_DIRS})# 將 ZooKeeper 的庫文件路徑添加到鏈接器的搜索路徑中,
# 這樣鏈接器才能找到對應的 .so 或 .a 文件。
link_directories(${ZK_LIBRARY_DIRS})# 將 ZooKeeper 庫鏈接到你的目標可執行文件或庫(my_target)上。
# ${ZK_LIBRARIES} 包含了所有需要鏈接的庫名稱(例如 -lzookeeper)。
target_link_libraries(my_target ${ZK_LIBRARIES})

ZooKeeper.h常用API介紹

會話管理函數

1.?初始化會話 -?zookeeper_init

創建與 ZooKeeper 集群的會話連接,返回會話句柄。

zhandle_t *zookeeper_init(  const char *host, // ZooKeeper 服務器地址列表(逗號分隔,如 "host1:2181,host2:2181")  watcher_fn fn,              // 全局監視器回調函數(會話事件通知)  int recv_timeout,           // 會話超時時間(毫秒,建議 ≥ 2000)  const clientid_t *clientid, // 先前會話的 clientid(斷連重試用,首次傳 NULL)  void *context,              // 用戶自定義上下文(傳遞到監視器回調)  int flags                   // 保留參數(必須為 0)  
);  

返回值:

  • 成功:指向?zhandle_t?的指針(會話句柄)

  • 失敗:NULL(需檢查?errno?或調用?zoo_state(zh)?獲取錯誤狀態)

說明:

  • 會話超時:recv_timeout?是 ZooKeeper 服務器判斷客戶端存活的心跳間隔,超時后會話失效,臨時節點會被刪除。

  • clientid:用于斷線重連時恢復會話(通過?zoo_client_id(zh)?獲取當前會話的?clientid)。

2.?關閉會話 -?zookeeper_close

主動關閉會話并釋放資源,所有臨時節點會被自動刪除。

int zookeeper_close(zhandle_t *zh);  

參數:

  • zhzookeeper_init?返回的會話句柄。

返回值:

  • ZOK?(0):關閉成功

  • ZBADARGUMENTS?(-8):無效句柄

注意:

  1. 線程安全:必須在所有異步操作完成后再調用?zookeeper_close,否則可能導致內存泄漏。

  2. 資源釋放:關閉后不可再使用該句柄發起任何操作。

  3. 臨時節點:會話關閉后,其創建的所有臨時節點(ZOO_EPHEMERAL)會被服務器自動刪除。

會話生命周期輔助函數

1.?獲取會話狀態 -?zoo_state

查詢當前會話的連接狀態(如是否已連接、認證失敗等)。

int zoo_state(zhandle_t *zh);  

返回值:

  • ZOO_CONNECTED_STATE:已連接

  • ZOO_EXPIRED_SESSION_STATE:會話已過期

  • ZOO_AUTH_FAILED_STATE:認證失敗

2.?獲取會話 ID -?zoo_client_id

獲取當前會話的?clientid,用于斷線重連時恢復會話。

clientid_t *zoo_client_id(zhandle_t *zh);  clientid_t *id = zoo_client_id(zh);  
zhandle_t *new_zh = zookeeper_init(..., id, ...);  // 復用舊會話  

監視器(Watcher)機制

Watcher 是 ZooKeeper 提供的事件通知系統,允許客戶端在以下場景中接收異步通知:

  1. 節點變更:數據修改、子節點增減、節點創建/刪除

  2. 會話狀態變化:連接建立、會話過期、認證失敗

其核心設計類似于發布-訂閱模型,但具有一次性觸發和輕量級的特點。

一次性觸發

Watcher 被觸發后立即自動注銷,后續變更不會通知。避免高頻事件風暴(例如頻繁數據變更導致大量通知)。

// 注冊 Watcher 監聽節點數據變化
zoo_wget(zh, "/config", watcher_cb, ctx, buffer, &len, NULL);// 當 /config 數據第一次變更時,watcher_cb 被觸發
// 后續 /config 再次變更不會觸發回調,除非重新注冊

事件類型與路徑綁定

每個Watcher 關聯到特定路徑和特定事件類型(如數據變更、子節點變更),通知中會包含觸發事件的路徑,方便多節點監聽時區分來源。Watcher 回調僅告知事件類型和節點路徑,不包含具體數據變更內容,需客戶端主動查詢最新狀態(如觸發后調用?zoo_get)。

Watcher工作原理

全局監視器原型 -?watcher_fn

處理會話事件(連接狀態變化)和節點事件(數據變更、子節點變更)。

typedef void (*watcher_fn)(  zhandle_t *zh,    // 會話句柄  int type,         // 事件類型(如 ZOO_SESSION_EVENT)  int state,        // 連接狀態(如 ZOO_CONNECTED_STATE)  const char *path, // 觸發事件的節點路徑(會話事件為 NULL)  void *watcherCtx  // 用戶設置的上下文  
);  

type=會話事件(ZOO_SESSION_EVENT)時會檢查state去判斷具體的連接狀態

狀態值含義處理建議
ZOO_CONNECTED_STATE連接已建立恢復業務操作
ZOO_EXPIRED_SESSION_STATE會話過期(臨時節點已刪除)必須重新初始化會話
ZOO_AUTH_FAILED_STATE認證失敗檢查 ACL 或重新調用?zoo_add_auth

type=節點事件時(type != ZOO_SESSION_EVENT)state無意義,只需要關注type和path

事件類型觸發條件典型注冊 API常見用途
ZOO_CREATED_EVENT被監視節點被創建zoo_wexists等待節點就緒
ZOO_DELETED_EVENT被監視節點被刪除zoo_wexists?/?zoo_wget資源釋放檢查
ZOO_CHANGED_EVENT被監視節點數據變更zoo_wget配置熱更新
ZOO_CHILD_EVENT子節點列表變更zoo_wget_children服務實例列表維護

Watcher 的注冊方式

1. 全局 Watcher

在?zookeeper_init?初始化會話時設置。僅接收會話狀態變化事件(如連接斷開、會話過期)。

zhandle_t *zh = zookeeper_init("127.0.0.1:2181", global_watcher, 3000, NULL, NULL, 0);void global_watcher(zhandle_t *zh, int type, int state, const char *path, void *ctx) {if (type == ZOO_SESSION_EVENT) {// 處理會話事件}
}

2. 局部 Watcher

通過?zoo_wexistszoo_wget?等帶?w?前綴的 API 注冊。監聽特定節點的特定事件類型。

// 監聽節點創建/刪除
zoo_wexists(zh, "/service/new", service_watcher, "create_ctx", NULL);// 監聽數據變更
zoo_wget(zh, "/config", data_watcher, "config_ctx", buffer, &len, NULL);

節點操作相關函數

同步節點操作

1.?創建節點 -?zoo_create

同步創建指定路徑的節點,支持臨時(Ephemeral)、順序(Sequential)和持久化類型。

int zoo_create(  zhandle_t *zh,                 // 會話句柄  const char *path,              // 節點路徑(如 "/services/node")  const char *value,             // 節點數據(NULL 表示空數據)  int valuelen,                  // 數據長度(若 value=NULL 則傳 -1)  const struct ACL_vector *acl,  // 節點權限(如 &ZOO_OPEN_ACL_UNSAFE)  int flags,                     // 節點標志(ZOO_EPHEMERAL | ZOO_SEQUENCE)  char *path_buffer,             // 輸出實際節點路徑(用于順序節點)  int path_buffer_len            // 輸出緩沖區長度  
);  

返回值:

  • ZOK?:創建成功

  • ZNODEEXISTS?:節點已存在

  • ZNOCHILDRENFOREPHEMERALS?:嘗試在臨時節點下創建子節點

  • ZINVALIDSTATE?:會話已失效

節點創建標志:

標志常量作用
ZOO_EPHEMERAL1臨時節點(會話結束自動刪除)
ZOO_SEQUENCE2順序節點(路徑自動追加全局唯一序號,如?/lock-0000000001
ZOO_CONTAINER4容器節點(當最后一個子節點被刪除后,服務器會異步刪除該容器節點)
ZOO_PERSISTENT0默認持久化節點(顯式刪除或 ACL 清理前永久存在)

注意:

  • flags:

    • ZOO_EPHEMERAL:臨時節點(會話結束自動刪除)

    • ZOO_SEQUENCE:順序節點(路徑自動追加全局唯一序號,如?/node0000000001

  • path_buffer:若創建順序節點,此緩沖區返回實際路徑(需預分配足夠空間)。

2.?刪除節點 -?zoo_delete

同步刪除指定節點(必須無子節點且版本匹配)。

int zoo_delete(  zhandle_t *zh,         // 會話句柄  const char *path,      // 節點路徑  int version            // 期望版本號(-1 表示忽略版本檢查)  
);  

返回值:

  • ZOK:刪除成功

  • ZNONODE:節點不存在

  • ZNOTEMPTY:節點存在子節點

  • ZBADVERSION:版本號不匹配

注意事項:

  • 版本控制:傳入?version=-1?可跳過版本檢查,否則需與節點當前版本一致。

  • 原子性:刪除操作是原子的,若失敗則節點狀態不變。

3.?檢查節點是否存在 -?zoo_exists

同步檢查節點是否存在,并可注冊監視器(Watcher)監聽節點創建/刪除事件。

int zoo_exists(  zhandle_t *zh,            // 會話句柄  const char *path,         // 節點路徑  int watch,                // 是否注冊監視器(1=注冊,0=不注冊)  struct Stat *stat         // 輸出節點元數據(可置 NULL)  
);  

返回值:

  • ZOK:節點存在

  • ZNONODE:節點不存在

  • ZNOAUTH?:無權訪問

Stat 結構體字段(部分關鍵字段):

struct Stat {  int64_t czxid;          // 創建該節點的事務ID  int64_t mzxid;          // 最后修改的事務ID  int64_t ctime;          // 創建時間(毫秒 epoch)  int32_t version;        // 數據版本號  int32_t cversion;       // 子節點版本號  int32_t aversion;       // ACL版本號  // ... 其他字段省略  
};  

4.?獲取節點數據 -?zoo_get

同步獲取節點數據及元數據,可注冊監視器監聽數據變更事件。

int zoo_get(  zhandle_t *zh,            // 會話句柄  const char *path,         // 節點路徑  int watch,                // 是否注冊監視器(1=注冊,0=不注冊)  char *buffer,             // 輸出數據緩沖區  int *buffer_len,          // 輸入緩沖區長度,輸出實際數據長度  struct Stat *stat         // 輸出節點元數據  
);  

返回值:

  • ZOK:獲取成功

  • ZNONODE:節點不存在

  • ZNOAUTH:無權訪問

數據讀取注意事項:

  1. 緩沖區管理:

    • 調用前需預分配?buffer,并通過?buffer_len?傳入其長度。

    • 若緩沖區不足,函數返回?ZOK?但?*buffer_len?會被設為實際所需長度。

  2. 數據截斷:若數據超過緩沖區長度,會被靜默截斷(無錯誤提示)。

5.?更新節點數據 -?zoo_set

同步更新節點數據,支持版本檢查。

int zoo_set(  zhandle_t *zh,         // 會話句柄  const char *path,      // 節點路徑  const char *buffer,    // 新數據緩沖區  int buflen,            // 數據長度(-1 表示以 strlen(buffer) 計算)  int version            // 期望版本號(-1=忽略版本檢查)  
);  

返回值:

  • ZOK:更新成功

  • ZBADVERSION?(-103):版本號不匹配

  • ZNONODE:節點不存在

版本控制邏輯:

  • 每次數據更新后,節點的?version?會遞增。

  • 若傳入?version=5,則僅當節點當前版本為 5 時才會更新。

6. 獲取直接子節點列表 -?zoo_get_children

同步獲取指定節點的所有直接子節點名稱列表,并可選擇注冊監視器監聽直接子節點變更事件(創建/刪除子節點)。

int zoo_get_children(  zhandle_t *zh,               // 會話句柄  const char *path,            // 父節點路徑(如 "/services")  int watch,                   // 是否注冊監視器(1=注冊,0=不注冊)  struct String_vector *children // 輸出子節點名稱數組,需手動調用 deallocate_String_vector 釋放
);  

返回值:

  • ZOK:獲取成功

  • ZNONODE:父節點不存在

  • ZNOAUTH:無權訪問父節點

  • ZINVALIDSTATE:會話已失效

7. 增強版子節點獲取 -?zoo_get_children2

在?zoo_get_children?基礎上,額外返回父節點的元數據(Stat?結構體)。

int zoo_get_children2(  zhandle_t *zh,               // 會話句柄  const char *path,            // 父節點路徑  int watch,                   // 是否注冊監視器  struct String_vector *children, // 輸出子節點列表  struct Stat *stat            // 輸出父節點元數據(可置NULL)  
);  

異步節點操作

zookeeper提供了異步節點操作

所有?zoo_a?前綴函數(如?zoo_acreate)都是異步節點操作

  • 非阻塞模型:調用后立即返回,實際操作由后臺線程完成。

  • 回調觸發:操作完成時,ZooKeeper 客戶端線程會調用注冊的回調函數(在?zookeeper_interest()?或?zookeeper_process()?調用線程中執行)。

  • 上下文傳遞:通過?const void *data?參數傳遞用戶自定義上下文(如結構體指針),實現異步狀態管理。

// === 1. 節點創建與刪除 ===
int zoo_acreate(zhandle_t *zh,                 // 會話句柄const char *path,              // 節點路徑const char *value,             // 節點數據int valuelen,                  // 數據長度const struct ACL_vector *acl,  // ACL權限int flags,                     // 節點標志(如ZOO_EPHEMERAL)string_completion_t completion, // 回調函數const void *data,               // 用戶上下文char *path_buffer,             // 輸出實際路徑(用于順序節點)int path_buffer_len            // 緩沖區長度
);int zoo_adelete(zhandle_t *zh,                 // 會話句柄const char *path,              // 節點路徑int version,                   // 版本號(-1忽略版本檢查)void_completion_t completion,  // 回調函數const void *data               // 用戶上下文
);// === 2. 節點數據讀寫 ===
int zoo_aget(zhandle_t *zh,                 // 會話句柄const char *path,              // 節點路徑int watch,                     // 是否注冊監視器data_completion_t completion,  // 回調函數const void *data               // 用戶上下文
);int zoo_aset(zhandle_t *zh,                 // 會話句柄const char *path,              // 節點路徑const char *buffer,            // 新數據int buflen,                    // 數據長度int version,                   // 版本號(-1忽略版本檢查)stat_completion_t completion,  // 回調函數const void *data               // 用戶上下文
);// === 3. 節點元數據與子節點操作 ===
int zoo_aexists(zhandle_t *zh,                 // 會話句柄const char *path,              // 節點路徑int watch,                     // 是否注冊監視器stat_completion_t completion,  // 回調函數const void *data               // 用戶上下文
);int zoo_aget_children(zhandle_t *zh,                 // 會話句柄const char *path,              // 節點路徑int watch,                     // 是否注冊監視器strings_completion_t completion, // 回調函數const void *data               // 用戶上下文
);

回調函數類型

(1) 通用無返回值回調 -?void_completion_t

處理無返回數據的操作(如?zoo_adeletezoo_aset

typedef void (*void_completion_t)(int rc, const void *data);  rc		操作結果錯誤碼(同同步 API 錯誤碼,如 ZOK、ZNONODE)
data	用戶調用異步 API 時傳入的上下文指針(如對象指針、請求 ID 等)
void delete_callback(int rc, const void *data) {  const char *req_id = (const char *)data;  if (rc == ZOK) {  printf("[%s] 節點刪除成功\n", req_id);  } else {  fprintf(stderr, "[%s] 刪除失敗: %s\n", req_id, zerror(rc));  }  
}  zoo_adelete(zh, "/path/to/node", -1, delete_callback, "request_123");  

(2) 返回 Stat 元數據的回調 -?stat_completion_t

處理需返回節點元數據的操作(如?zoo_aexists

typedef void (*stat_completion_t)(int rc, const struct Stat *stat, const void *data);  stat   節點元數據指針(內存由 ZooKeeper 管理,回調結束后失效)
void exists_callback(int rc, const Stat *stat, const void *data) {  if (rc == ZOK) {  printf("節點存在,最新版本: %d\n", stat->version);  } else if (rc == ZNONODE) {  printf("節點不存在\n");  }  
}  zoo_aexists(zh, "/path/to/node", 1, exists_callback, NULL);  

(3) 返回節點數據的回調 -?data_completion_t

處理需返回節點數據的操作(如?zoo_aget

typedef void (*data_completion_t)(int rc, const char *value, int valuelen,  const Stat *stat, const void *data);  value	    節點數據指針(由 ZooKeeper 管理,回調結束后失效,需立即拷貝數據)
valuelen	數據實際長度(若 value=NULL,則 valuelen=-1)
stat指向節點元數據的結構體包含節點的所有狀態信息(如版本號、創建時間等),由 ZooKeeper 服務器返回。
data 用戶自定義上下文指針,從異步API調用處原樣傳遞到回調函數,用于跨請求傳遞狀態。
void data_callback(int rc, const char *value, int valuelen, const Stat *stat, const void *data) {const char *node_path = (const char *)data; // 從上下文獲取節點路徑if (rc == ZOK) {if (valuelen > 0) {// 安全拷貝數據(value可能不含終止符)char *data_copy = (char *)malloc(valuelen + 1);memcpy(data_copy, value, valuelen);data_copy[valuelen] = '\0'; // 手動添加終止符printf("[成功] 節點 %s 數據: %s\n", node_path, data_copy);printf("數據版本: %d\n", stat->version);free(data_copy); // 必須釋放拷貝的內存} else {printf("[成功] 節點 %s 數據為空\n", node_path);}} else {fprintf(stderr, "[失敗] 讀取節點 %s 錯誤: %s\n", node_path, zerror(rc));}
}int ret = zoo_aget(zh, node_path, 1, data_callback, node_path);

(4) 返回子節點列表的回調 -?strings_completion_t

處理返回子節點列表的操作(如?zoo_aget_children

typedef void (*strings_completion_t)(int rc, const String_vector *strings,  const void *data);  strings	const 子節點名稱數組(由 ZooKeeper 管理,回調結束后失效,需深拷貝數據)typedef struct String_vector {  int32_t count;      // 子節點數量  char **data;        // 子節點名稱數組(每個元素為以 '\0' 結尾的字符串)  
} String_vector; 
void children_callback(int rc, const String_vector *strings,  const void *data) {  if (rc == ZOK) {  for (int i = 0; i < strings->count; i++) {  printf("子節點 %d: %s\n", i, strings->data[i]);  }  }  
}  zoo_aget_children(zh, "/parent", 1, children_callback, NULL);  

Watcher節點操作

通過?zoo_wexistszoo_wget?等帶?w?前綴的 API 注冊。監聽特定節點的特定事件類型。

這類函數在同步執行ZooKeeper節點操作(如檢查存在、獲取數據、列出子節點)的同時,自動注冊Watcher監聽器,當指定節點發生特定變更(如數據修改、子節點增減、節點創建/刪除)時,ZooKeeper會通過回調函數實時通知客戶端,實現事件驅動的響應機制。所有Watcher均為一次性觸發,觸發后需重新注冊,適合用于配置熱更新、服務發現等需要實時感知節點變化的場景。

// 檢查節點是否存在并注冊 Watcher(觸發事件:ZOO_CREATED_EVENT/ZOO_DELETED_EVENT)
int zoo_wexists(zhandle_t *zh,                  /* 會話句柄 */const char *path,               /* 節點路徑 */watcher_fn watcher,             /* 事件回調函數 */void *watcherCtx,               /* 用戶上下文數據 */struct Stat *stat               /* 輸出節點元數據(可選) */
);// 獲取節點數據并注冊 Watcher(觸發事件:ZOO_CHANGED_EVENT/ZOO_DELETED_EVENT)
int zoo_wget(zhandle_t *zh,                  /* 會話句柄 */const char *path,               /* 節點路徑 */watcher_fn watcher,             /* 事件回調函數 */void *watcherCtx,               /* 用戶上下文數據 */char *buffer,                   /* 輸出數據緩沖區 */int *buffer_len,                /* 輸入緩沖區大小/輸出實際數據長度 */struct Stat *stat               /* 輸出節點元數據(可選) */
);// 獲取子節點列表并注冊 Watcher(觸發事件:ZOO_CHILD_EVENT)
int zoo_wget_children(zhandle_t *zh,                  /* 會話句柄 */const char *path,               /* 父節點路徑 */watcher_fn watcher,             /* 事件回調函數 */void *watcherCtx,               /* 用戶上下文數據 */struct String_vector *strings   /* 輸出子節點名稱數組 */
);// 獲取子節點列表及父節點元數據(觸發事件:ZOO_CHILD_EVENT)
int zoo_wget_children2(zhandle_t *zh,                  /* 會話句柄 */const char *path,               /* 父節點路徑 */watcher_fn watcher,             /* 事件回調函數 */void *watcherCtx,               /* 用戶上下文數據 */struct String_vector *strings,  /* 輸出子節點名稱數組 */struct Stat *stat               /* 輸出父節點元數據 */
);// 移除已注冊的 Watcher
int zoo_remove_watches(zhandle_t *zh,                  /* 會話句柄 */const char *path,               /* 節點路徑 */int watcher_type,               /* 監視器類型(ZOO_WATCHER_EVENT等) */watcher_fn watcher,             /* 要移除的回調函數(NULL表示全部) */void *watcherCtx,               /* 要移除的上下文(需匹配注冊值) */int local                       /* 是否僅移除本地未觸發的 Watcher */
);

錯誤處理相關函數

(1) 獲取錯誤碼字符串描述

const char *zerror(int err);  // 將錯誤碼(如ZCONNECTIONLOSS)轉換為可讀字符串

常見錯誤碼

錯誤碼常量觸發場景
ZOK0操作成功
ZNONODE-101節點不存在(zoo_delete/zoo_get?等操作路徑無效)
ZNOAUTH-102無權限操作節點(ACL 限制)
ZBADVERSION-103版本號不匹配(樂觀鎖沖突,如?zoo_set?傳入錯誤 version)
ZNOCHILDRENFOREPHEMERALS-108臨時節點不允許創建子節點
ZNODEEXISTS-110節點已存在(zoo_create?沖突)
ZNOTEMPTY-111節點有子節點(zoo_delete?非空節點)
ZCONNECTIONLOSS-112連接斷開(需檢查會話狀態并重試)
ZOPERATIONTIMEOUT-115操作超時(服務器未響應)
ZINVALIDSTATE-152會話已失效(如調用?zoo_set?時會話過期)

(2) 獲取當前會話狀態

int zoo_state(zhandle_t *zh);  // 返回會話狀態(如ZOO_EXPIRED_SESSION_STATE)

常見狀態碼

狀態碼常量含義
ZOO_EXPIRED_SESSION_STATE-112會話已過期(臨時節點會被刪除,需重新初始化會話)
ZOO_AUTH_FAILED_STATE-113認證失敗(ACL 權限不匹配)
ZOO_CONNECTING_STATE1正在連接服務器
ZOO_ASSOCIATING_STATE2正在協商會話參數(如超時時間)
ZOO_CONNECTED_STATE3已連接(會話正常)
ZOO_READONLY_STATE5只讀模式連接(連接到 Observer 節點)

Zookeeper實戰

配置中心實現

功能描述

  • 將配置存儲在 ZooKeeper 節點中

  • 客戶端啟動時讀取配置

  • 注冊 Watcher 實現配置熱更新

實現原理:

????????這段代碼實現了一個基于 ZooKeeper 的配置中心模塊,其核心功能是從 ZooKeeper 中讀取指定配置節點的內容,并在節點內容變化時自動感知更新。初始化時,ConfigCenter 通過 zookeeper_init 建立與 ZooKeeper 的連接,并通過 zoo_wget 獲取配置節點(如 /app/config)的值,同時注冊一個 watcher 監聽器。一旦該節點被修改(如配置更新)、創建或重建,注冊的 configWatcher 會被觸發,程序自動調用 reloadConfig 重新讀取最新配置,實現配置熱更新。整個機制利用 ZooKeeper 的數據節點和 watcher 通知能力,實現了輕量、實時、自動刷新的分布式配置中心,確保系統在不重啟的前提下可以動態應用配置變更。

// 配置中心類:負責連接 ZooKeeper,讀取并監聽某個配置節點
class ConfigCenter {
public:ConfigCenter(const std::string& hosts, const std::string& configPath): hosts_(hosts), configPath_(configPath), zh_(nullptr) {}~ConfigCenter() {if (zh_) zookeeper_close(zh_);}// 初始化函數:建立連接并讀取配置bool init() {// 建立連接,注冊全局 Watcher(Session 狀態監聽)zh_ = zookeeper_init(hosts_.c_str(), globalWatcher, 3000, nullptr, this, 0);if (!zh_) return false;// 等待連接真正建立成功(阻塞直到狀態變為 CONNECTED)while (zoo_state(zh_) != ZOO_CONNECTED_STATE) {std::this_thread::sleep_for(std::chrono::milliseconds(100));}// 初次讀取配置內容return reloadConfig();}std::string getConfig() const {return config_;}private:// 全局 Watcher:監聽連接狀態變化(如連接成功、斷開等)static void globalWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_SESSION_EVENT && state == ZOO_CONNECTED_STATE) {std::cout << "連接建立成功\n";}}// 配置節點 Watcher:監聽配置內容變化或節點創建static void configWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {auto self = static_cast<ConfigCenter*>(context);// 節點發生變化,重新加載配置if (type == ZOO_CHANGED_EVENT || type == ZOO_CREATED_EVENT) {std::cout << "配置變更,重新加載...\n";self->reloadConfig();  // 重新讀取配置,并重新注冊 watcher}}// 讀取配置內容,并注冊監聽器bool reloadConfig() {char buffer[1024];             // 存儲配置數據的緩沖區int len = sizeof(buffer);      // 緩沖區大小// 讀取節點內容 + 注冊 watcherint rc = zoo_wget(zh_, configPath_.c_str(), configWatcher, this, buffer, &len, nullptr);if (rc == ZOK) {// 設置本地配置config_ = std::string(buffer, len);std::cout << "當前配置: " << config_ << "\n";return true;}// 錯誤處理std::cerr << "讀取配置失敗: " << zerror(rc) << "\n";return false;}zhandle_t* zh_;            std::string hosts_;       std::string configPath_;   std::string config_;        
};int main() {ConfigCenter config("localhost:2181", "/app/config");if (!config.init()) return 1;// 模擬主程序持續運行,不斷使用配置while (true) {std::cout << "使用配置: " << config.getConfig() << "\n";std::this_thread::sleep_for(std::chrono::seconds(5));}
}

服務注冊與發現

功能描述

  • 服務啟動時注冊臨時節點

  • 客戶端發現所有可用服務

  • 監聽服務列表變化

實現原理:

這段代碼實現了一個基于 ZooKeeper 的服務注冊與發現機制,其核心思想是利用 ZooKeeper 的 臨時節點和 watcher 機制 來動態維護服務列表。服務端調用 registerService 方法,在指定路徑(如 /services)下創建一個臨時節點(如 /services/serviceA),代表該服務在線;如果服務進程宕機或斷線,ZooKeeper 會自動刪除該節點。客戶端通過 startDiscovery 方法調用 zoo_wget_children 獲取子節點列表,并注冊watcher 監聽服務列表變化。當服務上下線時,watcher 被觸發,客戶端自動重新拉取并更新服務緩存。這樣,服務消費者始終能感知當前可用服務的變化,從而實現分布式系統中的動態服務注冊與自動發現。

// ServiceRegistry:封裝 ZooKeeper 服務注冊與發現邏輯
class ServiceRegistry {
public:ServiceRegistry(const std::string& hosts, const std::string& servicePath): hosts_(hosts), servicePath_(servicePath), zh_(nullptr) {}~ServiceRegistry() {if (zh_) zookeeper_close(zh_);  // 釋放連接}// 初始化:建立 ZooKeeper 會話連接bool init() {zh_ = zookeeper_init(hosts_.c_str(), nullptr, 3000, nullptr, nullptr, 0);return zh_ != nullptr;}// 服務端調用:注冊服務(創建臨時節點)bool registerService(const std::string& serviceName) {char pathBuffer[128];  // 用于接收最終創建的路徑int rc = zoo_create(zh_,(servicePath_ + "/" + serviceName).c_str(),  // 例如 /services/serviceAnullptr, -1,&ZOO_OPEN_ACL_UNSAFE,ZOO_EPHEMERAL,  // 臨時節點,斷線自動刪除pathBuffer, sizeof(pathBuffer));return rc == ZOK;}// 客戶端調用:獲取當前本地緩存的服務列表std::vector<std::string> discoverServices() {std::lock_guard<std::mutex> lock(mutex_);return services_;}// 客戶端調用:啟動服務發現(并注冊 watcher)bool startDiscovery() {String_vector children;int rc = zoo_wget_children(zh_, servicePath_.c_str(),serviceWatcher, this,&children);if (rc != ZOK) return false;updateServiceList(&children);  // 初始拉取服務列表deallocate_String_vector(&children);  // 釋放 ZooKeeper 內部分配的內存return true;}private:// 子節點變化的 watcher 回調函數static void serviceWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_CHILD_EVENT) {auto self = static_cast<ServiceRegistry*>(context);// 重新注冊 watcher 并更新列表(一次性 watch)String_vector children;zoo_wget_children(zh, self->servicePath_.c_str(),serviceWatcher, self,&children);self->updateServiceList(&children);deallocate_String_vector(&children);}}// 更新服務列表緩存void updateServiceList(String_vector* children) {std::lock_guard<std::mutex> lock(mutex_);services_.clear();for (int i = 0; i < children->count; ++i) {services_.emplace_back(children->data[i]);}std::cout << "服務列表更新: ";for (const auto& s : services_) std::cout << s << " ";std::cout << "\n";}zhandle_t* zh_;                      // ZooKeeper 連接句柄std::string hosts_;                  // 連接地址,如 localhost:2181std::string servicePath_;            // 服務父路徑,如 /servicesstd::vector<std::string> services_;  // 當前發現的服務節點std::mutex mutex_;                   // 保護 services_ 的互斥鎖
};// 服務端進程示例:注冊服務并保持運行
void runService(const std::string& serviceName) {ServiceRegistry registry("localhost:2181", "/services");if (!registry.init()) {std::cerr << "ZooKeeper 連接失敗\n";return;}if (registry.registerService(serviceName)) {std::cout << serviceName << " 注冊成功\n";std::this_thread::sleep_for(std::chrono::minutes(10));  // 模擬長時間運行} else {std::cerr << "注冊服務失敗\n";}
}// 客戶端進程示例:持續監聽服務列表并選擇其中之一使用
void runClient() {ServiceRegistry client("localhost:2181", "/services");if (!client.init() || !client.startDiscovery()) {std::cerr << "客戶端初始化失敗\n";return;}while (true) {auto services = client.discoverServices();if (services.empty()) {std::cout << "無可用服務\n";} else {// 隨機選擇一個服務節點模擬訪問std::cout << "選擇服務: " << services[rand() % services.size()] << "\n";}std::this_thread::sleep_for(std::chrono::seconds(3));  // 模擬請求間隔}
}// 主函數:通過命令行參數區分運行模式
int main(int argc, char** argv) {if (argc < 2) {std::cout << "用法: " << argv[0] << " [server 服務名 | client]\n";return 1;}std::string mode = argv[1];if (mode == "server") {if (argc < 3) {std::cerr << "請提供服務名: " << argv[0] << " server 服務名\n";return 1;}std::string serviceName = argv[2];runService(serviceName);  // 服務端注冊} else if (mode == "client") {runClient();  // 客戶端監聽} else {std::cerr << "未知模式: " << mode << "\n";return 1;}return 0;
}# 啟動一個服務端實例,注冊為 serviceA
./zookeeper_registry server serviceA# 啟動客戶端監聽
./zookeeper_registry client

分布式鎖實現

功能描述

  • 使用順序臨時節點實現公平鎖

  • 監聽前一個節點釋放鎖

  • 實現阻塞和非阻塞兩種獲取方式

實現原理:

????????這段代碼通過 ZooKeeper 實現了一個分布式鎖機制,其核心思路是利用臨時順序節點(EPHEMERAL | SEQUENTIAL) 實現公平競爭與互斥訪問。每個客戶端線程嘗試加鎖時,會在指定目錄下創建一個唯一的臨時順序節點(如 /locks/resource/lock-00000001)。然后獲取該目錄下所有子節點,并判斷自己是否是編號最小的那個節點——如果是,就成功獲得鎖;否則,就找到比自己編號小的“前驅節點”,并設置監聽(watch),當前驅節點被刪除(即鎖被釋放)后,自己再次獲得鎖的機會。鎖釋放的方式是刪除自身創建的節點,ZooKeeper 會自動觸發 watcher 通知下一個等待者。整個機制確保了競爭公平、自動清理(臨時節點)、無單點依賴,是 ZooKeeper 在分布式協調中的經典應用之一。

// 分布式鎖類:通過 ZooKeeper 實現鎖競爭
class DistributedLock {
public:DistributedLock(zhandle_t* zh, const std::string& lockPath): zh_(zh), lockPath_(lockPath), myNode_(""), acquired_(false) {}// 嘗試加鎖bool tryLock() {char pathBuffer[256];// 1. 創建 EPHEMERAL + SEQUENTIAL 節點(臨時順序節點)int rc = zoo_create(zh_, (lockPath_ + "/lock-").c_str(), nullptr, -1, &ZOO_OPEN_ACL_UNSAFE,ZOO_EPHEMERAL | ZOO_SEQUENCE,pathBuffer, sizeof(pathBuffer));if (rc != ZOK) return false;myNode_ = pathBuffer;  // 記錄當前節點路徑std::string nodeName = myNode_.substr(myNode_.find_last_of('/') + 1); // 提取 lock-000000xx// 2. 獲取所有子節點(所有競爭者)String_vector children;if (zoo_get_children(zh_, lockPath_.c_str(), 0, &children) != ZOK) return false;// 3. 找到當前目錄下最小的子節點(最早創建的節點)std::string minNode;for (int i = 0; i < children.count; ++i) {if (minNode.empty() || children.data[i] < minNode) {minNode = children.data[i];}}// 4. 若自己是最小節點,獲得鎖if (nodeName == minNode) {acquired_ = true;return true;}// 5. 否則,找到比自己小的前一個節點(前驅節點)std::string prevNode;for (int i = 0; i < children.count; ++i) {if (children.data[i] < nodeName && (prevNode.empty() || children.data[i] > prevNode)) {prevNode = children.data[i];}}if (!prevNode.empty()) {// 6. 監聽前驅節點是否被刪除(代表鎖被釋放)std::atomic<bool> lockReleased(false);std::string prevPath = lockPath_ + "/" + prevNode;zoo_wexists(zh_, prevPath.c_str(), lockWatcher, &lockReleased, nullptr);// 7. 阻塞等待通知(watcher 回調觸發時將 flag 設為 true)while (!lockReleased) {std::this_thread::sleep_for(std::chrono::milliseconds(100));}acquired_ = true;return true;}return false;}// 釋放鎖void unlock() {if (acquired_) {zoo_delete(zh_, myNode_.c_str(), -1);  // 刪除自身臨時節點acquired_ = false;}}private:// watch 回調:前驅節點被刪除時觸發static void lockWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_DELETED_EVENT) {auto* flag = static_cast<std::atomic<bool>*>(context);*flag = true; // 通知等待線程:鎖已釋放}}zhandle_t* zh_;           // ZooKeeper 連接句柄std::string lockPath_;    // 鎖的父路徑(如 /locks/resource)std::string myNode_;      // 自己創建的順序節點路徑bool acquired_;           // 是否獲得鎖標志
};// 模擬臨界區操作(各線程爭奪執行)
void criticalSection(int id) {static std::mutex coutMutex;{std::lock_guard<std::mutex> lock(coutMutex);std::cout << "進程" << id << " 進入臨界區\n";}std::this_thread::sleep_for(std::chrono::seconds(2));{std::lock_guard<std::mutex> lock(coutMutex);std::cout << "進程" << id << " 離開臨界區\n";}
}// 每個線程工作函數:不斷嘗試加鎖 → 執行臨界區 → 解鎖
void worker(zhandle_t* zh, int id) {DistributedLock lock(zh, "/locks/resource");while (true) {if (lock.tryLock()) {criticalSection(id);lock.unlock();}std::this_thread::sleep_for(std::chrono::seconds(1));}
}int main() {// 建立與 ZooKeeper 的連接zhandle_t* zh = zookeeper_init("localhost:2181", nullptr, 3000, nullptr, nullptr, 0);if (!zh) return 1;// 創建鎖父節點(僅創建一次,無需判斷是否已存在)zoo_create(zh, "/locks", nullptr, -1, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);zoo_create(zh, "/locks/resource", nullptr, -1, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);// 啟動 3 個客戶端線程,模擬分布式競爭std::thread t1(worker, zh, 1);std::thread t2(worker, zh, 2);std::thread t3(worker, zh, 3);t1.join();t2.join();t3.join();zookeeper_close(zh);return 0;
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913680.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913680.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913680.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【openp2p】學習3:【專利分析】一種基于混合網絡的自適應切換方法、裝 置、設備及介質

本專利與開源項目無關,但可能是實際商用的一種專利。專利地址從此專利,可見p2p的重要性。透傳服務可能是實時轉發服務,提供中繼能力 透傳服務可以是指一種通過公網服務器將數據從第一客戶端傳遞到另一個設備 或客戶端的服務。這種服務通常用于克服網絡中的障礙,如防火墻、…

OpenCV中DPM(Deformable Part Model)目標檢測類cv::dpm::DPMDetector

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 OpenCV 中用于基于可變形部件模型&#xff08;DPM&#xff09; 的目標檢測器&#xff0c;主要用于行人、人臉等目標的檢測。它是一種傳統的基于特…

macOS 26快捷指令更新,融入AI打造智能操作體驗

快捷指令作為Mac系統中提升用戶操作效率的得力助手&#xff0c;在macOS 26中迎來了一次具有突破性的重大更新。此次更新融入了先進的AI技術&#xff0c;推出“智能操作”&#xff08;Intelligent Actions&#xff09;功能&#xff0c;讓快捷指令從簡單的自動化工具升級為真正的…

InstructBLIP:邁向具備指令微調能力的通用視覺語言模型

溫馨提示&#xff1a; 本篇文章已同步至"AI專題精講" InstructBLIP&#xff1a;邁向具備指令微調能力的通用視覺語言模型 摘要 大規模的預訓練與instruction tuning在構建通用語言模型方面已取得顯著成效。然而&#xff0c;構建通用的視覺-語言模型仍然具有挑戰性&…

基于dropbear實現嵌入式系統ssh服務端與客戶端完整交互

以下基于 Dropbear 實現 SSH 服務端與客戶端交互的完整步驟&#xff0c;涵蓋服務端部署、客戶端連接、認證配置及消息傳輸&#xff0c;結合了多篇權威資料的核心實踐&#xff1a;環境準備與安裝 服務端安裝 ? Linux 系統&#xff08;以 Ubuntu/CentOS 為例&#xff09; Ubuntu…

深圳安銳科技發布國內首款4G 索力儀!讓斜拉橋索力自動化監測更精準高效

近日&#xff0c;深圳安銳科技正式發布國內首款無線自供電、一體化的斜拉索實時監測設備 “4G索力監測儀”&#xff0c;成功攻克了傳統橋梁索體監測領域長期存在的實時性差、布設困難和成本高昂的行業難題&#xff0c;為斜拉橋、系桿拱橋提供全無線、自動化、云端實時同步的索力…

Pipeline 引用外部數據源最佳實踐

場景解析在企業網絡安全日志處理場景中&#xff0c;防火墻、入侵檢測系統&#xff08;IDS&#xff09;等設備會持續產生大量日志&#xff0c;記錄網絡流量、訪問請求、異常事件等基礎信息&#xff0c;但這些原始日志僅能呈現表面現象&#xff0c;難以全面剖析安全威脅&#xff…

UI + MCP Client + MCP Server(并且鏈接多個Server)

項目結構前端項目--------->MCP Client----------->MCP Serverserver就不過多贅述了&#xff0c;他只是相當于添加了多個的tools 鏈接前后端 http.createServer創建一個服務器// ---------------------------------------------------------------- // server.js import …

香港站群服務器與普通香港服務器對比

在選擇香港服務器時&#xff0c;用戶常常會遇到"站群服務器"和"普通服務器"兩種選項&#xff0c;雖然它們都基于香港數據中心的基礎設施&#xff0c;但在 IP 地址配置、功能定位和管理復雜度、成本上存在顯著差異&#xff0c;理解這些差異有助于用戶根據實…

4.B樹和B+樹的區別?為什么MySQL選擇B+樹作為索引?

區別&#xff1a;1.數據存儲位置B樹每個節點都存儲了索引和數據B樹只有葉子節點存儲數據&#xff0c;非葉子節點僅存儲索引2.葉子節點的鏈接B樹的所有葉子節點通過指針連接成一個雙向鏈表&#xff0c;可以高效地進行范圍查詢或者順序遍歷B樹則沒有這樣的連接關系&#xff0c;查…

轉換狂魔,Modbus TCP轉Profinet網關打通視覺傳感線連接之路

在汽車零部件沖壓生產線的世界中&#xff0c;液壓機的壓力穩定性是確保產品質量的秘密武器。然而&#xff0c;舊時代的人工巡檢和傳統監測方式卻好似拖累現代化進程的沉重枷鎖&#xff1a;效率低、成本高&#xff0c;還總是趕不上實時反饋的快車。這時&#xff0c;工廠決心大刀…

C++進階—二叉樹進階

第一章&#xff1a;內容安排說明 map和set特性需要先鋪墊二叉搜索樹&#xff0c;而二叉搜索樹也是一種樹形結構二叉搜索樹的特性了解&#xff0c;有助于更好的理解map和set的特性二叉樹中部分面試題稍微有點難度&#xff0c;在前面講解大家不容易接受&#xff0c;且時間長容易…

驅動下一代E/E架構的神經脈絡進化—10BASE-T1S

汽車電子電氣架構的演進正經歷一場深刻的變革&#xff0c;“中央計算單元區域控制器”的架構模式已成為當前主流車型平臺發展的明確方向。這種從傳統的“功能域”&#xff08;Domain&#xff09;架構向“區域”&#xff08;Zonal&#xff09;架構的轉型升級&#xff0c;旨在實現…

某學校系統中挖礦病毒應急排查

本篇文章主要記錄某學校長期未運營維護的程序&#xff0c;被黑客發現了漏洞&#xff0c;但好在學校有全流量設備&#xff0c;抓取到了過程中的流量包 需要你進行上機以及結合流量分析&#xff0c;排查攻擊者利用的漏洞以及上傳利用成功的木馬 文章目錄靶機介紹1.使用工具分析共…

vue 、react前端頁面支持縮放,echarts、地圖點擊左邊不準的原因和解決辦法

原因 由于以上都是通過canvas畫布生成的&#xff0c;一旦初始化&#xff0c;就會按照比例進行縮放&#xff0c;但與此同時&#xff0c;比例尺并沒有變化&#xff0c;導致坐標偏移 解決辦法 設置一個zoomVal產量&#xff0c;在頁面加載時計算縮放比例&#xff0c;然后在canvas容…

(LeetCode 每日一題) 1353. 最多可以參加的會議數目 (優先隊列、小頂堆)

題目&#xff1a;1353. 最多可以參加的會議數目 思路&#xff1a;優先隊列實現小頂堆&#xff0c;0(mx*logn) 在第i天&#xff0c;優先選endDay最小的那一個活動進行。那么遍歷每一天&#xff0c;用小頂堆來維護每個活動的最后一天即可&#xff0c;細節看注釋。 C版本&#xf…

Java結構型模式---代理模式

代理模式基礎概念代理模式是一種結構型設計模式&#xff0c;其核心思想是通過創建一個代理對象來控制對另一個真實對象的訪問。代理對象在客戶端和真實對象之間起到中介作用&#xff0c;允許在不改變真實對象的前提下&#xff0c;對其進行增強或控制。代理模式的核心組件主題接…

MySQL流程控制函數全解析

MySQL 中的流程控制函數&#xff08;也稱為條件函數&#xff09;允許你在 SQL 語句中進行邏輯判斷&#xff0c;根據不同的條件返回不同的值或執行不同的操作。它們極大地增強了 SQL 的靈活性和表達能力&#xff0c;尤其在進行數據轉換、結果格式化、條件聚合和復雜業務邏輯實現…

【7】PostgreSQL 事務

【7】PostgreSQL 事務前言使用事務事務內錯誤處理事務保存點DDL 事務前言 在 PostgreSQL 中&#xff0c;每一個操作都是一個事務。即使一個簡單的查詢(select)&#xff0c;這也是一個事務。 例如&#xff1a; postgres# select now();now --------------------…

Linux:多線程---深入互斥淺談同步

文章目錄1. 互斥1.1 為什么需要互斥1.2 互斥鎖1.3 初談互斥與同步1.4 鎖的原理1.5 可重入VS線程安全1.6 死鎖1.7 避免死鎖的算法&#xff08;擴展&#xff09;序&#xff1a;在上一章中我們知道了線程控制的三個角度&#xff1a;線程創建、線程等待和線程終止&#xff0c;分別從…