大數據(hadoop-mapreduce代碼及編程模型講解)

MapReduce編程模型

MapReduce將整個運行過程分爲兩個階段: Map 階段和Reduce階段

Map階段由一定數量的Map Task組成
   輸入數據格式解析: InputFormat
   輸入數據處理: Mapper 
   數據分組: Partitioner

Reduce階段由一定數量的Reduce Task組成
  數據遠程拷貝
  數據按照key排序
  數據處理:Reducer
  數據輸出格式:OutputFormat

Map階段
    InputFormat(默認TextInputFormat)
    Mapper
    Combiner(local Reducer)
    Partitioner
Reduce階段
    Reducer
    OutputFormat(默認TextOutputFormat)
 app

Java編程接口

Java編程接口組成;
    舊API:所在java包: org.apache.hadoop.mapred
    新API:所在java包: org.apache.hadoop.mapreduce
    新API具備更好的擴展性;

    兩種編程接口只是暴露給用戶的形式不一樣而已,內部執行引擎是一樣的;

 

Java新舊API

從Hadoop 1.0.0開始,所有發行版均包含新舊兩類API;

實例1: WordCount問題

WordCount問題—map階段

WordCount問題—reduce階段

WordCount問題—mapper設計與實現

WordCount問題—reducer設計與實現

WordCount問題—數據流

示例代碼

package com.vip;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Word count: counts how many times each whitespace-separated word occurs
 * in the input files and writes one {@code <word, count>} pair per word.
 *
 * <p>Usage: {@code WordCountTest <input path> <output path>}
 *
 * @author huang
 */
public class WordCountTest {

	/**
	 * Map phase: turns each input line into one {@code <word, 1>} pair per word.
	 *
	 * <p>Example for the two input lines {@code hello you} and {@code hello me}:
	 * <ol>
	 *   <li>input pairs {@code <k1,v1>}: {@code <0,hello you>}, {@code <10,hello me>}</li>
	 *   <li>output pairs {@code <k2,v2>}: {@code <hello,1> <you,1> <hello,1> <me,1>}</li>
	 * </ol>
	 */
	public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>{
		/** Constant count emitted for every word occurrence. */
		private static final IntWritable ONE = new IntWritable(1);
		/** Reused output key (k2) so no new Text is allocated per word. */
		private final Text k2 = new Text();

		/**
		 * Splits one line into words and emits {@code <word, 1>} for each.
		 *
		 * @param key     input key of the record (not used)
		 * @param value   one line of input text
		 * @param context sink for the emitted {@code <word, 1>} pairs
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// Split on any run of whitespace (spaces, tabs, ...) rather than on a
			// single space, so repeated separators do not produce empty "words".
			for (String word : value.toString().split("\\s+")) {
				// A line that starts with whitespace (or is empty) still yields one
				// empty leading token; it is not a word, so skip it.
				if (word.isEmpty()) {
					continue;
				}
				k2.set(word);
				context.write(k2, ONE);
			}
		}
	}

	// Step 3: all emitted <k2,v2> pairs are partitioned.
	// Step 4: after the shuffle the reducer input is <hello,{1,1}> <me,{1}> <you,{1}>.
	// Steps 3 and 4 are performed by the Hadoop framework itself.

	/**
	 * Reduce phase: sums all counts received for a word. Because the sum is
	 * associative and commutative and input/output types are identical, this
	 * class is also registered as the combiner in {@link #main(String[])}.
	 */
	public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
		/** Reused output value (v3) so no new IntWritable is allocated per key. */
		private final IntWritable v3 = new IntWritable();

		/**
		 * Adds up the counts of one word and emits {@code <word, total>}.
		 *
		 * @param key     the word
		 * @param values  all counts collected for that word
		 * @param context sink for the emitted {@code <word, total>} pair
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable value : values) {
				count += value.get();
			}
			v3.set(count);
			context.write(key, v3);
		}
	}

	/**
	 * Wires the mapper and reducer into a job, submits it and exits with
	 * status 0 on success, 1 on job failure, or 2 on wrong usage.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
	 * @throws Exception if the job cannot be configured or submitted
	 */
	public static void main(String[] args) throws Exception {
		// Fail with a readable message instead of ArrayIndexOutOfBoundsException.
		if (args.length < 2) {
			System.err.println("Usage: WordCountTest <input path> <output path>");
			System.exit(2);
		}

		// Load the configuration and create the job.
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "word count");
		job.setJarByClass(WordCountTest.class);

		// Mapper / reducer classes holding the business logic.
		job.setMapperClass(MyMapper.class);
		// Pre-aggregate counts on the map side to shrink the shuffled data.
		job.setCombinerClass(MyReduce.class);
		job.setReducerClass(MyReduce.class);

		// Key/value types of the final output.
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Input and output locations.
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 

package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Finds the maximum of all whitespace-separated integers in the input files.
// Usage: MapReduceCaseMax <input path> <output path>
public class MapReduceCaseMax extends Configured implements Tool{

	/**
	 * Map phase: tracks the largest number seen by this map task and emits it
	 * once, from {@link #cleanup(Context)}, instead of once per record.
	 */
	public static class MaxMapper extends Mapper<Object, Text, LongWritable, NullWritable>{
		/** Largest value seen so far; starts at the smallest possible long. */
		private long max = Long.MIN_VALUE;
		/** Whether at least one number was parsed, so the sentinel is never emitted. */
		private boolean seen = false;

		/**
		 * Parses every token of the line as a long and updates the running maximum.
		 *
		 * @param key     input key of the record (not used)
		 * @param value   one line of input containing whitespace-separated integers
		 * @param context task context (nothing is written here)
		 * @throws NumberFormatException if a token is not a valid long
		 */
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// StringTokenizer's default delimiters are whitespace characters.
			StringTokenizer st = new StringTokenizer(value.toString());
			// Consume exactly one token per iteration: reading two at a time
			// throws NoSuchElementException on lines with an odd token count.
			while (st.hasMoreTokens()) {
				long n = Long.parseLong(st.nextToken());
				if (n > max) {
					max = n;
				}
				seen = true;
			}
		}

		/**
		 * Emits this task's maximum, if any number was seen.
		 *
		 * @param context sink for the emitted value
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			if (seen) {
				context.write(new LongWritable(max), NullWritable.get());
			}
		}
	}

	/**
	 * Reduce phase: merges the per-map-task maxima into one global maximum.
	 * Without it, an input with several splits produces one line per map task.
	 */
	public static class MaxReducer
			extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
		/** Largest key seen so far; starts at the smallest possible long. */
		private long max = Long.MIN_VALUE;
		/** Whether at least one key was received. */
		private boolean seen = false;

		/**
		 * Folds one candidate maximum into the running maximum.
		 *
		 * @param key     a maximum emitted by one map task
		 * @param values  placeholder values (not used)
		 * @param context task context (nothing is written here)
		 */
		@Override
		protected void reduce(LongWritable key, Iterable<NullWritable> values,
				Context context) throws IOException, InterruptedException {
			if (key.get() > max) {
				max = key.get();
			}
			seen = true;
		}

		/**
		 * Emits the global maximum, if any key was received.
		 *
		 * @param context sink for the emitted value
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			if (seen) {
				context.write(new LongWritable(max), NullWritable.get());
			}
		}
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
	 * @return 0 on success, 1 if the job failed, 2 on wrong usage
	 * @throws Exception if the job cannot be configured or submitted
	 */
	@Override
	public int run(String[] args) throws Exception {
		// Fail with a readable message instead of ArrayIndexOutOfBoundsException.
		if (args.length < 2) {
			System.err.println("Usage: MapReduceCaseMax <input path> <output path>");
			return 2;
		}

		/* Create the job and set the main class. */
		Job job = Job.getInstance(getConf(), "MaxFiles");
		job.setJarByClass(MapReduceCaseMax.class);

		/* Mapper and reducer classes. */
		job.setMapperClass(MaxMapper.class);
		job.setReducerClass(MaxReducer.class);
		/* A global maximum requires every candidate to reach the same reducer. */
		job.setNumReduceTasks(1);

		/* Output key and value types. */
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(NullWritable.class);

		/* Input and output paths. */
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		/* Submit the job to the cluster and wait for completion. */
		boolean isSuccess = job.waitForCompletion(true);

		return isSuccess ? 0 : 1;
	}

	/**
	 * Entry point; delegates to {@link ToolRunner} and exits with the status
	 * returned by {@link #run(String[])}.
	 *
	 * @param args command-line arguments, passed through to {@link ToolRunner}
	 * @throws Exception if the tool fails to run
	 */
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new MapReduceCaseMax(), args);
		System.exit(res);
	}
}
相關文章
相關標籤/搜索