Hadoop-MapReduce基本原理及相關操做

Hadoop-MapReduce基本原理及相關操做

一、概述

1.思考

    求和:1+3+5+8+2+7+3+4+9+...+Integer.MAX_VALUE。java

    這是一個簡單的加法,若是這道題單臺機器線性執行的話,能夠想一想這個時間的消耗有多大,若是咱們換一種思惟來進行計算那麼這個時間就能夠減小不少,將整個加法分紅若干個段進行相加,最後將這些結果段再進行相加。這樣就能夠實行分佈式的計算。apache

    上述的方法的思想就是:分而治之,而後彙總。windows

2.MapReduce分佈式計算框架

    MapReduce是一種分佈式計算模型,由Google提出,主要用於搜索領域,解決海量數據的計算問題。bash

    Apache對其作了開源實現,整合在hadoop中實現通用分佈式數據計算。併發

    MR由兩個階段組成:Map和Reduce,用戶只須要實現map()和reduce()兩個函數,便可實現分佈式計算,很是簡單。大大簡化了分佈式併發處理程序的開發。app

    Map階段就是進行分段處理。框架

    Reduce階段就是進行彙總處理。彙總以後還能夠進行數據的一系列美化操做,而後再輸出。eclipse

3.MapReduce原理

    MapReduce原理圖: 此圖借鑑的網上的。具體出處如圖上的地址。分佈式

  

二、Map、Reduce的入門案例

1.入門案例

1>實現WordCount

WcMapperide

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	@Override
	protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		// 1.獲得行
		String line = v1.toString();
		// 2.切行爲單詞
		String[] wds = line.split(" ");
		// 3.輸出單詞和數量,即k二、v2
		for (String w : wds) {
			context.write(new Text(w), new IntWritable(1));
		}
	}
}

WcReduce

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text k3, Iterable<IntWritable> v3s,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		// 1.獲取單詞
		String word = k3.toString();
		// 2.遍歷v3s,累計數量
		int count = 0;
		Iterator<IntWritable> it = v3s.iterator();
		while (it.hasNext()) {
			count += it.next().get();
		}
		// 3.輸出結果
		context.write(new Text(word), new IntWritable(count));
	}
}

WcDerver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcDriver {
	public static void main(String[] args) throws Exception {
		// 1.聲明一個做業
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		// 2.聲明做業的入口
		job.setJarByClass(WcDriver.class);
		// 3.聲明Mapper
		job.setMapperClass(WcMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// 4.聲明Reducer
		job.setReducerClass(WcReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// 5.聲明輸入位置
		FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/wcdata/words.txt"));
		// 6.聲明輸出位置
		FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/wcresult"));
		// 7.啓動做業
		job.waitForCompletion(true);
	}
}

    將程序成打成jar,提交到集羣中運行。

    集羣搭建能夠參見:僞分佈式集羣搭建點我徹底分佈式集羣搭建點我

    如下的介紹中,我將使用k1代替mapper第一次輸入的數據key,v1表明mapper第一次輸入的數據的value值,k2表明mapper輸出數據的key,v2表明mapper輸出數據的value;k3表明reducer接收數據的key,v3表明reducer接收數據的value;

2>Eclipse導出jar包

    導出jar包有下面四個頁面:

    右鍵項目-export:搜索jar-java-JAR file-next。

    選擇要打包的項目-去掉.classpath和.project的勾選-JAR file:輸出路徑及jar包名字-next。

    next。

    main class:選擇主類-Finish。

    hadoop jar xxx.jar

環境問題

    在eclipse中使用hadoop插件開發mapreduce可能遇到的問題及解決方案:

空指針異常

    本地hadoop缺乏支持包,將winutils和hadoop.dll(及其餘)放置到eclips關聯的hadoop/bin下,並將hadoop/bin配置到PATH環境變量中。若是還不行,就再放一份到c:/windows/system32下。

不打印日誌

    在mr程序下放置一個log4j.properties文件。

null\bin\winutils.exe

    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

    解決方法1:

    配置HADOOP_HOME環境變量,可能須要重啓電腦。

    解決方法2:

    若是不想配置環境變量,能夠在代碼中寫上。

    System.setProperty("hadoop.home.dir", "本機hadoop地址");

ExitCodeException

    本地的Hadoop程序中hadoop-2.7.1\share\hadoop\common\hadoop-common-2.7.1.jar中NativeIO這個類加載不出來,須要將這個類從新打包。

2.map任務處理

    文件邏輯切片,每個切片對應一個Mapper。

    Mapper讀取輸入切片內容,每行解析成一個k一、v1對(默認狀況)。每個鍵值對調用一次map函數。執行map中的邏輯,對輸入的k一、v1處理,轉換成新的k二、v2輸出。

3.shuffle階段

    分配map輸出的數據到reduce其中會將k二、v2轉換爲k三、v3。

    中間包括buffer、split、partition、combiner、grouping、sort、combiner等操做。

4.reduce任務處理

    輸入shuffle獲得的k三、v3執行reduce處理獲得k四、v4,把k四、v4寫出到目的地。

三、MR內部執行流程

    MR能夠單獨運行,也能夠經由YARN分配資源運行。這裏先簡單說一下Yarn,後面會有具體講Yarn的文章更新。

    YARN框架的組成:

    1.0版本:JobTracker、2.0版本:ResourceManager。

    1.0版本:TaskTracker 、2.0版本:NodeManager。

    Mapper、Reducer。

 

1.運行Job

    客戶端提交一個mr的jar包給JobClient。

    提交方式爲執行Hadoop的提交命令:

hadoop jar [jar包名]

2.請求做業

    JobClient經過RPC和ResourceManager進行通訊,代表要發起一個做業,ResourceManager返回一個存放jar包的地址(HDFS)和jobId。

3.上傳做業資源

    client將jar包和相關配置信息寫入到HDFS指定的位置。

    path=hdfs上的地址+jobId。

4.提交做業

    看成業資源上傳完畢以後,Client聯繫ResourceManager提交做業任務。此處提交的做業任務,只是任務的描述信息,不是jar包。

    任務描述包括:jobid,jar存放的位置,配置信息等等。

5.初始化做業

    ResourceManager獲得Client提交的做業任務信息,會根據信息進行做業初始化,建立做業對象。

6.計算分配任務

    建立好做業對象以後,ResourceManager讀取HDFS上的要處理的文件,開始計算輸入分片split,規劃出Mapper和Reducer的數量,規劃分配任務方案,一般採用本地化策略將任務分配給NodeManager。ResourceManager不會主動聯繫NodeManager,而是等待NodeManager心跳報告。

    本地化任務策略:數據在那個節點上存儲,就將任務交給那個節點。

7.領取任務

    NodeManager經過心跳機制領取任務。這裏領取的只是任務的描述信息(即數據的元數據)。經過任務描述信息,NodeManager訪問hdfs獲取所需的jar,配置文件等。準備進行任務工做。

8.進行任務

    當準備任務完成以後,NodeManager會啓動一個單獨的java child子進程:worker進程,讓worker進程來執行具體的任務。Worker中運行指定的Mapper或Reducer,最終將結果寫入到HDFS當中。

    這裏另外啓動一個進程來執行具體的任務,其實能夠算是NodeManager的一個自保機制,由於Mapper和Reducer的代碼是工程師編寫的,這裏面避免不了會存在致使線程崩潰的代碼,或者意外狀況,致使線程中斷。這樣作能夠保護NodeManager一直處於正常工做狀態,不會由於執行Mapper和Reducer代碼致使NodeManager死亡。NodeManager還有重啓任務的機制,保證在乎外狀況下致使Mapper和Reducer執行中斷,能夠完成任務。

    整個過程傳遞的是代碼,而不是數據。即數據在哪裏,就讓運算髮生在哪裏,減小對數據的移動,提升效率。

四、MR的序列化機制

    因爲集羣工做過程當中須要用到RPC操做,因此想要MR處理的對象的類必須能夠進行序列化/反序列化操做。

    Hadoop並無使用Java原生的序列化,它的底層實際上是經過AVRO實現序列化/反序列化,而且在其基礎上提供了便捷API。

1.AVRO API

    以前用到的Text、LongWritable、IntWritable……其實都是在原有類型上包裝了一下,增長了AVRO序列化、反序列化的能力。

    咱們也可使用本身定義的類型來做爲MR的kv使用,要求是必須也去實現AVRO序列化反序列化。

1>Job

    用於整個做業的管理。

重要方法

1)getInstance(Configuration conf,String Jobname);

    獲取job對象。

2)setJarByClass(class<?> cal);

    設置程序入口。

3)setMapperClass(class<?> cal);

    設置Mapper類。

4)setMapOutputKeyClass(class<?> cal);

    設置Mapper類輸出的key值的類型。

5)setMapOutputValueClass(class<?> cal);

    設置Mapper類輸出的value值類型。

6)setReducerClass(class<?> cal);

    設置Reducer類。

7)setOutputKeyClass(class<?> cal);

    設置Reducer類輸出的key值類型,若是Mapper類和Reducer類的輸出key值同樣,能夠只設置這一個。

8)setOutputValueClass(class<?> cal);

    設置Reducer類輸出的value值類型,若是Mapper類和Reducer類的輸出value值類型型同樣,能夠只設置這一個。

9)waitForCompletion(boolean fg);

    開啓job任務。true開啓,false關閉。

2>Writable

    序列化標識接口,須要實現裏面的write()和readFileds()兩個方法。

重要方法

1)write(DataOutput out);

    此方法用於序列化,屬性的序列化順序要和反序列化順序一致。

2)readFields(DataInput in);

    此方法是用於反序列化的方法,屬性的反序列化順序要和序列化順序一致。

3>WritableComparable

    此接口用於序列化和排序的標識接口。WritableComparable = Writable + Comparable。

重要方法

1)write(DataOutput out);

2)readFields(DataInput in);

3)compareTo();

    此方法用來實現排序比較的,java基礎有講過。返回負數代表調用此方法的對象小,返回0代表兩個對象相等,返回整數代表調用此方法的對象大。

4>應用

    若是對象只是用做k一、k4或value則只實現Writable接口便可。

    若是對象用做k二、k3則類除了實現Writable接口外還要實現Comparable接口,也能夠直接實現WritableComparable效果是相同的。

案例

    統計流量(文件:flow.txt)自定義對象做爲keyvalue。

    文件樣例:

13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987

1>FlowBean

    寫一個Bean實現Writable接口,實現其中的write和readFields方法,注意這兩個方法中屬性處理的順序和類型。

public class FlowBean implements Writable {

	private String phone;
	private String addr;
	private String name;
	private long flow;

	public FlowBean() {}
	public FlowBean(String phone, String addr, String name, long flow) {
		super();
		this.phone = phone;
		this.addr = addr;
		this.name = name;
		this.flow = flow;
	}
//對應的get/set方法,這裏省略
//對應的toString()
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phone);
		out.writeUTF(addr);
		out.writeUTF(name);
		out.writeLong(flow);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		this.phone = in.readUTF();
		this.addr = in.readUTF();
		this.name = in.readUTF();
		this.flow = in.readLong();
	}
}

    編寫完成以後,這個類的對象就能夠用於MR了。

2>FlowMapper

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// 1.獲取行,按照空格切分
		String line = value.toString();
		String attr[] = line.split(" ");
		// 2.獲取其中的手機號,做爲k2
		String phone = attr[0];
		// 3.封裝其餘信息爲FlowBean,做爲v2
		FlowBean fb = new FlowBean(attr[0], attr[1], attr[2], Long.parseLong(attr[3]));
		// 4.發送數據。
		context.write(new Text(phone), fb);
	}
}

 

3>FlowReducer

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
	public void reduce(Text k3, Iterable<FlowBean> v3s, Context context) throws IOException, InterruptedException {
		// 1.經過k3獲取手機號
		String phone = k3.toString();
		// 2.遍歷v3s累計流量
		FlowBean fb = new FlowBean();
		Iterator<FlowBean> it = v3s.iterator();
		while (it.hasNext()) {
			FlowBean nfb = it.next();
			fb.setAddr(nfb.getAddr());
			fb.setName(nfb.getName());
			fb.setPhone(nfb.getPhone());
			fb.setFlow(fb.getFlow() + nfb.getFlow());
		}
		// 3.輸出結果
		context.write(new Text(phone), fb);
	}
}

 

4>FlowDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {
	public static void main(String[] args) throws Exception {
		// 1.建立做業對象
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "JobName");
		// 2.設置入口類
		job.setJarByClass(FlowDriver.class);
		// 3.設置mapper類
		job.setMapperClass(FlowMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		// 4.設置Reducer類
		job.setReducerClass(cn.tedu.flow.FlowReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		// 5.設置輸入位置
		FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/flowdata"));
		// 6.設置輸出位置
		FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/flowresult"));
		// 7.啓動做業
		if (!job.waitForCompletion(true))
			return;
	}
}

 

五、排序

    Map執行事後,在數據進入reduce操做以前,數據將會按照K3進行排序,利用這個特性能夠實現大數據場景下排序的需求。

1.案例:計算利潤

    計算利潤,進行排序(文件:profit.txt)。

    數據樣例:

1 ls 2850 100
2 ls 3566 200
3 ls 4555 323
1 zs 19000 2000
2 zs 28599 3900
3 zs 34567 5000
1 ww 355 10
2 ww 555 222
3 ww 667 192

1>分析

    此案例,須要兩個MR操做,合併數據、進行排序。

    在真實開發場景中 對於複雜的業務場景,常常須要連續運行多個MR來進行處理。

2>代碼實現

ProfitMapper

public class ProfitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// 1.獲取行,按照空格切分
		String line=value.toString();
		String attr[]=line.split(" ");
		// 2.獲取人名做爲k2
		String name=attr[1];
		// 3.獲取當月收入和支出計算利潤
		int sum=Integer.parseInt(attr[2])-Integer.parseInt(attr[3]);
		// 4.輸出數據
		context.write(new Text(name), new IntWritable(sum));
	}
}

 

ProfitReducer

public class ProfitReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text k3, Iterable<IntWritable> v3s, Context context) throws IOException, InterruptedException {
		// 1.經過k3獲取人名
		String name = k3.toString();
		// 2.遍歷v3累計利潤
		Iterator<IntWritable> it = v3s.iterator();
		int cprofit = 0;
		while (it.hasNext()) {
			cprofit += it.next().get();
		}
		// 3.輸出數據
		context.write(new Text(name), new IntWritable(cprofit));
	}
}

ProfitDriver

public class ProfitDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "profit_job");
		job.setJarByClass(ProfitDriver.class);
		job.setMapperClass(ProfitMapper.class);
		job.setReducerClass(ProfitReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/pdata"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/presult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

ProfitBean

    建立Bean對象實現WritableComparable接口實現其中的write readFields compareTo方法,在Map操做時,將Bean對象做爲Key輸出,從而在Reduce接受到數據時已經通過排序,而Reduce操做時,只需原樣輸出數據便可。

public class ProfitBean implements WritableComparable<ProfitBean> {
	private String name;
	private int profit;
	public ProfitBean() {
	}
	public ProfitBean(String name, int profit) {
		this.name = name;
		this.profit = profit;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getProfit() {
		return profit;
	}
	public void setProfit(int profit) {
		this.profit = profit;
	}
	@Override
	public String toString() {
		return "ProfitBean [name=" + name + ", profit=" + profit + "]";
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(name);
		out.writeInt(profit);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		this.name=in.readUTF();
		this.profit=in.readInt();
	}
	@Override
	public int compareTo(ProfitBean profit) {
		return this.profit-profit.getProfit()<=0?1:-1;
	}
}

ProfitSortMapper

public class ProfitSortMapper extends Mapper<LongWritable, Text, ProfitBean, NullWritable> {
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line=value.toString();
		String attr[]=line.split("\t");
		ProfitBean pb=new ProfitBean(attr[0],Integer.parseInt(attr[1]));
		context.write(pb, NullWritable.get());
	}
}

ProfitSortReducer

public class ProfitSortReducer extends Reducer<ProfitBean, NullWritable, Text, IntWritable> {
	public void reduce(ProfitBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
		String name=key.getName();
		int profit=key.getProfit();
		context.write(new Text(name), new IntWritable(profit));
	}
}

此案例中也能夠沒有Reducer,MapReduce中能夠只有Map沒有Reducer,若是不配置Reduce,hadoop會自動增長一個默認Reducer,功能是原樣輸出數據。

ProfitSortDriver

public class ProfitSortDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "pro_sort_job");
		job.setJarByClass(ProfitSortDriver.class);
		job.setMapperClass(ProfitSortMapper.class);
		job.setMapOutputKeyClass(ProfitBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setReducerClass(ProfitSortReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/presult"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/psresult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

六、Partitioner分區

    分區操做是shuffle操做中的一個重要過程,做用就是將map的結果按照規則分發到不一樣reduce中進行處理,從而按照分區獲得多個輸出結果。

1.Partitionner

    Partitioner是partitioner的基類,若是須要定製partitioner也須要繼承該類。

    HashPartitioner是mapreduce的默認partitioner。計算方法是:

    which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

    注:默認狀況下,reduceTask數量爲1。

    不少時候MR自帶的分區規則並不能知足咱們需求,爲了實現特定的效果,能夠須要本身來定義分區規則。

2.案例:改造流量統計

    改造如上統計流量案例,根據不一樣地區分區存放數據。

1>分析

    開發Partitioner代碼,寫一個類實現Partitioner接口,在其中描述分區規則。

2>代碼實現

FlowBean

public class FlowBean implements Writable{
	private String phone;
	private String addr;
	private String name;
private long flow;
//……無參、有參構造……
//……get/set……
//……toString()……
//……read/write……
}

FlowMapper

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		String line=value.toString();
		String attr[]=line.split(" ");
		String phone=attr[0];
		FlowBean fb=new FlowBean(attr[0], attr[1], attr[2], Integer.parseInt(attr[3]));
		context.write(new Text(phone), fb);
	}
}

FlowReducer

public class FlowReducer extends Reducer<Text, FlowBean, Text, NullWritable> {
	@Override
	protected void reduce(Text k3, Iterable<FlowBean> v3s, Reducer<Text, FlowBean, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		Iterator<FlowBean> it=v3s.iterator();
		FlowBean fb=new FlowBean();
		while(it.hasNext()){
			FlowBean nextFb=it.next();
			fb.setAddr(nextFb.getAddr());
			fb.setName(nextFb.getName());
			fb.setPhone(nextFb.getPhone());
			fb.setFlow(fb.getFlow()+nextFb.getFlow());
		}
		Text t=new Text(fb.getName()+" "+fb.getPhone()+" "+fb.getAddr()+" "+fb.getFlow());
		context.write(t, NullWritable.get());
	}
}

FlowCityPartitioner

public class FlowCityPartitioner extends Partitioner<Text, FlowBean> {
	@Override
	public int getPartition(Text k2, FlowBean v2, int num) {
		// 1.獲取流量所屬地區信息
		String addr = v2.getAddr();
		// 2.根據地區返回不一樣分區編號 實現 不一樣Reducer處理不一樣 地區數據的效果
		switch (addr) {
		case "bj":
			return 0;
		case "sh":
			return 1;
		case "sz":
			return 2;
		default:
			return 3;
		}
	}
}

FlowDriver

public class FlowDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Flow_Addr_Job");
		job.setJarByClass(cn.tedu.flow2.FlowDriver.class);
		job.setMapperClass(cn.tedu.flow2.FlowMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		job.setReducerClass(cn.tedu.flow2.FlowReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		//--設置Reducer的數量 默認爲1
		job.setNumReduceTasks(4);
		//--設置當前 job的Partitioner實現根據城市分配數據
		job.setPartitionerClass(FlowCityPartitioner.class);
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/f2data"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/f2result"));
		if (!job.waitForCompletion(true))
			return;
	}
}

    Partitioner將會將數據發往不一樣reducer,這就要求reducer的數量應該大於等於Partitioner可能的結果的數量,若是少於則在執行的過程當中會報錯。

七、Combiner合併

    每個MapperTask可能會產生大量的輸出,combiner的做用就是在MapperTask端對輸出先作一次合併,以減小傳輸到reducerTask的數據量。

    combiner是實如今Mapper端進行key的歸併,combiner具備相似本地的reduce功能。

    若是不用combiner,那麼,全部的結果都是reduce完成,效率會相對低下。使用combiner,先完成在Mapper的本地聚合,從而提高速度。

job.setCombinerClass(WCReducer.class);

1.案例:改造WordCount

    改造WordCount案例,增長Combiner,從而提升效率。

1>WcMapper

public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String attr[] =line.split(" ");
		for(String w:attr){
			context.write(new Text(w), new IntWritable(1));
		}
	}
}

2>WcReducer

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	protected void reduce(Text k3, Iterable<IntWritable> v3s,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		Iterator<IntWritable> it=v3s.iterator();
		int count=0;
		while(it.hasNext()){
			count+=it.next().get();
		}
		context.write(k3, new IntWritable(count));
	}
}

3>WcDriver

public class WcDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Wc_addr_Job");
		job.setJarByClass(cn.tedu.wc2.WcDriver.class);
		job.setMapperClass(cn.tedu.wc2.WcMapper.class);
		job.setReducerClass(cn.tedu.wc2.WcReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		//爲當前job設置Combiner
		job.setCombinerClass(WcReducer.class);
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/wdata"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/wresult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

    MapReduce的重點樹shuffle的過程,這個我會單獨出一篇文章進行講解。

 

上一篇:Hadoop-HDFS基礎原理與操做

下一篇:Hadoop-MapReduce的shuffle過程及其餘

相關文章
相關標籤/搜索