During a conversation with a friend, the question of how the ApplicationMaster's container gets allocated came up. My impression was that it is placed at random, but he said it is placed according to each node's free resources.
I never paid much attention to this logic when reading the code before, so now that the question has come up, let's go dig through the source.

The MR job log shows when the AM container is allocated; see the log:

2017-04-09 03:26:17,113 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1491729774382_0001_000001 State change from SUBMITTED to SCHEDULED
2017-04-09 03:26:17,415 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1491729774382_0001_01_000001 Container Transitioned from NEW to ALLOCATED

The AM container is initialized when the appattempt's state changes from SUBMITTED to SCHEDULED.
The handling logic for the SUBMITTED to SCHEDULED transition is:

public static final class ScheduleTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
if (!subCtx.getUnmanagedAM()) {
// Need reset #containers before create new attempt, because this request
// will be passed to scheduler, and scheduler will deduct the number after
// AM container allocated
// Build the resource request for the AM container
appAttempt.amReq.setNumContainers(1);
appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
// A ResourceName of ANY means any node on any rack
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);

// Hand the request to the scheduler to allocate resources
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
Collections.singletonList(appAttempt.amReq),
EMPTY_CONTAINER_RELEASE_LIST, null, null);
...
return RMAppAttemptState.SCHEDULED;
} else {
...
}
}
}

First a container request is built for the AM container. In fact, appAttempt.amReq.setResourceName(ResourceRequest.ANY) already shows that AM container placement is "random" in the sense that it has no locality preference: the request puts no constraint on the ResourceName at all. Still, let's keep reading the code to verify.
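For reference, here is a minimal sketch (my own, using the public YARN records API; the class name, host name and resource sizes are made up) of what an AM-style ANY request looks like next to a locality-constrained request:

// Sketch only: an AM-style request with no locality preference vs. a node-local request.
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class AmRequestSketch {
  public static List<ResourceRequest> buildRequests() {
    Resource amResource = Resource.newInstance(2048, 1); // 2 GB, 1 vcore (made up)

    // What ScheduleTransition effectively builds for the AM:
    // ResourceName = ANY, relaxLocality = true, one container.
    ResourceRequest amAnyRequest = ResourceRequest.newInstance(
        Priority.newInstance(0), ResourceRequest.ANY, amResource, 1, true);

    // A locality-constrained request for comparison:
    // the ResourceName names a specific node (hypothetical hostname).
    ResourceRequest nodeLocalRequest = ResourceRequest.newInstance(
        Priority.newInstance(20), "node01.example.com", amResource, 1, true);

    return Arrays.asList(amAnyRequest, nodeLocalRequest);
  }
}

A regular MR map task, by contrast, typically submits node-local and rack-local requests alongside the ANY one, which is why the locality-aware branches we will see later in LeafQueue matter for it.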
Once the request has been created, the scheduler allocates the resource; the default here is the CapacityScheduler, whose code is as follows:

// CapacityScheduler.java
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals) {

FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
...
// Release containers
releaseContainers(release, application);

synchronized (application) {
...
if (!ask.isEmpty()) {
...
application.showRequests();
// Put the requests into this application attempt's request map
// Update application requests
application.updateResourceRequests(ask);
application.showRequests();
}

application.updateBlacklist(blacklistAdditions, blacklistRemovals);
//
return application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
}
}

When handling the request, CapacityScheduler.allocate calls application.updateResourceRequests(ask) to put the request into a map, where it waits to be picked up on an NM heartbeat.
This application is a FiCaSchedulerApp object, and a FiCaSchedulerApp corresponds to an application attempt. The updateResourceRequests code is as follows:

public synchronized void updateResourceRequests(
List<ResourceRequest> requests) {
if (!isStopped) {
// AppSchedulingInfo.updateResourceRequests
appSchedulingInfo.updateResourceRequests(requests, false);
}
}

AppSchedulingInfo records all of the application's resource consumption, including the containers that are currently running or have already finished for this application.

synchronized public void updateResourceRequests(
List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
// Update resource requests
for (ResourceRequest request : requests) {
Priority priority = request.getPriority();
String resourceName = request.getResourceName();
boolean updatePendingResources = false;
ResourceRequest lastRequest = null;
// If the request's ResourceName is ResourceRequest.ANY
// Is it only the AM container that uses ANY??? That doesn't seem right
if (resourceName.equals(ResourceRequest.ANY)) {
...
// Only set to true for ResourceRequest.ANY??
updatePendingResources = true;

// Premature optimization?
// Assumes that we won't see more than one priority request updated
// in one call, reasonable assumption... however, it's totally safe
// to activate same application more than once.
// Thus we don't need another loop ala the one in decrementOutstanding()
// which is needed during deactivate.
if (request.getNumContainers() > 0) {
activeUsersManager.activateApplication(user, applicationId);
}
}
// this.requests is a map holding all of this application's requests
// Check whether requests already has entries at this priority
Map<String, ResourceRequest> asks = this.requests.get(priority);
// No requests at this priority yet, so create a new map
if (asks == null) {
asks = new HashMap<String, ResourceRequest>();
this.requests.put(priority, asks);
this.priorities.add(priority);
}
// asks is not null; check whether it already holds a request with the same ResourceName
lastRequest = asks.get(resourceName);

if (recoverPreemptedRequest && lastRequest != null) {
// Increment the number of containers to 1, as it is recovering a
// single container.
request.setNumContainers(lastRequest.getNumContainers() + 1);
}
// The previous request (if any) was saved into lastRequest above;
// the new request is put into asks. What happens to lastRequest? Where is it handled?
asks.put(resourceName, request);
if (updatePendingResources) {

// Similarly, deactivate application?
if (request.getNumContainers() <= 0) {
LOG.info("checking for deactivate... ");
checkForDeactivation();
}

int lastRequestContainers = lastRequest != null ? lastRequest
.getNumContainers() : 0;
Resource lastRequestCapability = lastRequest != null ? lastRequest
.getCapability() : Resources.none();
metrics.incrPendingResources(user, request.getNumContainers(),
request.getCapability());
metrics.decrPendingResources(user, lastRequestContainers,
lastRequestCapability);
}
}
}

updateResourceRequests mainly puts the request into requests, where it waits for an NM heartbeat to pick it up. One thing is still a bit fuzzy here: before requests is updated, an existing request with the same ResourceName is pulled out and assigned to lastRequest, but what happens to that lastRequest afterwards? I'm not sure, so I'm leaving a note here.
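Judging from the snippet above, lastRequest seems to be used only to adjust the pending-resource metrics (the old pending amount is decremented while the new request's is incremented); in the map itself the new request simply overwrites the old one. The bookkeeping is essentially a two-level map keyed by priority and then by ResourceName. A stripped-down sketch of that shape (my own simplification with made-up names, not the actual AppSchedulingInfo class):

// Sketch only: the shape of the request table, keyed first by priority and then
// by ResourceName (host, rack, or ANY); values are simplified to container counts.
import java.util.HashMap;
import java.util.Map;

public class RequestTableSketch {
  // priority -> (resourceName -> number of containers requested)
  private final Map<Integer, Map<String, Integer>> requests = new HashMap<>();

  public void update(int priority, String resourceName, int numContainers) {
    Map<String, Integer> asks =
        requests.computeIfAbsent(priority, p -> new HashMap<>());
    // The previous entry (lastRequest) is only needed to adjust metrics;
    // in the table it is simply overwritten by the new request.
    Integer lastNumContainers = asks.put(resourceName, numContainers);
    int delta = numContainers - (lastNumContainers == null ? 0 : lastNumContainers);
    // metrics.incrPendingResources / decrPendingResources would use this delta
    System.out.println("pending containers changed by " + delta
        + " at priority " + priority + " for " + resourceName);
  }
}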

After requests has been updated, control returns to CapacityScheduler.allocate, whose return statement calls application.getAllocation to build an Allocation object. This is where container tokens are created, and the containers that get tokens here are ones that have already been allocated to an NM, i.e. already-instantiated RMContainers. So it seems that when scheduling a container the scheduler first records a request, and then pulls the responses for earlier requests out of the newlyAllocatedContainers list.
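If that reading is right, allocate behaves like a record-the-ask-now, pick-up-the-result-later pair of calls. A toy sketch of the pattern (purely illustrative names, not FiCaSchedulerApp code):

// Sketch only: the "record the ask now, collect the result on a later call" pattern
// that allocate / getAllocation appears to follow.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DeferredAllocationSketch {
  private final Queue<String> pendingAsks = new ArrayDeque<>();
  private final Queue<String> newlyAllocated = new ArrayDeque<>();

  // Called when a request is submitted (like ScheduleTransition does for the AM).
  public synchronized List<String> allocate(String ask) {
    pendingAsks.add(ask);
    // Drain and return everything allocated since the previous call,
    // analogous to getAllocation() pulling from newlyAllocatedContainers.
    List<String> response = new ArrayList<>(newlyAllocated);
    newlyAllocated.clear();
    return response;
  }

  // Called from the node-heartbeat path when a container is actually placed.
  public synchronized void onNodeHeartbeat(String nodeId) {
    String ask = pendingAsks.poll();
    if (ask != null) {
      newlyAllocated.add("container for '" + ask + "' on " + nodeId);
    }
  }
}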


With the AM container request created, it now waits for an NM heartbeat to pick it up.
When some NM sends a heartbeat, the code goes as follows:

// CapacityScheduler.handle, NODE_UPDATE event
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
RMNode node = nodeUpdatedEvent.getRMNode();
// Update the state of the containers on this node:
// launch containers newly allocated to the node and transition completed ones
nodeUpdate(node);
if (!scheduleAsynchronously) {
// Let this node pick up pending container requests
allocateContainersToNode(getNode(node.getNodeID()));
}
}

After the NM heartbeats to the CapacityScheduler, nodeUpdate(node) updates the state of the containers already on that node, and then allocateContainersToNode is called to pick up new container requests.

private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
...
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
// If there is a reserved container, serve the reservation first
...
// Try to schedule more if there are no reservations to fulfill
if (node.getReservedContainer() == null) {
// Check whether the NM still has enough free resources for at least one container
if (calculator.computeAvailableContainers(node.getAvailableResource(),
minimumAllocation) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
// Assign containers, starting from the root queue
root.assignContainers(clusterResource, node, false);
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
" is reserved by application " +
node.getReservedContainer().getContainerId().getApplicationAttemptId()
);
}
}

When the scheduler schedules containers for this NM, it first checks whether the node holds a reserved container; if so, the reservation is served first. Only when there is no reserved container, and the node still has room for at least one minimum-size container, does it call root.assignContainers to do the scheduling.
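The "room for at least one container" check goes through the scheduler's ResourceCalculator. With the default memory-only calculator it essentially boils down to an integer division of the node's available memory by the minimum allocation; a rough sketch of the idea (not the actual Hadoop class):

// Sketch only: roughly what computeAvailableContainers(available, minimumAllocation)
// amounts to under a memory-only calculator (a DRF-style calculator would also
// look at the vcore ratio and take the smaller of the two).
public final class AvailableContainersSketch {
  private AvailableContainersSketch() {}

  public static int computeAvailableContainers(int availableMemMb, int minAllocationMb) {
    if (minAllocationMb <= 0) {
      return 0;
    }
    return availableMemMb / minAllocationMb;
  }

  public static void main(String[] args) {
    // e.g. 5120 MB free with a 1024 MB minimum allocation -> room for 5 containers
    System.out.println(computeAvailableContainers(5120, 1024));
  }
}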
root is a CSQueue object. CSQueue is an interface implemented by the abstract class AbstractCSQueue, which in turn is extended by ParentQueue and LeafQueue. The call here lands in ParentQueue.assignContainers, whose code is as follows:

public synchronized CSAssignment assignContainers(
Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);

// if our queue cannot access this node, just return
if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
labelManager.getLabelsOnNode(node.getNodeID()))) {
return assignment;
}

while (canAssign(clusterResource, node)) {
...
boolean localNeedToUnreserve = false;
Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
// Are we over maximum-capacity for this queue?
if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
// check to see if we could if we unreserve first
localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
if (!localNeedToUnreserve) {
break;
}
}
// Schedule
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
assignment.setType(assignedToChild.getType());
...
// Do not assign more than one container if this isn't the root queue
// or if we've already assigned an off-switch container
if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
if (LOG.isDebugEnabled()) {
if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
LOG.debug("Not assigning more than one off-switch container," +
" assignments so far: " + assignment);
}
}
break;
}
}

return assignment;
}

During assignment, the parent queue first checks whether it may access this NM based on the node's labels, and whether it is still within its capacity limits; once those checks pass it calls assignContainersToChildQueues to hand the node down to its children:

private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
FiCaSchedulerNode node, boolean needToUnreserve) {
...
// Try to assign to most 'under-served' sub-queue
for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
CSQueue childQueue = iter.next();
...
assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
...
// If we do assign, remove the queue and re-insert in-order to re-sort
if (Resources.greaterThan(
resourceCalculator, cluster,
assignment.getResource(), Resources.none())) {
// Remove and re-insert to sort
iter.remove();
LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() +
" stats: " + childQueue);
childQueues.add(childQueue);
if (LOG.isDebugEnabled()) {
printChildQueues();
}
break;
}
}

return assignment;
}

assignContainersToChildQueues calls assignContainers on the child queues, starting with the most under-served one; after a successful assignment the childQueue is removed from the collection and re-inserted so that the collection is re-sorted.
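The remove-then-re-insert dance is needed because the child-queue collection is kept sorted by how under-served each queue is, and a sorted collection such as a TreeSet only orders elements at insertion time: once an assignment changes a queue's utilization, its position is stale until it is re-added. A small illustration of the pattern (illustrative names, not the CapacityScheduler comparator):

// Sketch only: why a sorted collection must have an element removed and re-inserted
// when the field its ordering depends on changes.
import java.util.Comparator;
import java.util.TreeSet;

public class ResortSketch {
  static final class Queue {
    final String name;
    double usedCapacity; // the sort key, mutated after an assignment

    Queue(String name, double usedCapacity) {
      this.name = name;
      this.usedCapacity = usedCapacity;
    }
  }

  public static void main(String[] args) {
    TreeSet<Queue> queues = new TreeSet<>(
        Comparator.comparingDouble((Queue q) -> q.usedCapacity)
                  .thenComparing(q -> q.name));
    Queue a = new Queue("a", 0.10);
    Queue b = new Queue("b", 0.50);
    queues.add(a);
    queues.add(b);

    // Assign something to the least-used queue, then re-insert it so the
    // ordering reflects its new utilization (mirrors iter.remove(); ...; add()).
    Queue target = queues.pollFirst();   // remove
    target.usedCapacity = 0.70;          // the assignment changes the sort key
    queues.add(target);                  // re-insert to re-sort

    System.out.println(queues.first().name); // now "b" is the most under-served
  }
}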
childQueue.assignContainers (here a LeafQueue) looks like this:

public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve) {
...
// if our queue cannot access this node, just return
if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
labelManager.getLabelsOnNode(node.getNodeID()))) {
return NULL_ASSIGNMENT;
}

// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
synchronized (application) {
return assignReservedContainer(application, node, reservedContainer,
clusterResource);
}
}

// Try to assign containers to applications in order
for (FiCaSchedulerApp application : activeApplications) {
...
// Lock the application while scheduling for it
synchronized (application) {
// Check if this resource is on the blacklist
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
continue;
}

// Schedule in priority order
for (Priority priority : application.getPriorities()) {
// Why ANY?
// If the application has no ANY request at this priority, nothing is assigned?
// Need to debug this to confirm
ResourceRequest anyRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (null == anyRequest) {
continue;
}

// Required resource
Resource required = anyRequest.getCapability();

// Do we need containers at this 'priority'?
if (application.getTotalRequiredResources(priority) <= 0) {
continue;
}
if (!this.reservationsContinueLooking) {
if (!needContainers(application, priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
continue;
}
}

Set<String> requestedNodeLabels =
getRequestLabelSetByExpression(anyRequest
.getNodeLabelExpression());

// Compute user-limit & set headroom
// Note: We compute both user-limit & headroom with the highest
// priority request as the target.
// This works since we never assign lower priority requests
// before all higher priority ones are serviced.
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
required, requestedNodeLabels);

// Check queue max-capacity limit
if (!canAssignToThisQueue(clusterResource, required,
labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
return NULL_ASSIGNMENT;
}

// Check user limit
if (!assignToUser(clusterResource, application.getUser(), userLimit,
application, true, requestedNodeLabels)) {
break;
}

// Inform the application it is about to get a scheduling opportunity
// What is this for? Incrementing the scheduling opportunities?
application.addSchedulingOpportunity(priority);

// Try to schedule
// Start scheduling
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
null, needToUnreserve);

// Did the application skip this node?
if (assignment.getSkipped()) {
// Don't count 'skipped nodes' as a scheduling opportunity!
application.subtractSchedulingOpportunity(priority);
continue;
}

// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
if (Resources.greaterThan(
resourceCalculator, clusterResource, assigned, Resources.none())) {

// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned,
labelManager.getLabelsOnNode(node.getNodeID()));

// Don't reset scheduling opportunities for non-local assignments
// otherwise the app will be delayed for each non-local assignment.
// This helps apps with many off-cluster requests schedule faster.
if (assignment.getType() != NodeType.OFF_SWITCH) {
if (LOG.isDebugEnabled()) {
LOG.debug("Resetting scheduling opportunities");
}
application.resetSchedulingOpportunities(priority);
}

// Done
return assignment;
} else {
// Do not assign out of order w.r.t priorities
break;
}
}
}

if(LOG.isDebugEnabled()) {
LOG.debug("post-assignContainers for application "
+ application.getApplicationId());
}
application.showRequests();
}

return NULL_ASSIGNMENT;
}

LeafQueue.assignContainers iterates over the container requests of the applications running in this queue and, after a series of checks, calls assignContainersOnNode to do the scheduling:

private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
Resource assigned = Resources.none();
// Node-local: a request whose ResourceName is this node's hostname
ResourceRequest nodeLocalResourceRequest =
application.getResourceRequest(priority, node.getNodeName());
if (nodeLocalResourceRequest != null) {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
}
}
// Rack-local: a request whose ResourceName is this node's rack
ResourceRequest rackLocalResourceRequest =
application.getResourceRequest(priority, node.getRackName());
if (rackLocalResourceRequest != null) {
if (!rackLocalResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}

assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
}
}

// Off-switch: a request whose ResourceName is ANY
ResourceRequest offSwitchResourceRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchResourceRequest != null) {
if (!offSwitchResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}

return new CSAssignment(
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer, needToUnreserve),
NodeType.OFF_SWITCH);
}
return SKIP_ASSIGNMENT;
}

assignContainersOnNode branches on the locality type of the request. Since the AM container's ResourceRequest is ANY, we only need to look at the off-switch path here:

private Resource assignOffSwitchContainers(
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
needToUnreserve);
}

return Resources.none();
}

assignOffSwitchContainers in turn calls assignContainer; let's keep following:

private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, 
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
boolean needToUnreserve) {
...
// Resource size requested for the container
Resource capability = request.getCapability();
// Resources currently available on the node
Resource available = node.getAvailableResource();
// Total resources of the node
Resource totalResource = node.getTotalResource();
// Reject the request if it asks for more than the node's total capacity
if (!Resources.fitsIn(capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return Resources.none();
}
assert Resources.greaterThan(
resourceCalculator, clusterResource, available, Resources.none());

// Create the container if necessary
// generate the containerId
Container container =
getContainer(rmContainer, application, node, capability, priority);
...
// Reserved containers were handled earlier; for a normal container,
// check whether the node's free resources are enough to allocate it
// Can we allocate a container on this node?
int availableContainers =
resourceCalculator.computeAvailableContainers(available, capability);
if (availableContainers > 0) {
// Allocate...
...
// Inform the application
RMContainer allocatedContainer =
application.allocate(type, node, priority, request, container);

// Does the application need this resource?
if (allocatedContainer == null) {
return Resources.none();
}
// Inform the node of the allocation; the container goes into its launchedContainers map
// Inform the node
node.allocateContainer(allocatedContainer);

LOG.info("assignedContainer" +
" application attempt=" + application.getApplicationAttemptId() +
" container=" + container +
" queue=" + this +
" clusterResource=" + clusterResource);

return container.getResource();
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
...
return Resources.none();
}
}

assignContainer first checks whether the container request exceeds the node's total capacity. If not, getContainer checks whether there is a reserved container on the node and otherwise calls createContainer to generate a containerId. With the containerId in hand, it checks whether the node's free resources are enough for the allocation; if so, application.allocate (application being a FiCaSchedulerApp) performs the allocation, and finally node.allocateContainer records the container in the node's launchedContainers map. The allocate code is as follows:

synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
...
// On the RM side a container is represented as an RMContainer
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, this
.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext);

// Add it to allContainers list.
// Put the new container into the newlyAllocatedContainers list;
// the scheduler pulls containers back out of this list later
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);

// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());

// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);

// Inform the container
// Fire a START event to tell the container it is ready, driving the RMContainer state machine
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
...
return rmContainer;
}

allocate creates an RMContainer, adds it to the newlyAllocatedContainers list, and fires the event that drives the RMContainer state machine.
The scheduler later takes containers out of newlyAllocatedContainers and hands them out.
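The RMContainerEventType.START fired here is what produces the "Container Transitioned from NEW to ALLOCATED" line we saw in the log at the top of this post. As a toy illustration of that kind of event-driven transition (nothing like the real RMContainerImpl transition table, just the shape of the idea):

// Sketch only: an event drives a container state transition, the way the START
// event above moves an RMContainer from NEW to ALLOCATED.
public class ContainerStateSketch {
  enum State { NEW, ALLOCATED }
  enum Event { START }

  private State state = State.NEW;

  public void handle(Event event) {
    if (state == State.NEW && event == Event.START) {
      State old = state;
      state = State.ALLOCATED;
      System.out.println("Container Transitioned from " + old + " to " + state);
    }
  }

  public static void main(String[] args) {
    new ContainerStateSketch().handle(Event.START); // prints NEW to ALLOCATED
  }
}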

Summary

I've more or less followed the overall flow through the code, but some details are still not entirely clear; I'll debug them properly later and update this post.
My current understanding so far: the scheduler first records a container request and checks whether newlyAllocatedContainers already holds containers ready to be handed out, creating tokens for those that are. Then, when an NM heartbeats in, the matching request is taken out of requests and instantiated as a container, which is in turn put into the newlyAllocatedContainers list to wait to be handed out.