The previous post mentioned that RMContainerAllocator is both a service and an EventHandler, and briefly introduced its role as an EventHandler. This post covers its role as a service.
RMContainerAllocator extends RMContainerRequestor, which in turn extends RMCommunicator. The code comment on RMCommunicator reads: "Registers/unregisters to RM and sends heartbeats to RM."
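RMContainerAllocator is a service. It is added to MRAppMaster in MRAppMaster.serviceInit and started in MRAppMaster.serviceStart; like any service, it is first initialized (init) and then started (start). RMContainerAllocator.serviceInit mainly sets property values, so the interesting part is serviceStart: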
```java
protected void serviceStart() throws Exception {
  this.eventHandlingThread = new Thread() {
    @SuppressWarnings("unchecked")
    @Override
    public void run() {
      ContainerAllocatorEvent event;
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        try {
          // Block until the next allocator event is queued
          event = RMContainerAllocator.this.eventQueue.take();
        } catch (InterruptedException e) {
          // ...
          return;
        }
        try {
          handleEvent(event);
        } catch (Throwable t) {
          // ...
          return;
        }
      }
    }
  };
  this.eventHandlingThread.start();
  super.serviceStart();
}
```
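The loop above is the usual single-consumer BlockingQueue pattern. The sketch below shows the same pattern in plain Java. It assumes strings stand in for ContainerAllocatorEvent and that appending to a list stands in for handleEvent. EventLoopSketch and its methods are hypothetical and are not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified analogue of eventHandlingThread (not Hadoop code). */
public class EventLoopSketch {
  private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
  private Thread eventHandlingThread;

  public void start() {
    eventHandlingThread = new Thread(() -> {
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        String event;
        try {
          event = eventQueue.take(); // blocks until an event is available
        } catch (InterruptedException e) {
          return; // interrupted (e.g. by stop()): leave the loop
        }
        handled.add(event); // placeholder for handleEvent(event)
      }
    }, "EventHandlingThread");
    eventHandlingThread.start();
  }

  /** Producer side: roughly what a handle(event) call does, i.e. enqueue and return. */
  public void handle(String event) {
    eventQueue.add(event);
  }

  /** Waits up to timeoutMs for at least n events to be handled. */
  public boolean awaitHandled(int n, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (handled.size() < n && System.currentTimeMillis() < deadline) {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return handled.size() >= n;
  }

  /** Sets the stop flag and interrupts, so a thread blocked in take() exits promptly. */
  public void stop() {
    stopped.set(true);
    eventHandlingThread.interrupt();
    try {
      eventHandlingThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public List<String> handledEvents() {
    return handled;
  }
}
```

Because there is a single consumer on a FIFO queue, events are handled in the order in which they were enqueued.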
serviceStart starts an eventHandlingThread that processes the events in eventQueue, so up to this point RMContainerAllocator is still acting as an EventHandler. It then calls super.serviceStart(), and the implementation that runs is RMCommunicator's:
```java
protected void serviceStart() throws Exception {
  scheduler = createSchedulerProxy();
  JobID id = TypeConverter.fromYarn(this.applicationId);
  JobId jobId = TypeConverter.toYarn(id);
  job = context.getJob(jobId);
  register();              // register this AM with the RM
  startAllocatorThread();  // start the AM -> RM heartbeat thread
  super.serviceStart();
}
```
Two methods matter here: register and startAllocatorThread. In fact, RMContainerAllocator's main jobs as a service are registration and heartbeating.
Registration
register registers this MR job's AM with the RM, and startAllocatorThread starts the heartbeat thread between the AM and the RM. Here is register:
```java
protected void register() {
  // ...
  try {
    RegisterApplicationMasterRequest request =
        recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
    // ...
    RegisterApplicationMasterResponse response =
        scheduler.registerApplicationMaster(request);
    isApplicationMasterRegistered = true;
    // ...
  } catch (Exception are) {
    LOG.error("Exception while registering", are);
    throw new YarnRuntimeException(are);
  }
}
```
register registers the AM with ApplicationMasterService. It does this over the ApplicationMasterProtocol RPC protocol by calling ApplicationMasterService.registerApplicationMaster. Here is that method:
```java
public RegisterApplicationMasterResponse registerApplicationMaster(
    RegisterApplicationMasterRequest request) throws YarnException, IOException {
  // ...
  synchronized (lock) {
    AllocateResponse lastResponse = lock.getAllocateResponse();
    if (hasApplicationMasterRegistered(applicationAttemptId)) {
      // ...
    }
    this.amLivelinessMonitor.receivedPing(applicationAttemptId);
    lastResponse.setResponseId(0);
    lock.setAllocateResponse(lastResponse);
    LOG.info("AM registration " + applicationAttemptId);
    this.rmContext
        .getDispatcher()
        .getEventHandler()
        .handle(
            new RMAppAttemptRegistrationEvent(applicationAttemptId, request
                .getHost(), request.getRpcPort(), request.getTrackingUrl()));
    RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
        "ApplicationMasterService", appID, applicationAttemptId);
    RegisterApplicationMasterResponse response = recordFactory
        .newRecordInstance(RegisterApplicationMasterResponse.class);
    // ...
    response.setSchedulerResourceTypes(rScheduler
        .getSchedulingResourceTypes());
    return response;
  }
}
```
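The body of registerApplicationMaster is not thread-safe, so registration must hold the AllocateResponseLock object's monitor. The registration flow has four steps. First, it refreshes the AM's liveness timestamp in AMLivelinessMonitor (receivedPing). Second, it sets the responseId to 0. Third, it builds an event of type RMAppAttemptEventType.REGISTERED (an RMAppAttemptRegistrationEvent). The asynchronous dispatcher then delivers this event to the corresponding EventHandler. Registration is complete at that point, and the method builds a response and returns it to the client.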
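The RM-side steps just described can be modeled in a few lines: take the per-attempt lock, record a ping, reset the response id to 0, hand a REGISTERED event to an asynchronous dispatcher, and return. This is a toy sketch with several assumptions. Strings stand in for attempt ids and events, a map stands in for AMLivelinessMonitor, and a queue stands in for the dispatcher. RegistrationSketch and its methods are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical model of the RM-side registration steps; not the YARN API. */
public class RegistrationSketch {
  /** Stands in for AllocateResponseLock: guards the attempt's last response id. */
  static final class AttemptLock {
    int lastResponseId = -1;
  }

  private final Map<String, AttemptLock> locks = new ConcurrentHashMap<>();
  // Stands in for AMLivelinessMonitor: attempt id -> time of last ping.
  private final Map<String, Long> pings = new ConcurrentHashMap<>();
  // Stands in for the asynchronous dispatcher's event queue.
  private final BlockingQueue<String> dispatcherQueue = new LinkedBlockingQueue<>();

  public void addAttempt(String attemptId) {
    locks.put(attemptId, new AttemptLock());
  }

  /** Returns the response id handed back to the AM (0 after registration). */
  public int register(String attemptId, long now) {
    AttemptLock lock = locks.get(attemptId);
    if (lock == null) {
      throw new IllegalStateException("Unknown attempt " + attemptId);
    }
    synchronized (lock) {                              // serialize per-attempt state changes
      pings.put(attemptId, now);                       // like amLivelinessMonitor.receivedPing(...)
      lock.lastResponseId = 0;                         // like lastResponse.setResponseId(0)
      dispatcherQueue.add("REGISTERED " + attemptId);  // only enqueued; processed later
      return lock.lastResponseId;
    }
  }

  public Long lastPing(String attemptId) {
    return pings.get(attemptId);
  }

  /** What a dispatcher thread would pick up next, or null if nothing is queued. */
  public String nextDispatchedEvent() {
    return dispatcherQueue.poll();
  }
}
```

The event is only enqueued while the lock is held. Whoever drains the queue processes it later, and that deferred processing is what "handled asynchronously" means here.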
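Heartbeat
After AM registration finishes, control returns to RMCommunicator.serviceStart, which calls startAllocatorThread. This starts a heartbeat thread that periodically sends heartbeats to ApplicationMasterService: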
```java
protected void startAllocatorThread() {
  allocatorThread = new Thread(new Runnable() {
    @Override
    public void run() {
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(rmPollInterval);
          try {
            heartbeat();
          } catch (YarnRuntimeException e) {
            // ...
          }
          lastHeartbeatTime = context.getClock().getTime();
          executeHeartbeatCallbacks();
        } catch (InterruptedException e) {
          // ...
        }
      }
    }
  });
  allocatorThread.setName("RMCommunicator Allocator");
  allocatorThread.start();
}
```
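The loop above has a simple shape: sleep for the poll interval, heartbeat, record the time, then run any queued callbacks. The sketch below reproduces that shape. HeartbeatLoopSketch, tick() and runOnNextHeartbeat() are hypothetical names used only for illustration, and heartbeat() is replaced by a counter.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the allocator thread's poll loop; names are illustrative. */
public class HeartbeatLoopSketch {
  private final long pollIntervalMs;                 // plays the role of rmPollInterval
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final AtomicInteger heartbeats = new AtomicInteger();
  private final ConcurrentLinkedQueue<Runnable> callbacks = new ConcurrentLinkedQueue<>();
  private volatile long lastHeartbeatTime;
  private Thread allocatorThread;

  public HeartbeatLoopSketch(long pollIntervalMs) {
    this.pollIntervalMs = pollIntervalMs;
  }

  /** Queue work to run once, right after the next heartbeat. */
  public void runOnNextHeartbeat(Runnable r) {
    callbacks.add(r);
  }

  /** One iteration of the loop body, without the sleep. */
  public void tick() {
    heartbeats.incrementAndGet();                    // stands in for heartbeat()
    lastHeartbeatTime = System.currentTimeMillis();
    Runnable r;
    while ((r = callbacks.poll()) != null) {         // drain queued callbacks once
      r.run();
    }
  }

  public void start() {
    allocatorThread = new Thread(() -> {
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(pollIntervalMs);
          tick();
        } catch (InterruptedException e) {
          return;                                    // interrupted during sleep: stop
        }
      }
    }, "RMCommunicator Allocator");
    allocatorThread.start();
  }

  public void stop() {
    stopped.set(true);
    allocatorThread.interrupt();
    try {
      allocatorThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public int heartbeatCount() { return heartbeats.get(); }

  public long lastHeartbeatTime() { return lastHeartbeatTime; }
}
```

Splitting tick() out of the sleeping thread lets the per-heartbeat logic be exercised without depending on timing.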
This starts a thread named "RMCommunicator Allocator" that calls heartbeat once every rmPollInterval. Here the concrete subclass of the abstract class RMCommunicator is RMContainerAllocator, so heartbeat is implemented as follows:
```java
protected synchronized void heartbeat() throws Exception {
  scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
  List<Container> allocatedContainers = getResources();
  if (allocatedContainers != null && allocatedContainers.size() > 0) {
    scheduledRequests.assign(allocatedContainers);
  }
  // ...
  if ((lastCompletedTasks != completedTasks) ||
      (scheduledRequests.maps.size() > 0)) {
    lastCompletedTasks = completedTasks;
    recalculateReduceSchedule = true;
  }
  if (recalculateReduceSchedule) {
    preemptReducesIfNeeded();
    scheduleReduces(
        getJob().getTotalMaps(), completedMaps,
        scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
        assignedRequests.maps.size(), assignedRequests.reduces.size(),
        mapResourceRequest, reduceResourceRequest,
        pendingReduces.size(),
        maxReduceRampupLimit, reduceSlowStart);
    recalculateReduceSchedule = false;
  }
  scheduleStats.updateAndLogIfChanged("After Scheduling: ");
}
```
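heartbeat mainly provides periodic communication between the AM and the RM so that the RM knows this AM is still alive, but it is more than a liveness signal. The request it sends carries a list of resources being requested (ask) and a list of resources to release (release). The response carries the allocated containers and the completed containers (for example, containers released through the release list of an earlier request). The returned containers are not yet assigned to any task at this point; scheduledRequests.assign is what assigns them to specific tasks. heartbeat obtains cluster resources from the scheduler through getResources(), shown below: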
```java
private List<Container> getResources() throws Exception {
  // ...
  try {
    response = makeRemoteRequest();
    retrystartTime = System.currentTimeMillis();
  } catch (ApplicationAttemptNotFoundException e) {
    // ...
  }
  // ...
  List<Container> newContainers = response.getAllocatedContainers();
  // ...
  List<ContainerStatus> finishedContainers =
      response.getCompletedContainersStatuses();
  // ...
  computeIgnoreBlacklisting();
  handleUpdatedNodes(response);
  for (ContainerStatus cont : finishedContainers) {
    LOG.info("Received completed container " + cont.getContainerId());
    TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
    if (attemptID == null) {
      LOG.error("Container complete event for unknown container id "
          + cont.getContainerId());
    } else {
      pendingRelease.remove(cont.getContainerId());
      assignedRequests.remove(attemptID);
      eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
      String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
      eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
          diagnostics));
    }
  }
  return newContainers;
}
```
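The loop over finishedContainers at the end of getResources is essentially bookkeeping on a container-to-attempt map. The sketch below simplifies it under a few assumptions. Plain strings stand in for ContainerId and TaskAttemptId, and strings also replace the two events the loop fires. The map is keyed only by container id, whereas Hadoop's assignedRequests structure is richer. CompletedContainersSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of completed-container bookkeeping; strings stand in for YARN ids. */
public class CompletedContainersSketch {
  private final Map<String, String> assignedRequests = new HashMap<>(); // containerId -> attemptId
  private final Set<String> pendingRelease = new HashSet<>();
  private final List<String> firedEvents = new ArrayList<>();
  private final List<String> unknownContainers = new ArrayList<>();

  public void recordAssignment(String containerId, String attemptId) {
    assignedRequests.put(containerId, attemptId);
  }

  public void markPendingRelease(String containerId) {
    pendingRelease.add(containerId);
  }

  /** Mirrors the loop over finishedContainers in getResources(). */
  public void onFinished(List<String> finishedContainerIds) {
    for (String containerId : finishedContainerIds) {
      String attemptId = assignedRequests.get(containerId);
      if (attemptId == null) {
        // The real code only logs an error for an unknown container id.
        unknownContainers.add(containerId);
      } else {
        pendingRelease.remove(containerId);
        assignedRequests.remove(containerId);
        firedEvents.add("CONTAINER_FINISHED " + attemptId);  // like createContainerFinishedEvent(...)
        firedEvents.add("DIAGNOSTICS_UPDATE " + attemptId);  // like TaskAttemptDiagnosticsUpdateEvent
      }
    }
  }

  public List<String> firedEvents() { return firedEvents; }

  public List<String> unknownContainers() { return unknownContainers; }

  public boolean isAssigned(String containerId) { return assignedRequests.containsKey(containerId); }

  public boolean isPendingRelease(String containerId) { return pendingRelease.contains(containerId); }
}
```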
As the name getResources suggests, this is where a remote RPC call fetches resources from the RM. The call is made in makeRemoteRequest:
```java
protected AllocateResponse makeRemoteRequest() throws YarnException, IOException {
  ResourceBlacklistRequest blacklistRequest =
      ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
          new ArrayList<String>(blacklistRemovals));
  AllocateRequest allocateRequest =
      AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
  AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
  lastResponseID = allocateResponse.getResponseId();
  availableResources = allocateResponse.getAvailableResources();
  lastClusterNmCount = clusterNmCount;
  clusterNmCount = allocateResponse.getNumClusterNodes();
  // ...
  return allocateResponse;
}
```
makeRemoteRequest builds an AllocateRequest from the ask and release lists and the blacklist, and sends it to ApplicationMasterService to request resources. Note that the current contents of ask and release are sent to ApplicationMasterService on every heartbeat. However, the allocated containers in the response to that heartbeat do not correspond to this heartbeat's ask. They are resources for container requests sent before this heartbeat.
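Here is a toy simulation that makes concrete the point that responses answer earlier asks. It assumes a scheduler that allocates only between allocate calls, and it treats each ask as an incremental count. Real YARN ResourceRequests instead carry the total number of outstanding containers per priority and location. AsyncAllocateSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy simulation (not YARN code): the scheduler allocates between heartbeats,
 * so containers returned on heartbeat N answer asks sent on earlier heartbeats.
 */
public class AsyncAllocateSketch {
  private int pendingAsks;                               // asked for, not yet scheduled
  private final List<String> ready = new ArrayList<>();  // scheduled, not yet returned to the AM
  private int nextContainerId = 1;
  private int responseId = 0;

  /** RM side of one AM heartbeat: queue the new asks, return what earlier passes allocated. */
  public List<String> allocate(int newAsks) {
    pendingAsks += newAsks;
    List<String> allocated = new ArrayList<>(ready);
    ready.clear();
    responseId++;
    return allocated;
  }

  /** An independent scheduler pass that can grant at most `capacity` containers. */
  public void schedule(int capacity) {
    int granted = Math.min(capacity, pendingAsks);
    pendingAsks -= granted;
    for (int i = 0; i < granted; i++) {
      ready.add("container_" + nextContainerId++);
    }
  }

  public int responseId() { return responseId; }

  public int pendingAsks() { return pendingAsks; }
}
```

Consider an AM that asks for two containers on heartbeat 1 and gets nothing back. If the scheduler then grants one container per pass, heartbeat 2 returns container_1 and heartbeat 3 returns container_2. The first heartbeat's ask is only answered by the two heartbeats that follow it.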
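makeRemoteRequest returns the response from ApplicationMasterService. It contains the response id, the resources currently available in the cluster, the number of cluster nodes and, most importantly, the list of allocated containers and the list of finished containers. getResources takes newContainers and finishedContainers from the response and sets up tokens for newContainers (the NM tokens needed to contact the nodes that host them). It then walks finishedContainers: for each finished container it removes the entry from assignedRequests and fires a container-finished TaskAttemptEvent, plus a diagnostics update. getResources handles only finishedContainers; the newly obtained containers are not yet assigned to any task. When getResources returns, heartbeat calls scheduledRequests.assign(allocatedContainers) to assign the containers to tasks: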
```java
private void assign(List<Container> allocatedContainers) {
  Iterator<Container> it = allocatedContainers.iterator();
  LOG.info("Got allocated containers " + allocatedContainers.size());
  containersAllocated += allocatedContainers.size();
  // ...
  assignContainers(allocatedContainers);
  // Anything still in the list could not be assigned and is released
  it = allocatedContainers.iterator();
  while (it.hasNext()) {
    Container allocated = it.next();
    LOG.info("Releasing unassigned and invalid container "
        + allocated + ". RM may have assignment issues");
    containerNotAssigned(allocated);
  }
}
```
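Before assigning anything, assign filters allocatedContainers, removing containers that cannot satisfy any request and containers on blacklisted nodes. It then calls assignContainers on the filtered allocatedContainers and finally releases any containers that remain unassigned. Here is assignContainers: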
```java
private void assignContainers(List<Container> allocatedContainers) {
  Iterator<Container> it = allocatedContainers.iterator();
  while (it.hasNext()) {
    Container allocated = it.next();
    ContainerRequest assigned = assignWithoutLocality(allocated);
    if (assigned != null) {
      containerAssigned(allocated, assigned);
      it.remove();
    }
  }
  assignMapsWithLocality(allocatedContainers);
}
```
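assignContainers first iterates over allocatedContainers and assigns the containers that do not need locality, which are those for rerun failed maps and for reduces. After that loop it calls assignMapsWithLocality to assign the remaining containers to maps. So rerun failed maps and reduces are served before ordinary maps. This keeps jobs in the cluster from all waiting for reduce resources, unable to finish and release what they hold.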
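The two-phase order can be sketched with numeric priorities. The sketch assumes PRIORITY_FAST_FAIL_MAP = 5, PRIORITY_REDUCE = 10 and PRIORITY_MAP = 20, which, as far as I know, mirror RMContainerAllocator's constants; check them against your Hadoop version. In YARN a smaller value means a higher priority. The sketch also replaces assignMapsWithLocality with a plain FIFO pass. PriorityAssignSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the two-phase order in assignContainers(); not Hadoop code. */
public class PriorityAssignSketch {
  // Assumed to mirror RMContainerAllocator's constants; smaller value = higher priority.
  public static final int PRIORITY_FAST_FAIL_MAP = 5;
  public static final int PRIORITY_REDUCE = 10;
  public static final int PRIORITY_MAP = 20;

  public static final class Container {
    public final String id;
    public final int priority;

    public Container(String id, int priority) {
      this.id = id;
      this.priority = priority;
    }
  }

  public final Deque<String> failedMaps = new ArrayDeque<>(); // attempt ids of maps being rerun
  public final Deque<String> reduces = new ArrayDeque<>();
  public final Deque<String> maps = new ArrayDeque<>();
  public final Map<String, String> assignments = new LinkedHashMap<>(); // containerId -> attemptId

  /** Phase 1 helper: rerun maps and reduces do not need locality. */
  private String assignWithoutLocality(Container c) {
    if (c.priority == PRIORITY_FAST_FAIL_MAP) {
      return failedMaps.poll();
    }
    if (c.priority == PRIORITY_REDUCE) {
      return reduces.poll();
    }
    return null; // map-priority containers wait for phase 2
  }

  /** Assigns what it can and removes those containers; leftovers stay in the list. */
  public void assignContainers(List<Container> allocated) {
    Iterator<Container> it = allocated.iterator();
    while (it.hasNext()) {
      Container c = it.next();
      String attemptId = assignWithoutLocality(c);
      if (attemptId != null) {
        assignments.put(c.id, attemptId);
        it.remove();
      }
    }
    // Phase 2: stand-in for assignMapsWithLocality, here a plain FIFO over pending maps.
    it = allocated.iterator();
    while (it.hasNext() && !maps.isEmpty()) {
      Container c = it.next();
      if (c.priority == PRIORITY_MAP) {
        assignments.put(c.id, maps.poll());
        it.remove();
      }
    }
  }
}
```

As in assign, any container still left in the list after the call (for example, a reduce container that arrived after all reduces were already placed) is the caller's to release.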
Here is assignMapsWithLocality:
```java
private void assignMapsWithLocality(List<Container> allocatedContainers) {
  // Pass 1: host-local, i.e. a map request wants the container's node
  Iterator<Container> it = allocatedContainers.iterator();
  while (it.hasNext() && maps.size() > 0) {
    Container allocated = it.next();
    Priority priority = allocated.getPriority();
    assert PRIORITY_MAP.equals(priority);
    String host = allocated.getNodeId().getHost();
    LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
    while (list != null && list.size() > 0) {
      // ...
      TaskAttemptId tId = list.removeFirst();
      if (maps.containsKey(tId)) {
        ContainerRequest assigned = maps.remove(tId);
        containerAssigned(allocated, assigned);
        it.remove();
        // ...
        break;
      }
    }
  }
  // Pass 2: rack-local, i.e. a map request wants some node on the container's rack
  it = allocatedContainers.iterator();
  while (it.hasNext() && maps.size() > 0) {
    // ...
    String host = allocated.getNodeId().getHost();
    String rack = RackResolver.resolve(host).getNetworkLocation();
    LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
    while (list != null && list.size() > 0) {
      TaskAttemptId tId = list.removeFirst();
      if (maps.containsKey(tId)) {
        ContainerRequest assigned = maps.remove(tId);
        containerAssigned(allocated, assigned);
        // ...
        break;
      }
    }
  }
  // Pass 3: no locality, i.e. give each remaining container to any pending map
  it = allocatedContainers.iterator();
  while (it.hasNext() && maps.size() > 0) {
    Container allocated = it.next();
    Priority priority = allocated.getPriority();
    assert PRIORITY_MAP.equals(priority);
    TaskAttemptId tId = maps.keySet().iterator().next();
    ContainerRequest assigned = maps.remove(tId);
    containerAssigned(allocated, assigned);
    it.remove();
    // ...
  }
}
```
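assignMapsWithLocality first makes one pass over allocatedContainers and assigns containers by host locality. A container is host-local when some map request wants the node the container is on; in that case the request is removed from maps and containerAssigned is called to complete the assignment. A second pass over allocatedContainers then assigns the remaining containers by rack locality. A third pass assigns whatever is left without regard to locality.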
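The three passes (host-local, rack-local, then any remaining map) can be sketched as below. The sketch makes several assumptions. Strings stand in for ids. A plain host-to-rack map replaces RackResolver, with "/default-rack" as the fallback for unknown hosts. Counters take the place of job counter events. LocalityAssignSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of three-pass (host, rack, any) map assignment; not Hadoop code. */
public class LocalityAssignSketch {
  public static final String DEFAULT_RACK = "/default-rack"; // assumed fallback rack

  public static final class Container {
    public final String id;
    public final String host;

    public Container(String id, String host) {
      this.id = id;
      this.host = host;
    }
  }

  private final Map<String, String> hostToRack;                    // stands in for RackResolver
  private final Set<String> pendingMaps = new LinkedHashSet<>();   // plays the role of `maps`
  private final Map<String, LinkedList<String>> mapsHostMapping = new HashMap<>();
  private final Map<String, LinkedList<String>> mapsRackMapping = new HashMap<>();
  public final Map<String, String> assignments = new LinkedHashMap<>(); // containerId -> attemptId
  public int hostLocal, rackLocal, offSwitch;

  public LocalityAssignSketch(Map<String, String> hostToRack) {
    this.hostToRack = hostToRack;
  }

  private String rackOf(String host) {
    return hostToRack.getOrDefault(host, DEFAULT_RACK);
  }

  /** Registers a pending map attempt that would like to run on any of the given hosts. */
  public void addMapRequest(String attemptId, String... hosts) {
    pendingMaps.add(attemptId);
    Set<String> racks = new LinkedHashSet<>();
    for (String host : hosts) {
      mapsHostMapping.computeIfAbsent(host, h -> new LinkedList<>()).add(attemptId);
      racks.add(rackOf(host));
    }
    for (String rack : racks) {
      mapsRackMapping.computeIfAbsent(rack, r -> new LinkedList<>()).add(attemptId);
    }
  }

  /** One matching pass; returns how many containers it assigned. */
  private int pass(List<Container> allocated, Function<Container, LinkedList<String>> candidatesFor) {
    int assigned = 0;
    Iterator<Container> it = allocated.iterator();
    while (it.hasNext() && !pendingMaps.isEmpty()) {
      Container c = it.next();
      LinkedList<String> list = candidatesFor.apply(c);
      while (list != null && !list.isEmpty()) {
        String tId = list.removeFirst();
        if (pendingMaps.remove(tId)) {   // false if an earlier pass already took this attempt
          assignments.put(c.id, tId);
          it.remove();
          assigned++;
          break;
        }
      }
    }
    return assigned;
  }

  public void assignMapsWithLocality(List<Container> allocated) {
    hostLocal += pass(allocated, c -> mapsHostMapping.get(c.host));
    rackLocal += pass(allocated, c -> mapsRackMapping.get(rackOf(c.host)));
    Iterator<Container> it = allocated.iterator();
    while (it.hasNext() && !pendingMaps.isEmpty()) {   // off-switch: any pending map
      Container c = it.next();
      String tId = pendingMaps.iterator().next();
      pendingMaps.remove(tId);
      assignments.put(c.id, tId);
      it.remove();
      offSwitch++;
    }
  }

  public int pendingCount() {
    return pendingMaps.size();
  }
}
```

An attempt can appear in several host and rack lists. The membership check on the pending set (pendingMaps here, maps in the original) is what stops an attempt from being assigned twice.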
Whether a container goes to a map or a reduce, the assignment is done by containerAssigned:
```java
private void containerAssigned(Container allocated, ContainerRequest assigned) {
  decContainerReq(assigned);
  eventHandler.handle(new TaskAttemptContainerAssignedEvent(
      assigned.attemptID, allocated, applicationACLs));
  assignedRequests.add(allocated, assigned.attemptID);
  // ...
}
```
Summary
This post covered the logic of RMContainerAllocator as a service. Much of that logic is implemented in its parent classes RMContainerRequestor and RMCommunicator (RMContainerRequestor itself extends RMCommunicator).
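As a service, RMContainerAllocator's main jobs are registration and heartbeating. To register, the AM sends a request to ApplicationMasterService. Registration also refreshes the AM's liveness timestamp in AMLivelinessMonitor and fires an event of type RMAppAttemptEventType.REGISTERED, which the asynchronous dispatcher delivers to the corresponding EventHandler. Registration is simple; heartbeating is run periodically by a dedicated thread. A heartbeat does more than tell the RM whether the AM is alive. More importantly, it carries the AM's container requests to the RM, and the containers it receives are then assigned to tasks. The AM's container requests can be regarded as asynchronous. Each heartbeat sends the container resources currently needed, but the containers that come back answer requests sent in earlier heartbeats, not the one just sent. The containers requested in this heartbeat arrive only in a later heartbeat, such as the next one or the one after.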
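Together with the earlier posts on ApplicationMaster, this completes the walk-through of the interactions between the ApplicationMaster and the RM; the interactions between the ApplicationMaster and the NMs will be covered later. The key classes involved are ApplicationMaster, AMLivelinessMonitor, ApplicationMasterLauncher and ApplicationMasterService, and a later post will describe how these four classes relate to each other.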