MapReduce splits one large job into n tasks, which the map and reduce phases then execute. But what are the splitting rules? In essence, splitting a job boils down to splitting the job's input files. Below is a quick walkthrough from the source code's perspective.
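As a concrete entry point, here is a minimal driver sketch (my own illustration, not from this article; MyMapper, MyReducer, and the paths are placeholders). The call chain job.waitForCompletion() → Job.submit() → JobSubmitter.submitJobInternal() is where the splitting described below kicks off:

```java
// Minimal driver sketch. MyMapper/MyReducer are hypothetical user classes;
// waitForCompletion() triggers Job.submit(), which calls
// JobSubmitter.submitJobInternal() - the starting point of this article.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "split-demo");
    job.setJarByClass(Driver.class);
    job.setMapperClass(MyMapper.class);       // hypothetical mapper
    job.setReducerClass(MyReducer.class);     // hypothetical reducer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));  // files to be split
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);      // submits the job
  }
}
```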
File splitting
An MR job is submitted to the RM via JobSubmitter.submitJobInternal, which calls writeSplits(JobContext job, Path jobSubmitDir) to split the job's input files. writeSplits is just a thin wrapper over the new and old APIs, choosing between them based on your code; here it calls writeNewSplits to split the files using the new API.
```java
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // instantiate the InputFormat configured for this job
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  // compute the splits
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits by size in descending order, biggest first
  Arrays.sort(array, new SplitComparator());
  // persist the split info under the job submission directory
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
```
writeNewSplits obtains an InputFormat instance and calls its getSplits to split the files; this is where the main splitting logic lives. Here is the code:
```java
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // lower/upper bounds for the split size, from config
  // (mapreduce.input.fileinputformat.split.minsize / .maxsize)
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file : files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      // fetch the HDFS block locations for this file
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        // splitSize = max(minSize, min(maxSize, blockSize))
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // keep carving off splitSize bytes while more than
        // SPLIT_SLOP (1.1) split sizes remain
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        // the last, possibly smaller, split
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable: the whole file becomes a single split
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
            blkLocations[0].getCachedHosts()));
      }
    } else {
      // zero-length file: an empty split with no hosts
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  ...
  return splits;
}
```
getSplits slices each file into splits. For every split it records the file path, the offset (the split's starting position within the whole file), the split size splitSize, the hosts holding the block that contains that offset, and that block's cached (in-memory) hosts, into a FileSplit object, one object per split, and finally returns them all in splits.
Other details are annotated in the code above. One thing to note: since splitSize can be larger than blockSize, a single split may consist of multiple blocks. The blkIndex computed in the code is only the index of the block containing the split's starting offset, so the Hosts recorded in the FileSplit are only the replica hosts of that one block.
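To make the sizing arithmetic concrete, here is a small self-contained illustration (my own sketch, not Hadoop source; it just mirrors computeSplitSize and the SPLIT_SLOP loop above, with the default minSize = 1 and maxSize = Long.MAX_VALUE):

```java
// Illustration only: the same sizing arithmetic as FileInputFormat.
public class SplitSizeDemo {
  static final double SPLIT_SLOP = 1.1;   // same constant FileInputFormat uses

  // computeSplitSize: max(minSize, min(maxSize, blockSize))
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long mb = 1L << 20;
    long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE); // 128 MB

    long bytesRemaining = 260 * mb;        // a 260 MB input file
    int splits = 0;
    while ((double) bytesRemaining / splitSize > SPLIT_SLOP) {
      bytesRemaining -= splitSize;
      splits++;
    }
    if (bytesRemaining != 0) {
      splits++;                            // the tail split
    }
    // 260 MB -> [128 MB, 132 MB]: after the first split, 132/128 = 1.03
    // is within the 1.1 slop, so the remainder becomes one final split
    // instead of 128 MB + a tiny 4 MB split.
    System.out.println(splits + " splits, last = " + bytesRemaining / mb + " MB");
    // prints: 2 splits, last = 132 MB
  }
}
```

Raising mapreduce.input.fileinputformat.split.minsize or lowering split.maxsize pushes splitSize away from the block size, which is the usual knob for controlling the number of map tasks.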
Back in writeNewSplits: input.getSplits returns the splits in a list, which is converted to an array and sorted by length in descending order. Finally JobSplitWriter.createSplitFiles is called to persist the array of split info to files. The code is as follows:
```java
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
    Configuration conf, FileSystem fs, T[] splits)
    throws IOException, InterruptedException {
  // write the split data itself (job.split)
  FSDataOutputStream out = createFile(fs,
      JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
  SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
  out.close();
  // then write the split meta info (job.splitmetainfo)
  writeJobSplitMetaInfo(fs,
      JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION),
      splitVersion, info);
}
```
createSplitFiles creates two files: a split file recording the splits themselves, and a split meta-info file recording the splits' metadata. What exactly goes into each? Let's keep following the code.
First, createFile: it obtains the split file's path via JobSubmissionFiles.getJobSplitFile(jobSubmitDir) and creates an output stream on that path with FileSystem.create. The code is as follows:
```java
private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
    Configuration job) throws IOException {
  FSDataOutputStream out = FileSystem.create(fs, splitFile,
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
  // submission files are replicated more aggressively (default 10)
  int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
  fs.setReplication(splitFile, (short) replication);
  // write the "SPL" header and the split version
  writeSplitHeader(out);
  return out;
}
```
The output stream is returned, and then writeNewSplits (this time the JobSplitWriter method, not the JobSubmitter one shown earlier) writes the split data into the split file (job.split) and returns the splits' metadata. The code is as follows:
```java
private static <T extends InputSplit> SplitMetaInfo[] writeNewSplits(
    Configuration conf, T[] array, FSDataOutputStream out)
    throws IOException, InterruptedException {
  SplitMetaInfo[] info = new SplitMetaInfo[array.length];
  if (array.length != 0) {
    SerializationFactory factory = new SerializationFactory(conf);
    int i = 0;
    int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
        MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
    long offset = out.getPos();
    for (T split : array) {
      long prevCount = out.getPos();
      // each record: the split's class name followed by the serialized split
      Text.writeString(out, split.getClass().getName());
      Serializer<T> serializer =
          factory.getSerializer((Class<T>) split.getClass());
      serializer.open(out);
      serializer.serialize(split);
      long currCount = out.getPos();
      String[] locations = split.getLocations();
      // cap the number of recorded block locations
      if (locations.length > maxBlockLocations) {
        LOG.warn("Max block location exceeded for split: " + split
            + " splitsize: " + locations.length + " maxsize: "
            + maxBlockLocations);
        locations = Arrays.copyOf(locations, maxBlockLocations);
      }
      // meta info: locations, this record's offset in job.split, split length
      info[i++] = new JobSplit.SplitMetaInfo(locations, offset,
          split.getLength());
      offset += currCount - prevCount;
    }
  }
  return info;
}
```
From this code we can see what job.split contains: a header (SPLIT_FILE_HEADER, whose value is "SPL", plus splitVersion, whose value is JobSplit.META_SPLIT_VERSION = 1) followed by one record per split, where each record is the split's class name plus the serialized FileSplit object.
After writing job.split, the split metadata is returned, and writeJobSplitMetaInfo is then called to write it into job.splitmetainfo. The code is as follows:
```java
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
    FsPermission p, int splitMetaInfoVersion,
    JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
  // write the job.splitmetainfo file
  FSDataOutputStream out = FileSystem.create(fs, filename, p);
  out.write(JobSplit.META_SPLIT_FILE_HEADER);   // "META-SPL"
  WritableUtils.writeVInt(out, splitMetaInfoVersion);
  WritableUtils.writeVInt(out, allSplitMetaInfo.length);
  for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
    splitMetaInfo.write(out);
  }
  out.close();
}
```
So the split metadata written to job.splitmetainfo consists of: a header (META_SPLIT_FILE_HEADER, whose value is "META-SPL", plus version number 1), the number of metadata records, and the SplitMetaInfo objects themselves.
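Tying the two files together: each SplitMetaInfo carries the byte offset of its record inside job.split, which is how a map task later locates its own split (roughly what MapTask's getSplitDetails does). Below is a simplified sketch of that read-back, assuming the split class implements Writable (FileSplit does; the real code goes through SerializationFactory instead of calling readFields directly):

```java
// Simplified sketch (not the actual MapTask code) of rebuilding an
// InputSplit from job.split, given an offset from job.splitmetainfo.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class ReadSplitSketch {
  static Writable readSplitAt(FileSystem fs, Path jobSplitFile,
      long startOffset, Configuration conf)
      throws IOException, ClassNotFoundException {
    try (FSDataInputStream in = fs.open(jobSplitFile)) {
      in.seek(startOffset);                    // jump to this split's record
      String className = Text.readString(in);  // e.g. "...lib.input.FileSplit"
      Writable split = (Writable) ReflectionUtils.newInstance(
          conf.getClassByName(className), conf);
      split.readFields(in);                    // deserialize the split body
      return split;
    }
  }
}
```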
With the files split, the next step is launching tasks to process them, which is the job splitting. Let's look at that next.
Job splitting
The job-splitting code lives in JobImpl.java. The JobEventType.JOB_INIT event triggers InitTransition.transition to init the job (for how YARN dispatches events and transitions states, see the two articles on the AsyncDispatcher event dispatcher and the StateMachineFactory state machine). transition calls createSplits to split the job. The code is as follows:
```java
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
  TaskSplitMetaInfo[] allTaskSplitMetaInfo;
  try {
    // read job.splitmetainfo back from the job submission directory
    allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
        job.oldJobId, job.fs, job.conf, job.remoteJobSubmitDir);
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
  return allTaskSplitMetaInfo;
}
```
SplitMetaInfoReader.readSplitMetaInfo reads the split metadata back and uses it to carve the job into tasks. The code is as follows:
```java
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(JobID jobId,
    FileSystem fs, Configuration conf, Path jobSubmitDir) throws IOException {
  long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
      MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
  Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
  String jobSplitFile =
      JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
  FileStatus fStatus = fs.getFileStatus(metaSplitFile);
  if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
    throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
        + ". Aborting job " + jobId);
  }
  FSDataInputStream in = fs.open(metaSplitFile);
  // validate the "META-SPL" header and the version
  byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
  in.readFully(header);
  if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
    throw new IOException("Invalid header on split file");
  }
  int vers = WritableUtils.readVInt(in);
  if (vers != JobSplit.META_SPLIT_VERSION) {
    in.close();
    throw new IOException("Unsupported split version " + vers);
  }
  // one TaskSplitMetaInfo per split: locations, input length, and an index
  // (job.split path + offset) pointing at the serialized split
  int numSplits = WritableUtils.readVInt(in);
  JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
      new JobSplit.TaskSplitMetaInfo[numSplits];
  for (int i = 0; i < numSplits; i++) {
    JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
    splitMetaInfo.readFields(in);
    JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
        jobSplitFile, splitMetaInfo.getStartOffset());
    allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
        splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
  }
  in.close();
  return allSplitMetaInfo;
}
```
With the per-split task metadata returned, transition then calls

```java
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
```

to create the tasks (in InitTransition, job.numMapTasks is set to taskSplitMetaInfo.length, so one map task is created per split). First, the code of createMapTasks:
```java
private void createMapTasks(JobImpl job, long inputLength,
    TaskSplitMetaInfo[] splits) {
  // one MapTaskImpl per split
  for (int i = 0; i < job.numMapTasks; ++i) {
    TaskImpl task = new MapTaskImpl(job.jobId, i, job.eventHandler,
        job.remoteJobConfFile, job.conf, splits[i], job.taskAttemptListener,
        job.jobToken, job.jobCredentials, job.clock,
        job.applicationAttemptId.getAttemptId(), job.metrics, job.appContext);
    job.addTask(task);
  }
  LOG.info("Input size for job " + job.jobId + " = " + inputLength
      + ". Number of splits = " + splits.length);
}
```
The code for addTask is as follows:
```java
protected void addTask(Task task) {
  synchronized (tasksSyncHandle) {
    if (lazyTasksCopyNeeded) {
      // copy-on-write: clone the task map before the first mutation
      Map<TaskId, Task> newTasks = new LinkedHashMap<TaskId, Task>();
      newTasks.putAll(tasks);
      tasks = newTasks;
      lazyTasksCopyNeeded = false;
    }
  }
  tasks.put(task.getID(), task);
  // track map and reduce task ids separately
  if (task.getType() == TaskType.MAP) {
    mapTasks.add(task.getID());
  } else if (task.getType() == TaskType.REDUCE) {
    reduceTasks.add(task.getID());
  }
  metrics.waitingTask(task);
}
```
Once the tasks are carved out, they simply wait to be scheduled, via job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0). Scheduling is likewise driven by events; I won't expand on it here and may cover it separately later.