MapReduce天生就具有排序的特性,但是面对稍微复杂的排序时我们还是希望能够充分利用其自身的设计原理来达到我们的目的。其中二次排序就是一个很好的例子,下面主要介绍下二次排序。
二次排序的场景是不仅需要对key排序依然需要对value中的某个值进行排序,也就是先对key排序然后对相同key的record再对value进行排序。
对key排序我们可以使用MR自身的排序,但是怎么对value中的某个值进行排序呢?
MapReduce留出了很多可以自定义的接口,比如partition、comparator和group等等接口,这里只需用到这三个,其它以后用的接口再介绍。
之前有几篇介绍MapReduce流程的blog,可以自行搜索,关键字为MapReduce源码解析 。
熟悉MapReduce流程的同学会知道key的第一次排序 发生在map端的sortAndSpill阶段,此阶段是将内存中的数据先根据partition进行排序然后再对key排序(排序算法是改进的快排), 第二次排序 发生在reduce端的merge阶段,此阶段是将从map端copy来的segment(局部有序的数据,局部有序的原因是map端已将数据按key排序,reduce copy时从n个map中将此reduce需要的数据复制过来,则每个map内的数据是有序的,而各个map之间的数据是无序的 )进行堆排序,使数据按照key有序。(这里的第二次排序其实也可以说是第三次排序 ,因为在map端也会有个merge阶段,将spill到磁盘的临时文件merge成一个大文件,这个过程将spills文件中按照partition和key进行排序)
由此可知MapReduce整个流程是以key排序为核心的,那么针对上面的需求是否可以在key上做点文章呢?
针对上面需求的解决方案是将需要排序的key和value1(value中的某个属性)组成一个复合键compositeKey,在map端按照key分区和排序,则相同的key被分到同一个reduce中,在reduce中对相同key的value1进行排序,然后根据key进行分组,以便形成相同key的Iterator,这时输出的数据就是按照key和value1排序的。
这里需要注意下由于partition选择的方法不一样,可能会导致最终的结果可能是reduce内有序和全局有序 。
问题 问题如下: 有两个文件a.txt和b.txt a.txt中的内容是 1990 31 1991 20 1991 18 1991 33 1990 22 1990 17 b.txt中的内容是 1992 31 1991 27 1993 18 1993 33 1992 22 1990 10 想要的排序结果是 1990 10 1990 17 1990 22 1990 31 1991 18 1991 20 1991 27 1991 33 1992 22 1992 31 1993 18 1993 33
先来看下根据复合键compositeKey中的key进行hash分区的代码
hash分区代码示例 复合键compositeKey的形成有两种形式:一种是将key和value拼接成一个字符串,key和value之前用特殊字符分隔,另一种是实现WritableComparable接口 自己写一个新的数据类型。
先来个简单的,将key和value拼接为一个字符串
key和value拼接字符串为复合键 key和value拼接为字符串是在map中执行的,先看下map的代码:
1 2 3 4 5 6 7 8 9 10 11 12 public static class MyMapper extends Mapper <Object , Text , Text , IntWritable > { public void map (Object key, Text value, Context context ) throws IOException, InterruptedException { String[] arr = value.toString().split(" " ); context.write(new Text(arr[0 ] + " " + arr[1 ]), new IntWritable(Integer.parseInt(arr[1 ]))); } }
源文件里的key和value是用空格分隔的,在map中将key和value分隔开然后将key和value组合为一个新的newKey,将value依然作为value输出。
这里既然看了map的代码,那么不防再看下reduce代码:
1 2 3 4 5 6 7 8 9 10 11 12 public static class MyReducer extends Reducer <Text ,IntWritable ,Text ,NullWritable > { public void reduce (Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { for (IntWritable val : values) { context.write(key, NullWritable.get()); } } }
reduce只是将key(这里的key是指在map中组合之后的复合键 )输出,value是hadoop中提供的null对象NullWritable 。
这里之所以只是直接将key输出是因为我们在**整个MR流程中通过key的第一个字段进行分区和分组,比较两个key的大小时是先比较key的第一个字段,相同时再比较第二个字段(这里需要注意的是源文件中第二个字段是int,如果第二个字段也是string类型的,则就可以利用Text自身的比较器进行两个key的比较)**。这样的好处是你可以自定义key和value之前的分隔符(而不用重写outputFormater去定义key和value之间的分隔符),但也有一些局限性,具体看自己需求吧。
下面来看下我们是怎么对这个复合键进行处理的。首先看下分区策略:
自定义分区策略时要继承Partitioner
类,重写getPartition
方法,代码如下:
1 2 3 4 5 6 7 8 9 public class FirstPartition extends Partitioner <Text , IntWritable > { @Override public int getPartition (Text text, IntWritable intWritable, int i) { return Math.abs(text.toString().split(" " )[0 ].hashCode() * 127 ) % i; } }
对数据分区之后,就该对复合键进行排序了,这里的比较原则是先将复合键进行切分,然后先对第一个字段进行比较,相同之后在对第二个字段进行比较。由于源文件中key是string,value是int,将key和value组成Text类型的复合键时,不能使用Text自身的比较器(因为value是int),则这里需要自定义比较器 。
Hadoop中自定义比较器要继承WritableComparator
并且重载compare(WritableComparable a, WritableComparable b)
方法(需要特别注意的是,必须有一个构造函数 ),自定义比较器也可以实现RawComparator
接口。这里是继承WritableComparator
类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class KeyComparator extends WritableComparator { public KeyComparator () { super (Text.class, true ); } @Override public int compare (WritableComparable a, WritableComparable b) { String[] arr_a = a.toString().split(" " ); String[] arr_b = b.toString().split(" " ); System.out.println("=========KeyComparator=========" ); if (arr_a[0 ].compareTo(arr_b[0 ]) != 0 ){ return arr_a[0 ].compareTo(arr_b[0 ]); }else { return Integer.parseInt(arr_a[1 ]) - Integer.parseInt(arr_b[1 ]); } } }
到此代码其实就已经实现了reduce输出的局部有序,但有些场景也需要重写reduce端的分组策略,所以这里也加上自定义的分组策略。自定义分组策略时要其实也是重写一个比较器 ,这里依然采用继承WritableComparator
类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class GroupComparator extends WritableComparator { public GroupComparator () { super (Text.class, true ); } @Override public int compare (WritableComparable a, WritableComparable b) { String[] arr_a = a.toString().split(" " ); String[] arr_b = b.toString().split(" " ); System.out.println("=========GroupComparator=========" ); return arr_a[0 ].compareTo(arr_b[0 ]); } }
最后就是main主类了,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public static void main (String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS" , "hdfs://192.168.244.131:9000" ); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2 ) { System.err.println("Usage: wordcount <in> [<in>...] <out>" ); System.exit(2 ); } Job job = Job.getInstance(conf, "word count" ); job.setJarByClass(SecondSort.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setNumReduceTasks(2 ); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setPartitionerClass(FirstPartition.class); job.setSortComparatorClass(KeyComparator.class); job.setGroupingComparatorClass(GroupComparator.class); for (int i = 0 ; i < otherArgs.length - 1 ; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1 ])); FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(otherArgs[otherArgs.length - 1 ]))){ fs.delete(new Path(otherArgs[otherArgs.length - 1 ]), true ); } System.exit(job.waitForCompletion(true ) ? 0 : 1 ); }
输出结果如下: 因为有两个reduce则有两个输出,part-r-00000和part-r-00001。 part-r-00000的内容如下: 1991 18 1991 20 1991 27 1991 33 1993 18 1993 33 part-r-00001的内容如下: 1990 10 1990 17 1990 22 1990 31 1992 22 1992 31
自定义数据类型为reduce的key 将需要排序的字段组合为一个新的数据类型,由此新数据类型作为key。Hadoop自定义数据类型时需要实现WritableComparable
接口。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class EntityPair implements WritableComparable <EntityPair > { private Text firstKey; private IntWritable secondKey; public Text getFirstKey () { return firstKey; } public void setFirstKey (Text firstKey) { this .firstKey = firstKey; } public IntWritable getSecondKey () { return secondKey; } public void setSecondKey (IntWritable secondKey) { this .secondKey = secondKey; } public EntityPair (Text firstKey, IntWritable secondKey) { this .firstKey = firstKey; this .secondKey = secondKey; } public EntityPair () { } @Override public int compareTo (EntityPair o) { return this .firstKey.compareTo(o.getFirstKey()); } @Override public void write (DataOutput dataOutput) throws IOException { dataOutput.writeUTF(firstKey.toString()); dataOutput.writeInt(secondKey.get()); } @Override public void readFields (DataInput dataInput) throws IOException { firstKey = new Text(dataInput.readUTF()); secondKey = new IntWritable(dataInput.readInt()); } @Override public String toString () { return this .getFirstKey() + " " + this .getSecondKey(); } }
则分区策略、比较规则和分组策略和上一节的思路大体一样,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class EntityPartition extends Partitioner <EntityPair , IntWritable > { @Override public int getPartition (EntityPair text, IntWritable intWritable, int i) { return Math.abs(text.getFirstKey().hashCode() * 127 ) % i; } } public class EntityComparator extends WritableComparator { public EntityComparator () { super (EntityPair.class, true ); } @Override public int compare (WritableComparable a, WritableComparable b) { EntityPair entityPair1 = (EntityPair) a; EntityPair entityPair2 = (EntityPair) b; System.out.println("=========Comparator=========" ); if (!entityPair1.getFirstKey().toString().equals(entityPair2.getFirstKey().toString())){ return entityPair1.getFirstKey().toString().compareTo(entityPair2.getFirstKey().toString()); }else { return entityPair1.getSecondKey().get() - entityPair2.getSecondKey().get(); } } } public class EntityGroup extends WritableComparator { public EntityGroup () { super (EntityPair.class, true ); } @Override public int compare (WritableComparable a, WritableComparable b) { EntityPair entityPair1 = (EntityPair) a; EntityPair entityPair2 = (EntityPair) b; System.out.println("=========GroupComparator=========" ); return entityPair1.getFirstKey().toString().compareTo(entityPair2.getFirstKey().toString()); } }
下面是MR类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class EntitySecondSort { private static final Logger log = Logger.getLogger(EntitySecondSort.class); public static class MyMapper extends Mapper <Object , Text , EntityPair , IntWritable > { public void map (Object key, Text value, Context context ) throws IOException, InterruptedException { String[] arr = value.toString().split(" " ); context.write(new EntityPair(new Text(arr[0 ]), new IntWritable(Integer.parseInt(arr[1 ]))), new IntWritable(Integer.parseInt(arr[1 ]))); } } public static class MyReducer extends Reducer <EntityPair ,IntWritable ,Text ,IntWritable > { public void reduce (EntityPair key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { for (IntWritable val : values) { context.write(new Text(key.getFirstKey()), val); } } } public static void main (String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS" , "hdfs://192.168.244.131:9000" ); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2 ) { System.err.println("Usage: wordcount <in> [<in>...] <out>" ); System.exit(2 ); } Job job = Job.getInstance(conf, "word count" ); job.setJarByClass(EntitySecondSort.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setNumReduceTasks(2 ); job.setMapOutputKeyClass(EntityPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setPartitionerClass(EntityPartition.class); job.setSortComparatorClass(EntityComparator.class); job.setGroupingComparatorClass(EntityGroup.class); for (int i = 0 ; i < otherArgs.length - 1 ; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1 ])); FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(otherArgs[otherArgs.length - 1 ]))){ fs.delete(new Path(otherArgs[otherArgs.length - 1 ]), true ); } System.exit(job.waitForCompletion(true ) ? 0 : 1 ); } }
这里的分组策略也不是必须的,(只有在reduce中需要对相同key的values进行合并操作时,才需要对其records根据复合键的第一个值进行分组。)
上面的reduce代码输出的是键值对,之所以这样是考虑到value中可能包含很多属性,而只需要对其中的某一个属性value1进行排序,则将剩余的属性列出。
关于二次排序的例子Hadoop自带的MapReduce例子中也有样例,类名SecondarySort
上面的代码只实现了reduce内有序,各个reduce之间是无序的,那么如何得到一个全局有序的结果呢 ?看下篇
附加: equals和hashCode的关系 这里补充下equals和hashCode的关系
equals和hashCode都是Object类的方法,可以在任务一个类中重写,Object默认 的equals实现是判断两个对象的地址是否相等 (即,是否是同一个对象)来区分它们是否相等,**此时等价于”==”**。但是equals方法往往被类重写,用来判断两个对象的内容是否相等,如String类的equals方法。
hashCode主要是用来判断对象在散列表中的位置,则如果两个对象相同则在散列表中的位置也应该是相同的,但是判断两个对象是否相同是由equals判断的,则*__如果某个类的对象要在散列表中使用__,重写equals方法时往往也要重写hashCode方法,以保证equals为true时,hashCode是相同的*。如果某个类不会出现在散列表中,则equals和hashCode并没有什么直接的关系。