MR Example: SequenceFile Output/Input

A SequenceFile is a flat file format designed by Hadoop for storing key-value pairs in binary form. Each key-value pair in a SequenceFile is treated as a record, and based on the record-level compression strategy, SequenceFile supports three compression types:

NONE: records are not compressed; (combination 1)

RECORD: only the value of each record is compressed (the key is not); (combination 2)

BLOCK: all the records in a block (keys included) are compressed together. (combination 3)
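
Outside of MapReduce, the same three combinations can also be exercised directly through the SequenceFile.Writer API. Below is a minimal sketch (the class name SFWriteDemo and the sample records are mine, for illustration) that writes a few Text/VLongWritable pairs with BLOCK compression using the Hadoop 2.x option-based createWriter; substituting CompressionType.NONE or CompressionType.RECORD gives the other two combinations:

package test0820;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.compress.DefaultCodec;

public class SFWriteDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Combination 3: block compression with the default codec.
        // CompressionType.NONE and CompressionType.RECORD select the other two.
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path(args[0])),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(VLongWritable.class),
                SequenceFile.Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
        try {
            // Sample records, purely illustrative
            writer.append(new Text("hadoop"), new VLongWritable(1L));
            writer.append(new Text("hive"), new VLongWritable(2L));
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}

The MapReduce job below produces the same kind of file through SequenceFileOutputFormat: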

package test0820;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class Test0829 {

    public static void main(String[] args) throws Exception {        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Test0829.class);

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VLongWritable.class);        

        // Set the output format class
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        /**
         * Set the SequenceFile compression mode. There are several
         * combinations for SequenceFile output; pick one of the modes
         * below and comment out the others.
         */

        // Combination 1: no compression
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.NONE);

        // Combination 2: record compression, with the codec to use (DefaultCodec, gzip, etc.)
        // SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.RECORD);
        // SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

        // Combination 3: block compression, with the codec to use (DefaultCodec, gzip, etc.)
        // SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        // SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

        FileInputFormat.addInputPaths(job, args[0]);
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    // map: split each line on ":" and emit every comma-separated token with a count of 1
    public static class WCMapper extends Mapper<LongWritable, Text, Text, VLongWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(":", 2);
            if (split.length != 1) {
                String[] splited = split[1].split(",");
                for (String s : splited) {
                    context.write(new Text(s), new VLongWritable(1L));
                }
            }
        }
    }

    // reduce: sum the counts for each key
    public static class WCReducer extends Reducer<Text, VLongWritable, Text, VLongWritable> {
        @Override
        protected void reduce(Text key, Iterable<VLongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (VLongWritable vl : v2s) {
                sum += vl.get();
            }
            context.write(key, new VLongWritable(sum));
        }
    }
}
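
Since the job writes a binary SequenceFile, the result cannot be viewed directly with hadoop fs -cat; hadoop fs -text, which understands SequenceFiles, will deserialize the records and print them as plain key-value text.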

Reading a SequenceFile as MR Input

When the input file format is SequenceFile, the SequenceFileInputFormat class must be used. Because a SequenceFile stores its keys and values in binary form (note that the binary layout of Hadoop types is not the same as raw binary; it carries some extra bookkeeping information), you must know in advance the Hadoop types of the key and value before reading the file.
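
If you are unsure of the types, they are recorded in the file header and can be checked up front. Below is a minimal sketch (the class name SFTypeProbe is mine, for illustration) using the Hadoop 2.x SequenceFile.Reader API to print the stored key and value class names:

package test0820;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;

public class SFTypeProbe {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The file header records the key/value classes the mapper must declare
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(new Path(args[0])));
        try {
            System.out.println("key class:   " + reader.getKeyClassName());
            System.out.println("value class: " + reader.getValueClassName());
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}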

The result file produced by the code above is read here with SequenceFileInputFormat, where the key is of type Text and the value is of type VLongWritable.

package test0820;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class SFInput02 {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SFInput02.class);

        job.setMapperClass(SFMapper.class);
        job.setReducerClass(SFReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(VLongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VLongWritable.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);

        SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
    // map: pass each key-value pair through unchanged
    public static class SFMapper extends Mapper<Text, VLongWritable, Text, VLongWritable> {
        @Override
        public void map(Text key, VLongWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }

    }
    // reduce: write out every value for the key unchanged
    public static class SFReducer extends Reducer<Text, VLongWritable, Text, VLongWritable> {
        @Override
        protected void reduce(Text key, Iterable<VLongWritable> v2s,Context context)
                throws IOException, InterruptedException {
            for(VLongWritable vl : v2s){
                context.write(key, vl);
            }
        }
    }
}

If you do not know the key and value types stored in a SequenceFile, you can use SequenceFileAsTextInputFormat, which converts both the key and the value into Text objects before passing them to map.

package test0820;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class SFinput {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SFinput.class);

        job.setMapperClass(SFMapper.class);
        job.setReducerClass(SFReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(SequenceFileAsTextInputFormat.class);

        SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
    // map: pass each key-value pair through unchanged
    public static class SFMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }

    }
    // reduce: write out every value for the key unchanged
    public static class SFReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> v2s,Context context)
                throws IOException, InterruptedException {
            for(Text text : v2s){
                context.write(key, text);
            }
        }
    }
}
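
Note that SequenceFileAsTextInputFormat obtains the Text objects by calling toString() on the underlying key and value Writables, so it is convenient for inspection, but the original binary representation is not preserved.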

Finally, there is also the SequenceFileAsBinaryInputFormat class, which wraps the key and value of each record, in their raw binary form, in BytesWritable objects and passes them to map; how to interpret the binary data is up to the author of the map function.
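
To make that decoding step concrete, below is a minimal mapper sketch for this case (the class name SFBinaryMapper is mine, for illustration), assuming the file was written by the job above with Text keys and VLongWritable values. Since each BytesWritable payload is exactly the Writable's serialized form, readFields() reverses what write() produced:

package test0820;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class SFBinaryMapper extends Mapper<BytesWritable, BytesWritable, Text, VLongWritable> {
    private final DataInputBuffer buffer = new DataInputBuffer();
    private final Text k = new Text();
    private final VLongWritable v = new VLongWritable();

    @Override
    public void map(BytesWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // Each payload is the serialized form of the original Writable,
        // so readFields() reconstructs the typed object from the raw bytes.
        buffer.reset(key.getBytes(), key.getLength());
        k.readFields(buffer);
        buffer.reset(value.getBytes(), value.getLength());
        v.readFields(buffer);
        context.write(k, v);
    }
}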
