Nacos源碼—4.Nacos集群高可用分析三

大綱

6.CAP原則與Raft協議

7.Nacos實現的Raft協議是如何寫入數據的

8.Nacos實現的Raft協議是如何選舉Leader節點的

9.Nacos實現的Raft協議是如何同步數據的

10.Nacos如何實現Raft協議的簡版總結

6.CAP原則與Raft協議

(1)CAP分別指的是什么

(2)什么是分區以及容錯

(3)為什么不能同時滿足CAP原則

(4)Raft協議定義節點的三種狀態

(5)Raft協議的數據同步流程

(6)Raft協議的Leader選舉流程

(7)Raft協議如何解決腦裂問題

(8)總結

(1)CAP分別指的是什么

一.C指的是一致性Consistency

各個集群節點之間的數據,必須要保證一致。

二.A指的是可用性Availability

在分布式架構中,每個請求都能在合理的時間內獲得符合預期的響應。

三.P指的是分區容錯性Partition Tolerance

當集群節點間出現網絡問題,整個系統依然能正常提供服務。

在CAP原則中,我們首先要保證P即分區容錯性。

(2)什么是分區以及容錯

分區指的是網絡分區。如果在分布式架構中,出現了網絡通信問題。比如節點A可以和節點B相互通信,但是不能和節點C、D進行通信。但是節點C、D之間是可以通信的,這種情況下就是出現了網絡分區。

容錯是指在分布式架構中,集群節點出現分區情況時,整個系統仍然要保持對外提供服務的能力,不能因為網絡分區而導致整個系統不能對外提供服務。

在CAP原則下:由于P是首要保證的,所以C、A就不能兼得,必須要舍棄其一。因此需要根據業務來權衡,是更注重可用性、還是更加注重一致性。

(3)為什么不能同時滿足CAP原則

首先前提條件是,需要滿足P。

情況一:假設在分布式集群中選擇使用CP架構,更加注重數據的一致性。這時出現了網絡分區,節點A、B與節點C之間網絡不互通。如果此時向集群寫入一個數據,由于節點A、B能夠網絡互通,所以節點A、B寫入的數據可以相互同步,但是節點C沒辦法做數據同步。那么在這種情況下,如何才能保證數據的一致性呢?

此時只能將節點C暫時看作不可用的狀態,等網絡恢復和數據同步好了,節點C才能正常地提供服務。否則下一次用戶向集群請求獲取數據時,請求到了節點C。但由于網絡分區導致節點C并未同步數據,那么本次查詢就查不到數據,這樣就達不到CP架構的一致性要求了。所以肯定需要舍棄節點C的可用性。

情況二:假設在分布式集群中選擇使用AP架構,更加注重數據的可用性。這時出現了網絡分區,節點A、B與節點C之間網絡不互通。雖然節點C暫時由于網絡不通的原因,無法進行數據同步。但是由于集群更加注重服務的可用性,所以節點C還是可以正常提供服務。只是節點C和節點A、B之間的數據略有差異,但不影響節點的正常使用。所以就需要舍棄節點C的數據一致性。

在AP架構中,集群節點間的數據也需要同步。集群節點數據的同步一般都是通過一些異步任務來保證數據的最終一致性,只是同步時效沒有那么及時。

在CP架構中,可以通過Raft協議實現數據一致性。Raft協議就是在分布式架構下,多節點保證數據一致性的協議。

(4)Raft協議定義節點的三種狀態

Raft協議對集群節點定義了三種狀態:

一.Follower追隨者

這是默認的狀態,所有的集群節點一開始都是Follower狀態。

二.Candidate候選者

當某集群節點開始發起投票選舉Leader時,首先會投給自己一票,這時就會從Follower狀態變成Candidate狀態。

三.Leader領導者

當某集群節點獲得了大多數集群節點的投票,那么就會變成Leader狀態。

(5)Raft協議的數據同步流程

一.Raft協議是如何處理數據寫入請求

在Raft協議中,只有Leader節點才會處理客戶端數據的寫入請求。如果非Leader節點收到了寫入請求,會轉發到Leader節點上進行處理。

數據的寫入一共有兩個狀態:uncommit和commit。這個兩個狀態對應于兩階段提交,可以保證數據正確寫入成功。

當Leader節點接收到一個數據寫入請求時:首先會在自身的節點進行數據處理,然后馬上同步給集群的其他節點,此時Leader節點的這個數據的狀態是uncommit狀態。只有當半數以上的其他節點寫入成功,Leader節點才會把數據寫入成功。當Leader節點最終把數據寫入成功后,會通知其他節點進行commit,此時Leader節點的這個數據的狀態是commit狀態。

二.非Leader節點寫入失敗如何處理

由于Leader節點只需要有半數以上的節點寫入成功即可,所以如果有部分非Leader節點沒有寫入或寫入失敗,該如何處理?

Raft協議中的Leader節點和Follower節點會有心跳機制。在心跳傳輸過程中,Leader節點會把最新的數據傳給其他Follower節點,以保證Follower節點中的數據和Leader節點的數據是一致的。

需要注意的是:當Follower節點沒有在指定時間內接收到Leader節點發送過來的心跳包,Follower節點就會認為Leader節點掛掉了,此時Follower節點會把自身狀態修改為Candidate并且重新發起投票。

https://thesecretlivesofdata.com/raft/#home
Let's say we have a single node system.
For this example, you can think of our node as a database server that stores a single value.
We also have a client that can send a value to the server.
Coming to agreement, or consensus, on that value is easy with one node.
But how do we come to consensus if we have multiple nodes?
That's the problem of distributed consensus.
Raft is a protocol for implementing distributed consensus.
Let's look at a high level overview of how it works.A node can be in 1 of 3 states: the Follower state, the Candidate state, or the Leader state.
All our nodes start in the follower state.
If followers don't hear from a leader then they can become a candidate.
The candidate then requests votes from other nodes.
Nodes will reply with their vote.
The candidate becomes the leader if it gets votes from a majority of nodes.
This process is called Leader Election.All changes to the system now go through the leader.
Each change is added as an entry in the node's log.
This log entry is currently uncommitted so it won't update the node's value.
To commit the entry the node first replicates it to the follower nodes...
then the leader waits until a majority of nodes have written the entry.
The entry is now committed on the leader node and the node state is "5".
The leader then notifies the followers that the entry is committed.
The cluster has now come to consensus about the system state.
This process is called Log Replication.

(6)Raft協議的Leader選舉流程

Leader是如何選舉出來的?

一.選舉超時時間和選舉步驟

假設使用了Raft協議的集群有3個節點:那么一開始,三個節點都會在倒計時中進行等待,此時會有一個稱為Election Timeout的隨機休眠時間或選舉超時時間,該選舉超時時間會被隨機分配到150ms到300ms之間。

等待超過選舉超時時間過后,節點會馬上進行投票,投票分為如下幾個步驟:

步驟一:先投給自己一票,并且把自己節點狀態修改為Candidate

步驟二:向其他集群節點進行投票

步驟三:獲取投票結果,如果過半節點投自己,則把狀態修改為Leader

一旦Leader節點選舉出來,其他節點的數據都要以Leader節點的為準。因此Leader節點會馬上通過心跳機制,同步數據給其他節點。

https://thesecretlivesofdata.com/raft/#election
In Raft there are two timeout settings which control elections.First is the election timeout.
The election timeout is the amount of time a follower waits until becoming a candidate.
The election timeout is randomized to be between 150ms and 300ms.
After the election timeout the follower becomes a candidate and starts a new election term...
...votes for itself...
...and sends out Request Vote messages to other nodes.
If the receiving node hasn't voted yet in this term then it votes for the candidate...
...and the node resets its election timeout.
Once a candidate has a majority of votes it becomes leader.
The leader begins sending out Append Entries messages to its followers.Second is the heartbeat timeout.
These messages are sent in intervals specified by the heartbeat timeout.
Followers then respond to each Append Entries message.This election term will continue until a follower stops receiving heartbeats and becomes a candidate.
Let's stop the leader and watch a re-election happen.
Node B is now leader of term 2.
Requiring a majority of votes guarantees that only one leader can be elected per term.If two nodes become candidates at the same time then a split vote can occur.
Let's take a look at a split vote example...
Two nodes both start an election for the same term...
...and each reaches a single follower node before the other.
Now each candidate has 2 votes and can receive no more for this term.
The nodes will wait for a new election and try again.
Node A received a majority of votes in term 5 so it becomes leader.

二.不同的選舉情況分析

如果集群啟動時,節點C率先等待超過了選舉超時時間。那么節點C會馬上發起投票,并改變它自己的狀態變為Candidate。等節點C獲取超過半數以上的投票,那么它就會成為Leader節點。

如果在集群運行中,Leader節點突然下線。那么這時候其他的Follower節點會重新進行Leader選舉。假設原本的Leader節點是B,但由于B突然下線,節點A、C會重新發起投票,最終節點C成為新的Leader節點。并且重新選舉Leader后,Trem(任期)會進行遞增。Term可理解為Leader的選舉次數,次數越大說明數據肯定是最全的。

如果有四個節點,其中有兩個Candidate節點都有2票,沒有過半。在這種情況下,則會讓全部節點重新進行隨機睡眠,重新進行Leader選舉。

(7)Raft協議如何解決腦裂問題

在Raft協議的一些情況下,可能會產生多個Leader節點。那么多個Leader節點是如何產生的?多個Leader會不會有沖突?

如果在一個集群下,出現了兩個Leader節點,那么這就是腦裂問題。假設集群節點有5個,節點B是Leader,但由于發生了網絡分區問題。節點A、B可以相互通信,可是節點C、D、E不能和Leader進行通信。那么節點C、D、E將會重新進行Leader選舉,最終節點C也成為了Leader。此時,在原本一個集群下,就會產生兩個Leader節點。

此時,如果有客戶端來進行寫數據:

第一個客戶端請求到了節點B,由于節點B所在分區網絡只有一個Follower節點,達不到半數以上要求,所以節點B的數據一直處于uncommit狀態,數據也不會寫入成功。

第二個客戶端請求到了節點C,由于節點C所在分區網絡有兩個Follower節點,有半數以上支持,所以節點C的數據是能夠寫入成功的。

假如網絡突然恢復,5個節點都可以相互通信,那么怎么處理兩個Leader。這時兩個Leader會相互發送心跳。節點B會發現節點C的Term比自己大,所以會認節點C為Leader并自動轉換為Follower節點。

https://thesecretlivesofdata.com/raft/#replication
Once we have a leader elected we need to replicate all changes to our system to all nodes.
This is done by using the same Append Entries message that was used for heartbeats.
Let's walk through the process.First a client sends a change to the leader. Set value by "5".
The change is appended to the leader's log...
...then the change is sent to the followers on the next heartbeat.
An entry is committed once a majority of followers acknowledge it...
...and a response is sent to the client.Now let's send a command to increment the value by "2".
Our system value is now updated to "7".
Raft can even stay consistent in the face of network partitions.Let's add a partition to separate A & B from C, D & E.
Because of our partition we now have two leaders in different terms.
Let's add another client and try to update both leaders.
One client will try to set the value of node B to "3".
Node B cannot replicate to a majority so its log entry stays uncommitted.
The other client will try to set the value of node C to "8".
This will succeed because it can replicate to a majority.Now let's heal the network partition.
Node B will see the higher election term and step down.
Both nodes A & B will roll back their uncommitted entries and match the new leader's log.
Our log is now consistent across our cluster.

(8)總結

Raft協議相關論文:

https://raft.github.io/raft.pdf

Raft協議詳細流程演示:

https://thesecretlivesofdata.com/raft/

Nacos既支持AP架構,也支持CP架構。前面介紹的集群源碼,是屬于AP架構的。在源碼中可以看到很多異步任務,說明是比較看重可用性。由于是使用定時任務,那么數據會在某些特定時間出現不一致的情況,但最終還是會保證一致性。

7.Nacos實現的Raft協議是如何寫入數據的

(1)Nacos 1.4.1版本實現Raft協議說明

(2)Nacos實現的Raft協議是如何寫入數據的

(3)RaftCore的signalPublish()方法總結

(1)Nacos 1.4.1版本實現Raft協議說明

Nacos 1.4.1版本并沒有完全按照標準的Raft協議所定義的流程來實現,所以該版本的實現中會存在一些問題。并且Nacos 1.4.1版本,已標注后期會刪除這套實現。

Nacos 2.x版本會采用JRaft來實現Raft協議,JRaft就是完全按照Raft協議定義的流程來實現的。所以早期版本實現的Raft協議,沒必要仔細研究,大概知道流程即可。

(2)Nacos實現的Raft協議是如何寫入數據的

在Raft協議里只有Leader節點才會操作數據,并且會有兩階段提交的動作,所以可以通過服務實例注冊的處理為入口進行分析。

在進行服務實例注冊時:會通過一個key來選擇調用不同ConsistencyService實現類的put()方法。而這個key中會包含一個很關鍵的屬性叫做ephemeral,ephemeral默認是true,所以最終會執行AP架構下的服務注冊。我們可以在yml配置文件中,把ephemeral屬性設置為false,那么在服務實例注冊時,就會執行CP架構下的服務注冊。不過,注冊中心一般很少使用CP架構。

如果執行的是CP架構下的服務注冊,那么最終會調用RaftConsistencyServiceImpl的put()方法,從而觸發調用Raft協議的核心方法:RaftCore的signalPublish()方法。

@Component
public class ServiceManager implements RecordListener<Service> {...//Add instance to service. 添加服務實例public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {//構建要注冊的服務實例對應的服務的keyString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//根據命名空間以及服務名獲取要注冊的服務實例對應的服務Service service = getService(namespaceId, serviceName);//使用synchronized鎖住要注冊的服務實例對應的服務synchronized (service) {//由于一個服務可能存在多個服務實例,所以需要根據當前注冊請求的服務實例ips,獲取對應服務的最新服務實例列表List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);//Instances實現了用于在Nacos集群進行網絡傳輸的Record接口Instances instances = new Instances();instances.setInstanceList(instanceList);//執行DelegateConsistencyServiceImpl的put()方法consistencyService.put(key, instances);}}...
}@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {    private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;private final EphemeralConsistencyService ephemeralConsistencyService;public DelegateConsistencyServiceImpl(PersistentConsistencyServiceDelegateImpl persistentConsistencyService, EphemeralConsistencyService ephemeralConsistencyService) {this.persistentConsistencyService = persistentConsistencyService;this.ephemeralConsistencyService = ephemeralConsistencyService;}@Overridepublic void put(String key, Record value) throws NacosException {//如果是臨時實例,則調用DistroConsistencyServiceImpl.put()方法//如果是持久化實例,則調用PersistentConsistencyServiceDelegateImpl.put()方法mapConsistencyService(key).put(key, value);}...private ConsistencyService mapConsistencyService(String key) {//根據不同的key選擇不同的ConsistencyServicereturn KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;}
}@Component("persistentConsistencyServiceDelegate")
public class PersistentConsistencyServiceDelegateImpl implements PersistentConsistencyService {private final RaftConsistencyServiceImpl oldPersistentConsistencyService;private final BasePersistentServiceProcessor newPersistentConsistencyService;private volatile boolean switchNewPersistentService = false;...@Overridepublic void put(String key, Record value) throws NacosException {switchOne().put(key, value);}private PersistentConsistencyService switchOne() {return switchNewPersistentService ? newPersistentConsistencyService : oldPersistentConsistencyService;}...
}//Use simplified Raft protocol to maintain the consistency status of Nacos cluster.
@Deprecated
@DependsOn("ProtocolManager")
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {private final RaftCore raftCore;...@Overridepublic void put(String key, Record value) throws NacosException {checkIsStopWork();try {//Raft協議的核心實現raftCore.signalPublish(key, value);} catch (Exception e) {Loggers.RAFT.error("Raft put failed.", e);throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);}}...
}

RaftCore的signalPublish()方法中的邏輯大概分成三部分:

第一部分:方法一開始就會判斷自身節點是不是Leader節點,如果不是則會通過HTTP方式轉發給Leader節點進行處理。

第二部分:RaftCore的signalPublish()方法中有一行核心代碼onPublish(),即如果是Leader節點則會執行RaftCore的onPublish()方法來處理數據。該方法會先把數據寫入到本地文件,然后馬上同步給內存注冊表。

RaftStore的write(datum)方法會把服務實例信息持久化到本地文件,即把Instance服務實例信息以JSON格式持久化到Nacos服務端目錄下,并且存儲的文件是以命名空間##分組@@服務名來命名的。而持久化的服務實例信息,在下一次服務端重啟時會重新加載到內存注冊表中。

服務實例信息持久化后,會通過NotifyCenter發布ValueChangeEvent事件更新注冊表。RaftCore的init()方法會向NotifyCenter注冊一個訂閱者PersistentNotifier。所以NotifyCenter發布ValueChangeEvent事件時,就會被PersistentNotifier的onEvent()方法監聽到,然后執行PersistentNotifier的notify()方法,最后會執行Service的onChange()方法來更新內存注冊表。

第三部分:主要就是遍歷集群節點,向每個節點發起通知請求來進行數據同步,這里會使用CountDownLatch閉鎖來實現控制集群半數節點同步成功。

在創建CountDownLatch閉鎖時,會獲取集群半數的數量來創建閉鎖。每當有一個集群節點同步成功,就對CountDownLatch閉鎖進行減1。最后使用閉鎖的await()方法進行等待,直到閉鎖減完或超時才繼續執行。這樣通過CountDownLatch并發工具類就能實現需要過半節點成功的功能。

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {private final RaftProxy raftProxy;private final RaftStore raftStore;public final PersistentNotifier notifier;...@PostConstructpublic void init() throws Exception {...//注冊訂閱者NotifyCenter.registerSubscriber(notifier);...}...//Signal publish new record. If not leader, signal to leader. If leader, try to commit publish.public void signalPublish(String key, Record value) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}//第一部分:判斷自己是不是Leader節點if (!isLeader()) {ObjectNode params = JacksonUtils.createEmptyJsonNode();params.put("key", key);params.replace("value", JacksonUtils.transferToJsonNode(value));Map<String, String> parameters = new HashMap<>(1);parameters.put("key", key);//獲取Leader節點final RaftPeer leader = getLeader();//將寫請求轉發給Leader節點raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);return;}OPERATE_LOCK.lock();try {final long start = System.currentTimeMillis();final Datum datum = new Datum();datum.key = key;datum.value = value;if (getDatum(key) == null) {datum.timestamp.set(1L);} else {datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());}ObjectNode json = JacksonUtils.createEmptyJsonNode();json.replace("datum", JacksonUtils.transferToJsonNode(datum));json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));//第二部分:Leader節點會執行到這里進行數據處理,把服務實例信息寫入磁盤以及內存onPublish(datum, peers.local());//第三部分:final String content = json.toString();//通過閉鎖來控制半數以上節點,peers.majorityCount()就是獲取集群半數以上的節點數量final CountDownLatch latch = new CountDownLatch(peers.majorityCount());//同步給其他節點for (final String server : peers.allServersIncludeMyself()) {if (isLeader(server)) {latch.countDown();continue;}final String url = buildUrl(server, API_ON_PUB);//通過HTTP方式通知其他節點HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, result.getCode());return;}//某個節點成功,閉鎖-1latch.countDown();}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);}@Overridepublic void onCancel() {}});}//通過閉鎖的await()方法來等待半數集群節點同步成功if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {//only majority servers return success can we consider this update successLoggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);}long end = System.currentTimeMillis();Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);} finally {OPERATE_LOCK.unlock();}}...//Do publish. If leader, commit publish to store. If not leader, stop publish because should signal to leader.public void onPublish(Datum datum, RaftPeer source) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}RaftPeer local = peers.local();if (datum.value == null) {Loggers.RAFT.warn("received empty datum");throw new IllegalStateException("received empty datum");}if (!peers.isLeader(source.ip)) {Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(getLeader()));throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");}if (source.term.get() < local.term.get()) {Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(local));throw new IllegalStateException("out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());}local.resetLeaderDue();//if data should be persisted, usually this is true:if (KeyBuilder.matchPersistentKey(datum.key)) {//先把數據寫到本地文件中raftStore.write(datum);}//同步緩存datums.put(datum.key, datum);if (isLeader()) {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);} else {if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {//set leader term:getLeader().term.set(source.term.get());local.term.set(getLeader().term.get());} else {local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);}}raftStore.updateTerm(local.term.get());//通過發布ValueChangeEvent事件來同步內存注冊表NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);}
}@Deprecated
@Component
public class RaftStore implements Closeable {...//Write datum to cache file.public synchronized void write(final Datum datum) throws Exception {String namespaceId = KeyBuilder.getNamespace(datum.key);File cacheFile = new File(cacheFileName(namespaceId, datum.key));if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {MetricsMonitor.getDiskException().increment();throw new IllegalStateException("can not make cache file: " + cacheFile.getName());}ByteBuffer data;data = ByteBuffer.wrap(JacksonUtils.toJson(datum).getBytes(StandardCharsets.UTF_8));try (FileChannel fc = new FileOutputStream(cacheFile, false).getChannel()) {fc.write(data, data.position());fc.force(true);} catch (Exception e) {MetricsMonitor.getDiskException().increment();throw e;}//remove old format file:if (StringUtils.isNoneBlank(namespaceId)) {if (datum.key.contains(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER)) {String oldDatumKey = datum.key.replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY);cacheFile = new File(cacheFileName(namespaceId, oldDatumKey));if (cacheFile.exists() && !cacheFile.delete()) {Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key, datum.value);throw new IllegalStateException("failed to delete old format datum: " + datum.key);}}}}...
}//事件發布中心:事件發布機制的實現
public class NotifyCenter {...//注冊訂閱者public static <T> void registerSubscriber(final Subscriber consumer) {...addSubscriber(consumer, subscribeType);}private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {...EventPublisher publisher = INSTANCE.publisherMap.get(topic);//執行DefaultPublisher.addSubscriber()方法publisher.addSubscriber(consumer);}//發布事件public static boolean publishEvent(final Event event) {return publishEvent(event.getClass(), event);}private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {...EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {//執行DefaultPublisher.publish()方法return publisher.publish(event);}...}...
}public class DefaultPublisher extends Thread implements EventPublisher {protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();...@Overridepublic void addSubscriber(Subscriber subscriber) {subscribers.add(subscriber);}@Overridepublic boolean publish(Event event) {...  receiveEvent(event);...}void receiveEvent(Event event) {...for (Subscriber subscriber : subscribers) {...notifySubscriber(subscriber, event);}}@Overridepublic void notifySubscriber(final Subscriber subscriber, final Event event) {final Runnable job = new Runnable() {@Overridepublic void run() {subscriber.onEvent(event);}};final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception : {}", e);}}}...
}

(3)RaftCore的signalPublish()方法總結

首先會判斷自身節點是不是Leader,如果不是,則會轉發給Leader處理。如果是Leader,則會對數據進行處理,先是寫入到本地文件,然后同步到內存注冊表,最后會通知其他Follower節點進行數據同步。

可見Nacos 1.4.1版本在數據的寫入實現上,并沒有兩階段提交的處理。而是Leader自身處理數據完成后,直接就去同步給其他集群節點。哪怕集群節點同步失敗或沒有過半節點成功,Leader的數據也不會回滾而只拋出異常。所以,Nacos 1.4.1版本只是實現了Raft的簡化版,后續也會被廢棄掉的。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/80175.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/80175.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/80175.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

普通IT的股票交易成長史--20250509晚復盤

聲明&#xff1a; 本文章的內容只是自己學習的總結&#xff0c;不構成投資建議。價格行為理論學習可參考簡介中的幾位&#xff0c;感謝他們的無私奉獻。 送給自己的話&#xff1a; 倉位就是生命&#xff0c;絕對不能滿倉&#xff01;&#xff01;&#xff01;&#xff01;&…

python實現點餐系統

使用python實現點餐系統的增加菜品及價格&#xff0c;刪除菜品&#xff0c;查詢菜單&#xff0c;點菜以及會員折扣價等功能。 代碼&#xff1a; 下面展示一些 內聯代碼片。 # coding utf-8menu {拍黃瓜: 6, 小炒肉: 28, 西紅柿炒蛋: 18, 烤魚: 30, 紅燒肉: 38, 手撕雞: 45,…

從ellisys空口分析藍牙耳機回連手機失敗案例

問題背景&#xff1a; 前兩天同事發現我們現在做的項目&#xff0c;耳機在跟某些特定類型安卓手機&#xff08;尤其是比較新的手機&#xff09;回連會失敗&#xff0c;然后我幫他分析了一些log&#xff0c;記錄如下&#xff1a; 回連失敗所做步驟如下&#xff1a; 手機和耳機…

教育+AI:個性化學習能否顛覆傳統課堂?

近年來&#xff0c;人工智能&#xff08;AI&#xff09;技術迅猛發展&#xff0c;逐漸滲透到各行各業&#xff0c;教育領域也不例外。從智能輔導系統到自適應學習平臺&#xff0c;AI正在改變傳統的教學模式&#xff0c;使個性化學習成為可能。然而&#xff0c;這種變革能否真正…

【C++設計模式之Strategy策略模式】

C設計模式之Strategy策略模式 模式定義核心思想動機(Motivation)結構(Structure)實現步驟1. 定義策略接口&#xff08;基于繼承&#xff09;2.實現具體策略3.上下文類(Context)4. 在main中調用 應用場景&#xff08;基于繼承&#xff09;1.定義策略接口2.實現具體策略3.上下文類…

Python企業級MySQL數據庫開發實戰指南

簡介 Python與MySQL的完美結合是現代Web應用和數據分析系統的基石,能夠創建高效穩定的企業級數據庫解決方案。本文將從零開始,全面介紹如何使用Python連接MySQL數據庫,設計健壯的表結構,實現CRUD操作,并掌握連接池管理、事務處理、批量操作和防止SQL注入等企業級開發核心…

matlab轉python

1 matlab2python開源程序 https://blog.csdn.net/qq_43426078/article/details/123384265 2 網址 轉換網址&#xff1a;https://app.codeconvert.ai/code-converter?inputLangMatlab&outputLangPython 文件比較網址&#xff1a;https://www.diffchecker.com/text-comp…

Vue 3 中編譯時和運行時的概念區別

文章目錄 前言Vue 3 中的編譯時 vs 運行時區別模板在編譯時轉化為渲染函數編譯時的優化處理運行時的工作:創建組件實例與渲染流程前言 詳細整理 Vue 3 中編譯時和運行時的概念區別,并重點解釋為什么組件實例是在運行時創建的。 我會結合官方文檔、源碼分析和社區解釋,確保內…

Spring 框架實戰:如何實現高效的依賴注入,優化項目結構?

Spring 框架實戰&#xff1a;如何實現高效的依賴注入&#xff0c;優化項目結構&#xff1f; 在當今的 Java 開發領域&#xff0c;Spring 框架占據著舉足輕重的地位。而依賴注入作為 Spring 的核心概念之一&#xff0c;對于構建高效、靈活且易于維護的項目結構有著關鍵作用。本…

創建虛擬服務時實現持久連接。

在調度器中配置虛擬服務&#xff0c;實現持久性連接&#xff0c;解決會話保持問題。 -p 【timeout】 -p 300 這5分鐘之內調度器會把來自同一個客戶端的請求轉發到同一個后端服務器。【不管使用的調度算法是什么。】【稱為持久性連接。】 作用&#xff1a;將客戶端一段時間…

說下RabbitMQ的整體架構

RabbitMQ 是一個基于 AMQP&#xff08;Advanced Message Queuing Protocol&#xff09; 協議的開源消息中間件&#xff0c;RabbitMQ的整體架構圍繞消息的生產、路由、存儲和消費設計&#xff0c;旨在實現高效、可靠的消息傳遞&#xff0c;它由多個核心組件協同工作。 核心組件 …

STM32--GPIO

教程 視頻 博主教程 STM32系統結構圖 GPIO GPIO&#xff08;General Purpose Input/Output&#xff09;是STM32內部的一種外設。 一個STM32芯片內存在多個GPIO外設&#xff0c;每個GPIO外設有16個引腳&#xff1b; 比如GPIOA&#xff1a;PA0~PA15; GPIOB&#xff1a;PB0~…

QUIC協議優化:HTTP_3環境下的超高速異步抓取方案

摘要 隨著 QUIC 和 HTTP/3 的普及&#xff0c;基于 UDP 的連接復用與內置加密帶來了遠超 HTTP/2 的性能提升&#xff0c;可顯著降低連接握手與擁塞恢復的開銷。本文以爬取知乎熱榜數據為目標&#xff0c;提出一種基于 HTTPX aioquic 的異步抓取方案&#xff0c;并結合代理 IP設…

[論文閱讀]MCP Guardian: A Security-First Layer for Safeguarding MCP-Based AI System

MCP Guardian: A Security-First Layer for Safeguarding MCP-Based AI System http://arxiv.org/abs/2504.12757 推出了 MCP Guardian&#xff0c;這是一個框架&#xff0c;通過身份驗證、速率限制、日志記錄、跟蹤和 Web 應用程序防火墻 &#xff08;WAF&#xff09; 掃描來…

Redis客戶端緩存的4種實現方式

Redis作為當今最流行的內存數據庫和緩存系統&#xff0c;被廣泛應用于各類應用場景。然而&#xff0c;即使Redis本身性能卓越&#xff0c;在高并發場景下&#xff0c;應用與Redis服務器之間的網絡通信仍可能成為性能瓶頸。 這時&#xff0c;客戶端緩存技術便顯得尤為重要。 客…

eNSP中路由器OSPF協議配置完整實驗和命令解釋

本實驗使用三臺華為路由器&#xff08;R1、R2和R3&#xff09;相連&#xff0c;配置OSPF協議實現網絡互通。拓撲結構如下&#xff1a; 實驗IP規劃 R1: GE0/0/0: 192.168.12.1/24 (Area 0)Loopback0: 1.1.1.1/32 (Area 0) R2: GE0/0/0: 192.168.12.2/24 (Area 0)GE0/0/1: 192.…

內網滲透——紅日靶場三

目錄 一、前期準備 二、外網探測 1.使用nmap進行掃描 2.網站信息收集 3.漏洞復現(CVE-2021-23132) 4.disable_function繞過 5.反彈shell&#xff08;也&#xff0c;并不是&#xff09; 6.SSH登錄 7.權限提升&#xff08;臟牛漏洞&#xff09; 8.信息收集 9.上線msf 三…

解決Win11下MySQL服務無法開機自啟動問題

問題描述 在win11系統中&#xff0c;明明將MySQL服務設置成了自動啟動&#xff0c;但在重啟電腦后MySQL服務還是無法自動啟動&#xff0c;每次都要重新到計算機管理的服務中找到服務再手動啟動。 解決方式 首先確保mysql服務的啟動類型為自動。 設置方法&#xff1a;找到此電…

后端項目進度匯報

項目概述 本項目致力于構建一個先進的智能任務自動化平臺。其核心技術是一套由大型語言模型&#xff08;LLM&#xff09;驅動的后端系統。該系統能夠模擬一個多角色協作的團隊&#xff0c;通過一系列精心設計或動態生成的處理階段&#xff0c;來高效完成各種復雜任務&#xff…

深度學習中學習率調整:提升食物圖像分類模型性能的關鍵實踐

深度學習中學習率調整&#xff1a;提升食物圖像分類模型性能的關鍵實踐 接上篇保存最優模型&#xff0c;在深度學習模型訓練過程中&#xff0c;學習率作為核心超參數&#xff0c;其設置直接影響模型的收斂速度與最終性能。本文將結合食物圖像分類項目&#xff0c;深入探討學習…