The previous post covered the startup flow of MRAppMaster. Once MRAppMaster has started successfully, the job's state has already moved from INITED to SETUP, and StartTransition.transition has built an event of type CommitterEventType.JOB_SETUP, which is processed by CommitterEventHandler.handle.
The handler for CommitterEventType.JOB_SETUP events is registered in MRAppMaster as a service, added to the MRAppMaster composite service.
Let's walk through the flow in detail.
```java
// In JobImpl.StartTransition: fire the JOB_SETUP committer event
job.eventHandler.handle(new CommitterJobSetupEvent(job.jobId, job.jobContext));

// CommitterEventHandler.handle: simply enqueue the event
public void handle(CommitterEvent event) {
  try {
    eventQueue.put(event);
  } catch (InterruptedException e) {
    throw new YarnRuntimeException(e);
  }
}
```
As mentioned above, CommitterEventHandler is added to MRAppMaster as a Service, so it extends AbstractService; since it also needs to process events, it implements the EventHandler interface as well. Its handle method just puts the event into the event queue eventQueue, a LinkedBlockingQueue, which is a thread-safe blocking queue.
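This "service that is also an event handler" pattern shows up throughout the Hadoop code base. Below is a minimal sketch of the idea, with illustrative names only; it is not the actual CommitterEventHandler, which additionally hands events off to a ThreadPoolExecutor as described next.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the "service + event handler + blocking queue" pattern.
// Names are illustrative; the real class here is CommitterEventHandler.
abstract class QueueingEventService<E> {

  private final BlockingQueue<E> eventQueue = new LinkedBlockingQueue<>();
  private volatile boolean stopped = false;
  private Thread eventHandlingThread;

  // EventHandler side: producers just enqueue and return immediately.
  public void handle(E event) {
    try {
      eventQueue.put(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  // Service side: serviceStart spins up a consumer thread.
  public void serviceStart() {
    eventHandlingThread = new Thread(() -> {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        try {
          process(eventQueue.take());   // blocks until an event arrives
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    eventHandlingThread.start();
  }

  public void serviceStop() {
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
    }
  }

  protected abstract void process(E event);
}
```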
CommitterEventHandler's handle puts the event into eventQueue; in serviceStart it starts a thread that takes events off eventQueue and submits them to a ThreadPoolExecutor for execution. The actual processing happens in EventProcessor.run:
```java
public void run() {
  LOG.info("Processing the event " + event.toString());
  switch (event.getType()) {
  case JOB_SETUP:
    handleJobSetup((CommitterJobSetupEvent) event);
    break;
  case JOB_COMMIT:
    handleJobCommit((CommitterJobCommitEvent) event);
    break;
  case JOB_ABORT:
    handleJobAbort((CommitterJobAbortEvent) event);
    break;
  case TASK_ABORT:
    handleTaskAbort((CommitterTaskAbortEvent) event);
    break;
  default:
    throw new YarnRuntimeException("Unexpected committer event "
        + event.toString());
  }
}

protected void handleJobSetup(CommitterJobSetupEvent event) {
  try {
    committer.setupJob(event.getJobContext());
    context.getEventHandler().handle(
        new JobSetupCompletedEvent(event.getJobID()));
  } catch (Exception e) {
    LOG.warn("Job setup failed", e);
    context.getEventHandler().handle(new JobSetupFailedEvent(
        event.getJobID(), StringUtils.stringifyException(e)));
  }
}
```
When CommitterEventHandler processes a CommitterEventType.JOB_SETUP event, it creates the job's temporary directory that will hold the files the MR job needs. If the directory is created successfully it fires JobEventType.JOB_SETUP_COMPLETED; on failure it fires JobEventType.JOB_SETUP_FAILED.
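The directory creation itself is the OutputCommitter's job: with the default FileOutputCommitter, setupJob creates the job's staging area (a _temporary directory under the output path). The sketch below shows where this hook lives in a custom committer; the paths and no-op methods are illustrative, not the exact FileOutputCommitter logic.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Illustrative committer: setupJob creates a temporary working directory
// under the output path, roughly what FileOutputCommitter does with its
// _temporary subdirectory.
public class SimpleCommitter extends OutputCommitter {

  private final Path outputPath;

  public SimpleCommitter(Path outputPath) {
    this.outputPath = outputPath;
  }

  @Override
  public void setupJob(JobContext context) throws IOException {
    // Hypothetical temp dir name; FileOutputCommitter uses "_temporary/<attempt>"
    Path tmp = new Path(outputPath, "_temporary");
    FileSystem fs = tmp.getFileSystem(context.getConfiguration());
    fs.mkdirs(tmp);
  }

  @Override
  public void setupTask(TaskAttemptContext context) { /* no-op in this sketch */ }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext context) { return false; }

  @Override
  public void commitTask(TaskAttemptContext context) { /* no-op */ }

  @Override
  public void abortTask(TaskAttemptContext context) { /* no-op */ }
}
```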
JobEventType.JOB_SETUP_COMPLETED and JobEventType.JOB_SETUP_FAILED are both event types in the JobImpl state machine. At this point the job is in the SETUP state; once the OutputCommitter has successfully created the temporary directory the job needs, JobEventType.JOB_SETUP_COMPLETED is fired and handled by SetupCompletedTransition. The code:
```java
public void transition(JobImpl job, JobEvent event) {
  job.setupProgress = 1.0f;
  job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
  job.scheduleTasks(job.reduceTasks, true);
  ...
}
```
After SetupCompletedTransition runs, the job's state becomes RUNNING. Its logic is to schedule the tasks; the Task objects themselves were created earlier in InitTransition.transition via createMapTasks and createReduceTasks. scheduleTasks builds a TaskEvent of type TaskEventType.T_SCHEDULE for each task and dispatches it through the asynchronous dispatcher, so each task handles its own event. Since map tasks have no ordering dependencies among themselves, this is how map tasks end up running in parallel. The relevant code:
```java
protected void scheduleTasks(Set<TaskId> taskIDs,
    boolean recoverTaskOutput) {
  for (TaskId taskID : taskIDs) {
    TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
    if (taskInfo != null) {
      eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
          committer, recoverTaskOutput));
    } else {
      eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
    }
  }
}
```
The handler for TaskEvent is TaskEventDispatcher, which MRAppMaster registers with the asynchronous dispatcher; its handle method calls ((EventHandler<TaskEvent>)task).handle(event), which enters the task state machine. Tasks created in createMapTasks start in the NEW state; once TaskEventType.T_SCHEDULE is fired, InitialScheduleTransition moves them to SCHEDULED. The InitialScheduleTransition code:
```java
public void transition(TaskImpl task, TaskEvent event) {
  task.addAndScheduleAttempt(Avataar.VIRGIN);
  task.scheduledTime = task.clock.getTime();
  task.sendTaskStartedEvent();
}
```
In YARN, the things that actually execute are attempts: a job runs as a job attempt, a task as a task attempt. Attempts exist to support retry on failure and speculative execution. So scheduling a task first creates and schedules an attempt, then sends an EventType.TASK_STARTED event to the job history. The key method is addAndScheduleAttempt:
```java
private void addAndScheduleAttempt(Avataar avataar) {
  TaskAttempt attempt = addAttempt(avataar);
  inProgressAttempts.add(attempt.getID());
  if (failedAttempts.size() > 0) {
    eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
        TaskAttemptEventType.TA_RESCHEDULE));
  } else {
    eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
        TaskAttemptEventType.TA_SCHEDULE));
  }
}
```
This fires a TaskAttemptEvent of type TaskAttemptEventType.TA_SCHEDULE, which the asynchronous dispatcher routes to TaskAttemptEventDispatcher, which in turn calls TaskAttemptImpl.handle. scheduleTasks loops this way over all the map tasks and then starts scheduling the reduce tasks. Since reduces depend on map progress, we'll leave reduce tasks aside for now and keep following the map task's attempt state transitions.
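As a side note, when reduce attempts actually start asking for containers is throttled by how many maps have completed; the threshold is the mapreduce.job.reduce.slowstart.completedmaps setting. A small usage sketch (the job setup details are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SlowStartExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Only start requesting reduce containers after 80% of the maps have
    // completed; the default is 0.05 (5%).
    conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.80f);
    Job job = Job.getInstance(conf, "slow-start-example");
    // ... set mapper/reducer/input/output as usual ...
  }
}
```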
When TaskAttemptEventType.TA_SCHEDULE is fired, the task attempt is in the NEW state, and the corresponding transition is RequestContainerTransition:
```java
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
  taskAttempt.eventHandler.handle(
      new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
  if (rescheduled) {
    taskAttempt.eventHandler.handle(
        ContainerRequestEvent.createContainerRequestEventForFailedContainer(
            taskAttempt.attemptId, taskAttempt.resourceCapability));
  } else {
    taskAttempt.eventHandler.handle(new ContainerRequestEvent(
        taskAttempt.attemptId, taskAttempt.resourceCapability,
        taskAttempt.dataLocalHosts.toArray(
            new String[taskAttempt.dataLocalHosts.size()]),
        taskAttempt.dataLocalRacks.toArray(
            new String[taskAttempt.dataLocalRacks.size()])));
  }
}
```
The container request carries the resource size, the array of hosts where the input data lives, and the array of racks, so the scheduler can favor data locality.
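For comparison, a custom AM built on the generic YARN client library expresses the same locality hints through AMRMClient.ContainerRequest. This is only a sketch: MRAppMaster does not use AMRMClient, it builds its own ResourceRequests as we'll see below, and the host/rack names here are made up.

```java
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class LocalityRequestExample {
  public static void main(String[] args) {
    AMRMClient<ContainerRequest> amRmClient = AMRMClient.createAMRMClient();
    // In a real AM: amRmClient.init(conf); amRmClient.start();
    // registerApplicationMaster(...) before requesting containers.

    Resource capability = Resource.newInstance(1024 /* MB */, 1 /* vcores */);
    Priority priority = Priority.newInstance(20);

    // Hosts/racks are illustrative; a real AM derives them from the input split.
    String[] hosts = { "node1.example.com", "node2.example.com" };
    String[] racks = { "/rack1" };

    // The request names preferred nodes and racks; the scheduler may still
    // relax locality and place the container elsewhere.
    ContainerRequest request =
        new ContainerRequest(capability, hosts, racks, priority);
    amRmClient.addContainerRequest(request);
  }
}
```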
The eventHandler here is again MRAppMaster's asynchronous dispatcher. The ContainerRequestEvent fired at this point has type ContainerAllocator.EventType.CONTAINER_REQ and is handled by the containerAllocator. As covered in the previous post, MRAppMaster creates a different containerAllocator depending on whether the job runs in uber mode; here we follow the non-uber case, RMContainerAllocator. Its handle method:
```java
public void handle(ContainerAllocatorEvent event) {
  ...
  try {
    eventQueue.put(event);
  } catch (InterruptedException e) {
    throw new YarnRuntimeException(e);
  }
}
```
If you've followed along from the beginning, you'll notice this is yet another event queue (the pattern is used all over the Hadoop source), and RMContainerAllocator is likewise both a service and an EventHandler. Its serviceStart method:
```java
protected void serviceStart() throws Exception {
  this.eventHandlingThread = new Thread() {
    @SuppressWarnings("unchecked")
    @Override
    public void run() {
      ...
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        try {
          event = RMContainerAllocator.this.eventQueue.take();
        } catch (InterruptedException e) {
          ...
        }
        try {
          handleEvent(event);
        } catch (Throwable t) {
          ...
        }
      }
    }
  };
  this.eventHandlingThread.start();
  super.serviceStart();
}
```
serviceStart creates a handling thread, eventHandlingThread, which takes events off the eventQueue and passes them to handleEvent.
```java
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
  recalculateReduceSchedule = true;
  if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
    ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
    JobId jobId = getJob().getID();
    Resource supportedMaxContainerCapability = getMaxContainerCapability();
    if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
      if (mapResourceRequest.equals(Resources.none())) {
        mapResourceRequest = reqEvent.getCapability();
        eventHandler.handle(new JobHistoryEvent(jobId,
            new NormalizedResourceEvent(
                org.apache.hadoop.mapreduce.TaskType.MAP,
                mapResourceRequest.getMemory())));
        LOG.info("mapResourceRequest:" + mapResourceRequest);
        ...
      }
      reqEvent.getCapability().setMemory(mapResourceRequest.getMemory());
      reqEvent.getCapability().setVirtualCores(
          mapResourceRequest.getVirtualCores());
      scheduledRequests.addMap(reqEvent);
    } else {
    }
  } else if (event.getType()
      == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
    ...
  } else if (event.getType()
      == ContainerAllocator.EventType.CONTAINER_FAILED) {
    ...
  }
}
```
At this point the event type is ContainerAllocator.EventType.CONTAINER_REQ, and handleEvent schedules the map task through scheduledRequests. The scheduledRequests.addMap method:
```java
void addMap(ContainerRequestEvent event) {
  ContainerRequest request = null;
  if (event.getEarlierAttemptFailed()) {
    ...
  } else {
    for (String host : event.getHosts()) {
      LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
      if (list == null) {
        list = new LinkedList<TaskAttemptId>();
        mapsHostMapping.put(host, list);
      }
      list.add(event.getAttemptID());
      ...
    }
    for (String rack : event.getRacks()) {
      LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
      if (list == null) {
        list = new LinkedList<TaskAttemptId>();
        mapsRackMapping.put(rack, list);
      }
      list.add(event.getAttemptID());
      ...
    }
    request = new ContainerRequest(event, PRIORITY_MAP);
  }
  maps.put(event.getAttemptID(), request);
  addContainerReq(request);
}
```
ScheduledRequests is an inner class of RMContainerAllocator. The call here is addMap, which, as the name suggests, records the map's container request in a collection. So when does an entry get removed from the maps collection? Let's bookmark that question and return to it later. Next, addContainerReq puts the request into the ask set, which will be sent to the RM on a subsequent heartbeat. The addContainerReq code:
```java
protected void addContainerReq(ContainerRequest req) {
  for (String host : req.hosts) {
    if (!isNodeBlacklisted(host)) {
      addResourceRequest(req.priority, host, req.capability);
    }
  }
  for (String rack : req.racks) {
    addResourceRequest(req.priority, rack, req.capability);
  }
  addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
}
```
One thing that looks odd at first: no matter whether the request prefers a specific host, a rack, or ANY, the line addResourceRequest(req.priority, ResourceRequest.ANY, req.capability) always runs. This is actually how YARN's resource-request model works: a single container request is expanded into node-, rack-, and ANY-level ResourceRequests, and it is the ANY-level entry whose container count tells the scheduler how many containers are wanted in total, while the node- and rack-level entries act as locality hints. The addResourceRequest code:
```java
private void addResourceRequest(Priority priority, String resourceName,
    Resource capability) {
  Map<String, Map<Resource, ResourceRequest>> remoteRequests =
      this.remoteRequestsTable.get(priority);
  if (remoteRequests == null) {
    remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
    this.remoteRequestsTable.put(priority, remoteRequests);
    ...
  }
  Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
  if (reqMap == null) {
    reqMap = new HashMap<Resource, ResourceRequest>();
    remoteRequests.put(resourceName, reqMap);
  }
  ResourceRequest remoteRequest = reqMap.get(capability);
  if (remoteRequest == null) {
    remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
    remoteRequest.setPriority(priority);
    remoteRequest.setResourceName(resourceName);
    remoteRequest.setCapability(capability);
    remoteRequest.setNumContainers(0);
    reqMap.put(capability, remoteRequest);
  }
  remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
  addResourceRequestToAsk(remoteRequest);
  ...
}
```
addResourceRequest records the container request in remoteRequestsTable, essentially just bumping the count for the matching request, and then puts the updated ResourceRequest into the ask set by calling addResourceRequestToAsk:
```java
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
  if (ask.contains(remoteRequest)) {
    ask.remove(remoteRequest);
  }
  ask.add(remoteRequest);
}
```
At this point the trail may seem to go cold, but it doesn't. As noted above, RMContainerAllocator is both a service and an EventHandler, so what does it do in its role as a service?
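As a rough preview of what the next post covers: on each heartbeat the requestor side of RMContainerAllocator (its base class RMContainerRequestor) drains the accumulated ask set into an allocate call to the RM. The sketch below uses the generic AllocateRequest protocol record to show the shape of that call; the method, parameter names, and clearing logic are illustrative, not the actual MRAppMaster code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

// Rough sketch of what one heartbeat does with the accumulated ask set.
// 'scheduler' is the ApplicationMasterProtocol proxy to the RM;
// lastResponseId and progress are illustrative parameters.
public class HeartbeatSketch {
  static AllocateResponse heartbeat(ApplicationMasterProtocol scheduler,
      Set<ResourceRequest> ask, int lastResponseId, float progress)
      throws Exception {
    List<ResourceRequest> askList = new ArrayList<ResourceRequest>(ask);
    List<ContainerId> releaseList = new ArrayList<ContainerId>();
    // Send everything asked for since the last heartbeat.
    AllocateRequest request = AllocateRequest.newInstance(
        lastResponseId, progress, askList, releaseList, null);
    AllocateResponse response = scheduler.allocate(request);
    ask.clear();  // asks have been sent; start collecting for the next interval
    return response;
  }
}
```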
Looking back and ahead: this post picked up after the job's state moved from INITED to SETUP, followed the events that take it to RUNNING, watched the tasks being created, and traced how container requests are built and placed into the ask set. What comes next is the RMContainerAllocator service in the AM keeping a heartbeat with the RM and requesting resources on the next heartbeat; that is the subject of the next post.