13-hadoop-入門程序

經過以前的操做, html

http://www.cnblogs.com/wenbronk/p/6636926.htmljava

http://www.cnblogs.com/wenbronk/p/6659481.htmlapache

hadoop-HA的集羣已經搭建完成了, 須要寫個小程序來認識下hadoop了小程序

統計文本文件中, 每一個單詞出現的次數數組

1, Eclipse下新建Java-project

2, 新建lib文件, 導入jar包, 並buildpath

hadoop-2.5.1\share\hadoop\common  全部jar,
hadoop-2.5.1\share\hadoop\common\lib  全部jar,

hadoop-2.5.1\share\hadoop\hdfs  全部jar
hadoop-2.5.1\share\hadoop\mapreduce  全部jar
hadoop-2.5.1\share\hadoop\yarn  全部jar

3, Mapper類: WordCountMapper.java

package com.wenbronk.mapreduce;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * 測試mapreduce, 計算單詞出現的次數
 * @author wenbronk
 * KEYIN: split的鍵, 行坐在的下標
 * VALUEIN: split的值, 行值
 * KEYOUT: 需求, 輸出給reduce
 * VALUEOUT: 需求, 輸出給reduce
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * 重寫map方法, 循環調用
     * 從split中讀取一行調用一次, 以行所在下標爲key, 行內容爲value
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        
        // text 轉string, toString(), 使用空格分隔爲單詞數組
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String word : words) {
            // 鍵值對輸出, 輸出給reduce
            context.write(new Text(word), new IntWritable(1));
        }
        
    }
    
}

4, Reduce類, WordCountReduce.java

package com.wenbronk.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * shuffling 後傳給 reduce
 * @author wenbronk
 * KEYIN: mapper的輸出
 * VALUEIN: mapper的輸出
 */
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{

    /**
     * 循環調用
     * 每組調用一次, key相同, value可能多個, 使用迭代器
     */
    @Override
    protected void reduce(Text arg0, Iterable<IntWritable> arg1,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 對值進行累加
        int sum = 0;
        // 使用迭代器
        for (IntWritable value : arg1) {
            sum += value.get();
        }
        // 使用context輸出
        context.write(arg0 , new IntWritable(sum));
    }
    
}

5, 而後是具體的執行類: RunMapReduce.java

package com.wenbronk.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 執行mapreduce
 * 統計單詞出新的次數
 * @author wenbr
 *
 */
public class RunMapReduce {

    public static void main(String[] args) throws Exception {
        // 初始化時加載src或classpath下全部的hadoop配置文件
        Configuration configuration = new Configuration();
        
        // 獲得執行的任務
        Job job = Job.getInstance(config);
        
        // 入口類
        job.setJarByClass(RunMapReduce.class);
        
        // job名字
        job.setJobName("wordCount");
        
        // job執行是map執行的類
        job.setMapperClass(WordCountMapper.class);
        
        // job執行的reduce類
        job.setReducerClass(WordCountReduce.class);
        
        // job輸出的鍵類型
        job.setMapOutputKeyClass(Text.class);
        
        // job輸出的value類型
        job.setMapOutputValueClass(IntWritable.class);
        
        //**** 使用插件上傳data.txt到hdfs/root/usr/data.txt
        
        // 使用文件
        FileInputFormat.addInputPath(job, new Path("/root/usr/"));
        
        // 使用一個不存在的目錄進行
        Path path = new Path("/root/usr/output");
        // 若是存在刪除
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        
        // 輸出
        FileOutputFormat.setOutputPath(job, path);
        
        boolean forCompletion = job.waitForCompletion(true);
        
        if (forCompletion) {
            System.out.println("success");
        }
    }
    
}

全部的類編寫好了, 接下來是上傳文件服務器

6, 使用eclipse插件上傳data.txt到hadoop目錄 /usr/data.txt

我是用的插件爲: app

 

7, 運行

這兒使用直接發佈到服務器運行的方式eclipse

eclipse打包項目成jar包(只須要源碼便可), 而後上傳到服務器目錄下, 使用hadoop命令執行
格式: hadoop jar jar路徑 類全限定名ide

hadoop jar wc.jar com.wenbronk.mapreduce.RunMapReduce

 以後在hadoop的目錄下就能夠看到統計後輸出的文件了oop

相關文章
相關標籤/搜索