Background
This article walks through the underlying code to analyze how StarRocks gathers statistics. These statistics play an important role later on, when the CBO computes plan costs.
This article is based on StarRocks 3.3.5.
Conclusion
StarRocks collects statistics by periodically running a series of SQL statements, at partition granularity (a non-partitioned table still has a single default partition). The results are inserted into the `_statistics_.column_statistics` table and also cached in `GlobalStateMgr`'s `CachedStatisticStorage`; all subsequent statistics lookups go through that cache.
Analysis
Let's jump straight into the `StatisticAutoCollector` class:
```java
public StatisticAutoCollector() {
    super("AutoStatistic", Config.statistic_collect_interval_sec * 1000);
}
```
The default scheduling interval here is `statistic_collect_interval_sec` (5 minutes); the constructor just registers the task name and interval with the FE's periodic-daemon base class.
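To make the scheduling model concrete, here is a minimal, hypothetical sketch of such a periodic daemon (class and method names are mine, not StarRocks'; the real base class also waits for the catalog to become ready before each cycle):

```java
// Hypothetical sketch of a FrontendDaemon-like periodic task, NOT StarRocks code.
public class SimpleDaemon extends Thread {
    private volatile long intervalMs;

    public SimpleDaemon(String name, long intervalMs) {
        super(name);
        this.intervalMs = intervalMs;
        setDaemon(true);
    }

    // Mirrors the setInterval() call used to pick up config changes at runtime.
    public void setIntervalMs(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Subclasses do their periodic work here (e.g. collect statistics).
    protected void runOneCycle() {
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            runOneCycle();
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}
```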
The periodic entry point is `runAfterCatalogReady`:

```java
@Override
protected void runAfterCatalogReady() {
    // update interval
    if (getInterval() != Config.statistic_collect_interval_sec * 1000) {
        setInterval(Config.statistic_collect_interval_sec * 1000);
    }
    if (!Config.enable_statistic_collect || FeConstants.runningUnitTest) {
        return;
    }
    if (!checkoutAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
        return;
    }
    // check statistic table state
    if (!StatisticUtils.checkStatisticTableStateNormal()) {
        return;
    }

    initDefaultJob();

    runJobs();
}
```
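The `checkoutAnalyzeTime` gate only lets the collector run inside a configured time window. A simplified sketch of what such a check has to handle, including a window that crosses midnight (the helper below is hypothetical, not the actual StarRocks implementation; it assumes HH:mm:ss-formatted config strings):

```java
import java.time.LocalTime;

public class AnalyzeWindow {
    // Assumed stand-ins for statistic_auto_analyze_start_time / statistic_auto_analyze_end_time.
    static boolean inWindow(LocalTime now, String startStr, String endStr) {
        LocalTime start = LocalTime.parse(startStr);
        LocalTime end = LocalTime.parse(endStr);
        if (!start.isAfter(end)) {
            // Plain window, e.g. 01:00:00 -> 05:00:00.
            return !now.isBefore(start) && !now.isAfter(end);
        }
        // Window crossing midnight, e.g. 22:00:00 -> 06:00:00.
        return !now.isBefore(start) || !now.isAfter(end);
    }

    public static void main(String[] args) {
        System.out.println(inWindow(LocalTime.of(23, 30), "22:00:00", "06:00:00")); // true
        System.out.println(inWindow(LocalTime.of(12, 0), "22:00:00", "06:00:00"));  // false
    }
}
```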
- It forces the scheduling interval to stay in sync with `statistic_collect_interval_sec`, so config changes take effect without restarting the FE.
- It checks whether the current time falls inside the allowed analyze window (sketched above). The default window is the whole day, but start and end times can be configured via `statistic_auto_analyze_start_time` and `statistic_auto_analyze_end_time`.
- Setting `enable_statistic_collect` to false skips collection entirely, if you do not want statistics gathered automatically.
- `initDefaultJob` initializes the default collection job; since `enable_collect_full_statistic` defaults to true, that means full collection.
- `runJobs` runs the collection jobs; this is the core phase:
```java
protected List<StatisticsCollectJob> runJobs() {
    ...
    Set<Long> analyzeTableSet = Sets.newHashSet();

    for (NativeAnalyzeJob nativeAnalyzeJob : allNativeAnalyzeJobs) {
        List<StatisticsCollectJob> jobs = nativeAnalyzeJob.instantiateJobs();
        result.addAll(jobs);
        ConnectContext statsConnectCtx = StatisticUtils.buildConnectContext();
        statsConnectCtx.setThreadLocalInfo();
        nativeAnalyzeJob.run(statsConnectCtx, STATISTIC_EXECUTOR, jobs);

        for (StatisticsCollectJob job : jobs) {
            if (job.isAnalyzeTable()) {
                analyzeTableSet.add(job.getTable().getId());
            }
        }
    }
    LOG.info("auto collect statistic on analyze job[{}] end", analyzeJobIds);

    if (Config.enable_collect_full_statistic) {
        LOG.info("auto collect full statistic on all databases start");
        List<StatisticsCollectJob> allJobs =
                StatisticsCollectJobFactory.buildStatisticsCollectJob(createDefaultJobAnalyzeAll());
        for (StatisticsCollectJob statsJob : allJobs) {
            // user-created analyze job has a higher priority
            if (statsJob.isAnalyzeTable() && analyzeTableSet.contains(statsJob.getTable().getId())) {
                continue;
            }

            result.add(statsJob);

            AnalyzeStatus analyzeStatus = new NativeAnalyzeStatus(GlobalStateMgr.getCurrentState().getNextId(),
                    statsJob.getDb().getId(), statsJob.getTable().getId(), statsJob.getColumnNames(),
                    statsJob.getType(), statsJob.getScheduleType(), statsJob.getProperties(), LocalDateTime.now());
            analyzeStatus.setStatus(StatsConstants.ScheduleStatus.FAILED);
            GlobalStateMgr.getCurrentState().getAnalyzeMgr().addAnalyzeStatus(analyzeStatus);

            ConnectContext statsConnectCtx = StatisticUtils.buildConnectContext();
            statsConnectCtx.setThreadLocalInfo();
            STATISTIC_EXECUTOR.collectStatistics(statsConnectCtx, statsJob, analyzeStatus, true);
        }
        LOG.info("auto collect full statistic on all databases end");
    }
    ...
    return result;
}
```
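Note how user-created analyze jobs shadow the default full-collection job: table ids handled by explicit jobs are recorded first, and the default job skips them. Reduced to its essentials, the pattern looks like this (hypothetical `Job` type, not StarRocks code):

```java
import java.util.*;

public class JobPriority {
    record Job(long tableId, String source) {}

    public static void main(String[] args) {
        List<Job> userJobs = List.of(new Job(1L, "user"), new Job(2L, "user"));
        List<Job> defaultJobs = List.of(new Job(1L, "default"), new Job(3L, "default"));

        // Tables already covered by user-created jobs win.
        Set<Long> covered = new HashSet<>();
        userJobs.forEach(j -> covered.add(j.tableId()));

        List<Job> toRun = new ArrayList<>(userJobs);
        for (Job j : defaultJobs) {
            if (!covered.contains(j.tableId())) {
                toRun.add(j); // only table 3 gets the default full collection
            }
        }
        System.out.println(toRun);
    }
}
```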
- `nativeAnalyzeJob.instantiateJobs` builds the statistics collection jobs.
This calls the `StatisticsCollectJobFactory.buildStatisticsCollectJob` method. Two gates apply here. First, the config `statistic_exclude_pattern` can exclude tables that should not be analyzed (matched against the `db.table` form of the name). Second, the table's so-called health (derived from the proportion of partitions updated since the last collection) is compared against `statistic_auto_collect_ratio`: if the health falls below that threshold, `createFullStatsJob` is called to create a full collection job.
Internally, `buildStatisticsCollectJob` constructs a job of type `FullStatisticsCollectJob`. Both gates are sketched right after this.
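A rough illustration of the two gates (the health formula below is a simplification for illustration, not the exact StarRocks computation; config values are hard-coded stand-ins):

```java
import java.util.regex.Pattern;

public class CollectGate {
    // Simplified stand-ins for statistic_exclude_pattern and statistic_auto_collect_ratio.
    static final String EXCLUDE_PATTERN = "tpch\\..*";   // skip every table in db `tpch`
    static final double AUTO_COLLECT_RATIO = 0.8;

    static boolean shouldCollect(String dbName, String tableName,
                                 long updatedPartitions, long totalPartitions) {
        // Gate 1: tables matching the exclude pattern (db.table) are never analyzed.
        if (Pattern.matches(EXCLUDE_PATTERN, dbName + "." + tableName)) {
            return false;
        }
        // Gate 2: "health" shrinks as more partitions changed since the last analyze;
        // dropping below the ratio triggers a new full collection job.
        double health = 1.0 - (double) updatedPartitions / Math.max(1, totalPartitions);
        return health < AUTO_COLLECT_RATIO;
    }

    public static void main(String[] args) {
        System.out.println(shouldCollect("tpch", "lineitem", 5, 10)); // false: excluded
        System.out.println(shouldCollect("sales", "orders", 5, 10));  // true: health 0.5 < 0.8
        System.out.println(shouldCollect("sales", "orders", 0, 10));  // false: health 1.0, fresh enough
    }
}
```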
- `nativeAnalyzeJob.run` runs the collection jobs.
This method calls `StatisticExecutor.collectStatistics`, which ultimately ends up in the `FullStatisticsCollectJob.collect` method:

```java
int parallelism = Math.max(1, context.getSessionVariable().getStatisticCollectParallelism());
List<List<String>> collectSQLList = buildCollectSQLList(parallelism);
long totalCollectSQL = collectSQLList.size();
...
Exception lastFailure = null;
for (List<String> sqlUnion : collectSQLList) {
    if (sqlUnion.size() < parallelism) {
        context.getSessionVariable().setPipelineDop(parallelism / sqlUnion.size());
    } else {
        context.getSessionVariable().setPipelineDop(1);
    }

    String sql = Joiner.on(" UNION ALL ").join(sqlUnion);

    try {
        collectStatisticSync(sql, context);
    } catch (Exception e) {
        ...
    }
    finishedSQLNum++;
    analyzeStatus.setProgress(finishedSQLNum * 100 / totalCollectSQL);
    GlobalStateMgr.getCurrentState().getAnalyzeMgr().addAnalyzeStatus(analyzeStatus);
}
...
flushInsertStatisticsData(context, true);
```
- First, the SQL parallelism is read from the session variable `statistic_collect_parallel` (default 1); it determines how many per-partition statements are grouped into each batch, i.e. across how many runs the statistics SQL is spread. When a batch holds fewer statements than `parallelism` (typically the last one), the pipeline DOP is raised to compensate: with parallelism 4 and 2 remaining statements, each runs with `pipeline_dop = 2`.
- `buildCollectSQLList` builds the concrete statistics SQL, one statement per partition; the batching is sketched below.
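The batching that `buildCollectSQLList` performs amounts to chunking the per-partition statements into groups of `parallelism` and gluing each group together with UNION ALL, as in this self-contained sketch (hypothetical helper, not the actual method):

```java
import java.util.ArrayList;
import java.util.List;

public class CollectBatcher {
    // Group per-partition statements into batches of `parallelism`;
    // each batch is later joined with UNION ALL and run as one query.
    static List<List<String>> buildBatches(List<String> perPartitionSql, int parallelism) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < perPartitionSql.size(); i += parallelism) {
            batches.add(perPartitionSql.subList(i, Math.min(i + parallelism, perPartitionSql.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> stmts = List.of("SELECT ... p0", "SELECT ... p1", "SELECT ... p2");
        for (List<String> batch : buildBatches(stmts, 2)) {
            System.out.println(String.join(" UNION ALL ", batch));
        }
        // Prints two queries: p0 UNION ALL p1, then p2 alone.
    }
}
```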
- `collectStatisticSync` executes the concrete SQL. Per partition and column it looks like this (reading the select list: a constant tag for the statistics data version, the partition id, the column name, the row count, the data size, the serialized HLL used for NDV, the null count, and the max/min values):

```sql
SELECT cast(4 as INT)
     , cast($partitionId as BIGINT)
     , '$columnNameStr'
     , cast(COUNT(1) as BIGINT)
     , cast($dataSize as BIGINT)
     , hex(hll_serialize(IFNULL(hll_raw(column_key), hll_empty())))
     , cast((COUNT(1) - COUNT(column_key)) as BIGINT)
     , MAX(column_key)
     , MIN(column_key)
FROM (select $quoteColumnName as column_key
      from `$dbName`.`$tableName` partition `$partitionName`) tt
```
- `flushInsertStatisticsData` finally writes the collected results into `_statistics_.column_statistics`.
- `analyzeMgr.refreshBasicStatisticsCache` then refreshes the statistics cached in `CachedStatisticStorage`, mainly through `refreshTableStatistic` and `getColumnStatistics`. These two methods go through `TableStatsCacheLoader` and `ColumnBasicStatsCacheLoader` respectively, which fetch the statistics by running SQL such as:

```sql
select cast(3 as INT), partition_id, any_value(row_count)
FROM column_statistics
WHERE table_id = $tableId and partition_id = $partitionId
GROUP BY partition_id;
```
```sql
SELECT cast(1 as INT), $updateTime, db_id, table_id, column_name,
       sum(row_count), cast(sum(data_size) as bigint), hll_union_agg(ndv), sum(null_count),
       cast(max(cast(max as $type)) as string), cast(min(cast(min as $type)) as string)
FROM column_statistics
WHERE table_id = $table_id and column_name in (xxx,xxx,xxx)
GROUP BY db_id, table_id, column_name;
```
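Conceptually, `CachedStatisticStorage` behaves like a loading cache keyed by table/column: a miss triggers one of the SQL statements above and the result is memoized for later optimizer lookups. A minimal stand-in for that behavior (a plain `ConcurrentHashMap` sketch; my understanding is that the real class uses an async loading cache with expiration, so treat this only as the shape of the idea):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class StatsCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // e.g. runs the loader SQL above and parses the row

    public StatsCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Optimizer lookups hit this; only a miss pays the cost of the loader SQL.
    public V get(K key) {
        return cache.computeIfAbsent(key, loader);
    }

    // refreshBasicStatisticsCache-style refresh after a new collection run.
    public void refresh(K key) {
        cache.put(key, loader.apply(key));
    }

    public static void main(String[] args) {
        StatsCache<Long, Long> rowCounts = new StatsCache<>(tableId -> {
            System.out.println("loading stats for table " + tableId);
            return 42L; // pretend this came from _statistics_.column_statistics
        });
        rowCounts.get(1L); // triggers the loader
        rowCounts.get(1L); // served from cache
    }
}
```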
Miscellaneous
- `StatisticAutoCollector` collects statistics through a periodic background task.
- Manual collection with `ANALYZE TABLE`, e.g.:

```sql
ANALYZE [FULL|SAMPLE] TABLE tbl_name (col_name [,col_name])
[WITH SYNC | ASYNC MODE]
PROPERTIES (property [,property])
```
- Manually creating an automatic collection job with `CREATE ANALYZE`, e.g.:

```sql
CREATE ANALYZE [FULL|SAMPLE] TABLE tbl_name (col_name [,col_name])
PROPERTIES (property [,property])
```
Both of these also trigger statistics collection.