ZooKeeper C 客戶端簡介與安裝
ZooKeeper C API 簡介
ZooKeeper 官方提供了多語言客戶端,C 語言客戶端是最底層的實現之一,功能全面且穩定,適合嵌入式開發、系統級組件、C++ 項目集成等場景。
zookeeper.h 是 ZooKeeper 提供的 C 語言客戶端頭文件,包含了所有 API 函數、常量定義與結構體聲明。
libzookeeper_mt.so 是其線程安全版本的動態鏈接庫,支持多線程調用。
C API 底層通過 異步非阻塞機制 + 回調函數 + Watcher 模型 與服務端交互,結構靈活但稍顯底層,因此常用于構建:
高性能系統組件(如 RPC 框架、服務注冊中心)
嵌入式服務發現模塊
自定義封裝(例如封裝為 C++ 類)
文件 | 說明 |
---|---|
zookeeper.h | C API 頭文件(函數聲明、狀態碼、結構體) |
libzookeeper_mt.so | 多線程安全版動態鏈接庫 |
libzookeeper_st.so | 單線程版本(適用于無并發環境) |
?安裝 ZooKeeper C 客戶端開發庫
方法一:通過包管理器安裝
sudo apt update
sudo apt install libzookeeper-mt-dev
安裝完成后,關鍵文件位置如下:
文件 | 路徑 |
---|---|
頭文件 | /usr/include/zookeeper/zookeeper.h |
動態庫 | /usr/lib/x86_64-linux-gnu/libzookeeper_mt.so |
pkg-config 支持 | 提供 .pc 文件可用于 CMake 自動識別 |
方法二:從源碼編譯安裝
下載安裝包
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4.tar.gz
tar -zxvf apache-zookeeper-3.8.4.tar.gz
cd apache-zookeeper-3.8.4編譯 C 客戶端庫
cd zookeeper-client/zookeeper-client-c
mkdir build && cd build
cmake ..
make -j4
sudo make install
驗證安裝成功:
pkg-config --cflags --libs zookeeper輸出:
-I/usr/include/zookeeper -lzookeeper_mt
安裝完成后,你就可以在 CMakeLists.txt
中這樣鏈接 ZooKeeper:
# 查找并加載 PkgConfig 模塊,這是使用 pkg-config 工具的前提條件。
# 如果系統中沒有找到 pkg-config,CMake 將會報錯并停止配置。
find_package(PkgConfig REQUIRED)# 使用 pkg_check_modules 函數通過 pkg-config 查找名為 "zookeeper" 的庫。
# ZK 是變量前綴,相關的信息會被存儲在以 ZK_ 開頭的變量中:
# - ZK_INCLUDE_DIRS:頭文件目錄
# - ZK_LIBRARY_DIRS:庫文件目錄
# - ZK_LIBRARIES:需要鏈接的庫名稱列表
# REQUIRED 表示這是一個必須存在的庫,如果找不到會報錯。
pkg_check_modules(ZK REQUIRED zookeeper)# 將 ZooKeeper 的頭文件路徑添加到編譯器的包含路徑中,
# 這樣在編譯時就可以找到 #include <zookeeper/zookeeper.h> 等頭文件。
include_directories(${ZK_INCLUDE_DIRS})# 將 ZooKeeper 的庫文件路徑添加到鏈接器的搜索路徑中,
# 這樣鏈接器才能找到對應的 .so 或 .a 文件。
link_directories(${ZK_LIBRARY_DIRS})# 將 ZooKeeper 庫鏈接到你的目標可執行文件或庫(my_target)上。
# ${ZK_LIBRARIES} 包含了所有需要鏈接的庫名稱(例如 -lzookeeper)。
target_link_libraries(my_target ${ZK_LIBRARIES})
ZooKeeper.h常用API介紹
會話管理函數
1.?初始化會話 -?zookeeper_init
創建與 ZooKeeper 集群的會話連接,返回會話句柄。
zhandle_t *zookeeper_init( const char *host, // ZooKeeper 服務器地址列表(逗號分隔,如 "host1:2181,host2:2181") watcher_fn fn, // 全局監視器回調函數(會話事件通知) int recv_timeout, // 會話超時時間(毫秒,建議 ≥ 2000) const clientid_t *clientid, // 先前會話的 clientid(斷連重試用,首次傳 NULL) void *context, // 用戶自定義上下文(傳遞到監視器回調) int flags // 保留參數(必須為 0)
);
返回值:
成功:指向?
zhandle_t
?的指針(會話句柄)失敗:
NULL
(需檢查?errno
?或調用?zoo_state(zh)
?獲取錯誤狀態)
說明:
會話超時:
recv_timeout
?是 ZooKeeper 服務器判斷客戶端存活的心跳間隔,超時后會話失效,臨時節點會被刪除。clientid:用于斷線重連時恢復會話(通過?
zoo_client_id(zh)
?獲取當前會話的?clientid
)。
2.?關閉會話 -?zookeeper_close
主動關閉會話并釋放資源,所有臨時節點會被自動刪除。
int zookeeper_close(zhandle_t *zh);
參數:
zh
:zookeeper_init
?返回的會話句柄。
返回值:
ZOK
?(0):關閉成功ZBADARGUMENTS
?(-8):無效句柄
注意:
線程安全:必須在所有異步操作完成后再調用?
zookeeper_close
,否則可能導致內存泄漏。資源釋放:關閉后不可再使用該句柄發起任何操作。
臨時節點:會話關閉后,其創建的所有臨時節點(
ZOO_EPHEMERAL
)會被服務器自動刪除。
會話生命周期輔助函數
1.?獲取會話狀態 -?zoo_state
查詢當前會話的連接狀態(如是否已連接、認證失敗等)。
int zoo_state(zhandle_t *zh);
返回值:
ZOO_CONNECTED_STATE
:已連接ZOO_EXPIRED_SESSION_STATE
:會話已過期ZOO_AUTH_FAILED_STATE
:認證失敗
2.?獲取會話 ID -?zoo_client_id
獲取當前會話的?clientid
,用于斷線重連時恢復會話。
clientid_t *zoo_client_id(zhandle_t *zh); clientid_t *id = zoo_client_id(zh);
zhandle_t *new_zh = zookeeper_init(..., id, ...); // 復用舊會話
監視器(Watcher)機制
Watcher 是 ZooKeeper 提供的事件通知系統,允許客戶端在以下場景中接收異步通知:
節點變更:數據修改、子節點增減、節點創建/刪除
會話狀態變化:連接建立、會話過期、認證失敗
其核心設計類似于發布-訂閱模型,但具有一次性觸發和輕量級的特點。
一次性觸發
Watcher 被觸發后立即自動注銷,后續變更不會通知。避免高頻事件風暴(例如頻繁數據變更導致大量通知)。
// 注冊 Watcher 監聽節點數據變化
zoo_wget(zh, "/config", watcher_cb, ctx, buffer, &len, NULL);// 當 /config 數據第一次變更時,watcher_cb 被觸發
// 后續 /config 再次變更不會觸發回調,除非重新注冊
事件類型與路徑綁定
每個Watcher 關聯到特定路徑和特定事件類型(如數據變更、子節點變更),通知中會包含觸發事件的路徑,方便多節點監聽時區分來源。Watcher 回調僅告知事件類型和節點路徑,不包含具體數據變更內容,需客戶端主動查詢最新狀態(如觸發后調用?zoo_get
)。
Watcher工作原理
全局監視器原型 -?watcher_fn
處理會話事件(連接狀態變化)和節點事件(數據變更、子節點變更)。
typedef void (*watcher_fn)( zhandle_t *zh, // 會話句柄 int type, // 事件類型(如 ZOO_SESSION_EVENT) int state, // 連接狀態(如 ZOO_CONNECTED_STATE) const char *path, // 觸發事件的節點路徑(會話事件為 NULL) void *watcherCtx // 用戶設置的上下文
);
type=會話事件(ZOO_SESSION_EVENT
)時會檢查state去判斷具體的連接狀態
狀態值 | 含義 | 處理建議 |
---|---|---|
ZOO_CONNECTED_STATE | 連接已建立 | 恢復業務操作 |
ZOO_EXPIRED_SESSION_STATE | 會話過期(臨時節點已刪除) | 必須重新初始化會話 |
ZOO_AUTH_FAILED_STATE | 認證失敗 | 檢查 ACL 或重新調用?zoo_add_auth |
type=節點事件時(type != ZOO_SESSION_EVENT
)state無意義,只需要關注type和path
事件類型 | 觸發條件 | 典型注冊 API | 常見用途 |
---|---|---|---|
ZOO_CREATED_EVENT | 被監視節點被創建 | zoo_wexists | 等待節點就緒 |
ZOO_DELETED_EVENT | 被監視節點被刪除 | zoo_wexists ?/?zoo_wget | 資源釋放檢查 |
ZOO_CHANGED_EVENT | 被監視節點數據變更 | zoo_wget | 配置熱更新 |
ZOO_CHILD_EVENT | 子節點列表變更 | zoo_wget_children | 服務實例列表維護 |
Watcher 的注冊方式
1. 全局 Watcher
在?zookeeper_init
?初始化會話時設置。僅接收會話狀態變化事件(如連接斷開、會話過期)。
zhandle_t *zh = zookeeper_init("127.0.0.1:2181", global_watcher, 3000, NULL, NULL, 0);void global_watcher(zhandle_t *zh, int type, int state, const char *path, void *ctx) {if (type == ZOO_SESSION_EVENT) {// 處理會話事件}
}
2. 局部 Watcher
通過?zoo_wexists
、zoo_wget
?等帶?w
?前綴的 API 注冊。監聽特定節點的特定事件類型。
// 監聽節點創建/刪除
zoo_wexists(zh, "/service/new", service_watcher, "create_ctx", NULL);// 監聽數據變更
zoo_wget(zh, "/config", data_watcher, "config_ctx", buffer, &len, NULL);
節點操作相關函數
同步節點操作
1.?創建節點 -?zoo_create
同步創建指定路徑的節點,支持臨時(Ephemeral)、順序(Sequential)和持久化類型。
int zoo_create( zhandle_t *zh, // 會話句柄 const char *path, // 節點路徑(如 "/services/node") const char *value, // 節點數據(NULL 表示空數據) int valuelen, // 數據長度(若 value=NULL 則傳 -1) const struct ACL_vector *acl, // 節點權限(如 &ZOO_OPEN_ACL_UNSAFE) int flags, // 節點標志(ZOO_EPHEMERAL | ZOO_SEQUENCE) char *path_buffer, // 輸出實際節點路徑(用于順序節點) int path_buffer_len // 輸出緩沖區長度
);
返回值:
ZOK
?:創建成功ZNODEEXISTS
?:節點已存在ZNOCHILDRENFOREPHEMERALS
?:嘗試在臨時節點下創建子節點ZINVALIDSTATE
?:會話已失效
節點創建標志:
標志常量 | 值 | 作用 |
---|---|---|
ZOO_EPHEMERAL | 1 | 臨時節點(會話結束自動刪除) |
ZOO_SEQUENCE | 2 | 順序節點(路徑自動追加全局唯一序號,如?/lock-0000000001 ) |
ZOO_CONTAINER | 4 | 容器節點(當最后一個子節點被刪除后,服務器會異步刪除該容器節點) |
ZOO_PERSISTENT | 0 | 默認持久化節點(顯式刪除或 ACL 清理前永久存在) |
注意:
flags:
ZOO_EPHEMERAL
:臨時節點(會話結束自動刪除)ZOO_SEQUENCE
:順序節點(路徑自動追加全局唯一序號,如?/node0000000001
)
path_buffer:若創建順序節點,此緩沖區返回實際路徑(需預分配足夠空間)。
2.?刪除節點 -?zoo_delete
同步刪除指定節點(必須無子節點且版本匹配)。
int zoo_delete( zhandle_t *zh, // 會話句柄 const char *path, // 節點路徑 int version // 期望版本號(-1 表示忽略版本檢查)
);
返回值:
ZOK
:刪除成功ZNONODE
:節點不存在ZNOTEMPTY
:節點存在子節點ZBADVERSION
:版本號不匹配
注意事項:
版本控制:傳入?
version=-1
?可跳過版本檢查,否則需與節點當前版本一致。原子性:刪除操作是原子的,若失敗則節點狀態不變。
3.?檢查節點是否存在 -?zoo_exists
同步檢查節點是否存在,并可注冊監視器(Watcher)監聽節點創建/刪除事件。
int zoo_exists( zhandle_t *zh, // 會話句柄 const char *path, // 節點路徑 int watch, // 是否注冊監視器(1=注冊,0=不注冊) struct Stat *stat // 輸出節點元數據(可置 NULL)
);
返回值:
ZOK
:節點存在ZNONODE
:節點不存在ZNOAUTH
?:無權訪問
Stat 結構體字段(部分關鍵字段):
struct Stat { int64_t czxid; // 創建該節點的事務ID int64_t mzxid; // 最后修改的事務ID int64_t ctime; // 創建時間(毫秒 epoch) int32_t version; // 數據版本號 int32_t cversion; // 子節點版本號 int32_t aversion; // ACL版本號 // ... 其他字段省略
};
4.?獲取節點數據 -?zoo_get
同步獲取節點數據及元數據,可注冊監視器監聽數據變更事件。
int zoo_get( zhandle_t *zh, // 會話句柄 const char *path, // 節點路徑 int watch, // 是否注冊監視器(1=注冊,0=不注冊) char *buffer, // 輸出數據緩沖區 int *buffer_len, // 輸入緩沖區長度,輸出實際數據長度 struct Stat *stat // 輸出節點元數據
);
返回值:
ZOK
:獲取成功ZNONODE
:節點不存在ZNOAUTH
:無權訪問
數據讀取注意事項:
緩沖區管理:
調用前需預分配?
buffer
,并通過?buffer_len
?傳入其長度。若緩沖區不足,函數返回?
ZOK
?但?*buffer_len
?會被設為實際所需長度。
數據截斷:若數據超過緩沖區長度,會被靜默截斷(無錯誤提示)。
5.?更新節點數據 -?zoo_set
同步更新節點數據,支持版本檢查。
int zoo_set( zhandle_t *zh, // 會話句柄 const char *path, // 節點路徑 const char *buffer, // 新數據緩沖區 int buflen, // 數據長度(-1 表示以 strlen(buffer) 計算) int version // 期望版本號(-1=忽略版本檢查)
);
返回值:
ZOK
:更新成功ZBADVERSION
?(-103):版本號不匹配ZNONODE
:節點不存在
版本控制邏輯:
每次數據更新后,節點的?
version
?會遞增。若傳入?
version=5
,則僅當節點當前版本為 5 時才會更新。
6. 獲取直接子節點列表 -?zoo_get_children
同步獲取指定節點的所有直接子節點名稱列表,并可選擇注冊監視器監聽直接子節點變更事件(創建/刪除子節點)。
int zoo_get_children( zhandle_t *zh, // 會話句柄 const char *path, // 父節點路徑(如 "/services") int watch, // 是否注冊監視器(1=注冊,0=不注冊) struct String_vector *children // 輸出子節點名稱數組,需手動調用 deallocate_String_vector 釋放
);
返回值:
ZOK
:獲取成功ZNONODE
:父節點不存在ZNOAUTH
:無權訪問父節點ZINVALIDSTATE
:會話已失效
7. 增強版子節點獲取 -?zoo_get_children2
在?zoo_get_children
?基礎上,額外返回父節點的元數據(Stat
?結構體)。
int zoo_get_children2( zhandle_t *zh, // 會話句柄 const char *path, // 父節點路徑 int watch, // 是否注冊監視器 struct String_vector *children, // 輸出子節點列表 struct Stat *stat // 輸出父節點元數據(可置NULL)
);
異步節點操作
zookeeper提供了異步節點操作
所有?zoo_a
?前綴函數(如?zoo_acreate
)都是異步節點操作
非阻塞模型:調用后立即返回,實際操作由后臺線程完成。
回調觸發:操作完成時,ZooKeeper 客戶端線程會調用注冊的回調函數(在?
zookeeper_interest()
?或?zookeeper_process()
?調用線程中執行)。上下文傳遞:通過?
const void *data
?參數傳遞用戶自定義上下文(如結構體指針),實現異步狀態管理。
// === 1. 節點創建與刪除 ===
int zoo_acreate(zhandle_t *zh, // 會話句柄const char *path, // 節點路徑const char *value, // 節點數據int valuelen, // 數據長度const struct ACL_vector *acl, // ACL權限int flags, // 節點標志(如ZOO_EPHEMERAL)string_completion_t completion, // 回調函數const void *data, // 用戶上下文char *path_buffer, // 輸出實際路徑(用于順序節點)int path_buffer_len // 緩沖區長度
);int zoo_adelete(zhandle_t *zh, // 會話句柄const char *path, // 節點路徑int version, // 版本號(-1忽略版本檢查)void_completion_t completion, // 回調函數const void *data // 用戶上下文
);// === 2. 節點數據讀寫 ===
int zoo_aget(zhandle_t *zh, // 會話句柄const char *path, // 節點路徑int watch, // 是否注冊監視器data_completion_t completion, // 回調函數const void *data // 用戶上下文
);int zoo_aset(zhandle_t *zh, // 會話句柄const char *path, // 節點路徑const char *buffer, // 新數據int buflen, // 數據長度int version, // 版本號(-1忽略版本檢查)stat_completion_t completion, // 回調函數const void *data // 用戶上下文
);// === 3. 節點元數據與子節點操作 ===
int zoo_aexists(zhandle_t *zh, // 會話句柄const char *path, // 節點路徑int watch, // 是否注冊監視器stat_completion_t completion, // 回調函數const void *data // 用戶上下文
);int zoo_aget_children(zhandle_t *zh, // 會話句柄const char *path, // 節點路徑int watch, // 是否注冊監視器strings_completion_t completion, // 回調函數const void *data // 用戶上下文
);
回調函數類型
(1) 通用無返回值回調 -?void_completion_t
處理無返回數據的操作(如?zoo_adelete
、zoo_aset
)
typedef void (*void_completion_t)(int rc, const void *data); rc 操作結果錯誤碼(同同步 API 錯誤碼,如 ZOK、ZNONODE)
data 用戶調用異步 API 時傳入的上下文指針(如對象指針、請求 ID 等)
void delete_callback(int rc, const void *data) { const char *req_id = (const char *)data; if (rc == ZOK) { printf("[%s] 節點刪除成功\n", req_id); } else { fprintf(stderr, "[%s] 刪除失敗: %s\n", req_id, zerror(rc)); }
} zoo_adelete(zh, "/path/to/node", -1, delete_callback, "request_123");
(2) 返回 Stat 元數據的回調 -?stat_completion_t
處理需返回節點元數據的操作(如?zoo_aexists
)
typedef void (*stat_completion_t)(int rc, const struct Stat *stat, const void *data); stat 節點元數據指針(內存由 ZooKeeper 管理,回調結束后失效)
void exists_callback(int rc, const Stat *stat, const void *data) { if (rc == ZOK) { printf("節點存在,最新版本: %d\n", stat->version); } else if (rc == ZNONODE) { printf("節點不存在\n"); }
} zoo_aexists(zh, "/path/to/node", 1, exists_callback, NULL);
(3) 返回節點數據的回調 -?data_completion_t
處理需返回節點數據的操作(如?zoo_aget
)
typedef void (*data_completion_t)(int rc, const char *value, int valuelen, const Stat *stat, const void *data); value 節點數據指針(由 ZooKeeper 管理,回調結束后失效,需立即拷貝數據)
valuelen 數據實際長度(若 value=NULL,則 valuelen=-1)
stat指向節點元數據的結構體包含節點的所有狀態信息(如版本號、創建時間等),由 ZooKeeper 服務器返回。
data 用戶自定義上下文指針,從異步API調用處原樣傳遞到回調函數,用于跨請求傳遞狀態。
void data_callback(int rc, const char *value, int valuelen, const Stat *stat, const void *data) {const char *node_path = (const char *)data; // 從上下文獲取節點路徑if (rc == ZOK) {if (valuelen > 0) {// 安全拷貝數據(value可能不含終止符)char *data_copy = (char *)malloc(valuelen + 1);memcpy(data_copy, value, valuelen);data_copy[valuelen] = '\0'; // 手動添加終止符printf("[成功] 節點 %s 數據: %s\n", node_path, data_copy);printf("數據版本: %d\n", stat->version);free(data_copy); // 必須釋放拷貝的內存} else {printf("[成功] 節點 %s 數據為空\n", node_path);}} else {fprintf(stderr, "[失敗] 讀取節點 %s 錯誤: %s\n", node_path, zerror(rc));}
}int ret = zoo_aget(zh, node_path, 1, data_callback, node_path);
(4) 返回子節點列表的回調 -?strings_completion_t
處理返回子節點列表的操作(如?zoo_aget_children
)
typedef void (*strings_completion_t)(int rc, const String_vector *strings, const void *data); strings const 子節點名稱數組(由 ZooKeeper 管理,回調結束后失效,需深拷貝數據)typedef struct String_vector { int32_t count; // 子節點數量 char **data; // 子節點名稱數組(每個元素為以 '\0' 結尾的字符串)
} String_vector;
void children_callback(int rc, const String_vector *strings, const void *data) { if (rc == ZOK) { for (int i = 0; i < strings->count; i++) { printf("子節點 %d: %s\n", i, strings->data[i]); } }
} zoo_aget_children(zh, "/parent", 1, children_callback, NULL);
Watcher節點操作
通過?zoo_wexists
、zoo_wget
?等帶?w
?前綴的 API 注冊。監聽特定節點的特定事件類型。
這類函數在同步執行ZooKeeper節點操作(如檢查存在、獲取數據、列出子節點)的同時,自動注冊Watcher監聽器,當指定節點發生特定變更(如數據修改、子節點增減、節點創建/刪除)時,ZooKeeper會通過回調函數實時通知客戶端,實現事件驅動的響應機制。所有Watcher均為一次性觸發,觸發后需重新注冊,適合用于配置熱更新、服務發現等需要實時感知節點變化的場景。
// 檢查節點是否存在并注冊 Watcher(觸發事件:ZOO_CREATED_EVENT/ZOO_DELETED_EVENT)
int zoo_wexists(zhandle_t *zh, /* 會話句柄 */const char *path, /* 節點路徑 */watcher_fn watcher, /* 事件回調函數 */void *watcherCtx, /* 用戶上下文數據 */struct Stat *stat /* 輸出節點元數據(可選) */
);// 獲取節點數據并注冊 Watcher(觸發事件:ZOO_CHANGED_EVENT/ZOO_DELETED_EVENT)
int zoo_wget(zhandle_t *zh, /* 會話句柄 */const char *path, /* 節點路徑 */watcher_fn watcher, /* 事件回調函數 */void *watcherCtx, /* 用戶上下文數據 */char *buffer, /* 輸出數據緩沖區 */int *buffer_len, /* 輸入緩沖區大小/輸出實際數據長度 */struct Stat *stat /* 輸出節點元數據(可選) */
);// 獲取子節點列表并注冊 Watcher(觸發事件:ZOO_CHILD_EVENT)
int zoo_wget_children(zhandle_t *zh, /* 會話句柄 */const char *path, /* 父節點路徑 */watcher_fn watcher, /* 事件回調函數 */void *watcherCtx, /* 用戶上下文數據 */struct String_vector *strings /* 輸出子節點名稱數組 */
);// 獲取子節點列表及父節點元數據(觸發事件:ZOO_CHILD_EVENT)
int zoo_wget_children2(zhandle_t *zh, /* 會話句柄 */const char *path, /* 父節點路徑 */watcher_fn watcher, /* 事件回調函數 */void *watcherCtx, /* 用戶上下文數據 */struct String_vector *strings, /* 輸出子節點名稱數組 */struct Stat *stat /* 輸出父節點元數據 */
);// 移除已注冊的 Watcher
int zoo_remove_watches(zhandle_t *zh, /* 會話句柄 */const char *path, /* 節點路徑 */int watcher_type, /* 監視器類型(ZOO_WATCHER_EVENT等) */watcher_fn watcher, /* 要移除的回調函數(NULL表示全部) */void *watcherCtx, /* 要移除的上下文(需匹配注冊值) */int local /* 是否僅移除本地未觸發的 Watcher */
);
錯誤處理相關函數
(1) 獲取錯誤碼字符串描述
const char *zerror(int err); // 將錯誤碼(如ZCONNECTIONLOSS)轉換為可讀字符串
常見錯誤碼
錯誤碼常量 | 值 | 觸發場景 |
---|---|---|
ZOK | 0 | 操作成功 |
ZNONODE | -101 | 節點不存在(zoo_delete /zoo_get ?等操作路徑無效) |
ZNOAUTH | -102 | 無權限操作節點(ACL 限制) |
ZBADVERSION | -103 | 版本號不匹配(樂觀鎖沖突,如?zoo_set ?傳入錯誤 version) |
ZNOCHILDRENFOREPHEMERALS | -108 | 臨時節點不允許創建子節點 |
ZNODEEXISTS | -110 | 節點已存在(zoo_create ?沖突) |
ZNOTEMPTY | -111 | 節點有子節點(zoo_delete ?非空節點) |
ZCONNECTIONLOSS | -112 | 連接斷開(需檢查會話狀態并重試) |
ZOPERATIONTIMEOUT | -115 | 操作超時(服務器未響應) |
ZINVALIDSTATE | -152 | 會話已失效(如調用?zoo_set ?時會話過期) |
(2) 獲取當前會話狀態
int zoo_state(zhandle_t *zh); // 返回會話狀態(如ZOO_EXPIRED_SESSION_STATE)
常見狀態碼
狀態碼常量 | 值 | 含義 |
---|---|---|
ZOO_EXPIRED_SESSION_STATE | -112 | 會話已過期(臨時節點會被刪除,需重新初始化會話) |
ZOO_AUTH_FAILED_STATE | -113 | 認證失敗(ACL 權限不匹配) |
ZOO_CONNECTING_STATE | 1 | 正在連接服務器 |
ZOO_ASSOCIATING_STATE | 2 | 正在協商會話參數(如超時時間) |
ZOO_CONNECTED_STATE | 3 | 已連接(會話正常) |
ZOO_READONLY_STATE | 5 | 只讀模式連接(連接到 Observer 節點) |
Zookeeper實戰
配置中心實現
功能描述:
將配置存儲在 ZooKeeper 節點中
客戶端啟動時讀取配置
注冊 Watcher 實現配置熱更新
實現原理:
????????這段代碼實現了一個基于 ZooKeeper 的配置中心模塊,其核心功能是從 ZooKeeper 中讀取指定配置節點的內容,并在節點內容變化時自動感知更新。初始化時,ConfigCenter
通過 zookeeper_init
建立與 ZooKeeper 的連接,并通過 zoo_wget
獲取配置節點(如 /app/config
)的值,同時注冊一個 watcher 監聽器。一旦該節點被修改(如配置更新)、創建或重建,注冊的 configWatcher
會被觸發,程序自動調用 reloadConfig
重新讀取最新配置,實現配置熱更新。整個機制利用 ZooKeeper 的數據節點和 watcher 通知能力,實現了輕量、實時、自動刷新的分布式配置中心,確保系統在不重啟的前提下可以動態應用配置變更。
// 配置中心類:負責連接 ZooKeeper,讀取并監聽某個配置節點
class ConfigCenter {
public:ConfigCenter(const std::string& hosts, const std::string& configPath): hosts_(hosts), configPath_(configPath), zh_(nullptr) {}~ConfigCenter() {if (zh_) zookeeper_close(zh_);}// 初始化函數:建立連接并讀取配置bool init() {// 建立連接,注冊全局 Watcher(Session 狀態監聽)zh_ = zookeeper_init(hosts_.c_str(), globalWatcher, 3000, nullptr, this, 0);if (!zh_) return false;// 等待連接真正建立成功(阻塞直到狀態變為 CONNECTED)while (zoo_state(zh_) != ZOO_CONNECTED_STATE) {std::this_thread::sleep_for(std::chrono::milliseconds(100));}// 初次讀取配置內容return reloadConfig();}std::string getConfig() const {return config_;}private:// 全局 Watcher:監聽連接狀態變化(如連接成功、斷開等)static void globalWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_SESSION_EVENT && state == ZOO_CONNECTED_STATE) {std::cout << "連接建立成功\n";}}// 配置節點 Watcher:監聽配置內容變化或節點創建static void configWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {auto self = static_cast<ConfigCenter*>(context);// 節點發生變化,重新加載配置if (type == ZOO_CHANGED_EVENT || type == ZOO_CREATED_EVENT) {std::cout << "配置變更,重新加載...\n";self->reloadConfig(); // 重新讀取配置,并重新注冊 watcher}}// 讀取配置內容,并注冊監聽器bool reloadConfig() {char buffer[1024]; // 存儲配置數據的緩沖區int len = sizeof(buffer); // 緩沖區大小// 讀取節點內容 + 注冊 watcherint rc = zoo_wget(zh_, configPath_.c_str(), configWatcher, this, buffer, &len, nullptr);if (rc == ZOK) {// 設置本地配置config_ = std::string(buffer, len);std::cout << "當前配置: " << config_ << "\n";return true;}// 錯誤處理std::cerr << "讀取配置失敗: " << zerror(rc) << "\n";return false;}zhandle_t* zh_; std::string hosts_; std::string configPath_; std::string config_;
};int main() {ConfigCenter config("localhost:2181", "/app/config");if (!config.init()) return 1;// 模擬主程序持續運行,不斷使用配置while (true) {std::cout << "使用配置: " << config.getConfig() << "\n";std::this_thread::sleep_for(std::chrono::seconds(5));}
}
服務注冊與發現
功能描述:
服務啟動時注冊臨時節點
客戶端發現所有可用服務
監聽服務列表變化
實現原理:
這段代碼實現了一個基于 ZooKeeper 的服務注冊與發現機制,其核心思想是利用 ZooKeeper 的 臨時節點和 watcher 機制 來動態維護服務列表。服務端調用 registerService
方法,在指定路徑(如 /services
)下創建一個臨時節點(如 /services/serviceA
),代表該服務在線;如果服務進程宕機或斷線,ZooKeeper 會自動刪除該節點。客戶端通過 startDiscovery
方法調用 zoo_wget_children
獲取子節點列表,并注冊watcher 監聽服務列表變化。當服務上下線時,watcher 被觸發,客戶端自動重新拉取并更新服務緩存。這樣,服務消費者始終能感知當前可用服務的變化,從而實現分布式系統中的動態服務注冊與自動發現。
// ServiceRegistry:封裝 ZooKeeper 服務注冊與發現邏輯
class ServiceRegistry {
public:ServiceRegistry(const std::string& hosts, const std::string& servicePath): hosts_(hosts), servicePath_(servicePath), zh_(nullptr) {}~ServiceRegistry() {if (zh_) zookeeper_close(zh_); // 釋放連接}// 初始化:建立 ZooKeeper 會話連接bool init() {zh_ = zookeeper_init(hosts_.c_str(), nullptr, 3000, nullptr, nullptr, 0);return zh_ != nullptr;}// 服務端調用:注冊服務(創建臨時節點)bool registerService(const std::string& serviceName) {char pathBuffer[128]; // 用于接收最終創建的路徑int rc = zoo_create(zh_,(servicePath_ + "/" + serviceName).c_str(), // 例如 /services/serviceAnullptr, -1,&ZOO_OPEN_ACL_UNSAFE,ZOO_EPHEMERAL, // 臨時節點,斷線自動刪除pathBuffer, sizeof(pathBuffer));return rc == ZOK;}// 客戶端調用:獲取當前本地緩存的服務列表std::vector<std::string> discoverServices() {std::lock_guard<std::mutex> lock(mutex_);return services_;}// 客戶端調用:啟動服務發現(并注冊 watcher)bool startDiscovery() {String_vector children;int rc = zoo_wget_children(zh_, servicePath_.c_str(),serviceWatcher, this,&children);if (rc != ZOK) return false;updateServiceList(&children); // 初始拉取服務列表deallocate_String_vector(&children); // 釋放 ZooKeeper 內部分配的內存return true;}private:// 子節點變化的 watcher 回調函數static void serviceWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_CHILD_EVENT) {auto self = static_cast<ServiceRegistry*>(context);// 重新注冊 watcher 并更新列表(一次性 watch)String_vector children;zoo_wget_children(zh, self->servicePath_.c_str(),serviceWatcher, self,&children);self->updateServiceList(&children);deallocate_String_vector(&children);}}// 更新服務列表緩存void updateServiceList(String_vector* children) {std::lock_guard<std::mutex> lock(mutex_);services_.clear();for (int i = 0; i < children->count; ++i) {services_.emplace_back(children->data[i]);}std::cout << "服務列表更新: ";for (const auto& s : services_) std::cout << s << " ";std::cout << "\n";}zhandle_t* zh_; // ZooKeeper 連接句柄std::string hosts_; // 連接地址,如 localhost:2181std::string servicePath_; // 服務父路徑,如 /servicesstd::vector<std::string> services_; // 當前發現的服務節點std::mutex mutex_; // 保護 services_ 的互斥鎖
};// 服務端進程示例:注冊服務并保持運行
void runService(const std::string& serviceName) {ServiceRegistry registry("localhost:2181", "/services");if (!registry.init()) {std::cerr << "ZooKeeper 連接失敗\n";return;}if (registry.registerService(serviceName)) {std::cout << serviceName << " 注冊成功\n";std::this_thread::sleep_for(std::chrono::minutes(10)); // 模擬長時間運行} else {std::cerr << "注冊服務失敗\n";}
}// 客戶端進程示例:持續監聽服務列表并選擇其中之一使用
void runClient() {ServiceRegistry client("localhost:2181", "/services");if (!client.init() || !client.startDiscovery()) {std::cerr << "客戶端初始化失敗\n";return;}while (true) {auto services = client.discoverServices();if (services.empty()) {std::cout << "無可用服務\n";} else {// 隨機選擇一個服務節點模擬訪問std::cout << "選擇服務: " << services[rand() % services.size()] << "\n";}std::this_thread::sleep_for(std::chrono::seconds(3)); // 模擬請求間隔}
}// 主函數:通過命令行參數區分運行模式
int main(int argc, char** argv) {if (argc < 2) {std::cout << "用法: " << argv[0] << " [server 服務名 | client]\n";return 1;}std::string mode = argv[1];if (mode == "server") {if (argc < 3) {std::cerr << "請提供服務名: " << argv[0] << " server 服務名\n";return 1;}std::string serviceName = argv[2];runService(serviceName); // 服務端注冊} else if (mode == "client") {runClient(); // 客戶端監聽} else {std::cerr << "未知模式: " << mode << "\n";return 1;}return 0;
}# 啟動一個服務端實例,注冊為 serviceA
./zookeeper_registry server serviceA# 啟動客戶端監聽
./zookeeper_registry client
分布式鎖實現
功能描述:
使用順序臨時節點實現公平鎖
監聽前一個節點釋放鎖
實現阻塞和非阻塞兩種獲取方式
實現原理:
????????這段代碼通過 ZooKeeper 實現了一個分布式鎖機制,其核心思路是利用臨時順序節點(EPHEMERAL | SEQUENTIAL) 實現公平競爭與互斥訪問。每個客戶端線程嘗試加鎖時,會在指定目錄下創建一個唯一的臨時順序節點(如 /locks/resource/lock-00000001
)。然后獲取該目錄下所有子節點,并判斷自己是否是編號最小的那個節點——如果是,就成功獲得鎖;否則,就找到比自己編號小的“前驅節點”,并設置監聽(watch),當前驅節點被刪除(即鎖被釋放)后,自己再次獲得鎖的機會。鎖釋放的方式是刪除自身創建的節點,ZooKeeper 會自動觸發 watcher 通知下一個等待者。整個機制確保了競爭公平、自動清理(臨時節點)、無單點依賴,是 ZooKeeper 在分布式協調中的經典應用之一。
// 分布式鎖類:通過 ZooKeeper 實現鎖競爭
class DistributedLock {
public:DistributedLock(zhandle_t* zh, const std::string& lockPath): zh_(zh), lockPath_(lockPath), myNode_(""), acquired_(false) {}// 嘗試加鎖bool tryLock() {char pathBuffer[256];// 1. 創建 EPHEMERAL + SEQUENTIAL 節點(臨時順序節點)int rc = zoo_create(zh_, (lockPath_ + "/lock-").c_str(), nullptr, -1, &ZOO_OPEN_ACL_UNSAFE,ZOO_EPHEMERAL | ZOO_SEQUENCE,pathBuffer, sizeof(pathBuffer));if (rc != ZOK) return false;myNode_ = pathBuffer; // 記錄當前節點路徑std::string nodeName = myNode_.substr(myNode_.find_last_of('/') + 1); // 提取 lock-000000xx// 2. 獲取所有子節點(所有競爭者)String_vector children;if (zoo_get_children(zh_, lockPath_.c_str(), 0, &children) != ZOK) return false;// 3. 找到當前目錄下最小的子節點(最早創建的節點)std::string minNode;for (int i = 0; i < children.count; ++i) {if (minNode.empty() || children.data[i] < minNode) {minNode = children.data[i];}}// 4. 若自己是最小節點,獲得鎖if (nodeName == minNode) {acquired_ = true;return true;}// 5. 否則,找到比自己小的前一個節點(前驅節點)std::string prevNode;for (int i = 0; i < children.count; ++i) {if (children.data[i] < nodeName && (prevNode.empty() || children.data[i] > prevNode)) {prevNode = children.data[i];}}if (!prevNode.empty()) {// 6. 監聽前驅節點是否被刪除(代表鎖被釋放)std::atomic<bool> lockReleased(false);std::string prevPath = lockPath_ + "/" + prevNode;zoo_wexists(zh_, prevPath.c_str(), lockWatcher, &lockReleased, nullptr);// 7. 阻塞等待通知(watcher 回調觸發時將 flag 設為 true)while (!lockReleased) {std::this_thread::sleep_for(std::chrono::milliseconds(100));}acquired_ = true;return true;}return false;}// 釋放鎖void unlock() {if (acquired_) {zoo_delete(zh_, myNode_.c_str(), -1); // 刪除自身臨時節點acquired_ = false;}}private:// watch 回調:前驅節點被刪除時觸發static void lockWatcher(zhandle_t* zh, int type, int state, const char* path, void* context) {if (type == ZOO_DELETED_EVENT) {auto* flag = static_cast<std::atomic<bool>*>(context);*flag = true; // 通知等待線程:鎖已釋放}}zhandle_t* zh_; // ZooKeeper 連接句柄std::string lockPath_; // 鎖的父路徑(如 /locks/resource)std::string myNode_; // 自己創建的順序節點路徑bool acquired_; // 是否獲得鎖標志
};// 模擬臨界區操作(各線程爭奪執行)
void criticalSection(int id) {static std::mutex coutMutex;{std::lock_guard<std::mutex> lock(coutMutex);std::cout << "進程" << id << " 進入臨界區\n";}std::this_thread::sleep_for(std::chrono::seconds(2));{std::lock_guard<std::mutex> lock(coutMutex);std::cout << "進程" << id << " 離開臨界區\n";}
}// 每個線程工作函數:不斷嘗試加鎖 → 執行臨界區 → 解鎖
void worker(zhandle_t* zh, int id) {DistributedLock lock(zh, "/locks/resource");while (true) {if (lock.tryLock()) {criticalSection(id);lock.unlock();}std::this_thread::sleep_for(std::chrono::seconds(1));}
}int main() {// 建立與 ZooKeeper 的連接zhandle_t* zh = zookeeper_init("localhost:2181", nullptr, 3000, nullptr, nullptr, 0);if (!zh) return 1;// 創建鎖父節點(僅創建一次,無需判斷是否已存在)zoo_create(zh, "/locks", nullptr, -1, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);zoo_create(zh, "/locks/resource", nullptr, -1, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);// 啟動 3 個客戶端線程,模擬分布式競爭std::thread t1(worker, zh, 1);std::thread t2(worker, zh, 2);std::thread t3(worker, zh, 3);t1.join();t2.join();t3.join();zookeeper_close(zh);return 0;
}