MapReduce是Google的一項重要技術,它首先是一個編程模型,用以進行大數據量的計算。對於大數據量的計算,一般採用的處理手法就是並行計算。但對許多開發者來講,本身完徹底全實現一個並行計算程序難度太大,而MapReduce就是一種簡化並行計算的編程模型,它使得那些沒有多有多少並行計算經驗的開發人員也能夠開發並行應用程序。這也就是MapReduce的價值所在,經過簡化編程模型,下降了開發並行應用的入門門檻。html
Hadoop MapReduce是一個軟件框架,基於該框架可以容易地編寫應用程序,這些應用程序可以運行在由上千個商用機器組成的大集羣上,並以一種可靠的,具備容錯能力的方式並行地處理上TB級別的海量數據集。這個定義裏面有着這些關鍵詞,一是軟件框架,二是並行處理,三是可靠且容錯,四是大規模集羣,五是海量數據集。java
所以,對於MapReduce,能夠簡潔地認爲,它是一個軟件框架,海量數據是它的「菜」,它在大規模集羣上以一種可靠且容錯的方式並行地「烹飪這道菜」。程序員
簡單地講,MapReduce能夠作大數據處理。所謂大數據處理,即以價值爲導向,對大數據加工、挖掘和優化等各類處理。apache
MapReduce擅長處理大數據,它爲何具備這種能力呢?這可由MapReduce的設計思想發覺。MapReduce的思想就是「分而治之」。編程
(1)Mapper負責「分」,即把複雜的任務分解爲若干個「簡單的任務」來處理。「簡單的任務」包含三層含義:一是數據或計算的規模相對原任務要大大縮小;二是就近計算原則,即任務會分配到存放着所需數據的節點上進行計算;三是這些小任務能夠並行計算,彼此間幾乎沒有依賴關係。網絡
(2)Reducer負責對map階段的結果進行彙總。至於須要多少個Reducer,用戶能夠根據具體問題,經過在mapred-site.xml配置文件裏設置參數mapred.reduce.tasks的值,缺省值爲1。app
一個比較形象的語言解釋MapReduce: 框架
We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.分佈式
咱們要數圖書館中的全部書。你數1號書架,我數2號書架。這就是「Map」。咱們人越多,數書就更快。ide
Now we get together and add our individual counts. That’s reduce.
如今咱們到一塊兒,把全部人的統計數加在一塊兒。這就是「Reduce」。
MapReduce的整個工做過程如上圖所示,它包含以下4個獨立的實體:
實體一:客戶端,用來提交MapReduce做業。
實體二:JobTracker,用來協調做業的運行。
實體三:TaskTracker,用來處理做業劃分後的任務。
實體四:HDFS,用來在其它實體間共享做業文件。
經過審閱MapReduce的工做流程圖,能夠看出MapReduce整個工做過程有序地包含以下工做環節:
在Hadoop中,一個MapReduce做業一般會把輸入的數據集切分爲若干獨立的數據塊,由Map任務以徹底並行的方式去處理它們。框架會對Map的輸出先進行排序,而後把結果輸入給Reduce任務。一般做業的輸入和輸出都會被存儲在文件系統中,整個框架負責任務的調度和監控,以及從新執行已經關閉的任務。
一般,MapReduce框架和分佈式文件系統是運行在一組相同的節點上,也就是說,計算節點和存儲節點一般都是在一塊兒的。這種配置容許框架在那些已經存好數據的節點上高效地調度任務,這可使得整個集羣的網絡帶寬被很是高效地利用。
(1)JobTracker
JobTracker負責調度構成一個做業的全部任務,這些任務分佈在不一樣的TaskTracker上(由上圖的JobTracker能夠看到2 assign map 和 3 assign reduce)。你能夠將其理解爲公司的項目經理,項目經理接受項目需求,並劃分具體的任務給下面的開發工程師。
(2)TaskTracker
TaskTracker負責執行由JobTracker指派的任務,這裏咱們就能夠將其理解爲開發工程師,完成項目經理安排的開發任務便可。
MapReduce框架運轉在<key,value>鍵值對上,也就是說,框架把做業的輸入當作是一組<key,value>鍵值對,一樣也產生一組<key,value>鍵值對做爲做業的輸出,這兩組鍵值對有多是不一樣的。
一個MapReduce做業的輸入和輸出類型以下圖所示:能夠看出在整個流程中,會有三組<key,value>鍵值對類型的存在。
這裏以WordCount單詞計數爲例,介紹map和reduce兩個階段須要進行哪些處理。單詞計數主要完成的功能是:統計一系列文本文件中每一個單詞出現的次數,如圖所示:
(1)map任務處理
(2)reduce任務處理
WordCount單詞計數是最簡單也是最能體現MapReduce思想的程序之一,該程序完整的代碼能夠在Hadoop安裝包的src/examples目錄下找到。
WordCount單詞計數主要完成的功能是:統計一系列文本文件中每一個單詞出現的次數;
首先在Linux中經過Vim編輯一個簡單的words.txt,其內容很簡單以下所示:
Hello Edison Chou
Hello Hadoop RPC
Hello Wncud Chou
Hello Hadoop MapReduce
Hello Dick Gu
經過Shell命令將其上傳到一個指定目錄中,這裏指定爲:/testdir/input
在Hadoop 中, map 函數位於內置類org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函數位於內置類org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。
咱們要作的就是覆蓋map 函數和reduce 函數,首先咱們來覆蓋map函數:繼承Mapper類並重寫map方法
/** * @author Edison Chou * @version 1.0 * @param KEYIN * →k1 表示每一行的起始位置(偏移量offset) * @param VALUEIN * →v1 表示每一行的文本內容 * @param KEYOUT * →k2 表示每一行中的每一個單詞 * @param VALUEOUT * →v2 表示每一行中的每一個單詞的出現次數,固定值爲1 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split(" "); for (String word : spilted) { context.write(new Text(word), new LongWritable(1L)); } }; }
Mapper 類,有四個泛型,分別是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面兩個KEYIN、VALUEIN 指的是map 函數輸入的參數key、value 的類型;後面兩個KEYOUT、VALUEOUT 指的是map 函數輸出的key、value 的類型;
從代碼中能夠看出,在Mapper類和Reducer類中都使用了Hadoop自帶的基本數據類型,例如String對應Text,long對應LongWritable,int對應IntWritable。這是由於HDFS涉及到序列化的問題,Hadoop的基本數據類型都實現了一個Writable接口,而實現了這個接口的類型都支持序列化。
這裏的map函數中經過空格符號來分割文本內容,並對其進行記錄;
如今咱們來覆蓋reduce函數:繼承Reducer類並重寫reduce方法
/** * @author Edison Chou * @version 1.0 * @param KEYIN * →k2 表示每一行中的每一個單詞 * @param VALUEIN * →v2 表示每一行中的每一個單詞的出現次數,固定值爲1 * @param KEYOUT * →k3 表示每一行中的每一個單詞 * @param VALUEOUT * →v3 表示每一行中的每一個單詞的出現次數之和 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { long count = 0L; for (LongWritable value : values) { count += value.get(); } context.write(key, new LongWritable(count)); }; }
Reducer 類,也有四個泛型,同理,分別指的是reduce 函數輸入的key、value類型(這裏輸入的key、value類型一般和map的輸出key、value類型保持一致)和輸出的key、value 類型。
這裏的reduce函數主要是將傳入的<k2,v2>進行最後的合併統計,造成最後的統計結果。
(1)設定輸入目錄,固然也能夠做爲參數傳入
public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";
(2)設定輸出目錄(輸出目錄須要是空目錄),固然也能夠做爲參數傳入
public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";
(3)Main函數的主要代碼
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 0.0:首先刪除輸出路徑的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(conf, "WordCount"); job.setJarByClass(MyWordCountJob.class); // 1.0:指定輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 1.1:指定對輸入數據進行格式化處理的類(能夠省略) job.setInputFormatClass(TextInputFormat.class); // 1.2:指定自定義的Mapper類 job.setMapperClass(MyMapper.class); // 1.3:指定map輸出的<K,V>類型(若是<k3,v3>的類型與<k2,v2>的類型一致則能夠省略) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 1.4:分區(能夠省略) job.setPartitionerClass(HashPartitioner.class); // 1.5:設置要運行的Reducer的數量(能夠省略) job.setNumReduceTasks(1); // 1.6:指定自定義的Reducer類 job.setReducerClass(MyReducer.class); // 1.7:指定reduce輸出的<K,V>類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 1.8:指定輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 1.9:指定對輸出數據進行格式化處理的類(能夠省略) job.setOutputFormatClass(TextOutputFormat.class); // 2.0:提交做業 boolean success = job.waitForCompletion(true); if (success) { System.out.println("Success"); System.exit(0); } else { System.out.println("Failed"); System.exit(1); } }
在Main函數中,主要作了三件事:一是指定輸入、輸出目錄;二是指定自定義的Mapper類和Reducer類;三是提交做業;匆匆看下來,代碼有點多,但有些實際上是能夠省略的。
(4)完整代碼以下所示
package mapreduce; import java.io.FileInputStream; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class MyWordCountJob { /** * @author Edison Chou * @version 1.0 * @param KEYIN * →k1 表示每一行的起始位置(偏移量offset) * @param VALUEIN * →v1 表示每一行的文本內容 * @param KEYOUT * →k2 表示每一行中的每一個單詞 * @param VALUEOUT * →v2 表示每一行中的每一個單詞的出現次數,固定值爲1 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split(" "); for (String word : spilted) { context.write(new Text(word), new LongWritable(1L)); } }; } /** * @author Edison Chou * @version 1.0 * @param KEYIN * →k2 表示每一行中的每一個單詞 * @param VALUEIN * →v2 表示每一行中的每一個單詞的出現次數,固定值爲1 * @param KEYOUT * →k3 表示每一行中的每一個單詞 * @param VALUEOUT * →v3 表示每一行中的每一個單詞的出現次數之和 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { long count = 0L; for (LongWritable value : values) { count += value.get(); } context.write(key, new LongWritable(count)); }; } // 輸入文件路徑 public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt"; // 輸出文件路徑 public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 0.0:首先刪除輸出路徑的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(conf, "WordCount"); job.setJarByClass(MyWordCountJob.class); // 1.0:指定輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 1.1:指定對輸入數據進行格式化處理的類(能夠省略) job.setInputFormatClass(TextInputFormat.class); // 1.2:指定自定義的Mapper類 job.setMapperClass(MyMapper.class); // 1.3:指定map輸出的<K,V>類型(若是<k3,v3>的類型與<k2,v2>的類型一致則能夠省略) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 1.4:分區(能夠省略) job.setPartitionerClass(HashPartitioner.class); // 1.5:設置要運行的Reducer的數量(能夠省略) job.setNumReduceTasks(1); // 1.6:指定自定義的Reducer類 job.setReducerClass(MyReducer.class); // 1.7:指定reduce輸出的<K,V>類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 1.8:指定輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 1.9:指定對輸出數據進行格式化處理的類(能夠省略) job.setOutputFormatClass(TextOutputFormat.class); // 2.0:提交做業 boolean success = job.waitForCompletion(true); if (success) { System.out.println("Success"); System.exit(0); } else { System.out.println("Failed"); System.exit(1); } } }
(1)調試查看控制檯狀態信息
(2)經過Shell命令查看統計結果
Hadoop有個ToolRunner類,它是個好東西,簡單好用。不管在《Hadoop權威指南》仍是Hadoop項目源碼自帶的example,都推薦使用ToolRunner。
下面咱們看下src/example目錄下WordCount.java文件,它的代碼結構是這樣的:
public class WordCount { // 略... public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // 略... Job job = new Job(conf, "word count"); // 略... System.exit(job.waitForCompletion(true) ? 0 : 1); } }
WordCount.java中使用到了GenericOptionsParser這個類,它的做用是將命令行中參數自動設置到變量conf中。舉個例子,好比我但願經過命令行設置reduce task數量,就這麼寫:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5
上面這樣就能夠了,不須要將其硬編碼到java代碼中,很輕鬆就能夠將參數與代碼分離開。
至此,咱們尚未說到ToolRunner,上面的代碼咱們使用了GenericOptionsParser幫咱們解析命令行參數,編寫ToolRunner的程序員更懶,它將 GenericOptionsParser調用隱藏到自身run方法,被自動執行了,修改後的代碼變成了這樣:
public class WordCount extends Configured implements Tool { @Override public int run(String[] arg0) throws Exception { Job job = new Job(getConf(), "word count"); // 略... System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }
看看這段代碼上有什麼不一樣:
(1)讓WordCount繼承Configured並實現Tool接口。
(2)重寫Tool接口的run方法,run方法不是static類型,這很好。
(3)在WordCount中咱們將經過getConf()獲取Configuration對象。
能夠看出,經過簡單的幾步,就能夠實現代碼與配置隔離、上傳文件到DistributeCache等功能。修改MapReduce參數不須要修改java代碼、打包、部署,提升工做效率。
public class MyJob extends Configured implements Tool { public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... } }; } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... }; } // 輸入文件路徑 public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt"; // 輸出文件路徑 public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount"; @Override public int run(String[] args) throws Exception { // 首先刪除輸出路徑的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf()); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(getConf(), "WordCount"); // 設置輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 設置自定義Mapper job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 設置自定義Reducer job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 設置輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } public static void main(String[] args) { Configuration conf = new Configuration(); try { int res = ToolRunner.run(conf, new MyJob(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } }
(1)王路情,《Hadoop之MapReduce》:http://blog.csdn.net/wangloveall/article/details/21407531
(2)Suddenly,《Hadoop日記之MapReduce》:http://www.cnblogs.com/sunddenly/p/3985386.html
(3)伯樂在線,《我是如何向老婆解釋MapReduce的》:http://blog.jobbole.com/1321/
(4)codingwu,《MapReduce原理與設計思想》:http://www.cnblogs.com/archimedes/p/mapreduce-principle.html
(5)codingwu,《MapReduce實例淺析》:http://www.cnblogs.com/archimedes/p/mapreduce-example-analysis.html
(6)挑燈看劍,《圖解MapReduce原理和執行過程》:http://blog.csdn.net/michael_kong_nju/article/details/23826979
(7)萬川梅、謝正蘭,《Hadoop應用開發實戰詳解(修訂版)》:http://item.jd.com/11508248.html
(8)張月,《Hadoop MapReduce開發最佳實踐》:http://www.infoq.com/cn/articles/MapReduce-Best-Practice-1