The WordCount program is the "Hello World" of MapReduce. By analyzing WordCount, we can understand the basic structure and execution flow of a MapReduce program.
WordCount Design Approach
The WordCount program is a good illustration of the MapReduce programming model.
Typically, text files serve as the input to MapReduce. MapReduce splits the text into lines; the offset of each line within the file becomes the key of the input key-value pair, and the line's content becomes the value. After the map method processes a line, it emits intermediate results of the form <word,1>. By default MapReduce groups these by key and dispatches them to the reduce method, which finishes the counting and outputs the final result in the form <word,count>.
A MapReduce program can be run in two ways: locally or on the server side.
Local execution usually means a local Windows environment, which is convenient for development and debugging.
Server-side execution is mostly used in real production environments.
Installing Hadoop Locally
Download hadoop-2.7.3.tar.gz and extract it, for example to the D: drive.
The Hadoop root directory is then D:\hadoop-2.7.3.
Configure the environment variables.
Configure the Path variable.
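A typical Windows setup for the extraction directory above might look like this (the exact values are assumptions based on that path):

HADOOP_HOME = D:\hadoop-2.7.3
Path        = %Path%;%HADOOP_HOME%\bin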
Download the corresponding Hadoop source code:
hadoop-2.7.3-src.tar.gz
Modifying the Hadoop Source Code
Note: when running a MapReduce program locally on Windows, the Hadoop source code needs to be modified. When running on a Linux server, no modification is needed.
Modifying the Hadoop source code really just means making a small change to Hadoop's NativeIO class.
Download the corresponding Hadoop source code, hadoop-2.7.3-src.tar.gz, and extract it. Copy NativeIO.java from hadoop-2.7.3-src\hadoop-common-project\hadoop-common\src\main\java\org\apache\hadoop\io\nativeio into the corresponding IDEA project, keeping the same package (org.apache.hadoop.io.nativeio) so that it takes precedence over the class shipped in the Hadoop jar.
Modify the access method of the NativeIO.Windows inner class so that it always returns true instead of calling the native access0 method:
public static boolean access(String path, AccessRight desiredAccess)
    throws IOException {
    return true;
    //return access0(path, desiredAccess.accessRight());
}
If the NativeIO class is not modified, running the MapReduce program locally on Windows throws the following exception:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
	at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
	at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
	at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
	at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
	at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
	at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:115)
	at org.apache.hadoop.mapred.LocalDistributedCacheManager.setup(LocalDistributedCacheManager.java:125)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.<init>(LocalJobRunner.java:163)
	at org.apache.hadoop.mapred.LocalJobRunner.submitJob(LocalJobRunner.java:731)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:240)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Unknown Source)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
	at cn.hadron.mr.RunJob.main(RunJob.java:33)
Defining the Mapper Class
package com.hadron.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

//4 generic parameters: the first two are the key and value types of the map input key-value pairs,
//the last two are the key and value types of the output key-value pairs
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    //This method is called once for each line read from the file split:
    //the key is the offset of the line within the file, the value is the line's content
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
Code notes:
The Mapper class reads the input data and executes the map method. A Mapper class must extend org.apache.hadoop.mapreduce.Mapper and implement the map method according to the problem at hand.
The Mapper class takes 4 generic type parameters: the first two are the key and value types of the map input key-value pairs, and the last two are the key and value types of the output key-value pairs.
The MapReduce framework passes each key-value pair to the map method as arguments. The method has 3 parameters: the first (usually of type LongWritable) represents the offset of the line within the file, the second (usually of type Text) holds the content of that line, and the third is a Context object representing the execution context.
The fully qualified name of the Context class is org.apache.hadoop.mapreduce.Mapper.Context; in other words, Context is a nested class of Mapper, so it can be used directly inside the Mapper class.
Inside the map method, the split method of StringUtils splits the input line into words by spaces, and each word is then emitted as an intermediate result via the write method of Context.
Defining the Reducer Class
package com.hadron.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * In the map output <key,values>, the key is a single word and values is the list of counts for that word.
     * The map output is the reduce input. The reduce method is called once per group;
     * within a group the key is the same and there may be multiple values.
     * So the reduce method only needs to iterate over values and sum them to get the total count for a word.
     */
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            sum = sum + i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Code notes:
The Reducer class receives the intermediate results output by the Mapper as its input and executes the reduce method.
The Reducer class takes 4 generic type parameters: the first two are the key and value types of the reduce input (matching the map output types), and the last two are the key and value types of the reduce output.
Parameters of the reduce method: key is a single word, values is the list of counts for that word, and Context is org.apache.hadoop.mapreduce.Reducer.Context, the Reducer's context.
Defining the Main Method (Driver Class)
package com.hadron.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
    public static void main(String[] args) {
        //Set the HADOOP_USER_NAME property to root
        System.setProperty("HADOOP_USER_NAME", "root");
        //The Configuration class holds the Hadoop configuration
        Configuration conf = new Configuration();
        //Set fs.defaultFS
        conf.set("fs.defaultFS", "hdfs://192.168.55.128:9000");
        //Set the yarn.resourcemanager node
        conf.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            //Set the Mapper class
            job.setMapperClass(WordCountMapper.class);
            //Set the Reducer class
            job.setReducerClass(WordCountReducer.class);
            //Set the key type of the reduce output
            job.setOutputKeyClass(Text.class);
            //Set the value type of the reduce output
            job.setOutputValueClass(IntWritable.class);
            //Specify the input path
            FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
            //Specify the output path (created automatically)
            Path outpath = new Path("/user/root/output");
            //The output path is created by MapReduce; if it already exists it must be deleted first
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("The job completed successfully");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Create the input directory on HDFS:
[root@node1 ~]# hdfs dfs -mkdir /user/root/input
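The job also needs some text under this directory to count. One minimal way to provide it (the file name and content here are only placeholders) is:

[root@node1 ~]# echo "hello world hello hadoop" > words.txt
[root@node1 ~]# hdfs dfs -put words.txt /user/root/input/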
Running Locally
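Running locally here means running the main method of RunJob directly from the IDE, with the NativeIO change above in place; judging from the stack trace shown earlier, the job then executes in the LocalJobRunner while reading its input from and writing its output to the cluster's HDFS.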
Execution result: