Hadoop框架:MapReduce基本原理和入門案例

本文源碼:GitHub·點這裏 || GitEE·點這裏java

1、MapReduce概述

一、基本概念

Hadoop核心組件之一:分佈式計算的方案MapReduce,是一種編程模型,用於大規模數據集的並行運算,其中Map(映射)和Reduce(歸約)。git

MapReduce既是一個編程模型,也是一個計算組件,處理的過程分爲兩個階段,Map階段:負責把任務分解爲多個小任務,Reduce負責把多個小任務的處理結果進行彙總。其中Map階段主要輸入是一對Key-Value,通過map計算後輸出一對Key-Value值;而後將相同Key合併,造成Key-Value集合;再將這個Key-Value集合轉入Reduce階段,通過計算輸出最終Key-Value結果集。github

二、特色描述

MapReduce能夠實現基於上千臺服務器併發工做,提供很強大的數據處理能力,若是其中單臺服務掛掉,計算任務會自動轉義到另外節點執行,保證高容錯性;可是MapReduce不適應於實時計算與流式計算,計算的數據是靜態的。算法

2、操做案例

一、流程描述

image

數據文件通常以CSV格式居多,數據行一般以空格分隔,這裏須要考慮數據內容特色;spring

文件通過切片分配在不一樣的MapTask任務中併發執行;apache

MapTask任務執行完畢以後,執行ReduceTask任務,依賴Map階段的數據;編程

ReduceTask任務執行完畢後,輸出文件結果。設計模式

二、基礎配置

hadoop:
  # 讀取的文件源
  inputPath: hdfs://hop01:9000/hopdir/javaNew.txt
  # 該路徑必須是程序運行前不存在的
  outputPath: /wordOut

三、Mapper程序

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text mapKey = new Text();
    IntWritable mapValue = new IntWritable(1);

    @Override
    protected void map (LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
        // 一、讀取行
        String line = value.toString();
        // 二、行內容切割,根據文件中分隔符
        String[] words = line.split(" ");
        // 三、存儲
        for (String word : words) {
            mapKey.set(word);
            context.write(mapKey, mapValue);
        }
    }
}

四、Reducer程序

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum ;
    IntWritable value = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                        throws IOException, InterruptedException {
        // 一、累加求和統計
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // 二、輸出結果
        value.set(sum);
        context.write(key,value);
    }
}

五、執行程序

@RestController
public class WordWeb {

    @Resource
    private MapReduceConfig mapReduceConfig ;

    @GetMapping("/getWord")
    public String getWord () throws IOException, ClassNotFoundException, InterruptedException {
        // 聲明配置
        Configuration hadoopConfig = new Configuration();
        hadoopConfig.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        hadoopConfig.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );
        Job job = Job.getInstance(hadoopConfig);

        // Job執行做業 輸入路徑
        FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath()));
        // Job執行做業 輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath()));

        // 自定義 Mapper和Reducer 兩個階段的任務處理類
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);

        // 設置輸出結果的Key和Value的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //執行Job直到完成
        job.waitForCompletion(true);
        return "success" ;
    }
}

六、執行結果查看

將應用程序打包放到hop01服務上執行;服務器

java -jar map-reduce-case01.jar

image

3、案例分析

一、數據類型

Java數據類型與對應的Hadoop數據序列化類型;網絡

Java類型 Writable類型 Java類型 Writable類型
String Text float FloatWritable
int IntWritable long LongWritable
boolean BooleanWritable double DoubleWritable
byte ByteWritable array DoubleWritable
map MapWritable

二、核心模塊

Mapper模塊:處理輸入的數據,業務邏輯在map()方法中完成,輸出的數據也是KV格式;

Reducer模塊:處理Map程序輸出的KV數據,業務邏輯在reduce()方法中;

Driver模塊:將程序提交到yarn進行調度,提交封裝了運行參數的job對象;

4、序列化操做

一、序列化簡介

序列化:將內存中對象轉換爲二進制的字節序列,能夠經過輸出流持久化存儲或者網絡傳輸;

反序列化:接收輸入字節流或者讀取磁盤持久化的數據,加載到內存的對象過程;

Hadoop序列化相關接口:Writable實現的序列化機制、Comparable管理Key的排序問題;

二、案例實現

案例描述:讀取文件,並對文件相同的行作數據累加計算,輸出計算結果;該案例演示在本地執行,不把Jar包上傳的hadoop服務器,驅動配置一致。

實體對象屬性

public class AddEntity implements Writable {

    private long addNum01;
    private long addNum02;
    private long resNum;

    // 構造方法
    public AddEntity() {
        super();
    }
    public AddEntity(long addNum01, long addNum02) {
        super();
        this.addNum01 = addNum01;
        this.addNum02 = addNum02;
        this.resNum = addNum01 + addNum02;
    }

    // 序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(addNum01);
        dataOutput.writeLong(addNum02);
        dataOutput.writeLong(resNum);
    }
    // 反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // 注意:反序列化順序和寫序列化順序一致
        this.addNum01  = dataInput.readLong();
        this.addNum02 = dataInput.readLong();
        this.resNum = dataInput.readLong();
    }
    // 省略Get和Set方法
}

Mapper機制

public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> {

    Text myKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 讀取行
        String line = value.toString();

        // 行內容切割
        String[] lineArr = line.split(",");

        // 內容格式處理
        String lineNum = lineArr[0];
        long addNum01 = Long.parseLong(lineArr[1]);
        long addNum02 = Long.parseLong(lineArr[2]);

        myKey.set(lineNum);
        AddEntity myValue = new AddEntity(addNum01,addNum02);

        // 輸出
        context.write(myKey, myValue);
    }
}

Reducer機制

public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> {

    @Override
    protected void reduce(Text key, Iterable<AddEntity> values, Context context)
            throws IOException, InterruptedException {

        long addNum01Sum = 0;
        long addNum02Sum = 0;

        // 處理Key相同
        for (AddEntity addEntity : values) {
            addNum01Sum += addEntity.getAddNum01();
            addNum02Sum += addEntity.getAddNum02();
        }

        // 最終輸出
        AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum);
        context.write(key, addRes);
    }
}

案例最終結果:

image

5、源代碼地址

GitHub·地址
https://github.com/cicadasmile/big-data-parent
GitEE·地址
https://gitee.com/cicadasmile/big-data-parent

推薦閱讀:編程體系整理

序號 項目名稱 GitHub地址 GitEE地址 推薦指數
01 Java描述設計模式,算法,數據結構 GitHub·點這裏 GitEE·點這裏 ☆☆☆☆☆
02 Java基礎、併發、面向對象、Web開發 GitHub·點這裏 GitEE·點這裏 ☆☆☆☆
03 SpringCloud微服務基礎組件案例詳解 GitHub·點這裏 GitEE·點這裏 ☆☆☆
04 SpringCloud微服務架構實戰綜合案例 GitHub·點這裏 GitEE·點這裏 ☆☆☆☆☆
05 SpringBoot框架基礎應用入門到進階 GitHub·點這裏 GitEE·點這裏 ☆☆☆☆
06 SpringBoot框架整合開發經常使用中間件 GitHub·點這裏 GitEE·點這裏 ☆☆☆☆☆
07 數據管理、分佈式、架構設計基礎案例 GitHub·點這裏 GitEE·點這裏 ☆☆☆☆☆
08 大數據系列、存儲、組件、計算等框架 GitHub·點這裏 GitEE·點這裏 ☆☆☆☆☆
相關文章
相關標籤/搜索