The previous post covered the secondary sort scenario and its solution, but the result was not globally ordered. This post shows how to make the secondary sort globally ordered.

Global sort

To make the entire MR output globally ordered, the difficulty lies in partitioning: we must guarantee that every key assigned to reduce i is smaller than every key assigned to reduce j (for i < j). The hard part is choosing the partition boundaries. If the boundary values are chosen badly, the number of records assigned to each reduce can be very uneven, and the resulting data skew lets a few slow tasks drag down the whole job.

Ideally, each reduce should receive roughly the same number of records, and deciding which reduce a given record goes to should be as fast as possible. In other words, the partition method needs to both distribute records evenly and be efficient.

Hadoop ships with the TotalOrderPartitioner class. This partitioner still sends identical keys to the same reduce and, on top of that, guarantees that keys are globally ordered across reduces. It relies on a partition file, a sequence file that stores the key boundaries between reduces: if we configure N reducers, the file contains N-1 split keys, sorted by the key comparator. TotalOrderPartitioner checks whether the key type is BinaryComparable; if it is, it builds a trie over the split keys (lookup cost is O(n), where n is the depth of the trie). If the key type is not BinaryComparable, it falls back to binary search (lookup cost is O(log n), where n is the number of reduces). Once this lookup structure is built, TotalOrderPartitioner determines which reducer's key range each key falls into and dispatches it accordingly.
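To make the key-to-reducer lookup concrete, here is a minimal, illustrative sketch of the binary-search case (my own simplification, not the actual TotalOrderPartitioner source): given the N-1 split keys loaded from the partition file and sorted by the key comparator, a key is mapped to a reducer index with a single binary search.

import java.util.Arrays;
import java.util.Comparator;

// Illustrative sketch of how a key can be routed to a reducer index
// using N-1 sorted split points, as TotalOrderPartitioner does for
// non-BinaryComparable keys.
public class BinarySearchPartitionSketch<K> {
    private final K[] splitPoints;          // N-1 boundaries, sorted by the key comparator
    private final Comparator<K> comparator;

    public BinarySearchPartitionSketch(K[] splitPoints, Comparator<K> comparator) {
        this.splitPoints = splitPoints;
        this.comparator = comparator;
    }

    public int getPartition(K key) {
        // binarySearch returns (-(insertionPoint) - 1) when the key is absent,
        // so this expression yields the index of the range the key falls into:
        // keys below splitPoints[0] go to reducer 0, keys at or above
        // splitPoints[N-2] go to reducer N-1.
        int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
        return (pos < 0) ? -pos : pos;
    }
}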

So how do we obtain the partition file for a large amount of data? Hadoop provides samplers that estimate the boundaries for us so the data is distributed as evenly as possible. The samplers that ship with InputSampler are RandomSampler, SplitSampler and IntervalSampler.

Specifically:

  • RandomSampler<K,V> scans the input and samples records at random; it is the slowest of the three.
  • SplitSampler<K,V> samples the first n records of each split; it is the most efficient of the three.
  • IntervalSampler<K,V> samples at a fixed interval; it is fairly efficient, but works best on data sets that are already sorted.

Besides the samplers above, you can also write your own MR job to compute the split points in the partition file for specific needs. This post uses RandomSampler to split the data; the constructors of the three samplers are sketched right below.
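For reference, a short sketch of how the three samplers are typically constructed (the parameter values here are arbitrary; check the InputSampler javadoc of your Hadoop version for the exact semantics of freq, numSamples and maxSplitsSampled):

// freq: probability of keeping each record; numSamples: max number of samples;
// maxSplitsSampled: max number of input splits to examine
InputSampler.Sampler<Text, Text> random =
        new InputSampler.RandomSampler<Text, Text>(0.5, 3, 2);
// take numSamples records from the beginning of at most maxSplitsSampled splits
InputSampler.Sampler<Text, Text> split =
        new InputSampler.SplitSampler<Text, Text>(1000, 10);
// sample at a fixed frequency from at most maxSplitsSampled splits
InputSampler.Sampler<Text, Text> interval =
        new InputSampler.IntervalSampler<Text, Text>(0.01, 10);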

The sample code is as follows:

public class TotalSort {

    private static final Logger log = Logger.getLogger(TotalSort.class);

    public static class MyMapper
            extends Mapper<Text, Text, Text, IntWritable> {

        public void map(Text key, Text value, Context context
        ) throws IOException, InterruptedException {
            // combine key and value into a composite key for the secondary sort
            context.write(new Text(key.toString() + " " + value.toString()),
                    new IntWritable(Integer.parseInt(value.toString())));
        }
    }

    public static class MyReducer
            extends Reducer<Text, IntWritable, Text, NullWritable> {

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                // the composite key already contains the original key and value
                context.write(key, NullWritable.get());
            }
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");
        // separator between the fields of the source file, \t by default;
        // this separator is used by KeyValueTextInputFormat
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: TotalSort <in> [<in>...] <out>");
            System.exit(2);
        }
        // path of the partition file on HDFS
        Path partitionFile = new Path("hdfs://192.168.244.131:9000/secondPartition");
        // use RandomSampler to sample the data and estimate the split points;
        // the K V type parameters of RandomSampler<> are irrelevant in this example
        // and can be anything (or omitted), because the types actually used
        // are the ones of the InputFormat
        InputSampler.RandomSampler<NullWritable, NullWritable> randomSampler
                = new InputSampler.RandomSampler<NullWritable, NullWritable>(0.5, 3);
        // another RandomSampler constructor, useful when there are many splits
        // = new InputSampler.RandomSampler<Text, Text>(0.5, 3, 2);
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        Job job = Job.getInstance(conf, "TotalSort");
        // use KeyValueTextInputFormat instead of the default TextInputFormat
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setJarByClass(TotalSort.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // use more than one reducer so the global sort is actually exercised
        job.setNumReduceTasks(2);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // partitioner for the global sort
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // custom sort comparator
        job.setSortComparatorClass(KeyComparator.class);
        job.setGroupingComparatorClass(GroupComparator.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));

        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(otherArgs[otherArgs.length - 1]))) {
            fs.delete(new Path(otherArgs[otherArgs.length - 1]), true);
        }
        // write the key split points into the partition file
        InputSampler.writePartitionFile(job, randomSampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The demo above adds global ordering on top of the secondary sort from the previous post; the secondary-sort idea was explained there, and the global ordering uses Hadoop's built-in TotalOrderPartitioner.

The tricky part is generating the partition file with RandomSampler; this is where type-mismatch errors are easy to hit.
As described earlier, the partition file stores the key split points, and the map output is partitioned by comparing against those split points. The keys in the partition file must therefore have the same type as the map output key (the two have to be comparable), and since the map output key is also the reduce input key, the partition file key, map output key and reduce input key must all share the same type. So where do the partition file keys come from? In this demo they are produced by RandomSampler, which means RandomSampler has to emit keys of the same type as the map output key and reduce input key.
When constructing the RandomSampler you can specify K and V type parameters, and you might assume these are the types written into the partition file. They are not. Looking at the source shows that the key type written to the partition file is determined by the InputFormat's key type, and the value type is hardcoded to NullWritable. Since the InputFormat's key type also determines the map input key type, the partition file key must have the same type as the InputFormat key and the map input key. That is exactly why the InputFormat is set to KeyValueTextInputFormat.class in the code above.

In summary: if you generate the partition file with RandomSampler, you must keep the partition file key, InputFormat key, map input key, map output key and reduce input key all of the same type. If you generate the partition file some other way (for example with your own MR job), you only need the partition file key, map output key and reduce input key to share the same type, because when the partition file is read back, job.getMapOutputKeyClass() is used as the type to read the keys with.
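A much simplified sketch of what InputSampler.writePartitionFile does makes both constraints visible (paraphrased from memory of the Hadoop 2.x source, so treat it as an approximation; imports are omitted as in the rest of the post): the samples are collected through the job's InputFormat, so their runtime type is the InputFormat key type, while the sequence file is declared with job.getMapOutputKeyClass() as the key class and NullWritable as the value class.

// Simplified sketch, not the exact Hadoop source.
@SuppressWarnings("unchecked")
public static <K, V> void writePartitionFileSketch(Job job, InputSampler.Sampler<K, V> sampler)
        throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = job.getConfiguration();

    // the samples are read through the job's InputFormat,
    // so their runtime type is the InputFormat's key type
    InputFormat<K, V> inf =
            (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    K[] samples = sampler.getSample(inf, job);
    RawComparator<K> comparator = (RawComparator<K>) job.getSortComparator();
    Arrays.sort(samples, comparator);

    // the sequence file is declared with the map output key class, which is why
    // the sampled keys and the map output key must have the same type;
    // the value is always NullWritable
    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    SequenceFile.Writer writer = SequenceFile.createWriter(dst.getFileSystem(conf),
            conf, dst, job.getMapOutputKeyClass(), NullWritable.class);

    // keep numReduceTasks - 1 evenly spaced samples as the split points
    int numPartitions = job.getNumReduceTasks();
    float stepSize = samples.length / (float) numPartitions;
    for (int i = 1; i < numPartitions; ++i) {
        writer.append(samples[Math.round(stepSize * i)], NullWritable.get());
    }
    writer.close();
}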

Running the code above first writes the key split points into the partition file. Since the partition file is a sequence file, it can be viewed with the hadoop command hadoop dfs -text /secondPartition, which prints:

# the value is null because it is written as NullWritable in the code
1993 (null)

The map output is then partitioned against this split point. The globally sorted result is:
part-r-00000
1990 10
1990 17
1990 22
1990 31
1991 18
1991 20
1991 27
1991 33
1992 22
1992 31
part-r-00001
1993 18
1993 33

The result is globally ordered, sorted first by key and then by value. However, because of where the split point landed, the two reduces processed very different amounts of data, i.e. the output is skewed.
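In this toy example the sampler was constructed with a frequency of 0.5 and only 3 samples, so the estimated split point is naturally noisy. On a real data set, letting the sampler look at more records and more splits usually produces boundaries much closer to the true quantiles, for example (the values below are only illustrative):

// keep each record with probability 0.9, retain up to 10000 samples,
// and read from at most 10 input splits
InputSampler.RandomSampler<NullWritable, NullWritable> randomSampler =
        new InputSampler.RandomSampler<NullWritable, NullWritable>(0.9, 10000, 10);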

Global sort with a custom InputFormat

The demo above keeps the key types consistent by setting the InputFormat to KeyValueTextInputFormat.class, but if the map output key is a custom type, the InputFormat has to be custom as well. A custom InputFormat overrides getSplits and createRecordReader. The InputFormat in this example extends FileInputFormat; since the split rules do not change, only createRecordReader is overridden. The code is as follows:

public class EntityPairInputFormat extends FileInputFormat<EntityPair, Text> {
    @Override
    public RecordReader<EntityPair, Text> createRecordReader
            (InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        String delimiter = taskAttemptContext.getConfiguration().get(
                "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        // modeled after TextInputFormat, with LineRecordReader rewritten
        // as a custom line reader
        return new EntitypairLineRecordReader(recordDelimiterBytes);
    }
}

// EntitypairLineRecordReader.java
public class EntitypairLineRecordReader extends RecordReader<EntityPair, Text> {
    private static final Log LOG = LogFactory.getLog(EntitypairLineRecordReader.class);
    public static final String MAX_LINE_LENGTH =
            "mapreduce.input.linerecordreader.line.maxlength";

    private long start;
    private long pos;
    private long end;
    private SplitLineReader in;
    private FSDataInputStream fileIn;
    private Seekable filePosition;
    private EntityPair key;
    private Text value;
    private int maxLineLength;
    private boolean isCompressedInput;
    private byte[] recordDelimiterBytes;

    public EntitypairLineRecordReader(byte[] recordDelimiterBytes) {
        this.recordDelimiterBytes = recordDelimiterBytes;
    }

    @Override
    public void initialize
            (InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) inputSplit;
        Configuration job = taskAttemptContext.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();

        // open the file and seek to the start of the split
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);

        fileIn.seek(start);

        in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
        filePosition = fileIn;

        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    private int maxBytesToConsume(long pos) {
        return isCompressedInput
                ? Integer.MAX_VALUE
                : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
    }

    // assign the key and value of the next record
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new EntityPair();
        }
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
            if (pos == 0) {
                newSize = skipUtfByteOrderMark();
            } else {
                newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
                pos += newSize;
            }

            if ((newSize == 0) || (newSize < maxLineLength)) {
                break;
            }

            // line too long. try again
            LOG.info("Skipped line of size " + newSize + " at pos " +
                    (pos - newSize));
        }

        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            // log the value: it is exactly the raw line from the source file
            LOG.info("====EntitypairLineRecordReader.value : " + value.toString());
            key.set(new Text(value.toString().split(" ")[0]),
                    new IntWritable(Integer.parseInt(value.toString().split(" ")[1])));
            return true;
        }
    }

    private int skipUtfByteOrderMark() throws IOException {
        // Strip BOM(Byte Order Mark)
        // Text only support UTF-8, we only need to check UTF-8 BOM
        // (0xEF,0xBB,0xBF) at the start of the text stream.
        int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
                Integer.MAX_VALUE);
        int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
        // Even we read 3 extra bytes for the first line,
        // we won't alter existing behavior (no backwards incompat issue).
        // Because the newSize is less than maxLineLength and
        // the number of bytes copied to Text is always no more than newSize.
        // If the return size from readLine is not less than maxLineLength,
        // we will discard the current line and read the next line.
        pos += newSize;
        int textLength = value.getLength();
        byte[] textBytes = value.getBytes();
        if ((textLength >= 3) && (textBytes[0] == (byte) 0xEF) &&
                (textBytes[1] == (byte) 0xBB) && (textBytes[2] == (byte) 0xBF)) {
            // find UTF-8 BOM, strip it.
            LOG.info("Found UTF-8 BOM and skipped it");
            textLength -= 3;
            newSize -= 3;
            if (textLength > 0) {
                // It may work to use the same buffer and not do the copyBytes
                textBytes = value.copyBytes();
                value.set(textBytes, 3, textLength);
            } else {
                value.clear();
            }
        }
        return newSize;
    }

    @Override
    public EntityPair getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
        }
    }

    private long getFilePosition() throws IOException {
        long retVal;
        if (isCompressedInput && null != filePosition) {
            retVal = filePosition.getPos();
        } else {
            retVal = pos;
        }
        return retVal;
    }

    @Override
    public void close() throws IOException {
        in.close();
    }
}

EntityPairInputFormat creates a new RecordReader, EntitypairLineRecordReader, which reads each record into an EntityPair (K) and a Text (V).
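The EntityPair composite key, together with EntityComparator and EntityGroup, was defined in the previous post on secondary sort and is not repeated here. For readers who skipped that post, below is a minimal sketch of what such a key could look like; it is an assumed illustration rather than the exact original class, and it only covers the members this post relies on (the constructors, set, getFirstKey, the Writable plumbing and toString). Imports are omitted as in the rest of the post.

// Assumed sketch of the composite key: sorts by year (firstKey), then by value (secondKey).
public class EntityPair implements WritableComparable<EntityPair> {
    private Text firstKey = new Text();
    private IntWritable secondKey = new IntWritable();

    public EntityPair() {
    }

    public EntityPair(Text firstKey, IntWritable secondKey) {
        this.firstKey = firstKey;
        this.secondKey = secondKey;
    }

    public void set(Text firstKey, IntWritable secondKey) {
        this.firstKey = firstKey;
        this.secondKey = secondKey;
    }

    public Text getFirstKey() {
        return firstKey;
    }

    public IntWritable getSecondKey() {
        return secondKey;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        firstKey.write(out);
        secondKey.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstKey.readFields(in);
        secondKey.readFields(in);
    }

    @Override
    public int compareTo(EntityPair other) {
        int cmp = firstKey.compareTo(other.firstKey);
        return cmp != 0 ? cmp : secondKey.compareTo(other.secondKey);
    }

    @Override
    public String toString() {
        // overriding toString makes the key readable when logged,
        // e.g. the "1991 27" split point mentioned later in this post
        return firstKey.toString() + " " + secondKey.get();
    }
}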

Now let's look at the global secondary sort driver:

public class EntitySecondSort {

    private static final Logger log = Logger.getLogger(EntitySecondSort.class);

    public static class MyMapper
            extends Mapper<Object, Text, EntityPair, IntWritable> {

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            String[] arr = value.toString().split(" ");

            // emit the original key and value combined into a composite key
            context.write(new EntityPair(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1]))),
                    new IntWritable(Integer.parseInt(arr[1])));
        }
    }

    public static class MyReducer
            extends Reducer<EntityPair, IntWritable, Text, IntWritable> {

        public void reduce(EntityPair key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            for (IntWritable val : values) {

                // output after grouping
                context.write(new Text(key.getFirstKey()), val);
            }
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: EntitySecondSort <in> [<in>...] <out>");
            System.exit(2);
        }

        Path partitionFile = new Path("hdfs://192.168.244.131:9000/secondPartition");
        InputSampler.RandomSampler randomSampler
                = new InputSampler.RandomSampler(0.5, 3, 2);
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        Job job = Job.getInstance(conf, "EntitySecondSort");
        // custom input format
        job.setInputFormatClass(EntityPairInputFormat.class);
        job.setJarByClass(EntitySecondSort.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2);
        job.setMapOutputKeyClass(EntityPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // partitioner for the global sort
        job.setPartitionerClass(TotalOrderPartitioner.class);

        // custom sort comparator
        job.setSortComparatorClass(EntityComparator.class);
        // custom grouping comparator
        job.setGroupingComparatorClass(EntityGroup.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));

        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(otherArgs[otherArgs.length - 1]))) {
            fs.delete(new Path(otherArgs[otherArgs.length - 1]), true);
        }

        InputSampler.writePartitionFile(job, randomSampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The result is:
part-r-00000
1990 10
1990 17
1990 22
1990 31
part-r-00001
1991 18
1991 20
1991 27
1991 33
1992 22
1992 31
1993 18
1993 33

As explained above, when the partition file is generated with RandomSampler, the partition file key, InputFormat key, map input key, map output key and reduce input key must all share the same type, so the partition file now stores keys of type EntityPair. Running hadoop dfs -text /secondPartition against it fails, because the command cannot deserialize the EntityPair objects. So how do we verify that the split point key is 1991?

Instead of manually deserializing the partition file, I added a log statement in InputSampler at the point where the partition file is written and printed the key. The log showed the split point key as 1991 27 (it prints in this format because I overrode EntityPair's toString method), which confirms the partitioning.
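Alternatively, the partition file can be read back programmatically without patching InputSampler. Here is a small sketch (the fs.defaultFS address and the /secondPartition path are the ones used in the demo above; EntityPair must be on the classpath so the sequence file can be deserialized, and imports are omitted as in the rest of the post):

public class ShowPartitionFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");
        Path partitionFile = new Path("/secondPartition");

        // the partition file is a sequence file whose key class is the map
        // output key class (EntityPair here) and whose value class is NullWritable
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(partitionFile));
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        NullWritable value = NullWritable.get();
        while (reader.next(key, value)) {
            System.out.println(key);   // relies on EntityPair.toString(), e.g. "1991 27"
        }
        reader.close();
    }
}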

Summary

This post showed how to do a global sort with MR. The hard part of a global sort is how to partition the data while avoiding data skew as much as possible.
The example uses Hadoop's own TotalOrderPartitioner, which builds a lookup-friendly data structure from the split point keys so that each map output key can be quickly routed to the right reduce. A follow-up post will analyze TotalOrderPartitioner in more detail from the source-code perspective.
The split point keys themselves are sampled at random from the source files with RandomSampler.