MultipleOutputs 類能夠將數據寫到多個文件,這些文件的名稱源於輸出的鍵和值或者任意字符串。這容許每一個 reducer(或者只有 map 做業的 mapper)建立多個文件。 採用name-m-nnnnn 形式的文件名用於 map 輸出,name-r-nnnnn 形式的文件名用於 reduce 輸出,其中 name 是由程序設定的任意名字, nnnnn 是一個指明塊號的整數(從 0 開始)。塊號保證從不一樣塊(mapper 或 reducer)輸出在相同名字狀況下不會衝突java
一、項目需求git
假如這裏有一份郵箱數據文件,咱們指望統計郵箱出現次數並按照郵箱的類別,將這些郵箱分別輸出到不一樣文件路徑下。redis
二、數據集apache
wolys@21cn.com
zss1984@126.com
294522652@qq.com
simulateboy@163.com
zhoushigang_123@163.com
sirenxing424@126.com
lixinyu23@qq.com
chenlei1201@gmail.com
370433835@qq.com
cxx0409@126.com
viv093@sina.com
q62148830@163.com
65993266@qq.com
summeredison@sohu.com
zhangbao-autumn@163.com
diduo_007@yahoo.com.cn
fxh852@163.comapp
三、實現ide
1 package com.buaa; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.IntWritable; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.Tool; 21 import org.apache.hadoop.util.ToolRunner; 22 23 /** 24 * @ProjectName MultipleOutputsDemo 25 * @PackageName com.buaa 26 * @ClassName EmailMultipleOutputsDemo 27 * @Description 統計郵箱出現次數並按照郵箱的類別,將這些郵箱分別輸出到不一樣文件路徑下 28 * @Author 劉吉超 29 * @Date 2016-05-02 15:25:18 30 */ 31 public class EmailMultipleOutputsDemo extends Configured implements Tool { 32 33 public static class EmailMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 34 private final static IntWritable one = new IntWritable(1); 35 36 @Override 37 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 38 context.write(value, one); 39 } 40 } 41 42 public static class EmailReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 43 private MultipleOutputs<Text, IntWritable> multipleOutputs; 44 45 @Override 46 protected void setup(Context context) throws IOException ,InterruptedException{ 47 multipleOutputs = new MultipleOutputs< Text, IntWritable>(context); 48 } 49 50 protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException { 51 // 開始位置 52 int begin = Key.toString().indexOf("@"); 53 // 結束位置 54 int end = Key.toString().indexOf("."); 55 56 if(begin >= end){ 57 return; 58 } 59 60 // 獲取郵箱類別,好比 qq 61 String name = Key.toString().substring(begin+1, end); 62 63 int sum = 0; 64 for (IntWritable value : Values) { 65 sum += value.get(); 66 } 67 68 /* 69 * multipleOutputs.write(key, value, baseOutputPath)方法的第三個函數代表了該輸出所在的目錄(相對於用戶指定的輸出目錄)。 70 * 若是baseOutputPath不包含文件分隔符"/",那麼輸出的文件格式爲baseOutputPath-r-nnnnn(name-r-nnnnn); 71 * 若是包含文件分隔符"/",例如baseOutputPath="029070-99999/1901/part",那麼輸出文件則爲029070-99999/1901/part-r-nnnnn 72 */ 73 multipleOutputs.write(Key, new IntWritable(sum), name); 74 } 75 76 @Override 77 protected void cleanup(Context context) throws IOException ,InterruptedException{ 78 multipleOutputs.close(); 79 } 80 } 81 82 @SuppressWarnings("deprecation") 83 @Override 84 public int run(String[] args) throws Exception { 85 // 讀取配置文件 86 Configuration conf = new Configuration(); 87 88 // 判斷目錄是否存在,若是存在,則刪除 89 Path mypath = new Path(args[1]); 90 FileSystem hdfs = mypath.getFileSystem(conf); 91 if (hdfs.isDirectory(mypath)) { 92 hdfs.delete(mypath, true); 93 } 94 95 // 新建一個任務 96 Job job = new Job(conf, "MultipleDemo"); 97 // 主類 98 job.setJarByClass(EmailMultipleOutputsDemo.class); 99 100 // 輸入路徑 101 FileInputFormat.addInputPath(job, new Path(args[0])); 102 // 輸出路徑 103 FileOutputFormat.setOutputPath(job, new Path(args[1])); 104 105 // Mapper 106 job.setMapperClass(EmailMapper.class); 107 // Reducer 108 job.setReducerClass(EmailReducer.class); 109 110 // key輸出類型 111 job.setOutputKeyClass(Text.class); 112 // value輸出類型 113 job.setOutputValueClass(IntWritable.class); 114 115 // 去掉job設置outputFormatClass,改成經過LazyOutputFormat設置 116 LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 117 118 return job.waitForCompletion(true)?0:1; 119 } 120 121 public static void main(String[] args0) throws Exception { 122 // 數據輸入路徑和輸出路徑 123 // String[] args0 = { 124 // "hdfs://ljc:9000/buaa/email/email.txt", 125 // "hdfs://ljc:9000/buaa/email/out/" 126 // }; 127 int ec = ToolRunner.run(new Configuration(), new EmailMultipleOutputsDemo(), args0); 128 System.exit(ec); 129 } 130 }
四、運行效果函數
五、注意事項oop
一、在reducer中調用時,要調用MultipleOutputs如下接口url
1 public void write(KEYOUT key,VALUEOUT value, String baseOutputPath) throws IOException,InterruptedException spa
若是調用
1 public <K,V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException
則須要在job中,預先聲明named output(以下),否則會報錯:named output xxx not defined:
1 MultipleOutputs.addNamedOutput(job, "moshouzhengba", TextOutputFormat.class, Text.class, Text.class); 2 MultipleOutputs.addNamedOutput(job, "maoxiandao", TextOutputFormat.class, Text.class, Text.class); 3 MultipleOutputs.addNamedOutput(job, "yingxionglianmen", TextOutputFormat.class, Text.class, Text.class);
2. 默認狀況下,輸出目錄會生成part-r-00000或者part-m-00000的空文件,須要以下設置後,纔不會生成
// job.setOutputFormatClass(TextOutputFormat.class); LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
就是去掉job設置outputFormatClass,改成經過LazyOutputFormat設置
3. multipleOutputs.write(key, value, baseOutputPath)方法的第三個函數代表了該輸出所在的目錄(相對於用戶指定的輸出目錄)。若是baseOutputPath不包含文件分隔符「/」,那麼輸出的文件格式爲baseOutputPath-r-nnnnn(name-r-nnnnn);若是包含文件分隔符「/」,例如baseOutputPath=「029070-99999/1901/part」,那麼輸出文件則爲