2017-04-09 03:26:17,113 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1491729774382_0001_000001 State change from SUBMITTED to SCHEDULED
2017-04-09 03:26:17,415 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1491729774382_0001_01_000001 Container Transitioned from NEW to ALLOCATED
The AM container is initialized when the application attempt's state changes from SUBMITTED to SCHEDULED. That transition is handled by RMAppAttemptImpl.ScheduleTransition:
public static final class ScheduleTransition implements
    MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
  @Override
  public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
      RMAppAttemptEvent event) {
    ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
    if (!subCtx.getUnmanagedAM()) {
      // Need reset #containers before create new attempt, because this request
      // will be passed to scheduler, and scheduler will deduct the number after
      // AM container allocated
      // Build the resource request for the AM container
      appAttempt.amReq.setNumContainers(1);
      appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
      // A ResourceName of ANY means any machine on any rack
      appAttempt.amReq.setResourceName(ResourceRequest.ANY);
      appAttempt.amReq.setRelaxLocality(true);
      // Hand the request to the scheduler to allocate resources
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
              Collections.singletonList(appAttempt.amReq),
              EMPTY_CONTAINER_RELEASE_LIST, null, null);
      ...
      return RMAppAttemptState.SCHEDULED;
    } else {
      ...
    }
  }
}
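For reference, the same kind of request can be built with the public YARN records API. The sketch below mirrors what ScheduleTransition sets up; the priority value 0 matches AM_CONTAINER_PRIORITY in RMAppAttemptImpl, and the 1024 MB / 1 vcore capability is just an illustrative number, not something the RM hard-codes:

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class AmRequestSketch {
  public static void main(String[] args) {
    // One container, at AM priority (0), anywhere in the cluster.
    ResourceRequest amReq = ResourceRequest.newInstance(
        Priority.newInstance(0),          // same value as AM_CONTAINER_PRIORITY
        ResourceRequest.ANY,              // any machine on any rack
        Resource.newInstance(1024, 1),    // example capability: 1 GB, 1 vcore
        1);                               // exactly one AM container
    amReq.setRelaxLocality(true);
    List<ResourceRequest> ask = Collections.singletonList(amReq);
    System.out.println("ask = " + ask);
  }
}

Inside the scheduler, allocate() records such asks via AppSchedulingInfo.updateResourceRequests: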
synchronized public void updateResourceRequests(
    List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
  // Update resource requests
  for (ResourceRequest request : requests) {
    Priority priority = request.getPriority();
    String resourceName = request.getResourceName();
    boolean updatePendingResources = false;
    ResourceRequest lastRequest = null;
    // Pending-resource accounting is only updated for the ANY request.
    // Every request set a client sends includes an ANY (off-switch) entry
    // whose container count is the authoritative total; the node- and
    // rack-level requests are just locality hints, so counting them as
    // well would double-count the pending resources.
    if (resourceName.equals(ResourceRequest.ANY)) {
      ...
      updatePendingResources = true;
      // Premature optimization?
      // Assumes that we won't see more than one priority request updated
      // in one call, reasonable assumption... however, it's totally safe
      // to activate same application more than once.
      // Thus we don't need another loop ala the one in decrementOutstanding()
      // which is needed during deactivate.
      if (request.getNumContainers() > 0) {
        activeUsersManager.activateApplication(user, applicationId);
      }
    }
    // this.requests holds this application's outstanding requests as a
    // two-level map: priority -> (resource name -> request).
    // Look up the per-resource-name map for this priority.
    Map<String, ResourceRequest> asks = this.requests.get(priority);
    // No requests at this priority yet: create a map for it
    if (asks == null) {
      asks = new HashMap<String, ResourceRequest>();
      this.requests.put(priority, asks);
      this.priorities.add(priority);
    }
    // Fetch any existing request with the same resource name
    lastRequest = asks.get(resourceName);
    if (recoverPreemptedRequest && lastRequest != null) {
      // Increment the number of containers to 1, as it is recovering a
      // single container.
      request.setNumContainers(lastRequest.getNumContainers() + 1);
    }
    // The new request replaces the old one in asks; the old request is kept
    // in lastRequest only so the pending-resource metrics below can be
    // decremented by what it used to ask for.
    asks.put(resourceName, request);
    if (updatePendingResources) {
      // Similarly, deactivate application?
      if (request.getNumContainers() <= 0) {
        LOG.info("checking for deactivate... ");
        checkForDeactivation();
      }
      int lastRequestContainers = lastRequest != null ? lastRequest
          .getNumContainers() : 0;
      Resource lastRequestCapability = lastRequest != null ? lastRequest
          .getCapability() : Resources.none();
      metrics.incrPendingResources(user, request.getNumContainers(),
          request.getCapability());
      metrics.decrPendingResources(user, lastRequestContainers,
          lastRequestCapability);
    }
  }
}
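The bookkeeping above boils down to a replace-and-diff on a two-level map. A minimal, self-contained sketch of that pattern (class and field names here are illustrative stand-ins, not Hadoop's):

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AppSchedulingInfo's request table.
public class RequestTable {
  // priority -> (resource name: host, rack, or "*") -> asked container count
  private final Map<Integer, Map<String, Integer>> requests = new HashMap<>();
  private int pendingContainers; // tracked only from the "*" (ANY) entry

  public void update(int priority, String resourceName, int numContainers) {
    Map<String, Integer> asks =
        requests.computeIfAbsent(priority, p -> new HashMap<>());
    // Replace the old ask; remember it so we can diff the pending total.
    Integer last = asks.put(resourceName, numContainers);
    if ("*".equals(resourceName)) {
      // Only the ANY entry feeds the pending-resource metric, mirroring
      // updatePendingResources in updateResourceRequests().
      pendingContainers += numContainers - (last == null ? 0 : last);
    }
  }

  public int getPendingContainers() {
    return pendingContainers;
  }
}

Allocation itself happens on node heartbeats, in CapacityScheduler.allocateContainersToNode: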
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
  ...
  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations
  // If the node holds a reserved container, try to satisfy that reservation first
  ...
  // Try to schedule more if there are no reservations to fulfill
  if (node.getReservedContainer() == null) {
    // Does this NM still have enough free resources for at least one
    // minimum-size container?
    if (calculator.computeAvailableContainers(node.getAvailableResource(),
        minimumAllocation) > 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to schedule on node: " + node.getNodeName() +
            ", available: " + node.getAvailableResource());
      }
      // Walk the queue hierarchy from the root to assign containers
      root.assignContainers(clusterResource, node, false);
    }
  } else {
    LOG.info("Skipping scheduling since node " + node.getNodeID() +
        " is reserved by application " +
        node.getReservedContainer().getContainerId().getApplicationAttemptId());
  }
}
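computeAvailableContainers answers "how many containers of this size still fit on the node". Under the DefaultResourceCalculator it is essentially an integer division on memory; a hedged sketch of the idea (the DominantResourceCalculator additionally takes the minimum over vcores):

// Sketch only: with DefaultResourceCalculator only memory matters;
// DominantResourceCalculator also divides vcores and takes the minimum.
public class AvailableContainersSketch {
  static int computeAvailableContainers(int availableMB, int availableVcores,
      int requiredMB, int requiredVcores, boolean dominant) {
    if (!dominant) {
      return availableMB / requiredMB;           // memory-only view
    }
    return Math.min(availableMB / requiredMB,    // constrained by memory
        availableVcores / requiredVcores);       // ... or by vcores
  }

  public static void main(String[] args) {
    // 6 GB / 3 vcores free; minimum allocation 1 GB / 1 vcore
    System.out.println(computeAvailableContainers(6144, 3, 1024, 1, true)); // 3
  }
}

The root queue's assignContainers (ParentQueue) then walks the hierarchy: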
public synchronized CSAssignment assignContainers(
    Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
  CSAssignment assignment =
      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
  // if our queue cannot access this node, just return
  if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
      labelManager.getLabelsOnNode(node.getNodeID()))) {
    return assignment;
  }
  while (canAssign(clusterResource, node)) {
    ...
    boolean localNeedToUnreserve = false;
    Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
    // Are we over maximum-capacity for this queue?
    if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
      // check to see if we could if we unreserve first
      localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
      if (!localNeedToUnreserve) {
        break;
      }
    }
    // Schedule
    CSAssignment assignedToChild =
        assignContainersToChildQueues(clusterResource, node,
            localNeedToUnreserve | needToUnreserve);
    assignment.setType(assignedToChild.getType());
    ...
    // Do not assign more than one container if this isn't the root queue
    // or if we've already assigned an off-switch container
    if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
      if (LOG.isDebugEnabled()) {
        if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
          LOG.debug("Not assigning more than one off-switch container," +
              " assignments so far: " + assignment);
        }
      }
      break;
    }
  }
  return assignment;
}
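assignContainersToChildQueues delegates the node offer down the queue tree: children are tried in order of how little of their capacity they are using, and the first child that assigns something wins. A simplified, self-contained sketch of that descent (class and method names are illustrative, not Hadoop's):

import java.util.Comparator;
import java.util.List;

// Hypothetical mini model of the CapacityScheduler queue hierarchy.
interface Queue {
  double usedCapacity();              // fraction of configured capacity in use
  int assignContainers(String node);  // containers assigned on this offer
}

class ParentQueueSketch implements Queue {
  private final List<Queue> children;

  ParentQueueSketch(List<Queue> children) { this.children = children; }

  @Override public double usedCapacity() {
    return children.stream().mapToDouble(Queue::usedCapacity).average().orElse(0);
  }

  @Override public int assignContainers(String node) {
    // Offer the node to the least-used child first; stop at the first
    // child that manages to assign a container.
    return children.stream()
        .sorted(Comparator.comparingDouble(Queue::usedCapacity))
        .mapToInt(child -> child.assignContainers(node))
        .filter(assigned -> assigned > 0)
        .findFirst().orElse(0);
  }
}

At the bottom of the tree, LeafQueue.assignContainers picks an application and a priority: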
public synchronized CSAssignment assignContainers(Resource clusterResource,
    FiCaSchedulerNode node, boolean needToUnreserve) {
  ...
  // if our queue cannot access this node, just return
  if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
      labelManager.getLabelsOnNode(node.getNodeID()))) {
    return NULL_ASSIGNMENT;
  }
  // Check for reserved resources
  RMContainer reservedContainer = node.getReservedContainer();
  if (reservedContainer != null) {
    FiCaSchedulerApp application =
        getApplication(reservedContainer.getApplicationAttemptId());
    synchronized (application) {
      return assignReservedContainer(application, node, reservedContainer,
          clusterResource);
    }
  }
  // Try to assign containers to applications in order
  for (FiCaSchedulerApp application : activeApplications) {
    ...
    // Lock the application while scheduling for it
    synchronized (application) {
      // Check if this resource is on the blacklist
      if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
        continue;
      }
      // Schedule in priority order
      for (Priority priority : application.getPriorities()) {
        // Look up the ANY request at this priority. Every request set
        // includes an ANY entry whose container count is the total still
        // outstanding (see updateResourceRequests above), so if there is
        // no ANY request, nothing is pending at this priority and it can
        // be skipped.
        ResourceRequest anyRequest =
            application.getResourceRequest(priority, ResourceRequest.ANY);
        if (null == anyRequest) {
          continue;
        }
        // Required resource
        Resource required = anyRequest.getCapability();
        // Do we need containers at this 'priority'?
        if (application.getTotalRequiredResources(priority) <= 0) {
          continue;
        }
        if (!this.reservationsContinueLooking) {
          if (!needContainers(application, priority, required)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("doesn't need containers based on reservation algo!");
            }
            continue;
          }
        }
        Set<String> requestedNodeLabels =
            getRequestLabelSetByExpression(anyRequest.getNodeLabelExpression());
        // Compute user-limit & set headroom
        // Note: We compute both user-limit & headroom with the highest
        // priority request as the target.
        // This works since we never assign lower priority requests
        // before all higher priority ones are serviced.
        Resource userLimit =
            computeUserLimitAndSetHeadroom(application, clusterResource,
                required, requestedNodeLabels);
        // Check queue max-capacity limit
        if (!canAssignToThisQueue(clusterResource, required,
            labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
          return NULL_ASSIGNMENT;
        }
        // Check user limit
        if (!assignToUser(clusterResource, application.getUser(), userLimit,
            application, true, requestedNodeLabels)) {
          break;
        }
        // Inform the application it is about to get a scheduling opportunity.
        // A "scheduling opportunity" counts how many times this app has been
        // offered a node at this priority; the counter drives delay
        // scheduling, letting the app pass on non-local nodes a bounded
        // number of times before its locality requirement is relaxed.
        application.addSchedulingOpportunity(priority);
        // Try to schedule on this node
        CSAssignment assignment =
            assignContainersOnNode(clusterResource, node, application,
                priority, null, needToUnreserve);
        // Did the application skip this node?
        if (assignment.getSkipped()) {
          // Don't count 'skipped nodes' as a scheduling opportunity!
          application.subtractSchedulingOpportunity(priority);
          continue;
        }
        // Did we schedule or reserve a container?
        Resource assigned = assignment.getResource();
        if (Resources.greaterThan(resourceCalculator, clusterResource,
            assigned, Resources.none())) {
          // Book-keeping
          // Note: Update headroom to account for current allocation too...
          allocateResource(clusterResource, application, assigned,
              labelManager.getLabelsOnNode(node.getNodeID()));
          // Don't reset scheduling opportunities for non-local assignments
          // otherwise the app will be delayed for each non-local assignment.
          // This helps apps with many off-cluster requests schedule faster.
          if (assignment.getType() != NodeType.OFF_SWITCH) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Resetting scheduling opportunities");
            }
            application.resetSchedulingOpportunities(priority);
          }
          // Done
          return assignment;
        } else {
          // Do not assign out of order w.r.t priorities
          break;
        }
      }
    }
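The interplay between scheduling opportunities and locality is the CapacityScheduler's form of delay scheduling: an application may decline non-local nodes a bounded number of times while waiting for a node-local one. A simplified sketch of the gate (the real check lives in LeafQueue and is driven by yarn.scheduler.capacity.node-locality-delay; the threshold value here is illustrative):

// Sketch of delay scheduling: relax locality only after the app has
// passed on enough scheduling opportunities at this priority.
public class DelaySchedulingSketch {
  static final int NODE_LOCALITY_DELAY = 40; // illustrative threshold

  static boolean canAssignRackLocal(long missedOpportunities) {
    // Rack-local containers are allowed only after the app has been
    // offered (and skipped) NODE_LOCALITY_DELAY nodes without finding
    // a node-local match.
    return missedOpportunities >= NODE_LOCALITY_DELAY;
  }

  public static void main(String[] args) {
    System.out.println(canAssignRackLocal(5));   // false: keep waiting
    System.out.println(canAssignRackLocal(40));  // true: relax to rack-local
  }
}

Once a node, application, and priority have been fixed, assignContainer performs the actual placement: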
    // Create the container if necessary
    // Build the Container record (this is where the containerId is minted)
    Container container =
        getContainer(rmContainer, application, node, capability, priority);
    ...
    // Any reserved container was handled first; only when scheduling a
    // fresh container do we check whether the node's free resources can
    // actually fit it.
    // Can we allocate a container on this node?
    int availableContainers =
        resourceCalculator.computeAvailableContainers(available, capability);
    if (availableContainers > 0) {
      // Allocate...
      ...
      // Inform the application
      RMContainer allocatedContainer =
          application.allocate(type, node, priority, request, container);
      // Does the application need this resource?
      if (allocatedContainer == null) {
        return Resources.none();
      }
      // Inform the node: record the allocation in its launchedContainers map
      node.allocateContainer(allocatedContainer);
      return container.getResource();
    } else {
      // if we are allowed to allocate but this node doesn't have space, reserve it or
      // if this was an already a reserved container, reserve it again
      ...
      return Resources.none();
    }
  }
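Reservation is what ties this method back to the top of allocateContainersToNode: when a node is too full for the request, the scheduler pins the request to the node instead of giving up, and subsequent heartbeats from that node serve the reservation before anything else. A minimal model of that idea (a plain-Java stand-in, not Hadoop's classes):

// Hypothetical mini model of a node that can hold one reserved request.
public class NodeSketch {
  private int availableMB = 2048;
  private Integer reservedMB; // at most one reservation per node

  /** Try to place a request; reserve the node if it doesn't fit yet. */
  public boolean offer(int requestMB) {
    if (reservedMB != null) {
      // A reserved node only serves its reservation (cf. the
      // "Skipping scheduling since node ... is reserved" log above).
      if (reservedMB <= availableMB) {
        availableMB -= reservedMB;
        reservedMB = null;
        return true;
      }
      return false;
    }
    if (requestMB <= availableMB) {
      availableMB -= requestMB;  // plain allocation
      return true;
    }
    reservedMB = requestMB;      // too full: pin the request to this node
    return false;
  }
}

On the success path, application.allocate (FiCaSchedulerApp) records the new container: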
    // Add it to allContainers list.
    // Store the new RMContainer; newlyAllocatedContainers is drained when
    // the application's next allocate() call collects its newly assigned
    // containers.
    newlyAllocatedContainers.add(rmContainer);
    liveContainers.put(container.getId(), rmContainer);
    // Update consumption and track allocations
    List<ResourceRequest> resourceRequestList =
        appSchedulingInfo.allocate(type, node, priority, request, container);
    Resources.addTo(currentConsumption, container.getResource());
    // Update resource requests related to "request" and store in RMContainer
    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
    // Inform the container: fire a START event at the RMContainer state
    // machine, transitioning it from NEW to ALLOCATED (the second log line
    // at the top of this section).
    rmContainer.handle(
        new RMContainerEvent(container.getId(), RMContainerEventType.START));
    ...
    return rmContainer;
  }
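RMContainerImpl is itself a small state machine, and this START event is what produces the "Container Transitioned from NEW to ALLOCATED" log line quoted at the top. A self-contained sketch of that transition style (a plain-Java stand-in, not Hadoop's StateMachineFactory, and modeling only this one transition):

// Sketch of the RMContainer state machine: only NEW -> ALLOCATED on START.
public class RMContainerSketch {
  enum State { NEW, ALLOCATED }
  enum Event { START }

  private State state = State.NEW;

  public void handle(Event event) {
    if (state == State.NEW && event == Event.START) {
      System.out.println("Container Transitioned from NEW to ALLOCATED");
      state = State.ALLOCATED;
    } else {
      throw new IllegalStateException(
          "Invalid event " + event + " in state " + state);
    }
  }

  public static void main(String[] args) {
    new RMContainerSketch().handle(Event.START);
  }
}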