Hadoop 小文件的處理

hadoop的HDFS和MapReduce自己都是用戶處理大量數據的大文件,對於小文件來講,因爲namenode會在記錄每一個block對象,若是存在大量的小文件,會佔用namenode的大量內存空間,並且HDFS存儲文件是按block來存儲,即便一個文件的大小不足一個block的大小,文件仍是會佔用一個block的存儲空間,因此大量的小文件會對HDFS的存儲和訪問都帶來不利的影響。 hadoop對於小文件的處理主要有Hadoop Archive,Sequence file和CombineFileInputFormat三種方式。 ##Hadoop Archive Hadoop Archive是hadoop的歸檔命令,能夠將hdfs上的小文件打包成一個har文件,這種方式雖然不會減小小文件佔用大量存儲空間的問題,可是會減小namenode的內存空間。同時har文件支持hdfs命令對其的訪問。html

命令:hadoop archive -archiveName 歸檔名稱 -p 父目錄 [-r <複製因子>] 原路徑(能夠多個) 目的路徑java

-archiveNames設置歸檔生成文件的名字node

-p 須要進行歸檔的文件的父目錄shell

例子:apache

$ hadoop fs -ls /user/test/yhj/input/
Found 3 items
-rw-r--r--   3 root hdfs        760 2018-07-04 11:48 /user/test/yhj/input/word1.txt
-rw-r--r--   3 root hdfs         82 2018-07-04 11:48 /user/test/yhj/input/word2.txt
-rw-r--r--   3 root hdfs       1738 2018-07-04 11:48 /user/test/yhj/input/word3.txt
$ hadoop archive -archiveName word.har -p /user/test/yhj/input/ word1.txt word2.txt word3.txt /user/test/yhj/harInput/
$ hadoop fs -ls /user/test/yhj/harInput/
Found 1 items
drwxr-xr-x   - hdfs hdfs          0 2018-07-05 20:18 /user/test/yhj/harInput/word.har

HAR文件的生成是經過運行一個mapreduce的程序生成,因此須要集羣環境中裝個mapreduceapi

HAR是在Hadoop file system之上的一個文件系統,所以全部fs shell命令對HAR文件都可用,但使用不一樣的URI。另外,請注意檔案是不可變的。因此,重命名,刪除並建立返回一個錯誤,例如:app

$ hadoop fs -ls /user/test/yhj/harInput/word.har
Found 4 items
-rw-r--r--   3 hdfs hdfs          0 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_SUCCESS
-rw-r--r--   5 hdfs hdfs        255 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_index
-rw-r--r--   5 hdfs hdfs         22 2018-07-05 20:18 /user/test/yhj/harInput/word.har/_masterindex
-rw-r--r--   3 hdfs hdfs       2580 2018-07-05 20:18 /user/test/yhj/harInput/word.har/part-0
$ hadoop fs -ls har:/user/test/yhj/harInput/word.har
Found 3 items
-rw-r--r--   3 hdfs hdfs        760 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word1.txt
-rw-r--r--   3 hdfs hdfs         82 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word2.txt
-rw-r--r--   3 hdfs hdfs       1738 2018-07-04 11:48 har:///user/test/yhj/harInput/word.har/word3.txt

能夠看到Hadoop存檔目錄包含元數據(採用_index和_masterindex形式)、數據部分data(part- *)文件、歸檔文件的名稱和部分文件中的位置(_index文件)。ide

HAR文件也能夠被mapreduce讀取,路徑的URI可使用不一樣的URI,好比例子中的文件輸入的路徑URI能夠下面兩種方式使用函數

hdfs://10.1.13.111:8020/user/test/yhj/harInput/word.har
har://hdfs-10.1.13.111:8020/user/test/yhj/harInput/word.har

可是這個例子的文件來講,兩個輸入路徑產生map的個數是不一樣的,har的路徑產生的map有三個,對應三個word*.txt,而hdfs的路徑只有一個,對應word.har/part-0oop

若是是文件支持行記錄切分使用mapreduce來處理數據(文件的先後數據不相互影響),建議使用hdfs的URI路徑,由於存檔目錄的part-*可能包括多個小文件的數據,這樣能夠減小map的個數,不會爲每一個單獨的小文件啓動一個map。

CombineFileInputFormat

將大量小文件作爲mapreduce的輸入是不合適的,由於FileInputFormat只會分割大文件(文件大小超過設定的分片大小,默認爲HDFS的塊大小),對於小於分片大小的文件,每一個文件做爲一個分片,若是文件大小小於一個塊的大小,mapreduce會爲每一個小文件產生一個map,這樣會產生大量小文件,而每一個map只會處理少許數據,每次map操做都會產生開銷。固然能夠經過mapred.min.split.size和mapred.max.split.size來控制map數量。

CombineFileInputFormat是mapreduce針對小文件而設計的,CombineFileInputFormat能夠將多個小文件打包進一個分片,另外,比直接設置map數量好的在於,CombineFileInputFormat在決定將那些塊放入一個分片是會考慮到塊所在的節點和機架的位置,避免操做分片是過多的數據傳輸。

CombineFileInputFormat是一個抽象類,hadoop自帶的實現的有CombineTextInputFormat,咱們能夠經過繼承CombineFileInputFormat實現createRecordReader方法,自定義RecordReader類來實現理海量小文件的MapReduce。

InputFormat主要有兩個方法,getSplits(計算獲得分片),createRecordReader(產生返回RecordReader,RecordReader生成輸出map讀入的鍵值對)

CombineFileInputFormat中已經實現了getSplits,即將多個小文件打包進一個分片中CombineFileSplit,咱們須要實現createRecordReader方法,返回一個能夠讀取該分片中內容的RecordReader。

MyCombineInputFormat的實現

public class MyCombineInputFormat extends CombineFileInputFormat<LongWritable, Text>{
    @Override
    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        RecordReader<LongWritable, Text> reader = new CombineFileRecordReader<>((CombineFileSplit) inputSplit, taskAttemptContext, MyCombineFileRecordReader.class);
        try {
            reader.initialize(inputSplit, taskAttemptContext);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return reader;
    }
}

這裏實際返回了一個CombineFileRecordReader的對象,CombineFileRecordReader經過CombineFileSplit,context和Class<? extends RecordReader>類型構造,MyCombineFileRecordReader是咱們對於CombineFileSplit中每個文件的產生map的輸入的方法。 CombineFileRecordReader中的nextKeyValue方法,會爲每個打包在CombineFileSplit中的文件構造一個RecordReader方法,讀取文件中的記錄。

public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
	...
    public CombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Class<? extends RecordReader<K, V>> rrClass) throws IOException {
        this.split = split;
        this.context = context;
        this.idx = 0;
        this.curReader = null;
        this.progress = 0L;

        try {
            this.rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
            this.rrConstructor.setAccessible(true);
        } catch (Exception var5) {
            throw new RuntimeException(rrClass.getName() + " does not have valid constructor", var5);
        }

        this.initNextRecordReader();
    }


	protected boolean initNextRecordReader() throws IOException {
        if(this.curReader != null) {
            this.curReader.close();
            this.curReader = null;
            if(this.idx > 0) {
                this.progress += this.split.getLength(this.idx - 1);
            }
        }

        if(this.idx == this.split.getNumPaths()) {
            return false;
        } else {
            this.context.progress();

            try {
                Configuration conf = this.context.getConfiguration();
                conf.set("mapreduce.map.input.file", this.split.getPath(this.idx).toString());
                conf.setLong("mapreduce.map.input.start", this.split.getOffset(this.idx));
                conf.setLong("mapreduce.map.input.length", this.split.getLength(this.idx));
                this.curReader = (RecordReader)this.rrConstructor.newInstance(new Object[]{this.split, this.context, Integer.valueOf(this.idx)});
                if(this.idx > 0) {
                    this.curReader.initialize(this.split, this.context);
                }
            } catch (Exception var2) {
                throw new RuntimeException(var2);
            }

            ++this.idx;
            return true;
	}
	
	 public boolean nextKeyValue() throws IOException, InterruptedException {
        do {
            if(this.curReader != null && this.curReader.nextKeyValue()) {
                return true;
            }
        } while(this.initNextRecordReader());

        return false;
    }

    public K getCurrentKey() throws IOException, InterruptedException {
        return this.curReader.getCurrentKey();
    }

    public V getCurrentValue() throws IOException, InterruptedException {
        return this.curReader.getCurrentValue();
    }
	...
}

在nextKeyValue方法中經過自定義的RecordReader的nextKeyValue讀取當前文件的對象,當讀完當前文件中的信息,後會經過initNextRecordReader返回初始化的下一個文件的RecordReader,因此咱們只需實現相應的讀取一個文件的RecordReader便可。

MyCombineFileRecordReader的實現

public class MyCombineFileRecordReader extends RecordReader<LongWritable, Text> {

    private CombineFileSplit combineFileSplit;
    private int currentIndex;
    private LineRecordReader reader = new LineRecordReader();
    private int totalNum;

    public MyCombineFileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index){
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index;
        this.totalNum = combineFileSplit.getNumPaths();
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex),
                combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
        context.getConfiguration().set("mapreduce.map.input.file.name", fileSplit.getPath().getName());
        this.reader.initialize(fileSplit, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(currentIndex >= 0 && currentIndex < totalNum){
            return reader.nextKeyValue();
        }else {
            return false;
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if(currentIndex >= 0 && currentIndex < totalNum){
            return (float)currentIndex/totalNum;
        }
        return 0;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}

MyCombineFileRecordReader中經過LineRecordReader按行來讀取文本記錄,在initialize方法中經過CombineFileSplit和index(CombineFileSplit中文件信息的索引位置)來獲得相應文件的信息,建立對應的FileSplit,接着建立LineRecordReader對象,在nextKeyValue中委託給LineRecordReader爲mapper產生鍵-值對象。

最後入口函數和map類的實現,將InputFormatClass替換成自定義的MyCombineInputFormat類

public class CombineInputFromatMain extends Configured implements Tool{

    public static class CombineInputFormatMap extends Mapper<Object, Text, Text, Text>{
        private Text outKey = new Text();
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            outKey.set(context.getConfiguration().get("mapreduce.map.input.file.name"));
            context.write(outKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        //設定默認job和設置輸入輸出路徑的函數
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("CombineInputFormat Text");
        job.setJarByClass(CombineInputFromatMain.class);
        job.setMapperClass(CombineInputFormatMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(MyCombineInputFormat.class);
        return job.waitForCompletion(true) ? 0:1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CombineInputFromatMain(), args));
    }
}

在這例子中將三個word*.txt文件打包進一個分片,實際只產生了一個map。

Sequence file

sequence file由一系列的二進制key/value組成,若是爲key小文件名,value爲文件內容,則能夠將大批小文件合併成一個大文件。

順序文件由文件頭和隨後的記錄內容組成,順序文件的前三個字節爲SEQ(順序文件代碼),緊接着一個字節表示文件的版本號,文件頭還包括鍵和值的類型,數據是否壓縮的標誌位,是否進行快壓縮的標誌位, 數據的壓縮形式,用戶自定義的數據以及同步標識。順序文件讀取內容只能從同步標識開始讀取。同步標識位於記錄和記錄之間,也就是說沒法從記錄中間開始讀取順序文件的內容。

Sequence file的格式主要有三種,分爲未壓縮,記錄壓縮和塊壓縮。主要格式的存儲方式能夠查看官方給出的api:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/SequenceFile.html

將小文件合併成一個sequence file的實現(代碼參考hadoop 權威指南)

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    public static class WholeFileInputFormat extends FileInputFormat<LongWritable, Text>{

        /**
         * 不切分文件,一個split讀入整個文件
         * @param context
         * @param filename
         * @return
         */
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            RecordReader reader = new WholeFileRecordReader();
            reader.initialize(inputSplit, taskAttemptContext);
            return reader;
        }
    }

    /**
     * 自定義RecordReader,讀取整個小文件內容
     */
    public static class WholeFileRecordReader extends RecordReader<LongWritable, Text>{

        private FileSplit fileSplit;
        private Configuration conf;
        private LongWritable key = new LongWritable();
        private Text value = new Text();
        private boolean process = false;

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.fileSplit = (FileSplit)inputSplit;
            this.conf = taskAttemptContext.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if(!process){
                FileSystem fs = fileSplit.getPath().getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = new FSDataInputStream(fs.open(fileSplit.getPath()));
                    byte[] contextByte = new byte[(int)fileSplit.getLength()];
                    IOUtils.readFully(in, contextByte, 0, contextByte.length);
                    //等同於 in.read(contextByte, 0, contextByte.length);
                    String context = new String(contextByte, "utf-8");
                    key.set(fileSplit.getStart());
                    value.set(context);
                }finally {
                    IOUtils.closeStream(in);
                }
                process = true;
                return true;
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return process? 1.0f:1.0f;
        }

        @Override
        public void close() throws IOException {

        }
    }


    public static class SmallFilesToSequenceFileMap extends Mapper<Object, Text, Text, Text>{

        private Text outKey = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            outKey.set(((FileSplit)context.getInputSplit()).getPath().toString());
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(outKey, value);
        }
    }


    @Override
    public int run(String[] args) throws Exception {
        //設定默認job和設置輸入輸出路徑的函數
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("SmallFiles To SequenceFile");
        job.setMapperClass(SmallFilesToSequenceFileMap.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true)? 0:1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SmallFilesToSequenceFileConverter(), args));
    }
}

hdfs能夠經過命令行hadoop fs -text來顯示以文本的方式顯示順序文件

讀取SequenceFile簡單實現

public class SequenceFileReadMain extends Configured implements Tool{

    public static class SequenceFileReadMap extends Mapper<Text, Text, Text, Text>{
        private Text outKey = new Text();
        private Text outValue = new Text();
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            outKey.set("key : " + key.toString());
            outValue.set("value : " + value.toString());
            context.write(outKey, outValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobDefaultInit.getClusterDefaultJob(this, getConf(), args);
        job.setJobName("Sequence File Read");
        job.setMapperClass(SequenceFileReadMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new SequenceFileReadMain(), args));
    }
}

這時候讀取SequenceFile的時候,key對應的是小文件的名字,value是一個小文件的全部內容,因此須要在map編寫處理整個小文件內容的代碼

參考資料:

https://blog.csdn.net/u011007180/article/details/52333387

https://www.cnblogs.com/staryea/p/8603112.html

http://dongxicheng.org/mapreduce/hdfs-small-files-solution/

相關文章
相關標籤/搜索