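I had wanted to read the Fair Scheduler source for a long time. I never found the right time, or a good way to approach the src, so every attempt stalled after a little reading. Recently I was bored and unsure what to read next, which reminded me of this long-shelved task.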
This time I traced the code starting from the Fair Scheduler's test classes. I find that clearer, and it also makes debugging easier.
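Environment initialization
To test the Fair Scheduler we first need to simulate a YARN environment and load the Fair Scheduler configuration. Let's look at how the TestFairScheduler class initializes this environment.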
TestFairScheduler extends FairSchedulerTestBase, which holds the commonly used helper methods. YARN and its configuration are initialized in setUp, as follows:
```java
public void setUp() throws IOException {
  scheduler = new FairScheduler();
  conf = createConfiguration();
  resourceManager = new ResourceManager();
  resourceManager.init(conf);

  // Start only what the tests need: the dispatcher and the state store
  ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
  resourceManager.getRMContext().getStateStore().start();

  // Initialize the container token master key
  resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();

  scheduler.setRMContext(resourceManager.getRMContext());
}

protected Configuration createConfiguration() {
  Configuration conf = new YarnConfiguration();
  // Use the FairScheduler as the ResourceManager's scheduler
  conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
      ResourceScheduler.class);
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
  conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
      1024);
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
  // Do not assign multiple containers in a single node heartbeat
  conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
  conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
  return conf;
}
```
With the environment initialized, let's start with a simple example that shows how each queue's FairShare is computed.
FairShare calculation logic
The test code is as follows:
```java
public void testFairShareWithMaxResources() throws IOException {
  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
  // queueA and queueB both set maxResources; the sum of the two exceeds
  // Integer.MAX_VALUE
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<queue name=\"queueA\">");
  out.println("<maxResources>1073741824 mb 1000 vcores</maxResources>");
  out.println("<weight>.25</weight>");
  out.println("</queue>");
  out.println("<queue name=\"queueB\">");
  out.println("<maxResources>1073741824 mb 1000 vcores</maxResources>");
  out.println("<weight>.75</weight>");
  out.println("</queue>");
  out.println("</allocations>");
  out.close();

  scheduler.init(conf);
  scheduler.start();
  scheduler.reinitialize(conf, resourceManager.getRMContext());

  // Add one node with 8 GB of memory and 8 vcores
  RMNode node1 =
      MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
          "127.0.0.1");
  NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
  scheduler.handle(nodeEvent1);

  // queueA asks for 1 * 1024 MB
  createSchedulingRequest(1 * 1024, "queueA", "user1");
  // queueB asks for 6 * 1024 MB
  createSchedulingRequest(6 * 1024, "queueB", "user1");

  scheduler.update();

  FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
      "queueA", false);
  System.out.println("queueA FairShare: " + queue.getFairShare().getMemory() +
      ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
      ", steadyShare : " + queue.getSteadyFairShare() +
      ", demand : " + queue.getDemand() +
      ", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());

  queue = scheduler.getQueueManager().getLeafQueue(
      "queueB", false);
  System.out.println("queueB FairShare: " + queue.getFairShare().getMemory() +
      ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
      ", steadyShare : " + queue.getSteadyFairShare() +
      ", demand : " + queue.getDemand() +
      ", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());

  queue = scheduler.getQueueManager().getLeafQueue(
      "default", false);
  System.out.println("default FairShare: " + queue.getFairShare().getMemory() +
      ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
      ", steadyShare : " + queue.getSteadyFairShare() +
      ", demand : " + queue.getDemand() +
      ", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());
}
```
The output is:
```
queueA FairShare: 2048, weight : 0.25, steadyShare : <memory:1024, vCores:0>, demand : <memory:1024, vCores:1>, max/min : <memory:1073741824, vCores:1000>/<memory:0, vCores:0>
queueB FairShare: 6144, weight : 0.75, steadyShare : <memory:3072, vCores:0>, demand : <memory:6144, vCores:1>, max/min : <memory:1073741824, vCores:1000>/<memory:0, vCores:0>
default FairShare: 0, weight : 1.0, steadyShare : <memory:4096, vCores:0>, demand : <memory:0, vCores:0>, max/min : <memory:2147483647, vCores:2147483647>/<memory:0, vCores:0>
```
From this output we can infer three defaults. The default weight is 1.0. The default maxShare (maxResources) is Integer.MAX_VALUE (2147483647), and the default minShare (minResources) is 0. But what on earth are FairShare and steadyFairShare?
Let's set FairShare and steadyFairShare aside for a moment. First we introduce the settings they depend on: Weight and maxResources/minResources.
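Weight
Weight is a number greater than or equal to 0.0, with a default of 1.0. It is a queue attribute that acts as a relative weight, so that the resources queues receive are proportional to their weights.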
Notes:
 
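The weight determines how many resources a queue should get relative to its sibling queues. Suppose:
then queueX's FairShare is 250 GB and 50 vcores. (FairShare here means the steadyFairShare; the reason is explained below.)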
Weight is used to compute the FairShare as well as the steadyFairShare. If the queue or any of its child queues has at least one active application, its FairShare is always computed. (FairShare here means the Instantaneous FairShare.)
Weights are relative to the cluster, so a queue's FairShare changes when the cluster's resources change.
 
Food for thought: how should weights be set, and what setting is most reasonable?
minResources and maxResources
minResources and maxResources are a queue's resource limits. Their defaults are 0 and Integer.MAX_VALUE.
Notes:
 
minResources is a soft limit. It guarantees the queue at least minResources in total only when both of the following hold:
 
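The resources are available, or can be preempted from other queues.
The sum of minResources across all queues does not exceed the cluster's total resources.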
 
maxResources is a hard limit. The total resources in use by the queue's child or descendant queues never exceed this value.
Using minResources and maxResources is not recommended, because they have several drawbacks:
 
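The values of minResources and maxResources are static, so they must be updated whenever the cluster's resources change. (Why not express them as percentages? Could this be improved?)
maxResources caps a queue's use of the cluster: once a queue reaches maxResources, it cannot use any more idle resources. (Whether this is an advantage or a drawback depends on the scenario.)
If a queue's minResources exceeds its FairShare, it can reduce the FairShare of other queues.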
In the past, one specified minResources when using preemption to get a chunk of resources sooner. This is no longer necessary as FairShare-based preemption has been improved significantly. We will discuss this subject in more detail later. 
 
FairShare and SteadyFairShare
The Fair Scheduler has two kinds of FairShare: the Instantaneous FairShare and the Steady FairShare. The Instantaneous FairShare is usually just called FairShare, and in this article FairShare means the Instantaneous FairShare.
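The Steady FairShare is a queue's ideal value. It is computed from the cluster's resources and the weights. It is not recomputed often: only when the cluster starts, or when the cluster's total resources grow or shrink. Roughly, a queue's Steady FairShare is totalResources (the cluster's total resources) / weights (the sum of all queues' weights) * weight (that queue's weight).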
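The Instantaneous FairShare is computed only for the active queues in the cluster, whereas the Steady FairShare is computed for all queues. The differences are:
An empty (inactive) queue, i.e. one that has not been allocated any resources, gets no Instantaneous FairShare; its value is 0.
If every queue in the cluster is active, the Steady FairShare and the Instantaneous FairShare are equal.
Roughly, a queue's Instantaneous FairShare is totalResources (the cluster's total resources) / activeWeights (the sum of the active queues' weights) * activeWeight (that active queue's weight).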
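Applying both approximate formulas to the test above is a quick sanity check. The test has an 8192 MB node, queueA with weight 0.25 and queueB with weight 0.75, both active, and the default queue with weight 1.0 and no applications. The sketch below does only that arithmetic and does not call any Hadoop class. `ApproxShares` and `proportional` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Back-of-the-envelope check of the two approximate formulas against the test
// output: share = total / sum(weights) * weight.
final class ApproxShares {

  // Splits `total` among the given queues in proportion to their weights.
  static Map<String, Long> proportional(long total, Map<String, Double> weights) {
    double sum = 0;
    for (double w : weights.values()) {
      sum += w;
    }
    Map<String, Long> shares = new LinkedHashMap<>();
    for (Map.Entry<String, Double> e : weights.entrySet()) {
      shares.put(e.getKey(), (long) (total / sum * e.getValue()));
    }
    return shares;
  }

  public static void main(String[] args) {
    Map<String, Double> all = new LinkedHashMap<>();
    all.put("queueA", 0.25);
    all.put("queueB", 0.75);
    all.put("default", 1.0);

    // Steady FairShare: every queue takes part, so the weights sum to 2.0
    System.out.println("steady: " + proportional(8192, all));

    // Instantaneous FairShare: the inactive default queue is left out (it gets 0)
    Map<String, Double> active = new LinkedHashMap<>(all);
    active.remove("default");
    System.out.println("instantaneous: " + proportional(8192, active));
  }
}
```

This prints `steady: {queueA=1024, queueB=3072, default=4096}` and `instantaneous: {queueA=2048, queueB=6144}`. Those values match the steadyShare and FairShare columns of the test output, with the default queue's FairShare being 0.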
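Both formulas above are only rough approximations. The real logic in the code is more involved, so let's first walk through the code that computes the Steady FairShare.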
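FairShare: every queue/job has a FairShare value that represents the resources it should currently receive. A background thread in the scheduler keeps updating each job's FairShare based on the overall state of the cluster. Generally, a job that holds fewer resources than its FairShare is more likely to get new resources. A job that holds more than its FairShare is unlikely to get new ones, and some of its existing resources may even be preempted.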
 
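As mentioned above, the Steady FairShare is recomputed only when the cluster's total resources change, so its entry point is scheduler.handle(nodeEvent1);. The scheduler receives the NODE_ADDED event and calls addNode. addNode then calls queueMgr.getRootQueue().setSteadyFairShare(clusterResource); and queueMgr.getRootQueue().recomputeSteadyShares(). The first call sets the steadyFairShare of the whole cluster (the root queue), and the second computes the steadyFairShare of every queue. Let's trace the code starting from recomputeSteadyShares: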
```java
// FSParentQueue
public void recomputeSteadyShares() {
  // Split this queue's steady share among its children according to the policy
  policy.computeSteadyShares(childQueues, getSteadyFairShare());
  for (FSQueue childQueue : childQueues) {
    childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
    // Recurse into children that are themselves parent queues
    if (childQueue instanceof FSParentQueue) {
      ((FSParentQueue) childQueue).recomputeSteadyShares();
    }
  }
}

// FairSharePolicy
public void computeSteadyShares(Collection<? extends FSQueue> queues,
    Resource totalResources) {
  // Only the MEMORY dimension is computed
  ComputeFairShares.computeSteadyShares(queues, totalResources,
      ResourceType.MEMORY);
}
```
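The recursion can be pictured with a toy queue tree. A parent first receives its own steady share and then splits it among its children, and any child that is itself a parent repeats the process. The sketch below is a hypothetical model (`ToyQueue` is not a Hadoop class). It splits shares purely by weight and ignores min/max limits, which the real ComputeFairShares handles.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of recomputeSteadyShares(): weight-proportional split, then recurse.
final class ToyQueue {
  final String name;
  final double weight;
  final List<ToyQueue> children = new ArrayList<>();
  long steadyShare;

  ToyQueue(String name, double weight) {
    this.name = name;
    this.weight = weight;
  }

  ToyQueue add(ToyQueue child) {
    children.add(child);
    return this;
  }

  void recomputeSteadyShares() {
    double sum = 0;
    for (ToyQueue c : children) {
      sum += c.weight;
    }
    for (ToyQueue c : children) {
      c.steadyShare = (long) (steadyShare * c.weight / sum);
      if (!c.children.isEmpty()) {
        c.recomputeSteadyShares();   // descend, like the FSParentQueue check
      }
    }
  }

  public static void main(String[] args) {
    ToyQueue a = new ToyQueue("root.a", 1.0);
    ToyQueue b = new ToyQueue("root.b", 3.0);
    ToyQueue b1 = new ToyQueue("root.b.b1", 1.0);
    ToyQueue b2 = new ToyQueue("root.b.b2", 1.0);
    b.add(b1).add(b2);
    ToyQueue root = new ToyQueue("root", 1.0).add(a).add(b);

    root.steadyShare = 8192;         // plays the role of setSteadyFairShare(clusterResource)
    root.recomputeSteadyShares();
    System.out.println(a.steadyShare + " " + b.steadyShare + " "
        + b1.steadyShare + " " + b2.steadyShare);   // 2048 6144 3072 3072
  }
}
```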
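The fair policy is used here, so FairSharePolicy.computeSteadyShares is called, and it in turn calls ComputeFairShares.computeSteadyShares. Only the MEMORY dimension is passed in, which is why the steadyShare values in the output above show vCores:0. ComputeFairShares contains the main logic for computing fair shares. Here is its computeSteadyShares method: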
```java
public static void computeSteadyShares(
    Collection<? extends FSQueue> queues, Resource totalResources,
    ResourceType type) {
  computeSharesInternal(queues, totalResources, type, true);
}
```
FairShare and steadyFairShare are both computed in ComputeFairShares, with similar logic, so they share one method, computeSharesInternal. It takes a flag that says whether the FairShare or the steadyFairShare is being computed.
computeSteadyShares computes the steadyFairShare, so it calls computeSharesInternal with the flag set to true. computeSharesInternal works as follows:
```java
private static void computeSharesInternal(
    Collection<? extends Schedulable> allSchedulables,
    Resource totalResources, ResourceType type, boolean isSteadyShare) {

  Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
  // Assign fixed shares first and collect the remaining (non-fixed) schedulables
  int takenResources = handleFixedFairShares(
      allSchedulables, schedulables, isSteadyShare, type);

  if (schedulables.isEmpty()) {
    return;
  }

  // Sum the maxShares of the non-fixed schedulables, capped at Integer.MAX_VALUE
  int totalMaxShare = 0;
  for (Schedulable sched : schedulables) {
    int maxShare = getResourceValue(sched.getMaxShare(), type);
    totalMaxShare = (int) Math.min((long) maxShare + (long) totalMaxShare,
        Integer.MAX_VALUE);
    if (totalMaxShare == Integer.MAX_VALUE) {
      break;
    }
  }

  // Resources left after the fixed shares, but no more than the maxShare sum
  int totalResource = Math.max((getResourceValue(totalResources, type) -
      takenResources), 0);
  totalResource = Math.min(totalMaxShare, totalResource);

  // Find an upper bound for R by doubling
  double rMax = 1.0;
  while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
      < totalResource) {
    rMax *= 2.0;
  }

  // Binary search for R within [0, rMax]
  double left = 0;
  double right = rMax;
  for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
    double mid = (left + right) / 2.0;
    int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
        mid, schedulables, type);
    if (plannedResourceUsed == totalResource) {
      right = mid;
      break;
    } else if (plannedResourceUsed < totalResource) {
      left = mid;
    } else {
      right = mid;
    }
  }

  // Set either the steady or the instantaneous fair share of each schedulable
  for (Schedulable sched : schedulables) {
    if (isSteadyShare) {
      setResourceValue(computeShare(sched, right, type),
          ((FSQueue) sched).getSteadyFairShare(), type);
    } else {
      setResourceValue(
          computeShare(sched, right, type), sched.getFairShare(), type);
    }
  }
}
```
computeSharesInternal computes both FairShare and SteadyFairShare, but the two are computed over different sets of queues. Where is that set filtered? The code above calls handleFixedFairShares. Here is its implementation:
```java
private static int handleFixedFairShares(
    Collection<? extends Schedulable> schedulables,
    Collection<Schedulable> nonFixedSchedulables,
    boolean isSteadyShare, ResourceType type) {

  int totalResource = 0;
  for (Schedulable sched : schedulables) {
    // A negative value means the share is not fixed
    int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
    if (fixedShare < 0) {
      nonFixedSchedulables.add(sched);
    } else {
      // Record the fixed share and add it to the total (capped at Integer.MAX_VALUE)
      setResourceValue(fixedShare,
          isSteadyShare
              ? ((FSQueue) sched).getSteadyFairShare()
              : sched.getFairShare(),
          type);
      totalResource = (int) Math.min((long) totalResource + (long) fixedShare,
          Integer.MAX_VALUE);
    }
  }
  return totalResource;
}
```
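handleFixedFairShares sorts the queues into two groups. It sums the fixed shares of the fixed queues and returns that total, and it puts the non-fixed queues into nonFixedSchedulables. getFairShareIfFixed decides whether a queue is fixed, using these criteria: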
```java
private static int getFairShareIfFixed(Schedulable sched,
    boolean isSteadyShare, ResourceType type) {

  // maxShare <= 0: the share is fixed at 0
  if (getResourceValue(sched.getMaxShare(), type) <= 0) {
    return 0;
  }

  // Instantaneous fair share only: an inactive queue is fixed at 0
  if (!isSteadyShare &&
      (sched instanceof FSQueue) && !((FSQueue) sched).isActive()) {
    return 0;
  }

  // weight <= 0: the share is fixed at minShare (or 0 if minShare <= 0)
  if (sched.getWeights().getWeight(type) <= 0) {
    int minShare = getResourceValue(sched.getMinShare(), type);
    return (minShare <= 0) ? 0 : minShare;
  }

  // Not fixed
  return -1;
}
```
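A queue is fixed if any of the following holds:
Its maxResources is less than or equal to 0 (fixed share 0).
Its weight is less than or equal to 0 (fixed share is its minResources, or 0 if that is not positive).
The FairShare (not the SteadyFairShare) is being computed and the queue is inactive (fixed share 0).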
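The rules can be tried on a few made-up queues, once for the steady share and once for the instantaneous share. The sketch below mirrors getFairShareIfFixed and handleFixedFairShares for memory only. `FixedQueues` and its nested `Sched` class are hypothetical stand-ins for Schedulable/FSQueue, not Hadoop types.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the fixed-queue rules: which queues skip the R search, and how much they take.
final class FixedQueues {
  static final class Sched {
    final String name;
    final double weight;
    final int minShare;
    final int maxShare;
    final boolean active;

    Sched(String name, double weight, int minShare, int maxShare, boolean active) {
      this.name = name;
      this.weight = weight;
      this.minShare = minShare;
      this.maxShare = maxShare;
      this.active = active;
    }
  }

  // Returns the fixed share, or -1 if the queue must go through the R search.
  static int fairShareIfFixed(Sched s, boolean isSteadyShare) {
    if (s.maxShare <= 0) {
      return 0;                                  // maxResources <= 0
    }
    if (!isSteadyShare && !s.active) {
      return 0;                                  // inactive, instantaneous share only
    }
    if (s.weight <= 0) {
      return Math.max(s.minShare, 0);            // weight <= 0: pinned to minShare
    }
    return -1;
  }

  // Sums the fixed shares (capped at Integer.MAX_VALUE) and collects the rest.
  static long handleFixed(List<Sched> all, List<Sched> nonFixed, boolean isSteadyShare) {
    long taken = 0;
    for (Sched s : all) {
      int fixed = fairShareIfFixed(s, isSteadyShare);
      if (fixed < 0) {
        nonFixed.add(s);
      } else {
        taken = Math.min(taken + fixed, Integer.MAX_VALUE);
      }
    }
    return taken;
  }

  public static void main(String[] args) {
    List<Sched> queues = List.of(
        new Sched("capped", 1.0, 0, 0, true),
        new Sched("zeroWeight", 0.0, 2048, 8192, true),
        new Sched("idle", 1.0, 0, 8192, false),
        new Sched("busy", 1.0, 0, 8192, true));
    for (boolean steady : new boolean[] {true, false}) {
      List<Sched> nonFixed = new ArrayList<>();
      long taken = handleFixed(queues, nonFixed, steady);
      StringBuilder names = new StringBuilder();
      for (Sched s : nonFixed) {
        names.append(s.name).append(' ');
      }
      System.out.println((steady ? "steady" : "instantaneous")
          + ": taken=" + taken + ", non-fixed=" + names.toString().trim());
    }
  }
}
```

The "idle" queue still takes part in the steady-share computation but is fixed at 0 for the instantaneous one. This matches the behavior of the default queue in the test.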
 
Through handleFixedFairShares, computeSharesInternal puts the non-fixed queues (nonFixedSchedulables) into schedulables. schedulables therefore holds exactly the queues whose FairShare or SteadyFairShare still has to be computed.
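Once that set of queues is known, the next step is to work out what each of them deserves. That requires the total amount of resources to distribute among these queues, which is not the cluster's total resources. The first ingredient is the sum of the maxResources configured for these queues: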
```java
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
  int maxShare = getResourceValue(sched.getMaxShare(), type);
  totalMaxShare = (int) Math.min((long) maxShare + (long) totalMaxShare,
      Integer.MAX_VALUE);
  if (totalMaxShare == Integer.MAX_VALUE) {
    break;
  }
}
```
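This sum can exceed Integer.MAX_VALUE, so it is accumulated as long and capped (that is what the Math.min in the loop does). The amount actually distributed is the cluster's resources minus what the fixed queues have already taken, and it can be no larger than that capped sum: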
```java
int totalResource = Math.max((getResourceValue(totalResources, type) -
    takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);
```
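The test's numbers show why the cap matters. queueA and queueB each set maxResources to 1073741824 MB (2^30), so a plain int sum wraps around to a negative value. The sketch below replays the two snippets above with those numbers. `MaxShareSum`, `cappedSum` and `naiveSum` are hypothetical helper names, and `takenByFixed` is 0 because the only fixed queue in the instantaneous case is the inactive default queue.

```java
// Replays the maxShare summation and the totalResource computation with the test's numbers.
final class MaxShareSum {

  // Same pattern as the scheduler code: add as long, cap at Integer.MAX_VALUE.
  static int cappedSum(int[] maxShares) {
    int total = 0;
    for (int m : maxShares) {
      total = (int) Math.min((long) m + (long) total, Integer.MAX_VALUE);
      if (total == Integer.MAX_VALUE) {
        break;
      }
    }
    return total;
  }

  // What a plain int sum would do: it silently overflows.
  static int naiveSum(int[] maxShares) {
    int total = 0;
    for (int m : maxShares) {
      total += m;
    }
    return total;
  }

  public static void main(String[] args) {
    int[] maxShares = {1073741824, 1073741824};   // queueA, queueB
    int clusterMb = 8192;
    int takenByFixed = 0;                          // inactive default queue is fixed at 0

    System.out.println("naive  : " + naiveSum(maxShares));    // -2147483648
    System.out.println("capped : " + cappedSum(maxShares));   // 2147483647

    int totalResource = Math.max(clusterMb - takenByFixed, 0);
    totalResource = Math.min(cappedSum(maxShares), totalResource);
    System.out.println("to distribute: " + totalResource);    // 8192
  }
}
```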
With the total known, the next step is to distribute it among the queues according to their weights.
While respecting minResources and maxResources, the scheduler distributes resources as fairly as possible. The Fair Scheduler requires every FairShare to lie between minResources and maxResources.
The Fair Scheduler defines a ratio R, and each queue's allocation falls into one of three cases:
If queue S has minShare > R * weight, it gets minShare.
If queue S has maxShare < R * weight, it gets maxShare.
Otherwise it gets R * weight.
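These three cases amount to clamping R * weight into [minShare, maxShare], which is also how computeShare is written further down. The sketch below exercises each case once. `ClampShare` is a hypothetical class, and the queue numbers are made up for illustration.

```java
// The three allocation cases expressed as a clamp of R * weight into [minShare, maxShare].
final class ClampShare {
  static int computeShare(double weight, int minShare, int maxShare, double r) {
    double share = weight * r;
    share = Math.max(share, minShare);   // case 1: minShare > R * weight
    share = Math.min(share, maxShare);   // case 2: maxShare < R * weight
    return (int) share;                  // case 3: otherwise R * weight
  }

  public static void main(String[] args) {
    double r = 4096;
    System.out.println(computeShare(0.25, 2048, 8192, r));  // 2048 (minShare wins)
    System.out.println(computeShare(1.0, 0, 3000, r));      // 3000 (maxShare wins)
    System.out.println(computeShare(0.75, 0, 8192, r));     // 3072 (R * weight)
  }
}
```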
 
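The value of R is therefore the key. If R is too small, the total allocated to the queues falls short of totalSlots (totalResource in the code). If R is too large, the total exceeds totalSlots.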
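R can be found by binary search, so the other key point is choosing an upper bound for R. From the definition of R, the upper bound must be large enough that the slots allocated reach totalSlots. The code that finds this upper bound is: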
```java
double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
    < totalResource) {
  rMax *= 2.0;
}
```
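Here is the doubling loop traced for the test's instantaneous case. Only queueA (weight 0.25) and queueB (weight 0.75) are non-fixed, and 8192 MB has to be shared. The sketch leaves out min/max limits because neither queue reaches them in this test. `RMaxSearch` and its `used` helper are hypothetical simplifications of resourceUsedWithWeightToResourceRatio.

```java
// Traces the rMax doubling loop for weights 0.25 / 0.75 (no min/max clamping).
final class RMaxSearch {
  static final double[] WEIGHTS = {0.25, 0.75};

  // Total allocated for ratio r: sum of (int) (weight * r).
  static int used(double r) {
    int sum = 0;
    for (double w : WEIGHTS) {
      sum += (int) (w * r);
    }
    return sum;
  }

  // Doubles rMax until the planned allocation reaches totalResource.
  static double findRMax(int totalResource) {
    double rMax = 1.0;
    while (used(rMax) < totalResource) {
      rMax *= 2.0;
    }
    return rMax;
  }

  public static void main(String[] args) {
    double rMax = findRMax(8192);
    System.out.println("rMax = " + rMax + ", used = " + used(rMax));  // rMax = 8192.0, used = 8192
  }
}
```

Because R doubles each time, the loop takes only logarithmically many steps. Here rMax goes 1, 2, 4, ... 8192. At rMax = 4096 the allocation is still 4096, short of 8192, so the loop doubles once more.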
resourceUsedWithWeightToResourceRatio returns the total slots allocated across all queues for a given R (rMax here). Each queue's share is computed by computeShare:
```java
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
    Collection<? extends Schedulable> schedulables, ResourceType type) {
  // Total allocated if every schedulable gets computeShare(R)
  int resourcesTaken = 0;
  for (Schedulable sched : schedulables) {
    int share = computeShare(sched, w2rRatio, type);
    resourcesTaken += share;
  }
  return resourcesTaken;
}

private static int computeShare(Schedulable sched, double w2rRatio,
    ResourceType type) {
  // R * weight, clamped into [minShare, maxShare]
  double share = sched.getWeights().getWeight(type) * w2rRatio;
  share = Math.max(share, getResourceValue(sched.getMinShare(), type));
  share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
  return (int) share;
}
```
With the upper bound of R found, a binary search looks for the fairest allocation:
```java
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
  double mid = (left + right) / 2.0;
  int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
      mid, schedulables, type);
  if (plannedResourceUsed == totalResource) {
    right = mid;
    break;
  } else if (plannedResourceUsed < totalResource) {
    left = mid;
  } else {
    right = mid;
  }
}
```
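The binary search can be watched on a case where it stops early. The example is hypothetical: weights 0.25/0.75, 8000 MB to distribute, and rMax = 8192 as the doubling loop would produce. The loop shape is the same as above. `ITERATIONS = 25` is my assumption for COMPUTE_FAIR_SHARES_ITERATIONS, and `RBinarySearch` is a hypothetical class. Note that `right` always keeps a ratio whose allocation is at least totalResource.

```java
// Binary search for R with the same loop shape as the scheduler code.
final class RBinarySearch {
  static final int ITERATIONS = 25;               // assumed iteration limit
  static final double[] WEIGHTS = {0.25, 0.75};

  static int used(double r) {
    int sum = 0;
    for (double w : WEIGHTS) {
      sum += (int) (w * r);
    }
    return sum;
  }

  // Returns {R, number of iterations actually run}.
  static double[] search(double rMax, int totalResource) {
    double left = 0;
    double right = rMax;
    int steps = 0;
    for (int i = 0; i < ITERATIONS; i++) {
      steps++;
      double mid = (left + right) / 2.0;
      int planned = used(mid);
      if (planned == totalResource) {
        right = mid;                              // exact hit: stop early
        break;
      } else if (planned < totalResource) {
        left = mid;
      } else {
        right = mid;
      }
    }
    return new double[] {right, steps};
  }

  public static void main(String[] args) {
    double[] result = search(8192.0, 8000);
    double r = result[0];
    System.out.println("R = " + r + " after " + (int) result[1] + " iterations");
    System.out.println("queueA = " + (int) (0.25 * r) + ", queueB = " + (int) (0.75 * r));
  }
}
```

The midpoints are 4096, 6144, 7168, 7680, 7936, 8064 and then 8000. 8000 hits the target exactly, so the search prints R = 8000.0 after 7 iterations, giving queueA 2000 and queueB 6000.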
Finally, depending on the isSteadyShare flag, each queue's FairShare or SteadyFairShare is set, using right (the upper end of the search interval) as R.
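Putting the pieces together, the sketch below is a standalone re-implementation of computeSharesInternal for a single resource type (memory). It runs on the queues from the test, once for the steady share and once for the instantaneous share. It is a simplification under stated assumptions, not the Hadoop code. `Q` is a hypothetical stand-in for Schedulable/FSQueue, and `ITERATIONS = 25` is my assumption for COMPUTE_FAIR_SHARES_ITERATIONS. The planned total is also summed as long to avoid overflow.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of ComputeFairShares.computeSharesInternal, memory only.
final class FairShareSketch {
  static final int ITERATIONS = 25;   // assumed value of COMPUTE_FAIR_SHARES_ITERATIONS

  static final class Q {
    final String name;
    final double weight;
    final int minShare;
    final int maxShare;
    final boolean active;
    int share;                        // result: steady or instantaneous share

    Q(String name, double weight, int minShare, int maxShare, boolean active) {
      this.name = name;
      this.weight = weight;
      this.minShare = minShare;
      this.maxShare = maxShare;
      this.active = active;
    }
  }

  // getFairShareIfFixed: fixed share, or -1 if the queue needs the R search
  static int fixedShare(Q q, boolean steady) {
    if (q.maxShare <= 0) return 0;
    if (!steady && !q.active) return 0;
    if (q.weight <= 0) return Math.max(q.minShare, 0);
    return -1;
  }

  // computeShare: R * weight clamped into [minShare, maxShare]
  static int shareAt(Q q, double r) {
    double s = Math.max(q.weight * r, q.minShare);
    return (int) Math.min(s, q.maxShare);
  }

  // resourceUsedWithWeightToResourceRatio
  static long used(double r, List<Q> qs) {
    long total = 0;
    for (Q q : qs) total += shareAt(q, r);
    return total;
  }

  static void compute(List<Q> all, int clusterTotal, boolean steady) {
    List<Q> flexible = new ArrayList<>();
    long taken = 0;
    for (Q q : all) {                                  // handleFixedFairShares
      int fixed = fixedShare(q, steady);
      if (fixed < 0) {
        flexible.add(q);
      } else {
        q.share = fixed;
        taken += fixed;
      }
    }
    if (flexible.isEmpty()) return;

    long maxSum = 0;                                   // capped sum of maxShares
    for (Q q : flexible) maxSum = Math.min(maxSum + q.maxShare, Integer.MAX_VALUE);
    long total = Math.min(maxSum, Math.max(clusterTotal - taken, 0));

    double rMax = 1.0;                                 // upper bound for R
    while (used(rMax, flexible) < total) rMax *= 2.0;

    double left = 0, right = rMax;                     // binary search for R
    for (int i = 0; i < ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      long planned = used(mid, flexible);
      if (planned == total) { right = mid; break; }
      if (planned < total) left = mid; else right = mid;
    }
    for (Q q : flexible) q.share = shareAt(q, right);
  }

  public static void main(String[] args) {
    for (boolean steady : new boolean[] {true, false}) {
      List<Q> qs = List.of(
          new Q("queueA", 0.25, 0, 1073741824, true),
          new Q("queueB", 0.75, 0, 1073741824, true),
          new Q("default", 1.0, 0, Integer.MAX_VALUE, false));
      compute(qs, 8192, steady);
      StringBuilder sb = new StringBuilder(steady ? "steady:" : "instantaneous:");
      for (Q q : qs) sb.append(' ').append(q.name).append('=').append(q.share);
      System.out.println(sb);
    }
  }
}
```

It prints `steady: queueA=1024 queueB=3072 default=4096` and `instantaneous: queueA=2048 queueB=6144 default=0`. These match the steadyShare and FairShare values the real test produced.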
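Summary
This post walked through how a queue's FairShare and SteadyFairShare are computed. Fair shares come in two kinds, the Instantaneous FairShare and the SteadyFairShare, and the Instantaneous FairShare is usually just called FairShare. The difference between them is that FairShare leaves inactive queues out of the computation, while SteadyFairShare includes every queue.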
The idea behind computing both FairShare and SteadyFairShare is:
If queue S has minShare > R * weight, it gets minShare.
If queue S has maxShare < R * weight, it gets maxShare.
Otherwise it gets R * weight.
 
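Finding the ratio R is fiddly. For a quick estimate of what each queue should get, you can use these formulas:
SteadyFairShare ≈ totalResources (the cluster's total resources) / weights (the sum of all queues' weights) * weight (that queue's weight)
FairShare ≈ totalResources (the cluster's total resources) / activeWeights (the sum of the active queues' weights) * activeWeight (that active queue's weight)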
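These formulas hold only when the resulting SteadyFairShare and FairShare lie between minResources and maxResources. If a value falls below minResources (or above maxResources), that queue's steadyFairShare or FairShare becomes minResources (or maxResources). In that case the quick formula's allocations no longer add up to totalSlots. They exceed it when a queue is raised to its minResources, and they fall short when a queue is capped at its maxResources.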
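A small hypothetical case shows where the quick formula breaks down. Take 8192 MB and two active queues with weight 1.0, where queueA is configured with minResources = 6144 MB. maxResources is treated as unlimited to keep the sketch short. `QuickFormulaLimit` and its helpers are hypothetical names.

```java
// Quick formula plus clamping vs. a proper choice of R, for one minResources-heavy queue.
final class QuickFormulaLimit {
  static final double[] WEIGHTS = {1.0, 1.0};     // queueA, queueB
  static final int[] MIN_SHARES = {6144, 0};      // queueA has minResources = 6144 MB

  // Quick formula per queue, then raised to minResources.
  static int sumAfterClamping(int cluster) {
    double weightSum = 0;
    for (double w : WEIGHTS) {
      weightSum += w;
    }
    int sum = 0;
    for (int i = 0; i < WEIGHTS.length; i++) {
      int quick = (int) (cluster / weightSum * WEIGHTS[i]);   // 4096 each
      sum += Math.max(quick, MIN_SHARES[i]);                   // queueA -> 6144
    }
    return sum;
  }

  // Sum of R * weight, raised to minShare, for a given R (computeShare without maxShare).
  static int sumAtR(double r) {
    int sum = 0;
    for (int i = 0; i < WEIGHTS.length; i++) {
      sum += (int) Math.max(WEIGHTS[i] * r, MIN_SHARES[i]);
    }
    return sum;
  }

  public static void main(String[] args) {
    System.out.println("quick formula + clamp: " + sumAfterClamping(8192));  // 10240
    System.out.println("R = 2048:              " + sumAtR(2048));           // 8192
  }
}
```

With R = 2048, queueA is raised to 6144 and queueB gets 2048, which adds up to exactly 8192. In this setup that is where the doubling loop and binary search settle. The example also illustrates the earlier note that a large minResources eats into other queues' FairShare, since queueB drops from 4096 to 2048.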