Elasticsearch 8.9 refresh刷Es緩沖區的數據到Lucene,更新segemnt,使數據可見

  • 一、相關API的handler
    • 1、接受HTTP請求的hander(RestRefreshAction)
    • 2、往數據節點發送刷新請求的action(TransportRefreshAction)
    • 3、數據節點接收主節點refresh傳輸的action(TransportShardRefreshAction)
  • 二、在IndexShard執行refresh操作
    • 1、根據入參決定是使用lucene提供的阻塞還是非阻塞API刷新數據
      • (1)、maybeRefresh和maybeRefreshBlocking的簡單介紹
  • 三、lucene源碼中執行邏輯
    • 1、判斷是否需要刷新

下面的圖來自ElasticSearch——刷盤原理流程,這篇文章主要講的是refresh命令把ES寫入索引緩沖區的數據刷進Lucene,使數據可供查詢,搜索,否則,在索引緩沖區是不可見的,不涉及到在translog.logLucene的數據結構。
通過這個流程知道ES如何把索引緩沖區的數據刷進Lucene的,主要是下面左中部分refresh部分

在這里插入圖片描述

其他部分源碼梳理
1、主節點同時寫入ES緩沖區和translog這一部分,請看Elasticsearch 8.9 Bulk批量給索引增加數據源碼
2、下半邊fsync的源碼邏輯,請看Elasticsearch 8.9 flush刷新緩存中的數據到磁盤源碼

一、相關API的handler

ActionModule.java

 registerHandler.accept(new RestRefreshAction());actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);

1、接受HTTP請求的hander(RestRefreshAction)

public class RestRefreshAction extends BaseRestHandler {@Overridepublic List<Route> routes() {return List.of(new Route(GET, "/_refresh"),new Route(POST, "/_refresh"),new Route(GET, "/{index}/_refresh"),new Route(POST, "/{index}/_refresh"));}@Overridepublic String getName() {return "refresh_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));return channel -> client.admin().indices().refresh(refreshRequest, new RestToXContentListener<RefreshResponse>(channel) {@Overrideprotected RestStatus getStatus(RefreshResponse response) {return response.getStatus();}});}
}

client.admin().indices().refresh()會執行到下面的父類TransportBroadcastReplicationActiondoExecute方法

2、往數據節點發送刷新請求的action(TransportRefreshAction)

public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest,RefreshResponse,BasicReplicationRequest,ReplicationResponse> {@Injectpublic TransportRefreshAction(ClusterService clusterService,TransportService transportService,ActionFilters actionFilters,IndexNameExpressionResolver indexNameExpressionResolver,NodeClient client) {super(RefreshAction.NAME,RefreshRequest::new,clusterService,transportService,client,actionFilters,indexNameExpressionResolver,TransportShardRefreshAction.TYPE,ThreadPool.Names.REFRESH);}//省略代碼
}
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest<Request>,Response extends BaseBroadcastResponse,ShardRequest extends ReplicationRequest<ShardRequest>,ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {@Overrideprotected void doExecute(Task task, Request request, ActionListener<Response> listener) {clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));}private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {return new CheckedConsumer<ActionListener<Response>, Exception>() {private int totalShardCopyCount;private int successShardCopyCount;private final List<DefaultShardOperationFailedException> allFailures = new ArrayList<>();@Overridepublic void accept(ActionListener<Response> listener) {assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";final ClusterState clusterState = clusterService.state();final List<ShardId> shards = shards(request, clusterState);final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();try (var refs = new RefCountingRunnable(() -> finish(listener))) {//遍歷所有的分片for (final ShardId shardId : shards) {// NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?shardExecute(task,request,shardId,ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire()));}}}};}protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {assert Transports.assertNotTransportThread("may hit all the shards");ShardRequest shardRequest = newShardRequest(request, shardId);shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);}}    

3、數據節點接收主節點refresh傳輸的action(TransportShardRefreshAction)

public class TransportShardRefreshAction extends TransportReplicationAction<BasicReplicationRequest,ShardRefreshReplicaRequest,ReplicationResponse> {private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);public static final String NAME = RefreshAction.NAME + "[s]";public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);public static final String SOURCE_API = "api";@Injectpublic TransportShardRefreshAction(Settings settings,TransportService transportService,ClusterService clusterService,IndicesService indicesService,ThreadPool threadPool,ShardStateAction shardStateAction,ActionFilters actionFilters) {super(settings,NAME,transportService,clusterService,indicesService,threadPool,shardStateAction,actionFilters,BasicReplicationRequest::new,ShardRefreshReplicaRequest::new,ThreadPool.Names.REFRESH);// registers the unpromotable version of shard refresh actionnew TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);}@Overrideprotected void shardOperationOnPrimary(BasicReplicationRequest shardRequest,IndexShard primary,ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener) {primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);replicaRequest.setParentTask(shardRequest.getParentTask());logger.trace("{} refresh request executed on primary", primary.shardId());l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));}));}
}    

primary.externalRefresh執行分片的刷新

二、在IndexShard執行refresh操作

 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {verifyNotClosed();getEngine().externalRefresh(source, listener);}public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {ActionListener.completeWith(listener, () -> {logger.trace("external refresh with source [{}]", source);return refresh(source);});}   

getEngine()的實現是InternalEngine

  @Overridepublic RefreshResult refresh(String source) throws EngineException {return refresh(source, SearcherScope.EXTERNAL, true);}

1、根據入參決定是使用lucene提供的阻塞還是非阻塞API刷新數據

   protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {//這兩種刷新類型都會導致內部刷新,但只有外部刷新類型也會將新的讀取器引用傳遞給外部讀取器管理器。//獲取當前的本地檢查點。final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();boolean refreshed;long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;try {//refresh 不需要按住 readLock,因為如果引擎在中途關閉,ReferenceManager 可以正確處理。if (store.tryIncRef()) {try {//盡管我們保留了 2 managers,但我們實際上只做過一次繁重的工作。第二次刷新只會做我們必須做的額外工作,以預熱緩存等。ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();//根據參數決定是進行阻塞刷新還是非阻塞刷新if (block) { //刷新可能會導致阻塞referenceManager.maybeRefreshBlocking();refreshed = true;} else {//刷新不會導致阻塞refreshed = referenceManager.maybeRefresh();}//如果刷新成功,獲取當前的讀取器,并更新段的生成號if (refreshed) {//獲取當前的目錄final ElasticsearchDirectoryReader current = referenceManager.acquire();try {//更新segment信息segmentGeneration = Math.max(current.getIndexCommit().getGeneration(), generationBeforeRefresh);} finally {referenceManager.release(current);}}} finally {store.decRef();}if (refreshed) {lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);}} else {refreshed = false;}} catch (AlreadyClosedException e) {failOnTragicEvent(e);throw e;} catch (Exception e) {try {failEngine("refresh failed source[" + source + "]", e);} catch (Exception inner) {e.addSuppressed(inner);}throw new RefreshFailedEngineException(shardId, e);}assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh: "refresh checkpoint was not advanced; "+ "local_checkpoint="+ localCheckpointBeforeRefresh+ " refresh_checkpoint="+ lastRefreshedCheckpoint();// TODO: maybe we should just put a scheduled job in threadPool?// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes// for a long time:maybePruneDeletes();mergeScheduler.refreshConfig();return new RefreshResult(refreshed, segmentGeneration);}

其中referenceManager 根據入參是 SearcherScope.EXTERNAL 獲得的實現是ExternalReaderManager

    private final ExternalReaderManager externalReaderManager;@Overrideprotected final ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope) {return switch (scope) {case INTERNAL -> internalReaderManager;case EXTERNAL -> externalReaderManager;};}

根據入參中的block=true 實際執行的是referenceManager.maybeRefreshBlocking(); 來刷新,是異步非阻塞的,
并且根據下圖ExternalReaderManager繼承了ReferenceManager,所以沒有重寫maybeRefreshBlocking 所以執行的是父類ReferenceManager

import org.apache.lucene.search.ReferenceManager;@SuppressForbidden(reason = "reference counting is required here")private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {@Overrideprotected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {//省略代碼}@Overrideprotected boolean tryIncRef(ElasticsearchDirectoryReader reference) {return reference.tryIncRef();}@Overrideprotected int getRefCount(ElasticsearchDirectoryReader reference) {return reference.getRefCount();}@Overrideprotected void decRef(ElasticsearchDirectoryReader reference) throws IOException {reference.decRef();}}

(1)、maybeRefresh和maybeRefreshBlocking的簡單介紹

下面是lucene源碼中關于這兩個API的實現,

//這個是會嘗試獲取刷新鎖,如果沒有則不執行刷新操作public final boolean maybeRefresh() throws IOException {this.ensureOpen();boolean doTryRefresh = this.refreshLock.tryLock();if (doTryRefresh) {try {this.doMaybeRefresh();} finally {this.refreshLock.unlock();}}return doTryRefresh;}//這里會等待獲取刷新鎖,所以會阻塞public final void maybeRefreshBlocking() throws IOException {this.ensureOpen();this.refreshLock.lock();try {this.doMaybeRefresh();} finally {this.refreshLock.unlock();}}

但是實際上最后執行刷新還是執行的this.doMaybeRefresh() 方法

三、lucene源碼中執行邏輯

private void doMaybeRefresh() throws IOException {this.refreshLock.lock();boolean refreshed = false;try {Object reference = this.acquire();try {//通知刷新監聽器。this.notifyRefreshListenersBefore();//調用 refreshIfNeeded(reference) 返回一個新的引用 (newReference)//用來判斷是否需要刷新,如果不需要刷新,refreshIfNeeded 應返回 nullG newReference = this.refreshIfNeeded(reference);if (newReference != null) {assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";try {//調用 swapReference(newReference) 方法來交換舊的引用為新的引用。this.swapReference(newReference);//設置 refreshed 為 true 表示刷新成功。   refreshed = true;} finally {//如果刷新失敗,釋放新的引用if (!refreshed) {this.release(newReference);}}}} finally {//釋放舊的引用this.release(reference);//通知刷新監聽器刷新完成this.notifyRefreshListenersRefreshed(refreshed);}this.afterMaybeRefresh();} finally {//最后釋放刷新鎖this.refreshLock.unlock();}}

1、判斷是否需要刷新

其中refreshIfNeeded用的是子類ExternalReaderManager的實現方法

private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {@Overrideprotected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {internalReaderManager.maybeRefreshBlocking();//獲取其reader對象。final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();//isWarmedUp為false或者獲取到的新reader對象與傳入的referenceToRefresh對象不相等,說明需要刷新if (isWarmedUp == false || newReader != referenceToRefresh) {boolean success = false;try {refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);isWarmedUp = true;success = true;} finally {if (success == false) {internalReaderManager.release(newReader);}}}//沒有任何變化 - 兩個 ref 管理器共享同一個實例,因此我們可以使用引用相等性,不需要執行刷新操作if (referenceToRefresh == newReader) {internalReaderManager.release(newReader);return null;} else {return newReader; // steal the reference}}
}        

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/212637.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/212637.shtml
英文地址,請注明出處:http://en.pswp.cn/news/212637.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【華為數據之道學習筆記】3-8以確保合規遵從為核心的外部數據管理

一、以確保合規遵從為核心的外部數據管理 外部數據是指華為公司引入的外部組織或者個人擁有處置權利的 數據&#xff0c;如供應商資質證明、消費者洞察報告等。外部數據治理的出發點是合規遵從優先&#xff0c;與內部數據治理的目的不同。 外部數據的治理主要遵循以下原則。 1&…

【設計模式--創建型--原型模式】

設計模式--創建型--原型模式 原型模式概述結構實現結果 案例代碼結果使用場景 擴展&#xff08;深\淺克隆&#xff09;淺克隆演示&#xff1a;結果&#xff1a;使用深克隆&#xff08;利用對象流&#xff09;結果 原型模式 概述 用一個已經創建的實例作為原型&#xff0c;通過…

Go簡單了解

0.一直很好奇,go是不是像傳說中的速度快,解決了多線程問題,快速進行了解了解,和java進行對比,他是怎么解決語言發展的問題的…,所有語言都是差不多的,只是熟練程度不同而已 1.go圖標是土撥鼠,2009發行 docker使用go,解決了并發問題 google facebook 騰訊 百度 七牛云 京東 小米…

Spring Cloud Gateway + Nacos + LoadBalancer實現企業級網關

1. Spring Cloud Gateway 整合Nacos、LoadBalancer 實現企業級網關 前置工作&#xff1a; 創建 SpringBoot 多模塊項目創建網關&#xff08;gateway-service&#xff09;、用戶&#xff08;user-service&#xff09;模塊用戶模塊添加 Nacos discovery 支持以及 Spring Web&am…

gitbash下載安裝

參考教程 零、下載 官網地址 2.43.0win64 鏈接&#xff1a;https://pan.baidu.com/s/16urs_nmky7j20-qNzUTTkg 提取碼&#xff1a;7jaq 一、安裝 圖標組件&#xff08;Additional icons&#xff09;&#xff1a;選擇是否創建桌面快捷方式&#xff1b;桌面瀏覽&#xff08;Win…

設計模式--命令模式的簡單例子

引入&#xff1a;以一個對數組的增刪改查為例。通過命令模式可以對數組進行增刪改查以及撤銷回滾。 一、基本概念 命令模式有多種分法&#xff0c;在本文中主要分為CommandMgr、Command、Receiver. CommandMgr主要用于控制命令執行等操作、Command為具體的命令、Receiver為命…

逸迅科技丁紅陽:三種能力幫助企業打造GBI “護城河”

大數據產業創新服務媒體 ——聚焦數據 改變商業 近日&#xff0c;由上海市經濟和信息化委員會、上海市科學技術委員會指導&#xff0c;數據猿與上海大數據聯盟聯合主辦的“2023企業數智化轉型升級發展論壇”在上海舉行。本次論壇以“釋放數字價值驅動智能升級”為主題&#xf…

piakachu越權漏洞

水平越權 首先打開這一關&#xff0c;在右側有一些提示&#xff0c;我們可以看到 然后我們隨便輸入一組信息即可&#xff0c;可以在url中看到這樣的字段 當我們嘗試在url中直接更換另一個用戶名時可以發現&#xff0c;直接切換到了另一個用戶的身份 垂直越權 這里可以看到右邊…

QML和C++交互中,實現C++中connect到qml的信號,再從qml發射信號傳遞數據給C++的一種方式

1.需求&#xff1a; 假設我們有一個需求&#xff0c;要求在用戶點擊列表中的項目時&#xff0c;不僅在控制臺上輸出項目的名稱&#xff0c;還要在C端進行一些處理。我們希望在C端能夠接收到用戶點擊的項目名稱&#xff0c;并進行相應的處理。 2.分析&#xff1a; 在這種情況…

Android 10.0 系統framework修改低電量關機值為2%

1.前言 在10.0的系統產品開發中,在系統關于低電量關機的值,每個平臺都不同,根據實際開發底層硬件的要求看實際情況來調整這個值, 所以需要分析相關的電量變化執行的代碼流程,來實現這個功能 2.系統framework修改低電量關機值為2%的核心類 frameworks\base\services\cor…

一文學會使用 PyInstaller 將 Python 腳本打包為 .exe 可執行文件

文章目錄 前言PyInstaller特點跨平臺支持自動依賴項處理單文件發布支持圖形用戶界面&#xff08;GUI&#xff09;和命令行界面&#xff08;CLI&#xff09;應用支持多種打包選項 基本用法常用參數其它參數 版本 & 環境實現步驟安裝 PyInstaller創建 Python 腳本使用 PyInst…

Strange-Towers-of-Hanoi

title: Strange Towers of Hanoi date: 2023-12-11 03:20:05 tags: 遞推 categories: 算法進階指南 題目大意 解出 n n n 個盒子 4 4 4 座塔的漢諾塔問題最少需要多少次&#xff1f; 思路 首先考慮 n n n 個盒子 3 3 3 座塔的經典漢諾塔問題&#xff0c;設 d [ n ] d[n] …

第三十章 控制到 XML 模式的映射 - Array of Classname

文章目錄 第三十章 控制到 XML 模式的映射 - Array of ClassnameArray of Classname 第三十章 控制到 XML 模式的映射 - Array of Classname Array of Classname 本部分顯示了從啟用 XML 的類生成的XML 架構的一部分&#xff0c;此時該類包含定義為類名數組的屬性。例如&…

【SpringBoot教程】SpringBoot 創建定時任務(配合數據庫動態執行)

作者簡介&#xff1a;大家好&#xff0c;我是擼代碼的羊駝&#xff0c;前阿里巴巴架構師&#xff0c;現某互聯網公司CTO 聯系v&#xff1a;sulny_ann&#xff08;17362204968&#xff09;&#xff0c;加我進群&#xff0c;大家一起學習&#xff0c;一起進步&#xff0c;一起對抗…

transformer模型結構|李宏毅機器學習21年

來源&#xff1a;https://www.bilibili.com/video/BV1Bb4y1L7FT?p4&vd_sourcef66cebc7ed6819c67fca9b4fa3785d39 文章目錄 概述seq2seqtransformerEncoderDecoderAutoregressive&#xff08;AT&#xff09;self-attention與masked-self attentionmodel如何決定輸出的長度…

【親測有效】支持橫豎屏 微信小程序video禁止進度條拖動,微信小程序遮罩進度條,

背景&#xff1a;部分課程禁止客戶拖動視頻進度條直至播放結束 紅色是遮罩區域遮罩區域 實際遮罩效果&#xff08;有一個很淺的陰影區域&#xff09; 實現代碼 .wxml文件 <video enable-progress-gesture"false" ><cover-view class"cover">…

基于深度學習的yolov7植物病蟲害識別及防治系統

歡迎大家點贊、收藏、關注、評論啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代碼。 文章目錄 一項目簡介簡介YOLOv7 系統特性工作流程 二、功能三、系統四. 總結 一項目簡介 # YOLOv7植物病蟲害識別及防治系統介紹 簡介 該系統基于深度學習技術&#xff0c;采…

Seata配置

參考教程 seata 分布式事務的環境搭建與使用 Seata 1.4.0 nacos配置和使用&#xff0c;超詳細 Seata 1.4.2 的安裝 Nacos的配置和使用 官網下載地址 本文以v1.4.1為例 1.數據庫及表的創建 創建seata數據庫&#xff0c;創建以下表&#xff08;右鍵連接-》新建數據庫seata-》…

kubeadm搭建1.20.7版本k8s

資源 服務器名稱ip地址服務master1&#xff08;2C/4G&#xff0c;cpu核心數要求大于2&#xff09;192.168.100.10docker、kubeadm、kubelet、kubectl、flannelnode01&#xff08;2C/2G&#xff09;192.168.100.30docker、kubeadm、kubelet、kubectl、flannelnode02&#xff08…

windows系統proteus中Ardunio Mega 2560和虛擬機上Ubuntu系統CuteCom進行串口通信

在文章利用proteus實現串口助手和arduino Mega 2560的串口通信-CSDN博客 中&#xff0c;實現了windows系統的proteus中Ardunio Mega 2560和SSCOM通過虛擬串口進行通信。虛擬串口的連接示意圖如下圖所示。 在文章windows系統和虛擬機上ubuntu系統通過虛擬串口進行通信-CSDN博客…