Rather than dwelling on how MapReduce works internally, this article walks through a hands-on MapReduce example, WordCount (a word-counting program), to illustrate the MapReduce execution model.
WordCount:
1. Use case: a large number of files store words, with the words separated by spaces.
2. Similar scenario: in a search engine, counting the top-N most popular search terms and their frequencies to help improve search suggestions.
3. The MapReduce execution process is shown in the figure below.
3.1 MapReduce divides the execution of a job into two phases: a Map phase and a Reduce phase.
3.1.1 The Map phase consists of a number of Map Tasks:
Input format parsing: InputFormat (TextInputFormat by default, which hands the Mapper one line at a time, keyed by its byte offset in the file)
Input data processing: Mapper
Data grouping: Partitioner (see the sketch after this list)
3.1.2 The Reduce phase consists of a number of Reduce Tasks:
Remote copying of the map output (shuffle)
Sorting of the data by key
Data processing: Reducer
Output formatting: OutputFormat
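To make the "data grouping: Partitioner" step concrete, here is a minimal sketch. It is not part of the original project; the WordCount job below simply relies on Hadoop's default HashPartitioner, which applies the same rule shown here to decide which Reduce Task receives each key.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner for illustration only.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Hash the word and map it onto one of the reduce tasks, so that
        // every occurrence of the same word goes to the same Reduce Task.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}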
4. Code structure
4.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>hadoop</groupId>
    <artifactId>hadoop.mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <classifier>dist</classifier>
                    <appendAssemblyId>true</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
4.2 WordCount.java
package hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class WordCount {

    public static class WordCountMap extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Map logic: split each input line on spaces and emit (word, 1).
            String[] words = value.toString().split(" ");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Reduce logic: sum the counts for each word.
            int count = 0;
            for (IntWritable cn : values) {
                count = count + cn.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        // Class implementing the map function
        job.setMapperClass(WordCountMap.class);
        // Class implementing the reduce function
        job.setReducerClass(WordCountReducer.class);
        // Key/value types produced by the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Key/value types produced by the reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input paths: every argument except the last one
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        // Output path: the last argument
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        // Submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
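Because the reducer here only sums counts, an optional optimization (not used in the original code above) is to also register it as a combiner in main(), so partial sums are computed on the map side and less data is shuffled across the network:

        // Optional, not part of the original example: reuse the reducer as a combiner.
        // Safe for WordCount because summing counts is commutative and associative.
        job.setCombinerClass(WordCountReducer.class);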
4.3 Contents of the files in the data directory:
to.txt
hadoop spark hive hbase hive
t1.txt
hive spark mapReduce spark
t2.txt
sqoop spark hadoop
5. Data preparation
5.1 Build the jar with Maven as hadoop.mapreduce-1.0-SNAPSHOT.jar and upload it to the master server.
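A minimal sketch of this step, assuming the build is run from the project root and that the master node is reachable as root over SSH (both assumptions; adjust to your environment):

# Package the project; the jar ends up under target/
mvn clean package
# Copy the jar to the master node (user and destination path are placeholders)
scp target/hadoop.mapreduce-1.0-SNAPSHOT.jar root@master:~/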
5.2 Put the data files to be processed into datajar/in (a temporary directory; its location does not matter).
5.3 Start Hadoop. For installing Hadoop, see my earlier article 大數據系列之Hadoop分佈式集羣部署 (deploying a distributed Hadoop cluster).
Upload the files under datajar/in to HDFS:
hadoop fs -put in /in
# List the files
hadoop fs -ls -R /in
5.4 Run the jar
There are two ways to launch the job (both submit the same MapReduce program; only the output directory differs in this example):
# Option 1: hadoop jar
hadoop jar hadoop.mapreduce-1.0-SNAPSHOT.jar hadoop.mapreduce.WordCount /in/* /out

# OR

# Option 2: yarn jar
yarn jar hadoop.mapreduce-1.0-SNAPSHOT.jar hadoop.mapreduce.WordCount /in/* /yarnOut
5.5 The output of each run is shown in the figures below.
Result of hadoop jar ...
Result of yarn jar ...
6. Viewing the results
# List the output directory of the hadoop jar run
hadoop fs -ls -R /out

# List the output directory of the yarn jar run
hadoop fs -ls -R /yarnOut
Directory contents: _SUCCESS is an empty marker file indicating that the job completed successfully, and part-r-00000 is the file holding the computed results.
View the computed results:
# View out/part-r-00000
hadoop fs -text /out/part-r-00000

# View yarnOut/part-r-00000
hadoop fs -text /yarnOut/part-r-00000
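For reference, with the three sample files from section 4.3 the counts work out as follows, so the output of either run should look roughly like this (one tab-separated word/count pair per line, sorted by word):

hadoop	2
hbase	1
hive	3
mapReduce	1
spark	4
sqoop	1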
Done. The full Java code has been uploaded to GitHub: https://github.com/fzmeng/MapReduceDemo