The JobManager is an Akka actor; a job submission arrives as a SubmitJob message:
case SubmitJob(jobGraph, listeningBehaviour) =>
  val client = sender()

  val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
    jobGraph.getSessionTimeout)

  submitJob(jobGraph, jobInfo)
submitJob does three things:
Build the ExecutionGraph from the JobGraph
Restore state from CheckpointedState, or from a Savepoint
Submit the ExecutionGraph to the Scheduler for scheduling
ExecutionGraph
executionGraph = ExecutionGraphBuilder.buildGraph(
    executionGraph,            // currentJobs.get(jobGraph.getJobID), the existing ExecutionGraph for this job id, if any
    jobGraph,
    flinkConfiguration,        // the configuration
    futureExecutor,            // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-future-", "-thread-")), a thread pool sized by the number of CPU cores
    ioExecutor,                // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-io-", "-thread-"))
    userCodeLoader,            // libraryCacheManager.getClassLoader(jobGraph.getJobID), loads user code from the jar
    checkpointRecoveryFactory, // used by createCheckpointStore and createCheckpointIDCounter; comes in standalone and ZooKeeper flavors
    Time.of(timeout.length, timeout.unit),
    restartStrategy,           // the job restart strategy
    jobMetrics,
    numSlots,                  // scheduler.getTotalNumberOfSlots(), the total number of slots across all instances registered with this JobManager
    log.logger)
ExecutionGraphBuilder.buildGraph
New ExecutionGraph
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;

try {
    executionGraph = (prior != null) ? prior :
        new ExecutionGraph(
            futureExecutor,
            ioExecutor,
            jobId,
            jobName,
            jobGraph.getJobConfiguration(),
            jobGraph.getSerializedExecutionConfig(),
            timeout,
            restartStrategy,
            jobGraph.getUserJarBlobKeys(),
            jobGraph.getClasspaths(),
            classLoader,
            metrics);
} catch (IOException e) {
    throw new JobException("Could not create the execution graph.", e);
}
attachJobGraph generates the graph's vertices and edges:
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology);
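getVerticesSortedTopologicallyFromSources returns the JobVertices in an order where every producer precedes its consumers, starting from the vertices without inputs. As a side note, such a source-first topological sort can be sketched as below (a self-contained toy using Kahn's algorithm on a hypothetical string-keyed DAG; this is not Flink's actual implementation):

import java.util.*;

public class TopoSortSketch {
    public static void main(String[] args) {
        // adjacency list of a tiny job: source -> map -> sink, plus source -> sink
        Map<String, List<String>> edges = new HashMap<>();
        edges.put("source", Arrays.asList("map", "sink"));
        edges.put("map", Collections.singletonList("sink"));
        edges.put("sink", Collections.emptyList());

        // count the incoming edges of every vertex
        Map<String, Integer> inDegree = new HashMap<>();
        for (String v : edges.keySet()) inDegree.put(v, 0);
        for (List<String> targets : edges.values())
            for (String t : targets) inDegree.put(t, inDegree.get(t) + 1);

        // start from the vertices without inputs (the sources)
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet())
            if (e.getValue() == 0) ready.add(e.getKey());

        // repeatedly emit a ready vertex and release its consumers
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            sorted.add(v);
            for (String t : edges.get(v))
                if (inDegree.merge(t, -1, Integer::sum) == 0) ready.add(t);
        }
        System.out.println(sorted); // [source, map, sink]
    }
}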
ExecutionGraph.attachJobGraph
for (JobVertex jobVertex : topologiallySorted) {

    // create the execution job vertex and attach it to the graph
    ExecutionJobVertex ejv =
        new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
    ejv.connectToPredecessors(this.intermediateResults);

    // All job vertices that are part of this graph:
    // ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks
    ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

    for (IntermediateResult res : ejv.getProducedDataSets()) {
        // All intermediate results that are part of this graph:
        // ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults
        IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
    }

    // All vertices, in the order in which they were created:
    // List<ExecutionJobVertex> verticesInCreationOrder
    this.verticesInCreationOrder.add(ejv);
}
Each JobVertex is wrapped into an ExecutionJobVertex.
This creates, in turn, ExecutionJobVertex, ExecutionVertex, and Execution, as well as IntermediateResult and IntermediateResultPartition.
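The containment relationships among these classes can be sketched like this (a toy outline with hypothetical field layouts, purely to visualize how the pieces nest; the real classes carry far more state):

import java.util.Map;

// one ExecutionJobVertex per JobVertex, fanned out by parallelism
class SketchExecutionJobVertex {
    SketchExecutionVertex[] taskVertices;        // one entry per parallel subtask
    SketchIntermediateResult[] producedDataSets; // one entry per produced IntermediateDataSet
}

class SketchExecutionVertex {
    SketchExecution currentExecution;            // the current execution attempt of this subtask
    Map<String, SketchIntermediateResultPartition> resultPartitions; // this subtask's share of each result
}

class SketchExecution { /* one attempt; a restart creates a fresh Execution */ }

class SketchIntermediateResult {
    SketchIntermediateResultPartition[] partitions; // one partition per producing subtask
}

class SketchIntermediateResultPartition { /* consumed downstream via ExecutionEdges */ }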
ExecutionJobVertex
public ExecutionJobVertex(
        ExecutionGraph graph,
        JobVertex jobVertex,
        int defaultParallelism,
        Time timeout,
        long createTimestamp) throws JobException {

    if (graph == null || jobVertex == null) {
        throw new NullPointerException();
    }

    // the parallelism determines how many ExecutionVertex instances there will be
    int vertexParallelism = jobVertex.getParallelism();
    int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

    // the ExecutionVertex instances
    this.taskVertices = new ExecutionVertex[numTaskVertices];
    this.inputs = new ArrayList<>(jobVertex.getInputs().size());

    // take the sharing group
    this.slotSharingGroup = jobVertex.getSlotSharingGroup();
    this.coLocationGroup = jobVertex.getCoLocationGroup();

    // create the intermediate results
    this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()]; // IntermediateResults that hold the intermediate data
    for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
        final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
        this.producedDataSets[i] = new IntermediateResult( // wrap each IntermediateDataSet of the JobGraph into an IntermediateResult
            result.getId(),
            this,
            numTaskVertices,
            result.getResultType());
    }

    // create all task vertices
    for (int i = 0; i < numTaskVertices; i++) {
        ExecutionVertex vertex = new ExecutionVertex( // initialize the ExecutionVertex
            this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
        this.taskVertices[i] = vertex;
    }

    finishedSubtasks = new boolean[parallelism];
}
ExecutionVertex
public ExecutionVertex(
        ExecutionJobVertex jobVertex,
        int subTaskIndex, // the subtask index; each subtask corresponds to one ExecutionVertex
        IntermediateResult[] producedDataSets,
        Time timeout,
        long createTimestamp,
        int maxPriorExecutionHistoryLength) {

    this.jobVertex = jobVertex;
    this.subTaskIndex = subTaskIndex;
    this.taskNameWithSubtask = String.format("%s (%d/%d)",
        jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());

    this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1); // records the IntermediateResultPartitions

    for (IntermediateResult result : producedDataSets) {
        IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex); // initialize the IntermediateResultPartition
        result.setPartition(subTaskIndex, irp);

        resultPartitions.put(irp.getPartitionId(), irp);
    }

    this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];

    this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);

    this.currentExecution = new Execution( // create the Execution
        getExecutionGraph().getFutureExecutor(),
        this,
        0,
        createTimestamp,
        timeout);

    this.timeout = timeout;
}
connectToPredecessors connects the vertices with edges:
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {

    List<JobEdge> inputs = jobVertex.getInputs(); // the inputs of the JobVertex

    for (int num = 0; num < inputs.size(); num++) {
        JobEdge edge = inputs.get(num); // the corresponding JobEdge

        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); // look up the IntermediateResult that is the source of this JobEdge

        this.inputs.add(ires); // List<IntermediateResult> inputs

        int consumerIndex = ires.registerConsumer(); // register the current vertex as a consumer with every IntermediateResultPartition of the IntermediateResult

        for (int i = 0; i < parallelism; i++) {
            ExecutionVertex ev = taskVertices[i];
            ev.connectSource(num, ires, edge, consumerIndex); // create, for each ExecutionVertex, the ExecutionEdges to the concrete IntermediateResultPartitions
        }
    }
}
connectSource
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

    final DistributionPattern pattern = edge.getDistributionPattern(); // the distribution pattern of the edge
    final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); // the partitions of the source

    ExecutionEdge[] edges;

    switch (pattern) {
        case POINTWISE:
            edges = connectPointwise(sourcePartitions, inputNumber);
            break;

        case ALL_TO_ALL:
            edges = connectAllToAll(sourcePartitions, inputNumber);
            break;

        default:
            throw new RuntimeException("Unrecognized distribution pattern.");
    }

    this.inputEdges[inputNumber] = edges;

    // add the consumers to the source
    // for now (until the receiver initiated handshake is in place), we need to register the
    // edges as the execution graph
    for (ExecutionEdge ee : edges) {
        ee.getSource().addConsumer(ee, consumerNumber);
    }
}
Let's look at connectPointwise:
private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
    final int numSources = sourcePartitions.length; // the number of source partitions
    final int parallelism = getTotalNumberOfParallelSubtasks(); // the parallelism of the subtasks

    // simple case same number of sources as targets
    if (numSources == parallelism) { // 1:1, the simple case
        return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; // take the partition of sourcePartitions whose index matches subTaskIndex
    }
    else if (numSources < parallelism) { // if the subtask parallelism is higher, one source feeds several tasks

        int sourcePartition;

        // check if the pattern is regular or irregular
        // we use int arithmetics for regular, and floating point with rounding for irregular
        if (parallelism % numSources == 0) { // evenly divisible, e.g. with 2 sources and 6 tasks, the 3rd task maps to the first source
            // same number of targets per source
            int factor = parallelism / numSources;
            sourcePartition = subTaskIndex / factor;
        }
        else {
            // different number of targets per source
            float factor = ((float) parallelism) / numSources;
            sourcePartition = (int) (subTaskIndex / factor);
        }

        return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
    }
    else {
        //......
    }
}
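The index arithmetic is easier to see with concrete numbers. Below is a standalone re-implementation of just the numSources < parallelism branch (the same logic as above, extracted so it can be run on its own):

public class PointwiseMappingDemo {

    // which source partition does subtask `subTaskIndex` read from?
    static int sourcePartition(int numSources, int parallelism, int subTaskIndex) {
        if (parallelism % numSources == 0) {
            // regular: the same number of targets per source
            int factor = parallelism / numSources;
            return subTaskIndex / factor;
        } else {
            // irregular: round down after floating-point division
            float factor = ((float) parallelism) / numSources;
            return (int) (subTaskIndex / factor);
        }
    }

    public static void main(String[] args) {
        // regular case: 2 sources, 6 subtasks -> subtasks 0..2 read partition 0, subtasks 3..5 read partition 1
        for (int i = 0; i < 6; i++)
            System.out.println("subtask " + i + " <- partition " + sourcePartition(2, 6, i));

        // irregular case: 2 sources, 5 subtasks -> subtasks 0..2 read partition 0, subtasks 3..4 read partition 1
        for (int i = 0; i < 5; i++)
            System.out.println("subtask " + i + " <- partition " + sourcePartition(2, 5, i));
    }
}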
Configuring checkpointing
executionGraph.enableSnapshotCheckpointing(
    snapshotSettings.getCheckpointInterval(),
    snapshotSettings.getCheckpointTimeout(),
    snapshotSettings.getMinPauseBetweenCheckpoints(),
    snapshotSettings.getMaxConcurrentCheckpoints(),
    snapshotSettings.getExternalizedCheckpointSettings(),
    triggerVertices,
    ackVertices,
    confirmVertices,
    checkpointIdCounter,
    completedCheckpoints,
    externalizedCheckpointsDir,
    checkpointStatsTracker);
This starts the CheckpointCoordinator; see the dedicated blog post on the checkpoint mechanism.
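For reference, the snapshotSettings above come from the user-facing checkpoint configuration of the streaming API, roughly like this (standard Flink 1.x API; the concrete interval and timeout values are arbitrary examples):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60000);                                 // checkpoint interval: 60 s
        env.getCheckpointConfig().setCheckpointTimeout(600000);         // abort a checkpoint after 10 min
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // minimum pause between two checkpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);       // at most one checkpoint in flight
    }
}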
Scheduler
Next, let's look at how the generated ExecutionGraph gets scheduled.
future { // asynchronously
  try {
    try {
      submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) // record the job in submittedJobGraphs
    } catch {
      // ...
    }

    jobInfo.notifyClients(decorateMessage(JobSubmitSuccess(jobGraph.getJobID))) // tell the client that the submission succeeded

    if (leaderElectionService.hasLeadership) {
      executionGraph.scheduleForExecution(scheduler) // schedule the graph
    }
  } catch {
    // ...
  }
}(context.dispatcher)
executionGraph.scheduleForExecution
public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
    switch (scheduleMode) {

        case LAZY_FROM_SOURCES:
            // simply take the vertices without inputs.
            for (ExecutionJobVertex ejv : this.tasks.values()) { // ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks; "tasks" is an unfortunate name for this map
                if (ejv.getJobVertex().isInputVertex()) {
                    ejv.scheduleAll(slotProvider, allowQueuedScheduling);
                }
            }
            break;

        case EAGER:
            for (ExecutionJobVertex ejv : getVerticesTopologically()) {
                ejv.scheduleAll(slotProvider, allowQueuedScheduling);
            }
            break;

        default:
            throw new JobException("Schedule mode is invalid.");
    }
}
For streaming jobs the default is EAGER:
public JobGraph createJobGraph() {

    jobGraph = new JobGraph(streamGraph.getJobName());

    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(ScheduleMode.EAGER);
ExecutionJobVertex.scheduleAll
public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    ExecutionVertex[] vertices = this.taskVertices;

    // kick off the tasks
    for (ExecutionVertex ev : vertices) {
        ev.scheduleForExecution(slotProvider, queued);
    }
}
ExecutionVertex.scheduleForExecution
// The current or latest execution attempt of this vertex's task
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    return this.currentExecution.scheduleForExecution(slotProvider, queued);
}
Execution.scheduleForExecution
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    if (transitionState(CREATED, SCHEDULED)) {

        ScheduledUnit toSchedule = locationConstraint == null ? // create the ScheduledUnit
            new ScheduledUnit(this, sharingGroup) :
            new ScheduledUnit(this, sharingGroup, locationConstraint);

        final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // ask the slotProvider for a slot

        final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
            @Override
            public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                if (simpleSlot != null) { // the slot was allocated successfully
                    try {
                        deployToSlot(simpleSlot); // deploy
                    } catch (Throwable t) {
                        try {
                            simpleSlot.releaseSlot();
                        } finally {
                            markFailed(t);
                        }
                    }
                }
                else { // slot allocation failed
                    markFailed(throwable);
                }
                return null;
            }
        });
    }
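The shape here, i.e. request a slot asynchronously and then either deploy into it or mark the execution as failed, is a plain future-callback chain. The same pattern expressed with java.util.concurrent.CompletableFuture instead of Flink's own Future (a self-contained toy, not Flink code):

import java.util.concurrent.CompletableFuture;

public class AllocateThenDeployDemo {

    // stand-in for slotProvider.allocateSlot(...): completes with a slot, or exceptionally
    static CompletableFuture<String> allocateSlot() {
        return CompletableFuture.supplyAsync(() -> "slot-1");
    }

    static void deployToSlot(String slot) {
        System.out.println("deploying to " + slot);
    }

    static void markFailed(Throwable t) {
        System.out.println("marked failed: " + t);
    }

    public static void main(String[] args) {
        allocateSlot().handle((slot, failure) -> {
            if (slot != null) {         // allocation succeeded
                try {
                    deployToSlot(slot); // deploy into the slot
                } catch (Throwable t) {
                    markFailed(t);      // release/fail on a deployment error
                }
            } else {
                markFailed(failure);    // the allocation itself failed
            }
            return null;
        }).join();                      // block so the demo prints before exiting
    }
}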
For the slotProvider, see the post Flink - Scheduler.
deployToSlot: at its core, it sends a submitTask request to the TaskManager.
public void deployToSlot(final SimpleSlot slot) throws JobException {
    ExecutionState previous = this.state;
    if (previous == SCHEDULED || previous == CREATED) {
        if (!transitionState(previous, DEPLOYING)) { // move the state to DEPLOYING
            throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
        }
    }

    try {
        // good, we are allowed to deploy
        if (!slot.setExecutedVertex(this)) { // bind the slot to this ExecutionVertex
            throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
        }
        this.assignedResource = slot;

        final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( // create the TaskDeploymentDescriptor
            attemptId,
            slot,
            taskState,
            attemptNumber);

        // register this execution at the execution graph, to receive call backs
        vertex.getExecutionGraph().registerExecution(this);

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout); // send the submitTask request to the TaskManager actor

        submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
            ......
        }
}