hadoop 平臺運行mapreduce代碼

WordCount是寫hadoop mapreduce入門級程序,會寫wordcount的話,基本上80%的mapreduce就懂了。java

mapreduce分爲map過程和reduce過程,用戶能夠根據本身的業務自定義map過程和reduce過程。apache

以wordcount爲例,要計算文本中單詞出現的個數,須要讀取文本,並針對單詞進行統計。服務器

map過程

package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 *
 * Created by Frankie on 2018/1/14.
 * KEYIN: 默認狀況下,是mr框架所讀到的一行文本的起始偏移量, Long    在hadoop中有本身的精簡序列化接口,因此不直接使用long, 而用LongWritable
 * VALUEIN: 默認狀況下,是mr框架所讀到的一行文本的內容, String
 * KEYOUT: 是用戶自定義邏輯處理完成後輸出數據中的key, 在此處是單詞,String
 * VALUEOUT: 是用戶自定義邏輯處理完成後輸出數據中的vlaue, 在次數是單詞次數,Integer
 *
 *
 **/

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /*
    * map 階段的業務邏輯就寫在自定義的Map()方法中
    * map task會對每一行輸入數據調用一次咱們自定義map()方法
    * */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String line = value.toString();
        String[] words = line.split(" ");
        for(String word: words){
            // 將單詞做爲key, 將次數1做爲value,以便於後續的數據分發,能夠根據單詞分發,以便於相同單詞會用到相同的reduce task
            // map task會收集,寫在一個文件上
            context.write(new Text(word), new IntWritable(1));
        }
    }

}

reduce過程

package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Created by Frankie on 2018/1/14.
 *
 * KEYIN, VALUEIN 對應 mapper輸出的KEYOUT, VALUEOUT類型對應
 * KEYOUT, VALUEOUT是自定義reduce邏輯處理結果的輸出數據類型
 * KEYOUT是單詞,
 * VALUE是總次數
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
        /*
        入參key, 是一組相同單詞kv對的key
            Context 上下文
        * */
        int count = 0;

//        Iterator<IntWritable> iterator = values.iterator();
//        while(iterator.hasNext()){
//            count += iterator.next().get();
//        }
//

        for( IntWritable value: values){
            count += value.get();
        }
        context.write(key, new IntWritable(count));

    }
}

mapreduce過程存在一些問題,好比,app

Map task如何進行任務分配?框架

Reduce task如何進行任務分配?ide

Map task與 reduce task如何進行銜接?oop

若是某map task 運行失敗,如何處理?code

Map task若是都要本身負責數據的分區,很麻煩orm

爲例解決這些問題,須要有個master專門對map reduce進行管理。接口

在WordCount文件中,有專門對做業進行配置,以及最後將代碼提交到客戶端。

package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Created by Frankie on 2018/1/14.
 *
 * 至關於yarn集羣的客戶端
 * 須要在此封裝mr程序的相關運行參數,指定jar包,最後提交給yarn
 */

public class WordCount {

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordcount");

        // 指定本程序的jar包所在的本地路徑
        job.setJarByClass(WordCount.class);

        // 指定本業務使用的map業務類
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 指定mapper輸出數據的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定最終輸出的數據的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定job的輸入原始文件所在目錄
        // /data/adult.data
        FileInputFormat.setInputPaths(job, new Path(args[1]));

        // 指定job的輸出結果所在目錄
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

//        // 將job中配置的相關參數,以及job所用的java類所在的Jar包,提交給yarn去運行
//        job.submit();

        // 提交job配置,一直等待到運行結束
        boolean res = job.waitForCompletion(true);
        System.exit(res? 0: 1);
    }
}

代碼編輯完成後,對代碼進行打包。咱們在這裏選擇不依賴第三方包的打包方式進行打包。

打完包後,將生成的jar包提交到服務器中去。 並執行,

leiline@master:~/Documents/hadoop/myJars$ hadoop jar HadoopMapReduce.jar com.hadoop.mapreduce.WordCount /data/adult /data/out

注意,out文件是由程序自動建立的,不須要用戶手動去建立。最後,代碼執行完畢後,能夠在hdfs中看到執行的結果:

Found 2 items
-rw-r--r--   3 leiline supergroup          0 2018-01-14 19:01 /data/out/_SUCCESS
-rw-r--r--   3 leiline supergroup     216737 2018-01-14 19:01 /data/out/part-r-00000
相關文章
相關標籤/搜索