背景
以 ES 存儲日志,且需要對日志進行分頁檢索,當數據量過大時,就面臨 ES 萬條以外的數據檢索問題,如何利用滾動檢索實現這個需求呢?本文介紹 ES 分頁檢索萬條以外的數據實現方法及注意事項。
需求分析
用 ES 存儲數據,分頁檢索,當 ES 數據量過大時,在頁面上直接點擊最后一頁時,怎么保證請求能正常返回?
常規思路就是,超過萬條以后,使用滾動檢索,但需要注意:編寫滾動檢索的分頁查詢時,滾動請求的 size 一定不能用頁面分頁參數的 pageSize ,要能快速滾動到目標頁所在的數據,最好以 ES 最大檢索窗口值。
算法要點
第一,滾動檢索的 Request 請求不能包含 from 屬性, 且設置了 size 參數后,以后的每次滾動返回的數據量都以 size 為主。
第二,滾動獲取數據的 size 選取。 滾動分頁檢索高效的關鍵是不能以頁面分頁參數 pageSize 作為滾動請求的 size ,而是以一個較大的數,或者直接以 ES 默認的滾動窗口最大值 10000 作為每批次獲取的數據量。
第三,計算目標頁的數據所在的位置。
- 根據分頁參數計算出目標數據的位置是
[(pageSize-1)*pageSize, pageSize * pageNo]
,為了拿到目標頁的數據,總共的數據量total = pageNo * pageSize
。 - 目標數據在最終數據中的真正范圍決定因素:
mode = total % 10000
。 - 計算滾動請求幾次能拿到目標數據。實際需要滾動請求的次數
scrollCount = mode == 0 ? total/ esWindowCount : (total/ esWindowCount + 1)
。 - 目標頁的數據有沒有分布在兩次請求中。當
10000 % pageSize !=0
時,說明這一頁的數據會橫跨兩次 ES 請求。例如pageSize =15,pageNo = 2667,total = 40005,
目標頁的數據包含在最后兩次請求中,倒數第二次請求中有 10 條數據,最后一次請求中有 5 條數據,合起來才是一整頁的 15 條數據。 - 最后一頁數據不足 pageSize 時,最后一頁數據真正的長度。
第四,分頁數據所在范圍處理。 當最后一批次獲取到數據后,從中摘出目標頁的數據時,需要考慮的四種情況,主要是 mode 和最終獲取的數據總長度直接的關系:
case 1:上圖左,mode=0 時存在最后一頁不足 size 的情況,realSize = size - (windowSize-length)
。
case 2:上圖右,length < mode 時,最后一頁不足 size 的情況,realSize = size - (mode -length)
。
最終的數據區間是 [from,to ] = [ length -realSize,length -1 ]
。
數據總長度 = end -start +1 = realSize
。
case 3 :上圖左,分頁數據在 mode 往前推 size 條。
case 4:上圖右,分頁數據橫跨兩次請求,兩批數據組合成一頁數據。
編碼實現
編寫 ES 滾動分頁檢索請求,處理超過萬條之外的查詢操作:
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.builder.SearchSourceBuilder;import java.io.IOException;
import java.util.*;@Slf4j
public class EsPageUtil {/*** 真正的 ES 連接對象*/private RestHighLevelClient client;public void initClient() {// TODO 初始化 client 對象}/*** 使用 DSL JSON 配置創建檢索請求 Builder* @param queryJson* @return*/public SearchSourceBuilder createSearchSource(String queryJson) {if (StringUtils.isEmpty(queryJson)) {log.error("ElasticSearch dsl config is empty.");return null;}SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();try {SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());NamedXContentRegistry registry = new NamedXContentRegistry(searchModule.getNamedXContents());XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(registry, LoggingDeprecationHandler.INSTANCE, queryJson);searchSourceBuilder.parseXContent(parser);return searchSourceBuilder;} catch (Exception e) {log.error("Parse dsl error.", e);return null;}}/*** ES 分頁查詢:區分萬條以內還是萬條以外* @param pageSize 分頁size* @param pageNo 查詢頁數* @param indices 目標索引* @param queryJson 查詢 DSL JSON 格式字符串* @return*/public Map<String, Object> queryByPage(int pageSize, int pageNo, String[] indices, String queryJson) {SearchSourceBuilder searchSourceBuilder = createSearchSource(queryJson);if (searchSourceBuilder == null) {return null;}// 創建請求對象SearchRequest searchRequest = new SearchRequest(indices).source(searchSourceBuilder);Map<String, Object> result = new HashMap<>();List<Map<String, Object>> data = null;int total = pageSize * pageNo ;int maxEsWindow = 10000;try {if (total <= 10000) {// 萬條以內,直接查詢:設置 from , size 屬性searchSourceBuilder .from((pageNo - 1) * pageSize) .size(pageSize);SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);data = parseResponseToListData(response);} else {// 萬條以外,以 ES 最大窗口值查詢:只設置size 屬性searchSourceBuilder.size(maxEsWindow);data = scrollQuery(maxEsWindow, pageSize, total, searchRequest);}} catch (IOException e) {log.error("ElasticSearch query error.", e);}result.put("total" , 0);result.put("data" , data);return result;}/*** 滾動查詢** @param esWindowCount* @param pageSize* @param total* @param searchRequest* @return*/private List scrollQuery(int esWindowCount, int pageSize, int total , SearchRequest searchRequest) {List pageData = new ArrayList(pageSize);//創建滾動,指定滾動查詢保持的時間final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10L));//添加滾動searchRequest.scroll(scroll);//提交第一次請求SearchResponse searchResponse = null;String scrollId = null;try {searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);//獲取滾動查詢idscrollId = searchResponse.getScrollId();} catch (IOException e) {log.error("Elasticsearch request error.", e);return pageData;}int counter = 2;int mode = total % esWindowCount;int realPageCount = mode == 0 ? total/ esWindowCount : (total/ esWindowCount + 1);while (counter <= realPageCount) {// 設置滾動查詢id,從id開始繼續向下查詢SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);// 重置查詢時間,若不進行重置,則在提交的第一次請求中設置的時間結束,滾動查詢將失效scrollRequest.scroll(scroll);// 提交請求,獲取結果try {searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);} catch (IOException e) {log.error("Elasticsearch scroll request error.", e);}// size 非 10 的整數,則當前頁數據橫跨兩個 Scroll 請求if (mode != 0 && mode < pageSize && counter == (realPageCount -1)) {collectFirstPart(searchResponse, pageData, mode, pageSize);}// 更新滾動查詢idscrollId = searchResponse.getScrollId();counter++;}// 收集最后一次響應結果中的數據collectPageData(searchResponse, pageData, mode, pageSize, esWindowCount);// 滾動查詢結束時,清除滾動ClearScrollRequest clearScrollRequest = new ClearScrollRequest();clearScrollRequest.addScrollId(scrollId);try {client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);} catch (IOException e) {log.error("Elasticsearch clear scroll info error.", e);}return pageData;}/*** @param searchResponse* @param mode* @param size* @return*/public void collectFirstPart(SearchResponse searchResponse, List<Map<String, Object>> firstPartData, int mode, int size) {int firstPartCount = size - mode;// 只截取響應結果中的 結尾 size - mode 部分的內容SearchHits hits = searchResponse.getHits();SearchHit[] dataList = hits.getHits();int from = dataList.length - firstPartCount;for (int i = from; i < dataList.length; i++) {firstPartData.add(dataList[i].getSourceAsMap());}log.info("Mode less than size, first part data is here {} .", firstPartCount);}/*** 滾動到最后一組數據中包含目標頁的數據,從中摘出來* @param searchResponse* @param mode* @param size* @param esWindowCount* @return*/public void collectPageData(SearchResponse searchResponse, List<Map<String, Object>> pageData, int mode, int size, int esWindowCount) {SearchHits hits = searchResponse.getHits();SearchHit[] dataList = hits.getHits();int from = 0;int length = dataList.length;if (mode == 0) { // 剛好在萬條結尾// 不夠一頁if (length < esWindowCount) {int realSize = size - (esWindowCount - length);from = (length - realSize ) >= 0 ? (length - realSize ) : 0;} else {// 總長夠一頁from = length == esWindowCount ? (length - size) : 0;}} else if (length < mode){ // 最后一頁且總長不足 sizeint realSize = size - (mode - length);from = (length - realSize) >= 0 ? (length - realSize) : 0;} else if (mode > size){ // 中間部分from = (mode - size) >= 0 ? (mode -size) : 0;} else { // mode < size ,說明是一頁數據的下半部分from = 0;size = mode;log.info("Page data is across two request ,this response has {} .", mode);}// 收集目標數據for (int i = from; i< from + size && i < length; i++) {pageData.add(dataList[i].getSourceAsMap());}}/*** 解析 ES 響應結果為數據集合* @param response* @return*/public static List<Map<String, Object>> parseResponseToListData(SearchResponse response){List<Map<String, Object>> listData = new ArrayList<>();if (response == null) {return listData;}// 遍歷響應結果SearchHits hits = response.getHits();SearchHit[] hitArray = hits.getHits();listData = new ArrayList<>(hitArray.length);for (SearchHit hit : hitArray) {Map<String, Object> sourceAsMap = hit.getSourceAsMap();listData.add(sourceAsMap);}// 返回結果return listData;}
}
啟示錄
滾動查詢時優化了 size 用一萬,相比用頁面的分頁參數 pageSize ,可以解決數據量過大時,直接從頁面點擊最后一頁導致頁面卡死長時間無響應的問題。
頁面分頁參數最大不過 100,當總數量幾百萬、pageSize=10,分頁跳轉查詢后面某頁 如 3000 時,ES 的滾動請求次數 是 3000 次,而優化后滾動請求 3次,第三次中的一萬條數據的最后10條即本頁的數據。
話說回來,ES 數據量過大時,用分頁查詢靠后的數據時,也沒多大的價值了,列表寬泛條件查詢結果過大時,誰看得過來呢?