經過MultipleOutputs寫到多個文件

  MultipleOutputs 類能夠將數據寫到多個文件,這些文件的名稱源於輸出的鍵和值或者任意字符串。這容許每一個 reducer(或者只有 map 做業的 mapper)建立多個文件。 採用name-m-nnnnn 形式的文件名用於 map 輸出,name-r-nnnnn 形式的文件名用於 reduce 輸出,其中 name 是由程序設定的任意名字, nnnnn 是一個指明塊號的整數(從 0 開始)。塊號保證從不一樣塊(mapper 或 reducer)輸出在相同名字狀況下不會衝突java

一、項目需求git

  假如這裏有一份郵箱數據文件,咱們指望統計郵箱出現次數並按照郵箱的類別,將這些郵箱分別輸出到不一樣文件路徑下。redis

二、數據集apache

  wolys@21cn.com
  zss1984@126.com
  294522652@qq.com
  simulateboy@163.com
  zhoushigang_123@163.com
  sirenxing424@126.com
  lixinyu23@qq.com
  chenlei1201@gmail.com
  370433835@qq.com
  cxx0409@126.com
  viv093@sina.com
  q62148830@163.com
  65993266@qq.com
  summeredison@sohu.com
  zhangbao-autumn@163.com
  diduo_007@yahoo.com.cn
  fxh852@163.com
app

三、實現ide

  1 package com.buaa;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.FileSystem;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.IntWritable;
 10 import org.apache.hadoop.io.LongWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Reducer;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 20 import org.apache.hadoop.util.Tool;
 21 import org.apache.hadoop.util.ToolRunner;
 22 
 23 /** 
 24 * @ProjectName MultipleOutputsDemo
 25 * @PackageName com.buaa
 26 * @ClassName EmailMultipleOutputsDemo
 27 * @Description 統計郵箱出現次數並按照郵箱的類別,將這些郵箱分別輸出到不一樣文件路徑下
 28 * @Author 劉吉超
 29 * @Date 2016-05-02 15:25:18
 30 */
 31 public class EmailMultipleOutputsDemo extends Configured implements Tool {
 32     
 33     public static class EmailMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 34         private final static IntWritable one = new IntWritable(1);
 35 
 36         @Override
 37         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 38             context.write(value, one);
 39         }
 40     }
 41     
 42     public static class EmailReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 43         private MultipleOutputs<Text, IntWritable> multipleOutputs;
 44         
 45         @Override
 46         protected void setup(Context context) throws IOException ,InterruptedException{
 47             multipleOutputs = new MultipleOutputs< Text, IntWritable>(context);
 48         }
 49         
 50         protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException {
 51             // 開始位置
 52             int begin = Key.toString().indexOf("@");
 53             // 結束位置
 54             int end = Key.toString().indexOf(".");
 55             
 56             if(begin >= end){
 57                 return;
 58             }
 59             
 60             // 獲取郵箱類別,好比 qq
 61             String name = Key.toString().substring(begin+1, end);
 62             
 63             int sum = 0;
 64             for (IntWritable value : Values) {
 65                 sum += value.get();
 66             }
 67             
 68             /*
 69              * multipleOutputs.write(key, value, baseOutputPath)方法的第三個函數代表了該輸出所在的目錄(相對於用戶指定的輸出目錄)。
 70              * 若是baseOutputPath不包含文件分隔符"/",那麼輸出的文件格式爲baseOutputPath-r-nnnnn(name-r-nnnnn);
 71              * 若是包含文件分隔符"/",例如baseOutputPath="029070-99999/1901/part",那麼輸出文件則爲029070-99999/1901/part-r-nnnnn
 72              */
 73             multipleOutputs.write(Key, new IntWritable(sum), name);
 74         }
 75         
 76         @Override
 77         protected void cleanup(Context context) throws IOException ,InterruptedException{
 78             multipleOutputs.close();
 79         }
 80     }
 81     
 82     @SuppressWarnings("deprecation")
 83     @Override
 84     public int run(String[] args) throws Exception {
 85         // 讀取配置文件
 86         Configuration conf = new Configuration();
 87         
 88         // 判斷目錄是否存在,若是存在,則刪除
 89         Path mypath = new Path(args[1]);
 90         FileSystem hdfs = mypath.getFileSystem(conf);
 91         if (hdfs.isDirectory(mypath)) {
 92             hdfs.delete(mypath, true);
 93         }
 94         
 95         // 新建一個任務
 96         Job job = new Job(conf, "MultipleDemo");  
 97         // 主類
 98         job.setJarByClass(EmailMultipleOutputsDemo.class);
 99         
100         // 輸入路徑
101         FileInputFormat.addInputPath(job, new Path(args[0]));
102         // 輸出路徑
103         FileOutputFormat.setOutputPath(job, new Path(args[1]));
104         
105         // Mapper
106         job.setMapperClass(EmailMapper.class);
107         // Reducer
108         job.setReducerClass(EmailReducer.class);
109         
110         // key輸出類型
111         job.setOutputKeyClass(Text.class);
112         // value輸出類型
113         job.setOutputValueClass(IntWritable.class);
114         
115         // 去掉job設置outputFormatClass,改成經過LazyOutputFormat設置
116         LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);  
117         
118         return job.waitForCompletion(true)?0:1;
119     }
120     
121     public static void main(String[] args0) throws Exception {
122         // 數據輸入路徑和輸出路徑  
123 //        String[] args0 = {  
124 //                "hdfs://ljc:9000/buaa/email/email.txt",  
125 //                "hdfs://ljc:9000/buaa/email/out/"  
126 //        }; 
127         int ec = ToolRunner.run(new Configuration(), new EmailMultipleOutputsDemo(), args0);
128         System.exit(ec);
129     }
130 }

四、運行效果函數

運行效果(EmailMultipleOutputsDemo)

五、注意事項oop

  一、在reducer中調用時,要調用MultipleOutputs如下接口url

1 public void write(KEYOUT key,VALUEOUT value, String baseOutputPath) throws IOException,InterruptedException spa

  若是調用

1 public <K,V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException 

  則須要在job中,預先聲明named output(以下),否則會報錯:named output xxx not defined:

1 MultipleOutputs.addNamedOutput(job, "moshouzhengba", TextOutputFormat.class, Text.class, Text.class);
2 MultipleOutputs.addNamedOutput(job, "maoxiandao", TextOutputFormat.class, Text.class, Text.class);
3 MultipleOutputs.addNamedOutput(job, "yingxionglianmen", TextOutputFormat.class, Text.class, Text.class);

2. 默認狀況下,輸出目錄會生成part-r-00000或者part-m-00000的空文件,須要以下設置後,纔不會生成

// job.setOutputFormatClass(TextOutputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

  就是去掉job設置outputFormatClass,改成經過LazyOutputFormat設置
3. multipleOutputs.write(key, value, baseOutputPath)方法的第三個函數代表了該輸出所在的目錄(相對於用戶指定的輸出目錄)。若是baseOutputPath不包含文件分隔符「/」,那麼輸出的文件格式爲baseOutputPath-r-nnnnn(name-r-nnnnn);若是包含文件分隔符「/」,例如baseOutputPath=「029070-99999/1901/part」,那麼輸出文件則爲

  ajNCh3H

若是,您認爲閱讀這篇博客讓您有些收穫,不妨點擊一下右下角的【推薦】。
若是,您但願更容易地發現個人新博客,不妨點擊一下左下角的【關注我】。
若是,您對個人博客所講述的內容有興趣,請繼續關注個人後續博客,我是【劉超★ljc】。

本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然保留追究法律責任的權利。

實現代碼及數據:下載

相關文章
相關標籤/搜索