1. StreamGraph:
The StreamGraph is the first scheduling graph computed for a Flink streaming job: it is the logical graph translated from the user's DataStream API program. A StreamGraph is made up of StreamNodes and StreamEdges, where a StreamNode is a node that records a data-processing operation and a StreamEdge is the edge connecting two StreamNodes.
StreamGraph diagram:
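To make this concrete, the following minimal job is an illustrative sketch (the class name StreamGraphInspection and the printed output are mine, not part of Flink; it assumes a Flink 1.19 DataStream setup and uses StreamExecutionEnvironment#getStreamGraph together with the StreamNode accessors shown later in this article). The job has three transformations (source, map, sink), so the resulting StreamGraph should contain three StreamNodes connected by two StreamEdges:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;

public class StreamGraphInspection {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Three transformations: source -> map -> sink.
        DataStream<String> source = env.fromElements("a", "b", "c");
        source.map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .print();

        // Build the StreamGraph without executing the job.
        StreamGraph streamGraph = env.getStreamGraph();

        // Each transformation became a StreamNode; each connection became a StreamEdge.
        for (StreamNode node : streamGraph.getStreamNodes()) {
            System.out.println(
                    "StreamNode " + node.getId() + " (" + node.getOperatorName() + ")"
                            + ", parallelism=" + node.getParallelism()
                            + ", outEdges=" + node.getOutEdges());
        }
    }
}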
2. StreamNode:
A StreamNode is the basic building block of a StreamGraph and represents a single operation or stage in the data-processing logic. Each StreamNode corresponds to one transformation (Transformation) in the DataStream API.
StreamNode diagram:
A StreamNode mainly encapsulates the operator's processing logic, i.e. the StreamOperator wrapped in its SimpleOperatorFactory (stored as a StreamOperatorFactory), together with the operator's lists of incoming and outgoing edges, and it also records configuration such as the SlotSharingGroup and CoLocationGroup.
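For example, settings made through the DataStream API end up on the corresponding StreamNode. The sketch below (same assumptions and inspection style as the earlier example; the group name "my-group" and the class name are hypothetical, and the exact value reported for nodes without an explicit group depends on Flink's default slot-sharing behavior) configures parallelism and a slot sharing group on the map operator and then reads them back from the StreamGraph:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;

public class StreamNodeConfigInspection {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .setParallelism(2)            // ends up in StreamNode#parallelism
                .slotSharingGroup("my-group") // ends up in StreamNode#slotSharingGroup
                .print();

        StreamGraph streamGraph = env.getStreamGraph();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            System.out.println(
                    node.getOperatorName()
                            + ": parallelism=" + node.getParallelism()
                            + ", slotSharingGroup=" + node.getSlotSharingGroup()
                            + ", inEdges=" + node.getInEdgeIndices()
                            + ", outEdges=" + node.getOutEdgeIndices());
        }
    }
}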
Full StreamNode source code:
public class StreamNode {

    private final int id;
    // Parallelism
    private int parallelism;
    /**
     * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
     * dynamic scaling and the number of key groups used for partitioned state.
     */
    private int maxParallelism;
    private ResourceSpec minResources = ResourceSpec.DEFAULT;
    private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
    private final Map<ManagedMemoryUseCase, Integer> managedMemoryOperatorScopeUseCaseWeights =
            new HashMap<>();
    private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
    private long bufferTimeout;
    private final String operatorName;
    private String operatorDescription;
    // slotSharingGroup and coLocationGroup configuration
    private @Nullable String slotSharingGroup;
    private @Nullable String coLocationGroup;
    private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
    private TypeSerializer<?> stateKeySerializer;
    // The StreamOperatorFactory that wraps the operator
    private StreamOperatorFactory<?> operatorFactory;
    private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
    private TypeSerializer<?> typeSerializerOut;
    // Incoming and outgoing edges of the operator
    private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
    private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
    private final Class<? extends TaskInvokable> jobVertexClass;
    private InputFormat<?, ?> inputFormat;
    private OutputFormat<?> outputFormat;
    private String transformationUID;
    private String userHash;
    private final Map<Integer, StreamConfig.InputRequirement> inputRequirements = new HashMap<>();
    private @Nullable IntermediateDataSetID consumeClusterDatasetId;
    private boolean supportsConcurrentExecutionAttempts = true;
    private boolean parallelismConfigured = false;

    @VisibleForTesting
    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperator<?> operator,
            String operatorName,
            Class<? extends TaskInvokable> jobVertexClass) {
        this(
                id,
                slotSharingGroup,
                coLocationGroup,
                SimpleOperatorFactory.of(operator),
                operatorName,
                jobVertexClass);
    }

    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName,
            Class<? extends TaskInvokable> jobVertexClass) {
        this.id = id;
        this.operatorName = operatorName;
        this.operatorDescription = operatorName;
        this.operatorFactory = operatorFactory;
        this.jobVertexClass = jobVertexClass;
        this.slotSharingGroup = slotSharingGroup;
        this.coLocationGroup = coLocationGroup;
    }

    public void addInEdge(StreamEdge inEdge) {
        checkState(
                inEdges.stream().noneMatch(inEdge::equals),
                "Adding not unique edge = %s to existing inEdges = %s",
                inEdge,
                inEdges);
        if (inEdge.getTargetId() != getId()) {
            throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
        } else {
            inEdges.add(inEdge);
        }
    }

    public void addOutEdge(StreamEdge outEdge) {
        checkState(
                outEdges.stream().noneMatch(outEdge::equals),
                "Adding not unique edge = %s to existing outEdges = %s",
                outEdge,
                outEdges);
        if (outEdge.getSourceId() != getId()) {
            throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
        } else {
            outEdges.add(outEdge);
        }
    }

    public List<StreamEdge> getOutEdges() {
        return outEdges;
    }

    public List<StreamEdge> getInEdges() {
        return inEdges;
    }

    public List<Integer> getOutEdgeIndices() {
        List<Integer> outEdgeIndices = new ArrayList<Integer>();
        for (StreamEdge edge : outEdges) {
            outEdgeIndices.add(edge.getTargetId());
        }
        return outEdgeIndices;
    }

    public List<Integer> getInEdgeIndices() {
        List<Integer> inEdgeIndices = new ArrayList<Integer>();
        for (StreamEdge edge : inEdges) {
            inEdgeIndices.add(edge.getSourceId());
        }
        return inEdgeIndices;
    }

    public int getId() {
        return id;
    }

    public int getParallelism() {
        return parallelism;
    }

    public void setParallelism(Integer parallelism) {
        setParallelism(parallelism, true);
    }

    void setParallelism(Integer parallelism, boolean parallelismConfigured) {
        this.parallelism = parallelism;
        this.parallelismConfigured =
                parallelismConfigured && parallelism != ExecutionConfig.PARALLELISM_DEFAULT;
    }

    /**
     * Get the maximum parallelism for this stream node.
     *
     * @return Maximum parallelism
     */
    int getMaxParallelism() {
        return maxParallelism;
    }

    /**
     * Set the maximum parallelism for this stream node.
     *
     * @param maxParallelism Maximum parallelism to be set
     */
    void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    public ResourceSpec getMinResources() {
        return minResources;
    }

    public ResourceSpec getPreferredResources() {
        return preferredResources;
    }

    public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
        this.minResources = minResources;
        this.preferredResources = preferredResources;
    }

    public void setManagedMemoryUseCaseWeights(
            Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights,
            Set<ManagedMemoryUseCase> slotScopeUseCases) {
        managedMemoryOperatorScopeUseCaseWeights.putAll(operatorScopeUseCaseWeights);
        managedMemorySlotScopeUseCases.addAll(slotScopeUseCases);
    }

    public Map<ManagedMemoryUseCase, Integer> getManagedMemoryOperatorScopeUseCaseWeights() {
        return Collections.unmodifiableMap(managedMemoryOperatorScopeUseCaseWeights);
    }

    public Set<ManagedMemoryUseCase> getManagedMemorySlotScopeUseCases() {
        return Collections.unmodifiableSet(managedMemorySlotScopeUseCases);
    }

    public long getBufferTimeout() {
        return bufferTimeout;
    }

    public void setBufferTimeout(Long bufferTimeout) {
        this.bufferTimeout = bufferTimeout;
    }

    @VisibleForTesting
    public StreamOperator<?> getOperator() {
        return (StreamOperator<?>) ((SimpleOperatorFactory) operatorFactory).getOperator();
    }

    public StreamOperatorFactory<?> getOperatorFactory() {
        return operatorFactory;
    }

    public String getOperatorName() {
        return operatorName;
    }

    public String getOperatorDescription() {
        return operatorDescription;
    }

    public void setOperatorDescription(String operatorDescription) {
        this.operatorDescription = operatorDescription;
    }

    public void setSerializersIn(TypeSerializer<?>... typeSerializersIn) {
        checkArgument(typeSerializersIn.length > 0);
        // Unfortunately code above assumes type serializer can be null, while users of for example
        // getTypeSerializersIn would be confused by returning an array size of two with all
        // elements set to null...
        this.typeSerializersIn =
                Arrays.stream(typeSerializersIn)
                        .filter(typeSerializer -> typeSerializer != null)
                        .toArray(TypeSerializer<?>[]::new);
    }

    public TypeSerializer<?>[] getTypeSerializersIn() {
        return typeSerializersIn;
    }

    public TypeSerializer<?> getTypeSerializerOut() {
        return typeSerializerOut;
    }

    public void setSerializerOut(TypeSerializer<?> typeSerializerOut) {
        this.typeSerializerOut = typeSerializerOut;
    }

    public Class<? extends TaskInvokable> getJobVertexClass() {
        return jobVertexClass;
    }

    public InputFormat<?, ?> getInputFormat() {
        return inputFormat;
    }

    public void setInputFormat(InputFormat<?, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    public OutputFormat<?> getOutputFormat() {
        return outputFormat;
    }

    public void setOutputFormat(OutputFormat<?> outputFormat) {
        this.outputFormat = outputFormat;
    }

    public void setSlotSharingGroup(@Nullable String slotSharingGroup) {
        this.slotSharingGroup = slotSharingGroup;
    }

    @Nullable
    public String getSlotSharingGroup() {
        return slotSharingGroup;
    }

    public void setCoLocationGroup(@Nullable String coLocationGroup) {
        this.coLocationGroup = coLocationGroup;
    }

    public @Nullable String getCoLocationGroup() {
        return coLocationGroup;
    }

    public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
        return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null)
                || (slotSharingGroup != null
                        && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));
    }

    @Override
    public String toString() {
        return operatorName + "-" + id;
    }

    public KeySelector<?, ?>[] getStatePartitioners() {
        return statePartitioners;
    }

    public void setStatePartitioners(KeySelector<?, ?>... statePartitioners) {
        checkArgument(statePartitioners.length > 0);
        this.statePartitioners = statePartitioners;
    }

    public TypeSerializer<?> getStateKeySerializer() {
        return stateKeySerializer;
    }

    public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
        this.stateKeySerializer = stateKeySerializer;
    }

    public String getTransformationUID() {
        return transformationUID;
    }

    void setTransformationUID(String transformationId) {
        this.transformationUID = transformationId;
    }

    public String getUserHash() {
        return userHash;
    }

    public void setUserHash(String userHash) {
        this.userHash = userHash;
    }

    public void addInputRequirement(
            int inputIndex, StreamConfig.InputRequirement inputRequirement) {
        inputRequirements.put(inputIndex, inputRequirement);
    }

    public Map<Integer, StreamConfig.InputRequirement> getInputRequirements() {
        return inputRequirements;
    }

    public Optional<OperatorCoordinator.Provider> getCoordinatorProvider(
            String operatorName, OperatorID operatorID) {
        if (operatorFactory instanceof CoordinatedOperatorFactory) {
            return Optional.of(
                    ((CoordinatedOperatorFactory) operatorFactory)
                            .getCoordinatorProvider(operatorName, operatorID));
        } else {
            return Optional.empty();
        }
    }

    boolean isParallelismConfigured() {
        return parallelismConfigured;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        StreamNode that = (StreamNode) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return id;
    }

    @Nullable
    public IntermediateDataSetID getConsumeClusterDatasetId() {
        return consumeClusterDatasetId;
    }

    public void setConsumeClusterDatasetId(
            @Nullable IntermediateDataSetID consumeClusterDatasetId) {
        this.consumeClusterDatasetId = consumeClusterDatasetId;
    }

    public boolean isSupportsConcurrentExecutionAttempts() {
        return supportsConcurrentExecutionAttempts;
    }

    public void setSupportsConcurrentExecutionAttempts(
            boolean supportsConcurrentExecutionAttempts) {
        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
    }
}
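Two details in the source above are worth calling out: equals() and hashCode() are based solely on id, so a StreamNode's identity within the graph is determined entirely by the id assigned to its Transformation; and addInEdge()/addOutEdge() reject duplicate edges as well as any edge whose targetId/sourceId does not match the node's own id, which keeps the adjacency lists consistent with the ids stored on the edges.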
3. StreamEdge:
A StreamEdge is the edge that connects StreamNodes in the StreamGraph; it defines how data flows between StreamNodes and which partitioning strategy is used.
StreamEdge diagram:
A StreamEdge mainly encapsulates the ids of the upstream and downstream StreamNodes it connects, and records the StreamPartitioner that determines how records are partitioned between them.
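The partitioner on each edge follows from the DataStream API calls between the two operators. The sketch below (same assumptions as the earlier examples; the partitioner class names RebalancePartitioner and KeyGroupStreamPartitioner are the implementations Flink typically selects for rebalance() and keyBy(), and the class name here is hypothetical) prints the partitioner on every edge of a small graph:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;

public class StreamEdgePartitionerInspection {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "a")
                // rebalance() puts a round-robin partitioner on the edge into the map operator
                .rebalance()
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value;
                    }
                })
                // keyBy() puts a key-group (hash) partitioner on the edge into the sink
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .print();

        StreamGraph streamGraph = env.getStreamGraph();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            for (StreamEdge edge : node.getOutEdges()) {
                System.out.println(
                        edge.getSourceId() + " -> " + edge.getTargetId()
                                + ", partitioner=" + edge.getPartitioner());
            }
        }
    }
}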
StreamEdge source code:
public class StreamEdge implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final long ALWAYS_FLUSH_BUFFER_TIMEOUT = 0L;

    private final String edgeId;

    // Ids of the upstream and downstream StreamNodes
    private final int sourceId;
    private final int targetId;

    /**
     * Note that this field doesn't have to be unique among all {@link StreamEdge}s. It's enough if
     * this field ensures that all logical instances of {@link StreamEdge} are unique, and {@link
     * #hashCode()} are different and {@link #equals(Object)} returns false, for every possible pair
     * of {@link StreamEdge}. Especially among two different {@link StreamEdge}s that are connecting
     * the same pair of nodes.
     */
    private final int uniqueId;

    /** The type number of the input for co-tasks. */
    private final int typeNumber;

    /** The side-output tag (if any) of this {@link StreamEdge}. */
    private final OutputTag outputTag;

    // Partitioner: StreamPartitioner
    /** The {@link StreamPartitioner} on this {@link StreamEdge}. */
    private StreamPartitioner<?> outputPartitioner;

    /** The name of the operator in the source vertex. */
    private final String sourceOperatorName;

    /** The name of the operator in the target vertex. */
    private final String targetOperatorName;

    private final StreamExchangeMode exchangeMode;

    private long bufferTimeout;

    private boolean supportsUnalignedCheckpoints = true;

    private final IntermediateDataSetID intermediateDatasetIdToProduce;

    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag) {
        this(
                sourceVertex,
                targetVertex,
                typeNumber,
                ALWAYS_FLUSH_BUFFER_TIMEOUT,
                outputPartitioner,
                outputTag,
                StreamExchangeMode.UNDEFINED,
                0,
                null);
    }

    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            int uniqueId,
            IntermediateDataSetID intermediateDatasetId) {
        this(
                sourceVertex,
                targetVertex,
                typeNumber,
                sourceVertex.getBufferTimeout(),
                outputPartitioner,
                outputTag,
                exchangeMode,
                uniqueId,
                intermediateDatasetId);
    }

    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            long bufferTimeout,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            int uniqueId,
            IntermediateDataSetID intermediateDatasetId) {
        this.sourceId = sourceVertex.getId();
        this.targetId = targetVertex.getId();
        this.uniqueId = uniqueId;
        this.typeNumber = typeNumber;
        this.bufferTimeout = bufferTimeout;
        this.outputPartitioner = outputPartitioner;
        this.outputTag = outputTag;
        this.sourceOperatorName = sourceVertex.getOperatorName();
        this.targetOperatorName = targetVertex.getOperatorName();
        this.exchangeMode = checkNotNull(exchangeMode);
        this.intermediateDatasetIdToProduce = intermediateDatasetId;
        this.edgeId =
                sourceVertex
                        + "_"
                        + targetVertex
                        + "_"
                        + typeNumber
                        + "_"
                        + outputPartitioner
                        + "_"
                        + uniqueId;
    }

    public int getSourceId() {
        return sourceId;
    }

    public int getTargetId() {
        return targetId;
    }

    public int getTypeNumber() {
        return typeNumber;
    }

    public OutputTag getOutputTag() {
        return this.outputTag;
    }

    public StreamPartitioner<?> getPartitioner() {
        return outputPartitioner;
    }

    public StreamExchangeMode getExchangeMode() {
        return exchangeMode;
    }

    public void setPartitioner(StreamPartitioner<?> partitioner) {
        this.outputPartitioner = partitioner;
    }

    public void setBufferTimeout(long bufferTimeout) {
        checkArgument(bufferTimeout >= -1);
        this.bufferTimeout = bufferTimeout;
    }

    public long getBufferTimeout() {
        return bufferTimeout;
    }

    public void setSupportsUnalignedCheckpoints(boolean supportsUnalignedCheckpoints) {
        this.supportsUnalignedCheckpoints = supportsUnalignedCheckpoints;
    }

    public boolean supportsUnalignedCheckpoints() {
        return supportsUnalignedCheckpoints;
    }

    @Override
    public int hashCode() {
        return Objects.hash(edgeId, outputTag);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        StreamEdge that = (StreamEdge) o;
        return Objects.equals(edgeId, that.edgeId) && Objects.equals(outputTag, that.outputTag);
    }

    @Override
    public String toString() {
        return "("
                + (sourceOperatorName + "-" + sourceId)
                + " -> "
                + (targetOperatorName + "-" + targetId)
                + ", typeNumber="
                + typeNumber
                + ", outputPartitioner="
                + outputPartitioner
                + ", exchangeMode="
                + exchangeMode
                + ", bufferTimeout="
                + bufferTimeout
                + ", outputTag="
                + outputTag
                + ", uniqueId="
                + uniqueId
                + ')';
    }

    public IntermediateDataSetID getIntermediateDatasetIdToProduce() {
        return intermediateDatasetIdToProduce;
    }
}
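Note that StreamEdge's equals() and hashCode() are based on edgeId together with outputTag; edgeId is assembled from the source and target vertices, the typeNumber, the partitioner and uniqueId, and, as the Javadoc on uniqueId explains, that field exists so that two different edges connecting the same pair of StreamNodes can still be told apart.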
This article explains and supplements the StreamGraph itself; for the complete source code of StreamGraph creation, see "Flink-1.19.0 Source Code Explained 4: StreamGraph Generation" (《Flink-1.19.0源碼詳解4-StreamGraph生成》).