有段時間沒寫博客了,在整理之前寫過的一套自定義框架,并且整理好上傳值github上了,也有一些新功能還在開發,歡迎大家使用:一個好用的Http接口請求工具組件
可能今天這篇文章跟之前的比有些跳躍性,一下子就談到了Zookeeper了,不過也沒關系啦,先談談最常用,然后在慢慢看Zooeeper的其他知識。
簡單介紹
ZooKeeper致力于提供一個高性能、高可用,且具備嚴格的順序訪問控制能力的分布式協調服務,是雅虎公司創建,是Google的Chubby一個開源的實現,也是Hadoop和Hbase的重要組件。它的數據以樹形結構(類似于文件系統)儲存在內存當中由于數儲存在內存當中(并且每個節點都必須要有數據),所以拿取數據效率特別快。
分布式鎖:功能與之前并發編程中的Lock功能一致,主要是為了解決共享資源被競爭所導致的并發問題。由于并發編程當中鎖是在當前的JVM當中,而對于分布式的服務來說單純的JVM的鎖已經不起作用了,不過實現功能還是一致。
監聽機制
既然是鎖,那么就存在線程等待以及線程被喚醒功能,所以就需要有一個監聽機制,當ZooKeeper上的鎖被釋放之后需監聽到,并且通知服務去獲取鎖資源,正好在ZooKeeper當中存在一種監聽機制,為事件監聽器(Watcher)
事件監聽器:客戶端可以在節點上注冊監聽器,當特定的事件發生后,ZooKeeper會通知到感興趣的客戶端;被監聽的事件有:NodeCreated(節點創建)、NodeDeleted(節點刪除)、NodeDataChanged(節點數據被改變)、NodeChildrenChange(子節點被修改)
Node類型
基于ZooKeeper分布式鎖必然基于節點,在ZooKeeper創建節點共有四點類型:
1、持久化節點(PERSISENT):?同一個節點路徑只能創建一個節點,并且連接創建節點后,斷開連接后,節點仍然存在并且關閉服務會保存至磁盤上。
2、持久化順序節點(PERSISENT_SEQUENTIAL):同一個節點路徑可以創建多個節點,并且ZooKeeper會自動分配一個按順序的節點號,斷開連接后,節點仍然會保存至磁盤。
3、臨時節點(EPHEMERAL):在一個連接中,同一路徑下只能創建一個節點,當創建節點的連接關閉后,該節點會被刪除,如果非正常關閉連接,則過一段時間后節點會被刪除。
4、臨時順序節點(EPHEMERAL_SEQUENTIAL):同一路徑下可以創建多個節點,但是節點名稱ZooKeeper會自動分配一個按順序的節點號,當連接關閉后,這些節點會被刪除。
實現方式
先引入一下zkClient的坐標
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.10</version>
</dependency>
既然是鎖,那么必定是同一個節點,而且要先去嘗試獲取到鎖,如果沒有獲取到,那么就進入等待,并且監聽同一個節點是否被刪除(或者修改),如果刪除,則喚醒等待的線程,并且再次去獲取ZooKeeper上的鎖節點;那么整體的流程如下:
//鎖節點
public static final String PATH = "/lock";
//嘗試獲取鎖
public abstract boolean tryLock();
//等待鎖釋放
public abstract void waitLock();
//釋放鎖
public abstract void unLock();
//獲取鎖
void getLock(){if(tryLock()) {//獲取到鎖后,進行業務操作System.out.println(Thread.currentThread().getName() + " get Lock");}else {//沒有獲取則進入等待,并且監聽鎖節點是否被釋放waitLock();//再次獲取鎖getLock();}
}
?我這里是采用ZkClient進行操作ZooKeeper的,先創建一個ZkClient連接:
CountDownLatch latch = null;private static final String CONNECTION = "127.0.0.1:2181";ZkClient zkClient = new ZkClient(CONNECTION,3000);
先來看看嘗試獲取鎖可以怎樣實現:
@Override
public boolean tryLock() {try {//創建臨時節點,也可以創建持久化節點,到時候釋放節點的時候刪除就好了zkClient.createEphemeral(PATH, "1".getBytes());return true;} catch (Exception e) {//如果創建失敗,則獲取節點鎖失敗,則進入等待return false;}
}
進入等待,并且監聽鎖節點是否刪除或者修改:
@Override
public void waitLock() {//創建監聽事件IZkDataListener listen = new IZkDataListener() {public void handleDataChange(String dataPath, Object data) throws Exception {//當前方法為監聽節點修改,如果節點進行修改,那么就會執行當前方法}public void handleDataDeleted(String dataPath) throws Exception {//我這里是釋放鎖為刪除節點,刪除會執行當前方法latch.countDown();}};//注冊監聽器zkClient.subscribeDataChanges(PATH, listen);//如果ZooKeeper上存在鎖節點,那么進入等待if(zkClient.exists(PATH)) {//采用CountDownLatch等待latch = new CountDownLatch(1);try {//進入等待latch.await();} catch (InterruptedException e) {e.printStackTrace();}}//刪除監聽器zkClient.unsubscribeDataChanges(PATH, listen);
}
然后就是釋放節點了:
@Override
public void unLock() {if(zkClient != null) {System.out.println(Thread.currentThread().getName() + " unlock.. ");//刪除節點zkClient.delete(PATH);//關閉當前連接zkClient.close();}
}
這么一套流程下來,分布式鎖的功能就完成了,當這種實現的功能類似JVM鎖中的非公平鎖,即沒有先后順序所言,如果想要達到公平鎖,那么就必須得使用順序節點進行操作了。
那么分布式公平鎖監聽的節點就不是同一個節點了,而是監聽當前節點的上一個節點:
private String lockSeq = null;private String before = null;@Override
public boolean tryLock() {if(!zkClient.exists(PATH)) {try {zkClient.createPersistent(PATH, true);} catch (Throwable t) {//已經創建完畢,并發問題,拋異常處理}}if(lockSeq == null) {lockSeq = zkClient.createEphemeralSequential(PATH + "/", "1".getBytes());}List<String> children = zkClient.getChildren(PATH);if(lockSeq.equals(PATH + "/" + children.get(0))) {return true;}else {for(String str : children) {if(lockSeq.contains(str)) {break;}before = PATH + "/" + str;}System.out.println(Thread.currentThread().getName() + " before node:" + before);return false;}
}@Override
public void waitLock() {IZkDataListener listen = new IZkDataListener() {public void handleDataChange(String dataPath, Object data) throws Exception {}public void handleDataDeleted(String dataPath) throws Exception {latch.countDown();}};zkClient.subscribeDataChanges(before, listen);if(zkClient.exists(before)) {latch = new CountDownLatch(1);try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}}zkClient.unsubscribeDataChanges(before, listen);}@Override
public void unLock() {if(zkClient != null) {System.out.println(Thread.currentThread().getName() + " unlock.. ");System.out.println("this node:" + lockSeq + " last node:" + before);zkClient.delete(lockSeq);}
}
實現的代碼如上(一個簡單的實現方式,存在單應用并發問題,可以使用記錄線程的方式解決并發問題),大家可以各位去試一下,個人覺得非公平鎖的效率相比公平鎖來說效率要高一點點,不過對應大量的分布式服務去競爭鎖資源的話,個人建議還是使用公平鎖,避免阻塞時間過長,導致服務業務長期停滯問題。
還會存在一個問題就是臨時順序節點在關閉服務的時候ZooKeeper上會等待幾秒鐘才會刪除臨時節點,所以建議在程序中加上Hook鉤子方法進行刪除。
static {Runtime.getRuntime().addShutdownHook(new Thread(ZkClientLock::run));}private static void run() {System.out.println("關閉服務...");//刪除所有臨時順序節點,避免影響其他服務zkClient.delete(lockSeq);zkClient.close();}