帶你入坑大數據(三) --- MapReduce介紹

前言


在上一篇文章中咱們已經瞭解了HDFS的讀寫流程,HA高可用,聯邦和Sequence Files方案,簡單回顧一下HDFS的寫流程html

Client調用Distributed FileSystem的create方法,這個過程是遠程調用了NameNode的create方法,此時NameNode就會作四件事情apache

  1. 檢查本身是否正常運行編程

  2. 判斷要寫進HDFS的文件是否存在api

  3. 檢查client是否具備建立權限緩存

  4. 對這次操做進行日誌記錄(edits log)服務器

此時create方法會返回一個OutputStream,這個流還需啊喲和NameNode進行交互,調用NameNode的addBlock()方法,以得知這個block須要寫在哪些數據節點上。網絡

開始寫數據時先寫在一個chuck上,附帶着一個4字節的checkSum,總共516字節,而後再把這些chuck寫在一個更大的結構package中,在package被多個chuck寫滿以後,把package放到一個叫作data queue的隊列中,以後所作的事情有兩個數據結構

  1. data queue中的package往數據節點DataNode上傳輸,傳輸的順序按照NameNode的addBlock()方法返回的列表依次傳輸app

  2. 往DataNode上傳輸的同時也往確認隊列ack queue上傳輸框架

  3. 針對DataNode中傳輸完成的數據作一個checkSum,並與本來打包前的checkSum作一個比較

  4. 校驗成功,就從確認隊列ack queue中刪除該package,不然該package從新置入data queue重傳

完成後經過心跳機制NameNode就能夠得知副本已經建立完成,再調用addBlock()方法寫以後的文件。

異常的狀況就再也不從新說明了,能夠直接跳到第二篇進行查看

1、MapReduce編程模型


MapReduce是採用一種分而治之思想設計出來的分佈式計算框架

在計算複雜或者計算量大的任務,單臺服務器沒法勝任時,可將其切分紅一個個小的任務,小任務分別在不一樣的服務器上並行執行,最終再彙總每一個小任務的結果便可

MapReduce由兩個階段組成,切分紅小任務的Map階段和彙總小任務的Reduce階段,以下圖,須要注意,三個小任務是能夠並行執行的

1.1 Map階段

map()函數的輸入時鍵值對,輸出的是一系列鍵值對,輸出的結果時寫入本地磁盤的

1.2 Reduce階段

reduce()函數的輸入時鍵值對(即map()函數的輸出),輸出是一系列鍵值對,最終寫入HDFS

大致邏輯在下面的圖很是清晰明瞭了,shuffle的過程以後再說明

2、MapReduce編程示例


永遠都逃不過的詞頻統計,統計一篇文章中,各個單詞出現的次數

2.1 原理圖分析

從左到右,有一個文件,HDFS對它進行了分塊存儲,且每個塊咱們也能夠視爲是一個分片(split),而後它提供一個kv對(0,Dear Bear River)過來,key爲何是0呢?那這裏的0實際上是偏移量,這個偏移量是會隨着文件中的數據字節大小進行變化的。在當前例子中暫時咱們還用不上,咱們須要作的只是把做爲value的Dear Bear River作一個拆分,而後進行統計,統計完成後開始讀第二行的Dear Car,一樣輸出便可。

以後這個文件分紅的3個塊都統計好以後,再按照同一個單詞彙聚到同一個節點進行統計的方式,得出結果便可

須要注意的問題

1.咱們能夠看到在上圖存在着 4 個單詞 4 個 reduce task,可是這個reduce task的個數是由開發人員本身決定的,只是一個SetReduceNum(4)的問題

2.爲何reduce能夠得知究竟有多少個單詞,提到shuffle時咱們再說。

3.細心的你應該會發現shufflling事後的那些(Dear,1)有4個,但是key不該該只能存在一個麼,這也是shuffle的時候要說的

2.2 mapper代碼

public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for (String word : words) {
            // 每一個單詞出現1次,做爲中間結果輸出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
複製代碼

這裏的LongWritable對應Java裏面的Long類型,Text對應String類型,由於分佈式框架中數據從一個節點到另外一個節點時會存在序列化和反序列化的問題,因此Hadoop自身提供了一些帶有序列化功能的類供咱們使用,也就是平時咱們看到的鍵值對是(Long,String),在這裏就變成了(LongWritable,Text)而已。

以後就是覆寫map()方法,實現單詞分割,以後把每一個單詞做爲key,以(word,1)這種狀態輸出出去。

想要查看這些API方法的話,能夠去hadoop官網查看,這裏我用的仍是2.7.3,看過上一篇的同窗應該也是知道了

這裏有兩個Mapper是由於第一個Mapper是老的Mapper,如今已經使用新的了。點擊Method以後就能夠看到剛剛使用的map()方法了

2.3 Reducer代碼

public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
        key: hello
        value: List(1, 1, ...)
    */
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        int sum = 0;

        for (IntWritable count : values) {
            sum = sum + count.get();
        }
        context.write(key, new IntWritable(sum));// 輸出最終結果
    };
}
複製代碼

有了上一個2.2的基礎,這個代碼就再也不展開說明了,就是把value進行累加,而後得出一個sum,key仍是指單詞,以後以(word,sum)這種狀態輸出出去。

補充:當value中的列表很是大時,會選擇提升集羣內存或者設置一些讀句子時候的限制(自定義InputFormat類,MapReduce默認的是TextInputFormat)把數據大小給減小。

2.4 程序執行的main()方法

這裏的main方法基本每個都是直接拷貝過來而後填填set方法的參數直接用的

public class WordMain {
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        if (args.length != 2 || args == null) {
            System.out.println("please input Path!");
            System.exit(0);
        }

        Configuration configuration = new Configuration();
        // 生成一個job實例
        Job job = Job.getInstance(configuration, WordMain.class.getSimpleName());

        // 打jar包以後,找程序入口用
        job.setJarByClass(WordMain.class);

        // 經過job設置輸入/輸出格式
        // MR的默認輸入格式就是TextInputFormat,因此註釋掉也沒問題
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);

        // 設置輸入/輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 設置處理Map/Reduce階段的類
        job.setMapperClass(WordMap.class);
        job.setReducerClass(WordReduce.class);
        //若是map、reduce的輸出的kv對類型一致,直接設置reduce的輸出的kv對就行;若是不同,須要分別設置map, reduce的輸出的kv類型
        //job.setMapOutputKeyClass(.class)
        // 設置最終輸出key/value的類型m
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 提交做業
        job.waitForCompletion(true);
    }
}
複製代碼

運行的方式能夠本地運行,能夠集羣運行,能夠maven打包運行也能夠,運行結果能夠經過yarn查看,由於考慮到你們可能沒時間去搭建一個集羣玩這裏就不貼圖了,後面找機會分享一下簡答的3節點的集羣搭建。

2.5 combiner

map端的本地聚合,不管運行多少次combiner操做,都不會影響最終的結果

注意:不是全部MapReduce程序都適合使用,好比求average

WordCountMap與WordCountReduce代碼不變
WordCountMain中,增長job.setCombinerClass(WordCountReduce.class);
複製代碼

鍵值對一開始的時候是第一張圖的樣子,如今咱們剛通過Mapping時會存在大量的鍵值對,它們會經過網絡傳到對應的Reducing那,若是都是按照(word,1)的格式傳輸過去,傳輸的數據量就變得很是巨大,因此這時候最好的方案是先在本地對某一個單詞先作一個彙總,也就是combine操做,如圖,兩個(dear,1)變成了一個(Dear,2),2個(Car,1)變成了(Car,2)等···

2.6 shuffle過程

map task 輸出的時候會輸出到一個環形緩衝區中,每個環形緩衝區是100M大小,隨着數據的不斷讀寫,讓環形緩衝區的內存達到80%,這時候會形成溢出寫磁盤,把這些文件寫到磁盤中,而這個寫到磁盤的操做會經歷3個過程

首先是分區,默認狀況下是利用key來進行分區操做,MapReduce框架專門提供了一個HashPartitioner用於進行分區操做

環形緩衝區的kv對在落入磁盤前都須要去調用一下getPartition()方法,此時咱們能夠看到,它使用了一個比較巧妙的方法:先是計算了一下這個key的hashcode,再模上一個reduce的個數,這種時候咱們看上面的圖,reduce的個數是4,那咱們一個數字去模4,結果只會是4個,也就是0,1,2,3,因此這四個結果就會對應不一樣的緩衝區

剩下的就是reduce task來進行拉取數據,剛開始時會放到內存當中,放不下的時候也會溢出寫到磁盤

固然若是一開始的時候有進行setCombine操做的話就會變成(Dear,4),在圖中由於咱們是舉例說明,實際狀況下每一個分區都有不少不一樣的單詞,在reduce操做時就會進行合併操做,即相同的key放在一塊兒,而後按照字母順序排序。

combine,merge,和最後的reduce task,這些功能都同樣,只不過做用的階段不一樣,方便提高性能。只要達到業務要求就行,有時候一個map就能解決需求,有時候須要map和reduce兩個階段。

以後每個reduce task的結果都會寫到HDFS的一個文件裏。當map task完成後,後面說yarn的時候會有一個appMaster,作一個輪詢的確認,確認完成後再通知reduce task從本地磁盤拉取,有比較多的具體知識須要後續跟進時纔會在最後造成一個比較清晰的概念,這也是很是正常的。

2.7 二次排序

MapReduce中根據key進行分區排序和分組,若是如今須要自定義key類型,並自定義key的排序規則,如何實現(結合代碼講解)

public class Person implements WritableComparable<Person> {
    private String name;
    private int age;
    private int salary;

    public Person() {
    }

    public Person(String name, int age, int salary) {
        //super();
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

    @Override
    public String toString() {
        return this.salary + "  " + this.age + "    " + this.name;
    }

    //先比較salary,高的排序在前;若相同,age小的在前
    public int compareTo(Person o) {
        int compareResult1= this.salary - o.salary;
        if(compareResult1 != 0) {
            return -compareResult1;
        } else {
            return this.age - o.age;
        }
    }

    //序列化,將NewKey轉化成使用流傳送的二進制
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(age);
        dataOutput.writeInt(salary);
    }

    //使用in讀字段的順序,要與write方法中寫的順序保持一致
    public void readFields(DataInput dataInput) throws IOException {
        //read string
        this.name = dataInput.readUTF();
        this.age = dataInput.readInt();
        this.salary = dataInput.readInt();
    }
}
複製代碼

講解內容··

2.8 數據傾斜

數據傾斜是數據中的常見狀況。數據中不可避免地會出現離羣值(outlier),並致使數據傾斜。這些離羣值會顯著地拖慢MapReduce的執行。常見的數據傾斜有如下幾類:

  1. 數據頻率傾斜——某一個區域的數據量要遠遠大於其餘區域。(reduce傾斜)
  2. 數據大小傾斜——部分記錄的大小遠遠大於平均值。(map傾斜)

在map端和reduce端都有可能發生數據傾斜。在map端的數據傾斜會讓多樣化的數據集的處理效率更低。在reduce端的數據傾斜經常來源於MapReduce的默認分區器。

數據傾斜會致使map和reduce的任務執行時間大爲延長,也會讓須要緩存數據集的操做消耗更多的內存資源。

2.8.1 如何診斷是否存在數據傾斜

  1. 關注由map的輸出數據中的數據頻率傾斜的問題。
  2. 如何診斷map輸出中哪些鍵存在數據傾斜?
    • 在reduce方法中加入記錄map輸出鍵的詳細狀況的功能

    • 在發現了傾斜數據的存在以後,就頗有必要診斷形成數據傾斜的那些鍵。有一個簡便方法就是在代碼裏實現追蹤每一個鍵的最大值。爲了減小追蹤量,能夠設置數據量閥值,只追蹤那些數據量大於閥值的鍵,並輸出到日誌中。

8.2 減緩Reduce端數據傾斜

  1. Reduce數據傾斜通常是指map的輸出數據中存在數據頻率傾斜的情況,也就是部分輸出鍵的數據量遠遠大於其它的輸出鍵

  2. 如何減少reduce端數據傾斜的性能損失?

① 抽樣和範圍分區
Hadoop默認的分區器是基於map輸出鍵的哈希值分區。這僅在數據分佈比較均勻時比較好。在有數據傾斜時就頗有問題。

使用分區器須要首先了解數據的特性。**TotalOrderPartitioner**中,能夠經過對原始數據進行抽樣獲得的結果集來預設分區邊界值。TotalOrderPartitioner中的範圍分區器能夠經過預設的分區邊界值進行分區。所以它也能夠很好地用在矯正數據中的部分鍵的數據傾斜問題。
複製代碼
② 自定義分區
另外一個抽樣和範圍分區的替代方案是基於輸出鍵的背景知識進行自定義分區。例如,若是map輸出鍵的單詞來源於一本書。其中大部分必然是省略詞(stopword)。那麼就能夠將自定義分區將這部分省略詞發送給固定的一部分reduce實例。而將其餘的都發送給剩餘的reduce實例。
複製代碼
③ Combine
使用Combine能夠大量地減少數據頻率傾斜和數據大小傾斜。在可能的狀況下,combine的目的就是聚合並精簡數據。在技術48種介紹了combine。
複製代碼
④ Map端鏈接和半鏈接
若是鏈接的數據集太大而不能在map端的鏈接中使用。那麼能夠考慮第4章和第7章中介紹的超大數據集的鏈接優化方案。
複製代碼
⑤ 數據大小傾斜的自定義策略
在map端或reduce端的數據大小傾斜都會對緩存形成較大的影響,乃至致使OutOfMemoryError異常。處理這種狀況並不容易。能夠參考如下方法。

- 設置mapred.linerecordreader.maxlength來限制RecordReader讀取的最大長度。RecordReader在TextInputFormat和KeyValueTextInputFormat類中使用。默認長度沒有上限。
- 經過org.apache.hadoop.contrib.utils.join設置緩存的數據集的記錄數上限。在reduce中默認的緩存記錄數上限是100條。
- 考慮使用有損數據結構壓縮數據,如Bloom過濾器。
複製代碼

finally

MR的沒有分篇,篇幅很大,但願你們可以耐心看完。

根據順序下一篇是Yarn,走完大數據的這個流程。

相關文章
相關標籤/搜索