Contents
- Introduction
- 1. Server Startup Flow
- 1.1 Startup entry class: QuorumPeerMain
- 1.2 Cluster-mode startup core: runFromConfig
- 1.3 QuorumPeer thread core logic: run()
- 1.4 Key sub-flow: data recovery
- 1.5 Key design points
- 2. Request Processing Chain (Chain of Responsibility)
- 2.1 Leader server processing chain
- 2.2 Follower server processing chain
- 2.3 Core processors
- 3. Network Communication Layer (NIOServerCnxnFactory)
- 3.1 Core class structure and initialization
- 3.2 Core processing flow walkthrough
- 3.3 Performance optimizations
- 4. Leader Election (FastLeaderElection)
- 5. Zab Protocol Implementation
- 5.1 Main flow source
- 5.2 Key data structures
- 5.3 Zab protocol properties
- Summary
Introduction
In distributed systems, a coordination service is the cornerstone of any highly available architecture. After nine articles covering Zookeeper's fundamentals, use cases, and APIs, we finally reach the source code itself. This article follows Zookeeper's core runtime paths and examines how its four central modules are implemented: server startup, request processing, network communication, and the consistency protocol.
1. Server Startup Flow
(Startup flow diagram omitted.)
Core source walkthrough:
1.1 Startup entry class: QuorumPeerMain
public class QuorumPeerMain {
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            // Parse command-line arguments (usually the path to zoo.cfg)
            main.initializeAndRun(args);
        } catch (Exception e) {
            LOG.error("Unexpected exception during startup", e);
            System.exit(2);
        }
    }

    protected void initializeAndRun(String[] args) throws ConfigException, IOException {
        // 1. Parse the configuration file
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]); // parse zoo.cfg
        }
        // 2. Start the data-dir cleanup daemon
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
                config.getDataDir(), config.getDataLogDir(),
                config.getSnapRetainCount(), // number of snapshots to retain
                config.getPurgeInterval());  // purge interval (hours)
        purgeMgr.start();
        // 3. Pick the startup mode
        if (config.isDistributed()) {
            // Cluster mode
            runFromConfig(config);
        } else {
            // Standalone mode (omitted)
        }
    }
}
1.2 Cluster-mode startup core: runFromConfig
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    // === 1. Initialize the network layer ===
    ServerCnxnFactory cnxnFactory = null;
    if (config.getClientPortAddress() != null) {
        // Create the connection factory via reflection (default: NIOServerCnxnFactory)
        cnxnFactory = ServerCnxnFactory.createFactory();
        // Configure the listen address and max connections (core method)
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
    }

    // === 2. Initialize storage ===
    // Manager for transaction logs and snapshot files
    FileTxnSnapLog txnLog = new FileTxnSnapLog(
            new File(config.getDataLogDir()), new File(config.getDataDir()));

    // === 3. Create the QuorumPeer instance (the core thread) ===
    QuorumPeer quorumPeer = new QuorumPeer();
    // 3.1 Basic configuration
    quorumPeer.setTxnFactory(txnLog);                    // txn log manager
    quorumPeer.setQuorumPeers(config.getServers());      // cluster member list
    quorumPeer.setElectionType(config.getElectionAlg()); // election algorithm
    quorumPeer.setMyid(config.getServerId());            // this node's id
    quorumPeer.setTickTime(config.getTickTime());        // heartbeat interval (ms)
    quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
    quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
    // 3.2 Network layer
    if (cnxnFactory != null) {
        quorumPeer.setServerCnxnFactory(cnxnFactory);
    }
    // 3.3 Storage
    quorumPeer.setZKDatabase(new ZKDatabase(txnLog));
    // 3.4 Restore data from disk
    quorumPeer.setLastLoggedZxid(txnLog.restore(quorumPeer.zkDb, quorumPeer));

    // === 4. Start the QuorumPeer thread ===
    quorumPeer.start(); // enters run()
}
1.3 QuorumPeer thread core logic: run()
public void run() {
    while (running) {
        switch (getPeerState()) {
        case LOOKING: // election state
            try {
                // 1. Run leader election
                setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
                LOG.warn("Unexpected exception during election", e);
                // error handling ...
            }
            break;
        case FOLLOWING: // follower state
            try {
                // 2. Start the follower service
                follower = new Follower(this, new FollowerZooKeeperServer(...));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception in follower", e);
            } finally {
                follower.shutdown();
            }
            break;
        case LEADING: // leader state
            try {
                // 3. Start the leader service
                leader = new Leader(this, new LeaderZooKeeperServer(...));
                leader.lead();
            } catch (Exception e) {
                LOG.warn("Unexpected exception in leader", e);
            } finally {
                leader.shutdown("Unexpected exception");
            }
        }
    }
}
1.4 Key sub-flow: data recovery
// FileTxnSnapLog.java
public long restore(DataTree dt, Map<Long, Integer> sessions) {
    // 1. Restore from the latest snapshot
    long deserializeResult = snapLog.deserialize(dt, sessions);
    // 2. Replay transaction logs written after the snapshot
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    long highestZxid = fastForwardFromEdits(dt, sessions);
    // Return the highest ZXID seen
    return highestZxid;
}

// Core snapshot-restore method
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // Locate the most recent snapshot file
    File snapShot = findMostRecentSnapshot();
    if (snapShot == null) {
        return -1L; // no snapshot
    }
    try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snapShot))) {
        // Deserialize the snapshot into the DataTree
        InputArchive ia = BinaryInputArchive.getArchive(snapIS);
        deserialize(dt, sessions, ia);
        return dt.lastProcessedZxid; // ZXID the snapshot corresponds to
    }
}
1.5 Key design points
Layered initialization: parse the configuration, bring up the network layer (ServerCnxnFactory), initialize storage (FileTxnSnapLog / ZKDatabase), then start the QuorumPeer thread.
Data recovery strategy (a checksum sketch follows this list):
- Load the most recent snapshot (snapshot.xxx file) first
- Then replay every transaction log written after that snapshot (log.xxx files)
- Verify record integrity with checksums during replay
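To make the checksum step concrete, here is a minimal, self-contained sketch of checksum-guarded replay. The record layout and names (`ChecksumedRecord`, `verify`) are invented for illustration; ZooKeeper's FileTxnLog stores a checksum alongside each serialized txn through the same `java.util.zip.Checksum` interface (Adler32 rather than the CRC32 used here) and stops replay on a mismatch.

```java
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Hypothetical sketch: a log record is valid only if the checksum
// recomputed over its payload matches the checksum persisted with it.
public class ChecksumedRecord {
    static long checksumOf(byte[] payload) {
        Checksum crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // True only if the stored checksum matches the recomputed one.
    static boolean verify(byte[] payload, long storedChecksum) {
        return checksumOf(payload) == storedChecksum;
    }

    public static void main(String[] args) {
        byte[] txn = "create /node hello".getBytes();
        long stored = checksumOf(txn);           // what the writer persisted
        txn[0] ^= 0x01;                          // simulate on-disk corruption
        System.out.println(verify(txn, stored)); // false -> stop replay here
    }
}
```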
State machine design:
- LOOKING: electing; runs FastLeaderElection
- FOLLOWING: starts the Follower service and connects to the Leader
- LEADING: starts the Leader service and coordinates the ensemble
Resource cleanup (a retention sketch follows this list):
- DatadirCleanupManager: periodically purges old snapshots and logs
- Deletes historical files per the retention policy (3 snapshots by default)
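As a rough illustration of that retention policy, the sketch below keeps only the newest N snapshot files in a directory. `SnapshotPurger` and its method are hypothetical, not the DatadirCleanupManager API, and it orders files by mtime where the real purge logic parses the zxid suffix out of each file name.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical "keep the newest N snapshots" purge pass, mimicking what
// DatadirCleanupManager schedules once per purge interval.
public class SnapshotPurger {
    static void purgeOldSnapshots(File snapDir, int retainCount) {
        File[] snaps = snapDir.listFiles((dir, name) -> name.startsWith("snapshot."));
        if (snaps == null || snaps.length <= retainCount) {
            return; // nothing to purge
        }
        // Newest first by last-modified time (the real code orders by zxid suffix).
        Arrays.sort(snaps, Comparator.comparingLong(File::lastModified).reversed());
        for (int i = retainCount; i < snaps.length; i++) {
            snaps[i].delete(); // drop everything older than the newest N
        }
    }
}
```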
Key objects in the startup flow
Object | Role | Lifetime |
---|---|---|
QuorumPeer | Main thread of a cluster node | Entire runtime |
ServerCnxnFactory | Network communication service | Entire runtime |
FileTxnSnapLog | Transaction log and snapshot management | Entire runtime |
ZKDatabase | In-memory database (DataTree) | Entire runtime |
Follower/Leader | Role-specific behavior | Duration of that state |
2. Request Processing Chain (Chain of Responsibility)
2.1 Leader server processing chain
// LeaderZooKeeperServer.java
protected void setupRequestProcessors() {
    // Final processor (actually applies the operation)
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    // To-be-applied processor (tracks proposals awaiting commit)
    RequestProcessor toBeAppliedProcessor =
            new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    // Commit processor (guarantees request ordering)
    CommitProcessor commitProcessor = new CommitProcessor(
            toBeAppliedProcessor, "CommitProcessor", getZooKeeperServer().isMatchSyncs());
    // Proposal processor (broadcasts proposals)
    RequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    // Prep processor (request pre-processing)
    PrepRequestProcessor prepProcessor = new PrepRequestProcessor(this, proposalProcessor);
    // Assemble the full chain
    firstProcessor = new LeaderRequestProcessor(this, prepProcessor);
    // Start the processor threads
    startProcessors(new RequestProcessor[] {
        prepProcessor, proposalProcessor, commitProcessor, finalProcessor
    });
}
2.2 Follower server processing chain
// FollowerZooKeeperServer.java
protected void setupRequestProcessors() {
    // Final processor
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    // Commit processor
    commitProcessor = new CommitProcessor(finalProcessor, "CommitProcessor", true);
    // Sync processor (persists the transaction log)
    syncProcessor = new SyncRequestProcessor(
            this, new SendAckRequestProcessor(getFollower()));
    // Assemble the chain
    firstProcessor = new FollowerRequestProcessor(this, syncProcessor);
    // Start the processor threads
    startProcessors(new RequestProcessor[] {
        firstProcessor, syncProcessor, commitProcessor
    });
}
2.3 Core processors
- PrepRequestProcessor: request pre-processing
public void run() {
    try {
        while (true) {
            // 1. Take the next request from the queue
            Request request = submittedRequests.take();
            // 2. Pre-process it (core method)
            pRequest(request);
        }
    } catch (Exception e) {
        handleException(this, e);
    }
}

protected void pRequest(Request request) throws RequestProcessorException {
    // Validate the operation code (valid op codes fall in this range)
    if (request.type < 0 || request.type > OpCode.maxOp) {
        throw new RequestProcessorException("Invalid op type");
    }
    try {
        // 3. Deserialize the request
        ByteBufferInputStream bbis = new ByteBufferInputStream(request.request);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bbis);
        Record record = null;
        // 4. Pick the record type by op code
        switch (request.type) {
        case OpCode.create:
            record = new CreateRequest();
            break;
        case OpCode.delete:
            record = new DeleteRequest();
            break;
        // other op types ...
        }
        record.deserialize(bia, "request");
        // 5. ACL check
        if (request.authInfo != null) {
            checkACL(request, record);
        }
        // 6. Build the transaction header
        request.hdr = new TxnHeader(
                request.sessionId,
                request.cxid,
                zks.getZKDatabase().getNextZxid(), // allocate a globally unique ZXID
                Time.currentWallTime(),
                request.type);
        // 7. Hand off to the next processor
        nextProcessor.processRequest(request);
    } catch (Exception e) {
        // error handling ...
    }
}
- SyncRequestProcessor: transaction persistence
public void run() {
    try {
        int logCount = 0;
        while (true) {
            Request request = queuedRequests.take();
            // 1. Persist to the transaction log
            if (request != null) {
                // Append to the txn log
                zks.getZKDatabase().append(request);
                logCount++;
                // Take a snapshot once the threshold is crossed
                if (logCount > (snapCount / 2 + randRoll)) {
                    randRoll = r.nextInt(snapCount / 2);
                    zks.takeSnapshot();
                    logCount = 0;
                }
            }
            // 2. Hand off to the next processor
            if (nextProcessor != null) {
                nextProcessor.processRequest(request);
            }
        }
    } catch (Exception e) {
        // error handling ...
    }
}
- ProposalRequestProcessor: proposal broadcast (Leader only)
public void processRequest(Request request) {
    // 1. Read requests pass straight through
    if (!Request.isValid(request.type)) {
        nextProcessor.processRequest(request);
        return;
    }
    // 2. Build the proposal object
    Proposal p = new Proposal();
    p.packet = new QuorumPacket();
    p.request = request;
    // 3. Enqueue it for broadcast
    synchronized (leader) {
        leader.addProposal(p);
    }
    // 4. Hand off to the next processor
    nextProcessor.processRequest(request);
}

// Leader.addProposal
public void addProposal(Proposal p) {
    synchronized (toBeProposed) {
        // Add to the pending-proposal queue
        toBeProposed.add(p);
        // Wake up the sender thread
        toBeProposed.notifyAll();
    }
}
- CommitProcessor: commit scheduler
public void run() {
    try {
        Request nextPending = null;
        while (true) {
            // 1. Pull the next request if none is pending
            if (nextPending == null) {
                nextPending = queuedRequests.take();
            }
            // 2. COMMIT messages: commit in ZXID order
            if (nextPending.type == OpCode.commit) {
                commit(nextPending.zxid);
                nextPending = null;
            }
            // 3. Local reads go straight through
            else if (nextPending.type == OpCode.getData) {
                nextProcessor.processRequest(nextPending);
                nextPending = null;
            }
            // 4. Writes wait in the queue
            else {
                synchronized (queuedWriteRequests) {
                    queuedWriteRequests.add(nextPending);
                    nextPending = null;
                }
            }
            // 5. Release any writes whose ZXID has been committed
            while (!queuedWriteRequests.isEmpty()) {
                Request writeReq = queuedWriteRequests.peek();
                if (writeReq.zxid <= lastCommitted) {
                    queuedWriteRequests.poll();
                    nextProcessor.processRequest(writeReq);
                } else {
                    break;
                }
            }
        }
    } catch (Exception e) {
        // error handling ...
    }
}
- FinalRequestProcessor: final execution
public void processRequest(Request request) {
    // 1. Session validity check
    if (request.sessionId != 0) {
        Session session = zks.sessionTracker.getSession(request.sessionId);
        if (session == null) {
            return; // session expired
        }
    }
    try {
        // 2. Apply the operation
        switch (request.type) {
        case OpCode.create:
            processCreate(request);
            break;
        case OpCode.delete:
            processDelete(request);
            break;
        case OpCode.getData:
            processGetData(request);
            break;
        // other op types ...
        }
    } catch (Exception e) {
        // error handling ...
    }
    // 3. Send the response
    if (request.cnxn != null) {
        request.cnxn.sendResponse(hdr, rsp, "response");
    }
}

private void processCreate(Request request) {
    CreateRequest createReq = (CreateRequest) request.request;
    // Create the node in the DataTree
    rsp = zks.getZKDatabase().createNode(
            createReq.getPath(),
            createReq.getData(),
            createReq.getAcl(),
            createReq.getFlags(),
            request.hdr.getZxid());
}
(Processing-chain workflow diagram omitted.)
Processor comparison:
Processor | Role | Core responsibility | Key data structure |
---|---|---|---|
PrepRequestProcessor | Leader/Follower | Request deserialization / ACL checks | RequestQueue |
SyncRequestProcessor | Leader/Follower | Transaction log persistence | TransactionLog |
ProposalRequestProcessor | Leader only | Proposal broadcast | ProposalQueue |
CommitProcessor | Leader/Follower | Commit scheduling | QueuedWriteRequests |
FinalRequestProcessor | Leader/Follower | In-memory database operations | DataTree/ZKDatabase |
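Stripped of the specific processors, the whole pattern reduces to one interface plus forwarding. A toy sketch of that wiring follows; `Request` is simplified to a plain String here, and the class names besides the RequestProcessor interface itself are invented for the example.

```java
// Minimal chain-of-responsibility wiring in the style of ZooKeeper's
// org.apache.zookeeper.server.RequestProcessor chain.
interface RequestProcessor {
    void processRequest(String request);
}

class LoggingProcessor implements RequestProcessor {
    private final RequestProcessor next;
    LoggingProcessor(RequestProcessor next) { this.next = next; }
    public void processRequest(String request) {
        System.out.println("saw: " + request);          // this stage's own work
        if (next != null) next.processRequest(request); // then forward down the chain
    }
}

class FinalProcessor implements RequestProcessor {
    public void processRequest(String request) {
        System.out.println("applied: " + request);      // end of the chain
    }
}

// Usage: new LoggingProcessor(new FinalProcessor()).processRequest("create /node");
```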
Typical troubleshooting:
- Requests stall:
  - Check whether CommitProcessor is piling up requests
  - Confirm the cluster still has a majority (network partition?)
- ACL permission denied:
  - checkACL() in PrepRequestProcessor throws
  - Check the client's authentication info
- Transaction log write failure:
  - SyncRequestProcessor catches an IO exception
  - Check disk space and permissions
- Lost proposals:
  - ProposalRequestProcessor failed to enqueue the proposal
  - Check the Leader election state
3. Network Communication Layer (NIOServerCnxnFactory)
3.1 Core class structure and initialization
- Service entry point: NIOServerCnxnFactory
public class NIOServerCnxnFactory extends ServerCnxnFactory {
    // Core components
    private SelectorThread selectorThread;   // main selector thread
    private AcceptThread acceptThread;       // connection-accepting thread
    private final ConnectionExpirer expirer; // connection expiry manager
    // Configuration
    private int maxClientCnxns = 60;         // max connections per client IP
    private int sessionlessCnxnTimeout;      // timeout for connections without a session

    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        // 1. Set up the accept thread
        acceptThread = new AcceptThread(
                serverSock = ServerSocketChannel.open(),
                addr,
                selectorThread.getSelector());
        // 2. Configure the listening socket
        serverSock.socket().setReuseAddress(true);
        serverSock.socket().bind(addr);
        serverSock.configureBlocking(false);
        // 3. Start the threads
        acceptThread.start();
        selectorThread.start();
    }
}
3.2 Core processing flow walkthrough
- Connection-accepting thread: AcceptThread
class AcceptThread extends Thread {
    public void run() {
        while (!stopped) {
            try {
                // 1. Wait for a new connection
                SocketChannel sc = serverSock.accept();
                if (sc != null) {
                    // 2. Configure the channel
                    sc.configureBlocking(false);
                    sc.socket().setTcpNoDelay(true);
                    // 3. Create the connection object
                    NIOServerCnxn cnxn = createConnection(sc);
                    // 4. Register it with the selector
                    selectorThread.addCnxn(cnxn);
                }
            } catch (IOException e) {
                LOG.warn("AcceptThread exception", e);
            }
        }
    }

    private NIOServerCnxn createConnection(SocketChannel sock) {
        // Build the connection object
        return new NIOServerCnxn(
                NIOServerCnxnFactory.this, sock,
                selectorThread.getSelector(),
                selectorThread.getNextWorker());
    }
}
- Selector thread: SelectorThread
class SelectorThread extends Thread {
    private final Selector selector;
    private final Set<NIOServerCnxn> cnxns = new HashSet<>();
    private final WorkerService workerPool; // I/O worker thread pool

    public void run() {
        while (!stopped) {
            try {
                // 1. Wait for ready events
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                // 2. Walk all ready keys
                for (SelectionKey k : selected) {
                    if (k.isReadable() || k.isWritable()) {
                        // 3. Resolve the connection object
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        // 4. Hand it to an I/O worker
                        c.getWorker().schedule(c);
                    }
                }
                selected.clear();
            } catch (Exception e) {
                LOG.warn("SelectorThread error", e);
            }
        }
    }

    // Register a new connection
    void addCnxn(NIOServerCnxn cnxn) {
        synchronized (cnxns) {
            // 1. Enforce the connection limit
            if (cnxns.size() >= maxClientCnxns) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_REJECTED);
                return;
            }
            // 2. Register for read events
            cnxn.register(selector);
            cnxns.add(cnxn);
        }
    }
}
- I/O worker: IOWorkRequest
class IOWorkRequest extends WorkerService.WorkRequest {
    private final NIOServerCnxn cnxn;

    public void doWork() throws InterruptedException {
        // 1. Read events
        if (cnxn.sockKey.isReadable()) {
            // Read from the channel
            int rc = cnxn.sock.read(cnxn.recvBuffer);
            if (rc > 0) {
                // Deserialize the request
                cnxn.recvBuffer.flip();
                processRequest(cnxn.recvBuffer);
            } else if (rc < 0) {
                // Peer closed the connection
                cnxn.close(ServerCnxn.DisconnectReason.CLIENT_CLOSED);
            }
        }
        // 2. Write events
        if (cnxn.sockKey.isWritable()) {
            // Take the next pending response
            ByteBuffer bb = cnxn.outgoingQueue.poll();
            if (bb != null) {
                // Write it to the channel
                cnxn.sock.write(bb);
                // Keep OP_WRITE registered while the queue is non-empty
                if (!cnxn.outgoingQueue.isEmpty()) {
                    cnxn.enableWrite();
                }
            }
        }
    }

    private void processRequest(ByteBuffer buffer) {
        try {
            // 1. Deserialize the request header
            BinaryInputArchive bia = BinaryInputArchive.getArchive(
                    new ByteBufferInputStream(buffer));
            RequestHeader h = new RequestHeader();
            h.deserialize(bia, "header");
            // 2. Build the request object
            Request req = new Request(
                    cnxn, h.getSessionId(), h.getXid(), h.getType(), buffer,
                    cnxn.getAuthInfo());
            // 3. Submit it to the processing chain
            cnxn.zkServer.processRequest(req);
        } catch (Exception e) {
            LOG.error("Request processing error", e);
        }
    }
}
- Connection object: NIOServerCnxn
class NIOServerCnxn extends ServerCnxn {
    final SocketChannel sock;   // underlying socket channel
    final SelectionKey sockKey; // selection key
    final IOWorker worker;      // assigned I/O worker
    // Buffer management
    ByteBuffer recvBuffer = ByteBuffer.allocateDirect(4096);
    final Queue<ByteBuffer> outgoingQueue = new ConcurrentLinkedQueue<>();

    // Register with a selector
    void register(Selector selector) throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_READ, this);
    }

    // Send a response
    public void sendResponse(ReplyHeader h, Record r, String tag) {
        // 1. Serialize the response
        ByteBuffer bb = serializeResponse(h, r, tag);
        // 2. Enqueue it for sending
        outgoingQueue.add(bb);
        // 3. Register for write events
        enableWrite();
    }

    private void enableWrite() {
        int i = sockKey.interestOps();
        if ((i & SelectionKey.OP_WRITE) == 0) {
            sockKey.interestOps(i | SelectionKey.OP_WRITE);
        }
    }

    // Close the connection
    public void close(DisconnectReason reason) {
        try {
            // 1. Cancel the selection key
            if (sockKey != null) sockKey.cancel();
            // 2. Close the channel
            sock.close();
            // 3. Clean up the session
            zkServer.removeCnxn(this);
        } catch (IOException e) {
            LOG.debug("Error closing connection", e);
        }
    }
}
(Core flow sequence diagram omitted.)
3.3 Performance optimizations
- I/O worker thread pool
workerPool = new WorkerService(
    "NIOWorker",
    numWorkerThreads, // default: 2 * CPU cores
    true              // daemon threads
);
This keeps the selector thread from blocking on I/O and lets the I/O of many connections proceed in parallel.
- Smart event registration: avoids unnecessary selector wakeups
// Register OP_WRITE only when there is data to send
void enableWrite() {
    int i = sockKey.interestOps();
    if ((i & SelectionKey.OP_WRITE) == 0) {
        sockKey.interestOps(i | SelectionKey.OP_WRITE);
    }
}
- Buffer reuse
// Receive-buffer reuse: grow in place when full
if (!recvBuffer.hasRemaining()) {
    recvBuffer = ByteBuffer.allocateDirect(recvBuffer.capacity() * 2);
}
Doubling on demand avoids frequent reallocation, and busy connections naturally end up with larger buffers.
- Batched response sending: drain several queued response packets per write pass
void doWrite() {
    int batchSize = 10;
    while (batchSize-- > 0 && !outgoingQueue.isEmpty()) {
        ByteBuffer bb = outgoingQueue.poll();
        sock.write(bb);
    }
}
Key tuning parameters (a sample configuration follows the table):
Parameter | Default | Purpose | Tuning advice |
---|---|---|---|
maxClientCnxns | 60 | Max connections per client IP | Adjust to the client mix |
clientPortAddress | 0.0.0.0:2181 | Listen address | Bind to an internal IP in production |
nioWorkerThreads | 2 * CPU cores | I/O worker thread count | Raise for high concurrency |
sessionlessCnxnTimeout | 10000 ms | Timeout for connections without a session | Guards against abusive connections |
maxResponseCacheSize | 400 | Response cache size | Size to available memory |
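For concreteness, one plausible way to set these knobs is shown below. maxClientCnxns and clientPortAddress are standard zoo.cfg keys; the NIO worker, sessionless-timeout, and response-cache settings are passed as zookeeper.* JVM system properties, and the exact property names (taken from ZooKeeper 3.5+/3.6+) should be verified against the version you run.

```properties
# zoo.cfg
clientPort=2181
# bind to an internal interface in production
clientPortAddress=10.0.0.5
# per-IP connection cap; raise it when many clients sit behind one NAT
maxClientCnxns=100
```

```
# JVM flags (system properties, not zoo.cfg keys; names assumed, verify per version)
-Dzookeeper.nio.numWorkerThreads=16
-Dzookeeper.nio.sessionlessCnxnTimeout=10000
-Dzookeeper.maxResponseCacheSize=400
```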
4. Leader Election (FastLeaderElection)
Algorithm core: the election phase of the ZAB protocol
Election flow:
- Increment the logical clock / election epoch (logicalclock++)
- Initialize the vote: vote = (myid, zxid, epoch)
- Broadcast NOTIFICATION messages
- Receive votes and compare them:
// FastLeaderElection#totalOrderPredicate(): prefer the higher epoch,
// then the higher zxid, then the higher server id
return (new_epoch > current_epoch)
    || (new_epoch == current_epoch
        && (new_zxid > current_zxid
            || (new_zxid == current_zxid && new_id > current_id)));
- A candidate backed by more than half of the ensemble becomes the Leader (a tally sketch follows this list)
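To give a feel for that majority test, here is a minimal, hypothetical tally over received notifications. `Vote` and `VoteTally` are simplified stand-ins for FastLeaderElection's real bookkeeping, which tallies `(leader, zxid, epoch)` votes in a receive-set keyed by server id.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified election tally: count how many servers currently back
// the same vote, declaring victory on a strict majority.
public class VoteTally {
    record Vote(long leaderId, long zxid, long epoch) {}

    private final Map<Long, Vote> received = new HashMap<>(); // sid -> latest vote
    private final int ensembleSize;

    VoteTally(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Record a notification from server `sid`; true once `vote` is
    // backed by a strict majority of the ensemble.
    boolean onNotification(long sid, Vote vote) {
        received.put(sid, vote); // a newer vote from the same server replaces the old
        long support = received.values().stream()
                .filter(vote::equals)
                .count();
        return support > ensembleSize / 2; // 3 of 5, or 2 of 3
    }
}
```

With a 5-node ensemble, onNotification returns true as soon as three servers (possibly including ourselves) carry the same vote.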
Node state transitions:
// QuorumPeer#run()
switch (getPeerState()) {
case LOOKING:
    leaderElector.lookForLeader(); // electing
    break;
case FOLLOWING:
    follower.followLeader();       // following
    break;
case LEADING:
    leader.lead();                 // leading
    break;
}
5. Zab Protocol Implementation
(Zab protocol flow diagram omitted.)
5.1 Main flow source
- Protocol state machine: QuorumPeer
public void run() {
    while (running) {
        switch (getPeerState()) {
        case LOOKING: // election phase
            setCurrentVote(makeLEStrategy().lookForLeader());
            break;
        case FOLLOWING: // follower state
            Follower follower = new Follower(this, ...);
            follower.followLeader(); // covers the Discovery and Sync phases
            break;
        case LEADING: // leader state
            Leader leader = new Leader(this, ...);
            leader.lead(); // covers the Broadcast phase
            break;
        }
    }
}
- Discovery phase (Follower side)
// Follower.java
void followLeader() throws InterruptedException {
    // 1. Connect to the leader
    connectToLeader(leaderAddr);
    // 2. Send FOLLOWERINFO
    QuorumPacket fInfoPacket = new QuorumPacket(Leader.FOLLOWERINFO, ...);
    writePacket(fInfoPacket, true);
    // 3. Receive the leader's LEADERINFO
    QuorumPacket lInfoPacket = readPacket();
    if (lInfoPacket.getType() != Leader.LEADERINFO) {
        throw new IOException("First packet should be LEADERINFO");
    }
    // 4. Parse the epoch
    long newEpoch = lInfoPacket.getEpoch();
    if (newEpoch < self.getAcceptedEpoch()) {
        throw new IOException("Epoch less than accepted epoch");
    }
    // 5. Send ACKEPOCH
    QuorumPacket ackEpochPacket = new QuorumPacket(Leader.ACKEPOCH, ...);
    writePacket(ackEpochPacket, true);
    // 6. Enter the synchronization phase
    syncWithLeader(newEpoch);
}
- Synchronization phase
// Follower.java
protected void syncWithLeader(long newEpoch) throws Exception {
    // 1. Receive the leader's NEWLEADER packet
    QuorumPacket newLeaderPacket = readPacket();
    if (newLeaderPacket.getType() != Leader.NEWLEADER) {
        throw new IOException("First packet should be NEWLEADER");
    }
    // 2. Check whether we need to sync
    if (self.getLastLoggedZxid() != leaderLastZxid) {
        // 3. Run the data sync
        boolean needSnap = syncStrategy.determineSyncMethod();
        if (needSnap) {
            // Full snapshot sync
            syncWithSnapshot(leader);
        } else {
            // Incremental transaction-log sync
            syncWithLogs(leader);
        }
    }
    // 4. ACK the leader
    writePacket(new QuorumPacket(Leader.ACK, ...), true);
    // 5. Wait for the leader's UPTODATE packet
    QuorumPacket uptodatePacket = readPacket();
    if (uptodatePacket.getType() != Leader.UPTODATE) {
        throw new IOException("Did not receive UPTODATE packet");
    }
    // 6. Enter the broadcast phase
    startFollowerThreads();
}
- Broadcast phase (Leader side)
// Leader.java
void lead() throws IOException, InterruptedException {
    // 1. Start the ZK server
    startZkServer();
    // 2. Wait for followers to connect
    waitForEpochAck(self.getId(), leaderStateSummary);
    // 3. Send the NEWLEADER packet
    sendNewLeader();
    // 4. Wait for a majority of followers to ACK
    waitForNewLeaderAck(self.getId());
    // 5. Send the UPTODATE packet
    sendUptodate();
    // 6. Broadcast loop
    while (running) {
        // 7. Take the next proposal off the queue
        Proposal p = pendingProposals.take();
        // 8. Broadcast it
        broadcastProposal(p);
        // 9. Wait for ACKs
        waitForAckQuorum(p);
        // 10. Commit it
        commit(p);
    }
}

// Broadcast a proposal
private void broadcastProposal(Proposal p) {
    // Build the proposal packet
    QuorumPacket proposal = new QuorumPacket(
            Leader.PROPOSAL, p.request.zxid, p.request.serialize(), null);
    // Send it to every follower
    for (LearnerHandler f : followers) {
        f.queuePacket(proposal);
    }
    // Track it locally
    outstandingProposals.put(p.request.zxid, p);
}
- Proposal commit and ACK handling
// Leader.java
private void waitForAckQuorum(Proposal p) {
    synchronized (p) {
        while (!p.hasAllQuorums()) {
            // Wait for ACKs
            p.wait(rpcTimeout);
        }
    }
}

// ACK handling
public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    // 1. Look up the proposal
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) return;
    // 2. Record the ACK
    p.ackSet.add(sid);
    // 3. Check for a majority
    if (isQuorumSynced(p.ackSet)) {
        synchronized (p) {
            // 4. Wake up the waiting thread
            p.notifyAll();
        }
    }
}

// Commit a proposal
private void commit(Proposal p) {
    // 1. Build the COMMIT packet
    QuorumPacket commitPacket = new QuorumPacket(
            Leader.COMMIT, p.request.zxid, null, null);
    // 2. Broadcast COMMIT
    for (LearnerHandler f : followers) {
        f.queuePacket(commitPacket);
    }
    // 3. Commit locally
    commitProcessor.commit(p.request);
    // 4. Drop it from the outstanding set
    outstandingProposals.remove(p.request.zxid);
}
- Crash recovery
// Leader.java
protected void recovery() {
    // 1. Highest committed ZXID
    long maxCommittedLog = getMaxCommittedLog();
    // 2. Proposals that never committed
    List<Proposal> outstanding = getOutstandingProposals();
    // 3. Rebuild proposal state
    for (Proposal p : outstanding) {
        // 4. Was the proposal persisted on a majority?
        if (isCommittedInQuorum(p)) {
            // Re-commit it
            commit(p);
        } else {
            // Discard it
            outstandingProposals.remove(p.request.zxid);
        }
    }
    // 5. Re-establish follower connections
    waitForEpochAck(self.getId(), leaderStateSummary);
}
5.2 Key data structures
- Proposal object (a quorum-check sketch follows the snippet)
class Proposal {
    long zxid;                          // transaction id
    Request request;                    // original request
    Set<Long> ackSet = new HashSet<>(); // ids of servers that have ACKed
    boolean committed = false;          // commit status

    // Has a majority acknowledged?
    boolean hasAllQuorums() {
        return ackSet.size() >= getQuorumSize();
    }
}
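The quorum test itself is a strict greater-than-half check. A minimal sketch, modeled on ZooKeeper's majority verifier (QuorumMaj) but with simplified names, looks like this:

```java
import java.util.Set;

// Majority-quorum check: a set of ACKing server ids forms a quorum
// iff it is a strict majority of the ensemble.
public class MajorityQuorum {
    private final int half; // floor(n / 2)

    public MajorityQuorum(int ensembleSize) {
        this.half = ensembleSize / 2;
    }

    public boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > half; // 5-node ensemble: 3 ACKs; 3-node: 2
    }
}
```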
- Leader state tracking
class Leader {
    // Proposals not yet committed
    ConcurrentHashMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<>();
    // Committed proposals
    ConcurrentSkipListSet<Long> committedLog = new ConcurrentSkipListSet<>();
    // Connected followers
    List<LearnerHandler> followers = Collections.synchronizedList(new ArrayList<>());
}
5.3 Zab protocol properties
- Total-order guarantee (a worked ZXID example follows)
// Assign each proposal a globally unique ZXID:
// high 32 bits = epoch, low 32 bits = counter
public long getNextZxid() {
    return (epoch << 32) | (counter++);
}
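A quick worked example of that packing (values arbitrary): epoch 3 and counter 7 encode to 0x0000000300000007, so comparing zxids as plain longs orders them first by epoch, then by counter within the epoch.

```java
// Worked example: packing and unpacking a ZXID.
public class ZxidDemo {
    public static void main(String[] args) {
        long epoch = 3, counter = 7;
        long zxid = (epoch << 32) | counter;
        System.out.printf("zxid    = 0x%016x%n", zxid);          // 0x0000000300000007
        System.out.println("epoch   = " + (zxid >>> 32));        // 3
        System.out.println("counter = " + (zxid & 0xFFFFFFFFL)); // 7
    }
}
```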
- Reliability guarantee
// Block until a majority has ACKed
while (!p.hasAllQuorums()) {
    p.wait(timeout);
}
The Zab protocol achieves strong consistency across the cluster through four carefully designed phases (election, discovery, synchronization, broadcast). Its implementation embodies these core ideas:
- State-machine driven: explicit state transitions manage the protocol flow
- Majority principle: every critical operation requires acknowledgement from a majority of nodes
- Idempotent design: proposal handling can be retried safely
- Ordering guarantee: global ZXID ordering keeps operations in sequence
- Incremental recovery: prefer transaction-log sync over full snapshot transfer
Summary
This source-level walk through Zookeeper's five core modules lifts the veil on the distributed coordination service:
Core design philosophy
- Layered architecture: from the QuorumPeerMain entry point down to FinalRequestProcessor at the end of the chain, clean layering (network layer → processing chain → storage layer → protocol layer) decouples complex functionality elegantly.
- State-machine paradigm: the LOOKING → FOLLOWING → LEADING transitions reduce distributed consensus, the hardest problem in the field, to deterministic state migration; the state machine in QuorumPeer.run() is a classic of the genre.
- Pipelined performance: the chain-of-responsibility request pipeline (Prep → Sync → Proposal → Commit stages) and the network layer's SelectorThread → IOWorker collaboration together build a high-throughput processing pipeline.
The essence of distributed consensus
- Zab's four-phase flow: Election → Discovery → Sync → Broadcast, orchestrated precisely in Leader.lead() and Follower.followLeader().
- Crash-recovery wisdom: the globally unique epoch+ZXID identifier (see getNextZxid()) plus proposal replay resolve split-brain, the thorniest failure mode in distributed systems.
- Data consistency: CommitProcessor's ordered commits and the majority confirmation tracked in outstandingProposals jointly protect the linear consistency of the replicated state machine.
The value of reading the source
Tracing a create /node request through its complete life cycle across some thirty thousand lines of code:
- starting with byte deserialization in NIOServerCnxn
- passing the ACL checks in PrepRequestProcessor
- surviving disk persistence in SyncRequestProcessor
- riding the Zab proposal broadcast
- finally taking root in the DataTree
The depth of understanding gained from this kind of end-to-end tracing goes far beyond any theoretical description.
This article has covered the core flows, but Zookeeper's treasures run deeper: the time wheel behind session management, cross-node propagation of the Watch mechanism, dynamic reconfiguration… These are left for the reader to explore, and they are exactly what makes the distributed-systems field inexhaustibly fascinating.