目錄
案例一:服務器動態上下線
服務端:
(1)先獲取zookeeper連接
(2)注冊服務器到zookeeper集群:
(3)業務邏輯(睡眠):
服務端代碼如下:
客戶端:
(1)獲取zookeeper的連接:
(2)監聽/servers下邊的子節點的增減:
客戶端代碼如下:
案例二:ZooKeeper 分布式鎖
分布式鎖是什么?
鎖的實現:
構造函數:
加鎖函數:
解鎖函數:
整體代碼:
測試類代碼 :
Curator 框架實現分布式鎖案例:
實現步驟:
代碼如下:
該案例主要也是客戶端監聽原理,客戶端監聽服務器的上下線情況
先在集群上創建/servers 節點(用于存儲連接的服務器的主機和該服務器的節點數)相當于zookeeper集群
案例一:服務器動態上下線
服務端:
(1)先獲取zookeeper連接
? ? ? ? 創建類對象
該類為我們創建的服務端類:
DistributeServer server = new DistributeServer();
? ? ? ? 獲取zookeeper連接:
自己創建連接方法:
private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}
?讓后server對象在main函數中調用
(2)注冊服務器到zookeeper集群:
注冊是需要注冊到zookeeper集群的/servers路徑下,需要指定參數進行創建
private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 需要創建有序的臨時節點所以-e(暫時) -s(有序)System.out.println("服務器"+hostname+"已注冊連接");}
(3)業務邏輯(睡眠):
private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
服務端代碼如下:
package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
/*** @Date 2023/8/10 19:06* @Author */
public class DistributeServer {private static String connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";private static int sessionTimeout=2000;private ZooKeeper zk =null;private String parentNode = "/servers";public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//獲取zk連接//創建DistributeServer server = new DistributeServer();server.getconnect();//注冊服務器到zk集群//注冊是需要在/servers節點下創建所開啟的服務器的路徑server.regestServer(args[0]);//業務邏輯(實際是延時讓它睡覺---不然會注冊完成就關閉)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 需要創建有序的臨時節點所以-e(暫時) -s(有序)System.out.println("服務器"+hostname+"已注冊連接");}private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}
}
客戶端:
(1)獲取zookeeper的連接:
? ? ? ? 先創建客戶端對象,在進行構建獲取zookeeper連接的方法,本方法對process方法進行了重寫,填寫了再發生上下線的運行邏輯
private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
(2)監聽/servers下邊的子節點的增減:
? ? ? ? 構建方法client.getServerList()來進行監聽:
代碼邏輯就是通過getChildren()方法獲取指定目錄下的所有子目錄并開啟監聽
再進行遍歷,把遍歷結果封裝到一個集合中,最后進行輸出
private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//該方法會獲取指定路徑下的所有子節點//true 會走初始化中的watch 也可以自己創建watch//把所有的服務器都封裝到一個集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上邊已經便利到一個服務器對象,再進行添加list.add(new String(data));}System.out.println(list);}
(3)業務邏輯同服務端不在贅述。
客戶端代碼如下:
package com.tangxiaocong.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/*** @Date 2023/8/10 21:27* @Author * 客戶端的監聽功能*/
public class DistributeClient {
private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTimeout=2000;private ZooKeeper zk=null;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//獲取zk連接DistributeClient client = new DistributeClient();client.getConnect();//監聽/servers下邊的子節點的增減client.getServerList();//業務邏輯(睡眠)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//該方法會獲取指定路徑下的所有子節點//true 會走初始化中的watch 也可以自己創建watch//把所有的服務器都封裝到一個集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上邊已經便利到一個服務器對象,再進行添加list.add(new String(data));}System.out.println(list);}private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
}
案例二:ZooKeeper 分布式鎖
分布式鎖是什么?
日常使用計算機的時候,我們的電腦不會只開一個進程,但是當“進程1”在訪問某些資源的時候,不能被其他進程所訪問,它就會去獲得鎖,把她所訪問的資源進行鎖上,對該資源進行獨占。"進程 1"用完該資源以后就將鎖釋放掉,讓其 他進程來獲得鎖,那么通過這個鎖機制,我們就能保證了分布式系統中多個進程能夠有序的 訪問該臨界資源。那么我們把這個分布式環境下的這個鎖叫作分布式鎖。
鎖的實現:
構造函數:
在該類中首先要實現構造方法,構造方法與類名相同,在該方法中需要獲取連接,重寫process方法,在該方法中實現釋放CountDownLatch的類對象,有兩種情況,正常連接釋放一種,不是正常連接狀態,則釋放另一種。在構造方法中還要判斷是否存在“/locks”路徑,存在則正常退出,不存在則創建該路徑。
加鎖函數:
使用ZooKeeper對象進行創建節點(臨時有序),讓后獲取“/locks”路徑下的所有節點序號,對結果進行判斷,如果返回的List集合只有一個節點,則直接返回,默認加鎖,不用再做監聽工作。如果不是只有一個節點,則對List集合進行排序,再獲取他的節點名稱,通過indexOf函數來獲取該名稱節點的下標。如果為-1,則數據異常,為0 則為最小節點,則直接退出,進行加鎖不需要設置監聽,結果為其他則需要設置監聽,先設置監聽字符串,當狀態不發生改變會一致阻塞,只有上鎖節點讓位后會調用process方法進行釋放。
解鎖函數:
解鎖就是直接刪除節點即可
整體代碼:
package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/*** @Date 2023/8/12 19:56* @Author */
public class DistributedLock {final private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";final private int sessionTimeout=2000;final private ZooKeeper zk;private String waitPath;private String currentModu;//為了程序的健壯性,創建該對象 等待操作final private CountDownLatch waitLach=new CountDownLatch(1);final private CountDownLatch countDownLatch=new CountDownLatch(1);public DistributedLock() throws IOException, InterruptedException, KeeperException {//獲取連接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果正常連接zk 可以釋放if (watchedEvent.getState()==Event.KeeperState.SyncConnected){countDownLatch.countDown();}//檢測到刪除節點并且是前一個節點則釋放waitlatchif (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){waitLach.countDown();}}});//等待是否正常連接 正常(已)連接會釋放 否則阻塞countDownLatch.await();// 判斷是否存在lock鎖Stat stat = zk.exists("/locks", false);if (stat==null){//創建該節點String s = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);}}//對zk加鎖public void zkLock() {//創建臨時的帶序號的節點try {currentModu = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zk.getChildren("/locks", false);//如果只有一個節點 則直接獲取if(children.size()==1){return;}else {//排序Collections.sort(children);//直接從s后邊開始 開始的下標就是length的長度String substring = currentModu.substring("/locks/".length());//通過substring來獲取在List集合中的下標位置int index = children.indexOf(substring);if (index==-1){System.out.println("數據異常");}else if (index==0){return;}else {// 需要監聽上一個節點waitPath="/locks/"+children.get(index-1);zk.getData(waitPath,true,new Stat());//等待監聽waitLach.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}//判斷創建的節點是否是最小序號的節點 如果是則獲取鎖 不是則監聽他的前一個節點}//對zk解鎖public void unzkLock(){
//刪除節點try {//-1 是版本號zk.delete(this.currentModu,-1);} catch (InterruptedException | KeeperException e) {e.printStackTrace();}}
}
測試類代碼 :
package com.tangxiaocong.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/*** @Date 2023/8/12 22:31* @Author 唐曉聰*/
public class DistributedLockTest
{public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//創建兩個客戶端對象final DistributedLock lock1 = new DistributedLock();final DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {try { lock1.zkLock();System.out.println("線程1啟動獲得鎖");Thread.sleep(5*1000);lock1.unzkLock();System.out.println("線程1釋放鎖");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println("線程2啟動獲得鎖");Thread.sleep(5*1000);lock2.unzkLock();System.out.println("線程2釋放鎖");} catch (Exception e) {e.printStackTrace();}}}).start();}
}
Curator 框架實現分布式鎖案例:
該案例是直接使用API進行實現分布式鎖
實現步驟:
先創建分布式鎖對象,new?InterProcessMutex(),參數1為所要連接的客戶端,參數2為監聽路徑
參數1傳入的為getCuratorFramework()自定義函數,
該函數通過工廠類的方式進行建立連接,返回創建好的客戶端,讓后start啟動客戶端
創建完分布式鎖對象后創建兩個線程,在線程中進行獲得鎖,釋放鎖的操作。
代碼如下:
package com.tangxiaocong.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/*** @Date 2023/8/13 20:07* @Author */
public class CuratorLockTest {public static void main(String[] args) {//創建分布式鎖1//參數1 所連接的客戶端 參數2 監聽路徑InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");//創建分布式鎖2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");//創建線程new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("thread 1 acquire lock");lock1.acquire();System.out.println("thread 1 again acquire lock");Thread.sleep(5*1000);lock1.release();System.out.println("thread 1 relax lock");lock1.release();System.out.println("thread 1 again relax lock");System.out.println();} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("thread 2 acquire lock");lock2.acquire();System.out.println("thread 2 again acquire lock");Thread.sleep(5*1000);lock2.release();System.out.println("thread 2 relax lock");lock2.release();System.out.println("thread 2 again relax lock");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);//通過工廠類的方式進行建立連接CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop102:2181,hadoop104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy)//連接失敗后 間隔多少秒下次間隔.build();client.start();System.out.println("zookeeper success start !!!!!");return client;}
}