Hadoop 之 MapReduce

1 MapReduce 概述

MapReduce 是一個分佈式運算程序的編程框架,是用戶開發基於 Hadoop 的數據分析應用的核心框架。java

MapReduce 核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在一個Hadoop 集羣上。算法

1.1 MapReduce 優缺點

優勢:spring

  • MapReduce 易於編程apache

    它簡單的實現一些接口,就能夠完成一個分佈式程序,這個分佈式程序能夠分佈到大量廉價的 PC 機器上運行,也就是說寫一個分佈式程序,跟寫一個簡單的串行程序是如出一轍的,就是由於這個特色使得 MapReduce 編程變得很是流行。編程

  • 良好的擴展性緩存

    當計算資源不能獲得知足的時候,能夠經過簡單的增長機器來擴展它的計算能力。服務器

  • 高容錯性網絡

    MapReduce 設計的初衷就是使程序可以部署在廉價的 PC 機器上,這就要求它具備很高的容錯性,好比其中一臺機器掛了,它能夠把上面的計算任務轉移到另一個節點上運行,不至於這個任務運行失敗,並且這個過程不須要人工參與,而徹底是由 Hadoop 內部完成的。數據結構

  • 適合 PB 級以上海量數據的離線處理併發

    能夠實現上千臺服務器集羣併發工做,提供數據處理能力。

缺點:

  • 不擅長實時計算

    MapReduce 沒法像 MySQL 同樣,在毫秒或者秒級內返回結果。

  • 不擅長流式計算

    流式計算的輸入數據是動態的,而 MapReduce 的輸入數據集是靜態的,不能動態變化。這是由於 MapReduce 自身的設計特色決定了數據源必須是靜態的。

  • 不擅長DAG(有向圖)計算

    多個應用程序存在依賴關係,後一個應用程序的輸入爲前一個的輸出,在這種狀況下,MapReduce 並非不能作,而是使用後,每一個 MapReduce 做業的輸出結果都會寫入到磁盤,會形成大量的磁盤 IO,致使性能很是的低下。

1.2 MapReduce 核心思想

Hadoop 之 MapReduce

分佈式的運算程序每每須要分紅至少 2 個階段。

第一個階段的 MapTask 併發實例,徹底並行運行,互不相干。

第二個階段的 ReduceTask 併發實例互不相干,可是他們的數據依賴於上一個階段的全部 MapTask 併發實例的輸出。

MapReduce 編程模型只能包含一個 Map 階段和一個 Reduce 階段,若是用戶的業務邏輯很是複雜,那就只能多個MapReduce 程序,串行運行。

1.3 MapReduce 進程

一個完整的 MapReduce 程序在分佈式運行時有三類實例進程:

MrAppMaster 負責整個程序的過程調度及狀態協調

MapTask 負責 Map 階段的整個數據處理流程。

ReduceTask 負責 Reduce 階段的整個數據處理流程。

1.4 經常使用數據序列化類型

Java 類型 Hadoop Writable 類型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable

1.5 MapReduce 編程規範

用戶編寫的程序分紅三個部分:

Mapper 階段

  • 用戶自定義的 Mapper 要繼承本身的父類
  • Mapper 的輸入數據是 KV 對的形式(KV的類型可自定義
  • Mapper 中的業務邏輯寫在 map() 方法中
  • Mapper的輸出數據是 KV對的形式(KV的類型可自定義)
  • map() 方法(MapTask 進程)對每個 <k,v> 調用一次

Reduce 階段

  • 用戶自定義的 Reducer 要繼承本身的父類
  • Reducer 的輸入數據類型對應 Mapper 的輸出數據類型,也是 KV
  • Reducer 的業務邏輯寫在 reduce() 方法中
  • ReduceTask 進程對每一組相同 k 的 <k,v> 組調用一次reduce()方法

Driver 階段

  • 至關於 YARN 集羣的客戶端,用於提交咱們整個程序到 YARN 集羣,提交的是封裝了 MapReduce 程序相關運行參數的job對象

1.6 WordCount 案例實操

導入依賴

<dependencies>
    <dependency>
        <groupid>junit</groupid>
        <artifactid>junit</artifactid>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupid>org.apache.logging.log4j</groupid>
        <artifactid>log4j-core</artifactid>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupid>org.apache.hadoop</groupid>
        <artifactid>hadoop-common</artifactid>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupid>org.apache.hadoop</groupid>
        <artifactid>hadoop-client</artifactid>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupid>org.apache.hadoop</groupid>
        <artifactid>hadoop-hdfs</artifactid>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupid>jdk.tools</groupid>
        <artifactid>jdk.tools</artifactid>
        <version>1.8</version>
        <scope>system</scope>
        <systempath>${JAVA_HOME}/lib/tools.jar</systempath>
    </dependency>
</dependencies>

log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

WcMapper

package com.djm.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WcMapper extends Mapper<longwritable, text,text, intwritable> {

    private Text key = new Text();

    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            this.key.set(word);
            context.write(this.key, this.one);
        }
    }
}

WcReduce

package com.djm.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcReduce extends Reducer<text, intwritable, text, intwritable> {

    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<intwritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += 1;
        }
        this.total.set(sum);
        context.write(key, this.total);
    }
}

WcDriver

package com.djm.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 得到任務
        Job job = Job.getInstance(new Configuration());
        // 設置Classpath
        job.setJarByClass(WcDriver.class);
        // 設置Mapper
        job.setMapperClass(WcMapper.class);
        // 設置Reducer
        job.setReducerClass(WcReduce.class);
        // 設置Mapper的輸出key和value的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 設置Reducer的輸出key和value的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 設置輸入和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

2 Hadoop 序列化

2.1 爲何不使用 Java 序列化框架進行序列化

Serializable 是一個重量級的 Java 序列框架,一個對象被序列化後,會產生不少額外的信息(各類校驗信息,Header,繼承體系等),會產生大量的 IO,因此不適合在網絡中高效的傳輸,因此,Hadoop 本身開發了一個輕量級的序列化框架(Writable)。

Hadoop序列化特色:

一、緊湊:高效使用存儲空間。

二、快速:讀寫數據的額外開銷小

三、可擴展:隨着通訊協議的升級而可升級。

四、 互操做:支持多語言的交互。

2.2 自定義 bean 對象實現序列化接口

在開發過程當中每每提供的基本序列化類型不能知足要求,通常狀況都須要建立一個 Bean 實現 Writable 接口。

具體實現 bean 對象序列化步驟以下 7 步:

一、實現 Writable 接口

二、反序列化時,須要反射調用空參構造函數,必須提供空參構造

三、重寫序列化方法

四、重寫反序列方法

五、反序列化和序列化的順序必須徹底一致

六、要想把結果顯示在文件中,須要重寫 toString()

七、若是須要將自定義的 bean 放在 key 中傳輸,則還須要實現 Comparable 接口,由於 MapReduce 框中的 Shuffle 過程要求對 key 必須能排序

2.3 序列化案例實操

統計每個手機號耗費的總上行流量、下行流量、總流量

輸入數據格式:id 手機號碼 網絡ip 上行流量 下行流量 網絡狀態碼

輸出數據格式:手機號碼 上行流量 下行流量 總流量

FlowBean

package com.djm.mapreduce.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;

    private long downFlow;

    private long sumFlow;

    public FlowBean() {
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = this.upFlow + this.downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
}

FlowMapper

package com.djm.mapreduce.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<longwritable, text, flowbean> {

    private FlowBean flowBean = new FlowBean();
    private Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\t");
        phone.set(words[1]);
        long upFlow = Long.parseLong(words[words.length - 3]);
        long downFlow = Long.parseLong(words[words.length - 2]);
        flowBean.set(upFlow, downFlow);
        context.write(phone, flowBean);
    }
}

FlowReduce

package com.djm.mapreduce.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReduce extends Reducer<text, flowbean, text, flowbean> {

    private FlowBean totalFlow = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<flowbean> values, Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for (FlowBean value : values) {
            long upFlow = value.getUpFlow();
            long downFlow = value.getDownFlow();
            sumUpFlow += upFlow;
            sumDownFlow += downFlow;
        }
        totalFlow.set(sumUpFlow, sumDownFlow);
        context.write(key, totalFlow);

    }
}

FlowDriver

package com.djm.mapreduce.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3 MapReduce 框架原理

3.1 InputFormat 數據輸入

3.1.1 切片與MapTask並行度決定機制

一個 Job 的 Map 階段並行度由客戶端在提交 Job 時的切片數決定

每個Split切片分配一個MapTask並行實例處理

默認狀況下,切片大小=BlockSize

切片時不考慮數據集總體,而是逐個針對每個文件單獨切片

3.1.2 FileInputFormat 切片機制

切片機制:

  • 簡單的按照文件的內容長度進行切片
  • 切片大小等於 Block 大小
  • 切片時不考慮數據集總體,而是逐個針對每一個文件單獨切片

源碼中如何計算切片大小的?

  • Math.max(minSize, Math.min(maxSize, blockSize));
  • mapreduce.input.fileinputformat.split.minsize=1 默認值爲1
  • mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue

如何自定義切片大小?

  • maxsize(切片最大值):參數若是調得比blockSize小,則會讓切片變小,並且就等於配置的這個參數的值。
  • minsize(切片最小值):參數調的比blockSize大,則可讓切片變得比blockSize還大。

3.1.3 CombineTextInputFormat 切片機制

CombineTextInputFormat 用於小文件過多的場景,它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個 MapTask 處理。

Hadoop 之 MapReduce

3.1.4 FileInputFormat 的其餘實現類

TextInputFormat:

TextInputForma 是默認的 FileInputFormat 實現類,按行讀取每條記錄,鍵是存儲該行在整個文件中的起始字節偏移量,LongWritable 類型,值是這行的內容,不包括任何行終止符(換行符和回車符),Text類型。

KeyValueTextInputFormat:

每一行均爲一條記錄,被分隔符分割爲 key,value,能夠經過在驅動類中設置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t"); 來設定分隔符,默認分隔符是 tab。

NLineInputFormat:

若是使用 NlineInputFormat,表明每一個 map 進程處理的 InputSplit 再也不按 Block 塊去劃分,而是按 NlineInputFormat 指定的行數N來劃分,即輸入文件的總行數 /N = 切片數,若是不整除,切片數 = 商 + 1。

3.1.5 自定義 InputFormat

不管 HDFS 仍是 MapReduce,在處理小文件時效率都很是低,但又不免面臨處理大量小文件的場景,此時,就須要有相應解決方案。能夠自定義 InputFormat 實現小文件的合併。
Hadoop 之 MapReduce

程序實現:

WholeFileInputformat

package com.djm.mapreduce.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WholeFileInputformat extends FileInputFormat<text, byteswritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    public RecordReader<text, byteswritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new WholeRecordReader();
    }
}

WholeRecordReader

package com.djm.mapreduce.inputformat;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeRecordReader extends RecordReader<text, byteswritable> {

    private boolean notRead  = true;

    private Text key = new Text();

    private BytesWritable value = new BytesWritable();

    private FSDataInputStream fis;

    private FileSplit fs;

    /**
     * 初始化方法,框架會在開始的時候調用一次
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 轉換切換類型爲文件切片
        fs = (FileSplit) split;
        // 經過切片獲取文件路徑
        Path path = fs.getPath();
        // 經過路徑獲取文件系統
        FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
        // 開流
        fis = fileSystem.open(path);
    }

    /**
     * 讀取下一組KV
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (notRead) {
            // 讀K
            key.set(fs.getPath().toString());
            // 讀V
            byte[] buf = new byte[(int) fs.getLength()];
            fis.read(buf);
            value.set(buf, 0, buf.length);
            notRead = false;
            return true;
        } else {
            return false;
        }
    }

    /**
     * 獲取當前讀到的key
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    /**
     * 獲取當前讀到的value
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    /**
     * 當前數據讀取的進度
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public float getProgress() throws IOException, InterruptedException {
        return notRead ? 0 : 1;
    }

    /**
     * 關閉資源
     * @throws IOException
     */
    public void close() throws IOException {
        if (fis != null) {
            fis.close();
        }

    }
}

WholeFileDriver

package com.djm.mapreduce.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class WholeFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WholeFileDriver.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(WholeFileInputformat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

3.2 MapReduce工做流程

Hadoop 之 MapReduce

Hadoop 之 MapReduce

上面的流程是整個 MapReduce 最全工做流程,可是 Shuffle 過程只是從第 7 步開始到第 16 步結束,具體 Shuffle 過程詳解,以下:

1)MapTask 收集咱們的 map() 方法輸出的 KV 對,放到內存緩衝區中

2)從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件

3)多個溢出文件會被合併成大的溢出文件

4)在溢出過程及合併的過程當中,都要調用 Partitioner 進行分區和針對 key 進行排序

5)ReduceTask 根據本身的分區號,去各個 MapTask 機器上取相應的結果分區數據

6)ReduceTask 會取到同一個分區的來自不一樣 MapTask 的結果文件,ReduceTask 會將這些文件再進行合併(歸併排序)

7)合併成大文件後,Shuffle 的過程也就結束了,後面進入 ReduceTask 的邏輯運算過程(從文件中取出一個一個的鍵值對 Group,調用用戶自定義的 reduce() 方法)

3.3 Shuffle 機制

Hadoop 之 MapReduce

3.3.1 Partition 分區

分區能夠將統計結果按照條件輸出到不一樣的文件中

默認 Partition 分區:

public class HashPartitioner<k, v> extends Partitioner<k, v> {

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

默認分區是根據 key 的 hashCode 對 ReduceTasks 個數取模決定的。

自定義 Partition 步驟:

  • 自定義類繼承 Partitioner,重寫 getPartition() 方法
public class CustomPartitioner extends Partitioner<text, flowbean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 控制分區代碼邏輯
        return partition;
    }
}
  • 在驅動類中,指定 Partitioner
  • 自定義 Partition 後,要根據自定義 Partitioner 的邏輯設置相應數量的 ReduceTask

注意:

  • 若是 ReduceTask 的數量 > getPartition 的結果數,則會多產生幾個空的輸出文件 part-r-000xx;
  • 若是 1< ReduceTask的數量 < getPartition 的結果數,則有一部分分區數據無處安放,會 Exception;
  • 若是 ReduceTask 的數量 = 1,則無論 MapTask 端輸出多少個分區文件,最終結果都交給這一個 ReduceTask,最終也就只會產生一個結果文件 part-r-00000;
  • 分區號必須從零開始,逐一累加。

需求分析:

Hadoop 之 MapReduce

代碼實現:

# ProvincePartitioner
package com.djm.mapreduce.partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<flowbean, text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        switch (text.toString().substring(0, 3)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}

# PartitionerFlowDriver
package com.djm.mapreduce.partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionerFlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(PartitionerFlowDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReduce.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.3.2 WritableComparable 排序

排序是 MapReduce 框架中最重要的操做之一,MapTask 和 ReduceTask 均會對數據按照 key 進行排序,該操做屬於Hadoop 的默認行爲,任何應用程序中的數據均會被排序,而無論邏輯上是否須要。

默認排序是按照字典順序排序,且實現該排序的方法是快速排序:

對於 MapTask,它會將處理的結果暫時放到環形緩衝區中,當環形緩衝區使用率達到必定閾值後,再對緩衝區中的數據進行一次快速排序,並將這些有序數據溢寫到磁盤上,而當數據處理完畢後,它會對磁盤上全部文件進行歸併排序。

對於 ReduceTask,它從每一個 MapTask 上遠程拷貝相應的數據文件,若是文件大小超過必定閾值,則溢寫磁盤上,不然存儲在內存中,若是磁盤上文件數目達到必定閾值,則進行一次歸併排序以生成一個更大文件,若是內存中文件大小或者數目超過必定閾值,則進行一次合併後將數據溢寫到磁盤上,當全部數據拷貝完畢後,ReduceTask 統一對內存和磁盤上的全部數據進行一次歸併排序。

排序分類:

Hadoop 之 MapReduce

需求分析:
Hadoop 之 MapReduce

代碼實現:

package com.djm.mapreduce.partitioner;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class FlowBean implements WritableComparable<flowbean> {

    private long upFlow;

    private long downFlow;

    private long sumFlow;

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = this.upFlow + this.downFlow;
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        return this.sumFlow > o.sumFlow ? -1:1;
    }
}

3.3.3 GroupingComparator 分組

對 Reduce 階段的數據根據某一個或幾個字段進行分組。

分組排序步驟:

  • 自定義類繼承WritableComparator

  • 重寫compare()方法

  • 建立一個構造將比較對象的類傳給父類

    protected OrderGroupingComparator() {
      super(OrderBean.class, true);
    }

需求分析:

Hadoop 之 MapReduce

代碼實現:

# OrderBean
package com.djm.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class OrderBean implements WritableComparable<orderbean> {

    private String orderId;

    private String productId;

    private double price;

    @Override
    public int compareTo(OrderBean o) {
        int compare = this.orderId.compareTo(o.orderId);

        if (compare == 0) {
            return Double.compare(o.price, this.price);
        } else {
            return compare;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.price = in.readDouble();
    }
}

# OrderSortGroupingComparator
package com.djm.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderSortGroupingComparator extends WritableComparator {

    public OrderSortGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;

        return oa.getOrderId().compareTo(ob.getOrderId());
    }
}

# OrderSortDriver
package com.djm.mapreduce.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(OrderSortDriver.class);
        job.setMapperClass(OrderSortMapper.class);
        job.setReducerClass(OrderSortReduce.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.4 MapTask 工做機制

Hadoop 之 MapReduce

1)Read 階段:MapTask 經過用戶編寫的 RecordReader,從輸入 InputSplit 中解析出一個個 key/value。

2)Map階段:該節點主要是將解析出的 key/value 交給用戶編寫 map() 函數處理,併產生一系列新的 key/value。

3)Collect 收集階段:在用戶編寫 map() 函數中,當數據處理完成後,通常會調用 OutputCollector.collect() 輸出結果,在該函數內部,它會將生成的 key/value 分區(調用Partitioner),並寫入一個環形內存緩衝區中。

4)Spill 階段:即溢寫,當環形緩衝區滿後,MapReduce 會將數據寫到本地磁盤上,生成一個臨時文件,須要注意的是,將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操做。

  • 利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號 Partition 進行排序,而後按照 key 進行排序,這樣,通過排序後,數據以分區爲單位彙集在一塊兒,且同一分區內全部數據按照 key 有序。
  • 按照分區編號由小到大依次將每一個分區中的數據寫入任務工做目錄下的臨時文件 output/spillN.out(N表示當前溢寫次數)中,若是用戶設置了 Combiner,則寫入文件以前,對每一個分區中的數據進行一次彙集操做。
  • 將分區數據的元信息寫到內存索引數據結構 SpillRecord 中,其中每一個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小,若是當前內存索引大小超過 1MB,則將內存索引寫到文件 output/spillN.out.index中。

5)Combine 階段:當全部數據處理完成後,MapTask 對全部臨時文件進行一次合併,以確保最終只會生成一個數據文件。

6)當全部數據處理完後,MapTask 會將全部臨時文件合併成一個大文件,並保存到文件 output/file.out 中,同時生成相應的索引文件 output/file.out.index。

7)在進行文件合併過程當中,MapTask 以分區爲單位進行合併,對於某個分區,它將採用多輪遞歸合併的方式,每輪合併io.sort.factor(默認10)個文件,並將產生的文件從新加入待合併列表中,對文件排序後,重複以上過程,直到最終獲得一個大文件。

8)讓每一個 MapTask 最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。

3.5 ReduceTask 工做機制

Hadoop 之 MapReduce

1)Copy 階段:ReduceTask 從各個 MapTask 上遠程拷貝一片數據,並針對某一片數據,若是其大小超過必定閾值,則寫到磁盤上,不然直接放到內存中。

2)Merge 階段:在遠程拷貝數據的同時,ReduceTask 啓動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多。

3)Sort 階段:按照 MapReduce 語義,用戶編寫 reduce() 函數輸入數據是按 key 進行彙集的一組數據,爲了將 key 相同的數據聚在一塊兒,Hadoop 採用了基於排序的策略,因爲各個 MapTask 已經實現對本身的處理結果進行了局部排序,所以,ReduceTask 只需對全部數據進行一次歸併排序便可。

4)Reduce 階段:reduce() 函數將計算結果寫到 HDFS 上。

ReduceTask 的並行度一樣影響整個 Job 的執行併發度和執行效率,但與 MapTask 的併發數由切片數決定不一樣,ReduceTask 數量的決定是能夠直接手動設置:

job.setNumReduceTasks(4);

注意事項:

  • ReduceTask=0,表示沒有 Reduce 階段,輸出文件個數和 Map 個數一致
  • ReduceTask 默認值就是 1,因此輸出文件個數爲一個
  • 若是數據分佈不均勻,就有可能在 Reduce 階段產生數據傾斜
  • ReduceTask 數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有 1 個ReduceTask
  • 具體多少個 ReduceTask,須要根據集羣性能而定
  • 若是分區數不是 1,可是 ReduceTask 爲 1,不會執行分區過程

3.6 OutputFormat 數據輸出

3.6.1 OutputFormat 接口實現類

Hadoop 之 MapReduce

3.6.2 自定義 OutputFormat

Hadoop 之 MapReduce

需求分析:

Hadoop 之 MapReduce

代碼實現:

# FilterOutputFormat
package com.djm.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterOutputFormat extends FileOutputFormat<text, nullwritable> {
    @Override
    public RecordWriter<text, nullwritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new FilterRecordWriter(job);
    }
}

# FilterRecordWriter
package com.djm.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class FilterRecordWriter extends RecordWriter<text, nullwritable> {

    private FSDataOutputStream atguiguOut = null;
    private FSDataOutputStream otherOut = null;

    public FilterRecordWriter() {
    }

    public FilterRecordWriter(TaskAttemptContext job) {
        FileSystem fs;

        try {
            fs = FileSystem.get(job.getConfiguration());
            Path atguigu = new Path("C:\\Application\\Apache\\hadoop-2.7.2\\djm.log");
            Path other = new Path("C:\\Application\\Apache\\hadoop-2.7.2\\other.log");
            atguiguOut = fs.create(atguigu);
            otherOut = fs.create(other);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        if (key.toString().contains("atguigu")) {
            atguiguOut.write(key.toString().getBytes());
        } else {
            otherOut.write(key.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}

# FilterDriver
package com.djm.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(FilterOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.7 Join

3.7.1 Reduce Join

工做原理:

  • Map 端

    爲來自不一樣表或文件的 key/value 對,打標籤以區別不一樣來源的記錄,而後用鏈接字段做爲 key,其他部分和新加的標誌做爲 value,最後進行輸出。

  • Reduce端

    在 Reduce 端以鏈接字段做爲 key 的分組已經完成,咱們只須要在每個分組當中將那些來源於不一樣文件的記錄分開,最後進行合併。

需求分析:

Hadoop 之 MapReduce

代碼實現:

# TableBean
package com.djm.mapreduce.table;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class TableBean implements Writable {

    private String orderId;

    private String productId;

    private int amount;

    private String pname;

    private String flag;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();

    }
}

# TableMapper
package com.djm.mapreduce.table;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<longwritable, text, tablebean>{

    String name;
    TableBean bean = new TableBean();
    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (name.startsWith("order")) {// 訂單表處理
            String[] fields = line.split("\t");
            bean.setOrder_id(fields[0]);
            bean.setP_id(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));
            bean.setPname("");
            bean.setFlag("order");
            k.set(fields[1]);
        }else {
            String[] fields = line.split("\t");
            bean.setP_id(fields[0]);
            bean.setPname(fields[1]);
            bean.setFlag("pd");
            bean.setAmount(0);
            bean.setOrder_id("");
            k.set(fields[0]);
        }
        context.write(k, bean);
    }
}

# TableReducer
package com.djm.mapreduce.table;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<text, tablebean, nullwritable> {
    @Override
    protected void reduce(Text key, Iterable<tablebean> values, Context context) throws IOException, InterruptedException {
        ArrayList<tablebean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();
        for (TableBean bean : values) {
            if ("order".equals(bean.getFlag())) {
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            }
            else {
                try {
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }

        }
        for (TableBean bean :orderBeans) {
            bean.setPname (pdBean.getPname());
            context.write(bean, NullWritable.get());
        }
    }
}

package com.djm.mapreduce.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.addCacheFile(new URI("file:///C:/Application/Apache/hadoop-2.7.2/input/pd.txt"));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.7.2 Map Join

Map Join 適用於一張表十分小、一張表很大的場景。

優勢:

在 Map 端緩存多張表,提早處理業務邏輯,這樣增長 Map 端業務,減小 Reduce 端數據的壓力,就能夠儘量的減小數據傾斜。

需求分析:

Hadoop 之 MapReduce

代碼實現:

# TableMapper
package com.djm.mapreduce.table;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TableMapper extends Mapper<longwritable, text, nullwritable> {

    private Text k = new Text();

    private Map<string, string> pdMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();

        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8));
        String line;
        while(StringUtils.isNotEmpty(line = reader.readLine())){
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] fields = value.toString().split("\t");
        String pId = fields[1];
        String pdName = pdMap.get(pId);
        k.set(fields[0] + "\t"+ pdName + "\t" + fields[2]);
        context.write(k, NullWritable.get());
    }
}

# TableDriver
package com.djm.mapreduce.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.addCacheFile(new URI("file:///C:/Application/Apache/hadoop-2.7.2/input/pd.txt"));
        job.setNumReduceTasks(0);
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.8 ETL

在運行核心業務 MapReduce 程序以前,每每要先對數據進行清洗,清理掉不符合用戶要求的數據。清理的過程每每只須要運行 Mapper 程序,不須要運行 Reduce 程序。

需求分析:

須要在 Map 階段對輸入的數據根據規則進行過濾清洗。

代碼實現:

# LogBean
package com.djm.mapreduce.etl;

@Data
public class LogBean {

    private String remoteAddr;

    private String remoteUser;

    private String timeLocal;

    private String request;

    private String status;

    private String bodyBytesSent;

    private String httpReferer;

    private String httpUserAgent;

    private boolean valid = true;
}
# LogMapper
package com.djm.mapreduce.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<longwritable, text, nullwritable> {

    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        LogBean bean = parseLog(line);
        if (!bean.isValid()) {
            return;
        }
        k.set(bean.toString());
        context.write(k, NullWritable.get());
    }

    private LogBean parseLog(String line) {
        LogBean logBean = new LogBean();
        String[] fields = line.split(" ");
        if (fields.length > 11) {
            logBean.setRemoteAddr(fields[0]);
            logBean.setRemoteUser(fields[1]);
            logBean.setTimeLocal(fields[3].substring(1));
            logBean.setRequest(fields[6]);
            logBean.setStatus(fields[8]);
            logBean.setBodyBytesSent(fields[9]);
            logBean.setHttpReferer(fields[10]);
            if (fields.length > 12) {
                logBean.setHttpUserAgent(fields[11] + " " + fields[12]);
            } else {
                logBean.setHttpUserAgent(fields[11]);
            }
            if (Integer.parseInt(logBean.getStatus()) >= 400) {
                logBean.setValid(false);
            }
        } else {
            logBean.setValid(false);
        }
        return logBean;
    }
}

# LogDriver
package com.djm.mapreduce.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

3.9 MapReduce 開發總結

在編寫 MapReduce 程序時,須要考慮以下幾個方面:

Mapper

  • 用戶根據業務需求實現其中三個方法:map() setup() cleanup ()

Partitioner分區

  • 有默認實現 HashPartitioner,邏輯是根據 key 的哈希值和 numReduces 來返回一個分區號

    key.hashCode()&Integer.MAXVALUE % numReduces
  • 若是業務上有特別的需求,能夠自定義分區

Comparable

  • 當咱們用自定義的對象做爲 key 來輸出時,就必需要實現 WritableComparable 接口,重寫其中的 compareTo() 方法
  • 部分排序:對最終輸出的每個文件進行內部排序
  • 全排序:對全部數據進行排序,一般只有一個 Reduce
  • 二次排序:排序的條件有兩個

Combiner

  • Combiner 合併能夠提升程序執行效率,減小 IO 傳輸,可是使用時必須不能影響原有的業務處理結果

GroupingComparator

  • 在 Reduce 端對 key 進行分組

Reducer

  • 用戶根據業務需求實現其中三個方法:reduce() setup() cleanup ()

OutputFormat

  • 默認實現類是 TextOutputFormat,功能邏輯是:將每個 KV 對,向目標文本文件輸出一行
  • 將 SequenceFileOutputFormat 輸出做爲後續 MapReduce任務的輸入,這即是一種好的輸出格式,由於它的格式緊湊,很容易被壓縮
  • 用戶還能夠自定義 OutputFormat
相關文章
相關標籤/搜索