MapReduce源码解析--Reduce中values的Iterator的生成
众所周知Reduce中的values是一个Iterator,但是value的数据并不是一下全部加载到这个Iterator(也就是对value根据key进行分组,这个分组比较器可以重写,二次排序中可能会需要重写,默认是按照key进行分组),那么values的Iterator是怎么形成的,是怎么被加载的呢?
下面从源码的角度去探秘一下吧,因为源码才是最好的教科书。
写Reduce函数时都会继承Reduce类然后重写reduce函数,在Reduce.run中循环调用reduce,reduce函数传进去的参数values已经是一个Iterator,则此Iterator的形成只能在调用reduce之前,开始循环调用reduce之后。看下run的代码可能更清晰一些:
1 | public void run(Context context) throws IOException, InterruptedException { |
这里代码很简单,context.nextKey检查是否有下一个key,有则进入循环,调用context.getValues返回一个ValueIterable对象,第一感觉就是找到这个ValueIterable对象在哪被赋值了,那么问题就解决了,但是并不是。这个ValueIterable对象由返回一个Iterator对象,而这个Iterator对象只是在ValueIterable中被初始化了一个空对象,那么Iterator对象中的数据是在呢被赋值的呢?难道是在context.nextKey()中?
下面看下context.nextKey()的代码:
1 | // ReduceContextImpl.java |
这里有两个属性hasMore
和nextKeyIsSame
,hasMore初次是在ReduceContextImpl的构造方法中被hasMore = input.next()
赋值(input是RawKeyValueIterator
,是map端merge数据的输出类型),有下一行数据则为true。nextKeyIsSame初始化为false,随后会在nextKeyValue中被赋值。
则当reduce刚启动时,hasMore为true,nextKeyIsSame为false,不进while循环,进if语句,首先对key进行统计,然后调用nextKeyValue
,代码如下:
1 | public boolean nextKeyValue() throws IOException, InterruptedException { |
nextKeyValue并没有给Iterator赋值,但是令人高兴的是这里把key和value从input中读取出来,反序列化放入key和value中,那么value和key都是在哪被调用呢,查看代码发现是在ValueIterator
的next
方法中value被return,ValueIterator就是上文中返回的Iterator对象。ValueIterator是ReduceContextImpl的内部类实现了Iterator接口。看下next方法:
1 | public VALUEIN next() { |
由next方法可见,Iterator初期只是个null对象,通过firstValue
和nextKeyIsSame
来控制,是否还有值,并进行读取。每读取一次都是直接从input中读取值,这样减少了内存的利用,input可以在内存也可以在磁盘。
下面来说下Reduce读取value的整体流程。
首先在Reduce.run中调用context.nextKey()决定是否进入while,nextKey对key的个数进行统计,然后调用nextKeyValue将key/value的值从input中读出,并对firstValue、hashMore和nextKeyIsSame的值进行更新。
其次通过context.getValues将Iterator传入reduce中,在reduce中通过Iterator.hasNext查看此key是否有下个value,然后通过Iterator.next调用nextKeyValue去input中读取value。
hasNext代码如下:
1 | public boolean hasNext() { |
然后循环迭代Iterator,读取input中相同key的value。
读取当前key的value读取结束之后,再次调用context.nextKey。
也就是说reduce中相同key的value值在Iterator.next中通过nextKeyValue读取的,每调用一次next就从input中读一个value。此时有的同学会问假如我在读Iterator的循环中中途break呢,剩下的value是在哪被消费呢?如果不消费我再次循环的时候怎么读到下一个key的value。
下面看下context.nextKey的代码:
1 | public boolean nextKey() throws IOException,InterruptedException { |
有没有注意到方法开始有个while循环,当你中途break之后,再次进入run中的while循环时,nextKey会继续把没有读完的value读完,然后递增key的个数,调用nextKeyValue去读取下一个不同的key。