構建項目
zk版本:3.5.7,引入4.0.0的curator版本,Curator依賴的版本只能比zookeeper依賴的版本高。
Curator簡單介紹
Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊Watcher和NodeExistsException異常等等,現在是Apache的開源項目。
Curator封裝了很多功能(分布式鎖、leader選舉、分布式隊列、共享計數器等等),更加簡單易用
Curator對比zookeeper原生API
- 原生API的超時重連,需要手動操作,而Curator封裝了很多重連策略,自動重連
- 原生API不支持遞歸創建節點,Curator可以遞歸創建節點
- 是對原生API的進一步封裝,功能更多,更容易使用
- Curator 是Fluent的API風格(依賴于方法鏈、提高代碼的易讀性)
pom文件引入
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>curator-zk</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/junit/junit --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.10</version><scope>test</scope></dependency><!-- curator--><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><!-- 日志--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.21</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>
log4j.properties(resource目錄下)
### set log levels - for more verbose logging change 'info' to 'debug' ###
log4j.rootLogger=off, stdout
### direct log messages to stdout ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH/:mm/:ss}]-%5p %c(line/:%L) %x-%m%n
建立連接
package com.tang.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;public class CuratorTest {/*** 建立連接*/@Testpublic void testConnect() {//重試策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);//第一種創建方式:通過構造器構造CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 60*1000, 15*1000, retryPolicy);//第二種創建方式:通過builder模式鏈式編程client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(60*1000).connectionTimeoutMs(15*1000).retryPolicy(retryPolicy).namespace("zkProject").build();//開啟連接client.start();}
}
構造參數 | 意義 |
---|---|
connectString | 連接url,集群用逗號分割 ,127.0.0.1:2181,127.0.0.1:2182 |
sessionTimeoutMs | 會話超時時間(建立連接后,會話斷開的重連時間) 單位:毫秒,默認60秒 |
connectionTimeoutMs | 連接超時時間 單位:毫秒,默認15秒 |
retryPolicy | 重試策略 |
namespace | 默認一個根目錄,以后的所有創建都會在此目錄下進行 |
RetryPolicy相關(重試連接策略)
ExponentialBackoffRetry?
特點?:采用指數退避算法來實現重試機制。每次重試失敗后,等待時間間隔會逐漸增加,直到達到最大重試次數或者重試成功為止。這種策略適用于需要逐漸延長重試間隔以減少對系統的沖擊,同時確保最終能夠成功連接的場景。
?實現?:ExponentialBackoffRetry實現了RetryPolicy接口,并提供了一些構造方法來設置重試的參數,如最大重試次數、初始等待時間、最大等待時間等?
RetryNTimes?
特點?:簡單地重試N次,不考慮重試之間的時間間隔。這種策略適用于希望快速重試且對重試次數有明確限制的場景。
?實現?:通過指定重試次數N來實例化該策略,當重試次數達到N時停止重試?
RetryOneTime?
特點?:只重試一次,即如果初次嘗試失敗,則進行一次重試,然后停止。這種策略適用于對重試要求不高的場景,或者作為其他策略的補充。
?注意?:雖然這種策略較為簡單,但在某些情況下可能仍然有用,特別是當希望快速失敗而不是無限重試時。
RetryUntilElapsed?
特點?:在指定的總時間內不斷重試,直到成功或時間耗盡。這種策略適用于對時間有明確要求,且希望在這段時間內盡可能多地嘗試連接的場景。
?實現?:需要指定一個最長等待時間,在這段時間內會不斷嘗試重試,直到成功或時間到期?
RetryForever?
特點?:永遠重試,直到成功為止。這種策略適用于對連接成功有極高要求,且不介意無限重試的場景。
?注意?:在實際應用中應謹慎使用此策略,以避免因無限重試而導致的資源耗盡或其他問題?
添加節點
有四種添加節點的方式:
- 基本創建:create().forPath()
- 創建帶數據的節點:create().forPath(path,value)
- 設置節點類型:create().withMode().forPath()
- 創建多節點(父節點不存在會報錯,創建時需要調用:creatingParentContainersIfNeeded方法)
節點類型是一個CreateMode的枚舉,有以下四種類型:
節點名稱 | 枚舉 | 解釋 |
---|---|---|
持久節點? | Persistent | 節點創建后,會一直保存在ZooKeeper中,直到被明確刪除 |
持久順序節點 | PersistentSequential | 在創建子節點時,ZooKeeper會自動為節點名稱添加一個數字后綴,以保證子節點的創建順序 |
臨時節點 | Ephemeral | 節點的生命周期與創建它的會話綁定在一起,會話結束后,節點也會被自動刪除 |
?臨時順序節點? | EphemeralSequential | 與持久順序節點類似,但節點的生命周期是臨時的,與會話綁定 |
/*** 創建節點*/@Testpublic void create() throws Exception {//1.基本創建//如果創建節點,沒有指定數據,則默認將當前客戶端的ip作為數據存儲client.create().forPath("/test");//2.創建帶數據的節點,如果這里的path還是/test就會報錯client.create().forPath("/test1","test".getBytes());}@Testpublic void testCreate2() throws Exception {// 設置節點類型 (默認:持久化)String path = client.create().withMode(CreateMode.EPHEMERAL) // 設置臨時模式.forPath("/app3");//輸出path為/app3System.out.println(path);}@Testpublic void testCreate3() throws Exception {// 創建多節點String path = client.create().creatingParentContainersIfNeeded() // 父節點不存在,則創建父節點.forPath("/app4/p2");//輸出path為/app4/p2System.out.println(path);}
查詢節點
- 查詢數據:client.getData().forPath()
- 查詢子節點:client.getChildren().forPath()
- 查詢節點信息狀態: client.getData().storingStatIn(stat).forPath()
/*** 查詢節點* @throws Exception*/@Testpublic void testGet() throws Exception {//1.查詢數據:getbyte[] data = client.getData().forPath("/app1");System.out.println("1.查詢數據:"+new String(data));// 2.查詢子節點:lsList<String> path = client.getChildren().forPath("/app4");System.out.println("2."+path);//3.查詢節點狀態信息:ls -sStat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app1");System.out.println("3."+stat.toString());}
修改節點
- 基本數據修改: client.setData().forPath()
- 根據版本修改:client.setData().withVersion().forPath()
一般使用第二種修改方式,version 是通過查詢出來的,目的是為了讓其他客戶端不干擾我修改(原子性操作)
/*** 1.基本修改數據:.setData().forPath()* * @throws Exception*/@Testpublic void testSet() throws Exception {client.setData().forPath("/test","MarryChristmas".getBytes());}/***2.根據版本修改* version 是通過查詢出來的,目的是為了讓其他客戶端不干擾我(原子性操作)* @throws Exception*/@Testpublic void testSetForVersion() throws Exception {int version = 0;Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/test");version = stat.getVersion();client.setData().withVersion(version).forPath("/test","HappyNewYear".getBytes());}
刪除節點
- 刪除單個節點:client.delete().forPath()
- 刪除帶有子節點的節點: client.delete().deletingChildrenIfNeeded().forPath()
- 必須成功的刪除:為了防止網絡波動,本質就是重試:client.delete().guaranteed().forPath()
- 回調:inBackground()
/*** 刪除節點* 1.刪除單個節點* 2.刪除帶有子節點的節點* 3.必須成功的刪除:為了防止網絡抖動,本質就是重試* 4.回掉* @throws Exception*/@Testpublic void testDelete() throws Exception {client.delete().forPath("/test1");client.delete().deletingChildrenIfNeeded().forPath("/app4");client.delete().guaranteed().forPath("/app4");client.delete().guaranteed().inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {System.out.println("水逆退散!");}}).forPath("/test");}
完整代碼:
package com.tang.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.List;public class CuratorTest {private CuratorFramework client;/*** 建立連接*/@Before//@Before表示在測試方法執行前執行@Testpublic void testConnect() {//重試策略ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);//第一種創建方式:通過構造器構造client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 60*1000, 15*1000, retryPolicy);//第二種創建方式:通過builder模式鏈式編程client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(60*1000).connectionTimeoutMs(15*1000).retryPolicy(retryPolicy).namespace("zkProject").build();//開啟連接client.start();}/*** 創建節點*/@Testpublic void create() throws Exception {//1.基本創建//如果創建節點,沒有指定數據,則默認將當前客戶端的ip作為數據存儲client.create().forPath("/test");//2.創建帶數據的節點,如果這里的path還是/test就會報錯client.create().forPath("/test1","test".getBytes());}@Testpublic void testCreate2() throws Exception {// 設置節點類型 (默認:持久化)String path = client.create().withMode(CreateMode.EPHEMERAL) // 設置臨時模式.forPath("/app3");//輸出path為/app3System.out.println(path);}@Testpublic void testCreate3() throws Exception {// 創建多節點String path = client.create().creatingParentContainersIfNeeded() // 父節點不存在,則創建父節點.forPath("/app4/p2");//輸出path為/app4/p2System.out.println(path);}/*** 查詢節點* @throws Exception*/@Testpublic void testGet() throws Exception {//1.查詢數據:getbyte[] data = client.getData().forPath("/app1");System.out.println("1.查詢數據:"+new String(data));// 2.查詢子節點:lsList<String> path = client.getChildren().forPath("/app4");System.out.println("2."+path);//3.查詢節點狀態信息:ls -sStat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app1");System.out.println("3."+stat.toString());}/*** 1.基本修改數據:.setData().forPath()** @throws Exception*/@Testpublic void testSet() throws Exception {client.setData().forPath("/test","MarryChristmas".getBytes());}/***2.根據版本修改* version 是通過查詢出來的,目的是為了讓其他客戶端不干擾我(原子性操作)* @throws Exception*/@Testpublic void testSetForVersion() throws Exception {int version = 0;Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/test");version = stat.getVersion();client.setData().withVersion(version).forPath("/test","HappyNewYear".getBytes());}/*** 刪除節點* 1.刪除單個節點* 2.刪除帶有子節點的節點* 3.必須成功的刪除:為了防止網絡抖動,本質就是重試* 4.回掉* @throws Exception*/@Testpublic void testDelete() throws Exception {client.delete().forPath("/test1");client.delete().deletingChildrenIfNeeded().forPath("/app4");client.delete().guaranteed().forPath("/app4");client.delete().guaranteed().inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {System.out.println("水逆退散!");}}).forPath("/test");}@Afterpublic void close() {if (client != null) {client.close();}}
}
參考博客:
https://blog.csdn.net/qq_37774171/article/details/122514318