MapReduce實現的簡單單詞計數--------總結

//hello文件中內容,文件已經上傳到hdfs中java

hello you apache

hello me網絡



public class WordCountApp {app

public static final String INPUT_PATH="hdfs://hadoop:9000/hello";框架

public static final String OUT_PATH="hdfs://hadoop:9000/out";ide

public static void main(String[] args) throws Exception {函數

   //讀取配置文件信息
oop

   Configuration configuration = new Configuration();spa

   //建立job對象
orm

   Job job = new Job(configuration,WordCountApp.class.getSimpleName());

   //1.1讀取內容,解析成k v

   //1.1從哪裏讀取數據

   FileInputFormat.setInputPaths(job, INPUT_PATH);

   //把輸入文件中的每一行解析爲鍵值對

   //FileInputFormat是InputFormat的實現類,

   //InputFormat負責處理MR的輸入部分

   //做用1:驗證做業的輸入是否規範

   //做用2:把輸入文件切分紅inputSplit

   //做用3:提供RecordReader的實現類,把inputSplit讀到Mapper中進行處理

   job.setInputFormatClass(TextInputFormat.class);

   //1.2覆蓋map函數,實現本身的邏輯

   job.setMapperClass(MyMapper.class);

   //設置map輸出的格式

   job.setMapOutputKeyClass(Text.class);

   job.setMapOutputValueClass(LongWritable.class);

   //1.3分區

   job.setPartitionerClass(HashPartitioner.class);

   //設置分區數

   job.setNumReduceTasks(1);

   //1.4排序,分組

   //規約

   //2.1網絡拷貝到不一樣的reduce節點    是框架作的額,不須要手動干預

   //2.2自定義reduce函數

   job.setReducerClass(MyReduce.class);

   job.setOutputKeyClass(Text.class);

   job.setOutputValueClass(Text.class);

   //2.3寫入到hdfs中

   FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

   //格式化

   job.setOutputFormatClass(TextOutputFormat.class);

   //提交給jobTracker執行

   job.waitForCompletion(true);

}

/**

* KEYIN:業務表示每行的起始位置(單位是字節),又稱做偏移量,即k1

* VALUEIN:業務上表示每一行的文本內容      v1

* KEYOUT:業務上表示每一行的每一個單詞 k2

* VALUEOUT:表示每一行每一個單詞的出現次數 v2

* @author kaiwang

*

*/

static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

//覆蓋map函數

/**

* 解析每一行的文本, 解析爲一個個單詞,統計出現的次數

*/

protected void map(LongWritable key,

Text value,

org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>.Context context)

throws java.io.IOException ,InterruptedException {

   //獲取計數器 Conter,統計hello出現的次數

   Counter counter = context.getCounter("Sensitive", "hello");

   if(value.toString().contains("hello")){

       //計數器增長

       counter.increment(value.toString().split("hello").length-1);

   }

   //每一行包含的單詞數

   String[] split = value.toString().split("\t");

   for(String word : split){

       //寫出到上下文中

       context.write(new Text(word), new LongWritable(1));

       }

   };

}

/**

* KEYIN:業務上表示文本中不一樣的單詞 k2

* VALUEIN:業務上表示不一樣單詞,出現的value集合 v2

*  KEYOUT:業務上表示文本中的不一樣單詞

*  VALUEOUT:表示文本出現的總次數

* @author kaiwang

*

*/

static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{

//覆蓋reduce函數

protected void reduce(Text k2,

java.lang.Iterable<LongWritable> values,

org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context)

throws java.io.IOException ,InterruptedException {

   Long sum = 0L;

   for(LongWritable times : values){

       sum += times.get();

   }

       context.write(k2, new LongWritable(sum));

       };

   }

}

------------輸出結果

hello 2

you 1

me 1


計數器:

hello=2

相關文章
相關標籤/搜索