ZooKeeper編程02--多線程的分佈式鎖

面向過程版:

package distributedLockProcess;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class dl {private static final Logger LOG = LoggerFactory.getLogger(dl.class);//確保所有線程運行結束;private static final String CONNECTION_STRING = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";private static final int SESSION_TIMEOUT = 10000;private static final String GROUP_PATH = "/disLocks";private static final String SUB_PATH = "/disLocks/sub";private static final int THREAD_NUM = 10; public static CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public static void main(String[] args) {for(int i=0; i < THREAD_NUM; i++){final int threadId = i;new Thread(){@Overridepublic void run() {final CountDownLatch countDownLatch = new CountDownLatch(1);try{//此線程連接ZooKeeperZooKeeper zk = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, new Watcher(){@Overridepublic void process(WatchedEvent event){if (event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();}}});countDownLatch.await();System.out.println(Thread.currentThread().getName() + " --- ZooKeeper.connect()");//GROUP_PATH不存在的話,由一個線程創建即可;if(zk.exists(GROUP_PATH, false)==null){LOG.info( Thread.currentThread().getName() + "節點創建成功, Path: "+ zk.create( GROUP_PATH,("該節點由線程"+Thread.currentThread().getName() + "創建").getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT )+ ", content: " + ("該節點由線程"+Thread.currentThread().getName() + "創建") );}String selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(Thread.currentThread().getName()+"創建鎖路徑:"+selfPath);if(checkMinPath(zk, selfPath)){LOG.info(Thread.currentThread().getName() + "獲取鎖成功,趕緊干活!");dosomething();threadSemaphore.countDown();try {if(zk.exists(selfPath,false) == null){LOG.error(Thread.currentThread().getName()+"本節點已不在了...");return;}zk.delete(selfPath, -1);LOG.info(Thread.currentThread().getName() + "刪除本節點:"+selfPath);zk.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}} catch (Exception e){LOG.error("【第"+threadId+"個線程】 拋出的異常:");e.printStackTrace();}}}.start();}try {
//	        	Thread.sleep(60000);threadSemaphore.await();LOG.info("所有線程運行結束!");} catch (Exception e) {e.printStackTrace();}}protected static boolean checkMinPath(final ZooKeeper zk, final String selfPath) throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(GROUP_PATH, false);Collections.sort(subNodes);int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));switch (index){case -1:{LOG.error(Thread.currentThread().getName()+"本節點已不在了..."+selfPath);return false;}case 0:{LOG.info(Thread.currentThread().getName()+"子節點中,我果然是老大"+selfPath);return true;}default:{final String waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);LOG.info(Thread.currentThread().getName()+"獲取子節點中,排在我前面的"+waitPath);try{zk.getData(waitPath, new Watcher(){@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {  LOG.info(Thread.currentThread().getName()+ "收到情報,排我前面的家伙已掛,我是不是可以出山了?");  try {  if(checkMinPath(zk, selfPath)){  LOG.info(Thread.currentThread().getName() + "獲取鎖成功,趕緊干活!");dosomething();threadSemaphore.countDown();try {if(zk.exists(selfPath,false) == null){LOG.error(Thread.currentThread().getName()+"本節點已不在了...");} else {zk.delete(selfPath, -1);LOG.info(Thread.currentThread().getName() + "刪除本節點:"+selfPath);zk.close();                                    			}} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}  } catch ( Exception e) {  e.printStackTrace();  }  } }}, new Stat());}catch(KeeperException e){if(zk.exists(waitPath,false) == null){LOG.info(Thread.currentThread().getName()+"子節點中,排在我前面的"+waitPath+"已失蹤,幸福來得太突然?");return checkMinPath(zk, selfPath);}else{throw e;}} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}return false;}protected static void dosomething() {System.out.println("我正在獨享資源互斥地進行工作。。。");}
}

面向對象重構版:

package distributedLockObject;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;public class AbstractZooKeeper implements Watcher{protected ZooKeeper zookeeper;protected CountDownLatch countDownLatch = new CountDownLatch(1);public ZooKeeper connect(String hosts, int SESSION_TIMEOUT) throws IOException, InterruptedException{zookeeper = new ZooKeeper(hosts, SESSION_TIMEOUT, this);countDownLatch.await();System.out.println("AbstractZooKeeper.connect()");return zookeeper;}public void process(WatchedEvent event){if (event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();}}public void close() throws InterruptedException{zookeeper.close();}
}

package distributedLockObject;import java.util.Collections;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DistributedLock    {private ZooKeeper zk = null;private String selfPath;private String waitPath;private String LOG_PREFIX_OF_THREAD=Thread.currentThread().getName();private static final String GROUP_PATH = "/disLocks";private static final String SUB_PATH = "/disLocks/sub";private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);private Watcher  watcher;public DistributedLock(ZooKeeper  zk ) {this.zk = zk; }public Watcher getWatcher() {return watcher;}public void setWatcher(Watcher watcher) {this.watcher = watcher;}/*** 獲取鎖* @return*/public boolean  getLock()  throws KeeperException, InterruptedException {selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(LOG_PREFIX_OF_THREAD+"創建鎖路徑:"+selfPath);if(checkMinPath()){return true;}return false;}/*** 創建節點* @param path 節點path* @param data 初始數據內容* @return*/public boolean createPath( String path, String data  ) throws KeeperException, InterruptedException {if(zk.exists(path, false)==null){LOG.info( LOG_PREFIX_OF_THREAD + "節點創建成功, Path: "+ this.zk.create( path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT )+ ", content: " + data );}return true;}public void unlock(){try {if(zk.exists(this.selfPath,false) == null){LOG.error(LOG_PREFIX_OF_THREAD+"本節點已不在了...");return;}zk.delete(this.selfPath, -1);LOG.info(LOG_PREFIX_OF_THREAD + "刪除本節點:"+selfPath);zk.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}/*** 檢查自己是不是最小的節點* @return*/public  boolean checkMinPath() throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(GROUP_PATH, false);Collections.sort(subNodes);int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));switch (index){case -1:{LOG.error(LOG_PREFIX_OF_THREAD+"本節點已不在了..."+selfPath);return false;}case 0:{LOG.info(LOG_PREFIX_OF_THREAD+"子節點中,我果然是老大"+selfPath);return true;}default:{this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);LOG.info(LOG_PREFIX_OF_THREAD+"獲取子節點中,排在我前面的"+waitPath);try{zk.getData(waitPath, this.watcher, new Stat());return false;}catch(KeeperException e){if(zk.exists(waitPath,false) == null){LOG.info(LOG_PREFIX_OF_THREAD+"子節點中,排在我前面的"+waitPath+"已失蹤,幸福來得太突然?");return checkMinPath();}else{throw e;}}}}}public String getWaitPath() {return waitPath;}}

package distributedLockObject;public interface DoTemplate {void dodo();
}

package distributedLockObject;import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;public class LockService {//確保所有線程運行結束;private static final String CONNECTION_STRING = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";private static final int THREAD_NUM = 10; public static   CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);private static final String GROUP_PATH = "/disLocks";private static final int SESSION_TIMEOUT = 10000;AbstractZooKeeper az = new AbstractZooKeeper();public void doService(DoTemplate doTemplate){try {ZooKeeper zk = az.connect(CONNECTION_STRING,SESSION_TIMEOUT);DistributedLock dc = new DistributedLock(zk);LockWatcher lw = new LockWatcher(dc,doTemplate);dc.setWatcher(lw);//GROUP_PATH不存在的話,由一個線程創建即可;dc.createPath(GROUP_PATH, "該節點由線程"+Thread.currentThread().getName() + "創建");boolean rs = dc.getLock();if (rs==true) {lw.dosomething();dc.unlock();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}

package distributedLockObject;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LockWatcher implements Watcher{private static final Logger LOG = LoggerFactory.getLogger(LockWatcher.class);private DistributedLock distributedLock;private  DoTemplate doTemplate;public LockWatcher(DistributedLock distributedLock,DoTemplate doTemplate) {// TODO Auto-generated constructor stubthis.distributedLock = distributedLock;this.doTemplate = doTemplate;}@Overridepublic void process(WatchedEvent event) {// TODO Auto-generated method stubif (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(distributedLock.getWaitPath())) {  LOG.info(Thread.currentThread().getName()+ "收到情報,排我前面的家伙已掛,我是不是可以出山了?");  try {  if(distributedLock.checkMinPath()){  dosomething();distributedLock.unlock();}  } catch ( Exception e) {  e.printStackTrace();  }  } }public   void dosomething(){LOG.info(Thread.currentThread().getName() + "獲取鎖成功,趕緊干活!");doTemplate.dodo();TestLock.threadSemaphore.countDown();}}

package distributedLockObject;
import java.util.concurrent.CountDownLatch;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class TestLock {private static final Logger LOG = LoggerFactory.getLogger(TestLock.class);//確保所有線程運行結束;private static final int THREAD_NUM = 10; public static   CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public static void main(String[] args) {for(int i=0; i < THREAD_NUM; i++){final int threadId = i;new Thread(){@Overridepublic void run() {try{new LockService().doService(new DoTemplate() {@Overridepublic void dodo() {// TODO Auto-generated method stubLOG.info("我要修改一個文件。。。。"+threadId);}});} catch (Exception e){LOG.error("【第"+threadId+"個線程】 拋出的異常:");e.printStackTrace();}}}.start();}try {
//	        	Thread.sleep(60000);threadSemaphore.await();LOG.info("所有線程運行結束!");} catch (Exception e) {e.printStackTrace();}}
}


本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/387130.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/387130.shtml
英文地址,請注明出處:http://en.pswp.cn/news/387130.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

01 Python變量和數據類型

Python變量和數據類型 1 數據類型 計算機&#xff0c;顧名思義就是可以做數學計算的機器&#xff0c;因此&#xff0c;計算機程序理所當然也可以處理各種數值。 但是&#xff0c;計算機能處理的遠不止數值&#xff0c;還可以處理文本、圖形、音頻、視頻、網頁等各種各樣的數…

初識Python-1

1&#xff0c;計算機基礎。 2&#xff0c;python歷史。 宏觀上&#xff1a;python2 與 python3 區別&#xff1a; python2 源碼不標準&#xff0c;混亂&#xff0c;重復代碼太多&#xff0c; python3 統一 標準&#xff0c;去除重復代碼。 3&#xff0c;python的環境。 編譯型&…

02 List、Tuple、Dict、Set

List 線性表 創建List&#xff1a; >>> classmates [Michael, Bob, Tracy] >>> L [Michael, 100, True] #可以在list中包含各種類型的數據 >>> empty_list [] #空List 按索引訪問List&#xff1a; >>> print L[0] #索引從0開始…

Jenkins的一些代碼

pipeline {agent anyenvironment { def ITEMNAME "erp"def DESTPATH "/home/ops/testpipe"def codePATH"/var/lib/jenkins/workspace/test_pipeline"}stages { stage(代碼拉取){steps {echo "checkout from ${ITEMNAME}"git url:…

利用layui前端框架實現對不同文件夾的多文件上傳

利用layui前端框架實現對不同文件夾的多文件上傳 問題場景&#xff1a; 普通的input標簽實現多文件上傳時&#xff0c;只能對同一個文件夾下的多個文件進行上傳&#xff0c;如果要同時上傳兩個或多個文件夾下的文件&#xff0c;是無法實現的。這篇文章就是利用layui中的插件&am…

ps、grep和kill聯合使用殺掉進程

ps、grep和kill聯合使用殺掉進程例如要殺掉hello這個進程&#xff0c;使用下面這個命令就能直接實現。ps -ef |grep hello |awk {print $2}|xargs kill -9這里是輸出ps -ef |grep hello 結果的第二列的內容然后通過xargs傳遞給kill -9,其實第二列內容就是hello的進程號&#xf…

03 控制語句

if語句 if age > 18 print your age is, age else print teenager Python代碼的縮進規則&#xff1a;具有相同縮進的代碼被視為代碼塊。 if age > 18 print adult elif age > 6 print teenager elif age > 3 print kid else print baby for循環 L [Adam, L…

yum 來安裝 nodejs

要通過 yum 來安裝 nodejs 和 npm 需要先給 yum 添加 epel 源&#xff0c;添加方法在 centos 添加epel和remi源 中##添加 epel 源 64位: rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm32位: rpm -ivh http://download.fedoraproj…

yzh的神仙題

U66905 zz題 考慮一個點權值被計算了多少次。。。不知 所以對未來承諾&#xff0c;方便直接算上總數&#xff01; 然后其實是給邊定向&#xff0c;即先刪除fa和son的哪一個 f[x][j]&#xff0c;會計算j次 無法轉移 f[x][j][k]&#xff0c;其中會從子樹計算k次。 當邊從兒子指向…

04 函數

內置函數 Python內置了很多有用的函數&#xff0c;可以直接調用。 要調用一個函數&#xff0c;需要知道函數的名稱和參數。 可以直接從Python的官方網站查看文檔&#xff1a;http://docs.python.org/2/library >>> abs(-20) >>> help(abs) >>>…

iview render的時候可以寫控件的基本格式

render: (h, params) > {return h(div, [h(Button, {props: {type: id,size: small},style: {marginRight: 5px},on: {click: () > {this.pojectshow(this.datatable[params.index].id)}}}, 詳情),h(Button, {props: {type: id,size: small},style: {marginRight: 5px},o…

ES6基本使用

var let 度可用于聲明變量. 區別&#xff1a;1、let&#xff1a;只在let命令所在代碼塊內有效 2、let 不存在變量提升&#xff08;內部影響不到外部&#xff09; var b [];for(var j0;j<10;j){let dj;b[j]function(){console.log(d);};}b[3]() //3 3、let 不允許在相同作用…

Axios的Vue插件(添加全局請求/響應攔截器)

/*** file Axios的Vue插件&#xff08;添加全局請求/響應攔截器&#xff09;*/// https://github.com/mzabriskie/axios import axios from axios// 攔截request,設置全局請求為ajax請求 axios.interceptors.request.use((config) > {config.headers[X-Requested-With] XML…

05 切片、迭代、列表生成

切片 >>> L [Adam, Lisa, Bart, Paul] >>> L[0:3] #取前3個元素 >>> L[:3] >>> L[1:3] >>> L[:] >>> L[::2] #第三個參數表示每2個元素取一個元素&#xff0c;也就是隔一個取一個 [Adam,Bart] >>>…

一個例子徹底搞懂C++的虛函數和純虛函數

學習C的多態性&#xff0c;你必然聽過虛函數的概念&#xff0c;你必然知道有關她的種種語法&#xff0c;但你未必了解她為什么要那樣做&#xff0c;未必了解她種種行為背后的所思所想。深知你不想在流于表面語法上的蜻蜓點水似是而非&#xff0c;今天我們就一起來揭開擋在你和虛…

利用Caffe實現mnist的數據訓練

阿里云的參考文檔&#xff1a;https://help.aliyun.com/document_detail/49571.html在文檔里提供了caffe的一個案例&#xff0c;利用Caffe實現mnist的數據訓練。準備的數據源可以在“深度學習案例代碼及數據下載”頁找到Caffe數據下載并解壓。要訓練自己的圖片&#xff0c;還是…

06 函數式編程

1 函數式編程簡介 函數&#xff1a;function 函數式&#xff1a;functional 一種編程范式 特點&#xff1a; 把計算視為函數而非指令 純函數式編程&#xff1a;不需要變量&#xff0c;沒有副作用&#xff0c;測試簡單 支持高階函數&#xff0c;代碼簡潔 Python支持的函數式…

Android SDK開發

目前我們的應用內使用了 ArcFace 的人臉檢測功能&#xff0c;其他的我們并不了解&#xff0c;所以這里就和大家分享一下我們的集成過程和一些使用心得 集成 ArcFace FD 的集成過程非常簡單 在 ArcFace FD 的文檔上有說明支持的系統為 5.0 及以上系統&#xff0c;但其實在 4.4 系…

jQuery WeUI 上傳

jQuery WeUI 是專為微信公眾賬號開發而設計的一個框架&#xff0c;jQuery WeUI的官網&#xff1a;http://jqweui.com/ 需求&#xff1a;需要在微信公眾號網頁添加上傳圖片功能 技術選型&#xff1a;實現上傳圖片功能可選百度的WebUploader、餓了么的Element和微信的jQuery WeUI…

07 模塊

模塊和包的概念 等同于java中的Package 模塊名文件名&#xff08;無后綴&#xff09; 在文件系統中&#xff0c;包就是文件夾&#xff0c;模塊就是xxx.py文件 每層包下面都有__init__.py文件 導入模塊 >>> import math >>> math.pow(2, 0.5) >>…