上一篇提到RMContainerAllocator既是一个service也是一个eventHandle,并且简单介绍了下作为eventHandle的功能,现在来介绍下作为service服务的功能。

RMContainerAllocator继承RMContainerRequestor类,RMContainerRequestor又继承自RMCommunicator,RMCommunicator类在代码中的注释是Registers/unregisters to RM and sends heartbeats to RM.

RMContainerAllocator是一个服务,在MRAppMaster.serviceInit中添加到MRAppMaster中,并且在serviceStart中启动该服务,启动时首先调用init,然后调用start。
RMContainerAllocator.serviceInit主要是一些属性值的设置,重点看下serviceStart方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected void serviceStart() throws Exception {
// new 一个线程从eventQueue中take事件进行处理
this.eventHandlingThread = new Thread() {
@SuppressWarnings("unchecked")
@Override
public void run() {
ContainerAllocatorEvent event;
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = RMContainerAllocator.this.eventQueue.take();
} catch (InterruptedException e) {
...
return;
}
try {
handleEvent(event);
} catch (Throwable t) {
...
return;
}
}
}
};
this.eventHandlingThread.start();
// 调用父类的serviceStart,重点关注
super.serviceStart();
}

serviceStart中启动了一个eventHandlingThread线程,用于处理eventQueue中的事件,此时看RMContainerAllocator类的主要功能是eventHandle,接着继续看它调用的父类super.serviceStart(),该父类是RMCommunicator

1
2
3
4
5
6
7
8
9
10
11
12
protected void serviceStart() throws Exception {
// scheduler是ApplicationMasterProtocol类型
scheduler= createSchedulerProxy();
JobID id = TypeConverter.fromYarn(this.applicationId);
JobId jobId = TypeConverter.toYarn(id);
job = context.getJob(jobId);
// am注册
register();
// 启动一个心跳线程
startAllocatorThread();
super.serviceStart();
}

有两个重要方法,分别是registerstartAllocatorThread,其实RMContainerAllocator作为service的主要功能就是注册心跳

注册

register是向rm注册此mr的am,startAllocatorThread是启动am于rm之间心跳的线程。register代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void register() {
//Register
...
try {
RegisterApplicationMasterRequest request =
recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
...
RegisterApplicationMasterResponse response =
scheduler.registerApplicationMaster(request);
isApplicationMasterRegistered = true;
...
} catch (Exception are) {
LOG.error("Exception while registering", are);
throw new YarnRuntimeException(are);
}
}

register是向ApplicationMasterService注册am服务,通过rpc协议ApplicationMasterProtocol调用ApplicationMasterService.registerApplicationMaster向ApplicationMasterService进行注册,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
...
// Allow only one thread in AM to do registerApp at a time.
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
// hasApplicationMasterRegistered中有个双重校验锁
// responseId>=0是判断是否注册的一个条件
if (hasApplicationMasterRegistered(applicationAttemptId)) {
...
}
// 向AMLivelinessMonitor更新am存活时间
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
// Setting the response id to 0 to identify if the
// application master is register for the respective attemptid
// 未注册时为-1,判断是否注册的一个标准时>=0
// responseId从0开始,注册时为0
lastResponse.setResponseId(0);
lock.setAllocateResponse(lastResponse);
LOG.info("AM registration " + applicationAttemptId);
// 由异步调度器发出RMAppAttemptEventType.REGISTERED事件
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMAppAttemptRegistrationEvent(applicationAttemptId, request
.getHost(), request.getRpcPort(), request.getTrackingUrl()));
RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
"ApplicationMasterService", appID, applicationAttemptId);
// 构造response
// Pick up min/max resource from scheduler...
RegisterApplicationMasterResponse response = recordFactory
.newRecordInstance(RegisterApplicationMasterResponse.class);
...
response.setSchedulerResourceTypes(rScheduler
.getSchedulingResourceTypes());
return response;
}
}

am向ApplicationMasterService注册时registerApplicationMaster方法是非现场安全的,则注册时需要获取AllocateResponseLock对象锁,其注册的流程是更新am在AMLivelinessMonitor中的生命时钟,设置responseId为0,然后构造一个RMAppAttemptEventType.REGISTERED事件类型的事件,由异步调度器分发给相应的Eventhandle处理。
此时注册过程结束,构建一个response返回给客户端。

心跳

am注册结束之后,回到RMCommunicator.serviceStart中,执行startAllocatorThread启动一个心跳线程,周期性的向ApplicationMasterService进行心跳。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected void startAllocatorThread() {
allocatorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
// 心跳间隔
Thread.sleep(rmPollInterval);
try {
// 调用RMContainerAllocator.heartbeat
heartbeat();
} catch (YarnRuntimeException e) {
...
}

lastHeartbeatTime = context.getClock().getTime();
// 每次心跳都调用这个,干什么??????
executeHeartbeatCallbacks();
} catch (InterruptedException e) {
...
}
}
}
});
allocatorThread.setName("RMCommunicator Allocator");
allocatorThread.start();
}

这里启动一个线程,将线程命名为RMCommunicator Allocator,每隔rmPollInterval时间,执行一次heartbeat。此文中RMCommunicator抽象类的子类是RMContainerAllocator,则heartbeat的实现方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected synchronized void heartbeat() throws Exception {
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
// 关键方法,会远程调用Scheduler.allocated方法对资源进行allocate
// 这里只是申请到资源,但资源并没有于task绑定,也就是说资源并没有分配给task
List<Container> allocatedContainers = getResources();
if (allocatedContainers != null && allocatedContainers.size() > 0) {
// 将资源分配给task
scheduledRequests.assign(allocatedContainers);
}
...
// 如果task的进度发生变化并且剩余的map个大于0,则触发是否调度reduce判断
if ((lastCompletedTasks != completedTasks) ||
(scheduledRequests.maps.size() > 0)) {
lastCompletedTasks = completedTasks;
recalculateReduceSchedule = true;
}
if (recalculateReduceSchedule) {
preemptReducesIfNeeded();
// 是否要调度reduce
// 这里要关注mapreduce.job.reduce.slowstart.completedmaps
// 和yarn.app.mapreduce.am.job.reduce.rampup.limit
scheduleReduces(
getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceRequest, reduceResourceRequest,
pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
}

scheduleStats.updateAndLogIfChanged("After Scheduling: ");
}

heartbeat主要用于am和rm之间周期性的通信,以告知rm此am是否存活,但heartbeat中不止只是生命心跳,在发送的request中包含请求资源的列表ask需要释放的资源列表release,返回的response包含了allocated的资源和*已经完成的container(是由上次请求中release列表中得到的)*。
返回的container资源此时并没有分配给task,而是通过scheduledRequests.assign将container资源分配给具体的task。
heartbeat中通过getResources()通过Scheduler获取集群资源,查看下getResources的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private List<Container> getResources() throws Exception {
...
// 发送request,远程调用Scheduler.allocated分配资源
try {
response = makeRemoteRequest();
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
} catch (ApplicationAttemptNotFoundException e ) {
...
}
...
List<Container> newContainers = response.getAllocatedContainers();
// Setting NMTokens
// Setting AMRMToken
...

List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
...

//Called on each allocation. Will know about newly blacklisted/added hosts.
computeIgnoreBlacklisting();
// 让response.getUpdatedNodes()进行report,查看是否为不稳定状态,
// 更新unusableNodes,将这些节点上的task kill掉
handleUpdatedNodes(response);
// 对心跳响应中得到的finishContainer进行处理
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont.getContainerId());
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
if (attemptID == null) {
LOG.error("Container complete event for unknown container id "
+ cont.getContainerId());
} else {
pendingRelease.remove(cont.getContainerId());
assignedRequests.remove(attemptID);

// send the container completed event to Task attempt
eventHandler.handle(createContainerFinishedEvent(cont, attemptID));

// Send the diagnostics
String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
diagnostics));
}
}
return newContainers;
}

一看getResources这个名字就知道这里会有个远程rpc调用从rm处获取资源,方法为makeRemoteRequest,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected AllocateResponse makeRemoteRequest() throws YarnException,
IOException {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
new ArrayList<String>(blacklistRemovals));
// ask 通过 scheduledRequests.addMap addResourceRequest 更新
// 请求中包括 ask & release 集合
AllocateRequest allocateRequest =
AllocateRequest.newInstance(lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(release), blacklistRequest);
// 调度器分配资源 scheduler.allocate (ApplicationMasterProtocol)
// 在allocate中会更新AMLivelinessMonitor中的时间
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
...
return allocateResponse;
}

makeRemoteRequest利用ask和release列表以及blackList构建一个AllocateRequest对象,用于向ApplicationMasterService请求分配资源。
这里需要注意的是ask中的所有请求和release列表中container会在每次心跳时发送给ApplicationMasterService,然后此时从ApplicationMasterService返回的response中包含的已分配的container信息并没有此次ask中的请求信息,而是在此次心跳之前发送给ApplicationMasterService的container请求对应的资源。

makeRemoteRequest返回ApplicationMasterService的response,response中包括响应id、当前集群可用的资源和集群的节点个数,还有最关键的分配的container列表以及finishContainer列表
getResources中拿到newContainersfinishedContainers,对newContainers设置一些Tokens,遍历finishedContainers,从assignedRequests将finishedContainer移除,并触发一个TaskAttemptEvent事件。
在getResources中只对finishedContainers进行了处理,对新得到的container资源并没有分配给具体的task。从getResource方法返回到heartbeat中,调用scheduledRequests.assign(allocatedContainers)给task分配资源。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void assign(List<Container> allocatedContainers) {
Iterator<Container> it = allocatedContainers.iterator();
LOG.info("Got allocated containers " + allocatedContainers.size());
containersAllocated += allocatedContainers.size();
// 对allocatedContainers进行过滤,将不能分配的资源移除
...
// 分配资源给task
assignContainers(allocatedContainers);
// 将剩余未分配的资源释放掉
// release container if we could not assign it
it = allocatedContainers.iterator();
while (it.hasNext()) {
Container allocated = it.next();
LOG.info("Releasing unassigned and invalid container "
+ allocated + ". RM may have assignment issues");
containerNotAssigned(allocated);
}
}

assign在分配资源之前会先过滤下allocatedContainers列表中的资源,对于无法满足request的container和在黑名单中的container从列表中移除,对过滤过的allocatedContainers调用assignContainers进行分配,最后对没有分配的剩余container进行释放。assignContainers代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void assignContainers(List<Container> allocatedContainers) {
Iterator<Container> it = allocatedContainers.iterator();
while (it.hasNext()) {
Container allocated = it.next();
// 无本地性要求的task 如 reduce和fail map task
ContainerRequest assigned = assignWithoutLocality(allocated);
if (assigned != null) {
// 将container分配给task attempt,
// 触发TaskAttemptEventType.TA_ASSIGNED事件类型
containerAssigned(allocated, assigned);
it.remove();
}
}
// 分配map
assignMapsWithLocality(allocatedContainers);
}

分配container时,先对allocatedContainers进行遍历,遍历中对不需要本地性的container进行分配,(不需要本地性的container为失败之后重跑的map和reduce),遍历结束之后调用assignMapsWithLocality将剩余的container分配给map。
这里可以看出失败之后重跑的map和reduce的运行优先级比map的运行优先级高,防止集群中的job都在等待reduce资源,而无法使job结束释放资源。

看下assignMapsWithLocality方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private void assignMapsWithLocality(List<Container> allocatedContainers) {
// try to assign to all nodes first to match node local
Iterator<Container> it = allocatedContainers.iterator();
// 数据本地性
while(it.hasNext() && maps.size() > 0){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert PRIORITY_MAP.equals(priority);
// "if (maps.containsKey(tId))" below should be almost always true.
// hence this while loop would almost always have O(1) complexity
String host = allocated.getNodeId().getHost();
// 取出改host对应的attempt列表
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
while (list != null && list.size() > 0) {
...
TaskAttemptId tId = list.removeFirst();
// 如果tId在待执行的map集合中,则去处对应的request进行分配资源
if (maps.containsKey(tId)) {
ContainerRequest assigned = maps.remove(tId);
// 无论map还是reduce都是调用次方法
containerAssigned(allocated, assigned);
it.remove();
...
break;
}
}
}

// try to match all rack local
it = allocatedContainers.iterator();
while(it.hasNext() && maps.size() > 0){
...
// "if (maps.containsKey(tId))" below should be almost always true.
// hence this while loop would almost always have O(1) complexity
String host = allocated.getNodeId().getHost();
String rack = RackResolver.resolve(host).getNetworkLocation();
LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
while (list != null && list.size() > 0) {
TaskAttemptId tId = list.removeFirst();
if (maps.containsKey(tId)) {
ContainerRequest assigned = maps.remove(tId);
containerAssigned(allocated, assigned);
...
break;
}
}
}

// assign remaining
it = allocatedContainers.iterator();
while(it.hasNext() && maps.size() > 0){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert PRIORITY_MAP.equals(priority);
TaskAttemptId tId = maps.keySet().iterator().next();
ContainerRequest assigned = maps.remove(tId);
containerAssigned(allocated, assigned);
it.remove();
...
}
}

assignMapsWithLocality先对allocatedContainers列表遍历一边,将container资源进行host本地性分配,host本地性是指container所在的节点上是否有container request,如果有则将request从maps中取出,调用containerAssigned进行资源分配。
host本地性分配完之后,再次遍历allocatedContainers列表,将剩下的container针对rack本地性进行分配,此次遍历结束之后,还需进行一次遍历,对剩下的container进行不考虑本地性的分配。

container分配给map还是reduce都是调用containerAssigned,看下具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
private void containerAssigned(Container allocated, 
ContainerRequest assigned) {
// Update resource requests
decContainerReq(assigned);
// 触发TaskAttemptEventType.TA_ASSIGNED事件类型
// send the container-assigned event to task attempt
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
assigned.attemptID, allocated, applicationACLs));
// 将container放入已分配的集合中
assignedRequests.add(allocated, assigned.attemptID);
...
}

总结

本篇主要介绍了RMContainerAllocator作为一个service时的相关逻辑,这部分逻辑大部分是在RMContainerRequestor中实现的,而RMContainerRequestor又继承自RMCommunicator

RMContainerAllocator作为service时的主要功能为注册心跳
注册时由AM向ApplicationMasterService发出请求,在注册的同时会更新下AMLivelinessMonitor中的存活时间,并触发RMAppAttemptEventType.REGISTERED事件类型的事件,由异步调度器分发给相应的Eventhandle处理。
注册比较简单,心跳则有一个专门的线程进行周期性的执行。
心跳不止只是用来告诉rm此am是否存活,更重要的是在心跳中会向rm发送申请container的请求,并将接收到的container分配给task。am发送的申请container请求可以认为是异步的,因为一次心跳过程中的发送当前所需的container资源,然后接收到的container响应是之前心跳中发送的container请求,而不是此次心跳中发送的container请求对应的响应,此次心跳中所需的container需在下次心跳或者下下次心跳中才能得到响应

结合前面几篇关于ApplicationMaster的文章,目前已经把ApplicationMaster与RM之前的相关的流程已梳理完毕了(ApplicationMaster与NM之前的流程随后梳理),其中有几个关键类ApplicationMaster、AMLivelinessMonitor、ApplicationMasterLauncher和ApplicationMasterService,下面将会有一篇文章综合介绍下这4个类之间的关系。