1.Combinerjava
Combiner是MapReduce的一種優化手段。每個map均可能會產生大量的本地輸出,Combiner的做用就是對map端的輸出先作一次合併,以減小map和reduce結點之間的數據傳輸量,以提升網絡IO性能。只有操做知足結合律的纔可設置combiner。apache
Combiner的做用:緩存
(1)Combiner實現本地key的聚合,對map輸出的key排序value進行迭代:如圖所示:網絡
map: (K1, V1) → list(K2, V2)combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)併發
(2)Combiner還有本地reduce功能(其本質上就是一個reduce)
例如wordcount的例子和找出value的最大值的程序 ,combiner和reduce徹底一致,以下所示:app
map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K3, V3) reduce: (K3, list(V3)) → list(K4, V4)ide
使用combiner以後,先完成的map會在本地聚合,提高速度。對於hadoop自帶的wordcount的例子,value就是一個疊加的數字,因此map一結束就能夠進行reduce的value疊加,而沒必要要等到全部的map結束再去進行reduce的value疊加。函數
在實際的Hadoop集羣操做中,咱們是由多臺主機一塊兒進行MapReduce的,若是加入規約操做,每一臺主機會在reduce以前進行一次對本機數據的規約,而後在經過集羣進行reduce操做,這樣就會大大節省reduce的時間,從而加快MapReduce的處理速度。oop
2.Partitioner性能
step1.3就是分區操做,哪一個key到哪一個reducer的分配過程,是由Partitioner規定的。
用戶在中間key上使用分區函數來對數據進行分區,以後在輸入到後續任務執行進程。一個默認的分區函數式使用hash方法(好比常見的:hash(key) mod R)進行分區。hash方法可以產生很是平衡的分區。
自定製Partitioner函數:
package mapreduce01;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class fenqu {
static String INPUT_PATH="hdfs://master:9000/test";
static String OUTPUT_PATH="hdfs://master:9000/output/fenqu";
static class MyMapper extends Mapper<Object,Object,IntWritable,NullWritable>{
IntWritable output_key=new IntWritable();
NullWritable output_value=NullWritable.get();
protected void map(Object key, Object value, Context context) throw IOException,InterruptedException{
int val=Integer.parseUnsignedInt(value.toString().trim());
output_key.set(val);
context.write(output_key,output_value);
}
}
static class LiuPartitioner extends Partitioner<IntWritable,NullWritable> {
@Override
public int getPartition(IntWritable key, NullWritable value, int numPartitions){
int num=key.get();
if(num>100) return 0;
else return 1;
}
}
static class MyReduce extends Reducer<IntWritable,NullWritable,IntWritable,IntWritable>{
IntWritable output_key=new IntWritable();
int num=1;
protected void reduce(IntWritable key,Iterable<NullWritable> values,Context context) throws IOException,InterruptedException{
output_key.set(num++);
context.write(output_key,key);
} }
public static void main(String[] args) throws Exception{
Path outputpath=new Path(OUTPUT_PATH);
Configuration conf=new Configuration(); |
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setNumReduceTasks(2);
job.setPartitionerClass(LiuPartitioner.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
分區Partitioner主要做用在於如下兩點:
根據業務須要,產生多個輸出文件;多個reduce任務併發運行,提升總體job的運行效率。
3.Shuffle過程
reduce階段的三個步驟:
step2.1就是一個shuffle【隨機、洗牌】操做
shuffle是什麼:針對多個map任務的輸出按照不一樣的分區(Partition)經過網絡複製到不一樣的reduce任務節點上,這個過程就稱做爲Shuffle。
在map端:
1.在map端首先是InputSplit,在InputSplit中含有DataNode中的數據,每個InputSplit都會分配一個Mapper任務,Mapper任務結束後產生<K2,V2>的輸出,這些輸出先存放在緩存中,每一個map有一個環形內存緩衝區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spil l.percent),一個後臺線程就把內容寫到(spill)Linux本地磁盤中的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。
2.寫磁盤前,要進行partition、sort和combine等操做。經過分區,將不一樣類型的數據分開處理,以後對不一樣分區的數據進行排序,若是有Combiner,還要對排序後的數據進行combine。等最後記錄寫完,將所有溢出文件合併爲一個分區且排序的文件。
3.最後將磁盤中的數據送到Reduce中,圖中Map輸出有三個分區,有一個分區數據被送到圖示的Reduce任務中,剩下的兩個分區被送到其餘Reducer任務中。而圖示的Reducer任務的其餘的三個輸入則來自其餘節點的Map輸出。
reduce端:
1. Copy階段:Reducer經過Http方式獲得輸出文件的分區。
reduce端可能從n個map的結果中獲取數據,而這些map的執行速度不盡相同,當其中一個map運行結束時,reduce就會從JobTracker中獲取該信息。map運行結束後TaskTracker會獲得消息,進而將消息彙報給JobTracker,reduce定時從JobTracker獲取該信息,reduce端默認有5個數據複製線程從map端複製數據。
2.Merge階段:若是造成多個磁盤文件會進行合併
從map端複製來的數據首先寫到reduce端的緩存中,一樣緩存佔用到達必定閾值後會將數據寫到磁盤中,一樣會進行partition、combine、排序等過程。若是造成了多個磁盤文件還會進行合併,最後一次合併的結果做爲reduce的輸入而不是寫入到磁盤中。
3.Reducer的參數:最後將合併後的結果做爲輸入傳入Reduce任務中。
4.排序sort
step4.1第四步中須要對不一樣分區中數據進行排序和分組,默認狀況按照key進行排序和分組。
自定義類型MyGrouptestt實現了WritableComparable的接口,該接口中有一個compareTo()方法,當對key進行比較時會調用該方法,而咱們將其改成了咱們本身定義的比較規則,從而實現咱們想要的效果。
自定義排序:
GroupSort.java
package mapreduce01;
import java.io.IOException;
import mapreduce01.fenqu.LiuPartitioner;
import mapreduce01.fenqu.MyMapper;
import mapreduce01.fenqu.MyReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class GroupSort {
static String INPUT_PATH="hdfs://master:9000/input/f.txt";
static String OUTPUT_PATH="hdfs://master:9000/output/groupsort";
static class MyMapper extends Mapper<Object,Object,MyGrouptest,NullWritable>{
MyGrouptest output_key=new MyGrouptest();
NullWritable output_value=NullWritable.get();
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{
String[] tokens=value.toString().split(",",2);
MyGrouptest output_key=new MyGrouptest(Long.parseLong(tokens[0]), Long.parseLong(tokens[1]));
context.write(output_key,output_value);
}
}
static class MyReduce extends Reducer<MyGrouptest,NullWritable,LongWritable,LongWritable>{
LongWritable output_key=new LongWritable();
LongWritable output_value=new LongWritable();
protected void reduce(MyGrouptest key,Iterable<NullWritable> values,Context context) throws IOException,InterruptedException{
output_key.set(key.getFirstNum());
output_value.set(key.getSecondNum());
context.write(output_key,output_value);
}
}
public static void main(String[] args) throws Exception{
Path outputpath=new Path(OUTPUT_PATH);
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setNumReduceTasks(1);
job.setPartitionerClass(LiuPartitioner.class);
job.setMapOutputKeyClass(MyGrouptest.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
job.waitForCompletion(true);
}
}
MyGrouptest.java
package mapreduce01;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class MyGrouptest implements WritableComparable<MyGrouptest> {
long firstNum;
long secondNum;
public MyGrouptest() {}
public MyGrouptest(long first, long second) {
firstNum = first;
secondNum = second;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(firstNum);
out.writeLong(secondNum);
}
@Override
public void readFields(DataInput in) throws IOException {
firstNum = in.readLong();
secondNum = in.readLong();
} /* * 當key進行排序時會調用如下這個compreTo方法 */
@Override
public int compareTo(MyGrouptest anotherKey) {
long min = firstNum - anotherKey.firstNum;
if (min != 0) { // 說明第一列不相等,則返回兩數之間小的數
return (int) min;
}
else {
return (int) (secondNum - anotherKey.secondNum);
}
}
public long getFirstNum() { return firstNum; }
public long getSecondNum() { return secondNum; }
}