YARN源码分析之ApplicationMaster启动流程中得知MR的AppMaster是由MRAppMaster启动的,在脚本中调用了AppMaster的main方法。

本文就顺着代码来对MRAppMaster进行解析下。

先来看下该类的main方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
try {
// Environment中的环境变量是在launch_container.sh中export的
String containerIdStr =
System.getenv(Environment.CONTAINER_ID.name());
String nodeHostString = System.getenv(Environment.NM_HOST.name());
String nodePortString = System.getenv(Environment.NM_PORT.name());
String nodeHttpPortString =
System.getenv(Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
...
// 一个MR对应一个MRAppMater对象
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), appSubmitTime);
...
// 初始化并启动appMaster
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
LOG.fatal("Error starting MRAppMaster", t);
ExitUtil.terminate(1, t);
}
}

MRAppMaster首先从环境变量中获取相关的信息,然后调用initAndStartAppMaster开启AppMaster服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final JobConf conf, String jobUserName) throws IOException,
InterruptedException {
...
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
appMaster.init(conf);
appMaster.start();
if(appMaster.errorHappenedShutDown) {
throw new IOException("Was asked to shut down.");
}
return null;
}
});
}

MRAppMaster继承了CompositeService,则调用init进行服务初始化,start启动服务。而init又调用MRAppMaster.serviceInit对appMaster进行初始化,调用MRAppMaster.serviceStart启动AppMaster服务。

MRAppMaster初始化

先看下AppMaster的初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
protected void serviceInit(final Configuration conf) throws Exception {
...
// 判断一些目录是否存在
try {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
// 目录为 /tmp/hadoop-yarn/staging/user/.staging
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
FileSystem fs = getFileSystem(conf);
boolean stagingExists = fs.exists(stagingDir);
// 目录为 /tmp/hadoop-yarn/staging/user/.staging/jobId/COMMIT_STARTED
Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
boolean commitStarted = fs.exists(startCommitFile);
// 目录为 /tmp/hadoop-yarn/staging/user/.staging/jobId/COMMIT_SUCCESS
Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
boolean commitSuccess = fs.exists(endCommitSuccessFile);
// 目录为 /tmp/hadoop-yarn/staging/user/.staging/jobId/COMMIT_FAIL
Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
boolean commitFailure = fs.exists(endCommitFailureFile);
...
} catch (IOException e) {
throw new YarnRuntimeException("Error while initializing", e);
}
if (errorHappenedShutDown) {
// 发生error的处理逻辑
} else {
// 创建MR依赖的OutputCommitter
committer = createOutputCommitter(conf);
// 为MRAppMaster服务创建异步调度器服务
dispatcher = createDispatcher();
addIfService(dispatcher);
//service to handle requests from JobClient
// 创建MRClientService
clientService = createClientService(context);
// Init ClientService separately so that we stop it separately, since this
// service needs to wait some time before it stops so clients can know the
// final states
clientService.init(conf);
// 根据mr的运行方式创建不同的container分配router
// 调度各个状态 流程 ?????????##########################??????????
// 请求的流程,分配的container是rm已经分配到am上的资源???主动分配的?在哪申请
containerAllocator = createContainerAllocator(clientService, context);
//service to handle the output committer
committerEventHandler = createCommitterEventHandler(context, committer);
addIfService(committerEventHandler);
//service to handle requests to TaskUmbilicalProtocol
// 主要负责启动task 使用TaskUmbilicalProtocol协议
taskAttemptListener = createTaskAttemptListener(context);
addIfService(taskAttemptListener);
//service to log job history events
EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
historyService);
this.jobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
dispatcher.register(JobEventType.class, jobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(CommitterEventType.class, committerEventHandler);
...
// Now that there's a FINISHING state for application on RM to give AMs
// plenty of time to clean up after unregister it's safe to clean staging
// directory after unregistering with RM. So, we start the staging-dir
// cleaner BEFORE the ContainerAllocator so that on shut-down,
// ContainerAllocator unregisters first and then the staging-dir cleaner
// deletes staging directory.
// 开启清理服务
// 与OutputCommitter的区别
addService(createStagingDirCleaningService());
// service to allocate containers from RM (if non-uber) or to fake it (uber)
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
// corresponding service to launch allocated containers via NodeManager
containerLauncher = createContainerLauncher(context);
addIfService(containerLauncher);
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
// Note: Even though JobHistoryEventHandler is started last, if any
// component creates a JobHistoryEvent in the meanwhile, it will be just be
// queued inside the JobHistoryEventHandler
// 可见服务的启动顺序是先进后出
addIfService(historyService);
}
super.serviceInit(conf);
} // end of init()

如果一切正常会进入最后一个if else语句块的else中进行AM的初始化。来看下初始化中都包括哪些服务。

先创建OutputCommitter对象,这是一个抽象类,实现类有FileOutputCommitter。随后用与构建CommitterEventHandler服务,并将此服务加入AppMaster服务中。
OutputCommitter的作用有

  1. job初始化时进行一些准备工作,比如为job创建临时输出目录
  2. job完成时进行一些扫尾工作,比如当job完成之后删除临时输出目录
  3. 为task准备临时输出目录
  4. 检查task是否需要commit
  5. commit task输出目录
  6. 放弃task commit

创建异步调度器dispatcher。主要用于事件的调度,是yarn框架中一个重要的概念。

创建MRClientService对象clientService,MRClientService主要用于client与am进行通信,需要对其进行单独的初始化和停止。

创建TaskAttemptListenerImpl服务,主要用与和task的通行。

创建StagingDirCleaningService服务,用于删除临时目录。此服务要在containerAllocator服务之前添加,因为container注销之后才能清楚该container的临时目录。======那和outputCommitter有什么区别呢????

创建ContainerAllocatorRouter服务,用与分配container。根据MR是否为Uber类型创建不同的分配策略,如果是Uber则创建LocalContainerAllocator,否则创建RMContainerAllocator

创建JobHistoryEventHandler服务,这个服务是最后添加的,要保证此服务先停止。

初始化不只是添加一些服务还会向异步调度器中注册一些事件处理器,包括jobhistory.EventType、JobEventType、TaskEventType、TaskAttemptEventType、CommitterEventType、ContainerAllocator.EventType和ContainerLauncher.EventType。

MRAppMaster启动

初始化之后,调用serviceStart启动服务,看下都启动了什么服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
protected void serviceStart() throws Exception {
...
// /////////////////// Create the job itself.
// 开启job的状态机 JobImpl NEW
job = createJob(getConfig(), forcedState, shutDownMessage);
// job创建成功之后,向jobHistory发送EventType.AM_STARTED事件类型,
// 但是为什么会有amInfos,amInfos用来干啥?
for (AMInfo info : amInfos) {
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(info
.getAppAttemptId(), info.getStartTime(), info.getContainerId(),
info.getNodeManagerHost(), info.getNodeManagerPort(), info
.getNodeManagerHttpPort())));
}

// Send out an MR AM inited event for this AM.
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
.getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
.getNodeManagerHttpPort(), this.forcedState == null ? null
: this.forcedState.toString())));
amInfos.add(amInfo);

// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");

boolean initFailed = false;
if (!errorHappenedShutDown) {
// create a job event for job intialization
// job init job从NEW变为INITED
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
// Send init to the job (this does NOT trigger job execution)
// This is a synchronous call, not an event through dispatcher. We want
// job-init to be done completely here.
// 注意注释的内容,与dispatcher的区别
jobEventDispatcher.handle(initJobEvent);

// If job is still not initialized, an error happened during
// initialization. Must complete starting all of the services so failure
// events can be processed.
initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
...
// Start ClientService here, since it's not initialized if
// errorHappenedShutDown is true
clientService.start();
}
//start all the components
super.serviceStart();

// finally set the job classloader
MRApps.setClassLoader(jobClassLoader, getConfig());
// 如果job INITED失败,则job的状态为JOB_INIT_FAILED
if (initFailed) {
JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
jobEventDispatcher.handle(initFailedEvent);
} else {
// All components have started, start the job.
startJobs();
}
}

在启动MRAppMaster所有服务之前

  1. 先调用createJob创建job,job是JobImpl对象。job的创建开启了job的状态机旅程,初始的状态是NEW。在createJob中还会向异步调度器注册JobFinishEvent事件类型。
  2. 触发job的JOB_INIT并交给事件处理器JobEventDispatcher进行处理,此时是同步的,等待job init的结果
  3. 在启动MRAppMaster中服务之前要先启动clientService,然后启动所有的服务
  4. 如果job从NEW转化为INITED时发生了故障,则置initFailed为true,向JobEventDispatcher发送JobEventType.JOB_INIT_FAILED事件类型
  5. 一切正常之后调用startJobs()

针对上述流程我们看下具体代码,

serviceStart时先调用createJob创建一个job,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected Job createJob(Configuration conf, JobStateInternal forcedState, 
String diagnostic) {
// new 出job对象,开启job状态机(初始化job的状态时NEW)
// create single job
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
completedTasksFromPreviousRun, metrics,
committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context,
forcedState, diagnostic);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
// 开始job的状态之后,向异步调度器注册JobFinishEvent事件类型的handler
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
} // end createJob()

createJob的时候用到一个属性amInfos,amInfos是一个list,在serviceStart时实例化。

在createJob之后,会将amInfos中的amInfo构建一个EventType.AM_STARTED事件类型的JobHistoryEvent事件,这里有些疑问,一个mr会有多个amInfo?而且不是因为重试???

如果期间没有发生error,则触发JobEventType.JOB_INIT事件类型,由jobEventDispatcher.handle(initJobEvent)处理,最终调用的是JobImpl.handle交由状态机进行处理,job原先状态是NEW,则JobEventType.JOB_INIT的处理类是InitTransition,逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public JobStateInternal transition(JobImpl job, JobEvent event) {
...
// 创建job context
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
job.conf, job.oldJobId);
}

try {
setup(job);
job.fs = job.getFileSystem(job.conf);

//log to job history
// EventType.JOB_SUBMITTED
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.queueName,
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
getWorkflowAdjacencies(job.conf),
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
// 根据在JobSubmitter.submitJobInternal中划分的文件来切分task
// 读取文件切分元信息
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
// mapWeight 和 reduceWeight 用来计算mr的进度
if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
job.mapWeight = 0.9f;
} else {
job.mapWeight = job.reduceWeight = 0.45f;
}
...
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
// 判断该job是否可以在uber模式下运行
job.makeUberDecision(inputLength);

job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
job.numMapTasks + job.numReduceTasks + 10);
job.mapAttemptCompletionEvents =
new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
job.numMapTasks + job.numReduceTasks + 10);
// 这里错误率,是指可以容忍一些map失败,而mr忽略这些失败的task,继续正常执行????
job.allowedMapFailuresPercent =
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
job.allowedReduceFailuresPercent =
job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);

// create the Tasks but don't start them yet
// 创建map reduce task对象
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
...
return JobStateInternal.INITED;
} catch (Exception e) {
...
// Leave job in the NEW state. The MR AM will detect that the state is
// not INITED and send a JOB_INIT_FAILED event.
return JobStateInternal.NEW;
}
}

job init的时候会根据文件split的元数据信息创建map和reduce task,此时并没有启动这些task

initJob执行结束之后,返回给serviceStart目前job的状态,成功返回INITED,然后继续执行,启动在serviceInit中添加的service,然后执行startJob

1
2
3
4
5
6
7
8
protected void startJobs() {
/** create a job-start event to get this ball rolling */
JobEvent startJobEvent = new JobStartEvent(job.getID(),
recoveredJobStartTime);
/** send the job-start event. this triggers the job execution. */
// 触发job execution
dispatcher.getEventHandler().handle(startJobEvent);
}

startJobs主要是创建一个JobEventType.JOB_START事件类型,由JobImpl的状态机进行处理。
startJobs之前job的状态为INITED,则由StartTransition进行处理,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* This transition executes in the event-dispatcher thread, though it's
* triggered in MRAppMaster's startJobs() method.
*/
public void transition(JobImpl job, JobEvent event) {
JobStartEvent jse = (JobStartEvent) event;
if (jse.getRecoveredJobStartTime() != 0) {
job.startTime = jse.getRecoveredJobStartTime();
} else {
job.startTime = job.clock.getTime();
}
JobInitedEvent jie =
new JobInitedEvent(job.oldJobId,
job.startTime,
job.numMapTasks, job.numReduceTasks,
job.getState().toString(),
job.isUber());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
job.appSubmitTime, job.startTime);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
job.metrics.runningJob(job);
// CommitterEventHandler.handle CommitterEventType.JOB_SETUP
job.eventHandler.handle(new CommitterJobSetupEvent(
job.jobId, job.jobContext));
}

serviceStart跳用startJobs之后,就完成了job的启动,MRAppMaster也启动完毕,触发job execution,使job之后会在另一个线程中进行状态机的转换,随后就是MRAppMaster中的各个服务来配合job来进行工作。

在StartTransition中,创建CommitterEventType.JOB_SETUP事件类型,由CommitterEventHandler.handle处理,进行job的准备工作。

最后这里需要注意下StartTransition上的注释,说StartTransition.transition是在事件异步调度器线程中执行的,也就是说现在job以后的流程已经和MRAppMaster分离了。
MRAppMaster在startJobs中异步调用dispatcher.getEventHandler().handle(startJobEvent),然后返回到serviceStart,此时MRAppMaster启动成功,之后就是以服务的形态存活。

下一篇接着介绍job在接收到JobEventType.JOB_START事件类型之后由INITED状态变为SETUP之后的流程。

本篇只介绍了MRAppMaster的启动过程,并没有把MRAppMaster在MR执行过程的作用介绍完整,随后几篇我会顺着MR的执行流程来介绍MRAppMaster的具体作用。