文章目錄
- Java連接Zookeeper服務端
- 依賴
- 代碼使用
- 應用場景
- 統一命名服務
- 統一配置管理
- 統一集群管理
- 服務器節點動態上下線
- 理解
- 實現
- 模擬服務提供者【客戶端代碼】-注冊服務
- 模擬服務消費者【客戶端代碼】-獲取服務信息進行請求消費
- 軟負載均衡
- 分布式鎖
- 理解
- 實現
- 生產集群安裝N臺機器合適
- 第三方基于zookeeper的包
- curator
- 依賴
- 代碼
Java連接Zookeeper服務端
文檔: https://zookeeper.apache.org/doc/r3.9.1/javaExample.html
依賴
依賴
<dependencies><!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.9.1</version></dependency><!-- https://mvnrepository.com/artifact/org.dromara.hutool/hutool-all --><dependency><groupId>org.dromara.hutool</groupId><artifactId>hutool-all</artifactId><version>6.0.0-M11</version></dependency><!-- https://mvnrepository.com/artifact/junit/junit --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter-api</artifactId><version>5.10.2</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency></dependencies>
代碼使用
Java代碼
public class ZkClient {@Test@SneakyThrowspublic void test1() {String connectString = "192.168.19.107:2181"; // zookeeper服務端信息int sessionTimeout = 2000; // 連接最大時長(毫秒)ZooKeeper zooKeeperClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {Console.log("服務端推送給客戶端的監聽事件信息 == {}", event);}});// 監聽節點數據的變化 === 等價于get -w 命令try {String s = zooKeeperClient.create("/test", "testData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);Console.log("創建節點成功:{}", s);} catch (Exception e) {Console.log("創建節點失敗:{}", e.getMessage());}// 啟動監聽增刪子節點的變化,然后在前面【Watcher】能收到監聽事件 === 等價于ls -w 命令List<String> children = zooKeeperClient.getChildren("/test", true);Console.log("Zookeeper服務端當前/test所有的子節點名字:{}", children);//啟動節點的狀態信息變化 === 等價于stat -w 命令Stat statInfo = zooKeeperClient.exists("/test", true);Console.log("Zookeeper服務端當前/test節點的狀態信息:{}" , statInfo);//程序永遠不結束while (true) {try {Thread.sleep(1000); // 暫停1秒鐘} catch (InterruptedException e) {e.printStackTrace();}}}}
1. 前提背景
2. 開始執行代碼
3. 命令增加節點
4. Java客戶端監聽到的消息
應用場景
統一命名服務
對應用、服務同意命名便于識別,比如一個對外服務接口的多集群,則需要統一的管理同一服務的所有IP
統一配置管理
- 場景:
- 一般要求一個集群中,所有節點的配置信息是一致的,比如Kafka集群。
- 對配置文件修改后,希望能夠快速同步到各個節點上
- 實現:配置信息寫入到Zookeeper一個節點中,客戶端監聽這個節點即可
統一集群管理
- 場景:
- 分布式環境,實時掌握每個節點狀態是必要的
- 實現: 節點信息寫入ZooKeeper_上的一個ZNode。客戶端監聽這個ZNode可獲取它的實時狀態變化
服務器節點動態上下線
理解
特點: 客戶端能實時洞察到服務器上下線的變化
實現
前提: 運行代碼前自行在Zookeeper客戶端創建/service節點【create /service “service”】,因為zookeeper創建子節點前必須有父節點,否則創建子節點失敗
模擬服務提供者【客戶端代碼】-注冊服務
public class ServiceProviderZkClient {private static String connecting = StrUtil.join(StrUtil.COMMA,"192.168.19.107:2181","192.168.19.108:2181","192.168.19.109:2181");private static Integer timeout = 2000;@SneakyThrowspublic static void main(String[] args) {Arrays.asList("application1","application2","application3").stream().parallel().forEach(applicationName -> {serviceRegister(applicationName);});}@SneakyThrowspublic static void serviceRegister(String applicationName) {ZooKeeper zooKeeper = new ZooKeeper(connecting, timeout, new Watcher() {public void process(WatchedEvent event) {Console.log("服務端推送的監聽信息:{}", event);}});String zookeeperPath = StrUtil.format("/service/{}", applicationName);byte[] zookeeperPathData = Convert.toPrimitiveByteArray(StrUtil.format("{}應用的IP地址等信息", applicationName));String newNodePath = zooKeeper.create(zookeeperPath, zookeeperPathData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);Console.log("【{}】在線" , applicationName);//程序永遠不結束while (true) {try {Thread.sleep(1000); // 暫停1秒鐘} catch (InterruptedException e) {e.printStackTrace();}}}}
模擬服務消費者【客戶端代碼】-獲取服務信息進行請求消費
public class ServiceProviderConsumerZkClient {private static String connecting = StrUtil.join(StrUtil.COMMA,"192.168.19.107:2181","192.168.19.108:2181","192.168.19.109:2181");private static Integer timeout = 2000;private static AtomicReference children = new AtomicReference(ListUtil.of());private static ZooKeeper zooKeeper = null;@SneakyThrowspublic static void main(String[] args) {zooKeeper = new ZooKeeper(connecting, timeout, new Watcher() {public void process(WatchedEvent event) {Console.log("服務端推送的監聽信息:{}", event);//每次收到監聽通知消息,同步服務在線狀態getServiceNode();}});//獲取在線中的提供提供者getServiceNode();while (true) {String targetServiceName = "application2";//在線的服務真實路徑String targetServiceNodeName = CollUtil.emptyIfNull((List<String>)children.get()).stream().filter(childrenPath -> StrUtil.contains(childrenPath, targetServiceName)).findFirst().orElse(null);String targetServiceNamePath = StrUtil.format("/service/{}" , targetServiceNodeName);boolean targetServiceNameExistFlag = StrUtil.isNotBlank(targetServiceNodeName);if(targetServiceNameExistFlag) {//獲取服務的配置信息進行服務調用 == 節點里面一般包含當前服務提供者http,端口等等信息String nodeData = Convert.toStr(zooKeeper.getData(targetServiceNamePath, false, null));Console.log("【{}】第三方服務上線,調用接口成功", targetServiceName);}else {Console.log("【{}】第三方服務未上線,調用接口失敗" , targetServiceName);}ThreadUtil.sleep(5000);}}@SneakyThrowspublic static void getServiceNode() {children.set(zooKeeper.getChildren("/service", true));Console.log("系統中的服務提供者節點:{}" , children.get());}}
服務提供者進行注冊服務時
服務消費者進行消費時
軟負載均衡
特點: 在Zookeepert中記錄每臺服務器的訪問數,讓訪問數最少的服務器去處理最新的客戶端請求
分布式鎖
理解
概念: 分布式系統中能保證多個進程有序地進行訪問臨界資源的鎖,拿到鎖的進程才可以訪問資源,否則一直排隊等待鎖
實現
public class DistributedLock {private static String connecting = StrUtil.join(StrUtil.COMMA, "192.168.19.107:2181", "192.168.19.108:2181", "192.168.19.109:2181");private static Integer timeout = 2000;private static ZooKeeper zooKeeper;private static String parentPath = "/DistributedLock";private static Map<String, CountDownLatch> threadIdToCountDownLatchMap = MapUtil.newSafeConcurrentHashMap();static {init();}@SneakyThrowsprivate static void init() {zooKeeper = connectZooKeeper();// 創建鎖父節點String realParentPath = zooKeeper.create(parentPath, Convert.toPrimitiveByteArray(parentPath), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// 監聽父節點下子節點增刪的變化// List<String> sonNodeNames = getParentSortedNodes(true);}@SneakyThrowsprivate static ZooKeeper connectZooKeeper() {ZooKeeper connectedZooKeeper = new ZooKeeper(connecting, timeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {Console.log("zookeeper收到服務端監聽通知消息:{}", event);String dealNodePath = event.getPath();Event.EventType eventType = event.getType();if (eventType == Event.EventType.NodeDeleted) {String nodeBelongThreadId = StrUtil.subBefore(StrUtil.subAfter(dealNodePath, "/", true), "_", false);Console.log("收到刪除節點通知,釋放線程等待 == {}", nodeBelongThreadId);// 釋放鎖CountDownLatch countDownLatch = threadIdToCountDownLatchMap.get(nodeBelongThreadId);if (countDownLatch != null) {countDownLatch.countDown();threadIdToCountDownLatchMap.remove(nodeBelongThreadId);}}}});return connectedZooKeeper;}@SneakyThrowspublic static List<String> getParentSortedNodes(Boolean watchFlag) {List<String> sonNodeNames = CollUtil.emptyIfNull(zooKeeper.getChildren(parentPath, true)).stream().sorted(CompareUtil::compare).collect(Collectors.toList());return sonNodeNames;}/*** 獲取鎖*/@SneakyThrowspublic static void acquireLock() {// 當前鎖的節點前綴String nodeNamePrefix = Thread.currentThread().getId() + "_";// 當前鎖的節點完整領前綴String absolutenodeNamePathPrefix = StrUtil.format("{}/{}", parentPath, nodeNamePrefix);// 完整的前綴String realAbsolutenodeNamePath = zooKeeper.create(absolutenodeNamePathPrefix, Convert.toPrimitiveByteArray("absolutenodeNamePathPrefix"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 獲取當前父路徑下的所有節點List<String> sonNodeNames = getParentSortedNodes(false);if (CollUtil.size(sonNodeNames) == 1) { // 獲取鎖Console.log("【子元素為1】獲取鎖【{}】",realAbsolutenodeNamePath);return;} else {String firstNodeName = CollUtil.getFirst(sonNodeNames);if (StrUtil.startWith(firstNodeName, nodeNamePrefix)) {Console.log("【當前子節點在排序后序列為第一個】獲取鎖【{}】",realAbsolutenodeNamePath);return;} else {// 一直等待直到有機會獲取鎖CountDownLatch countDownLatch = new CountDownLatch(1);// 監聽該節點的前一個節點的增刪變化int currentNodeIndex = CollUtil.indexOf(sonNodeNames, nodeName -> StrUtil.endWith(realAbsolutenodeNamePath, nodeName));String previousNodeName = CollUtil.get(sonNodeNames, currentNodeIndex - 1);String previousNodePath = StrUtil.format("{}/{}", parentPath, previousNodeName);zooKeeper.getData(previousNodePath, true, null);threadIdToCountDownLatchMap.put(StrUtil.subBefore(previousNodeName,"_", false), countDownLatch);countDownLatch.await();Console.log("獲取鎖【{}】", realAbsolutenodeNamePath);}}}/*** 釋放鎖*/@SneakyThrowspublic static void releaseLock() {String currentThreadId = Convert.toStr(Thread.currentThread().getId());// CountDownLatch countDownLatch = threadIdToCountDownLatchMap.get(currentThreadId);// if (ObjUtil.isNull(countDownLatch)) {// Console.log("當前線程并沒有等待鎖的操作");// return;// }// 當前鎖的節點前綴String nodeNamePrefix = currentThreadId + "_";String realNodeName = getParentSortedNodes(false).stream().filter(nodeName -> StrUtil.startWith(nodeName, nodeNamePrefix)).findFirst().orElse(null);if (StrUtil.isBlank(realNodeName)) {Console.log("當前線程并未有獲取鎖的操作");return;}String completeNodePath = StrUtil.format("{}/{}", parentPath, realNodeName);zooKeeper.delete(completeNodePath, -1);Console.log("釋放鎖【{}】", completeNodePath);}public static void main(String[] args) {String s = StrUtil.subAfter("fsd/fdsfsdfds", "/", true);Console.log(s);}
}
?
?
public class App2Test {@Test@SneakyThrowspublic void test3() {Bean publicBean = new Bean();List<Thread> threadGroup = ListUtil.of();for (int i = 0; i < 10; i++) {Thread newThread = new Thread(() -> {DistributedLock.acquireLock();// 隨機等待try {Thread.sleep(RandomUtil.randomInt(1000, 3000));} catch (InterruptedException e) {throw new RuntimeException(e);}publicBean.num = ++publicBean.num;Console.log("【{}】:數+1處理 == {}", Thread.currentThread().getId(), publicBean.num);DistributedLock.releaseLock();});newThread.start();threadGroup.add(newThread);}for (Thread runThread : threadGroup) {// 等待線程運行完runThread.join();}Console.log("公共數據最終的結果:{}", publicBean.num);Assert.equals(publicBean.num, 10);}
}
生產集群安裝N臺機器合適
特點: 好處提高可靠性、壞處數據同步有延遲
第三方基于zookeeper的包
curator
官網: https://curator.apache.org/docs/about
?
入門教程: https://curator.apache.org/docs/getting-started/
依賴
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.6.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.6.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>5.6.0</version></dependency>
代碼
發現: 運行代碼可見curator的分布式鎖的原理跟前面自己實現的邏輯差不多,都是通過增、刪子節點,然后監控前一個節點被刪釋放鎖的邏輯原理去做的
public class OtherTest {@Test@SneakyThrowspublic void test2() {String connectString = "192.168.19.107:2181,192.168.19.108:2181,192.168.19.109:2181";RetryOneTime retryOneTime = new RetryOneTime(2000);CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, 60 * 1000 * 10, 15 * 1000 * 10, retryOneTime);curatorFramework.start();//獲取某父目錄旗下的親兒子節點名字信息List<String> sonNodeNames = curatorFramework.getChildren().forPath("/");Console.log(sonNodeNames);// 分布式鎖InterProcessMutex interProcessMutexLock = new InterProcessMutex(curatorFramework, "/CuratorLock");App2Test.Bean publicBean = new App2Test.Bean();List<Thread> threadGroup = ListUtil.of();for (int i = 0; i < 5; i++) {Thread newThread = new Thread(() -> {try {interProcessMutexLock.acquire();// 隨機等待try {Thread.sleep(RandomUtil.randomInt(1000, 3000));} catch (InterruptedException e) {throw new RuntimeException(e);}publicBean.num = ++publicBean.num;Console.log("【{}】:數+1處理 == {}", Thread.currentThread().getId(), publicBean.num);}catch (Exception e) {}finally {try {interProcessMutexLock.release();}catch (Exception e) {}}});newThread.start();threadGroup.add(newThread);}for (Thread runThread : threadGroup) {// 等待線程運行完runThread.join();}Console.log("公共數據最終的結果:{}", publicBean.num);Assert.equals(publicBean.num, 5);}}
?
剛興趣的同行可以進群溝通交流,內置機器人供大家愉快