Flink – JobManager.submitJob

As an actor, the JobManager handles the SubmitJob message as follows:

  case SubmitJob(jobGraph, listeningBehaviour) =>
    val client = sender()
    val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
      jobGraph.getSessionTimeout)
    submitJob(jobGraph, jobInfo)
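Because the JobManager is an Akka actor, it handles one message at a time from its mailbox, and `sender()` identifies who sent the current message. As a hedged, framework-free illustration (this is not Akka's API), the sketch below models that idea in plain Java: a single thread drains a queue, and each hypothetical `SubmitJobMessage` carries its own reply future in place of `sender()`. All class names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical message type; the reply future stands in for Akka's sender().
final class SubmitJobMessage {
    final String jobId;
    final CompletableFuture<String> replyTo = new CompletableFuture<>();

    SubmitJobMessage(String jobId) { this.jobId = jobId; }
}

// Hypothetical single-threaded "actor": messages are processed strictly one at a time.
final class MiniJobManager implements Runnable {
    final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    final List<String> submittedJobs = new ArrayList<>(); // mutated only by the actor thread

    @Override
    public void run() {
        try {
            while (true) {
                Object msg = mailbox.take();
                if (msg instanceof SubmitJobMessage) {
                    SubmitJobMessage submit = (SubmitJobMessage) msg;
                    submittedJobs.add(submit.jobId); // no locking needed: single consumer thread
                    submit.replyTo.complete("JobSubmitSuccess(" + submit.jobId + ")");
                } else if ("stop".equals(msg)) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The design point is that actor state needs no locks because only the actor's own thread ever touches it.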

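submitJob does three things:

Builds the ExecutionGraph from the JobGraph
Restores state from a checkpoint (CheckpointedState) or from a savepoint
Submits the ExecutionGraph to the Scheduler for scheduling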

ExecutionGraph

executionGraph = ExecutionGraphBuilder.buildGraph(
  executionGraph,            // currentJobs.get(jobGraph.getJobID): whether an ExecutionGraph already exists for this job ID
  jobGraph,
  flinkConfiguration,        // configuration
  futureExecutor,            // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-future-", "-thread-")): thread pool sized by the number of CPU cores
  ioExecutor,                // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-io-", "-thread-"))
  userCodeLoader,            // libraryCacheManager.getClassLoader(jobGraph.getJobID): loads classes from the job's JARs
  checkpointRecoveryFactory, // used for createCheckpointStore and createCheckpointIDCounter; comes in standalone and ZooKeeper variants
  Time.of(timeout.length, timeout.unit),
  restartStrategy,           // the job's restart strategy
  jobMetrics,
  numSlots,                  // scheduler.getTotalNumberOfSlots(): total number of slots of all instances registered with this JM
  log.logger)
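The `futureExecutor` and `ioExecutor` above are fixed-size pools with one thread per CPU core, named via Flink's `NamedThreadFactory`. A minimal sketch of how such a naming factory can be built on `java.util.concurrent.ThreadFactory` follows; `PrefixThreadFactory` and `JobManagerPools` are hypothetical, and the name layout `<prefix><poolNumber><suffix><threadNumber>` (modelled on `Executors.defaultThreadFactory`) is an assumption, not necessarily what Flink's class produces.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Flink's NamedThreadFactory.
final class PrefixThreadFactory implements ThreadFactory {
    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);

    private final String namePrefix;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    PrefixThreadFactory(String prefix, String suffix) {
        // e.g. "jobmanager-io-" + 1 + "-thread-" gives "jobmanager-io-1-thread-"
        this.namePrefix = prefix + POOL_NUMBER.getAndIncrement() + suffix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(true); // assumption: these pools should not keep the JVM alive
        return t;
    }
}

final class JobManagerPools {
    // Pool for future callbacks, one thread per CPU core.
    static ExecutorService futurePool() {
        int n = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(n, new PrefixThreadFactory("jobmanager-future-", "-thread-"));
    }

    // Separate pool for blocking I/O, also one thread per CPU core.
    static ExecutorService ioPool() {
        int n = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(n, new PrefixThreadFactory("jobmanager-io-", "-thread-"));
    }
}
```

Keeping blocking I/O on its own pool means slow storage calls cannot starve the threads that complete futures.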

ExecutionGraphBuilder.buildGraph

Creating a new ExecutionGraph

        // create a new execution graph, if none exists so far
        final ExecutionGraph executionGraph;
        try {
            executionGraph = (prior != null) ? prior :
                new ExecutionGraph(
                    futureExecutor,
                    ioExecutor,
                    jobId,
                    jobName,
                    jobGraph.getJobConfiguration(),
                    jobGraph.getSerializedExecutionConfig(),
                    timeout,
                    restartStrategy,
                    jobGraph.getUserJarBlobKeys(),
                    jobGraph.getClasspaths(),
                    classLoader,
                    metrics);
        } catch (IOException e) {
            throw new JobException("Could not create the execution graph.", e);
        }

attachJobGraph: builds the graph's vertices and edges

        // topologically sort the job vertices and attach the graph to the existing one
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        executionGraph.attachJobGraph(sortedTopology);
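Sorting from the sources matters because, while attaching, each vertex calls `connectToPredecessors`, which looks up its producers' IntermediateResults; those must already be registered. As a sketch of what a "sorted topologically from sources" order means, here is Kahn's algorithm over a hypothetical map from vertex name to the names of its inputs. It is a generic illustration, not Flink's implementation of `getVerticesSortedTopologicallyFromSources`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopoSort {
    /**
     * Kahn's algorithm. `inputs` maps every vertex to the vertices it consumes from;
     * the map's iteration order breaks ties. Throws if the graph has a cycle.
     */
    static List<String> sortFromSources(LinkedHashMap<String, List<String>> inputs) {
        Map<String, Integer> remainingInputs = new HashMap<>();
        Map<String, List<String>> consumers = new HashMap<>();
        for (Map.Entry<String, List<String>> e : inputs.entrySet()) {
            remainingInputs.put(e.getKey(), e.getValue().size());
            for (String producer : e.getValue()) {
                consumers.computeIfAbsent(producer, k -> new ArrayList<>()).add(e.getKey());
            }
        }

        // start with the sources: vertices without inputs
        Deque<String> ready = new ArrayDeque<>();
        for (String v : inputs.keySet()) {
            if (remainingInputs.get(v) == 0) ready.add(v);
        }

        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            sorted.add(v);
            for (String c : consumers.getOrDefault(v, List.of())) {
                // a consumer becomes ready once all of its producers have been placed
                if (remainingInputs.merge(c, -1, Integer::sum) == 0) ready.add(c);
            }
        }
        if (sorted.size() != inputs.size()) {
            throw new IllegalStateException("job graph has a cycle");
        }
        return sorted;
    }
}
```

For a `source -> map -> sink` pipeline, the order is always source, map, sink, regardless of the order in which the vertices were declared.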

ExecutionGraph.attachJobGraph

        for (JobVertex jobVertex : topologiallySorted) {
            // create the execution job vertex and attach it to the graph
            ExecutionJobVertex ejv =
                new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
            ejv.connectToPredecessors(this.intermediateResults);

            // All job vertices that are part of this graph:
            // ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks
            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                // All intermediate results that are part of this graph:
                // ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            }

            // All vertices, in the order in which they were created:
            // List<ExecutionJobVertex> verticesInCreationOrder
            this.verticesInCreationOrder.add(ejv);
        }

Each JobVertex is wrapped in an ExecutionJobVertex.

This in turn creates, in order, the ExecutionJobVertex, ExecutionVertex, and Execution objects, as well as the IntermediateResult and IntermediateResultPartition objects.
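To make the object counts concrete: one JobVertex with parallelism p becomes one ExecutionJobVertex holding p ExecutionVertex objects (one per subtask), each starting with a single Execution (attempt 0), and each produced data set yields one partition per subtask. The toy model below is a hypothetical sketch (`Toy*` classes are not Flink's) that mirrors only these counts, the parallelism fallback, and the `"%s (%d/%d)"` subtask name used in the constructors that follow. Note that `attachJobGraph` passes `1` as the default parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Toy versions of the runtime classes; they only model object counts and links.
final class ToyPartition {
    final int resultIndex;
    final int subtaskIndex;

    ToyPartition(int resultIndex, int subtaskIndex) {
        this.resultIndex = resultIndex;
        this.subtaskIndex = subtaskIndex;
    }
}

final class ToyExecution {
    final int attemptNumber;

    ToyExecution(int attemptNumber) { this.attemptNumber = attemptNumber; }
}

final class ToyExecutionVertex {
    final String taskNameWithSubtask;
    final List<ToyPartition> resultPartitions = new ArrayList<>();
    final ToyExecution currentExecution = new ToyExecution(0); // first attempt

    ToyExecutionVertex(String name, int subtaskIndex, int parallelism, int numProducedResults) {
        this.taskNameWithSubtask = String.format("%s (%d/%d)", name, subtaskIndex + 1, parallelism);
        // one partition per produced result, owned by this subtask
        for (int r = 0; r < numProducedResults; r++) {
            resultPartitions.add(new ToyPartition(r, subtaskIndex));
        }
    }
}

final class ToyExecutionJobVertex {
    final ToyExecutionVertex[] taskVertices;

    ToyExecutionJobVertex(String name, int vertexParallelism, int defaultParallelism, int numProducedResults) {
        // same fallback as ExecutionJobVertex: use the default if no parallelism is set
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
        taskVertices = new ToyExecutionVertex[numTaskVertices];
        for (int i = 0; i < numTaskVertices; i++) {
            taskVertices[i] = new ToyExecutionVertex(name, i, numTaskVertices, numProducedResults);
        }
    }
}
```

For example, a "Map" vertex with parallelism 3 and two outputs yields three subtasks named "Map (1/3)" to "Map (3/3)", each owning two partitions.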

ExecutionJobVertex

    public ExecutionJobVertex(
            ExecutionGraph graph,
            JobVertex jobVertex,
            int defaultParallelism,
            Time timeout,
            long createTimestamp) throws JobException {

        if (graph == null || jobVertex == null) {
            throw new NullPointerException();
        }

        // the parallelism determines how many ExecutionVertex instances are created
        int vertexParallelism = jobVertex.getParallelism();
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

        // array holding the ExecutionVertex instances
        this.taskVertices = new ExecutionVertex[numTaskVertices];

        this.inputs = new ArrayList<>(jobVertex.getInputs().size());

        // take the sharing group
        this.slotSharingGroup = jobVertex.getSlotSharingGroup();
        this.coLocationGroup = jobVertex.getCoLocationGroup();

        // create the intermediate results
        // (array of IntermediateResult objects that hold the intermediate results)
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

        for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
            final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

            // wrap the JobGraph's IntermediateDataSet in an IntermediateResult
            this.producedDataSets[i] = new IntermediateResult(
                    result.getId(),
                    this,
                    numTaskVertices,
                    result.getResultType());
        }

        // create all task vertices
        for (int i = 0; i < numTaskVertices; i++) {
            // initialize the ExecutionVertex
            ExecutionVertex vertex = new ExecutionVertex(
                    this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);

            this.taskVertices[i] = vertex;
        }

        finishedSubtasks = new boolean[parallelism];
    }

ExecutionVertex

    public ExecutionVertex(
            ExecutionJobVertex jobVertex,
            int subTaskIndex,   // index of this subtask; each task corresponds to one ExecutionVertex
            IntermediateResult[] producedDataSets,
            Time timeout,
            long createTimestamp,
            int maxPriorExecutionHistoryLength) {

        this.jobVertex = jobVertex;
        this.subTaskIndex = subTaskIndex;
        this.taskNameWithSubtask = String.format("%s (%d/%d)",
                jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());

        // records the IntermediateResultPartitions produced by this vertex
        this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1);

        for (IntermediateResult result : producedDataSets) {
            // initialize the IntermediateResultPartition
            IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
            result.setPartition(subTaskIndex, irp);

            resultPartitions.put(irp.getPartitionId(), irp);
        }

        this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];

        this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);

        // create the Execution
        this.currentExecution = new Execution(
                getExecutionGraph().getFutureExecutor(),
                this,
                0,
                createTimestamp,
                timeout);

        this.timeout = timeout;
    }

connectToPredecessors: connects the vertices with edges

    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
        List<JobEdge> inputs = jobVertex.getInputs(); // inputs of the JobVertex

        for (int num = 0; num < inputs.size(); num++) {
            JobEdge edge = inputs.get(num); // the corresponding JobEdge

            // look up the IntermediateResult that is the source of this JobEdge
            IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());

            this.inputs.add(ires); // List<IntermediateResult> inputs

            // register the current vertex as a consumer with every IntermediateResultPartition of the IntermediateResult
            int consumerIndex = ires.registerConsumer();

            for (int i = 0; i < parallelism; i++) {
                ExecutionVertex ev = taskVertices[i];
                // create ExecutionEdges from each ExecutionVertex to the concrete IntermediateResultPartitions
                ev.connectSource(num, ires, edge, consumerIndex);
            }
        }
    }

connectSource

public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

    final DistributionPattern pattern = edge.getDistributionPattern();             // distribution pattern of the edge
    final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); // partitions of the source

    ExecutionEdge[] edges;

    switch (pattern) {
        case POINTWISE:
            edges = connectPointwise(sourcePartitions, inputNumber);
            break;

        case ALL_TO_ALL:
            edges = connectAllToAll(sourcePartitions, inputNumber);
            break;

        default:
            throw new RuntimeException("Unrecognized distribution pattern.");
    }

    this.inputEdges[inputNumber] = edges;

    // add the consumers to the source
    // for now (until the receiver initiated handshake is in place), we need to register the
    // edges as the execution graph
    for (ExecutionEdge ee : edges) {
        ee.getSource().addConsumer(ee, consumerNumber);
    }
}
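The distribution pattern decides how many ExecutionEdges one JobEdge turns into. The sketch below assumes that with ALL_TO_ALL every consumer subtask is connected to every source partition, and that with POINTWISE (in the branches shown next, where there are no more sources than consumers) each consumer subtask gets exactly one edge. `EdgePattern` and `EdgeMath` are hypothetical names, and the POINTWISE case with more sources than consumers is deliberately left unmodelled because that branch is elided in this post.

```java
import java.util.stream.IntStream;

// Hypothetical mirror of the two patterns handled in connectSource.
enum EdgePattern { POINTWISE, ALL_TO_ALL }

final class EdgeMath {
    // ALL_TO_ALL: a consumer subtask reads from every source partition.
    static int[] allToAllSources(int numSources) {
        return IntStream.range(0, numSources).toArray();
    }

    // Total number of ExecutionEdges created for one JobEdge.
    static long totalEdges(EdgePattern pattern, int numSources, int parallelism) {
        switch (pattern) {
            case ALL_TO_ALL:
                return (long) numSources * parallelism;
            case POINTWISE:
                if (numSources <= parallelism) {
                    return parallelism; // each consumer subtask gets exactly one edge
                }
                // the numSources > parallelism branch is elided in the excerpt
                throw new UnsupportedOperationException("not modelled");
            default:
                throw new IllegalArgumentException("Unrecognized distribution pattern.");
        }
    }
}
```

With 4 source partitions and 3 consumers, ALL_TO_ALL gives 12 edges. This quadratic growth is why the choice between shuffle-style and forward-style connections affects graph size.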

A closer look at connectPointwise:

private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
    final int numSources = sourcePartitions.length;             // number of partitions
    final int parallelism = getTotalNumberOfParallelSubtasks(); // parallelism of the subtasks

    // simple case same number of sources as targets
    if (numSources == parallelism) {
        // one-to-one: take the partition in sourcePartitions whose index matches subTaskIndex
        return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
    }
    else if (numSources < parallelism) {
        // the subtasks have the higher parallelism, so one source feeds several tasks
        int sourcePartition;

        // check if the pattern is regular or irregular
        // we use int arithmetics for regular, and floating point with rounding for irregular
        if (parallelism % numSources == 0) {
            // evenly divisible: e.g. with 2 sources and 6 tasks, the 3rd task (subTaskIndex 2) maps to the first source
            // same number of targets per source
            int factor = parallelism / numSources;
            sourcePartition = subTaskIndex / factor;
        }
        else {
            // different number of targets per source
            float factor = ((float) parallelism) / numSources;
            sourcePartition = (int) (subTaskIndex / factor);
        }

        return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
    }
    else {
        //......
    }
}
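The index arithmetic in the two branches shown above can be isolated into a pure function and checked by hand. This is a standalone restatement of those branches (`Pointwise.sourcePartitionFor` is a hypothetical name); the elided `numSources > parallelism` case is rejected rather than guessed.

```java
final class Pointwise {
    /**
     * Index of the source partition that consumer subtask `subTaskIndex` reads from,
     * for the branches shown above (numSources <= parallelism).
     */
    static int sourcePartitionFor(int subTaskIndex, int numSources, int parallelism) {
        if (numSources == parallelism) {
            return subTaskIndex;                                   // one-to-one
        } else if (numSources < parallelism) {
            if (parallelism % numSources == 0) {
                int factor = parallelism / numSources;             // regular: integer arithmetic
                return subTaskIndex / factor;
            } else {
                float factor = ((float) parallelism) / numSources; // irregular: floating point
                return (int) (subTaskIndex / factor);
            }
        }
        // numSources > parallelism is elided in the excerpt above
        throw new UnsupportedOperationException("numSources > parallelism not modelled");
    }
}
```

With 2 sources and 6 subtasks, the mapping is 0, 0, 0, 1, 1, 1, so the third subtask reads from the first source, as the comment in the code says. With 2 sources and 5 subtasks (factor 2.5), it is 0, 0, 0, 1, 1.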

Configuring checkpoints

                executionGraph.enableSnapshotCheckpointing(
                    snapshotSettings.getCheckpointInterval(),
                    snapshotSettings.getCheckpointTimeout(),
                    snapshotSettings.getMinPauseBetweenCheckpoints(),
                    snapshotSettings.getMaxConcurrentCheckpoints(),
                    snapshotSettings.getExternalizedCheckpointSettings(),
                    triggerVertices,
                    ackVertices,
                    confirmVertices,
                    checkpointIdCounter,
                    completedCheckpoints,
                    externalizedCheckpointsDir,
                    checkpointStatsTracker);

This starts the CheckpointCoordinator; for details, see the separate blog post on Flink's checkpoint mechanism.
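The coordinator itself is beyond the scope of this post. Still, a simplified, hypothetical sketch can show how three of the settings passed above (interval, minimum pause, maximum concurrent checkpoints) might gate whether a new checkpoint may be triggered. `CheckpointGate` is not Flink code. It assumes the interval is measured from the last trigger and the minimum pause from the last completion, and it ignores timeouts.

```java
// Hypothetical, simplified trigger gate; all times in milliseconds.
final class CheckpointGate {
    private final long interval;
    private final long minPause;
    private final int maxConcurrent;

    private int pending = 0;
    private long lastTriggerTime = Long.MIN_VALUE;    // MIN_VALUE = "never"
    private long lastCompletionTime = Long.MIN_VALUE;

    CheckpointGate(long interval, long minPause, int maxConcurrent) {
        this.interval = interval;
        this.minPause = minPause;
        this.maxConcurrent = maxConcurrent;
    }

    /** Returns true and records a pending checkpoint if one may start at `now`. */
    boolean tryTrigger(long now) {
        if (pending >= maxConcurrent) return false; // too many checkpoints in flight
        if (lastTriggerTime != Long.MIN_VALUE && now - lastTriggerTime < interval) return false;
        if (lastCompletionTime != Long.MIN_VALUE && now - lastCompletionTime < minPause) return false;
        pending++;
        lastTriggerTime = now;
        return true;
    }

    /** Marks one pending checkpoint as completed at `now`. */
    void complete(long now) {
        if (pending == 0) throw new IllegalStateException("no pending checkpoint");
        pending--;
        lastCompletionTime = now;
    }
}
```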

Scheduler

Next, let's look at how the finished ExecutionGraph is scheduled.

      future { // asynchronous
        try {
          // store the job graph in submittedJobGraphs
          submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
        } catch {
          // ...
        }}

        // notify the client that the submission succeeded
        jobInfo.notifyClients(decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))

        if (leaderElectionService.hasLeadership) {
          executionGraph.scheduleForExecution(scheduler) // schedule the job
        }
      } catch {
        // ...
      }
    }(context.dispatcher)
  }
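Wrapping this block in `future { ... }(context.dispatcher)` keeps the potentially blocking write to the job-graph store off the actor's message loop. Afterward, clients are notified, and only the current leader schedules the job. A hedged Java analogue with `CompletableFuture.runAsync` follows. `AsyncSubmission` is hypothetical, and the steps only record log entries in the order shown in the excerpt; error handling is omitted.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

final class AsyncSubmission {
    final List<String> log = new CopyOnWriteArrayList<>(); // records the order of steps

    CompletableFuture<Void> submit(String jobId, BooleanSupplier hasLeadership, Executor dispatcher) {
        return CompletableFuture.runAsync(() -> {
            log.add("putJobGraph " + jobId);              // possibly blocking store write, off the actor thread
            log.add("notifyClients JobSubmitSuccess " + jobId);
            if (hasLeadership.getAsBoolean()) {
                log.add("scheduleForExecution " + jobId); // only the leader schedules
            }
        }, dispatcher);
    }
}
```

Passing `Runnable::run` as the executor runs the task inline, which is convenient for testing the ordering.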

executionGraph.scheduleForExecution

    public void scheduleForExecution(SlotProvider slotProvider) throws JobException {

        switch (scheduleMode) {

            case LAZY_FROM_SOURCES:
                // simply take the vertices without inputs.
                // tasks is a ConcurrentHashMap<JobVertexID, ExecutionJobVertex>; the name "tasks" is rather misleading
                for (ExecutionJobVertex ejv : this.tasks.values()) {
                    if (ejv.getJobVertex().isInputVertex()) {
                        ejv.scheduleAll(slotProvider, allowQueuedScheduling);
                    }
                }
                break;

            case EAGER:
                for (ExecutionJobVertex ejv : getVerticesTopologically()) {
                    ejv.scheduleAll(slotProvider, allowQueuedScheduling);
                }
                break;

            default:
                throw new JobException("Schedule mode is invalid.");
        }
    }

For streaming jobs, the schedule mode defaults to EAGER, as set when the streaming JobGraph is created:

public JobGraph createJobGraph() {

    jobGraph = new JobGraph(streamGraph.getJobName());

    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(ScheduleMode.EAGER);
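The difference between the two modes lies in which vertices are kicked off right away: LAZY_FROM_SOURCES starts only the vertices without inputs (downstream vertices are presumably deployed later, a path not covered in this post), while EAGER starts every vertex in topological order. The sketch below restates just that selection; `ScheduleModeSketch`, `VertexInfo`, and `ScheduleSketch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

enum ScheduleModeSketch { LAZY_FROM_SOURCES, EAGER }

final class VertexInfo {
    final String name;
    final boolean isInputVertex; // true for sources (no inputs)

    VertexInfo(String name, boolean isInputVertex) {
        this.name = name;
        this.isInputVertex = isInputVertex;
    }
}

final class ScheduleSketch {
    /** Names of the vertices scheduled immediately; `topological` must be topologically sorted. */
    static List<String> initiallyScheduled(ScheduleModeSketch mode, List<VertexInfo> topological) {
        List<String> result = new ArrayList<>();
        switch (mode) {
            case LAZY_FROM_SOURCES:
                for (VertexInfo v : topological) {
                    if (v.isInputVertex) result.add(v.name); // only vertices without inputs
                }
                break;
            case EAGER:
                for (VertexInfo v : topological) result.add(v.name); // everything, in topological order
                break;
            default:
                throw new IllegalArgumentException("Schedule mode is invalid.");
        }
        return result;
    }
}
```

Eager scheduling fits streaming, where every operator must be running before records can flow through the pipeline.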

ExecutionJobVertex.scheduleAll

    public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        ExecutionVertex[] vertices = this.taskVertices;

        // kick off the tasks
        for (ExecutionVertex ev : vertices) {
            ev.scheduleForExecution(slotProvider, queued);
        }
    }

ExecutionVertex.scheduleForExecution

// currentExecution: the current or latest execution attempt of this vertex's task
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    return this.currentExecution.scheduleForExecution(slotProvider, queued);
}

Execution.scheduleForExecution

    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
        final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

        if (transitionState(CREATED, SCHEDULED)) {

            // build the ScheduledUnit
            ScheduledUnit toSchedule = locationConstraint == null ?
                    new ScheduledUnit(this, sharingGroup) :
                    new ScheduledUnit(this, sharingGroup, locationConstraint);

            // request a slot from the slotProvider
            final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);

            final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
                @Override
                public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                    if (simpleSlot != null) {
                        // slot allocated successfully
                        try {
                            deployToSlot(simpleSlot); // deploy
                        } catch (Throwable t) {
                            try {
                                simpleSlot.releaseSlot();
                            } finally {
                                markFailed(t);
                            }
                        }
                    }
                    else {
                        markFailed(throwable);
                    }
                    return null;
                }
            });
        }
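The important pattern here is that scheduling never blocks waiting for a slot. It obtains a future for the slot and attaches a `handle` callback that either deploys into the slot or marks the execution as failed, releasing the slot first if deployment itself throws. Below is a hedged analogue using the JDK's `CompletableFuture.handle` (Flink had its own `Future` type at the time). `SlotHandle` and `DeployOnAllocation` are hypothetical placeholders, and `deploy`/`markFailed` only record events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SlotHandle {
    final String id;
    boolean released = false;

    SlotHandle(String id) { this.id = id; }

    void release() { released = true; }
}

final class DeployOnAllocation {
    final List<String> events = new ArrayList<>();
    final boolean deployFails; // lets the sketch simulate a failing deployment

    DeployOnAllocation(boolean deployFails) { this.deployFails = deployFails; }

    CompletableFuture<Void> schedule(CompletableFuture<SlotHandle> slotAllocation) {
        return slotAllocation.handle((slot, throwable) -> {
            if (slot != null) {             // slot allocated successfully
                try {
                    deploy(slot);
                } catch (Throwable t) {
                    try {
                        slot.release();     // give the slot back before failing
                    } finally {
                        markFailed(t);
                    }
                }
            } else {
                markFailed(throwable);      // slot allocation itself failed
            }
            return null;
        });
    }

    private void deploy(SlotHandle slot) {
        if (deployFails) throw new IllegalStateException("deploy failed");
        events.add("deployed to " + slot.id);
    }

    private void markFailed(Throwable t) {
        events.add("failed: " + t.getMessage());
    }
}
```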

For slotProvider, see the post "Flink - Scheduler".

deployToSlot: at its core, it sends a submitTask request to the TaskManager.

    public void deployToSlot(final SimpleSlot slot) throws JobException {
        ExecutionState previous = this.state;
        if (previous == SCHEDULED || previous == CREATED) {
            // move the state to DEPLOYING
            if (!transitionState(previous, DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
            }
        }

        try {
            // good, we are allowed to deploy
            // associate the slot with this execution
            if (!slot.setExecutedVertex(this)) {
                throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
            }
            this.assignedResource = slot;

            // create the TaskDeploymentDescriptor
            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                    attemptId,
                    slot,
                    taskState,
                    attemptNumber);

            // register this execution at the execution graph, to receive call backs
            vertex.getExecutionGraph().registerExecution(this);

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            // send the request to the TaskManager's actor
            final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

            submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {......}
    }
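`transitionState(expected, next)` moves the execution forward only if it is still in the expected state. This is how `deployToSlot` detects a "concurrent deployment call race": if another thread changed the state first, the transition fails. Below is a minimal sketch built on `AtomicReference.compareAndSet`, using a hypothetical subset of states. It is not Flink's implementation. The excerpt above omits what happens when the state is neither CREATED nor SCHEDULED, so the sketch's rejection of other states is an assumption.

```java
import java.util.concurrent.atomic.AtomicReference;

enum SketchState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FAILED }

final class StatefulExecution {
    private final AtomicReference<SketchState> state = new AtomicReference<>(SketchState.CREATED);

    /** Moves to `next` only if the state is still `expected`; false means another thread won the race. */
    boolean transitionState(SketchState expected, SketchState next) {
        return state.compareAndSet(expected, next);
    }

    SketchState getState() { return state.get(); }

    /** Mirrors the guard at the top of deployToSlot. */
    void startDeploying() {
        SketchState previous = state.get();
        if (previous == SketchState.SCHEDULED || previous == SketchState.CREATED) {
            if (!transitionState(previous, SketchState.DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
            }
        } else {
            // assumption: other states are rejected (this branch is omitted in the excerpt)
            throw new IllegalStateException("Cannot deploy task in state " + previous);
        }
    }
}
```

Compare-and-set makes each transition atomic without a lock, so two racing callers cannot both move the execution from SCHEDULED to DEPLOYING.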
