MapReduce源码解析--Reduce解析
MapReduce–Reduce
上篇主要扒了下Map阶段的代码,并根据代码把Map阶段的流程串了下,本篇主要扒下Reduce阶段的代码,顺便串下Reduce阶段的流程。
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。Reduce Task和Map Task一样,同样也有四中任务类型,分别为job setup、job cleanup、reduce task、task cleanup。Reduce阶段的代码入口是ReduceTask.java
类中的run
方法,代码如下:
1 | public void run(JobConf job, final TaskUmbilicalProtocol umbilical) |
由上面的run
代码段可以看出_copy_和_sort_是主要在shuffleConsumerPlugin.run()
中执行,此方法结束之后则进入reduce阶段,执行runNewReucer()
。则下面先看下shuffleConsumerPlugin
的逻辑,从shuffleConsumerPlugin.init(shuffleContext)
开始,
1 | // Shuffle.java |
通过createMergeManager
new出一个MergeManagerImpl对象,该对象的构造函数主要构建几个Merge线程,并为一些参数设置阈值。
1 | public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, |
init结束之后,就是shuffleConsumerPlugin.run()
也就是Shuffle.run
。run的执行也标志着Reduce阶段的开始,run中启动一个抓取Completion Map的线程EventFetcher和copy数据的线程Fetcher,等copy结束之后数据进行merge sort。代码解析如下:
1 | public RawKeyValueIterator run() throws IOException, InterruptedException { |
run中主要完成了reduce前期的工作,其中先来看下_EventFetcher_线程的run代码
1 | public void run() { |
EventFetcher
主要是通过getMapCompletionEvents
方法的rpc拿到Completion Map,然后对_scheduler_的相关list进行赋值。umbilical
是rpc协议TaskUmbilicalProtocol
,RPC的实现类是TaskAttemptListenerImpl
,则umbilical.getMapCompletionEvents
实际上调用的是TaskAttemptListenerImpl.getMapCompletionEvents
,然后继续调用JobImpl.getMapAttemptCompletionEvents
方法,得到已完成的events,并将其放入scheduler中相应的map映射中。
1 | // 得到events |
EventFetcher
到此分析完毕,接下来分析Fetcher
线程。Fetcher
线程有两种,分别是LocalFetcher
和Fetcher
,顾名思义LocalFetcher
是fetcher本地文件用的,没有用到http服务。Fethcer
是通过 http Get 从远程nodemanager上fetcher map输出。这里主要分析Fetcher
线程。
1 | public void run() { |
在Fetcher
线程的run方法中先判断是否有_merge_,如果有则block当前Fetcher
线程,这里只判断是否_存在内存到磁盘的merge_。随后从_Host_列表中_随机_选出一个_Host_(随机方法可查看scheduler.getHost()
代码),进行copy,copy的入口是copyFromHost
,copyFromHost
的主要工作是建立_http连接_,然后循环调用copyMapOutput
对_某个map_的输出进行copy。在copyMapOutput
中会判断当前的内容是输出到mem还是disk,此处的逻辑判断在merger.reserve()
中,看下代码:
1 | public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, |
由merger.reserve()
得到_OnDiskMapOutput_或者_InMemoryMapOutput_之后,调用mapOutput.shuffle
将内容通过输出流写入Memory或者Disk。随后由scheduler.copySucceeded
进行收尾工作,主要包括将已完成copy的map状态设置为true(map copy的状态存储在finishedMaps
的boolean数组中,mapIndex为数组的id,完成则为true),完成一些统计信息,以及由output.commit()
提交并关闭mapOutput流,在关闭的过程中会判断是否需要_merge_。下面看下代码,重点看下output.commit
:
1 | public synchronized void copySucceeded(TaskAttemptID mapId, |
由merger.reserve()
得到mapOutput有两种情况,则先来看InMemoryMapOutput.commit
,其内调用了closeInMemoryFile
,观其代码如下:
1 | public synchronized void closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) { |
再来看下OnDiskMapOutput.commit
,将输出的临时文件重命名,然后调用closeOnDiskFile
,查其代码如下:
1 | public synchronized void closeOnDiskFile(CompressAwarePath file) { |
在两个commit
里都是通过startMerge
触发MergeThread的,其代码如下:
1 | // MergeThread.startMerge |
首先numPending
记录了触发merge的次数,_numPending_会在MergeThread.waitForMerge()
和MergeThread.run
中用到,MergeThread.waitForMerge()
在Fetcher.run
通过merger.waitForResource()
调用,判断当前是否有merge线程,有则阻塞Fetcher线程。在MergeThread.run
中如果merge成功则调用numPending.decrementAndGet();notifyAll();
更新_numPending_的状态并通知所有被MergeThread.wait()
阻塞的线程。pendingToBeMerged
是_LinkedList_类型的,将需要merge的list添加到_pendingToBeMerged_末尾,在MergeThread.run
中从链表的头取出需要merge的list。MergeThread.run
代码如下:
1 | public void run() { |
由MergeThread.startMerge
激活MergeThread
线程之后,从pendingToBeMerged
中取出链头的元素,通过不同类实现的merge()
方法进行具体的merge过程,最后更新_mumPending_的状态。这里主要分析下_InMemoryMapOutput_和_OnDiskMapOutput_实现的merge方法。先看InMemoryMapOutput.merge
1 | public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException { |
InMemoryMapOutput.merge
主要是调用了Merger.merge
对各个文件进行堆排序,这里还要判断下是否有combiner
(__只有mem to disk时才会判断__),方法的结尾会调用closeOnDiskFile
,检查下内存的文件merge到disk之后,是否满足disk merge的条件。InMemoryMapOutput.merge
到此结束,接下来看下OnDiskMapOutput.merge
1 | public void merge(List<CompressAwarePath> inputs) throws IOException { |
OnDiskMapOutput.merge
也是调用Merger.merge
对文件进行堆排序,然后write到本地,最后依然会调用closeOnDiskFile
检查merge之后是否依然满足disk merge条件。
_Fetcher_线程分析结束,代码回到Shuffle.run
中,此时copy阶段结束,执行copyPhase.complete()
,标识_SORT_阶段,执行taskStatus.setPhase(TaskStatus.Phase.SORT)
,真正的sort逻辑是在merger.close()
,之所以把把此阶段称为__SORT阶段__,可能是因为在此阶段是纯粹的SORT吧,而不掺杂别的操作,这就可以解释为什么在之前的_COPY阶段_虽然也存在SORT,但并没有将SORT阶段从此处开始。下面跟下代码:
1 | public RawKeyValueIterator close() throws Throwable { |
此阶段的主要逻辑在finalMerge
中实现,这里的代码逻辑是先判断内存中的数据和磁盘中的文件个数的多少(ioSortFactor > onDiskMapOutputs.size()),符合条件则进行内存中的文件进行merge,输出到disk中。随后对disk中的文件进行merge,此处夹杂一行代码Collections.sort
,对disk中的文件进行长度的排序,形成一个小顶堆进行merge,将内存和磁盘最终的merge文件放入finalSegments中进行最终的merge。则shuffle.run结束。准备进入下一阶段_Reduce阶段_。回到ReduceTask.run中,根据api执行runNewReducer
1 | private <INKEY,INVALUE,OUTKEY,OUTVALUE> |
在这里要说下reduce中的value-list是怎么形成的。
入口在rudecer.run(reducerContext)
,
1 | public void run(Context context) throws IOException, InterruptedException { |
代码在while循环处调用用户自定义的reduce,则value-list应该再while的判断语句中实现,则查看context.nextKey
,
1 | // WrappedReducer.nextKey() |
主要功能是在ReduceContextImpl.nextKey
代码中实现的。当有下一个键值对并且key值一样时(hasMore && nextKeyIsSame
),执行nextKeyValue
,如果下一个键值对的key值不一样,则增加key的个数然后执行nextKeyValue
总结
Reduce阶段大致分为copy、sort和reduce阶段,而_copy阶段又分为shuffle和merge阶段_,copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge,(其中还应注意merge的条件和数据往内存中写入时的情况)。带数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段。