本篇文章將介紹shuffle的過程以及MapReduce中的其餘一些組件。java
Shuffle實際上是一個過程,並非MapperReducer的一個組件,這個過程是從map輸出數據,到reduce接收處理數據以前,橫跨Mapper和Reducer兩端的,以下圖:node
shuffle分爲Mapper階段和Reducer階段,下面就兩個階段作具體分析。正則表達式
每一個MapperTask有一個環形內存緩衝區,用於存儲map任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值默認0.8(io.sort.spill.percent),就會啓動一個後臺線程把環形緩衝區中的內容寫到(spill)磁盤的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。shell
這裏有一個map()方法(簡稱m)寫入的速度和spill溢出(簡稱s)的速度,m的數據在內存中移動,s是數據由內存到磁盤,雖然s是磁盤的連續寫,可是也比不上m的內存速度,產生的現象是s在前面跑,m在後面追,m的速度>s的速度,那麼m確定在一個時間節點上會追上s,那麼當m追上s的時候,m寫入環形緩衝區的線程就會被阻塞暫停,直到s將環形緩衝區中的數據所有寫入到磁盤中,m的寫入線程纔會被啓動。因此環形緩衝區大小和閥值的大小是能夠根據業務進行調優的點。bash
寫磁盤前,要partition、sort、Combiner。若是有後續的數據,將會繼續寫入環形緩衝區中,最終寫入下一個溢出文件中。app
等最後記錄寫完,合併所有溢出寫文件爲一個分區且排序的文件。ide
若是在最終合併時,被合併的文件大於等於3個,則合併完會再執行一次Combiner,不然不會。工具
總體Mapper階段的流程以下:oop
input->map->buffer->split(partition-sort-combiner)->merge(partition-sort-combiner(file>=3))->數據落地。性能
Reducer主動找Mapper獲取本身負責的分區的數據,並不須要全部的Mapper都執行完成後再獲取,哪一個Mapper執行完,當即就去複製。
複製後,來自多個Mapper的數據要進行merge合併操做。合併後進行分組、排序,造成k3v3,進入reduce處理,處理後產生的結果輸出到目的地。
總體Reducer階段流程以下:
fetch->merge(combiner)->grouping->sort->reduce->output。
Mapper的數量在默認狀況下不可直接控制干預,Mapper的數量由輸入的大小和個數決定。在默認狀況下,最終input佔據了多少block,就應該啓動多少個Mapper。
此種狀況下,若是有大量的小文件須要處理,則會形成Hadoop集羣崩潰。大量的小文件,每一個小文件都獨佔一個Mapper處理線程,這樣啓動線程和關閉線程消耗的資源會很龐大,文件數量到達一個量級會直接致使集羣崩潰。
鑑於以上狀況,能夠經過配置mapred.min.split.size來控制split的size的最小值。當每一個split的大小達不到設置的最小值,Hadoop會將這些達不到最小值的split拼接到一塊兒,使用一個Mapper來處理這些文件,當大小超過最小值,才啓動一個新Mapper進行處理。這樣就能夠避免Mapper線程過多致使集羣崩潰的結果。
求一組數據的最大值或者最小值。
數據樣例:
123 235345 234 654768 234 4545 324
public class MaxMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> { private int max = Integer.MIN_VALUE; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException { int num = Integer.parseInt(value.toString()); max = max < num ? num : max; } @Override protected void cleanup(Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException { context.write(new IntWritable(max), NullWritable.get()); } }
public class MaxReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> { private int max=Integer.MIN_VALUE; @Override protected void reduce(IntWritable k3, Iterable<NullWritable> v3s, Reducer<IntWritable, NullWritable, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException { int num=Integer.parseInt(k3.toString()); max=max>num?max:num; } @Override protected void cleanup(Reducer<IntWritable, NullWritable, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException { context.write(new IntWritable(max), NullWritable.get()); } }
public class MaxDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Max_Job"); job.setJarByClass(cn.tedu.max.MaxDriver.class); job.setMapperClass(cn.tedu.max.MaxMapper.class); job.setReducerClass(cn.tedu.max.MaxReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/maxdata")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/maxresult")); if (!job.waitForCompletion(true)) return; } }
按月份產生文件,統計每一個人的語數外及總分。
數據樣例:
math.txt
1 zhang 85 2 zhang 59 3 zhang 95 1 wang 74 2 wang 67 3 wang 96 1 li 45 2 li 76 3 li 67
chinese.txt
1 zhang 89 2 zhang 73 3 zhang 67 1 wang 49 2 wang 83 3 wang 27 1 li 77 2 li 66 3 li 89
english.txt)
1 zhang 55 2 zhang 69 3 zhang 75 1 wang 44 2 wang 64 3 wang 86 1 li 76 2 li 84 3 li 93
//如下代碼涉及到的重要方法 FileSplit inputSplit = (FileSplit) context.getInputSplit(); String fn = inputSplit.getPath().getName();
public class ScoreBean implements Writable { private String name; private String subject; private int month; private int score; //這裏省去了如下方法,記得補上 //……get/set…… //……有參/無參構造…… //……read/write…… }
public class ScoreMapper extends Mapper<LongWritable, Text, Text, ScoreBean> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ScoreBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String attr[] = line.split(" "); String name = attr[1]; FileSplit fs = (FileSplit) context.getInputSplit(); String path = fs.getPath().getName(); String subject = path.substring(0, path.lastIndexOf(".")); ScoreBean sb = new ScoreBean(attr[1], subject, Integer.parseInt(attr[0]), Integer.parseInt(attr[2])); context.write(new Text(name), sb); } }
public class ScoreReducer extends Reducer<Text, ScoreBean, Text, NullWritable> { @Override protected void reduce(Text k3, Iterable<ScoreBean> v3s, Reducer<Text, ScoreBean, Text, NullWritable>.Context context) throws IOException, InterruptedException { String name = k3.toString(); Map<String, Integer> map = new HashMap<String, Integer>(); int count = 0; Iterator<ScoreBean> it = v3s.iterator(); while (it.hasNext()) { ScoreBean sb = it.next(); map.put(sb.getSubject(), sb.getScore()); count += sb.getScore(); } String result = name + " " + map.get("chinese") + " " + map.get("math") + " " + map.get("english") + " " + count; context.write(new Text(result), NullWritable.get()); } }
public class ScoreMonthPartitioner extends Partitioner<Text, ScoreBean>{ @Override public int getPartition(Text key, ScoreBean value, int numPartitions) { return value.getMonth()-1; } }
public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Scorec_Job"); job.setJarByClass(cn.tedu.score.ScoreDriver.class); job.setMapperClass(cn.tedu.score.ScoreMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(ScoreBean.class); job.setReducerClass(cn.tedu.score.ScoreReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setPartitionerClass(ScoreMonthPartitioner.class); job.setNumReduceTasks(3); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/scoredata")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/scoresult")); if (!job.waitForCompletion(true)) return; } }
InputFormat:輸入格式化器。
MapReduce開始階段階段,InputFormat類用來產生InputSplit,並把基於RecordReader它切分紅record,造成Mapper的輸入。
Hadoop自己提供了若干內置的InputFormat,其中若是不明確指定默認使用TextInputFormat。
做爲默認的文件輸入格式,用於讀取純文本文件,文件被分爲一系列以LF或者CR結束的行,key是每一行的位置偏移量,是LongWritable類型的,value是每一行的內容,爲Text類型。
一樣用於讀取文件,若是行被分隔符(缺省是tab)分割爲兩部分,第一部分爲key,剩下的部分爲value;若是沒有分隔符,整行做爲key,value爲空。
用於讀取sequence file。sequence file是Hadoop用於存儲數據自定義格式的binary文件。它有兩個子類:SequenceFileAsBinaryInputFormat,將key和value以BytesWritable的類型讀出;SequenceFileAsTextInputFormat,將key和value以Text類型讀出。
根據filter從sequence文件中取得部分知足條件的數據,經過setFilterClass指定Filter,內置了三種Filter,RegexFilter取key值知足指定的正則表達式的記錄;PercentFilter經過指定參數f,取記錄行數%f==0的記錄;MD5Filter經過指定參數f,取MD5(key)%f==0的記錄。
0.18.x版本新加入,能夠將文件以行爲單位進行split,好比文件的每一行對應一個mapper。獲得的key是每一行的位置偏移量(LongWritable類型),value是每一行的內容,Text類型。適用於行少列多的文件。
用於多個數據源的join。
能夠經過job.setInputFormatClass(XxxInputFormat.class);來設定選用哪一種InputFormat。
若是以上InputFormat不夠用,咱們也能夠本身定義InputFormat。
全部InputFormat都要直接或間接的繼承InputFormat抽象類。
InputFormat抽象類中主要定義了以下兩個方法:
① getSplits(JobContext context)
生產InputSplit集合的方法,此方法接受JobContext接受環境信息,獲得要處理的文件信息後,進行邏輯切割,產生InputSplit集合返回。
List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
②createRecordReader(InputSplit split,TaskAttemptContext context)
此方法返回RecordReader對象。一個RecordReader包含方法描述如何從InputSplit切分出要送入Mapper的K一、V1對。
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;
咱們能夠直接繼承InputFormat,但更多的時候咱們會選擇繼承他的一個實現子類,好比:FileInputFormat。此類是全部來源爲文件的InputFormat的基類,默認的TextInputFormat就繼承自它。
FileInputFormat實現了InputFormat抽象類,實現了getSplits方法,根據配置去邏輯切割文件,返回FileSplit的集合,並提供了isSplitable()方法,子類能夠經過在這個方法中返回boolean類型的值代表是否要對文件進行邏輯切割,若是返回false則不管文件是否超過一個Block大小都不會進行切割,而將這個文件做爲一個邏輯塊返回。而對createRecordReader方法則沒有提供實現,設置爲了抽象方法,要求子類實現。
若是想要更精細的改變邏輯切塊規則能夠覆蓋getSplits方法本身編寫代碼實現。
而更多的時候,咱們直接使用父類中的方法而將精力放置在createRecordReader上,決定如何將InputSplit轉換爲一個個的Recoder。
讀取score1.txt文件,從中每4行讀取成績,其中第一行爲姓名,後3行爲單科成績,計算總分,最終輸出爲姓名:總分格式的文件。
文件內容樣例:
張三 語文 97 數學 77 英語 69 李四 語文 87 數學 57 英語 63 王五 語文 47 數學 54 英語 39
分析:
在此例子中,須要按照每三行一次進行讀取稱爲一個InputSplit,通過比較可使用NLineInputFormat,可是爲了演示,使用自定義InputFormat來作這件事。
/** * 自定義InputFormat類,來實現每三行做爲一個Recorder觸發一次Mapper的效果 */ public class MyFileInputFormat extends FileInputFormat<Text, Text> { /** * 由於要每三行讀取做爲一個Recoder,因此若是切塊形成塊內數據行數不是3的倍數可能形成處理出問題 * 所以返回false,標識文件不要進行切塊 */ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } /** * 返回自定義的MyRecordReader */ @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new MyRecordReader(); } }
/** * 自定義的RecordReader,代表瞭如何將一個InputSplit切割出一個個的Recorder * */ public class MyRecordReader extends RecordReader<Text, Text> { private LineReader lineReader = null; private Text key = null; private Text value = null; private boolean hasMore = true; /** * 初始化的方法,將在InputSplit被切割前調用 */ @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //獲取文件路徑 FileSplit fs = (FileSplit) split; Path path = fs.getPath(); //獲取文件系統 Configuration conf = context.getConfiguration(); FileSystem fileSystem = path.getFileSystem(conf); //從文件系統中讀取文件路徑獲得文件流 FSDataInputStream fin = fileSystem.open(path); //將文件流包裝爲行讀取器 lineReader = new LineReader(fin); } /** * 下一個keyvalue方法 * 返回值表當前是否讀取到了新的紀錄 */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { key = new Text(); value = new Text(); Text temp = new Text(); int count = 0; for (int i = 0; i < 4; i++) { int len = lineReader.readLine(temp); if (len == 0) { hasMore = false; break; } else { count++; value.append(temp.getBytes(), 0, temp.getLength()); value.append("\r\n".getBytes(), 0, "\r\n".length()); temp.clear(); } } key.set(count+""); return count != 0; } /** * 獲取當前key方法 */ @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } /** * 獲取當前value方法 */ @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } /** * 獲取處理進度的方法,返回0.0f-1.0f之間的值表示進度 */ @Override public float getProgress() throws IOException, InterruptedException { if (hasMore) return 0.0f; else return 1.0f; } /** * 關閉資源的方法 * 當切割InputSplit結束會被調用用來釋放資源 */ @Override public void close() throws IOException { lineReader.close(); } }
/** * 計算成績的Mapper * */ public class ScoreMapper extends Mapper<Text, Text, Text, Text> { public void map(Text key, Text value, Context context) throws IOException, InterruptedException { String str = value.toString(); String lines [] = str.split("\r\n"); String name = lines[0]; for(int i = 1;i<lines.length;i++){ context.write(new Text(name), new Text(lines[i])); } } }
/** * 計算成績的Reducer */ public class ScoreReducer extends Reducer<Text, Text, Text, IntWritable> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String name = key.toString(); int countFen = 0; for(Text v : values){ String line = v.toString(); String subject = line.split(" ")[0]; int fen = Integer.parseInt(line.split(" ")[1]); countFen += fen; } context.write(new Text(name), new IntWritable(countFen)); } }
/** * ScoreDriver */ public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "score_job"); job.setJarByClass(ScoreDriver.class); job.setMapperClass(ScoreMapper.class); job.setReducerClass(ScoreReducer.class); //指定InputFormat爲自定義的MyFileInputFormat job.setInputFormatClass(MyFileInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/score/score1.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/score/result")); if (!job.waitForCompletion(true)) return; } }
MultipleInputs能夠將多個輸入組裝起來,同時爲Mapper提供數據,當咱們但願從多個來源讀取數據時可使用。甚至,在指定來源時能夠爲不一樣來源的數據指定不一樣的InputFormat和Mapper以應對不一樣格式的輸入數據。
此類上提供的靜態方法有:
/** * 指定數據來源及對應的InputFormat */ MultipleInputs.addInputPath(job, path, inputFormatClass); /** * 指定數據來源及對應的InputFormat 和 Mapper */ MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass);
改造上述案例,同時從另外一個文件score2.txt中讀取數據統計成績。score2.txt中的數據是一行爲一個學生的成績。
數據樣例:
趙六 56 47 69 陳七 73 84 91 劉八 45 56 66
代碼:
/** * 計算成績的Mapper */ public class ScoreMapper2 extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String attrs [] = line.split(" "); String name = attrs[0]; for(int i = 1;i<attrs.length;i++){ if(i == 1){ context.write(new Text(name), new Text("語文\t"+attrs[i])); }else if(i == 2){ context.write(new Text(name), new Text("數學\t"+attrs[i])); }else if(i == 3){ context.write(new Text(name), new Text("英語\t"+attrs[i])); } } } }
/** * ScoreDriver */ public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "score_job"); job.setJarByClass(ScoreDriver.class); //job.setMapperClass(ScoreMapper.class); job.setReducerClass(ScoreReducer.class); //指定InputFormat爲自定義的MyFileInputFormat job.setInputFormatClass(MyFileInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/score/score1.txt")); MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/score/score1.txt"), MyFileInputFormat.class,ScoreMapper.class); MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/score/score2.txt"), TextInputFormat.class,ScoreMapper2.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/score/result")); if (!job.waitForCompletion(true)) return; } }
OutputFormat:輸出格式化器。
MapReduce結束階段,OutputFormat類決定了Reducer如何產生輸出。
Hadoop自己提供了若干內置的OutputFormat,其中若是不明確指定默認使用TextOutputFormat。
(實現OutputFormat接口)- 全部輸出到文件的OutputFormats的基類。
以行分隔、包含製表符定界的鍵值對的文本文件格式。
二進制鍵值數據的壓縮格式。
原生二進制數據的壓縮格式。
一種使用部分索引鍵的格式。
使用鍵值對參數寫入文件的抽象類。
輸出多個以標準行分割、製表符定界格式的文件。
輸出多個壓縮格式的文件。
能夠經過job.setOutputFormatClass(XxxoutputFormat.class);來設定選用哪一種OutputFormat。
若是以上OutputFormat不夠用,一樣也能夠本身定義OutputFormat。
全部的OutputFormat都要直接或間接的繼承OutputFormat抽象類
OutputFormat抽象類中定義了以下的抽象方法:
①getRecordWriter(TaskAttemptContext context)
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException; public abstract void checkOutputSpecs(JobContext context ) throws IOException,InterruptedException; public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
咱們能夠直接繼承OutputFormat,但更多的時候咱們會選擇繼承他的一個實現子類,好比FileOutputFormat。- 此類是全部目的地爲文件的OutputFormat的基類,例如默認的TextOutputFormat就繼承自它。
FileOutputFormat實現了OutputFormat接口,默認實現了checkOutputSpecs和getOutputCommitter方法,並將getRecordWriter()設置爲抽象方法要求咱們去實現。
若是想要更精細的改變邏輯能夠本身去編寫getOutputCommitter和checkOutputSpecs方法。
而更多的時候,咱們直接使用父類中的方法而將精力放置在getRecordWriter上,決定如何產生輸出。
編寫wordcount案例,並將輸出按照'#'進行分割,輸出爲一行。
數據樣例:
hello tom hello joy hello rose hello joy hello jerry hello tom hello rose hello joy
代碼以下:
public class MyRecordWriter<K,V> extends RecordWriter<K,V> { protected DataOutputStream out = null; private final byte[] keyValueSeparator; //public static final String NEW_LINE = System.getProperty("line.separator"); public static final String NEW_LINE = "#"; public MyRecordWriter(DataOutputStream out,String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(); } @Override public void write(K key, V value) throws IOException, InterruptedException { if(key!=null){ out.write(key.toString().getBytes()); out.write(keyValueSeparator); } out.write(value.toString().getBytes()); out.write(NEW_LINE.getBytes()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { out.close(); } }
public class MyFileOutputFormat<K,V> extends FileOutputFormat<K,V> { @Override public RecordWriter<K,V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); Path file = getDefaultWorkFile(context, ""); FileSystem fs = file.getFileSystem(conf); FSDataOutputStream fileOut = fs.create(file, false); return new MyRecordWriter<K,V>(fileOut, " "); } }
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String words [] = line.split(" "); for(String word : words){ context.write(new Text(word), new LongWritable(1)); } } }
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { Iterator<LongWritable> it = values.iterator(); long count = 0; while(it.hasNext()){ long c = it.next().get(); count += c; } context.write(key, new LongWritable(count)); } }
public class WCDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "WCJob"); job.setJarByClass(cn.tedu.wc.WCDriver.class); job.setMapperClass(cn.tedu.wc.WCMapper.class); job.setReducerClass(cn.tedu.wc.WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(MyFileOutputFormat.class); FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/park/words.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/park/result2")); if (!job.waitForCompletion(true)) return; } }
MultipleOutputs能夠令一個Reducer產生多個輸出文件。
爲特定kv打上指定標記
MultipleOutputs<Text,LongWritable> mos = new MultipleOutputs<Text,LongWritable>(context); mos.write("flag", key, value); /** 爲指定標記內容指定輸出方式 能夠指定多個 */ MultipleOutputs.addNamedOutput(job, "flag", XxxOutputFormat.class, Key.class, Value.class);
改造上面的wordcount案例,將首字母爲a-j的輸出到"small"中。其餘輸出到"big"中。
代碼以下:
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String words [] = line.split(" "); for(String word : words){ context.write(new Text(word), new LongWritable(1)); } } }
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { MultipleOutputs<Text,LongWritable> mos = null; @Override protected void setup(Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { this.mos = new MultipleOutputs<Text,LongWritable>(context); } public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { Iterator<LongWritable> it = values.iterator(); long count = 0; while(it.hasNext()){ long c = it.next().get(); count += c; } if(key.toString().matches("[a-j][a-z]*")){ mos.write("small", key, new LongWritable(count)); }else{ mos.write("big", key, new LongWritable(count)); } } }
public class WCDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "WCJob"); job.setJarByClass(cn.tedu.wc.WCDriver.class); job.setMapperClass(cn.tedu.wc.WCMapper.class); job.setReducerClass(cn.tedu.wc.WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); MultipleOutputs.addNamedOutput(job, "small", MyFileOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, "big", MyFileOutputFormat.class, Text.class, IntWritable.class); FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.242.101:9000/park/words.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.242.101:9000/park/result2")); if (!job.waitForCompletion(true)) return; } }
在reduce階段進行,對mapper發送過來的數據會進行分組的操做,這個過程稱爲爲Grouping。默認狀況下會將k2相同的內容做爲一組。
咱們能夠經過job.setGroupingComparatorClass(MyGroupingComparator.class);方法本身指定Grouping規則。
改造WordCount案例,實現統計a-h 和 i-z開頭的單詞數量統計。
public class WCComparator extends WritableComparator { @Override public int compare(byte[] b1, int begin1, int len1, byte[] b2, int begin2, int len2) { try { DataInput in = new DataInputStream(new ByteArrayInputStream(b1,begin1,len1)); Text ta = new Text(); ta.readFields(in); in = new DataInputStream(new ByteArrayInputStream(b2,begin2,len2)); Text tb = new Text(); tb.readFields(in); if(ta.toString().matches("^[a-n][a-z]*$") && tb.toString().matches("^[a-n][a-z]*$")){ return 0; }else if(ta.toString().matches("^[o-z][a-z]*$") && tb.toString().matches("^[o-z][a-z]*$")){ return 0; }else{ return 1; } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e); } } }
所謂二次排序就是首先按照第一字段排序,而後再對第一字段相同的行按照第二字段排序,注意不能破壞第一個字段的排序順序。
/** * 開發bean封裝數據 */ public class NumBean implements WritableComparable<NumBean> { private int lnum; private int rnum; public NumBean() { } public NumBean(int lnum, int rnum) { this.lnum = lnum; this.rnum = rnum; } public int getLnum() { return lnum; } public void setLnum(int lnum) { this.lnum = lnum; } public int getRnum() { return rnum; } public void setRnum(int rnum) { this.rnum = rnum; } @Override public void readFields(DataInput in) throws IOException { this.lnum = in.readInt(); this.rnum = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(lnum); out.writeInt(rnum); } @Override public int compareTo(NumBean o) { if(this.lnum != o.getLnum()){ return this.lnum - o.getLnum(); }else{ return this.rnum - o.getRnum(); } } }
/** * 開發Mapper 用 NumBean做爲k2 因爲 NumBean覆蓋了compareTo方法 能夠實現自動二次排序 */ public class NumMapper extends Mapper<LongWritable, Text, NumBean, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NumBean, NullWritable>.Context context) throws IOException, InterruptedException { //1.讀取行 String line = value.toString(); //2.切分出lnum 和 rnum String [] attrs = line.split(" "); int lnum = Integer.parseInt(attrs[0]); int rnum = Integer.parseInt(attrs[1]); //3.封裝數據到bean NumBean nb = new NumBean(lnum,rnum); //4.發送數據 context.write(nb, NullWritable.get()); } }
/** * 開發Reducer輸出結果 shuffle階段已經完成了二次排序 此處直接輸出便可 */ public class NumReducer extends Reducer<NumBean, NullWritable, IntWritable, IntWritable> { @Override protected void reduce(NumBean key, Iterable<NullWritable> values, Reducer<NumBean, NullWritable, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException { int lnum = key.getLnum(); int rnum = key.getRnum(); context.write(new IntWritable(lnum), new IntWritable(rnum)); } }
/** * 爲了防止重複 數據被grouping成一條數據 形成結果丟失 自定義gourping過程 固定返回-1 表示不管什麼狀況都不合並數據 */ public class NumWritableComparator extends WritableComparator { @Override public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) { return -1; } }
/** * 開發Driver組裝程序 */ public class NumDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "num_job"); job.setJarByClass(cn.tedu.mr.grouping.num.NumDriver.class); job.setMapperClass(cn.tedu.mr.grouping.num.NumMapper.class); job.setMapOutputKeyClass(NumBean.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(cn.tedu.mr.grouping.num.NumReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); job.setGroupingComparatorClass(NumWritableComparator.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/ndata")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/nresult")); if (!job.waitForCompletion(true)) return; } }
在開發MR程序時,可能遇到的數據分配不一致,形成程序性能降低的問題,這個問題稱之爲數據傾斜問題。
解決辦法:
若是是由於shuffle分配數據不均勻形成數據傾斜,重寫parition均勻分配數據便可。
若是是數據自己帶有傾斜的特色沒法經過修改parition來解決傾斜問題,那麼能夠經過如下幾個方法嘗試解決:
1.利用combiner減輕傾斜的狀況。
2.將形成傾斜的數據拿出單獨處理。
3.將一個mr拆分紅多個mr下降傾斜形成的危害。
每一個小文件不管多小都會對應一個block,而每個block在NameNode中都要有元數據的記錄,若是存在大量小文件,則NameNode中的大量空間都用來存放這些小文件的元數據信息,實際上是至關浪費的,對於NameNode的性能有比較大的影響。
當使用mapreduce處理大量小文件時,默認狀況下mapreduce在進行切片操做時規則是和block切的規則同樣,即一個block一個InputSplit,而一個InputSplit就對應一個Mapper,這樣會形成開啓大量的MapperTask,可是每一個MapperTask處理的數據量都頗有限。極端狀況下開啓大量Mapper耗費內存甚至可能形成程序的崩潰。
Hadoop Archive或者HAR,是一個高效地將小文件放入HDFS塊中的文件存檔工具,它可以將多個小文件打包成一個HAR文件,這樣在減小namenode內存使用的同時,仍然容許對文件進行透明的訪問。
HAR是在Hadoop file system之上的一個文件系統,所以全部fs shell命令對HAR文件都可用,只不過是文件路徑格式不同,HAR的訪問路徑能夠是如下兩種格式:
har://scheme-hostname:port/archivepath/fileinarchive
har:///archivepath/fileinarchive(本節點)
第一,對小文件進行存檔後,原文件並不會自動被刪除,須要用戶本身刪除;
第二,建立HAR文件的過程其實是在運行一個mapreduce做業,於是須要有一個hadoop集羣運行此命令。
第一,一旦建立,Archives便不可改變。要增長或移除裏面的文件,必須從新建立歸檔文件。
第二,要歸檔的文件名中不能有空格,不然會拋出異常,能夠將空格用其餘符號替換(使用-Dhar.space.replacement.enable=true 和-Dhar.space.replacement參數)。
hadoop archive -archiveName <NAME>.har -p <parent path>[-r <replication factor>]<src>* <dest>
案例:將hdfs:///small中的內容壓縮成small.har
將某個文件打成har:
hadoop archive -archiveName small.har -p /small/small1.txt /small
將多個small開頭的文件打成har:
hadoop archive -archiveName small.har -p /small/small* /small
將某個文件夾下全部文件打成har:
hadoop archive -archiveName small.har -p /small /small
查看HAR文件存檔中的文件
hadoop fs -ls har:///small/small.har
輸出har文件內容到本地系統
hadoop fs -get har:///small/small.har/smallx
SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。目前,也有很多人在該文件的基礎之上提出了一些HDFS中小文件存儲的解決方案,他們的基本思路就是將小文件進行合併成一個大文件,同時對這些小文件的位置信息構建索引。
文件不支持複寫操做,不能向已存在的SequenceFile(MapFile)追加存儲記錄。
當write流不關閉的時候,沒有辦法構造read流。也就是在執行文件寫操做的時候,該文件是不可讀取的。
@Test /** * SequenceFile 寫操做 */ public void SequenceWriter() throws Exception{ final String INPUT_PATH= "hdfs://192.168.242.101:9000/big"; final String OUTPUT_PATH= "hdfs://192.168.242.101:9000/big2"; //獲取文件系統 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000"); FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); //建立seq的輸出流 Text key = new Text(); Text value = new Text(); SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path(OUTPUT_PATH), key.getClass(), value.getClass()); //寫新的數據 System.out.println(writer.getLength()); key.set("small4.txt".getBytes()); value.set("ddddddd".getBytes()); writer.append(key, value); //關閉流 IOUtils.closeStream(writer); } @Test /** * SequenceFile 讀操做 */ public void sequenceRead() throws Exception { final String INPUT_PATH= "hdfs://192.168.242.101:9000/big/big.seq"; //獲取文件系統 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000"); FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); //準備讀取seq的流 Path path = new Path(INPUT_PATH); SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, path, conf); //經過seq流得到key和value準備承載數據 Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); //循環從流中讀取key和value long position = reader.getPosition(); while(reader.next(key, value)){ //打印當前key value System.out.println(key+":"+value); //移動遊標指向下一個key value position=reader.getPosition(); } //關閉流 IOUtils.closeStream(reader); } @Test /** * 多個小文件合併成大seq文件 * @throws Exception */ public void small2Big() throws Exception{ final String INPUT_PATH= "hdfs://192.168.242.101:9000/small"; final String OUTPUT_PATH= "hdfs://192.168.242.101:9000/big/big.seq"; //獲取文件系統 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000"); FileSystem fs = FileSystem.get(conf); //經過文件系統獲取全部要處理的文件 FileStatus[] files = fs.listStatus(new Path(INPUT_PATH)); //建立能夠輸出seq文件的輸出流 Text key = new Text(); Text value = new Text(); SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(OUTPUT_PATH), key.getClass(),value.getClass()); //循環處理每一個文件 for (int i = 0; i < files.length; i++) { //key設置爲文件名 key.set(files[i].getPath().getName()); //讀取文件內容 InputStream in = fs.open(files[i].getPath()); byte[] buffer = new byte[(int) files[i].getLen()]; IOUtils.readFully(in, buffer, 0, buffer.length); //值設置爲文件內容 value.set(buffer); //關閉輸入流 IOUtils.closeStream(in); //將key文件名value文件內容寫入seq流中 writer.append(key, value); } //關閉seq流 IOUtils.closeStream(writer); }
用於多個數據源的join。
此類能夠解決多個小文件在進行mr操做時map建立過多的問題。
此類的原理在於,它本質上市一個InputFormat,在其中的getSplits方法中,將他能讀到的全部的文件生成一個InputSplit
使用此類須要配合自定義的RecordReader,須要本身開發一個RecordReader指定如何從InputSplit中讀取數據。
也能夠經過參數控制最大的InputSplit大小。
CombineTextInputFormat.setMaxInputSplitSize(job, 256*1024*1024);