MapReduce:Shuffle過程詳解

一、Map任務處理html

  

  1.1 讀取HDFS中的文件。每一行解析成一個<k,v>。每個鍵值對調用一次map函數。                <0,hello you>   <10,hello me>                    java

  1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換爲新的<k,v>輸出。          <hello,1> <you,1> <hello,1> <me,1>apache

  1.3 對1.2輸出的<k,v>進行分區。默認分爲一個區。Partitioner數組

    • partitioner的做用是將mapper輸出的鍵/值對拆分爲分片(shard),每一個reducer對應一個分片。 默認狀況下,partitioner先計算目標的散列值(一般爲md5值)。而後,經過reducer個數執行取模運算key.hashCode()%(reducer的個數)。 這種方式不只可以隨機地將整個鍵空間平均分發給每一個reducer,同時也能確保不一樣mapper產生的相同鍵能被分發至同一個reducer。 若是用戶本身對Partitioner有需求,能夠訂製並設置到job上。 job.setPartitionerClass(clz);

  1.4 溢寫Split網絡

    • map以後的key/value對以及Partition的結果都會被序列化成字節數組寫入緩衝區,這個內存緩衝區是有大小限制的,默認是100MB。
    • 當緩衝區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啓動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還能夠往剩下的20MB內存中寫,互不影響。 
    •  當溢寫線程啓動後,須要對這80MB空間內的key作排序sort(1.5)
    • 內存緩衝區沒有對將發送到相同reduce端的數據作合併,那麼這種合併應該是體現是磁盤文件中的。即Combine(1.6)。

  1.5 對不一樣分區中的數據進行排序(按照k)Sortapp

    • 排序:每一個分區內調用job.setSortComparatorClass()設置的Key比較函數類排序。能夠看到,這自己就是一個二次排序。若是沒有經過job.setSortComparatorClass()設置 Key比較函數類,則使用Key實現的compareTo()方法,即字典排序。job.setSortComparatorClass(clz);       排序後:<hello,1> <hello,1> <me,1> <you,1>  

  1.6 (可選)對分組後的數據進行約。Combiner函數

    • combiner是一個可選的本地reducer,能夠在map階段聚合數據。combiner經過執行單個map範圍內的聚合,減小經過網絡傳輸的數據量。oop

    • 例如,一個聚合的計數是每一個部分計數的總和,用戶能夠先將每一箇中間結果取和,再將中間結果的和相加,從而獲得最終結果。post

    • 求平均值的時候不能用,由於123的平均是2,12平均再和3平均結果就不對了。Combiner應該用於那種Reduce的輸入key/value與輸出key/value類型徹底一致,且不影響最終結果的場景,好比累加,最大值等。url

  1.7 合併Merge

    • 每次溢寫會在磁盤上生成一個溢寫文件,若是map的輸出結果然的很大,有屢次這樣的溢寫發生,磁盤上相應的就會有多個溢寫文件存在。
    • 最終的文件只有一個,因此須要將這些溢寫文件歸併到一塊兒,這個過程就叫作Merge。
    • 「hello」從兩個map task讀取過來,由於它們有相同的key,因此得merge成group。什麼是group。group中的值就是從不一樣溢寫文件中讀取出來的,而後再把這些值加起來。group後:<hello,{1,1}><me,{1}><you,{1}>
    • 由於merge是將多個溢寫文件合併到一個文件,因此可能也有相同的key存在,在這個過程當中若是設置過Combiner,也會使用Combiner來合併相同的key。 

 

二、Reduce任務處理

  

  2.1 拉取數據Fetch

    • Reduce進程啓動一些數據copy線程(Fetcher),經過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。由於map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中。

  2.2 合併Merge

    • Copy過來的數據會先放入內存緩衝區中,這裏的緩衝區大小要比map端的更爲靈活,它基於JVM的heap size設置,由於Shuffle階段Reducer不運行,因此應該把絕大部分的內存都給Shuffle用。
    • 這裏須要強調的是,merge有三種形式:1)內存到內存  2)內存到磁盤  3)磁盤到磁盤。
    • 默認狀況下第一種形式不啓用。
    • 當內存中的數據量到達必定閾值,就啓動內存到磁盤的merge。與map 端相似,這也是溢寫的過程,也有sort排序,若是設置有Combiner,也是會啓用的,而後在磁盤中生成了衆多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束。
    • 而後啓動第三種磁盤到磁盤的merge方式,有相同的key的鍵值隊,merge成groupjob.setGroupingComparatorClass設置的分組函數類,進行分組,同一個分組的value放在一個迭代器裏面(二次排序會從新設置分組規則)。若是未指定GroupingComparatorClass則則使用Key的實現的compareTo方法來對其分組。group中的值就是從不一樣溢寫文件中讀取出來的,group後:<hello,{1,1}><me,{1}><you,{1}>
    • 最終的生成的文件做爲Reducer的輸入,整個Shuffle才最終結束。

  2.3  Reduce

    • Reducer執行業務邏輯,產生新的<k,v>輸出,將結果寫到HDFS中。

 

三、WordCount代碼

package mapreduce;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountApp {
    static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
    static final String OUT_PATH = "hdfs://chaoren:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        Job job = new Job(conf, WordCountApp.class.getSimpleName());

        // 指定讀取的文件位於哪裏
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        // 指定如何對輸入的文件進行格式化,把輸入文件每一行解析成鍵值對
        //job.setInputFormatClass(TextInputFormat.class);

        // 指定自定義的map類
        job.setMapperClass(MyMapper.class);
        // map輸出的<k,v>類型。若是<k3,v3>的類型與<k2,v2>類型一致,則能夠省略
        //job.setOutputKeyClass(Text.class);
        //job.setOutputValueClass(LongWritable.class);

        // 指定自定義reduce類
        job.setReducerClass(MyReducer.class);
        // 指定reduce的輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 指定寫出到哪裏
        FileOutputFormat.setOutputPath(job, outPath);
        // 指定輸出文件的格式化類
        //job.setOutputFormatClass(TextOutputFormat.class);
        
        // 分區
        //job.setPartitionerClass(clz);

        // 排序、分組、歸約  
        //job.setSortComparatorClass(clz);
        //job.setGroupingComparatorClass(clz);
        //job.setCombinerClass(clz);

        // 有一個reduce任務運行
        //job.setNumReduceTasks(1);

        // 把job提交給jobtracker運行
        job.waitForCompletion(true);
    }

    /**
     * 
     * KEYIN     即K1     表示行的偏移量 
     * VALUEIN     即V1     表示行文本內容 
     * KEYOUT     即K2     表示行中出現的單詞 
     * VALUEOUT 即V2        表示行中出現的單詞的次數,固定值1
     * 
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException, InterruptedException {
            String[] splited = v1.toString().split("\t");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        };
    }

    /**
     * KEYIN     即K2     表示行中出現的單詞 
     * VALUEIN     即V2     表示出現的單詞的次數 
     * KEYOUT     即K3     表示行中出現的不一樣單詞
     * VALUEOUT 即V3     表示行中出現的不一樣單詞的總次數
     */
    static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException, InterruptedException {
            long times = 0L;
            for (LongWritable count : v2s) {
                times += count.get();
            }
            ctx.write(k2, new LongWritable(times));
        };
    }
}  
相關文章
相關標籤/搜索