MapReduce breaks a large job into n tasks, executed by the map and reduce phases respectively. But what determines how the job is split? In essence, splitting a job means splitting the job's input files.
This post walks through the process briefly from the source code's perspective.
File splitting
MR submits a job to the RM through JobSubmitter.submitJobInternal. Inside submitJobInternal, writeSplits(JobContext job, Path jobSubmitDir) splits the job's input files. writeSplits is just a thin wrapper that picks the new or old API depending on your code; here it calls writeNewSplits, which splits the files using the new API:
```java
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // instantiate the InputFormat configured for this job
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  // compute the logical splits of the input files
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());

  // persist the split information under the job submission directory
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
```
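SplitComparator is a private comparator inside JobSubmitter that orders splits largest-first, so the biggest splits are handed out earliest. A minimal sketch of the same ordering, using plain Longs as stand-ins for the split lengths that the real comparator reads via InputSplit.getLength() (the values here are made up):

```java
import java.util.Arrays;
import java.util.Comparator;

public class SortSplitsDemo {
    public static void main(String[] args) {
        // stand-ins for split lengths in bytes; the real code compares InputSplit.getLength()
        Long[] lengths = {64L, 128L, 32L, 128L};
        // descending order, mirroring the effect of JobSubmitter.SplitComparator
        Arrays.sort(lengths, Comparator.reverseOrder());
        System.out.println(Arrays.toString(lengths)); // prints [128, 128, 64, 32]
    }
}
```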
writeNewSplits calls getSplits on the InputFormat instance it obtained; this is where the main splitting logic lives:
```java
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // lower bound on split size: max(getFormatMinSplitSize(), configured minimum)
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // upper bound on split size (Long.MAX_VALUE unless configured)
  long maxSize = getMaxSplitSize(job);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  // all input files of the job
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      // obtain the file's block locations
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }

      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        // splitSize = max(minSize, min(maxSize, blockSize)), i.e. blockSize by default
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // keep cutting while the remainder exceeds SPLIT_SLOP (1.1) split sizes
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          // index of the block containing the current offset
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        // the tail (at most SPLIT_SLOP * splitSize) becomes the last split
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
        }
      } else {
        // unsplittable file: one split covering the whole file
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
            blkLocations[0].getCachedHosts()));
      }
    } else {
      // empty file: an empty split with no location info
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }

  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  ...
  return splits;
}
```
getSplits slices up the input files. For each split it records into a FileSplit object the file path, the offset (the split's starting position within the whole file), the split size splitSize, and the location info of the block containing that offset (both the replica hosts and the cached in-memory hosts); one FileSplit per split, all collected into the splits list that is returned.
Further details are annotated in the code. One thing to note: since splitSize may be larger than blockSize, a single split can span multiple blocks. The blkIndex computed in the code is only the index of the block containing the split's starting offset, so the hosts recorded in the FileSplit are only the hosts holding replicas of that one block.
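Plugging in default-style numbers makes the arithmetic concrete. A small sketch (the file size and block size are illustrative, not from the source): with minSize = 1, maxSize = Long.MAX_VALUE and a 128 MB block size, computeSplitSize returns the block size, and a 300 MB file yields two full 128 MB splits plus a 44 MB tail, because the remaining 44 MB falls below SPLIT_SLOP (1.1) times the split size:

```java
public class SplitMath {
    static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

    // mirrors FileInputFormat.computeSplitSize
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
        long bytesRemaining = 300 * mb; // hypothetical input file size
        int splits = 0;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            bytesRemaining -= splitSize;
            splits++;
        }
        if (bytesRemaining != 0) splits++; // the tail split
        System.out.println(splitSize / mb); // prints 128
        System.out.println(splits);         // prints 3
    }
}
```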
Back in writeNewSplits, input.getSplits returns the splits as a list, the list is converted to an array, and the array is sorted by length in descending order. Finally JobSplitWriter.createSplitFiles persists the split information to files:
```java
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
    Configuration conf, FileSystem fs, T[] splits)
    throws IOException, InterruptedException {

  FSDataOutputStream out = createFile(fs,
      JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);

  SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
  out.close();

  writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
      info);
}
```
createSplitFiles creates two files: the split file, which records the splits themselves, and the split meta file, which records the splits' metadata. What exactly goes into each? Let's keep reading the code.
First, createFile: it obtains the split file's path via JobSubmissionFiles.getJobSplitFile(jobSubmitDir) and opens an output stream on it with FileSystem.create:
```java
private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
    Configuration job) throws IOException {

  FSDataOutputStream out = FileSystem.create(fs, splitFile,
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

  // replication factor of the split file (Job.SUBMIT_REPLICATION, default 10)
  int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
  fs.setReplication(splitFile, (short)replication);

  // write the file header
  writeSplitHeader(out);
  return out;
}
```
With the output stream returned, the private writeNewSplits overload writes the split data into the split file (job.split) and returns the splits' metadata:
```java
private static <T extends InputSplit> SplitMetaInfo[] writeNewSplits(
    Configuration conf, T[] array, FSDataOutputStream out)
    throws IOException, InterruptedException {

  SplitMetaInfo[] info = new SplitMetaInfo[array.length];
  if (array.length != 0) {
    SerializationFactory factory = new SerializationFactory(conf);
    int i = 0;
    // upper bound on the number of locations recorded per split
    int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
        MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
    long offset = out.getPos();
    for (T split: array) {
      long prevCount = out.getPos();
      // write the split's class name, then the serialized split itself
      Text.writeString(out, split.getClass().getName());
      Serializer<T> serializer =
          factory.getSerializer((Class<T>) split.getClass());
      serializer.open(out);
      serializer.serialize(split);
      long currCount = out.getPos();

      // truncate the location list if it exceeds the configured maximum
      String[] locations = split.getLocations();
      if (locations.length > maxBlockLocations) {
        LOG.warn("Max block location exceeded for split: "
            + split + " splitsize: " + locations.length +
            " maxsize: " + maxBlockLocations);
        locations = Arrays.copyOf(locations, maxBlockLocations);
      }

      // metadata: locations, this split's offset within job.split, and its length
      info[i++] =
          new JobSplit.SplitMetaInfo(
              locations, offset,
              split.getLength());
      offset += currCount - prevCount;
    }
  }
  return info;
}
```
From this code we can see that the job.split file holds a header (SPLIT_FILE_HEADER, the bytes "SPL", plus splitVersion, which is JobSplit.META_SPLIT_VERSION = 1) followed by a number of split records, each record being the split's class name plus the serialized FileSplit object.
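To make that layout concrete, here is a rough, hypothetical sketch of a job.split-style record using plain DataOutputStream calls. The path and length are made up, and the real file uses Hadoop's Text.writeString and Writable serialization with variable-length encodings, so the exact bytes differ; only the field order is the same:

```java
import java.io.*;

public class JobSplitLayoutSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);

        out.writeBytes("SPL"); // SPLIT_FILE_HEADER
        out.writeInt(1);       // splitVersion
        // one split record: class name, then the serialized split fields
        out.writeUTF("org.apache.hadoop.mapreduce.lib.input.FileSplit");
        out.writeUTF("/input/part-0000"); // file path (illustrative)
        out.writeLong(0L);                // start offset within the file
        out.writeLong(134217728L);        // split length (128 MB)
        out.close();

        // read it back to show the structure round-trips
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        byte[] header = new byte[3];
        in.readFully(header);
        System.out.println(new String(header)); // prints SPL
        System.out.println(in.readInt());       // prints 1
        System.out.println(in.readUTF());       // prints the split class name
    }
}
```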
After job.split is written, the split metadata is returned, and writeJobSplitMetaInfo writes it into job.splitmetainfo:
```java
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
    FsPermission p, int splitMetaInfoVersion,
    JobSplit.SplitMetaInfo[] allSplitMetaInfo)
    throws IOException {

  FSDataOutputStream out =
      FileSystem.create(fs, filename, p);

  out.write(JobSplit.META_SPLIT_FILE_HEADER);

  WritableUtils.writeVInt(out, splitMetaInfoVersion);

  WritableUtils.writeVInt(out, allSplitMetaInfo.length);

  for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
    splitMetaInfo.write(out);
  }
  out.close();
}
```
The split metadata goes into job.splitmetainfo in the format: header (META_SPLIT_FILE_HEADER, the bytes "META-SPL", plus version number 1), then the number of metadata entries, then the SplitMetaInfo objects themselves.
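The same kind of hedged sketch works for job.splitmetainfo. Fixed-width ints stand in for WritableUtils.writeVInt, the hosts, offsets and lengths are invented, and SplitMetaInfo's own Writable encoding is simplified; the field order, header check and version check mirror the description above:

```java
import java.io.*;
import java.util.Arrays;

public class SplitMetaInfoSketch {
    public static void main(String[] args) throws IOException {
        byte[] HEADER = "META-SPL".getBytes("UTF-8"); // META_SPLIT_FILE_HEADER
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.write(HEADER);
        out.writeInt(1); // version (writeVInt in the real code)
        out.writeInt(2); // number of splits
        // per split: host count, hosts, start offset in job.split, input length
        String[][] hosts = {{"host1", "host2"}, {"host2", "host3"}};
        long[] offsets = {7L, 162L};              // illustrative offsets into job.split
        long[] lengths = {134217728L, 46137344L}; // 128 MB and 44 MB
        for (int i = 0; i < 2; i++) {
            out.writeInt(hosts[i].length);
            for (String h : hosts[i]) out.writeUTF(h);
            out.writeLong(offsets[i]);
            out.writeLong(lengths[i]);
        }
        out.close();

        // mirror SplitMetaInfoReader.readSplitMetaInfo's header and version checks
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        byte[] header = new byte[HEADER.length];
        in.readFully(header);
        if (!Arrays.equals(HEADER, header))
            throw new IOException("Invalid header on split file");
        int vers = in.readInt();
        if (vers != 1)
            throw new IOException("Unsupported split version " + vers);
        int numSplits = in.readInt();
        System.out.println(numSplits); // prints 2
    }
}
```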
Once the file splitting is done, the next step is launching tasks to process the splits, i.e. splitting the job itself.
Job splitting
The job-splitting code lives in JobImpl.java. The JobEventType.JOB_INIT event triggers InitTransition.transition to initialize the job (for YARN's event dispatch and state transitions, see the earlier posts on the AsyncDispatcher event dispatcher and the StateMachineFactory state machine). transition calls createSplits to split the job:
```java
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
  TaskSplitMetaInfo[] allTaskSplitMetaInfo;
  try {
    allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
        job.oldJobId, job.fs,
        job.conf,
        job.remoteJobSubmitDir);
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
  return allTaskSplitMetaInfo;
}
```
SplitMetaInfoReader.readSplitMetaInfo reads the split metadata back and turns it into per-task split info, which is what the job is split into:
```java
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(JobID jobId,
    FileSystem fs, Configuration conf, Path jobSubmitDir)
    throws IOException {
  long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
      MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);

  Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
  String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
  FileStatus fStatus = fs.getFileStatus(metaSplitFile);
  if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
    throw new IOException("Split metadata size exceeded " +
        maxMetaInfoSize + ". Aborting job " + jobId);
  }

  FSDataInputStream in = fs.open(metaSplitFile);
  // check the "META-SPL" header
  byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
  in.readFully(header);
  if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
    throw new IOException("Invalid header on split file");
  }
  // check the version
  int vers = WritableUtils.readVInt(in);
  if (vers != JobSplit.META_SPLIT_VERSION) {
    in.close();
    throw new IOException("Unsupported split version " + vers);
  }
  // number of splits, i.e. the number of map tasks
  int numSplits = WritableUtils.readVInt(in);
  JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
      new JobSplit.TaskSplitMetaInfo[numSplits];
  for (int i = 0; i < numSplits; i++) {
    JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
    splitMetaInfo.readFields(in);
    // index pointing at this split's record inside job.split
    JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
        jobSplitFile,
        splitMetaInfo.getStartOffset());
    allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
        splitMetaInfo.getLocations(),
        splitMetaInfo.getInputDataLength());
  }
  in.close();
  return allSplitMetaInfo;
}
```
With the per-task split info returned, transition then calls
```java
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
```
to create the tasks. First, createMapTasks:
```java
private void createMapTasks(JobImpl job, long inputLength,
    TaskSplitMetaInfo[] splits) {
  for (int i=0; i < job.numMapTasks; ++i) {
    TaskImpl task =
        new MapTaskImpl(job.jobId, i,
            job.eventHandler,
            job.remoteJobConfFile,
            job.conf, splits[i],
            job.taskAttemptListener,
            job.jobToken, job.jobCredentials,
            job.clock,
            job.applicationAttemptId.getAttemptId(),
            job.metrics, job.appContext);
    job.addTask(task);
  }
  LOG.info("Input size for job " + job.jobId + " = " + inputLength
      + ". Number of splits = " + splits.length);
}
```
addTask looks like this:
```java
protected void addTask(Task task) {
  synchronized (tasksSyncHandle) {
    if (lazyTasksCopyNeeded) {
      Map<TaskId, Task> newTasks = new LinkedHashMap<TaskId, Task>();
      newTasks.putAll(tasks);
      tasks = newTasks;
      lazyTasksCopyNeeded = false;
    }
  }

  tasks.put(task.getID(), task);

  if (task.getType() == TaskType.MAP) {
    mapTasks.add(task.getID());
  } else if (task.getType() == TaskType.REDUCE) {
    reduceTasks.add(task.getID());
  }
  metrics.waitingTask(task);
}
```
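The lazyTasksCopyNeeded dance above is a copy-on-write trick: readers are handed the current tasks map, and the first write after that makes a private copy so the handed-out view is never mutated. A stripped-down sketch of the same idea (the class and method names here are mine, not from JobImpl):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LazyCopyTasks {
    private Map<String, String> tasks = new LinkedHashMap<>();
    private boolean lazyCopyNeeded = false;
    private final Object lock = new Object();

    // readers get the live map and mark that the next write must copy first
    Map<String, String> getTasks() {
        synchronized (lock) {
            lazyCopyNeeded = true;
            return tasks;
        }
    }

    void addTask(String id, String task) {
        synchronized (lock) {
            if (lazyCopyNeeded) {
                tasks = new LinkedHashMap<>(tasks); // copy-on-write
                lazyCopyNeeded = false;
            }
        }
        tasks.put(id, task); // put outside the lock, as in JobImpl.addTask
    }

    public static void main(String[] args) {
        LazyCopyTasks j = new LazyCopyTasks();
        j.addTask("task_0", "map");
        Map<String, String> snapshot = j.getTasks();
        j.addTask("task_1", "map");              // goes into a fresh copy
        System.out.println(snapshot.size());     // prints 1: the snapshot is untouched
        System.out.println(j.getTasks().size()); // prints 2
    }
}
```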
Once the tasks are created they wait to be scheduled, via job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);. Scheduling is also event-driven; I won't expand on it here and may cover it in a separate post.