MapReduce 簡介

2. MapReduce 簡介

MapReduce 其實是分爲兩個過程java

  1. map 過程 : 數據的讀取
  2. reduce 過程 : 數據的計算

並行計算是一個很是複雜的過程, mapreduce是一個並行框架。apache

在Hadoop中,每一個MapReduce任務都被初始化爲一個Job,每一個Job又能夠分爲兩種階段:map階段和reduce階段。這兩個階段分別用兩個函數表示,即map函數和reduce函數app

咱們能夠看下典型的官方列子框架

開發

用idea 開發開發分佈式

pom.xml 添加依賴ide

<dependencies>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>
</dependencies>

寫代碼:函數

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;oop

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;ui

/**idea

  • Created by diwu.sld on 2016/4/13.
    */
    public class WordCount{

    public static class CountMap extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable longWritable,
                     Text text,
                     OutputCollector<Text, IntWritable> outputCollector,
                     Reporter reporter) throws IOException {
         String line = text.toString();
         StringTokenizer tokenizer = new StringTokenizer(line);
    
         while(tokenizer.hasMoreTokens()){
             word.set(tokenizer.nextToken());
             outputCollector.collect(word, one);
         }
     }

    }

    public static class CountReduce extends MapReduceBase implements
    Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator values,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
    int sum = 0;
    while (values.hasNext()) {
    sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
    }
    }

    public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(IntWritable.class);
    
     conf.setMapperClass(CountMap.class);
     conf.setCombinerClass(CountReduce.class);
     conf.setReducerClass(CountReduce.class);
    
     conf.setInputFormat(TextInputFormat.class);
     conf.setOutputFormat(TextOutputFormat.class);
    
     FileInputFormat.setInputPaths(conf, new Path(args[0]));
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    
     JobClient.runJob(conf);

    }
    }

而後打好包 HadoopDemo:

1. Project Sturcture->Artifacts->+
2. Build Artifacts

放到 hadoop 目錄下運行

運行

  1. bin/hadoop fs -mkdir -p input
  2. bin/hadoop fs -copyFromLocal README.txt input
  3. bin/hadoop jar demos/HadoopDemo.jar WorldCount input output
  4. bin/hadoop fs -cat output/* 或者bin/hadoop fs -ls output
  5. bin/hadoop fs -cat output/part-r-00000

總結

若是有N個文件,和對這個N個文件的計算,咱們能夠用並行來提升運行效率。可是文件有大有小, 計算量有多又少, 如何進行並行和分配任務是一個很是繁瑣的事情。 因此有了Hadoop這個並行框架來解決咱們的問題。

Hadoop 主要分爲兩大塊: 分佈式文件存儲和分佈式計算。

在分佈式文件存儲中,他會把文件分割爲想多相同的小塊。

相關文章
相關標籤/搜索