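Outline
6. The CAP theorem and the Raft protocol
7. How Nacos's Raft implementation writes data
8. How Nacos's Raft implementation elects the Leader node
9. How Nacos's Raft implementation synchronizes data
10. A brief summary of how Nacos implements the Raft protocol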
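8. How Nacos's Raft implementation elects the Leader node
(1) Initializing the RaftCore instance starts two asynchronous tasks
(2) The MasterElection asynchronous task that elects the Leader
(1) Initializing the RaftCore instance starts two asynchronous tasks
RaftCore's init() method starts two asynchronous tasks. The first elects the Leader node; the second sends heartbeats to synchronize data. Both are registered through GlobalExecutor with the same period, TICK_PERIOD_MS: the election task runs at a fixed rate (scheduleAtFixedRate), and the heartbeat task runs with a fixed delay (scheduleWithFixedDelay).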
@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {
    ...
    // Init raft core.
    @PostConstruct
    public void init() throws Exception {
        Loggers.RAFT.info("initializing Raft sub-system");
        final long start = System.currentTimeMillis();
        // Load persisted data from local files
        raftStore.loadDatums(notifier, datums);
        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
        Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
        initialized = true;
        Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
        // Start an async task that elects the Leader
        masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
        // Start an async task that synchronizes data through heartbeats
        heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
        versionJudgement.registerObserver(isAllNewVersion -> {
            stopWork = isAllNewVersion;
            if (stopWork) {
                try {
                    shutdown();
                    raftListener.removeOldRaftMetadata();
                } catch (NacosException e) {
                    throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
                }
            }
        }, 100);
        // Register PersistentNotifier as a subscriber with NotifyCenter
        NotifyCenter.registerSubscriber(notifier);
        Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    }
    ...
}

public class GlobalExecutor {
    // The pool size is twice the number of available processors
    private static final ScheduledExecutorService NAMING_TIMER_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
            ClassUtils.getCanonicalName(NamingApp.class),
            Runtime.getRuntime().availableProcessors() * 2,
            new NameThreadFactory("com.alibaba.nacos.naming.timer"));
    ...
    public static ScheduledFuture registerMasterElection(Runnable runnable) {
        // Fixed rate: each run is scheduled relative to the previous run's start time;
        // if a run overruns the period, the next one starts late (runs never overlap)
        return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
    }

    public static ScheduledFuture registerHeartbeat(Runnable runnable) {
        // Fixed delay: the next run starts TICK_PERIOD_MS after the previous run finishes,
        // however long that run took
        return NAMING_TIMER_EXECUTOR.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
    }
    ...
}
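The two registrations differ only in scheduling policy. As a minimal sketch, assuming plain JDK scheduling in place of Nacos's ExecutorFactory, the hypothetical TwoTimerSketch below registers one fixed-rate and one fixed-delay task on a shared pool and waits until each has ticked a given number of times. TICK_MS = 20 is an illustrative value, not Nacos's TICK_PERIOD_MS.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch mirroring registerMasterElection / registerHeartbeat with the JDK scheduler.
// TICK_MS is an illustrative value, not Nacos's TICK_PERIOD_MS.
final class TwoTimerSketch {
    static final long TICK_MS = 20;

    /** Returns true if both tasks ran at least `runs` times before the timeout. */
    static boolean runBoth(int runs, long timeoutMs) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        CountDownLatch electionTicks = new CountDownLatch(runs);
        CountDownLatch heartbeatTicks = new CountDownLatch(runs);
        try {
            // Fixed rate: period measured between START times (like the election task)
            timer.scheduleAtFixedRate(electionTicks::countDown, 0, TICK_MS, TimeUnit.MILLISECONDS);
            // Fixed delay: delay measured from the END of one run to the start of the next (like the heartbeat task)
            timer.scheduleWithFixedDelay(heartbeatTicks::countDown, 0, TICK_MS, TimeUnit.MILLISECONDS);
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            boolean electionDone = electionTicks.await(timeoutMs, TimeUnit.MILLISECONDS);
            long remaining = Math.max(0, deadline - System.nanoTime());
            boolean heartbeatDone = heartbeatTicks.await(remaining, TimeUnit.NANOSECONDS);
            return electionDone && heartbeatDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow(); // stop both periodic tasks
        }
    }
}
```

With tasks as short as these, the two policies behave almost identically; they diverge only when a run takes longer than the period, which is why the heartbeat, whose run may involve network calls, is the one scheduled with a fixed delay.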
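(2) The MasterElection asynchronous task that elects the Leader
MasterElection's run() method embodies the first step of Raft leader election. On every tick, each node subtracts TICK_PERIOD_MS from its randomized election countdown, leaderDueMs. If the countdown has not reached zero, run() simply returns and waits for the next tick. Only once the countdown expires does it reset the timeouts and call MasterElection's sendVote() method to start a vote.
When sendVote() runs, it calls peers.reset(), increments the term, votes for itself, and changes its own state to Candidate. Only after this preparation does it send vote requests to every other node over HTTP.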
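The countdown-then-vote logic can be modeled without any networking. This is a minimal single-node sketch: ElectionTimerSketch, BASE_TIMEOUT_MS and JITTER_MS are hypothetical names, the timeout values are assumptions rather than Nacos's constants, and peers.reset() and the HTTP fan-out are omitted. Field names echo RaftPeer.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the tick-driven election timer described above.
// Timeout constants are illustrative assumptions, not Nacos's values.
final class ElectionTimerSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    static final long TICK_MS = 500;
    static final long BASE_TIMEOUT_MS = 15_000; // assumed base election timeout
    static final long JITTER_MS = 5_000;        // assumed random spread

    final String ip;
    long term = 0;
    String voteFor = null;
    State state = State.FOLLOWER;
    long leaderDueMs;

    ElectionTimerSketch(String ip, long initialDueMs) {
        this.ip = ip;
        this.leaderDueMs = initialDueMs;
    }

    /** Called once per tick; returns true if this tick started a new election. */
    boolean onTick() {
        leaderDueMs -= TICK_MS;
        if (leaderDueMs > 0) {
            return false; // countdown still running: do nothing this tick
        }
        // Re-randomize so that peers rarely time out at the same moment
        leaderDueMs = BASE_TIMEOUT_MS + ThreadLocalRandom.current().nextLong(JITTER_MS);
        startElection();
        return true;
    }

    private void startElection() {
        term++;                  // enter a new term
        voteFor = ip;            // vote for itself
        state = State.CANDIDATE; // become a candidate
        // A real node would now send vote requests to every other peer.
    }
}
```

Because each node draws a different random timeout, one node usually times out first and gathers votes before the others become candidates, which is the purpose of the randomized countdown.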
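When another node replies with its vote, RaftPeerSet's decideLeader() method processes the reply. The logic is as follows:
It first records the returned peer, including whom that peer voted for, in the peers map. It then iterates over the map to count the votes each candidate has received, and finally checks whether the candidate with the most votes has reached a majority of the cluster (majorityCount()). If it has, that peer is marked as Leader; since a Candidate always votes for itself, this is normally the local node.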
@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {
    private RaftPeerSet peers;
    ...
    public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {
                if (stopWork) {
                    return;
                }
                if (!peers.isReady()) {
                    return;
                }
                // Randomized election countdown
                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                // Countdown not finished yet: return and wait for the next tick
                if (local.leaderDueMs > 0) {
                    return;
                }
                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();
                // Start a vote
                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }
        }

        private void sendVote() {
            RaftPeer local = peers.get(NetUtils.localServer());
            Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()), local.term);
            peers.reset();
            // Increment the term
            local.term.incrementAndGet();
            // Vote for itself
            local.voteFor = local.ip;
            // Become a candidate: set the local node's state to CANDIDATE
            local.state = RaftPeer.State.CANDIDATE;
            Map<String, String> params = new HashMap<>(1);
            params.put("vote", JacksonUtils.toJson(local));
            // Iterate over the other cluster nodes
            for (final String server : peers.allServersWithoutMySelf()) {
                final String url = buildUrl(server, API_VOTE);
                try {
                    // Send the vote request over HTTP
                    HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                                return;
                            }
                            RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
                            Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                            // Process the returned vote
                            peers.decideLeader(peer);
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                        }

                        @Override
                        public void onCancel() {
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}", server);
                }
            }
        }
    }
    ...
}

@Deprecated
@Component
@DependsOn("ProtocolManager")
public class RaftPeerSet extends MemberChangeListener implements Closeable {
    private volatile Map<String, RaftPeer> peers = new HashMap<>(8);
    ...
    // Calculate and decide which peer is leader. If has new peer has more than half vote, change leader to new peer.
    public RaftPeer decideLeader(RaftPeer candidate) {
        // Record this vote
        peers.put(candidate.ip, candidate);
        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        // Count the votes each peer has received
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }
            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }
        // If the peer with the most votes has a majority, mark that peer as Leader
        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            // Set the winning peer's state to LEADER
            peer.state = RaftPeer.State.LEADER;
            if (!Objects.equals(leader, peer)) {
                leader = peer;
                ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
                Loggers.RAFT.info("{} has become the LEADER", leader.ip);
            }
        }
        return leader;
    }
    ...
}
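The tally inside decideLeader() reduces to counting voteFor values and comparing the best count with a majority. In this minimal sketch a HashMap counter replaces Commons Collections' TreeBag; VoteTallySketch is a hypothetical name, and computing the majority as clusterSize / 2 + 1 is an assumption about what majorityCount() returns.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the vote tally in decideLeader().
// majority = clusterSize / 2 + 1 is an assumption about majorityCount().
final class VoteTallySketch {
    /**
     * @param voteByPeer  peer ip -> ip that peer voted for (null or empty = no vote recorded)
     * @param clusterSize total number of nodes in the cluster
     * @return the ip that reached a majority, or null if nobody has one yet
     */
    static String decideLeader(Map<String, String> voteByPeer, int clusterSize) {
        Map<String, Integer> counts = new HashMap<>();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (String votedFor : voteByPeer.values()) {
            if (votedFor == null || votedFor.isEmpty()) {
                continue; // skip peers without a recorded vote
            }
            int count = counts.merge(votedFor, 1, Integer::sum);
            if (count > maxApproveCount) {
                maxApproveCount = count;
                maxApprovePeer = votedFor;
            }
        }
        int majority = clusterSize / 2 + 1;
        return maxApproveCount >= majority ? maxApprovePeer : null;
    }
}
```

In a three-node cluster, two votes for the same candidate are enough. A 1-1 split with the third node not yet counted elects nobody, so the candidate keeps waiting for replies or starts a new term at its next timeout.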
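9. How Nacos's Raft implementation synchronizes data
(1) How the Leader sends heartbeats to synchronize data
(2) How Followers handle heartbeats to synchronize data
(1) How the Leader sends heartbeats to synchronize data
RaftCore's init() method also starts the second asynchronous task, HeartBeat. Like MasterElection, its run() method counts down a timer (heartbeatDueMs) on every tick, and once the countdown expires it calls HeartBeat's sendBeat() method to send heartbeat requests.
Only the Leader sends heartbeats. When the Leader calls sendBeat(), it puts the key and timestamp of each datum (service metadata or instance list) into the heartbeat and sends it to the Followers. A Follower that receives the heartbeat compares these entries with its own data and, wherever they differ, synchronizes from the Leader.
HeartBeat's sendBeat() method consists of three parts:
Part 1: Check whether the current node is the Leader. A node that is not the Leader, or that runs in standalone mode, does not send heartbeats.
Part 2: Build the heartbeat packet. Only each datum's key (with its timestamp) is included; the full Instance information is not transmitted. When a Follower finds that some of these keys are missing locally, or older than the Leader's copy, it uses those keys to fetch the full Instance details from the Leader.
Part 3: Gzip-compress the packet and send it to every other Follower over HTTP; the request path is /v1/ns/raft/beat.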
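Parts 1 and 2 can be illustrated with plain collections in place of Jackson nodes. In this minimal sketch, a Leader turns a map of datum key to timestamp into a list of key/timestamp entries, and any other node produces nothing. BeatBuilderSketch and BeatEntry are hypothetical names, and Nacos's key shortening (briefServiceMetaKey / briefInstanceListkey) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of Parts 1 and 2 of sendBeat(). Names are illustrative, not Nacos APIs;
// plain objects stand in for Jackson's ObjectNode/ArrayNode.
final class BeatBuilderSketch {
    /** One heartbeat entry: a datum key and the timestamp of the Leader's copy. */
    static final class BeatEntry {
        final String key;
        final long timestamp;

        BeatEntry(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /**
     * Returns the entries to broadcast, or Optional.empty() when this node must not send a beat.
     * A Leader with no data still sends a beat, just with an empty list.
     */
    static Optional<List<BeatEntry>> buildBeat(boolean isLeader, Map<String, Long> datumTimestamps) {
        // Part 1: only the Leader sends heartbeats
        if (!isLeader) {
            return Optional.empty();
        }
        // Part 2: carry keys and timestamps only, never the datum values
        List<BeatEntry> entries = new ArrayList<>(datumTimestamps.size());
        for (Map.Entry<String, Long> e : datumTimestamps.entrySet()) {
            entries.add(new BeatEntry(e.getKey(), e.getValue()));
        }
        return Optional.of(entries);
    }
}
```

The packet therefore grows with the number of keys, not with the size of the instance data behind them; the Follower pulls full values only for the keys it is actually missing.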
@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {
    ...
    public class HeartBeat implements Runnable {
        @Override
        public void run() {
            try {
                if (stopWork) {
                    return;
                }
                if (!peers.isReady()) {
                    return;
                }
                RaftPeer local = peers.local();
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {
                    return;
                }
                local.resetHeartbeatDue();
                // Send a heartbeat to synchronize data
                sendBeat();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
            }
        }

        private void sendBeat() throws IOException, InterruptedException {
            RaftPeer local = peers.local();
            // Part 1: only the Leader may send heartbeats
            if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
                return;
            }
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
            }
            local.resetLeaderDue();
            // build data
            ObjectNode packet = JacksonUtils.createEmptyJsonNode();
            packet.replace("peer", JacksonUtils.transferToJsonNode(local));
            ArrayNode array = JacksonUtils.createEmptyArrayNode();
            if (switchDomain.isSendBeatOnly()) {
                Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
            }
            if (!switchDomain.isSendBeatOnly()) {
                // Part 2: build the heartbeat parameters
                // Only datum.key goes into the packet, never the full Instance information
                for (Datum datum : datums.values()) {
                    ObjectNode element = JacksonUtils.createEmptyJsonNode();
                    if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                        // Put only the key
                        element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                    } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                        element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                    }
                    element.put("timestamp", datum.timestamp.get());
                    array.add(element);
                }
            }
            packet.replace("datums", array);
            // broadcast
            Map<String, String> params = new HashMap<String, String>(1);
            params.put("beat", JacksonUtils.toJson(packet));
            String content = JacksonUtils.toJson(params);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
            gzip.close();
            byte[] compressedBytes = out.toByteArray();
            // Only used for the debug log below: compressedBytes is what gets sent, and because
            // gzip output is not valid UTF-8, this string's length() is not the compressed byte count
            String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(), compressedContent.length());
            }
            // Part 3: send the heartbeat to the other Followers over HTTP at /v1/ns/raft/beat
            // (heartbeat + data synchronization)
            for (final String server : peers.allServersWithoutMySelf()) {
                try {
                    final String url = buildUrl(server, API_BEAT);
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("send beat to server " + server);
                    }
                    // Send the heartbeat over HTTP
                    HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (!result.ok()) {
                                Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                return;
                            }
                            peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                            if (Loggers.RAFT.isDebugEnabled()) {
                                Loggers.RAFT.debug("receive beat response from: {}", url);
                            }
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, throwable);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        }

                        @Override
                        public void onCancel() {
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }
            }
        }
    }
    ...
}
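The Leader gzips the JSON text before posting it, and on the Follower side RaftController.beat() decompresses the body with IoUtils.tryDecompress(). A JDK-only round trip can be sketched as follows; BeatGzipSketch and its two methods are hypothetical, not Nacos APIs. The accompanying check also illustrates why compressedContent in sendBeat() is only good for logging: gzip output is not valid UTF-8, so converting it to a String and back does not reproduce the original bytes, which is why the raw compressedBytes are what is sent.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of the gzip step around the heartbeat: the Leader compresses,
// the Follower decompresses. Method names are illustrative, not Nacos APIs.
final class BeatGzipSketch {
    static byte[] compress(String json) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The gzip stream is closed (trailer written) before the bytes are read
        return out.toByteArray();
    }

    static String decompress(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```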
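(2) How Followers handle heartbeats to synchronize data
When a Follower receives the Leader's HTTP request to "/v1/ns/raft/beat", RaftController's beat() method handles it: it decompresses and URL-decodes the request body and then calls RaftCore's receivedBeat() method, which does the actual heartbeat processing.
RaftCore.receivedBeat() works as follows:
I. First, it performs several checks
Check 1: If the heartbeat was not sent by a node in the Leader state, the Follower throws an exception immediately.
Check 2: A Follower's term should be less than or equal to the Leader's term. If the local term is greater, the heartbeat is out of date and an exception is thrown immediately.
Check 3: If the local node is not in the Follower state, it switches to Follower and records the sender as the node it voted for. This happens, for example, when the node was a Candidate in an election, or when it used to be the Leader but was cut off by a network partition (split brain). Once the network recovers, it receives heartbeats from the new Leader, whose term is larger than its own, so it must step down to Follower.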
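The three checks can be exercised in isolation. In this minimal sketch, BeatCheckSketch with its nested Peer and State types is a hypothetical stand-in for RaftPeer and RaftPeer.State, with a plain long in place of the AtomicLong term; the exception messages follow the ones in receivedBeat().

```java
// Hypothetical sketch of the three checks at the start of receivedBeat().
// Peer and State are illustrative stand-ins for RaftPeer and RaftPeer.State.
final class BeatCheckSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    static final class Peer {
        final String ip;
        long term;
        State state;
        String voteFor;

        Peer(String ip, long term, State state) {
            this.ip = ip;
            this.term = term;
            this.state = state;
        }
    }

    /** Throws if the beat must be rejected; otherwise steps the local node down to Follower if needed. */
    static void checkBeat(Peer local, Peer remote) {
        // Check 1: beats are only accepted from a node that is in the Leader state
        if (remote.state != State.LEADER) {
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }
        // Check 2: a beat from an older term is out of date
        if (local.term > remote.term) {
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term
                    + ", beat-to-term: " + local.term);
        }
        // Check 3: a Candidate, or a former Leader cut off by a partition, becomes a Follower
        if (local.state != State.FOLLOWER) {
            local.state = State.FOLLOWER;
            local.voteFor = remote.ip;
        }
    }
}
```

The split-brain case from Check 3 corresponds to calling checkBeat(oldLeader, newLeader) with newLeader.term greater than oldLeader.term: the call succeeds and the old Leader ends up as a Follower voting for the new one.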
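II. Next, it compares the keys in its own datums with the keys in the heartbeat
Each key that is missing locally, or whose local timestamp is older than the timestamp in the heartbeat, is added to batch. Whenever batch reaches 50 keys, or the last heartbeat entry has been processed, the keys in batch are packed into a keys request parameter and sent to the Leader over HTTP to fetch the detailed Instance information for those keys.
After receiving the Instance data returned by the Leader, the Follower calls RaftStore.write() to persist each datum to a local file, updates its in-memory datums, and calls PersistentNotifier.notify() to propagate the change to the in-memory registry. This completes heartbeat-driven synchronization, with the Leader's data as the source of truth.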
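The diff-and-batch step can be sketched with two maps of key to timestamp. BeatDiffSketch is a hypothetical name; BATCH_SIZE = 50 mirrors the batch.size() < 50 check in the code below, while joining each batch into one comma-separated string is an assumption about how the elided keys parameter is built.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of step II of receivedBeat(). The comma-joined batch format is an
// assumption about the elided `keys` parameter; 50 mirrors the batch-size check.
final class BeatDiffSketch {
    static final int BATCH_SIZE = 50;

    /** Keys the Follower must fetch: missing locally, or older locally than in the beat. */
    static List<String> keysToFetch(Map<String, Long> localTimestamps, Map<String, Long> beatTimestamps) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> e : beatTimestamps.entrySet()) {
            Long mine = localTimestamps.get(e.getKey());
            if (mine == null || mine < e.getValue()) {
                stale.add(e.getKey()); // missing, or the Leader holds a newer write
            }
        }
        return stale;
    }

    /** Splits keys into comma-joined groups of at most BATCH_SIZE keys, one fetch request each. */
    static List<String> toBatches(List<String> keys) {
        List<String> batches = new ArrayList<>();
        for (int from = 0; from < keys.size(); from += BATCH_SIZE) {
            int to = Math.min(from + BATCH_SIZE, keys.size());
            batches.add(String.join(",", keys.subList(from, to)));
        }
        return batches;
    }
}
```

Batching bounds the size of each GET request; with 120 stale keys, for instance, the Follower would issue three requests of 50, 50 and 20 keys.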
@Deprecated
@RestController
@RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft", UtilsAndCommons.NACOS_SERVER_CONTEXT + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft"})
public class RaftController {
    private final RaftCore raftCore;
    ...
    @PostMapping("/beat")
    public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
        if (versionJudgement.allMemberIsNewVersion()) {
            throw new IllegalStateException("old raft protocol already stop");
        }
        String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
        String value = URLDecoder.decode(entity, "UTF-8");
        value = URLDecoder.decode(value, "UTF-8");
        JsonNode json = JacksonUtils.toObj(value);
        // Handle the heartbeat
        RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
        return JacksonUtils.transferToJsonNode(peer);
    }
    ...
}

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {
    public final PersistentNotifier notifier;
    ...
    // Received beat from leader. // TODO split method to multiple smaller method.
    public RaftPeer receivedBeat(JsonNode beat) throws Exception {
        ...
        // Check 1: if the sender is not a Leader, throw immediately
        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state, JacksonUtils.toJson(remote));
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }
        // Check 2: if the local term is greater than the Leader's term, throw immediately
        if (local.term.get() > remote.term.get()) {
            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}", remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
        }
        // Check 3: if the local node is not a FOLLOWER, change it to FOLLOWER
        if (local.state != RaftPeer.State.FOLLOWER) {
            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
            // mk follower
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }
        ...
        // Compare each Instance key sent by the Leader with the local data
        for (Object object : beatDatums) {
            ...
            try {
                if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                    continue;
                }
                if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                    // Record the datumKey that needs to be synchronized
                    batch.add(datumKey);
                }
                // Only sync once enough keys have accumulated (or the last entry is reached)
                if (batch.size() < 50 && processedCount < beatDatums.size()) {
                    continue;
                }
                ...
                // Build the request parameters from batch
                String url = buildUrl(remote.ip, API_GET);
                Map<String, String> queryParam = new HashMap<>(1);
                queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                // Ask the Leader over HTTP for the Instance details of these keys
                HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            return;
                        }
                        // Deserialize the result
                        List<JsonNode> datumList = JacksonUtils.toObj(result.getData(), new TypeReference<List<JsonNode>>() { });
                        // Iterate over the Instance details returned by the Leader
                        for (JsonNode datumJson : datumList) {
                            Datum newDatum = null;
                            OPERATE_LOCK.lock();
                            try {
                                ...
                                // Raft writes the data to local storage
                                raftStore.write(newDatum);
                                // Update the in-memory data
                                datums.put(newDatum.key, newDatum);
                                // Same as during instance registration: this eventually calls listener.onChange()
                                notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
                            } catch (Throwable e) {
                                Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                            } finally {
                                OPERATE_LOCK.unlock();
                            }
                        }
                        ...
                        return;
                    }
                    ...
                });
                batch.clear();
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
            }
        }
        ...
    }
    ...
}
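The apply loop inside the async callback follows a simple pattern: for each fetched datum, persist it, update the in-memory map, then notify listeners, all under one lock, with a failure on one datum not stopping the rest. This is a minimal sketch under assumptions: DatumApplySketch and its DiskStore interface are hypothetical stand-ins for RaftStore, and a BiConsumer stands in for PersistentNotifier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

// Hypothetical sketch of the per-datum apply loop. DiskStore and the BiConsumer listener are
// illustrative stand-ins for RaftStore and PersistentNotifier.
final class DatumApplySketch {
    interface DiskStore {
        void write(String key, String value) throws Exception;
    }

    private static final ReentrantLock OPERATE_LOCK = new ReentrantLock();

    final Map<String, String> datums = new ConcurrentHashMap<>();
    private final DiskStore store;
    private final BiConsumer<String, String> notifier;

    DatumApplySketch(DiskStore store, BiConsumer<String, String> notifier) {
        this.store = store;
        this.notifier = notifier;
    }

    /** Applies each datum independently; returns how many were applied successfully. */
    int applyAll(Map<String, String> fetched) {
        int applied = 0;
        for (Map.Entry<String, String> e : fetched.entrySet()) {
            OPERATE_LOCK.lock();
            try {
                store.write(e.getKey(), e.getValue());     // 1. persist to local storage
                datums.put(e.getKey(), e.getValue());      // 2. update the in-memory copy
                notifier.accept(e.getKey(), e.getValue()); // 3. push the change to listeners
                applied++;
            } catch (Exception ex) {
                // The original catches Throwable and logs it; this sketch just skips the datum.
            } finally {
                OPERATE_LOCK.unlock();
            }
        }
        return applied;
    }
}
```

Writing to disk before touching memory means a datum that fails to persist never becomes visible to listeners, so memory never runs ahead of what a restart would reload.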
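10. A brief summary of how Nacos implements the Raft protocol
Nacos's Raft implementation consists of three main parts:
I. How the Nacos cluster writes data using the Raft protocol
II. How the Nacos cluster elects the Leader node
III. How the Leader synchronizes data through heartbeat requests
Early Nacos versions implemented only a simplified version of Raft, without two-phase commit. Once the Leader finishes processing a write, it synchronizes the data directly to the other cluster nodes. Even if synchronization fails, or fewer than half of the nodes succeed, the Leader's data is not rolled back; an exception is simply thrown. This is why the early Raft implementation was later deprecated, as the @Deprecated annotations on the classes above indicate.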
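The missing rollback is easiest to see in a toy write path. This is a minimal sketch, not Nacos's actual write code: NoRollbackWriteSketch is hypothetical, each Follower is modeled as a Predicate that reports whether it accepted the write, and counting the Leader itself toward a majority of clusterSize / 2 + 1 is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of a write path without two-phase commit: apply locally, replicate,
// and only report failure afterwards. Majority counting (Leader included) is an assumption.
final class NoRollbackWriteSketch {
    final Map<String, String> leaderData = new HashMap<>();

    /** @param followers one predicate per Follower: true if that Follower accepted the write */
    void write(String key, String value, List<Predicate<String>> followers) {
        leaderData.put(key, value); // 1. applied on the Leader immediately
        int acks = 1;               // the Leader counts itself
        for (Predicate<String> follower : followers) {
            if (follower.test(key)) {
                acks++;
            }
        }
        int clusterSize = followers.size() + 1;
        int majority = clusterSize / 2 + 1;
        if (acks < majority) {
            // 2. No second phase and no rollback: the Leader keeps the value and only reports failure
            throw new IllegalStateException("only " + acks + " of " + clusterSize + " nodes acknowledged");
        }
    }
}
```

After a failed write the value stays on the Leader even though the caller saw an exception. Followers that missed it only catch up later through the heartbeat synchronization described in section 9.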
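[Figure: how the Nacos cluster processes data when a service instance is registered under Nacos's Raft implementation]
[Figure: Leader election and heartbeat-based data synchronization in Nacos's Raft implementation]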