众所周知Reduce中的values是一个Iterator,但是value的数据并不是一下全部加载到这个Iterator(也就是对value根据key进行分组,这个分组比较器可以重写,二次排序中可能会需要重写,默认是按照key进行分组),那么values的Iterator是怎么形成的,是怎么被加载的呢?

下面从源码的角度去探秘一下吧,因为源码才是最好的教科书。

写Reduce函数时都会继承Reduce类然后重写reduce函数,在Reduce.run中循环调用reduce,reduce函数传进去的参数values已经是一个Iterator,则此Iterator的形成只能在调用reduce之前,开始循环调用reduce之后。看下run的代码可能更清晰一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
// while 循环调用reduce
while (context.nextKey()) {
// reduce传入的参数context.getValues()只是返回一个Iterator
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
...
}
} finally {
cleanup(context);
}
}

这里代码很简单,context.nextKey检查是否有下一个key,有则进入循环,调用context.getValues返回一个ValueIterable对象,第一感觉就是找到这个ValueIterable对象在哪被赋值了,那么问题就解决了,但是并不是。这个ValueIterable对象由返回一个Iterator对象,而这个Iterator对象只是在ValueIterable中被初始化了一个空对象,那么Iterator对象中的数据是在呢被赋值的呢?难道是在context.nextKey()中?

下面看下context.nextKey()的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ReduceContextImpl.java
public boolean nextKey() throws IOException,InterruptedException {
while (hasMore && nextKeyIsSame) {
nextKeyValue();
}
if (hasMore) {
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
return nextKeyValue();
} else {
return false;
}
}

这里有两个属性hasMorenextKeyIsSame,hasMore初次是在ReduceContextImpl的构造方法中被hasMore = input.next()赋值(input是RawKeyValueIterator,是map端merge数据的输出类型),有下一行数据则为true。nextKeyIsSame初始化为false,随后会在nextKeyValue中被赋值。

则当reduce刚启动时,hasMore为true,nextKeyIsSame为false,不进while循环,进if语句,首先对key进行统计,然后调用nextKeyValue,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public boolean nextKeyValue() throws IOException, InterruptedException {
...
// 是否的一个值标识
firstValue = !nextKeyIsSame;
// 从input中拿到key值
DataInputBuffer nextKey = input.getKey();
currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition());
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
// key值反序列
key = keyDeserializer.deserialize(key);
// 从input中拿到value值
DataInputBuffer nextVal = input.getValue();
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
- nextVal.getPosition());
// value反序列
value = valueDeserializer.deserialize(value);

currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();
// mark reset功能是否开启,开启之后可以多次遍历values中的值
if (isMarked) {
backupStore.write(nextKey, nextVal);
}

hasMore = input.next();
if (hasMore) {
nextKey = input.getKey();
// 判断当前key是否和下一个key相同
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
// 对value的个数进行统计
inputValueCounter.increment(1);
return true;
}

nextKeyValue并没有给Iterator赋值,但是令人高兴的是这里把key和value从input中读取出来,反序列化放入key和value中,那么value和key都是在哪被调用呢,查看代码发现是在ValueIteratornext方法中value被return,ValueIterator就是上文中返回的Iterator对象。ValueIterator是ReduceContextImpl的内部类实现了Iterator接口。看下next方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public VALUEIN next() {
...
// if this is the first record, we don't need to advance
// 如果是第一个value,值直接返回在nextKeyValue中被赋值的value
if (firstValue) {
firstValue = false;
return value;
}
// if this isn't the first record and the next key is different, they
// can't advance it here.
if (!nextKeyIsSame) {
throw new NoSuchElementException("iterate past last value");
}
// otherwise, go to the next key/value pair
try {
// 如果不是第一个值,则调用nextKeyValue读取value值
nextKeyValue();
// 将上面读取的value值返回
return value;
} catch (IOException ie) {
throw new RuntimeException("next value iterator failed", ie);
} catch (InterruptedException ie) {
// this is bad, but we can't modify the exception list of java.util
throw new RuntimeException("next value iterator interrupted", ie);
}
}

由next方法可见,Iterator初期只是个null对象,通过firstValuenextKeyIsSame来控制,是否还有值,并进行读取。每读取一次都是直接从input中读取值,这样减少了内存的利用,input可以在内存也可以在磁盘。

下面来说下Reduce读取value的整体流程。

首先在Reduce.run中调用context.nextKey()决定是否进入while,nextKey对key的个数进行统计,然后调用nextKeyValue将key/value的值从input中读出,并对firstValue、hashMore和nextKeyIsSame的值进行更新。

其次通过context.getValues将Iterator传入reduce中,在reduce中通过Iterator.hasNext查看此key是否有下个value,然后通过Iterator.next调用nextKeyValue去input中读取value。

hasNext代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean hasNext() {
try {
if (inReset && backupStore.hasNext()) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("hasNext failed", e);
}
// 第一次调用hasNext时,firstValue在nextKeyValue中被赋值为true,
// 并且如果和下一个key相同,则nextKeyIsSame也为true
// 当调用过next()之后,firstValue会在next中被赋值为false,next方法直接返回
// 此时nextKeyIsSame依然是在第一次调用nextKeyValue中被赋值的true
return firstValue || nextKeyIsSame;
}

然后循环迭代Iterator,读取input中相同key的value。

读取当前key的value读取结束之后,再次调用context.nextKey。

也就是说reduce中相同key的value值在Iterator.next中通过nextKeyValue读取的,每调用一次next就从input中读一个value。此时有的同学会问假如我在读Iterator的循环中中途break呢,剩下的value是在哪被消费呢?如果不消费我再次循环的时候怎么读到下一个key的value。

下面看下context.nextKey的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean nextKey() throws IOException,InterruptedException {
while (hasMore && nextKeyIsSame) {
nextKeyValue();
}
if (hasMore) {
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
return nextKeyValue();
} else {
return false;
}
}

有没有注意到方法开始有个while循环,当你中途break之后,再次进入run中的while循环时,nextKey会继续把没有读完的value读完,然后递增key的个数,调用nextKeyValue去读取下一个不同的key。