The [previous post](http://bigdatadecode.top/YARN源码分析之Fair Scheduler part1.html) covered how the Fair Scheduler computes FairShare and SteadyFairShare; this post focuses on preemption.

Preemption happens frequently in YARN, but when does it happen, whose resources get taken, and who receives them afterwards? This article tries to answer these questions.

This post covers preemption under the Fair Scheduler. Let's start with a few related configuration options.

Preemption-related configuration

yarn-site.xml

yarn.scheduler.fair.preemption

Whether preemption is enabled for fair scheduling. Defaults to false.

yarn.scheduler.fair.preemption.cluster-utilization-threshold

Preemption only happens once cluster resource utilization exceeds this threshold. Defaults to 0.8f.

A couple of settings appear not to be exposed for configuration:

  • yarn.scheduler.fair.waitTimeBeforeKill, default 15000 ms. Once a container in a preempted queue has been warned for longer than this, it is killed.

  • yarn.scheduler.fair.preemptionInterval, default 5000 ms. The interval between two preemption checks.
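
For reference, enabling preemption in yarn-site.xml therefore only needs the two exposed properties; a minimal sketch (the threshold value here just restates the default):

<property>
  <name>yarn.scheduler.fair.preemption</name>
  <value>true</value>
</property>
<property>
  <name>yarn.scheduler.fair.preemption.cluster-utilization-threshold</name>
  <value>0.8</value>
</property>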

fair-scheduler.xml

In fair-scheduler.xml you can set a preemption timeout and threshold for an individual queue, or set global defaults.
Per-queue settings go inside the queue element, with the following parameters:

fairSharePreemptionTimeout

If the queue's allocated resources stay below fairShare * fairSharePreemptionThreshold for longer than this timeout, the queue will preempt resources from other queues. If unset, the value is inherited from the parent queue.

fairSharePreemptionThreshold

The fair share preemption threshold for the queue: if the queue has waited fairSharePreemptionTimeout and its resources are still below fairSharePreemptionThreshold * fairShare, it preempts from other queues. If unset, the value is inherited from the parent queue.

The global preemption settings are:

defaultFairSharePreemptionTimeout

Sets the fair-share preemption timeout for the root queue; overridden by a queue's own fairSharePreemptionTimeout.

defaultMinSharePreemptionTimeout

Sets the min-share preemption timeout for the root queue; overridden by a queue's own minSharePreemptionTimeout.

defaultFairSharePreemptionThreshold

Sets the fair-share preemption threshold for the root queue; overridden by a queue's own fairSharePreemptionThreshold. That is, a queue preempts once its resources have stayed below the fairSharePreemptionThreshold fraction of its fairShare for longer than fairSharePreemptionTimeout.
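
Putting these together, a sketch of an allocation file combining global defaults with a per-queue override might look like this (the queue name and values are illustrative, not taken from the test below):

<?xml version="1.0"?>
<allocations>
  <queue name="queueA">
    <weight>1</weight>
    <!-- per-queue override: preempt if below 0.8 * fairShare for 30 seconds -->
    <fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>
    <fairSharePreemptionThreshold>0.8</fairSharePreemptionThreshold>
  </queue>
  <!-- global defaults, inherited by queues that set nothing themselves -->
  <defaultMinSharePreemptionTimeout>60</defaultMinSharePreemptionTimeout>
  <defaultFairSharePreemptionTimeout>120</defaultFairSharePreemptionTimeout>
  <defaultFairSharePreemptionThreshold>0.5</defaultFairSharePreemptionThreshold>
</allocations>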

Test code

Let's look at the code first. It is fairly long, so we will walk through it method by method in program order and attach the full listing at the end for reference.

YARN loads its configuration at startup, so our first step is to initialize an environment in which Fair Scheduler preemption can occur. The relevant setup lives in createConfiguration:

protected Configuration createConfiguration() {
  Configuration conf = super.createConfiguration();
  conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
      ResourceScheduler.class);
  // enable the preemption switch: yarn.scheduler.fair.preemption
  conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
  // path of the fair-scheduler allocation file
  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
  return conf;
}

Preemption is disabled by default in fair scheduling; to use it you must change yarn.scheduler.fair.preemption. Besides that, yarn.scheduler.fair.preemption.cluster-utilization-threshold controls how high cluster utilization must get before preemption can happen. That parameter is not set in createConfiguration but in the startResourceManager method that follows:

private void startResourceManager(float utilizationThreshold) {
  // yarn.scheduler.fair.preemption.cluster-utilization-threshold:
  // the cluster utilization above which preemption may happen
  conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
      utilizationThreshold);
  // a mock RM, purely for ease of testing
  resourceManager = new MockRM(conf);
  resourceManager.start();

  scheduler = (FairScheduler) resourceManager.getResourceScheduler();

  scheduler.setClock(clock);
  scheduler.updateInterval = 60 * 1000;
}

startResourceManager takes a single parameter, which is assigned to yarn.scheduler.fair.preemption.cluster-utilization-threshold and controls when the cluster may preempt.

Fair scheduling needs a running RM for resource management, so we create a MockRM here. MockRM is a mock RM meant purely for testing; Hadoop ships many such helper classes, which make the code much easier to test.

After the RM starts, we register an NM with it (adding resources to the cluster) and then have an app request resources; see registerNodeAndSubmitApp:

private ApplicationAttemptId registerNodeAndSubmitApp(
    int memory, int vcores, int appContainers, int appMemory) {
  // create an NM named node1 with the given memory and vcores
  RMNode node1 = MockNodes.newNodeInfo(
      1, Resources.createResource(memory, vcores), 1, "node1");
  NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
  scheduler.handle(nodeEvent1);
  System.out.println("resources in the cluster : " + scheduler.rootMetrics.getAvailableMB());
  // submit an app that needs appContainers * appMemory resources
  ApplicationAttemptId app1 = createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
  scheduler.update();
  // Sufficient node check-ins to fully schedule containers
  for (int i = 0; i < appContainers; i++) {
    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
    scheduler.handle(nodeUpdate1);
  }
  FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
      "queueA", false);
  // print queueA's parameters; see the previous post for how they are computed
  System.out.println("queueA : " + queue.getFairShare().getMemory() +
      ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
      ", steadyShare : " + queue.getSteadyFairShare() +
      ", demand : " + queue.getDemand() +
      ", running : " + queue.getResourceUsage() +
      ", preempt : " + queue.preemptContainer());
  System.out.println("rest resources of cluster : " +
      (memory - appContainers * appMemory) + " , " +
      scheduler.rootMetrics.getAvailableMB());
  return app1;
}

Now for the main test code:

@Test
public void testPreemptionWithFreeResources() throws Exception {
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  // write the fair-scheduler.xml configuration to a file
  out.println("<?xml version=\"1.0\"?>");
  out.println("<allocations>");
  out.println("<queue name=\"default\">");
  // with maxResources 0, default becomes a fixed queue and is
  // excluded from the SteadyFairShare/FairShare computation
  out.println("<maxResources>0mb,0vcores</maxResources>");
  out.println("</queue>");
  out.println("<queue name=\"queueA\">");
  out.println("<weight>1</weight>");
  out.println("<minResources>1024mb,0vcores</minResources>");
  out.println("</queue>");
  out.println("<queue name=\"queueB\">");
  out.println("<weight>1</weight>");
  out.println("<minResources>1024mb,0vcores</minResources>");
  out.println("</queue>");
  out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
  // if fairSharePreemptionTimeout and defaultFairSharePreemptionTimeout both exist,
  // we take the default one
  out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
  out.println("</allocations>");
  out.close();

  // 0f means preemption may happen as soon as any resources are in use
  startResourceManager(0f);
  // Create node with 4GB memory and 4 vcores;
  // app1 asks for 4 containers of 1024MB each, filling the cluster
  ApplicationAttemptId app1 = registerNodeAndSubmitApp(4 * 1024, 4, 4, 1024);

  // app2 goes to queueB; preemption is enabled and cluster usage exceeds 0,
  // so preemption can happen
  ApplicationAttemptId app2 = createSchedulingRequest(1024, "queueB", "user1", 1, 1);
  scheduler.update();
  // advance the clock 6 seconds, past defaultMinSharePreemptionTimeout;
  // queueB is below its minShare, so it should preempt
  clock.tick(6);
  // queueA's state before preemption
  FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
      "queueA", false);
  System.out.println("queueA : " + queue.getFairShare().getMemory() +
      ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
      ", steadyShare : " + queue.getSteadyFairShare() +
      ", demand : " + queue.getDemand() +
      ", running : " + queue.getResourceUsage() +
      ", preempt : " + queue.preemptContainer());
  // check whether to preempt; this round only puts the
  // to-be-preempted container into the warnedContainers list
  scheduler.preemptTasksIfNecessary();
  // print how much should be preempted
  System.out.println("preemptResources() should have been called " + 1024 + " , " +
      scheduler.getSchedulerApp(app1).getPreemptionContainers().iterator().next().getContainer().getResource());
  System.out.println("====================================================================");
  // wait more than 15s so the container gets killed and its resources released
  clock.tick(18);
  // kill the container and release its resources
  scheduler.preemptTasksIfNecessary();
  System.out.println("preemptResources() should have been called " + 1024 + " , " +
      scheduler.getSchedulerApp(app1).getPreemptionContainers());
  System.out.println("queueA : " + queue.getFairShare().getMemory() +
      ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
      ", steadyShare : " + queue.getSteadyFairShare() +
      ", demand : " + queue.getDemand() +
      ", running : " + queue.getResourceUsage() +
      ", preempt : " + queue.preemptContainer());

  resourceManager.stop();
}

That is the whole preemption test flow. The idea: startResourceManager sets the utilization at which the cluster may preempt (the example uses 0); a 4 GB node1 is registered with the cluster; app1 (4 containers of 1024 MB each) is submitted to queueA and fills the whole cluster; then app2 (1 container, 1024 MB) is submitted to queueB. We simulate a 6 s wait (defaultMinSharePreemptionTimeout is 5 s, and queueB is below its minShare, so it must preempt), call preemptTasksIfNecessary to check whether preemption is needed (in a real cluster this check runs continuously in a thread), then wait another 18 s (yarn.scheduler.fair.waitTimeBeforeKill is 15 s; only after that is the container killed to release its resources), and finally inspect queueA's state before and after preemption.

Running the code above produces:

resources in the cluster : 4096
queueA : 4096, weight : 1.0, steadyShare : <memory:2048, vCores:0>, demand : <memory:4096, vCores:4>, running : <memory:4096, vCores:4>, preempt : null
rest resources of cluster : 0
queueA : 2048, weight : 1.0, steadyShare : <memory:2048, vCores:0>, demand : <memory:4096, vCores:4>, running : <memory:4096, vCores:4>, preempt : container_0_0001_01_000004
preemptResources() should have been called 1024 , <memory:1024, vCores:1>
====================================================================
preemptResources() should have been called 1024 , []
queueA : 2048, weight : 1.0, steadyShare : <memory:2048, vCores:0>, demand : <memory:4096, vCores:4>, running : <memory:3072, vCores:3>, preempt : container_0_0001_01_000003

The output shows that initially the cluster is fully occupied by app1 in queueA. Then app2 is submitted to queueB; after 5 s queueB's resources are still below its minShare (1024), so preemption kicks in and 1024 is taken from queueA to satisfy the minShare. After more than 15 s without resources being released, container_0_0001_01_000004 is killed and queueA's running resources drop to 3072: the preemption succeeded.

Preemption in depth

The previous section showed queueB preempting 1024 MB from queueA. If the cluster had multiple queues, how would the scheduler choose which queue to preempt from, and how much to take? Let's read the code with those questions in mind.

The preemption entry point is scheduler.preemptTasksIfNecessary(), so let's look at that method first:

protected synchronized void preemptTasksIfNecessary() {
  // check whether the cluster may preempt at all
  if (!shouldAttemptPreemption()) {
    return;
  }

  long curTime = getClock().getTime();
  // has yarn.scheduler.fair.preemptionInterval elapsed since the last check?
  if (curTime - lastPreemptCheckTime < preemptionInterval) {
    return;
  }
  lastPreemptCheckTime = curTime;

  Resource resToPreempt = Resources.clone(Resources.none());
  for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
    // walk all leaf queues, computing which need to preempt and how much
    Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
  }
  if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
      Resources.none())) {
    // choose which queue to preempt from
    preemptResources(resToPreempt);
  }
}

preemptTasksIfNecessary checks which queues need to preempt tasks, totals how much must be preempted, and then selects the victims.

A queue is judged to need preemption when its resources have stayed below its minShare for longer than minSharePreemptionTimeout, or below its fairShare threshold for longer than fairSharePreemptionTimeout; such a queue is entitled to preempt resources from other queues.

Before checking individual queues, we must first verify that the overall environment allows preemption at all, which is checked in shouldAttemptPreemption:

private boolean shouldAttemptPreemption() {
  // only when yarn.scheduler.fair.preemption is true do we check whether
  // utilization exceeds yarn.scheduler.fair.preemption.cluster-utilization-threshold;
  // preemption may only happen above that threshold
  if (preemptionEnabled) {
    return (preemptionUtilizationThreshold < Math.max(
        (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
        (float) rootMetrics.getAllocatedVirtualCores() /
            clusterResource.getVirtualCores()));
  }
  return false;
}

Preemption is only possible when the cluster allows it: yarn.scheduler.fair.preemption must be true, and cluster resource utilization must exceed yarn.scheduler.fair.preemption.cluster-utilization-threshold. Note that utilization is the larger of the memory ratio and the vcore ratio; with the default threshold of 0.8, for example, allocating 3300 MB of a 4096 MB cluster (about 0.81) is already enough to permit preemption.

When the cluster allows preemption, all queues are traversed to find those that need to preempt and to compute how much. That logic is in resToPreempt:

protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
  long minShareTimeout = sched.getMinSharePreemptionTimeout();
  long fairShareTimeout = sched.getFairSharePreemptionTimeout();
  Resource resDueToMinShare = Resources.none();
  Resource resDueToFairShare = Resources.none();
  // has minSharePreemptionTimeout been exceeded?
  if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
        sched.getMinShare(), sched.getDemand());
    resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
        Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }
  // has fairSharePreemptionTimeout been exceeded?
  if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
    Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
        sched.getFairShare(), sched.getDemand());
    resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
        Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  }
  Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
      resDueToMinShare, resDueToFairShare);
  if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
      resToPreempt, Resources.none())) {
    String message = "Should preempt " + resToPreempt + " res for queue "
        + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
        + ", resDueToFairShare = " + resDueToFairShare;
    LOG.info(message);
  }
  return resToPreempt;
}

resToPreempt computes how much the current queue should preempt. Assuming the queue's demand exceeds both its minShare and its fairShare, the rules are as follows (a small worked sketch comes after the list):

  • If the queue's resources have stayed below minShare for longer than minSharePreemptionTimeout, it should preempt the difference between minShare and its current usage.
  • If the queue's resources have stayed below fairShare for longer than fairSharePreemptionTimeout, it should preempt the difference between fairShare and its current usage.
  • If both conditions hold, it preempts the larger of the two amounts.
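
To make the rule concrete, here is a minimal memory-only sketch of the same arithmetic (plain MB ints instead of Hadoop's Resource type; resToPreemptMB is a hypothetical name, and the timeout checks are passed in as booleans):

// simplified, memory-only sketch of FairScheduler.resToPreempt
static int resToPreemptMB(int minShare, int fairShare, int demand, int usage,
                          boolean minShareTimedOut, boolean fairShareTimedOut) {
  int dueToMinShare = 0;
  int dueToFairShare = 0;
  if (minShareTimedOut) {
    // the target is capped by demand: never preempt more than the queue can use
    dueToMinShare = Math.max(0, Math.min(minShare, demand) - usage);
  }
  if (fairShareTimedOut) {
    dueToFairShare = Math.max(0, Math.min(fairShare, demand) - usage);
  }
  // if both triggers fired, take the larger amount
  return Math.max(dueToMinShare, dueToFairShare);
}

With the test's numbers for queueB (minShare = 1024, fairShare = 2048, demand = 1024, usage = 0) and only the minShare timeout expired after 6 s, this returns 1024, matching the "Should preempt" amount in the log output.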

resToPreempt decides whether a queue should preempt and how much; so where is it decided which queue to take from? The answer is preemptResources:

protected void preemptResources(Resource toPreempt) {
  long start = getClock().getTime();
  if (Resources.equals(toPreempt, Resources.none())) {
    return;
  }

  // Scan down the list of containers we've already warned and kill them
  // if we need to. Remove any containers from the list that we don't need
  // or that are no longer running.
  Iterator<RMContainer> warnedIter = warnedContainers.iterator();
  while (warnedIter.hasNext()) {
    RMContainer container = warnedIter.next();
    if ((container.getState() == RMContainerState.RUNNING ||
        container.getState() == RMContainerState.ALLOCATED) &&
        Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
            toPreempt, Resources.none())) {
      warnOrKillContainer(container);
      Resources.subtractFrom(toPreempt, container.getContainer().getResource());
    } else {
      warnedIter.remove();
    }
  }

  try {
    // Reset preemptedResource for each app
    for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
      for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
        app.resetPreemptedResources();
      }
    }

    while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
        toPreempt, Resources.none())) {
      // preemptContainer decides which queue to take resources from
      RMContainer container =
          getQueueManager().getRootQueue().preemptContainer();
      if (container == null) {
        break;
      } else {
        warnOrKillContainer(container);
        warnedContainers.add(container);
        Resources.subtractFrom(
            toPreempt, container.getContainer().getResource());
      }
    }
  } finally {
    // Clear preemptedResources for each app
    for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
      for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
        app.clearPreemptedResources();
      }
    }
  }

  long duration = getClock().getTime() - start;
  fsOpDurations.addPreemptCallDuration(duration);
}

Preemption works by picking a queue, then an app within that queue, and finally one of that app's containers. Queue selection starts at the root queue and descends level by level until a candidate application is reached.

The victim-selection policy depends on the scheduling policy:
(1) fair/drf: pick the queue that is most over its fairShare
(2) fifo: pick the queue of the most recently submitted app
Within an app, the container with the lowest priority is preempted. Container priority is encoded as a number, and a larger number means a lower priority; a small sketch of this last step follows.
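
As a hedged illustration of that container-selection order (a simplified standalone sketch with made-up ContainerInfo/pickVictim names, not the actual FSAppAttempt code), this picks the container with the largest priority number, breaking ties toward the larger, newer containerId, which matches the test output where container_0_0001_01_000004 was chosen:

import java.util.Arrays;
import java.util.List;

class VictimPicker {
  // minimal stand-in for an app's running container (hypothetical type)
  static class ContainerInfo {
    final long containerId;
    final int priority; // larger number = lower priority
    ContainerInfo(long containerId, int priority) {
      this.containerId = containerId;
      this.priority = priority;
    }
  }

  // pick the least-important container: lowest priority first,
  // ties broken by the newest (largest) containerId
  static ContainerInfo pickVictim(List<ContainerInfo> running) {
    ContainerInfo victim = null;
    for (ContainerInfo c : running) {
      if (victim == null
          || c.priority > victim.priority
          || (c.priority == victim.priority && c.containerId > victim.containerId)) {
        victim = c;
      }
    }
    return victim;
  }

  public static void main(String[] args) {
    List<ContainerInfo> running = Arrays.asList(
        new ContainerInfo(1, 0),   // a higher-priority container
        new ContainerInfo(2, 20), new ContainerInfo(3, 20),
        new ContainerInfo(4, 20));
    System.out.println(pickVictim(running).containerId); // prints 4
  }
}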

The entry point of this selection flow is getQueueManager().getRootQueue().preemptContainer():

// FSParentQueue.java
public RMContainer preemptContainer() {
  RMContainer toBePreempted = null;

  // Find the childQueue which is most over fair share
  FSQueue candidateQueue = null;
  // pick the comparator for the current scheduling policy
  Comparator<Schedulable> comparator = policy.getComparator();
  // find the queue that is most over its fairShare
  for (FSQueue queue : childQueues) {
    if (candidateQueue == null ||
        comparator.compare(queue, candidateQueue) > 0) {
      candidateQueue = queue;
    }
  }

  // Let the selected queue choose which of its container to preempt
  if (candidateQueue != null) {
    // if the candidate is itself a parent queue, this recurses into
    // FSParentQueue.preemptContainer; if it is a leaf queue, a suitable
    // app is chosen from the queue for preemption
    toBePreempted = candidateQueue.preemptContainer();
  }
  return toBePreempted;
}

FSParentQueue.preemptContainer starts from the root queue and picks the child queue that is most over its fairShare as the candidate. If the candidate is itself a parent queue, FSParentQueue.preemptContainer recurses; if it is a leaf queue, FSLeafQueue.preemptContainer is called to choose an app from the queue.

Selecting queues and apps relies on a comparator whose implementation differs per policy. Here we focus on FairShareComparator, an inner class of FairSharePolicy.java implementing the Comparator interface:

private static class FairShareComparator implements Comparator<Schedulable>,
    Serializable {
  private static final long serialVersionUID = 5564969375856699313L;
  private static final Resource ONE = Resources.createResource(1);

  @Override
  public int compare(Schedulable s1, Schedulable s2) {
    double minShareRatio1, minShareRatio2;
    double useToWeightRatio1, useToWeightRatio2;
    Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
        s1.getMinShare(), s1.getDemand());
    Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
        s2.getMinShare(), s2.getDemand());
    boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
        s1.getResourceUsage(), minShare1);
    boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
        s2.getResourceUsage(), minShare2);
    minShareRatio1 = (double) s1.getResourceUsage().getMemory()
        / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
    minShareRatio2 = (double) s2.getResourceUsage().getMemory()
        / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
    useToWeightRatio1 = s1.getResourceUsage().getMemory() /
        s1.getWeights().getWeight(ResourceType.MEMORY);
    useToWeightRatio2 = s2.getResourceUsage().getMemory() /
        s2.getWeights().getWeight(ResourceType.MEMORY);
    int res = 0;
    if (s1Needy && !s2Needy)
      res = -1;
    else if (s2Needy && !s1Needy)
      res = 1;
    else if (s1Needy && s2Needy)
      res = (int) Math.signum(minShareRatio1 - minShareRatio2);
    else
      // Neither schedulable is needy
      res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
    if (res == 0) {
      // Apps are tied in fairness ratio. Break the tie by submit time and job
      // name to get a deterministic ordering, which is useful for unit tests.
      res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
      if (res == 0)
        res = s1.getName().compareTo(s2.getName());
    }
    return res;
  }
}

The comparator takes two schedulables A and B (queues or apps) and returns 1 if A is greater than B, -1 if smaller, and 0 if equal. "Greater" here means less needy, which is why preemption picks the greatest candidate.

Queues are compared in two cases: usage below minShare versus at or above minShare.
(1) Below minShare:

  • If both queues are below their minShare, compare each queue's ratio of resource usage to minShare; the queue with the lower ratio is further from its guarantee and should receive resources first, so the comparator treats the queue with the higher ratio as greater.
  • If only one queue is below its minShare, the queue above its minShare is greater, since a queue below its minShare has priority for resources over one that has already exceeded it.

(2) At or above minShare:

  • When neither queue is below its minShare, compare each queue's usage divided by its weight; the queue with the larger ratio is greater. (A worked sketch of this logic follows.)
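
To make this concrete, here is a hedged, memory-only sketch of the comparator's core decision (plain ints rather than Schedulable, ignoring the demand cap and tie-breaking; compareQueues is a hypothetical name). It returns a positive number when q1 is "greater", i.e. less needy:

static int compareQueues(int usage1, int minShare1, double weight1,
                         int usage2, int minShare2, double weight2) {
  boolean needy1 = usage1 < minShare1;
  boolean needy2 = usage2 < minShare2;
  if (needy1 && !needy2) return -1;  // q1 is needier, hence "smaller"
  if (needy2 && !needy1) return 1;
  if (needy1 && needy2) {
    // both below minShare: the smaller usage/minShare ratio is needier
    return (int) Math.signum(
        (double) usage1 / Math.max(minShare1, 1)
        - (double) usage2 / Math.max(minShare2, 1));
  }
  // neither is needy: the larger usage/weight ratio is "greater"
  return (int) Math.signum(usage1 / weight1 - usage2 / weight2);
}

In the test, once app2 arrives, compareQueues(4096, 1024, 1.0, 0, 1024, 1.0) returns 1: queueA is not needy while queueB is, so queueA is "greater" and becomes the preemption victim.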

Next, FSLeafQueue.preemptContainer shows how the app is chosen from the queue:

// FSLeafQueue.java
public RMContainer preemptContainer() {
  RMContainer toBePreempted = null;

  // If this queue is not over its fair share, reject
  if (!preemptContainerPreCheck()) {
    return toBePreempted;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Queue " + getName() + " is going to preempt a container " +
        "from its applications.");
  }

  // Choose the app that is most over fair share
  // (the comparator here is the same one described above)
  Comparator<Schedulable> comparator = policy.getComparator();
  FSAppAttempt candidateSched = null;
  for (FSAppAttempt sched : runnableApps) {
    if (candidateSched == null ||
        comparator.compare(sched, candidateSched) > 0) {
      candidateSched = sched;
    }
  }

  // Preempt from the selected app
  if (candidateSched != null) {
    // pick one of the app's containers by priority;
    // ties are broken by containerId
    toBePreempted = candidateSched.preemptContainer();
  }
  return toBePreempted;
}

Once a container has been selected from the app through the steps above, warnOrKillContainer decides whether to record it in the preemption map (which tracks preemptable containers and when they were added) or to kill it outright (once yarn.scheduler.fair.waitTimeBeforeKill has elapsed, the container is killed to free its resources).

protected void warnOrKillContainer(RMContainer container) {
  ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
  FSAppAttempt app = getSchedulerApp(appAttemptId);
  FSLeafQueue queue = app.getQueue();
  LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
      "res=" + container.getContainer().getResource() +
      ") from queue " + queue.getName());

  Long time = app.getContainerPreemptionTime(container);

  if (time != null) {
    // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
    // proceed with kill
    if (time + waitTimeBeforeKill < getClock().getTime()) {
      ContainerStatus status =
          SchedulerUtils.createPreemptedContainerStatus(
              container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);

      recoverResourceRequestForContainer(container);
      // TODO: Not sure if this ever actually adds this to the list of cleanup
      // containers on the RMNode (see SchedulerNode.releaseContainer()).
      completedContainer(container, status, RMContainerEventType.KILL);
      LOG.info("Killing container" + container +
          " (after waiting for premption for " +
          (getClock().getTime() - time) + "ms)");
    }
  } else {
    // track the request in the FSAppAttempt itself:
    // each app records its own preemptable containers
    app.addPreemption(container, getClock().getTime());
  }
}

After warnOrKillContainer kills the container, the needy queue can claim the freed resources, and the whole preemption process is complete.

Summary

When preempting under fair scheduling, first check whether the overall environment allows preemption.

The condition is that yarn.scheduler.fair.preemption is true and cluster resource utilization exceeds yarn.scheduler.fair.preemption.cluster-utilization-threshold.

When preemption is allowed, the next question is how much to preempt. This is computed by traversing every queue and summing each queue's deficit, in FairScheduler.resToPreempt.
The per-queue rule, assuming the queue's demand exceeds both minShare and fairShare, is:

  • If the queue's resources have stayed below minShare for longer than minSharePreemptionTimeout, preempt the difference between minShare and current usage.
  • If the queue's resources have stayed below fairShare for longer than fairSharePreemptionTimeout, preempt the difference between fairShare and current usage.
  • If both conditions hold, preempt the larger of the two amounts.

Once the total to preempt is known, the remaining step is finding the containers to take. The search walks down from the root queue, level by level, to a preemptable queue, then to an application, and finally to a container. The entry point is FairScheduler.preemptResources:
preemptResources first walks the containers already in the warnedContainers list; if toPreempt still has a remainder, it repeatedly searches down from the root for another preemptable container and appends it to warnedContainers.
The victim-selection policy depends on the scheduling policy:
(1) fair/drf: pick the queue that is most over its fairShare
(2) fifo: pick the queue of the most recently submitted app
Within an app, the container with the lowest priority is preempted; priority is encoded as a number, and a larger number means a lower priority.

Tips

  • fairSharePreemptionThreshold should be greater than 0.0 and at most 1.0, because a value of 0.0 effectively disables fair-share preemption.
  • Preemption kills containers in queueA and hands the freed resources to some other queueX; it cannot kill app1's containers in queueA and reassign them to app2 within the same queueA.
  • SchedulingPolicy
    The [previous post](http://bigdatadecode.top/YARN源码分析之Fair Scheduler part1.html) covered the fairShare computation inside SchedulingPolicy, and this post covered its queue-ordering comparator, which completes the picture of SchedulingPolicy.
    It has two roles:
    (1) computing fairShare;
    (2) ordering queues and apps (sorting happens both when assigning resources and when preempting): assign to whoever needs resources most, and preempt from whoever needs them least.

Appendix: Full Code

public class TestFairSchedulerPreemptionSzw extends FairSchedulerTestBase {
  private final static String ALLOC_FILE = new File("F:\\test-queues").getAbsolutePath();

  private MockClock clock;

  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
        ResourceScheduler.class);
    // enable the preemption switch: yarn.scheduler.fair.preemption
    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
    // path of the fair-scheduler allocation file
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    return conf;
  }

  @Before
  public void setup() throws IOException {
    conf = createConfiguration();
    clock = new MockClock();
  }

  @After
  public void teardown() {
    if (resourceManager != null) {
      resourceManager.stop();
      resourceManager = null;
    }
    conf = null;
  }

  private void startResourceManager(float utilizationThreshold) {
    // yarn.scheduler.fair.preemption.cluster-utilization-threshold:
    // the cluster utilization above which preemption may happen
    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
        utilizationThreshold);
    // a mock RM, purely for ease of testing
    resourceManager = new MockRM(conf);
    resourceManager.start();

    scheduler = (FairScheduler) resourceManager.getResourceScheduler();

    scheduler.setClock(clock);
    scheduler.updateInterval = 60 * 1000;
  }

  private ApplicationAttemptId registerNodeAndSubmitApp(
      int memory, int vcores, int appContainers, int appMemory) {
    // create an NM named node1 with the given memory and vcores
    RMNode node1 = MockNodes.newNodeInfo(
        1, Resources.createResource(memory, vcores), 1, "node1");
    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
    scheduler.handle(nodeEvent1);

    System.out.println("resources in the cluster : " + scheduler.rootMetrics.getAvailableMB());
    // submit an app that needs appContainers * appMemory resources
    ApplicationAttemptId app1 = createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
    scheduler.update();
    // Sufficient node check-ins to fully schedule containers
    for (int i = 0; i < appContainers; i++) {
      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
      scheduler.handle(nodeUpdate1);
    }
    FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
        "queueA", false);
    System.out.println("queueA : " + queue.getFairShare().getMemory() +
        ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
        ", steadyShare : " + queue.getSteadyFairShare() +
        ", demand : " + queue.getDemand() +
        ", running : " + queue.getResourceUsage() +
        ", preempt : " + queue.preemptContainer());
    System.out.println("rest resources of cluster : " +
        (memory - appContainers * appMemory) + " , " +
        scheduler.rootMetrics.getAvailableMB());
    return app1;
  }

  @Test
  public void testPreemptionWithFreeResources() throws Exception {
    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<queue name=\"default\">");
    // with maxResources 0, default becomes a fixed queue and is
    // excluded from the SteadyFairShare/FairShare computation
    out.println("<maxResources>0mb,0vcores</maxResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueA\">");
    out.println("<weight>1</weight>");
    out.println("<minResources>1024mb,0vcores</minResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueB\">");
    out.println("<weight>1</weight>");
    out.println("<minResources>1024mb,0vcores</minResources>");
    out.println("</queue>");
    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
    // if fairSharePreemptionTimeout and defaultFairSharePreemptionTimeout both exist,
    // we take the default one
    out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
    out.println("</allocations>");
    out.close();

    // 0f means preemption may happen as soon as any resources are in use
    startResourceManager(0f);
    // Create node with 4GB memory and 4 vcores;
    // app1 asks for 4 containers of 1024MB each
    ApplicationAttemptId app1 = registerNodeAndSubmitApp(4 * 1024, 4, 4, 1024);

    // Verify submitting another request triggers preemption
    ApplicationAttemptId app2 = createSchedulingRequest(1024, "queueB", "user1", 1, 1);
    scheduler.update();
    // advance the clock 6 seconds, past defaultMinSharePreemptionTimeout;
    // queueB is below its minShare, so it should preempt
    clock.tick(6);

    FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
        "queueA", false);
    System.out.println("queueA : " + queue.getFairShare().getMemory() +
        ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
        ", steadyShare : " + queue.getSteadyFairShare() +
        ", demand : " + queue.getDemand() +
        ", running : " + queue.getResourceUsage() +
        ", preempt : " + queue.preemptContainer());
    // check whether to preempt
    scheduler.preemptTasksIfNecessary();
    System.out.println("preemptResources() should have been called " + 1024 + " , " +
        scheduler.getSchedulerApp(app1).getPreemptionContainers().iterator().next().getContainer().getResource());
    System.out.println("====================================================================");
    // wait more than 15s so the container gets killed and its resources released
    clock.tick(18);
    // kill the container and release its resources
    scheduler.preemptTasksIfNecessary();
    System.out.println("preemptResources() should have been called " + 1024 + " , " +
        scheduler.getSchedulerApp(app1).getPreemptionContainers());
    System.out.println("queueA : " + queue.getFairShare().getMemory() +
        ", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
        ", steadyShare : " + queue.getSteadyFairShare() +
        ", demand : " + queue.getDemand() +
        ", running : " + queue.getResourceUsage() +
        ", preempt : " + queue.preemptContainer());

    resourceManager.stop();
  }
}