Zookeeper Java 客戶端
項目構建
ookeeper 官方的客戶端沒有和服務端代碼分離,他們為同一個jar 文件,所以我們直接引入
zookeeper的maven即可, 這里版本請保持與服務端版本一致,不然會有很多兼容性的問題
1 <dependency>
2 <groupId>org.apache.zookeeper</groupId>
3 <artifactId>zookeeper</artifactId>
4 <version>3.5.8</version>
5 </dependency>
創建客戶端實例:
為了便于測試,直接在初始化方法中創建zookeeper實例
1 @Slf4j
2 public class ZookeeperClientTest {
3
4 private static final String ZK_ADDRESS="192.168.109.200:2181";
5
6 private static final int SESSION_TIMEOUT = 5000;
7
8 private static ZooKeeper zooKeeper;
9
10 private static final String ZK_NODE="/zk‐node";
11
12
13 @Before
14 public void init() throws IOException, InterruptedException {
15 final CountDownLatch countDownLatch=new CountDownLatch(1);
16 zooKeeper=new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event ‐> {
17 if (event.getState()== Watcher.Event.KeeperState.SyncConnected &&
18 event.getType()== Watcher.Event.EventType.None){
19 countDownLatch.countDown();
20 log.info("連接成功!");
21 }
22 });
23 log.info("連接中....");
24 countDownLatch.await();
25 }
26 }
創建Zookeeper實例的方法:
1 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
2 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientC
onfig)
3 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean c
anBeReadOnly, HostProvider)
4 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean c
anBeReadOnly, HostProvider, ZKClientConfig)
5 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean c
anBeReadOnly)
6 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean c
anBeReadOnly, ZKClientConfig)
7 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byt
e[])
8 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byt
e[], boolean, HostProvider)
9 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byt
e[], boolean, HostProvider, ZKClientConfig)
10 ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, by
te[], boolean)
connectString:
ZooKeeper服務器列表,由英文逗號分開的host:port字符串組成,
每一個都代表一臺ZooKeeper機器,如,
host1:port1,host2:port2,host3:port3。另外,也可以在connectString中設
置客戶端連接上ZooKeeper
后的根目錄,方法是在host:port字符串之后添加上這個根目錄,例
如,host1:port1,host2:port2,host3:port3/zk-base,這樣就指定了該客戶端連
接上ZooKeeper服務器之后,所有對ZooKeeper
的操作,都會基于這個根目錄。例如,客戶端對/sub-node 的操作,最終創建
/zk-node/sub-node, 這個目錄也叫Chroot,即客戶端隔離命名空間。
sessionTimeout?
會話的超時時間,是一個以“毫秒”為單位的整型值。在ZooKeeper中有
會話的概念,在一個會話周期內,ZooKeeper客戶端和服務器之間會通過心跳
檢
測機制來維持會話的有效性,一旦在sessionTimeout時間內沒有進行有效
的心跳檢測,會話就會失效。
watcher
ZooKeeper允許
客戶端在構造方法中傳入一個接口 watcher (org.apache. zookeeper.
Watcher)的實現類對象來作為默認的 Watcher事件通知處理器。當然,該參
數可以設置為null 以表明不需要設置默認的 Watcher處理器。
canBeReadOnly
這是一個boolean類型的參數,用于標識當前會話是否支持“read-only(只
讀)”模式。默認情況下,在ZooKeeper集群中,一個機器如果和集群中過半
及
以上機器失去了網絡連接,那么這個機器將不再處理客戶端請求(包括讀寫請
求)。但是在某些使用場景下,當ZooKeeper服務器發生此類故障的時候,我
們
還是希望ZooKeeper服務器能夠提供讀服務(當然寫服務肯定無法提供)——
這就是 ZooKeeper的“read-only”模式。
sessionId和 ses
sionPasswd
分別代表會話ID和會話秘鑰。這兩個參數能夠唯一確定一個會話,同時客戶
端使用這兩個參數可以實現客戶端會話復用,從而達到恢復會話的效果。具體
使用方法是,第一次連接上ZooKeeper服務器時,通過調用ZooKeeper對象
實
例的以下兩個接口,即可獲得當前會話的ID和秘鑰:
long getSessionId();
byte[]getSessionPasswd( );
荻取到這兩個參數值之后,就可以在下次創建ZooKeeper對象實例的時候傳
入構造方法了
同步創建節點:
1 @Test
2 public void createTest() throws KeeperException, InterruptedException {
3 String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_A
CL_UNSAFE, CreateMode.PERSISTENT);
4 log.info("created path: {}",path);
5 }
異步創建節點:
1 @Test
2 public void createAsycTest() throws InterruptedException {
3 zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
4 CreateMode.PERSISTENT,
5 (rc, path, ctx, name) ‐> log.info("rc {},path {},ctx {},name
{}",rc,path,ctx,name),"context");
6 TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
7 }
修改節點數據
1 @Test
2 public void setTest() throws KeeperException, InterruptedException {
3
4 Stat stat = new Stat();
5 byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
6 log.info("修改前: {}",new String(data));
7 zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
8 byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
9 log.info("修改后: {}",new String(dataAfter));
10 }
什么是 Curator
Curator 是一套由netflix 公司開源的,Java 語言編程的 ZooKeeper 客戶端框架,Curator項目
是現在ZooKeeper 客戶端中使用最多,對ZooKeeper 版本支持最好的第三方客戶端,并推薦使
用,Curator 把我們平時常用的很多 ZooKeeper 服務開發功能做了封裝,例如 Leader 選舉、
分布式計數器、分布式鎖。這就減少了技術人員在使用 ZooKeeper 時的大部分底層細節開發工
作。在會話重新連接、Watch 反復注冊、多種異常處理等使用場景中,用原生的 ZooKeeper
處理比較復雜。而在使用 Curator 時,由于其對這些功能都做了高度的封裝,使用起來更加簡
單,不但減少了開發時間,而且增強了程序的可靠性。
Curator Caches:
Curator 引入了 Cache 來實現對 Zookeeper 服務端事件監聽,Cache 事件監聽可以理解為一
個本地緩存視圖與遠程 Zookeeper 視圖的對比過程。Cache 提供了反復注冊的功能。Cache 分
為兩類注冊類型:節點監聽和子節點監聽。