Hadoop Column - Getting Started with MapReduce

The basic idea of MapReduce

Let's start with a simple example. Imagine three people playing Dou Dizhu (a Chinese card game) who want to check that the deck is complete. The simplest approach is to have one person count whether all 54 cards are there (traditional single-machine computing). Alternatively, each of the three players counts one pile of cards (the Map phase), and then the three subtotals are added together (the Reduce phase).

So the idea behind MapReduce is "divide and conquer" plus "aggregate": when the data is too large for a single machine to handle, multiple machines process it together as a distributed cluster.

Many articles translate Map and Reduce literally as "mapping" and "reduction". In practice, Map simply means each worker handles its own slice of the data (divide and conquer), and Reduce means the partial results are merged into a final answer. Why translate it so stiffly (to make big data sound mysterious)?
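Before touching Hadoop, here is a minimal plain-Java sketch of that split-and-merge idea (the class and method names are made up purely for illustration): each "mapper" counts the words in its own slice of the lines, and the "reducer" merges the partial counts.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DivideAndMerge {

    // "Map" phase: count the words in one slice of the input.
    static Map<String, Long> countSlice(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(",")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    // "Reduce" phase: merge the partial counts from all slices into a total.
    static Map<String, Long> merge(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, count) -> total.merge(word, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> slice1 = countSlice(Arrays.asList("hello,world", "hello,hadoop"));
        Map<String, Long> slice2 = countSlice(Arrays.asList("world,hadoop"));
        System.out.println(merge(Arrays.asList(slice1, slice2))); // e.g. {world=2, hello=2, hadoop=2}
    }
}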

Getting started with MapReduce programming

The programming model is still quite simple and is usually broken down into 8 steps: an InputFormat reads and parses the data, Map processes it, the shuffle phase handles partitioning, sorting, combining, and grouping, Reduce aggregates, and an OutputFormat writes the result. The numbered comments in the driver class below follow this breakdown.

We'll use the simplest example, word count (billed as the "Hello World" of big data). Let's get it running and look at the result before digging into the theory.

Following the MapReduce model, there are two main pieces to write, the Mapper and the Reducer; Hadoop takes care of the rest. Let's get hands-on first and study the internals later.

MapReduce has two run modes: 1. cluster mode (production); 2. local mode (for experimenting and learning). The example below runs in local mode.

Prerequisites:

1. Download a Hadoop distribution, unpack it locally, and add it to your environment variables;

2. Download hadoop.dll and place it in Hadoop's bin directory (if editing environment variables is inconvenient, see the code-level workaround sketched below).
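A commonly used workaround for local runs is to point Hadoop at the unpacked directory from code before the job is created, instead of setting system environment variables. The path below is only a placeholder; substitute wherever you unpacked Hadoop (with hadoop.dll in its bin directory):

// Placeholder path -- point this at your own local Hadoop directory.
// Put this line at the very start of main() in the driver class shown later.
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.10.1");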

 

Create a Maven project and import the dependencies

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.10.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.10.1</version>
</dependency>
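These four blocks go inside the <dependencies> element of pom.xml; the 2.10.1 version used here should ideally match the Hadoop distribution you configured in the prerequisites.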

Data file (placed under the job's input directory, e.g. D:\Source\data\data_in\xx.txt):

hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
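Each line is simply a comma-separated list of words, which is what the Mapper below splits on.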

 

Writing the code

Step 1: Create the Mapper class

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Generic types: <input key, input value, output key, output value>.
// With TextInputFormat the input key is the byte offset of the line and the
// input value is the line itself; the Mapper emits (word, 1) pairs.
public class BaseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private final Text keyout = new Text();
    private final LongWritable valueout = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is a comma-separated list of words.
        String[] words = value.toString().split(",");
        for (String word : words) {
            keyout.set(word);
            // Emit (word, 1); the framework groups these by word before reduce.
            context.write(keyout, valueout);
        }
    }
}
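Note that Hadoop uses its own Writable wrapper types (Text, LongWritable) rather than plain String and Long; they are what the framework serializes to disk and ships across the network between the map and reduce phases.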

 

Step 2: Create the Reducer class

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Generic types: <input key, input value, output key, output value>.
// reduce() is called once per word, with all of the 1s emitted for that word.
public class BaseReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // Emit (word, total count).
        context.write(key, new LongWritable(sum));
    }
}
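Because this reduce logic (summing counts) is associative and commutative, the same class could also be registered as a combiner via job.setCombinerClass(BaseReducer.class), so partial sums are already computed on the map side. The driver below does not do this and simply keeps the shuffle defaults.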

 

Step 3: Create the Job driver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MainJob extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), MainJob.class.getName());
        // Needed when running on a cluster: the job is shipped as a jar.
        job.setJarByClass(MainJob.class);
        // 1. Input format: how input files are read and parsed, plus the input path.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("D:\\Source\\data\\data_in"));
        // 2. Mapper class and its output key/value types.
        job.setMapperClass(BaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 3-6. Shuffle phase: partitioning, sorting, combining, grouping (defaults used here).
        // 7. Reducer class and the final output key/value types.
        job.setReducerClass(BaseReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 8. Output format and output path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\demo_result1"));
        // Submit the MapReduce job and wait for it to finish.
        boolean completion = job.waitForCompletion(true);
        return completion ? 0 : 1;
    }

    public static void main(String[] args) {
        MainJob mainJob = new MainJob();
        try {
            Configuration configuration = new Configuration();
            // Run in local mode; on a cluster this would be "yarn".
            configuration.set("mapreduce.framework.name", "local");
            configuration.set("yarn.resourcemanager.hostname", "local");
            int run = ToolRunner.run(configuration, mainJob, args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
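If the job completes successfully, the output directory D:\Source\data\demo_result1 should contain a _SUCCESS marker and a part-r-00000 file. With the sample input above, its contents should look roughly like this (word, a tab, then the count, sorted by key):

flume	1
hadoop	2
hello	2
hive	1
jerry	1
kitty	1
sqoop	1
tom	1
world	2

Note that the output directory must not already exist when the job starts, otherwise the output format will fail the job; delete demo_result1 before re-running.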