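I have wanted to read the Fair Scheduler source code for a long time, but I never found the right time or, more importantly, a good way into the source, so every attempt stalled after a few pages. Recently I had time on my hands and nothing else to read, so I picked this long-shelved task back up.
This time I followed the code starting from the Fair Scheduler test classes, which I find clearer and easier to debug.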
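Environment initialization
To test the Fair Scheduler, you first need to mock up a YARN environment and load the Fair Scheduler configuration. Let's see how the TestFairScheduler class initializes its environment.
TestFairScheduler extends FairSchedulerTestBase, which holds the commonly used shared methods. The YARN environment and its configuration are initialized in setUp, as follows: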
    public void setUp() throws IOException {
      scheduler = new FairScheduler();
      conf = createConfiguration();
      resourceManager = new ResourceManager();
      resourceManager.init(conf);

      // Start the dispatcher and the state store by hand
      ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
      resourceManager.getRMContext().getStateStore().start();

      resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();

      scheduler.setRMContext(resourceManager.getRMContext());
    }

    protected Configuration createConfiguration() {
      Configuration conf = new YarnConfiguration();
      // Use FairScheduler as the ResourceScheduler implementation
      conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
          ResourceScheduler.class);
      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
      conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
          1024);
      conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
      conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
      conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
      return conf;
    }
FairShare calculation logic
The test code is as follows:
    public void testFairShareWithMaxResources() throws IOException {
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

      // Allocation file with two queues whose weights are 0.25 and 0.75
      PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
      out.println("<?xml version=\"1.0\"?>");
      out.println("<allocations>");
      out.println("<queue name=\"queueA\">");
      out.println("<maxResources>1073741824 mb 1000 vcores</maxResources>");
      out.println("<weight>.25</weight>");
      out.println("</queue>");
      out.println("<queue name=\"queueB\">");
      out.println("<maxResources>1073741824 mb 1000 vcores</maxResources>");
      out.println("<weight>.75</weight>");
      out.println("</queue>");
      out.println("</allocations>");
      out.close();

      scheduler.init(conf);
      scheduler.start();
      scheduler.reinitialize(conf, resourceManager.getRMContext());

      // Add one node with 8 GB of memory and 8 vcores
      RMNode node1 = MockNodes.newNodeInfo(1,
          Resources.createResource(8 * 1024, 8), 1, "");
      NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
      scheduler.handle(nodeEvent1);

      // Make queueA and queueB active
      createSchedulingRequest(1 * 1024, "queueA", "user1");
      createSchedulingRequest(6 * 1024, "queueB", "user1");

      scheduler.update();

      FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queueA", false);
      System.out.println("queueA FairShare: " + queue.getFairShare().getMemory()
          + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
          + ", steadyShare : " + queue.getSteadyFairShare()
          + ", demand : " + queue.getDemand()
          + ", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());

      queue = scheduler.getQueueManager().getLeafQueue("queueB", false);
      System.out.println("queueB FairShare: " + queue.getFairShare().getMemory()
          + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
          + ", steadyShare : " + queue.getSteadyFairShare()
          + ", demand : " + queue.getDemand()
          + ", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());

      queue = scheduler.getQueueManager().getLeafQueue("default", false);
      System.out.println("default FairShare: " + queue.getFairShare().getMemory()
          + ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY)
          + ", steadyShare : " + queue.getSteadyFairShare()
          + ", demand : " + queue.getDemand()
          + ", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());
    }
    queueA FairShare: 2048, weight : 0.25, steadyShare : <memory:1024, vCores:0>, demand : <memory:1024, vCores:1>, max/min : <memory:1073741824, vCores:1000>/<memory:0, vCores:0>
    queueB FairShare: 6144, weight : 0.75, steadyShare : <memory:3072, vCores:0>, demand : <memory:6144, vCores:1>, max/min : <memory:1073741824, vCores:1000>/<memory:0, vCores:0>
    default FairShare: 0, weight : 1.0, steadyShare : <memory:4096, vCores:0>, demand : <memory:0, vCores:0>, max/min : <memory:2147483647, vCores:2147483647>/<memory:0, vCores:0>
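From the output above we can infer that Weight defaults to 1.0, and that maxShare and minShare (that is, maxResources and minResources) default to Integer.MAX_VALUE (2147483647) and 0 respectively. But what on earth are FairShare and steadyFairShare?
Let's set FairShare and steadyFairShare aside for a moment and first introduce the settings they depend on: Weight and maxResources/minResources.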
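Weight
A weight is a number greater than or equal to 0.0; the default is 1.0. It is a queue property that acts as a weighting factor, so that the resources the queues receive are proportional to their weights.
The weight determines how much a queue should get relative to its sibling queues. Suppose the cluster has 1000 GB of memory and 200 vcores, queueX has a weight of 0.25, and the weights of all other queues sum to 0.75. Then queueX's FairShare is 250 GB and 50 vcores*.
* FairShare here means the Steady FairShare; the reason is explained below.
Weight is used to compute not only the Steady FairShare but also the FairShare. If a queue or any of its sub-queues has at least one active application, its FairShare (here meaning the Instantaneous FairShare) is computed and enforced.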
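The queueX arithmetic above can be checked with a few lines of Java. This is only a sketch of the simplified proportional rule, not the scheduler's own code; the class WeightShareSketch and its share method are made-up names for illustration, and min/max limits are ignored.

```java
// Hypothetical helper, not part of Hadoop: proportional share by weight.
final class WeightShareSketch {

  /** share = total * weight / sumOfSiblingWeights (ignores minResources/maxResources). */
  static long share(long total, double weight, double sumOfWeights) {
    return Math.round(total * weight / sumOfWeights);
  }

  public static void main(String[] args) {
    double sumOfWeights = 0.25 + 0.75; // queueX plus all of its sibling queues
    long memoryGb = share(1000, 0.25, sumOfWeights);
    long vcores = share(200, 0.25, sumOfWeights);
    System.out.println("queueX: " + memoryGb + " GB, " + vcores + " vcores");
  }
}
```

Doubling queueX's weight while leaving the siblings at 0.75 would not double its share, because the denominator grows as well; only the ratio between weights matters.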
Food for thought: how should the weights be set? What is the most reasonable choice?
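minResources and maxResources
minResources and maxResources are a queue's resource limits. They default to 0 and Integer.MAX_VALUE.
Both values are static, so they have to be updated whenever the cluster's resources change. (So why not allow percentages instead? Could this be something to improve ourselves?)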
In the past, one specified minResources when using preemption to get a chunk of resources sooner. This is no longer necessary as FairShare-based preemption has been improved significantly. We will discuss this subject in more detail later.
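FairShare and SteadyFairShare
The Fair Scheduler has two kinds of FairShare: Instantaneous FairShare and Steady FairShare. Instantaneous FairShare is usually shortened to FairShare (in this post, FairShare always means Instantaneous FairShare).
Steady FairShare is a queue's ideal value. It is derived from the cluster resources and the weights, and it is not recomputed often: only at cluster startup or when the cluster's total resources grow or shrink. Each queue's Steady FairShare is totalResources (total cluster resources) / weights (sum of all queues' weights) * weight (this queue's weight).
Instantaneous FairShare is computed for every active queue in the cluster, whereas Steady FairShare is computed for all queues. They differ as follows:
An empty queue, i.e. an inactive queue with no resources allocated to it, gets no Instantaneous FairShare; its value is 0.
If every queue in the cluster is active, Steady FairShare and Instantaneous FairShare are equal.
Each queue's Instantaneous FairShare is totalResources (total cluster resources) / activeWeights (sum of the active queues' weights) * activeWeight (this active queue's weight).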
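To see the two simplified formulas side by side, here is a small sketch that plugs in the numbers from the test earlier in this post: an 8192 MB cluster, queueA (weight 0.25) and queueB (weight 0.75) active, and the default queue (weight 1.0) inactive. The class and method names are hypothetical and this is not how Hadoop implements it; it only evaluates the formulas as written.

```java
import java.util.Arrays;

// Hypothetical sketch, not Hadoop code: the two simplified formulas side by side.
final class SimpleFairShareSketch {

  /** Steady share: every queue's weight counts, whether the queue is active or not. */
  static long[] steadyShares(long total, double[] weights) {
    double sum = 0;
    for (double w : weights) {
      sum += w;
    }
    long[] shares = new long[weights.length];
    for (int i = 0; i < weights.length; i++) {
      shares[i] = (long) (total / sum * weights[i]);
    }
    return shares;
  }

  /** Instantaneous share: only active queues' weights count; inactive queues get 0. */
  static long[] instantaneousShares(long total, double[] weights, boolean[] active) {
    double activeSum = 0;
    for (int i = 0; i < weights.length; i++) {
      if (active[i]) {
        activeSum += weights[i];
      }
    }
    long[] shares = new long[weights.length]; // inactive entries stay 0
    for (int i = 0; i < weights.length; i++) {
      if (active[i]) {
        shares[i] = (long) (total / activeSum * weights[i]);
      }
    }
    return shares;
  }

  public static void main(String[] args) {
    long totalMb = 8 * 1024;
    double[] weights = {0.25, 0.75, 1.0};     // queueA, queueB, default
    boolean[] active = {true, true, false};   // default has no applications
    System.out.println("steady:        " + Arrays.toString(steadyShares(totalMb, weights)));
    System.out.println("instantaneous: "
        + Arrays.toString(instantaneousShares(totalMb, weights, active)));
  }
}
```

With these inputs the sketch yields 1024, 3072 and 4096 MB for the steady shares and 2048, 6144 and 0 MB for the instantaneous ones, which agrees with the steadyShare and FairShare values the test printed.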
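Neither of the two formulas above is rigorous; treat them as a reference only. The real logic in the code is more involved. Let's first walk through the code that computes the Steady FairShare.
FairShare
Every queue/job has a FairShare value that represents the resources it is currently entitled to. A background thread in the scheduler continually updates each job's FairShare based on the overall state of the cluster. When the resources a job actually holds fall short of its FairShare, the job is more likely to receive new resources; when they exceed its FairShare, it is unlikely to get more and may even have its existing resources preempted. Computing FairShare is also a top-down allocation process: normally the jobs within one queue split the queue's resources as evenly as possible, and the FairShare of a particular job is further adjusted by some other parameters.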
As noted above, the Steady FairShare is recomputed only when the cluster's total resources change, so its entry point is scheduler.handle(nodeEvent1);
    // FSParentQueue: recompute steady shares recursively, top down
    public void recomputeSteadyShares() {
      policy.computeSteadyShares(childQueues, getSteadyFairShare());
      for (FSQueue childQueue : childQueues) {
        childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
        if (childQueue instanceof FSParentQueue) {
          ((FSParentQueue) childQueue).recomputeSteadyShares();
        }
      }
    }

    // FairSharePolicy: the fair policy only looks at memory
    public void computeSteadyShares(Collection<? extends FSQueue> queues,
        Resource totalResources) {
      ComputeFairShares.computeSteadyShares(queues, totalResources,
          ResourceType.MEMORY);
    }
    // ComputeFairShares
    public static void computeSteadyShares(
        Collection<? extends FSQueue> queues, Resource totalResources,
        ResourceType type) {
      computeSharesInternal(queues, totalResources, type, true);
    }
    private static void computeSharesInternal(
        Collection<? extends Schedulable> allSchedulables,
        Resource totalResources, ResourceType type, boolean isSteadyShare) {

      // Split off the queues whose share is fixed; only the rest are computed
      Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
      int takenResources = handleFixedFairShares(
          allSchedulables, schedulables, isSteadyShare, type);

      if (schedulables.isEmpty()) {
        return;
      }

      // Sum of maxShare over the remaining queues, saturating at Integer.MAX_VALUE
      int totalMaxShare = 0;
      for (Schedulable sched : schedulables) {
        int maxShare = getResourceValue(sched.getMaxShare(), type);
        totalMaxShare = (int) Math.min((long) maxShare + (long) totalMaxShare,
            Integer.MAX_VALUE);
        if (totalMaxShare == Integer.MAX_VALUE) {
          break;
        }
      }

      // Resources left to distribute, capped by the sum of maxShare
      int totalResource = Math.max((getResourceValue(totalResources, type) -
          takenResources), 0);
      totalResource = Math.min(totalMaxShare, totalResource);

      // Find an upper bound for the ratio by doubling
      double rMax = 1.0;
      while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
          < totalResource) {
        rMax *= 2.0;
      }

      // Binary search for the ratio between 0 and rMax
      double left = 0;
      double right = rMax;
      for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
        double mid = (left + right) / 2.0;
        int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
            mid, schedulables, type);
        if (plannedResourceUsed == totalResource) {
          right = mid;
          break;
        } else if (plannedResourceUsed < totalResource) {
          left = mid;
        } else {
          right = mid;
        }
      }

      // Set the fair shares based on the ratio that was found (right)
      for (Schedulable sched : schedulables) {
        if (isSteadyShare) {
          setResourceValue(computeShare(sched, right, type),
              ((FSQueue) sched).getSteadyFairShare(), type);
        } else {
          setResourceValue(
              computeShare(sched, right, type), sched.getFairShare(), type);
        }
      }
    }
    private static int handleFixedFairShares(
        Collection<? extends Schedulable> schedulables,
        Collection<Schedulable> nonFixedSchedulables,
        boolean isSteadyShare, ResourceType type) {
      int totalResource = 0;

      for (Schedulable sched : schedulables) {
        int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
        if (fixedShare < 0) {
          // Not fixed: leave it for the ratio search
          nonFixedSchedulables.add(sched);
        } else {
          // Fixed: assign the share directly and add it to the running total
          setResourceValue(fixedShare,
              isSteadyShare
                  ? ((FSQueue) sched).getSteadyFairShare()
                  : sched.getFairShare(),
              type);
          totalResource = (int) Math.min((long) totalResource + (long) fixedShare,
              Integer.MAX_VALUE);
        }
      }
      return totalResource;
    }
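handleFixedFairShares partitions the queues: it sums the fixed fair shares of the fixed queues and returns the total, and it puts the non-fixed queues into nonFixedSchedulables. Whether a queue is fixed is decided by the method getFairShareIfFixed: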
    // Returns the fixed share, or -1 if the share is not fixed
    private static int getFairShareIfFixed(Schedulable sched,
        boolean isSteadyShare, ResourceType type) {

      // maxShare is 0: the share is fixed at 0
      if (getResourceValue(sched.getMaxShare(), type) <= 0) {
        return 0;
      }

      // Instantaneous share of an inactive queue: fixed at 0
      if (!isSteadyShare &&
          (sched instanceof FSQueue) && !((FSQueue) sched).isActive()) {
        return 0;
      }

      // weight is 0: the share is fixed at minShare
      if (sched.getWeights().getWeight(type) <= 0) {
        int minShare = getResourceValue(sched.getMinShare(), type);
        return (minShare <= 0) ? 0 : minShare;
      }

      return -1;
    }
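The fixed/non-fixed split is easier to follow on concrete queues. The sketch below mirrors the decision order of getFairShareIfFixed and the summing in handleFixedFairShares, but on a made-up Q class instead of Hadoop's Schedulable; all names in it are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Hadoop code: which queues have a fixed share?
final class FixedShareSketch {

  /** Made-up stand-in for a queue; holds only the fields the rule looks at. */
  static final class Q {
    final String name;
    final double weight;
    final int minShare;
    final int maxShare;
    final boolean active;

    Q(String name, double weight, int minShare, int maxShare, boolean active) {
      this.name = name;
      this.weight = weight;
      this.minShare = minShare;
      this.maxShare = maxShare;
      this.active = active;
    }
  }

  /** Same decision order as above: returns the fixed share, or -1 if not fixed. */
  static int fairShareIfFixed(Q q, boolean isSteadyShare) {
    if (q.maxShare <= 0) {
      return 0;                          // nothing may be assigned
    }
    if (!isSteadyShare && !q.active) {
      return 0;                          // inactive queue, instantaneous share
    }
    if (q.weight <= 0) {
      return Math.max(q.minShare, 0);    // zero weight: only minShare is granted
    }
    return -1;
  }

  /** Sums the fixed shares (saturating) and collects the other queues in nonFixed. */
  static int handleFixed(List<Q> all, List<Q> nonFixed, boolean isSteadyShare) {
    long taken = 0;
    for (Q q : all) {
      int fixed = fairShareIfFixed(q, isSteadyShare);
      if (fixed < 0) {
        nonFixed.add(q);
      } else {
        taken = Math.min(taken + fixed, Integer.MAX_VALUE);
      }
    }
    return (int) taken;
  }

  public static void main(String[] args) {
    List<Q> queues = Arrays.asList(
        new Q("queueA", 0.25, 0, 1073741824, true),
        new Q("queueB", 0.75, 0, 1073741824, true),
        new Q("default", 1.0, 0, Integer.MAX_VALUE, false),
        new Q("zeroWeight", 0.0, 512, Integer.MAX_VALUE, true));

    List<Q> nonFixed = new ArrayList<Q>();
    int taken = handleFixed(queues, nonFixed, false);
    System.out.println("instantaneous: taken=" + taken + ", nonFixed=" + nonFixed.size());

    nonFixed = new ArrayList<Q>();
    taken = handleFixed(queues, nonFixed, true);
    System.out.println("steady:        taken=" + taken + ", nonFixed=" + nonFixed.size());
  }
}
```

The inactive default queue is fixed at 0 only for the instantaneous computation; for the steady computation it stays in the non-fixed set, which is why it received a steady share in the test output.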
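Once the set of queues to compute is known, we work out what each of them should get. For that we need the total resource to distribute among them. Note that this is not simply the cluster total: it is the cluster total minus the fixed shares already taken, capped by the sum of the maxResources configured on these queues. The code first sums the maxResources, then takes the smaller of that sum and the remaining cluster resources: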
    int totalMaxShare = 0;
    for (Schedulable sched : schedulables) {
      int maxShare = getResourceValue(sched.getMaxShare(), type);
      totalMaxShare = (int) Math.min((long) maxShare + (long) totalMaxShare,
          Integer.MAX_VALUE);
      if (totalMaxShare == Integer.MAX_VALUE) {
        break;
      }
    }
    int totalResource = Math.max((getResourceValue(totalResources, type) -
        takenResources), 0);
    totalResource = Math.min(totalMaxShare, totalResource);
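Two details of this step are easy to miss: the sum of maxShare values saturates at Integer.MAX_VALUE instead of overflowing, and the amount to distribute is the smaller of that sum and what is left of the cluster. A minimal standalone sketch, using plain int arrays in place of Hadoop's Schedulable objects (the class and method names are made up):

```java
// Hypothetical sketch, not Hadoop code: how much resource is there to distribute?
final class TotalResourceSketch {

  /** Adds up the max shares in long arithmetic, clamping at Integer.MAX_VALUE. */
  static int saturatingSum(int[] maxShares) {
    int total = 0;
    for (int maxShare : maxShares) {
      total = (int) Math.min((long) maxShare + (long) total, Integer.MAX_VALUE);
      if (total == Integer.MAX_VALUE) {
        break; // already saturated, no need to look further
      }
    }
    return total;
  }

  /** Cluster total minus fixed shares, never negative, capped by the max-share sum. */
  static int totalToDistribute(int clusterTotal, int takenByFixed, int totalMaxShare) {
    int remaining = Math.max(clusterTotal - takenByFixed, 0);
    return Math.min(totalMaxShare, remaining);
  }

  public static void main(String[] args) {
    // The test's queueA and queueB: 1073741824 MB each, on an 8192 MB cluster.
    int bigSum = saturatingSum(new int[] {1073741824, 1073741824});
    System.out.println(bigSum + " -> " + totalToDistribute(8192, 0, bigSum));

    // Two tightly capped queues: the caps, not the cluster size, set the limit.
    int smallSum = saturatingSum(new int[] {2048, 1024});
    System.out.println(smallSum + " -> " + totalToDistribute(8192, 0, smallSum));
  }
}
```

In the first case the sum saturates and the whole 8192 MB is distributed; in the second only 3072 MB can ever be handed out, so the rest of the cluster is left out of the computation.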
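While honoring minResources and maxResources, resources are divided as fairly as possible; the Fair Scheduler requires the FairShare to lie between minResources and maxResources.
The Fair Scheduler defines a ratio R, and the resources assigned to each queue fall into three cases:
If queue S has minShare > R*weight, it gets minShare.
If queue S has maxShare < R*weight, it gets maxShare.
Otherwise it gets R*weight.
R is chosen so that these allocations add up to totalSlots. R is also called the weight-to-slots ratio, because it converts a queue's weight into an actual amount of resources.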
    // Find an upper bound for R by doubling
    double rMax = 1.0;
    while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
        < totalResource) {
      rMax *= 2.0;
    }
    // Total resources that would be used if the ratio were w2rRatio
    private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
        Collection<? extends Schedulable> schedulables, ResourceType type) {
      int resourcesTaken = 0;
      for (Schedulable sched : schedulables) {
        int share = computeShare(sched, w2rRatio, type);
        resourcesTaken += share;
      }
      return resourcesTaken;
    }

    // weight * ratio, clamped to [minShare, maxShare]
    private static int computeShare(Schedulable sched, double w2rRatio,
        ResourceType type) {
      double share = sched.getWeights().getWeight(type) * w2rRatio;
      share = Math.max(share, getResourceValue(sched.getMinShare(), type));
      share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
      return (int) share;
    }
    // Binary search for R between 0 and rMax
    double left = 0;
    double right = rMax;
    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
          mid, schedulables, type);
      if (plannedResourceUsed == totalResource) {
        right = mid;
        break;
      } else if (plannedResourceUsed < totalResource) {
        left = mid;
      } else {
        right = mid;
      }
    }
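To try the search without a YARN environment, the doubling step, the binary search and the clamping can be put together in one standalone sketch. It follows the structure of the snippets above but works on plain arrays, and it makes some assumptions that are mine rather than the post's: the iteration count of 25 stands in for COMPUTE_FAIR_SHARES_ITERATIONS, every weight is assumed to be greater than 0 (fixed queues have already been filtered out), and the class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch, not Hadoop code: find the weight-to-resource ratio R.
final class WeightToResourceRatioSketch {
  static final int ITERATIONS = 25; // assumed value for this sketch

  /** weight * r, clamped to [min, max], truncated to an int. */
  static int computeShare(double weight, int min, int max, double r) {
    double share = weight * r;
    share = Math.max(share, min);
    share = Math.min(share, max);
    return (int) share;
  }

  /** Total resources that would be used if the ratio were r. */
  static int used(double r, double[] w, int[] min, int[] max) {
    int sum = 0;
    for (int i = 0; i < w.length; i++) {
      sum += computeShare(w[i], min[i], max[i], r);
    }
    return sum;
  }

  /** Returns each queue's share; assumes every weight is greater than 0. */
  static int[] shares(double[] w, int[] min, int[] max, int total) {
    // Cap the target by the sum of max shares, otherwise doubling never terminates.
    long maxSum = 0;
    for (int m : max) {
      maxSum += m;
    }
    int target = (int) Math.min(maxSum, (long) total);

    // Step 1: double rMax until the planned usage reaches the target.
    double rMax = 1.0;
    while (used(rMax, w, min, max) < target) {
      rMax *= 2.0;
    }

    // Step 2: binary search between 0 and rMax; right never undershoots the target.
    double left = 0;
    double right = rMax;
    for (int i = 0; i < ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      int planned = used(mid, w, min, max);
      if (planned == target) {
        right = mid;
        break;
      } else if (planned < target) {
        left = mid;
      } else {
        right = mid;
      }
    }

    // Step 3: assign shares with the ratio found.
    int[] result = new int[w.length];
    for (int i = 0; i < w.length; i++) {
      result[i] = computeShare(w[i], min[i], max[i], right);
    }
    return result;
  }

  public static void main(String[] args) {
    // queueA and queueB from the test: no limit is hit, shares follow the weights.
    System.out.println(Arrays.toString(shares(
        new double[] {0.25, 0.75}, new int[] {0, 0},
        new int[] {1073741824, 1073741824}, 8192)));
    // Equal weights, but the first queue is capped at 1000 MB.
    System.out.println(Arrays.toString(shares(
        new double[] {1.0, 1.0}, new int[] {0, 0},
        new int[] {1000, Integer.MAX_VALUE}, 8192)));
    // Equal weights, but the first queue is guaranteed 6000 MB.
    System.out.println(Arrays.toString(shares(
        new double[] {1.0, 1.0}, new int[] {6000, 0},
        new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE}, 8192)));
  }
}
```

The first call gives 2048 and 6144, matching the FairShare values in the test output. In the second and third calls the capped or guaranteed queue is pinned to its limit (1000 and 6000) and the other queue absorbs the remainder (7192 and 2192), so the shares still add up to 8192. That is exactly what the search for R buys over the simple proportional formula.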
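Summary
This post walked through how a queue's FairShare and SteadyFairShare are computed. There are two kinds of FairShare, Instantaneous FairShare and SteadyFairShare, and Instantaneous FairShare is usually just called FairShare. The difference between FairShare and SteadyFairShare is that the former leaves inactive queues out of the computation, while the latter includes all queues.
The idea behind computing both FairShare and SteadyFairShare is to first find a ratio R; the resources assigned to each queue then fall into three cases:
If queue S has minShare > R*weight, it gets minShare.
If queue S has maxShare < R*weight, it gets maxShare.
Otherwise it gets R*weight.
R is chosen so that these allocations add up to totalSlots. R is also called the weight-to-slots ratio, because it converts a queue's weight into an actual amount of resources.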
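Finding the ratio R takes some work. For a quick estimate of what each queue should get, the following formulas are enough:
Steady FairShare = totalResources (total cluster resources) / weights (sum of all queues' weights) * weight (this queue's weight)
Instantaneous FairShare = totalResources (total cluster resources) / activeWeights (sum of the active queues' weights) * activeWeight (this active queue's weight)
Both formulas hold only if the resulting SteadyFairShare and FairShare lie between minResources and maxResources. If a result is below minResources (or above maxResources), the steadyFairShare or FairShare should be minResources (or maxResources) instead. In that case the slots these simple formulas assign to the queues no longer add up to totalSlots; for example, they exceed it when a queue is raised to its minResources.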