Outline
6. The CAP Theorem and the Raft Protocol
7. How Nacos's Raft Implementation Writes Data
8. How Nacos's Raft Implementation Elects the Leader Node
9. How Nacos's Raft Implementation Synchronizes Data
10. A Brief Summary of Nacos's Simplified Raft Implementation
6. The CAP Theorem and the Raft Protocol
(1) What C, A, and P Stand For
(2) What Partition and Tolerance Mean
(3) Why C, A, and P Cannot All Be Satisfied at Once
(4) The Three Node States Defined by the Raft Protocol
(5) The Data Synchronization Flow of the Raft Protocol
(6) The Leader Election Flow of the Raft Protocol
(7) How the Raft Protocol Resolves Split-Brain
(8) Summary
(1) What C, A, and P Stand For
I. C stands for Consistency
The data on all cluster nodes must be kept consistent.
II. A stands for Availability
In a distributed architecture, every request should receive an expected response within a reasonable time.
III. P stands for Partition Tolerance
When network problems occur between cluster nodes, the system as a whole must still be able to provide service.
Under the CAP theorem, P (partition tolerance) must be guaranteed first.
(2) What Partition and Tolerance Mean
Partition refers to a network partition, which occurs when communication breaks down inside a distributed architecture. For example, node A can communicate with node B but cannot reach nodes C and D, while C and D can still communicate with each other; this is a network partition.
Tolerance means that when the cluster nodes become partitioned, the system as a whole must retain the ability to serve requests; a network partition must not take the entire system offline.
Under the CAP theorem, since P must be guaranteed, C and A cannot both be achieved and one of them has to be sacrificed. Whether to favor availability or consistency therefore has to be weighed against the business requirements.
(3) Why C, A, and P Cannot All Be Satisfied at Once
The precondition is that P must be satisfied.
Case 1: suppose the distributed cluster adopts a CP architecture and favors data consistency, and a network partition occurs in which nodes A and B cannot reach node C. If a write now arrives at the cluster, nodes A and B can replicate it to each other because they are still connected, but node C cannot be synchronized. How, then, can consistency be guaranteed?
The only option is to treat node C as temporarily unavailable; it can serve requests again only after the network recovers and its data has been synchronized. Otherwise, the next time a client read is routed to node C, it would find no data, because the partition prevented node C from receiving the write, and the consistency requirement of the CP architecture would be violated. So node C's availability has to be sacrificed.
Case 2: suppose the cluster adopts an AP architecture and favors availability, and the same partition occurs between nodes A, B and node C. Although node C temporarily cannot synchronize data because of the network failure, the cluster values availability, so node C keeps serving requests. Its data differs slightly from that of nodes A and B, but the node remains usable. In this case node C's data consistency is sacrificed.
In an AP architecture the cluster nodes still synchronize data with one another, usually through asynchronous tasks that guarantee eventual consistency; the synchronization is simply less timely, as the sketch below illustrates.
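As a loose illustration (not Nacos code), the following Java sketch shows the idea of eventual consistency through a background task: a scheduled job periodically pushes the local copy to the other nodes, so reads stay available while the data converges over time. The Peer interface and the intervals are assumptions made for this example.

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AsyncSyncSketch {

    //A hypothetical peer node that can merge a snapshot of data into its own copy
    interface Peer {
        void merge(Map<String, String> snapshot);
    }

    private final Map<String, String> localData;
    private final List<Peer> peers;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    AsyncSyncSketch(Map<String, String> localData, List<Peer> peers) {
        this.localData = localData;
        this.peers = peers;
    }

    //Every few seconds, push the local snapshot to the other nodes.
    //Reads remain available in between, so the data is only eventually consistent.
    void start() {
        scheduler.scheduleWithFixedDelay(
                () -> peers.forEach(p -> p.merge(Map.copyOf(localData))),
                5, 5, TimeUnit.SECONDS);
    }
}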
In a CP architecture, data consistency can be achieved with the Raft protocol. Raft is a protocol for keeping data consistent across multiple nodes in a distributed architecture.
(4) The Three Node States Defined by the Raft Protocol
The Raft protocol defines three states for cluster nodes:
I. Follower
This is the default state; every cluster node starts out as a Follower.
II. Candidate
When a node starts an election for Leader, it first votes for itself and changes from the Follower state to the Candidate state.
III. Leader
When a node receives votes from a majority of the cluster nodes, it changes to the Leader state.
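For illustration only, here is a tiny Java sketch of the three states and the transitions described above; the enum and method names are made up for this example and are not Nacos or Raft library APIs.

//Hypothetical illustration of Raft node states and their transitions
public enum RaftState {
    FOLLOWER, CANDIDATE, LEADER;

    //A Follower whose election timeout expires becomes a Candidate
    public RaftState onElectionTimeout() {
        return this == FOLLOWER ? CANDIDATE : this;
    }

    //A Candidate that gains a majority of votes becomes the Leader
    public RaftState onMajorityVotes() {
        return this == CANDIDATE ? LEADER : this;
    }

    //Any node that sees a higher term falls back to Follower
    public RaftState onHigherTermSeen() {
        return FOLLOWER;
    }
}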
(5) The Data Synchronization Flow of the Raft Protocol
I. How the Raft protocol handles write requests
In the Raft protocol, only the Leader node handles client write requests. If a non-Leader node receives a write request, it forwards the request to the Leader for processing.
A write passes through two states: uncommitted and committed. These two states correspond to a two-phase commit, which ensures the data is written correctly.
When the Leader receives a write request, it first processes the data on its own node and immediately replicates it to the other cluster nodes; at this point the entry is in the uncommitted state on the Leader. Only after more than half of the nodes have written the entry successfully does the Leader treat the write as successful. Once the write has succeeded, the Leader notifies the other nodes to commit, and the entry's state on the Leader becomes committed.
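To make the uncommitted/committed flow more concrete, here is a minimal Java sketch of a leader that treats an entry as committed only after a majority of the cluster (itself plus its followers) has written it. The Follower interface, the class name, and the timeout are assumptions made for this example; this is not Nacos code.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LeaderWriteSketch {

    //Simulated follower: returns true if it wrote (acknowledged) the entry
    interface Follower {
        boolean append(String entry);
    }

    private final List<Follower> followers;
    private final ExecutorService pool = Executors.newCachedThreadPool();

    public LeaderWriteSketch(List<Follower> followers) {
        this.followers = followers;
    }

    //Returns true once the entry is committed, i.e. the leader plus a majority of the cluster wrote it
    public boolean write(String entry) throws InterruptedException {
        int clusterSize = followers.size() + 1;            //followers plus the leader itself
        CountDownLatch acks = new CountDownLatch(clusterSize / 2 + 1);
        acks.countDown();                                  //the leader appends locally first (entry still uncommitted)
        for (Follower follower : followers) {
            pool.submit(() -> {
                if (follower.append(entry)) {
                    acks.countDown();                      //one more node has written the entry
                }
            });
        }
        //Commit only if a majority acknowledged within the timeout; otherwise report the write as failed
        return acks.await(5, TimeUnit.SECONDS);
    }
}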
II. How failed writes on non-Leader nodes are handled
Since the Leader only needs more than half of the nodes to write successfully, what happens when some non-Leader nodes miss the write or fail to write it?
The Leader and the Follower nodes maintain a heartbeat. Along with the heartbeats, the Leader sends its latest data to the Follower nodes, so that the Followers' data stays consistent with the Leader's.
Note that if a Follower does not receive a heartbeat from the Leader within the expected time, it assumes the Leader has crashed, changes its own state to Candidate, and starts a new election.
https://thesecretlivesofdata.com/raft/#home
Let's say we have a single node system.
For this example, you can think of our node as a database server that stores a single value.
We also have a client that can send a value to the server.
Coming to agreement, or consensus, on that value is easy with one node.
But how do we come to consensus if we have multiple nodes?
That's the problem of distributed consensus.
Raft is a protocol for implementing distributed consensus.
Let's look at a high level overview of how it works.
A node can be in 1 of 3 states: the Follower state, the Candidate state, or the Leader state.
All our nodes start in the follower state.
If followers don't hear from a leader then they can become a candidate.
The candidate then requests votes from other nodes.
Nodes will reply with their vote.
The candidate becomes the leader if it gets votes from a majority of nodes.
This process is called Leader Election.
All changes to the system now go through the leader.
Each change is added as an entry in the node's log.
This log entry is currently uncommitted so it won't update the node's value.
To commit the entry the node first replicates it to the follower nodes...
then the leader waits until a majority of nodes have written the entry.
The entry is now committed on the leader node and the node state is "5".
The leader then notifies the followers that the entry is committed.
The cluster has now come to consensus about the system state.
This process is called Log Replication.
(6) The Leader Election Flow of the Raft Protocol
How is the Leader elected?
I. Election timeout and election steps
Suppose a cluster using the Raft protocol has 3 nodes. Initially, all three nodes wait on a countdown called the election timeout, a random sleep duration assigned to each node somewhere between 150 ms and 300 ms.
As soon as a node's election timeout expires, it starts an election, which consists of the following steps:
Step 1: vote for itself and change its own state to Candidate
Step 2: send vote requests to the other cluster nodes
Step 3: collect the results; if more than half of the nodes voted for it, change its state to Leader
Once the Leader has been elected, the other nodes must take the Leader's data as authoritative, so the Leader immediately synchronizes its data to the other nodes through the heartbeat mechanism.
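Below is a minimal, self-contained Java sketch of the election steps just described: random election timeout, self-vote, vote requests, and the majority check. The Peer interface, the requestVote name, and the vote counting are illustrative assumptions, not Nacos code or a real Raft library API.

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class ElectionSketch {

    enum State { FOLLOWER, CANDIDATE, LEADER }

    //A hypothetical peer that answers a vote request for a given term
    interface Peer {
        boolean requestVote(long term, String candidateId);
    }

    private State state = State.FOLLOWER;
    private long term = 0;

    //Each node sleeps for a random election timeout between 150 and 300 ms
    long randomElectionTimeoutMs() {
        return ThreadLocalRandom.current().nextLong(150, 301);
    }

    //Called when no heartbeat arrived within the election timeout
    void startElection(String selfId, List<Peer> peers) {
        state = State.CANDIDATE;           //Step 1: become a Candidate...
        term++;                            //...in a new term...
        int votes = 1;                     //...and vote for itself
        for (Peer peer : peers) {          //Step 2: ask the other nodes for their vote
            if (peer.requestVote(term, selfId)) {
                votes++;
            }
        }
        int clusterSize = peers.size() + 1;
        if (votes > clusterSize / 2) {     //Step 3: a majority of votes makes this node the Leader
            state = State.LEADER;
        }                                  //otherwise wait for a new random timeout and try again
    }
}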
https://thesecretlivesofdata.com/raft/#election
In Raft there are two timeout settings which control elections.
First is the election timeout.
The election timeout is the amount of time a follower waits until becoming a candidate.
The election timeout is randomized to be between 150ms and 300ms.
After the election timeout the follower becomes a candidate and starts a new election term...
...votes for itself...
...and sends out Request Vote messages to other nodes.
If the receiving node hasn't voted yet in this term then it votes for the candidate...
...and the node resets its election timeout.
Once a candidate has a majority of votes it becomes leader.
The leader begins sending out Append Entries messages to its followers.
Second is the heartbeat timeout.
These messages are sent in intervals specified by the heartbeat timeout.
Followers then respond to each Append Entries message.
This election term will continue until a follower stops receiving heartbeats and becomes a candidate.
Let's stop the leader and watch a re-election happen.
Node B is now leader of term 2.
Requiring a majority of votes guarantees that only one leader can be elected per term.
If two nodes become candidates at the same time then a split vote can occur.
Let's take a look at a split vote example...
Two nodes both start an election for the same term...
...and each reaches a single follower node before the other.
Now each candidate has 2 votes and can receive no more for this term.
The nodes will wait for a new election and try again.
Node A received a majority of votes in term 5 so it becomes leader.
II. Analysis of different election scenarios
If, when the cluster starts, node C's election timeout expires first, node C immediately starts an election and changes its own state to Candidate. Once node C receives votes from more than half of the nodes, it becomes the Leader node.
If the Leader node suddenly goes offline while the cluster is running, the remaining Follower nodes hold a new Leader election. Suppose the original Leader was node B; after B goes offline, nodes A and C start a new election, and node C eventually becomes the new Leader. Each new Leader election increments the Term. The Term can be understood as the number of Leader elections held so far, and the node with the higher Term is taken to have the most up-to-date data.
If there are four nodes and two Candidate nodes each receive 2 votes, neither has a majority. In that case all nodes go back to a new random sleep and the Leader election is run again.
(7) How the Raft Protocol Resolves Split-Brain
Under some circumstances, a Raft cluster can end up with multiple Leader nodes. How do multiple Leaders arise, and do they conflict with each other?
If two Leader nodes appear within one cluster, that is the split-brain problem. Suppose the cluster has 5 nodes and node B is the Leader, and a network partition occurs: nodes A and B can communicate with each other, but nodes C, D, and E cannot reach the Leader. Nodes C, D, and E therefore hold a new Leader election, and node C also becomes a Leader. A single cluster now has two Leader nodes.
Now suppose clients write data:
The first client's request reaches node B. Since node B's side of the partition contains only one Follower node, the majority requirement cannot be met, so node B's entry stays in the uncommitted state and the write never succeeds.
The second client's request reaches node C. Node C's side of the partition contains two Follower nodes, so a majority is reached and node C's write succeeds.
When the network recovers and all 5 nodes can communicate again, how are the two Leaders handled? The two Leaders exchange heartbeats; node B finds that node C's Term is higher than its own, so it recognizes node C as the Leader and automatically steps down to a Follower node.
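As a small sketch of that reconciliation step, assuming the stale leader simply compares Terms when heartbeats resume after the partition heals (the class and method names below are hypothetical, not Nacos code):

public class StepDownSketch {

    enum State { FOLLOWER, LEADER }

    private State state = State.LEADER;
    private long term;

    StepDownSketch(long term) {
        this.term = term;
    }

    //Called when this leader receives a heartbeat from another leader after the partition heals
    void onHeartbeatFromOtherLeader(long otherTerm) {
        if (otherTerm > term) {
            //The other leader was elected in a later term, so this node adopts the higher term,
            //steps down to Follower, and (in full Raft) rolls back its uncommitted entries
            term = otherTerm;
            state = State.FOLLOWER;
        }
    }
}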
https://thesecretlivesofdata.com/raft/#replication
Once we have a leader elected we need to replicate all changes to our system to all nodes.
This is done by using the same Append Entries message that was used for heartbeats.
Let's walk through the process.
First a client sends a change to the leader. Set value by "5".
The change is appended to the leader's log...
...then the change is sent to the followers on the next heartbeat.
An entry is committed once a majority of followers acknowledge it...
...and a response is sent to the client.
Now let's send a command to increment the value by "2".
Our system value is now updated to "7".
Raft can even stay consistent in the face of network partitions.
Let's add a partition to separate A & B from C, D & E.
Because of our partition we now have two leaders in different terms.
Let's add another client and try to update both leaders.
One client will try to set the value of node B to "3".
Node B cannot replicate to a majority so its log entry stays uncommitted.
The other client will try to set the value of node C to "8".
This will succeed because it can replicate to a majority.
Now let's heal the network partition.
Node B will see the higher election term and step down.
Both nodes A & B will roll back their uncommitted entries and match the new leader's log.
Our log is now consistent across our cluster.
(8) Summary
Raft protocol paper:
https://raft.github.io/raft.pdf
Step-by-step Raft protocol walkthrough:
https://thesecretlivesofdata.com/raft/
Nacos supports both the AP architecture and the CP architecture. The cluster source code covered earlier belongs to the AP architecture. The many asynchronous tasks in that source code show that availability is favored; because scheduled tasks are used, the data can be inconsistent at particular moments, but consistency is eventually guaranteed.
7. How Nacos's Raft Implementation Writes Data
(1) Notes on the Raft Implementation in Nacos 1.4.1
(2) How Nacos's Raft Implementation Writes Data
(3) Summary of RaftCore's signalPublish() Method
(1) Notes on the Raft Implementation in Nacos 1.4.1
Nacos 1.4.1 does not implement the flow defined by the standard Raft protocol exactly, so this version's implementation has some problems. Nacos 1.4.1 has also marked this implementation for removal in a later release.
Nacos 2.x implements the Raft protocol with JRaft, which follows the flow defined by the Raft protocol exactly. There is therefore no need to study the Raft implementation of the early versions in depth; a rough understanding of the flow is enough.
(2) How Nacos's Raft Implementation Writes Data
In the Raft protocol only the Leader node operates on data, and there is a two-phase-commit-style step, so the handling of service instance registration is a good entry point for the analysis.
During service instance registration, a key is used to choose which ConsistencyService implementation's put() method to call. This key carries a crucial property named ephemeral, which defaults to true, so registration normally follows the AP path. If the ephemeral property is set to false in the yml configuration, instance registration follows the CP path instead. In practice, however, a registry rarely uses the CP architecture.
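As a hedged example, assuming the Nacos Java client API (NamingFactory, NamingService, Instance), a persistent instance can also be registered programmatically by setting ephemeral to false; the server address, IP, port, and service name below are placeholders.

import com.alibaba.nacos.api.NamingFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;

public class PersistentInstanceRegistration {
    public static void main(String[] args) throws NacosException {
        //Placeholder Nacos server address
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
        Instance instance = new Instance();
        instance.setIp("192.168.0.10");
        instance.setPort(8080);
        //ephemeral defaults to true (AP path); setting it to false makes this a persistent
        //instance, so registration goes through the CP (Raft) path on the server side
        instance.setEphemeral(false);
        naming.registerInstance("demo-service", instance);
    }
}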
If the CP path is taken, registration eventually calls RaftConsistencyServiceImpl's put() method, which triggers the core method of the Raft implementation: RaftCore's signalPublish().
@Component
public class ServiceManager implements RecordListener<Service> {
    ...
    //Add instance to service.
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        //Build the key of the service that the instance to be registered belongs to
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        //Get the service of the instance to be registered by namespace and service name
        Service service = getService(namespaceId, serviceName);
        //Lock the service of the instance to be registered with synchronized
        synchronized (service) {
            //A service may have multiple instances, so build the service's latest instance list from the instances (ips) in the current registration request
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            //Instances implements the Record interface used for network transfer within the Nacos cluster
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            //Invoke DelegateConsistencyServiceImpl.put()
            consistencyService.put(key, instances);
        }
    }
    ...
}

@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

    private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
    private final EphemeralConsistencyService ephemeralConsistencyService;

    public DelegateConsistencyServiceImpl(PersistentConsistencyServiceDelegateImpl persistentConsistencyService, EphemeralConsistencyService ephemeralConsistencyService) {
        this.persistentConsistencyService = persistentConsistencyService;
        this.ephemeralConsistencyService = ephemeralConsistencyService;
    }

    @Override
    public void put(String key, Record value) throws NacosException {
        //For an ephemeral instance, call DistroConsistencyServiceImpl.put()
        //For a persistent instance, call PersistentConsistencyServiceDelegateImpl.put()
        mapConsistencyService(key).put(key, value);
    }
    ...

    private ConsistencyService mapConsistencyService(String key) {
        //Choose a different ConsistencyService according to the key
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
}

@Component("persistentConsistencyServiceDelegate")
public class PersistentConsistencyServiceDelegateImpl implements PersistentConsistencyService {

    private final RaftConsistencyServiceImpl oldPersistentConsistencyService;
    private final BasePersistentServiceProcessor newPersistentConsistencyService;
    private volatile boolean switchNewPersistentService = false;
    ...

    @Override
    public void put(String key, Record value) throws NacosException {
        switchOne().put(key, value);
    }

    private PersistentConsistencyService switchOne() {
        return switchNewPersistentService ? newPersistentConsistencyService : oldPersistentConsistencyService;
    }
    ...
}

//Use simplified Raft protocol to maintain the consistency status of Nacos cluster.
@Deprecated
@DependsOn("ProtocolManager")
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {

    private final RaftCore raftCore;
    ...

    @Override
    public void put(String key, Record value) throws NacosException {
        checkIsStopWork();
        try {
            //The core of the Raft protocol implementation
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
        }
    }
    ...
}
The logic of RaftCore's signalPublish() method roughly breaks down into three parts:
Part 1: at the very beginning, the method checks whether the current node is the Leader; if not, the request is forwarded to the Leader over HTTP for processing.
Part 2: signalPublish() contains one core call, onPublish(). If the current node is the Leader, it executes RaftCore's onPublish() method to process the data: the data is first written to a local file and then immediately synchronized into the in-memory registry.
RaftStore's write(datum) method persists the service instance information to a local file, i.e. the Instance information is persisted in JSON format under a directory on the Nacos server, and the stored file is named after namespace##group@@serviceName. The persisted instance information is reloaded into the in-memory registry the next time the server restarts.
After the instance information has been persisted, a ValueChangeEvent is published through NotifyCenter to update the registry. RaftCore's init() method registers a subscriber, PersistentNotifier, with NotifyCenter. So when NotifyCenter publishes a ValueChangeEvent, it is picked up by PersistentNotifier's onEvent() method, which calls PersistentNotifier's notify() method, and finally Service's onChange() method is executed to update the in-memory registry.
Part 3: the method iterates over the cluster nodes and sends each of them a notification request to synchronize the data, using a CountDownLatch to require that a majority of the cluster nodes synchronize successfully.
When the CountDownLatch is created, its count is set to the cluster's majority count. Each time a cluster node synchronizes successfully, the latch is counted down by 1. Finally the latch's await() method waits until the count reaches zero or the call times out before execution continues. In this way the CountDownLatch concurrency utility implements the requirement that more than half of the nodes must succeed.
@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {

    private final RaftProxy raftProxy;
    private final RaftStore raftStore;
    public final PersistentNotifier notifier;
    ...

    @PostConstruct
    public void init() throws Exception {
        ...
        //Register the subscriber
        NotifyCenter.registerSubscriber(notifier);
        ...
    }
    ...

    //Signal publish new record. If not leader, signal to leader. If leader, try to commit publish.
    public void signalPublish(String key, Record value) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        //Part 1: check whether this node is the Leader
        if (!isLeader()) {
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode(value));
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            //Get the Leader node
            final RaftPeer leader = getLeader();
            //Forward the write request to the Leader node
            raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
            return;
        }
        OPERATE_LOCK.lock();
        try {
            final long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }
            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
            //Part 2: the Leader node processes the data here, writing the service instance information to disk and into memory
            onPublish(datum, peers.local());
            //Part 3:
            final String content = json.toString();
            //Use a latch to require a majority of nodes; peers.majorityCount() returns the cluster's majority count
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            //Synchronize to the other nodes
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                final String url = buildUrl(server, API_ON_PUB);
                //Notify the other nodes over HTTP
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, result.getCode());
                            return;
                        }
                        //One node succeeded, count the latch down by 1
                        latch.countDown();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                    }

                    @Override
                    public void onCancel() {
                    }
                });
            }
            //Use the latch's await() to wait until a majority of the cluster nodes have synchronized successfully
            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                //only majority servers return success can we consider this update success
                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
        } finally {
            OPERATE_LOCK.unlock();
        }
    }
    ...

    //Do publish. If leader, commit publish to store. If not leader, stop publish because should signal to leader.
    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        if (stopWork) {
            throw new IllegalStateException("old raft protocol already stop work");
        }
        RaftPeer local = peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }
        if (!peers.isLeader(source.ip)) {
            Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
        }
        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(local));
            throw new IllegalStateException("out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
        }
        local.resetLeaderDue();
        //if data should be persisted, usually this is true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            //Write the data to a local file first
            raftStore.write(datum);
        }
        //Update the in-memory cache
        datums.put(datum.key, datum);
        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        raftStore.updateTerm(local.term.get());
        //Publish a ValueChangeEvent to synchronize the in-memory registry
        NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }
}

@Deprecated
@Component
public class RaftStore implements Closeable {
    ...
    //Write datum to cache file.
    public synchronized void write(final Datum datum) throws Exception {
        String namespaceId = KeyBuilder.getNamespace(datum.key);
        File cacheFile = new File(cacheFileName(namespaceId, datum.key));
        if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {
            MetricsMonitor.getDiskException().increment();
            throw new IllegalStateException("can not make cache file: " + cacheFile.getName());
        }
        ByteBuffer data;
        data = ByteBuffer.wrap(JacksonUtils.toJson(datum).getBytes(StandardCharsets.UTF_8));
        try (FileChannel fc = new FileOutputStream(cacheFile, false).getChannel()) {
            fc.write(data, data.position());
            fc.force(true);
        } catch (Exception e) {
            MetricsMonitor.getDiskException().increment();
            throw e;
        }
        //remove old format file:
        if (StringUtils.isNoneBlank(namespaceId)) {
            if (datum.key.contains(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER)) {
                String oldDatumKey = datum.key.replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY);
                cacheFile = new File(cacheFileName(namespaceId, oldDatumKey));
                if (cacheFile.exists() && !cacheFile.delete()) {
                    Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key, datum.value);
                    throw new IllegalStateException("failed to delete old format datum: " + datum.key);
                }
            }
        }
    }
    ...
}

//Event publishing center: the implementation of the event publishing mechanism
public class NotifyCenter {
    ...
    //Register a subscriber
    public static <T> void registerSubscriber(final Subscriber consumer) {
        ...
        addSubscriber(consumer, subscribeType);
    }

    private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
        ...
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        //Invoke DefaultPublisher.addSubscriber()
        publisher.addSubscriber(consumer);
    }

    //Publish an event
    public static boolean publishEvent(final Event event) {
        return publishEvent(event.getClass(), event);
    }

    private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        ...
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            //Invoke DefaultPublisher.publish()
            return publisher.publish(event);
        }
        ...
    }
    ...
}

public class DefaultPublisher extends Thread implements EventPublisher {

    protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();
    ...

    @Override
    public void addSubscriber(Subscriber subscriber) {
        subscribers.add(subscriber);
    }

    @Override
    public boolean publish(Event event) {
        ...
        receiveEvent(event);
        ...
    }

    void receiveEvent(Event event) {
        ...
        for (Subscriber subscriber : subscribers) {
            ...
            notifySubscriber(subscriber, event);
        }
    }

    @Override
    public void notifySubscriber(final Subscriber subscriber, final Event event) {
        final Runnable job = new Runnable() {
            @Override
            public void run() {
                subscriber.onEvent(event);
            }
        };
        final Executor executor = subscriber.executor();
        if (executor != null) {
            executor.execute(job);
        } else {
            try {
                job.run();
            } catch (Throwable e) {
                LOGGER.error("Event callback exception : {}", e);
            }
        }
    }
    ...
}
(3) Summary of RaftCore's signalPublish() Method
The method first checks whether the current node is the Leader; if not, the request is forwarded to the Leader for processing. If it is the Leader, it processes the data itself: the data is first written to a local file, then synchronized into the in-memory registry, and finally the other Follower nodes are notified to synchronize the data.
Clearly, the write path in Nacos 1.4.1 does not implement a real two-phase commit. The Leader finishes processing the data itself and then directly pushes it to the other cluster nodes; even if synchronization fails on some nodes or fewer than half of them succeed, the Leader's data is not rolled back and only an exception is thrown. Nacos 1.4.1 therefore implements only a simplified version of Raft, and this implementation will be removed in later versions.