zk源碼—5.請求的處理過程一

大綱

1.服務器的請求處理鏈

(1)Leader服務器的請求處理鏈

一.PrepRequestProcessor請求預處理器

二.ProposalRequestProcessor事務投票處理器

三.SyncRequestProcessor事務日志處理器

四.AckRequestProcessor投票反饋處理器

五.CommitProcessor事務提交處理器

六.ToBeAppliedRequestProcessor處理器

七.FinalRequestProcessor處理器

(2)Follower服務器的請求處理鏈

一.FollowerRequestProcessor請求轉發處理器

二.SendAckRequestProcessor投票反饋處理器

2.服務端處理會話創建請求的流程

(1)請求接收

(2)會話創建

(3)請求預處理

(4)事務處理

(5)事務應用和響應

1.服務器的請求處理鏈

(1)Leader服務器的請求處理鏈

(2)Follower服務器的請求處理鏈

(1)Leader服務器的請求處理鏈

一.PrepRequestProcessor請求預處理器

二.ProposalRequestProcessor事務投票處理器

三.SyncRequestProcessor事務日志處理器

四.AckRequestProcessor投票反饋處理器

五.CommitProcessor事務提交處理器

六.ToBeAppliedRequestProcessor處理器

七.FinalRequestProcessor處理器

當客戶端需要和zk服務端進行相互協調通信時,首先要通過Leader服務器建立該客戶端與服務端的連接會話。當會話創建成功后,zk服務端就可以接收來自客戶端的請求操作了。

Leader服務器是zk集群的核心,其主要工作是:

工作一:處理事務請求,保證集群事務處理的順序性

工作二:集群內各服務器的調度者

zk服務端會使用責任鏈模式來處理每一個客戶端的請求。在服務端啟動時,會進行請求處理鏈的初始化。Leader服務器的請求處理鏈如下圖示,主要有7個請求處理器。

圖片

一.PrepRequestProcessor請求預處理器

zk中的事務請求就是會改變服務器狀態的請求。事務請求包括創建節點、更新節點、刪除節點、創建會話等請求。

PrepRequestProcessor是Leader服務器的請求預處理器(Prepare),它能夠識別出當前客戶端請求是否是事務請求,它會對事務請求進行一系列的預處理操作。這些預處理包括:創建請求事務頭事務體、會話檢查、ACL檢查等。

PrepRequestProcessor實現了RequestProcessor接口并繼承了zk線程,而且還有一個RequestProcessor類型的nextProcessor屬性字段,nextProcessor屬性字段的作用是指向下一個請求處理器。

Leader服務器在開始處理請求時,會調用PrepRequestProcessor的processRequest()方法將請求添加到隊列。請求預處理器的線程啟動后會不斷從submittedRequests隊列取出請求,然后把請求交給PrepRequestProcessor的pRequest()方法進行預處理。在pRequest()方法中,會根據請求類型來判斷請求是否是事務請求。如果是事務請求,就調用pRequest2Txn()方法對事務請求進行預處理。之后再將請求交給nextProcessor屬性字段指向的處理器進行下一步處理。

PrepRequestProcessor處理器也是一個線程,它會先把請求添加到隊列,然后由線程進行處理。

PrepRequestProcessor的nextProcessor屬性指向的是ProposalRequestProcessor處理器。

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {...protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));}...
}public class LeaderZooKeeperServer extends QuorumZooKeeperServer {CommitProcessor commitProcessor;PrepRequestProcessor prepRequestProcessor;...//初始化請求處理鏈@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());commitProcessor.start();ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);proposalProcessor.initialize();prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();//啟動請求預處理器線程firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);setupContainerManager();}...
}public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {RequestProcessor nextProcessor;LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();public void processRequest(Request request) {//將請求添加到隊列submittedRequests.add(request);}...@Overridepublic void run() {while (true) {Request request = submittedRequests.take();...pRequest(request);}}protected void pRequest(Request request) throws RequestProcessorException {request.setHdr(null);request.setTxn(null);switch (request.type) {...case OpCode.create:CreateRequest createRequest = new CreateRequest();pRequest2Txn(request.type, zks.getNextZxid(), request,createRequest, true);break;case OpCode.delete:...}...request.zxid = zks.getZxid();//將請求交給下一個處理器來處理nextProcessor.processRequest(request);}//下面這個方法專門用來對事務請求進行預處理protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {//設置請求的事務頭事務體      request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));...}...
}

有兩個入口會觸發調用PrepRequestProcessor的processRequest()方法。

第一是Leader服務器監聽到Learner轉發給Leader的事務請求。也就是在不斷運行的LearnerHandler線程中發現Learner給Leader發送請求時,會調用LeaderZooKeeperServer.submitLearnerRequest方法來觸發。

第二是zk服務端監聽到的來自客戶端的事務請求。此時會先調用ZooKeeperServer的processPacket()方法處理Socket的讀請求,然后再調用ZooKeeperServer的submitRequest()方法提交讀請求,最后就會調用ZooKeeperServer的firstProcessor的processRequest()方法。firstProcessor的processRequest()方法執行完便進入PrepRequestProcessor。

//第一個入口
public class Leader {...void lead() throws IOException, InterruptedException {...cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();...}class LearnerCnxAcceptor extends ZooKeeperCriticalThread {...@Overridepublic void run() {while (!stop) {Socket s = ss.accept();s.setSoTimeout(self.tickTime * self.initLimit);s.setTcpNoDelay(nodelay);BufferedInputStream is = new BufferedInputStream(s.getInputStream());LearnerHandler fh = new LearnerHandler(s, is, Leader.this);fh.start();...}...}}...
}public class LearnerHandler extends ZooKeeperThread {...@Overridepublic void run() {...while (true) {...case Leader.REQUEST:...//調用LeaderZooKeeperServer的submitLearnerRequest方法leader.zk.submitLearnerRequest(si);...}...}...
}public class LeaderZooKeeperServer extends QuorumZooKeeperServer {PrepRequestProcessor prepRequestProcessor;...@Overrideprotected void setupRequestProcessors() {...prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);...}public void submitLearnerRequest(Request request) {prepRequestProcessor.processRequest(request);}...
}//第二個入口
public class NIOServerCnxnFactory extends ServerCnxnFactory {...class SelectorThread extends AbstractSelectThread {@Overridepublic void run() {...while (!stopped) {select();...}...}private void select() {selector.select();Set<SelectionKey> selected = selector.selectedKeys();ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);Collections.shuffle(selectedList);Iterator<SelectionKey> selectedKeys = selectedList.iterator();while (!stopped && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selected.remove(key);...if (key.isReadable() || key.isWritable()) {//服務端從客戶端讀數據(讀取請求) + 服務端向客戶端寫數據(發送響應)handleIO(key);}...}}private void handleIO(SelectionKey key) {IOWorkRequest workRequest = new IOWorkRequest(this, key);NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();cnxn.disableSelectable();key.interestOps(0);//激活連接:添加連接到連接過期隊列touchCnxn(cnxn);//通過工作線程池來處理請求workerPool.schedule(workRequest);}...}private class IOWorkRequest extends WorkerService.WorkRequest {private final NIOServerCnxn cnxn;public void doWork() throws InterruptedException {...if (key.isReadable() || key.isWritable()) {cnxn.doIO(key);...}}...}
}public class WorkerService {...public void schedule(WorkRequest workRequest, long id) {ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);int size = workers.size();if (size > 0) {int workerNum = ((int) (id % size) + size) % size;ExecutorService worker = workers.get(workerNum);worker.execute(scheduledWorkRequest);} else {scheduledWorkRequest.run();}}private class ScheduledWorkRequest implements Runnable {private final WorkRequest workRequest;ScheduledWorkRequest(WorkRequest workRequest) {this.workRequest = workRequest;}@Overridepublic void run() {...workRequest.doWork();}}...
}public class NIOServerCnxn extends ServerCnxn {private final ZooKeeperServer zkServer;void doIO(SelectionKey k) throws InterruptedException {...if (k.isReadable()) {...readPayload();}}private void readPayload() throws IOException, InterruptedException {...readRequest();}private void readRequest() throws IOException {//處理輸入流zkServer.processPacket(this, incomingBuffer);}...
}public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {...public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {InputStream bais = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);RequestHeader h = new RequestHeader();h.deserialize(bia, "header");incomingBuffer = incomingBuffer.slice();...Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());submitRequest(si);...}public void submitRequest(Request si) {...//激活會話touch(si.cnxn);//firstProcessor.processRequest方法執行完便進入PrepRequestProcessorfirstProcessor.processRequest(si);...}...
}

二.ProposalRequestProcessor事務投票處理器

ProposalRequestProcessor處理器是Leader服務器的事務投票處理器。它是PrepRequestProcessor請求預處理器的下一個處理器,它的主要作用是對事務請求進行處理,包括創建提議、發起投票。

對于非事務請求:它會將請求直接交給CommitProcessor處理器處理,不再做其他處理。

對于事務請求:除了將請求交給CommitProcessor處理器外,還會創建請求對應的Proposal提議,并將Proposal提議發送給所有Follower來發起一次集群內的事務投票,同時還會將事務請求交給SyncRequestProcessor處理器來記錄事務日志。

提議是指:當處理一個事務請求時,zk會先在服務端發起一次投票流程。該投票的主要作用是通知zk服務端的各機器處理事務請求,從而避免因某個機器出現問題而造成事務不一致的問題。

ProposalRequestProcessor事務投票處理器的三個子流程分別是:Commit流程、Proposal流程、Sync流程。

流程一:Commit流程

完成Proposal流程后,zk服務器上的數據還沒有進行任何改變。完成Proposal流程只是說明zk服務端可以執行事務請求操作了,真正執行具體數據的變更需要在Commit流程中實現。Commit流程的主要作用就是完成請求的執行。該流程是由CommitProcessor處理器來實現的。

流程二:Proposal流程

處理事務請求時,zk要取得集群中過半機器的投票才能修改數據。Proposal流程的主要工作就是投票和統計投票結果。

流程三:Sync流程

Sync流程是由SyncRequestProcessor處理器來實現的。

ProposalRequestProcessor處理器不是一個線程,它的nextProcessor就是CommitProcessor處理器,它會調用SyncRequestProcessor處理器的processRequest()方法;

public class LeaderZooKeeperServer extends QuorumZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());commitProcessor.start();//構建ProposalRequestProcessor處理器,下一個處理器為CommitProcessor處理器ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);proposalProcessor.initialize();//初始化ProposalRequestProcessor處理器prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);setupContainerManager();}...
}//ProposalRequestProcessor的nextProcessor就是CommitProcessor
public class ProposalRequestProcessor implements RequestProcessor {LeaderZooKeeperServer zks;RequestProcessor nextProcessor;//nextProcessor其實就是CommitProcessor處理器SyncRequestProcessor syncProcessor;//事務日志處理器,它的下一個處理器是AckRequestProcessorpublic ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {this.zks = zks;this.nextProcessor = nextProcessor;AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());//創建事務日志處理器,它的下一個處理器是AckRequestProcessorsyncProcessor = new SyncRequestProcessor(zks, ackProcessor);}//初始化ProposalRequestProcessor處理器public void initialize() {syncProcessor.start();//啟動事務日志處理器的線程}public void processRequest(Request request) throws RequestProcessorException {if (request instanceof LearnerSyncRequest) {//處理Learner的數據同步請求zks.getLeader().processSync((LearnerSyncRequest)request);} else {//Commit流程,nextProcessor其實就是CommitProcessor處理器nextProcessor.processRequest(request);if (request.getHdr() != null) {//Proposal流程zks.getLeader().propose(request);//Sync流程,將請求添加到隊列,然后由事務日志處理器線程去處理syncProcessor.processRequest(request);}}}...
}public class Leader {final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();...public Proposal propose(Request request) throws XidRolloverException {...byte[] data = SerializeUtils.serializeRequest(request);proposalStats.setLastBufferSize(data.length);QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);//生成Proposal提議Proposal p = new Proposal();p.packet = pp;p.request = request;synchronized(this) {p.addQuorumVerifier(self.getQuorumVerifier());if (request.getHdr().getType() == OpCode.reconfig) {self.setLastSeenQuorumVerifier(request.qv, true);                       }if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {p.addQuorumVerifier(self.getLastSeenQuorumVerifier());}lastProposed = p.packet.getZxid();//將發送的Proposal提議放入outstandingProposals隊列中outstandingProposals.put(lastProposed, p);//發送Proposal提議,其實就是把Proposal提議交給LearnerHandler處理sendPacket(pp);}return p;}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//LearnerHandler會將提議放入其發送隊列里f.queuePacket(qp);}}}...
}public class LearnerHandler extends ZooKeeperThread {final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();...void queuePacket(QuorumPacket p) {queuedPackets.add(p);}@Overridepublic void run() {...//啟動一個線程去發送Packet,比如Proposal提議startSendingPackets();...}protected void startSendingPackets() {if (!sendingThreadStarted) {// Start sending packetsnew Thread() {public void run() {Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());sendPackets();}}.start();sendingThreadStarted = true;} else {LOG.error("Attempting to start sending thread after it already started");}}private void sendPackets() throws InterruptedException {long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;while (true) {QuorumPacket p;p = queuedPackets.poll();if (p == null) {bufferedOutput.flush();p = queuedPackets.take();}if (p == proposalOfDeath) {break;}if (p.getType() == Leader.PING) {traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}if (p.getType() == Leader.PROPOSAL) {syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());}if (LOG.isTraceEnabled()) {ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);}oa.writeRecord(p, "packet");}}...
}

三.SyncRequestProcessor事務日志處理器

SyncRequestProcessor處理器是事務日志處理器。它的作用是將事務請求記錄到事務日志文件中,同時觸發zk進行數據快照。

SyncRequestProcessor處理器也是一個線程,它會先把請求添加到隊列,然后由線程處理,它的nextProcessor是AckRequestProcessor處理器。

//SyncRequestProcessor事務日志處理器,它的下一個處理器是AckRequestProcessor
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {private final ZooKeeperServer zks;private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();private final RequestProcessor nextProcessor;//AckRequestProcessor處理器private Thread snapInProcess = null;volatile private boolean running;private final LinkedList<Request> toFlush = new LinkedList<Request>();private final Random r = new Random();private static int snapCount = ZooKeeperServer.getSnapCount();private final Request requestOfDeath = Request.requestOfDeath;public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());this.zks = zks;this.nextProcessor = nextProcessor;running = true;}public void processRequest(Request request) {//將請求添加到隊列queuedRequests.add(request);}@Overridepublic void run() {try {int logCount = 0;int randRoll = r.nextInt(snapCount/2);while (true) {Request si = null;if (toFlush.isEmpty()) {si = queuedRequests.take();} else {si = queuedRequests.poll();if (si == null) {flush(toFlush);continue;}}if (si == requestOfDeath) {break;}if (si != null) {if (zks.getZKDatabase().append(si)) {logCount++;if (logCount > (snapCount / 2 + randRoll)) {randRoll = r.nextInt(snapCount/2);// roll the logzks.getZKDatabase().rollLog();// take a snapshotif (snapInProcess != null && snapInProcess.isAlive()) {LOG.warn("Too busy to snap, skipping");} else {snapInProcess = new ZooKeeperThread("Snapshot Thread") {public void run() {try {zks.takeSnapshot();} catch(Exception e) {LOG.warn("Unexpected exception", e);}}};snapInProcess.start();}logCount = 0;}} else if (toFlush.isEmpty()) {if (nextProcessor != null) {nextProcessor.processRequest(si);if (nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}}continue;}toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);}}}} catch (Throwable t) {handleException(this.getName(), t);} finally{running = false;}}private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException {if (toFlush.isEmpty()) {return;}zks.getZKDatabase().commit();while (!toFlush.isEmpty()) {Request i = toFlush.remove();if (nextProcessor != null) {nextProcessor.processRequest(i);}}if (nextProcessor != null && nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}}public void shutdown() {queuedRequests.add(requestOfDeath);if (running) {this.join();}if (!toFlush.isEmpty()) {flush(toFlush);}if (nextProcessor != null) {nextProcessor.shutdown();}}
}

四.AckRequestProcessor投票反饋處理器

SyncRequestProcessor的nextProcessor就是AckRequestProcessor,AckRequestProcessor是Leader特有的處理器。

它負責在SyncRequestProcessor處理器完成事務日志記錄后,通過Leader的processAck()方法向Proposal提議添加來自Leader的ACK響應。也就是將Leader的SID添加到Proposal提議的投票收集器里,然后調用Leader的tryToCommit()方法檢查提議是否已有過半ACK并嘗試提交。

同理,如果Leader收到Follower對該Proposal提議請求返回的ACK響應,也會通過Leader的processAck()方法向提議添加來自Follower的ACK響應,也就是將Follower的SID添加到Proposal提議的投票收集器里,然后調用Leader的tryToCommit()方法檢查提議是否已有過半ACK來嘗試提交。

AckRequestProcessor處理器不是一個線程,它沒有nextProcessor屬性字段。

//SyncRequestProcessor的nextProcessor就是AckRequestProcessor
class AckRequestProcessor implements RequestProcessor {Leader leader;AckRequestProcessor(Leader leader) {this.leader = leader;}//Forward the request as an ACK to the leaderpublic void processRequest(Request request) {QuorumPeer self = leader.self;if (self != null) {//Leader也作為參與Proposal投票的一份子進行ACK響應//將Leader的SID添加到Proposal提議的投票收集器里 + 檢查Proposal提議的投票收集器是否有過半ACK才提交leader.processAck(self.getId(), request.zxid, null);} else {LOG.error("Null QuorumPeer");}}
}public class LearnerHandler extends ZooKeeperThread {...@Overridepublic void run() {...while (true) {...switch (qp.getType()) {case Leader.ACK:...//如果Leader收到Follower對某Proposal提議請求返回的ACK響應//那么就將Follower的SID添加到該Proposal提議的投票收集器里leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());break;...}...}...
}public class Leader {final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();...public Proposal propose(Request request) throws XidRolloverException {...byte[] data = SerializeUtils.serializeRequest(request);proposalStats.setLastBufferSize(data.length);QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);//生成Proposal提議Proposal p = new Proposal();p.packet = pp;p.request = request;synchronized(this) {p.addQuorumVerifier(self.getQuorumVerifier());if (request.getHdr().getType() == OpCode.reconfig) {self.setLastSeenQuorumVerifier(request.qv, true);                       }if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {p.addQuorumVerifier(self.getLastSeenQuorumVerifier());}lastProposed = p.packet.getZxid();//將發送的Proposal提議放入outstandingProposals隊列中outstandingProposals.put(lastProposed, p);//發送Proposal提議,其實就是把Proposal提議交給LearnerHandler處理sendPacket(pp);}return p;}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//LearnerHandler會將提議放入其發送隊列里f.queuePacket(qp);}}}synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        ...//檢查請求的ZXID,需要比上次已提交的請求的ZXID也就是lastCommitted要大if (lastCommitted >= zxid) {if (LOG.isDebugEnabled()) {LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));}// The proposal has already been committedreturn;}Proposal p = outstandingProposals.get(zxid);//將Leader的SID添加到Proposal提議的投票收集器里p.addAck(sid);//嘗試提交,即檢查Proposal提議的投票收集器中是否有過半ACK響應boolean hasCommitted = tryToCommit(p, zxid, followerAddr);...}synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {       //如果提議隊列中存在該提議的前一個提議,說明該提議的前一個提議還沒提交,那么就返回false//zxid - 1是因為,只有事務請求才會生成zxid,那么前一個事務肯定就是zxid = 1if (outstandingProposals.containsKey(zxid - 1)) return false;//getting a quorum from all necessary configurations.//Proposal提議的投票收集器是否已過半if (!p.hasAllQuorums()) {return false;                 }...outstandingProposals.remove(zxid);if (p.request != null) {toBeApplied.add(p);}...//一旦提議通過,馬上就要在Leader中標記lastCommitted為最新的提交ZXIDcommit(zxid);//給Follower廣播commit消息inform(p);//給Observer發送commit消息...//調用CommitProcessor處理器的commit方法提交請求zk.commitProcessor.commit(p.request);//讓Leader執行commit消息//下面處理的是Learner發起的同步請求if (pendingSyncs.containsKey(zxid)) {for (LearnerSyncRequest r: pendingSyncs.remove(zxid)) {sendSync(r);}               } return true;   }//廣播commit消息public void commit(long zxid) {synchronized(this) {//標記lastCommitted為最新的提交ZXIDlastCommitted = zxid;}QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);sendPacket(qp);}void sendPacket(QuorumPacket qp) {synchronized (forwardingFollowers) {for (LearnerHandler f : forwardingFollowers) {//調用LearnerHandler的queuePacket方法添加Packet到發送隊列f.queuePacket(qp);}}}public void inform(Proposal proposal) {QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);sendObserverPacket(qp);}...static public class Proposal extends SyncedLearnerTracker {public QuorumPacket packet;public Request request;...}
}public class SyncedLearnerTracker {protected ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();...//添加到投票收集器public boolean addAck(Long sid) {boolean change = false;for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {qvAckset.getAckset().add(sid);change = true;}}return change;}//判斷投票收集器是否過半public boolean hasAllQuorums() {for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))return false;}return true;}...
}

五.CommitProcessor事務提交處理器

ProposalRequestProcessor的nextProcessor就是CommitProcessor處理器,CommitProcessor就是事務提交處理器。

對于非事務請求,CommitProcessor會將其轉交給nextProcessor處理。對于事務請求,CommitProcessor會阻塞等待Proposal提議可以被提交。

CommitProcessor有個LinkedBlockingQueue隊列queuedRequests。當調用CommitProcessor的processRequest()方法時,請求會被添加到該隊列。CommitProcessor線程會從queuedRequests隊列中取出請求進行處理。此外還通過nextPending和committedRequests隊列保證請求的順序處理。

CommitProcessor處理器也是一個線程,它會先把請求添加到隊列,然后由線程處理,它的nextProcessor是ToBeAppliedRequestProcessor.

public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {//請求隊列protected final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue<Request>();//下一個要提交的請求protected final AtomicReference<Request> nextPending = new AtomicReference<Request>();//當前正在處理的請求數protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);...@Overridepublic void processRequest(Request request) {if (stopped) {return;}queuedRequests.add(request);if (!isWaitingForCommit()) {wakeup();//喚醒}}private boolean isProcessingRequest() {return numRequestsProcessing.get() != 0;}private boolean isWaitingForCommit() {return nextPending.get() != null;}private boolean isProcessingCommit() {return currentlyCommitting.get() != null;}synchronized private void wakeup() {notifyAll();//喚醒阻塞的線程}@Overridepublic void run() {Request request;while (!stopped) {synchronized(this) {while (!stopped && ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) && (committedRequests.isEmpty() || isProcessingRequest()))) {wait();//阻塞等待}}while (!stopped && !isWaitingForCommit() && !isProcessingCommit() && (request = queuedRequests.poll()) != null) {if (needCommit(request)) {//需要進行提交的事務請求nextPending.set(request);//設置下一個要提交的請求} else {//非事務請求轉交給下一個處理器sendToNextProcessor(request);}}processCommitted();//處理提交}}protected void processCommitted() {Request request;if (!stopped && !isProcessingRequest() && (committedRequests.peek() != null)) {if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {return;}request = committedRequests.poll();Request pending = nextPending.get();if (pending != null && pending.sessionId == request.sessionId && pending.cxid == request.cxid) {pending.setHdr(request.getHdr());pending.setTxn(request.getTxn());pending.zxid = request.zxid;currentlyCommitting.set(pending);nextPending.set(null);sendToNextProcessor(pending);} else {currentlyCommitting.set(request);sendToNextProcessor(request);}}      }public void commit(Request request) {committedRequests.add(request);if (!isProcessingCommit()) {//CommitProcessor處理器當前沒有提交請求wakeup();//CommitProcessor喚醒線程}}private void sendToNextProcessor(Request request) {numRequestsProcessing.incrementAndGet();workerPool.schedule(new CommitWorkRequest(request), request.sessionId);}private class CommitWorkRequest extends WorkerService.WorkRequest {private final Request request;CommitWorkRequest(Request request) {this.request = request;}...public void doWork() throws RequestProcessorException {try {nextProcessor.processRequest(request);} finally {currentlyCommitting.compareAndSet(request, null);if (numRequestsProcessing.decrementAndGet() == 0) {if (!queuedRequests.isEmpty() || !committedRequests.isEmpty()) {wakeup();}}}}}...
}

如何理解保證事務請求的順序處理:

順序排隊的事務請求在被ProposalRequestProcessor處理的過程中,首先會執行CommitProcessor的processRequest()方法將請求加入請求隊列,所以請求隊列queuedRequests里面的請求是按順序排好的。然后會生成Proposal提議發送給Follower并收集ACK響應,最后當ACK響應過半時才調用CommitProcessor的commit()方法,此時可以進行提交的請求就會被添加到CommitProcessor的committedRequests隊列中。

是否會因網絡原因,導致CommitProcessor的committedRequests隊列里的請求并不一定按順序排好呢?

事務請求能保證順序處理的根本原因是:

整個Proposal消息廣播過程是基于FIFO特性的TCP協議來進行網絡通信的,所以能夠很容易保證消息廣播過程中消息接收和發送的順序性。也就是廣播時是由一個主進程Leader去通過FIFO的TCP協議進行發送的,所以每個Follower接收到的Proposal和Commit請求都會按順序進入隊列。

客戶端并發執行的事務請求到達Leader時一定會按順序入隊。然后Leader對事務請求進行廣播時,也會按順序進行廣播。被單一Leader進行順序廣播的多個事務請求也會順序到達某Follower。所以某Follower收到的多個Proposal提議也會按廣播時的順序進入隊列,之后某Follower都會按廣播時的順序發送ACK響應給Leader。

所以Leader收到某Follower的ACK響應都是按廣播時的順序收到的。即使Leader先收到Follower2響應的事務2,后收到Follower1的響應事務1,但最終統計過半選票時,Leader會發現事務1首先過半從而優先保證事務1的順序。

圖片

當然,Leader的processAck()方法會先確保要被提交的請求ZXID比上次大。此外,Leader的tryToCommit()方法也會首先確保前一個事務提交了才能處理。以及Follower在接收到Proposal和Commit請求就是按順序響應,即若Follower要提交的事務ID不是pendingTxns的頭部元素,那么就退出程序。最后結合CommitProcessor里的queuedRequests + committedRequests + nextPending,于是便能保證事務請求的順序處理。

public class Leader {...synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        ...//檢查請求的ZXID,需要比上次已提交的請求的ZXID也就是lastCommitted要大if (lastCommitted >= zxid) {if (LOG.isDebugEnabled()) {LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));}// The proposal has already been committedreturn;}Proposal p = outstandingProposals.get(zxid);//將Leader的SID添加到Proposal提議的投票收集器里p.addAck(sid);//嘗試提交,即檢查Proposal提議的投票收集器中是否有過半ACK響應boolean hasCommitted = tryToCommit(p, zxid, followerAddr);...}synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {       //如果提議隊列中存在該提議的前一個提議,說明該提議的前一個提議還沒提交,那么就返回false//zxid - 1是因為,只有事務請求才會生成zxid,那么前一個事務肯定就是zxid = 1if (outstandingProposals.containsKey(zxid - 1)) return false;//getting a quorum from all necessary configurations.//Proposal提議的投票收集器是否已過半if (!p.hasAllQuorums()) {return false;                 }...zk.commitProcessor.commit(p.request);...      }...
}public class Follower extends Learner{...void followLeader() throws InterruptedException {...while (this.isRunning()) {readPacket(qp);processPacket(qp);}...}protected void processPacket(QuorumPacket qp) throws Exception {switch (qp.getType()) {case Leader.PING:            ping(qp);            break;case Leader.PROPOSAL://處理Leader發起的Proposal提議投票請求TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);lastQueued = hdr.getZxid();...fzk.logRequest(hdr, txn);break;case Leader.COMMIT://處理Leader發送過來的對Proposal提議進行提交的請求fzk.commit(qp.getZxid());break;...}}
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();...//將收到的投票請求放入隊列pendingTxnspublic void logRequest(TxnHeader hdr, Record txn) {Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request);}syncProcessor.processRequest(request);}//When a COMMIT message is received, eventually this method is called,//which matches up the zxid from the COMMIT with (hopefully) the head of //the pendingTxns queue and hands it to the commitProcessor to commit.//@param zxid - must correspond to the head of pendingTxns if it existspublic void commit(long zxid) {if (pendingTxns.size() == 0) {LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn");return;}long firstElementZxid = pendingTxns.element().zxid;if (firstElementZxid != zxid) {//如果Follower需要提交的事務ID不是pendingTxns的頭部元素,就退出程序LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid));System.exit(12);}Request request = pendingTxns.remove();commitProcessor.commit(request);}...
}

六.ToBeAppliedRequestProcessor處理器

Leader中有一個toBeApplied隊列,專門存儲那些可以被提交的Proposal提議。ToBeAppliedRequestProcessor會把已被CommitProcessor處理過的請求,轉交給下一個處理器處理,并把請求從Leader的toBeApplied隊列中移除。

ToBeAppliedRequestProcessor處理器不是一個線程,它的next是FinalRequestProcessor處理器。

public class Leader {private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();...static class ToBeAppliedRequestProcessor implements RequestProcessor {private final RequestProcessor next;private final Leader leader;ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {this.leader = leader;this.next = next;}...public void processRequest(Request request) throws RequestProcessorException {next.processRequest(request);if (request.getHdr() != null) {long zxid = request.getHdr().getZxid();Iterator<Proposal> iter = leader.toBeApplied.iterator();if (iter.hasNext()) {Proposal p = iter.next();if (p.request != null && p.request.zxid == zxid) {iter.remove();return;}}}}...}...
}

七.FinalRequestProcessor處理器

FinalRequestProcessor處理器用來處理返回客戶端響應前的收尾工作,包括創建客戶端的響應、將事務請求應用到內存數據庫中。

FinalRequestProcessor處理器不是一個線程,它也沒有nextProcessor屬性字段。

public class FinalRequestProcessor implements RequestProcessor {...public void processRequest(Request request) {...ProcessTxnResult rc = null;synchronized (zks.outstandingChanges) {// Need to process local session requestsrc = zks.processTxn(request);if (request.getHdr() != null) {TxnHeader hdr = request.getHdr();Record txn = request.getTxn();long zxid = hdr.getZxid();while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.peek().zxid <= zxid) {ChangeRecord cr = zks.outstandingChanges.remove();if (zks.outstandingChangesForPath.get(cr.path) == cr) {zks.outstandingChangesForPath.remove(cr.path);}}}// do not add non quorum packets to the queue.if (request.isQuorum()) {zks.getZKDatabase().addCommittedProposal(request);}}...//創建響應并發送響應給客戶端long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());zks.serverStats().updateLatency(request.createTime);cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());cnxn.sendResponse(hdr, rsp, "response");if (request.type == OpCode.closeSession) {cnxn.sendCloseSession();}}...
}

總結:

PrepRequestProcessor的nextProcessor就是ProposalRequestProcessor;
ProposalRequestProcessor的nextProcessor就是CommitProcessor;
CommitProcessor的nextProcessor就是ToBeAppliedRequestProcessor;
ToBeAppliedRequestProcessor的next是FinalRequestProcessor;
FinalRequestProcessor沒有nextProcessor屬性字段;ProposalRequestProcessor會調用SyncRequestProcessor處理器的方法;
SyncRequestProcessor的nextProcessor就是AckRequestProcessor;
AckRequestProcessor沒有nextProcessor屬性字段;PrepRequestProcessor處理器是一個線程;
ProposalRequestProcessor處理器不是一個線程;
CommitProcessor處理器是一個線程;
ToBeAppliedRequestProcessor處理器不是一個線程;
FinalRequestProcessor處理器不是一個線程;SyncRequestProcessor處理器是一個線程;
AckRequestProcessor處理器不是一個線程;

(2)Follower服務器的請求處理鏈

一.FollowerRequestProcessor請求轉發處理器

二.SendAckRequestProcessor投票反饋處理器

Follower服務器的主要工作是:

一.處理非事務請求 + 轉發事務請求給Leader服務器

二.參與事務請求的Proposal提議的投票

三.參與Leader選舉投票

Follower服務器的請求處理鏈如下圖示:

圖片

Leader服務器的第一個處理器是LeaderRequestProcessor,Follower服務器的第一個處理器是FollowerRequestProcessor。由于不需要處理事務請求的投票,所以Follower服務器沒有ProposalRequestProcessor處理器。

一.FollowerRequestProcessor請求轉發處理器

FollowerRequestProcessor主要工作是識別當前請求是否是事務請求。如果是事務請求,那么Follower就會將該事務請求轉發給Leader服務器。FollowerRequestProcessor處理器會通過調用Learner的request()方法實現請求轉發。Learner的request方法會往輸出流leaderOs中寫入請求數據來發給Leader。輸出流leaderOs在Follower和Leader建立好連接時就已經初始化好了的。

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {...protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));}...
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());commitProcessor.start();firstProcessor = new FollowerRequestProcessor(this, commitProcessor);((FollowerRequestProcessor) firstProcessor).start();syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));syncProcessor.start();}public Follower getFollower(){return self.follower;}...
}public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {FollowerZooKeeperServer zks;RequestProcessor nextProcessor;LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();boolean finished = false;public FollowerRequestProcessor(FollowerZooKeeperServer zks, RequestProcessor nextProcessor) {super("FollowerRequestProcessor:" + zks.getServerId(), zks.getZooKeeperServerListener());this.zks = zks;this.nextProcessor = nextProcessor;}@Overridepublic void run() {while (!finished) {Request request = queuedRequests.take();if (request == Request.requestOfDeath) {break;}nextProcessor.processRequest(request);//如果是事務請求,則調用zks.getFollower().request(request)轉發事務請求給Leaderswitch (request.type) {case OpCode.sync:zks.pendingSyncs.add(request);zks.getFollower().request(request);break;case OpCode.create:case OpCode.create2:case OpCode.createTTL:case OpCode.createContainer:case OpCode.delete:case OpCode.deleteContainer:case OpCode.setData:case OpCode.reconfig:case OpCode.setACL:case OpCode.multi:case OpCode.check:zks.getFollower().request(request);break;case OpCode.createSession:case OpCode.closeSession:// Don't forward local sessions to the leader.if (!request.isLocalSession()) {zks.getFollower().request(request);}break;}}}public void processRequest(Request request) {if (!finished) {Request upgradeRequest = null;upgradeRequest = zks.checkUpgradeSession(request);if (upgradeRequest != null) {queuedRequests.add(upgradeRequest);}queuedRequests.add(request);}}...
}public class Learner {protected BufferedOutputStream bufferedOutput;protected Socket sock;protected InputArchive leaderIs;protected OutputArchive leaderOs;...//send a request packet to the leader//發送一個請求給Leadervoid request(Request request) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();DataOutputStream oa = new DataOutputStream(baos);oa.writeLong(request.sessionId);oa.writeInt(request.cxid);oa.writeInt(request.type);if (request.request != null) {request.request.rewind();int len = request.request.remaining();byte b[] = new byte[len];request.request.get(b);request.request.rewind();oa.write(b);}oa.close();QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);//發送請求,往輸出流leaderOs寫數據writePacket(qp, true);}void writePacket(QuorumPacket pp, boolean flush) throws IOException {synchronized (leaderOs) {if (pp != null) {leaderOs.writeRecord(pp, "packet");}if (flush) {bufferedOutput.flush();}}}//和Leader建立連接時就已經初始化好輸出流leaderOs了protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {this.sock = createSocket();int initLimitTime = self.tickTime * self.initLimit;int remainingInitLimitTime = initLimitTime;long startNanoTime = nanoTime();for (int tries = 0; tries < 5; tries++) {remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));if (self.isSslQuorum())  {((SSLSocket) sock).startHandshake();}sock.setTcpNoDelay(nodelay);break;}Thread.sleep(1000);self.authLearner.authenticate(sock, hostname);//初始化好輸入流leaderIsleaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));bufferedOutput = new BufferedOutputStream(sock.getOutputStream());//初始化好輸出流leaderOsleaderOs = BinaryOutputArchive.getArchive(bufferedOutput);}//創建BIO的scoektprivate Socket createSocket() throws X509Exception, IOException {Socket sock;if (self.isSslQuorum()) {sock = self.getX509Util().createSSLSocket();} else {sock = new Socket();}sock.setSoTimeout(self.tickTime * self.initLimit);return sock;}...
}

二.SendAckRequestProcessor投票反饋處理器

Leader的請求處理鏈有個叫AckRequestProcessor的投票反饋處理器,主要負責在執行完SyncRequestProcessor處理器記錄好事務日志后,向Proposal提議反饋來自Leader的ACK響應。

Follower的請求處理鏈也有個叫SendAckRequestProcessor的投票反饋處理器,主要負責在執行完SyncRequestProcessor處理器記錄好事務日志后,通過發送消息給Leader來向Proposal提議反饋來自Follower的ACK響應。

Follower請求處理鏈的SyncRequestProcessor處理器會啟動一個線程。SyncRequestProcessor處理器會先把請求添加到隊列,然后由線程處理。SyncRequestProcessor的nextProcessor就是SendAckRequestProcessor請求處理器。SendAckRequestProcessor不是一個線程。

public class SendAckRequestProcessor implements RequestProcessor, Flushable {Learner learner;SendAckRequestProcessor(Learner peer) {this.learner = peer;}public void processRequest(Request si) {if (si.type != OpCode.sync) {QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);learner.writePacket(qp, false);//向Leader發送ACK響應...}}...
}public class Follower extends Learner {...void followLeader() throws InterruptedException {...QuorumServer leaderServer = findLeader();            connectToLeader(leaderServer.addr, leaderServer.hostname);long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);syncWithLeader(newEpochZxid);                QuorumPacket qp = new QuorumPacket();while (this.isRunning()) {readPacket(qp);//讀取Leader發過來的請求的輸入流leaderIsprocessPacket(qp);//處理Leader發過來的請求,其中就包括Proposal提議的投票請求}...}protected void processPacket(QuorumPacket qp) throws Exception{switch (qp.getType()) {case Leader.PING:            ping(qp);            break;//對Leader發起的Proposal提議投票進行響應case Leader.PROPOSAL:         TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);...//對Leader發起的Proposal提議投票進行響應//此時請求便能進入SyncRequestProcessor處理器的隊列里了//SyncRequestProcessor線程處理完該請求,就會由SendAckRequestProcessor來發送ACK響應fzk.logRequest(hdr, txn);break;...}}...
}public class FollowerZooKeeperServer extends LearnerZooKeeperServer {...@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());commitProcessor.start();firstProcessor = new FollowerRequestProcessor(this, commitProcessor);((FollowerRequestProcessor) firstProcessor).start();syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));syncProcessor.start();}public void logRequest(TxnHeader hdr, Record txn) {Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request);}//調用SyncRequestProcessor的processRequest方法處理Proposal提議的投票響應syncProcessor.processRequest(request);}...
}public class Learner {protected BufferedOutputStream bufferedOutput;protected Socket sock;protected InputArchive leaderIs;protected OutputArchive leaderOs;...void readPacket(QuorumPacket pp) throws IOException {synchronized (leaderIs) {//讀取Leader發送過來的請求的輸入流leaderIs.readRecord(pp, "packet");}}void writePacket(QuorumPacket pp, boolean flush) throws IOException {synchronized (leaderOs) {if (pp != null) {//將響應寫入輸出流,發送給LeaderleaderOs.writeRecord(pp, "packet");}if (flush) {bufferedOutput.flush();}}}//和Leader建立連接時就已經初始化好輸出流leaderOs了protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {this.sock = createSocket();int initLimitTime = self.tickTime * self.initLimit;int remainingInitLimitTime = initLimitTime;long startNanoTime = nanoTime();for (int tries = 0; tries < 5; tries++) {remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));if (self.isSslQuorum())  {((SSLSocket) sock).startHandshake();}sock.setTcpNoDelay(nodelay);break;}Thread.sleep(1000);self.authLearner.authenticate(sock, hostname);//初始化好輸入流leaderIsleaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));bufferedOutput = new BufferedOutputStream(sock.getOutputStream());//初始化好輸出流leaderOsleaderOs = BinaryOutputArchive.getArchive(bufferedOutput);}...
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/901561.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/901561.shtml
英文地址,請注明出處:http://en.pswp.cn/news/901561.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

小程序獲取用戶總結(全)

獲取方式 目前小程序獲取用戶一共有3中(自己接觸到的),但由于這個API一直在改,所以不確定后期是否有變動,還是要多關注官方公告。 方式一 使用wx.getUserInfo 實例: wxml 文件<button open-type="getUserInfo" bindgetuserinfo="onGetUserInfo&quo…

[LeetCode 1871] 跳躍游戲 7(Ⅶ)

題面&#xff1a; 數據范圍&#xff1a; 2 ≤ s . l e n g t h ≤ 1 0 5 2 \le s.length \le 10^5 2≤s.length≤105 s [ i ] s[i] s[i] 要么是 ′ 0 ′ 0 ′0′ &#xff0c;要么是 ′ 1 ′ 1 ′1′ s [ 0 ] 0 s[0] 0 s[0]0 1 ≤ m i n J u m p ≤ m a x J u m p <…

【Linux】基礎 IO(文件描述符、重定向、緩沖區)

Linux 1.理解文件2.C文件接口1.打開 寫文件2.讀文件 簡單實現cat命令3.輸出信息到顯示器的方式4.stdin、stdout、stderr5.打開文件的方式 3.系統接口 IO1.傳遞標志位2.open、close3.write、read 4.文件描述符1.是什么&#xff1f;2.分配規則3.重定向原理4.通過dup2系統調用重…

Apache Doris SelectDB 技術能力全面解析

Apache Doris 是一款開源的 MPP 數據庫&#xff0c;以其優異的分析性能著稱&#xff0c;被各行各業廣泛應用在實時數據分析、湖倉融合分析、日志與可觀測性分析、湖倉構建等場景。Apache Doris 目前被 5000 多家中大型的企業深度應用在生產系統中&#xff0c;包含互聯網、金融、…

交換機與路由器的默契配合:它們的聯系與區別

交換機與路由器的默契配合&#xff1a;它們的聯系與區別 一. 交換機與路由器的基本功能1.1 交換機的功能1.2 路由器的功能 二. 交換機和路由器的區別三. 交換機和路由器的聯系3.1 數據轉發的協作3.2 網絡分段與分隔3.3 協同工作提供互聯網接入 四. 交換機和路由器的聯合應用場景…

【計算機系統結構】MIPSsim

目錄 雙擊MIPSsim.exe 問題1&#xff1a;Microsoft Defender SmartScreen阻止了無法是被的應用啟動&#xff0c;運行此應用可能會導致你的電腦存在風險 解決 出現下面的問題的話&#xff0c;建議直接在官網下載 問題2&#xff1a;.NET Framework 3.5安裝錯誤代碼0x80240438 …

map 中key 是否可以放置的自定義的對象?

在 Java 中,可以將自定義對象作為 Map 的 Key,但必須滿足以下條件: 1. 必須正確重寫 hashCode() 和 equals() 方法 原因:Map(如 HashMap)依賴這兩個方法確定鍵的唯一性和存儲位置。未正確重寫的風險: 無法正確查找值:即使兩個對象邏輯上相等,若 hashCode 不同,會被視…

【筆記ing】AI大模型-04邏輯回歸模型

一個神經網絡結構&#xff0c;其中的一個神經網絡層&#xff0c;本質就是一個邏輯回歸模型 深度神經網絡的本質就是多層邏輯回歸模型互相連接或采用一定的特殊連接的方式連接在一起構成的。其中每一個層本質就是一個邏輯回歸模型。 邏輯回歸模型基本原理 邏輯回歸&#xff0…

Android學習總結之算法篇七(圖和矩陣)

有向圖的深度優先搜索&#xff08;DFS&#xff09;和廣度優先搜索&#xff08;BFS&#xff09;的示例&#xff0c;以此來模擬遍歷 GC Root 引用鏈這種有向圖結構&#xff1a; 一、深度優先搜索&#xff08;DFS&#xff09; import java.util.*;public class GraphDFS {privat…

熟悉Linux下的編程

可能 目錄 熟悉Linux下Python編程的含義及與非Linux環境編程的區別 一、核心含義解析 二、與非Linux環境的關鍵區別 三、典型應用場景對比 四、能力培養建議 openfoem的下載之路&#xff1a; 方法一&#xff1a;使用cd命令 方法二&#xff1a;使用快捷方式 方法三&am…

c++引入nacos,詳細步驟

以下是將Nacos引入C項目的詳細步驟&#xff0c;包括安裝、配置和代碼實現&#xff1a; 1. 安裝Nacos服務器 下載Nacos服務器安裝包&#xff0c;可以從Nacos官網獲取最新版本。 解壓安裝包并啟動Nacos服務器&#xff1a; cd nacos/bin sh startup.sh -m standalone 這將啟動…

性能優化實踐

4.1 大規模量子態處理的性能優化 背景與問題分析 量子計算中的大規模量子態處理(如量子模擬、量子態可視化)需要高效計算和實時渲染能力。傳統圖形API(如WebGL)在處理高維度量子態時可能面臨性能瓶頸,甚至崩潰(如表格中14量子比特時WebGL的崩潰)。而現代API(如WebGPU…

課堂總結。

第三章第六節 Spark-SQL核心編程&#xff08;五&#xff09;自定義函數&#xff1a;UDF&#xff1a;val sparkConf new SparkConf().setMaster("local[*]").setAppName("SQLDemo")//創建SparkSession對象val spark :SparkSession SparkSession.builder()…

分庫分表-除了hash分片還有別的嗎?

在分庫分表的設計中,除了常見的 Hash 分片,還有多種策略根據業務場景靈活選擇。以下是幾種主流的分庫分表策略及其應用場景、技術實現和優缺點分析,結合項目經驗(如標易行投標服務平臺的高并發場景)進行說明: 一、常見分庫分表策略 1. 范圍分片(Range Sharding) 原理:…

AUTOSAR圖解==>AUTOSAR_SWS_GPTDriver

AUTOSAR GPT驅動 (通用定時器驅動) 分析 AUTOSAR標準軟件規范解析 目錄 1. GPT驅動概述 1.1 GPT驅動在AUTOSAR架構中的位置1.2 GPT驅動主要功能 2. GPT驅動模塊結構3. GPT驅動初始化流程4. GPT驅動狀態機5. GPT驅動錯誤處理6. GPT預定義定時器7. 總結 1. GPT驅動概述 GPT驅動…

MyBatis持久層框架

MyBatis持久層框架 目錄 一、Mybatis簡介 1. 簡介 2. 持久層框架對比 3. 快速入門&#xff08;基于Mybatis3方式&#xff09; 二、日志框架擴展 1. 用日志打印替代sout 2. Java日志體系演變 3. 最佳拍檔用法 4. Lombok插件的使用 4.1 Lombok簡介 4.2 Lombok安裝 4.3 …

域控制器升級的先決條件驗證失敗,證書服務器已安裝

出現“證書服務器已安裝”導致域控制器升級失敗時&#xff0c;核心解決方法是卸載已安裝的證書服務?。具體操作如下&#xff1a;? ?卸載證書服務? 以管理員身份打開PowerShell&#xff0c;執行命令&#xff1a; Remove-WindowsFeature -Name AD-Certificate該命令會移除A…

VMware虛擬機常用Linux命令進階指南(一)

摘要&#xff1a;本文涵蓋多方面 Linux 命令的使用。包括用戶與用戶組管理&#xff0c;創建用戶和組并設置權限&#xff1b;目錄結構操作&#xff0c;涉及創建和更改目錄結構&#xff1b;Vim 編輯器及文件歸檔&#xff0c;有文件創建、編譯、合并、打包等任務。 更多優質文章 …

【AI News | 20250415】每日AI進展

AI News 1、字節跳動發布Seaweed-7B視頻模型&#xff1a;70億參數實現音視頻同步生成與多鏡頭敘事 字節跳動推出新一代視頻生成模型Seaweed-7B&#xff0c;該模型僅70億參數卻實現多項突破&#xff1a;支持音視頻同步生成、多鏡頭敘事&#xff08;保持角色連貫性&#xff09;、…

如何實現動態請求地址(baseURL)

需求: 在項目中遇到了需要實時更換請求地址,后續使用修改后的請求地址(IP) 例如:原ip請求為http://192.168.1.1:80/xxx,現在需要你點擊或其他操作將其修改為http://192.168.1.2:80/xxx,該如何操作 tips: 修改后需要跳轉( 修改了IP之前的不可使用,需要訪問修改后的地址來操作 …