//hello文件中內容,文件已經上傳到hdfs中java
hello you apache
hello me網絡
public class WordCountApp {app
public static final String INPUT_PATH="hdfs://hadoop:9000/hello";框架
public static final String OUT_PATH="hdfs://hadoop:9000/out";ide
public static void main(String[] args) throws Exception {函數
//讀取配置文件信息
oop
Configuration configuration = new Configuration();spa
//建立job對象
orm
Job job = new Job(configuration,WordCountApp.class.getSimpleName());
//1.1讀取內容,解析成k v
//1.1從哪裏讀取數據
FileInputFormat.setInputPaths(job, INPUT_PATH);
//把輸入文件中的每一行解析爲鍵值對
//FileInputFormat是InputFormat的實現類,
//InputFormat負責處理MR的輸入部分
//做用1:驗證做業的輸入是否規範
//做用2:把輸入文件切分紅inputSplit
//做用3:提供RecordReader的實現類,把inputSplit讀到Mapper中進行處理
job.setInputFormatClass(TextInputFormat.class);
//1.2覆蓋map函數,實現本身的邏輯
job.setMapperClass(MyMapper.class);
//設置map輸出的格式
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//1.3分區
job.setPartitionerClass(HashPartitioner.class);
//設置分區數
job.setNumReduceTasks(1);
//1.4排序,分組
//規約
//2.1網絡拷貝到不一樣的reduce節點 是框架作的額,不須要手動干預
//2.2自定義reduce函數
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//2.3寫入到hdfs中
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
//格式化
job.setOutputFormatClass(TextOutputFormat.class);
//提交給jobTracker執行
job.waitForCompletion(true);
}
/**
* KEYIN:業務表示每行的起始位置(單位是字節),又稱做偏移量,即k1
* VALUEIN:業務上表示每一行的文本內容 v1
* KEYOUT:業務上表示每一行的每一個單詞 k2
* VALUEOUT:表示每一行每一個單詞的出現次數 v2
* @author kaiwang
*
*/
static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
//覆蓋map函數
/**
* 解析每一行的文本, 解析爲一個個單詞,統計出現的次數
*/
protected void map(LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>.Context context)
throws java.io.IOException ,InterruptedException {
//獲取計數器 Conter,統計hello出現的次數
Counter counter = context.getCounter("Sensitive", "hello");
if(value.toString().contains("hello")){
//計數器增長
counter.increment(value.toString().split("hello").length-1);
}
//每一行包含的單詞數
String[] split = value.toString().split("\t");
for(String word : split){
//寫出到上下文中
context.write(new Text(word), new LongWritable(1));
}
};
}
/**
* KEYIN:業務上表示文本中不一樣的單詞 k2
* VALUEIN:業務上表示不一樣單詞,出現的value集合 v2
* KEYOUT:業務上表示文本中的不一樣單詞
* VALUEOUT:表示文本出現的總次數
* @author kaiwang
*
*/
static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
//覆蓋reduce函數
protected void reduce(Text k2,
java.lang.Iterable<LongWritable> values,
org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context)
throws java.io.IOException ,InterruptedException {
Long sum = 0L;
for(LongWritable times : values){
sum += times.get();
}
context.write(k2, new LongWritable(sum));
};
}
}
------------輸出結果
hello 2
you 1
me 1
計數器:
hello=2