0. Key Points
Flink's partition columns are not stored in the data files: if a table has two columns and one of them is a partition column, the files will only contain the other column's data.
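As a quick illustration before diving into the source, here is a minimal sketch (table name, path, and format are arbitrary choices for this demo) using the filesystem connector: after the insert, the dt value appears only in the directory name dt=2024-01-01, and the files underneath contain just the v column.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionLayoutDemo {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql(
                "CREATE TABLE t (v STRING, dt STRING) PARTITIONED BY (dt) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = '/tmp/t',"
                        + " 'format' = 'csv')");
        // After this, /tmp/t/dt=2024-01-01/... holds files containing only `v`;
        // `dt` is recoverable solely from the directory name.
        tEnv.executeSql("INSERT INTO t VALUES ('a', '2024-01-01')").await();
    }
}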
1. CreateTable
Following the SQL execution flow, we enter TableEnvironmentImpl.executeInternal, the createTable branch:
} else if (operation instanceof CreateTableOperation) {
    CreateTableOperation createTableOperation = (CreateTableOperation) operation;
    if (createTableOperation.isTemporary()) {
        catalogManager.createTemporaryTable(
                createTableOperation.getCatalogTable(),
                createTableOperation.getTableIdentifier(),
                createTableOperation.isIgnoreIfExists());
    } else {
        catalogManager.createTable(
                createTableOperation.getCatalogTable(),
                createTableOperation.getTableIdentifier(),
                createTableOperation.isIgnoreIfExists());
    }
    return TableResultImpl.TABLE_RESULT_OK;
1.1 GenericInMemoryCatalog
catalog.createTable is then invoked. Taking GenericInMemoryCatalog as the example, it holds several partition-related Maps, but no partition information is actually stored at this point; as the code shows, the entries created here are empty:
} else {
    tables.put(tablePath, table.copy());
    if (isPartitionedTable(tablePath)) {
        partitions.put(tablePath, new LinkedHashMap<>());
        partitionStats.put(tablePath, new LinkedHashMap<>());
        partitionColumnStats.put(tablePath, new LinkedHashMap<>());
    }
}
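To confirm the empty-at-creation behavior, a small sketch against GenericInMemoryCatalog (using the same, now-deprecated TableSchema/CatalogTableImpl APIs these excerpts belong to): the table is registered as partitioned, yet listing its partitions right after creation yields nothing.

import java.util.Arrays;
import java.util.HashMap;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;

public class InMemoryCatalogSketch {
    public static void main(String[] args) throws Exception {
        GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("mem", "db");
        TableSchema schema =
                TableSchema.builder()
                        .field("v", DataTypes.STRING())
                        .field("dt", DataTypes.STRING())
                        .build();
        ObjectPath tablePath = new ObjectPath("db", "t");
        catalog.createTable(
                tablePath,
                new CatalogTableImpl(schema, Arrays.asList("dt"), new HashMap<>(), "demo"),
                false);
        // The partition bookkeeping exists for this table but holds no entries yet.
        System.out.println(catalog.listPartitions(tablePath)); // []
    }
}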
1.2 The partition Maps in the Catalog
partitionStats and partitionColumnStats hold statistics. partitions, as far as I can tell, is only used by standalone partition operations such as createPartition (corresponding to the SQL statement ALTER TABLE ADD PARTITION), and it only stores the partition information modified by the ALTER statement; this is mostly descriptive metadata, not the primary record of the table's partitioning. The information originates in SqlToOperationConverter.convertAlterTable:
for (int i = 0; i < addPartitions.getPartSpecs().size(); i++) {
    specs.add(new CatalogPartitionSpec(addPartitions.getPartitionKVs(i)));
    Map<String, String> props =
            OperationConverterUtils.extractProperties(addPartitions.getPartProps().get(i));
    partitions.add(new CatalogPartitionImpl(props, null));
}
return new AddPartitionsOperation(
        tableIdentifier, addPartitions.ifNotExists(), specs, partitions);
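For reference, a statement of the shape below is what convertAlterTable turns into the AddPartitionsOperation (a sketch reusing the tEnv and table t from the earlier example; whether a given catalog actually supports explicit partition DDL varies):

// Parsed into CatalogPartitionSpec {dt=2024-01-01}; partition properties, if any,
// would come from an optional per-partition properties clause.
tEnv.executeSql("ALTER TABLE t ADD IF NOT EXISTS PARTITION (dt = '2024-01-01')");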
1.3 AbstractCatalogTable
The genuinely useful information is carried by the table itself. The core is the line tables.put(tablePath, table.copy()); — table.copy() preserves the table metadata, ultimately backed by the implementation class CatalogTableImpl, whose parent constructor takes the partition keys. The table stores the partition information, and since SQL statements ultimately operate on tables, this is where partition information is read from. Note that partitionKeys is a List<String>:
public AbstractCatalogTable(
        TableSchema tableSchema,
        List<String> partitionKeys,
        Map<String, String> options,
        String comment) {
    this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
    this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
    this.options = checkNotNull(options, "options cannot be null");
2. DescribeTable
Again in TableEnvironmentImpl.executeInternal, this time the describe branch:
} else if (operation instanceof DescribeTableOperation) {
    DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
    Optional<ContextResolvedTable> result =
            catalogManager.getTable(describeTableOperation.getSqlIdentifier());
    if (result.isPresent()) {
        return buildDescribeResult(result.get().getResolvedSchema());
    } else {
        throw new ValidationException(
                String.format(
                        "Tables or views with the identifier '%s' doesn't exist",
                        describeTableOperation.getSqlIdentifier().asSummaryString()));
    }
2.1 Fetching and resolving the table
First, the getTable method fetches the table, with CatalogManager as the entry point; regular tables go through getPermanentTable:
public Optional<ContextResolvedTable> getTable(ObjectIdentifier objectIdentifier) {
    CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
    if (temporaryTable != null) {
        final ResolvedCatalogBaseTable<?> resolvedTable =
                resolveCatalogBaseTable(temporaryTable);
        return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
    } else {
        return getPermanentTable(objectIdentifier);
    }
}
The table goes through several layers of wrapping here, but at the bottom it still comes from GenericInMemoryCatalog: the tables Map populated during CreateTable is where the stored table class is fetched from:
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException {
    checkNotNull(tablePath);
    if (!tableExists(tablePath)) {
        throw new TableNotExistException(getName(), tablePath);
    } else {
        return tables.get(tablePath).copy();
    }
}
Before being returned to the top level, the table is resolved and wrapped in CatalogManager.resolveCatalogTable. An important step here is the validation of partition keys: each one must reference a column of the table:
// Validate partition keys are included in physical columns
final List<String> physicalColumns =
        resolvedSchema.getColumns().stream()
                .filter(Column::isPhysical)
                .map(Column::getName)
                .collect(Collectors.toList());
table.getPartitionKeys()
        .forEach(
                partitionKey -> {
                    if (!physicalColumns.contains(partitionKey)) {
                        throw new ValidationException(
                                String.format(
                                        "Invalid partition key '%s'. A partition key must "
                                                + "reference a physical column in the schema. "
                                                + "Available columns are: %s",
                                        partitionKey, physicalColumns));
                    }
                });
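To see this check fire, a sketch (reusing the tEnv from the earlier example): declaring a partition key that does not reference a physical column fails with exactly this ValidationException when the table is created and resolved.

// `dt` is not a column of the schema, so resolution rejects the partition key.
try {
    tEnv.executeSql(
            "CREATE TABLE bad (v STRING) PARTITIONED BY (dt) WITH ("
                    + " 'connector' = 'filesystem', 'path' = '/tmp/bad', 'format' = 'csv')");
} catch (org.apache.flink.table.api.ValidationException e) {
    // "Invalid partition key 'dt'. A partition key must reference a physical column ..."
    System.out.println(e.getMessage());
}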
2.2 Building the result
Finally the result is built. One thing to note is that the method is only given the table's schema — no partition information is passed in:
return buildDescribeResult(result.get().getResolvedSchema());
DESCRIBE prints to the console in tabular form, so a table layout is built here:
private TableResultInternal buildDescribeResult(ResolvedSchema schema) {
    Object[][] rows = buildTableColumns(schema);
    return buildResult(generateTableColumnsNames(), generateTableColumnsDataTypes(), rows);
}
buildTableColumns turns the schema into row data; since the DESCRIBE output table has fixed columns, values are filled in against those fixed columns.
The column names are specified in generateTableColumnsNames; these also form the header of the final output:
private String[] generateTableColumnsNames() {
    return new String[] {"name", "type", "null", "key", "extras", "watermark"};
}
generateTableColumnsDataTypes sets the data types of those columns:
private DataType[] generateTableColumnsDataTypes() {
    return new DataType[] {
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.BOOLEAN(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.STRING()
    };
}
Finally, inserting the rows built from the schema into this table produces the returned result, a TableResultImpl:
private TableResultInternal buildResult(String[] headers, DataType[] types, Object[][] rows) {
    ResolvedSchema schema = ResolvedSchema.physical(headers, types);
    ResultProvider provider =
            new StaticResultProvider(
                    Arrays.stream(rows).map(Row::of).collect(Collectors.toList()));
    return TableResultImpl.builder()
            .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
            .schema(ResolvedSchema.physical(headers, types))
            .resultProvider(provider)
            .setPrintStyle(
                    PrintStyle.tableauWithDataInferredColumnWidths(
                            schema,
                            provider.getRowDataStringConverter(),
                            Integer.MAX_VALUE,
                            true,
                            false))
            .build();
}
The overall output looks like this:
+---------+--------+------+-----+--------+-----------+
|    name |   type | null | key | extras | watermark |
+---------+--------+------+-----+--------+-----------+
|    user | BIGINT | TRUE |     |        |           |
| product | STRING | TRUE |     |        |           |
+---------+--------+------+-----+--------+-----------+
3. Insert
3.1 Wrapping the SinkModifyOperation
First, the SinkModifyOperation is built, and it carries the table. In SqlToOperationConverter.convertSqlInsert, getTableOrError ends up on the same path as the DESCRIBE case above: the table is fetched from the catalog and the partition keys are validated, with none of those steps skipped. So the contextResolvedTable wrapped into SinkModifyOperation carries the partition information:
ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(identifier);
return new SinkModifyOperation(
        contextResolvedTable,
        query,
        insert.getStaticPartitionKVs(),
        insert.isOverwrite(),
        dynamicOptions);
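For context, it is a static partition spelled out in the INSERT statement that populates insert.getStaticPartitionKVs() — a sketch below, where t is the partitioned table from the earlier example and src is a hypothetical source table:

// {dt=2024-01-01} becomes the static partition KVs; the query then only needs to
// produce the non-partition column(s).
tEnv.executeSql("INSERT INTO t PARTITION (dt = '2024-01-01') SELECT v FROM src");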
3.2 Converting to a TableSink
In PlannerBase.translateToRel of the SQL translation flow, execution takes the catalogSink branch; the getTableSink call reaches TableFactoryUtil.findAndCreateTableSink:
public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) {
    try {
        return TableFactoryService.find(
                        TableSinkFactory.class, context.getTable().toProperties())
                .createTableSink(context);
    } catch (Throwable t) {
        throw new TableException("findAndCreateTableSink failed.", t);
    }
}
Inside toProperties, the partition keys are put in as a property:
public Map<String, String> toProperties() {
    DescriptorProperties descriptor = new DescriptorProperties(false);
    descriptor.putTableSchema(SCHEMA, getSchema());
    descriptor.putPartitionKeys(getPartitionKeys());
    Map<String, String> properties = new HashMap<>(getOptions());
    descriptor.putProperties(properties);
    return descriptor.asMap();
}
The subsequent createTableSink call also goes through toProperties, but this path currently seems to have only the two CSV implementation classes, so the concrete TableSink flow remains to be studied.
What getTableSink ultimately calls is createDynamicTableSink, which wraps the table — the same table fetched from the catalog as before, so it includes the partition information:
val tableSink = FactoryUtil.createDynamicTableSink(
    factory,
    objectIdentifier,
    tableToFind,
    Collections.emptyMap(),
    getTableConfig,
    getFlinkContext.getClassLoader,
    isTemporary)
3.3 Partition assignment
Tracing down from the file data source, there is a partition assigner class, PartitionComputer, in the file connector, with four implementations covering file and Hive.
When the sink is translated via translateToPlanInternal, the SinkRuntimeProvider is built in CommonExecSink.createSinkTransformation:
final SinkRuntimeProvider runtimeProvider =
        tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
In FileSystemTableSink, this ultimately constructs the partition computer:
private DataStreamSink<RowData> createBatchSink(
        DataStream<RowData> inputStream, Context sinkContext, final int parallelism) {
    FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
    builder.setPartitionComputer(partitionComputer());

private RowDataPartitionComputer partitionComputer() {
    return new RowDataPartitionComputer(
            defaultPartName,
            DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
            DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
            partitionKeys.toArray(new String[0]));
}
Here, partition indexes are generated from the partition column names for later use — so the partition keys must map onto actual columns — though when computing a record's partition the list of partition names is still used:
this.partitionIndexes =
        Arrays.stream(partitionColumns).mapToInt(columnList::indexOf).toArray();
this.partitionTypes =
        Arrays.stream(partitionIndexes)
                .mapToObj(columnTypeList::get)
                .toArray(LogicalType[]::new);
this.partitionFieldGetters =
        IntStream.range(0, partitionTypes.length)
                .mapToObj(
                        i ->
                                RowData.createFieldGetter(
                                        partitionTypes[i], partitionIndexes[i]))
                .toArray(RowData.FieldGetter[]::new);
List<Integer> partitionIndexList =
        Arrays.stream(partitionIndexes).boxed().collect(Collectors.toList());
generatePartValues computes the partition of each record, using the partition-derived objects built during RowDataPartitionComputer initialization:
public LinkedHashMap<String, String> generatePartValues(RowData in) {
    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
    for (int i = 0; i < partitionIndexes.length; i++) {
        Object field = partitionFieldGetters[i].getFieldOrNull(in);
        String partitionValue = field != null ? field.toString() : null;
        if (partitionValue == null || "".equals(partitionValue)) {
            partitionValue = defaultPartValue;
        }
        partSpec.put(partitionColumns[i], partitionValue);
    }
    return partSpec;
}
The lowest-level writer implementations of FileSystemTableSink are GroupedPartitionWriter and DynamicPartitionWriter; GroupedPartitionWriter's write looks like this:
public void write(T in) throws Exception {
    String partition = generatePartitionPath(computer.generatePartValues(in));
    if (!partition.equals(currentPartition)) {
        if (currentFormat != null) {
            currentFormat.close();
        }
        currentFormat = context.createNewOutputFormat(manager.createPartitionDir(partition));
        currentPartition = partition;
    }
    currentFormat.writeRecord(computer.projectColumnsToWrite(in));
}
3.4 Partition columns are not written
RowDataPartitionComputer.projectColumnsToWrite computes the columns whose data will actually be written — only these columns end up in the files — and its core job is removing the partition columns:
for (int i = 0; i < nonPartitionIndexes.length; i++) {
    reuseRow.setField(i, nonPartitionFieldGetters[i].getFieldOrNull(in));
}
reuseRow.setRowKind(in.getRowKind());
return reuseRow;
nonPartitionIndexes is built in the RowDataPartitionComputer constructor; as shown below, it simply iterates over the column names and filters out the partition columns:
this.nonPartitionIndexes =
        IntStream.range(0, columnNames.length)
                .filter(c -> !partitionIndexList.contains(c))
                .toArray();
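A worked example of the index bookkeeping (the column layout is invented for illustration): with columns [user, v, dt] and partition key [dt], partitionIndexes comes out as [2] and nonPartitionIndexes as [0, 1], so only user and v are projected into the written rows.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IndexSketch {
    public static void main(String[] args) {
        List<String> columns = Arrays.asList("user", "v", "dt");
        List<String> partitionColumns = Arrays.asList("dt");

        // Same computation as RowDataPartitionComputer's constructor.
        int[] partitionIndexes =
                partitionColumns.stream().mapToInt(columns::indexOf).toArray();
        List<Integer> partitionIndexList =
                Arrays.stream(partitionIndexes).boxed().collect(Collectors.toList());
        int[] nonPartitionIndexes =
                IntStream.range(0, columns.size())
                        .filter(c -> !partitionIndexList.contains(c))
                        .toArray();

        System.out.println(Arrays.toString(partitionIndexes));    // [2]
        System.out.println(Arrays.toString(nonPartitionIndexes)); // [0, 1]
    }
}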
3.5 Partition directories
PartitionPathUtils.generatePartitionPath defines the layout of partition directories, in the form {columnName}={partitionValue}, since the spec's keys are exactly the column names:
for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
    if (i > 0) {
        suffixBuf.append(Path.SEPARATOR);
    }
    suffixBuf.append(escapePathName(e.getKey()));
    suffixBuf.append('=');
    suffixBuf.append(escapePathName(e.getValue()));
    i++;
}
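A sketch of the outcome (escaping omitted; the real code runs escapePathName over both keys and values): the ordered spec {dt=2024-01-01, hour=08} becomes the relative path dt=2024-01-01/hour=08.

import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionPathSketch {
    public static void main(String[] args) {
        LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
        partitionSpec.put("dt", "2024-01-01");
        partitionSpec.put("hour", "08");

        StringBuilder suffixBuf = new StringBuilder();
        int i = 0;
        for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
            if (i > 0) {
                suffixBuf.append('/'); // Path.SEPARATOR in the real code
            }
            suffixBuf.append(e.getKey()).append('=').append(e.getValue());
            i++;
        }
        System.out.println(suffixBuf); // dt=2024-01-01/hour=08
    }
}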
4. Select
Mirroring the sink flow, the source is built in CommonExecTableSourceScan.translateToPlanInternal:
final ScanTableSource tableSource =
        tableSourceSpec.getScanTableSource(planner.getFlinkContext());
ScanTableSource.ScanRuntimeProvider provider =
        tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
4.1 getScanTableSource
In getScanTableSource, the TableSource is created by createDynamicTableSource:
tableSource =
        FactoryUtil.createDynamicTableSource(
                factory,
                contextResolvedTable.getIdentifier(),
                contextResolvedTable.getResolvedTable(),
                loadOptionsFromCatalogTable(contextResolvedTable, flinkContext),
                flinkContext.getTableConfig(),
                flinkContext.getClassLoader(),
                contextResolvedTable.isTemporary());
Taking FileSystemTableFactory as an example, this ends up creating a FileSystemTableSource, and among the arguments passed in are the partition keys — again coming from the catalog table:
public DynamicTableSource createDynamicTableSource(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    validate(helper);
    return new FileSystemTableSource(
            context.getObjectIdentifier(),
            context.getPhysicalRowDataType(),
            context.getCatalogTable().getPartitionKeys(),
            helper.getOptions(),
            discoverDecodingFormat(context, BulkReaderFormatFactory.class),
            discoverDecodingFormat(context, DeserializationFormatFactory.class));
}
4.2 getScanRuntimeProvider
The provider is then created through a call on the FileSystemTableSource built above, eventually reaching its getScanRuntimeProvider method, which contains a number of partition-related operations.
First, if the table is partitioned but no partitions actually exist, an empty source is returned:
// When this table has no partition, just return an empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
    return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
}
Next, the partition keys are filtered against the table's produced columns:
// Filter out partition columns not in producedDataType
final List<String> partitionKeysToExtract =
        DataType.getFieldNames(producedDataType).stream()
                .filter(this.partitionKeys::contains)
                .collect(Collectors.toList());
The filtered partition keys are then passed into the format — the class that ultimately performs the reads and writes in Flink (though some call sites pass them and some do not; the difference needs a closer look).
The formats differ, but they all end with return createSourceProvider(format);, and createSourceProvider fetches the partitions (the partition values, not the partition keys):
private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) {
    final FileSource.FileSourceBuilder<RowData> fileSourceBuilder =
            FileSource.forBulkFileFormat(bulkFormat, paths());
Here paths() derives the partition directories to read from remainingPartitions:
private Path[] paths() {
    if (partitionKeys.isEmpty()) {
        return new Path[] {path};
    } else {
        return getOrFetchPartitions().stream()
                .map(FileSystemTableSource.this::toFullLinkedPartSpec)
                .map(PartitionPathUtils::generatePartitionPath)
                .map(n -> new Path(path, n))
                .toArray(Path[]::new);
    }
}
4.3 remainingPartitions
This field is used by partition pushdown: when pushdown is supported, it is set to the selected partitions, in the apply method of PartitionPushDownSpec.
Note that it holds actual partition values, not the partition key columns:
public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
    if (tableSource instanceof SupportsPartitionPushDown) {
        ((SupportsPartitionPushDown) tableSource).applyPartitions(partitions);
    } else {
        throw new TableException(
                String.format(
                        "%s does not support SupportsPartitionPushDown.",
                        tableSource.getClass().getName()));
    }
}
applyPartitions is what sets remainingPartitions:
public void applyPartitions(List<Map<String, String>> remainingPartitions) {
    this.remainingPartitions = remainingPartitions;
}
It is also set elsewhere, in getOrFetchPartitions:
private List<Map<String, String>> getOrFetchPartitions() {
    if (remainingPartitions == null) {
        remainingPartitions = listPartitions().get();
    }
    return remainingPartitions;
}
Here listPartitions scans the data directory for partitions:
public Optional<List<Map<String, String>>> listPartitions() {
    try {
        return Optional.of(
                PartitionPathUtils.searchPartSpecAndPaths(
                                path.getFileSystem(), path, partitionKeys.size())
                        .stream()
                        .map(tuple2 -> tuple2.f0)
                        .map(
                                spec -> {
                                    LinkedHashMap<String, String> ret = new LinkedHashMap<>();
                                    spec.forEach(
                                            (k, v) ->
                                                    ret.put(
                                                            k,
                                                            defaultPartName.equals(v)
                                                                    ? null
                                                                    : v));
                                    return ret;
                                })
                        .collect(Collectors.toList()));
    } catch (Exception e) {
        throw new TableException("Fetch partitions fail.", e);
    }
}
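A simplified sketch of what searchPartSpecAndPaths amounts to for a single level of partitioning (the real utility recurses partitionKeys.size() levels, goes through Flink's FileSystem API, and unescapes path names): scan the table directory for key=value children and parse them into specs.

import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

public class ListPartitionsSketch {
    public static void main(String[] args) {
        List<LinkedHashMap<String, String>> specs = new ArrayList<>();
        File[] children = new File("/tmp/t").listFiles(File::isDirectory);
        if (children != null) {
            for (File child : children) {
                String[] kv = child.getName().split("=", 2);
                if (kv.length == 2) {
                    LinkedHashMap<String, String> spec = new LinkedHashMap<>();
                    spec.put(kv[0], kv[1]); // e.g. dt -> 2024-01-01
                    specs.add(spec);
                }
            }
        }
        System.out.println(specs); // e.g. [{dt=2024-01-01}, {dt=2024-01-02}]
    }
}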
5. Partition Pushdown
The capability is declared by PartitionPushDownSpec; the rule is PushPartitionIntoTableSourceScanRule.
The partition keys again come from the catalog table:
List<String> partitionFieldNames =
        tableSourceTable
                .contextResolvedTable()
                .<ResolvedCatalogTable>getResolvedTable()
                .getPartitionKeys();
Partition-related predicates are then extracted from the filter condition:
// extract partition predicates
RelBuilder relBuilder = call.builder();
RexBuilder rexBuilder = relBuilder.getRexBuilder();
Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates =
        RexNodeExtractor.extractPartitionPredicateList(
                filter.getCondition(),
                FlinkRelOptUtil.getMaxCnfNodeCount(scan),
                inputFieldNames.toArray(new String[0]),
                rexBuilder,
                partitionFieldNames.toArray(new String[0]));
RexNode partitionPredicate =
        RexUtil.composeConjunction(
                rexBuilder, JavaConversions.seqAsJavaList(allPredicates._1));
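For intuition, with t partitioned by dt, a filter like the one below is split by this extraction into a partition predicate (dt = '2024-01-01'), which drives pruning, and a remaining predicate (v <> ''), which stays in the plan (query sketch; table assumed from the earlier examples):

tEnv.executeSql("SELECT * FROM t WHERE dt = '2024-01-01' AND v <> ''");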
Next, the types of the partition columns are obtained, again looked up via the partition key names:
// build pruner
LogicalType[] partitionFieldTypes =
        partitionFieldNames.stream()
                .map(
                        name -> {
                            int index = inputFieldNames.indexOf(name);
                            if (index < 0) {
                                throw new TableException(
                                        String.format(
                                                "Partitioned key '%s' isn't found in input columns. "
                                                        + "Validator should have checked that.",
                                                name));
                            }
                            return inputFieldTypes.getFieldList().get(index).getType();
                        })
                .map(FlinkTypeFactory::toLogicalType)
                .toArray(LogicalType[]::new);
Partition pruning follows, and this is what produces the remainingPartitions mentioned above. The rule first calls the TableSource's listPartitions; if that yields partitions directly, those values are used. Otherwise it falls back to readPartitionFromCatalogAndPrune. Note that by this point the partition predicates have been converted into RexNode form, and the final filtering still goes through the catalog — currently only HiveCatalog appears to implement this logic:
return catalog.listPartitionsByFilter(tablePath, partitionFilters).stream()
        .map(CatalogPartitionSpec::getPartitionSpec)
        .collect(Collectors.toList());
Finally, based on the pruned partitions above, a new TableSourceTable is built, replacing its tableStats:
FlinkStatistic newStatistic =
        FlinkStatistic.builder()
                .statistic(tableSourceTable.getStatistic())
                .tableStats(newTableStat)
                .build();
TableSourceTable newTableSourceTable =
        tableSourceTable.copy(
                dynamicTableSource,
                newStatistic,
                new SourceAbilitySpec[] {partitionPushDownSpec});
LogicalTableScan newScan =
        LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints());