MapReduce–Reduce

上篇主要扒了下Map阶段的代码,并根据代码把Map阶段的流程串了下,本篇主要扒下Reduce阶段的代码,顺便串下Reduce阶段的流程。
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。Reduce Task和Map Task一样,同样也有四种任务类型,分别为job setup、job cleanup、reduce task、task cleanup。Reduce阶段的代码入口是ReduceTask.java类中的run方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
// Reduce阶段的三个小阶段
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);

boolean useNewApi = job.getUseNewReducer();
initialize(job, getJobID(), reporter, useNewApi);
// Reduce Task 的四种类型
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}

// Initialize the codec
// 初始化压缩类型
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
// reduce 阶段依然要用到combiner
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
// 得到shuffle阶段的插件(此处将shuffle阶段当做一个服务插件)
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
// shuffleConsumerPlugin初始化
shuffleConsumerPlugin.init(shuffleContext);
// copy sort merge都在此阶段执行
rIter = shuffleConsumerPlugin.run();

// free up the data structures
mapOutputFilesOnDisk.clear();

sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();

if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
...
}

由上面的run代码段可以看出,_copy_和_sort_主要是在shuffleConsumerPlugin.run()中执行,此方法结束之后则进入reduce阶段,执行runNewReducer()(使用旧API时则执行runOldReducer())。下面先看下shuffleConsumerPlugin的逻辑,从shuffleConsumerPlugin.init(shuffleContext)开始:

1
2
3
4
5
6
7
8
9
10
11
// Shuffle.java
public void init(ShuffleConsumerPlugin.Context context) {
...
// 主要用来存放map等相关信息,也包含fetcher相关的调度
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
// new 出一个MergeManagerImpl对象,在该对象中创建
// merge线程,包含inMemoryMerger(内存merge到磁盘)、onDiskMerger(磁盘merge)
merger = createMergeManager(context);
}

通过createMergeManager new出一个MergeManagerImpl对象,该对象的构造函数主要构建几个Merge线程,并为一些参数设置阈值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, 
FileSystem localFS,
LocalDirAllocator localDirAllocator,
Reporter reporter,
CompressionCodec codec,
Class<? extends Reducer> combinerClass,
CombineOutputCollector<K,V> combineCollector,
Counters.Counter spilledRecordsCounter,
Counters.Counter reduceCombineInputCounter,
Counters.Counter mergedMapOutputsCounter,
ExceptionReporter exceptionReporter,
Progress mergePhase, MapOutputFile mapOutputFile) {
...
// SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"
// 默认是0.7 Reduce阶段内存缓冲区中用于shuffle的百分比
final float maxInMemCopyUse =
jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
throw new IllegalArgumentException("Invalid value for " +
MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
maxInMemCopyUse);
}
// Allow unit tests to fix Runtime memory
// REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"
// 用于shuffle的buffer大小
// 此值是由Reduce运行时的内存*shuffle百分比得到的
this.memoryLimit =
(long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
* maxInMemCopyUse);
this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
// SHUFFLE_MEMORY_LIMIT_PERCENT = "mapreduce.reduce.shuffle.memory.limit.percent"
// 单个shuffle的内存限制,默认是0.25
final float singleShuffleMemoryLimitPercent =
jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
if (singleShuffleMemoryLimitPercent <= 0.0f
|| singleShuffleMemoryLimitPercent > 1.0f) {
throw new IllegalArgumentException("Invalid value for "
+ MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+ singleShuffleMemoryLimitPercent);
}

usedMemory = 0L;
commitMemory = 0L;
// singleShuffleMemoryLimitPercent = mapreduce.reduce.shuffle.memory.limit.percent
// 每次写入时的最大限制,超过则直接写入disk
this.maxSingleShuffleLimit =
(long)(memoryLimit * singleShuffleMemoryLimitPercent);
// 在内存中merge的阈值,该阈值只有在开启内存merge功能才有用
// 内存merge的开关是mapreduce.reduce.merge.memtomem.enabled,默认关闭
this.memToMemMergeOutputsThreshold =
jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
// SHUFFLE_MERGE_PERCENT = "mapreduce.reduce.shuffle.merge.percent"
// merge阈值,用于shuffle的内存*mapreduce.reduce.shuffle.merge.percent
this.mergeThreshold = (long)(this.memoryLimit *
jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
0.90f));
...
// 内存merge功能是否开启
boolean allowMemToMemMerge =
jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
if (allowMemToMemMerge) {
this.memToMemMerger =
new IntermediateMemoryToMemoryMerger(this,
memToMemMergeOutputsThreshold);
this.memToMemMerger.start();
} else {
this.memToMemMerger = null;
}
// 启动内存merge到Disk的merger线程
this.inMemoryMerger = createInMemoryMerger();
this.inMemoryMerger.start();
// 启动disk merge线程
this.onDiskMerger = new OnDiskMerger(this);
this.onDiskMerger.start();
this.mergePhase = mergePhase;
}

init结束之后,就是shuffleConsumerPlugin.run()也就是Shuffle.run。run的执行也标志着Reduce阶段的开始,run中启动一个抓取Completion Map的线程EventFetcher和copy数据的线程Fetcher,等copy结束之后数据进行merge sort。代码解析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
// 每次从Completion Map的list中抓取多少个Map
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

// Start the map-completion events fetcher thread
// 抓取Map线程
final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
// Fetcher线程的个数(分本地和远程两种情况)
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
// 文件在本地则启动LocalFetcher线程
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
// 循环启动远程Fetcher线程
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}

// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
...
}
...
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
// copy结束之后进行merge sort
kvIter = merger.close();
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
...
return kvIter;
}

run中主要完成了reduce前期的工作,其中先来看下_EventFetcher_线程的run代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public void run() {
int failures = 0;
LOG.info(reduce + " Thread started: " + getName());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
int numNewMaps = getMapCompletionEvents();
failures = 0;
if (numNewMaps > 0) {
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
}
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e)
...
}
} catch (InterruptedException e)
...
}

/**
 * Polls the umbilical for map-completion events and hands every received
 * event to the scheduler.
 *
 * The request is repeated for as long as a response comes back "full"
 * (events.length == maxEventsToFetch); fromEventIdx is advanced by the
 * number of events received after each response.
 *
 * @return how many of the received events had status SUCCEEDED
 * @throws IOException declared by this method (presumably surfaced by the
 *         RPC call; confirm against the umbilical implementation)
 * @throws InterruptedException declared by this method
 */
protected int getMapCompletionEvents()
throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;

do {
// RPC call through the umbilical: ask for up to maxEventsToFetch events
// starting at fromEventIdx
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID)reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID)reduce);
events = update.getMapTaskCompletionEvents();
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);

assert !update.shouldReset() : "Unexpected legacy state";

// Update the last seen event ID
fromEventIdx += events.length;

// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
for (TaskCompletionEvent event : events) {
scheduler.resolve(event);
if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {
++numNewMaps;
}
}
// A full batch suggests more events may be waiting, so ask again
} while (events.length == maxEventsToFetch);

return numNewMaps;
}

EventFetcher主要是通过getMapCompletionEvents方法的rpc拿到Completion Map,然后对_scheduler_的相关list进行赋值。umbilical是rpc协议TaskUmbilicalProtocol,RPC的实现类是TaskAttemptListenerImpl,则umbilical.getMapCompletionEvents实际上调用的是TaskAttemptListenerImpl.getMapCompletionEvents,然后继续调用JobImpl.getMapAttemptCompletionEvents方法,得到已完成的events,并将其放入scheduler中相应的map映射中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Returns a slice of the recorded map-attempt completion events.
 *
 * The slice starts at startIndex and holds at most maxEvents entries; it is
 * read while holding readLock.
 *
 * @param startIndex index of the first event wanted
 * @param maxEvents  upper bound on the number of events returned
 * @return the selected events, or EMPTY_TASK_COMPLETION_EVENTS when the
 *         list has no entry at or beyond startIndex
 */
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
TaskCompletionEvent[] events = EMPTY_TASK_COMPLETION_EVENTS;
readLock.lock();
try {
if (mapAttemptCompletionEvents.size() > startIndex) {
// Take at most maxEvents entries from mapAttemptCompletionEvents,
// clamped to what is actually available after startIndex
int actualMax = Math.min(maxEvents,
(mapAttemptCompletionEvents.size() - startIndex));
events = mapAttemptCompletionEvents.subList(startIndex,
actualMax + startIndex).toArray(events);
}
return events;
} finally {
// Always release the read lock, including on the early return above
readLock.unlock();
}
}

/**
 * Dispatches one task-completion event to the scheduler bookkeeping that
 * matches its status.
 *
 * SUCCEEDED events register a known map output and update maxMapRuntime;
 * FAILED, KILLED and OBSOLETE events mark the attempt's output obsolete;
 * TIPFAILED events are passed to tipFailed. Any other status is ignored
 * (the switch has no default branch).
 *
 * @param event the completion event to process
 */
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
case SUCCEEDED:
// Build the base URI from the event's tracker HTTP address and record the
// output under "host:port"
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
break;
// The three statuses below intentionally share one handler (fall-through)
case FAILED:
case KILLED:
case OBSOLETE:
obsoleteMapOutput(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");
break;
case TIPFAILED:
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");
break;
}
}

EventFetcher到此分析完毕,接下来分析Fetcher线程。Fetcher线程有两种,分别是LocalFetcher和Fetcher,顾名思义LocalFetcher是fetch本地文件用的,没有用到http服务。Fetcher是通过 http Get 从远程nodemanager上fetch map输出。这里主要分析Fetcher线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
// 存在内存到磁盘的merge,则Fetcher被block
// 不像mapper端的环形缓冲区一样,可以一边向磁盘写,一边继续向内存写数据
// 这里当存在MemToDisk的merge时,fetch操作被阻塞
// memToDisk触发的条件是内存中的占比超过90%
merger.waitForResource();
// Get a host to shuffle from
// 随机得到一个Host
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
}
...
}
}
...
}

protected void copyFromHost(MapHost host) throws IOException {
...
// Get completed maps on 'host'
// 得到map list,最多拿到MAX_MAPS_AT_ONCE(默认是20)
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (maps.size() == 0) {
return;
}
...
// List of maps to be fetched yet
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
// Construct the url and connect
DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
try {
// 通过jetty服务器创建http连接
setupConnectionsWithRetry(host, remaining, url);
if (stopped) {
abortConnect(host, remaining);
return;
}
} catch (IOException ie) {
...
return;
}
// 根据http创建一个input流
input = new DataInputStream(connection.getInputStream());
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
try {
// 复制map
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
} catch (IOException e) {
//
// Setup connection again if disconnected by NM
connection.disconnect();
// Get map output from remaining tasks only.
url = getMapOutputURL(host, remaining);
// Connect with retry as expecting host's recovery take sometime.
setupConnectionsWithRetry(host, remaining, url);
if (stopped) {
abortConnect(host, remaining);
return;
}
input = new DataInputStream(connection.getInputStream());
}
}
...
} finally {
...
}
}

private TaskAttemptID[] copyMapOutput(MapHost host,
DataInputStream input,
Set<TaskAttemptID> remaining,
boolean canRetry) throws IOException {
...
try {
long startTime = Time.monotonicNow();
int forReduce = -1;
//Read the shuffle header
try {
ShuffleHeader header = new ShuffleHeader();
// 读取内容到input流中
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
} catch (IllegalArgumentException e) {
...
}
InputStream is = input;
is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
...
// Get the location for the map output - either in-memory or on-disk
try {
// 根据内容大小申请资源,此处决定写入的目的地 mem or disk
mapOutput = merger.reserve(mapId, decompressedLength, id);
} catch (IOException ioe) {
...
}
// Check if we can shuffle *now* ...
// 如果当前内存使用超过memoryLimit,则merger.reserve 返回null
// 此时不能shuffle
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
}

// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
+ mapOutput.getMapId() + " decomp: " + decompressedLength
+ " len: " + compressedLength + " to " + mapOutput.getDescription());
// 将数据写入mem 或者 disk
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
// Inform the shuffle scheduler
long endTime = Time.monotonicNow();
// Reset retryStartTime as map task make progress if retried before.
retryStartTime = 0;
//
scheduler.copySucceeded(mapId, host, compressedLength,
startTime, endTime, mapOutput);
// Note successful shuffle
remaining.remove(mapId);
metrics.successFetch();
return null;
} catch (IOException ioe) {
...
}
}

Fetcher线程的run方法中先判断是否有_merge_,如果有则block当前Fetcher线程,这里只判断是否_存在内存到磁盘的merge_。随后从_Host_列表中_随机_选出一个_Host_(随机方法可查看scheduler.getHost()代码),进行copy,copy的入口是copyFromHost。copyFromHost的主要工作是建立_http连接_,然后循环调用copyMapOutput对_某个map_的输出进行copy。在copyMapOutput中会判断当前的内容是输出到mem还是disk,此处的逻辑判断在merger.reserve()中,看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Decides where a fetched map output will be placed and reserves room for it.
 *
 * Three outcomes are visible in this method:
 * an OnDiskMapOutput when canShuffleToMemory(requestedSize) is false;
 * null when usedMemory already exceeds memoryLimit (the caller must wait);
 * otherwise whatever unconditionalReserve returns.
 *
 * @param mapId         the map attempt whose output is about to be fetched
 * @param requestedSize size to reserve for that output
 * @param fetcher       id of the requesting fetcher
 * @return the destination for the output, or null to signal "wait"
 * @throws IOException declared by this method
 */
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
long requestedSize,
int fetcher
) throws IOException {
// If this output cannot be shuffled into memory, send it straight to disk.
// NOTE(review): canShuffleToMemory is not shown here; judging by the log
// message it compares requestedSize with maxSingleShuffleLimit
// (memoryLimit * mapreduce.reduce.shuffle.memory.limit.percent) - confirm.
if (!canShuffleToMemory(requestedSize)) {
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")");
return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
jobConf, mapOutputFile, fetcher, true);
}

// Stall shuffle if we are above the memory limit

// Reading note on the upstream comment below: the check is deliberately
// (usedMemory > memoryLimit) and not
// (usedMemory + requestedSize > memoryLimit). With the stricter form,
// every fetcher could be refused while used memory is still below
// mergeThreshold; no merge would then be triggered to free memory and all
// fetcher threads would stall without making progress.

// It is possible that all threads could just be stalling and not make
// progress at all. This could happen when:
//
// requested size is causing the used memory to go above limit &&
// requested size < singleShuffleLimit &&
// current used size < mergeThreshold (merge will not get triggered)
//
// To avoid this from happening, we allow exactly one thread to go past
// the memory limit. We check (usedMemory > memoryLimit) and not
// (usedMemory + requestedSize > memoryLimit). When this thread is done
// fetching, this will automatically trigger a merge thereby unlocking
// all the stalled threads
// Already over the limit: return null so the caller pauses the shuffle
// until memory has been released
if (usedMemory > memoryLimit) {
LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+ ") is greater than memoryLimit (" + memoryLimit + ")." +
" CommitMemory is (" + commitMemory + ")");
return null;
}

// Allow the in-memory shuffle to progress
LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+ usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+ "CommitMemory is (" + commitMemory + ")");
// NOTE(review): unconditionalReserve is not shown; presumably it adds
// requestedSize to usedMemory and creates an InMemoryMapOutput - confirm.
return unconditionalReserve(mapId, requestedSize, true);
}

merger.reserve()得到_OnDiskMapOutput_或者_InMemoryMapOutput_之后,调用mapOutput.shuffle将内容通过输出流写入Memory或者Disk。随后由scheduler.copySucceeded进行收尾工作,主要包括将已完成copy的map状态设置为true(map copy的状态存储在finishedMaps的boolean数组中,mapIndex为数组的id,完成则为true),完成一些统计信息,以及由output.commit()提交并关闭mapOutput流,在关闭的过程中会判断是否需要_merge_。下面看下代码,重点看下output.commit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public synchronized void copySucceeded(TaskAttemptID mapId,
MapHost host,
long bytes,
long startMillis,
long endMillis,
MapOutput<K,V> output
) throws IOException {
// 对完成的map进行一些状态修改
failureCounts.remove(mapId);
hostFailures.remove(host.getHostName());
int mapIndex = mapId.getTaskID().getId();

if (!finishedMaps[mapIndex]) {
// 提交并关闭mapOutput流 并 判断是否触发merge线程
output.commit();
// finishedMaps中对应的map的状态设置为true
finishedMaps[mapIndex] = true;
shuffledMapsCounter.increment(1);
if (--remainingMaps == 0) {
notifyAll();
}

// update single copy task status
// 完成统计信息
...
}
}
// InMemoryMapOutput.commit
// Commits an output that was shuffled into memory by handing this object
// to the merge manager through closeInMemoryFile.
public void commit() throws IOException {
merger.closeInMemoryFile(this);
}
// OnDiskMapOutput.commit
// Commits an output that was shuffled to disk: the temporary file is renamed
// to its final path, then that path (with its size and compressed size) is
// registered with the merge manager through closeOnDiskFile.
public void commit() throws IOException {
fs.rename(tmpOutputPath, outputPath);
CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
getSize(), this.compressedSize);
merger.closeOnDiskFile(compressAwarePath);
}

merger.reserve()得到mapOutput有两种情况,则先来看InMemoryMapOutput.commit,其内调用了closeInMemoryFile,观其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Records a completed in-memory map output and starts merges when the
 * thresholds visible here are reached.
 *
 * The output is added to inMemoryMapOutputs and its size to commitMemory.
 * Once commitMemory reaches mergeThreshold, inMemoryMerger.startMerge is
 * invoked and commitMemory is reset to 0. Independently, when a
 * memToMemMerger exists and the number of in-memory outputs reaches
 * memToMemMergeOutputsThreshold, memToMemMerger.startMerge is invoked.
 *
 * @param mapOutput the in-memory output that has finished shuffling
 */
public synchronized void closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) { 
inMemoryMapOutputs.add(mapOutput);
LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+ ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+ ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
// Account for the newly committed bytes
commitMemory+= mapOutput.getSize();
// Compare committed bytes against mergeThreshold (derived from
// mapreduce.reduce.shuffle.merge.percent). The check is >=, although the
// log message below prints ">".
// Can hang if mergeThreshold is really low.
if (commitMemory >= mergeThreshold) {
LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
commitMemory + " > mergeThreshold=" + mergeThreshold +
". Current usedMemory=" + usedMemory);
// Fold in inMemoryMergedMapOutputs before merging.
// NOTE(review): presumably filled by the mem-to-mem merger - not visible
// in this snippet, confirm.
inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
// Hand the accumulated outputs to the in-memory merge thread
inMemoryMerger.startMerge(inMemoryMapOutputs);
commitMemory = 0L; // Reset commitMemory.
}

// Optional memory-to-memory merge, only when such a merger was created
if (memToMemMerger != null) {
if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
memToMemMerger.startMerge(inMemoryMapOutputs);
}
}
}

再来看下OnDiskMapOutput.commit,将输出的临时文件重命名,然后调用closeOnDiskFile,查其代码如下:

1
2
3
4
5
6
7
8
/**
 * Registers a finished on-disk file and starts a disk merge once enough
 * files have accumulated.
 *
 * @param file the on-disk output to add to onDiskMapOutputs
 */
public synchronized void closeOnDiskFile(CompressAwarePath file) {
onDiskMapOutputs.add(file);
// Disk-merge condition: at least (2 * ioSortFactor - 1) files are waiting
if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
// Hand the files to the on-disk merge thread
onDiskMerger.startMerge(onDiskMapOutputs);
}
}

在两个commit里都是通过startMerge触发MergeThread的,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// MergeThread.startMerge
/**
 * Queues one merge request made of up to mergeFactor elements taken from
 * the given set.
 *
 * The chosen elements are removed from {@code inputs} (via the iterator),
 * so the caller's set is left holding only the elements that were not
 * picked. Does nothing when this thread is already closed.
 *
 * @param inputs candidate inputs; mutated by this call
 */
public void startMerge(Set<T> inputs) {
if (!closed) {
// One more merge request is now pending
numPending.incrementAndGet();
List<T> toMergeInputs = new ArrayList<T>();
Iterator<T> iter=inputs.iterator();
// Move at most mergeFactor elements out of the caller's set
for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
toMergeInputs.add(iter.next());
iter.remove();
}
// inputs.size() here is the number of elements left behind
LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() +
" segments, while ignoring " + inputs.size() + " segments");
synchronized(pendingToBeMerged) {
// Append the batch to the tail of the pending queue and wake any thread
// waiting on the queue's monitor
pendingToBeMerged.addLast(toMergeInputs);
pendingToBeMerged.notifyAll();
}
}
}

首先numPending记录了触发merge的次数,_numPending_会在MergeThread.waitForMerge()和MergeThread.run中用到。MergeThread.waitForMerge()由Fetcher.run通过merger.waitForResource()调用,判断当前是否有尚未完成的merge,有则阻塞Fetcher线程。在MergeThread.run中,每处理完一次merge(无论成功与否,代码位于finally块中)都会调用numPending.decrementAndGet();notifyAll();更新_numPending_的状态并通知所有被MergeThread.wait()阻塞的线程。
pendingToBeMerged是_LinkedList_类型的,将需要merge的list添加到_pendingToBeMerged_末尾,在MergeThread.run中从链表的头取出需要merge的list。MergeThread.run代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Main loop of the merge thread: waits for a batch on pendingToBeMerged,
 * merges it, then updates numPending and notifies waiters.
 *
 * The loop ends when the thread is interrupted or when merging throws; in
 * the latter case the throwable is passed to reporter.reportException.
 */
public void run() {
while (true) {
List<T> inputs = null;
try {
// Wait for notification to start the merge...
synchronized (pendingToBeMerged) {
// Block while the pending queue is empty; the loop re-checks the
// condition after every wake-up
while(pendingToBeMerged.size() <= 0) {
pendingToBeMerged.wait();
}
// Pickup the inputs to merge.
inputs = pendingToBeMerged.removeFirst();
}

// Merge
// The actual merge work; merge() is supplied by the concrete merger
// (its implementation is not part of this snippet)
merge(inputs);
} catch (InterruptedException ie) {
numPending.set(0);
return;
} catch(Throwable t) {
numPending.set(0);
reporter.reportException(t);
return;
} finally {
synchronized (this) {
// Runs on every exit path of the try, not only after a successful
// merge: decrement the pending count and wake threads waiting on this
// object's monitor.
// NOTE(review): on the two catch paths this decrement follows
// numPending.set(0), which leaves the counter at -1 unless it is
// incremented concurrently.
numPending.decrementAndGet();
notifyAll();
}
}
}
}

MergeThread.startMerge激活MergeThread线程之后,从pendingToBeMerged中取出链头的元素,通过不同类实现的merge()方法进行具体的merge过程,最后更新_numPending_的状态。这里主要分析下_InMemoryMapOutput_和_OnDiskMapOutput_实现的merge方法。先看InMemoryMapOutput.merge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}

//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)

//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();

List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();

Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);

FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");

rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);

if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
...
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}

// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}

InMemoryMapOutput.merge主要是调用了Merger.merge对各个文件进行堆排序,这里还要判断下是否有combiner(__只有mem to disk时才会判断__),方法的结尾会调用closeOnDiskFile,检查下内存的文件merge到disk之后,是否满足disk merge的条件。InMemoryMapOutput.merge到此结束,接下来看下OnDiskMapOutput.merge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
return;
}

long approxOutputSize = 0;
int bytesPerSum =
jobConf.getInt("io.bytes.per.checksum", 512);

LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");

// 1. Prepare the list of files to be merged.
for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}

// add the checksum length
approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

// 2. Start the on-disk merge process
Path outputPath =
localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

RawKeyValueIterator iter = null;
CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
(RawComparator<K>) jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);

Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}

closeOnDiskFile(compressAwarePath);
...
}

OnDiskMapOutput.merge也是调用Merger.merge对文件进行堆排序,然后write到本地,最后依然会调用closeOnDiskFile检查merge之后是否依然满足disk merge条件。
_Fetcher_线程分析结束,代码回到Shuffle.run中,此时copy阶段结束,执行copyPhase.complete(),标识_SORT_阶段,执行taskStatus.setPhase(TaskStatus.Phase.SORT),真正的sort逻辑是在merger.close(),之所以把此阶段称为__SORT阶段__,可能是因为在此阶段是纯粹的SORT吧,而不掺杂别的操作,这就可以解释为什么在之前的_COPY阶段_虽然也存在SORT,但并没有将SORT阶段从此处开始。下面跟下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
public RawKeyValueIterator close() throws Throwable {
...
return finalMerge(jobConf, rfs, memory, disk);
}

/**
 * Builds the single iterator that feeds the reduce from the map outputs that
 * are still in memory and on disk. Some lines of the body are elided in this
 * excerpt (the "..." lines).
 *
 * Visible steps:
 * 1. createInMemorySegments fills memDiskSegments from inMemoryMapOutputs,
 *    given the maxInMemReduce budget, and returns a byte count.
 * 2. If that produced segments and ioSortFactor > onDiskMapOutputs.size(),
 *    the segments are merged and written to one file, which is appended to
 *    onDiskMapOutputs.
 * 3. Every on-disk file is wrapped in a Segment; the segments are sorted by
 *    length and merged through Merger.merge.
 * 4. A second createInMemorySegments call (budget 0) fills finalSegments; the
 *    disk merge, if any, is appended as one more segment and the result of a
 *    last Merger.merge over finalSegments is returned.
 *
 * @param job job configuration; read for the buffer percent, key/value
 *        classes, comparator and keep-failed-files flag
 * @param fs file system used to create, stat and delete the files touched here
 * @param inMemoryMapOutputs map outputs currently held in memory
 * @param onDiskMapOutputs map outputs on disk; the file written in step 2 is
 *        added to this list
 * @return iterator over the merged key/value stream
 * @throws IOException if writing the merged file fails (the partial file is
 *         deleted on a best-effort basis first) or if another I/O call fails
 */
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
List<CompressAwarePath> onDiskMapOutputs
) throws IOException {
...
// Fraction of the JVM's max memory that may hold buffered reduce input
// (read from REDUCE_INPUT_BUFFER_PERCENT, default 0)
final float maxRedPer =
job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
...
// Byte budget derived from that fraction, capped at Integer.MAX_VALUE
int maxInMemReduce = (int)Math.min(
Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
// merge config params
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
boolean keepInputs = job.getKeepFailedTaskFiles();
final Path tmpDir = new Path(reduceId.toString());
final RawComparator<K> comparator =
(RawComparator<K>)job.getOutputKeyComparator();
// segments required to vacate memory
List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
long inMemToDiskBytes = 0;
// Set to true when the in-memory segments are merged to disk below; it later
// decides whether mergePhase is passed to the disk merge.
// NOTE(review): the article's author was unsure whether this flag also means
// "no intermediate merge will run" - see the upstream comment inside the if.
boolean mergePhaseFinished = false;
if (inMemoryMapOutputs.size() > 0) {
TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
memDiskSegments,
maxInMemReduce);
final int numMemDiskSegments = memDiskSegments.size();
// When this condition is false, control goes to the else-if / end of the if.
// True case: there are in-memory segments to vacate and fewer on-disk files
// than ioSortFactor.
if (numMemDiskSegments > 0 &&
ioSortFactor > onDiskMapOutputs.size()) {

// If we reach here, it implies that we have less than io.sort.factor
// disk segments and this will be incremented by 1 (result of the
// memory segments merge). Since this total would still be
// <= io.sort.factor, we will not do any more intermediate merges,
// the merge of all these disk segments would be directly fed to the
// reduce method

mergePhaseFinished = true;
// must spill to disk, but can't retain in-mem for intermediate merge
final Path outputPath =
mapOutputFile.getInputFileForWrite(mapId,
inMemToDiskBytes).suffix(
Task.MERGED_OUTPUT_PREFIX);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
mergePhase);

FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
codec, null, true);
try {
Merger.writeFile(rIter, writer, reporter, job);
writer.close();
onDiskMapOutputs.add(new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength()));
// Null marks the writer as already closed so the finally block skips it.
writer = null;
// add to list of final disk outputs.
} catch (IOException e) {
if (null != outputPath) {
try {
fs.delete(outputPath, true);
} catch (IOException ie) {
// NOTHING
}
}
throw e;
} finally {
if (null != writer) {
writer.close();
}
}
LOG.info("Merged " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes to disk to satisfy " +
"reduce memory limit");
// The bytes now live in the file just added to onDiskMapOutputs, so they
// are not counted again when the on-disk totals are seeded below.
inMemToDiskBytes = 0;
memDiskSegments.clear();
} else if (inMemToDiskBytes != 0) {
LOG.info("Keeping " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes in memory for " +
"intermediate, on-disk merge");
}
}

// segments on disk
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
long onDiskBytes = inMemToDiskBytes;
long rawBytes = inMemToDiskBytes;
CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
new CompressAwarePath[onDiskMapOutputs.size()]);
for (CompressAwarePath file : onDisk) {
long fileLength = fs.getFileStatus(file).getLen();
onDiskBytes += fileLength;
// Fall back to the on-disk length when no positive raw length is recorded.
rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;

LOG.debug("Disk file: " + file + " Length is " + fileLength);
// Files whose name ends with MERGED_OUTPUT_PREFIX get no mergedMapOutputsCounter.
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
(file.toString().endsWith(
Task.MERGED_OUTPUT_PREFIX) ?
null : mergedMapOutputsCounter), file.getRawDataLength()
));
}
LOG.info("Merging " + onDisk.length + " files, " +
onDiskBytes + " bytes from disk");
// Sort the disk segments by ascending length.
// NOTE(review): the article's author guesses this helps the min-heap based
// merge - confirm against Merger.
Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
if (o1.getLength() == o2.getLength()) {
return 0;
}
return o1.getLength() < o2.getLength() ? -1 : 1;
}
});

// build final list of segments from merged backed by disk + in-mem
List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
finalSegments, 0);
LOG.info("Merging " + finalSegments.size() + " segments, " +
inMemBytes + " bytes from memory into reduce");
if (0 != onDiskBytes) {
// Segments kept in memory for the intermediate merge go in front of the
// disk segments; their count is handed to Merger.merge separately.
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
// Pass mergePhase only if there is a going to be intermediate
// merges. See comment where mergePhaseFinished is being set
Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
RawKeyValueIterator diskMerge = Merger.merge(
job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
reporter, false, spilledRecordsCounter, null, thisPhase);
diskSegments.clear();
// With nothing left in memory the disk merge is already the final result.
if (0 == finalSegments.size()) {
return diskMerge;
}
// Otherwise expose the disk merge as one more segment of the final merge.
finalSegments.add(new Segment<K,V>(
new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
comparator, reporter, spilledRecordsCounter, null,
null);

}

此阶段的主要逻辑在finalMerge中实现。这里的代码逻辑是:先判断内存中是否还有数据,以及磁盘中的文件个数是否小于ioSortFactor(numMemDiskSegments > 0 && ioSortFactor > onDiskMapOutputs.size()),符合条件则将内存中的数据进行merge,输出到disk中。随后对disk中的文件进行merge,此处夹杂一行代码Collections.sort,对disk中的文件按长度从小到大排序,再交给Merger.merge以小顶堆的方式进行merge;最后将内存中剩余的数据和磁盘merge的结果一起放入finalSegments中进行最终的merge。至此shuffle.run结束,准备进入下一阶段_Reduce阶段_。回到ReduceTask.run中,根据api执行runNewReducer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
 * Runs a new-API reducer over the merged shuffle output.
 *
 * The incoming iterator is decorated so that each advance also reports the
 * merge progress; a task context, the user's reducer, a tracking record
 * writer and the reduce context are then created, and the reducer is run.
 * The record writer is closed whether or not the reducer fails.
 *
 * @param job        job configuration; the skip-records flags are set on it
 * @param umbilical  not used by the code visible in this method
 * @param reporter   receives progress updates on every call to next()
 * @param rIter      merged key/value stream to reduce
 * @param comparator comparator handed to the reduce context
 * @param keyClass   class of the reduce input keys
 * @param valueClass class of the reduce input values
 */
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException,InterruptedException,
                            ClassNotFoundException {
  // Decorate the merged input: pure delegation, except that next() also
  // publishes the delegate's progress to the reporter.
  final RawKeyValueIterator source = rIter;
  rIter = new RawKeyValueIterator() {
    @Override
    public boolean next() throws IOException {
      final boolean advanced = source.next();
      reporter.setProgress(source.getProgress().getProgress());
      return advanced;
    }

    @Override
    public DataInputBuffer getKey() throws IOException {
      return source.getKey();
    }

    @Override
    public DataInputBuffer getValue() throws IOException {
      return source.getValue();
    }

    @Override
    public Progress getProgress() {
      return source.getProgress();
    }

    @Override
    public void close() throws IOException {
      source.close();
    }
  };

  // The task context is what knows which reducer class the job configured.
  org.apache.hadoop.mapreduce.TaskAttemptContext attemptContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
          job, getTaskID(), reporter);

  // Instantiate the user's reducer reflectively.
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> userReducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(attemptContext.getReducerClass(), job);

  org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackingWriter =
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, attemptContext);

  job.setBoolean("mapred.skip.on", isSkipping());
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  org.apache.hadoop.mapreduce.Reducer.Context context =
      createReduceContext(userReducer, job, getTaskID(),
                          rIter,
                          reduceInputKeyCounter, reduceInputValueCounter,
                          trackingWriter, committer,
                          reporter, comparator,
                          keyClass, valueClass);
  try {
    userReducer.run(context);
  } finally {
    // Close the writer even when the reducer throws.
    trackingWriter.close(context);
  }
}

在这里要说下reduce中的value-list是怎么形成的。
入口在reducer.run(reducerContext)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Drives the reducer: calls setup once, then reduce once per key for as long
 * as context.nextKey() returns true. cleanup runs once setup has returned,
 * even if the loop exits with an exception.
 *
 * @param context the reduce context supplying keys and their values
 */
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);

      // Only a ReduceContext.ValueIterator has a backup store to reset
      // before moving on to the next key.
      final Iterator<VALUEIN> valueIterator = context.getValues().iterator();
      if (!(valueIterator instanceof ReduceContext.ValueIterator)) {
        continue;
      }
      ((ReduceContext.ValueIterator<VALUEIN>) valueIterator).resetBackupStore();
    }
  } finally {
    cleanup(context);
  }
}

代码在while循环中调用用户自定义的reduce,因此value-list应该是在while的判断语句中形成的,下面查看context.nextKey:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// WrappedReducer.nextKey()
/**
 * Delegates to the wrapped reduce context.
 *
 * @return whatever reduceContext.nextKey() returns
 */
public boolean nextKey() throws IOException, InterruptedException {
  final boolean advanced = reduceContext.nextKey();
  return advanced;
}
// ReduceContextImpl.nextKey
/**
 * Moves the input to the first key/value pair of the next distinct key.
 *
 * @return false when the input has no more pairs, otherwise the result of
 *         reading the first pair of the new key
 */
public boolean nextKey() throws IOException,InterruptedException {
  // Read past any pairs that still belong to the current key.
  while (hasMore && nextKeyIsSame) {
    nextKeyValue();
  }

  if (!hasMore) {
    return false;
  }

  // A new key starts here; count it when a counter was supplied.
  if (inputKeyCounter != null) {
    inputKeyCounter.increment(1);
  }
  return nextKeyValue();
}
// ReduceContextImpl.nextKeyValue
/**
 * Reads the current key/value pair from the input, advances the input, and
 * records whether the following pair carries the same key.
 *
 * @return false if the input was already exhausted (key and value are then
 *         set to null); true after a pair has been read
 * @throws IOException declared by the signature; input.getKey(),
 *         input.getValue() and input.next() are the I/O calls made here
 * @throws InterruptedException declared by the signature
 */
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!hasMore) {
key = null;
value = null;
return false;
}
// This pair starts a new key exactly when the previous call did not see the
// same key coming next.
firstValue = !nextKeyIsSame;
DataInputBuffer nextKey = input.getKey();
// Copy the raw key bytes into currentRawKey; the copy is what the next
// record's key is compared against further down.
currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition());
// Point the shared buffer at the copied key bytes before deserializing.
// NOTE(review): presumably keyDeserializer/valueDeserializer read from
// `buffer`; that wiring is not visible in this excerpt - confirm.
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
key = keyDeserializer.deserialize(key);
DataInputBuffer nextVal = input.getValue();
// Point the same buffer at the value bytes, then deserialize the value.
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
- nextVal.getPosition());
value = valueDeserializer.deserialize(value);

// Remember the serialized sizes of the pair just read.
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();

// While marked, every pair read is also written to the backup store.
if (isMarked) {
backupStore.write(nextKey, nextVal);
}

// Advance the input and look at the upcoming key to decide whether it
// equals the current one.
hasMore = input.next();
if (hasMore) {
nextKey = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
// NOTE(review): unlike inputKeyCounter in nextKey(), inputValueCounter is
// used here without a null check - confirm it can never be null.
inputValueCounter.increment(1);
return true;
}

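主要功能是在ReduceContextImpl.nextKey中实现的。如果还有下一个键值对并且它的key与当前key相同(hasMore && nextKeyIsSame),就循环执行nextKeyValue,把当前key剩余的value读完;之后如果还有数据(hasMore),说明下一个键值对是新的key,则先增加key的计数(inputKeyCounter),再执行nextKeyValue读取这个新key的第一个键值对;否则返回false。nextKeyValue每读一个键值对,都会把下一条记录的key与当前key进行比较,据此更新nextKeyIsSame,value-list正是这样按key划分出来的。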

总结

Reduce阶段大致分为copy、sort和reduce阶段,而_copy阶段又分为shuffle和merge阶段_。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge(其中还应注意merge的条件和数据往内存中写入时的情况)。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段。sort阶段主要是执行finalMerge操作,是纯粹的sort阶段,完成之后就是reduce阶段。