The [previous post](http://bigdatadecode.top/YARN源码分析之Fair Scheduler part1.html) covered how the Fair Scheduler computes FairShare and SteadyFairShare; this post focuses on preemption.
Preemption happens frequently in YARN, but when does it trigger, whose resources get preempted, and who receives the reclaimed resources? This article tries to answer these questions.
This post covers preemption as it happens under the Fair Scheduler. Let's start with the relevant configuration.
Preemption-related configuration: yarn-site.xml
yarn.scheduler.fair.preemption
Whether preemption is enabled for the Fair Scheduler. Defaults to false.
yarn.scheduler.fair.preemption.cluster-utilization-threshold
Preemption only happens once the resources in use across the cluster exceed this fraction. Defaults to 0.8f.
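For quick reference, a minimal yarn-site.xml sketch of these switches might look like the following (the values are illustrative, not recommendations; yarn.scheduler.fair.waitTimeBeforeKill is discussed later in this post):

```xml
<!-- yarn-site.xml: illustrative values, not recommendations -->
<property>
  <!-- Enable preemption for the Fair Scheduler (default: false) -->
  <name>yarn.scheduler.fair.preemption</name>
  <value>true</value>
</property>
<property>
  <!-- Only preempt once cluster utilization exceeds this fraction (default: 0.8) -->
  <name>yarn.scheduler.fair.preemption.cluster-utilization-threshold</name>
  <value>0.8</value>
</property>
<property>
  <!-- Grace period (ms) between warning a container and killing it (default: 15000) -->
  <name>yarn.scheduler.fair.waitTimeBeforeKill</name>
  <value>15000</value>
</property>
```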
There are a few more related settings that do not appear to be exposed for configuration.
fair-scheduler.xml
In fair-scheduler.xml the preemption timeout and threshold can be set per queue, or globally as defaults. Per-queue settings go inside the corresponding queue element; the parameters are:
fairSharePreemptionTimeout
If the queue's allocation has been below fairShare * fairSharePreemptionThreshold for longer than this timeout, the queue will preempt resources from other queues. If unset, the value is inherited from the parent queue.
fairSharePreemptionThreshold
The fair share preemption threshold for the queue. If the queue has waited fairSharePreemptionTimeout and its resources are still below fairSharePreemptionThreshold * fairShare, it preempts from other queues. If unset, the value is inherited from the parent queue.
The global defaults are configured with the following parameters:
defaultFairSharePreemptionTimeout
The fair-share preemption timeout for the root queue; overridden per queue by fairSharePreemptionTimeout.
defaultMinSharePreemptionTimeout
The min-share preemption timeout for the root queue; overridden per queue by minSharePreemptionTimeout.
defaultFairSharePreemptionThreshold
The fair-share preemption threshold for the root queue; overridden per queue by fairSharePreemptionThreshold. In other words, when a queue's resources stay below fairSharePreemptionThreshold * fairShare for longer than the fair-share preemption timeout, preemption is triggered.
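Putting the per-queue and global settings together, an allocations file along these lines would be a minimal sketch (queue names and values are made up for illustration):

```xml
<?xml version="1.0"?>
<allocations>
  <queue name="queueA">
    <minResources>1024mb,0vcores</minResources>
    <!-- per-queue settings override the defaults below -->
    <minSharePreemptionTimeout>5</minSharePreemptionTimeout>
    <fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>
    <fairSharePreemptionThreshold>0.5</fairSharePreemptionThreshold>
  </queue>

  <!-- global defaults, applied to the root queue and inherited by
       queues that do not set their own values -->
  <defaultMinSharePreemptionTimeout>30</defaultMinSharePreemptionTimeout>
  <defaultFairSharePreemptionTimeout>60</defaultFairSharePreemptionTimeout>
  <defaultFairSharePreemptionThreshold>0.5</defaultFairSharePreemptionThreshold>
</allocations>
```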
Test code
The code is fairly long, so let's walk through it method by method following the program flow; the complete listing is attached at the end for reference.
YARN loads its configuration at startup, so the first step is to set up an environment in which the Fair Scheduler can preempt. The relevant configuration is prepared in createConfiguration:
```java
protected Configuration createConfiguration() {
  Configuration conf = super.createConfiguration();
  conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
      ResourceScheduler.class);
  conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
  return conf;
}
```
Preemption is disabled by default in the Fair Scheduler; to use it, yarn.scheduler.fair.preemption must be set to true. A second parameter, yarn.scheduler.fair.preemption.cluster-utilization-threshold, controls how high the cluster's resource utilization must be before preemption can occur. That parameter is not set in createConfiguration but in the startResourceManager method that follows:
```java
private void startResourceManager(float utilizationThreshold) {
  conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
      utilizationThreshold);
  resourceManager = new MockRM(conf);
  resourceManager.start();

  scheduler = (FairScheduler) resourceManager.getResourceScheduler();
  scheduler.setClock(clock);
  scheduler.updateInterval = 60 * 1000;
}
```
startResourceManager takes one argument, which is assigned to yarn.scheduler.fair.preemption.cluster-utilization-threshold and controls when the cluster may start preempting.
Fair scheduling needs a running RM to manage resources, so the test creates a MockRM. MockRM is a mock ResourceManager intended purely for testing; Hadoop ships many such mock classes, which makes testing the code much easier.
With the RM started, the next step is to register an NM with the RM, i.e. add resources to the cluster, and then submit an app that requests resources. See registerNodeAndSubmitApp:
```java
private ApplicationAttemptId registerNodeAndSubmitApp(
    int memory, int vcores, int appContainers, int appMemory) {
  RMNode node1 = MockNodes.newNodeInfo(
      1, Resources.createResource(memory, vcores), 1, "node1");
  NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
  scheduler.handle(nodeEvent1);

  System.out.println("resources in the cluster : "
      + scheduler.rootMetrics.getAvailableMB());

  ApplicationAttemptId app1 = createSchedulingRequest(
      appMemory, "queueA", "user1", appContainers);
  scheduler.update();

  for (int i = 0; i < appContainers; i++) {
    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
    scheduler.handle(nodeUpdate1);
  }

  FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queueA", false);
  System.out.println("queueA : " + queue.getFairShare().getMemory()
      + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
      + ", steadyShare : " + queue.getSteadyFairShare()
      + ", demand : " + queue.getDemand()
      + ", running : " + queue.getResourceUsage()
      + ", preempt : " + queue.preemptContainer());

  System.out.println("rest resources of cluster : "
      + (memory - appContainers * appMemory)
      + " , " + scheduler.rootMetrics.getAvailableMB());
  return app1;
}
```
Now let's look at the main test method:
```java
@Test
public void testPreemptionWithFreeResources() throws Exception {
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<queue name=\"default\">");
  out.println("<maxResources>0mb,0vcores</maxResources>");
  out.println("</queue>");
  out.println("<queue name=\"queueA\">");
  out.println("<weight>1</weight>");
  out.println("<minResources>1024mb,0vcores</minResources>");
  out.println("</queue>");
  out.println("<queue name=\"queueB\">");
  out.println("<weight>1</weight>");
  out.println("<minResources>1024mb,0vcores</minResources>");
  out.println("</queue>");
  out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
  out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
  out.println("</allocations>");
  out.close();

  startResourceManager(0f);

  ApplicationAttemptId app1 = registerNodeAndSubmitApp(4 * 1024, 4, 4, 1024);

  ApplicationAttemptId app2 = createSchedulingRequest(1024, "queueB", "user1", 1, 1);
  scheduler.update();

  clock.tick(6);

  FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queueA", false);
  System.out.println("queueA : " + queue.getFairShare().getMemory()
      + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
      + ", steadyShare : " + queue.getSteadyFairShare()
      + ", demand : " + queue.getDemand()
      + ", running : " + queue.getResourceUsage()
      + ", preempt : " + queue.preemptContainer());

  scheduler.preemptTasksIfNecessary();
  System.out.println("preemptResources() should have been called " + 1024 + " , "
      + scheduler.getSchedulerApp(app1).getPreemptionContainers()
          .iterator().next().getContainer().getResource());

  System.out.println("====================================================================");

  clock.tick(18);
  scheduler.preemptTasksIfNecessary();
  System.out.println("preemptResources() should have been called " + 1024 + " , "
      + scheduler.getSchedulerApp(app1).getPreemptionContainers());

  System.out.println("queueA : " + queue.getFairShare().getMemory()
      + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
      + ", steadyShare : " + queue.getSteadyFairShare()
      + ", demand : " + queue.getDemand()
      + ", running : " + queue.getResourceUsage()
      + ", preempt : " + queue.preemptContainer());

  resourceManager.stop();
}
```
That completes the preemption test flow. The idea is as follows: startResourceManager sets the utilization threshold at which the cluster may preempt (0 in this example). A 4 GB node1 is registered with the cluster, and app1 (4 containers of 1024 MB each) is submitted to queueA, filling the entire cluster. Then app2 (1 container, 1024 MB) is submitted to queueB. The test simulates a 6-second wait (defaultMinSharePreemptionTimeout is 5 s, and queueB is below its minShare, so it needs to preempt) and calls preemptTasksIfNecessary to check whether preemption is required; in a real cluster this check runs continuously in a background thread. Finally it waits another 18 seconds (yarn.scheduler.fair.waitTimeBeforeKill is 15 s, and only after that much waiting is a container killed to release its resources) and prints queueA's state before and after being preempted.
Running the code above produces the following output:
```
resources in the cluster : 4096
queueA : 4096, weight : 1.0, steadyShare : <memory:2048, vCores:0>, demand : <memory:4096, vCores:4>, running : <memory:4096, vCores:4>, preempt : null
rest resources of cluster : 0
queueA : 2048, weight : 1.0, steadyShare : <memory:2048, vCores:0>, demand : <memory:4096, vCores:4>, running : <memory:4096, vCores:4>, preempt : container_0_0001_01_000004
preemptResources() should have been called 1024 , <memory:1024, vCores:1>
====================================================================
preemptResources() should have been called 1024 , []
queueA : 2048, weight : 1.0, steadyShare : <memory:2048, vCores:0>, demand : <memory:4096, vCores:4>, running : <memory:3072, vCores:3>, preempt : container_0_0001_01_000003
```
The output shows that at first the cluster is completely occupied by app1 in queueA. After app2 is submitted to queueB, queueB's resources are still below its minShare (1024) after 5 seconds, so preemption is triggered and 1024 MB is claimed from queueA to satisfy the minShare. When the wait exceeds 15 seconds and no resources have been released, container_0_0001_01_000004 is killed; queueA's running resources drop to 3072, showing that the preemption succeeded.
Digging deeper into preemption
The previous section showed queueB preempting 1024 MB from queueA. But if the cluster has multiple queues, how is the queue to preempt from chosen, and how much should be preempted? Let's return to the code with these questions in mind.
The preemption entry point is scheduler.preemptTasksIfNecessary(), so let's look at that method first:
```java
protected synchronized void preemptTasksIfNecessary() {
  if (!shouldAttemptPreemption()) {
    return;
  }

  long curTime = getClock().getTime();
  if (curTime - lastPreemptCheckTime < preemptionInterval) {
    return;
  }
  lastPreemptCheckTime = curTime;

  Resource resToPreempt = Resources.clone(Resources.none());
  for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
    Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
  }
  if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
      Resources.none())) {
    preemptResources(resToPreempt);
  }
}
```
preemptTasksIfNecessary checks which queues need to preempt tasks, adds up how much resource needs to be preempted in total, and then selects the victims.
A queue is judged to need preemption when its resources have been below its minShare for longer than minSharePreemptionTimeout, or below its fairShare for longer than fairSharePreemptionTimeout; in either case it is entitled to preempt resources from other queues.
Before checking individual queues, the scheduler first verifies that the overall conditions allow preemption at all, which is done in shouldAttemptPreemption:
```java
private boolean shouldAttemptPreemption() {
  if (preemptionEnabled) {
    return (preemptionUtilizationThreshold < Math.max(
        (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
        (float) rootMetrics.getAllocatedVirtualCores() /
            clusterResource.getVirtualCores()));
  }
  return false;
}
```
Preemption can only happen when the cluster allows it, and the cluster allows it when yarn.scheduler.fair.preemption is true and cluster resource utilization exceeds yarn.scheduler.fair.preemption.cluster-utilization-threshold.
When the cluster allows preemption, the scheduler iterates over all queues, finds those that need to preempt, and computes how much each needs. That logic lives in resToPreempt:
```java
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
  long minShareTimeout = sched.getMinSharePreemptionTimeout();
  long fairShareTimeout = sched.getFairSharePreemptionTimeout();
  Resource resDueToMinShare = Resources.none();
  Resource resDueToFairShare = Resources.none();
  if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
        sched.getMinShare(), sched.getDemand());
    resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
        Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }
  if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
        sched.getFairShare(), sched.getDemand());
    resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
        Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }
  Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
      resDueToMinShare, resDueToFairShare);
  if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
      resToPreempt, Resources.none())) {
    String message = "Should preempt " + resToPreempt + " res for queue "
        + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
        + ", resDueToFairShare = " + resDueToFairShare;
    LOG.info(message);
  }
  return resToPreempt;
}
```
resToPreempt computes how much resource the given queue should preempt. Assuming the queue's demand exceeds both its minShare and its fairShare, the rules are (a worked example follows this list):
If the queue's resources have been below its minShare for longer than minSharePreemptionTimeout, it should preempt the difference between its minShare and its current usage.
If its resources have been below its fairShare for longer than fairSharePreemptionTimeout, it should preempt the difference between its fairShare and its current usage.
If both conditions hold, it preempts the larger of the two amounts.
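To make the rules concrete, here is a small worked example with made-up numbers that mirrors the min/max arithmetic of resToPreempt (plain ints stand in for the Resource/Resources helpers):

```java
public class ResToPreemptExample {
  public static void main(String[] args) {
    // Hypothetical queue: minShare 1024 MB, fairShare 2048 MB, demand 4096 MB,
    // current usage 512 MB, and both preemption timeouts already exceeded.
    int minShare = 1024, fairShare = 2048, demand = 4096, usage = 512;

    // Rule 1: below minShare longer than minSharePreemptionTimeout
    int resDueToMinShare = Math.max(0, Math.min(minShare, demand) - usage);   // 512
    // Rule 2: below fairShare longer than fairSharePreemptionTimeout
    int resDueToFairShare = Math.max(0, Math.min(fairShare, demand) - usage); // 1536
    // Both rules apply, so preempt the larger of the two amounts.
    int resToPreempt = Math.max(resDueToMinShare, resDueToFairShare);         // 1536

    System.out.println("should preempt " + resToPreempt + " MB");
  }
}
```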
resToPreempt decides whether a queue should preempt and how much; so where is it decided whom to preempt from? The answer is preemptResources:
```java
protected void preemptResources(Resource toPreempt) {
  long start = getClock().getTime();
  if (Resources.equals(toPreempt, Resources.none())) {
    return;
  }

  Iterator<RMContainer> warnedIter = warnedContainers.iterator();
  while (warnedIter.hasNext()) {
    RMContainer container = warnedIter.next();
    if ((container.getState() == RMContainerState.RUNNING ||
        container.getState() == RMContainerState.ALLOCATED) &&
        Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
            toPreempt, Resources.none())) {
      warnOrKillContainer(container);
      Resources.subtractFrom(toPreempt, container.getContainer().getResource());
    } else {
      warnedIter.remove();
    }
  }

  try {
    for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
      for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
        app.resetPreemptedResources();
      }
    }

    while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
        toPreempt, Resources.none())) {
      RMContainer container = getQueueManager().getRootQueue().preemptContainer();
      if (container == null) {
        break;
      } else {
        warnOrKillContainer(container);
        warnedContainers.add(container);
        Resources.subtractFrom(
            toPreempt, container.getContainer().getResource());
      }
    }
  } finally {
    for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
      for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
        app.clearPreemptedResources();
      }
    }
  }

  long duration = getClock().getTime() - start;
  fsOpDurations.addPreemptCallDuration(duration);
}
```
A resource is preempted by first picking a queue, then picking an app within that queue, and finally picking one of that app's containers. Queue selection starts at the root queue and descends one level at a time until a candidate application is reached.
The victim-selection policy depends on the scheduling policy: (1) fair/drf picks the queue that exceeds its fairShare by the most; (2) fifo picks the queue of the most recently submitted app. Within an app, containers are chosen by priority, and the container with the lowest priority is preempted; priority is expressed as a number, and a larger number means a lower priority.
The entry point of this selection process is getQueueManager().getRootQueue().preemptContainer():
```java
public RMContainer preemptContainer() {
  RMContainer toBePreempted = null;

  FSQueue candidateQueue = null;
  Comparator<Schedulable> comparator = policy.getComparator();
  for (FSQueue queue : childQueues) {
    if (candidateQueue == null ||
        comparator.compare(queue, candidateQueue) > 0) {
      candidateQueue = queue;
    }
  }

  if (candidateQueue != null) {
    toBePreempted = candidateQueue.preemptContainer();
  }
  return toBePreempted;
}
```
FSParentQueue.preemptContainer starts at the root queue and picks the child queue that exceeds its fairShare by the most as the candidate. If the candidate is itself a parent queue, FSParentQueue.preemptContainer recurses into it; once a leaf queue is reached, FSLeafQueue.preemptContainer is called to pick an app from that queue.
Selecting queues and apps relies on a comparator whose implementation differs per scheduling policy. Here we focus on FairShareComparator, an inner class of FairSharePolicy.java that implements the Comparator interface:
```java
private static class FairShareComparator implements Comparator<Schedulable>,
    Serializable {
  private static final long serialVersionUID = 5564969375856699313L;
  private static final Resource ONE = Resources.createResource(1);

  @Override
  public int compare(Schedulable s1, Schedulable s2) {
    double minShareRatio1, minShareRatio2;
    double useToWeightRatio1, useToWeightRatio2;
    Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
        s1.getMinShare(), s1.getDemand());
    Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
        s2.getMinShare(), s2.getDemand());
    boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
        s1.getResourceUsage(), minShare1);
    boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
        s2.getResourceUsage(), minShare2);
    minShareRatio1 = (double) s1.getResourceUsage().getMemory()
        / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
    minShareRatio2 = (double) s2.getResourceUsage().getMemory()
        / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
    useToWeightRatio1 = s1.getResourceUsage().getMemory()
        / s1.getWeights().getWeight(ResourceType.MEMORY);
    useToWeightRatio2 = s2.getResourceUsage().getMemory()
        / s2.getWeights().getWeight(ResourceType.MEMORY);
    int res = 0;
    if (s1Needy && !s2Needy)
      res = -1;
    else if (s2Needy && !s1Needy)
      res = 1;
    else if (s1Needy && s2Needy)
      res = (int) Math.signum(minShareRatio1 - minShareRatio2);
    else
      res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
    if (res == 0) {
      res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
      if (res == 0)
        res = s1.getName().compareTo(s2.getName());
    }
    return res;
  }
}
```
The comparator takes two schedulables A and B and returns 1 if A is greater than B, -1 if it is smaller, and 0 if they are equal.
The comparison splits into two cases, depending on whether a queue's usage is below its minShare (a small worked example follows this list).
(1) Below minShare:
If both queues are below their minShare, the comparator compares each queue's resource usage as a fraction of its minShare; the queue with the lower ratio is the needier one and should receive resources first, so the comparator treats the queue with the larger ratio as greater.
If only one queue is below its minShare, the queue that is above its minShare is greater, because a queue below its minShare has a higher claim on resources than one that has already exceeded it.
(2) At or above minShare:
When neither queue is below its minShare, the comparator compares each queue's usage divided by its weight; the queue with the larger ratio is greater.
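A tiny standalone sketch of the ordering (made-up numbers, plain Java rather than the Hadoop Resource API) reproduces just the needy/non-needy branch of the comparator:

```java
public class FairShareComparatorExample {
  public static void main(String[] args) {
    // queueA: usage 3072 MB, minShare 1024 MB -> not needy (above its minShare)
    // queueB: usage  512 MB, minShare 1024 MB -> needy (below its minShare)
    double usageA = 3072, usageB = 512, minShareA = 1024, minShareB = 1024;

    boolean aNeedy = usageA < minShareA;
    boolean bNeedy = usageB < minShareB;

    // Only one side is needy, so the needy queue sorts lower and the other is "greater".
    int cmp = (aNeedy && !bNeedy) ? -1 : (bNeedy && !aNeedy) ? 1 : 0;
    System.out.println(cmp); // 1: compare(queueA, queueB) > 0, so queueA is "greater"

    // preemptContainer() keeps the child that compares greater at each level,
    // so queueA (the queue furthest over its share) is the one preempted from.
  }
}
```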
Next, FSLeafQueue.preemptContainer shows how an app is chosen from within the queue:
```java
public RMContainer preemptContainer() {
  RMContainer toBePreempted = null;

  if (!preemptContainerPreCheck()) {
    return toBePreempted;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Queue " + getName() + " is going to preempt a container "
        + "from its applications.");
  }

  Comparator<Schedulable> comparator = policy.getComparator();
  FSAppAttempt candidateSched = null;
  for (FSAppAttempt sched : runnableApps) {
    if (candidateSched == null ||
        comparator.compare(sched, candidateSched) > 0) {
      candidateSched = sched;
    }
  }

  if (candidateSched != null) {
    toBePreempted = candidateSched.preemptContainer();
  }
  return toBePreempted;
}
```
Once a container has been picked from an app by the steps above, warnOrKillContainer is called. It either records the container in preemptionMap (which tracks containers eligible for preemption and the time they were added) or kills it outright (once yarn.scheduler.fair.waitTimeBeforeKill has elapsed, the container is killed to release its resources):
```java
protected void warnOrKillContainer(RMContainer container) {
  ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
  FSAppAttempt app = getSchedulerApp(appAttemptId);
  FSLeafQueue queue = app.getQueue();
  LOG.info("Preempting container (prio="
      + container.getContainer().getPriority()
      + "res=" + container.getContainer().getResource()
      + ") from queue " + queue.getName());

  Long time = app.getContainerPreemptionTime(container);

  if (time != null) {
    if (time + waitTimeBeforeKill < getClock().getTime()) {
      ContainerStatus status =
          SchedulerUtils.createPreemptedContainerStatus(
              container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);

      recoverResourceRequestForContainer(container);
      completedContainer(container, status, RMContainerEventType.KILL);
      LOG.info("Killing container" + container
          + " (after waiting for premption for "
          + (getClock().getTime() - time) + "ms)");
    }
  } else {
    app.addPreemption(container, getClock().getTime());
  }
}
```
After warnOrKillContainer kills the container, the requesting queue can claim the freed resources, and the preemption cycle is complete.
Summary
Under fair scheduling, the first check is whether the overall conditions allow preemption at all: yarn.scheduler.fair.preemption must be true and cluster resource utilization must exceed yarn.scheduler.fair.preemption.cluster-utilization-threshold.
If preemption is allowed, the scheduler determines how much resource needs to be preempted by iterating over all queues and summing up each queue's shortfall; this happens in FairScheduler.resToPreempt. The amount each queue should preempt is computed as follows, assuming the queue's demand exceeds both its minShare and its fairShare:
If the queue's resources have been below its minShare for longer than minSharePreemptionTimeout, preempt the difference between its minShare and its current usage.
If its resources have been below its fairShare for longer than fairSharePreemptionTimeout, preempt the difference between its fairShare and its current usage.
If both conditions hold, preempt the larger of the two.
Once the total to preempt is known, the scheduler finds the containers to take. Starting from the root queue, it descends level by level to a queue that can be preempted from, then to an application, and finally to a container; the entry point is FairScheduler.preemptResources.
preemptResources first walks the containers already in the warnedContainers list; if toPreempt still has a remainder after that, it repeatedly descends from the root queue looking for containers that can be preempted and adds each one to warnedContainers. The victim-selection policy depends on the scheduling policy: (1) fair/drf picks the queue that exceeds its fairShare by the most; (2) fifo picks the queue of the most recently submitted app. Within an app, containers are chosen by priority and the lowest-priority container is preempted; priority is expressed as a number, and a larger number means a lower priority.
Tips
fairSharePreemptionThreshold should be greater than 0.0 and at most 1.0, because setting it to 0.0 effectively disables fairShare preemption.
Preemption kills containers in queueA so that their capacity can be reassigned to some other queue (queueX); it cannot kill a container belonging to app1 in queueA and hand the resources to app2 in the same queueA.
SchedulingPolicy
The [previous post](http://bigdatadecode.top/YARN源码分析之Fair Scheduler part1.html) covered the fairShare computation inside SchedulingPolicy, and this post covered its queue-ordering comparator, which completes the tour of SchedulingPolicy. It has two responsibilities: (1) the logic for computing fairShare, and (2) the logic for ordering queues (used both when allocating resources and when preempting them), so that resources are allocated to the schedulable that needs them most and preempted from the one that needs them least.
Appendix: complete code

```java
public class TestFairSchedulerPreemptionSzw extends FairSchedulerTestBase {
  private final static String ALLOC_FILE =
      new File("F:\\test-queues").getAbsolutePath();
  private MockClock clock;

  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
        ResourceScheduler.class);
    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    return conf;
  }

  @Before
  public void setup() throws IOException {
    conf = createConfiguration();
    clock = new MockClock();
  }

  @After
  public void teardown() {
    if (resourceManager != null) {
      resourceManager.stop();
      resourceManager = null;
    }
    conf = null;
  }

  private void startResourceManager(float utilizationThreshold) {
    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
        utilizationThreshold);
    resourceManager = new MockRM(conf);
    resourceManager.start();

    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
    scheduler.setClock(clock);
    scheduler.updateInterval = 60 * 1000;
  }

  private ApplicationAttemptId registerNodeAndSubmitApp(
      int memory, int vcores, int appContainers, int appMemory) {
    RMNode node1 = MockNodes.newNodeInfo(
        1, Resources.createResource(memory, vcores), 1, "node1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    System.out.println("resources in the cluster : "
        + scheduler.rootMetrics.getAvailableMB());

    ApplicationAttemptId app1 = createSchedulingRequest(
        appMemory, "queueA", "user1", appContainers);
    scheduler.update();

    for (int i = 0; i < appContainers; i++) {
      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
      scheduler.handle(nodeUpdate1);
    }

    FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queueA", false);
    System.out.println("queueA : " + queue.getFairShare().getMemory()
        + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
        + ", steadyShare : " + queue.getSteadyFairShare()
        + ", demand : " + queue.getDemand()
        + ", running : " + queue.getResourceUsage()
        + ", preempt : " + queue.preemptContainer());

    System.out.println("rest resources of cluster : "
        + (memory - appContainers * appMemory)
        + " , " + scheduler.rootMetrics.getAvailableMB());
    return app1;
  }

  @Test
  public void testPreemptionWithFreeResources() throws Exception {
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<queue name=\"default\">");
    out.println("<maxResources>0mb,0vcores</maxResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueA\">");
    out.println("<weight>1</weight>");
    out.println("<minResources>1024mb,0vcores</minResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueB\">");
    out.println("<weight>1</weight>");
    out.println("<minResources>1024mb,0vcores</minResources>");
    out.println("</queue>");
    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
    out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
    out.println("</allocations>");
    out.close();

    startResourceManager(0f);

    ApplicationAttemptId app1 = registerNodeAndSubmitApp(4 * 1024, 4, 4, 1024);

    ApplicationAttemptId app2 = createSchedulingRequest(1024, "queueB", "user1", 1, 1);
    scheduler.update();

    clock.tick(6);

    FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queueA", false);
    System.out.println("queueA : " + queue.getFairShare().getMemory()
        + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
        + ", steadyShare : " + queue.getSteadyFairShare()
        + ", demand : " + queue.getDemand()
        + ", running : " + queue.getResourceUsage()
        + ", preempt : " + queue.preemptContainer());

    scheduler.preemptTasksIfNecessary();
    System.out.println("preemptResources() should have been called " + 1024 + " , "
        + scheduler.getSchedulerApp(app1).getPreemptionContainers()
            .iterator().next().getContainer().getResource());

    System.out.println("====================================================================");

    clock.tick(18);
    scheduler.preemptTasksIfNecessary();
    System.out.println("preemptResources() should have been called " + 1024 + " , "
        + scheduler.getSchedulerApp(app1).getPreemptionContainers());

    System.out.println("queueA : " + queue.getFairShare().getMemory()
        + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
        + ", steadyShare : " + queue.getSteadyFairShare()
        + ", demand : " + queue.getDemand()
        + ", running : " + queue.getResourceUsage()
        + ", preempt : " + queue.preemptContainer());

    resourceManager.stop();
  }
}
```