MapReduce源码解析--Map解析
MapReduce–Map
MapReduce由Map和Reduce组成,而Map和Reduce又可以分为很多个小phase,下面就从源码的角度去扒下Map的流程。
通过intellij idea进行debug调试,在New API的流程发现Map中具体流程可以大致分为两种情况:有Reduce和没有Reduce
- 没有Reduce
split–>read–map(用户自定义的map函数)–>write(未排序)–>output
- 有Reduce
split–>read–>map(用户自定义的map函数)–>partition–>collect–>buffer–>quicksort–>(combiner)–>spill–>merge(heapsort、combiner)–>output
本篇主要介绍有Reduce的情况下Map中各个阶段的流程。
跟踪代码到MapTask.run中,代码中先根据是否有Reduce对Map阶段进行分割,然后判断Map Task的类型(Map Task分为job setup、job cleanup、map task、task cleanup),主要跟下map task,进入runNewMapper
1 | public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) |
Split
runNewMapper
中包含了整个Map的所有phase,首先通过split = getSplitDetails()
得到当前map对应的split,split是在JobSubmitter.submitJobInternal
中调用writeSplits
得到的,有多少个split就对应多少个map。
1 | private <T extends InputSplit> |
上面的代码将mr的输入文件进行切分为splits,其中splitSize
参数比较重要,在此对其取值代码进行解析下:
1 | long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); |
splitSize的大小由split.minsize
、split.maxsize
和blocksize
这三个参数控制,其中主要是由split.minsize
和blocksize
两个参数决定,_取这两个的较大值_。
read
将输入文件切分为splits之后,在MapTask.runNewMapper中加载,由RecordReader对象进行读取
map(用户自定义的map函数)
读取split文件之后,调用Mapper.run
方法,进入用户自己继承的Mapper类中
1 | // Mapper.run |
collect
map逻辑完之后,将map的每条结果通过context.write
进行collect。此处的context.write
最终调用的是在runNewMapper
中实例化的output(output = new NewOutputCollector(taskContext, job, umbilical, reporter)
)对象的write
方法。
1 | // NewOutputCollector.write |
partition
由上面的代码可以看出每条被map处理之后的结果在collect中,会对先对其进行分区处理,默认使用HashPartitioner.java
1 | public int getPartition(K key, V value, |
buffer
将map处理之后的key value 进行分区之后,写入buffer中的_环形缓冲区_中。
先来看下环形缓冲区的数据结构,然后理解其数据写入就比较容易了。
环形缓冲区在MapTask.MapOutputBuffer
中定义,相关的属性如下:
1 | // k/v accounting |
环形缓冲区的结构在MapOutputBuffer.init
中创建。
1 | public void init(MapOutputCollector.Context context |
此时环形缓冲区已被初始化,但其具体结构及其使用还不太明了,继续看下面的代码,
1 | // MapOutputBuffer.collect |
每次map的结果partition之后来到collect时,先从剩余的空间中减去此条数据元数据的长度bufferRemaining -= METASIZE
,然后判断bufferRemaining
是否小于0,
大于0则直接将key/value对和元数据信息写入缓冲区中,key和value是通过
keySerializer.serialize
序列化并通过一系列的方法调用,最终调用_MapOutputBuffer_的内部类Buffer.write
方法将其内容写入_kvbuffer_中。接下来是将元数据信息写入kvmeta中,元数据信息包括partition、key的起始位置、value的起始位置以及value的长度。首次小于等于0进入if语句,此时剩余的空间不足,将启动spill线程。先得到__可重入锁__spillLock的锁,并且此时并无有任何spill线程运行,所以
spillInProgress=false
,进入else if
语句中,执行startSpill()
,唤醒SpillThread线程,重新设置_equator_和_bufferRemaining_。随后正常将key/value对写入kvbuffer中,如果没有足够的空间存储则在Buffer.write
中阻塞。write
和startSpill
的代码如下:
1 | public void write(byte b[], int off, int len) |
spillReady.signal()
唤醒SpillThread线程,SpillThread的run方法如下:
1 | public void run() { |
当用户自定义的map过程结束之后,代码回到runNewMapper
中继续执行,进入SORT阶段,也可以说是Merge阶段。
1 | private <INKEY,INVALUE,OUTKEY,OUTVALUE> |
至此map整个阶段结束。
总结
整个Map阶段的流程是inputFile通过split被切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给context.write
,然后调用NewOutputCollector.write
,对其结果key进行分区(默认使用hash分区),然后传给MapOutputBuffer.collect
进行key、value的序列化写入buffer,由bufferRemaining
记录剩余的字节大小,小于等于0时,开始进行spill,spill时先对buffer中key/value的元数据进行快排,之后开始写入磁盘的临时文件(写之前判断是否有combiner),当整个数据处理结束之后开始对磁盘中的临时文件进行merge,merge时使用的是堆排序(使用优先级队列实现),排序结束之后准备作为最终文件写入磁盘,在写入之前依然会判断是否有combiner,但此处会多一个条件,并不是只要有combiner就会执行,在有combiner的情况下还需满足numSpills>mapreduce.map.combine.minspills才会执行combiner