zookeeper案例

目錄

案例一:服務器動態上下線

服務端:

(1)先獲取zookeeper連接

(2)注冊服務器到zookeeper集群:

(3)業務邏輯(睡眠):

服務端代碼如下:

客戶端:

(1)獲取zookeeper的連接:

(2)監聽/servers下邊的子節點的增減:

客戶端代碼如下:

案例二:ZooKeeper 分布式鎖

分布式鎖是什么?

鎖的實現:

構造函數:

加鎖函數:

解鎖函數:

整體代碼:

測試類代碼 :

Curator 框架實現分布式鎖案例:

實現步驟:

代碼如下:


該案例主要也是客戶端監聽原理,客戶端監聽服務器的上下線情況

先在集群上創建/servers 節點(用于存儲連接的服務器的主機和該服務器的節點數)相當于zookeeper集群

案例一:服務器動態上下線

服務端:

(1)先獲取zookeeper連接

? ? ? ? 創建類對象

該類為我們創建的服務端類:

        DistributeServer server = new DistributeServer();

? ? ? ? 獲取zookeeper連接:

自己創建連接方法:

    private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}

?讓后server對象在main函數中調用

(2)注冊服務器到zookeeper集群:

注冊是需要注冊到zookeeper集群的/servers路徑下,需要指定參數進行創建

private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//  需要創建有序的臨時節點所以-e(暫時) -s(有序)System.out.println("服務器"+hostname+"已注冊連接");}

(3)業務邏輯(睡眠):

    private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}

服務端代碼如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
/*** @Date 2023/8/10 19:06* @Author */
public class DistributeServer {private static String connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";private static int sessionTimeout=2000;private ZooKeeper zk =null;private String parentNode = "/servers";public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//獲取zk連接//創建DistributeServer server = new DistributeServer();server.getconnect();//注冊服務器到zk集群//注冊是需要在/servers節點下創建所開啟的服務器的路徑server.regestServer(args[0]);//業務邏輯(實際是延時讓它睡覺---不然會注冊完成就關閉)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//  需要創建有序的臨時節點所以-e(暫時) -s(有序)System.out.println("服務器"+hostname+"已注冊連接");}private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}
}

客戶端:

(1)獲取zookeeper的連接:

? ? ? ? 先創建客戶端對象,在進行構建獲取zookeeper連接的方法,本方法對process方法進行了重寫,填寫了再發生上下線的運行邏輯

 private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}

(2)監聽/servers下邊的子節點的增減:

? ? ? ? 構建方法client.getServerList()來進行監聽:

代碼邏輯就是通過getChildren()方法獲取指定目錄下的所有子目錄并開啟監聽

再進行遍歷,把遍歷結果封裝到一個集合中,最后進行輸出

 private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//該方法會獲取指定路徑下的所有子節點//true 會走初始化中的watch 也可以自己創建watch//把所有的服務器都封裝到一個集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上邊已經便利到一個服務器對象,再進行添加list.add(new String(data));}System.out.println(list);}

(3)業務邏輯同服務端不在贅述。

客戶端代碼如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/*** @Date 2023/8/10 21:27* @Author * 客戶端的監聽功能*/
public class DistributeClient {
private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTimeout=2000;private ZooKeeper zk=null;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//獲取zk連接DistributeClient client = new DistributeClient();client.getConnect();//監聽/servers下邊的子節點的增減client.getServerList();//業務邏輯(睡眠)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//該方法會獲取指定路徑下的所有子節點//true 會走初始化中的watch 也可以自己創建watch//把所有的服務器都封裝到一個集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上邊已經便利到一個服務器對象,再進行添加list.add(new String(data));}System.out.println(list);}private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
}

案例二:ZooKeeper 分布式鎖

分布式鎖是什么?

日常使用計算機的時候,我們的電腦不會只開一個進程,但是當“進程1”在訪問某些資源的時候,不能被其他進程所訪問,它就會去獲得鎖,把她所訪問的資源進行鎖上,對該資源進行獨占。"進程 1"用完該資源以后就將鎖釋放掉,讓其 他進程來獲得鎖,那么通過這個鎖機制,我們就能保證了分布式系統中多個進程能夠有序的 訪問該臨界資源。那么我們把這個分布式環境下的這個鎖叫作分布式鎖。

鎖的實現:

構造函數:

在該類中首先要實現構造方法,構造方法與類名相同,在該方法中需要獲取連接,重寫process方法,在該方法中實現釋放CountDownLatch的類對象,有兩種情況,正常連接釋放一種,不是正常連接狀態,則釋放另一種。在構造方法中還要判斷是否存在“/locks”路徑,存在則正常退出,不存在則創建該路徑。

加鎖函數:

使用ZooKeeper對象進行創建節點(臨時有序),讓后獲取“/locks”路徑下的所有節點序號,對結果進行判斷,如果返回的List集合只有一個節點,則直接返回,默認加鎖,不用再做監聽工作。如果不是只有一個節點,則對List集合進行排序,再獲取他的節點名稱,通過indexOf函數來獲取該名稱節點的下標。如果為-1,則數據異常,為0 則為最小節點,則直接退出,進行加鎖不需要設置監聽,結果為其他則需要設置監聽,先設置監聽字符串,當狀態不發生改變會一致阻塞,只有上鎖節點讓位后會調用process方法進行釋放。

解鎖函數:

解鎖就是直接刪除節點即可

整體代碼:

package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/*** @Date 2023/8/12 19:56* @Author */
public class DistributedLock {final    private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";final  private int sessionTimeout=2000;final    private   ZooKeeper zk;private String waitPath;private String currentModu;//為了程序的健壯性,創建該對象   等待操作final   private CountDownLatch waitLach=new CountDownLatch(1);final   private CountDownLatch countDownLatch=new CountDownLatch(1);public DistributedLock() throws IOException, InterruptedException, KeeperException {//獲取連接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//  connectLatch  如果正常連接zk  可以釋放if (watchedEvent.getState()==Event.KeeperState.SyncConnected){countDownLatch.countDown();}//檢測到刪除節點并且是前一個節點則釋放waitlatchif (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){waitLach.countDown();}}});//等待是否正常連接  正常(已)連接會釋放  否則阻塞countDownLatch.await();// 判斷是否存在lock鎖Stat stat = zk.exists("/locks", false);if (stat==null){//創建該節點String s = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);}}//對zk加鎖public void zkLock()  {//創建臨時的帶序號的節點try {currentModu = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zk.getChildren("/locks", false);//如果只有一個節點   則直接獲取if(children.size()==1){return;}else {//排序Collections.sort(children);//直接從s后邊開始   開始的下標就是length的長度String substring = currentModu.substring("/locks/".length());//通過substring來獲取在List集合中的下標位置int index = children.indexOf(substring);if (index==-1){System.out.println("數據異常");}else if (index==0){return;}else {//  需要監聽上一個節點waitPath="/locks/"+children.get(index-1);zk.getData(waitPath,true,new Stat());//等待監聽waitLach.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}//判斷創建的節點是否是最小序號的節點 如果是則獲取鎖  不是則監聽他的前一個節點}//對zk解鎖public void unzkLock(){
//刪除節點try {//-1  是版本號zk.delete(this.currentModu,-1);} catch (InterruptedException  | KeeperException e) {e.printStackTrace();}}
}

測試類代碼 :

package com.tangxiaocong.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/*** @Date 2023/8/12 22:31* @Author 唐曉聰*/
public class DistributedLockTest
{public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//創建兩個客戶端對象final    DistributedLock lock1 = new DistributedLock();final   DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {try {  lock1.zkLock();System.out.println("線程1啟動獲得鎖");Thread.sleep(5*1000);lock1.unzkLock();System.out.println("線程1釋放鎖");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println("線程2啟動獲得鎖");Thread.sleep(5*1000);lock2.unzkLock();System.out.println("線程2釋放鎖");} catch (Exception e) {e.printStackTrace();}}}).start();}
}

Curator 框架實現分布式鎖案例:

該案例是直接使用API進行實現分布式鎖

實現步驟:

創建分布式鎖對象,new?InterProcessMutex(),參數1為所要連接的客戶端,參數2為監聽路徑

參數1傳入的為getCuratorFramework()自定義函數,

該函數通過工廠類的方式進行建立連接,返回創建好的客戶端,讓后start啟動客戶端

創建完分布式鎖對象后創建兩個線程,在線程中進行獲得鎖,釋放鎖的操作。

代碼如下:

package com.tangxiaocong.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/*** @Date 2023/8/13 20:07* @Author */
public class CuratorLockTest {public static void main(String[] args) {//創建分布式鎖1//參數1   所連接的客戶端 參數2 監聽路徑InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");//創建分布式鎖2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");//創建線程new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("thread 1 acquire lock");lock1.acquire();System.out.println("thread 1 again acquire lock");Thread.sleep(5*1000);lock1.release();System.out.println("thread 1 relax lock");lock1.release();System.out.println("thread 1 again relax lock");System.out.println();} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("thread 2 acquire lock");lock2.acquire();System.out.println("thread 2 again acquire lock");Thread.sleep(5*1000);lock2.release();System.out.println("thread 2 relax lock");lock2.release();System.out.println("thread 2 again relax lock");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);//通過工廠類的方式進行建立連接CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop102:2181,hadoop104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy)//連接失敗后  間隔多少秒下次間隔.build();client.start();System.out.println("zookeeper  success start  !!!!!");return client;}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/36212.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/36212.shtml
英文地址,請注明出處:http://en.pswp.cn/news/36212.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java+Excel+POI+testNG基于數據驅動做一個簡單的接口測試【杭州多測師_王sir】

一、創建一個apicases.xlsx放入到eclipse的resource里面&#xff0c;然后refresh刷新一下 二、在pom.xml文件中加入poi和testng的mvn repository、然后在eclipse的對應目錄下放入features和plugins&#xff0c;重啟eclipse就可以看到testNG了 <!--poi excel解析 --><d…

運維監控學習筆記3

DELL的IPMI頁面的登錄&#xff1a; 風扇的狀態&#xff1a; 電源溫度&#xff1a;超過70度就告警&#xff1a; 日志信息&#xff1a; 可以看到更換過磁盤。 iDRAC的設置 虛擬控制臺&#xff1a;啟動遠程控制臺&#xff1a; 可以進行遠程控制。 機房工程師幫我們接遠程控制&…

【云原生】kubernetes中容器的資源限制

目錄 1 metrics-server 2 指定內存請求和限制 3 指定 CPU 請求和限制 資源限制 在k8s中對于容器資源限制主要分為以下兩類: 內存資源限制: 內存請求&#xff08;request&#xff09;和內存限制&#xff08;limit&#xff09;分配給一個容器。 我們保障容器擁有它請求數量的…

【云原生】K8S集群

目錄 一、調度約束1.1 POT的創建過程1.1調度過程 二、指定節點調度2.1 通過標簽選擇節點 三、親和性3.1requiredDuringSchedulingIgnoredDuringExecution&#xff1a;硬策略3.1 preferredDuringSchedulingIgnoredDuringExecution&#xff1a;軟策略3.3Pod親和性與反親和性3.4使…

(2)原神角色數據分析-2

功能一&#xff1a; 得到某個屬性的全部角色&#xff0c;將其封裝在class中 """各元素角色信息&#xff1a;一對多""" from pandas import DataFrame, Series import pandas as pd import numpy as npclass FindType:# 自動執行&#xff0c;將…

山東布谷科技直播平臺搭建游戲開發技術分享:數據存儲的重要意義

在市場上的熱門的直播平臺中&#xff0c;有很多小程序為用戶提供各種各樣的功能&#xff0c;這其中就有很多游戲小程序&#xff0c;當今社會獨生子女眾多&#xff0c;很多作為獨生子女的用戶都會去選擇一個能夠社交互動的APP來填補內心的空虛&#xff0c;而直播平臺的實時互動的…

SAP 選擇屏幕組件名描述翻譯時字符長度不夠問題處理

問題&#xff1a;有時候我們在開發report程序的時候&#xff0c;要求程序顯示支持中英文&#xff0c;如果程序是在中文環境下開發的時候&#xff0c;需要進行翻譯處理&#xff0c;但是我們發現選擇屏幕上的組件的描述支持的默認長度是30位&#xff0c;如果超過該如何處理呢 解…

《路由與交換技術》讀書筆記

小小感悟 工作近3年&#xff0c;基本沒去看路由交換相關書籍&#xff0c;趁著搬家后&#xff0c;周末閑暇時間&#xff0c;快速看了一遍《路由與交換技術》&#xff0c;溫習了一遍&#xff0c;很有收獲&#xff0c;以后還是要多花時間看看其他類型的書。 讀書筆記 1.1 移動通…

構建一個LLM應用所需的所有信息

一、說明 您是否對大型語言模型&#xff08;LLM&#xff09;的潛力感興趣&#xff0c;并渴望創建您的第一個基于LLM的應用程序&#xff1f;或者&#xff0c;也許您是一位經驗豐富的開發人員&#xff0c;希望簡化工作流程&#xff1f;看看DemoGPT就是您的最佳選擇。該工具旨在簡…

【軟件測試】Linux環境下Docker搭建+Docker搭建MySQL服務(詳細)

目錄&#xff1a;導讀 前言 一、Python編程入門到精通二、接口自動化項目實戰三、Web自動化項目實戰四、App自動化項目實戰五、一線大廠簡歷六、測試開發DevOps體系七、常用自動化測試工具八、JMeter性能測試九、總結&#xff08;尾部小驚喜&#xff09; 前言 Linux之docker搭…

CDN(內容分發網絡)

CDN的全稱是 Content Delivery Network, 即內容分發網絡。CDN是構建在現有網絡基礎之上的智能虛擬網絡&#xff0c;依靠部署在各地的邊緣服務器&#xff0c;通過中心平臺的負載均衡、內容分發、調度等功能模塊&#xff0c;使用戶就近獲取所需內容&#xff0c;降低網絡擁塞&a…

詳談MongoDB的那些事

概念區分 什么是關系型數據庫 關系型數據庫&#xff08;Relational Database&#xff09;是一種基于關系模型的數據庫管理系統&#xff08;DBMS&#xff09;。在關系型數據庫中&#xff0c;數據以表格的形式存儲&#xff0c;表格由行和列組成&#xff0c;行表示數據記錄&…

神秘的ip地址8.8.8.8,到底是什么類型的DNS服務器?

下午好&#xff0c;我的網工朋友。 DNS&#xff0c;咱們網工配置網絡連接或者路由器時&#xff0c;高低得和這玩意兒打交道吧。 它是互聯網中用于將人類可讀的域名&#xff08;例如http://www.example.com&#xff09;轉換為計算機可理解的IP地址&#xff08;例如192.0.2.1&a…

元宇宙核能發電VR模擬仿真實訓教學為建設新型電力系統提供重要支撐

隨著“碳達峰、碳中和”目標與建設新型能源體系的提出&#xff0c;在元宇宙環境下建設電力系統是未來發展的趨勢。以物聯網、區塊鏈、數字孿生、混合現實等技術為主要代表的元宇宙技術體系及其在電力和能源系統中的應用&#xff0c;將會促進智能電網的發展&#xff0c;為建設新…

Oracle 知識篇+分區表上的索引由global改為local注意事項

★ 知識點 二、知識點 ?Local型索引有如下優點 1.Only one index partition must be rebuilt when a maintenance operation other than SPLIT PARTITION or ADD PARTITION is performed on an underlying table partition. 2.The duration of a partition maintenance opera…

【uniapp】使用Vs Code開發uniapp:

文章目錄 一、使用命令行創建uniapp項目&#xff1a;二、安裝插件與配置&#xff1a;三、編譯和運行:四、修改pinia&#xff1a; 一、使用命令行創建uniapp項目&#xff1a; 二、安裝插件與配置&#xff1a; 三、編譯和運行: 該項目下的dist》dev》mp-weixin文件導入微信開發者…

unity vscode 代碼關聯 跳轉 BUG

一早打開電腦發現代碼關聯失效了&#xff0c;目測可能跟昨天一些插件更新有關 結論 就這貨&#xff0c;開了就沒法提示代碼關聯&#xff0c;估計預覽版全是BUG。 另一個坑 同期有個unity插件也是預覽版&#xff0c;“非常好使”&#xff0c;當場去世。評論點開有好幾個人說用…

替代阿托斯DLKZOR-T/DLHZO-TES直動式伺服閥比例閥

DLKZOR-T/DLKZOR-TES直動式伺服閥比例閥結構&#xff1a; 1&#xff0c;LVDT傳感器 2&#xff0c;比例電磁鐵 3&#xff0c;閥體 4&#xff0c;閥套 5&#xff0c;閥芯 6&#xff0c;復位彈簧 7&#xff0c;集成數字放大器 8&#xff0c;七芯插頭 9&#xff0c;RS232通…

[保研/考研機試] 楊輝三角形 西北工業大學復試上機題 C++實現

題目描述 Time Limit: 1000 ms Memory Limit: 256 mb 輸入n值&#xff0c;使用遞歸函數&#xff0c;求楊輝三角形中各個位置上的值。 輸入描述: 一個大于等于2的整型數n 輸出描述: 題目可能有多組不同的測試數據&#xff0c;對于每組輸入數據&#xff0c; 按題目的要求輸…

15.3.2 【Linux】系統的配置文件:/etc/crontab,/etc/cron.d/*

這個“ crontab -e ”是針對使用者的 cron 來設計的&#xff0c;如果是“系統的例行性任務”時&#xff0c; 該怎么辦呢&#xff1f;是否還是需要以 crontab -e 來管理你的例行性工作調度呢&#xff1f;當然不需要&#xff0c;你只要編輯/etc/crontab 這個文件就可以。有一點需…