MapReduce入門(品讀WordCount)

解讀WordCount

WordCount程序就是MapReduce的HelloWord程序。經過對WordCount程序分析,咱們能夠了解MapReduce程序的基本結構和執行過程。html

 WordCount設計思路

WordCount程序很好的體現了MapReduce編程思想。
通常來講,文本做爲MapReduce的輸入,MapReduce會將文本進行切分處理並將行號做爲輸入鍵值對的鍵,文本內容做爲鍵值對的值,經map方法處理後,輸出中間結果爲<word,1>形式。MapReduce會默認按鍵值分發給reduce方法,在完成計數並輸出最後結果<word,count>
java

 

MapReduce運行方式

MapReduce運行方式分爲本地運行和服務端運行兩種。
本地運行多指本地Windows環境,方便開發調試。
而服務端運行,多用於實際生產環境。node

 hadoop本地安裝apache

下載hadoop-2.7.3.tar.gz,解壓縮。好比解壓縮到D盤編程

hadoop根目錄就是D:\hadoop-2.7.3服務器

配置環境變量app

path環境配置框架

 

下載對應hadoop源代碼oop

hadoop-2.7.3-src.tar.gzspa

修改Hadoop源碼
注意,在Windows本地運行MapReduce程序時,須要修改Hadoop源碼。若是在Linux服務器運行,則不須要修改Hadoop源碼。

修改Hadoop源碼,其實就是簡單修改一下Hadoop的NativeIO類的源碼

下載對應hadoop源代碼,hadoop-2.7.3-src.tar.gz解壓,hadoop-2.7.3-src\hadoop-common-project\hadoop-common\src\main\java\org\apache\hadoop\io\nativeio下NativeIO.java 複製到對應的IDEA的project.
修改代碼

修改代碼

public static boolean access(String path, AccessRight desiredAccess)
        throws IOException {
       return true;
      //return access0(path, desiredAccess.accessRight());
    }

 若是不修改NativeIO類的源碼,在Windows本地運行MapReduce程序會產生異常:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method) at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609) at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977) at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187) at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174) at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108) at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285) at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150) at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131) at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:115) at org.apache.hadoop.mapred.LocalDistributedCacheManager.setup(LocalDistributedCacheManager.java:125) at org.apache.hadoop.mapred.LocalJobRunner$Job.<init>(LocalJobRunner.java:163) at org.apache.hadoop.mapred.LocalJobRunner.submitJob(LocalJobRunner.java:731) at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:240) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Unknown Source) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287) at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308) at cn.hadron.mr.RunJob.main(RunJob.java:33)

 定義Mapper類

package com.hadron.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

//4個泛型參數:前兩個表示map的輸入鍵值對的key和value的類型,後兩個表示輸出鍵值對的key和value的類型
public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
//該方法循環調用,從文件的split中讀取每行調用一次,把該行所在的下標爲key,該行的內容爲value
     protected  void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
         String [] words = StringUtils.split(value.toString(),' ');
         for ( String  w :  words
              ) {
             context.write(new Text(w), new IntWritable(1));

         }

     }
}

 

代碼說明:

    Mapper類用於讀取數據輸入並執行map方法,編寫Mapper類須要繼承org.apache.hadoop.mapreduce.Mapper類,而且根據相應問題實現map方法。
    Mapper類的4個泛型參數:前兩個表示map的輸入鍵值對的key和value的類型,後兩個表示輸出鍵值對的key和value的類型
    MapReduce計算框架會將鍵值對做爲參數傳遞給map方法。該方法有3個參數,第1個是Object類型(通常使用LongWritable類型)參數,表明行號,第2個是Object類型參數(通常使用Text類型),表明該行內容,第3個Context參數,表明上下文。
    Context類全名是org.apache.hadoop.mapreduce.Mapper.Context,也就是說Context類是Mapper類的靜態內容類,在Mapper類中能夠直接使用Context類。
    在map方法中使用StringUtils的split方法,按空格將輸入行內容分割成單詞,而後經過Context類的write方法將其做爲中間結果輸出。

定義Reducer類

 

package com.hadron.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer  extends Reducer<Text,IntWritable,Text,IntWritable> {
    /**
     * Map過程輸出<key,values>中key爲單個單詞,而values是對應單詞的計數值所組成的列表,Map的輸出就是Reduce的輸入,
     * 每組調用一次,這一組數據特色:key相同,value可能有多個。
     * /因此reduce方法只要遍歷values並求和,便可獲得某個單詞的總次數。
     */

    public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        int sum=0;
        for ( IntWritable i: values
             ) {
            sum=sum+i.get();
            
        }
        context.write(key,new IntWritable(sum));
    }
}

 代碼說明:

    Reducer類用於接收Mapper輸出的中間結果做爲Reducer類的輸入,並執行reduce方法。
    Reducer類的4個泛型參數:前2個表明reduce方法輸入的鍵值對類型(對應map輸出類型),後2個表明reduce方法輸出鍵值對的類型
    reduce方法參數:key是單個單詞,values是對應單詞的計數值所組成的列表,Context類型是org.apache.hadoop.mapreduce.Reducer.Context,是Reducer的上下文。
定義主方法(主類)

 

package com.hadron.mr;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class RunJob {
    public static void main(String[] args) {
        //設置環境變量HADOOP_USER_NAME,其值是root
        System.setProperty("HADOOP_USER_NAME", "root");
        //Configuration類包含了Hadoop的配置
        Configuration conf =new Configuration();
        //設置fs.defaultFS
        conf.set("fs.defaultFS", "hdfs://192.168.55.128:9000");
        //設置yarn.resourcemanager節點
        conf.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs =FileSystem.get(conf);
            Job job =Job.getInstance(conf);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            //設置Mapper類
            job.setMapperClass(WordCountMapper.class);
            //設置Reduce類
            job.setReducerClass(WordCountReducer.class);
            //設置reduce方法輸出key的類型
            job.setOutputKeyClass(Text.class);
            //設置reduce方法輸出value的類型
            job.setOutputValueClass(IntWritable.class);
            //指定輸入路徑

            FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
            //指定輸出路徑(會自動建立)
            Path outpath=new Path("/user/root/output");
            //輸出路徑是MapReduce自動建立的,若是存在則須要先刪除
            if (fs.exists(outpath)){
                fs.delete(outpath,true);
            }
            FileOutputFormat.setOutputPath(job,outpath);
            boolean f=job.waitForCompletion(true);
            if (f) {
                System.out.println("job任務執行成功");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

hdfs 上 新建input目錄

[root@node1 ~]# hdfs dfs -mkdir /user/root/input

 

 

 

 

本地運行

執行結果:

 

 

相關文章
相關標籤/搜索