MapReduce–Map

MapReduce consists of a Map stage and a Reduce stage, and each of them can in turn be broken down into several smaller phases. This post walks through the Map flow from the source code's point of view.
Debugging through the New API code path with IntelliJ IDEA shows that the Map flow falls into two broad cases: with Reduce and without Reduce.

  • Without Reduce

split–>read–>map(user-defined map function)–>write(unsorted)–>output

  • With Reduce

split–>read–>map(user-defined map function)–>partition–>collect–>buffer–>quicksort–>(combiner)–>spill–>merge(heapsort, combiner)–>output

This post focuses on the flow through each phase of Map when Reduce is present.
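
For orientation, here is a minimal driver sketch for the "with Reduce" case. It assumes the TokenizerMapper and IntSumReducer classes from Hadoop's standard WordCount example are available on the classpath (they are not defined here); setting the number of reducers to 0 would switch the job onto the "without Reduce" path.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Minimal driver sketch. TokenizerMapper and IntSumReducer are assumed to be the
// classes from Hadoop's WordCount example.
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);  // optional; relevant to the spill/merge phases below
    job.setReducerClass(IntSumReducer.class);
    job.setNumReduceTasks(2);                   // 0 would select the "without Reduce" flow
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}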

Tracing into MapTask.run: the code first splits the attempt's progress depending on whether there are reducers, then dispatches on the type of the task (a Map Task may be a job setup, job cleanup, map task, or task cleanup task). We follow the map task case into runNewMapper.

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;

if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
TaskReporter reporter = startReporter(umbilical);

boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);

// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}

if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
...
// make a mapper
// instantiate the user's Mapper implementation via reflection
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
// get the split assigned to this map task
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
// create the RecordReader that reads the split and turns its contents into key/value pairs
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;

// get an output object
if (job.getNumReduceTasks() == 0) {
// no reducers: write the output directly
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}

org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);

org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);

try {
// read split
input.initialize(split, mapperContext);
// run the user's Mapper subclass, i.e. the user-written map phase
mapper.run(mapperContext);
mapPhase.complete();
// SORT phase: the spill files are merged here
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}

Split

runNewMapper contains all the phases of the Map side. It first obtains the split assigned to the current map task via split = getSplitDetails(); the splits themselves were computed earlier when JobSubmitter.submitJobInternal called writeSplits, and there is one map task per split.

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
...
// use the InputFormat to compute the splits from the input files
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
//FileInputFormat.getSplits
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

// generate splits
...
for (FileStatus file: files) {
...
if (length != 0) {
...
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
// compute the split size; this value is important because it drives the number of map tasks
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining = length;
// if the file is not sufficiently larger than splitSize (SPLIT_SLOP = 1.1), the while loop is skipped,
// i.e. the file is not split further; so a job full of small files gets one map per file,
// and splitSize alone does not determine the number of maps
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
...
return splits;
}

The code above slices the job's input files into splits. The splitSize value matters, so let's unpack how it is computed:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
protected long getFormatMinSplitSize() {
return 1;
}
public static long getMinSplitSize(JobContext job) {
// SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"
// defaults to 0 in mapred-default.xml (the code falls back to 1L if the property is not set at all)
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
minSize = Math.max(1, 1)
long maxSize = getMaxSplitSize(job);
public static long getMaxSplitSize(JobContext context) {
// SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}
// HDFS block size, 128 MB by default
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
splitSize = Math.max(max(1, mapreduce.input.fileinputformat.split.minsize),
min(mapreduce.input.fileinputformat.split.maxsize, blockSize));

splitSize is controlled by three values: split.minsize, split.maxsize, and the block size. With split.maxsize left at its default (Long.MAX_VALUE), it effectively comes down to split.minsize and the block size, and the larger of the two wins.
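
To make the arithmetic concrete, here is a minimal standalone sketch that reproduces the computeSplitSize logic and shows how raising the minimum split size yields fewer, larger splits (the values are illustrative):

// SplitSizeDemo.java — standalone sketch of FileInputFormat.computeSplitSize
public class SplitSizeDemo {
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L << 20;   // 128 MB HDFS block
    long defaultMax = Long.MAX_VALUE;

    // defaults: minsize <= blockSize, maxsize = Long.MAX_VALUE -> splitSize == blockSize
    System.out.println(computeSplitSize(blockSize, 1L, defaultMax));         // 134217728

    // raise mapreduce.input.fileinputformat.split.minsize to 256 MB -> bigger splits, fewer maps
    System.out.println(computeSplitSize(blockSize, 256L << 20, defaultMax)); // 268435456

    // lower mapreduce.input.fileinputformat.split.maxsize to 64 MB -> smaller splits, more maps
    System.out.println(computeSplitSize(blockSize, 1L, 64L << 20));          // 67108864
  }
}

In a driver, the same effect can usually be achieved with FileInputFormat.setMinInputSplitSize(job, ...) and FileInputFormat.setMaxInputSplitSize(job, ...).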

read

After the input is cut into splits, each split is loaded in MapTask.runNewMapper and read by a RecordReader; with TextInputFormat this is a line reader whose key is the byte offset (LongWritable) and whose value is the line content (Text).

map (user-defined map function)

Once the split can be read, Mapper.run is invoked, which drives the user's own Mapper subclass:

// Mapper.run
public void run(Context context) throws IOException, InterruptedException {
// run the user-overridden setup
setup(context);
try {
// iterate over the split's key/value pairs
while (context.nextKeyValue()) {
// invoke the user-overridden map function for each record
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}

// taking the WordCount code as an example
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}

collect

After the map logic finishes for a record, each result is collected via context.write. This context.write ultimately calls the write method of the output object instantiated in runNewMapper (output = new NewOutputCollector(taskContext, job, umbilical, reporter)).

// NewOutputCollector.write
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}

partition

As the code above shows, each record produced by map is first assigned a partition inside collect; by default HashPartitioner is used:

public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
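
When hash partitioning is not appropriate, a custom Partitioner can be plugged in via job.setPartitionerClass. Below is a hypothetical sketch; the FirstCharPartitioner name and its routing rule are made up for illustration.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example: route keys to reducers by their first character
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    int firstChar = key.getLength() == 0 ? 0 : key.charAt(0);
    return (firstChar & Integer.MAX_VALUE) % numReduceTasks;
  }
}
// in the driver: job.setPartitionerClass(FirstCharPartitioner.class);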

buffer

After a key/value pair produced by map has been assigned a partition, it is written into the circular (ring) buffer.
Looking at the ring buffer's data structure first makes the write path much easier to understand.
The ring buffer is defined in MapTask.MapOutputBuffer; the relevant fields are:

// k/v accounting
// IntBuffer view holding the metadata; every entry is an int, i.e. 4 bytes
private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart; // marks origin of spill metadata
int kvend; // marks end of spill metadata
int kvindex; // marks end of fully serialized records
// boundary between the metadata and the serialized key/value data
// (both live in the same ring buffer, so they must be kept apart)
int equator; // marks origin of meta/serialization
int bufstart; // marks beginning of spill
int bufend; // marks beginning of collectable
int bufmark; // marks end of record
int bufindex; // marks end of collected
int bufvoid; // marks the point where we should stop
// reading at the end of the buffer
// the backing byte array that stores the serialized key/value bytes; kvmeta is just an int view over the same storage
byte[] kvbuffer; // main output buffer
private final byte[] b0 = new byte[0];

// offsets, within a metadata entry, of where the value and key start in kvbuffer
private static final int VALSTART = 0; // val offset in acct
private static final int KEYSTART = 1; // key offset in acct
// offset, within a metadata entry, of the record's partition number
private static final int PARTITION = 2; // partition offset in acct
private static final int VALLEN = 3; // length of value
// number of ints one key/value pair's metadata occupies in kvmeta
private static final int NMETA = 4; // num meta ints
// number of bytes one key/value pair's metadata occupies
private static final int METASIZE = NMETA * 4; // size in bytes
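
To make the dual view concrete, here is a small standalone sketch (the values are illustrative, not taken from a real task): the same byte[] serves both as the raw serialization buffer and, through an IntBuffer overlay, as the metadata array.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

// Standalone sketch: one byte[] seen both as raw bytes and as an int overlay,
// mirroring how kvbuffer and kvmeta share the same storage.
public class MetaOverlayDemo {
  static final int VALSTART = 0, KEYSTART = 1, PARTITION = 2, VALLEN = 3, NMETA = 4;

  public static void main(String[] args) {
    byte[] kvbuffer = new byte[1 << 10];              // pretend this is the ring buffer
    IntBuffer kvmeta = ByteBuffer.wrap(kvbuffer)
        .order(ByteOrder.nativeOrder())
        .asIntBuffer();                               // int view over the same bytes

    int kvindex = kvmeta.capacity() - NMETA;          // metadata grows downward from the end
    // record one hypothetical key/value pair: key at byte 0, value at byte 5, 3 bytes long, partition 2
    kvmeta.put(kvindex + PARTITION, 2);
    kvmeta.put(kvindex + KEYSTART, 0);
    kvmeta.put(kvindex + VALSTART, 5);
    kvmeta.put(kvindex + VALLEN, 3);

    System.out.println("partition = " + kvmeta.get(kvindex + PARTITION));
    System.out.println("key starts at byte " + kvmeta.get(kvindex + KEYSTART));
  }
}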

The ring buffer itself is set up in MapOutputBuffer.init.

public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
...
//MAP_SORT_SPILL_PERCENT = mapreduce.map.sort.spill.percent
// fraction of the buffer that may fill up before a spill is triggered
//sanity checks
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
//IO_SORT_MB = "mapreduce.task.io.sort.mb"
// size of the map-side sort buffer, in MB
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
// memory limit for keeping spill index records in memory
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
...
// the sort implementation is pluggable; the default is a tuned QuickSort
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
// buffers and accounting
// IO_SORT_MB above is in MB; shifting left by 20 converts it to bytes
int maxMemUsage = sortmb << 20;
// METASIZE is the size of one metadata entry: 4 ints (VALSTART, KEYSTART,
// PARTITION, VALLEN) of 4 bytes each, i.e. 16 bytes.
// Rounding down here makes maxMemUsage an exact multiple of METASIZE
maxMemUsage -= maxMemUsage % METASIZE;
// the backing byte array; it holds both the serialized records and the metadata
kvbuffer = new byte[maxMemUsage];
bufvoid = kvbuffer.length;
// wrap kvbuffer as an IntBuffer: kvmeta addresses the same memory in 4-byte int units
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
// set the boundary (equator) between serialized data and metadata
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
// maximum number of metadata entries kvmeta can hold
maxRec = kvmeta.capacity() / NMETA;
// spill threshold; more precisely it is kvbuffer.length * spillper
// (not simply sortmb * spillper, because of the rounding above)
softLimit = (int)(kvbuffer.length * spillper);
// important: this running budget is the dynamic criterion for deciding when to spill
bufferRemaining = softLimit;
LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

// k/v serialization
comparator = job.getOutputKeyComparator();
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
// keys are serialized into bb, the Buffer wrapper over kvbuffer
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
// values are serialized into bb as well
valSerializer.open(bb);
...
// combiner
...
spillInProgress = false;
// during the final merge, a configured combiner runs only if the number of spills reaches this threshold
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}

// setEquator(0) looks like this
private void setEquator(int pos) {
equator = pos;
// set index prior to first entry, aligned at meta boundary
// align down to a METASIZE boundary (in bytes): the dividing line between metadata and key/value data
final int aligned = pos - (pos % METASIZE);
// Cast one of the operands to long to avoid integer overflow
// starting index (in ints) of the first metadata entry, one entry below the equator
kvindex = (int)
(((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
"(" + (kvindex * 4) + ")");
}
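
To see what these numbers look like in practice, here is a minimal standalone sketch that reproduces the init arithmetic for the default settings (sortmb = 100, spillper = 0.8); the figures are the kind of values the init log lines report.

// Standalone sketch of MapOutputBuffer.init arithmetic with default settings
public class BufferInitDemo {
  public static void main(String[] args) {
    final int NMETA = 4, METASIZE = NMETA * 4;

    int sortmb = 100;                        // mapreduce.task.io.sort.mb
    float spillper = 0.8f;                   // mapreduce.map.sort.spill.percent

    int maxMemUsage = sortmb << 20;          // 100 MB in bytes
    maxMemUsage -= maxMemUsage % METASIZE;   // multiple of METASIZE (a no-op here: 104857600 % 16 == 0)

    int kvbufferLen = maxMemUsage;
    int kvmetaCapacity = kvbufferLen / 4;    // ints in the IntBuffer view
    int maxRec = kvmetaCapacity / NMETA;     // max number of metadata entries
    int softLimit = (int) (kvbufferLen * spillper);

    // setEquator(0): kvindex sits one metadata entry "below" the equator, wrapping around
    int equator = 0;
    int aligned = equator - (equator % METASIZE);
    int kvindex = (int) (((long) aligned - METASIZE + kvbufferLen) % kvbufferLen) / 4;

    System.out.println("kvbuffer.length = " + kvbufferLen);  // 104857600
    System.out.println("maxRec          = " + maxRec);       // 6553600
    System.out.println("softLimit       = " + softLimit);    // 83886080
    System.out.println("kvindex         = " + kvindex);      // 26214396
  }
}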

At this point the ring buffer has been initialized, but its exact structure and how it is used are still not obvious, so let's keep reading:

// MapOutputBuffer.collect
public synchronized void collect(K key, V value, final int partition
) throws IOException {
...
// on every collect, first subtract the metadata length from the remaining budget, then check it
bufferRemaining -= METASIZE;
if (bufferRemaining <= 0) {
// start spill if the thread is not running and the soft limit has been
// reached
spillLock.lock();
try {
do {
// on the first spill, spillInProgress is still false
if (!spillInProgress) {
// byte position of kvindex
final int kvbidx = 4 * kvindex;
// byte position of kvend
final int kvbend = 4 * kvend;
// serialized, unspilled bytes always lie between kvindex and
// bufindex, crossing the equator. Note that any void space
// created by a reset must be included in "used" bytes
final int bUsed = distanceTo(kvbidx, bufindex);
final boolean bufsoftlimit = bUsed >= softLimit;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
resetSpill();
bufferRemaining = Math.min(
distanceTo(bufindex, kvbidx) - 2 * METASIZE,
softLimit - bUsed) - METASIZE;
continue;
} else if (bufsoftlimit && kvindex != kvend) {
// spill records, if any collected; check latter, as it may
// be possible for metadata alignment to hit spill pcnt
startSpill();
final int avgRec = (int)
(mapOutputByteCounter.getCounter() /
mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
distkvi / (METASIZE + avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
// metadata max
distanceTo(bufend, newPos),
Math.min(
// serialization max
distanceTo(newPos, serBound),
// soft limit
softLimit)) - 2 * METASIZE;
}
}
} while (false);
} finally {
spillLock.unlock();
}
}
// write the key, the value, and their metadata into the buffer
try {
// serialize key bytes into buffer
int keystart = bufindex;
// serialize the key into kvbuffer, advancing bufindex
keySerializer.serialize(key);
if (bufindex < keystart) {
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex;
valSerializer.serialize(value);
// It's possible for records to have zero length, i.e. the serializer
// will perform no writes. To ensure that the boundary conditions are
// checked and that the kvindex invariant is maintained, perform a
// zero-length write into the buffer. The logic monitoring this could be
// moved into collect, but this is cleaner and inexpensive. For now, it
// is acceptable.
bb.write(b0, 0, 0);

// the record must be marked after the preceding write, as the metadata
// for this record are not yet written
int valend = bb.markRecord();

mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));

// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}

Each time a partitioned map result reaches collect, the metadata length of the record is first subtracted from the remaining budget (bufferRemaining -= METASIZE), and then bufferRemaining is checked:

  • If it is still greater than 0, the key/value pair and its metadata are written straight into the buffer. The key and value are serialized by keySerializer.serialize, which, through a chain of calls, ends up in Buffer.write (an inner class of MapOutputBuffer) writing the bytes into kvbuffer. The metadata is then written into kvmeta: the partition, the key's start position, the value's start position, and the value's length.

  • The first time it drops to 0 or below, the if branch is entered: the remaining space is insufficient, so a spill must be started. The reentrant lock spillLock is acquired; since no spill is running yet, spillInProgress is false, and the else-if branch executes startSpill(), which wakes the SpillThread and resets the equator and bufferRemaining. The key/value pair is then written into kvbuffer as usual; if there is not enough room to hold it, Buffer.write blocks. The code of write and startSpill is shown below:

public void write(byte b[], int off, int len)
throws IOException {
// must always verify the invariant that at least METASIZE bytes are
// available beyond kvindex, even when len == 0
bufferRemaining -= len;
if (bufferRemaining <= 0) {
// writing these bytes could exhaust available buffer space or fill
// the buffer to soft limit. check if spill or blocking are necessary
boolean blockwrite = false;
spillLock.lock();
try {
do {
checkSpillException();

final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
// ser distance to key index
final int distkvi = distanceTo(bufindex, kvbidx);
// ser distance to spill end index
final int distkve = distanceTo(bufindex, kvbend);

// if kvindex is closer than kvend, then a spill is neither in
// progress nor complete and reset since the lock was held. The
// write should block only if there is insufficient space to
// complete the current write, write the metadata for this record,
// and write the metadata for the next record. If kvend is closer,
// then the write should block if there is too little space for
// either the metadata or the current write. Note that collect
// ensures its metadata requirement with a zero-length write
blockwrite = distkvi <= distkve
? distkvi <= len + 2 * METASIZE
: distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;

if (!spillInProgress) {
if (blockwrite) {
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
// need to use meta exclusively; zero-len rec & 100% spill
// pcnt would fail
resetSpill(); // resetSpill doesn't move bufindex, kvindex
bufferRemaining = Math.min(
distkvi - 2 * METASIZE,
softLimit - distanceTo(kvbidx, bufindex)) - len;
continue;
}
// we have records we can spill; only spill if blocked
if (kvindex != kvend) {
startSpill();
// Blocked on this write, waiting for the spill just
// initiated to finish. Instead of repositioning the marker
// and copying the partial record, we set the record start
// to be the new equator
setEquator(bufmark);
} else {
// We have no buffered records, and this record is too large
// to write into kvbuffer. We must spill it directly from
// collect
final int size = distanceTo(bufstart, bufindex) + len;
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
bufvoid = kvbuffer.length;
throw new MapBufferTooSmallException(size + " bytes");
}
}
}
// not enough space: block and wait for the spill to finish
if (blockwrite) {
// wait for spill
try {
while (spillInProgress) {
reporter.progress();
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException(
"Buffer interrupted while waiting for the writer", e);
}
}
} while (blockwrite);
} finally {
spillLock.unlock();
}
}
// here, we know that we have sufficient space to write
if (bufindex + len > bufvoid) {
final int gaplen = bufvoid - bufindex;
System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
len -= gaplen;
off += gaplen;
bufindex = 0;
}
System.arraycopy(b, off, kvbuffer, bufindex, len);
bufindex += len;
}

private void startSpill() {
kvend = (kvindex + NMETA) % kvmeta.capacity();
bufend = bufmark;
spillInProgress = true;
...
// wake up the SpillThread
spillReady.signal();
}

spillReady.signal() wakes the SpillThread, whose run method is shown below:

public void run() {
spillLock.lock();
spillThreadRunning = true;
try {
while (true) {
spillDone.signal();
// if no spill is in progress, park the SpillThread here until it is signalled
while (!spillInProgress) {
spillReady.await();
}
try {
spillLock.unlock();
// once woken, sort the buffered records and spill them to disk
sortAndSpill();
} catch (Throwable t) {
sortSpillException = t;
} finally {
spillLock.lock();
if (bufend < bufstart) {
bufvoid = kvbuffer.length;
}
kvstart = kvend;
bufstart = bufend;
spillInProgress = false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
spillLock.unlock();
spillThreadRunning = false;
}
}

private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
final long size = distanceTo(bufstart, bufend, bufvoid) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// mstart: index of the metadata entry at kvend
final int mstart = kvend / NMETA;
// mend: index of the metadata entry at kvstart, plus one because kvend is a valid record;
// the metadata to spill lies between mstart and mend, so (mend - mstart) is the number of records
final int mend = 1 + // kvend is a valid record
(kvstart >= kvend
? kvstart
: kvmeta.capacity() + kvstart) / NMETA;
// sort: only the metadata is sorted, i.e. only the order of entries in kvmeta changes;
// the comparator is MapOutputBuffer.compare,
// which orders by partition first and then by key
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
int spindex = mstart;
final IndexRecord rec = new IndexRecord();
final InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) {
// the spill file is written in IFile format
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
// before writing to disk, check whether a combiner is configured
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}

// close the writer
writer.close();

// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, i);

writer = null;
} finally {
if (null != writer) writer.close();
}
}
// if the in-memory spill index cache exceeds its limit, write this index to disk;
// once the limit is exceeded, only the current and subsequent indexes go to disk
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
}
}
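
The idea of "sort only the metadata" can be illustrated with a small standalone sketch: the records stay where they are in the data buffer, and only an index array describing them is reordered (the record contents here are made up).

import java.util.Arrays;
import java.util.Comparator;

// Standalone sketch: records stay put in a flat buffer; only their index entries are sorted,
// mirroring how sortAndSpill reorders kvmeta entries by (partition, key).
public class IndexSortDemo {
  public static void main(String[] args) {
    // flat "buffer" of serialized records (contents are illustrative)
    String[] buffer = { "banana", "apple", "cherry", "apple" };
    // metadata: one {partition, offsetIntoBuffer} entry per record
    int[][] meta = { {1, 0}, {0, 1}, {1, 2}, {0, 3} };

    Arrays.sort(meta, Comparator
        .<int[]>comparingInt(m -> m[0])          // partition first
        .thenComparing(m -> buffer[m[1]]));      // then key

    for (int[] m : meta) {
      System.out.println("partition " + m[0] + " -> " + buffer[m[1]]);
    }
    // prints the records grouped by partition and sorted by key,
    // without ever moving the records themselves
  }
}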

After the user-defined map logic finishes, control returns to runNewMapper, which enters the SORT phase; calling it the merge phase would arguably be more accurate.

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
...// see the full code above; only the key points are repeated here
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
// execution continues here after the user-defined map has finished
mapPhase.complete();
// enter the SORT phase ("merge" would arguably be the more accurate name):
// the spill files are heap-merged into a single final output file
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
// entry point for the heap-based merge of the spill files
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}

// NewOutputCollector.close
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush();
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}

// MapOutputBuffer.flush
public void flush() throws IOException, ClassNotFoundException,
InterruptedException {
LOG.info("Starting flush of map output");
spillLock.lock();
// first spill whatever map output is still left in memory
try {
while (spillInProgress) {
reporter.progress();
spillDone.await();
}
checkSpillException();

final int kvbend = 4 * kvend;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished
resetSpill();
}
if (kvindex != kvend) {
kvend = (kvindex + NMETA) % kvmeta.capacity();
bufend = bufmark;
LOG.info("Spilling map output");
LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
"; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
"); kvend = " + kvend + "(" + (kvend * 4) +
"); length = " + (distanceTo(kvend, kvstart,
kvmeta.capacity()) + 1) + "/" + maxRec);
sortAndSpill();
}
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for the writer", e);
} finally {
spillLock.unlock();
}
assert !spillLock.isHeldByCurrentThread();
// shut down spill thread and wait for it to exit. Since the preceding
// ensures that it is finished with its work (and sortAndSpill did not
// throw), we elect to use an interrupt instead of setting a flag.
// Spilling simultaneously from this thread while the spill thread
// finishes its work might be both a useful way to extend this and also
// sufficient motivation for the latter approach.
try {
spillThread.interrupt();
spillThread.join();
} catch (InterruptedException e) {
throw new IOException("Spill failed", e);
}
// release sort buffer before the merge
kvbuffer = null;
// merge the spill files (using a heap sort)
mergeParts();
Path outputPath = mapOutputFile.getOutputFile();
fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}

// MapOutputBuffer.mergeParts
private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
final Path[] filename = new Path[numSpills];
final TaskAttemptID mapId = getTaskID();

for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
// with a single spill the file is simply renamed; no merge is performed
if (numSpills == 1) { //the spill is the final output
...
sortPhase.complete();
return;
}

// read in paged indices
// load any on-disk spill index files back into memory
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
...
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

if (numSpills == 0) {
//create dummy files
...
sortPhase.complete();
return;
}
{
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases

IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
//create the segments to be merged
List<Segment<K,V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
for(int i = 0; i < numSpills; i++) {
// read this partition's index record from the spill's index file
IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

Segment<K,V> s =
new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
indexRecord.partLength, codec, true);
segmentList.add(i, s);

if (LOG.isDebugEnabled()) {
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
"Spill =" + i + "(" + indexRecord.startOffset + "," +
indexRecord.rawLength + ", " + indexRecord.partLength + ")");
}
}
// mapreduce.task.io.sort.factor: the maximum number of streams merged at once
int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
// sort the segments only if there are intermediate merges
// whether to sort the segments by length (only needed when intermediate merges will happen)
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
@SuppressWarnings("unchecked")
// heap merge (a min-heap backed by a priority queue);
// when sortSegments is true the segments are first ordered by file size;
// kvIter still presents the data as key/value pairs
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass, codec,
segmentList, mergeFactor,
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter, sortSegments,
null, spilledRecordsCounter, sortPhase.phase(),
TaskType.MAP);

//write merged output to disk
long segmentStart = finalOut.getPos();
FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
Writer<K, V> writer =
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
// the merge also checks for a combiner before writing to disk;
// note that merely having a combiner is not enough here: it only runs when
// numSpills >= mapreduce.map.combine.minspills (default 3) and a combiner is configured
// (every disk write in the map phase checks whether a combiner is configured)
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter, combineCollector);
}

//close
writer.close();

sortPhase.startNextPhase();

// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, parts);
}
spillRec.writeToFile(finalIndexFile, job);
finalOut.close();
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
}
}
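
Merger.merge itself is fairly involved, but its core idea, a min-heap over already-sorted segments, can be sketched in a few lines of standalone Java (the segment contents are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Standalone sketch of a k-way merge over sorted segments using a min-heap,
// the same idea Merger.merge applies to the sorted spill files.
public class KWayMergeDemo {
  public static void main(String[] args) {
    List<List<String>> segments = List.of(
        List.of("apple", "cherry", "pear"),
        List.of("banana", "cherry"),
        List.of("apple", "kiwi"));

    // heap entries: {segmentIndex, positionInSegment}, ordered by the current key
    PriorityQueue<int[]> heap = new PriorityQueue<>(
        (a, b) -> segments.get(a[0]).get(a[1]).compareTo(segments.get(b[0]).get(b[1])));
    for (int s = 0; s < segments.size(); s++) {
      heap.add(new int[] {s, 0});
    }

    List<String> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();                       // smallest current key across all segments
      merged.add(segments.get(top[0]).get(top[1]));
      if (top[1] + 1 < segments.get(top[0]).size()) {
        heap.add(new int[] {top[0], top[1] + 1});    // advance that segment
      }
    }
    System.out.println(merged);  // [apple, apple, banana, cherry, cherry, kiwi, pear]
  }
}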

This concludes the entire Map phase.

Summary

To recap the whole Map phase: the input files are cut into splits; a RecordReader reads each split (line by line for text input) and feeds records to the user-implemented map; each map result goes to context.write, which calls NewOutputCollector.write; the key is partitioned (hash partitioning by default) and handed to MapOutputBuffer.collect, which serializes the key and value into the ring buffer, with bufferRemaining tracking the remaining byte budget. When bufferRemaining drops to 0 or below, a spill starts: the key/value metadata in the buffer is quick-sorted and the data is written to a temporary file on disk (checking for a combiner before writing). After all input has been processed, the temporary spill files are merged with a heap sort (implemented with a priority queue) and written out as the final file. Before that final write the combiner is checked again, but with an extra condition: having a combiner is not enough by itself, it also requires numSpills >= mapreduce.map.combine.minspills for the combiner to run.
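
As a closing illustration, here is a hedged sketch of the map-side configuration knobs touched in this walkthrough; the values are examples, not recommendations.

import org.apache.hadoop.conf.Configuration;

// Sketch: the map-side knobs discussed above (values are examples only)
public class MapSideTuning {
  public static Configuration tuned() {
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.task.io.sort.mb", 200);                     // kvbuffer size in MB
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f);           // softLimit fraction
    conf.setInt("mapreduce.task.io.sort.factor", 100);                 // merge fan-in
    conf.setInt("mapreduce.map.combine.minspills", 3);                 // min spills for the final-merge combiner
    conf.setLong("mapreduce.input.fileinputformat.split.minsize", 1L); // lower bound on split size
    return conf;
  }
}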