針對前面介紹的輸入格式,Hadoop 都有相應的輸出格式。默認狀況下只有一個 Reduce,輸出只有一個文件,默認文件名爲 part-r-00000,輸出文件的個數與 Reduce 的個數一致。 若是有兩個Reduce,輸出結果就有兩個文件,第一個爲part-r-00000,第二個爲part-r-00001,依次類推。html
OutputFormat主要用於描述輸出數據的格式,它可以將用戶提供的key/value對寫入特定格式的文件中。 經過OutputFormat 接口,實現具體的輸出格式,過程有些複雜也沒有這個必要。Hadoop 自帶了不少 OutputFormat 的實現,它們與InputFormat實現相對應,足夠知足咱們業務的須要。 OutputFormat 類的層次結構以下圖所示。java
OutputFormat 是 MapReduce 輸出的基類,全部實現 MapReduce 輸出都實現了 OutputFormat 接口。 咱們能夠把這些實現接口類分爲如下幾種類型,分別一一介紹。jquery
默認的輸出格式是 TextOutputFormat,它把每條記錄寫爲文本行。它的鍵和值能夠是任意類型,由於 TextOutputFormat 調用 toString() 方法把它們轉換爲字符串。 每一個鍵/值對由製表符進行分割,固然也能夠設定 mapreduce.output.textoutputformat.separator 屬性(舊版本 API 中的 mapred.textoutputformat.separator)改變默認的分隔符。 與 TextOutputFormat 對應的輸入格式是 KeyValueTextInputFormat,它經過可配置的分隔符將鍵/值對文本分割。redis
可使用 NullWritable 來省略輸出的鍵或值(或二者都省略,至關於 NullOutputFormat 輸出格式,後者什麼也不輸出)。 這也會致使無分隔符輸出,以使輸出適合用 TextInputFormat 讀取。sql
一、關於SequenceFileOutputFormat數據庫
顧名思義,SequenceFileOutputFormat 將它的輸出寫爲一個順序文件。若是輸出須要做爲後續 MapReduce 任務的輸入,這即是一種好的輸出格式, 由於它的格式緊湊,很容易被壓縮。apache
二、關於SequenceFileAsBinaryOutputFormatapp
SequenceFileAsBinaryOutputFormat 把鍵/值對做爲二進制格式寫到一個 SequenceFile 容器中。ide
三、關於MapFileOutputFormatoop
MapFileOutputFormat 把 MapFile 做爲輸出。MapFile 中的鍵必須順序添加,因此必須確保 reducer 輸出的鍵已經排好序。
上面咱們提到,默認狀況下只有一個 Reduce,輸出只有一個文件。有時可能須要對輸出的文件名進行控制或讓每一個 reducer 輸出多個文件。 咱們有兩種方式實現reducer輸出多個文件。
一、Partitioner
咱們考慮這樣一個需求:按學生的年齡段,將數據輸出到不一樣的文件路徑下。這裏咱們分爲三個年齡段:小於等於20歲、大於20歲小於等於50歲和大於50歲。
咱們採用的方法是每一個年齡段對應一個 reducer。爲此,咱們須要經過如下兩步實現。
第一步:把做業的 reducer 數設爲年齡段數即爲3。
job.setPartitionerClass(PCPartitioner.class);//設置Partitioner類 job.setNumReduceTasks(3);// reduce個數設置爲3
第二步:寫一個 Partitioner,把同一個年齡段的數據放到同一個分區。
public static class PCPartitioner extends Partitioner< Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTasks) { // TODO Auto-generated method stub String[] nameAgeScore = value.toString().split("\t"); String age = nameAgeScore[1];//學生年齡 int ageInt = Integer.parseInt(age);//按年齡段分區 // 默認指定分區 0 if (numReduceTasks == 0) return 0; //年齡小於等於20,指定分區0 if (ageInt <= 20) { return 0; } // 年齡大於20,小於等於50,指定分區1 if (ageInt > 20 && ageInt <= 50) { return 1 % numReduceTasks; } // 剩餘年齡,指定分區2 else return 2 % numReduceTasks; } }
這種方法實現多文件輸出,也只能知足此種需求。不少狀況下是沒法實現的,由於這樣作存在兩個缺點。
第一,須要在做業運行以前須要知道分區數和年齡段的個數,若是分區數很大或者未知,就沒法操做。
第二,通常來講,讓應用程序來嚴格限定分區數並很差,由於可能致使分區數少或分區不均。
二、MultipleOutputs 類
MultipleOutputs 類能夠將數據寫到多個文件,這些文件的名稱源於輸出的鍵和值或者任意字符串。這容許每一個 reducer(或者只有 map 做業的 mapper)建立多個文件。 採用name-m-nnnnn 形式的文件名用於 map 輸出,name-r-nnnnn 形式的文件名用於 reduce 輸出,其中 name 是由程序設定的任意名字, nnnnn 是一個指明塊號的整數(從 0 開始)。塊號保證從不一樣塊(mapper 或 reducer)寫的輸出在相同名字狀況下不會衝突。
假如這裏有一份郵箱數據文件,咱們指望統計郵箱出現次數並按照郵箱的類別,將這些郵箱分別輸出到不一樣文件路徑下。數據集示例以下所示。
wolys@21cn.com zss1984@126.com 294522652@qq.com simulateboy@163.com zhoushigang_123@163.com sirenxing424@126.com lixinyu23@qq.com chenlei1201@gmail.com 370433835@qq.com cxx0409@126.com viv093@sina.com q62148830@163.com 65993266@qq.com summeredison@sohu.com zhangbao-autumn@163.com diduo_007@yahoo.com.cn fxh852@163.com
下面咱們編寫 MapReduce 程序,實現上述業務需求。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Email extends Configured implements Tool { public static class MailMapper extends Mapper< LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, one); } } public static class MailReducer extends Reducer< Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); private MultipleOutputs< Text, IntWritable> multipleOutputs; @Override protected void setup(Context context) throws IOException ,InterruptedException{ multipleOutputs = new MultipleOutputs< Text, IntWritable>(context); } protected void reduce(Text Key, Iterable< IntWritable> Values,Context context) throws IOException, InterruptedException { int begin = Key.toString().indexOf("@"); int end = Key.toString().indexOf("."); if(begin>=end){ return; } //獲取郵箱類別,好比 qq String name = Key.toString().substring(begin+1, end); int sum = 0; for (IntWritable value : Values) { sum += value.get(); } result.set(sum); multipleOutputs.write(Key, result, name); } @Override protected void cleanup(Context context) throws IOException ,InterruptedException{ multipleOutputs.close(); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration();// 讀取配置文件 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf);//建立輸出路徑 if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance();// 新建一個任務 job.setJarByClass(Email.class);// 主類 FileInputFormat.addInputPath(job, new Path(args[0]));// 輸入路徑 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 輸出路徑 job.setMapperClass(MailMapper.class);// Mapper job.setReducerClass(MailReducer.class);// Reducer job.setOutputKeyClass(Text.class);// key輸出類型 job.setOutputValueClass(IntWritable.class);// value輸出類型 job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { String[] args0 = { "hdfs://single.hadoop.dajiangtai.com:9000/junior/mail.txt", "hdfs://single.hadoop.dajiangtai.com:9000/junior/mail-out/" }; int ec = ToolRunner.run(new Configuration(), new Email(), args0); System.exit(ec); } }
在 reducer 中,在 setup() 方法中構造一個 MultipleOutputs 的實例並將它賦給一個實例變量。在 reduce() 方法中使用 MultipleOutputs 實例來寫輸出, 而不是 context 。write() 方法做用於鍵、值、和名字。
程序運行以後,輸出文件的命名以下所示。
/mail-out/163-r-00000 /mail-out/126-r-00000 /mail-out/21cn-r-00000 /mail-out/gmail-r-00000 /mail-out/qq-r-00000 /mail-out/sina-r-00000 /mail-out/sohu-r-00000 /mail-out/yahoo-r-00000 /mail-out/part-r-00000
在 MultipleOutputs 的 write() 方法中指定的基本路徑至關於輸出路徑進行解釋,由於它能夠包含文件路徑分隔符(/), 建立任意深度的子目錄是有可能的。
DBOutputFormat 適用於將做業輸出數據(中等規模的數據)轉存到Mysql、Oracle等數據庫。