一、ZooKeeper 客戶端常用命令
1. 啟動與退出
bin/zkCli.sh -server 127.0.0.1:2181 # 連接客戶端 quit # 退出客戶端
2. 節點操作
# 查看子節點 ls / ls -s / ls /app# 查看節點詳細信息 ls2 /app stat /app
# 創建節點 create /node1 "hello" # 持久節點 create -e /node2 "temp" # 臨時節點 create -s /node3 "seq" # 順序節點 create -e -s /node4 "tempSeq" # 臨時順序節點創建node1下的子節點,不能node1和node1一起創建,必須創建了node1才能執行下面的否則報錯 create /node1/node11 "hello1" #子節點
# 獲取/修改數據 get /node1 set /node1 "world"
# 刪除節點 delete /node1 # 刪除無子節點的 deleteall /節點path #刪除帶有子節點的節點 rmr /node1 # 遞歸刪除
3. Watch 監聽
get /node1 true # 監聽數據變化 ls / true # 監聽子節點變化
?? 監聽是一次性的,觸發后失效。
4. ACL 權限控制
getAcl /node1 setAcl /node1 world:anyone:rw
權限:
r
=讀,w
=寫,c
=創建,d
=刪除,a
=管理。
模式:world
(所有人)、auth
、digest
(用戶名密碼)、ip
。
5. 輔助命令
help # 查看幫助 history # 查看歷史命令 redo <id> # 重做歷史命令
二、ZooKeeper Java API 操作
1. 原生 API(org.apache.zookeeper.ZooKeeper)
需要的依賴
<!-- 原生 ZooKeeper 依賴 --> <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.3</version> </dependency>
代碼實現
import org.apache.zookeeper.*;public class ZkDemo {public static void main(String[] args) throws Exception {// 1. 連接 ZooKeeperZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {System.out.println("收到事件:" + event);});// 2. 創建節點zk.create("/node1", "hello".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);// 3. 獲取節點數據byte[] data = zk.getData("/node1", false, null);System.out.println("節點數據:" + new String(data));// 4. 修改節點數據zk.setData("/node1", "world".getBytes(), -1);// 5. 獲取子節點System.out.println("子節點:" + zk.getChildren("/", false));// 6. 刪除節點zk.delete("/node1", -1);// 7. 關閉連接zk.close();} }
2. Curator 客戶端(推薦,簡化 API)
需要的依賴
<!--curator--> <!-- Curator(如果你要用 Curator 封裝的API) --> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version> </dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version> </dependency>
??
curator-framework
和curator-recipes
內部會依賴zookeeper
,所以一般不用單獨引入zookeeper
依賴,除非你要控制 zk 版本。2.1 使用Curator實現增刪改查的api
Curator 方法 ZooKeeper CLI 類似命令 create().forPath("/node")
create /node
getData().forPath("/node")
get /node
getChildren().forPath("/")
ls /
getData().storingStatIn(stat)
ls -s /node
setData().forPath("/node", data)
set /node data
delete().forPath("/node")
delete /node
delete().deletingChildrenIfNeeded()
rmr /node
代碼實現:
public class CuratorTest {private CuratorFramework client; // Curator 客戶端對象/*** 建立連接*/@Beforepublic void testConnect() {/*** @param connectString 連接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181"* @param sessionTimeoutMs 會話超時時間 單位ms* @param connectionTimeoutMs 連接超時時間 單位ms* @param retryPolicy 重試策略*//* //重試策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);//1.第一種方式CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60 * 1000, 15 * 1000, retryPolicy);*///重試策略:初始等待3秒,重試10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//2.第二種方式:通過builder方式構建客戶端//CuratorFrameworkFactory.builder();client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") // 設置ZK地址.sessionTimeoutMs(60 * 1000) // 會話超時時間.connectionTimeoutMs(15 * 1000) // 連接超時時間.retryPolicy(retryPolicy) // 設置重試策略// namespace("czq")// 設置命名空間,將 "czq" 作為客戶端操作的根路徑// 這樣之后創建節點時無需每次都寫 /czq 前綴// 例如:client.create().forPath("/node11", "hello1".getBytes())// 實際會在 ZooKeeper 中創建 /czq/node11 節點.namespace("czq") .build();//開啟連接client.start();}//==============================create=============================================================================/*** 創建節點:create 持久 臨時 順序 數據* 1. 基本創建 :create().forPath("")* 2. 創建節點 帶有數據:create().forPath("",data)* 3. 設置節點的類型:create().withMode().forPath("",data)* 4. 創建多級節點 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)*/@Testpublic void testCreate() throws Exception {//2. 創建節點 帶有數據//如果創建節點,沒有指定數據,則默認將當前客戶端的ip作為數據存儲String path = client.create().forPath("/app2", "hehe".getBytes());System.out.println(path);}@Testpublic void testCreate2() throws Exception {//1. 基本創建//如果創建節點,沒有指定數據,則默認將當前客戶端的ip作為數據存儲String path = client.create().forPath("/app1");System.out.println(path);}@Testpublic void testCreate3() throws Exception {//3. 設置節點的類型//默認類型:持久化String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");System.out.println(path);}@Testpublic void testCreate4() throws Exception {//4. 創建多級節點 /app1/p1//creatingParentsIfNeeded():如果父節點不存在,則創建父節點String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");System.out.println(path);}//===========================get================================================================================/*** 查詢節點:* 1. 查詢數據:get: getData().forPath()* 2. 查詢子節點: ls /: getChildren().forPath()* 3. 查詢節點狀態信息:ls -s /:getData().storingStatIn(狀態對象).forPath()*/@Testpublic void testGet1() throws Exception {//1. 查詢數據:getbyte[] data = client.getData().forPath("/app1");System.out.println(new String(data));}@Testpublic void testGet2() throws Exception {// 2. 查詢子節點: ls /List<String> path = client.getChildren().forPath("/");System.out.println(path);}@Testpublic void testGet3() throws Exception {Stat status = new Stat(); // 狀態對象,用來存儲節點元信息System.out.println(status);//3. 查詢節點狀態信息:ls -s ///.storingStatIn(status)代表存儲狀態信息到status對象中client.getData().storingStatIn(status).forPath("/app1");System.out.println(status);}//===========================set================================================================================/*** 修改數據* 1. 基本修改數據:setData().forPath()* 2. 根據版本修改: setData().withVersion().forPath()* * version 是通過查詢出來的。目的就是為了讓其他客戶端或者線程不干擾我。** @throws Exception*/@Testpublic void testSet() throws Exception {// 基本修改節點數據client.setData().forPath("/app1", "itcast".getBytes());}@Testpublic void testSetForVersion() throws Exception {Stat status = new Stat();//3. 查詢節點狀態信息:ls -s//.storingStatIn(status)代表存儲狀態信息到status對象中client.getData().storingStatIn(status).forPath("/app1");int version = status.getVersion();//查詢出來的節點版本System.out.println(version);// 根據版本修改節點數據,保證并發安全client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());}//===========================delete================================================================================/*** 刪除節點: delete deleteall* 1. 刪除單個節點:delete().forPath("/app1");* 2. 刪除帶有子節點的節點:delete().deletingChildrenIfNeeded().forPath("/app1");* 3. 必須成功的刪除:為了防止網絡抖動。本質就是重試。 client.delete().guaranteed().forPath("/app2");* 4. 回調:inBackground* @throws Exception*/@Testpublic void testDelete() throws Exception {// 1. 刪除單個節點client.delete().forPath("/app1");}@Testpublic void testDelete2() throws Exception {//2. 刪除帶有子節點的節點client.delete().deletingChildrenIfNeeded().forPath("/app4");}@Testpublic void testDelete3() throws Exception {//3. 必須成功的刪除(自動重試保證刪除成功)client.delete().guaranteed().forPath("/app2");}@Testpublic void testDelete4() throws Exception {//4. 回調異步刪除client.delete().guaranteed().inBackground(new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("我被刪除了~");System.out.println(event);}}).forPath("/app1");}@Afterpublic void close() {// 關閉客戶端連接if (client != null) {client.close();}}}
2.2 使用Curator實現Watch事件監聽的api
(1)Curator 2.x/4.x 常見的寫法
使用者三個類(
NodeCache
、PathChildrenCache
、TreeCache
)
NodeCache
(監聽一個節點自己)
PathChildrenCache
(監聽某個節點的直接子節點)
TreeCache
(監聽某個節點和所有子節點)????????從 Curator 5.x 開始,這三個類(
NodeCache
、PathChildrenCache
、TreeCache
)已經被 統一棄用,官方推薦用CuratorCache
來代替。
監聽器 監聽范圍 典型應用場景 NodeCache
單個節點的數據變化 監聽配置節點變化 PathChildrenCache
子節點的增刪改,不監聽本節點 監聽服務節點上下線 TreeCache
節點及其所有子節點 全量配置或服務樹監控 代碼實現? ??
public class CuratorWatcherTest {private CuratorFramework client; // Curator 客戶端對象/*** 建立連接*/@Beforepublic void testConnect() {/*** @param connectString 連接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181"* @param sessionTimeoutMs 會話超時時間 單位ms* @param connectionTimeoutMs 連接超時時間 單位ms* @param retryPolicy 重試策略*//* //重試策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);//1.第一種方式CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60 * 1000, 15 * 1000, retryPolicy);*///重試策略:初始等待3秒,重試10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//2.第二種方式:通過builder方式構建客戶端//CuratorFrameworkFactory.builder();client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") // 設置ZK地址.sessionTimeoutMs(60 * 1000) // 會話超時時間.connectionTimeoutMs(15 * 1000) // 連接超時時間.retryPolicy(retryPolicy) // 設置重試策略// namespace("czq")// 設置命名空間,將 "czq" 作為客戶端操作的根路徑// 這樣之后創建節點時無需每次都寫 /czq 前綴// 例如:client.create().forPath("/node11", "hello1".getBytes())// 實際會在 ZooKeeper 中創建 /czq/node11 節點.namespace("czq") .build();//開啟連接client.start();}@Afterpublic void close() {if (client != null) {client.close(); // 關閉客戶端連接}}/*** 演示 NodeCache:給指定一個節點注冊監聽器* NodeCache 只能監聽某個具體節點的數據變化(新增/修改/刪除)*/@Testpublic void testNodeCache() throws Exception {// 1. 創建 NodeCache 對象,監聽 /app1 節點final NodeCache nodeCache = new NodeCache(client,"/app1");// 2. 注冊監聽器nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("節點變化了~");// 獲取修改后的節點數據(如果節點被刪除,這里可能會是 null)byte[] data = nodeCache.getCurrentData().getData();System.out.println(new String(data));}});// 3. 開啟監聽// 參數 true 表示在啟動監聽時,立即加載一次緩存數據nodeCache.start(true);// 阻塞住主線程,保證監聽器一直生效while (true){}}/*** 演示 PathChildrenCache:監聽某個節點的所有子節點* 只能監聽子節點的變化(新增/修改/刪除),不能監聽當前節點本身*/@Testpublic void testPathChildrenCache() throws Exception {// 1. 創建 PathChildrenCache 監聽對象// 參數 true 表示對子節點數據進行緩存PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);// 2. 綁定監聽器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("子節點變化了~");System.out.println(event);// 監聽子節點數據變更PathChildrenCacheEvent.Type type = event.getType();if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){System.out.println("數據變了!!!");byte[] data = event.getData().getData();System.out.println(new String(data));}}});// 3. 開啟監聽pathChildrenCache.start();// 阻塞主線程,保證監聽器一直生效while (true){}}/*** 演示 TreeCache:監聽某個節點自己和它的所有子節點* 相當于 NodeCache + PathChildrenCache 的結合體*/@Testpublic void testTreeCache() throws Exception {// 1. 創建 TreeCache 監聽對象TreeCache treeCache = new TreeCache(client,"/app2");// 2. 注冊監聽器treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println("節點變化了");System.out.println(event);}});// 3. 開啟監聽treeCache.start();// 阻塞主線程,保證監聽器一直生效while (true){}} }
(2)Curator 5.x以上常見的寫法
?
從 Curator 5.x 開始,這三個類(NodeCache
、PathChildrenCache
、TreeCache
)已經被 統一棄用,官方推薦用CuratorCache
來代替。(新版本)
方法名 對應舊API 監聽范圍 應用場景 testCuratorCacheNode
NodeCache 單節點數據變化 單個配置項 testCuratorCacheChildren
PathChildrenCache 子節點(不含父節點) 服務注冊/發現 testCuratorCacheTree
TreeCache 節點 + 全部子節點 配置中心、全量監控 代碼實現?
public class CuratorWatcherTest {// Curator 客戶端對象,用于操作 ZooKeeperprivate CuratorFramework client;/*** 建立連接*/@Beforepublic void testConnect() {/*** @param connectString 連接字符串。zk server 地址和端口 "127.0.0.1:2181,127.0.0.1:2181"* @param sessionTimeoutMs 會話超時時間 單位ms* @param connectionTimeoutMs 連接超時時間 單位ms* @param retryPolicy 重試策略*//* //重試策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);//1.第一種方式CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",60 * 1000, 15 * 1000, retryPolicy);*///重試策略:初始等待3秒,重試10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);//2.第二種方式:通過builder方式構建客戶端//CuratorFrameworkFactory.builder();client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") // 設置ZK地址.sessionTimeoutMs(60 * 1000) // 會話超時時間.connectionTimeoutMs(15 * 1000) // 連接超時時間.retryPolicy(retryPolicy) // 設置重試策略// namespace("czq")// 設置命名空間,將 "czq" 作為客戶端操作的根路徑// 這樣之后創建節點時無需每次都寫 /czq 前綴// 例如:client.create().forPath("/node11", "hello1".getBytes())// 實際會在 ZooKeeper 中創建 /czq/node11 節點.namespace("czq") .build();//開啟連接client.start();}@Afterpublic void close() {// 測試完成后關閉客戶端,釋放資源if (client != null) {client.close();}}// ==============================替代 NodeCache=============================================================================/*** 1. 監聽單個節點(替代 NodeCache)* 使用 CuratorCache + CuratorCacheListener 來代替舊的 NodeCache*/@Testpublic void testCuratorCacheNode() throws Exception {// 創建 CuratorCache,監聽 /app1 節點CuratorCache cache = CuratorCache.build(client, "/app1");// 定義監聽器,使用 forNodeCache 模式,只監聽該節點數據變化CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new Runnable() {@Overridepublic void run() {System.out.println("節點變化了~");try {// 獲取節點最新數據并打印byte[] data = client.getData().forPath("/app1");System.out.println("最新數據:" + new String(data));} catch (Exception e) {e.printStackTrace();}}}).build();// 將監聽器綁定到緩存cache.listenable().addListener(listener);// 開啟緩存(監聽)cache.start();// 阻塞主線程,保證監聽器一直運行Thread.sleep(Long.MAX_VALUE);}// ==============================替代 PathChildrenCache=============================================================================/*** 2. 監聽子節點變化(替代 PathChildrenCache)* 使用 CuratorCache + PathChildrenCacheListener 監聽某節點的所有子節點*/@Testpublic void testCuratorCacheChildren() throws Exception {// 創建 CuratorCache,監聽 /app2 節點的子節點CuratorCache cache = CuratorCache.build(client, "/app2");// 定義監聽器,forPathChildrenCache 表示只監聽子節點的變化CuratorCacheListener listener = CuratorCacheListener.builder().forPathChildrenCache("/app2", client, new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("子節點變化了~");System.out.println("類型:" + event.getType());if (event.getData() != null) {// 打印節點路徑和數據System.out.println("節點:" + event.getData().getPath());System.out.println("數據:" + new String(event.getData().getData()));}}}).build();// 將監聽器綁定到緩存cache.listenable().addListener(listener);// 開啟緩存cache.start();// 阻塞主線程,保證監聽器一直運行Thread.sleep(Long.MAX_VALUE);}// ==============================替代 TreeCache=============================================================================/*** 3. 監聽節點及其所有子節點(替代 TreeCache)* 使用 CuratorCache + TreeCacheListener 監聽整個節點樹*/@Testpublic void testCuratorCacheTree() throws Exception {// 創建 CuratorCache,監聽 /app2 節點及其子節點CuratorCache cache = CuratorCache.build(client, "/app2");// 定義監聽器,forTreeCache 表示節點本身和子節點都監聽CuratorCacheListener listener = CuratorCacheListener.builder().forTreeCache(client, new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println("樹節點變化了~");System.out.println("類型:" + event.getType());if (event.getData() != null) {// 打印節點路徑和數據System.out.println("節點:" + event.getData().getPath());System.out.println("數據:" + new String(event.getData().getData()));}}}).build();// 將監聽器綁定到緩存cache.listenable().addListener(listener);// 開啟緩存cache.start();// 阻塞主線程,保證監聽器一直運行Thread.sleep(Long.MAX_VALUE);}}
2.3 實現分布式鎖
?這里不做過多講解詳細去看我的另一篇博客:分布式微服務--ZooKeeper作為分布式鎖-CSDN博客