The previous post walked through the MRAppMaster startup flow. Once MRAppMaster has started successfully, the job's state has already moved from INITED to SETUP, and StartTransition.transition has built a CommitterEventType.JOB_SETUP event, which is handed to CommitterEventHandler.handle for processing.

The handler for the CommitterEventType.JOB_SETUP event is registered with MRAppMaster as one of its services.

Let's look at the flow in detail.

// StartTransition.transition builds the CommitterEventType.JOB_SETUP event
job.eventHandler.handle(new CommitterJobSetupEvent(job.jobId, job.jobContext));

// CommitterEventHandler.handle
public void handle(CommitterEvent event) {
  try {
    // put the event into the queue
    eventQueue.put(event);
  } catch (InterruptedException e) {
    throw new YarnRuntimeException(e);
  }
}

As mentioned above, CommitterEventHandler is added to MRAppMaster as a Service, so it extends AbstractService; because it also needs to process events, it implements the EventHandler interface as well. Its handle method simply puts the event into the event queue eventQueue, a LinkedBlockingQueue, which is a thread-safe blocking queue.
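
This Service-plus-EventHandler pattern shows up repeatedly in the MR AM, so a minimal, self-contained sketch may help. The class and event names below (DemoEventHandler, DemoEvent, DemoEventType) are made up for illustration; only the structure mirrors CommitterEventHandler: handle enqueues into a LinkedBlockingQueue, and a worker thread started in serviceStart drains it.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

enum DemoEventType { DO_WORK }

class DemoEvent extends AbstractEvent<DemoEventType> {
  DemoEvent(DemoEventType type) {
    super(type);
  }
}

public class DemoEventHandler extends AbstractService
    implements EventHandler<DemoEvent> {

  private final BlockingQueue<DemoEvent> eventQueue =
      new LinkedBlockingQueue<DemoEvent>();
  private volatile boolean stopped = false;
  private Thread eventHandlingThread;

  public DemoEventHandler() {
    super(DemoEventHandler.class.getName());
  }

  // producer side: callers just enqueue; the queue is thread-safe
  @Override
  public void handle(DemoEvent event) {
    try {
      eventQueue.put(event);
    } catch (InterruptedException e) {
      throw new YarnRuntimeException(e);
    }
  }

  // consumer side: a worker thread drains the queue, like EventProcessor does
  @Override
  protected void serviceStart() throws Exception {
    eventHandlingThread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          try {
            process(eventQueue.take());
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    });
    eventHandlingThread.start();
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
    }
    super.serviceStop();
  }

  private void process(DemoEvent event) {
    // the real handler switches on event.getType() here
  }
}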

CommitterEventHandler enqueues events into eventQueue via handle; in serviceStart it starts a thread that takes events from eventQueue and hands them to a ThreadPoolExecutor for execution. The actual processing happens in EventProcessor.run:

public void run() {
  LOG.info("Processing the event " + event.toString());
  switch (event.getType()) {
  case JOB_SETUP:
    handleJobSetup((CommitterJobSetupEvent) event);
    break;
  case JOB_COMMIT:
    handleJobCommit((CommitterJobCommitEvent) event);
    break;
  case JOB_ABORT:
    handleJobAbort((CommitterJobAbortEvent) event);
    break;
  case TASK_ABORT:
    handleTaskAbort((CommitterTaskAbortEvent) event);
    break;
  default:
    throw new YarnRuntimeException("Unexpected committer event "
        + event.toString());
  }
}

// The event we enqueued has type CommitterEventType.JOB_SETUP, so it is handled in handleJobSetup
protected void handleJobSetup(CommitterJobSetupEvent event) {
  try {
    // Create the temporary directory that is the root of all of the task work directories.
    // committer is the OutputCommitter created in MRAppMaster.serviceInit
    committer.setupJob(event.getJobContext());
    // triggers JobEventType.JOB_SETUP_COMPLETED
    context.getEventHandler().handle(
        new JobSetupCompletedEvent(event.getJobID()));
  } catch (Exception e) {
    LOG.warn("Job setup failed", e);
    context.getEventHandler().handle(new JobSetupFailedEvent(
        event.getJobID(), StringUtils.stringifyException(e)));
  }
}

When CommitterEventHandler processes a CommitterEventType.JOB_SETUP event, it creates the temporary directory for the job that will hold the files MapReduce needs. If the directory is created successfully it fires JobEventType.JOB_SETUP_COMPLETED; on failure it fires JobEventType.JOB_SETUP_FAILED.
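
For orientation, here is a rough sketch (not the actual Hadoop implementation) of what a FileOutputCommitter-style setupJob boils down to: creating a temporary directory under the job output path that serves as the root of all per-task work directories. The outputDir parameter and the "_temporary/<appAttemptId>" layout are assumptions for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class JobSetupSketch {
  // outputDir is the job's output directory; appAttemptId distinguishes AM retries
  public static Path setupJob(Configuration conf, Path outputDir, int appAttemptId)
      throws IOException {
    // the job-attempt temp dir, e.g. <outputDir>/_temporary/0
    Path jobAttemptPath = new Path(outputDir, "_temporary/" + appAttemptId);
    FileSystem fs = jobAttemptPath.getFileSystem(conf);
    if (!fs.mkdirs(jobAttemptPath)) {
      throw new IOException("Could not create " + jobAttemptPath);
    }
    return jobAttemptPath;
  }
}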

JobEventType.JOB_SETUP_COMPLETED and JobEventType.JOB_SETUP_FAILED are both event types handled by the JobImpl state machine. At this point the job is in the SETUP state; once the OutputCommitter has created the job's temporary directory, JobEventType.JOB_SETUP_COMPLETED is fired and handled by SetupCompletedTransition. Here is the code:

public void transition(JobImpl job, JobEvent event) {
  job.setupProgress = 1.0f;
  // on SETUP -> RUNNING, start scheduling tasks
  job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
  job.scheduleTasks(job.reduceTasks, true);

  // If we have no tasks, just transition to job completed
  ...
}

After SetupCompletedTransition runs, the job's state becomes RUNNING. Its work is to schedule the tasks; the task objects were created earlier in InitTransition.transition via createMapTasks and createReduceTasks.
scheduleTasks builds a TaskEvent of type TaskEventType.T_SCHEDULE for each task and dispatches it through the asynchronous dispatcher, so each individual task processes its own event. Map tasks have no ordering dependency among themselves, which is what lets them run in parallel.
Here is the relevant code:

protected void scheduleTasks(Set<TaskId> taskIDs,
    boolean recoverTaskOutput) {
  for (TaskId taskID : taskIDs) {
    TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
    if (taskInfo != null) {
      eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
          committer, recoverTaskOutput));
    } else {
      // start scheduling the task:
      // dispatch the task event through the asynchronous dispatcher
      eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
    }
  }
}

The handler for TaskEvent is TaskEventDispatcher, which MRAppMaster registers with the asynchronous dispatcher. Its handle method calls ((EventHandler<TaskEvent>) task).handle(event), which takes us into the task state machine.
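
For reference, here is a simplified sketch of what that dispatcher looks like (the real TaskEventDispatcher is an inner class of MRAppMaster; the class name TaskEventForwarder is made up): it resolves the TaskImpl behind the TaskId and hands the event to the task's own state machine. MRAppMaster registers it on the AsyncDispatcher roughly as dispatcher.register(TaskEventType.class, new TaskEventDispatcher()).

import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.yarn.event.EventHandler;

// a simplified stand-in for MRAppMaster's TaskEventDispatcher
public class TaskEventForwarder implements EventHandler<TaskEvent> {

  private final AppContext context;

  public TaskEventForwarder(AppContext context) {
    this.context = context;
  }

  @SuppressWarnings("unchecked")
  @Override
  public void handle(TaskEvent event) {
    // resolve the Task (a TaskImpl) from the app context...
    Task task = context.getJob(event.getTaskID().getJobId())
        .getTask(event.getTaskID());
    // ...and forward the event into the task's state machine
    ((EventHandler<TaskEvent>) task).handle(event);
  }
}
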
A task created in createMapTasks starts in the NEW state. When the TaskEventType.T_SCHEDULE event fires, InitialScheduleTransition moves it to SCHEDULED. Here is InitialScheduleTransition:

public void transition(TaskImpl task, TaskEvent event) {
  // schedule a task attempt for this task,
  // which will request a container
  task.addAndScheduleAttempt(Avataar.VIRGIN);
  task.scheduledTime = task.clock.getTime();
  task.sendTaskStartedEvent();
}

In YARN, the entities that actually do the work are attempts: a job runs as a job attempt and a task runs as a task attempt. Attempts exist to support failure retries and speculative execution.
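
As a quick aside, speculative execution can be toggled per job through the standard MapReduce configuration keys; the snippet below only shows the knobs, and the chosen values are arbitrary.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SpeculationConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // "mapreduce.map.speculative": allow extra attempts for slow map tasks
    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
    // "mapreduce.reduce.speculative": same idea for reduce tasks
    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
    System.out.println(conf.get(MRJobConfig.MAP_SPECULATIVE));
  }
}
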
So when a task is scheduled, it first creates and schedules an attempt, and then sends an EventType.TASK_STARTED event to the job history. Let's focus on addAndScheduleAttempt:

private void addAndScheduleAttempt(Avataar avataar) {
  // add the attempt to the attempts map (initially a singleton map)
  TaskAttempt attempt = addAttempt(avataar);
  inProgressAttempts.add(attempt.getID());
  //schedule the nextAttemptNumber
  if (failedAttempts.size() > 0) {
    eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
        TaskAttemptEventType.TA_RESCHEDULE));
  } else {
    // request a container for the task and kick off the task attempt state machine
    eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
        TaskAttemptEventType.TA_SCHEDULE));
  }
}

This fires a TaskAttemptEvent of type TaskAttemptEventType.TA_SCHEDULE, which the asynchronous dispatcher routes to TaskAttemptEventDispatcher, which in turn calls TaskAttemptImpl.handle.
scheduleTasks loops like this until all map tasks have been scheduled, then moves on to the reduce tasks. Since reduces depend on map progress, we will not expand on reduce tasks here; instead, let's keep following the map task's attempt state transitions.

When TaskAttemptEventType.TA_SCHEDULE fires, the task attempt is in the NEW state, and the corresponding handler is RequestContainerTransition:

public void transition(TaskAttemptImpl taskAttempt,
    TaskAttemptEvent event) {
  // Tell any speculator that we're requesting a container
  taskAttempt.eventHandler.handle
      (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
  //request for container
  if (rescheduled) {
    taskAttempt.eventHandler.handle(
        ContainerRequestEvent.createContainerRequestEventForFailedContainer(
            taskAttempt.attemptId,
            taskAttempt.resourceCapability));
  } else {
    // request resources; the container request carries dataLocalHosts,
    // which come from splitInfo.getLocations()
    taskAttempt.eventHandler.handle(new ContainerRequestEvent(
        taskAttempt.attemptId, taskAttempt.resourceCapability,
        taskAttempt.dataLocalHosts.toArray(
            new String[taskAttempt.dataLocalHosts.size()]),
        taskAttempt.dataLocalRacks.toArray(
            new String[taskAttempt.dataLocalRacks.size()])));
  }
}

The container request carries the resource size plus the arrays of hosts and racks where the data lives, so that the computation can stay data-local.
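
As a point of comparison, the public YARN client API expresses the same idea directly: an AMRMClient.ContainerRequest bundles the capability with the preferred nodes and racks. The hosts, rack, sizes, and priority below are illustrative values, not what the MR AM actually uses.

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;

public class LocalityRequestExample {
  public static void main(String[] args) {
    // capability: how much memory (MB) and how many vcores the container needs
    Resource capability = Resource.newInstance(1024, 1);
    // locality hints, e.g. taken from the input split's locations
    String[] nodes = new String[] {"host1.example.com", "host2.example.com"};
    String[] racks = new String[] {"/default-rack"};
    Priority priority = Priority.newInstance(20);

    AMRMClient.ContainerRequest request =
        new AMRMClient.ContainerRequest(capability, nodes, racks, priority);
    System.out.println(request);
  }
}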

The eventHandler here is still MRAppMaster's asynchronous dispatcher. The ContainerRequestEvent fired at this point has type ContainerAllocator.EventType.CONTAINER_REQ and is handled by containerAllocator. As described in the previous post, MRAppMaster creates different containerAllocator implementations depending on whether the job runs in uber mode; here we follow the non-uber case, RMContainerAllocator. Its handle method looks like this:

public void handle(ContainerAllocatorEvent event) {
  ...
  try {
    eventQueue.put(event);
  } catch (InterruptedException e) {
    throw new YarnRuntimeException(e);
  }
}

If you have followed along from the beginning, you will recognize this as yet another event queue, eventQueue (this pattern is used all over the Hadoop source), and RMContainerAllocator is likewise both a service and an event handler.
Let's look at its serviceStart method:

protected void serviceStart() throws Exception {
  this.eventHandlingThread = new Thread() {
    @SuppressWarnings("unchecked")
    @Override
    public void run() {
      ...
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        try {
          event = RMContainerAllocator.this.eventQueue.take();
        } catch (InterruptedException e) {
          ...
        }
        try {
          handleEvent(event);
        } catch (Throwable t) {
          ...
        }
      }
    }
  };
  this.eventHandlingThread.start();
  // call the parent serviceStart, which starts the heartbeat thread
  super.serviceStart();
}

serviceStart creates an event-handling thread, eventHandlingThread, which takes events off eventQueue and passes them to handleEvent:

protected synchronized void handleEvent(ContainerAllocatorEvent event) {
  recalculateReduceSchedule = true;
  if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
    ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
    JobId jobId = getJob().getID();
    Resource supportedMaxContainerCapability = getMaxContainerCapability();
    if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
      if (mapResourceRequest.equals(Resources.none())) {
        mapResourceRequest = reqEvent.getCapability();
        eventHandler.handle(new JobHistoryEvent(jobId,
            new NormalizedResourceEvent(
                org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
                    .getMemory())));
        LOG.info("mapResourceRequest:" + mapResourceRequest);
        ...
      }
      // set the resources
      // after validating the resources in reqEvent, normalize and set them
      reqEvent.getCapability().setMemory(mapResourceRequest.getMemory());
      reqEvent.getCapability().setVirtualCores(
          mapResourceRequest.getVirtualCores());
      scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
    } else {
      // reduce task
    }
  } else if (
      event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
    ...
  } else if (
      event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
    ...
  }
}

Here the event type is ContainerAllocator.EventType.CONTAINER_REQ, so handleEvent schedules the map task through scheduledRequests. Let's look at scheduledRequests.addMap:

void addMap(ContainerRequestEvent event) {
  ContainerRequest request = null;
  if (event.getEarlierAttemptFailed()) {
    ...
  } else {
    // record the task attempt in the locality mappings:
    // mapsHostMapping and mapsRackMapping
    for (String host : event.getHosts()) {
      LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
      if (list == null) {
        list = new LinkedList<TaskAttemptId>();
        mapsHostMapping.put(host, list);
      }
      list.add(event.getAttemptID());
      ...
    }
    for (String rack: event.getRacks()) {
      LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
      if (list == null) {
        list = new LinkedList<TaskAttemptId>();
        mapsRackMapping.put(rack, list);
      }
      list.add(event.getAttemptID());
      ...
    }
    // build a container request
    request = new ContainerRequest(event, PRIORITY_MAP);
  }
  // store the container request keyed by its attemptID in maps
  maps.put(event.getAttemptID(), request);
  // add the request to the ask set
  addContainerReq(request);
}

ScheduledRequests is an inner class of RMContainerAllocator. As its name suggests, addMap puts the map's container request into the maps collection. When does it get removed from maps? Let's bookmark that question and come back to it later.
Next, addContainerReq records the request in the ask set, which will be sent to the RM in an upcoming heartbeat. Here is addContainerReq:

protected void addContainerReq(ContainerRequest req) {
  // Create resource requests
  for (String host : req.hosts) {
    // Data-local
    if (!isNodeBlacklisted(host)) {
      addResourceRequest(req.priority, host, req.capability);
    }
  }
  // Nothing Rack-local for now
  for (String rack : req.racks) {
    addResourceRequest(req.priority, rack, req.capability);
  }
  // Off-switch -- note that this is always added, regardless of locality
  addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
}

One thing stands out here: addResourceRequest(req.priority, ResourceRequest.ANY, req.capability) is executed for every request, no matter whether the request named specific hosts or racks. This is by design: in YARN's resource-request model, node- and rack-level requests only express locality preferences, and the scheduler ultimately allocates against the ResourceRequest.ANY (off-switch) entry, so an ANY request must always accompany the more specific ones.
addResourceRequest looks like this:

private void addResourceRequest(Priority priority, String resourceName,
    Resource capability) {
  // look up the per-priority table: resourceName -> (capability -> request),
  // i.e. the request map for that host/rack/ANY
  Map<String, Map<Resource, ResourceRequest>> remoteRequests =
      this.remoteRequestsTable.get(priority);
  if (remoteRequests == null) {
    remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
    this.remoteRequestsTable.put(priority, remoteRequests);
    ...
  }
  // then look up the request map for this host/rack by resourceName
  Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
  if (reqMap == null) {
    reqMap = new HashMap<Resource, ResourceRequest>();
    remoteRequests.put(resourceName, reqMap);
  }
  // finally look up the request for this resource size
  ResourceRequest remoteRequest = reqMap.get(capability);
  // if there is no request for this capability yet, create one;
  // note that its container count starts at 0
  if (remoteRequest == null) {
    remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
    remoteRequest.setPriority(priority);
    remoteRequest.setResourceName(resourceName);
    remoteRequest.setCapability(capability);
    remoteRequest.setNumContainers(0);
    reqMap.put(capability, remoteRequest);
  }
  remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
  // Note this down for next interaction with ResourceManager
  addResourceRequestToAsk(remoteRequest);
  ...
}

addResourceRequest records the container request in remoteRequestsTable, which only tracks how many containers of each shape have been requested per location, and then puts the updated ResourceRequest into the ask set.
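
To make that bookkeeping concrete, here is a simplified, self-contained model of the nested table (the class name and the plain-string "capability" key are assumptions; the real table maps Priority to resourceName to Resource to ResourceRequest and stores full ResourceRequest objects rather than bare counts):

import java.util.HashMap;
import java.util.Map;

public class RequestTableSketch {
  // priority -> resourceName (host / rack / "*") -> capability -> container count
  private final Map<Integer, Map<String, Map<String, Integer>>> table =
      new HashMap<Integer, Map<String, Map<String, Integer>>>();

  public void addRequest(int priority, String resourceName, String capability) {
    Map<String, Map<String, Integer>> byName = table.get(priority);
    if (byName == null) {
      byName = new HashMap<String, Map<String, Integer>>();
      table.put(priority, byName);
    }
    Map<String, Integer> byCapability = byName.get(resourceName);
    if (byCapability == null) {
      byCapability = new HashMap<String, Integer>();
      byName.put(resourceName, byCapability);
    }
    // equivalent of remoteRequest.setNumContainers(getNumContainers() + 1)
    Integer count = byCapability.get(capability);
    byCapability.put(capability, count == null ? 1 : count + 1);
  }

  public static void main(String[] args) {
    RequestTableSketch sketch = new RequestTableSketch();
    // one map attempt with two data-local hosts, one rack, plus the mandatory ANY ("*") request
    for (String name : new String[] {"host1", "host2", "/default-rack", "*"}) {
      sketch.addRequest(20, name, "2048MB,1vcore");
    }
    System.out.println(sketch.table);
  }
}
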
addResourceRequestToAsk is what actually puts the ResourceRequest into the ask set:

private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
  // because objects inside the resource map can be deleted ask can end up
  // containing an object that matches new resource object but with different
  // numContainers. So existing values must be replaced explicitly
  if (ask.contains(remoteRequest)) {
    ask.remove(remoteRequest);
  }
  ask.add(remoteRequest);
}

At this point the trail seems to go cold, but it doesn't: as noted above, RMContainerAllocator is both a service and an event handler. So what does it do in its role as a service?

Wrapping up and looking ahead

This post picked up after the job's state changed from INITED to SETUP, followed the events that move it to RUNNING, and watched it create tasks and container requests, placing those requests into the ask set.
Next, the RMContainerAllocator service in the AM keeps heartbeating to the RM and will request resources on the next heartbeat; that is the subject of the next post.