// Excerpt from the container-start path. The enclosing method's signature and parts of its
// body are outside this view (marked "..."). It creates the Container object, registers it
// in the context, and then, while holding readLock and only if the service is not stopped,
// does the following:
//   - creates/registers the Application on its first container;
//   - persists the container request;
//   - dispatches ApplicationContainerInitEvent.
LOG.info("Start request for " + containerIdStr + " by user " + user);
// Get the launch context of the requested container.
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
... // (code elided in this excerpt)
// Create the container object; this begins the container's state-machine transitions on
// the NodeManager. (Original note: the container's initial state is NEW.)
Container container = new ContainerImpl(getConfig(), this.dispatcher,
    context.getNMStateStore(), launchContext, credentials, metrics,
    containerTokenIdentifier);
ApplicationId applicationID =
    containerId.getApplicationAttemptId().getApplicationId();
// Register the container in the context's container map. putIfAbsent returns non-null
// when a container with this ID is already registered, which is audited and rejected.
if (context.getContainers().putIfAbsent(containerId, container) != null) {
  NMAuditLogger.logFailure(user, AuditConstants.START_CONTAINER,
      "ContainerManagerImpl", "Container already running on this node!",
      applicationID, containerId);
  throw RPCUtil.getRemoteException("Container " + containerIdStr
      + " already is running on this node!!");
}
// readLock is held while serviceStopped is checked and the application/container are
// registered; it is released in the finally block below.
this.readLock.lock();
try {
  if (!serviceStopped) {
    // Create the application object.
    Application application = new ApplicationImpl(dispatcher, user,
        applicationID, credentials, context);
    // putIfAbsent returns null only when no application was registered yet, i.e. this is
    // the application's first container on this node. Only then is the application
    // persisted and initialized (original note: auxiliary setup such as log aggregation
    // happens as part of this).
    if (null == context.getApplications().putIfAbsent(applicationID,
        application)) {
      LOG.info("Creating a new application reference for app "
          + applicationID);
      LogAggregationContext logAggregationContext =
          containerTokenIdentifier.getLogAggregationContext();
      Map<ApplicationAccessType, String> appAcls =
          container.getLaunchContext().getApplicationACLs();
      // Persist the application (including its logAggregationContext) to the NM state store.
      context.getNMStateStore().storeApplication(applicationID,
          buildAppProto(applicationID, user, credentials, appAcls,
              logAggregationContext));
      // Fire the application-init event (original note: ApplicationEventType.INIT_APPLICATION).
      dispatcher.getEventHandler().handle(
          new ApplicationInitEvent(applicationID, appAcls,
              logAggregationContext));
    }

    this.context.getNMStateStore().storeContainer(containerId, request);
    // Fire the container-init event for the application
    // (original note: ApplicationEventType.INIT_CONTAINER).
    dispatcher.getEventHandler().handle(
        new ApplicationContainerInitEvent(container));
    ... // (code elided in this excerpt)
  } else {
    // NOTE(review): in the visible code, the container registered in context.getContainers()
    // above is not removed on this shutdown path; confirm whether that is intended.
    // NOTE(review): `thrownew` is a paste artifact; it must read `throw new` to compile.
    thrownew YarnException(
        "Container start failed as the NodeManager is "
        + "in the process of shutting down");
  }
} finally {
  this.readLock.unlock();
}
}
publicvoidtransition(ApplicationImpl app, ApplicationEvent event){ ApplicationContainerInitEvent initEvent = (ApplicationContainerInitEvent) event; Container container = initEvent.getContainer(); app.containers.put(container.getContainerId(), container); LOG.info("Adding " + container.getContainerId() + " to application " + app.toString()); switch (app.getApplicationState()) { case RUNNING: app.dispatcher.getEventHandler().handle(new ContainerInitEvent( container.getContainerId())); break; case INITING: case NEW: // these get queued up and sent out in AppInitDoneTransition break; default: assertfalse : "Invalid state for InitContainerTransition: " + app.getApplicationState(); } }
// Excerpt from a container transition (its signature is outside this view). Steps:
//   1. Notify aux services of each service-data entry.
//   2. Group the container's local resources by visibility.
//   3. Either send a localization request (returning LOCALIZING) or, when there are no local
//      resources, call sendLaunchEvent() and return LOCALIZED.
// A malformed resource aborts the launch with LOCALIZATION_FAILED.

// Inform the AuxServices about the opaque serviceData
Map<String,ByteBuffer> csd = ctxt.getServiceData();
if (csd != null) {
  // This can happen more than once per Application as each container may
  // have distinct service data
  for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
    container.dispatcher.getEventHandler().handle(
        new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
            container.user, container.containerId
                .getApplicationAttemptId().getApplicationId(),
            service.getKey().toString(), service.getValue()));
  }
}

// Send requests for public, private (and application-visibility) resources.
// Original note: these are sent as remote requests, and the request protocol involved is
// yarn_protos.ContainerLaunchContextProto. This cannot be verified from this excerpt.
Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
if (!cntrRsrc.isEmpty()) {
  try {
    for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
      try {
        // presumably the LocalResourceRequest constructor is what throws URISyntaxException
        LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
        // Map each distinct resource request to every link name that refers to it.
        List<String> links = container.pendingResources.get(req);
        if (links == null) {
          links = new ArrayList<String>();
          container.pendingResources.put(req, links);
        }
        links.add(rsrc.getKey());
        // Bucket the request by its visibility.
        switch (rsrc.getValue().getVisibility()) {
        case PUBLIC:
          container.publicRsrcs.add(req);
          break;
        case PRIVATE:
          container.privateRsrcs.add(req);
          break;
        case APPLICATION:
          container.appRsrcs.add(req);
          break;
        }
      } catch (URISyntaxException e) {
        // Log which entry failed, then rethrow to the outer handler, which aborts the launch.
        LOG.info("Got exception parsing " + rsrc.getKey()
            + " and value " + rsrc.getValue());
        throw e;
      }
    }
  } catch (URISyntaxException e) {
    // malformed resource; abort container launch
    LOG.warn("Failed to parse resource-request", e);
    container.cleanup();
    container.metrics.endInitingContainer();
    return ContainerState.LOCALIZATION_FAILED;
  }
  // Build the visibility -> requests map, leaving out empty buckets.
  Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
      new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
  if (!container.publicRsrcs.isEmpty()) {
    req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
  }
  if (!container.privateRsrcs.isEmpty()) {
    req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
  }
  if (!container.appRsrcs.isEmpty()) {
    req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
  }

  // NOTE(review): the "{intellij}" tag looks like leftover debug logging; consider removing
  // it or lowering it to debug level.
  LOG.info("{intellij} ContainerImpl new to localizing handle");
  container.dispatcher.getEventHandler().handle(
      new ContainerLocalizationRequestEvent(container, req));
  return ContainerState.LOCALIZING;
} else {
  // No local resources to localize: call sendLaunchEvent() and report LOCALIZED.
  container.sendLaunchEvent();
  container.metrics.endInitingContainer();
  return ContainerState.LOCALIZED;
}
}
/**
 * Launches the container and reports the outcome through the dispatcher.
 * This is an excerpt; parts of the body are elided and marked "...".
 *
 * <p>Visible flow:
 * <ul>
 *   <li>Dispatch CONTAINER_LAUNCHED and persist the launched state.</li>
 *   <li>Either skip the launch, because the shouldLaunchContainer flag was already set
 *       (per the log message, by cleanup), or run exec.launchContainer.</li>
 *   <li>Any Throwable dispatches CONTAINER_EXITED_WITH_FAILURE and returns ret.</li>
 *   <li>The finally block always marks completion, deactivates the container, and
 *       persists ret.</li>
 * </ul>
 *
 * @return ret on the Throwable path; 0 at the end of the visible success path
 *     (the elided code may contain other returns)
 */
public Integer call(){
  ... // (code elided in this excerpt)
  try {
    ... // (code elided in this excerpt)
    // LaunchContainer is a blocking call. We are here almost means the
    // container is launched, so send out the event.
    // Original note: handling ContainerEventType.CONTAINER_LAUNCHED moves the container
    // from LOCALIZED to RUNNING and starts monitoring its memory usage (physical and
    // virtual).
    // NOTE(review): this event is dispatched and persisted before the
    // shouldLaunchContainer check below, so it is sent even when the launch is then skipped.
    dispatcher.getEventHandler().handle(new ContainerEvent(
        containerID,
        ContainerEventType.CONTAINER_LAUNCHED));
    context.getNMStateStore().storeContainerLaunched(containerID);

    // Check if the container is signalled to be killed.
    // compareAndSet fails when the flag is already true, in which case the launch is skipped.
    if (!shouldLaunchContainer.compareAndSet(false, true)) {
      LOG.info("Container " + containerIdStr + " not launched as "
          + "cleanup already called");
      ret = ExitCode.TERMINATED.getExitCode();
    }
    else {
      exec.activateContainer(containerID, pidFilePath);
      // Run the container launch script.
      ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
          nmPrivateTokensPath, user, appIdStr, containerWorkDir,
          localDirs, logDirs);
    }
  } catch (Throwable e) {
    LOG.warn("Failed to launch container.", e);
    dispatcher.getEventHandler().handle(new ContainerExitEvent(
        containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
        e.getMessage()));
    return ret;
  } finally {
    // Runs on every path, including the early return above: mark completion, deactivate
    // the container, and persist its exit code.
    completed.set(true);
    exec.deactivateContainer(containerID);
    try {
      context.getNMStateStore().storeContainerCompleted(containerID, ret);
    } catch (IOException e) {
      // NOTE(review): e is not passed to LOG.error, so the cause and stack trace are lost.
      LOG.error("Unable to set exit code for container " + containerID);
    }
  }
  ... // (code elided in this excerpt)
  dispatcher.getEventHandler().handle(
      new ContainerEvent(containerID,
          ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
  // NOTE(review): `return0` is a paste artifact; it must read `return 0` to compile.
  return0;
}