如何利用Hadoop存儲小文件

************************************************************************************************************php

1. HDFS上的小文件問題

小文件是指文件大小明顯小於HDFS上塊(block)大小(默認64MB)的文件。若是存儲小文件,一定會有大量這樣的小文件,不然你也不會使用Hadoop(If you’re storing small files, then you probably have lots of them (otherwise you wouldn’t turn to Hadoop)),這樣的文件給hadoop的擴展性和性能帶來嚴重問題。當一個文件的大小小於HDFS的塊大小(默認64MB),就將認定爲小文件不然就是大文件。爲了檢測輸入文件的大小,能夠瀏覽Hadoop DFS 主頁 http://machinename:50070/dfshealth.jsp ,並點擊Browse filesystem(瀏覽文件系統)。java

首先,在HDFS中,任何一個文件,目錄或者block在NameNode節點的內存中均以一個對象表示(元數據)(Every file, directory and block in HDFS is represented as an object in the namenode’s memory),而這受到NameNode物理內存容量的限制。每一個元數據對象約佔150byte,因此若是有1千萬個小文件,每一個文件佔用一個block,則NameNode大約須要2G空間。若是存儲1億個文件,則NameNode須要20G空間,這毫無疑問1億個小文件是不可取的。node

其次,處理小文件並不是Hadoop的設計目標,HDFS的設計目標是流式訪問大數據集(TB級別)。於是,在HDFS中存儲大量小文件是很低效的。訪問大量小文件常常會致使大量的尋找,以及不斷的從一個DatanNde跳到另外一個DataNode去檢索小文件(Reading through small files normally causes lots of seeks and lots of hopping from datanode to datanode to retrieve each small file),這都不是一個頗有效的訪問模式,嚴重影響性能。算法

最後,處理大量小文件速度遠遠小於處理同等大小的大文件的速度。每個小文件要佔用一個slot,而task啓動將耗費大量時間甚至大部分時間都耗費在啓動task和釋放task上。apache

2. MapReduce上的小文件問題

Map任務(task)通常一次處理一個塊大小的輸入(input)(默認使用FileInputFormat)。若是文件很是小,而且擁有大量的這種小文件,那麼每個map task都僅僅處理很是小的input數據,所以會產生大量的map tasks,每個map task都會額外增長bookkeeping開銷(each of which imposes extra bookkeeping overhead)。一個1GB的文件,拆分紅16個塊大小文件(默認block size爲64M),相對於拆分紅10000個100KB的小文件,後者每個小文件啓動一個map task,那麼job的時間將會十倍甚至百倍慢於前者。數組

Hadoop中有一些特性能夠用來減輕bookkeeping開銷:能夠在一個JVM中容許task JVM重用,以支持在一個JVM中運行多個map task,以此來減小JVM的啓動開銷(經過設置mapred.job.reuse.jvm.num.tasks屬性,默認爲1,-1表示無限制)。(譯者注:若是有大量小文件,每一個小文件都要啓動一個map task,則必相應的啓動JVM,這提供的一個解決方案就是重用task 的JVM,以此減小JVM啓動開銷);另 一種方法是使用MultiFileInputSplit,它可使得一個map中可以處理多個split。ruby

3. 爲何會產生大量的小文件

至少有兩種場景下會產生大量的小文件:markdown

(1)這些小文件都是一個大邏輯文件的一部分。因爲HDFS在2.x版本開始支持對文件的append,因此在此以前保存無邊界文件(例如,log文件)(譯者注:持續產生的文件,例如日誌天天都會生成)一種經常使用的方式就是將這些數據以塊的形式寫入HDFS中(a very common pattern for saving unbounded files (e.g. log files) is to write them in chunks into HDFS)。網絡

(2)文件自己就是很小。設想一下,咱們有一個很大的圖片語料庫,每個圖片都是一個獨一的文件,而且沒有一種很好的方法來將這些文件合併爲一個大的文件。app

4. 解決方案

這兩種狀況須要有不一樣的解決方 式。

4.1 第一種狀況

對於第一種狀況,文件是許多記錄(Records)組成的,那麼能夠經過調用HDFS的sync()方法(和append方法結合使用),每隔必定時間生成一個大文件。或者,能夠經過寫一個程序來來合併這些小文件(能夠看一下Nathan Marz關於Consolidator一種小工具的文章)。

4.2 第二種狀況

對於第二種狀況,就須要某種形式的容器經過某種方式來對這些文件進行分組。Hadoop提供了一些選擇:

4.2.1 HAR File

Hadoop Archives (HAR files)是在0.18.0版本中引入到HDFS中的,它的出現就是爲了緩解大量小文件消耗NameNode內存的問題。HAR文件是經過在HDFS上構建一個分層文件系統來工做。HAR文件經過hadoop archive命令來建立,而這個命令實 際上是運行了一個MapReduce做業來將小文件打包成少許的HDFS文件(譯者注:將小文件進行合併幾個大文件)。對於client端來講,使用HAR文件沒有任何的改變:全部的原始文件均可見以及可訪問(只是使用har://URL,而不是hdfs://URL),可是在HDFS中中文件數卻減小了。

讀取HAR中的文件不如讀取HDFS中的文件更有效,而且實際上可能較慢,由於每一個HAR文件訪問須要讀取兩個索引文件以及還要讀取數據文件自己(以下圖)。儘管HAR文件能夠用做MapReduce的輸入,可是沒有特殊的魔法容許MapReduce直接操做HAR在HDFS塊上的全部文件(although HAR files can be used as input to MapReduce, there is no special magic that allows maps to operate over all the files in the HAR co-resident on a HDFS block)。 能夠考慮經過建立一種input format,充分利用HAR文件的局部性優點,可是目前尚未這種input format。須要注意的是:MultiFileInputSplit,即便在HADOOP-4565(https://issues.apache.org/jira/browse/HADOOP-4565)的改進,但始終仍是須要每一個小文件的尋找。咱們很是有興趣看到這個與SequenceFile進行對比。 在目前看來,HARs可能最好僅用於存儲文檔(At the current time HARs are probably best used purely for archival purposes.)。

image

4.2.2 SequenceFile

一般對於"小文件問題"的迴應會是:使用序列文件(SequenceFile)。這種方法的思路是,使用文件名(filename)做爲key,而且文件內容(file contents)做爲value,以下圖。在實踐中這種方式很是有效。咱們回到10,000個100KB小文件問題上,你能夠編寫一個程序將它們放入一個單一的SequenceFile,而後你能夠流式處理它們(直接處理或使用MapReduce)操做SequenceFile。這樣同時會帶來兩個優點:(1)SequenceFiles是可拆分的,所以MapReduce能夠將它們分紅塊並獨立地對每一個塊進行操做;(2)它們同時支持壓縮,不像HAR。 在大多數狀況下,塊壓縮是最好的選擇,由於它將壓縮幾個記錄爲一個塊,而不是一個記錄壓縮一個塊。(Block compression is the best option in most cases, since it compresses blocks of several records (rather than per record))。

image

將現有數據轉換爲SequenceFile可能很慢。 可是,徹底能夠並行建立SequenceFile的集合。(It can be slow to convert existing data into Sequence Files. However, it is perfectly possible to create a collection of Sequence Files in parallel.)Stuart Sierra寫了一篇關於將tar文件轉換爲SequenceFile的文章(https://stuartsierra.com/2008/04/24/a-million-little-files ),像這樣的工具是很是有用的,咱們應該多看看。展望將來,最好設計數據管道,將源數據直接寫入SequenceFile(若是可能),而不是做爲中間步驟寫入小文件。

與HAR文件不一樣,沒有辦法列出SequenceFile中的全部鍵,因此不能讀取整個文件。Map File,就像對鍵進行排序的SequenceFile,只維護了部分索引,因此他們也不能列出全部的鍵,以下圖。

image

SequenceFile是以Java爲中心的。 TFile(https://issues.apache.org/jira/browse/HADOOP-4565 )設計爲跨平臺,而且能夠替代SequenceFile,不過如今還不可用。

4.2.3 HBase

若是你生產不少小文件,那麼根據訪問模式,不一樣類型的存儲可能更合適(If you are producing lots of small files, then, depending on the access pattern, a different type of storage might be more appropriate)。HBase以Map Files(帶索引的SequenceFile)方式存儲數據,若是您須要隨機訪問來執行MapReduce式流式分析,這是一個不錯的選擇( HBase stores data in MapFiles (indexed SequenceFiles), and is a good choice if you need to do MapReduce style streaming analyses with the occasional random look up)。若是延遲是一個問題,那麼還有不少其餘選擇 - 參見Richard Jones對鍵值存儲的調查(http://www.metabrew.com/article/anti-rdbms-a-list-of-distributed-key-value-stores/)。

原文:http://blog.cloudera.com/blog/2009/02/the-small-files-problem/

 

###############################################################################################################

Hadoop對小文件的解決方案

小文件指的是那些size比HDFS的block size(默認64M)小的多的文件。任何一個文件,目錄和block,在HDFS中都會被表示爲一個object存儲在namenode的內存中, 每個object佔用150 bytes的內存空間。因此,若是有10million個文件, 每個文件對應一個block,那麼就將要消耗namenode 3G的內存來保存這些block的信息。若是規模再大一些,那麼將會超出現階段計算機硬件所能知足的極限。 
控制小文件的方法有:

一、應用程序本身控制 
二、archive 
三、Sequence File / Map File 
四、CombineFileInputFormat*** 
五、合併小文件,如HBase部分的compact

一、應用程序本身控制

final Path path = new Path("/combinedfile"); final FSDataOutputStream create = fs.create(path); final File dir = new File("C:\\Windows\\System32\\drivers\\etc"); for(File fileName : dir.listFiles()) { System.out.println(fileName.getAbsolutePath()); final FileInputStream fileInputStream = new FileInputStream(fileName.getAbsolutePath()); final List<String> readLines = IOUtils.readLines(fileInputStream); for (String line : readLines) { create.write(line.getBytes()); } fileInputStream.close(); } create.close();

二、archive 命令行操做

具體參考以下: 
http://blog.csdn.net/scgaliguodong123_/article/details/46341587

文件歸檔 Archive

hadoop不適合小文件的存儲,小文件自己就佔用了不少metadata,就會形成namenode愈來愈大。 
Hadoop Archives (HAR files)是在0.18.0版本中引入的,它的出現就是爲了 
緩解大量小文件消耗namenode內存的問題。 
HAR文件是經過在HDFS上構建一個層次化的文件系統來工做。一個HAR文件是經過hadoop的archive命令來建立,而這個命令實際上也是運行了一個MapReduce任務來將小文件打包成HAR。對於client端來講,使用HAR文件沒有任何影響。全部的原始文件都使用har://URL。但在HDFS端它內部的文件數減小了。 
經過HAR來讀取一個文件並不會比直接從HDFS中讀取文件高效,並且實際上可能還會稍微低效一點,由於對每個HAR文件的訪問都須要完成兩層讀取,index文件的讀取和文件自己數據的讀取。而且儘管HAR文件能夠被用來做爲MapReduce job的input,可是並無特殊的方法來使maps將HAR文件中打包的文件看成一個HDFS文件處理。 
建立文件 hadoop archive -archiveName xxx.har -p /src /dest 
查看內容 hadoop fs -lsr har:///dest/xxx.har

[root@master liguodong]# hadoop archive
archive -archiveName NAME -p <parent path> <src>* <dest>
[root@master liguodong]# hadoop fs -lsr /liguodong
drwxrwxrwx   - hdfs      hdfs          0 2015-05-04 19:40 /liguodong/output
-rwxrwxrwx 3 hdfs hdfs 0 2015-05-04 19:40 /liguodong/output/_SUCCESS -rwxrwxrwx 3 hdfs hdfs 23 2015-05-04 19:40 /liguodong/output/part-r-00000 [root@master liguodong]# hadoop archive -archiveName liguodong.har -p /liguodong output /liguodong/har [root@master liguodong]# hadoop fs -lsr /liguodong drwxr-xr-x - root hdfs 0 2015-06-03 11:15 /liguodong/har drwxr-xr-x - root hdfs 0 2015-06-03 11:15 /liguodong/har/liguodong.har -rw-r--r-- 3 root hdfs 0 2015-06-03 11:15 /liguodong/har/liguodong.har/_SUCCESS -rw-r--r-- 5 root hdfs 254 2015-06-03 11:15 /liguodong/har/liguodong.har/_index -rw-r--r-- 5 root hdfs 23 2015-06-03 11:15 /liguodong/har/liguodong.har/_masterindex -rw-r--r-- 3 root hdfs 23 2015-06-03 11:15 /liguodong/har/liguodong.har/part-0 drwxrwxrwx - hdfs hdfs 0 2015-05-04 19:40 /liguodong/output -rwxrwxrwx 3 hdfs hdfs 0 2015-05-04 19:40 /liguodong/output/_SUCCESS -rwxrwxrwx 3 hdfs hdfs 23 2015-05-04 19:40 /liguodong/output/part-r-00000 查看內容 [root@master liguodong]# hadoop fs -lsr har:///liguodong/har/liguodong.har lsr: DEPRECATED: Please use 'ls -R' instead. drwxr-xr-x - root hdfs 0 2015-05-04 19:40 har:///liguodong/har/liguodong.har/output -rw-r--r-- 3 root hdfs 0 2015-05-04 19:40 har:///liguodong/har/liguodong.har/output/_SUCCESS -rw-r--r-- 3 root hdfs 23 2015-05-04 19:40 har:///liguodong/har/liguodong.har/output/part-r-00000 --------------------------------------------------------------- [root@master liguodong]# hadoop archive -archiveName liguodong2.har -p /liguodong/output /liguodong/har [root@master liguodong]# hadoop fs -lsr har:///liguodong/har/liguodong2.har -rw-r--r-- 3 root hdfs 0 2015-05-04 19:40 har:///liguodong/har/liguodong2.har/_SUCCESS -rw-r--r-- 3 root hdfs 23 2015-05-04 19:40 har:///liguodong/har/liguodong2.har/part-r-00000

三、Sequence File/Map File

Sequence File 
一般對於」the small files problem」的迴應會是:使用SequenceFile。 
這種方法是說,使用filename做爲key,而且file contents做爲value。實踐中這種方式很是管用。 
若是有10000個100KB的文件,能夠寫一個程序來將這些小文件寫入到一個單獨的 SequenceFile中去,而後就能夠在一個streaming fashion(directly or using mapreduce)中來使用這個sequenceFile。不只如此,SequenceFiles也是splittable的,因此mapreduce 能夠break them into chunks,而且分別的被獨立的處理。和HAR不一樣的是,這種方式還支持壓縮。 block的壓縮在許多狀況下都是最好的選擇,由於它將多個 records壓縮到一塊兒,而不是一個record一個壓縮。

在存儲結構上, SequenceFile主要由一個Header後跟多條Record組成。 
Header主要包含了Key classname, Value classname,存儲壓縮算法,用戶自定義元數據等信息,此外,還包含了一些同步標識,用於快速定位到記錄的邊界。

每條Record以鍵值對的方式進行存儲,用來表示它的字符數組可依次解析成:記錄的長度、 Key的長度、 Key值和Value值,而且Value值的結構取決於該記錄是否被壓縮。

數據壓縮有利於節省磁盤空間和加快網絡傳輸, SeqeunceFile支持兩種格式的數據壓縮,分別是: record compression和block compression。 
record compression是對每條記錄的value進行壓縮 
block compression是將一連串的record組織到一塊兒,統一壓縮成一個block。 
block信息主要存儲了:塊所包含的記錄數、每條記錄Key長度的集合、每條記錄Key值的集合、每條記錄Value長度的集合和每條記錄Value值的集合 
注:每一個block的大小是可經過io.seqfile.compress.blocksize屬性來指定的。

Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(conf); Path seqFile=new Path("seqFile.seq"); //Reader內部類用於文件的讀取操做 SequenceFile.Reader reader=new SequenceFile.Reader(fs,seqFile,conf); //Writer內部類用於文件的寫操做,假設Key和Value都爲Text類型 SequenceFile.Writer writer=new SequenceFile.Writer(fs,conf,seqFile,Text.class,Text.class); //經過writer向文檔中寫入記錄 writer.append(new Text("key"),new Text("value")); IOUtils.closeStream(writer);//關閉write流 //經過reader從文檔中讀取記錄 Text key=new Text(); Text value=new Text(); while(reader.next(key,value)) { System.out.println(key); System.out.println(value); } IOUtils.closeStream(reader);//關閉read流

具體可參考: 
http://blog.csdn.net/scgaliguodong123_/article/details/46391061

MapFile 
MapFile是排序後的SequenceFile,經過觀察其目錄結構能夠看到 
MapFile由兩部分組成,分別是data和index。 
index做爲文件的數據索引,主要記錄了每一個Record的key值,以及 
該Record在文件中的偏移位置。 
在MapFile被訪問的時候,索引文件會被加載到內存,經過索引映射關係可迅速定位到指定Record所在文件位置,所以,相對SequenceFile而言, MapFile的檢索效率是高效的,缺點是會消耗一部份內存來存儲index數據。 
需注意的是, MapFile並不會把全部Record都記錄到index中去,默認狀況下每隔128條記錄存儲一個索引映射。固然,記錄間隔可人爲修改,經過MapFIle.Writer的setIndexInterval()方法,或修改io.map.index.interval屬性; 
另外,與SequenceFile不一樣的是, MapFile的KeyClass必定要實現 
WritableComparable接口 ,即Key值是可比較的。

Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(conf); Path mapFile=new Path("mapFile.map"); //Writer內部類用於文件的寫操做,假設Key和Value都爲Text類型 MapFile.Writer writer=new MapFile.Writer(conf,fs,mapFile.toString(),Text.class,Text.class); //經過writer向文檔中寫入記錄 writer.append(new Text("key"),new Text("value")); IOUtils.closeStream(writer);//關閉write流 //Reader內部類用於文件的讀取操做 MapFile.Reader reader=new MapFile.Reader(fs,mapFile.toString(),conf); //經過reader從文檔中讀取記錄 Text key=new Text(); Text value=new Text(); while(reader.next(key,value)) { System.out.println(key); System.out.println(value); } IOUtils.closeStream(reader);//關閉read流

五、CombineFileInputFormat

相對於大量的小文件來講,hadoop更合適處理少許的大文件。 
CombineFileInputFormat能夠緩解這個問題,它是針對小文件而設計的。 
**注:**CombineFileInputFormat是一個抽象類,須要編寫一個繼承類。 
使用CombineFileInputFormat做爲Map任務的輸入規格描述,首先須要實現一個自定義的RecordReader。

CombineFileInputFormat的大體原理 
它會將輸入多個數據文件(小文件)的元數據所有包裝到CombineFileSplit類裏面。也就是說,由於小文件的狀況下,在HDFS中都是單Block的文件,即一個文件一個Block,一個CombineFileSplit包含了一組文件Block,包括每一個文件的起始偏移(offset),長度(length),Block位置(localtions)等元數據。

若是想要處理一個 CombineFileSplit,很容易想到,對其包含的每一個InputSplit(實際上這裏面沒有這個,你須要讀取一個小文件塊的時候,須要構造一 個FileInputSplit對象)。 
在執行MapReduce任務的時候,須要讀取文件的文本行(簡單一點是文本行,也多是其餘格式數據)。 
那麼對於CombineFileSplit來講,你須要處理其包含的小文件Block,就要對應設置一個RecordReader,才能正確讀取文件數據內容。 
一般狀況下,咱們有一批小文件,格式一般是相同的,只須要在CombineFileSplit實現一個RecordReader的時候, 
內置另外一個用來讀取小文件Block的RecordReader,這樣就能保證讀取CombineFileSplit內部聚積的小文件。

咱們基於Hadoop內置的CombineFileInputFormat來實現處理海量小文件,須要作的工做,以下所示: 
一、實現一個RecordReader來讀取CombineFileSplit包裝的文件Block 
二、繼承自CombineFileInputFormat實現一個使用咱們自定義的RecordReader的輸入規格說明類。 
三、處理數據的Mapper實現類 
四、配置用來處理海量小文件的MapReduce Job

package SmallFile; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable,BytesWritable> { @Override public RecordReader<LongWritable, BytesWritable> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException { CombineFileSplit combineFileSplit = (CombineFileSplit)(split); CombineFileRecordReader<LongWritable,BytesWritable> recordReader = new CombineFileRecordReader<LongWritable,BytesWritable> (combineFileSplit, context,CombineSmallfileRecordReader.class); try { recordReader.initialize(combineFileSplit, context); } catch (InterruptedException e) { e.printStackTrace(); } return recordReader; } } class CombineSmallfileRecordReader extends RecordReader<LongWritable,BytesWritable> { private CombineFileSplit combineFileSplit; private LineRecordReader lineRecordReader = new LineRecordReader(); private Path[] paths; private int totalLength; private int currentIndex; private float currentProgress = 0; private LongWritable currentKey; private BytesWritable currentValue; public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit,TaskAttemptContext context,Integer index) { super(); this.combineFileSplit = combineFileSplit; this.currentIndex = index; } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex),combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations()); lineRecordReader.initialize(fileSplit, context); this.paths = combineFileSplit.getPaths(); //分區所在的全部地址 context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName()); //設置輸入文件名 } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if(currentIndex>=0 && currentIndex<totalLength) { return lineRecordReader.nextKeyValue(); } return false; } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { currentKey = lineRecordReader.getCurrentKey(); return currentKey; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { byte[]value = lineRecordReader.getCurrentValue().getBytes(); currentValue.set(value, 0, value.length); return currentValue; } @Override public float getProgress() throws IOException, InterruptedException { if(currentIndex>=0 && currentIndex<totalLength) { currentProgress = currentIndex/totalLength; return currentProgress; } return currentProgress; } @Override public void close() throws IOException { lineRecordReader.close(); } }
相關文章
相關標籤/搜索