The previous post covered the secondary-sort scenario and its solution, but that approach could not produce a globally ordered result. This post shows how to do a globally ordered secondary sort.
Global sort

To make the entire MR output globally ordered, the difficulty lies in the partition step: we have to guarantee that every key assigned to reduce i is smaller than every key assigned to reduce j (for i < j). The hard part is therefore choosing the partition boundaries. Badly chosen boundary values can leave the reducers with very uneven record counts, and the resulting data skew lets a few slow tasks drag down the whole job.
Ideally, then, each reducer should receive roughly the same number of records, and deciding which reducer a record goes to should be as fast as possible; in other words, the partition method needs to distribute records evenly and use an efficient assignment algorithm.
Hadoop itself ships with the TotalOrderPartitioner class. Besides sending identical keys to the same reducer, it also guarantees global key order. The catch is that it depends on a partition file, a sequence file holding the boundary keys between reducers: if N reducers are configured, the file contains N-1 split-point keys, sorted by the key comparator. TotalOrderPartitioner checks whether the keys in the partition file are of a BinaryComparable type; if so, it builds a trie from them (lookup cost O(n), where n is the depth of the trie); otherwise it falls back to binary search (O(log n), where n is the number of reducers). Once this lookup structure is built, TotalOrderPartitioner determines which reducer's range each key falls into and dispatches it accordingly.
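To make the lookup step concrete, here is a minimal sketch (not Hadoop's actual source) of how a total-order partitioner can route a key to a reducer by binary-searching the N-1 sorted boundary keys; the class and method names are made up for illustration:

```java
// Simplified illustration only -- the real TotalOrderPartitioner additionally
// supports a trie-based lookup for BinaryComparable keys.
public class SimpleTotalOrderSketch {

    /**
     * splitPoints must be sorted ascending and contain numReduceTasks - 1 keys.
     * Returns the index of the reducer that should receive this key.
     */
    static <K extends Comparable<K>> int getPartition(K key, K[] splitPoints) {
        int lo = 0, hi = splitPoints.length - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (key.compareTo(splitPoints[mid]) < 0) {
                hi = mid - 1;
            } else {
                lo = mid + 1;
            }
        }
        // lo == number of boundary keys <= key, which is the target reducer index.
        return lo;
    }

    public static void main(String[] args) {
        Integer[] boundaries = {1991, 1993};                 // 2 boundaries -> 3 reducers
        System.out.println(getPartition(1990, boundaries));  // 0
        System.out.println(getPartition(1992, boundaries));  // 1
        System.out.println(getPartition(1995, boundaries));  // 2
    }
}
```

Because every key below the first boundary lands on reducer 0, every key between two boundaries lands on the reducer in between, and so on, concatenating the reducer outputs in order yields one globally sorted result.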
So, faced with a large amount of data, how is the partition file obtained? Hadoop provides samplers that estimate the boundaries so that the data is distributed as evenly as possible. The samplers it ships with are RandomSampler, SplitSampler, and IntervalSampler (all nested inside InputSampler).
Of these:
RandomSampler<K,V> walks through the input and samples records at random; it is the least efficient of the three.
SplitSampler<K,V> samples only the first n records, which makes it the most efficient of the three.
IntervalSampler<K,V> samples at a fixed interval; it is fairly efficient, but best suited to input that is already sorted.
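To make the three samplers concrete, here is a small sketch of how each is constructed and how the partition file gets written; the sampling frequencies, sample counts and the path are illustrative values only:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class SamplerSketch {

    public static void writeBoundaries(Job job) throws Exception {
        // Tell TotalOrderPartitioner where the boundary keys will live.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                new Path("/tmp/partitionFile"));

        // Keep each record with probability 0.5, up to 10000 samples in total.
        InputSampler.Sampler<Text, Text> random =
                new InputSampler.RandomSampler<Text, Text>(0.5, 10000);

        // Take the first records of the splits: at most 10000 samples from at most 10 splits.
        InputSampler.Sampler<Text, Text> split =
                new InputSampler.SplitSampler<Text, Text>(10000, 10);

        // Take roughly every 100th record (frequency 0.01).
        InputSampler.Sampler<Text, Text> interval =
                new InputSampler.IntervalSampler<Text, Text>(0.01);

        // Pick one of the samplers and write the N-1 boundary keys as a sequence file.
        InputSampler.writePartitionFile(job, random);
    }
}
```

Note that writePartitionFile should be called after the job's input paths and reducer count are configured, since the sampler reads records through the job's InputFormat and the number of boundaries it writes is numReduceTasks - 1.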
Besides the sampling methods above, the partition-file values can also be produced by your own MR job, tailored to your needs. This post uses RandomSampler to split the data.
Sample code:
```java
public class TotalSort {

    private static final Logger log = Logger.getLogger(TotalSort.class);

    public static class MyMapper extends Mapper<Text, Text, Text, IntWritable> {
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Composite key "originalKey value" drives the secondary sort; the value
            // is carried along as an IntWritable.
            context.write(new Text(key.toString() + " " + value.toString()),
                    new IntWritable(Integer.parseInt(value.toString())));
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(key, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        // Sequence file that TotalOrderPartitioner will read the boundary keys from.
        Path partitionFile = new Path("hdfs://192.168.244.131:9000/secondPartition");
        // Sample records with frequency 0.5, keeping at most 3 samples.
        InputSampler.RandomSampler<NullWritable, NullWritable> randomSampler =
                new InputSampler.RandomSampler<NullWritable, NullWritable>(0.5, 3);
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        Job job = Job.getInstance(conf, "TotalSort");
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setJarByClass(SecondSort.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // KeyComparator and GroupComparator come from the previous (secondary sort) post.
        job.setSortComparatorClass(KeyComparator.class);
        job.setGroupingComparatorClass(GroupComparator.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(otherArgs[otherArgs.length - 1]))) {
            fs.delete(new Path(otherArgs[otherArgs.length - 1]), true);
        }

        // Write the boundary keys before submitting the job.
        InputSampler.writePartitionFile(job, randomSampler);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
The demo above builds global ordering on top of the earlier secondary sort. The secondary-sort approach was covered in the previous post; the global ordering uses Hadoop's built-in TotalOrderPartitioner.
The tricky part here is using RandomSampler to generate the partition file; it is easy to run into type-mismatch errors. As described earlier, the partition file stores the boundary keys used to partition the map output, so the keys in the partition file must have the same type as the map output keys (the two have to be compared against each other). Since the map output key is also the reduce input key, the partition-file key type must match both the map output key and the reduce input key types. So where do the partition-file keys come from? In this demo they are produced by RandomSampler, so the keys RandomSampler writes into the partition file must likewise have the same type as the map output key and reduce input key. When constructing a RandomSampler you can specify K and V type parameters, and you might assume these determine the types written to the partition file, but they do not: reading the code shows that the key type written to the partition file is determined by the InputFormat's key type, while the value type is hard-coded to NullWritable. And because the InputFormat's key type also determines the map input key type, the partition-file key type must match the InputFormat key and map input key types as well. That is why the demo sets the InputFormat to KeyValueTextInputFormat.class.
In short: if you generate the partition file with RandomSampler, the partition-file key, InputFormat key, map input key, map output key and reduce input key must all be of the same type. If you generate the partition file some other way (for example with your own MR job), it is enough for the partition-file key, map output key and reduce input key to share a type, because when the partition file is read back, job.getMapOutputKeyClass() is used as the type to deserialize.
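As an illustration of that second case, here is a sketch of writing the partition file by hand as a sequence file; the boundary value is a placeholder, and the key class must equal whatever job.setMapOutputKeyClass() is given (Text in the first demo):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ManualPartitionFile {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");
        FileSystem fs = FileSystem.get(conf);
        Path partitionFile = new Path("/secondPartition");

        // The key class must be the map output key class so TotalOrderPartitioner
        // can deserialize the boundaries; the value class is conventionally NullWritable.
        SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, partitionFile, Text.class, NullWritable.class);
        try {
            // With N reducers, write N - 1 boundary keys in key-comparator order.
            writer.append(new Text("1992 31"), NullWritable.get());   // example boundary only
        } finally {
            writer.close();
        }
    }
}
```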
Running the code above first writes the key boundaries into the partition file. Since the partition file is a sequence file, it can be inspected with the hadoop command hadoop dfs -text /secondPartition.
The map output is then partitioned against those boundary keys, and the globally sorted result looks like this:

```
part-r-00000
1990 10
1990 17
1990 22
1990 31
1991 18
1991 20
1991 27
1991 33
1992 22
1992 31

part-r-00001
1993 18
1993 33
```
As you can see, the result is globally ordered, sorted first by key and then by value. The only issue is that the choice of boundary left the two reducers with very different amounts of data, i.e. data skew.
The demo above used KeyValueTextInputFormat.class as the InputFormatClass to keep the key types consistent, but if the map output key is a custom type, the InputFormat must be custom as well. A custom InputFormat involves the getSplits and createRecordReader methods; in this example the InputFormat extends FileInputFormat, and since the split rules do not change, only createRecordReader is overridden. The code is as follows:
```java
// Custom InputFormat: the split logic is inherited from FileInputFormat unchanged;
// only the record reader is replaced so that each line becomes an EntityPair key.
public class EntityPairInputFormat extends FileInputFormat<EntityPair, Text> {

    @Override
    public RecordReader<EntityPair, Text> createRecordReader(InputSplit inputSplit,
            TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        String delimiter = taskAttemptContext.getConfiguration().get(
                "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        return new EntitypairLineRecordReader(recordDelimiterBytes);
    }
}

// Modeled on Hadoop's LineRecordReader; the real change is in nextKeyValue(),
// which splits each line into (firstKey, value) and packs it into an EntityPair key.
public class EntitypairLineRecordReader extends RecordReader<EntityPair, Text> {

    private static final Log LOG = LogFactory.getLog(EntitypairLineRecordReader.class);
    public static final String MAX_LINE_LENGTH =
            "mapreduce.input.linerecordreader.line.maxlength";

    private long start;
    private long pos;
    private long end;
    private SplitLineReader in;
    private FSDataInputStream fileIn;
    private Seekable filePosition;
    private EntityPair key;
    private Text value;
    private int maxLineLength;
    private boolean isCompressedInput;
    private byte[] recordDelimiterBytes;

    public EntitypairLineRecordReader(byte[] recordDelimiterBytes) {
        this.recordDelimiterBytes = recordDelimiterBytes;
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) inputSplit;
        Configuration job = taskAttemptContext.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        fileIn.seek(start);
        in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
        filePosition = fileIn;
        // If this is not the first split, discard the first (partial) line.
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    private int maxBytesToConsume(long pos) {
        return isCompressedInput
                ? Integer.MAX_VALUE
                : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new EntityPair();
        }
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
            if (pos == 0) {
                newSize = skipUtfByteOrderMark();
            } else {
                newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
                pos += newSize;
            }
            if ((newSize == 0) || (newSize < maxLineLength)) {
                break;
            }
            LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            LOG.info("====EntitypairLineRecordReader.value : " + value.toString());
            // Build the composite key from the "firstKey value" line.
            key.set(new Text(value.toString().split(" ")[0]),
                    new IntWritable(Integer.parseInt(value.toString().split(" ")[1])));
            return true;
        }
    }

    private int skipUtfByteOrderMark() throws IOException {
        int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength, Integer.MAX_VALUE);
        int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
        pos += newSize;
        int textLength = value.getLength();
        byte[] textBytes = value.getBytes();
        if ((textLength >= 3) && (textBytes[0] == (byte) 0xEF)
                && (textBytes[1] == (byte) 0xBB) && (textBytes[2] == (byte) 0xBF)) {
            LOG.info("Found UTF-8 BOM and skipped it");
            textLength -= 3;
            newSize -= 3;
            if (textLength > 0) {
                textBytes = value.copyBytes();
                value.set(textBytes, 3, textLength);
            } else {
                value.clear();
            }
        }
        return newSize;
    }

    @Override
    public EntityPair getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
        }
    }

    private long getFilePosition() throws IOException {
        long retVal;
        if (isCompressedInput && null != filePosition) {
            retVal = filePosition.getPos();
        } else {
            retVal = pos;
        }
        return retVal;
    }

    @Override
    public void close() throws IOException {
        in.close();
    }
}
```
Inside EntityPairInputFormat a new RecordReader, EntitypairLineRecordReader, is instantiated; it reads each record and turns it into an EntityPair key (K) and a Text value (V).
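The EntityPair composite key itself comes from the previous post and is not reproduced here. For readers who do not have it at hand, a minimal sketch of what such a WritableComparable might look like follows; the field names and method bodies are assumptions, only the constructor, set, getFirstKey and toString signatures are implied by the code in this post:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class EntityPair implements WritableComparable<EntityPair> {

    private Text firstKey = new Text();                 // e.g. the year
    private IntWritable secondKey = new IntWritable();  // e.g. the value to sort on

    public EntityPair() {}

    public EntityPair(Text firstKey, IntWritable secondKey) {
        set(firstKey, secondKey);
    }

    public void set(Text firstKey, IntWritable secondKey) {
        this.firstKey = firstKey;
        this.secondKey = secondKey;
    }

    public Text getFirstKey() {
        return firstKey;
    }

    public IntWritable getSecondKey() {
        return secondKey;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        firstKey.write(out);
        secondKey.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstKey.readFields(in);
        secondKey.readFields(in);
    }

    @Override
    public int compareTo(EntityPair o) {
        // Natural order: first key, then second key -- the secondary-sort order.
        int cmp = firstKey.compareTo(o.firstKey);
        return cmp != 0 ? cmp : secondKey.compareTo(o.secondKey);
    }

    @Override
    public String toString() {
        // Produces output like "1991 27", as mentioned later in this post.
        return firstKey + " " + secondKey;
    }
}
```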
Here is the code for the global secondary sort:
```java
public class EntitySecondSort {

    private static final Logger log = Logger.getLogger(EntitySecondSort.class);

    public static class MyMapper extends Mapper<Object, Text, EntityPair, IntWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split(" ");
            // Composite EntityPair key for the secondary sort; the value rides along.
            context.write(new EntityPair(new Text(arr[0]),
                            new IntWritable(Integer.parseInt(arr[1]))),
                    new IntWritable(Integer.parseInt(arr[1])));
        }
    }

    public static class MyReducer extends Reducer<EntityPair, IntWritable, Text, IntWritable> {
        public void reduce(EntityPair key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(new Text(key.getFirstKey()), val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        Path partitionFile = new Path("hdfs://192.168.244.131:9000/secondPartition");
        // Raw-typed sampler: the K/V type parameters do not determine what is written
        // to the partition file (see the discussion above).
        InputSampler.RandomSampler randomSampler = new InputSampler.RandomSampler(0.5, 3, 2);
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        Job job = Job.getInstance(conf, "EntitySecondSort");
        job.setInputFormatClass(EntityPairInputFormat.class);
        job.setJarByClass(EntitySecondSort.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2);
        job.setMapOutputKeyClass(EntityPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // EntityComparator and EntityGroup come from the previous (secondary sort) post.
        job.setSortComparatorClass(EntityComparator.class);
        job.setGroupingComparatorClass(EntityGroup.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(otherArgs[otherArgs.length - 1]))) {
            fs.delete(new Path(otherArgs[otherArgs.length - 1]), true);
        }

        InputSampler.writePartitionFile(job, randomSampler);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
The result is:

```
part-r-00000
1990 10
1990 17
1990 22
1990 31

part-r-00001
1991 18
1991 20
1991 27
1991 33
1992 22
1992 31
1993 18
1993 33
```
As noted above, when RandomSampler generates the partition file, the partition-file key, InputFormat key, map input key, map output key and reduce input key must all share a type, so this partition file holds keys of type EntityPair. If you now try to inspect it with hadoop dfs -text /secondPartition, the command fails because it cannot deserialize the EntityPair objects. So how can we verify that the boundary key is 1991?
Rather than deserializing the partition file by hand, I added a log statement at the point in InputSampler where the partition file is written and printed the key being written: the log showed the boundary key to be 1991 27 (it appears in that format because I overrode EntityPair's toString method). That confirms how the data was partitioned.
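An alternative that avoids patching InputSampler is to read the partition file back directly with a SequenceFile.Reader, deserializing the keys as EntityPair. A sketch, assuming the EntityPair class is on the classpath:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

public class ShowPartitionFile {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");
        Path partitionFile = new Path("/secondPartition");

        // The partition file is a sequence file of <EntityPair, NullWritable>,
        // so it has to be read back with those same Writable types.
        SequenceFile.Reader reader =
                new SequenceFile.Reader(conf, SequenceFile.Reader.file(partitionFile));
        try {
            EntityPair key = new EntityPair();
            NullWritable value = NullWritable.get();
            while (reader.next(key, value)) {
                System.out.println(key);   // relies on the overridden EntityPair.toString()
            }
        } finally {
            reader.close();
        }
    }
}
```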
Summary

This post used MR to produce a globally sorted result. The hard part of a global sort is how to partition the data while keeping data skew to a minimum. The example relies on Hadoop's own TotalOrderPartitioner: it builds a lookup-friendly data structure from the boundary keys, so that each map output key can be quickly routed to the right reducer; a follow-up post will analyze TotalOrderPartitioner from the source-code angle. The boundary keys themselves are sampled at random from the source data by RandomSampler.