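Any computing framework or service that runs on YARN needs a master to manage its job. That master is the ApplicationMaster.

The ApplicationMaster is the brain of a job. Using MapReduce as the example, this post walks through how the ApplicationMaster is launched.

The client first submits an application request to the RM. The RM creates an application and then an appattempt; all later scheduling and task breakdown operate on this appattempt. When the appattempt's state changes from ALLOCATED_SAVING to ALLOCATED, AttemptStoredTransition.transition calls appAttempt.launchAttempt() to start it. The code is: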

// RMAppAttemptImpl.java
private void launchAttempt() {
  // Send event to launch the AM Container.
  // The async dispatcher looks up the handler registered for this event
  // (registered in ResourceManager); for AMLauncherEvent that handler is
  // ApplicationMasterLauncher.
  eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
}
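
The lookup described in the comments above, where a handler is registered per event type and found again when an event arrives, can be illustrated with a minimal sketch. This is not YARN's dispatcher: MiniDispatcher, register, dispatch, and LauncherEventType are hypothetical names, and the sketch dispatches synchronously on the caller's thread, whereas the text describes an asynchronous dispatcher.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an event dispatcher: handlers are
// registered per event-type enum class and looked up when an event arrives.
class MiniDispatcher {
    // Illustrative event type mirroring the LAUNCH/CLEANUP pair in the text.
    enum LauncherEventType { LAUNCH, CLEANUP }

    private final Map<Class<?>, Consumer<Enum<?>>> handlers = new HashMap<>();

    // Associate one handler with every constant of the given enum class.
    void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Enum<?>> handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Find the handler by the enum class of the event type and invoke it.
    void dispatch(Enum<?> eventType) {
        Consumer<Enum<?>> handler = handlers.get(eventType.getDeclaringClass());
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + eventType);
        }
        handler.accept(eventType);
    }

    // Small demonstration: register a handler, dispatch LAUNCH, return what it logged.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniDispatcher dispatcher = new MiniDispatcher();
        dispatcher.register(LauncherEventType.class, e -> log.add("launcher handled " + e));
        dispatcher.dispatch(LauncherEventType.LAUNCH);
        return log;
    }
}
```

Keying the map by the enum class rather than by individual constants means one handler receives every event type of that family and decides internally what to do, which matches how the handler in the text switches on LAUNCH versus CLEANUP.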

The handler for AMLauncherEvent is ApplicationMasterLauncher and the event type is LAUNCH, so ApplicationMasterLauncher.handle calls launch(application):

private void launch(RMAppAttempt application) {
  // Create a Runnable launcher task
  Runnable launcher = createRunnableLauncher(application,
      AMLauncherEventType.LAUNCH);
  // Put the task on the blocking queue
  masterEvents.add(launcher);
}

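Looking at this method alone: it creates a launcher task and puts it on a queue, where another thread takes it off and processes it. This is the classic producer-consumer model. Let's look at how ApplicationMasterLauncher (which is both an event handler and a service) implements this.

First, createRunnableLauncher: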

protected Runnable createRunnableLauncher(RMAppAttempt application,
    AMLauncherEventType event) {
  Runnable launcher =
      new AMLauncher(context, application, event, getConfig());
  return launcher;
}

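This simply creates an AMLauncher. AMLauncher implements Runnable and is the task that performs AM operations; it handles only two of them, launch and cleanup.

Once created, the launcher is added to the blocking queue masterEvents, so some other thread must take launchers off that queue. That thread is launcherHandlingThread, of type LauncherThread; it takes each launcher and hands it to a thread pool for execution: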

private class LauncherThread extends Thread {
  public LauncherThread() {
    super("ApplicationMaster Launcher");
  }

  @Override
  public void run() {
    while (!this.isInterrupted()) {
      Runnable toLaunch;
      try {
        // Take the next launcher off the blocking queue
        toLaunch = masterEvents.take();
        // Hand it to the thread pool, which is created as:
        // this.launcherPool = new ThreadPoolExecutor(10, 10, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
        launcherPool.execute(toLaunch);
      } catch (InterruptedException e) {
        LOG.warn(this.getClass().getName() + " interrupted. Returning.");
        return;
      }
    }
  }
}
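
The producer-consumer arrangement used here (one side enqueues Runnable tasks, a dedicated thread takes them off a blocking queue and forwards them to a pool) can be tried in isolation. The following is a minimal sketch under stated assumptions, not the ApplicationMasterLauncher code: LauncherQueueSketch and demo are hypothetical names, the pool size of 2 is arbitrary, and the latch exists only so the demonstration can wait for completion.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the queue-plus-pool hand-off described in the text.
class LauncherQueueSketch {
    static List<String> demo(int jobs) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2); // arbitrary size
        List<String> done = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(jobs);

        // Consumer: block on take(), then delegate the task to the pool.
        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    pool.execute(queue.take());
                } catch (InterruptedException e) {
                    return; // interrupted while waiting: stop consuming
                }
            }
        }, "launcher-consumer");
        consumer.start();

        // Producer: enqueue one task per job, as launch() does with masterEvents.add.
        for (int i = 0; i < jobs; i++) {
            final int id = i;
            queue.add(() -> {
                done.add("launched-" + id);
                latch.countDown();
            });
        }

        try {
            latch.await(5, TimeUnit.SECONDS); // wait for all tasks to run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.interrupt();
            pool.shutdown();
        }
        return done;
    }
}
```

The consumer thread does almost no work itself, so a slow task occupies only a pool worker and does not stall the queue; the order in which tasks finish is not guaranteed.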

Once handed to the thread pool, the launcher starts running; the method invoked is AMLauncher.run:

public void run() {
  switch (eventType) {
  case LAUNCH:
    try {
      LOG.info("Launching master" + application.getAppAttemptId());
      launch();
      // Report LAUNCHED; handling it registers the AM with AMLivelinessMonitor
      handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
          RMAppAttemptEventType.LAUNCHED));
    } catch(Exception ie) {
      String message = "Error launching " + application.getAppAttemptId()
          + ". Got exception: " + StringUtils.stringifyException(ie);
      LOG.info(message);
      handler.handle(new RMAppAttemptLaunchFailedEvent(application
          .getAppAttemptId(), message));
    }
    break;
  case CLEANUP:
    ...
    break;
  default:
    LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
    break;
  }
}

The event type put on the masterEvents queue earlier was LAUNCH, so launch() is called here:

private void launch() throws IOException, YarnException {
  // Obtain the RPC client for the target node
  connect();
  ContainerId masterContainerID = masterContainer.getId();
  ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
  LOG.info("Setting up container " + masterContainer
      + " for AM " + application.getAppAttemptId());
  ContainerLaunchContext launchContext =
      createAMContainerLaunchContext(applicationContext, masterContainerID);
  // Build the request
  StartContainerRequest scRequest =
      StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
  List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
  list.add(scRequest);
  StartContainersRequest allRequests =
      StartContainersRequest.newInstance(list);
  // Remote call to startContainers
  StartContainersResponse response =
      containerMgrProxy.startContainers(allRequests);
  if (response.getFailedRequests() != null
      && response.getFailedRequests().containsKey(masterContainerID)) {
    Throwable t =
        response.getFailedRequests().get(masterContainerID).deSerialize();
    parseAndThrowException(t);
  } else {
    LOG.info("Done launching container " + masterContainer + " for AM "
        + application.getAppAttemptId());
  }
}

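AMLauncher.launch first obtains the RPC client for the target node, containerMgrProxy, in connect(), then builds the request, and finally calls the RPC method startContainers(), which returns a response. Here is startContainers on the node side: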

public StartContainersResponse
    startContainers(StartContainersRequest requests) throws YarnException,
        IOException {
  ...
  UserGroupInformation remoteUgi = getRemoteUgi();
  NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
  authorizeUser(remoteUgi,nmTokenIdentifier);
  List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
  Map<ContainerId, SerializedException> failedContainers =
      new HashMap<ContainerId, SerializedException>();
  for (StartContainerRequest request : requests.getStartContainerRequests()) {
    ContainerId containerId = null;
    try {
      ContainerTokenIdentifier containerTokenIdentifier =
          BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
      verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
          containerTokenIdentifier);
      containerId = containerTokenIdentifier.getContainerID();
      startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
          request);
      succeededContainers.add(containerId);
    } catch (YarnException e) {
      failedContainers.put(containerId, SerializedException.newInstance(e));
    } catch (InvalidToken ie) {
      failedContainers.put(containerId, SerializedException.newInstance(ie));
      throw ie;
    } catch (IOException e) {
      throw RPCUtil.getRemoteException(e);
    }
  }

  return StartContainersResponse.newInstance(getAuxServiceMetaData(),
      succeededContainers, failedContainers);
}

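startContainers iterates over the container requests in the request and calls startContainerInternal for each to start a container; this is the container on the NodeManager that will run the task. Containers that start successfully go into the succeededContainers list and failures go into failedContainers. When the loop finishes, a response is built and returned to the RM.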
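
The shape of that loop, where one failing item is recorded without aborting the rest of the batch, is shown in the minimal sketch below. It is an illustration only: BatchStartSketch, Response, startOne, and startAll are hypothetical names, container ids are plain strings, and the only failure modelled is a duplicate id, loosely echoing the "already running" check in startContainerInternal.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: process a batch, collecting successes and failures separately.
class BatchStartSketch {
    // Hypothetical response: ids that started, and id -> error message for failures.
    static final class Response {
        final List<String> succeeded = new ArrayList<>();
        final Map<String, String> failed = new LinkedHashMap<>();
    }

    // Hypothetical per-item start: rejects an id that is already running.
    static void startOne(String id, Set<String> running) {
        if (!running.add(id)) {
            throw new IllegalStateException("Container " + id + " is already running");
        }
    }

    // Iterate over all requests; a failure is recorded and the loop continues.
    static Response startAll(List<String> ids) {
        Response response = new Response();
        Set<String> running = new HashSet<>();
        for (String id : ids) {
            try {
                startOne(id, running);
                response.succeeded.add(id);
            } catch (IllegalStateException e) {
                response.failed.put(id, e.getMessage());
            }
        }
        return response;
    }
}
```

Returning both collections lets the caller inspect the outcome per container, which is what AMLauncher.launch does when it looks up masterContainerID in getFailedRequests().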

Here is startContainerInternal, the method that starts the container:

private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
    ContainerTokenIdentifier containerTokenIdentifier,
    StartContainerRequest request) throws YarnException, IOException {
  ...
  ContainerId containerId = containerTokenIdentifier.getContainerID();
  String containerIdStr = containerId.toString();
  String user = containerTokenIdentifier.getApplicationSubmitter();

  LOG.info("Start request for " + containerIdStr + " by user " + user);
  // Get the launch context of this container
  ContainerLaunchContext launchContext = request.getContainerLaunchContext();
  ...
  // Create the container object; this starts the container's state machine
  // on the NodeManager. The initial state is NEW.
  Container container =
      new ContainerImpl(getConfig(), this.dispatcher,
          context.getNMStateStore(), launchContext,
          credentials, metrics, containerTokenIdentifier);

  ApplicationId applicationID =
      containerId.getApplicationAttemptId().getApplicationId();
  // Register the container in the context's containers map
  if (context.getContainers().putIfAbsent(containerId, container) != null) {
    NMAuditLogger.logFailure(user, AuditConstants.START_CONTAINER,
        "ContainerManagerImpl", "Container already running on this node!",
        applicationID, containerId);
    throw RPCUtil.getRemoteException("Container " + containerIdStr
        + " already is running on this node!!");
  }

  this.readLock.lock();
  try {
    if (!serviceStopped) {
      // Create the application
      Application application =
          new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
      // If this is the application's first container, do some extra setup,
      // such as starting log aggregation
      if (null == context.getApplications().putIfAbsent(applicationID,
          application)) {
        LOG.info("Creating a new application reference for app " + applicationID);
        LogAggregationContext logAggregationContext =
            containerTokenIdentifier.getLogAggregationContext();
        Map<ApplicationAccessType, String> appAcls =
            container.getLaunchContext().getApplicationACLs();
        // Persist the application, including its logAggregationContext,
        // in the NM state store
        context.getNMStateStore().storeApplication(applicationID,
            buildAppProto(applicationID, user, credentials, appAcls,
                logAggregationContext));
        // Fire ApplicationEventType.INIT_APPLICATION
        dispatcher.getEventHandler().handle(
            new ApplicationInitEvent(applicationID, appAcls,
                logAggregationContext));
      }

      this.context.getNMStateStore().storeContainer(containerId, request);
      // Fire ApplicationEventType.INIT_CONTAINER
      dispatcher.getEventHandler().handle(
          new ApplicationContainerInitEvent(container));
      ...
    } else {
      throw new YarnException(
          "Container start failed as the NodeManager is " +
          "in the process of shutting down");
    }
  } finally {
    this.readLock.unlock();
  }
}

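startContainerInternal first creates a container object, which starts the container's journey through its state machine; a newly created container is in state NEW.
It then checks whether this is the application's first container. If so, it sets up log aggregation and fires an ApplicationEventType.INIT_APPLICATION event, which moves the application from ApplicationState.NEW to ApplicationState.INITING, and finally fires ApplicationEventType.INIT_CONTAINER to update the container's state. If not, it fires ApplicationEventType.INIT_CONTAINER directly.

After the if statement, the application's state goes from NEW to INITING. The container information is then stored through the context's NMStateStore and ApplicationEventType.INIT_CONTAINER is fired. Because the application is in INITING and the event type is INIT_CONTAINER, the handler for this event is InitContainerTransition, after which the application remains in INITING. Here is InitContainerTransition.transition: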

public void transition(ApplicationImpl app, ApplicationEvent event) {
  ApplicationContainerInitEvent initEvent =
      (ApplicationContainerInitEvent) event;
  Container container = initEvent.getContainer();
  app.containers.put(container.getContainerId(), container);
  LOG.info("Adding " + container.getContainerId()
      + " to application " + app.toString());

  switch (app.getApplicationState()) {
  case RUNNING:
    app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
        container.getContainerId()));
    break;
  case INITING:
  case NEW:
    // these get queued up and sent out in AppInitDoneTransition
    break;
  default:
    assert false : "Invalid state for InitContainerTransition: " +
        app.getApplicationState();
  }
}

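The code shows that when the application is in INITING or NEW, an INIT_CONTAINER event does nothing beyond recording the container, so the application's state does not change. Only when the application is RUNNING does it fire a ContainerEventType.INIT_CONTAINER event, which starts the container's state transitions.

So when does the application become RUNNING? Go back to the ApplicationEventType.INIT_APPLICATION event fired when the application object was created. The application had just been constructed, so its state was NEW, and the handler for this event is AppInitTransition. Here is AppInitTransition.transition: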

public void transition(ApplicationImpl app, ApplicationEvent event) {
  ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
  ...
  app.dispatcher.getEventHandler().handle(
      new LogHandlerAppStartedEvent(app.appId, app.user,
          app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
          app.applicationACLs, app.logAggregationContext));
}

This transition hands a LogHandlerAppStartedEvent to the async dispatcher. Its event type is LogHandlerEventType.APPLICATION_STARTED, which is handled in LogAggregationService. Here is the corresponding handle method:

public void handle(LogHandlerEvent event) {
  switch (event.getType()) {
  case APPLICATION_STARTED:
    LogHandlerAppStartedEvent appStartEvent =
        (LogHandlerAppStartedEvent) event;
    initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
        appStartEvent.getCredentials(),
        appStartEvent.getLogRetentionPolicy(),
        appStartEvent.getApplicationAcls(),
        appStartEvent.getLogAggregationContext());
    break;
  case CONTAINER_FINISHED:
    ...
    break;
  case APPLICATION_FINISHED:
    ...
    break;
  default:
    ; // Ignore
  }
}

As its name suggests, LogAggregationService is mainly responsible for log aggregation. It handles three event types: APPLICATION_STARTED, CONTAINER_FINISHED, and APPLICATION_FINISHED.
An APPLICATION_STARTED event calls initApp:

private void initApp(final ApplicationId appId, String user,
    Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
    Map<ApplicationAccessType, String> appAcls,
    LogAggregationContext logAggregationContext) {
  ApplicationEvent eventResponse;
  try {
    verifyAndCreateRemoteLogDir(getConfig());
    initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
        logAggregationContext);
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
  } catch (YarnRuntimeException e) {
    LOG.warn("Application failed to init aggregation", e);
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
  }
  this.dispatcher.getEventHandler().handle(eventResponse);
}

Once the logDir has been created, an ApplicationEventType.APPLICATION_LOG_HANDLING_INITED event is fired. The application is in INITING at this point, so the handler is AppLogInitDoneTransition. Here is its transition method:

public void transition(ApplicationImpl app, ApplicationEvent event) {
  app.dispatcher.getEventHandler().handle(
      new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
}

Only after LogAggregationService has created the logDir for the application and started the log aggregation thread does AppLogInitDoneTransition handle the APPLICATION_LOG_HANDLING_INITED event.
AppLogInitDoneTransition fires LocalizationEventType.INIT_APPLICATION_RESOURCES, which is picked up by ResourceLocalizationService:

public void handle(LocalizationEvent event) {
  // TODO: create log dir as $logdir/$user/$appId
  switch (event.getType()) {
  case INIT_APPLICATION_RESOURCES:
    handleInitApplicationResources(
        ((ApplicationLocalizationEvent)event).getApplication());
    break;
  case INIT_CONTAINER_RESOURCES:
    LOG.info("{intellij} ResourceLocalizationsService handle INIT_CONTAINER_RESOURCES");
    handleInitContainerResources((ContainerLocalizationRequestEvent) event);
    break;
  case CACHE_CLEANUP:
    handleCacheCleanup(event);
    break;
  case CLEANUP_CONTAINER_RESOURCES:
    handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
    break;
  case DESTROY_APPLICATION_RESOURCES:
    handleDestroyApplicationResources(
        ((ApplicationLocalizationEvent)event).getApplication());
    break;
  default:
    throw new YarnRuntimeException("Unknown localization event: " + event);
  }
}

handle processes the different event types; INIT_APPLICATION_RESOURCES is handled by handleInitApplicationResources:

private void handleInitApplicationResources(Application app) {
  // 0) Create application tracking structs
  String userName = app.getUser();
  privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
      null, dispatcher, true, super.getConfig(), stateStore));
  String appIdStr = ConverterUtils.toString(app.getAppId());
  appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
      app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
  // 1) Signal container init
  //
  // This is handled by the ApplicationImpl state machine and allows
  // containers to proceed with launching.
  dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
      app.getAppId()));
}

This fires ApplicationEventType.APPLICATION_INITED, which is handled in the application's state machine. The handler is AppInitDoneTransition, and after it runs the application moves from INITING to RUNNING. Here is AppInitDoneTransition.transition:

public void transition(ApplicationImpl app, ApplicationEvent event) {
  // Start all the containers waiting for ApplicationInit
  for (Container container : app.containers.values()) {
    app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
        container.getContainerId()));
  }
}
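
Taken together, InitContainerTransition and AppInitDoneTransition defer container initialization until the application is RUNNING. The sketch below models only that deferral and is not the ApplicationImpl state machine: AppStateSketch and its method names are hypothetical, events are replaced by direct method calls, and container ids are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: containers registered before the app is RUNNING are
// held back, then all released when the app finishes initializing.
class AppStateSketch {
    enum AppState { NEW, INITING, RUNNING }

    private AppState state = AppState.NEW;
    private final List<String> containers = new ArrayList<>();
    private final List<String> sentInitEvents = new ArrayList<>();

    // Models INIT_APPLICATION: NEW -> INITING.
    void initApplication() {
        if (state == AppState.NEW) {
            state = AppState.INITING;
        }
    }

    // Models INIT_CONTAINER: always record the container, but only forward
    // a container-init event if the application is already RUNNING.
    void initContainer(String containerId) {
        containers.add(containerId);
        if (state == AppState.RUNNING) {
            sentInitEvents.add(containerId);
        }
    }

    // Models APPLICATION_INITED: INITING -> RUNNING, releasing every
    // container that was recorded while the application was initializing.
    void applicationInited() {
        state = AppState.RUNNING;
        sentInitEvents.addAll(containers);
    }

    AppState state() {
        return state;
    }

    List<String> sentInitEvents() {
        return sentInitEvents;
    }
}
```

A container that arrives while the application is still initializing waits; one that arrives afterwards is initialized immediately. Either way each container receives exactly one init event.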

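AppInitDoneTransition fires a ContainerEventType.INIT_CONTAINER event for every container registered with the application, which starts each container's state transitions.
startContainerInternal created a container whose initial state is NEW. Once ApplicationEventType.APPLICATION_INITED has fired, its handler fires ContainerEventType.INIT_CONTAINER and the container begins changing state. The handler is RequestResourcesTransition; here is its transition code: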

public ContainerState transition(ContainerImpl container,
    ContainerEvent event) {
  ...
  final ContainerLaunchContext ctxt = container.launchContext;
  container.metrics.initingContainer();

  container.dispatcher.getEventHandler().handle(new AuxServicesEvent
      (AuxServicesEventType.CONTAINER_INIT, container));

  // Inform the AuxServices about the opaque serviceData
  Map<String,ByteBuffer> csd = ctxt.getServiceData();
  if (csd != null) {
    // This can happen more than once per Application as each container may
    // have distinct service data
    for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
      container.dispatcher.getEventHandler().handle(
          new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
              container.user, container.containerId
                  .getApplicationAttemptId().getApplicationId(),
              service.getKey().toString(), service.getValue()));
    }
  }

  // Send requests for public, private resources.
  // The resources come from the launch context, whose wire format is
  // yarn_protos.ContainerLaunchContextProto.
  Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
  if (!cntrRsrc.isEmpty()) {
    try {
      for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
        try {
          LocalResourceRequest req =
              new LocalResourceRequest(rsrc.getValue());
          List<String> links = container.pendingResources.get(req);
          if (links == null) {
            links = new ArrayList<String>();
            container.pendingResources.put(req, links);
          }
          links.add(rsrc.getKey());
          switch (rsrc.getValue().getVisibility()) {
          case PUBLIC:
            container.publicRsrcs.add(req);
            break;
          case PRIVATE:
            container.privateRsrcs.add(req);
            break;
          case APPLICATION:
            container.appRsrcs.add(req);
            break;
          }
        } catch (URISyntaxException e) {
          LOG.info("Got exception parsing " + rsrc.getKey()
              + " and value " + rsrc.getValue());
          throw e;
        }
      }
    } catch (URISyntaxException e) {
      // malformed resource; abort container launch
      LOG.warn("Failed to parse resource-request", e);
      container.cleanup();
      container.metrics.endInitingContainer();
      return ContainerState.LOCALIZATION_FAILED;
    }
    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
        new HashMap<LocalResourceVisibility,
            Collection<LocalResourceRequest>>();
    if (!container.publicRsrcs.isEmpty()) {
      req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
    }
    if (!container.privateRsrcs.isEmpty()) {
      req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
    }
    if (!container.appRsrcs.isEmpty()) {
      req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
    }

    LOG.info("{intellij} ContainerImpl new to localizing handle");
    container.dispatcher.getEventHandler().handle(
        new ContainerLocalizationRequestEvent(container, req));
    return ContainerState.LOCALIZING;
  } else {
    container.sendLaunchEvent();
    container.metrics.endInitingContainer();
    return ContainerState.LOCALIZED;
  }
}

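As the handler's name suggests, its main job is to request the container's resources. The method checks whether the container needs any resources:

  • If it does, the resources are localized: LocalizationEventType.INIT_CONTAINER_RESOURCES is fired and ContainerState.LOCALIZING is returned, moving the container from NEW to LOCALIZING. The INIT_CONTAINER_RESOURCES event is picked up by ResourceLocalizationService, which starts localizing the resources.
  • If it does not, a LAUNCH_CONTAINER event is sent and ContainerState.LOCALIZED is returned, moving the container directly from NEW to LOCALIZED.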
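
The two branches above, plus the grouping of requested resources by visibility that precedes them, can be condensed into a minimal sketch. It is an illustration under assumptions rather than the ContainerImpl code: LocalizationDecisionSketch, groupByVisibility, and nextState are hypothetical names, and resources are reduced to a name-to-visibility map.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the decision made when a NEW container is initialized.
class LocalizationDecisionSketch {
    enum Visibility { PUBLIC, PRIVATE, APPLICATION }
    enum ContainerState { NEW, LOCALIZING, LOCALIZED }

    // Group resource names by visibility; visibilities with no resources
    // get no entry, mirroring the isEmpty() checks in the transition.
    static Map<Visibility, List<String>> groupByVisibility(Map<String, Visibility> resources) {
        Map<Visibility, List<String>> grouped = new EnumMap<>(Visibility.class);
        for (Map.Entry<String, Visibility> entry : resources.entrySet()) {
            grouped.computeIfAbsent(entry.getValue(), v -> new ArrayList<>())
                   .add(entry.getKey());
        }
        return grouped;
    }

    // No resources: skip localization and go straight to LOCALIZED.
    // Otherwise the container has to wait in LOCALIZING.
    static ContainerState nextState(Map<String, Visibility> resources) {
        return resources.isEmpty() ? ContainerState.LOCALIZED : ContainerState.LOCALIZING;
    }
}
```

The return value doubles as the next state, which is why the transition in the text returns a ContainerState instead of void: the outcome depends on the event's contents, not only on the current state.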

Let's follow the case where no resources need to be requested. sendLaunchEvent sends a ContainersLauncherEventType.LAUNCH_CONTAINER event, which is picked up by ContainersLauncher. Its handle method is:

public void handle(ContainersLauncherEvent event) {
  // TODO: ContainersLauncher launches containers one by one!!
  Container container = event.getContainer();
  ContainerId containerId = container.getContainerId();
  switch (event.getType()) {
  case LAUNCH_CONTAINER:
    Application app =
        context.getApplications().get(
            containerId.getApplicationAttemptId().getApplicationId());
    // Create a ContainerLaunch task and submit it to the containerLauncher
    // thread pool
    ContainerLaunch launch =
        new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
            event.getContainer(), dirsHandler, containerManager);
    containerLauncher.submit(launch);
    running.put(containerId, launch);
    break;
  case RECOVER_CONTAINER:
    ...
    break;
  case CLEANUP_CONTAINER:
    ...
    break;
  }
}

Once the ContainerLaunch task is submitted to the containerLauncher thread pool, it starts running. ContainerLaunch implements Callable, so its logic lives in the call method:

public Integer call() {
  ...
  try {
    ...
    // LaunchContainer is a blocking call. We are here almost means the
    // container is launched, so send out the event.
    // Handling ContainerEventType.CONTAINER_LAUNCHED moves the container
    // from LOCALIZED to RUNNING and starts monitoring the memory it uses
    // (physical and virtual).
    dispatcher.getEventHandler().handle(new ContainerEvent(
        containerID,
        ContainerEventType.CONTAINER_LAUNCHED));
    context.getNMStateStore().storeContainerLaunched(containerID);

    // Check if the container is signalled to be killed.
    if (!shouldLaunchContainer.compareAndSet(false, true)) {
      LOG.info("Container " + containerIdStr + " not launched as "
          + "cleanup already called");
      ret = ExitCode.TERMINATED.getExitCode();
    } else {
      exec.activateContainer(containerID, pidFilePath);
      // Run the script that starts the container
      ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
          nmPrivateTokensPath, user, appIdStr, containerWorkDir,
          localDirs, logDirs);
    }
  } catch (Throwable e) {
    LOG.warn("Failed to launch container.", e);
    dispatcher.getEventHandler().handle(new ContainerExitEvent(
        containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
        e.getMessage()));
    return ret;
  } finally {
    completed.set(true);
    exec.deactivateContainer(containerID);
    try {
      context.getNMStateStore().storeContainerCompleted(containerID, ret);
    } catch (IOException e) {
      LOG.error("Unable to set exit code for container " + containerID);
    }
  }
  ...
  dispatcher.getEventHandler().handle(
      new ContainerEvent(containerID,
          ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
  return 0;
}

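After ContainerLaunch has prepared the container's runtime environment, it fires a ContainerEventType.CONTAINER_LAUNCHED event. LaunchTransition picks it up, fires the event that starts monitoring the container (its physical and virtual memory usage), and moves the container from LOCALIZED to RUNNING.
After firing CONTAINER_LAUNCHED, execution continues and calls exec.launchContainer to start the container.
That method returns only after the container has finished running. If it ends normally, ret is 0 and a ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS event is fired,
moving the container from RUNNING to EXITED_WITH_SUCCESS.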
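
The ordering described here (announce the launch, block until the process exits, then report the result) can be sketched as follows. This is a simplified illustration, not ContainerLaunch itself: ContainerLaunchSketch is a hypothetical class, the IntSupplier stands in for the blocking exec.launchContainer call, events are recorded as strings, and mapping every non-zero exit code to CONTAINER_EXITED_WITH_FAILURE is an assumption, since the excerpt elides how non-zero codes are handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.IntSupplier;

// Hypothetical sketch of a launch task: LAUNCHED is reported before the
// blocking call, and the exit event only after it returns.
class ContainerLaunchSketch implements Callable<Integer> {
    // Stand-in for the blocking launch; returns the process exit code.
    private final IntSupplier blockingLaunch;
    private final List<String> events = new ArrayList<>();

    ContainerLaunchSketch(IntSupplier blockingLaunch) {
        this.blockingLaunch = blockingLaunch;
    }

    @Override
    public Integer call() {
        // Reported first, because the launch call below does not return
        // until the container has finished.
        events.add("CONTAINER_LAUNCHED");
        int ret;
        try {
            ret = blockingLaunch.getAsInt();
        } catch (RuntimeException e) {
            events.add("CONTAINER_EXITED_WITH_FAILURE");
            return -1; // arbitrary code for this sketch
        }
        // Assumption: any non-zero exit code is treated as a failure here.
        events.add(ret == 0
                ? "CONTAINER_EXITED_WITH_SUCCESS"
                : "CONTAINER_EXITED_WITH_FAILURE");
        return ret;
    }

    List<String> events() {
        return events;
    }
}
```

For example, `new ContainerLaunchSketch(() -> 0).call()` records CONTAINER_LAUNCHED followed by CONTAINER_EXITED_WITH_SUCCESS, the same RUNNING then EXITED_WITH_SUCCESS progression the text describes.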

Here exec.launchContainer is implemented by DefaultContainerExecutor.launchContainer, which runs the command bash default_container_executor.sh. The contents of default_container_executor.sh are:

#!/bin/bash
/bin/bash "/xx/usercache/user/appcache/application_1499422474367_0001/container_1499422474367_0001_01_000001/default_container_executor_session.sh"
...

It calls the default_container_executor_session.sh script:

#!/bin/bash
...
exec /bin/bash "/xx/usercache/user/appcache/application_1499422474367_0001/container_1499422474367_0001_01_000001/launch_container.sh"

which in turn calls the launch_container.sh script, whose contents are:

...
exec /bin/bash -c "$JAVA_HOME/bin/java -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/xx/application_1499422474367_0001/container_1499422474367_0001_01_000001 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1>/xx/application_1499422474367_0001/container_1499422474367_0001_01_000001/stdout 2>/xx/application_1499422474367_0001/container_1499422474367_0001_01_000001/stderr "
...

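This container is the AppMaster, so the class invoked here is MRAppMaster, with standard output written to stdout and standard error written to stderr. That accounts for two of the three files in the container's log directory; the third, syslog, is written through log4j.

That completes the AppMaster launch. The process is fairly involved, so a diagram is attached below. How MRAppMaster runs is covered next.
[Figure: AppMaster launch flow]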