大綱
1.服務器的請求處理鏈
(1)Leader服務器的請求處理鏈
一.PrepRequestProcessor請求預處理器
二.ProposalRequestProcessor事務投票處理器
三.SyncRequestProcessor事務日志處理器
四.AckRequestProcessor投票反饋處理器
五.CommitProcessor事務提交處理器
六.ToBeAppliedRequestProcessor處理器
七.FinalRequestProcessor處理器
(2)Follower服務器的請求處理鏈
一.FollowerRequestProcessor請求轉發處理器
二.SendAckRequestProcessor投票反饋處理器
2.服務端處理會話創建請求的流程
(1)請求接收
(2)會話創建
(3)請求預處理
(4)事務處理
(5)事務應用和響應
1.服務器的請求處理鏈
(1)Leader服務器的請求處理鏈
(2)Follower服務器的請求處理鏈
(1)Leader服務器的請求處理鏈
一.PrepRequestProcessor請求預處理器
二.ProposalRequestProcessor事務投票處理器
三.SyncRequestProcessor事務日志處理器
四.AckRequestProcessor投票反饋處理器
五.CommitProcessor事務提交處理器
六.ToBeAppliedRequestProcessor處理器
七.FinalRequestProcessor處理器
當客戶端需要和zk服務端進行相互協調通信時,首先要通過Leader服務器建立該客戶端與服務端的連接會話。當會話創建成功后,zk服務端就可以接收來自客戶端的請求操作了。
Leader服務器是zk集群的核心,其主要工作是:
工作一:處理事務請求,保證集群事務處理的順序性
工作二:集群內各服務器的調度者
zk服務端會使用責任鏈模式來處理每一個客戶端的請求。在服務端啟動時,會進行請求處理鏈的初始化。Leader服務器的請求處理鏈如下圖示,主要有7個請求處理器。
一.PrepRequestProcessor請求預處理器
zk中的事務請求就是會改變服務器狀態的請求。事務請求包括創建節點、更新節點、刪除節點、創建會話等請求。
PrepRequestProcessor是Leader服務器的請求預處理器(Prepare),它能夠識別出當前客戶端請求是否是事務請求,它會對事務請求進行一系列的預處理操作。這些預處理包括:創建請求事務頭事務體、會話檢查、ACL檢查等。
PrepRequestProcessor實現了RequestProcessor接口并繼承了zk線程,而且還有一個RequestProcessor類型的nextProcessor屬性字段,nextProcessor屬性字段的作用是指向下一個請求處理器。
Leader服務器在開始處理請求時,會調用PrepRequestProcessor的processRequest()方法將請求添加到隊列。請求預處理器的線程啟動后會不斷從submittedRequests隊列取出請求,然后把請求交給PrepRequestProcessor的pRequest()方法進行預處理。在pRequest()方法中,會根據請求類型來判斷請求是否是事務請求。如果是事務請求,就調用pRequest2Txn()方法對事務請求進行預處理。之后再將請求交給nextProcessor屬性字段指向的處理器進行下一步處理。
PrepRequestProcessor處理器也是一個線程,它會先把請求添加到隊列,然后由線程進行處理。
PrepRequestProcessor的nextProcessor屬性指向的是ProposalRequestProcessor處理器。
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {...protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));}...
}public class LeaderZooKeeperServer extends QuorumZooKeeperServer {CommitProcessor commitProcessor;PrepRequestProcessor prepRequestProcessor;...//初始化請求處理鏈@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());commitProcessor.start();ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);proposalProcessor.initialize();prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();//啟動請求預處理器線程firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);setupContainerManager();}...
}public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {RequestProcessor nextProcessor;LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();public void processRequest(Request request) {//將請求添加到隊列submittedRequests.add(request);}...@Overridepublic void run() {while (true) {Request request = submittedRequests.take();...pRequest(request);}}protected void pRequest(Request request) throws RequestProcessorException {request.setHdr(null);request.setTxn(null);switch (request.type) {...case OpCode.create:CreateRequest createRequest = new CreateRequest();pRequest2Txn(request.type, zks.getNextZxid(), request,createRequest, true);break;case OpCode.delete:...}...request.zxid = zks.getZxid();//將請求交給下一個處理器來處理nextProcessor.processRequest(request);}//下面這個方法專門用來對事務請求進行預處理protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {//設置請求的事務頭事務體 request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));...}...
}
有兩個入口會觸發調用PrepRequestProcessor的processRequest()方法。
第一是Leader服務器監聽到Learner轉發給Leader的事務請求。也就是在不斷運行的LearnerHandler線程中發現Learner給Leader發送請求時,會調用LeaderZooKeeperServer.submitLearnerRequest方法來觸發。
第二是zk服務端監聽到的來自客戶端的事務請求。此時會先調用ZooKeeperServer的processPacket()方法處理Socket的讀請求,然后再調用ZooKeeperServer的submitRequest()方法提交讀請求,最后就會調用ZooKeeperServer的firstProcessor的processRequest()方法。firstProcessor的processRequest()方法執行完便進入PrepRequestProcessor。
//第一個入口
public class Leader {...void lead() throws IOException, InterruptedException {...cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();...}class LearnerCnxAcceptor extends ZooKeeperCriticalThread {...@Overridepublic void run() {while (!stop) {Socket s = ss.accept();s.setSoTimeout(self.tickTime * self.initLimit);s.setTcpNoDelay(nodelay);BufferedInputStream is = new BufferedInputStream(s.getInputStream());LearnerHandler fh = new LearnerHandler(s, is, Leader.this);fh.start();...}...}}...
}public class LearnerHandler extends ZooKeeperThread {...@Overridepublic void run() {...while (true) {...case Leader.REQUEST:...//調用LeaderZooKeeperServer的submitLearnerRequest方法leader.zk.submitLearnerRequest(si);...}...}...
}public class LeaderZooKeeperServer extends QuorumZooKeeperServer {PrepRequestProcessor prepRequestProcessor;...@Overrideprotected void setupRequestProcessors() {...prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);...}public void submitLearnerRequest(Request request) {prepRequestProcessor.processRequest(request);}...
}//第二個入口
public class NIOServerCnxnFactory extends ServerCnxnFactory {...class SelectorThread extends AbstractSelectThread {@Overridepublic void run() {...while (!stopped) {select();...}...}private void select() {selector.select();Set<SelectionKey> selected = selector.selectedKeys();ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);Collections.shuffle(selectedList);Iterator<SelectionKey> selectedKeys = selectedList.iterator();while (!stopped && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selected.remove(key);...if (key.isReadable() || key.isWritable()) {//服務端從客戶端讀數據(讀取請求) + 服務端向客戶端寫數據(發送響應)handleIO(key);}...}}private void handleIO(SelectionKey key) {IOWorkRequest workRequest = new IOWorkRequest(this, key);NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();cnxn.disableSelectable();key.interestOps(0);//激活連接:添加連接到連接過期隊列touchCnxn(cnxn);//通過工作線程池來處理請求workerPool.schedule(workRequest);}...}private class IOWorkRequest extends WorkerService.WorkRequest {private final NIOServerCnxn cnxn;public void doWork() throws InterruptedException {...if (key.isReadable() || key.isWritable()) {cnxn.doIO(key);...}}...}
}public class WorkerService {...public void schedule(WorkRequest workRequest, long id) {ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);int size = workers.size();if (size > 0) {int workerNum = ((int) (id % size) + size) % size;ExecutorService worker = workers.get(workerNum);worker.execute(scheduledWorkRequest);} else {scheduledWorkRequest.run();}}private class ScheduledWorkRequest implements Runnable {private final WorkRequest workRequest;ScheduledWorkRequest(WorkRequest workRequest) {this.workRequest = workRequest;}@Overridepublic void run() {...workRequest.doWork();}}...
}public class NIOServerCnxn extends ServerCnxn {private final ZooKeeperServer zkServer;void doIO(SelectionKey k) throws InterruptedException {...if (k.isReadable()) {...readPayload();}}private void readPayload() throws IOException, InterruptedException {...readRequest();}private void readRequest() throws IOException {//處理輸入流zkServer.processPacket(this, incomingBuffer);}...
}public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {...public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {InputStream bais = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);RequestHeader h = new RequestHeader();h.deserialize(bia, "header");incomingBuffer = incomingBuffer.slice();...Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());submitRequest(si);...}public void submitRequest(Request si) {...//激活會話touch(si.cnxn);//firstProcessor.processRequest方法執行完便進入PrepRequestProcessorfirstProcessor.processRequest(si);...}...
}
二.ProposalRequestProcessor事務投票處理器
ProposalRequestProcessor處理器是Leader服務器的事務投票處理器。它是PrepRequestProcessor請求預處理器的下一個處理器,它的主要作用是對事務請求進行處理,包括創建提議、發起投票。
對于非事務請求:它會將請求直接交給CommitProcessor處理器處理,不再做其他處理。
對于事務請求:除了將請求交給CommitProcessor處理器外,還會創建請求對應的Proposal提議,并將Proposal提議發送給所有Follower來發起一次集群內的事務投票,同時還會將事務請求交給SyncRequestProcessor處理器來記錄事務日志。
提議是指:當處理一個事務請求時,zk會先在服務端發起一次投票流程。該投票的主要作用是通知zk服務端的各機器處理事務請求,從而避免因某個機器出現問題而造成事務不一致的問題。
ProposalRequestProcessor事務投票處理器的三個子流程分別是:Commit流程、Proposal流程、Sync流程。
流程一:Commit流程
完成Proposal流程后,zk服務器上的數據還沒有進行任何改變。完成Proposal流程只是說明zk服務端可以執行事務請求操作了,真正執行具體數據的變更需要在Commit流程中實現。Commit流程的主要作用就是完成請求的執行。該流程是由CommitProcessor處理器來實現的。
流程二:Proposal流程
處理事務請求時,zk要取得集群中過半機器的投票才能修改數據。Proposal流程的主要工作就是投票和統計投票結果。
流程三:Sync流程
Sync流程是由SyncRequestProcessor處理器來實現的。
ProposalRequestProcessor處理器不是一個線程,它的nextProcessor就是CommitProcessor處理器,它會調用SyncRequestProcessor處理器的processRequest()方法;
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());commitProcessor.start();//構建ProposalRequestProcessor處理器,下一個處理器為CommitProcessor處理器ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);proposalProcessor.initialize();//初始化ProposalRequestProcessor處理器prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);setupContainerManager();}...
}//ProposalRequestProcessor的nextProcessor就是CommitProcessor
public class ProposalRequestProcessor implements RequestProcessor {LeaderZooKeeperServer zks;RequestProcessor nextProcessor;//nextProcessor其實就是CommitProcessor處理器SyncRequestProcessor syncProcessor;//事務日志處理器,它的下一個處理器是AckRequestProcessorpublic ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {this.zks = zks;this.nextProcessor = nextProcessor;AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());//創建事務日志處理器,它的下一個處理器是AckRequestProcessorsyncProcessor = new SyncRequestProcessor(zks, ackProcessor);}//初始化ProposalRequestProcessor處理器public void initialize() {syncProcessor.start();//啟動事務日志處理器的線程}public void processRequest(Request request) throws RequestProcessorException {if (request instanceof LearnerSyncRequest) {//處理Learner的數據同步請求zks.getLeader().processSync((LearnerSyncRequest)request);} else {//Commit流程,nextProcessor其實就是CommitProcessor處理器nextProcessor.processRequest(request);if (request.getHdr() != null) {//Proposal流程zks.getLeader().propose(request);//Sync流程,將請求添加到隊列,然后由事務日志處理器線程去處理syncProcessor.processRequest(request);}}}...
}public class Leader {final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();...public Proposal propose(Request request) throws XidRolloverException {...byte[] data = SerializeUtils.serializeRequest(request);proposalStats.setLastBufferSize(data.length);QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);//生成Proposal提議Proposal p = new Proposal();p.packet = pp;p.request = request;synchronized(this) {p.addQuorumVerifier(self.getQuorumVerifier());if (request.getHdr().getType() == OpCode.reconfig) {self.setLastSeenQuorumVerifier(request.qv, true); }if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {p.addQuorumVerifier(self.getLastSeenQuorumVerifier());}lastProposed = p.packet.getZxid();//將發送的Proposal提議放入outstandingProposals隊列中outstandingProposals.put(lastProposed, p);//發送Proposal提議,其實就是把Proposal提議交給LearnerHandler處理sendPacket(pp);}return p;}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//LearnerHandler會將提議放入其發送隊列里f.queuePacket(qp);}}}...
}public class LearnerHandler extends ZooKeeperThread {final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();...void queuePacket(QuorumPacket p) {queuedPackets.add(p);}@Overridepublic void run() {...//啟動一個線程去發送Packet,比如Proposal提議startSendingPackets();...}protected void startSendingPackets() {if (!sendingThreadStarted) {// Start sending packetsnew Thread() {public void run() {Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());sendPackets();}}.start();sendingThreadStarted = true;} else {LOG.error("Attempting to start sending thread after it already started");}}private void sendPackets() throws InterruptedException {long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;while (true) {QuorumPacket p;p = queuedPackets.poll();if (p == null) {bufferedOutput.flush();p = queuedPackets.take();}if (p == proposalOfDeath) {break;}if (p.getType() == Leader.PING) {traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}if (p.getType() == Leader.PROPOSAL) {syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());}if (LOG.isTraceEnabled()) {ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);}oa.writeRecord(p, "packet");}}...
}
三.SyncRequestProcessor事務日志處理器
SyncRequestProcessor處理器是事務日志處理器。它的作用是將事務請求記錄到事務日志文件中,同時觸發zk進行數據快照。
SyncRequestProcessor處理器也是一個線程,它會先把請求添加到隊列,然后由線程處理,它的nextProcessor是AckRequestProcessor處理器。
//SyncRequestProcessor事務日志處理器,它的下一個處理器是AckRequestProcessor
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {private final ZooKeeperServer zks;private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();private final RequestProcessor nextProcessor;//AckRequestProcessor處理器private Thread snapInProcess = null;volatile private boolean running;private final LinkedList<Request> toFlush = new LinkedList<Request>();private final Random r = new Random();private static int snapCount = ZooKeeperServer.getSnapCount();private final Request requestOfDeath = Request.requestOfDeath;public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());this.zks = zks;this.nextProcessor = nextProcessor;running = true;}public void processRequest(Request request) {//將請求添加到隊列queuedRequests.add(request);}@Overridepublic void run() {try {int logCount = 0;int randRoll = r.nextInt(snapCount/2);while (true) {Request si = null;if (toFlush.isEmpty()) {si = queuedRequests.take();} else {si = queuedRequests.poll();if (si == null) {flush(toFlush);continue;}}if (si == requestOfDeath) {break;}if (si != null) {if (zks.getZKDatabase().append(si)) {logCount++;if (logCount > (snapCount / 2 + randRoll)) {randRoll = r.nextInt(snapCount/2);// roll the logzks.getZKDatabase().rollLog();// take a snapshotif (snapInProcess != null && snapInProcess.isAlive()) {LOG.warn("Too busy to snap, skipping");} else {snapInProcess = new ZooKeeperThread("Snapshot Thread") {public void run() {try {zks.takeSnapshot();} catch(Exception e) {LOG.warn("Unexpected exception", e);}}};snapInProcess.start();}logCount = 0;}} else if (toFlush.isEmpty()) {if (nextProcessor != null) {nextProcessor.processRequest(si);if (nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}}continue;}toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);}}}} catch (Throwable t) {handleException(this.getName(), t);} finally{running = false;}}private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException {if (toFlush.isEmpty()) {return;}zks.getZKDatabase().commit();while (!toFlush.isEmpty()) {Request i = toFlush.remove();if (nextProcessor != null) {nextProcessor.processRequest(i);}}if (nextProcessor != null && nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}}public void shutdown() {queuedRequests.add(requestOfDeath);if (running) {this.join();}if (!toFlush.isEmpty()) {flush(toFlush);}if (nextProcessor != null) {nextProcessor.shutdown();}}
}
四.AckRequestProcessor投票反饋處理器
SyncRequestProcessor的nextProcessor就是AckRequestProcessor,AckRequestProcessor是Leader特有的處理器。
它負責在SyncRequestProcessor處理器完成事務日志記錄后,通過Leader的processAck()方法向Proposal提議添加來自Leader的ACK響應。也就是將Leader的SID添加到Proposal提議的投票收集器里,然后調用Leader的tryToCommit()方法檢查提議是否已有過半ACK并嘗試提交。
同理,如果Leader收到Follower對該Proposal提議請求返回的ACK響應,也會通過Leader的processAck()方法向提議添加來自Follower的ACK響應,也就是將Follower的SID添加到Proposal提議的投票收集器里,然后調用Leader的tryToCommit()方法檢查提議是否已有過半ACK來嘗試提交。
AckRequestProcessor處理器不是一個線程,它沒有nextProcessor屬性字段。
//SyncRequestProcessor的nextProcessor就是AckRequestProcessor
class AckRequestProcessor implements RequestProcessor {Leader leader;AckRequestProcessor(Leader leader) {this.leader = leader;}//Forward the request as an ACK to the leaderpublic void processRequest(Request request) {QuorumPeer self = leader.self;if (self != null) {//Leader也作為參與Proposal投票的一份子進行ACK響應//將Leader的SID添加到Proposal提議的投票收集器里 + 檢查Proposal提議的投票收集器是否有過半ACK才提交leader.processAck(self.getId(), request.zxid, null);} else {LOG.error("Null QuorumPeer");}}
}public class LearnerHandler extends ZooKeeperThread {...@Overridepublic void run() {...while (true) {...switch (qp.getType()) {case Leader.ACK:...//如果Leader收到Follower對某Proposal提議請求返回的ACK響應//那么就將Follower的SID添加到該Proposal提議的投票收集器里leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());break;...}...}...
}public class Leader {final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();...public Proposal propose(Request request) throws XidRolloverException {...byte[] data = SerializeUtils.serializeRequest(request);proposalStats.setLastBufferSize(data.length);QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);//生成Proposal提議Proposal p = new Proposal();p.packet = pp;p.request = request;synchronized(this) {p.addQuorumVerifier(self.getQuorumVerifier());if (request.getHdr().getType() == OpCode.reconfig) {self.setLastSeenQuorumVerifier(request.qv, true); }if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {p.addQuorumVerifier(self.getLastSeenQuorumVerifier());}lastProposed = p.packet.getZxid();//將發送的Proposal提議放入outstandingProposals隊列中outstandingProposals.put(lastProposed, p);//發送Proposal提議,其實就是把Proposal提議交給LearnerHandler處理sendPacket(pp);}return p;}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//LearnerHandler會將提議放入其發送隊列里f.queuePacket(qp);}}}synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { ...//檢查請求的ZXID,需要比上次已提交的請求的ZXID也就是lastCommitted要大if (lastCommitted >= zxid) {if (LOG.isDebugEnabled()) {LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));}// The proposal has already been committedreturn;}Proposal p = outstandingProposals.get(zxid);//將Leader的SID添加到Proposal提議的投票收集器里p.addAck(sid);//嘗試提交,即檢查Proposal提議的投票收集器中是否有過半ACK響應boolean hasCommitted = tryToCommit(p, zxid, followerAddr);...}synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { //如果提議隊列中存在該提議的前一個提議,說明該提議的前一個提議還沒提交,那么就返回false//zxid - 1是因為,只有事務請求才會生成zxid,那么前一個事務肯定就是zxid = 1if (outstandingProposals.containsKey(zxid - 1)) return false;//getting a quorum from all necessary configurations.//Proposal提議的投票收集器是否已過半if (!p.hasAllQuorums()) {return false; }...outstandingProposals.remove(zxid);if (p.request != null) {toBeApplied.add(p);}...//一旦提議通過,馬上就要在Leader中標記lastCommitted為最新的提交ZXIDcommit(zxid);//給Follower廣播commit消息inform(p);//給Observer發送commit消息...//調用CommitProcessor處理器的commit方法提交請求zk.commitProcessor.commit(p.request);//讓Leader執行commit消息//下面處理的是Learner發起的同步請求if (pendingSyncs.containsKey(zxid)) {for (LearnerSyncRequest r: pendingSyncs.remove(zxid)) {sendSync(r);} } return true; }//廣播commit消息public void commit(long zxid) {synchronized(this) {//標記lastCommitted為最新的提交ZXIDlastCommitted = zxid;}QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);sendPacket(qp);}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//調用LearnerHandler的queuePacket方法添加Packet到發送隊列f.queuePacket(qp);}}}public void inform(Proposal proposal) {QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);sendObserverPacket(qp);}...static public class Proposal extends SyncedLearnerTracker {public QuorumPacket packet;public Request request;...}
}public class SyncedLearnerTracker {protected ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();...//添加到投票收集器public boolean addAck(Long sid) {boolean change = false;for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {qvAckset.getAckset().add(sid);change = true;}}return change;}//判斷投票收集器是否過半public boolean hasAllQuorums() {for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))return false;}return true;}...
}
五.CommitProcessor事務提交處理器
ProposalRequestProcessor的nextProcessor就是CommitProcessor處理器,CommitProcessor就是事務提交處理器。
對于非事務請求,CommitProcessor會將其轉交給nextProcessor處理。對于事務請求,CommitProcessor會阻塞等待Proposal提議可以被提交。
CommitProcessor有個LinkedBlockingQueue隊列queuedRequests。當調用CommitProcessor的processRequest()方法時,請求會被添加到該隊列。CommitProcessor線程會從queuedRequests隊列中取出請求進行處理。此外還通過nextPending和committedRequests隊列保證請求的順序處理。
CommitProcessor處理器也是一個線程,它會先把請求添加到隊列,然后由線程處理,它的nextProcessor是ToBeAppliedRequestProcessor.
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {//請求隊列protected final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue<Request>();//下一個要提交的請求protected final AtomicReference<Request> nextPending = new AtomicReference<Request>();//當前正在處理的請求數protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);...@Overridepublic void processRequest(Request request) {if (stopped) {return;}queuedRequests.add(request);if (!isWaitingForCommit()) {wakeup();//喚醒}}private boolean isProcessingRequest() {return numRequestsProcessing.get() != 0;}private boolean isWaitingForCommit() {return nextPending.get() != null;}private boolean isProcessingCommit() {return currentlyCommitting.get() != null;}synchronized private void wakeup() {notifyAll();//喚醒阻塞的線程}@Overridepublic void run() {Request request;while (!stopped) {synchronized(this) {while (!stopped && ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) && (committedRequests.isEmpty() || isProcessingRequest()))) {wait();//阻塞等待}}while (!stopped && !isWaitingForCommit() && !isProcessingCommit() && (request = queuedRequests.poll()) != null) {if (needCommit(request)) {//需要進行提交的事務請求nextPending.set(request);//設置下一個要提交的請求} else {//非事務請求轉交給下一個處理器sendToNextProcessor(request);}}processCommitted();//處理提交}}protected void processCommitted() {Request request;if (!stopped && !isProcessingRequest() && (committedRequests.peek() != null)) {if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {return;}request = committedRequests.poll();Request pending = nextPending.get();if (pending != null && pending.sessionId == request.sessionId && pending.cxid == request.cxid) {pending.setHdr(request.getHdr());pending.setTxn(request.getTxn());pending.zxid = request.zxid;currentlyCommitting.set(pending);nextPending.set(null);sendToNextProcessor(pending);} else {currentlyCommitting.set(request);sendToNextProcessor(request);}} }public void commit(Request request) {committedRequests.add(request);if (!isProcessingCommit()) {//CommitProcessor處理器當前沒有提交請求wakeup();//CommitProcessor喚醒線程}}private void sendToNextProcessor(Request request) {numRequestsProcessing.incrementAndGet();workerPool.schedule(new CommitWorkRequest(request), request.sessionId);}private class CommitWorkRequest extends WorkerService.WorkRequest {private final Request request;CommitWorkRequest(Request request) {this.request = request;}...public void doWork() throws RequestProcessorException {try {nextProcessor.processRequest(request);} finally {currentlyCommitting.compareAndSet(request, null);if (numRequestsProcessing.decrementAndGet() == 0) {if (!queuedRequests.isEmpty() || !committedRequests.isEmpty()) {wakeup();}}}}}...
}
如何理解保證事務請求的順序處理:
順序排隊的事務請求在被ProposalRequestProcessor處理的過程中,首先會執行CommitProcessor的processRequest()方法將請求加入請求隊列,所以請求隊列queuedRequests里面的請求是按順序排好的。然后會生成Proposal提議發送給Follower并收集ACK響應,最后當ACK響應過半時才調用CommitProcessor的commit()方法,此時可以進行提交的請求就會被添加到CommitProcessor的committedRequests隊列中。
是否會因網絡原因,導致CommitProcessor的committedRequests隊列里的請求并不一定按順序排好呢?
事務請求能保證順序處理的根本原因是:
整個Proposal消息廣播過程是基于FIFO特性的TCP協議來進行網絡通信的,所以能夠很容易保證消息廣播過程中消息接收和發送的順序性。也就是廣播時是由一個主進程Leader去通過FIFO的TCP協議進行發送的,所以每個Follower接收到的Proposal和Commit請求都會按順序進入隊列。
客戶端并發執行的事務請求到達Leader時一定會按順序入隊。然后Leader對事務請求進行廣播時,也會按順序進行廣播。被單一Leader進行順序廣播的多個事務請求也會順序到達某Follower。所以某Follower收到的多個Proposal提議也會按廣播時的順序進入隊列,之后某Follower都會按廣播時的順序發送ACK響應給Leader。
所以Leader收到某Follower的ACK響應都是按廣播時的順序收到的。即使Leader先收到Follower2響應的事務2,后收到Follower1的響應事務1,但最終統計過半選票時,Leader會發現事務1首先過半從而優先保證事務1的順序。
當然,Leader的processAck()方法會先確保要被提交的請求ZXID比上次大。此外,Leader的tryToCommit()方法也會首先確保前一個事務提交了才能處理。以及Follower在接收到Proposal和Commit請求就是按順序響應,即若Follower要提交的事務ID不是pendingTxns的頭部元素,那么就退出程序。最后結合CommitProcessor里的queuedRequests + committedRequests + nextPending,于是便能保證事務請求的順序處理。
public class Leader {...synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { ...//檢查請求的ZXID,需要比上次已提交的請求的ZXID也就是lastCommitted要大if (lastCommitted >= zxid) {if (LOG.isDebugEnabled()) {LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));}// The proposal has already been committedreturn;}Proposal p = outstandingProposals.get(zxid);//將Leader的SID添加到Proposal提議的投票收集器里p.addAck(sid);//嘗試提交,即檢查Proposal提議的投票收集器中是否有過半ACK響應boolean hasCommitted = tryToCommit(p, zxid, followerAddr);...}synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { //如果提議隊列中存在該提議的前一個提議,說明該提議的前一個提議還沒提交,那么就返回false//zxid - 1是因為,只有事務請求才會生成zxid,那么前一個事務肯定就是zxid = 1if (outstandingProposals.containsKey(zxid - 1)) return false;//getting a quorum from all necessary configurations.//Proposal提議的投票收集器是否已過半if (!p.hasAllQuorums()) {return false; }...zk.commitProcessor.commit(p.request);... }...
}public class Follower extends Learner{...void followLeader() throws InterruptedException {...while (this.isRunning()) {readPacket(qp);processPacket(qp);}...}protected void processPacket(QuorumPacket qp) throws Exception {switch (qp.getType()) {case Leader.PING: ping(qp); break;case Leader.PROPOSAL://處理Leader發起的Proposal提議投票請求TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);lastQueued = hdr.getZxid();...fzk.logRequest(hdr, txn);break;case Leader.COMMIT://處理Leader發送過來的對Proposal提議進行提交的請求fzk.commit(qp.getZxid());break;...}}
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();...//將收到的投票請求放入隊列pendingTxnspublic void logRequest(TxnHeader hdr, Record txn) {Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request);}syncProcessor.processRequest(request);}//When a COMMIT message is received, eventually this method is called,//which matches up the zxid from the COMMIT with (hopefully) the head of //the pendingTxns queue and hands it to the commitProcessor to commit.//@param zxid - must correspond to the head of pendingTxns if it existspublic void commit(long zxid) {if (pendingTxns.size() == 0) {LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn");return;}long firstElementZxid = pendingTxns.element().zxid;if (firstElementZxid != zxid) {//如果Follower需要提交的事務ID不是pendingTxns的頭部元素,就退出程序LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid));System.exit(12);}Request request = pendingTxns.remove();commitProcessor.commit(request);}...
}
六.ToBeAppliedRequestProcessor處理器
Leader中有一個toBeApplied隊列,專門存儲那些可以被提交的Proposal提議。ToBeAppliedRequestProcessor會把已被CommitProcessor處理過的請求,轉交給下一個處理器處理,并把請求從Leader的toBeApplied隊列中移除。
ToBeAppliedRequestProcessor處理器不是一個線程,它的next是FinalRequestProcessor處理器。
public class Leader {private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();...static class ToBeAppliedRequestProcessor implements RequestProcessor {private final RequestProcessor next;private final Leader leader;ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {this.leader = leader;this.next = next;}...public void processRequest(Request request) throws RequestProcessorException {next.processRequest(request);if (request.getHdr() != null) {long zxid = request.getHdr().getZxid();Iterator<Proposal> iter = leader.toBeApplied.iterator();if (iter.hasNext()) {Proposal p = iter.next();if (p.request != null && p.request.zxid == zxid) {iter.remove();return;}}}}...}...
}
七.FinalRequestProcessor處理器
FinalRequestProcessor處理器用來處理返回客戶端響應前的收尾工作,包括創建客戶端的響應、將事務請求應用到內存數據庫中。
FinalRequestProcessor處理器不是一個線程,它也沒有nextProcessor屬性字段。
public class FinalRequestProcessor implements RequestProcessor {...public void processRequest(Request request) {...ProcessTxnResult rc = null;synchronized (zks.outstandingChanges) {// Need to process local session requestsrc = zks.processTxn(request);if (request.getHdr() != null) {TxnHeader hdr = request.getHdr();Record txn = request.getTxn();long zxid = hdr.getZxid();while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) {ChangeRecord cr = zks.outstandingChanges.remove();if (zks.outstandingChangesForPath.get(cr.path) == cr) {zks.outstandingChangesForPath.remove(cr.path);}}}// do not add non quorum packets to the queue.if (request.isQuorum()) {zks.getZKDatabase().addCommittedProposal(request);}}...//創建響應并發送響應給客戶端long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());zks.serverStats().updateLatency(request.createTime);cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());cnxn.sendResponse(hdr, rsp, "response");if (request.type == OpCode.closeSession) {cnxn.sendCloseSession();}}...
}
總結:
PrepRequestProcessor的nextProcessor就是ProposalRequestProcessor;
ProposalRequestProcessor的nextProcessor就是CommitProcessor;
CommitProcessor的nextProcessor就是ToBeAppliedRequestProcessor;
ToBeAppliedRequestProcessor的next是FinalRequestProcessor;
FinalRequestProcessor沒有nextProcessor屬性字段;ProposalRequestProcessor會調用SyncRequestProcessor處理器的方法;
SyncRequestProcessor的nextProcessor就是AckRequestProcessor;
AckRequestProcessor沒有nextProcessor屬性字段;PrepRequestProcessor處理器是一個線程;
ProposalRequestProcessor處理器不是一個線程;
CommitProcessor處理器是一個線程;
ToBeAppliedRequestProcessor處理器不是一個線程;
FinalRequestProcessor處理器不是一個線程;SyncRequestProcessor處理器是一個線程;
AckRequestProcessor處理器不是一個線程;
(2)Follower服務器的請求處理鏈
一.FollowerRequestProcessor請求轉發處理器
二.SendAckRequestProcessor投票反饋處理器
Follower服務器的主要工作是:
一.處理非事務請求 + 轉發事務請求給Leader服務器
二.參與事務請求的Proposal提議的投票
三.參與Leader選舉投票
Follower服務器的請求處理鏈如下圖示:
Leader服務器的第一個處理器是LeaderRequestProcessor,Follower服務器的第一個處理器是FollowerRequestProcessor。由于不需要處理事務請求的投票,所以Follower服務器沒有ProposalRequestProcessor處理器。
一.FollowerRequestProcessor請求轉發處理器
FollowerRequestProcessor主要工作是識別當前請求是否是事務請求。如果是事務請求,那么Follower就會將該事務請求轉發給Leader服務器。FollowerRequestProcessor處理器會通過調用Learner的request()方法實現請求轉發。Learner的request方法會往輸出流leaderOs中寫入請求數據來發給Leader。輸出流leaderOs在Follower和Leader建立好連接時就已經初始化好了的。
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {...protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));}...
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());commitProcessor.start();firstProcessor = new FollowerRequestProcessor(this, commitProcessor);((FollowerRequestProcessor) firstProcessor).start();syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));syncProcessor.start();}public Follower getFollower(){return self.follower;}...
}public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {FollowerZooKeeperServer zks;RequestProcessor nextProcessor;LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();boolean finished = false;public FollowerRequestProcessor(FollowerZooKeeperServer zks, RequestProcessor nextProcessor) {super("FollowerRequestProcessor:" + zks.getServerId(), zks.getZooKeeperServerListener());this.zks = zks;this.nextProcessor = nextProcessor;}@Overridepublic void run() {while (!finished) {Request request = queuedRequests.take();if (request == Request.requestOfDeath) {break;}nextProcessor.processRequest(request);//如果是事務請求,則調用zks.getFollower().request(request)轉發事務請求給Leaderswitch (request.type) {case OpCode.sync:zks.pendingSyncs.add(request);zks.getFollower().request(request);break;case OpCode.create:case OpCode.create2:case OpCode.createTTL:case OpCode.createContainer:case OpCode.delete:case OpCode.deleteContainer:case OpCode.setData:case OpCode.reconfig:case OpCode.setACL:case OpCode.multi:case OpCode.check:zks.getFollower().request(request);break;case OpCode.createSession:case OpCode.closeSession:// Don't forward local sessions to the leader.if (!request.isLocalSession()) {zks.getFollower().request(request);}break;}}}public void processRequest(Request request) {if (!finished) {Request upgradeRequest = null;upgradeRequest = zks.checkUpgradeSession(request);if (upgradeRequest != null) {queuedRequests.add(upgradeRequest);}queuedRequests.add(request);}}...
}public class Learner {protected BufferedOutputStream bufferedOutput;protected Socket sock;protected InputArchive leaderIs;protected OutputArchive leaderOs;...//send a request packet to the leader//發送一個請求給Leadervoid request(Request request) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();DataOutputStream oa = new DataOutputStream(baos);oa.writeLong(request.sessionId);oa.writeInt(request.cxid);oa.writeInt(request.type);if (request.request != null) {request.request.rewind();int len = request.request.remaining();byte b[] = new byte[len];request.request.get(b);request.request.rewind();oa.write(b);}oa.close();QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);//發送請求,往輸出流leaderOs寫數據writePacket(qp, true);}void writePacket(QuorumPacket pp, boolean flush) throws IOException {synchronized (leaderOs) {if (pp != null) {leaderOs.writeRecord(pp, "packet");}if (flush) {bufferedOutput.flush();}}}//和Leader建立連接時就已經初始化好輸出流leaderOs了protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {this.sock = createSocket();int initLimitTime = self.tickTime * self.initLimit;int remainingInitLimitTime = initLimitTime;long startNanoTime = nanoTime();for (int tries = 0; tries < 5; tries++) {remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));if (self.isSslQuorum()) {((SSLSocket) sock).startHandshake();}sock.setTcpNoDelay(nodelay);break;}Thread.sleep(1000);self.authLearner.authenticate(sock, hostname);//初始化好輸入流leaderIsleaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));bufferedOutput = new BufferedOutputStream(sock.getOutputStream());//初始化好輸出流leaderOsleaderOs = BinaryOutputArchive.getArchive(bufferedOutput);}//創建BIO的scoektprivate Socket createSocket() throws X509Exception, IOException {Socket sock;if (self.isSslQuorum()) {sock = self.getX509Util().createSSLSocket();} else {sock = new Socket();}sock.setSoTimeout(self.tickTime * self.initLimit);return sock;}...
}
二.SendAckRequestProcessor投票反饋處理器
Leader的請求處理鏈有個叫AckRequestProcessor的投票反饋處理器,主要負責在執行完SyncRequestProcessor處理器記錄好事務日志后,向Proposal提議反饋來自Leader的ACK響應。
Follower的請求處理鏈也有個叫SendAckRequestProcessor的投票反饋處理器,主要負責在執行完SyncRequestProcessor處理器記錄好事務日志后,通過發送消息給Leader來向Proposal提議反饋來自Follower的ACK響應。
Follower請求處理鏈的SyncRequestProcessor處理器會啟動一個線程。SyncRequestProcessor處理器會先把請求添加到隊列,然后由線程處理。SyncRequestProcessor的nextProcessor就是SendAckRequestProcessor請求處理器。SendAckRequestProcessor不是一個線程。
public class SendAckRequestProcessor implements RequestProcessor, Flushable {Learner learner;SendAckRequestProcessor(Learner peer) {this.learner = peer;}public void processRequest(Request si) {if (si.type != OpCode.sync) {QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);learner.writePacket(qp, false);//向Leader發送ACK響應...}}...
}public class Follower extends Learner {...void followLeader() throws InterruptedException {...QuorumServer leaderServer = findLeader(); connectToLeader(leaderServer.addr, leaderServer.hostname);long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket();while (this.isRunning()) {readPacket(qp);//讀取Leader發過來的請求的輸入流leaderIsprocessPacket(qp);//處理Leader發過來的請求,其中就包括Proposal提議的投票請求}...}protected void processPacket(QuorumPacket qp) throws Exception{switch (qp.getType()) {case Leader.PING: ping(qp); break;//對Leader發起的Proposal提議投票進行響應case Leader.PROPOSAL: TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);...//對Leader發起的Proposal提議投票進行響應//此時請求便能進入SyncRequestProcessor處理器的隊列里了//SyncRequestProcessor線程處理完該請求,就會由SendAckRequestProcessor來發送ACK響應fzk.logRequest(hdr, txn);break;...}}...
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());commitProcessor.start();firstProcessor = new FollowerRequestProcessor(this, commitProcessor);((FollowerRequestProcessor) firstProcessor).start();syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));syncProcessor.start();}public void logRequest(TxnHeader hdr, Record txn) {Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request);}//調用SyncRequestProcessor的processRequest方法處理Proposal提議的投票響應syncProcessor.processRequest(request);}...
}public class Learner {protected BufferedOutputStream bufferedOutput;protected Socket sock;protected InputArchive leaderIs;protected OutputArchive leaderOs;...void readPacket(QuorumPacket pp) throws IOException {synchronized (leaderIs) {//讀取Leader發送過來的請求的輸入流leaderIs.readRecord(pp, "packet");}}void writePacket(QuorumPacket pp, boolean flush) throws IOException {synchronized (leaderOs) {if (pp != null) {//將響應寫入輸出流,發送給LeaderleaderOs.writeRecord(pp, "packet");}if (flush) {bufferedOutput.flush();}}}//和Leader建立連接時就已經初始化好輸出流leaderOs了protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {this.sock = createSocket();int initLimitTime = self.tickTime * self.initLimit;int remainingInitLimitTime = initLimitTime;long startNanoTime = nanoTime();for (int tries = 0; tries < 5; tries++) {remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));if (self.isSslQuorum()) {((SSLSocket) sock).startHandshake();}sock.setTcpNoDelay(nodelay);break;}Thread.sleep(1000);self.authLearner.authenticate(sock, hostname);//初始化好輸入流leaderIsleaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));bufferedOutput = new BufferedOutputStream(sock.getOutputStream());//初始化好輸出流leaderOsleaderOs = BinaryOutputArchive.getArchive(bufferedOutput);}...
}