protectedvoidserviceInit(final Configuration conf)throws Exception { ... // 判断一些目录是否存在 try { String user = UserGroupInformation.getCurrentUser().getShortUserName(); // 目录为 /tmp/hadoop-yarn/staging/user/.staging Path stagingDir = MRApps.getStagingAreaDir(conf, user); FileSystem fs = getFileSystem(conf); boolean stagingExists = fs.exists(stagingDir); // 目录为 /tmp/hadoop-yarn/staging/user/.staging/jobId/COMMIT_STARTED Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId); boolean commitStarted = fs.exists(startCommitFile); // 目录为 /tmp/hadoop-yarn/staging/user/.staging/jobId/COMMIT_SUCCESS Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId); boolean commitSuccess = fs.exists(endCommitSuccessFile); // 目录为 /tmp/hadoop-yarn/staging/user/.staging/jobId/COMMIT_FAIL Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId); boolean commitFailure = fs.exists(endCommitFailureFile); ... } catch (IOException e) { thrownew YarnRuntimeException("Error while initializing", e); } if (errorHappenedShutDown) { // 发生error的处理逻辑 } else { // 创建MR依赖的OutputCommitter committer = createOutputCommitter(conf); // 为MRAppMaster服务创建异步调度器服务 dispatcher = createDispatcher(); addIfService(dispatcher); //service to handle requests from JobClient // 创建MRClientService clientService = createClientService(context); // Init ClientService separately so that we stop it separately, since this // service needs to wait some time before it stops so clients can know the // final states clientService.init(conf); // 根据mr的运行方式创建不同的container分配router // 调度各个状态 流程 ?????????##########################?????????? // 请求的流程,分配的container是rm已经分配到am上的资源???主动分配的?在哪申请 containerAllocator = createContainerAllocator(clientService, context); //service to handle the output committer committerEventHandler = createCommitterEventHandler(context, committer); addIfService(committerEventHandler); //service to handle requests to TaskUmbilicalProtocol // 主要负责启动task 使用TaskUmbilicalProtocol协议 taskAttemptListener = createTaskAttemptListener(context); addIfService(taskAttemptListener); //service to log job history events EventHandler<JobHistoryEvent> historyService = createJobHistoryHandler(context); dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, historyService); this.jobEventDispatcher = new JobEventDispatcher(); //register the event dispatchers dispatcher.register(JobEventType.class, jobEventDispatcher); dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); dispatcher.register(CommitterEventType.class, committerEventHandler); ... // Now that there's a FINISHING state for application on RM to give AMs // plenty of time to clean up after unregister it's safe to clean staging // directory after unregistering with RM. So, we start the staging-dir // cleaner BEFORE the ContainerAllocator so that on shut-down, // ContainerAllocator unregisters first and then the staging-dir cleaner // deletes staging directory. // 开启清理服务 // 与OutputCommitter的区别 addService(createStagingDirCleaningService()); // service to allocate containers from RM (if non-uber) or to fake it (uber) addIfService(containerAllocator); dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); // corresponding service to launch allocated containers via NodeManager containerLauncher = createContainerLauncher(context); addIfService(containerLauncher); dispatcher.register(ContainerLauncher.EventType.class, containerLauncher); // Add the JobHistoryEventHandler last so that it is properly stopped first. // This will guarantee that all history-events are flushed before AM goes // ahead with shutdown. // Note: Even though JobHistoryEventHandler is started last, if any // component creates a JobHistoryEvent in the meanwhile, it will be just be // queued inside the JobHistoryEventHandler // 可见服务的启动顺序是先进后出 addIfService(historyService); } super.serviceInit(conf); } // end of init()
protectedvoidserviceStart()throws Exception { ... // /////////////////// Create the job itself. // 开启job的状态机 JobImpl NEW job = createJob(getConfig(), forcedState, shutDownMessage); // job创建成功之后,向jobHistory发送EventType.AM_STARTED事件类型, // 但是为什么会有amInfos,amInfos用来干啥? for (AMInfo info : amInfos) { dispatcher.getEventHandler().handle( new JobHistoryEvent(job.getID(), new AMStartedEvent(info .getAppAttemptId(), info.getStartTime(), info.getContainerId(), info.getNodeManagerHost(), info.getNodeManagerPort(), info .getNodeManagerHttpPort()))); }
// Send out an MR AM inited event for this AM. dispatcher.getEventHandler().handle( new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(), amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo .getNodeManagerHttpPort(), this.forcedState == null ? null : this.forcedState.toString()))); amInfos.add(amInfo);
// metrics system init is really init & start. // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster");
boolean initFailed = false; if (!errorHappenedShutDown) { // create a job event for job intialization // job init job从NEW变为INITED JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); // Send init to the job (this does NOT trigger job execution) // This is a synchronous call, not an event through dispatcher. We want // job-init to be done completely here. // 注意注释的内容,与dispatcher的区别 jobEventDispatcher.handle(initJobEvent);
// If job is still not initialized, an error happened during // initialization. Must complete starting all of the services so failure // events can be processed. initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED); ... // Start ClientService here, since it's not initialized if // errorHappenedShutDown is true clientService.start(); } //start all the components super.serviceStart();
// finally set the job classloader MRApps.setClassLoader(jobClassLoader, getConfig()); // 如果job INITED失败,则job的状态为JOB_INIT_FAILED if (initFailed) { JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); jobEventDispatcher.handle(initFailedEvent); } else { // All components have started, start the job. startJobs(); } }
// create the Tasks but don't start them yet // 创建map reduce task对象 createMapTasks(job, inputLength, taskSplitMetaInfo); createReduceTasks(job); ... return JobStateInternal.INITED; } catch (Exception e) { ... // Leave job in the NEW state. The MR AM will detect that the state is // not INITED and send a JOB_INIT_FAILED event. return JobStateInternal.NEW; } }
protectedvoidstartJobs(){ /** create a job-start event to get this ball rolling */ JobEvent startJobEvent = new JobStartEvent(job.getID(), recoveredJobStartTime); /** send the job-start event. this triggers the job execution. */ // 触发job execution dispatcher.getEventHandler().handle(startJobEvent); }