【Hadoop】- MapReduce 代碼工做過程

Hadoop MapReduce基礎案例java

MapReduce:Hadoop分佈式並行計算框架web

思想:分治法apache

通俗解釋

工廠給客戶交付貨物1000噸,卡車A運量50噸,須要順序20次,若是平時客戶不忙20次運輸所需的時間客戶可以接受,忽然市場競爭激烈,工廠爲了提供失效,每次運輸從單臺卡車運輸提升到20臺卡車運輸,這樣整個運量1次就搞定,Map Reduce相似,就是將一些廉價機器組成一個集羣,每一個節點都處理整個做業的一部分,最後進行彙總,從而快速提升大數據的處理能力app

MapReduce工做流程圖

這裏寫圖片描述

Maprreduce工做流程

HDFS提供數據源,通過splitting將數據切割成數據片,表示爲K-V數據模型做爲Mapping的輸入,Mapping對數據進行進一步的處理,並造成客戶須要的數據K-V數據模型,Shuffing對Mapping產生的K-V數據模型根據key進行排序彙總,而後將數據傳給Reduce做業,reduce對key-ValueList進行相應的處理,最終彙總出最終的結果框架


基礎案例:統計文本中出現的單詞及個數wordcount分佈式

① :啓動Hadoop集羣:start-all.shide

這裏寫圖片描述

② :準備測試的數據源函數

這裏寫圖片描述

HDFS的存儲目錄工具

這裏寫圖片描述

③ :建立Mapper:數據分割oop

package mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMapper extends Mapper<LongWritable,Text,Text, IntWritable>{

	@Override
	public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
		
		String line = value.toString();
		StringTokenizer st = new StringTokenizer(line);  //分詞工具類
		while(st.hasMoreTokens()){
			String word = st.nextToken();  //獲取單詞信息
			
			 /**
			  * 將數據寫入shufer過程,用於reduce的結果合併
			  * new Text(word):做爲key
			  * new IntWritable(1):做爲value,reduce會對其進行合併
			  */
			context.write(new Text(word), new IntWritable(1));  
		}
	}
}

Mapper泛型參數解釋:

  • LongWritable,Text,Text, IntWritable

  • LongWritable:Hadoop序列化的類型,可理解成Long的包裝類,這裏表示splitting Data的編號,hadoop默認按照」行」進行數據切割,這裏能夠近似理解爲第N行

  • Text:表示輸入數據的格式:這裏文件中存放的是字符串,使用Text

  • Text:表示map函數輸出的key的類型,所以是單詞計數,因此是單詞爲key,Text類型

  • IntWritable:表示map函數的輸出的value的類型,這裏表示單詞出現的次數

  • context.write(new Text(word), new IntWritable(1)); 表示每次出現一個單詞就向shuffling輸出數據:單詞及次數1,也就是說每次單詞出現一次就輸出1

④ :建立Reducer:數據彙總

package mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	@Override
	protected void reduce(Text text, Iterable<IntWritable> iterable,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {

		int sum = 0;
		for(IntWritable intWritable:iterable){
			sum = sum+intWritable.get();  
		}
		context.write(new Text(text), new IntWritable(sum));
	}
}

Reducer泛型參數解釋:

  • Text, IntWritable, Text, IntWritable

說明Mapper傳遞參數和Reducer的輸出參數是字符串和整型,對應單詞和單詞次數

reduce方法形參解釋

  • Text:表示Mapper傳遞的Key的值
  • Iterable:表示Mapper傳遞的Key對應的值列表

例如:Map傳遞的值多是片斷
zhangsan 1
zhangsan 1
Map事後進入shuffling過程,shuffling會將Mapper的數據進行彙總,變成相似形式 zhangsan{1,1},也就是key-valueList數據模型,而後將其傳遞給reduce函數

int sum = 0;
for(IntWritable intWritable:iterable){
	sum = sum+intWritable.get();  
}
context.write(new Text(text), new IntWritable(sum));

對valueList集合中的值進行彙總,獲得單個單詞的累計出現次數

⑤ :建立Job測試類:Job做爲Mapreduce做業的啓動類,主要是將做業交給JobTracker,JobTacker經過調度Hadoop集羣中的taskTracker進行做業處理

package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *運行mapreduce做業:mapreduce打成jar包
 *控制檯運行:hadoop jar jar包路徑  JobRun類便可運行 
 *
 *MapReduce web訪問端口:50030
 */
public class WcJobRun {

	public static void main(String[] args) {
		
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "localhost:9001");
		try{
		Job job = new Job();
		job.setJarByClass(WcJobRun.class); //設置啓動做業類
		job.setMapperClass(WcMapper.class); //設置Map類
		job.setReducerClass(WcReducer.class);
		job.setMapOutputKeyClass(Text.class); //設置mapper輸出的key類型
		job.setMapOutputValueClass(IntWritable.class); //設置mapper輸出的value類型
		
		job.setNumReduceTasks(1); //設置Reduce Task的數量
		
		//設置mapreduce的輸入和輸出目錄
		FileInputFormat.addInputPath(job, new Path("/user/squirrel/in"));
		FileOutputFormat.setOutputPath(job, new Path("/user/squirrel/out") );
		
		//等待mapreduce整個過程完成
		System.exit(job.waitForCompletion(true)?0:1);
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

⑥ :MapReduce做業測試運行

Eclipse將相關類打包爲jar文件,經過」hadoop jar 包名 job類名」運行mapreduce做業 命令:hadoop jar wc.jar mapreduce.WcJobRun

控制檯日誌:

這裏寫圖片描述

web監控mapreduce做業過程:http://192.168.174.135:50030

這裏寫圖片描述

這裏寫圖片描述

這裏寫圖片描述

查看MapReduce處理以後的文件:注意存儲在HDFS文件系統上

這裏寫圖片描述

注意:

MapReduce結果的存放目錄以前不可以在HDFS文件存在,不然拋出異常,若是想提升MapReduce做業的靈活性徹底能夠將Job類的HDFS輸入和輸出路徑引用爲main方法的形參args[0]、args[1],經過」hadoop jar wc.jar mapreduce.WcJobRun HDFS輸入路徑 HDFS輸出路徑」處理便可

相關文章
相關標籤/搜索