MapReduce: A Distributed Processing Framework

1. Overview

  • MapReduce originated from Google's MapReduce paper, published in December 2004
  • Strengths: offline processing of massive data sets; easy to develop; easy to run
  • Weakness: real-time / streaming computation is difficult

2. Getting started with the WordCount example

  

  The input is read by an InputFormat; each line read is handed to map for processing; the map output is then shuffled (partitioned and sorted by key) onto the reduce stage; finally an OutputFormat writes the records out to the file system (HDFS).
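
  For example, given the input line "hello world hello", map emits (hello, 1), (world, 1), (hello, 1); shuffling groups the pairs by key into (hello, [1, 1]) and (world, [1]); reduce then sums each group and writes (hello, 2) and (world, 1).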

  Java source:

  

package com.cracker.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * A WordCount application developed with MapReduce
 */
public class WordCountApp {

    /**
     * Map: reads the input file
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
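        // Input key/value: the byte offset of the line (LongWritable) and the line itself (Text);
        // output key/value: a word (Text) and the count 1 (LongWritable).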

        private final LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Each line of data received
            String line = value.toString();

            // Split the line on the specified delimiter
            String[] words = line.split(" ");

            for (String word : words) {
                // Emit the map result through the context
                context.write(new Text(word), one);
            }

        }
    }

    /**
     * Reduce: merges/aggregates the map output
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
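        // Input: a word and all the counts emitted for it by the map tasks;
        // output: the word and its total number of occurrences.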

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
                InterruptedException {

            long sum = 0;
            for (LongWritable value : values) {
                // Sum up the occurrences of this key
                sum += value.get();
            }

            // Emit the final count
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Driver: encapsulates all the information of the MapReduce job
     */
    public static void main(String[] args) throws Exception {

        // Create the Configuration
        Configuration configuration = new Configuration();

        // Create the Job
        Job job = Job.getInstance(configuration, "wordcount");

        // Set the job's main class (used to locate the jar)
        job.setJarByClass(WordCountApp.class);

        // Set the job's input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Set the map-related parameters
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Set the reduce-related parameters
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Set the job's output path (it must not already exist on HDFS)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job, wait for it to finish, and exit 0 on success
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

  Related commands

  Local build

  mvn clean package -DskipTests
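
  The package step writes a jar under target/ (presumably hadoop-train-1.0.jar here, judging from the server command below), which is then copied to the server.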

  On the server

  hadoop jar /root/app/hadoop-train-1.0.jar com.cracker.hadoop.mapreduce.WordCountApp hdfs://localhost:8020/hello.txt hdfs://localhost:8020/output/wc
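
  Inspect the result once the job finishes (assuming the paths above; each reducer writes its output to a part-r-NNNNN file under the output directory)

  hadoop fs -cat hdfs://localhost:8020/output/wc/part-r-00000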
