import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
 * Description: WordCount explained by Felix
 * @author Hadoop Dev Group
 */
public class WordCount
{
/**
 * MapReduceBase class: base class for Mapper and Reducer implementations
 * (its methods merely satisfy the interfaces and do nothing).
 * Mapper interface: maps input key/value pairs to intermediate key/value pairs.
 * WritableComparable interface: classes that implement WritableComparable can
 * be compared with one another; every class used as a key should implement it.
 * Reporter can be used to report the progress of the whole application; it is
 * not used in this example.
 */
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable>
{
/**
 * LongWritable, IntWritable and Text are Hadoop classes that wrap Java data
 * types. They all implement the WritableComparable interface, so they can be
 * serialized for data exchange in a distributed environment. Think of them
 * as stand-ins for long, int and String respectively.
 */
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
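/**
 * A quick illustration of these wrapper types (hypothetical snippet, not part
 * of the job logic):
 *   IntWritable n = new IntWritable(42);   // n.get() returns 42
 *   Text t = new Text("hello");            // t.toString() returns "hello"
 *   t.set("world");                        // Writables are mutable and reusable
 */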
/**
 * The map method of the Mapper interface:
 * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
 * Maps a single input k/v pair to an intermediate k/v pair.
 * The output pair need not be of the same type as the input pair; an input
 * pair may map to zero or more output pairs.
 * OutputCollector interface: collects the <k,v> pairs output by Mapper and Reducer.
 * Its collect(k, v) method adds a (k,v) pair to the output.
 */
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens())
{
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable>
{
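/**
 * The reduce method of the Reducer interface:
 * void reduce(K2 key, Iterator<V2> values, OutputCollector<K3,V3> output, Reporter reporter)
 * Sums the counts emitted for each word. Since this class is also registered
 * as the Combiner below, the same summation also runs on the map side.
 */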
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException
{
int sum = 0;
while (values.hasNext())
{
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception
{
/**
 * JobConf: the map/reduce job configuration class; it describes to the Hadoop
 * framework the work that the map-reduce job will perform.
 * Constructors include JobConf(), JobConf(Class exampleClass),
 * JobConf(Configuration conf), and others.
 */
JobConf conf = new JobConf(WordCount.class);
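// Equivalent construction via another constructor (illustrative only; it
// would also require importing org.apache.hadoop.conf.Configuration):
//   JobConf conf = new JobConf(new Configuration(), WordCount.class);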
conf.setJobName("wordcount"); // set a user-defined job name
conf.setOutputKeyClass(Text.class); // set the key class for the job's output data
conf.setOutputValueClass(IntWritable.class); // set the value class for the job's output
conf.setMapperClass(Map.class); // set the Mapper class for the job
conf.setCombinerClass(Reduce.class); // set the Combiner class for the job
conf.setReducerClass(Reduce.class); // set the Reducer class for the job
conf.setInputFormat(TextInputFormat.class); // set the InputFormat implementation for the job
conf.setOutputFormat(TextOutputFormat.class); // set the OutputFormat implementation for the job
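// If the map output types differed from the job's final output types, they
// would be declared separately (illustrative only, not needed here):
//   conf.setMapOutputKeyClass(Text.class);
//   conf.setMapOutputValueClass(IntWritable.class);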
/**
 * InputFormat describes the input specification for a map-reduce job.
 * setInputPaths(): sets an array of paths as the job's input list.
 * setOutputPath(): sets a single path as the job's output directory.
 */
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
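// setInputPaths is variadic, so several input directories could be listed at
// once, e.g. (placeholder paths):
//   FileInputFormat.setInputPaths(conf, new Path("in1"), new Path("in2"));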
JobClient.runJob(conf); // run the job
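// Typical command-line invocation, with placeholder jar and paths:
//   hadoop jar wordcount.jar WordCount /user/me/input /user/me/output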
}
}