MapReduce天生就具有排序的特性,但是面对稍微复杂的排序时我们还是希望能够充分利用其自身的设计原理来达到我们的目的。其中二次排序就是一个很好的例子,下面主要介绍下二次排序。
二次排序的场景是不仅需要对key排序依然需要对value中的某个值进行排序,也就是先对key排序然后对相同key的record再对value进行排序。
对key排序我们可以使用MR自身的排序,但是怎么对value中的某个值进行排序呢?
MapReduce留出了很多可以自定义的接口,比如partition、comparator和group等等接口,这里只需用到这三个,其它以后用的接口再介绍。
之前有几篇介绍MapReduce流程的blog,可以自行搜索,关键字为MapReduce源码解析 。
 
熟悉MapReduce流程的同学会知道key的第一次排序 发生在map端的sortAndSpill阶段,此阶段是将内存中的数据先根据partition进行排序然后再对key排序(排序算法是改进的快排), 第二次排序 发生在reduce端的merge阶段,此阶段是将从map端copy来的segment(局部有序的数据,局部有序的原因是map端已将数据按key排序,reduce copy时从n个map中将此reduce需要的数据复制过来,则每个map内的数据是有序的,而各个map之间的数据是无序的 )进行堆排序,使数据按照key有序。(这里的第二次排序其实也可以说是第三次排序 ,因为在map端也会有个merge阶段,将spill到磁盘的临时文件merge成一个大文件,这个过程将spills文件中按照partition和key进行排序)
由此可知MapReduce整个流程是以key排序为核心的,那么针对上面的需求是否可以在key上做点文章呢?
针对上面需求的解决方案是将需要排序的key和value1(value中的某个属性)组成一个复合键compositeKey,在map端按照key分区和排序,则相同的key被分到同一个reduce中,在reduce中对相同key的value1进行排序,然后根据key进行分组,以便形成相同key的Iterator,这时输出的数据就是按照key和value1排序的。 
这里需要注意下由于partition选择的方法不一样,可能会导致最终的结果可能是reduce内有序和全局有序 。
 
问题 问题如下:
先来看下根据复合键compositeKey中的key进行hash分区的代码
hash分区代码示例 复合键compositeKey的形成有两种形式:一种是将key和value拼接成一个字符串,key和value之前用特殊字符分隔,另一种是实现WritableComparable接口 自己写一个新的数据类型。
先来个简单的,将key和value拼接为一个字符串
key和value拼接字符串为复合键 key和value拼接为字符串是在map中执行的,先看下map的代码:
1 2 3 4 5 6 7 8 9 10 11 12 public  static  class  MyMapper         extends  Mapper <Object , Text , Text , IntWritable >  {    public  void  map (Object key, Text value, Context context      )  throws  IOException, InterruptedException         String[] arr = value.toString().split(" " );                  context.write(new  Text(arr[0 ] + " "  + arr[1 ]), new  IntWritable(Integer.parseInt(arr[1 ])));     } } 
源文件里的key和value是用空格分隔的,在map中将key和value分隔开然后将key和value组合为一个新的newKey,将value依然作为value输出。
这里既然看了map的代码,那么不防再看下reduce代码:
1 2 3 4 5 6 7 8 9 10 11 12 public  static  class  MyReducer         extends  Reducer <Text ,IntWritable ,Text ,NullWritable >  {    public  void  reduce (Text key, Iterable<IntWritable> values,                         Context context     )  throws  IOException, InterruptedException         for  (IntWritable val : values) {                            context.write(key, NullWritable.get());         }     } } 
reduce只是将key(这里的key是指在map中组合之后的复合键 )输出,value是hadoop中提供的null对象NullWritable 。
这里之所以只是直接将key输出是因为我们在**整个MR流程中通过key的第一个字段进行分区和分组,比较两个key的大小时是先比较key的第一个字段,相同时再比较第二个字段(这里需要注意的是源文件中第二个字段是int,如果第二个字段也是string类型的,则就可以利用Text自身的比较器进行两个key的比较)**。这样的好处是你可以自定义key和value之前的分隔符(而不用重写outputFormater去定义key和value之间的分隔符),但也有一些局限性,具体看自己需求吧。
下面来看下我们是怎么对这个复合键进行处理的。首先看下分区策略:
自定义分区策略时要继承Partitioner类,重写getPartition方法,代码如下:
1 2 3 4 5 6 7 8 9 public  class  FirstPartition  extends  Partitioner <Text , IntWritable >     @Override      public  int  getPartition (Text text, IntWritable intWritable, int  i)       	     	         return  Math.abs(text.toString().split(" " )[0 ].hashCode() * 127 ) % i;     } } 
对数据分区之后,就该对复合键进行排序了,这里的比较原则是先将复合键进行切分,然后先对第一个字段进行比较,相同之后在对第二个字段进行比较。由于源文件中key是string,value是int,将key和value组成Text类型的复合键时,不能使用Text自身的比较器(因为value是int),则这里需要自定义比较器 。
Hadoop中自定义比较器要继承WritableComparator并且重载compare(WritableComparable a, WritableComparable b)方法(需要特别注意的是,必须有一个构造函数 ),自定义比较器也可以实现RawComparator接口。这里是继承WritableComparator类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public  class  KeyComparator  extends  WritableComparator           public  KeyComparator ()          super (Text.class, true );     }     @Override      public  int  compare (WritableComparable a, WritableComparable b)       	         String[] arr_a = a.toString().split(" " );         String[] arr_b = b.toString().split(" " );         System.out.println("=========KeyComparator=========" );                  if  (arr_a[0 ].compareTo(arr_b[0 ]) != 0 ){             return  arr_a[0 ].compareTo(arr_b[0 ]);         }else  {             return  Integer.parseInt(arr_a[1 ]) - Integer.parseInt(arr_b[1 ]);         }     } } 
到此代码其实就已经实现了reduce输出的局部有序,但有些场景也需要重写reduce端的分组策略,所以这里也加上自定义的分组策略。自定义分组策略时要其实也是重写一个比较器 ,这里依然采用继承WritableComparator类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  class  GroupComparator  extends  WritableComparator           public  GroupComparator ()          super (Text.class, true );     }     @Override      public  int  compare (WritableComparable a, WritableComparable b)           String[] arr_a = a.toString().split(" " );         String[] arr_b = b.toString().split(" " );         System.out.println("=========GroupComparator=========" );                  return  arr_a[0 ].compareTo(arr_b[0 ]);     } } 
最后就是main主类了,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public  static  void  main (String[] args)  throws  Exception     Configuration conf = new  Configuration();          conf.set("fs.defaultFS" , "hdfs://192.168.244.131:9000" );     String[] otherArgs = new  GenericOptionsParser(conf, args).getRemainingArgs();     if  (otherArgs.length < 2 ) {         System.err.println("Usage: wordcount <in> [<in>...] <out>" );         System.exit(2 );     }     Job job = Job.getInstance(conf, "word count" );     job.setJarByClass(SecondSort.class);     job.setMapperClass(MyMapper.class);     job.setReducerClass(MyReducer.class);          job.setNumReduceTasks(2 );          job.setMapOutputKeyClass(Text.class);     job.setMapOutputValueClass(IntWritable.class);     job.setOutputKeyClass(Text.class);     job.setOutputValueClass(NullWritable.class);               job.setPartitionerClass(FirstPartition.class);          job.setSortComparatorClass(KeyComparator.class);          job.setGroupingComparatorClass(GroupComparator.class);     for  (int  i = 0 ; i < otherArgs.length - 1 ; ++i) {         FileInputFormat.addInputPath(job, new  Path(otherArgs[i]));     }     FileOutputFormat.setOutputPath(job,             new  Path(otherArgs[otherArgs.length - 1 ]));          FileSystem fs = FileSystem.get(conf);     if  (fs.exists(new  Path(otherArgs[otherArgs.length - 1 ]))){         fs.delete(new  Path(otherArgs[otherArgs.length - 1 ]), true );     }     System.exit(job.waitForCompletion(true ) ? 0  : 1 ); } 
输出结果如下:
自定义数据类型为reduce的key 将需要排序的字段组合为一个新的数据类型,由此新数据类型作为key。Hadoop自定义数据类型时需要实现WritableComparable接口。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public  class  EntityPair  implements  WritableComparable <EntityPair >    private  Text firstKey;     private  IntWritable secondKey;     public  Text getFirstKey ()           return  firstKey;     }     public  void  setFirstKey (Text firstKey)           this .firstKey = firstKey;     }     public  IntWritable getSecondKey ()           return  secondKey;     }     public  void  setSecondKey (IntWritable secondKey)           this .secondKey = secondKey;     }     public  EntityPair (Text firstKey, IntWritable secondKey)           this .firstKey = firstKey;         this .secondKey = secondKey;     }     public  EntityPair ()       }     @Override      public  int  compareTo (EntityPair o)           return  this .firstKey.compareTo(o.getFirstKey());     }     @Override      public  void  write (DataOutput dataOutput)  throws  IOException          dataOutput.writeUTF(firstKey.toString());         dataOutput.writeInt(secondKey.get());     }     @Override      public  void  readFields (DataInput dataInput)  throws  IOException          firstKey = new  Text(dataInput.readUTF());         secondKey = new  IntWritable(dataInput.readInt());     }     @Override      public  String toString ()          return  this .getFirstKey() + " "  + this .getSecondKey();     } } 
则分区策略、比较规则和分组策略和上一节的思路大体一样,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public  class  EntityPartition  extends  Partitioner <EntityPair , IntWritable >     @Override      public  int  getPartition (EntityPair text, IntWritable intWritable, int  i)       	         return  Math.abs(text.getFirstKey().hashCode() * 127 ) % i;     } } public  class  EntityComparator  extends  WritableComparator      public  EntityComparator ()          super (EntityPair.class, true );     }     @Override      public  int  compare (WritableComparable a, WritableComparable b)           EntityPair entityPair1 = (EntityPair) a;         EntityPair entityPair2 = (EntityPair) b;         System.out.println("=========Comparator=========" );         if  (!entityPair1.getFirstKey().toString().equals(entityPair2.getFirstKey().toString())){             return  entityPair1.getFirstKey().toString().compareTo(entityPair2.getFirstKey().toString());         }else  {             return  entityPair1.getSecondKey().get() - entityPair2.getSecondKey().get();         }     } } public  class  EntityGroup   extends  WritableComparator      public  EntityGroup ()          super (EntityPair.class, true );     }     @Override      public  int  compare (WritableComparable a, WritableComparable b)           EntityPair entityPair1 = (EntityPair) a;         EntityPair entityPair2 = (EntityPair) b;         System.out.println("=========GroupComparator=========" );         return  entityPair1.getFirstKey().toString().compareTo(entityPair2.getFirstKey().toString());     } } 
下面是MR类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public  class  EntitySecondSort      private  static  final  Logger log = Logger.getLogger(EntitySecondSort.class);     public  static  class  MyMapper              extends  Mapper <Object , Text , EntityPair , IntWritable >  {        public  void  map (Object key, Text value, Context context          )  throws  IOException, InterruptedException             String[] arr = value.toString().split(" " );                          context.write(new  EntityPair(new  Text(arr[0 ]), new  IntWritable(Integer.parseInt(arr[1 ]))),                     new  IntWritable(Integer.parseInt(arr[1 ])));         }     }     public  static  class  MyReducer              extends  Reducer <EntityPair ,IntWritable ,Text ,IntWritable >  {        public  void  reduce (EntityPair key, Iterable<IntWritable> values,                             Context context         )  throws  IOException, InterruptedException             for  (IntWritable val : values) {                                  context.write(new  Text(key.getFirstKey()), val);             }         }     }     public  static  void  main (String[] args)  throws  Exception          Configuration conf = new  Configuration();         conf.set("fs.defaultFS" , "hdfs://192.168.244.131:9000" );         String[] otherArgs = new  GenericOptionsParser(conf, args).getRemainingArgs();         if  (otherArgs.length < 2 ) {             System.err.println("Usage: wordcount <in> [<in>...] <out>" );             System.exit(2 );         }         Job job = Job.getInstance(conf, "word count" );         job.setJarByClass(EntitySecondSort.class);         job.setMapperClass(MyMapper.class);         job.setReducerClass(MyReducer.class);         job.setNumReduceTasks(2 );         job.setMapOutputKeyClass(EntityPair.class);         job.setMapOutputValueClass(IntWritable.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(IntWritable.class);                  job.setPartitionerClass(EntityPartition.class);                  job.setSortComparatorClass(EntityComparator.class);                  job.setGroupingComparatorClass(EntityGroup.class);         for  (int  i = 0 ; i < otherArgs.length - 1 ; ++i) {             FileInputFormat.addInputPath(job, new  Path(otherArgs[i]));         }         FileOutputFormat.setOutputPath(job,                 new  Path(otherArgs[otherArgs.length - 1 ]));         FileSystem fs = FileSystem.get(conf);         if  (fs.exists(new  Path(otherArgs[otherArgs.length - 1 ]))){             fs.delete(new  Path(otherArgs[otherArgs.length - 1 ]), true );         }         System.exit(job.waitForCompletion(true ) ? 0  : 1 );     } } 
这里的分组策略也不是必须的,(只有在reduce中需要对相同key的values进行合并操作时,才需要对其records根据复合键的第一个值进行分组。)
上面的reduce代码输出的是键值对,之所以这样是考虑到value中可能包含很多属性,而只需要对其中的某一个属性value1进行排序,则将剩余的属性列出。
关于二次排序的例子Hadoop自带的MapReduce例子中也有样例,类名SecondarySort
 
上面的代码只实现了reduce内有序,各个reduce之间是无序的,那么如何得到一个全局有序的结果呢 ?看下篇 
附加: equals和hashCode的关系 这里补充下equals和hashCode的关系
equals和hashCode都是Object类的方法,可以在任务一个类中重写,Object默认 的equals实现是判断两个对象的地址是否相等 (即,是否是同一个对象)来区分它们是否相等,**此时等价于”==”**。但是equals方法往往被类重写,用来判断两个对象的内容是否相等,如String类的equals方法。
hashCode主要是用来判断对象在散列表中的位置,则如果两个对象相同则在散列表中的位置也应该是相同的,但是判断两个对象是否相同是由equals判断的,则*__如果某个类的对象要在散列表中使用__,重写equals方法时往往也要重写hashCode方法,以保证equals为true时,hashCode是相同的*。如果某个类不会出现在散列表中,则equals和hashCode并没有什么直接的关系。