Big Data: MapReduce

Source code: https://github.com/hiszm/hadoop-trainjava

MapReduce Overview

MapReduce is a distributed computing framework for writing batch-processing applications. Finished programs can be submitted to a Hadoop cluster to process large data sets in parallel. A MapReduce job splits the input data set into independent blocks, which are processed by map tasks in parallel; the framework sorts the map outputs and then feeds them to reduce.

  • Originated from Google's MapReduce paper, published in December 2004
  • Hadoop MapReduce is a clone of Google's MapReduce
  • Strengths of MapReduce: offline processing of massive data sets; easy to develop; easy to run
  • Weakness of MapReduce: not suited to real-time or streaming computation

MapReduce Programming Model


When programming, what we mainly focus on is how to split the input and how to reduce.
The MapReduce framework operates exclusively on <key,value> pairs: it treats the job's input as a set of <key,value> pairs and produces a set of <key,value> pairs as output.

MapReduce splits a job into a Map phase and a Reduce phase.

  1. input: read the text file;

  2. splitting: split the file by line; here K1 is the line number and V1 is the text content of the corresponding line;

  3. mapping: in parallel, split each line on spaces to get List(K2, V2), where K2 is each individual word; since this is a word-frequency count, V2 is 1, meaning one occurrence;

  4. shuffling: because mapping may run in parallel on different machines, shuffling is needed to route records with the same key to the same node for merging, so that the final result can be computed; this yields K2 (each word) and List(V2), an iterable collection whose elements are the V2 values emitted by mapping;

  5. reducing: this example counts the total occurrences of each word, so reducing sums List(V2) for each key and emits the final output.
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
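The five stages above can be sketched as a single-process Java program (class and method names here are illustrative, not part of Hadoop):

```java
import java.util.*;

// A minimal single-process sketch of the WordCount pipeline:
// input -> splitting -> mapping -> shuffling -> reducing.
public class WordCountSim {

    // mapping: emit a (word, 1) pair for every word in a line
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    public static Map<String, Integer> wordCount(List<String> lines) {
        // shuffling: group all emitted values under their key (K2 -> List(V2))
        Map<String, List<Integer>> groups = new HashMap<>();
        for (String line : lines) {                       // splitting: one line at a time
            for (Map.Entry<String, Integer> kv : map(line)) {
                groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        // reducing: sum List(V2) for each key
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : groups.entrySet()) {
            counts.put(e.getKey(), e.getValue().stream().mapToInt(Integer::intValue).sum());
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("Deer Bear River", "Car Car River", "Deer Car Bear")));
    }
}
```

In a real cluster, mapping runs on many machines at once and shuffling moves data over the network; the sequential loop above only models the data flow.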

Mapper

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }

    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
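The default map() above is an identity mapping; real jobs subclass Mapper and override it. A sketch of the classic WordCount tokenizing mapper, close to the example in the Hadoop documentation (requires hadoop-client on the classpath; not run here):

```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input key: position in the file; input value: one line of text.
// Output: (word, 1) for every token in the line.
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);   // emit (word, 1)
        }
    }
}
```

Note how run() in the base class drives the template: setup() once, then map() for every input record, then cleanup().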

Reducer

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Reducer() {
    }

    protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        for (VALUEIN value : values) {
            context.write(key, value);
        }
    }

    protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
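Likewise, the default reduce() just passes values through; a word-count job overrides it to sum them. A sketch of the standard summing reducer (requires hadoop-client on the classpath; not run here):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input: a word and the iterable of 1s the shuffle grouped under it.
// Output: (word, total count).
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();           // sum List(V2) for this key
        }
        result.set(sum);
        context.write(key, result);
    }
}
```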

Execution Steps of the MapReduce Programming Model

  • Prepare the input data for map
  • Mapper processing
  • Shuffle
  • Reducer processing
  • Output the result
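These steps are wired together by a driver class that configures and submits the Job. A minimal sketch, assuming hypothetical mapper/reducer classes named TokenizerMapper and IntSumReducer, with input and output paths taken from the command line (requires Hadoop on the classpath; not run here):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(TokenizerMapper.class);   // Mapper processing
        job.setCombinerClass(IntSumReducer.class);   // optional local aggregation before shuffle
        job.setReducerClass(IntSumReducer.class);    // Reducer processing
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));    // prepare the map input
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output the result
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```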

Core Concepts of the MapReduce Programming Model

  • Split
  • InputFormat
  • OutputFormat
  • Combiner
  • Partitioner
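Of these, the Partitioner decides which reducer each intermediate key is sent to; Hadoop's default HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. A standalone sketch of that logic, with no Hadoop dependency (class and method names are illustrative):

```java
// Mimics the partitioning rule of Hadoop's default HashPartitioner.
public class HashPartitionDemo {
    public static int getPartition(String key, int numReduceTasks) {
        // mask the sign bit so negative hash codes still map to a valid bucket
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String word : new String[]{"Deer", "Bear", "River", "Car"}) {
            System.out.println(word + " -> reducer " + getPartition(word, 3));
        }
    }
}
```

Because the partition depends only on the key's hash, every occurrence of the same word lands on the same reducer, which is what makes the per-key aggregation in reduce correct.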

