- 一、相關API的handler
- 1、接收HTTP請求的handler(RestRefreshAction)
- 2、往數據節點發送刷新請求的action(TransportRefreshAction)
- 3、數據節點接收主節點refresh傳輸的action(TransportShardRefreshAction)
- 二、在IndexShard執行refresh操作
- 1、根據入參決定是使用lucene提供的阻塞還是非阻塞API刷新數據
- (1)、maybeRefresh和maybeRefreshBlocking的簡單介紹
- 三、lucene源碼中執行邏輯
- 1、判斷是否需要刷新
下面的圖來自《ElasticSearch——刷盤原理流程》。這篇文章主要講的是 refresh 命令如何把 ES 寫入索引緩衝區的數據刷進 Lucene,使數據可供查詢和搜索(數據停留在索引緩衝區時是不可見的)。本文不涉及 translog.log 和 Lucene 的數據結構。
通過這個流程可以了解 ES 如何把索引緩衝區的數據刷進 Lucene。本文關注的主要是圖中左側中部的 refresh 部分。
其他部分源碼梳理
1、主分片(primary shard)同時寫入 ES 索引緩衝區和 translog 這一部分,請看《Elasticsearch 8.9 Bulk批量給索引增加數據源碼》
2、下半邊fsync的源碼邏輯,請看Elasticsearch 8.9 flush刷新緩存中的數據到磁盤源碼
一、相關API的handler
在 ActionModule.java 中註冊了以下 handler 和 action:
// Refresh-related registrations in ActionModule.
// REST layer: RestRefreshAction serves the _refresh endpoints.
registerHandler.accept(new RestRefreshAction());
// Index-level refresh: RefreshAction is handled by TransportRefreshAction.
actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
// Shard-level refresh: TransportShardRefreshAction.TYPE is handled by TransportShardRefreshAction.
actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);
1、接收HTTP請求的handler(RestRefreshAction)
public class RestRefreshAction extends BaseRestHandler {@Overridepublic List<Route> routes() {return List.of(new Route(GET, "/_refresh"),new Route(POST, "/_refresh"),new Route(GET, "/{index}/_refresh"),new Route(POST, "/{index}/_refresh"));}@Overridepublic String getName() {return "refresh_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));return channel -> client.admin().indices().refresh(refreshRequest, new RestToXContentListener<RefreshResponse>(channel) {@Overrideprotected RestStatus getStatus(RefreshResponse response) {return response.getStatus();}});}
}
client.admin().indices().refresh() 最終會執行到下面 TransportRefreshAction 的父類 TransportBroadcastReplicationAction 的 doExecute 方法。
2、往數據節點發送刷新請求的action(TransportRefreshAction)
public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest,RefreshResponse,BasicReplicationRequest,ReplicationResponse> {@Injectpublic TransportRefreshAction(ClusterService clusterService,TransportService transportService,ActionFilters actionFilters,IndexNameExpressionResolver indexNameExpressionResolver,NodeClient client) {super(RefreshAction.NAME,RefreshRequest::new,clusterService,transportService,client,actionFilters,indexNameExpressionResolver,TransportShardRefreshAction.TYPE,ThreadPool.Names.REFRESH);}//省略代碼
}
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest<Request>,Response extends BaseBroadcastResponse,ShardRequest extends ReplicationRequest<ShardRequest>,ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {@Overrideprotected void doExecute(Task task, Request request, ActionListener<Response> listener) {clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));}private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {return new CheckedConsumer<ActionListener<Response>, Exception>() {private int totalShardCopyCount;private int successShardCopyCount;private final List<DefaultShardOperationFailedException> allFailures = new ArrayList<>();@Overridepublic void accept(ActionListener<Response> listener) {assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";final ClusterState clusterState = clusterService.state();final List<ShardId> shards = shards(request, clusterState);final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();try (var refs = new RefCountingRunnable(() -> finish(listener))) {//遍歷所有的分片for (final ShardId shardId : shards) {// NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?shardExecute(task,request,shardId,ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire()));}}}};}protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {assert Transports.assertNotTransportThread("may hit all the shards");ShardRequest shardRequest = newShardRequest(request, shardId);shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);}}
3、數據節點接收主節點refresh傳輸的action(TransportShardRefreshAction)
public class TransportShardRefreshAction extends TransportReplicationAction<BasicReplicationRequest,ShardRefreshReplicaRequest,ReplicationResponse> {private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);public static final String NAME = RefreshAction.NAME + "[s]";public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);public static final String SOURCE_API = "api";@Injectpublic TransportShardRefreshAction(Settings settings,TransportService transportService,ClusterService clusterService,IndicesService indicesService,ThreadPool threadPool,ShardStateAction shardStateAction,ActionFilters actionFilters) {super(settings,NAME,transportService,clusterService,indicesService,threadPool,shardStateAction,actionFilters,BasicReplicationRequest::new,ShardRefreshReplicaRequest::new,ThreadPool.Names.REFRESH);// registers the unpromotable version of shard refresh actionnew TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);}@Overrideprotected void shardOperationOnPrimary(BasicReplicationRequest shardRequest,IndexShard primary,ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener) {primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);replicaRequest.setParentTask(shardRequest.getParentTask());logger.trace("{} refresh request executed on primary", primary.shardId());l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));}));}
}
primary.externalRefresh(...) 在主分片(IndexShard)上執行刷新。
二、在IndexShard執行refresh操作
// --- IndexShard ---
// Rejects the call if the shard is closed, then delegates to the shard's engine.
public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
    verifyNotClosed();
    getEngine().externalRefresh(source, listener);
}

// --- Engine side: the method reached via getEngine().externalRefresh(...) above ---
// Runs refresh(source) and presumably completes listener with its result, or with the thrown exception.
public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
    ActionListener.completeWith(listener, () -> {
        logger.trace("external refresh with source [{}]", source);
        return refresh(source);
    });
}
getEngine() 返回的實現是 InternalEngine 類,其 refresh(String source) 方法如下:
/**
 * Public refresh entry point: always refreshes the EXTERNAL searcher scope and waits for the
 * refresh lock ({@code block = true}).
 */
@Override
public RefreshResult refresh(String source) throws EngineException {
    return refresh(source, SearcherScope.EXTERNAL, true);
}
1、根據入參決定是使用lucene提供的阻塞還是非阻塞API刷新數據
protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {//這兩種刷新類型都會導致內部刷新,但只有外部刷新類型也會將新的讀取器引用傳遞給外部讀取器管理器。//獲取當前的本地檢查點。final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();boolean refreshed;long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;try {//refresh 不需要按住 readLock,因為如果引擎在中途關閉,ReferenceManager 可以正確處理。if (store.tryIncRef()) {try {//盡管我們保留了 2 managers,但我們實際上只做過一次繁重的工作。第二次刷新只會做我們必須做的額外工作,以預熱緩存等。ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();//根據參數決定是進行阻塞刷新還是非阻塞刷新if (block) { //刷新可能會導致阻塞referenceManager.maybeRefreshBlocking();refreshed = true;} else {//刷新不會導致阻塞refreshed = referenceManager.maybeRefresh();}//如果刷新成功,獲取當前的讀取器,并更新段的生成號if (refreshed) {//獲取當前的目錄final ElasticsearchDirectoryReader current = referenceManager.acquire();try {//更新segment信息segmentGeneration = Math.max(current.getIndexCommit().getGeneration(), generationBeforeRefresh);} finally {referenceManager.release(current);}}} finally {store.decRef();}if (refreshed) {lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);}} else {refreshed = false;}} catch (AlreadyClosedException e) {failOnTragicEvent(e);throw e;} catch (Exception e) {try {failEngine("refresh failed source[" + source + "]", e);} catch (Exception inner) {e.addSuppressed(inner);}throw new RefreshFailedEngineException(shardId, e);}assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh: "refresh checkpoint was not advanced; "+ "local_checkpoint="+ localCheckpointBeforeRefresh+ " refresh_checkpoint="+ lastRefreshedCheckpoint();// TODO: maybe we should just put a scheduled job in threadPool?// We check for pruning in each delete request, but we also prune here e.g. 
in case a delete burst comes in and then no more deletes// for a long time:maybePruneDeletes();mergeScheduler.refreshConfig();return new RefreshResult(refreshed, segmentGeneration);}
其中 referenceManager 根據入參 SearcherScope.EXTERNAL 獲得的實現是 ExternalReaderManager:
private final ExternalReaderManager externalReaderManager;

/**
 * Returns the reader manager for {@code scope}: {@code internalReaderManager} for INTERNAL,
 * {@code externalReaderManager} for EXTERNAL.
 */
@Override
protected final ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope) {
    return switch (scope) {
        case INTERNAL -> internalReaderManager;
        case EXTERNAL -> externalReaderManager;
    };
}
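根據入參中的 block=true,實際執行的是 referenceManager.maybeRefreshBlocking() 來刷新。注意這是同步阻塞的調用,而不是異步非阻塞的:它會一直等待直到拿到刷新鎖(見下文 (1) 小節)。
從下面的代碼可以看到,ExternalReaderManager 繼承了 ReferenceManager,但沒有重寫 maybeRefreshBlocking。實際上也無法重寫,因為該方法在 ReferenceManager 中聲明為 final。所以執行的是父類 ReferenceManager 的 maybeRefreshBlocking。ExternalReaderManager 的定義如下: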
import org.apache.lucene.search.ReferenceManager;@SuppressForbidden(reason = "reference counting is required here")private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {@Overrideprotected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {//省略代碼}@Overrideprotected boolean tryIncRef(ElasticsearchDirectoryReader reference) {return reference.tryIncRef();}@Overrideprotected int getRefCount(ElasticsearchDirectoryReader reference) {return reference.getRefCount();}@Overrideprotected void decRef(ElasticsearchDirectoryReader reference) throws IOException {reference.decRef();}}
(1)、maybeRefresh和maybeRefreshBlocking的簡單介紹
下面是 Lucene 源碼中這兩個 API 的實現:
//這個是會嘗試獲取刷新鎖,如果沒有則不執行刷新操作public final boolean maybeRefresh() throws IOException {this.ensureOpen();boolean doTryRefresh = this.refreshLock.tryLock();if (doTryRefresh) {try {this.doMaybeRefresh();} finally {this.refreshLock.unlock();}}return doTryRefresh;}//這里會等待獲取刷新鎖,所以會阻塞public final void maybeRefreshBlocking() throws IOException {this.ensureOpen();this.refreshLock.lock();try {this.doMaybeRefresh();} finally {this.refreshLock.unlock();}}
兩者最終都是調用 this.doMaybeRefresh() 方法執行刷新,區別只在於獲取刷新鎖的方式。maybeRefresh 使用 tryLock,拿不到鎖就直接返回 false,不執行刷新。maybeRefreshBlocking 使用 lock,會阻塞等待直到拿到鎖。
三、lucene源碼中執行邏輯
private void doMaybeRefresh() throws IOException {this.refreshLock.lock();boolean refreshed = false;try {Object reference = this.acquire();try {//通知刷新監聽器。this.notifyRefreshListenersBefore();//調用 refreshIfNeeded(reference) 返回一個新的引用 (newReference)//用來判斷是否需要刷新,如果不需要刷新,refreshIfNeeded 應返回 nullG newReference = this.refreshIfNeeded(reference);if (newReference != null) {assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";try {//調用 swapReference(newReference) 方法來交換舊的引用為新的引用。this.swapReference(newReference);//設置 refreshed 為 true 表示刷新成功。 refreshed = true;} finally {//如果刷新失敗,釋放新的引用if (!refreshed) {this.release(newReference);}}}} finally {//釋放舊的引用this.release(reference);//通知刷新監聽器刷新完成this.notifyRefreshListenersRefreshed(refreshed);}this.afterMaybeRefresh();} finally {//最后釋放刷新鎖this.refreshLock.unlock();}}
1、判斷是否需要刷新
其中 refreshIfNeeded 使用的是子類 ExternalReaderManager 的實現:
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {@Overrideprotected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {internalReaderManager.maybeRefreshBlocking();//獲取其reader對象。final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();//isWarmedUp為false或者獲取到的新reader對象與傳入的referenceToRefresh對象不相等,說明需要刷新if (isWarmedUp == false || newReader != referenceToRefresh) {boolean success = false;try {refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);isWarmedUp = true;success = true;} finally {if (success == false) {internalReaderManager.release(newReader);}}}//沒有任何變化 - 兩個 ref 管理器共享同一個實例,因此我們可以使用引用相等性,不需要執行刷新操作if (referenceToRefresh == newReader) {internalReaderManager.release(newReader);return null;} else {return newReader; // steal the reference}}
}