大綱
1.zk實現數據發布訂閱
2.zk實現負載均衡
3.zk實現分布式命名服務
4.zk實現分布式協調(Master-Worker協同)
5.zk實現分布式通信
6.zk實現Master選舉
7.zk實現分布式鎖
8.zk實現分布式隊列和分布式屏障
4.zk實現分布式協調(Master-Worker協同)
(1)Master-Worker架構
(2)Master-Worker架構示例—HBase
(3)Master-Worker架構示例—Kafka
(4)Master-Worker架構示例—HDFS
(5)如何使用zk實現Master-Worker
(1)Master-Worker架構
Master-Work是一個廣泛使用的分布式架構,系統中有一個Master負責監控Worker的狀態,并為Worker分配任務。
說明一:在任何時刻,系統中最多只能有一個Master。不可以出現兩個Master,多個Master共存會導致腦裂。
說明二:系統中除了Active狀態的Master還有一個Bakcup的Master。如果Active失敗,Backup可以很快進入Active狀態。
說明三:Master實時監控Worker的狀態,能夠及時收到Worker成員變化的通知。Master在收到Worker成員變化通知時,會重新進行任務分配。
(2)Master-Worker架構示例—HBase
HBase采用的就是Master-Worker的架構。HMBase是系統中的Master,HRegionServer是系統中的Worker。HMBase會監控HBase Cluster中Worker的成員變化,HMBase會把region分配給各個HRegionServer。系統中有一個HMaster處于Active狀態,其他HMaster處于備用狀態。
(3)Master-Worker架構示例—Kafka
一個Kafka集群由多個Broker組成,這些Borker是系統中的Worker,Kafka會從這些Worker選舉出一個Controller。這個Controlle是系統中的Master,負責把Topic Partition分配給各個Broker。
(4)Master-Worker架構示例—HDFS
HDFS采用的也是一個Master-Worker的架構。NameNode是系統中的Master,DataNode是系統中的Worker。NameNode用來保存整個分布式文件系統的MetaData,并把數據塊分配給Cluster中的DataNode進行保存。
(5)如何使用zk實現Master-Worker
步驟1:使用一個臨時節點"/master"表示Master。Master在行使Master的職能之前,首先要創建這個znode。如果創建成功,進入Active狀態,開始行使Master職能。如果創建失敗,則進入Backup狀態,并使用Watcher機制監聽"/master"。假設系統中有一個Active Master和一個Backup Master。如果Active Master故障了,那么它創建的"/master"就會被zk自動刪除。這時Backup Master會收到通知,再次創建"/master"成為新Active Master。
步驟2:使用一個持久節點"/workers"下的臨時子節點來表示Worker,Worker通過在"/workers"節點下創建臨時節點來加入集群。
步驟3:處于Active狀態的Master會通過Watcher機制,監控"/workers"下的子節點列表來實時獲取Worker成員的變化。
5.zk實現分布式通信
(1)心跳檢測
(2)工作進度匯報
(3)系統調度
在大部分的分布式系統中,機器間的通信有三種類型:心跳檢測、工作進度匯報、系統調度。
(1)心跳檢測
機器間的心跳檢測是指:在分布式環境中,不同機器間需要檢測彼此是否還在正常運行。其中,心跳檢測有如下三種方法。
方法一:通常會通過機器間是否可以相互PING通來判斷對方是否正常運行。
方法二:在機器間建立長連接,通過TCP連接固有的心跳檢測機制來實現上層機器的心跳檢測。
方法三:基于zk的臨時子節點來實現心跳檢測,讓不同的機器都在zk的一個指定節點下創建臨時子節點,不同機器間可以根據這個臨時子節點來判斷對應的客戶端是否存活。基于zk的臨時節點來實現的心跳檢測,可以大大減少系統的耦合。因為檢測系統和被檢測系統之間不需要直接關聯,只需要通過zk臨時節點間接關聯。
(2)工作進度匯報
在一個任務分發系統中,任務被分發到不同的機器上執行后,需要實時地將自己的任務執行進度匯報給分發系統。
通過zk的臨時子節點來實現工作進度匯報:可以在zk上選擇一個節點,每個任務機器都在該節點下創建臨時子節點。然后通過判斷臨時子節點是否存在來確定任務機器是否存活,各個任務機器會實時地將自己的任務執行進度寫到其對應的臨時節點上,以便中心系統能夠實時獲取到任務的執行進度。
(3)系統調度
一個分布式系統由控制臺和一些客戶端系統組成,控制臺的職責是將一些指令信息發送給所有客戶端。
使用zk實現系統調度時:先讓控制臺的一些操作指令對應到zk的某些節點數據,然后讓客戶端系統注冊對這些節點數據的監聽。當控制臺進行一些操作時,便會觸發修改這些節點的數據,而zk會將這些節點數據的變更以事件通知的形式發送給監聽的客戶端。
這樣就能省去大量底層網絡通信和協議設計上的重復工作了,也大大降低了系統間的耦合,方便實現異構系統的靈活通信。
6.zk實現Master選舉
(1)通過創建臨時節點實現
(2)通過臨時順序子節點來實現
Master選舉的需求是:在集群的所有機器中選舉出一臺機器作為Master。
(1)通過創建臨時節點實現
集群的所有機器都往zk上創建一個臨時節點如"/master"。在這個過程中只會有一個機器能成功創建該節點,則該機器便成為Master。同時其他沒有成功創建節點的機器會在"/master"節點上注冊Watcher監聽,一旦當前Master機器掛了,那么其他機器就會重新往zk上創建臨時節點。
(2)通過臨時順序子節點來實現
使用臨時順序子節點來表示集群中的機器發起的選舉請求,然后讓創建最小后綴數字節點的機器成為Master。
7.zk實現分布式鎖
(1)死鎖的解決方案
(2)zk如何實現排他鎖
(3)zk如何實現共享鎖(讀寫鎖)
(4)羊群效應
(5)改進后的排他鎖
(6)改進后的共享鎖
(7)zk原生實現分布式鎖的示例
可以利用zk的臨時節點來解決死鎖問題,可以利用zk的Watcher監聽機制實現鎖釋放后重新競爭鎖,可以利用zk數據節點的版本來實現樂觀鎖。
(1)死鎖的解決方案
在單機環境下,多線程之間會產生死鎖問題。同樣,在分布式系統環境下,也會產生分布式死鎖的問題。常用的解決死鎖問題的方法有超時方法和死鎖檢測。
一.超時方法
在解決死鎖問題時,超時方法可能是最簡單的處理方式了。超時方式是在創建分布式線程時,對每個線程都設置一個超時時間。當該線程的超時時間到期后,無論該線程是否執行完畢,都要關閉該線程并釋放該線程所占用的系統資源,之后其他線程就可以訪問該線程釋放的資源,這樣就不會造成死鎖問題。
但這種設置超時時間的方法最大的缺點是很難設置一個合適的超時時間。如果時間設置過短,可能造成線程未執行完相關的處理邏輯,就因為超時時間到期就被迫關閉,最終導致程序執行出錯。
二.死鎖檢測
死鎖檢測是處理死鎖問題的另一種方法,它解決了超時方法的缺陷。死鎖檢測方法會主動檢測發現線程死鎖,在控制死鎖問題上更加靈活準確。
可以把死鎖檢測理解為一個運行在各服務器系統上的線程或方法,該方法專門用來發現應用服務上的線程是否發生死鎖。如果發生死鎖,那么就會觸發相應的預設處理方案。
(2)zk如何實現排他鎖
一.獲取鎖
獲取排他鎖時,所有的客戶端都會試圖通過調用create()方法,在"/exclusive_lock"節點下創建臨時子節點"/exclusive_lock/lock"。zk會保證所有的客戶端中只有一個客戶端能創建臨時節點成功,從而獲得鎖。沒有創建臨時節點成功的客戶端也就沒能獲得鎖,需要到"/exclusive_lock"節點上,注冊一個子節點變更的Watcher監聽,以便可以實時監聽lock節點的變更情況。
二.釋放鎖
如果獲取鎖的客戶端宕機,那么zk上的這個臨時節點(lock節點)就會被移除。如果獲取鎖的客戶端執行完,也會主動刪除自己創建的臨時節點(lock節點)。
(3)zk如何實現共享鎖(讀寫鎖)
一.獲取鎖
獲取共享鎖時,所有客戶端會到"/shared_lock"下創建一個臨時順序節點。如果是讀請求,那么就創建"/shared_lock/read001"的臨時順序節點。如果是寫請求,那么就創建"/shared_lock/write002"的臨時順序節點。
二.判斷讀寫順序
步驟一:客戶端在創建完節點后,會獲取"/shared_lock"節點下的所有子節點,并對"/shared_lock"節點注冊子節點變更的Watcher監聽。
步驟二:然后確定自己的節點序號在所有子節點中的順序(包括讀節點和寫節點)。
步驟三:對于讀請求:如果沒有比自己序號小的寫請求子節點,或所有比自己小的子節點都是讀請求,那么表明可以成功獲取共享鎖。如果有比自己序號小的子節點是寫請求,那么就需要進入等待。對于寫請求:如果自己不是序號最小的子節點,那么就需要進入等待。
步驟四:如果客戶端在等待過程中接收到Watcher通知,則重復步驟一。
三.釋放鎖
如果獲取鎖的客戶端宕機,那么zk上的對應的臨時順序節點就會被移除。如果獲取鎖的客戶端執行完,也會主動刪除自己創建的臨時順序節點。
(4)羊群效應
一.排他鎖的羊群效應
如果有大量的客戶端在等待鎖的釋放,那么就會出現大量的Watcher通知。然后這些客戶端又會發起創建請求,但最后只有一個客戶端能創建成功。這個Watcher事件通知其實對絕大部分客戶端都不起作用,極端情況可能會出現zk短時間向其余客戶端發送大量的事件通知,這就是羊群效應。出現羊群效應的根源在于:沒有找準客戶端真正的關注點。
二.共享鎖的羊群效應
如果有大量的客戶端在等待鎖的釋放,那么不僅會出現大量的Watcher通知,還會出現大量的獲取"/shared_lock"的子節點列表的請求,但最后大部分客戶端都會判斷出自己并非是序號最小的節點。所以客戶端會接收過多和自己無關的通知和發起過多查詢節點列表的請求,這就是羊群效應。出現羊群效應的根源在于:沒有找準客戶端真正的關注點。
(5)改進后的排他鎖
使用臨時順序節點來表示獲取鎖的請求,讓創建出后綴數字最小的節點的客戶端成功拿到鎖。
步驟一:首先客戶端調用create()方法在"/exclusive_lock"下創建一個臨時順序節點。
步驟二:然后客戶端調用getChildren()方法返回"/exclusive_lock"下的所有子節點,接著對這些子節點進行排序。
步驟三:排序后,看看是否有后綴比自己小的節點。如果沒有,則當前客戶端便成功獲取到排他鎖。如果有,則調用exist()方法對排在自己前面的那個節點注冊Watcher監聽。
步驟四:當客戶端收到Watcher通知前面的節點不存在,則重復步驟二。
(6)改進后的共享鎖
步驟一:客戶端調用create()方法在"/shared_lock"節點下創建臨時順序節點。如果是讀請求,那么就創建"/shared_lock/read001"的臨時順序節點。如果是寫請求,那么就創建"/shared_lock/write002"的臨時順序節點。
步驟二:然后調用getChildren()方法返回"/shared_lock"下的所有子節點,接著對這些子節點進行排序。
步驟三:對于讀請求:如果排序后發現有比自己序號小的寫請求子節點,則需要等待,且需要向比自己序號小的最后一個寫請求子節點注冊Watcher監聽。對于寫請求:如果排序后發現自己不是序號最小的子節點,則需要等待,并且需要向比自己序號小的最后一個請求子節點注冊Watcher監聽。注意:這里注冊Watcher監聽也是調用exist()方法。此外,不滿足上述條件則表示成功獲取共享鎖。
步驟四:如果客戶端在等待過程中接收到Watcher通知,則重復步驟二。
(7)zk原生實現分布式鎖的示例
一.分布式鎖的實現步驟
步驟一:每個線程都通過"臨時順序節點 + zk.create()方法 + 添加回調"去創建節點。
步驟二:線程執行完創建臨時順序節點后,先通過CountDownLatch.await()方法進行阻塞。然后在創建成功的回調中,通過zk.getChildren()方法獲取根目錄并繼續回調。
步驟三:某線程在獲取根目錄成功后的回調中,會對目錄排序。排序后如果發現其創建的節點排第一,那么就執行countDown()方法不再阻塞,表示獲取鎖成功。排序后如果發現其創建的節點不是第一,則通過zk.exists()方法監聽前一節點。
步驟四:獲取到鎖的線程會通過zk.delete()方法來刪除其對應的節點實現釋放鎖。在等候獲取鎖的線程掉線時其對應的節點也會被刪除。而一旦節點被刪除,那些監聽根目錄的線程就會重新zk.getChildren()方法,獲取成功后其回調又會進行排序以及通過zk.exists()方法監聽前一節點。
二WatchCallBack對分布式鎖的具體實現
public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {ZooKeeper zk ;String threadName;CountDownLatch countDownLatch = new CountDownLatch(1);String pathName;public String getPathName() {return pathName;}public void setPathName(String pathName) {this.pathName = pathName;}public String getThreadName() {return threadName;}public void setThreadName(String threadName) {this.threadName = threadName;}public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}public void tryLock() {try {System.out.println(threadName + " create....");//創建一個臨時的有序的節點zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}}//當前線程釋放鎖, 刪除節點public void unLock() {try {zk.delete(pathName, -1);System.out.println(threadName + " over work....");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}//上面zk.create()方法的回調//創建臨時順序節點后的回調, 10個線程都能同時創建節點//創建完后獲取根目錄下的子節點, 也就是這10個線程創建的節點列表, 這個不用watch了, 但獲取成功后要執行回調//這個回調就是每個線程用來執行節點排序, 看誰是第一就認為誰獲得了鎖@Overridepublic void processResult(int rc, String path, Object ctx, String name) {if (name != null ) {System.out.println(threadName + " create node : " + name );setPathName(name);//一定能看到自己前邊的, 所以這里的watch要是falsezk.getChildren("/", false, this ,"sdf");}}//核心方法: 各個線程獲取根目錄下的節點時, 上面zk.getChildren("/", false, this ,"sdf")的回調@Overridepublic void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {//一定能看到自己前邊的節點System.out.println(threadName + "look locks...");for (String child : children) {System.out.println(child);}//根目錄下的節點排序Collections.sort(children);//獲取當前線程創建的節點在根目錄中排第幾int i = children.indexOf(pathName.substring(1));//是不是第一個, 如果是則說明搶鎖成功; 如果不是, 則watch當前線程創建節點的前一個節點是否被刪除(刪除);if (i == 0) {System.out.println(threadName + " i am first...");try {//這里的作用就是不讓第一個線程獲得鎖釋放鎖跑得太快, 導致后面的線程還沒建立完監聽第一個節點就被刪了zk.setData("/", threadName.getBytes(), -1);countDownLatch.countDown();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} else {//9個沒有獲取到鎖的線程都去調用zk.exists, 去監控各自自己前面的節點, 而沒有去監聽父節點//如果各自前面的節點發生刪除事件的時候才回調自己, 并關注被刪除的事件(所以會執行process回調)zk.exists("/" + children.get(i-1), this, this, "sdf");}}//上面zk.exists()的監聽//監聽的節點發生變化的Watcher事件監聽@Overridepublic void process(WatchedEvent event) {//如果第一個獲得鎖的線程釋放鎖了, 那么其實只有第二個線程會收到回調事件//如果不是第一個哥們某一個掛了, 也能造成他后邊的收到這個通知, 從而讓他后邊那個去watch掛掉這個哥們前邊的, 保持順序switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zk.getChildren("/", false, this ,"sdf");break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {//TODO}
}
三.分布式鎖的測試類
public class TestLock {ZooKeeper zk;@Beforepublic void conn () {zk = ZKUtils.getZK();}@Afterpublic void close () {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void lock() {//10個線程都去搶鎖for (int i = 0; i < 10; i++) {new Thread() {@Overridepublic void run() {WatchCallBack watchCallBack = new WatchCallBack();watchCallBack.setZk(zk);String threadName = Thread.currentThread().getName();watchCallBack.setThreadName(threadName);//每一個線程去搶鎖watchCallBack.tryLock();//搶到鎖之后才能干活System.out.println(threadName + " working...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//干完活釋放鎖watchCallBack.unLock();}}.start();}while(true) {}}
}
8.zk實現分布式隊列和分布式屏障
(1)分布式隊列的實現
(2)分布式屏障的實現
(1)分布式隊列的實現
步驟一:所有客戶端都到"/queue"節點下創建一個臨時順序節點。
步驟二:通過調用getChildren()方法來獲取"/queue"節點下的所有子節點。
步驟三:客戶端確定自己的節點序號在所有子節點中的順序。
步驟四:如果自己不是序號最小的子節點,那么就需要進入等待,同時調用exists()方法向比自己序號小的最后一個節點注冊Watcher監聽。
步驟五:如果客戶端收到Watcher事件通知,重復步驟二。
(2)分布式屏障的實現
"/barrier"節點是一個已存在的默認節點,"/barrier"節點的值是數字n,表示Barrier值,比如10。
步驟一:首先,所有客戶端都需要到"/barrier"節點下創建一個臨時節點。
步驟二:然后,客戶端通過getData()方法獲取"/barrier"節點的數據內容,比如10。
步驟三:接著,客戶端通過getChildren()方法獲取"/barrier"節點下的所有子節點,同時注冊對子節點列表變更的Watcher監聽。
步驟四:如果客戶端發現"/barrier"節點的子節點個數不足10個,那么就需要進入等待。
步驟五:如果客戶端接收到了Watcher事件通知,那么就重復步驟三。