Elasticsearch source code analysis 03: electing the cluster state

Electing the cluster state

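Elasticsearch stores the following kinds of data: state metadata, Lucene index files, and the translog (transaction log).
Metadata can be divided into:

  • Cluster-level metadata: the Metadata data structure, mainly clusterUUID, settings, templates, etc.
  • Index-level metadata: the IndexMetadata data structure, mainly the number of shards, the mappings (index field mappings), etc.
  • Shard-level metadata: ShardStateMetadata, mainly version, indexUUID, whether the shard is the primary, etc.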
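Each node may hold a different cluster state, so the correct metadata has to be chosen as the authoritative source. State recovery is managed by GatewayService, which implements the ClusterStateListener interface. After a master has been elected, a cluster state task is published, which triggers the clusterChanged callback, and that callback eventually runs:
// trigger state recovery
performStateRecovery(enforceRecoverAfterTime, reason);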

Cluster-level and index-level metadata are recovered in the gateway module:

public void clusterChanged(final ClusterChangedEvent event) {
    if (lifecycle.stoppedOrClosed()) {
        return;
    }
    final ClusterState state = event.state();
    // only the elected master may run recovery
    if (state.nodes().isLocalNodeElectedMaster() == false) {
        // not our job to recover
        return;
    }
    // cluster-level and index-level state have already been recovered
    if (state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        // already recovered
        return;
    }
    // ... omitted: checks whether the conditions for starting recovery are met ...
    // recover the state
    performStateRecovery(enforceRecoverAfterTime, reason);
}

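First, only the elected master may run the state election. Next, it checks whether state recovery has already completed (the STATE_NOT_RECOVERED_BLOCK global block is gone); if so, it returns immediately, otherwise it runs the recovery task.
This eventually calls recoveryRunnable.run():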
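The early-return checks in clusterChanged can be modeled as a small pure function. This is a minimal sketch, not Elasticsearch code: the class RecoveryGuard and its boolean parameters are hypothetical stand-ins for lifecycle.stoppedOrClosed(), isLocalNodeElectedMaster() and hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), and the omitted "recover after" conditions are left out.

```java
/**
 * Hypothetical model of the early-return checks in GatewayService.clusterChanged.
 * Each boolean input stands in for the real Elasticsearch call named in the comments.
 */
final class RecoveryGuard {

    private RecoveryGuard() {
    }

    static boolean shouldRecover(boolean stoppedOrClosed,
                                 boolean localNodeIsElectedMaster,
                                 boolean stateNotRecoveredBlockPresent) {
        if (stoppedOrClosed) {              // lifecycle.stoppedOrClosed()
            return false;
        }
        if (!localNodeIsElectedMaster) {    // only the elected master recovers state
            return false;
        }
        // once STATE_NOT_RECOVERED_BLOCK has been removed, recovery already happened
        return stateNotRecoveredBlockPresent;
    }
}
```

Because the block is only removed by the recovery task itself, its presence doubles as the "not yet recovered" flag, so repeated cluster state events do not trigger a second recovery.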

final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () ->
        gateway.performStateRecovery(new GatewayRecoveryListener());

This runs Gateway.performStateRecovery.
It first collects the IDs of all master-eligible nodes:

// master-eligible nodes
final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);

It then fetches the metadata held by those master-eligible nodes (the local node included):

// fetch the nodes' metadata
final TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState =
        listGatewayMetaState.list(nodesIds, null).actionGet();

Let's look at the constructor of TransportNodesListGatewayMetaState:

public TransportNodesListGatewayMetaState(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
                                          ActionFilters actionFilters, GatewayMetaState metaState) {
    super(ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
            Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
    this.metaState = metaState;
}

// register the action's request handler
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader,
        new TransportHandler());

Back in list(): it calls execute(), which ends up in doExecute():

public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {
    PlainActionFuture<NodesGatewayMetaState> future = PlainActionFuture.newFuture();
    execute(new Request(nodesIds).timeout(timeout), future);
    return future;
}

protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
    // start the asynchronous fan-out
    new AsyncAction(task, request, listener).start();
}
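In list(), the PlainActionFuture serves both as the listener that the asynchronous action completes and as the future the caller blocks on with actionGet(). The same shape can be sketched with the JDK's CompletableFuture; FutureListDemo, list and execute here are hypothetical, and join() plays the role of actionGet().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical model of list(): the future doubles as the listener that the
 * asynchronous action completes, and the caller blocks on it.
 */
final class FutureListDemo {

    private FutureListDemo() {
    }

    /** Stand-in for execute(request, listener): completes later, on a pool thread. */
    static void execute(List<String> nodeIds, CompletableFuture<String> listener) {
        CompletableFuture.runAsync(() -> listener.complete("responses from " + nodeIds));
    }

    static CompletableFuture<String> list(List<String> nodeIds) {
        CompletableFuture<String> future = new CompletableFuture<>();  // like PlainActionFuture.newFuture()
        execute(nodeIds, future);
        return future;                                                 // caller decides whether to block
    }
}
```

For example, FutureListDemo.list(List.of("n1", "n2")).join() blocks until the pool thread completes the future, just as performStateRecovery blocks on actionGet().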

It sends a request to every node to fetch its metadata:

void start() {
    final DiscoveryNode[] nodes = request.concreteNodes();
    if (nodes.length == 0) {
        // no nodes to fetch data from
        // nothing to notify
        threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
        return;
    }
    TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
    if (request.timeout() != null) {
        builder.withTimeout(request.timeout());
    }
    // send a request to every node
    for (int i = 0; i < nodes.length; i++) {
        final int idx = i;
        final DiscoveryNode node = nodes[i];
        final String nodeId = node.getId();
        try {
            TransportRequest nodeRequest = newNodeRequest(request);
            if (task != null) {
                nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
            }
            // send the request
            transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
                    new TransportResponseHandler<NodeResponse>() {
                        @Override
                        public NodeResponse read(StreamInput in) throws IOException {
                            return newNodeResponse(in);
                        }

                        // handle the response
                        @Override
                        public void handleResponse(NodeResponse response) {
                            onOperation(idx, response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            onFailure(idx, node.getId(), exp);
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    });
        } catch (Exception e) {
            onFailure(idx, nodeId, e);
        }
    }
}

On the receiving node, the request is handled by NodeTransportHandler, which builds that node's metadata and sends it back:

// per-node request handler
class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
    @Override
    public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
        channel.sendResponse(nodeOperation(request, task));
    }
}

protected NodeGatewayMetaState nodeOperation(NodeRequest request) {
    return new NodeGatewayMetaState(clusterService.localNode(), metaState.getMetadata());
}

Back on the sending side, here is how each node's response is handled:

// handle the response
@Override
public void handleResponse(NodeResponse response) {
    onOperation(idx, response);
}

private void onOperation(int idx, NodeResponse nodeResponse) {
    // record this node's response
    responses.set(idx, nodeResponse);
    // once every node has answered, whether it succeeded or failed
    if (counter.incrementAndGet() == responses.length()) {
        finishHim();
    }
}

private void finishHim() {
    NodesResponse finalResponse;
    try {
        finalResponse = newResponse(request, responses);
    } catch (Exception e) {
        logger.debug("failed to combine responses from nodes", e);
        listener.onFailure(e);
        return;
    }
    // invoke the listener callback
    listener.onResponse(finalResponse);
}
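The counting pattern above (one slot per node, one shared counter, and a final step that fires when the last node answers, whether it succeeded or failed) can be sketched on its own. This is a simplified model, not the real AsyncAction: the class ScatterGather and its onAllDone callback are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

/**
 * Hypothetical scatter-gather modeled on AsyncAction: each node owns one slot,
 * and the final callback runs exactly once, when the last node has answered.
 */
final class ScatterGather<R> {
    private final AtomicReferenceArray<Object> responses;  // R on success, Exception on failure
    private final AtomicInteger counter = new AtomicInteger();
    private final Consumer<AtomicReferenceArray<Object>> onAllDone;

    ScatterGather(int nodeCount, Consumer<AtomicReferenceArray<Object>> onAllDone) {
        this.responses = new AtomicReferenceArray<>(nodeCount);
        this.onAllDone = onAllDone;
    }

    void onOperation(int idx, R response) {
        responses.set(idx, response);
        countDown();
    }

    void onFailure(int idx, Exception e) {
        responses.set(idx, e);  // a failure still fills its slot
        countDown();
    }

    private void countDown() {
        // incrementAndGet is atomic, so only one thread sees the final count (like finishHim)
        if (counter.incrementAndGet() == responses.length()) {
            onAllDone.accept(responses);
        }
    }
}
```

Writing each response into its own index avoids locking, and the atomic counter guarantees the combine step runs exactly once even when responses arrive concurrently.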

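At this point the metadata of the other nodes has been collected, and control returns to performStateRecovery.
Next, it computes how many master-eligible nodes must have returned metadata: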

// required number of responses
final int requiredAllocation = Math.max(1, minimumMasterNodes);

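Then the cluster-level metadata is elected by version: the versions are compared, and the cluster state with the highest version wins.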

// cluster-level metadata
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
    if (nodeState.metadata() == null) {
        continue;
    }
    found++;
    if (electedGlobalState == null) {
        electedGlobalState = nodeState.metadata();
    } else if (nodeState.metadata().version() > electedGlobalState.version()) { // the higher version wins
        electedGlobalState = nodeState.metadata();
    }
    for (final ObjectCursor<IndexMetadata> cursor : nodeState.metadata().indices().values()) {
        indices.addTo(cursor.value.getIndex(), 1);
    }
}
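The election loop can be reproduced with plain JDK types. In this sketch NodeMeta is a hypothetical, simplified stand-in for a node's Metadata (just a version and a list of index names), and a HashMap replaces the HPPC counter used for indices; on a version tie the first node seen is kept, as with the strict > in the original.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for one node's on-disk Metadata. */
final class NodeMeta {
    final long version;
    final List<String> indices;  // names of the indices this node knows about

    NodeMeta(long version, List<String> indices) {
        this.version = version;
        this.indices = indices;
    }
}

final class GlobalStateElection {
    NodeMeta elected;                                         // electedGlobalState
    int found;                                                // nodes that returned metadata
    final Map<String, Integer> indexCounts = new HashMap<>(); // how many nodes know each index

    void elect(List<NodeMeta> responses) {
        for (NodeMeta meta : responses) {
            if (meta == null) {
                continue;                                     // node returned no metadata
            }
            found++;
            if (elected == null || meta.version > elected.version) {
                elected = meta;                               // the higher version wins
            }
            for (String index : meta.indices) {
                indexCounts.merge(index, 1, Integer::sum);
            }
        }
    }
}
```

The per-index counts collected here are what the next phase iterates over when it elects each index's metadata.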

It then checks that enough nodes returned a cluster state:

// not enough nodes returned metadata
if (found < requiredAllocation) {
    listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");
    return;
}
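The threshold and the check can be isolated into a small helper. QuorumCheck is hypothetical; it only mirrors the Math.max(1, minimumMasterNodes) expression and the failure message shown above, returning null when enough nodes answered.

```java
/** Hypothetical helper mirroring the requiredAllocation check in performStateRecovery. */
final class QuorumCheck {

    private QuorumCheck() {
    }

    static int requiredAllocation(int minimumMasterNodes) {
        // at least one node must always answer, even if the configured value is 0 or negative
        return Math.max(1, minimumMasterNodes);
    }

    /** Returns null when enough nodes answered, otherwise the failure message. */
    static String check(int found, int minimumMasterNodes) {
        int required = requiredAllocation(minimumMasterNodes);
        if (found < required) {
            return "found [" + found + "] metadata states, required [" + required + "]";
        }
        return null;
    }
}
```

For example, with minimumMasterNodes = 2 and only one node reporting metadata, check(1, 2) yields "found [1] metadata states, required [2]" and recovery is abandoned for this attempt.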

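It builds the cluster state from the elected global metadata with all indices removed; the index-level metadata is elected in the next step: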

// update the global state and clear the indices; we elect them in the next phase
final Metadata.Builder metadataBuilder = Metadata.builder(electedGlobalState).removeAllIndices();

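For each index, it iterates over all nodes, picks the index metadata with the highest version, and adds it to metadataBuilder. Note that, as the TODO in the source points out, the metadata is added even when fewer than requiredAllocation nodes reported the index; only a debug message is logged: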

for (int i = 0; i < keys.length; i++) {
    if (keys[i] != null) {
        final Index index = (Index) keys[i];
        IndexMetadata electedIndexMetadata = null;
        int indexMetadataCount = 0;
        for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
            if (nodeState.metadata() == null) {
                continue;
            }
            final IndexMetadata indexMetadata = nodeState.metadata().index(index);
            if (indexMetadata == null) {
                continue;
            }
            if (electedIndexMetadata == null) {
                electedIndexMetadata = indexMetadata;
            } else if (indexMetadata.getVersion() > electedIndexMetadata.getVersion()) { // pick the highest version
                electedIndexMetadata = indexMetadata;
            }
            indexMetadataCount++;
        }
        if (electedIndexMetadata != null) {
            if (indexMetadataCount < requiredAllocation) {
                logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetadataCount, requiredAllocation);
            } // TODO if this logging statement is correct then we are missing an else here
            // add the index-level metadata
            metadataBuilder.put(electedIndexMetadata, false);
        }
    }
}
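The per-index election can be sketched the same way. IndexMetaCopy and IndexElection are hypothetical simplifications: each node's indices are a Map keyed by index name (null meaning the node returned no metadata), and, like the original, a copy below the threshold is only logged, not dropped.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified per-node copy of one index's metadata. */
final class IndexMetaCopy {
    final String index;
    final long version;

    IndexMetaCopy(String index, long version) {
        this.index = index;
        this.version = version;
    }
}

final class IndexElection {

    private IndexElection() {
    }

    /** Picks the highest-version copy of one index across all nodes' metadata. */
    static IndexMetaCopy elect(String index, List<Map<String, IndexMetaCopy>> nodes, int requiredAllocation) {
        IndexMetaCopy elected = null;
        int count = 0;
        for (Map<String, IndexMetaCopy> node : nodes) {
            if (node == null) {
                continue;                    // node returned no metadata
            }
            IndexMetaCopy copy = node.get(index);
            if (copy == null) {
                continue;                    // node does not know this index
            }
            if (elected == null || copy.version > elected.version) {
                elected = copy;              // the higher version wins
            }
            count++;
        }
        if (elected != null && count < requiredAllocation) {
            // mirrors the original: only logged, the copy is still returned (see the upstream TODO)
            System.out.println("[" + index + "] found [" + count + "], required [" + requiredAllocation + "]");
        }
        return elected;
    }
}
```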

It then builds the recovered cluster state from the elected cluster-level and index-level metadata:

// the recovered cluster state
ClusterState recoveredState = Function.<ClusterState>identity()
        .andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings()))
        .apply(ClusterState.builder(clusterService.getClusterName()).metadata(metadataBuilder).build());
listener.onSuccess(recoveredState);
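The Function.<ClusterState>identity().andThen(...).apply(...) idiom is plain java.util.function.Function composition: identity() is a no-op starting point, and each andThen step runs on the previous step's result, left to right. The same idiom appears in RecoverStateUpdateTask.execute. In this toy version the hypothetical "state" is just a list recording which steps ran; the step names are borrowed from that method for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class PipelineDemo {

    private PipelineDemo() {
    }

    /** Returns a copy of the state with the step name appended, leaving the input untouched. */
    static Function<List<String>, List<String>> step(String name) {
        return state -> {
            List<String> next = new ArrayList<>(state);
            next.add(name);
            return next;
        };
    }

    static List<String> run(List<String> initial) {
        // same shape as Function.<ClusterState>identity().andThen(...).andThen(...).apply(state)
        return Function.<List<String>>identity()
                .andThen(step("updateRoutingTable"))
                .andThen(step("removeStateNotRecoveredBlock"))
                .apply(initial);
    }
}
```

Starting from identity() lets every real step be written uniformly as .andThen(...), which keeps the chain easy to extend.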

GatewayRecoveryListener.onSuccess then submits a cluster state update task:

class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {
    @Override
    public void onSuccess(final ClusterState recoveredState) {
        logger.trace("successful state recovery, importing cluster state...");
        clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask() {
            @Override
            public ClusterState execute(final ClusterState currentState) {
                final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);
                return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));
            }
        });
    }

    @Override
    public void onFailure(final String msg) {
        logger.info("state recovery failed: {}", msg);
        resetRecoveredFlags();
    }
}

The task merges the recovered state into the current state and delegates to RecoverStateUpdateTask.execute:

@Override
public ClusterState execute(final ClusterState currentState) {
    if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        logger.debug("cluster is already recovered");
        return currentState;
    }
    // state recovered: update the routing table and remove STATE_NOT_RECOVERED_BLOCK
    final ClusterState newState = Function.<ClusterState>identity()
            .andThen(ClusterStateUpdaters::updateRoutingTable)
            .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
            .apply(currentState);
    // start allocating shards
    return allocationService.reroute(newState, "state recovered");
}

Once the cluster-level and index-level metadata have been recovered, shard allocation begins.
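The important property of execute is that it is idempotent: the first run removes the block and triggers allocation, and any later run sees no block and returns the state unchanged. The sketch below models that with a hypothetical immutable ToyState (a set of global block names plus a flag standing in for allocated shards); it is not the real ClusterState or AllocationService.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical immutable state: global block names plus a stand-in for shard allocation. */
final class ToyState {
    final Set<String> globalBlocks;
    final boolean shardsAllocated;

    ToyState(Set<String> globalBlocks, boolean shardsAllocated) {
        this.globalBlocks = Set.copyOf(globalBlocks);
        this.shardsAllocated = shardsAllocated;
    }
}

final class RecoverTaskModel {
    static final String STATE_NOT_RECOVERED = "state_not_recovered";

    private RecoverTaskModel() {
    }

    static ToyState execute(ToyState current) {
        if (!current.globalBlocks.contains(STATE_NOT_RECOVERED)) {
            return current;                         // already recovered: return the same instance
        }
        Set<String> blocks = new HashSet<>(current.globalBlocks);
        blocks.remove(STATE_NOT_RECOVERED);         // like removeStateNotRecoveredBlock
        return reroute(new ToyState(blocks, current.shardsAllocated));
    }

    /** Stand-in for allocationService.reroute(...). */
    static ToyState reroute(ToyState state) {
        return new ToyState(state.globalBlocks, true);
    }
}
```

Returning the very same instance when nothing changes matters in Elasticsearch too: an unchanged cluster state is not published again.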

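  • Persisting metadata
    Master-eligible nodes and data nodes persist the cluster state: whenever they receive a cluster state change, they write it to disk. GatewayClusterApplier implements ClusterStateApplier, so its applyClusterState method is called on every cluster state change: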
@Override
public void applyClusterState(ClusterChangedEvent event) {
    if (event.state().blocks().disableStatePersistence()) {
        incrementalClusterStateWriter.setIncrementalWrite(false);
        return;
    }
    try {
        // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
        // that's higher than the last accepted term.
        // TODO: can we get rid of this hack?
        if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
            incrementalClusterStateWriter.setCurrentTerm(event.state().term());
        }
        // update the metadata on disk
        incrementalClusterStateWriter.updateClusterState(event.state());
        incrementalClusterStateWriter.setIncrementalWrite(true);
    } catch (WriteStateException e) {
        logger.warn("Exception occurred when storing new meta data", e);
    }
}

The cluster-level and index-level metadata are then written to disk:

void updateClusterState(ClusterState newState) throws WriteStateException {
    // metadata
    Metadata newMetadata = newState.metadata();
    final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
    final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
    // global (cluster-level) metadata
    long globalStateGeneration = writeGlobalState(writer, newMetadata);
    // index-level metadata
    Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState);
    Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
    writeManifest(writer, manifest);
    previousManifest = manifest;
    previousClusterState = newState;
    final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
    final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;
    if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
        logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
                "wrote metadata for [{}] indices and skipped [{}] unchanged indices",
                durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());
    } else {
        logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",
                durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());
    }
}
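The log messages ("wrote metadata for [...] indices and skipped [...] unchanged indices") suggest the write is incremental, with a manifest recording which generation of each file is current. The in-memory model below illustrates that idea under stated assumptions: ToyStateWriter is hypothetical, it assumes an index counts as unchanged when its metadata version is unchanged, and for simplicity it always rewrites the global metadata.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, in-memory model of an incremental metadata writer. A write
 * produces a new generation; unchanged indices keep their old generation;
 * the manifest (indexGenerations) says which generation of each file is current.
 */
final class ToyStateWriter {
    final Map<String, Long> indexVersions = new HashMap<>();     // version last written per index
    final Map<String, Long> indexGenerations = new HashMap<>();  // manifest: index -> current generation
    long globalGeneration = -1;
    int indicesWritten;
    int indicesSkipped;

    void updateClusterState(Map<String, Long> newIndexVersions) {
        indicesWritten = 0;
        indicesSkipped = 0;
        globalGeneration++;                                      // assumption: global metadata always rewritten
        Map<String, Long> nextGenerations = new HashMap<>();
        for (Map.Entry<String, Long> e : newIndexVersions.entrySet()) {
            String index = e.getKey();
            long version = e.getValue();
            Long previous = indexVersions.get(index);
            if (previous != null && previous == version) {
                nextGenerations.put(index, indexGenerations.get(index));  // unchanged: keep old file
                indicesSkipped++;
            } else {
                long generation = indexGenerations.getOrDefault(index, -1L) + 1;
                nextGenerations.put(index, generation);                   // "write" a new generation
                indexVersions.put(index, version);
                indicesWritten++;
            }
        }
        // the manifest is replaced last, so it only ever points at files that exist
        indexGenerations.clear();
        indexGenerations.putAll(nextGenerations);
        indexVersions.keySet().retainAll(nextGenerations.keySet());     // forget deleted indices
    }
}
```

Writing the manifest last is what makes the update atomic from a reader's point of view: until the new manifest exists, the old one still references a complete, consistent set of files.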
  • Loading metadata from disk
    Node.start() calls gatewayMetaState.start:
// cluster metadata
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
        injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class),
        injector.getInstance(PersistedClusterStateService.class));

This in turn calls loadFullState:

// load metadata
manifestClusterStateTuple = metaStateService.loadFullState();

public Tuple<Manifest, Metadata> loadFullState() throws IOException {
    // load the latest manifest file
    final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
    if (manifest == null) {
        return loadFullStateBWC();
    }
    // build the metadata
    final Metadata.Builder metadataBuilder;
    if (manifest.isGlobalGenerationMissing()) {
        metadataBuilder = Metadata.builder();
    } else {
        final Metadata globalMetadata = METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(),
                nodeEnv.nodeDataPaths());
        if (globalMetadata != null) {
            metadataBuilder = Metadata.builder(globalMetadata);
        } else {
            throw new IOException("failed to find global metadata [generation: " + manifest.getGlobalGeneration() + "]");
        }
    }
    // index-level metadata
    for (Map.Entry<Index, Long> entry : manifest.getIndexGenerations().entrySet()) {
        final Index index = entry.getKey();
        final long generation = entry.getValue();
        final String indexFolderName = index.getUUID();
        final IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, generation,
                nodeEnv.resolveIndexFolder(indexFolderName));
        if (indexMetadata != null) {
            metadataBuilder.put(indexMetadata, false);
        } else {
            throw new IOException("failed to find metadata for existing index " + index.getName() + " [location: " + indexFolderName +
                    ", generation: " + generation + "]");
        }
    }
    return new Tuple<>(manifest, metadataBuilder.build());
}

The index-level and cluster-level metadata read from disk are then used to build the ClusterState object.
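The loading side follows the manifest: load the referenced global generation, then each referenced index generation, and fail loudly if any referenced file is missing rather than silently dropping it. ToyStateLoader is a hypothetical in-memory model; by its own convention a negative global generation stands in for isGlobalGenerationMissing(), and the "_global" key is illustrative only.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory model of loadFullState: the manifest names one
 * generation per file, and a referenced generation that cannot be found is
 * treated as an error instead of being skipped.
 */
final class ToyStateLoader {
    final Map<Long, String> globalFiles = new HashMap<>();              // generation -> global metadata
    final Map<String, Map<Long, String>> indexFiles = new HashMap<>();  // index -> (generation -> metadata)

    Map<String, String> loadFullState(long globalGeneration, Map<String, Long> indexGenerations) throws IOException {
        Map<String, String> result = new HashMap<>();
        if (globalGeneration >= 0) {                     // toy convention: negative = no global file
            String global = globalFiles.get(globalGeneration);
            if (global == null) {
                throw new IOException("failed to find global metadata [generation: " + globalGeneration + "]");
            }
            result.put("_global", global);
        }
        for (Map.Entry<String, Long> e : indexGenerations.entrySet()) {
            String meta = indexFiles.getOrDefault(e.getKey(), Map.of()).get(e.getValue());
            if (meta == null) {
                throw new IOException("failed to find metadata for existing index " + e.getKey()
                        + " [generation: " + e.getValue() + "]");
            }
            result.put(e.getKey(), meta);
        }
        return result;
    }
}
```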

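This article was contributed by an Internet user. The views expressed are the author's own and do not represent this site. The site only provides information storage, does not own the content, and assumes no related legal liability.
If reposting, please credit the source: http://www.pswp.cn/diannao/37543.shtml
Traditional Chinese version: http://hk.pswp.cn/diannao/37543.shtml
English version: http://en.pswp.cn/diannao/37543.shtml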
