大數據開發 | MapReduce介紹

1.  MapReduce 介紹

1.1MapReduce的做用

假設有一個計算文件中單詞個數的需求,文件比較多也比較大,在單擊運行的時候機器的內存受限,磁盤受限,運算能力受限,而一旦將單機版程序擴展到集羣來分佈式運行,將極大增長程序的複雜度和開發難度,所以這個工做可能完成不了。針對以上這個案例,MapReduce在這裏能起到什麼做用呢,引入MapReduce框架後,開發人員能夠將絕大部分工做集中在業務邏輯的開發上,而將分佈式計算中的複雜性交由框架來處理。編程

可見在程序由單機版擴成分佈式時,會引入大量的複雜工做。爲了提升開發效率,能夠將分佈式程序中的公共功能封裝成框架,讓開發人員能夠將精力集中於業務邏輯。而MapReduce就是這樣一個分佈式程序的通用框架。數組

 

1.2MapReduce架構圖

 

MapReduce 也採用了 Master/SlaveM/S)架構。它主要由如下幾個組件組成 :ClientJobTrackerTaskTracker Task。下面分別對這幾個組件進行介紹。 服務器

1Client架構

 用戶編寫的MapReduce程序經過Client提交到JobTracker同時用戶可經過Client提供的一些接口查看做業運行狀態。在Hadoop內部用做業Job)表示MapReduce程序。一個 MapReduce程序可對應若干個做業,而每一個做業會被分解成若干個Map/Reduce任務(Task)。併發

2JobTrackerapp

JobTracker 主要負責資源監控和做業調度。JobTracker 監控全部 TaskTracker 與做業Job的健康情況,一旦發現失敗狀況後,其會將相應的任務轉移到其餘節點;同時,JobTracker 會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器,而調度器會在資源出現空閒時,選擇合適的任務使用這些資源。在Hadoop 中,任務調度器是一個可插拔的模塊,用戶能夠根據本身的須要設計相應的調度器。框架

3TaskTracker分佈式

   TaskTracker會週期性地經過Heartbeat將本節點上資源的使用狀況和任務的運行進度彙報給JobTracker,同時接收JobTracker發送過來的命令並執行相應的操做(如啓動新任務、殺死 任務等)。TaskTracker 使用「slot」等量劃分本節點上的資源量。 「slot」表明計算資源(CPU、 內存等)。一個 Task 獲取到一個slot 後纔有機會運行,而Hadoop調度器的做用就是將各個TaskTracker上的空閒slot分配給Task使用。slot分爲Map slotReduce slot 兩種,分別供Map TaskReduce Task使用。TaskTracker經過slot數目(可配置參數)限定Task的併發度。ide

4Task函數

Task 分爲 Map Task Reduce Task 兩種,均由TaskTracker啓動。從上一小節中咱們知道,HDFS以固定大小的block 爲基本單位存儲數據,而對於MapReduce 而言,其處理單位是splitsplit block 的對應關係以下圖所示。split 是一個邏輯概念,它只包含一些元數據信息,好比 數據起始位置、數據長度、數據所在節點等。它的劃分方法徹底由用戶本身決定。但須要注意的是,split的多少決定了Map Task的數目,由於每一個split會交由一個Map Task處理。

 

Map Task 執行過程以下圖所示。由該圖可知,Map Task 先將對應的split 迭代解析成一 個個 key/value 對,依次調用用戶自定義的map() 函數進行處理,最終將臨時結果存放到本地磁盤上,其中臨時數據被分紅若干個partition(分片),每一個partition 將被一個Reduce Task處理。

Reduce Task 執行過程以下圖所示。該過程分爲三個階段:

從遠程節點上讀取Map Task 中間結果(稱爲「Shuffle階段);

按照keykey/value 對進行排序(稱爲「Sort階段);

依次讀取 <key, value list>,調用用戶自定義的 reduce() 函數處理,並將最終結果存到HDFS上(稱爲「Reduce 階段)。

MapReduce是一種並行編程模式,利用這種模式軟件開發者能夠輕鬆地編寫出分佈式並行程序。在Hadoop的體系結構中,MapReduce是一個簡單易用的軟件框架,基於它能夠將任務分發到由上千臺商用機器組成的集羣上,並以一種可靠容錯的方式並行處理大量的數據集,實現Hadoop的並行任務處理功能。MapReduce框架是由一個單獨運行在主節點的JobTrack和運行在每一個集羣從節點的TaskTrack共同組成的。

主節點負責調度構成一個做業的全部任務,這些任務分佈在不一樣的節點上。主節點監控它們的執行狀況,而且從新執行以前失敗的任務;

從節點僅負責由主節點指派的任務。

當一個Job任務被提交時,JobTrack接收到提交做業和其配置信息以後,就會配置信息等發給從節點,同時調度任務並監控TaskTrack的執行。

 

1.3MapReduce程序運行演示

Hadoop的發佈包中內置了一個hadoop-mapreduce-example-2.6.5.jar,這個jar包中有各類MR示例程序,能夠經過如下步驟運行:

啓動hdfsyarn,而後在集羣中的任意一臺服務器上啓動執行程序(好比運行wordcount):

hadoop jar hadoop-mapreduce-example-2.6.5.jar wordcount  /wordcount/data /wordcount/out

 

2.MapReduce 編程

2.1編程規範

 

1) 用戶編寫的程序分紅三個部分:MapperReducerDriver(提交運行mr程序的客戶端)

2) Mapper的輸入數據是KV對的形式(KV的類型可自定義)

3) Mapper的輸出數據是KV對的形式(KV的類型可自定義)

4) Mapper中的業務邏輯寫在map()方法中

5) map()方法(maptask進程)對每個<K,V>調用一次

6) Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV

7) Reducer的業務邏輯寫在reduce()方法中

8) Reducetask進程對每一組相同k<k,v>組調用一次reduce()方法

9) 用戶自定義的MapperReducer都要繼承各自的父類

10) 整個程序須要一個Drvier來進行提交,提交的是一個描述了各類必要信息的job對象

 

2.2wordcount 示例編寫

 

需求:在一堆給定的文本文件中統計輸出每個單詞出現的總次數

(1)定義一個mapper

 

//首先要定義四個泛型的類型
//keyin:  LongWritable    valuein: Text
//keyout: Text            valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    //map方法的生命週期:  框架每傳一行數據就被調用一次
    //key :  這一行的起始點在文件中的偏移量
    //value: 這一行的內容
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //拿到一行數據轉換爲string
        String line = value.toString();
        //將這一行切分出各個單詞
        String[] words = line.split(" ");
        //遍歷數組,輸出<單詞,1>
        for(String word:words){
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

 

(2)定義一個reducer

 

//生命週期:框架每傳遞進來一個kv 組,reduce方法被調用一次
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //定義一個計數器
        int count = 0;
        //遍歷這一組kv的全部v,累加到count中
        for(IntWritable value:values){
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

 

 

(3)定義一個主類,用來描述job並提交job

 

public class WordCountRunner {
    //把業務邏輯相關的信息(哪一個是mapper,哪一個是reducer,要處理的數據在哪裏,輸出的結果放哪裏……)描述成一個job對象
    //把這個描述好的job提交給集羣去運行
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job wcjob = Job.getInstance(conf);
        //指定我這個job所在的jar包
//        wcjob.setJar("/home/hadoop/wordcount.jar");
        wcjob.setJarByClass(WordCountRunner.class);
        
        wcjob.setMapperClass(WordCountMapper.class);
        wcjob.setReducerClass(WordCountReducer.class);
        //設置咱們的業務邏輯Mapper類的輸出key和value的數據類型
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(IntWritable.class);
        //設置咱們的業務邏輯Reducer類的輸出key和value的數據類型
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(IntWritable.class);
        
        //指定要處理的數據所在的位置
        FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
        //指定處理完成以後的結果所保存的位置
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));
        
        //向yarn集羣提交這個job
        boolean res = wcjob.waitForCompletion(true);
        System.exit(res?0:1);
    }

 

2.3集羣運行模式

 

1) mapreduce程序提交給yarn集羣resourcemanager,分發到不少的節點上併發執行

2) 處理的數據和輸出結果應該位於hdfs文件系統

3) 提交集羣的實現步驟:

將程序打成JAR包,而後在集羣的任意一個節點上用hadoop命令啓動hadoop jar wordcount.jar cn.bigdata.mrsimple.WordCountDriver inputpath outputpath

 

做者:傑瑞教育
出處: http://www.cnblogs.com/jerehedu/ 
版權聲明:本文版權歸 傑瑞教育 技有限公司和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然保留追究法律責任的權利。
技術諮詢:JRedu技術交流
相關文章
相關標籤/搜索