1. Concept overview
Hadoop's default sort only orders records by key. When keys are equal, the values also need to be ordered, which is the idea behind a secondary sort.
Put simply: for a two-column data file, rows are sorted by the first column in ascending order, and rows that share the same first column are then sorted by the second column in ascending order.
2. Example
Input sample and output sample (shown as images in the original post).
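The original sample images are not reproduced here; a hypothetical input/output pair that matches the description above would look like this (comma-separated input, tab-separated output from TextOutputFormat):

Input:
3,3
3,1
1,2
2,7
2,3
1,1

Output:
1	1
1	2
2	3
2	7
3	1
3	3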
3. Algorithm idea
This example defines a custom key class, MyGrouptest, which specifies how the two numbers are read, written, and compared.
MapReduce sorts map output by key, so the mapper packs both columns into a MyGrouptest composite key and emits a NullWritable placeholder as the value. During the shuffle the framework calls MyGrouptest.compareTo, which orders keys by the first column and, when the first columns are equal, by the second column. The reducer then only needs to unpack each key and write the two columns out, so the output is already in secondary-sorted order.
4. Code implementation
MyGrouptest.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class MyGrouptest implements WritableComparable<MyGrouptest> {
    // Composite key: the two columns of one input line.
    long firstNum;
    long secondNum;

    public MyGrouptest() {}

    public MyGrouptest(long first, long second) {
        firstNum = first;
        secondNum = second;
    }

    // Serialize both fields for the shuffle.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(firstNum);
        out.writeLong(secondNum);
    }

    // Deserialize the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        firstNum = in.readLong();
        secondNum = in.readLong();
    }

    // Called while the framework sorts map output keys: order by the first column,
    // and by the second column when the first columns are equal.
    // Long.compare avoids the overflow that casting a long difference to int can cause.
    @Override
    public int compareTo(MyGrouptest anotherKey) {
        int cmp = Long.compare(firstNum, anotherKey.firstNum);
        if (cmp != 0) {
            return cmp; // first columns differ, so they decide the order
        }
        return Long.compare(secondNum, anotherKey.secondNum);
    }

    public long getFirstNum() {
        return firstNum;
    }

    public long getSecondNum() {
        return secondNum;
    }
}
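As a quick sanity check, the comparison logic can be exercised on its own before wiring the class into a job. The small test class below is a hypothetical addition, not part of the original post:

// Hypothetical standalone check of MyGrouptest.compareTo (not in the original post).
public class MyGrouptestCheck {
    public static void main(String[] args) {
        MyGrouptest a = new MyGrouptest(3, 1);
        MyGrouptest b = new MyGrouptest(3, 5);
        MyGrouptest c = new MyGrouptest(1, 9);
        System.out.println(a.compareTo(b) < 0); // true: same first column, 1 < 5
        System.out.println(a.compareTo(c) > 0); // true: 3 > 1 on the first column
    }
}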
GroupSort.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupSort {
    static String INPUT_PATH = "hdfs://master:9000/input/f.txt";
    static String OUTPUT_PATH = "hdfs://master:9000/output/groupsort";

    static class MyMapper extends Mapper<LongWritable, Text, MyGrouptest, NullWritable> {
        NullWritable output_value = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line holds two comma-separated numbers; both go into the composite key.
            String[] tokens = value.toString().split(",", 2);
            MyGrouptest output_key =
                    new MyGrouptest(Long.parseLong(tokens[0].trim()), Long.parseLong(tokens[1].trim()));
            context.write(output_key, output_value);
        }
    }

    static class MyReduce extends Reducer<MyGrouptest, NullWritable, LongWritable, LongWritable> {
        LongWritable output_key = new LongWritable();
        LongWritable output_value = new LongWritable();

        @Override
        protected void reduce(MyGrouptest key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted by (firstNum, secondNum); write one line per input
            // record so duplicate pairs are not silently dropped.
            output_key.set(key.getFirstNum());
            output_value.set(key.getSecondNum());
            for (NullWritable ignored : values) {
                context.write(output_key, output_value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path outputpath = new Path(OUTPUT_PATH);
        Configuration conf = new Configuration();
        // Remove a previous output directory so the job can be re-run.
        outputpath.getFileSystem(conf).delete(outputpath, true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(GroupSort.class);
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputpath);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        // A single reduce task produces one globally sorted output file.
        job.setNumReduceTasks(1);
        // LiuPartitioner is a custom partitioner whose code is not shown in this post;
        // with a single reduce task it has no effect and this line could be omitted.
        job.setPartitionerClass(LiuPartitioner.class);
        job.setMapOutputKeyClass(MyGrouptest.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
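The job references LiuPartitioner, but its source is not included in the post. A minimal sketch of a partitioner that would be compatible with this job, assuming it only needs to keep rows with the same first column on the same reducer, might look like the following (hypothetical, not the original LiuPartitioner):

// Hypothetical sketch of a partitioner compatible with the job above.
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class LiuPartitioner extends Partitioner<MyGrouptest, NullWritable> {
    @Override
    public int getPartition(MyGrouptest key, NullWritable value, int numPartitions) {
        // Partition on the first column only, so that equal first columns
        // always reach the same reducer.
        return (int) ((key.getFirstNum() & Long.MAX_VALUE) % numPartitions);
    }
}

With setNumReduceTasks(1) all keys land on the single reducer anyway, so the partitioner only matters if the job is later scaled out to several reducers.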