Big Data Learning, Part 14: Secondary Sort

1. Understanding the concept

Hadoop's default sort orders records by key only. When key values are equal, the values also need to be sorted; this is what a secondary sort provides.

Put simply: the data file is sorted in ascending order by the first column, and rows that share the same first-column value are further sorted in ascending order by the second column.

2. Example

Input sample                                         Output sample
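As a hypothetical illustration (these numbers are assumptions, not the original sample data), a comma-separated input file and the secondary-sorted output written by the job might look like this:

    Input (f.txt):        Output:
    3,3                   1    1
    1,5                   1    5
    3,1                   3    1
    1,1                   3    3

The output pairs are tab-separated by the default TextOutputFormat.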

    

3. Algorithm idea

This example uses a custom class, MyGrouptest, which defines how the pair of numbers is read, written, and compared.

Map output keys are sorted by default, so both columns are packed into a MyGrouptest composite key (the value can be anything; NullWritable is used here) and passed on to the reducer. During the shuffle, MyGrouptest's comparison logic orders the keys, which yields the ordering of the second column as well, as the short sketch below illustrates.
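A rough sketch of how two such composite keys are ordered (the values are hypothetical):

    MyGrouptest a = new MyGrouptest(3, 7);
    MyGrouptest b = new MyGrouptest(3, 5);
    // The first columns are equal, so compareTo falls through to the second column:
    // a.compareTo(b) > 0, meaning (3,5) reaches the reducer before (3,7).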

4. Implementation

**MyGrouptest.java**

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyGrouptest implements WritableComparable<MyGrouptest> {

    long firstNum;
    long secondNum;

    public MyGrouptest() {}

    public MyGrouptest(long first, long second) {
        firstNum = first;
        secondNum = second;
    }

    // Serialize both fields for the shuffle.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(firstNum);
        out.writeLong(secondNum);
    }

    // Deserialize the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        firstNum = in.readLong();
        secondNum = in.readLong();
    }

    /*
     * Called when keys are sorted: order by the first number, and by the
     * second number when the first numbers are equal. Long.compare avoids
     * the overflow that subtracting the fields and casting to int can cause.
     */
    @Override
    public int compareTo(MyGrouptest anotherKey) {
        int cmp = Long.compare(firstNum, anotherKey.firstNum);
        if (cmp != 0) {
            return cmp;
        }
        return Long.compare(secondNum, anotherKey.secondNum);
    }

    public long getFirstNum() {
        return firstNum;
    }

    public long getSecondNum() {
        return secondNum;
    }
}
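When a composite key like this is distributed by Hadoop's default HashPartitioner, it is conventional for the key class to also override hashCode() and equals(). A minimal sketch (an addition for completeness, not part of the class above) could be:

    @Override
    public int hashCode() {
        // Combine both fields; equal keys must produce equal hash codes.
        return 31 * Long.hashCode(firstNum) + Long.hashCode(secondNum);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof MyGrouptest)) {
            return false;
        }
        MyGrouptest other = (MyGrouptest) obj;
        return firstNum == other.firstNum && secondNum == other.secondNum;
    }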

 

**GroupSort.java**

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupSort {

    static String INPUT_PATH = "hdfs://master:9000/input/f.txt";
    static String OUTPUT_PATH = "hdfs://master:9000/output/groupsort";

    static class MyMapper extends Mapper<LongWritable, Text, MyGrouptest, NullWritable> {

        NullWritable output_value = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line holds two comma-separated numbers; pack both into the composite key.
            String[] tokens = value.toString().split(",", 2);
            MyGrouptest output_key = new MyGrouptest(Long.parseLong(tokens[0]), Long.parseLong(tokens[1]));
            context.write(output_key, output_value);
        }
    }

    static class MyReduce extends Reducer<MyGrouptest, NullWritable, LongWritable, LongWritable> {

        LongWritable output_key = new LongWritable();
        LongWritable output_value = new LongWritable();

        @Override
        protected void reduce(MyGrouptest key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted by (firstNum, secondNum); just unpack and emit them.
            output_key.set(key.getFirstNum());
            output_value.set(key.getSecondNum());
            context.write(output_key, output_value);
        }
    }

    public static void main(String[] args) throws Exception {
        Path outputpath = new Path(OUTPUT_PATH);
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(GroupSort.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputpath);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        job.setNumReduceTasks(1);
        // Custom partitioner referenced by the post; its source is not included here (see the sketch below).
        job.setPartitionerClass(LiuPartitioner.class);

        job.setMapOutputKeyClass(MyGrouptest.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        job.waitForCompletion(true);
    }
}
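The driver sets LiuPartitioner as the partitioner, but that class is not shown in the post. Purely as an assumption about what it might do, a partitioner that routes composite keys by their first column could be sketched as follows (a hypothetical reconstruction, not the original implementation):

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class LiuPartitioner extends Partitioner<MyGrouptest, NullWritable> {
        @Override
        public int getPartition(MyGrouptest key, NullWritable value, int numPartitions) {
            // Send rows that share a first column to the same reducer.
            return (Long.hashCode(key.getFirstNum()) & Integer.MAX_VALUE) % numPartitions;
        }
    }

With setNumReduceTasks(1) as above, the choice of partitioner does not change the output; it only matters when several reducers are used.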
