***數據去重***算法
目標:原始數據中出現次數超過一次的數據在輸出文件中只出現一次。數據結構
算法思想:根據reduce的過程特性,會自動根據key來計算輸入的value集合,把數據做爲key輸出給reduce,不管這個數據出現多少次,reduce最終結果中key只能輸出一次。app
1.實例中每一個數據表明輸入文件中的一行內容,map階段採用Hadoop默認的做業輸入方式。將value設置爲key,並直接輸出。 map輸出數據的key爲數據,將value設置成空值
2.在MapReduce流程中,map的輸出<key,value>通過shuffle過程彙集成<key,value-list>後會交給reduce
3.reduce階段無論每一個key有多少個value,它直接將輸入的key複製爲輸出的key,並輸出(輸出中的value被設置成空)。oop
代碼實現:orm
public class testquchong { blog
static String INPUT_PATH="hdfs://master:9000/quchong"; //將文件file1和file2放在該目錄下排序
static String OUTPUT_PATH="hdfs://master:9000/quchong/qc"; 字符串
static class MyMapper extends Mapper<Object,Text,Text,Text>{ //將輸入輸出做爲string類型,對應Text類型get
private static Text line=new Text(); //每一行做爲一個數據 string
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{
line=value;
context.write(line,new Text(",")); //key是惟一的,做爲數據,即實現去重
}
}
static class MyReduce extends Reducer<Text,Text,Text,Text>{
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
context.write(key,new Text(" ")); //map傳給reduce的數據已經作完數據去重,輸出便可
}
}
public static void main(String[] args) throws Exception{
Path outputpath=new Path(OUTPUT_PATH);
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.waitForCompletion(true);
}
}
***數據排序***
目標:實現多個文件中的數據進行從小到大的排序並輸出
算法思想:MapReduce過程當中就有排序,它的默認排序規則按照key值進行排序的,若是key爲封裝int的IntWritable類型,那麼MapReduce按照數字大小對key排序,若是key爲封裝爲String的Text類型,那麼MapReduce按照字典順序對字符串排序。
使用封裝int的IntWritable型數據結構。也就是在map中將讀入的數據轉化成IntWritable型,而後做爲key值輸出(value任意)。reduce拿到<key,value-list>以後,將輸入的key做爲value輸出,並根據value-list中元素的個數決定輸出的次數。輸出的key(即代碼中的linenum)是一個全局變量,它統計當前key的位次。
代碼實現:
public class paixu {
static String INPUT_PATH="hdfs://master:9000/test";
static String OUTPUT_PATH="hdfs://master:9000/output/sort";
static class MyMapper extends Mapper<Object,Object,IntWritable,NullWritable>{ //選擇爲Int類型,value值任意
IntWritable output_key=new IntWritable();
NullWritable output_value=NullWritable.get();
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{
int val=Integer.parseUnsignedInt(value.toString().trim()); //進行數據類型轉換
output_key.set(val);
context.write(output_key,output_value); //key值肯定
}
}
static class MyReduce extends Reducer<IntWritable,NullWritable,IntWritable,IntWritable>{ //輸入是map的輸出,輸出行號和數據爲int
IntWritable output_key=new IntWritable();
int num=1;
protected void reduce(IntWritable key,Iterable<NullWritable> values,Context context) throws IOException,InterruptedException{
output_key.set(num++); //循環賦值做爲行號
context.write(output_key,key); //key爲map傳入的數據
}
}
public static void main(String[] args) throws Exception{
Path outputpath=new Path(OUTPUT_PATH);
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setMapOutputKeyClass(IntWritable.class); //由於map和reduce的輸出類型不同
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}