The previous installment, 《Flink-1.19.0源碼詳解8-ExecutionGraph生成-前篇》, started from scheduling on the Flink cluster side and walked through the complete creation of the ExecutionGraph's ExecutionJobVertex nodes, ExecutionVertex nodes, IntermediateResult datasets, IntermediateResultPartition dataset partitions, and the Execution objects that encapsulate task execution information. This installment continues from there and covers the remaining source code of ExecutionGraph generation.
The complete source code flow of ExecutionGraph generation:
1. Connecting ExecutionJobVertex nodes to their upstream IntermediateResult datasets
In newer versions (after 1.13), Flink removed the ExecutionEdge class. Instead, the EdgeManager keeps two maps, vertexConsumedPartitions (Map<ExecutionVertexID, List<ConsumedPartitionGroup>>) and partitionConsumers (Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>>), to record the connections between IntermediateResultPartition dataset partitions and ExecutionVertex nodes.
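To make the two maps concrete, here is a minimal, self-contained sketch of this bookkeeping, assuming simplified stand-in types (the record names below are illustrative placeholders, not Flink's actual classes):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-ins for Flink's ID and group types (hypothetical, for illustration only)
record ExecutionVertexID(String name) {}
record IntermediateResultPartitionID(String name) {}
record ConsumedPartitionGroup(List<IntermediateResultPartitionID> partitions) {}
record ConsumerVertexGroup(List<ExecutionVertexID> vertices) {}

// Sketch of the two-directional bookkeeping that replaced ExecutionEdge
class MiniEdgeManager {
    // vertex -> the partition groups it consumes
    private final Map<ExecutionVertexID, List<ConsumedPartitionGroup>> vertexConsumedPartitions =
            new HashMap<>();
    // partition -> the vertex groups that consume it
    private final Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>> partitionConsumers =
            new HashMap<>();

    void connectVertexWithConsumedPartitionGroup(
            ExecutionVertexID vertex, ConsumedPartitionGroup group) {
        // get-or-create the list for this vertex, then append
        vertexConsumedPartitions.computeIfAbsent(vertex, k -> new ArrayList<>()).add(group);
    }

    void connectPartitionWithConsumerVertexGroup(
            IntermediateResultPartitionID partition, ConsumerVertexGroup group) {
        // get-or-create the list for this partition, then append
        partitionConsumers.computeIfAbsent(partition, k -> new ArrayList<>()).add(group);
    }
}

The two real EdgeManager methods, shown later in this section, do exactly this kind of get-or-create-then-append on the corresponding map.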
Back in DefaultExecutionGraph.initializeJobVertex(): after ExecutionJobVertex.initialize() has created the ExecutionVertex nodes and IntermediateResultPartition dataset partitions for each ExecutionJobVertex node, DefaultExecutionGraph goes on to call ExecutionJobVertex.connectToPredecessors(), which connects the ExecutionJobVertex node (including the ExecutionVertex node of each of its parallel subtasks) to its upstream IntermediateResult datasets (including the IntermediateResultPartition dataset partition of each parallel subtask).
Source code diagram:
DefaultExecutionGraph.initializeJobVertex() method source:
public void initializeJobVertex(
        ExecutionJobVertex ejv,
        long createTimestamp,
        Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
        throws JobException {
    //...
    // Initialize each ExecutionJobVertex
    ejv.initialize(
            executionHistorySizeLimit,
            rpcTimeout,
            createTimestamp,
            this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()));
    // Connect the ExecutionJobVertex to its upstream IntermediateResults
    ejv.connectToPredecessors(this.intermediateResults);
    //...
}
ExecutionJobVertex.connectToPredecessors() looks up the JobVertex corresponding to this ExecutionJobVertex node, takes each input JobEdge and the upstream IntermediateDataSet it connects to, and then calls EdgeManagerBuildUtil.connectVertexToResult() to connect the single ExecutionJobVertex node with the corresponding IntermediateResult dataset.
ExecutionJobVertex.connectToPredecessors() method source:
public void connectToPredecessors(
        Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)
        throws JobException {
    checkState(isInitialized());
    // Get all input edges from the JobVertex corresponding to this ExecutionJobVertex
    List<JobEdge> inputs = jobVertex.getInputs();
    // Iterate over all input edges of this vertex
    for (int num = 0; num < inputs.size(); num++) {
        // Find the IntermediateResult of each edge
        JobEdge edge = inputs.get(num);
        //...
        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
        //...
        // Connect the ExecutionJobVertex to the upstream IntermediateResult
        EdgeManagerBuildUtil.connectVertexToResult(this, ires);
    }
}
EdgeManagerBuildUtil.connectVertexToResult() obtains the ExecutionJobVertex's DistributionPattern and the JobVertexInputInfo input descriptor produced by VertexInputInfoComputationUtils.computeVertexInputInfos(), and then connects the ExecutionJobVertex node to the IntermediateResult dataset according to whether the pattern is POINTWISE or ALL_TO_ALL.
EdgeManagerBuildUtil.connectVertexToResult() method source:
static void connectVertexToResult(
        ExecutionJobVertex vertex, IntermediateResult intermediateResult) {
    // Get the distribution pattern (POINTWISE or ALL_TO_ALL) between the
    // ExecutionJobVertex and the IntermediateResult
    final DistributionPattern distributionPattern =
            intermediateResult.getConsumingDistributionPattern();
    // Get the input descriptor
    final JobVertexInputInfo jobVertexInputInfo =
            vertex.getGraph()
                    .getJobVertexInputInfo(vertex.getJobVertexId(), intermediateResult.getId());
    // Build the connections (the equivalent of the former ExecutionEdges)
    // according to the distribution pattern
    switch (distributionPattern) {
        case POINTWISE:
            connectPointwise(vertex, intermediateResult, jobVertexInputInfo);
            break;
        case ALL_TO_ALL:
            connectAllToAll(vertex, intermediateResult, jobVertexInputInfo);
            break;
        default:
            throw new IllegalArgumentException("Unrecognized distribution pattern.");
    }
}
For POINTWISE:
EdgeManagerBuildUtil assigns each ExecutionVertex node the IntermediateResultPartition dataset partitions it needs to connect to, as described by the JobVertexInputInfo, and then calls connectInternal() to actually create the connections (a worked comparison of the two patterns follows the ALL_TO_ALL code below).
EdgeManagerBuildUtil.connectPointwise() method source:
private static void connectPointwise(
        ExecutionJobVertex jobVertex,
        IntermediateResult result,
        JobVertexInputInfo jobVertexInputInfo) {
    Map<IndexRange, List<Integer>> consumersByPartition = new LinkedHashMap<>();
    // Assign each ExecutionVertex node the IntermediateResultPartitions to consume,
    // as described by the JobVertexInputInfo
    for (ExecutionVertexInputInfo executionVertexInputInfo :
            jobVertexInputInfo.getExecutionVertexInputInfos()) {
        int consumerIndex = executionVertexInputInfo.getSubtaskIndex();
        IndexRange range = executionVertexInputInfo.getPartitionIndexRange();
        consumersByPartition.compute(
                range,
                (ignore, consumers) -> {
                    if (consumers == null) {
                        consumers = new ArrayList<>();
                    }
                    consumers.add(consumerIndex);
                    return consumers;
                });
    }
    // Call connectInternal() to actually create the connections
    consumersByPartition.forEach(
            (range, subtasks) -> {
                List<ExecutionVertex> taskVertices = new ArrayList<>();
                List<IntermediateResultPartition> partitions = new ArrayList<>();
                for (int index : subtasks) {
                    taskVertices.add(jobVertex.getTaskVertices()[index]);
                }
                for (int i = range.getStartIndex(); i <= range.getEndIndex(); ++i) {
                    partitions.add(result.getPartitions()[i]);
                }
                connectInternal(
                        taskVertices,
                        partitions,
                        result.getResultType(),
                        jobVertex.getGraph().getEdgeManager());
            });
}
For ALL_TO_ALL:
Every ExecutionVertex node is fully connected to every IntermediateResultPartition dataset partition.
EdgeManagerBuildUtil.connectAllToAll() method source:
private static void connectAllToAll(
        ExecutionJobVertex jobVertex,
        IntermediateResult result,
        JobVertexInputInfo jobVertexInputInfo) {
    // check the vertex input info is legal
    // Every ExecutionVertex is fully connected to every IntermediateResultPartition
    jobVertexInputInfo
            .getExecutionVertexInputInfos()
            .forEach(
                    executionVertexInputInfo -> {
                        IndexRange partitionRange =
                                executionVertexInputInfo.getPartitionIndexRange();
                        checkArgument(partitionRange.getStartIndex() == 0);
                        checkArgument(
                                partitionRange.getEndIndex()
                                        == (result.getNumberOfAssignedPartitions() - 1));
                    });
    connectInternal(
            Arrays.asList(jobVertex.getTaskVertices()),
            Arrays.asList(result.getPartitions()),
            result.getResultType(),
            jobVertex.getGraph().getEdgeManager());
}
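The difference between the two patterns is easiest to see with concrete numbers. The sketch below is a simplification (Flink's actual ranges come from VertexInputInfoComputationUtils.computeVertexInputInfos(), which also handles uneven and one-to-many cases); it computes, for 4 partitions and 2 consumer subtasks, the partition index range each subtask reads:

import java.util.ArrayList;
import java.util.List;

public class DistributionPatternDemo {
    // A partition index range [start, end], mirroring Flink's IndexRange
    record IndexRange(int start, int end) {}

    // Simplified POINTWISE assignment: split numPartitions evenly across numConsumers
    static List<IndexRange> pointwise(int numPartitions, int numConsumers) {
        List<IndexRange> ranges = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            int start = (int) ((long) i * numPartitions / numConsumers);
            int end = (int) ((long) (i + 1) * numPartitions / numConsumers) - 1;
            ranges.add(new IndexRange(start, end));
        }
        return ranges;
    }

    // ALL_TO_ALL: every consumer reads every partition
    static List<IndexRange> allToAll(int numPartitions, int numConsumers) {
        List<IndexRange> ranges = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            ranges.add(new IndexRange(0, numPartitions - 1));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // POINTWISE: subtask 0 reads partitions 0..1, subtask 1 reads partitions 2..3
        System.out.println("POINTWISE:  " + pointwise(4, 2));
        // ALL_TO_ALL: both subtasks read partitions 0..3
        System.out.println("ALL_TO_ALL: " + allToAll(4, 2));
    }
}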
2. Calling EdgeManagerBuildUtil.connectInternal() to create the actual connections
Whether the pattern is POINTWISE or ALL_TO_ALL, once each ExecutionVertex node has been assigned its upstream IntermediateResultPartition dataset partitions, the actual connection is created by calling connectInternal().
In connectInternal(), a ConsumedPartitionGroup is first created to wrap the IntermediateResultPartition dataset partitions that the ExecutionVertex nodes need to connect to, and each ExecutionVertex node with its ConsumedPartitionGroup is added to the EdgeManager's vertexConsumedPartitions (Map<ExecutionVertexID, List<ConsumedPartitionGroup>>).
Then a ConsumerVertexGroup is created to wrap the ExecutionVertex nodes that the upstream IntermediateResult dataset needs to connect to, and each IntermediateResultPartition dataset partition with its ConsumerVertexGroup is added to the EdgeManager's partitionConsumers (Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>>).
Source code diagram:
EdgeManagerBuildUtil.connectInternal() method source:
private static void connectInternal(
        List<ExecutionVertex> taskVertices,
        List<IntermediateResultPartition> partitions,
        ResultPartitionType resultPartitionType,
        EdgeManager edgeManager) {
    checkState(!taskVertices.isEmpty());
    checkState(!partitions.isEmpty());
    // Create a ConsumedPartitionGroup wrapping the IntermediateResultPartitions
    // the ExecutionVertices need to connect to
    ConsumedPartitionGroup consumedPartitionGroup =
            createAndRegisterConsumedPartitionGroupToEdgeManager(
                    taskVertices.size(), partitions, resultPartitionType, edgeManager);
    // Add the ConsumedPartitionGroup to every ExecutionVertex of the ExecutionJobVertex
    for (ExecutionVertex ev : taskVertices) {
        ev.addConsumedPartitionGroup(consumedPartitionGroup);
    }
    // Create a ConsumerVertexGroup wrapping the ExecutionVertices the upstream
    // IntermediateResult needs to connect to
    List<ExecutionVertexID> consumerVertices =
            taskVertices.stream().map(ExecutionVertex::getID).collect(Collectors.toList());
    ConsumerVertexGroup consumerVertexGroup =
            ConsumerVertexGroup.fromMultipleVertices(consumerVertices, resultPartitionType);
    // Add the ConsumerVertexGroup to every IntermediateResultPartition of the IntermediateResult
    for (IntermediateResultPartition partition : partitions) {
        partition.addConsumers(consumerVertexGroup);
    }
    consumedPartitionGroup.setConsumerVertexGroup(consumerVertexGroup);
    consumerVertexGroup.setConsumedPartitionGroup(consumedPartitionGroup);
}
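Note that connectInternal() creates exactly one ConsumedPartitionGroup and one ConsumerVertexGroup per connection and shares them across all participating vertices and partitions. A minimal sketch of that sharing, using simplified stand-in classes rather than Flink's types:

import java.util.List;

public class ConnectInternalSketch {
    // Simplified stand-ins for Flink's group types
    static class ConsumedPartitionGroup {
        final List<String> partitionIds;           // partitions consumed by the group
        ConsumerVertexGroup consumerVertexGroup;   // back-reference set at the end
        ConsumedPartitionGroup(List<String> partitionIds) { this.partitionIds = partitionIds; }
    }
    static class ConsumerVertexGroup {
        final List<String> vertexIds;              // vertices consuming the partitions
        ConsumedPartitionGroup consumedPartitionGroup;
        ConsumerVertexGroup(List<String> vertexIds) { this.vertexIds = vertexIds; }
    }

    public static void main(String[] args) {
        // An ALL_TO_ALL connection: 2 consumer subtasks, 3 upstream partitions
        List<String> vertices = List.of("map-0", "map-1");
        List<String> partitions = List.of("p-0", "p-1", "p-2");

        // One group per direction, shared by everyone
        ConsumedPartitionGroup cpg = new ConsumedPartitionGroup(partitions);
        ConsumerVertexGroup cvg = new ConsumerVertexGroup(vertices);

        // The two groups point at each other, mirroring the last two lines
        // of connectInternal()
        cpg.consumerVertexGroup = cvg;
        cvg.consumedPartitionGroup = cpg;

        System.out.println(cpg.consumerVertexGroup.vertexIds);       // [map-0, map-1]
        System.out.println(cvg.consumedPartitionGroup.partitionIds); // [p-0, p-1, p-2]
    }
}

This sharing is the point of removing ExecutionEdge: an ALL_TO_ALL connection stores one group per direction instead of parallelism * partitions individual edge objects, which keeps large-scale job graphs affordable in memory.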
Because ExecutionEdge was removed after Flink 1.13, the connections between ExecutionVertex and IntermediateResultPartition are managed by the EdgeManager.
For all ExecutionVertex nodes of an ExecutionJobVertex node, the ConsumedPartitionGroup of the IntermediateResultPartition dataset partitions to connect to is added by calling ExecutionVertex.addConsumedPartitionGroup(), which in turn delegates to EdgeManager.connectVertexWithConsumedPartitionGroup().
ExecutionVertex.addConsumedPartitionGroup() method source:
public void addConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitions) {
    // Register the ConsumedPartitionGroup with the EdgeManager
    getExecutionGraphAccessor()
            .getEdgeManager()
            .connectVertexWithConsumedPartitionGroup(executionVertexId, consumedPartitions);
}
Finally, the EdgeManager reads the List<ConsumedPartitionGroup> for this ExecutionVertex node from vertexConsumedPartitions (Map<ExecutionVertexID, List<ConsumedPartitionGroup>>) and appends the new ConsumedPartitionGroup to it.
EdgeManager.connectVertexWithConsumedPartitionGroup() method source:
public void connectVertexWithConsumedPartitionGroup(
        ExecutionVertexID executionVertexId, ConsumedPartitionGroup consumedPartitionGroup) {
    checkNotNull(consumedPartitionGroup);
    // Read the List<ConsumedPartitionGroup> of this ExecutionVertex from
    // vertexConsumedPartitions and append the new ConsumedPartitionGroup
    final List<ConsumedPartitionGroup> consumedPartitions =
            getConsumedPartitionGroupsForVertexInternal(executionVertexId);
    consumedPartitions.add(consumedPartitionGroup);
}
Symmetrically, for all IntermediateResultPartition dataset partitions of an IntermediateResult dataset, the ConsumerVertexGroup of the ExecutionVertex nodes to connect to is added by calling IntermediateResultPartition.addConsumers(), which in turn delegates to EdgeManager.connectPartitionWithConsumerVertexGroup().
IntermediateResultPartition.addConsumers() method source:
public void addConsumers(ConsumerVertexGroup consumers) {
    // Register the ConsumerVertexGroup with the EdgeManager
    getEdgeManager().connectPartitionWithConsumerVertexGroup(partitionId, consumers);
}
Finally, the EdgeManager reads the List<ConsumerVertexGroup> for this IntermediateResultPartition dataset partition from partitionConsumers (Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>>) and appends the new ConsumerVertexGroup to it.
EdgeManager.connectPartitionWithConsumerVertexGroup() method source:
public void connectPartitionWithConsumerVertexGroup(
        IntermediateResultPartitionID resultPartitionId,
        ConsumerVertexGroup consumerVertexGroup) {
    checkNotNull(consumerVertexGroup);
    // Read the List<ConsumerVertexGroup> of this IntermediateResultPartition from
    // partitionConsumers and append the new ConsumerVertexGroup
    List<ConsumerVertexGroup> groups =
            getConsumerVertexGroupsForPartitionInternal(resultPartitionId);
    groups.add(consumerVertexGroup);
}
Once all ExecutionJobVertex nodes have been traversed and the EdgeManager's vertexConsumedPartitions (Map<ExecutionVertexID, List<ConsumedPartitionGroup>>) and partitionConsumers (Map<IntermediateResultPartitionID, List<ConsumerVertexGroup>>) have been fully built, they hold the complete connection relationships between the ExecutionVertex nodes of every parallel subtask of each ExecutionJobVertex node and the IntermediateResultPartition dataset partitions of each IntermediateResult dataset.
3. Partitioning into SchedulingPipelinedRegions
A SchedulingPipelinedRegion is Flink's unit of independent resource allocation and scheduling: a set of operators connected in a pipelined fashion is grouped together, and resources for the group are requested and scheduled as a whole. For example, two vertices joined by a PIPELINED edge end up in one region, while a BLOCKING edge between them would place them in separate regions.
After the ExecutionJobVertex nodes have been created and initialized, control returns to DefaultExecutionGraph.attachJobGraph(), which proceeds to partition the topology into SchedulingPipelinedRegions.
Source code diagram:
DefaultExecutionGraph.attachJobGraph() method source:
public void attachJobGraph(
        List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
        throws JobException {
    assertRunningInJobMasterMainThread();
    LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} "
                    + "vertices and {} intermediate results.",
            verticesToAttach.size(),
            tasks.size(),
            intermediateResults.size());
    // Create the ExecutionJobVertices
    attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
    if (!isDynamic) {
        // Initialize all ExecutionJobVertices
        initializeJobVertices(verticesToAttach);
    }
    // Partition the ExecutionGraph topology into regions
    // the topology assigning should happen before notifying new vertices to failoverStrategy
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
    partitionGroupReleaseStrategy =
            partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
Inside DefaultExecutionTopology.fromExecutionGraph(), DefaultExecutionTopology creates the LogicalPipelinedRegions and then converts them into SchedulingPipelinedRegions.
DefaultExecutionTopology.fromExecutionGraph() method source:
public static DefaultExecutionTopology fromExecutionGraph(
        DefaultExecutionGraph executionGraph) {
    checkNotNull(executionGraph, "execution graph can not be null");
    // Get the EdgeManager
    EdgeManager edgeManager = executionGraph.getEdgeManager();
    DefaultExecutionTopology schedulingTopology =
            new DefaultExecutionTopology(
                    () ->
                            IterableUtils.toStream(executionGraph.getAllExecutionVertices())
                                    .map(ExecutionVertex::getID)
                                    .collect(Collectors.toList()),
                    edgeManager,
                    // Create the LogicalPipelinedRegions
                    computeLogicalPipelinedRegionsByJobVertexId(executionGraph));
    // Convert the LogicalPipelinedRegions into SchedulingPipelinedRegions
    schedulingTopology.notifyExecutionGraphUpdated(
            executionGraph,
            IterableUtils.toStream(executionGraph.getVerticesTopologically())
                    .filter(ExecutionJobVertex::isInitialized)
                    .collect(Collectors.toList()));
    return schedulingTopology;
}
Moving on to DefaultExecutionTopology.computeLogicalPipelinedRegionsByJobVertexId(), which builds the LogicalPipelinedRegions: it first sorts the JobVertex nodes topologically, then derives the LogicalPipelinedRegions from them, and finally associates each LogicalVertex with its corresponding LogicalPipelinedRegion.
DefaultExecutionTopology.computeLogicalPipelinedRegionsByJobVertexId() method source:
private static Map<JobVertexID, DefaultLogicalPipelinedRegion>
        computeLogicalPipelinedRegionsByJobVertexId(final ExecutionGraph executionGraph) {
    // Get the topologically sorted list of JobVertices
    List<JobVertex> topologicallySortedJobVertices =
            IterableUtils.toStream(executionGraph.getVerticesTopologically())
                    .map(ExecutionJobVertex::getJobVertex)
                    .collect(Collectors.toList());
    // Build the LogicalPipelinedRegions from the JobVertices
    Iterable<DefaultLogicalPipelinedRegion> logicalPipelinedRegions =
            DefaultLogicalTopology.fromTopologicallySortedJobVertices(
                            topologicallySortedJobVertices)
                    .getAllPipelinedRegions();
    // Associate each LogicalVertex with its LogicalPipelinedRegion
    Map<JobVertexID, DefaultLogicalPipelinedRegion> logicalPipelinedRegionsByJobVertexId =
            new HashMap<>();
    for (DefaultLogicalPipelinedRegion logicalPipelinedRegion : logicalPipelinedRegions) {
        for (LogicalVertex vertex : logicalPipelinedRegion.getVertices()) {
            logicalPipelinedRegionsByJobVertexId.put(vertex.getId(), logicalPipelinedRegion);
        }
    }
    return logicalPipelinedRegionsByJobVertexId;
}
Generating the LogicalPipelinedRegions from the JobVertex nodes goes through DefaultLogicalTopology.getAllPipelinedRegions() and LogicalPipelinedRegionComputeUtil.computePipelinedRegions() in turn, and finally reaches PipelinedRegionComputeUtil.buildRawRegions().
DefaultLogicalTopology.getAllPipelinedRegions() method source:
public Iterable<DefaultLogicalPipelinedRegion> getAllPipelinedRegions() {
    // Delegate to LogicalPipelinedRegionComputeUtil.computePipelinedRegions()
    final Set<Set<LogicalVertex>> regionsRaw =
            LogicalPipelinedRegionComputeUtil.computePipelinedRegions(verticesSorted);
    final Set<DefaultLogicalPipelinedRegion> regions = new HashSet<>();
    for (Set<LogicalVertex> regionVertices : regionsRaw) {
        regions.add(new DefaultLogicalPipelinedRegion(regionVertices));
    }
    return regions;
}
LogicalPipelinedRegionComputeUtil.computePipelinedRegions() method source:
public static Set<Set<LogicalVertex>> computePipelinedRegions(
        final Iterable<? extends LogicalVertex> topologicallySortedVertices) {
    // Delegate to PipelinedRegionComputeUtil.buildRawRegions()
    final Map<LogicalVertex, Set<LogicalVertex>> vertexToRegion =
            PipelinedRegionComputeUtil.buildRawRegions(
                    topologicallySortedVertices,
                    LogicalPipelinedRegionComputeUtil::getMustBePipelinedConsumedResults);
    // Since LogicalTopology is a DAG, there is no need to do cycle detection nor to merge
    // regions on cycles.
    return uniqueVertexGroups(vertexToRegion);
}
In PipelinedRegionComputeUtil.buildRawRegions(), all JobVertex nodes are traversed. For each vertex, LogicalPipelinedRegionComputeUtil.getMustBePipelinedConsumedResults() determines whether the connection to an upstream producer requires the two to be pipelined together; if so, and the producer and consumer are not yet in the same region, the two regions are merged directly.
PipelinedRegionComputeUtil.buildRawRegions() method source:
static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>>
        Map<V, Set<V>> buildRawRegions(
                final Iterable<? extends V> topologicallySortedVertices,
                final Function<V, Iterable<R>> getMustBePipelinedConsumedResults) {
    final Map<V, Set<V>> vertexToRegion = new IdentityHashMap<>();
    // Traverse all JobVertex nodes
    // iterate all the vertices which are topologically sorted
    for (V vertex : topologicallySortedVertices) {
        // Put the vertex into its own region first
        Set<V> currentRegion = new HashSet<>();
        currentRegion.add(vertex);
        vertexToRegion.put(vertex, currentRegion);
        // getMustBePipelinedConsumedResults() decides whether the connection
        // between producer and consumer allows the two regions to be merged
        // Each vertex connected through not mustBePipelined consumingConstraint is considered
        // as a single region.
        for (R consumedResult : getMustBePipelinedConsumedResults.apply(vertex)) {
            final V producerVertex = consumedResult.getProducer();
            final Set<V> producerRegion = vertexToRegion.get(producerVertex);
            if (producerRegion == null) {
                throw new IllegalStateException(
                        "Producer task "
                                + producerVertex.getId()
                                + " failover region is null"
                                + " while calculating failover region for the consumer task "
                                + vertex.getId()
                                + ". This should be a failover region building bug.");
            }
            // If mergeable and the producer and consumer are not yet in the same
            // region, merge the two regions directly
            // check if it is the same as the producer region, if so skip the merge
            // this check can significantly reduce compute complexity in All-to-All
            // PIPELINED edge case
            if (currentRegion != producerRegion) {
                currentRegion =
                        VertexGroupComputeUtil.mergeVertexGroups(
                                currentRegion, producerRegion, vertexToRegion);
            }
        }
    }
    return vertexToRegion;
}
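The merging logic can be reproduced on a toy DAG. The following self-contained sketch simplifies vertices to strings and a consumed result to a producer name plus a mustBePipelined flag; it merges a consumer into its producer's region exactly when the connecting result must be consumed in a pipelined fashion, mirroring buildRawRegions() above:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RegionBuildSketch {
    // A consumed result: the producing vertex plus whether it must be pipelined-consumed
    record ConsumedResult(String producer, boolean mustBePipelined) {}

    static Map<String, Set<String>> buildRawRegions(
            List<String> topologicallySortedVertices,
            Map<String, List<ConsumedResult>> consumedResults) {
        Map<String, Set<String>> vertexToRegion = new HashMap<>();
        for (String vertex : topologicallySortedVertices) {
            // Start with a singleton region for the vertex
            Set<String> currentRegion = new HashSet<>(Set.of(vertex));
            vertexToRegion.put(vertex, currentRegion);
            for (ConsumedResult result :
                    consumedResults.getOrDefault(vertex, List.of())) {
                if (!result.mustBePipelined()) {
                    continue; // blocking-style edge: producer stays in its own region
                }
                Set<String> producerRegion = vertexToRegion.get(result.producer());
                if (currentRegion != producerRegion) {
                    // Merge the consumer's region into the producer's and repoint members
                    producerRegion.addAll(currentRegion);
                    for (String v : currentRegion) {
                        vertexToRegion.put(v, producerRegion);
                    }
                    currentRegion = producerRegion;
                }
            }
        }
        return vertexToRegion;
    }

    public static void main(String[] args) {
        // source -> map is PIPELINED, map -> sink is BLOCKING:
        // expect {source, map} in one region and {sink} alone
        Map<String, List<ConsumedResult>> inputs = Map.of(
                "map", List.of(new ConsumedResult("source", true)),
                "sink", List.of(new ConsumedResult("map", false)));
        Map<String, Set<String>> regions =
                buildRawRegions(List.of("source", "map", "sink"), inputs);
        System.out.println(regions.get("map"));  // [source, map] (iteration order may vary)
        System.out.println(regions.get("sink")); // [sink]
    }
}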
The merge decision is made by LogicalPipelinedRegionComputeUtil.getMustBePipelinedConsumedResults(): based on the ResultPartitionType of the connection between this JobVertex node and each upstream IntermediateDataSet dataset, it determines whether the connection must be consumed in a pipelined fashion.
LogicalPipelinedRegionComputeUtil.getMustBePipelinedConsumedResults() method source:
private static Iterable<LogicalResult> getMustBePipelinedConsumedResults(LogicalVertex vertex) {
    List<LogicalResult> mustBePipelinedConsumedResults = new ArrayList<>();
    // Get every upstream IntermediateDataSet this JobVertex consumes
    for (LogicalResult consumedResult : vertex.getConsumedResults()) {
        // The ResultPartitionType of the connection decides whether the result
        // must be consumed in a pipelined fashion
        if (consumedResult.getResultType().mustBePipelinedConsumed()) {
            mustBePipelinedConsumedResults.add(consumedResult);
        }
    }
    return mustBePipelinedConsumedResults;
}
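As a quick sanity check of this predicate, a snippet like the following can be run against a Flink 1.19 dependency (the expected output of true, true, false for PIPELINED, PIPELINED_BOUNDED and BLOCKING reflects my reading of ResultPartitionType and should be verified against the exact version you are reading):

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

public class ResultTypeCheck {
    public static void main(String[] args) {
        // Streaming-style results must be consumed while being produced,
        // so producer and consumer land in the same pipelined region
        System.out.println(ResultPartitionType.PIPELINED.mustBePipelinedConsumed());
        System.out.println(ResultPartitionType.PIPELINED_BOUNDED.mustBePipelinedConsumed());
        // Batch-style blocking results can be consumed after the producer has
        // finished, so a BLOCKING edge splits the region
        System.out.println(ResultPartitionType.BLOCKING.mustBePipelinedConsumed());
    }
}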
Once the ExecutionGraph has been partitioned into LogicalPipelinedRegions and these have been converted into SchedulingPipelinedRegions, the JobMaster will request CPU and memory resources from Flink's ResourceManager for each SchedulingPipelinedRegion in turn, starting the scheduling of compute resources.
4. Conclusion
This concludes the source code of ExecutionGraph generation. Together with the previous installment, we covered the creation of the ExecutionGraph's ExecutionJobVertex nodes, ExecutionVertex nodes, IntermediateResult datasets, IntermediateResultPartition dataset partitions, and the Executions encapsulating task execution information; the connection of ExecutionJobVertex nodes to their upstream IntermediateResult datasets; and the partitioning into SchedulingPipelinedRegions. The next post in this series will continue the walkthrough of Flink's source code with the JobMaster allocating compute resources for each SchedulingPipelinedRegion in turn.