MapReduce 實例代碼

1. 一個簡單的 MapReduce 程式:單詞計數(WordCount)的 Java 實例代碼:

 1 package com.cgh.test;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 
15 public class WordCount {
16     
17     public static void mian(String[] args) throws Exception {
18         //建立Configuration對象
19         Configuration configuration=new Configuration();
20         Job job=Job.getInstance(configuration);
21         //設置jar包所在路徑
22         job.setJarByClass(WordCount.class);
23         
24         //設置mapper類和reducer類
25         job.setMapperClass(WCMapper.class);
26         job.setReducerClass(WCReducer.class);
27         //指定maptask的輸出類型
28         job.setMapOutputKeyClass(Text.class);
29         job.setMapOutputValueClass(LongWritable.class);
30         //指定Reducetask的輸出類型
31         job.setOutputKeyClass(Text.class);
32         job.setOutputValueClass(LongWritable.class);
33         
34         //指定該mapReduce程序數據的輸入和輸出路徑
35         FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
36         FileOutputFormat.setOutputPath(job, new Path("wordcount/output"));
37         
38         
39         //最後提交任務
40         boolean waitForCompletion=job.waitForCompletion(true);
41         job.submit();
42     }
43     
44     private static class WCMapper extends Mapper<LongWritable, Text,Text, LongWritable>{
45         @Override
46         protected void map(LongWritable key, Text value,
47                 Mapper<LongWritable, Text, Text, LongWritable>.Context context)
48                 throws IOException, InterruptedException {
49             //對咱們的傳入的數據進行切分
50             String[] words=value.toString().split(" ");
51             for(String word:words){
52                 //對切分好的數據發送送給Reduce
53                 context.write(new Text(word), new LongWritable(1));
54             }
55         }
56     }
57     
58     private static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
59         @Override
60         protected void reduce(Text arg0, Iterable<LongWritable> arg1,
61                 Reducer<Text, LongWritable, Text, LongWritable>.Context arg2)
62                 throws IOException, InterruptedException {
63             
64             int sum=0;
65             //對傳入的數據進行計數,加和
66             for(LongWritable v:arg1){
67                 sum+=v.get();
68             }
69         //將最終的結果輸出到咱們的hdfs上
70             arg2.write(arg0, new LongWritable(sum));
71         }
72     }
73     
74 }

 

2. 學習一個測試實例(Apache):

相關文章
相關標籤/搜索