1. ZooKeeper基本概念
Zookeeper官網:https://zookeeper.apache.org/index.html
- Zookeeper是Apache Hadoop項目中的一個子項目,是一個樹形目錄服務
- Zookeeper翻譯過來就是動物園管理員,用來管理Hadoop(大象)、Hive(蜜蜂)、Pig(小豬)的管理員,簡稱zk
- Zookeeper的本質是一個分布式的、開源的、提供分布式應用程序協調服務的組件
- Zookeeper提供的主要功能有:
- 配置管理
- 分布式鎖
- 集群管理
2. ZooKeeper常用命令
2.1 ZooKeeper數據模型
在正式介紹Zookeeper的常用命令之前,我們先來了解一下Zookeeper的相關數據模型:
- Zookeeper的是一個樹形目錄服務,其數據模型與unix文件系統目錄樹類似,是一個層次化的結構
- 這里面的每一個節點都被稱為ZNode,每個節點上都會保存自己的數據以及元數據信息
- 節點也可以擁有子節點,同時允許少量數據(1MB)存儲在該節點之下
- 節點類型大致可以分為如下四類:
- PERSISTENT:持久化節點
- EPHEMERAL:臨時節點 -e
- PERSISTENT_SEQUENTIAL:持久化順序節點 -s
- EPHEMERAL_SEQUENTIAL:臨時順序節點 -e -s
2.2 ZooKeeper常用命令
Zookeeper是一個常見的客戶端-服務器模型,我們可以使用命令行或者JavaAPI的方式充當客戶端進行訪問,其架構如下圖所示:
- 服務端命令:
./zkServer.sh start
啟動zookeeper服務
./zkServer.sh status
查看zookeeper服務運行狀態
./zkServer.sh restart
重啟zookeeper服務
./zkServer.sh stop
關閉zookeeper服務
- 客戶端命令:
./zkCli.sh -server ip:port
連接指定的zookeeper服務(如連接本地可忽略選項直接使用./zkCli.sh)
quit
退出客戶端交互界面
help
查看命令幫助
ls 目錄
查看指定目錄下的znode節點
ls -s 目錄
查看節點詳細信息
create znode [value]
創建znode節點(可以攜帶data)
create znode -e [value]
創建臨時節點(會話結束后消失)create znode -s [value]
創建順序節點
get znode
查看節點攜帶數據
set znode value
設置節點數據
delete znode
刪除指定的znode節點(必須為空)
deleteall znode
刪除指定的znode節點及其子節點
2.3 ZooKeeper的JavaAPI操作
2.3.1 Curator介紹
Curator:是一個Zookeeper的Java客戶端庫
- 常見的Zookeeper Java客戶端有如下幾種:
- 原生JavaAPI
- ZkClient
- Curator
- Curator的目標就是簡化Zookeeper客戶端的使用
- Curator項目最初有Netflix公司研發,后來捐給了Apache基金會,成為頂級項目
Curator官網:http://curator.apache.org/
2.3.2 Curator API操作
2.3.2.1 建立連接
我們可以使用CuratorFrameworkFactory
靜態工廠類進行創建,可以通過如下兩種方式配置:
- 使用
newClient()
方法 - 使用
build()
方法
下面我們就給出對應兩種代碼的實現方式:
newClient:
/*** ZooKeeper測試類*/
public class ZooKeeperTest {private CuratorFramework client = null;@Beforepublic void initByNewClient() {CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",3000,3000,new ExponentialBackoffRetry(3000, 1));this.client = client;this.client.start();}
}
build:
/*** ZooKeeper測試類*/
public class ZooKeeperTest {private CuratorFramework client = null;@Beforepublic void init() {CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(3000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(3000, 1)).namespace("").build();client.start();this.client = client;}
}
其中各個配置項含義如下:
connectString
:連接字符串,配置服務器地址,格式為ip:portsessionTimeoutMs
:會話超時時間connectionTimeoutMs
:連接超時時間retryPolicy
:重試策略namespace
:設置根目錄位置
2.3.2.2 創建節點
創建節點有如下常見的四種方式:
Case1:創建節點(不攜帶數據)
@Test
public void testCreate1() throws Exception {String path = client.create().forPath("/app1");System.out.println(path);
}
Case2:創建節點(攜帶數據)
@Test
public void testCreate2() throws Exception {String path = client.create().forPath("/app2", "curator java api".getBytes());System.out.println(path);
}
Case3:創建多級節點
@Test
public void testCreate4() throws Exception {client.create().creatingParentsIfNeeded().forPath("/test/test1/test2");
}
Case4:創建節點并指定類型
@Test
public void testCreate3() throws Exception {client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");client.create().withMode(CreateMode.PERSISTENT).forPath("/app4");client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/app5");client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/app6");
}
2.3.2.3 刪除節點
刪除節點有如下常見的兩種方式:
Case1:刪除節點(不含子節點)
@Test
public void testDelete() throws Exception {client.delete().forPath("/app1");
}
Case2:刪除節點(遞歸刪除子節點)
@Test
public void testDeleteAll() throws Exception {client.delete().deletingChildrenIfNeeded().forPath("/test");
}
2.3.2.4 查詢節點
查詢節點有如下常見的三種方式:
Case1:查詢子節點信息
@Test
public void testGetChildren() throws Exception {List<String> childrenList = client.getChildren().forPath("/");System.out.println(childrenList);
}
Case2:查詢節點數據
@Test
public void testGetData() throws Exception {byte[] bytes = client.getData().forPath("/app2");System.out.println("data: " + new String(bytes));
}
Case3:查詢節點詳細信息
@Test
public void testGetData3() throws Exception {Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app2");System.out.println(stat);
}
2.3.2.5 修改節點
修改節點有如下常見的兩種方式:
Case1:修改節點數據
@Test
public void testSetData() throws Exception {Stat stat = client.setData().forPath("/app1", "some data".getBytes());System.out.println(stat);
}
Case2:修改節點數據(帶有版本號)
@Test
public void testSetData2() throws Exception {Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app2");int version = stat.getVersion();System.out.println(version);client.setData().withVersion(version).forPath("/app2", "set with version".getBytes());
}
3. ZooKeeper的事件監聽機制
Watcher事件監聽機制:
- ZooKeeper允許用戶在指定節點上注冊一些Watcher,當一些特定事件發生時,ZooKeeper就會將事件通知給對其感興趣的客戶端,這是ZooKeeper提供分布式協調服務的重要特性
- ZooKeeper引入了Watcher機制來實現發布 / 訂閱功能,能夠讓多個訂閱者同時監聽某一個對象,當一個對象狀態發生變化時就會通知所有訂閱者
- ZooKeeper提供原生Watcher的方式,但是比較麻煩,因此Curator使用Cache數據結構進行了優化實現監聽機制
- Curator提供了如下三種Cache:
- NodeCache:只監聽某一個指定的節點變化
- PathChildrenCache:監控一個節點的所有子節點
- TreeCache:監控整個樹上的節點,類似于前兩者的組合
3.1 Node Cache
代碼實現:
@Test
public void testCuratorCache() throws Exception {NodeCache cache = new NodeCache(client, "/app1");cache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("監聽到節點變化...");}});cache.start();while (true) {}
}
3.2 PathChildren Cache
代碼實現:
@Test
public void testPathChildrenCache() throws Exception {PathChildrenCache cache = new PathChildrenCache(client, "/app1", true);cache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {System.out.println("監聽到子節點變化...");PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {System.out.println("監聽到子節點數據變化...");System.out.println("更新后數據: " + pathChildrenCacheEvent.getData().getData());}}});cache.start();while (true) {}
}
3.3 Tree Cache
代碼實現:
@Testpublic void testTreeCache() throws Exception {TreeCache cache = new TreeCache(client, "/app1");cache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {System.out.println("監聽到節點發生變化...");System.out.println(treeCacheEvent);}});cache.start();while (true) {}}
4. ZooKeeper分布式鎖
4.1 ZooKeeper分布式鎖原理
- 核心思想:當用戶獲取到鎖時就創建節點,使用完鎖就刪除節點
- 每當一個用戶想要獲取鎖時就在
/lock
節點下創建一個 **臨時順序 **節點 - 然后獲取
/lock
節點下的全部子節點,如果發現當前節點編號是最小的,則該節點對應的客戶端獲取到鎖,使用完鎖后,刪除該節點 - 如果發現節點編號不是最小的,則對前一個比自己小的編號節點,并注冊事件監聽器,監聽刪除事件
- 如果后續發現比自己小的節點被刪除,則客戶端會接收到來自ZooKeeper的通知,然后再次判斷所對應節點編號是否是最小的,重復上述步驟
注意:這里創建臨時節點是因為防止獲取到鎖的客戶端宕機了,進而導致鎖永遠不會被刪的情況;這是創建順序節點是方便編號的排序
Cutator提供了下面五種分布式鎖的方式:
- InterProcessMutex(分布式可重入排他鎖)
- InterProcessSemaphoreMutex(分布式不可重入排他鎖)
- InterProcessReadWriteLock(分布式讀寫鎖)
- InterProcessMutliLock(將多個鎖作為單個實體管理的容器)
- InterProcessSemaphoreV2(共享信號量)
4.2 分布式鎖實戰(模擬12306搶票)
代碼如下:
package org.example;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;public class ZooKeeperLockTest {private static int tickets = 10; // 票數public static void main(String[] args) {// 建立連接CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(3000).connectionTimeoutMs(3000).retryPolicy(new ExponentialBackoffRetry(3000, 1)).namespace("").build();client.start();// 獲取分布式鎖InterProcessMutex lock = new InterProcessMutex(client, "/lock");Thread t1 = new Thread(() -> {while (true) {try {boolean hasLock = lock.acquire(3, TimeUnit.SECONDS);if (hasLock && tickets > 0) {// 不斷搶票System.out.println("線程" + Thread.currentThread().getName() + "搶到了當前第" + tickets + "張票");tickets--;if (tickets <= 0) {break;}}} catch (Exception e) {throw new RuntimeException(e);} finally {try {lock.release();} catch (Exception e) {throw new RuntimeException(e);}}}}, "攜程");Thread t2 = new Thread(() -> {while (true) {try {boolean hasLock = lock.acquire(3, TimeUnit.SECONDS);if (hasLock && tickets > 0) {// 不斷搶票System.out.println("線程" + Thread.currentThread().getName() + "搶到了當前第" + tickets + "張票");tickets--;if (tickets <= 0) {break;}}} catch (Exception e) {throw new RuntimeException(e);} finally {try {lock.release();} catch (Exception e) {throw new RuntimeException(e);}}}}, "飛豬");t1.start();t2.start();}
}
5. ZooKeeper集群管理
Leader選舉過程:
- ServerId:服務器ID
比如有三臺服務器,編號分別是1,2,3。則編號越大在選擇算法中的權重就越大
- Zxid:數據ID
服務器中存放的數據ID越大,值越大說明更新的越頻繁,則在選擇算法中的權重就越大
- 在Leader選舉的過程中如果某臺ZooKeeper超過了半數選票,則直接當選為Leader