[Hadoop]: Implementing the WordCount Example by Hand

I. Implementing the Example

The overall flow for implementing WordCount is as follows:

Note: the input is a txt file containing assorted words, with the words on each line separated by spaces.
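For example (the file name and the words below are made up purely for illustration), a small input file and the output WordCount should produce for it would look like this:

hello.txt (input):
hadoop spark hadoop
spark hadoop

expected output:
hadoop  3
spark   2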

 

1. Writing the Mapper

In IDEA you can jump to source with "Ctrl + Alt + left-click". Let's first look at the source of the Mapper class; I have already added comments to it, as shown below:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    // setup() is called exactly once, before the task starts processing input
    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // map() is called once for every key/value pair in the input split; most applications override this method
    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }

    // cleanup() is called exactly once, at the end of the task
    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // run() strings the whole life cycle together: setup -> map for each key/value -> cleanup
    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);
        try {
            while(context.nextKeyValue()) { // reads just one key/value pair (one line of text) per iteration
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }
    }

    // The Context, which wraps the large set of methods the mapper can use to interact with the framework
    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}

Based on that source, we can now write the mapper that WordCount needs, as shown below:

// Now we write the WordCount mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // The four type parameters of Mapper are:
    // 1. the key type of the input data (the byte offset of the line)
    // 2. the value type of the input data (the line itself)
    // 3. the key type of the output data
    // 4. the value type of the output data

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Get one line of input
        String line = value.toString();
        // 2. Split the line into words on spaces
        String[] words = line.split(" ");
        // 3. Write out each word (not to the console, but to the framework, which passes it on to the reducer)
        for (String word : words) {
            Text k = new Text(); // the output key type is Text, matching the class's type parameters
            k.set(word);
            IntWritable v = new IntWritable();
            v.set(1); // every occurrence counts as 1
            context.write(k, v);
        }
    }
}
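One common refinement that is not part of the walkthrough above: since map() runs once per input line, allocating a new Text and IntWritable for every word creates a lot of short-lived objects. Because the framework serializes the key and value as soon as context.write() is called, the two objects can safely be reused. A minimal sketch of that variant (the class name WordcountMapperReuse is just for this sketch):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordcountMapperReuse extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused across calls; context.write() serializes them immediately, so this is safe.
    private final Text k = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            k.set(word);
            context.write(k, one);
        }
    }
}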

 

2. Writing the Reducer

The Reducer source is shown below. It is very similar to the Mapper source: it simply wraps the reducer's life-cycle methods, several of which have empty bodies:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Reducer() {
    }

    protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        Iterator i$ = values.iterator();

        while(i$.hasNext()) {
            VALUEIN value = i$.next();
            context.write(key, value);
        }

    }

    protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}

Our WordCount reducer code is as follows:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // By the time reduce() runs, the framework's shuffle/sort phase has already grouped
        // the mapper output by key, so for each distinct word we receive all of its 1s, e.g.:
        // atguigu (key), [1, 1] (values)

        int sum = 0;
        // Add up the occurrences
        for (IntWritable value : values) {
            sum += value.get(); // unwrap the IntWritable into an int
        }
        IntWritable v = new IntWritable();
        v.set(sum);
        // 2. Write out, e.g. "atguigu 2"
        context.write(key, v);

        // Note: the per-word grouping is not done in this class; it happens in the shuffle/sort
        // step between map and reduce, which is why reduce() is called once per distinct key.
    }
}
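To make the data flow concrete, continuing the made-up two-line input from the note at the top, this is roughly what the reducer ends up seeing:

mapper output:        (hadoop,1) (spark,1) (hadoop,1) (spark,1) (hadoop,1)
after shuffle/sort:   hadoop -> [1, 1, 1]
                      spark  -> [1, 1]
reduce() writes:      hadoop 3
                      spark  2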

3. Writing the Driver: Getting MapReduce Running!

The code is as follows:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    // The class that wires the mapper and reducer together and launches the job.
    // A driver is almost entirely boilerplate and follows a fixed pattern.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 1. Get a Job object
        Job job = Job.getInstance(conf);
        // 2. Tell Hadoop which jar to ship (located via this driver class)
        job.setJarByClass(WordCountDriver.class);
        // 3. Hook up the mapper and reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the key and value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the key and value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

That's all it takes to run it! You can also try the WordCount job on a distributed cluster: just package these classes into a jar and run it on your Linux machines. When you do, remember that the input and output paths are HDFS paths, and that the output directory must not already exist, otherwise the job will fail at submission.
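For reference, launching the packaged jar on a cluster typically looks something like this; the jar name and the HDFS paths are placeholders for this sketch, so substitute your own:

hadoop jar wordcount.jar WordCountDriver /user/hadoop/wordcount/input /user/hadoop/wordcount/output
hadoop fs -cat /user/hadoop/wordcount/output/part-r-00000

The second command prints the reducer's output file so you can check the word counts.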
