If the current JobClient (Hadoop 0.22) runs on YARN, the job is submitted through YARNRunner.
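Which runner handles the submission is decided by configuration. In Hadoop's client code, the `Cluster` class asks each available `ClientProtocolProvider` in turn, and the YARN provider returns a `YARNRunner` only when `mapreduce.framework.name` is `yarn` (the default value is `local`). The sketch below models that "first provider that accepts the configuration wins" selection in plain Java. `RunnerSelection`, `Provider` and `selectRunner` are hypothetical stand-ins, not Hadoop's API.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model of how a client picks its job runner; not Hadoop's API.
public class RunnerSelection {

    // A provider returns a runner name when it accepts the configuration, else empty.
    interface Provider extends Function<Map<String, String>, Optional<String>> {}

    // "local" is used when mapreduce.framework.name is not set.
    static String frameworkName(Map<String, String> conf) {
        return conf.getOrDefault("mapreduce.framework.name", "local");
    }

    static final Provider LOCAL = conf ->
        "local".equals(frameworkName(conf)) ? Optional.of("LocalJobRunner") : Optional.empty();

    static final Provider YARN = conf ->
        "yarn".equals(frameworkName(conf)) ? Optional.of("YARNRunner") : Optional.empty();

    // Ask each provider in turn; the first one that accepts the configuration wins.
    static String selectRunner(Map<String, String> conf, List<Provider> providers) {
        for (Provider provider : providers) {
            Optional<String> runner = provider.apply(conf);
            if (runner.isPresent()) {
                return runner.get();
            }
        }
        throw new IllegalStateException("No provider accepts framework " + frameworkName(conf));
    }

    public static void main(String[] args) {
        List<Provider> providers = List.of(LOCAL, YARN);
        System.out.println(selectRunner(Map.of("mapreduce.framework.name", "yarn"), providers)); // YARNRunner
        System.out.println(selectRunner(Map.of(), providers)); // LocalJobRunner
    }
}
```

Modelling providers as an ordered list keeps the selection open-ended: supporting another framework means adding a provider, not editing the selection loop.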
How the Hadoop YARN Framework Works

Main steps
- Job submission
- Job initialization
- Resource request and task assignment
- Task execution
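Detailed steps
Before the job runs, the ResourceManager and the NodeManagers have already been started, so in the figure above the ResourceManager and NodeManager processes do not need to be launched.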
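- 1. The client process submits a MapReduce job through runJob (in practice, jobs are usually submitted with waitForCompletion). In YARN, a job is generally called an Application.
- 2. The client requests an application ID from the ResourceManager; this ID uniquely identifies the job.
- 3. The client copies the job's files (usually the job jar itself plus the third-party jars it depends on) to HDFS. In other words, YARN-based MapReduce shares program jars through HDFS so that the Task processes can read them.
- 4. The client submits the application to the ResourceManager through runJob.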
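- 5.a/5.b. When the ResourceManager receives the submission request, it forwards it to its scheduling component (the Scheduler). The Scheduler allocates a Container, the ResourceManager then launches the ApplicationMaster process in that Container, and the NodeManager on that node manages the ApplicationMaster process.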
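- 6. The ApplicationMaster initializes the job (application). Initialization includes creating listener objects that track the job's execution, such as the progress reported by each task and whether it has completed. (Every computing framework that integrates with YARN's resource scheduling must provide its own ApplicationMaster; Spark and Storm, for example, each provide one in order to run on YARN.)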
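- 7. Based on the input splits of the data at the location specified in the job code (the data usually comes from HDFS), the ApplicationMaster determines the number of Mapper tasks. When deciding which compute node each Mapper task is sent to, Hadoop takes data locality into account, preferring node-local data, then rack-local data, and only then off-rack data. It also determines the number of Reduce tasks, which is specified explicitly in the program code via job.setNumReduceTasks.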
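- 8. The following points describe how the ApplicationMaster requests resources from the ResourceManager:
- 8.1 Based on the number of Mapper tasks determined by the splits and the number of Reducer tasks, the ApplicationMaster requests compute resources from the ResourceManager. Compute resources mainly means memory and CPU: Hadoop YARN uses the Container as its unit of computation, so resources are allocated in Containers, and each Container holds a certain amount of memory and a number of CPU cores.
- 8.2 The ApplicationMaster requests resources through heartbeats sent to the ResourceManager. Each request also carries the tasks' data-locality information, so that when the ResourceManager allocates resources, each Task gets compute resources that satisfy data locality as far as possible.
- 8.3 The request also carries the amount of memory needed. By default, each Map task and each Reduce task is allocated 1 GB of memory; this can be changed with the parameters mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.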
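Steps 7 and 8 can be illustrated with a small model that runs without a cluster. The split arithmetic follows Hadoop's `FileInputFormat` (split size = max(minSize, min(maxSize, blockSize)), with a 1.1 slack factor so a short tail is folded into the last split), and the three locality levels mirror the node-local, rack-local, off-switch order described above. Everything else (`MrResourcePlan`, `ContainerAsk`, `plan`, `localityOf`) is hypothetical and only shows the shape of what the ApplicationMaster asks for; it is not MRAppMaster code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of steps 7 and 8. Class, method and type names are hypothetical;
// only the split arithmetic follows Hadoop's FileInputFormat.
public class MrResourcePlan {

    // Slack factor: a tail shorter than 10% of a split is folded into the last split.
    static final double SPLIT_SLOP = 1.1;

    // splitSize = max(minSize, min(maxSize, blockSize))
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of input splits (and therefore map tasks) for one file.
    static int numSplits(long fileLength, long splitSize) {
        int splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++;
        }
        return splits;
    }

    // Locality levels in order of preference.
    enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    // Locality of running a task on `node`, given the hosts that store its split.
    static Locality localityOf(String node, Set<String> dataHosts, Map<String, String> rackOf) {
        if (dataHosts.contains(node)) {
            return Locality.NODE_LOCAL;
        }
        String rack = rackOf.get(node);
        for (String host : dataHosts) {
            if (rack != null && rack.equals(rackOf.get(host))) {
                return Locality.RACK_LOCAL;
            }
        }
        return Locality.OFF_SWITCH;
    }

    // One container request per task: memory in MB plus preferred hosts.
    static final class ContainerAsk {
        final String task;
        final int memoryMb;
        final Set<String> preferredHosts;

        ContainerAsk(String task, int memoryMb, Set<String> preferredHosts) {
            this.task = task;
            this.memoryMb = memoryMb;
            this.preferredHosts = preferredHosts;
        }
    }

    // Map asks carry the hosts of their split; reduce asks have no locality preference.
    static List<ContainerAsk> plan(List<Set<String>> splitHosts, int numReduces,
                                   int mapMemoryMb, int reduceMemoryMb) {
        List<ContainerAsk> asks = new ArrayList<>();
        for (int i = 0; i < splitHosts.size(); i++) {
            asks.add(new ContainerAsk("map_" + i, mapMemoryMb, splitHosts.get(i)));
        }
        for (int i = 0; i < numReduces; i++) {
            asks.add(new ContainerAsk("reduce_" + i, reduceMemoryMb, Set.of()));
        }
        return asks;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long size = splitSize(128 * mb, 1L, Long.MAX_VALUE);
        // 300 MB with 128 MB splits: 128 + 128 + 44 MB -> 3 map tasks
        System.out.println("map tasks: " + numSplits(300 * mb, size));

        List<Set<String>> hosts = List.of(Set.of("n1", "n2"), Set.of("n2", "n3"), Set.of("n1", "n3"));
        // 1024 MB matches the 1 GB default described in step 8.3
        for (ContainerAsk ask : plan(hosts, 2, 1024, 1024)) {
            System.out.println(ask.task + " " + ask.memoryMb + " MB, prefers " + ask.preferredHosts);
        }

        Map<String, String> racks = Map.of("n1", "/rack1", "n2", "/rack1", "n3", "/rack2");
        System.out.println(localityOf("n2", Set.of("n1"), racks)); // RACK_LOCAL
    }
}
```

With the slack factor, a 130 MB file and a 128 MB split size yields one map task rather than a second 2 MB task.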
5. YARNRunner
```java
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  addHistoryToken(ts);

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
```
resMgrDelegate.submitApplication() in turn calls YarnClient's submitApplication() method, implemented as follows:
6. YarnClientImpl
```java
@Override
public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);

  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  EnumSet<YarnApplicationState> waitingStates =
      EnumSet.of(YarnApplicationState.NEW,
          YarnApplicationState.NEW_SAVING,
          YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates =
      EnumSet.of(YarnApplicationState.FAILED,
          YarnApplicationState.KILLED);
  while (true) {
    try {
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      if (!waitingStates.contains(state)) {
        if (failToSubmitStates.contains(state)) {
          throw new YarnException("Failed to submit " + applicationId +
              " to YARN : " + appReport.getDiagnostics());
        }
        LOG.info("Submitted application " + applicationId);
        break;
      }

      long elapsedMillis = System.currentTimeMillis() - startTime;
      if (enforceAsyncAPITimeout() &&
          elapsedMillis >= asyncApiPollTimeoutMillis) {
        throw new YarnException("Timed out while waiting for application " +
            applicationId + " to be submitted successfully");
      }

      // Notify the client through the log every 10 poll, in case the client
      // is blocked here too long.
      if (++pollCount % 10 == 0) {
        LOG.info("Application submission is not finished, " +
            "submitted application " + applicationId +
            " is still in " + state);
      }
      try {
        Thread.sleep(submitPollIntervalMillis);
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while waiting for application "
            + applicationId
            + " to be successfully submitted.");
      }
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + "with the " +
          "same ApplicationSubmissionContext");
      rmClient.submitApplication(request);
    }
  }

  return applicationId;
}
```
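Stripped of RPC and logging, the submission loop in YarnClientImpl comes down to: poll the application state, return once it leaves {NEW, NEW_SAVING, SUBMITTED}, fail on FAILED or KILLED, and give up after an optional timeout. The sketch below reproduces only that control flow against a fake state source, so it runs without a cluster. The `State` enum copies the constant names of `YarnApplicationState`; `SubmitPoller`, `replay()` and the "negative timeout disables the check" convention are hypothetical. Unlike the real method, it stops when interrupted instead of logging and continuing, and it omits the re-submission on `ApplicationNotFoundException`.

```java
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified model of the polling loop in YarnClientImpl.submitApplication.
public class SubmitPoller {

    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    static final EnumSet<State> WAITING = EnumSet.of(State.NEW, State.NEW_SAVING, State.SUBMITTED);
    static final EnumSet<State> FAILED_TO_SUBMIT = EnumSet.of(State.FAILED, State.KILLED);

    // Polls until the state leaves the waiting set and returns that state.
    // A negative timeoutMillis disables the timeout (loosely mirroring enforceAsyncAPITimeout()).
    static State waitForSubmission(Supplier<State> report, long intervalMillis, long timeoutMillis) {
        long start = System.currentTimeMillis();
        while (true) {
            State state = report.get();
            if (!WAITING.contains(state)) {
                if (FAILED_TO_SUBMIT.contains(state)) {
                    throw new IllegalStateException("Failed to submit: " + state);
                }
                return state;
            }
            long elapsed = System.currentTimeMillis() - start;
            if (timeoutMillis >= 0 && elapsed >= timeoutMillis) {
                throw new IllegalStateException("Timed out; application still in " + state);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    // Fake RM: replays a fixed sequence of states, then repeats the last one forever.
    static Supplier<State> replay(List<State> states) {
        Iterator<State> it = states.iterator();
        State[] last = { states.get(0) };
        return () -> {
            if (it.hasNext()) {
                last[0] = it.next();
            }
            return last[0];
        };
    }

    public static void main(String[] args) {
        Supplier<State> rm = replay(List.of(State.NEW, State.NEW_SAVING, State.SUBMITTED, State.ACCEPTED));
        System.out.println(waitForSubmission(rm, 10, 5_000)); // ACCEPTED
    }
}
```

Note that the loop returns as soon as the application is ACCEPTED; it confirms that the RM has taken the submission, not that the job has started running.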
7. ClientRMService
ClientRMService is the ResourceManager's client-facing interface. This module handles the RPC calls that clients make to the ResourceManager.
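ClientRMService implements the `ApplicationClientProtocol` RPC interface, which includes, among other calls, `getNewApplication` (step 2 of the detailed steps) and `submitApplication` (step 4). The sketch below is a hypothetical in-memory stand-in for just those two calls, assuming string application IDs and one queue name per application; it leaves out everything the real service does around them, such as authentication, validation and handing the application to RMAppManager. Ignoring a repeated submission of a known ID is a design choice of this sketch, chosen so that a client-side retry like the one in YarnClientImpl is harmless.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory model of a client-facing RM service; not ClientRMService itself.
public class MiniClientRMService {

    private final long clusterTimestamp;
    private final AtomicInteger nextSequence = new AtomicInteger(1);
    // application ID -> queue it was submitted to
    private final Map<String, String> submitted = new ConcurrentHashMap<>();

    MiniClientRMService(long clusterTimestamp) {
        this.clusterTimestamp = clusterTimestamp;
    }

    // Step 2: hand out a cluster-unique ID, formatted like application_<timestamp>_<sequence>.
    String getNewApplication() {
        return String.format("application_%d_%04d", clusterTimestamp, nextSequence.getAndIncrement());
    }

    // Step 4: accept a submission. A missing ID is rejected; a repeated submission of a
    // known ID is ignored, so a client-side retry after a failover is harmless.
    void submitApplication(String applicationId, String queue) {
        if (applicationId == null) {
            throw new IllegalArgumentException("ApplicationId is not provided");
        }
        submitted.putIfAbsent(applicationId, queue);
    }

    boolean isKnown(String applicationId) {
        return submitted.containsKey(applicationId);
    }

    String queueOf(String applicationId) {
        return submitted.get(applicationId);
    }

    public static void main(String[] args) {
        MiniClientRMService rm = new MiniClientRMService(1410000000000L);
        String appId = rm.getNewApplication();
        System.out.println(appId); // application_1410000000000_0001
        rm.submitApplication(appId, "default");
        rm.submitApplication(appId, "other"); // retry with the same ID: ignored
        System.out.println(rm.queueOf(appId)); // default
    }
}
```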
```java
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  String user = null;
  try {
    // Safety
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.
```