利用 SequenceFile 處理小文件的實例

重寫RecordReader:
import java.io.IOException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FSDataInputStream;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.ByteWritable;  
import org.apache.hadoop.io.BytesWritable;  
import org.apache.hadoop.io.IOUtils;  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  
/**
 * Record reader that exposes the bytes of one {@link FileSplit} as a single record.
 *
 * <p>The key is always {@link NullWritable}; the value is a {@link BytesWritable}
 * holding {@code fileSplit.getLength()} bytes read from the start of the split's file.
 * Exactly one record is produced per reader instance.
 */
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    /** Set to true once the single record has been produced. */
    private boolean processed = false;
    private BytesWritable value = new BytesWritable();

    /** No-op: the input stream is opened and closed entirely within {@link #nextKeyValue()}. */
    @Override
    public void close() throws IOException {
    }

    /** @return the shared {@link NullWritable} instance; this reader carries no key data */
    @Override
    public NullWritable getCurrentKey() throws IOException,
            InterruptedException {
        return NullWritable.get();
    }

    /** @return the value buffer; empty until {@link #nextKeyValue()} has returned {@code true} */
    @Override
    public BytesWritable getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    /** @return {@code 1.0f} once the record has been read, otherwise {@code 0.0f} */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    /**
     * Stores the split and the job configuration for later use by {@link #nextKeyValue()}.
     *
     * @param inputsplit         the split to read; must be a {@link FileSplit}
     * @param taskattemptcontext supplies the {@link Configuration} used to resolve the file system
     */
    @Override
    public void initialize(InputSplit inputsplit,
            TaskAttemptContext taskattemptcontext) throws IOException,
            InterruptedException {
        this.fileSplit = (FileSplit) inputsplit;
        this.conf = taskattemptcontext.getConfiguration();
    }

    /**
     * Reads the file into the value buffer on the first call.
     *
     * @return {@code true} on the first call (a record is available), {@code false} afterwards
     * @throws IOException if the split is too large for a single byte array, or if opening
     *                     or reading the file fails
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (processed) {
            return false;
        }

        // A Java array cannot hold more than Integer.MAX_VALUE elements; fail loudly
        // instead of letting the narrowing cast truncate or go negative.
        long length = fileSplit.getLength();
        if (length > Integer.MAX_VALUE) {
            throw new IOException("File too large to read as a single record: "
                    + fileSplit.getPath() + " (" + length + " bytes)");
        }

        byte[] contents = new byte[(int) length];
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        } finally {
            // Close on every path so a failed read does not leak the stream.
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }

}

WholeFileInputFormat:只需要重寫 createRecordReader() 和 isSplitable() 方法即可,代碼如下:

import java.io.IOException;  
  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.ByteWritable;  
import org.apache.hadoop.io.BytesWritable;  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  
/**
 * Input format whose files are never split and whose records are produced by
 * {@link WholeFileRecordReader}, with {@link NullWritable} keys and
 * {@link BytesWritable} values.
 */
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    /**
     * Builds a {@link WholeFileRecordReader} and initializes it with the given split
     * and task context before handing it back.
     *
     * @param inputsplit         the split the reader will consume
     * @param taskattemptcontext the task context passed through to the reader
     * @return the initialized reader
     */
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit inputsplit, TaskAttemptContext taskattemptcontext)
            throws IOException, InterruptedException {
        final WholeFileRecordReader wholeFileReader = new WholeFileRecordReader();
        wholeFileReader.initialize(inputsplit, taskattemptcontext);
        return wholeFileReader;
    }

    /**
     * Reports every file as non-splittable.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

}

測試類:SmallFilesToSequenceFileConverter

import java.io.IOException;  
import java.util.Random;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.BytesWritable;  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
  
/**
 * Map-only driver that reads files through {@link WholeFileInputFormat} and writes
 * (file path, file bytes) pairs using {@link SequenceFileOutputFormat}.
 */
public class SmallFilesToSequenceFileConverter {

    /** Emits each incoming value keyed by the path of the split it came from. */
    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text filenameKey;

        /** Captures the path of this task's input split as the output key. */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        /** Writes the value unchanged under the split's path; the incoming key is ignored. */
        @Override
        protected void map(NullWritable key, BytesWritable value,
                Context context) throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }

    }

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args when exactly two arguments are given they are used as the input and
     *             output paths; otherwise the built-in default paths are used
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // NOTE(review): this constructor is deprecated in newer Hadoop releases in favour of
        // Job.getInstance(conf, name); kept because the target Hadoop version is not known here.
        Job job = new Job(conf, "fdaf");

        Path input;
        Path output;
        if (args.length == 2) {
            input = new Path(args[0]);
            output = new Path(args[1]);
        } else {
            input = new Path("D:/keywordzip-zip/testNull");
            // A random suffix makes it less likely that a rerun hits an existing output directory.
            output = new Path("D:/mapreduce-out/1AA" + new Random().nextInt(100));
        }

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        job.setJarByClass(SmallFilesToSequenceFileConverter.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);

        // Propagate job failure to the caller instead of always exiting with status 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

以上代碼可以正確運行,並輸出二進制文件。

相關文章
相關標籤/搜索