ZooKeeper in Depth (4): What Happens to a Client Request Inside the Server


Author: 泊浮目 · Senior R&D Engineer at 沃趣科技

Produced by: 沃趣科技


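1. Introduction

When we send a data update request to ZooKeeper (zk), how is that request processed? And which consensus algorithm does zk use to guarantee consistency? With these questions in mind, let's get into today's topic.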

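2. Design Pattern: Chain of Responsibility

Before analyzing the source code, a quick primer on the Chain of Responsibility pattern is in order, because it is closely tied to the rest of this article. In short, the pattern links several objects into a chain and then walks that chain in order until it finds the object responsible for handling the request.

What is the benefit? It loosely couples the sender of a request to its handlers: handlers can freely pass the request along until the right one is found. When a handler receives a request that is not its responsibility, it simply forwards it; no extra handling logic is needed.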
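To make the pattern concrete, here is a minimal Java sketch. The names Handler, PrefixHandler, and dispatch are made up for illustration and are not ZooKeeper classes; each handler either deals with the request or forwards it to the next link.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainDemo {
    /** One link in the chain (illustrative interface, not ZooKeeper's RequestProcessor). */
    public interface Handler {
        void handle(String request, List<String> trace);
    }

    /** Handles requests that start with its prefix; forwards everything else. */
    public static final class PrefixHandler implements Handler {
        private final String prefix;
        private final Handler next; // null marks the end of the chain

        public PrefixHandler(String prefix, Handler next) {
            this.prefix = prefix;
            this.next = next;
        }

        @Override
        public void handle(String request, List<String> trace) {
            if (request.startsWith(prefix)) {
                trace.add(prefix + " handled " + request);
            } else if (next != null) {
                next.handle(request, trace); // not ours: pass it on
            } else {
                trace.add("unhandled " + request);
            }
        }
    }

    /** Builds a two-link chain and records which link handled the request. */
    public static List<String> dispatch(String request) {
        Handler chain = new PrefixHandler("read", new PrefixHandler("write", null));
        List<String> trace = new ArrayList<>();
        chain.handle(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(dispatch("write:/a"));
        System.out.println(dispatch("delete:/a"));
    }
}
```

Note that the chain is assembled from the last link to the first, which is also how the setupRequestProcessors methods later in this article build their chains.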

3. Tracing the Request Logic

We start from the ZooKeeperServer class and look at its subclasses. The (common) zk server roles we care about are:

  • LeaderZooKeeperServer

  • FollowerZooKeeperServer

  • ObserverZooKeeperServer

3.1 A Bird's-Eye View of the Implementation


3.1.1 LeaderZooKeeperServer

The entry point is LeaderZooKeeperServer.setupRequestProcessors. For readability, I first present the logic as an outline; readers who prefer source code can go to the detailed walkthrough in 3.2.

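|-- LeaderRequestProcessor
|   \-- processRequest // checks whether a local session must be upgraded (it is creating an ephemeral node)
|-- PrepRequestProcessor
|   \-- processRequest // validates parameters and creates a transaction when needed
|-- ProposalRequestProcessor
    \-- processRequest // issues the proposal
        |-- // transactional requests
        |   |-- SyncRequestProcessor
        |   |   \-- processRequest // writes the request to the transaction log and triggers a snapshot if needed
        |   |-- AckRequestProcessor
        |   |   \-- processRequest // confirms the log write is done and ACKs the Proposal's vote collector
        |   |-- CommitProcessor
        |   |   \-- processRequest // waits until the cluster's vote on the Proposal allows it to be committed
        |   |-- ToBeAppliedRequestProcessor
        |   |   \-- processRequest // holds committable Proposals already handled by CommitProcessor; removes one only after FinalRequestProcessor has finished with it
        |   |-- FinalRequestProcessor
        |       \-- processRequest // replies to the request and changes the state of the in-memory database
        |-- // non-transactional requests
            |-- CommitProcessor
            |   \-- processRequest // pass-through; it only waits on transactional requests
            |-- ToBeAppliedRequestProcessor
            |   \-- processRequest // pass-through; works together with CommitProcessor
            |-- FinalRequestProcessor
                \-- processRequest // replies to the request (a read carries no transaction to apply)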

3.1.2 FollowerZooKeeperServer

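// handles client requests
|-- FollowerRequestProcessor
|   \-- processRequest // transactional: hand to CommitProcessor and forward to the leader; otherwise straight on to FinalRequestProcessor
|-- CommitProcessor
|   \-- processRequest // waits until the cluster's vote on the Proposal allows it to be committed
|-- FinalRequestProcessor
    \-- processRequest // replies to the request and changes the state of the in-memory database

// dedicated to proposals issued by the leader
|-- SyncRequestProcessor
|   \-- processRequest // writes the request to the transaction log and triggers a snapshot if needed
|-- SendAckRequestProcessor
    \-- processRequest // ACKs the proposer to say this server has finished writing the log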

3.1.3 ObserverZooKeeperServer

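// handles client requests
|-- ObserverRequestProcessor
|   \-- processRequest // almost identical to FollowerRequestProcessor: transactional requests go to CommitProcessor and are forwarded to the leader; otherwise straight on to FinalRequestProcessor
|-- CommitProcessor
|   \-- processRequest // waits until the cluster's vote on the Proposal allows it to be committed
|-- FinalRequestProcessor
    \-- processRequest // replies to the request and changes the state of the in-memory database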

3.2 Implementation in Detail

The source analysis below is based on version 3.5.7.

3.2.1 LeaderZooKeeperServer

   @Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

setupContainerManager();
}

3.2.2 LeaderRequestProcessor

    @Override
public void processRequest(Request request)throws RequestProcessorException {
// Check if this is a local session and we are trying to create
// an ephemeral node, in which case we upgrade the session
Request upgradeRequest = null;
try {
upgradeRequest = lzks.checkUpgradeSession(request);
} catch (KeeperException ke) {
if (request.getHdr() != null) {
LOG.debug("Updating header");
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(ke.code().intValue()));
}
request.setException(ke);
LOG.info("Error creating upgrade request " + ke.getMessage());
} catch (IOException ie) {
LOG.error("Unexpected error in upgrade", ie);
}
if (upgradeRequest != null) {
nextProcessor.processRequest(upgradeRequest);
}

nextProcessor.processRequest(request);
}

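The logic here is straightforward. If the request comes from a local session and tries to create an ephemeral node, the session is upgraded first and the upgrade request is sent down the chain. If the check fails with a KeeperException, the exception is recorded on the request (and the request is marked as an error when it already has a header) rather than thrown. In every case the original request is then handed to the next processor.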

3.2.3 PrepRequestProcessor

This class has more than 1,000 lines of code, so only the more representative parts are examined here. First, its class comment:

This request processor is generally at the start of a RequestProcessor change. It sets up any transactions associated with requests that change the state of the system. It counts on ZooKeeperServer to update outstandingRequests, so that it can take into account transactions that are in the queue to be applied when generating a transaction.

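In short, it usually sits at the head of the request processing chain, and it sets up the transaction for transactional requests (requests that change the state of the system).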

OpCode.create2

For create requests, the logic is roughly:

          case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;

Jump to pRequest2Txn:

    protected void pRequest2Txn(int type, long zxid, Request request,
Record record, boolean deserialize)throws KeeperException, IOException, RequestProcessorException{
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type));

switch (type) {
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
pRequest2TxnCreate(type, request, record, deserialize);
break;
}
//.... remaining cases omitted

Jump to pRequest2TxnCreate:

    private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
        if (deserialize) {
            ByteBufferInputStream.byteBuffer2Record(request.request, record);
        }

        int flags;
        String path;
        List<ACL> acl;
        byte[] data;
        long ttl;
        if (type == OpCode.createTTL) {
            CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
            flags = createTtlRequest.getFlags();
            path = createTtlRequest.getPath();
            acl = createTtlRequest.getAcl();
            data = createTtlRequest.getData();
            ttl = createTtlRequest.getTtl();
        } else {
            CreateRequest createRequest = (CreateRequest)record;
            flags = createRequest.getFlags();
            path = createRequest.getPath();
            acl = createRequest.getAcl();
            data = createRequest.getData();
            ttl = -1;
        }
        CreateMode createMode = CreateMode.fromFlag(flags);
        validateCreateRequest(path, createMode, request, ttl);
        String parentPath = validatePathForCreate(path, request.sessionId);

        List<ACL> listACL = fixupACL(path, request.authInfo, acl);
        ChangeRecord parentRecord = getRecordForPath(parentPath);

        checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
        int parentCVersion = parentRecord.stat.getCversion();
        if (createMode.isSequential()) {
            path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        validatePath(path, request.sessionId);
        try {
            if (getRecordForPath(path) != null) {
                throw new KeeperException.NodeExistsException(path);
            }
        } catch (KeeperException.NoNodeException e) {
            // ignore this one
        }
        boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
        if (ephemeralParent) {
            throw new KeeperException.NoChildrenForEphemeralsException(path);
        }
        int newCversion = parentRecord.stat.getCversion()+1;
        if (type == OpCode.createContainer) {
            request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        } else if (type == OpCode.createTTL) {
            request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        } else {
            request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                    newCversion));
        }
        StatPersisted s = new StatPersisted();
        if (createMode.isEphemeral()) {
            s.setEphemeralOwner(request.sessionId);
        }
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount++;
        parentRecord.stat.setCversion(newCversion);
        addChangeRecord(parentRecord);
        addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
    }

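The logic can be summarized roughly as follows:

  1. Assemble the request

  2. Check that the request is reasonable: reject undefined request types and invalid parameters

  3. Check that the parent path exists

  4. Check the ACL

  5. Check that the path is valid

  6. Put the change records into the outstandingChanges queue

  7. Send the request to the next Processor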
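One detail in pRequest2TxnCreate is worth isolating: for sequential nodes, the final path is the requested path plus the parent's cversion, zero-padded to ten digits. The sketch below reuses the same String.format call; the class and method names are hypothetical.

```java
import java.util.Locale;

public class SequentialPathDemo {
    /** Appends the parent's cversion as a zero-padded, 10-digit suffix. */
    public static String sequentialPath(String path, int parentCVersion) {
        return path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }

    public static void main(String[] args) {
        // A client asking for "/locks/lock-" under a parent whose cversion is 7
        System.out.println(sequentialPath("/locks/lock-", 7));
    }
}
```

Because the parent's cversion is incremented on every create, consecutive sequential children of the same parent receive increasing suffixes.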

OpCode.multi

A transactional (multi-op) request:

            case OpCode.multi:
                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                try {
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch(IOException e) {
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                            Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

                for(Op op: multiRequest) {
                    Record subrequest = op.toRequestRecord();
                    int type;
                    Record txn;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) {
                        type = OpCode.error;
                        txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    }

                    /* Prep the request and convert to a Txn */
                    else {
                        try {
                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                            type = request.getHdr().getType();
                            txn = request.getTxn();
                        } catch (KeeperException e) {
                            ke = e;
                            type = OpCode.error;
                            txn = new ErrorTxn(e.code().intValue());

                            if (e.code().intValue() > Code.APIERROR.intValue()) {
                                LOG.info("Got user-level KeeperException when processing {} aborting" +
                                        " remaining multi ops. Error Path:{} Error:{}",
                                        request.toString(), e.getPath(), e.getMessage());
                            }

                            request.setException(e);

                            /* Rollback change records from failed multi-op */
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    //FIXME: I don't want to have to serialize it here and then
                    // immediately deserialize in next processor. But I'm
                    // not sure how else to get the txn stored into our list.
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    txn.serialize(boa, "request") ;
                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                    txns.add(new Txn(type, bb.array()));
                }

                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                        Time.currentWallTime(), request.type));
                request.setTxn(new MultiTxn(txns));

                break;

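The code looks messy, but the logic is quite simple:

  • Iterate over all the sub-requests and assemble them one by one (each must pass the same checks: valid request, existing parent path, ACL, valid path). If no exception occurs along the way, the result is a single request that wraps the transaction records. Otherwise, the failing sub-request and every one after it are recorded as errors, and the change records in the current scope (a Map) are rolled back. Either way, the request is sent to the next Processor.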
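The all-or-nothing behaviour described above can be sketched in a few lines. This is a simplified model under my own assumptions, not the ZooKeeper code: CreateOp and prepareAll are hypothetical names, the only operation is "create", and the pending changes are a plain Map that is restored from a copy on failure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiOpDemo {
    /** Hypothetical sub-operation: create 'path'; fails if the path already exists. */
    public static final class CreateOp {
        final String path;
        final String data;

        public CreateOp(String path, String data) {
            this.path = path;
            this.data = data;
        }
    }

    /**
     * Prepares every op against 'pending' and returns "ok" or "error" per op.
     * On the first failure, 'pending' is rolled back to its state before the
     * call, and all later ops are reported as "error" without being tried.
     */
    public static List<String> prepareAll(List<CreateOp> ops, Map<String, String> pending) {
        Map<String, String> saved = new HashMap<>(pending); // kept for rollback
        List<String> results = new ArrayList<>();
        boolean failed = false;
        for (CreateOp op : ops) {
            if (failed) {
                results.add("error"); // an earlier op failed: do not even try
            } else if (pending.containsKey(op.path)) {
                failed = true;
                results.add("error");
                pending.clear();      // roll back everything this multi-op changed
                pending.putAll(saved);
            } else {
                pending.put(op.path, op.data);
                results.add("ok");
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, String> pending = new HashMap<>();
        pending.put("/a", "x");
        List<CreateOp> ops = List.of(new CreateOp("/b", "1"), new CreateOp("/a", "2"), new CreateOp("/c", "3"));
        System.out.println(prepareAll(ops, pending) + " " + pending);
    }
}
```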

OpCode.sync

//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;

This is a non-transactional request: once the session has been verified, it can be sent to the next Processor.

3.2.4 ProposalRequestProcessor

Every request is passed on to CommitProcessor. For transactional requests, ProposalRequestProcessor additionally issues a Proposal and hands the request to SyncRequestProcessor.

  public void processRequest(Request request) throws RequestProcessorException {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");


/* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/

if (request instanceof LearnerSyncRequest){
zks.getLeader().processSync((LearnerSyncRequest)request);
} else {
nextProcessor.processRequest(request);
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
syncProcessor.processRequest(request);
}
}
}

Next, look at propose:

  /**
* create a proposal and send it out to all the members
*
* @param request
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request) throws XidRolloverException {
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg =
"zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}

byte[] data = SerializeUtils.serializeRequest(request);
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

Proposal p = new Proposal();
p.packet = pp;
p.request = request;

synchronized(this) {
p.addQuorumVerifier(self.getQuorumVerifier());

if (request.getHdr().getType() == OpCode.reconfig){
self.setLastSeenQuorumVerifier(request.qv, true);
}

if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}

if (LOG.isDebugEnabled()) {
LOG.debug("Proposing:: " + request);
}

lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
sendPacket(pp);
}
return p;
}

The record submitted here is a QuorumPacket, which implements the Record interface, with its type set to PROPOSAL. Here is the comment on that constant:

    /**
* This message type is sent by a leader to propose a mutation.
*/
public final static int PROPOSAL = 2;

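Clearly, this is a mutation request that only the Leader can issue. To summarize the logic of propose:

  1. Serialize the request and wrap it in a PROPOSAL packet

  2. Put the Proposal into the outstandingProposals Map

  3. Send the packet out to the other members with sendPacket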
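The rollover check at the top of propose is easier to read once you recall that a zxid is a 64-bit number: the high 32 bits hold the epoch and the low 32 bits hold a counter. The sketch below reproduces the same bit test; the class and method names are hypothetical helpers, not ZooKeeper's.

```java
public class ZxidDemo {
    /** Packs a 32-bit epoch and a 32-bit counter into one 64-bit zxid. */
    public static long compose(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    /** The same test as in Leader.propose: are all of the lower 32 bits set? */
    public static boolean counterExhausted(long zxid) {
        return (zxid & 0xffffffffL) == 0xffffffffL;
    }

    public static void main(String[] args) {
        long ordinary = compose(5, 42);
        long last = compose(5, 0xffffffffL);
        System.out.println(Long.toHexString(ordinary) + " exhausted=" + counterExhausted(ordinary));
        System.out.println(Long.toHexString(last) + " exhausted=" + counterExhausted(last));
    }
}
```

When the counter is exhausted, the leader shuts down and forces a re-election so that the next transaction starts in a new epoch, as the source comment above says.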

3.2.5 CommitProcessor

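As the name suggests, this is the transaction committer. It only cares about transactional requests: it waits until the cluster's vote on a Proposal allows it to be committed. With CommitProcessor in place, every server can process transactions in order.

This part of the code is rather bare, so it is not analyzed here. With the information above, readers can still understand how the whole request chain works.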
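Since the source is skipped here, the ordering idea can still be shown with a toy model. This is a single-threaded sketch under my own assumptions, not the real CommitProcessor (which runs its own threads and is more permissive about which reads may proceed): a write blocks everything queued behind it until its commit arrives, while reads pass straight through. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CommitOrderDemo {
    /** A queued request: a name plus a flag saying whether it is a write. */
    private static final class Req {
        final String name;
        final boolean write;

        Req(String name, boolean write) {
            this.name = name;
            this.write = write;
        }
    }

    private final Deque<Req> queued = new ArrayDeque<>();
    private final List<String> applied = new ArrayList<>();
    private Req pending; // the write currently waiting for its commit, or null

    /** A request arrives from the previous processor. */
    public void submit(String name, boolean write) {
        queued.add(new Req(name, write));
        drain();
    }

    /** The vote for the pending write has passed. */
    public void commit(String name) {
        if (pending != null && pending.name.equals(name)) {
            applied.add(pending.name);
            pending = null;
            drain();
        }
    }

    /** Forwards queued requests until a write has to wait for its commit. */
    private void drain() {
        while (pending == null && !queued.isEmpty()) {
            Req r = queued.poll();
            if (r.write) {
                pending = r;         // hold here until commit(name) is called
            } else {
                applied.add(r.name); // reads go straight to the next processor
            }
        }
    }

    /** Requests handed to the next processor so far, in order. */
    public List<String> applied() {
        return applied;
    }

    public static void main(String[] args) {
        CommitOrderDemo p = new CommitOrderDemo();
        p.submit("read-1", false);
        p.submit("write-1", true);
        p.submit("read-2", false);
        System.out.println(p.applied()); // read-2 is still queued behind write-1
        p.commit("write-1");
        System.out.println(p.applied());
    }
}
```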

3.2.6 SyncRequestProcessor

The logic is very simple: it writes the request to the transaction log and tries to trigger a snapshot.

   public void processRequest(Request request) {
// request.addRQRec(">sync");
queuedRequests.add(request);
}

// core method of the thread; it works on the queuedRequests queue
@Override
public void run() {
try {
int logCount = 0;

// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
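The snapshot trigger in run() is worth a closer look: instead of snapshotting exactly every snapCount transactions, each server adds a random offset so that the servers in an ensemble do not all snapshot at the same moment. The sketch below isolates just that counting logic; the class and method names are hypothetical, and the snapCount value in main is arbitrary.

```java
import java.util.Random;

public class SnapshotRollDemo {
    /**
     * Counts appended transactions until the condition from run() fires:
     * logCount > snapCount / 2 + randRoll, with randRoll in [0, snapCount / 2).
     */
    public static int txnsUntilSnapshot(int snapCount, Random r) {
        int randRoll = r.nextInt(snapCount / 2);
        int logCount = 0;
        while (true) {
            logCount++; // one more transaction appended to the log
            if (logCount > (snapCount / 2 + randRoll)) {
                return logCount;
            }
        }
    }

    public static void main(String[] args) {
        Random r = new Random();
        for (int i = 0; i < 3; i++) {
            // Each "server" lands somewhere between snapCount/2 + 1 and snapCount
            System.out.println(txnsUntilSnapshot(1000, r));
        }
    }
}
```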

3.2.7 ToBeAppliedRequestProcessor

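The core of this processor is the toBeApplied queue, which stores the committable Proposals that CommitProcessor has already handled. A Proposal is removed only after FinalRequestProcessor has finished processing it.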

        /*
* (non-Javadoc)
*
* @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
*/
public void processRequest(Request request) throws RequestProcessorException {
next.processRequest(request);

// The only requests that should be on toBeApplied are write
// requests, for which we will have a hdr. We can't simply use
// request.zxid here because that is set on read requests to equal
// the zxid of the last write op.
if (request.getHdr() != null) {
long zxid = request.getHdr().getZxid();
Iterator<Proposal> iter = leader.toBeApplied.iterator();
if (iter.hasNext()) {
Proposal p = iter.next();
if (p.request != null && p.request.zxid == zxid) {
iter.remove();
return;
}
}
LOG.error("Committed request not found on toBeApplied: "
+ request);
}
}

3.2.8 FinalRequestProcessor

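For reasons of space, here is just a brief description of the logic: as the last processor, it replies to the request and makes transactional requests take effect by changing the state of the in-memory database.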

3.2.9 FollowerZooKeeperServer

First, the code that assembles its processors:

    @Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}

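As you can see, there are two request chains here:

  1. FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

  2. SyncRequestProcessor -> SendAckRequestProcessor

When a request arrives, which processor handles it? A rough trace:

  • firstProcessor (that is, FollowerRequestProcessor) is the main handling path. It is driven by the parent class ZooKeeperServer and handles client requests.

  • syncProcessor (that is, SyncRequestProcessor) is entered through logRequest. It is invoked by Learner and handles requests from the leader.

Some readers may ask at this point: this is clearly a Follower, so why does the call come in through Learner? The answer is in the class declaration: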

/**
* This class is the superclass of two of the three main actors in a ZK
* ensemble: Followers and Observers. Both Followers and Observers share
* a good deal of code which is moved into Peer to avoid duplication.
*/
public class Learner {

To avoid duplication, the shared code was pulled up into a common superclass.

3.2.10 FollowerRequestProcessor

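This is the Follower's regular processor. It decides whether a request is transactional: if so, the request is sent to the Leader; otherwise the Follower handles it itself.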

FollowerRequestProcessor.run

    @Override
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);

// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
case OpCode.delete:
case OpCode.deleteContainer:
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
zks.getFollower().request(request);
break;
case OpCode.createSession:
case OpCode.closeSession:
// Don't forward local sessions to the leader.
if (!request.isLocalSession()) {
zks.getFollower().request(request);
}
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("FollowerRequestProcessor exited loop!");
}

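Handing the request to CommitProcessor before forwarding it to the Leader may look puzzling at first, since a transactional request has to be decided by the Leader. The comment in the code gives the reason: the request is queued locally first so that the Follower is ready to receive the response. As noted earlier, CommitProcessor is where a request waits until the cluster's vote on its Proposal allows it to be committed.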
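The routing decision made by the switch statement in run() can be condensed into a single predicate. The sketch below is my own restatement, not ZooKeeper code: Op is an illustrative enum covering a subset of request types (ZooKeeper itself uses int OpCode constants), and forwardToLeader is a hypothetical name.

```java
import java.util.EnumSet;
import java.util.Set;

public class FollowerRoutingDemo {
    /** Illustrative subset of request types. */
    public enum Op {
        GET_DATA, EXISTS, SYNC, CREATE, DELETE, SET_DATA, SET_ACL, MULTI, CHECK,
        CREATE_SESSION, CLOSE_SESSION
    }

    // Types that run() always ships to the leader
    private static final Set<Op> ALWAYS_FORWARDED = EnumSet.of(
            Op.SYNC, Op.CREATE, Op.DELETE, Op.SET_DATA, Op.SET_ACL, Op.MULTI, Op.CHECK);

    /** Returns true when the follower would forward this request to the leader. */
    public static boolean forwardToLeader(Op op, boolean localSession) {
        if (ALWAYS_FORWARDED.contains(op)) {
            return true;
        }
        if (op == Op.CREATE_SESSION || op == Op.CLOSE_SESSION) {
            return !localSession; // local sessions are not forwarded
        }
        return false; // everything else (reads) is served locally
    }

    public static void main(String[] args) {
        System.out.println(forwardToLeader(Op.GET_DATA, false));
        System.out.println(forwardToLeader(Op.CREATE, false));
        System.out.println(forwardToLeader(Op.CLOSE_SESSION, true));
    }
}
```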

3.2.11 SendAckRequestProcessor

    public void processRequest(Request si) {
if(si.type != OpCode.sync){
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
null);
try {
learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
if (!learner.sock.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {
// Nothing to do, we are shutting things down, so an exception here is irrelevant
LOG.debug("Ignoring error closing the connection", e1);
}
}
}
}

The logic is very simple: it sends an ACK back to report that this server has finished writing the transaction log.

3.2.12 ObserverZooKeeperServer

    /**
* Set up the request processors for an Observer:
* firstProcesor->commitProcessor->finalProcessor
*/
@Override
protected void setupRequestProcessors() {
// We might consider changing the processor behaviour of
// Observers to, for example, remove the disk sync requirements.
// Currently, they behave almost exactly the same as followers.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();

/*
* Observer should write to disk, so that the it won't request
* too old txn from the leader which may lead to getting an entire
* snapshot.
*
* However, this may degrade performance as it has to write to disk
* and do periodic snapshot which may double the memory requirements
*/
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}

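The logic is clear (probably because this code was added after 3.3.0). The normal request chain is:

  1. ObserverRequestProcessor

  2. CommitProcessor

  3. FinalRequestProcessor

When syncRequestProcessorEnabled is on (it is by default), the Observer also writes the transaction log and takes snapshots, which costs some performance and requires more memory.

Next, look at ObserverRequestProcessor. It is practically a copy of FollowerRequestProcessor; an engineer who cares about quality would have found a way to reuse the code.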

    @Override
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);

// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this Observer has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getObserver().request(request);
break;
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
case OpCode.delete:
case OpCode.deleteContainer:
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
zks.getObserver().request(request);
break;
case OpCode.createSession:
case OpCode.closeSession:
// Don't forward local sessions to the leader.
if (!request.isLocalSession()) {
zks.getObserver().request(request);
}
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("ObserverRequestProcessor exited loop!");
}

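That concludes the source analysis, which is based on version 3.5.7.

4. Distributed Transactions: How ZK Processes Transactions

Having walked through the source, you should now have a reasonable picture of how ZK processes a request. Next, let's sort out what happens to a transactional request. Starting from the Leader's ProposalRequestProcessor, the process falls roughly into three phases: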

  1. Sync

  2. Proposal

  3. Commit

4.1 Sync

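This phase is driven mainly by ProposalRequestProcessor: every machine taking part in the proposal (the Leader and the Followers) must record the transaction in its transaction log.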

4.2 Proposal

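Every transactional request must be approved by more than half of the voters (Leader + Followers).

  1. The Leader checks that the server's ZXID is still usable. If it is, the Leader issues a Proposal; if not, it throws XidRolloverException. (See org.apache.zookeeper.server.quorum.Leader.propose)

  2. A Proposal is built from the request header, the transaction body, and the ZXID. (See org.apache.zookeeper.server.quorum.Leader.propose)

  3. The Proposal is broadcast to all Follower servers. (See org.apache.zookeeper.server.quorum.Leader.sendPacket)

  4. Each participating member writes the log and ACKs the Leader, until more than half have ACKed or a timeout occurs. (See org.apache.zookeeper.server.quorum.Leader.processAck)

  5. The request is put into the toBeApplied queue. (See org.apache.zookeeper.server.quorum.Leader.tryToCommit)

  6. The commit is broadcast: Followers receive COMMIT and Observers receive INFORM, which makes them commit the Proposal. (See org.apache.zookeeper.server.quorum.Leader.commit and Leader.inform)

At this point, the SyncRequestProcessor -> AckRequestProcessor path is complete.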
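The "more than half" rule in step 4 can be sketched as a small ACK counter. This is a simplified model that assumes a plain majority quorum over a fixed number of voting members; the class name and its methods are hypothetical and do not mirror the actual processAck implementation.

```java
import java.util.HashSet;
import java.util.Set;

public class MajorityAckDemo {
    private final int votingMembers;
    private final Set<Long> acks = new HashSet<>(); // server ids that have ACKed

    public MajorityAckDemo(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    /** Records an ACK from server 'sid'; returns true once a strict majority has ACKed. */
    public boolean ack(long sid) {
        acks.add(sid); // a duplicate ACK from the same server counts only once
        return acks.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        MajorityAckDemo proposal = new MajorityAckDemo(5);
        System.out.println(proposal.ack(1)); // the leader's own ACK
        System.out.println(proposal.ack(2));
        System.out.println(proposal.ack(3)); // third of five: majority reached
    }
}
```

With five voters the proposal becomes committable on the third distinct ACK, which is why the ensemble keeps working while up to two voters are down.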

4.3 Commit

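Next comes the CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor path.

  1. A request arriving at CommitProcessor is put into a queue, and a thread takes requests out one at a time. When the request taken out is transactional, a pending object is set and held until voting finishes. This guarantees that transactions are ordered, and it lets CommitProcessor easily tell whether a transaction is in progress in the cluster.

  2. Once the vote passes, the commit flow is woken up. The request is submitted to the committedRequests queue and then sent, one by one, to ToBeAppliedRequestProcessor.

  3. ToBeAppliedRequestProcessor waits for FinalRequestProcessor to finish, then removes the Proposal from the toBeApplied queue.

  4. FinalRequestProcessor first checks whether the latest request in the queue has a zxid less than or equal to that of the current request:

  • If so, that request is removed. This case means the most recently applied transaction reached consensus before the current one; the current transaction request is invalid, but it is still recorded in committedLog.

  • An equal zxid is the normal case, because this queue is filled by PrepRequestProcessor. The transaction is then applied to the in-memory database. The database instance maintains a committedLog with a default limit of 500 entries, which holds the most recent successful Proposals for fast synchronization.

If the server crashes at this step, then when the machine is brought back up, the data is corrected using the write-ahead log from the proposal phase. Through PlayBackListener, those entries are also converted into proposals and saved to committedLog for synchronization.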
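The bounded committedLog mentioned above behaves like a sliding window over the most recent proposals. The sketch below is a simplified model under my own assumptions: it stores only zxids, the class and method names are hypothetical, and the exact boundary handling in ZooKeeper's in-memory database may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CommittedLogDemo {
    private final int limit;
    private final Deque<Long> zxids = new ArrayDeque<>();

    public CommittedLogDemo(int limit) {
        this.limit = limit;
    }

    /** Appends a committed zxid, evicting the oldest entry once the limit is reached. */
    public void add(long zxid) {
        if (zxids.size() >= limit) {
            zxids.removeFirst();
        }
        zxids.addLast(zxid);
    }

    /** Oldest zxid still in the window; the log must not be empty. */
    public long minZxid() {
        return zxids.getFirst();
    }

    /** Newest zxid in the window; the log must not be empty. */
    public long maxZxid() {
        return zxids.getLast();
    }

    public int size() {
        return zxids.size();
    }

    public static void main(String[] args) {
        CommittedLogDemo log = new CommittedLogDemo(3);
        for (long zxid = 1; zxid <= 5; zxid++) {
            log.add(zxid);
        }
        System.out.println(log.minZxid() + ".." + log.maxZxid() + " size=" + log.size());
    }
}
```

A learner whose last zxid falls inside the window between minZxid and maxZxid can be caught up by replaying only the missing proposals, which is what makes this log useful for fast synchronization.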

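5. Summary

Taking the article as a whole, ZK's way of handling transactions looks a bit like two-phase commit. This is in fact the ZAB algorithm. The next article covers its implementation in detail, along with its other use: distributed leader election.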
