Electing the cluster state
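Elasticsearch stores several kinds of data: state metadata, Lucene index files, and the translog (transaction log).
Metadata can be divided into three levels:
- Cluster-level metadata: corresponds to the metaData data structure and mainly holds clusterUUid, settings, templates, and so on
- Index-level metadata: corresponds to the indexMetaData data structure and mainly stores the number of shards, the mappings (index field mappings), and so on
- Shard-level metadata: corresponds to shardStateMetaData and mainly holds version, indexUUid, whether the shard is a primary, and so on
Each node may hold a different cluster state, so the correct metadata has to be chosen as the authoritative source. State management lives in gatewayService, which implements the ClusterStateListener interface. Once the master node has been elected, a cluster state task is published, which triggers the clusterChanged callback.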
// recover the state used for shard allocation
performStateRecovery(enforceRecoverAfterTime, reason);
Recovery of the cluster-level and index-level metadata is done in the gateway module.
public void clusterChanged(final ClusterChangedEvent event) {
    if (lifecycle.stoppedOrClosed()) {
        return;
    }
    final ClusterState state = event.state();
    // only the master node may run the recovery
    if (state.nodes().isLocalNodeElectedMaster() == false) {
        // not our job to recover
        return;
    }
    // cluster-level and index-level state has already been recovered
    if (state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        // already recovered
        return;
    }
    // ...... (omitted: mainly checks whether the conditions for recovery are met)
    // recover the state
    performStateRecovery(enforceRecoverAfterTime, reason);
}
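The method first makes sure that only the master node runs the state election, then checks whether state recovery has already been done. If it has, the method returns immediately; otherwise it runs the state recovery task.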
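The two guard checks can be modelled in isolation. The following is a minimal sketch, not Elasticsearch code: `RecoveryGuard`, `Decision`, and the two boolean parameters are hypothetical stand-ins for `isLocalNodeElectedMaster()` and the presence of `STATE_NOT_RECOVERED_BLOCK`.

```java
// Hypothetical, simplified model of the guard checks in clusterChanged.
final class RecoveryGuard {
    enum Decision { SKIP_NOT_MASTER, SKIP_ALREADY_RECOVERED, RECOVER }

    static Decision decide(boolean localNodeIsMaster, boolean stateNotRecoveredBlockPresent) {
        if (!localNodeIsMaster) {
            // only the elected master is responsible for recovery
            return Decision.SKIP_NOT_MASTER;
        }
        if (!stateNotRecoveredBlockPresent) {
            // the block has been removed, so recovery has already happened
            return Decision.SKIP_ALREADY_RECOVERED;
        }
        return Decision.RECOVER;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true));
        System.out.println(decide(false, true));
    }
}
```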
Eventually recoveryRunnable.run() is called:
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () -> gateway.performStateRecovery(new GatewayRecoveryListener());
This runs the gateway's performStateRecovery method.
It first fetches all master-eligible nodes:
// the master-eligible nodes
final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
It then fetches the metadata held by the other master-eligible nodes:
// fetch the cluster-level information
final TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
Here, let's look at the constructor of TransportNodesListGatewayMetaState:
public TransportNodesListGatewayMetaState(ThreadPool threadPool, ClusterService clusterService,
                                          TransportService transportService, ActionFilters actionFilters,
                                          GatewayMetaState metaState) {
    super(ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
        Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
    this.metaState = metaState;
}

// register the handler class for the action
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader,
    new TransportHandler());
Back to the list method, which goes on to call doExecute:
public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {
    PlainActionFuture<NodesGatewayMetaState> future = PlainActionFuture.newFuture();
    execute(new Request(nodesIds).timeout(timeout), future);
    return future;
}

protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
    // run the asynchronous action
    new AsyncAction(task, request, listener).start();
}
A request is sent to every node to fetch its metadata:
void start() {
    final DiscoveryNode[] nodes = request.concreteNodes();
    if (nodes.length == 0) {
        // there are no nodes to fetch data from
        // nothing to notify
        threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
        return;
    }
    TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
    if (request.timeout() != null) {
        builder.withTimeout(request.timeout());
    }
    // loop over all nodes and send each one a request
    for (int i = 0; i < nodes.length; i++) {
        final int idx = i;
        final DiscoveryNode node = nodes[i];
        final String nodeId = node.getId();
        try {
            TransportRequest nodeRequest = newNodeRequest(request);
            if (task != null) {
                nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
            }
            // send the request
            transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
                new TransportResponseHandler<NodeResponse>() {
                    @Override
                    public NodeResponse read(StreamInput in) throws IOException {
                        return newNodeResponse(in);
                    }

                    // handle the response
                    @Override
                    public void handleResponse(NodeResponse response) {
                        onOperation(idx, response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        onFailure(idx, node.getId(), exp);
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.SAME;
                    }
                });
        } catch (Exception e) {
            onFailure(idx, nodeId, e);
        }
    }
}
On the receiving side, the request is handled by the registered NodeTransportHandler, which builds that node's metadata and returns it:
// handles the per-node request
class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
    @Override
    public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
        channel.sendResponse(nodeOperation(request, task));
    }
}

protected NodeGatewayMetaState nodeOperation(NodeRequest request) {
    return new NodeGatewayMetaState(clusterService.localNode(), metaState.getMetadata());
}
Now back to how the sender handles the response from each node:
// handle the response
@Override
public void handleResponse(NodeResponse response) {
    onOperation(idx, response);
}

private void onOperation(int idx, NodeResponse nodeResponse) {
    // record this node's result
    responses.set(idx, nodeResponse);
    // every node has answered, whether it succeeded or failed
    if (counter.incrementAndGet() == responses.length()) {
        finishHim();
    }
}

private void finishHim() {
    NodesResponse finalResponse;
    try {
        finalResponse = newResponse(request, responses);
    } catch (Exception e) {
        logger.debug("failed to combine responses from nodes", e);
        listener.onFailure(e);
        return;
    }
    // trigger the listener callback
    listener.onResponse(finalResponse);
}
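The collection logic above is a general fan-in pattern: one slot per node plus a shared counter, so the callback fires exactly once after the last reply. A minimal sketch under simplifying assumptions follows; `ResponseCollector` and its callback type are hypothetical names, and failures are stored in the same array as successes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

// Hypothetical, simplified model of the per-node response collection.
final class ResponseCollector<T> {
    // one slot per node, holding either a response or the exception for that node
    private final AtomicReferenceArray<Object> responses;
    private final AtomicInteger counter = new AtomicInteger();
    private final Consumer<AtomicReferenceArray<Object>> onAllDone;

    ResponseCollector(int nodeCount, Consumer<AtomicReferenceArray<Object>> onAllDone) {
        this.responses = new AtomicReferenceArray<>(nodeCount);
        this.onAllDone = onAllDone;
    }

    void onOperation(int idx, T response) {
        responses.set(idx, response);
        finishIfLast();
    }

    void onFailure(int idx, Exception e) {
        responses.set(idx, e);
        finishIfLast();
    }

    private void finishIfLast() {
        // incrementAndGet is atomic, so only one caller sees the final count
        if (counter.incrementAndGet() == responses.length()) {
            onAllDone.accept(responses);
        }
    }

    public static void main(String[] args) {
        ResponseCollector<String> collector =
            new ResponseCollector<>(2, all -> System.out.println("collected " + all.length() + " replies"));
        collector.onOperation(0, "metadata from node 0");
        collector.onFailure(1, new RuntimeException("node 1 unreachable"));
    }
}
```

Because a failure also advances the counter, one unreachable node cannot stall the whole request.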
With the other nodes' metadata fetched, we return to performStateRecovery.
The number of master-eligible nodes that must report metadata:
// the required number
final int requiredAllocation = Math.max(1, minimumMasterNodes);
Next, the cluster-level metadata is elected by version number: the versions are compared and the cluster state with the highest version wins.
// cluster-level metadata
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
    if (nodeState.metadata() == null) {
        continue;
    }
    found++;
    if (electedGlobalState == null) {
        electedGlobalState = nodeState.metadata();
    } else if (nodeState.metadata().version() > electedGlobalState.version()) {
        // the higher version wins
        electedGlobalState = nodeState.metadata();
    }
    for (final ObjectCursor<IndexMetadata> cursor : nodeState.metadata().indices().values()) {
        indices.addTo(cursor.value.getIndex(), 1);
    }
}
Check whether enough nodes returned a cluster state:
// not enough nodes replied with metadata
if (found < requiredAllocation) {
    listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");
    return;
}
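The election and the quorum check can be reproduced on a small model. This is only a sketch: `GlobalStateElection` and `NodeReply` are hypothetical types, and a `null` version stands in for a node that returned no metadata.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "highest version wins, if enough nodes replied".
final class GlobalStateElection {
    /** One node's reply; version == null means the node had no metadata. */
    static final class NodeReply {
        final String nodeId;
        final Long version;

        NodeReply(String nodeId, Long version) {
            this.nodeId = nodeId;
            this.version = version;
        }
    }

    /** Returns the reply with the highest version, or null if fewer than `required` nodes had metadata. */
    static NodeReply elect(List<NodeReply> replies, int required) {
        NodeReply elected = null;
        int found = 0;
        for (NodeReply reply : replies) {
            if (reply.version == null) {
                continue; // this node contributes nothing
            }
            found++;
            if (elected == null || reply.version > elected.version) {
                elected = reply;
            }
        }
        return found < required ? null : elected;
    }

    public static void main(String[] args) {
        NodeReply winner = elect(Arrays.asList(
            new NodeReply("n1", 5L), new NodeReply("n2", null), new NodeReply("n3", 9L)), 2);
        System.out.println(winner == null ? "not enough metadata states" : "elected " + winner.nodeId);
    }
}
```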
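Build the cluster state and strip the index information from it; the index-level metadata is elected in the next step:
// update the global state and clean the indices; we elect them in the next phase
final Metadata.Builder metadataBuilder = Metadata.builder(electedGlobalState).removeAllIndices();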
For each index, iterate over all nodes and elect the index metadata with the highest version, then add it to metadataBuilder:
for (int i = 0; i < keys.length; i++) {
    if (keys[i] != null) {
        final Index index = (Index) keys[i];
        IndexMetadata electedIndexMetadata = null;
        int indexMetadataCount = 0;
        for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
            if (nodeState.metadata() == null) {
                continue;
            }
            final IndexMetadata indexMetadata = nodeState.metadata().index(index);
            if (indexMetadata == null) {
                continue;
            }
            if (electedIndexMetadata == null) {
                electedIndexMetadata = indexMetadata;
            } else if (indexMetadata.getVersion() > electedIndexMetadata.getVersion()) {
                // compare versions and keep the highest one
                electedIndexMetadata = indexMetadata;
            }
            indexMetadataCount++;
        }
        if (electedIndexMetadata != null) {
            if (indexMetadataCount < requiredAllocation) {
                logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetadataCount, requiredAllocation);
            } // TODO if this logging statement is correct then we are missing an else here
            // set the index-level metadata
            metadataBuilder.put(electedIndexMetadata, false);
        }
    }
}
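The per-index election amounts to taking, for every index, the maximum version seen on any node. A minimal sketch, assuming each node's view is reduced to a hypothetical map from index name to metadata version; it leaves out the per-index count, which the code above only logs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each node reports index name -> metadata version.
final class IndexMetadataElection {
    static Map<String, Long> elect(List<Map<String, Long>> perNodeIndexVersions) {
        Map<String, Long> elected = new LinkedHashMap<>();
        for (Map<String, Long> nodeView : perNodeIndexVersions) {
            for (Map.Entry<String, Long> entry : nodeView.entrySet()) {
                // merge() inserts the version if absent, otherwise keeps the larger one
                elected.merge(entry.getKey(), entry.getValue(), Long::max);
            }
        }
        return elected;
    }

    public static void main(String[] args) {
        Map<String, Long> node1 = new HashMap<>();
        node1.put("logs", 3L);
        Map<String, Long> node2 = new HashMap<>();
        node2.put("logs", 7L);
        node2.put("users", 1L);
        System.out.println(elect(Arrays.asList(node1, node2)));
    }
}
```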
Build the recovered state from the elected cluster-level and index-level metadata:
// the recovered cluster state
ClusterState recoveredState = Function.<ClusterState>identity()
    .andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings()))
    .apply(ClusterState.builder(clusterService.getClusterName()).metadata(metadataBuilder).build());
listener.onSuccess(recoveredState);
GatewayRecoveryListener's onSuccess is then called, which submits a task to the cluster:
class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {
    @Override
    public void onSuccess(final ClusterState recoveredState) {
        logger.trace("successful state recovery, importing cluster state...");
        clusterService.submitStateUpdateTask("local-gateway-elected-state",
            new RecoverStateUpdateTask() {
                @Override
                public ClusterState execute(final ClusterState currentState) {
                    final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(
                        currentState, recoveredState);
                    return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));
                }
            });
    }

    @Override
    public void onFailure(final String msg) {
        logger.info("state recovery failed: {}", msg);
        resetRecoveredFlags();
    }
}
This calls RecoverStateUpdateTask's execute method:
@Override
public ClusterState execute(final ClusterState currentState) {
    if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        logger.debug("cluster is already recovered");
        return currentState;
    }
    // state recovery is complete
    final ClusterState newState = Function.<ClusterState>identity()
        .andThen(ClusterStateUpdaters::updateRoutingTable)
        .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
        .apply(currentState);
    // start allocating shards
    return allocationService.reroute(newState, "state recovered");
}
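The `Function.identity().andThen(...)` chain applies a sequence of pure transformations to an immutable state. A small sketch of the same idiom, assuming a hypothetical two-flag `State` class in place of ClusterState:

```java
import java.util.function.Function;

// Hypothetical model of the update chain in RecoverStateUpdateTask.execute.
final class RecoveryPipeline {
    /** Immutable stand-in for ClusterState. */
    static final class State {
        final boolean routingTableBuilt;
        final boolean notRecoveredBlock;

        State(boolean routingTableBuilt, boolean notRecoveredBlock) {
            this.routingTableBuilt = routingTableBuilt;
            this.notRecoveredBlock = notRecoveredBlock;
        }
    }

    static State updateRoutingTable(State s) {
        return new State(true, s.notRecoveredBlock);
    }

    static State removeNotRecoveredBlock(State s) {
        return new State(s.routingTableBuilt, false);
    }

    static State recover(State current) {
        if (!current.notRecoveredBlock) {
            return current; // already recovered, nothing to change
        }
        // each step returns a new State, and the steps run left to right
        return Function.<State>identity()
            .andThen(RecoveryPipeline::updateRoutingTable)
            .andThen(RecoveryPipeline::removeNotRecoveredBlock)
            .apply(current);
    }

    public static void main(String[] args) {
        State recovered = recover(new State(false, true));
        System.out.println(recovered.routingTableBuilt + " " + recovered.notRecoveredBlock);
    }
}
```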
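Once the cluster-level and index-level metadata has been recovered, shard allocation begins.
- Persisting metadata
Master-eligible nodes and data nodes can persist the cluster state: when they receive a cluster state change, they write it to disk. GatewayClusterApplier implements ClusterStateApplier, and its applyClusterState method is called whenever the cluster state changes.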
@Override
public void applyClusterState(ClusterChangedEvent event) {
    if (event.state().blocks().disableStatePersistence()) {
        incrementalClusterStateWriter.setIncrementalWrite(false);
        return;
    }
    try {
        // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
        // that's higher than the last accepted term.
        // TODO: can we get rid of this hack?
        if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
            incrementalClusterStateWriter.setCurrentTerm(event.state().term());
        }
        // update the metadata on disk
        incrementalClusterStateWriter.updateClusterState(event.state());
        incrementalClusterStateWriter.setIncrementalWrite(true);
    } catch (WriteStateException e) {
        logger.warn("Exception occurred when storing new meta data", e);
    }
}
The cluster-level and index-level metadata is written to disk:
void updateClusterState(ClusterState newState) throws WriteStateException {
    // the metadata
    Metadata newMetadata = newState.metadata();
    final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
    final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
    // global metadata
    long globalStateGeneration = writeGlobalState(writer, newMetadata);
    // index-level metadata
    Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState);
    Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(),
        globalStateGeneration, indexGenerations);
    writeManifest(writer, manifest);
    previousManifest = manifest;
    previousClusterState = newState;
    final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
    final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;
    if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
        logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
                "wrote metadata for [{}] indices and skipped [{}] unchanged indices",
            durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());
    } else {
        logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",
            durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());
    }
}
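The log messages above mention written and skipped indices, which suggests that unchanged index metadata keeps its old generation in the manifest. The sketch below illustrates that idea only. It assumes that "unchanged" means "same metadata version", and `IncrementalWriter` with its in-memory maps is a hypothetical stand-in for the real writer and its files.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of an incremental metadata write.
final class IncrementalWriter {
    private Map<String, Long> previousVersions = new HashMap<>(); // index -> last written version
    private Map<String, Long> generations = new HashMap<>();      // index -> generation in the manifest
    private long nextGeneration = 1;
    int written;
    int skipped;

    /** Writes only the indices whose version changed; returns the new manifest (index -> generation). */
    Map<String, Long> update(Map<String, Long> newVersions) {
        Map<String, Long> newGenerations = new HashMap<>();
        written = 0;
        skipped = 0;
        for (Map.Entry<String, Long> entry : newVersions.entrySet()) {
            Long previous = previousVersions.get(entry.getKey());
            if (previous != null && previous.equals(entry.getValue())) {
                // unchanged: reuse the generation that is already on disk
                newGenerations.put(entry.getKey(), generations.get(entry.getKey()));
                skipped++;
            } else {
                // new or changed: pretend to write a file under a fresh generation
                newGenerations.put(entry.getKey(), nextGeneration++);
                written++;
            }
        }
        previousVersions = new HashMap<>(newVersions);
        generations = newGenerations;
        return new HashMap<>(newGenerations);
    }

    public static void main(String[] args) {
        IncrementalWriter writer = new IncrementalWriter();
        Map<String, Long> versions = new HashMap<>();
        versions.put("logs", 1L);
        writer.update(versions);
        writer.update(versions);
        System.out.println("written=" + writer.written + ", skipped=" + writer.skipped);
    }
}
```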
- Loading metadata from disk
The start method of the node instance calls gatewayMetaState.start:
// cluster metadata
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(settings(), transportService, clusterService,
    injector.getInstance(MetaStateService.class),
    injector.getInstance(MetadataIndexUpgradeService.class),
    injector.getInstance(MetadataUpgrader.class),
    injector.getInstance(PersistedClusterStateService.class));
That in turn calls the loadFullState method:
// load the metadata
manifestClusterStateTuple = metaStateService.loadFullState();

public Tuple<Manifest, Metadata> loadFullState() throws IOException {
    // load the latest state file
    final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
    if (manifest == null) {
        return loadFullStateBWC();
    }
    // build the metadata
    final Metadata.Builder metadataBuilder;
    if (manifest.isGlobalGenerationMissing()) {
        metadataBuilder = Metadata.builder();
    } else {
        final Metadata globalMetadata = METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry,
            manifest.getGlobalGeneration(), nodeEnv.nodeDataPaths());
        if (globalMetadata != null) {
            metadataBuilder = Metadata.builder(globalMetadata);
        } else {
            throw new IOException("failed to find global metadata [generation: " + manifest.getGlobalGeneration() + "]");
        }
    }
    // index-level metadata
    for (Map.Entry<Index, Long> entry : manifest.getIndexGenerations().entrySet()) {
        final Index index = entry.getKey();
        final long generation = entry.getValue();
        final String indexFolderName = index.getUUID();
        final IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry,
            generation, nodeEnv.resolveIndexFolder(indexFolderName));
        if (indexMetadata != null) {
            metadataBuilder.put(indexMetadata, false);
        } else {
            throw new IOException("failed to find metadata for existing index " + index.getName() +
                " [location: " + indexFolderName + ", generation: " + generation + "]");
        }
    }
    return new Tuple<>(manifest, metadataBuilder.build());
}
The index-level and cluster-level metadata is read from disk and used to build the cluster state object, ClusterState.
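The loading step follows the manifest: each index listed there must resolve to a stored generation, and a missing one is treated as an error instead of being silently dropped. A minimal sketch of that lookup, where `ManifestLoader` and the `"index@generation"` key format are hypothetical stand-ins for the on-disk layout:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of resolving index metadata through a manifest.
final class ManifestLoader {
    /**
     * manifest: index name -> generation.
     * store: "index@generation" -> metadata payload (stands in for files on disk).
     */
    static Map<String, String> load(Map<String, Long> manifest, Map<String, String> store) throws IOException {
        Map<String, String> loaded = new HashMap<>();
        for (Map.Entry<String, Long> entry : manifest.entrySet()) {
            String payload = store.get(entry.getKey() + "@" + entry.getValue());
            if (payload == null) {
                // the manifest references a generation that is not in the store
                throw new IOException("failed to find metadata for existing index " + entry.getKey()
                    + " [generation: " + entry.getValue() + "]");
            }
            loaded.put(entry.getKey(), payload);
        }
        return loaded;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> manifest = new HashMap<>();
        manifest.put("logs", 4L);
        Map<String, String> store = new HashMap<>();
        store.put("logs@4", "{\"number_of_shards\":1}");
        System.out.println(load(manifest, store));
    }
}
```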