1. ZooKeeper Java客戶端實戰
ZooKeeper應用開發主要通過Java客戶端API連接和操作ZooKeeper集群,有官方和第三方兩種客戶端選擇。
1.1 ZooKeeper原生Java客戶端
依賴引入
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency>
注意:客戶端版本需與服務端保持一致,避免兼容性問題
基本使用
public class ZkClientDemo {private static final String CLUSTER_CONNECT_STR = "192.168.22.156:2181,192.168.22.190:2181,192.168.22.200:2181";public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 4000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected == event.getState() && event.getType() == Event.EventType.None) {countDownLatch.countDown();System.out.println("連接建立");}}});countDownLatch.await();System.out.println(zooKeeper.getState()); // CONNECTED// 創建持久節點zooKeeper.create("/user", "fox".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
}
原生API的局限性
- Watcher監測為一次性,需重復注冊
- 無自動重連機制
- 異常處理復雜
- 僅提供byte[]接口,缺少POJO序列化支持
- 需手動檢查節點存在性
- 不支持級聯刪除
常用方法
create(path, data, acl, createMode)
:創建節點delete(path, version)
:刪除節點exists(path, watch)
:判斷節點存在性getData(path, watch)
:獲取節點數據setData(path, data, version)
:設置節點數據getChildren(path, watch)
:獲取子節點列表sync(path)
:同步客戶端與leader節點
所有方法都提供同步和異步兩個版本,且支持條件更新(通過version參數控制)。
同步創建節點
@Test
public void createTest() throws KeeperException, InterruptedException {String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("created path: {}", path);
}
異步創建節點
@Test
public void createAsyncTest() throws InterruptedException {zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) -> log.info("rc {}, path {}, ctx {}, name {}", rc, path, ctx, name),"context");
}
修改節點數據
@Test
public void setTest() throws KeeperException, InterruptedException {Stat stat = new Stat();byte[] data = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改前: {}", new String(data));zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);log.info("修改后: {}", new String(dataAfter));
}
1.2 Curator開源客戶端(常用)
依賴引入
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
客戶端創建
// 方式一:使用newClient方法
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();// 方式二:使用builder模式(推薦)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.128.129:2181").sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base") // 命名空間隔離.build();
client.start();
重試策略類型
ExponentialBackoffRetry
:重試間隔按指數增長RetryNTimes
:最大重試次數RetryOneTime
:只重試一次RetryUntilElapsed
:在指定時間內重試
基本操作
// 創建節點
@Test
public void testCreate() throws Exception {String path = curatorFramework.create().forPath("/curator-node");curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node", "some-data".getBytes());log.info("curator create node :{} successfully.", path);
}// 創建層級節點
@Test
public void testCreateWithParent() throws Exception {String pathWithParent = "/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{} successfully.", path);
}// 獲取數據
@Test
public void testGetData() throws Exception {byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node :{} successfully.", new String(bytes));
}// 更新數據
@Test
public void testSetData() throws Exception {curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node /curator-node :{} successfully.", new String(bytes));
}// 刪除節點
@Test
public void testDelete() throws Exception {String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
異步接口
@Test
public void testAsync() throws Exception {// 默認在EventThread中執行curatorFramework.getData().inBackground((item1, item2) -> {log.info("background: {}", item2);}).forPath(ZK_NODE);// 指定自定義線程池ExecutorService executorService = Executors.newSingleThreadExecutor();curatorFramework.getData().inBackground((item1, item2) -> {log.info("background: {}", item2);}, executorService).forPath(ZK_NODE);
}
監聽器機制
Curator提供了三種Cache監聽模式:
- NodeCache - 監聽單個節點
public class NodeCacheTest {public static final String NODE_CACHE = "/node-cache";@Testpublic void testNodeCacheTest() throws Exception {createIfNeed(NODE_CACHE);NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);nodeCache.getListenable().addListener(() -> {log.info("{} path nodeChanged: ", NODE_CACHE);printNodeData();});nodeCache.start();}
}
- PathChildrenCache - 監聽子節點(不包含二級子節點)
public class PathCacheTest {public static final String PATH = "/path-cache";@Testpublic void testPathCache() throws Exception {createIfNeed(PATH);PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);pathChildrenCache.getListenable().addListener((client, event) -> {log.info("event: {}", event);});pathChildrenCache.start(true);}
}
- TreeCache - 監聽當前節點及所有遞歸子節點
public class TreeCacheTest {public static final String TREE_CACHE = "/tree-path";@Testpublic void testTreeCache() throws Exception {createIfNeed(TREE_CACHE);TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);treeCache.getListenable().addListener((client, event) -> {log.info("tree cache: {}", event);});treeCache.start();}
}
2. ZooKeeper在分布式命名服務中的實戰
2.1 分布式API目錄
Dubbo框架使用ZooKeeper實現分布式JNDI功能:
- 服務提供者在啟動時向
/dubbo/${serviceName}/providers
節點寫入API地址 - 服務消費者訂閱該節點下的URL地址,獲取所有服務提供者的API
2.2 分布式節點命名
動態節點命名方案:
- 使用數據庫自增ID特性
- 使用ZooKeeper持久順序節點的順序特性
ZooKeeper方案流程:
- 啟動服務,連接ZooKeeper,檢查/創建根節點
- 在根節點下創建臨時順序節點,取回編號作為NodeId
- 根據需要刪除臨時順序節點
2.3 分布式ID生成器
方案對比
- Java UUID
- Redis INCR/INCRBY操作
- Twitter SnowFlake算法
- ZooKeeper順序節點
- MongoDB ObjectId
基于ZooKeeper的實現
public class IDMaker extends CuratorBaseOperations {private String createSeqNode(String pathPefix) throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String destPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}public String makeId(String path) throws Exception {String str = createSeqNode(path);if (null != str) {int index = str.lastIndexOf(path);if (index >= 0) {index += path.length();return index <= str.length() ? str.substring(index) : "";}}return str;}
}
基于SnowFlake算法的實現
public class SnowflakeIdGenerator {private static final long START_TIME = 1483200000000L;private static final int WORKER_ID_BITS = 13;private static final int SEQUENCE_BITS = 10;private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;private static final long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;private long workerId;private long lastTimestamp = -1L;private long sequence = 0L;public synchronized void init(long workerId) {if (workerId > MAX_WORKER_ID) {throw new IllegalArgumentException("worker Id wrong: " + workerId);}this.workerId = workerId;}private synchronized long generateId() {long current = System.currentTimeMillis();if (current < lastTimestamp) {return -1; // 時鐘回撥}if (current == lastTimestamp) {sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == MAX_SEQUENCE) {current = this.nextMs(lastTimestamp);}} else {sequence = 0L;}lastTimestamp = current;long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;long workerId = this.workerId << WORKER_ID_SHIFT;return time | workerId | sequence;}
}
3. ZooKeeper實現分布式隊列
3.1 設計思路
- 創建持久節點作為隊列根節點
- 入隊:在根節點下創建臨時有序節點
- 出隊:獲取最小序號節點,讀取數據后刪除
3.2 Curator實現
public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 序列化器QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 消費者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消費消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 創建隊列(可指定鎖路徑保證原子性)DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).lockPath("/orderlock") // 可選:分布式鎖路徑.buildQueue();queue.start();// 生產消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生產消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
}
3.3 注意事項
- ZooKeeper不適合大數據量存儲,官方不推薦作為隊列使用
- 在吞吐量不高的小型系統中較為適用
- 使用鎖路徑(
lockPath
)可保證操作的原子性和順序性 - 不指定鎖路徑可提高性能,但可能面臨并發問題
總結
ZooKeeper提供了強大的分布式協調能力,通過原生API或Curator客戶端可以實現多種分布式場景下的解決方案。在選擇方案時需要根據具體需求權衡性能、一致性和復雜性,特別是在高并發場景下需要考慮ZooKeeper的適用性和局限性。