MapReduce就是将一个大job切分为n个task,由map和reduce分别去执行,但是切分规则是什么呢?其实job的切分也就是job所需文件的切分

下面就从源码的角度简单分析下。

文件切分

MR是通过JobSubmitter.submitJobInternal提交给RM的,在submitJobInternal中通过writeSplits(JobContext job, Path jobSubmitDir)将job的输入文件进行split,writeSplit只是对新旧api进行了下封装,根据你的代码选择新旧api,这里调用writeNewSplits使用新API对file进行split

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
// 创建一个InputFormat,用于指定文件的格式化格式,默认是FileInputFormat
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
// 从JobContext中的INPUT_DIR中拿到files,然后根据splitSize对文件进行切分
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
// 对splits进行排序,按照split的大小进行逆序排序,先处理大的
Arrays.sort(array, new SplitComparator());
// 将split信息固化到文件中
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}

writeNewSplits根据得到的InputFormat实例,调用getSplits对文件进行切分,这也是切分的主要逻辑,看下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public List<InputSplit> getSplits(JobContext job) throws IOException {
// 得到split的大小,由mapreduce.input.fileinputformat.split.minsize控制
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// split的最大值,默认是Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
// 多线程得到输入路径中的文件状态
List<FileStatus> files = listStatus(job);
// 以文件为单位进行切分
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
// 得到file所有block的locations信息,详情请看BlockLocations类
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 判断是否可以切分,压缩文件不能切分
if (isSplitable(job, path)) {
// 得到当前file的block大小
long blockSize = file.getBlockSize();
// 计算切分值 max(minSize, min(blockSize,maxSize))
// 出与对MR的优化,minSize配置的一般都等于blockSize或者是blockSize的整数倍
// 如果大blockSize,会有别的block通过网络传输到map节点,造成网络压力
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// 循环切分,这里有个小优化,在大数据中避免出现小文件,
// 将最后剩下一个比splitSize稍微大一点不再进行切分,
// 因为SPLIT_SLOP = 1.1,而不是 1
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// 根据偏移量length-bytesRemaining,在blkLocations中找到偏移量所在block的index
// 这里只是起始block的index,该split可能包含多个block
// length为1100,block为128,splitSize为500,则第一个split的index为0,第二个则是4
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// 将文件的切分信息通过makeSplit写入FileSplit对象中
// 只是将当前blkIndex的block的相关信息放入FileSplit中
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
...
return splits;
}

getSplits主要是将files的进行切片,将文件路径path、偏移量(即起始位置,是该split在整个文件中的起始位置)、切分大小splitSize、偏移量所在block的locations信息Host和在内存中的host信息写入FileSplit对象中,一个split对应一个对象,最后放入splits中返回。

其它详细信息已在代码中进行注释,这里需要注意的是由于splitSize可能大于blockSize,则切分的split可能有多个block组成,代码中得到的blockIndex只是偏移量所在block的index,而记录在FileSplit中的Hosts信息也只是偏移量所在block的副本所在hosts

writeNewSplits中input.getSplits将files进行切分放入list中,然后将list转换为array,对array中的元素根据长度进行逆序排序。最后调用JobSplitWriter.createSplitFiles将保存切分信息的array落地到文件,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, 
Configuration conf, FileSystem fs, T[] splits)
throws IOException, InterruptedException {
// 通过createFile创建分片文件并获取文件系统数据输出流FSDataOutputStream实例out,
FSDataOutputStream out = createFile(fs,
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
// 将分片数据FileSplit对象写入分片文件,并返回split的元数据
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
out.close();
// 将split的元数据写入分片元数据文件中
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
info);
}

createSplitFiles创建的文件包括两个,分别是记录切片的切片文件和记录切片元数据的切片元数据文件,这两个文件分别记录的是什么内容继续分析代码。

先看下createFile,该方法通过JobSubmissionFiles.getJobSplitFile(jobSubmitDir)得到切片文件的路径,根据其路径创建一个数据输出流FileSystem.create,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
private static FSDataOutputStream createFile(FileSystem fs, Path splitFile, 
Configuration job) throws IOException {
// 根据jobSubmitDir/job.split的路径创建数据输出流
FSDataOutputStream out = FileSystem.create(fs, splitFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
// 设置切片文件的副本数,默认是10
int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
fs.setReplication(splitFile, (short)replication);
// 将切片文件的头信息写入切片文件中
// 头信息为 SPLIT_FILE_HEADER(值为SPL) 和 splitVersion(值为JobSplit.META_SPLIT_VERSION 1)
writeSplitHeader(out);
return out;
}

将数据输出流返回,接着调用writeNewSplits将split数据写入切片文件(job.split)中,并返回切片的元数据信息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private static <T extends InputSplit> 
SplitMetaInfo[] writeNewSplits(Configuration conf,
T[] array, FSDataOutputStream out)
throws IOException, InterruptedException {
// new 出用于存放元数据信息的数组
SplitMetaInfo[] info = new SplitMetaInfo[array.length];
if (array.length != 0) {
SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
// mapreduce.job.max.split.locations
// 该split中多个block副本所在的host个数不能超过该值
int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
long offset = out.getPos();
for(T split: array) {
long prevCount = out.getPos();
// 将split的类名 这里就是FileSplit写入job.split文件中
Text.writeString(out, split.getClass().getName());
Serializer<T> serializer =
factory.getSerializer((Class<T>) split.getClass());
serializer.open(out);
// 将split数据序列化写入job.split中
serializer.serialize(split);
long currCount = out.getPos();
// 此处得到的locations只是offset所在block的locations?
// 应该拿到的是job.split中数据的position
// 加入该split有多个block组成,其余block的location在哪合并的?
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations, maxBlockLocations);
}
// 将split元数据信息写入SplitMetaInfo的数组中
// 元数据包括locations、offset和length
info[i++] =
new JobSplit.SplitMetaInfo(
locations, offset,
split.getLength());
offset += currCount - prevCount;
}
}
return info;
}

从代码中得知job.split文件中的内容为头信息(SPLIT_FILE_HEADER 值为SPL 和 splitVersion 值为JobSplit.META_SPLIT_VERSION 1)和若干个split数据,split数据为split类名+FileSplit对象序列化后的内容。

写入job.split之后将split元数据信息返回,继续调用writeJobSplitMetaInfo将元数据信息写入job.splitmetainfo中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename, 
FsPermission p, int splitMetaInfoVersion,
JobSplit.SplitMetaInfo[] allSplitMetaInfo)
throws IOException {
// write the splits meta-info to a file for the job tracker
// 通过JobSubmissionFiles.getJobSplitMetaFile得到元数据的文件路径job.splitmetainfo
// 打开数据输出流
FSDataOutputStream out =
FileSystem.create(fs, filename, p);
// 将头信息写入 "META-SPL"
out.write(JobSplit.META_SPLIT_FILE_HEADER);
// 和job.split一个版本号
WritableUtils.writeVInt(out, splitMetaInfoVersion);
// split元数据的个数也就是split分片的个数
WritableUtils.writeVInt(out, allSplitMetaInfo.length);
// 将元数据信息SplitMetaInfo对象写入
for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
splitMetaInfo.write(out);
}
out.close();
}

split元数据信息写入job.splitmetainfo,数据格式为头信息(META_SPLIT_FILE_HEADER 值为META-SPL、版本号1)、元数据个数和元数据SplitMetaInfo对象。

文件切分完之后,就是启动task进行处理,也就是job的切分,下面看下job切分。

job切分

MR切分的代码在JobImpl.java中,首先由事件JobEventType.JOB_INIT触发调用InitTransition.transition对job进行init(对于yarn的事件调用和状态转换可以看AsyncDispatcher事件调度器StateMachineFactory状态机这两篇文章),transition调用createSplits对job进行切分,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
TaskSplitMetaInfo[] allTaskSplitMetaInfo;
try {
// 读取split的job.split和job.splitmetainfo进行切分
allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
job.oldJobId, job.fs,
job.conf,
job.remoteJobSubmitDir);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
return allTaskSplitMetaInfo;
}

通过SplitMetaInfoReader.readSplitMetaInfo读取split的切片信息,对job进行切分,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir)
throws IOException {
long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
// 得到job.splitmetainfo和job.split的路径
Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
FileStatus fStatus = fs.getFileStatus(metaSplitFile);
if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
throw new IOException("Split metadata size exceeded " +
maxMetaInfoSize +". Aborting job " + jobId);
}
// 打开job.splitmetainfo的读取数据流
FSDataInputStream in = fs.open(metaSplitFile);
byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
// 读取头文件
in.readFully(header);
if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
throw new IOException("Invalid header on split file");
}
// 版本号
int vers = WritableUtils.readVInt(in);
if (vers != JobSplit.META_SPLIT_VERSION) {
in.close();
throw new IOException("Unsupported split version " + vers);
}
// 切片的个数
int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
new JobSplit.TaskSplitMetaInfo[numSplits];
for (int i = 0; i < numSplits; i++) {
JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
// 切片的元数据信息
splitMetaInfo.readFields(in);
// 根据元数据和split数据信息对job进行切分
JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
jobSplitFile,
splitMetaInfo.getStartOffset());
// 将切分之后的task元数据放入数组
allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
splitMetaInfo.getLocations(),
splitMetaInfo.getInputDataLength());
}
in.close();
return allSplitMetaInfo;
}

将切分之后的tasks返回,在transtion中调用

1
2
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);

进行task的创建,先看下createMapTasks的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {
// 根据task的元数据信息进行TaskImpl的实例化
TaskImpl task =
new MapTaskImpl(job.jobId, i,
job.eventHandler,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.jobToken, job.jobCredentials,
job.clock,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
// 将task信息保存到jobImpl中
job.addTask(task);
}
LOG.info("Input size for job " + job.jobId + " = " + inputLength
+ ". Number of splits = " + splits.length);
}

addTask的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected void addTask(Task task) {
synchronized (tasksSyncHandle) {
if (lazyTasksCopyNeeded) {
Map<TaskId, Task> newTasks = new LinkedHashMap<TaskId, Task>();
newTasks.putAll(tasks);
tasks = newTasks;
lazyTasksCopyNeeded = false;
}
}
// 将task和taskID作为映射放入tasks的map中
tasks.put(task.getID(), task);
// 将taskID放入set中
if (task.getType() == TaskType.MAP) {
mapTasks.add(task.getID());
} else if (task.getType() == TaskType.REDUCE) {
reduceTasks.add(task.getID());
}
metrics.waitingTask(task);
}

将task切分好之后就是等待调度了,代码为job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);,这个也是由事件触发的,这里就不展开了,随后可能会单独介绍。