zookeeper實際應用場景
zookeeper能夠實現哪些場景
1)訂閱發布/配置中心
watcher機制
統一配置管理(disconf)
實現配置信息的集中式原理和數據的動態更新
實現配置中心有倆種模式:push,pull
長輪詢
zookeeper采用的是推拉相結合的方式。客戶端向服務器端注冊自己需要關注的節點。一旦節點數據發生變化,name服務器端會向客戶端發送watcher事件通知。客戶端收到通知后,主動到服務器端獲取更新后的數據。
a 數據量比較小
b 數據內容在運行時發生動態變更
c 集群中的各個機器共享變量
2)分布式鎖
2.1 redis setNX 存在則會返回0 不存在則返回數據
2.2 數據庫 創建一個表 通過唯一索引的方式
create table(id,methodname..) methodname增加唯一索引
insert 一條數據 xxx delete 刪除數據
mysql 有innodb來設置表鎖或者行鎖
2.3 zookeeper 有序節點
排他鎖
3)負載均衡
請求/數據分攤多個計算單元
4)ID生成器
5)分布式隊列
activeMQ kafka
a 先進先出隊列
getchildren獲取指定根節點下面的子節點
確定自己節點在子節點中的順序
如果自己不是最小的子節點,監聽比自己小的上一個子節點,否則處于等待 接受watcher通知,重復流程
b Barrier模式 =阻礙模式= 圍欄模型 滿足條件才會觸發執行
7)master選舉
7*24小時可用 99.999%可用
master-slave模式
master出現故障 slave上位作為master 心跳機制去維持狀態 腦裂
1)分布式鎖實現
package com.lulf.DistrubuteLock.JavaApi;import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ZookeeperClient {private final static String CONNECTSTRING = "192.168.48.133:2181,192.168.48.134:2181,"+ "192.168.48.135:2181,192.168.48.136:2181";private static int sessionTimeOut = 5000;//獲取連接public static ZooKeeper getInstance() throws IOException, InterruptedException {final CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, sessionTimeOut, new Watcher() {@Overridepublic void process(WatchedEvent paramWatchedEvent) {if (paramWatchedEvent.getState() == Event.KeeperState.SyncConnected) {countDownLatch.countDown();}}});countDownLatch.await();return zooKeeper;}public static int getSessionTimeOut() {return sessionTimeOut;}
}
復制代碼
package com.lulf.DistrubuteLock.JavaApi;import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;public class DistributeLock {private static final String ROOT_LOCKS = "/LOCKS"; // 根節點private ZooKeeper zooKeeper;private int sessionTimeOut = 5000;// 會話超時時間private String lockID;// 記錄鎖節點IDprivate CountDownLatch countDownLatch = new CountDownLatch(1);private final static byte[] data = { 1, 2 };public DistributeLock() throws IOException, InterruptedException {this.zooKeeper = ZookeeperClient.getInstance();this.sessionTimeOut = ZookeeperClient.getSessionTimeOut();}// 獲取鎖的方法public boolean lock() {try {lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(Thread.currentThread().getName() + "->成功創建lock節點[" + lockID + "]開始去競爭鎖");List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);// 排序從小到大SortedSet<String> sortedSet = new TreeSet<String>();for (String children : childrenNodes) {sortedSet.add(ROOT_LOCKS + "/" + children);}String first = sortedSet.first();// 拿到最小的節點if (lockID.equals(first)) {// 表示當前就是最小的System.out.println(Thread.currentThread().getName() + "success get lock,lock節點[" + lockID + "]");return true;}SortedSet<String> lessThanLockID = sortedSet.headSet(lockID);if (!lessThanLockID.isEmpty()) {String preLockId = lessThanLockID.last(); // 拿到比當前lockid這個節點更小的上一個節點zooKeeper.exists(preLockId, new LockWatcher(countDownLatch));countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS);// 上面這段代碼意味著如果會話超時或者節點被刪除System.out.println(Thread.currentThread().getName() + "成功獲取鎖,lock節點[" + lockID + "]");}return true;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return false;}// 釋放鎖的方法public boolean unlock() {System.out.println(Thread.currentThread().getName() + "-->開始釋放鎖:[" + lockID + "]");try {zooKeeper.delete(lockID, -1);System.out.println("節點[" + lockID + "]被成功刪除");return true;} catch (Exception e) {e.getStackTrace().toString();}return false;}public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(10);Random random = new Random();for (int i = 0; i < 10; i++) {new Thread(() -> {DistributeLock lock = null;try {lock = new DistributeLock();latch.countDown();latch.await();lock.lock();Thread.sleep(random.nextInt(500));} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}).start();}}
}
復制代碼
package com.lulf.DistrubuteLock.zkclient;import java.io.Serializable;public class UserCenter implements Serializable{/*** */private static final long serialVersionUID = -4060228979536051295L;private int m_id;//機器信息private String mc_name; //機器名稱public int getM_id() {return m_id;}public void setM_id(int m_id) {this.m_id = m_id;}public String getMc_name() {return mc_name;}public void setMc_name(String mc_name) {this.mc_name = mc_name;}
}
復制代碼
package com.lulf.DistrubuteLock.zkclient;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;/*** 選主服務* * @author lulf**/
public class MasterSelector {private ZkClient client;private final static String MASTER_PATH = "/master";// 需要爭搶的節點private IZkDataListener dataListener;// 注冊節點內容發生變化private UserCenter server; // 其他服務器private UserCenter master; // master節點private static boolean isrunning = false;ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);public MasterSelector(UserCenter server,ZkClient client) {this.server = server;this.client=client;this.dataListener = new IZkDataListener() {@Overridepublic void handleDataDeleted(String dataPath) throws Exception {// 節點如果被刪除,發起一個選主操作chooseMaster();}@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}};}public void start() {// 開始選舉if(!isrunning){isrunning=true;client.subscribeDataChanges(MASTER_PATH, dataListener);//注冊節點時間chooseMaster();}}public void stop() {// 停止if(isrunning){isrunning=false;scheduledExecutorService.shutdown();client.unsubscribeDataChanges(MASTER_PATH, dataListener);//取消訂閱releaseMaster();}}// 具體選主的服務private void chooseMaster() {if (!isrunning) {System.out.println("當前服務沒有啟動。。。");return;}try {client.createEphemeral(MASTER_PATH, server);master = server;// 把server節點賦值給masterSystem.out.println(master.getMc_name() + "-->我已經是master,開始領導你們");// 定時器// master釋放鎖,出現故障scheduledExecutorService.schedule(() -> {releaseMaster();}, 5, TimeUnit.SECONDS);//每5秒釋放一次鎖} catch (ZkNodeExistsException e) {e.getStackTrace().toString();//表示master已經存在UserCenter userCenter=client.readData(MASTER_PATH, true);if(userCenter==null){chooseMaster();//再次獲取master}else{master=userCenter;}}}private void releaseMaster() {// 釋放鎖(故障模擬)//判斷當前是否是master,只有master才需要釋放鎖if(checkMaster()){client.delete(MASTER_PATH, -1);//刪除}}private boolean checkMaster() {// 判斷當前的server是否是masterUserCenter userCenter=client.readData(MASTER_PATH);if(userCenter.getMc_name().equals(server.getMc_name())){master=userCenter;return true;}return false;}
}
復制代碼
package com.lulf.DistrubuteLock.zkclient;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;public class MasterChooseDemo {private final static String CONNECTSTRING = "192.168.48.133:2181,192.168.48.134:2181,"+ "192.168.48.135:2181,192.168.48.136:2181";public static void main(String[] args) {List<MasterSelector>selectorList=new ArrayList<MasterSelector>();try { for (int i = 0; i < 10; i++) {ZkClient zkClient=new ZkClient(CONNECTSTRING, 5000,5000,new SerializableSerializer());UserCenter userCenter=new UserCenter();userCenter.setM_id(i);userCenter.setMc_name("lulf_"+i);MasterSelector selector=new MasterSelector(userCenter,zkClient);selectorList.add(selector);selector.start();//觸發選舉操作TimeUnit.SECONDS.sleep(4);} } catch (Exception e) {e.getStackTrace().toString();}finally {for (MasterSelector masterSelector : selectorList) {masterSelector.stop();}}}
}
復制代碼
curator 提供應用場景封裝
curator-reciples
提供了api調用
如:master/leader選舉
分布式鎖 讀鎖 寫鎖
分布式隊列
。。。
LeaderLatch 阻塞
寫一個master
LeaderSelector 自動搶
每個應用都寫一個臨時有序節點,根據最小的節點來獲取優先點
package com.lulf.DistrubuteLock.curator;import java.util.concurrent.TimeUnit;import javax.swing.tree.ExpandVetoException;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;public class MasterSelector {private final static String CONNECTSTRING = "192.168.48.133:2181,192.168.48.134:2181,"+ "192.168.48.135:2181,192.168.48.136:2181";private final static String MASTER_PATH="/curator_master_path";public static void main(String[] args) {CuratorFramework curatorFramework=CuratorFrameworkFactory.builder().connectString(CONNECTSTRING).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();LeaderSelector leaderSelector=new LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework arg0) throws Exception {System.out.println("獲取leader成功");TimeUnit.SECONDS.sleep(2);}});leaderSelector.autoRequeue();leaderSelector.start();//開始選舉}
}
復制代碼
zookeeper的集群角色
leader
leader是zookeeper集群的核心。
1 事務請求的唯一調度的矗立著,他需要保證事務處理的順序性
2 集群內部各個服務器的調度者
follower
1 處理客戶端非事務請求以及轉發事務請求給leader服務器
2 參與事務請求提議的proposal投票 【客戶端的一個事務請求需要半數服務投票通過才能通知leader commit,leader會發起一個提案,要求follower投票】
3 參與leader選舉的投票
observer
1 觀察zookeeper集群中最新狀態的變化,并且把這些狀態同步到observer服務器上。
2 增加observer不影響集群事務處理能力,同時能提升集群的非事務處理能力
zookeeper的集群組成
zookeeper一般是由2n+1臺服務器組成
leader選舉
1)leaderElection
2)AuthFastLeaderElection
3)FastLeaderElection
serverID :配置server集群的時候給定服務器的標識id myid
zxid:服務器它運行時產生的數據ID,zxid值越大,標識數據越新
Epoch:選舉輪次 sid
server的狀態:Looking,Following,Observering,Leading
第一次初始化啟動的時候是Looking
1)所有集群中的server都會推薦自己為leader,然后把(myid,zxid,epoch)作為廣播信息,廣播給集群中的其他server,然后等待其他服務器返回。
2)每個服務器都會接受到來自集群中的其他服務器的投票,及群眾的每個服務器在接受到投票之后,都會判斷投票的有效性
a)判斷邏輯時鐘epoch 如果epoch大于自己當前的epoch,說明自己保存的epoch是過期,更新epoch,同事clear其他服務器送過來的選舉數據,判斷是否需要更新當前自己的選舉情況
b)如果Epoch小于目前的epoch,說明對方的epoch過期,意味著對方服務器的選舉輪次是過期的,只需要把自己的信息發送給對方
c)如果接收到的epoch等于當前的epoch,根據規則來判斷是否有資格獲得leader 接受到來自其他服務器的投票后,針對每一個投標,都需要將別人的投票和自己的投票進行pk
ZXID最大的服務器優先
3)統計投票
ZAB協議
拜占庭問題
paxos協議主要就是如何保證在分布式網絡環境下,各個服務器如何達成一致最終保證數據的一致性問題。
ZAB協議,基于paxos協議的一個改進。
ZAB協議為分布式協調服務zookeeper專門設計的一種支持奔潰恢復的原子廣播協議。
zookeeper并沒有完全采用paxos算法,而是采用zab zookeeper stomic broadcast zab協議的原理:
1)在zookeeper的主備模式下,通過zab協議來保證集群中的各個副本數據的一致性
2)zookeeper是使用單一的主進程來接受并處理所有的事務請求,并采用zab協議,把數據的狀態變更以事務請求的形式廣播到其他節點
3)zab協議在主備模型架構中保證了同一時刻只能有一個主進程來廣播服務器的狀態變更
4)所有的事務請求必須由全局唯一的服務器來協調處理,這個服務器叫leader,其他叫follower
leader節點主要是負責把客戶端的請求轉化為一個事務提議(proposal),并且分發給集群中的所有follower節點,再等待所有follower節點反饋,一旦超過半數服務器進行了正確的反饋,nameleader就會commit這個消息。
原子廣播
zab協議的工作原理
1)什么情況下zab協議會進入奔潰恢復模式
a 當服務器啟動時
b 當leader服務器出現網絡中斷 奔潰 重啟的情況
c 集群中已經不存在過半的服務器與該leader保持正常通行
2)zab協議進入奔潰恢復模式會做什么
a 當leader出現問題,zab協議進入奔潰恢復模式,并且選舉出新的leader。當新的leader選舉出來以后,如果集群中已經有過半機器完成了leader服務器的狀態同步(數據同步),退出崩潰恢復。進入消息廣播模式
b 當新的機器加入到集群中的時候,如果已經存在leader服務器,那么新加入的服務器就會自覺進入數據恢復模式,找到leader進行數據同步
問題:
假設一個事務在leader服務器被提交了,并且已經有過半的follower返回了ack。在leader節點把commit消息發送給follower機器之前,leader服務器掛了怎么辦?
zab協議,一定需要保證已經被leader提交的事務也能夠被所有follower提交。
zab協議需要協議,在奔潰恢復過程中跳過那些已經被丟棄的事務。