mapreduce學習筆記

本文是對mapreduce技術的一個初步學習的總結,包括以下章節的內容:css

  • 概述
  • 發展史
  • 基本概念
  • 程序編寫
  • 運行測試
  • 其它案例

參考資料:java

一、本文介紹的內容依賴hadoop環境,關於hadoop運行環境的搭建可參見《Hadoop運行環境搭建》程序員

二、mapreduce的編程模型設計受到了函數式編程中的的map和reduce原語的啓發,爲了有助於更好的理解mapreduce的編程模型,可先閱讀《函數式編程之集合操做》sql

1、概述

大數據的應用,有兩個最核心的任務要處理,一是海量數據的存儲,二是對海量數據的分析和處理。hadoop分別提供了分佈式文件系統hdfs和分佈式計算框架mapreduce來解決。其中mapreduce是來解決海量數據的分析和處理的。數據庫

可是咱們在實際的開發中,不多會去編寫mapreduce代碼來進行大數據的分析處理。咱們更多的是聽到的是諸如hive, spark,storm這些技術,更多的大數據學習者也是學習這些技術。這是爲何呢?apache

這主要是有兩個緣由,一是mapreduce是一種適合離線數據分析的技術,其效率上比較低,不能知足一些低時延需求的數據分析業務,低時延的數據處理每每採用諸如spark,storm這些技術。編程

其次,使用mapreduce須要根據業務場景來設計map和reduce的處理邏輯,編寫java代碼,並提交到集羣上執行,這屬於比較底層的操做,對程序員的要求較高。而hive等技術借鑑了關係數據庫的特色,提供你們很熟悉的類sql機制,可讓程序員以較低門檻的方式來處理大數據。swift

那爲何咱們還要來學習mapreduce呢?首先它是大數據處理的最先解決方案或者說是鼻祖,並且是hive等技術的基礎(Hive是將類sql語句最終轉換成mapreduce程序來處理),學習它,有助於加深對hive等技術的使用。其次數據處理的思路是相同的,瞭解mapreduce的機制和原理,對熟悉其它大數據分析處理技術(如spark,storm,impala等)也是有幫助的。最後,雖然如今直接編寫mapreduce程序不多了,但在某些應用場景下,編寫mapreduce程序就是很好的解決方案。綜上所說,做爲一個大數據技術的學習者,是很是有必要來學習mapreduce技術的。api

2、發展史

mapreduce是跟隨hadoop一塊兒推出的,分爲第一代(稱爲 MapReduce 1.0或者MRv1,對應hadoop第1代)和第二代(稱爲MapReduce 2.0或者MRv2,對應hadoop第2代)。app

第一代MapReduce計算框架,它由兩部分組成:編程模型(programming model)和運行時環境(runtime environment)。它的基本編程模型是將問題抽象成Map和Reduce兩個階段,其中Map階段將輸入數據解析成key/value,迭代調用map()函數處理後,再以key/value的形式輸出到本地目錄,而Reduce階段則將key相同的value進行規約處理,並輸出最終結果。它的運行時環境由兩類服務組成:JobTracker和TaskTracker,其中,JobTracker負責資源管理和全部做業的控制,而TaskTracker負責接收來自JobTracker的命令並執行它。

MapReduce 2.0或者MRv2具備與MRv1相同的編程模型,惟一不一樣的是運行時環境。MRv2是在MRv1基礎上經加工以後,運行於資源管理框架YARN之上的MRv1,它再也不由JobTracker和TaskTracker組成,而是變爲一個做業控制進程ApplicationMaster,且ApplicationMaster僅負責一個做業的管理,至於資源的管理,則由YARN完成。

總結下,MRv1是一個獨立的離線計算框架,而MRv2則是運行於YARN之上的MRv1。

做爲MapReduce 程序的開發人員,尤爲是初學者,咱們在瞭解其原理的基礎上,重要的是學會如何使用框架提供的api去編寫代碼。MapReduce 的api分爲新舊兩套,新舊api位於不一樣的java包中。其中舊的api位於org.apache.hadoop.mapred包(子包)中,新的api位於org.apache.hadoop.mapreduce包(子包)中。本文介紹的例子使用的都是新的api。

3、基本概念

咱們編寫mapreduce程序(後面用mr來簡稱mapreduce)是用來進行數據處理的,每次數據的處理咱們稱爲一個mr做業(或任務)。一個mr任務的處理過程分爲兩個階段:map階段 和 reduce階段。

每一個階段都以鍵值對(key-value對)做爲輸入和輸出,其數據類型是由程序員來選擇的,即在代碼中設置的。其程序執行的基本的過程以下:

一、mr框架讀取待處理的數據(通常來自HDFS文件),生成map階段全部的key-value數據集合,交由map階段處理。

二、map階段處理上面的key-value數據,生成新的key-value數據集合。map階段的處理的核心就是由框架調用一個程序員編寫的map函數來處理。每一個輸入的鍵值對都會調用一次map函數來處理,map函數的輸出結果也是key-value鍵值對。

三、框架對全部map階段的輸出數據進行排序和分組(這過程稱爲shuffle),生成新的key-value數據集合,交由reduce處理。

四、reduce階段會對數據進行操做,最後也是生成key-value數據,這也是mr任務最終的輸出結果。reduce階段的處理的核心就是框架調用一個程序員編寫的reduce函數來處理。每一個輸入的鍵值對都會調用一次reduce函數來處理,reduce函數的輸出結果也是key-value鍵值對,就是最終的結果。

咱們下面經過一個具體的例子來進一步理解mr的運行機制。該例子是,從文件中統計單詞重複出現的次數。假設輸入的文件中的內容以下:

mary jack
this is jack he is mary 

對於上述待處理文件,mr框架會讀取文件中的內容,生成給map處理的key-value集合,生成的數據內容以下(注意下面只是示意,不是實際的存儲格式):

(1, mary jack) (10,this is jack) (22,he is mary) 

對於文本文件,在默認狀況下,mr框架生成的key-value的key是每行的首字符在文件中的位置,value是每行的文本,如上面的數據。

對於上面的每對key-value數據,會交給map處理,本例子是爲了獲取單詞重複的次數,首先須要將單詞區分出來,顯然,map階段能夠用來幹這個事,這樣咱們map階段能夠有這樣的輸出(咱們這裏先直接給出結果,後面會有具體的代碼):

(mary,1) (jack,1) (this,1) (is,1) (jack,1) (he,1) (is,1) (mary,1) 

也就是map階段輸出的key-value對是每一個單詞,其中key是單詞自己,value是固定值爲1。

map處理後,框架會對Map輸出的Key-value數據基於key來對數據進行排序和分組(這過程稱爲shuffle),結果數據以下:

(mary,[1,1]) (jack,[1,1]) (this,[1]) (is,[1,1]) (he,[1]) 

能夠看出,shuffle操做的結果是,將map輸出數據中相同Key的value進行合併成列表,生成新的key-value數據,傳給reduce。

這樣,reduce要作的事情就很簡單了,就是將每對key-value數據中的value中的各個元素的值彙總便可。輸出結果如:

he  1 is 2 jack 2 mary 2 this 1 

上面就是整個Mr程序最終的輸出結果。

Mr程序是用來計算海量數據的,提交一次Mr任務到集羣上,通常會由多個map來同時處理(每一個map位於一個節點上)。

框架會將待處理的數據分紅1個或多個「輸入分片(split)」,每一個map只處理一個分片,每一個分片被劃分爲若干條記錄,每條記錄就是一個鍵/值對,map就是一個接一個的處理這些鍵/值對,也就是說,對於每一個鍵/值對,Map函數都會被執行一次,這個分片中有多少個鍵/值對,該map類中的map函數就會被調用多少次。

reduce任務的數量不是由輸入數據的大小決定的,而是由程序員在代碼中指定的,默認是1。若是是1,則map全部的輸出數據都由該reduce節點來處理。若是是多個,則由框架將全部map的輸出數據依據必定規則分爲各個部分交給各個reduce分別處理。

下圖是一個mr任務的數據流程圖,能夠比較清晰的展現上面描述的過程。


 
 

(摘自Hadoop權威指南一書)

4、程序編寫

編寫一個簡單的mr程序,通常至少須要編寫3個類,分別是:

一、Mapper類的一個繼承類,用於實現map函數;

二、Reducer類的實現類,用於實現reduce函數;

三、程序入口類(帶main方法的),用於編寫mr做業運行的一些代碼。

下面咱們以上一節提到的統計單詞重複次數的例子來介紹如何編寫Mr程序代碼。hadoop版本中也自帶了這個例子的代碼,具體位置位於hadoop安裝目錄下的

share\hadoop\mapreduce\sources\hadoop-mapreduce-examples-2.7.6-sources.jar文件中。

第一,首先編寫Mapper類的繼承類,重寫map函數,類的完整代碼以下:

package com.mrexample.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class CountMap extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable intValue = new IntWritable(1); private Text keyword = new Text(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] values = value.toString().split(" "); for (String item : values) { if(item.isEmpty()){ continue; } keyword.set(item); context.write(keyword, intValue); } } } 

Mapper類是一個泛型類型,它有4個形參類型,須要程序員來指定,這4個類型按順序分別是 輸入給map函數的鍵值對數據的key的類型和value的類型,以及map輸出鍵值對數據的key的類型和value的類型。由於對於單詞統計這個例子,map輸入的key爲數值(對應mr中的類型爲LongWritable,相似java中的long類型),value爲字符串(對應mr中的類型爲Text,相似java中的String類型),map輸出的key類型是字符串,value類型是數值。

Mapper類的map方法(函數)有3個參數,前2個參數對應map輸入鍵值對數據的key的類型和value的類型,即和Mapper類的前兩個泛型參數一致。第3個參數是Context 類型,用於寫入map函數處理後要輸出的結果。

map方法的處理邏輯很簡單,輸入的key不關心,把輸入的value(即每行數據)進行字符串split操做得到字符串中的各個單詞,而後經過Context 類的write方法將結果輸出。

第二,而後編寫Reducer類的繼承類,重寫reduce函數,類的完整代碼以下:

package com.mrexample.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class CountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable wordNum = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : value) { sum += val.get(); } wordNum.set(sum); context.write(key, wordNum); } } 

Reducer類也是一個泛型類,同Mapper類相似,也有4個參數類型,用於指定輸入和輸出類型。須要注意的是,Reducer類的輸入類型必須匹配Mapper類的輸出類型,這個很好理解,由於Map的輸出就是reduce的輸入。

reduce方法(函數)也有3個參數,第1個參數是輸入鍵值對的鍵,第2個參數是一個迭代器,對應map輸出後由框架進行shuffle操做後的值的集合,第3個參數Contex用於寫輸出結果的。

reduce方法的邏輯也比較簡單,由於咱們要統計單詞的重複個數,因此就對第2個參數進行遍歷,算出總數便可。而後按照key-value的方式經過Context參數輸出。

第三,有了map和reduce代碼,還須要編寫一個java入口類,用於完成Mr任務的相關設置,完整代碼以下:

package com.mrexample.wordcount; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CountMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(); //指定本job所在的jar包 job.setJarByClass(CountMain.class); //設置本job所用的mapper邏輯類和reducer邏輯類 job.setMapperClass(CountMap.class); job.setReducerClass(CountReduce.class); //設置最終輸出的kv數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //設置輸入和輸出文件的路徑 FileInputFormat.setInputPaths(job, new Path("實際輸入路徑或文件")); FileOutputFormat.setOutputPath(job, new Path("實際輸出路徑")); //提交job給hadoop集羣,等待做業完成後main方法纔會結束 job.waitForCompletion(true); } } 

上面建立的類是一個普通的帶main方法的java類,在main方法中,對本Mr任務進行相關設置。上面代碼中的設置是最小設置,不少設置採用的是默認值。下面對上面代碼進行一一的解釋。

首先經過Job.getInstance()建立一個Job對象,該對象用於進行做業信息的設置,用於控制整個做業的運行。

要想做業在集羣中運行,須要把代碼打包成一個Jar包文件(mr框架會在整個集羣上發佈整個Jar文件),咱們須要在代碼中經過setJarByClass方法傳遞一個類,這樣mr框架就能根據這個類來查找到相關的jar文件。

而後調用setMapperClass和setReducerClass方法指定本做業執行所須要的Mapper類和Reducer類。

還須要調用setOutputKeyClass和setOutputValueClass指定做業最終(即reduce操做)輸出的key-value鍵值對的數據類型,這個要與Reducer實現類代碼中指定的Reducer類中的泛型參數保持一致。須要注意的是,若是map操做的輸出類型與最終的輸出類型不一致,則須要顯示的單獨設置map的輸出類型,可調用Job類的setMapOutputKeyClass和setMapOutputValueClass方法進行設置,由於咱們這個例子中map的輸出類型和reduce的輸出類型一致,因此不用單獨再設置map的輸出類型。

在咱們這個例子,任務的輸入來自文件,輸出也是寫入文件。因此須要設置輸入路徑和輸出路徑,輸入路徑經過調用FileInputFormat類的靜態方法setInputPaths來設置,能夠是一個文件名,也能夠是一個目錄,若是是一個目錄,則該目錄下的全部文件都會被做爲輸入文件處理;輸出路徑經過調用FileOutputFormat類的靜態方法setOutputPath來設置,須要注意的是,mr框架要求在做業運行前該輸出目錄是不存在的,若是存在,程序會報錯。

最後調用waitForCompletion方法來提交做業,參數傳入true表示等待做業完成方法才返回,這樣整個做業完成後main方法纔會結束。

5、運行測試

寫好mr程序後,正常狀況下咱們是要把代碼打成jar包,而後提交到hadoop集羣環境下去運行。但若是咱們每次都在集羣環境下去驗證代碼的正確性,就比較複雜,一來集羣環境準備比較麻煩,二來執行比較耗時,三來調試、查找問題比較麻煩。所以,咱們最好先能在本地進行驗證,先保證代碼邏輯是正確的。

好在mr程序能夠在本地執行,咱們能夠在本地準備一個小型數據進行測試,以驗證代碼是否有問題。當確保沒有代碼的問題後,咱們再拿到集羣上去驗證性能等問題。

要想mr程序在本地運行,咱們須要設置Mr程序不使用hdfs文件系統上的文件(而使用本地文件),同時不使用yarn進行資源調度。咱們須要在代碼裏進行參數的設置,如:

Configuration conf= **new** Configuration(); conf.set("fs.defaultFS","file:///"); conf.set("mapreduce.framework.name","local"); Job job = Job.getInstance(conf); 

上面的代碼是前面例子中的代碼,"fs.defaultFS"參數表明使用哪一個文件系統,這裏設置值爲"file:///"表示使用本地文件系統;"mapreduce.framework.name"參數表明執行Mr程序的方式,這裏設置值爲"local"表示使用本地的方式,不使用yarn。若是不在代碼中進行設置,這些參數的值是取的當前環境下的hadoop配置文件中設置的值,具體可參考《Hadoop運行環境搭建》中的配置文件設置介紹。這樣咱們須要建立一個Configuration 對象,進行相關參數設置後,並傳給建立Job對象的getInstance靜態方法。

另外須要注意的是,須要把前面章節中例子代碼中的輸入、輸出路徑改成實際的本地路徑。

通常狀況下,咱們會在IDE工具中(如eclipse,intellij)中進行代碼的開發,爲了編譯經過,須要引入所依賴的相應的jar包,這有兩種方式,一是利用maven的pom文件自動引入,二是直接在IDE中顯示的設置。要想編譯沒問題,只須要引入hadoop-common-2.7.6.jar(位於安裝目錄的share\hadoop\common目錄下)和hadoop-mapreduce-client-core-2.7.6.jar(位於安裝目錄的share\hadoop\mapreduce目錄下)。

上面引入的兩個Jar包只能讓編譯經過。但若是要執行mr程序,須要依賴更多Jar包。最簡單的運行方式是,將Mr程序編譯後的class打成jar包,而後利用hadoop jar命令來執行,該命令會自動引入執行Mr程序須要的jar包。

假設上面的wordcount例子的代碼已經打成Jar包,jar包名爲wordcount.jar。進入命令行界面,當前目錄爲wordcount.jar所在的目錄,而後執行:

hadoop jar wordcount.jar com.mrexample.wordcount.CountMain

會有不少信息在控制檯上輸出,若是執行成功,咱們打開上面代碼中設置的輸出目錄,會發現生成了不少文件。其中結果位於一個或多個part-r-xxxxx文件中,其中xxxxx是一串數字編號,從00000開始。打開part-r-xxxxx文件,能夠檢查輸出結果是否與預期一致,從而判斷代碼邏輯是否正確。

6、其它案例

經過上面的統計單詞重複次數的例子,咱們能夠看出,編寫mr程序的關鍵是根據需求,依照mr模型的要求,設計出相應的map函數和reduce函數。咱們再來看一個更簡單的例子,加深下理解。

假設有不少文本文件,文件中的各行文本數據有重複的,咱們須要將這些文件中重複的行(包括不一樣文件中的重複行)去除掉。這個若是採用mr來實現,就很是簡單了。

首先咱們考慮map函數怎麼寫?由於對於文本文件,mr框架默認處理後傳給map的key-value鍵值對的key是行首字母在文件中的位置,value是該行的文本。因此咱們的map只需將行文本做爲Key輸出,對應的vaule沒有做用,能夠是一個空串。代碼如:

public class RemoveMap extends Mapper<LongWritable, Text, Text, Text> { private Text tag = new Text(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, tag); } } 

這樣map輸出的key-value數據通過mr框架shuffle操做後,輸出的數據的key就是不重複的行數據了(即沒有重複的行了)。這樣咱們的reduce函數只需將傳入的weikey輸出便可,代碼如:

public class RemoveReduce extends Reducer<Text, Text, Text, Text>{ private Text tag = new Text(); protected void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException { context.write(key, tag); } } 

能夠看出,mr程序實際上就是將輸入轉爲key-value格式的數據流,分別通過map函數和reduce函數處理後,最後輸出key-value格式的數據。這點與函數式編程的中的高階函數map和reduce的概念很是相似,map是將一個數據集合轉換爲另外一個數據集合,reduce是對一個數據集合進行聚合等相應的操做。

前面的例子,mr處理的數據來自文本文件,最後生成的結果也到文本文件中。這時是Mr框架採用默認的方式來讀取數據和寫入數據的。在實際的場景中,咱們的數據可能不是來自於文件,輸出也不必定寫入文件中。或者即便是文件,也多是二進制的,不是文本文件。

其實MR能夠處理不少不一樣類型的數據格式。這個咱們在後續的文章中再介紹。

相關文章
相關標籤/搜索