MapReduce的輸出格式

MapReduce的輸出格式


        針對前面介紹的輸入格式,Hadoop 都有相應的輸出格式。默認狀況下只有一個 Reduce,輸出只有一個文件,默認文件名爲 part-r-00000,輸出文件的個數與 Reduce 的個數一致。 若是有兩個Reduce,輸出結果就有兩個文件,第一個爲part-r-00000,第二個爲part-r-00001,依次類推。html


OutputFormat 接口

        OutputFormat主要用於描述輸出數據的格式,它可以將用戶提供的key/value對寫入特定格式的文件中。 經過OutputFormat 接口,實現具體的輸出格式,過程有些複雜也沒有這個必要。Hadoop 自帶了不少 OutputFormat 的實現,它們與InputFormat實現相對應,足夠知足咱們業務的須要。 OutputFormat 類的層次結構以下圖所示。java

OutputFormat 類的層次結構

        OutputFormat 是 MapReduce 輸出的基類,全部實現 MapReduce 輸出都實現了 OutputFormat 接口。 咱們能夠把這些實現接口類分爲如下幾種類型,分別一一介紹。jquery

文本輸出

        默認的輸出格式是 TextOutputFormat,它把每條記錄寫爲文本行。它的鍵和值能夠是任意類型,由於 TextOutputFormat 調用 toString() 方法把它們轉換爲字符串。 每一個鍵/值對由製表符進行分割,固然也能夠設定 mapreduce.output.textoutputformat.separator 屬性(舊版本 API 中的 mapred.textoutputformat.separator)改變默認的分隔符。 與 TextOutputFormat 對應的輸入格式是 KeyValueTextInputFormat,它經過可配置的分隔符將鍵/值對文本分割。redis

        可使用 NullWritable 來省略輸出的鍵或值(或二者都省略,至關於 NullOutputFormat 輸出格式,後者什麼也不輸出)。 這也會致使無分隔符輸出,以使輸出適合用 TextInputFormat 讀取。sql

二進制輸出

        一、關於SequenceFileOutputFormat數據庫

        顧名思義,SequenceFileOutputFormat 將它的輸出寫爲一個順序文件。若是輸出須要做爲後續 MapReduce 任務的輸入,這即是一種好的輸出格式, 由於它的格式緊湊,很容易被壓縮。apache

        二、關於SequenceFileAsBinaryOutputFormatapp

        SequenceFileAsBinaryOutputFormat 把鍵/值對做爲二進制格式寫到一個 SequenceFile 容器中。ide

        三、關於MapFileOutputFormatoop

        MapFileOutputFormat 把 MapFile 做爲輸出。MapFile 中的鍵必須順序添加,因此必須確保 reducer 輸出的鍵已經排好序。

多個輸出

        上面咱們提到,默認狀況下只有一個 Reduce,輸出只有一個文件。有時可能須要對輸出的文件名進行控制或讓每一個 reducer 輸出多個文件。 咱們有兩種方式實現reducer輸出多個文件。

        一、Partitioner

        咱們考慮這樣一個需求:按學生的年齡段,將數據輸出到不一樣的文件路徑下。這裏咱們分爲三個年齡段:小於等於20歲、大於20歲小於等於50歲和大於50歲。

        咱們採用的方法是每一個年齡段對應一個 reducer。爲此,咱們須要經過如下兩步實現。

        第一步:把做業的 reducer 數設爲年齡段數即爲3。

job.setPartitionerClass(PCPartitioner.class);//設置Partitioner類
job.setNumReduceTasks(3);// reduce個數設置爲3

        第二步:寫一個 Partitioner,把同一個年齡段的數據放到同一個分區。

public static class PCPartitioner extends Partitioner< Text, Text> {
		@Override
		public int getPartition(Text key, Text value, int numReduceTasks) {
			// TODO Auto-generated method stub
			String[] nameAgeScore = value.toString().split("\t");
			String age = nameAgeScore[1];//學生年齡
			int ageInt = Integer.parseInt(age);//按年齡段分區

			// 默認指定分區 0
			if (numReduceTasks == 0)
				return 0;

			//年齡小於等於20,指定分區0
			if (ageInt <= 20) {				return 0;
			}
			// 年齡大於20,小於等於50,指定分區1
			if (ageInt > 20 && ageInt <= 50) {				return 1 % numReduceTasks;
			}
			// 剩餘年齡,指定分區2
			else
				return 2 % numReduceTasks;
		}
}

        這種方法實現多文件輸出,也只能知足此種需求。不少狀況下是沒法實現的,由於這樣作存在兩個缺點。

        第一,須要在做業運行以前須要知道分區數和年齡段的個數,若是分區數很大或者未知,就沒法操做。

        第二,通常來講,讓應用程序來嚴格限定分區數並很差,由於可能致使分區數少或分區不均。

        二、MultipleOutputs 類

        MultipleOutputs 類能夠將數據寫到多個文件,這些文件的名稱源於輸出的鍵和值或者任意字符串。這容許每一個 reducer(或者只有 map 做業的 mapper)建立多個文件。 採用name-m-nnnnn 形式的文件名用於 map 輸出,name-r-nnnnn 形式的文件名用於 reduce 輸出,其中 name 是由程序設定的任意名字, nnnnn 是一個指明塊號的整數(從 0 開始)。塊號保證從不一樣塊(mapper 或 reducer)寫的輸出在相同名字狀況下不會衝突。

        假如這裏有一份郵箱數據文件,咱們指望統計郵箱出現次數並按照郵箱的類別,將這些郵箱分別輸出到不一樣文件路徑下。數據集示例以下所示。

wolys@21cn.com
zss1984@126.com
294522652@qq.com
simulateboy@163.com
zhoushigang_123@163.com
sirenxing424@126.com
lixinyu23@qq.com
chenlei1201@gmail.com
370433835@qq.com
cxx0409@126.com
viv093@sina.com
q62148830@163.com
65993266@qq.com
summeredison@sohu.com
zhangbao-autumn@163.com
diduo_007@yahoo.com.cn
fxh852@163.com

        下面咱們編寫 MapReduce 程序,實現上述業務需求。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Email extends Configured implements Tool {

	public static class MailMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			context.write(value, one);
		}
	}

	public static class MailReducer extends Reducer< Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();
		private MultipleOutputs< Text, IntWritable> multipleOutputs;

		@Override
		protected void setup(Context context) throws IOException ,InterruptedException{
			multipleOutputs = new MultipleOutputs< Text, IntWritable>(context);
		}
		protected void reduce(Text Key, Iterable< IntWritable> Values,Context context) throws IOException, InterruptedException {
			int begin = Key.toString().indexOf("@");
            int end = Key.toString().indexOf(".");
            if(begin>=end){
            	return;
            }
            //獲取郵箱類別,好比 qq
            String name = Key.toString().substring(begin+1, end);
			int sum = 0;
			for (IntWritable value : Values) {
				sum += value.get();
			}
			result.set(sum);
			multipleOutputs.write(Key, result, name);
		}
		@Override
		protected void cleanup(Context context) throws IOException ,InterruptedException{
			multipleOutputs.close();
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();// 讀取配置文件
		
		Path mypath = new Path(args[1]);
		FileSystem hdfs = mypath.getFileSystem(conf);//建立輸出路徑
		if (hdfs.isDirectory(mypath)) {
			hdfs.delete(mypath, true);
		}
		Job job = Job.getInstance();// 新建一個任務
		job.setJarByClass(Email.class);// 主類
		
		FileInputFormat.addInputPath(job, new Path(args[0]));// 輸入路徑
		FileOutputFormat.setOutputPath(job, new Path(args[1]));// 輸出路徑

		job.setMapperClass(MailMapper.class);// Mapper
		job.setReducerClass(MailReducer.class);// Reducer
		
		job.setOutputKeyClass(Text.class);// key輸出類型
		job.setOutputValueClass(IntWritable.class);// value輸出類型
		
		job.waitForCompletion(true);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		String[] args0 = {
				"hdfs://single.hadoop.dajiangtai.com:9000/junior/mail.txt",
				"hdfs://single.hadoop.dajiangtai.com:9000/junior/mail-out/" };
		int ec = ToolRunner.run(new Configuration(), new Email(), args0);
		System.exit(ec);
	}
}

        在 reducer 中,在 setup() 方法中構造一個 MultipleOutputs 的實例並將它賦給一個實例變量。在 reduce() 方法中使用 MultipleOutputs 實例來寫輸出, 而不是 context 。write() 方法做用於鍵、值、和名字。

        程序運行以後,輸出文件的命名以下所示。

/mail-out/163-r-00000
/mail-out/126-r-00000
/mail-out/21cn-r-00000
/mail-out/gmail-r-00000
/mail-out/qq-r-00000
/mail-out/sina-r-00000
/mail-out/sohu-r-00000
/mail-out/yahoo-r-00000
/mail-out/part-r-00000

        在 MultipleOutputs 的 write() 方法中指定的基本路徑至關於輸出路徑進行解釋,由於它能夠包含文件路徑分隔符(/), 建立任意深度的子目錄是有可能的。

數據庫輸出

        DBOutputFormat 適用於將做業輸出數據(中等規模的數據)轉存到Mysql、Oracle等數據庫。

相關文章
相關標籤/搜索