經過以前的操做, html
http://www.cnblogs.com/wenbronk/p/6636926.htmljava
http://www.cnblogs.com/wenbronk/p/6659481.htmlapache
hadoop-HA的集羣已經搭建完成了, 須要寫個小程序來認識下hadoop了小程序
統計文本文件中, 每一個單詞出現的次數數組
hadoop-2.5.1\share\hadoop\common 全部jar, hadoop-2.5.1\share\hadoop\common\lib 全部jar, hadoop-2.5.1\share\hadoop\hdfs 全部jar hadoop-2.5.1\share\hadoop\mapreduce 全部jar hadoop-2.5.1\share\hadoop\yarn 全部jar
package com.wenbronk.mapreduce; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 測試mapreduce, 計算單詞出現的次數 * @author wenbronk * KEYIN: split的鍵, 行坐在的下標 * VALUEIN: split的值, 行值 * KEYOUT: 需求, 輸出給reduce * VALUEOUT: 需求, 輸出給reduce */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * 重寫map方法, 循環調用 * 從split中讀取一行調用一次, 以行所在下標爲key, 行內容爲value */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // text 轉string, toString(), 使用空格分隔爲單詞數組 String[] words = StringUtils.split(value.toString(), ' '); for (String word : words) { // 鍵值對輸出, 輸出給reduce context.write(new Text(word), new IntWritable(1)); } } }
package com.wenbronk.mapreduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * shuffling 後傳給 reduce * @author wenbronk * KEYIN: mapper的輸出 * VALUEIN: mapper的輸出 */ public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ /** * 循環調用 * 每組調用一次, key相同, value可能多個, 使用迭代器 */ @Override protected void reduce(Text arg0, Iterable<IntWritable> arg1, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 對值進行累加 int sum = 0; // 使用迭代器 for (IntWritable value : arg1) { sum += value.get(); } // 使用context輸出 context.write(arg0 , new IntWritable(sum)); } }
package com.wenbronk.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 執行mapreduce * 統計單詞出新的次數 * @author wenbr * */ public class RunMapReduce { public static void main(String[] args) throws Exception { // 初始化時加載src或classpath下全部的hadoop配置文件 Configuration configuration = new Configuration(); // 獲得執行的任務 Job job = Job.getInstance(config); // 入口類 job.setJarByClass(RunMapReduce.class); // job名字 job.setJobName("wordCount"); // job執行是map執行的類 job.setMapperClass(WordCountMapper.class); // job執行的reduce類 job.setReducerClass(WordCountReduce.class); // job輸出的鍵類型 job.setMapOutputKeyClass(Text.class); // job輸出的value類型 job.setMapOutputValueClass(IntWritable.class); //**** 使用插件上傳data.txt到hdfs/root/usr/data.txt // 使用文件 FileInputFormat.addInputPath(job, new Path("/root/usr/")); // 使用一個不存在的目錄進行 Path path = new Path("/root/usr/output"); // 若是存在刪除 FileSystem fs = FileSystem.get(configuration); if (fs.exists(path)) { fs.delete(path, true); } // 輸出 FileOutputFormat.setOutputPath(job, path); boolean forCompletion = job.waitForCompletion(true); if (forCompletion) { System.out.println("success"); } } }
全部的類編寫好了, 接下來是上傳文件服務器
我是用的插件爲: app
這兒使用直接發佈到服務器運行的方式eclipse
eclipse打包項目成jar包(只須要源碼便可), 而後上傳到服務器目錄下, 使用hadoop命令執行
格式: hadoop jar jar路徑 類全限定名ide
hadoop jar wc.jar com.wenbronk.mapreduce.RunMapReduce
以後在hadoop的目錄下就能夠看到統計後輸出的文件了oop