大數據篇:MapReduce

MapReduce

MapReduce是什麼?java

MapReduce源自於Google發表於2004年12月的MapReduce論文,是面向大數據並行處理的計算模型、框架和平臺,而Hadoop MapReduce是Google MapReduce克隆版。apache

若是沒有MapReduce!編程

  1. 那麼在分佈式計算上面將很難辦,很差編程。
  2. 在早期沒法處理大數據的離線計算。
  3. 編程中不易擴展性
  4. 分佈式計算任務一旦掛了,沒有容錯機制進行處理

說明:MapReduce不擅長的方面(慢!)網絡

  • 實時計算:像MySQL同樣,在毫秒級或者秒級內返回結果。
  • 流式計算:MapReduce的輸入數據集是靜態的,不能動態變化。
  • DAG計算:多個應用程序存在依賴關係,後一個應用程序的輸入爲前一個的輸出

如今MapReduce逐漸被Spark,Flink等框架取代。可是思想很重要,值得學習。架構

1 MapReduce編程模型

  • 場景:有大量文件,裏面存儲了單詞,且一個單詞佔一行
  • 任務:如何統計每一個單詞出現的次數?
  • 相似應用場景:
    • 搜索引擎中,統計最流行的K個搜索詞;
    • 統計搜索詞頻率,幫助優化搜索詞提示
  • 三種問題
    • Case 1:整個文件能夠加載到內存中;sort datafile | uniq -c;
    • Case 2:文件太大不能加載到內存中,但每一行<word, count>能夠存放到內存中;
    • Case 3:文件太大沒法加載到內存中,且<word, count>也不用保存在內存中;
  • 將三種問題範化爲:有一批文件(規模爲TB級或者 PB級),如何統計這些文件中全部單詞出現的次數;
    • 方案:首先,分別統計每一個文件中單詞出現次數,而後累加不一樣文件中同一個單詞出現次數;
    • 典型的MapReduce過程。

1.1 WordCount案例

1.1.1 WordCount流程圖

input階段,咱們取出文件中的一些數據
splitting階段,咱們將取出的單詞進行分片
Mapping階段,將每一個出現的單詞進行1次統計,轉換數據類型爲(單詞,1)
Shuffling階段,進行hash分片,放入對應的桶,俗稱洗牌,將一樣的單詞放入同一個桶。
Reducing階段,進行數據整合,求出每一個詞的出現的次數
Final result階段,最後獲取到的結果app

1.1.2 WordCount代碼及本地運行

  • 1 新建word.txt文件框架

  • Deer Bear River
    Car Car River
    Deer Car Bear
  • 2 導入maven依賴maven

  • <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0-cdh6.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0-cdh6.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.0.0-cdh6.2.0</version>
        </dependency>
  • 3 map類分佈式

  • import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    
    /**
     * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 四個泛型意思:
     * Mapper<LongWritable, Text, Text, IntWritable>
     * KEYIN -> LongWritable:偏移量(存儲該行在整個文件中的起始字節偏移量)
     * VALUEIN -> Text:進入數據類型
     * KEYOUT -> Text:輸出數據鍵類型
     * VALUEOUT -> IntWritable:輸出數據值類型
     */
    public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text word = new Text();
        private IntWritable one = new IntWritable(1);
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //拿到一行數據,以空格切分
            String[] words = value.toString().split(" ");
            //遍歷單詞數據,將數據變成(單詞,1)的形式放入上下文中(框架)
            for (String word : words) {
                this.word.set(word);
                context.write(this.word, one);
            }
        }
    }
  • 4 reducer類ide

  • import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    
    /**
     * Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 四個泛型意思:
     * Reducer<Text, IntWritable, Text, IntWritable>
     * KEYIN -> Text:輸入數據鍵類型
     * VALUEIN -> IntWritable:輸入數據值類型
     * KEYOUT -> Text:輸出數據鍵類型
     * VALUEOUT -> IntWritable:輸出數據值類型
     */
    public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable total = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //累加相同單詞的數量
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            //包裝結果爲(單詞,總數)輸出
            total.set(sum);
            context.write(key, total);
        }
    }
  • 5 執行任務Driver類

  • import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    
    public class WcDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //獲取Job實例
            Job job = Job.getInstance(new Configuration());
    
            //設置工做類
            job.setJarByClass(WcDriver.class);
    
            //設置Mapper和Reducer類
            job.setMapperClass(WcMapper.class);
            job.setReducerClass(WcReducer.class);
    
            //設置Mapper和Reducer輸出的類型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //設置輸入輸出數據
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            //提交job
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
  • 6 設置文件輸入輸出參數,執行程序,獲得結果

#### 1.1.3 集羣運行

  • 打包上面寫好的項目,上傳集羣,執行提交命令。



2 Hadoop序列化

爲何hadoop要本身實現基本的數據類型而不直接使用Java的類?如:IntWritable,LongWritable,Text。

由於Java的序列化是一個重量級框架(Serializable),一個對象被序列化後,會附帶不少額外的信息(校驗信息,繼承體系,Header等),在網絡中傳輸高效性有影響,因此hadoop本身實現了序列化機制(Writable)。

注:網絡傳輸中的信息都須要序列化,由於hadoop本身實現了序列化機制(Writable),因此咱們才能夠進行簡單的分佈式計算代碼開發。

2.1 手機流量統計(序列化案例)

  • 1 新建flow.txt文件(行號 手機號 IP 網址 上行流量 下行流量 狀態碼)

  • 1   13408542222 192.168.10.1    www.baidu.com   1000    2000    200
    2   17358643333 192.168.10.1    www.baidu.com   2000    4000    200
    3   13408542222 192.168.10.1    www.baidu.com   1000    2000    200
    4   17358643333 192.168.10.1    www.baidu.com   2000    4000    200
    5   13408542222 192.168.10.1    www.baidu.com   1000    2000    200
    6   17358643333 192.168.10.1    www.baidu.com   2000    4000    200
  • 2 導入maven依賴

  • <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0-cdh6.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0-cdh6.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.0.0-cdh6.2.0</version>
        </dependency>
  • 3 實體類對象

  • import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;
    import lombok.ToString;
    import org.apache.hadoop.io.Writable;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    @Getter
    @Setter
    @NoArgsConstructor
    @ToString
    //注意toString方法和最後打印結果效果相關
    public class Flow implements Writable {
        private long upFlow;
        private long downFlow;
        private long totalFlow;
    
        public void setFlow(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.totalFlow = upFlow + downFlow;
        }
    
        //序列化方法
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
            dataOutput.writeLong(totalFlow);
        }
    
        //反序列化方法
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            upFlow = dataInput.readLong();
            downFlow = dataInput.readLong();
            totalFlow = dataInput.readLong();
        }
    }
  • 4 map類

  • import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    
    
    public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
        private Text phone = new Text();
        private Flow flow = new Flow();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split(" ");
            phone.set(data[1]);
            long upFlow = Long.parseLong(data[4]);
            long downFlow = Long.parseLong(data[5]);
            flow.setFlow(upFlow, downFlow);
            context.write(phone, flow);
        }
    }
  • 5 reducer類

  • import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    
    public class FlowReducer extends Reducer<Text, Flow, Text, Flow> {
        private Flow flow = new Flow();
    
        @Override
        protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (Flow value : values) {
                sumUpFlow += value.getUpFlow();
                sumDownFlow += value.getDownFlow();
            }
            flow.setFlow(sumUpFlow, sumDownFlow);
            context.write(key, flow);
        }
    }
  • 6 執行任務Driver類

  • import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    
    public class FlowDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //獲取Job實例
            Job job = Job.getInstance(new Configuration());
    
            //設置工做類
            job.setJarByClass(FlowDriver.class);
    
            //設置Mapper和Reducer類
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
    
            //設置Mapper和Reducer輸入輸出的類型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Flow.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Flow.class);
    
            //設置輸入輸出數據
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            //提交job
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
  • 7 設置文件輸入輸出參數,執行程序,獲得結果

3 MapReduce原理

3.1MapReduce的集羣管理架構

1.客戶端發送MR任務到RM上
2.RM分配資源,找到對應的NM,分配Container容器,啓動對應的Application Master
3.Application Master向Applications Manager註冊
4.Application Master向Resource Scheduler申請資源
5.找到對應的NM
6.分配Container容器,啓動對應的的Map Task或者是Reduce Task任務
7.Map Task和Reduce Task對Application Master彙報心跳,任務進度
8.Application Master向Applications Manager彙報總體任務進度,若是執行完了Applications Manager會將Application Master移除

注意:原則上MapReduce分爲兩個階段:Map Task和Reduce Task,可是因爲Shuffling階段很重要,人爲劃分了Shuffling階段,這個階段發生在Map Task和Reduce Task之間,能夠理解爲由Map Task後半段和Reduce Task前半段組成。

3.2 MapReduce的數據流

3.3 MapTask

3.3.1 並行度決定機制

1G的數據,分紅8份並行計算,那麼每一份須要計算的數據爲128M,感受還不錯。

1M的數據,分紅8份並行計算,那麼每一份須要計算的數據爲128B,感受資源浪費嚴重。

那麼就須要有一個東西來決定怎麼切分,它就是InputFormat,而切分大小通常由HDFS塊大小決定。

  1. 一個Job的Map階段並行度由客戶端在提交Job時的切片數決定。
  2. 每個Split切片分配一個MapTask並行處理實例。
  3. 默認狀況下,切片大小=BlockSize(128M)。
  4. 切片時不考慮數據集總體,而是逐個針對每個文件單獨切片。

針對第四點說明:好比有3個文件,一個300M,第二個50M,第三個50M,那麼一共就是切了5個MapTask出來。

針對每個文件,第一個300M切了3個,第二個50M切了一個,第三個50M切了一個,共5個。

而若是隻有一個文件爲128M+1KB,那麼就只會切分一個,由於切片判斷規則爲->若是文件小於切片大小1.1倍,就和上一個切片將就放在一塊兒了,這樣能夠防止太小的切片在執行任務的時候,調度資源的時間超過執行時間的狀況。

3.3.2 InputFormat

TextInputFormat:
  • TextInputFormat是默認的FileInputFormat實現類,按行讀取每條記錄。
  • 鍵:存儲該行在整個文件中的起始字節偏移量,LongWritable類型

  • 值:爲這行內容,不包括任何行止符(如回車,換行)

  • 示例,一個分片中包含了以下記錄:

    • #源文件
      si chuan cheng du
      jiang su wu xi
      he bei bei jing
    • #被TextInputFormat加載後會變成
      (0,si chuan cheng du)
      (18,jiang su wu xi)
      (33,he  bei bei jing)
KeyValueInputFormat:
  • KeyValueInputFormat每一行均爲一條記錄,被分隔符號分割爲key,value。
  • 能夠經過在驅動類中設置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,"\t");來設置分隔符。默認\t

  • 示例,一個分片中包含了以下記錄:

    • #源文件
      line1  si chuan cheng du
      line2  jiang su wu xi
      line3  he  bei bei jing
    • #被KeyValueInputFormat加載後會變成
      (line1,si chuan cheng du)
      (line2,jiang su wu xi)
      (line3,he  bei bei jing)
NlineInputFormat:
  • NlineInputFormat表明每一個map進程處理的inputSplit不在按Block塊去劃分,而是按指定的行數N來劃分。
  • 輸入文件的總行數/N=切片數,若是不整除,切片數=商+1
  • 鍵:存儲該行在整個文件中的起始字節偏移量,LongWritable類型

  • 值:爲這行內容,不包括任何行止符(如回車,換行)

  • 示例,一個分片中包含了以下記錄:

    • #源文件
      si chuan cheng du
      jiang su wu xi
      he bei bei jing
      hu bei wu han
    • 若是N是2,則每一個輸入分片包含2行,開啓2個Map Task

    • #第一個map收到
      (0,si chuan cheng du)
      (18,jiang su wu xi)
      #第二個map收到
      (33,he bei bei jing)
      (49,hu bei wu han)
CombineTextInputFormat:
  • 根據設置的閾值來決定切片數。

  • 假設setMaxInputSplitSize值爲5M,以下4個文件

  • a.txt 2.1M
    b.txt 5.8M
    c.txt 3.6M
    d.txt 7.8M
    #虛擬儲存過程(由於如上4小個文件在hdfs上佔用了4個塊,因此要有一個虛擬劃塊的過程)
    2.1M < 5M 劃分一塊,2.1M
    5.8M > 5M 大於5M可是小於2*5M,劃分2個一樣大小的塊,2.9M-2.9M
    3.6M < 5M 劃分一塊,3.6M
    12M  > 5M 大於2*5M,先劃分5M,剩下的 7M > 5M 可是 < 2*5M 劃分2個一樣大小的塊,5M-3.5M-3.5M
    (獲得結果)
    2.1M
    2.9M
    2.9M
    3.6M
    4M
    3.5M
    3.5M
    #切片過程(補夠5M劃成一塊)
    第一塊 2.1M + 2.9M = 5M
    第二塊 2.9M + 3.6M = 6.5M
    第三塊 4M + 3.5M = 7.5M
    第四塊 3.5M
SequenceFileInputFormat:
  • SequenceFile其實就是上一個MR程序的輸出
  • 因爲每個MR都會落地磁盤,那麼框架就提供了一種文件對接格式SequenceFile
  • 使用SequenceFileInputFormat做爲中間結果的連接

3.4 Shuffle

Partitioner分區
  • Partitioner決定了Map Task輸出的每條數據,交給哪一個Reduce Task處理
  • 默認實現:hash(key) mod ReduceTask數目
    • 容許用戶自定義
  • 不少狀況需自定義Partitioner
    • 好比hash(hostname(URL)) mod ReduceTask數目,確保相同域名的網頁交給同一個Reduce Task處理

3.5 Combiner合併

  • Combiner是MR程序中Mapper和Reducer以外的一種組件。

  • Combiner組件的父類是Reducer。

  • Combiner和Reducer的區別是運行位置不一樣。

    • Combiner是在每個MapTask所在的節點運行;
    • Reducer是接受全局全部的Mapper的輸出結果;
  • Combiner的意義就是對每個MapTask的輸出進行局部彙總,以減少網絡傳輸量。

  • Combiner可以應用的前提是不能影響最終的業務邏輯,且Combiner輸出KV要和Reducer的輸入KV對應。

    • Mapper                      Reducer
      3 5 7 ->(3+5+7) / 3 = 5           (5+4) / 2 = 4.5 不等於 (3+5+7+2+6) / 5 = 4.6
      2 6 ->(2+6) / 2 = 4
相關文章
相關標籤/搜索