Hadoop 做爲MR 的開源實現,一直以動態運行解析文件格式並得到比MPP數據庫快上幾倍的裝載速度爲優點。不過,MPP數據庫社區也一直批評Hadoop因爲文件格式並不是爲特定目的而建,所以序列化和反序列化的成本太高。 html
目前 hadoop 中流行的文件格式有以下幾種: java
SequenceFile是Hadoop API 提供的一種二進制文件,它將數據以<key,value>的形式序列化到文件中。這種二進制文件內部使用Hadoop 的標準的Writable 接口實現序列化和反序列化。它與Hadoop API中的MapFile 是互相兼容的。Hive 中的SequenceFile 繼承自Hadoop API 的SequenceFile,不過它的key爲空,使用value 存放實際的值, 這樣是爲了不MR 在運行map 階段的排序過程。若是你用Java API 編寫SequenceFile,並讓Hive 讀取的話,請確保使用value字段存放數據,不然你須要自定義讀取這種SequenceFile 的InputFormat class 和OutputFormat class。 git
RCFile是Hive推出的一種專門面向列的數據格式。 它遵循「先按列劃分,再垂直劃分」的設計理念。當查詢過程當中,針對它並不關心的列時,它會在IO上跳過這些列。須要說明的是,RCFile在map階段從遠端拷貝仍然是拷貝整個數據塊,而且拷貝到本地目錄後RCFile並非真正直接跳過不須要的列,並跳到須要讀取的列, 而是經過掃描每個row group的頭部定義來實現的,可是在整個HDFS Block 級別的頭部並無定義每一個列從哪一個row group起始到哪一個row group結束。因此在讀取全部列的狀況下,RCFile的性能反而沒有SequenceFile高。 github
HDFS塊內行存儲的例子 算法
HDFS塊內列存儲的例子 sql
HDFS塊內RCFile方式存儲的例子
數據庫
Avro是一種用於支持數據密集型的二進制文件格式。它的文件格式更爲緊湊,若要讀取大量數據時,Avro可以提供更好的序列化和反序列化性能。而且Avro數據文件天生是帶Schema定義的,因此它不須要開發者在API 級別實現本身的Writable對象。最近多個Hadoop 子項目都支持Avro 數據格式,如Pig 、Hive、Flume、Sqoop和Hcatalog。 apache
除上面提到的3種二進制格式以外,文本格式的數據也是Hadoop中常常碰到的。如TextFile 、XML和JSON。 文本格式除了會佔用更多磁盤資源外,對它的解析開銷通常會比二進制格式高几十倍以上,尤爲是XML 和JSON,它們的解析開銷比Textfile 還要大,所以強烈不建議在生產系統中使用這些格式進行儲存。 若是須要輸出這些格式,請在客戶端作相應的轉換操做。 文本格式常常會用於日誌收集,數據庫導入,Hive默認配置也是使用文本格式,並且經常容易忘了壓縮,因此請確保使用了正確的格式。另外文本格式的一個缺點是它不具有類型和模式,好比銷售金額、利潤這類數值數據或者日期時間類型的數據,若是使用文本格式保存,因爲它們自己的字符串類型的長短不一,或者含有負數,致使MR沒有辦法排序,因此每每須要將它們預處理成含有模式的二進制格式,這又致使了沒必要要的預處理步驟的開銷和儲存資源的浪費。 架構
Hadoop實際上支持任意文件格式,只要可以實現對應的RecordWriter和RecordReader便可。其中數據庫格式也是會常常儲存在Hadoop中,好比Hbase,Mysql,Cassandra,MongoDB。 這些格式通常是爲了不大量的數據移動和快速裝載的需求而用的。他們的序列化和反序列化都是由這些數據庫格式的客戶端完成,而且文件的儲存位置和數據佈局(Data Layout)不禁Hadoop控制,他們的文件切分也不是按HDFS的塊大小(blocksize)進行切割。 app
Facebook曾在2010 ICDE(IEEE International Conference on Data Engineering)會議上介紹了數據倉庫Hive。Hive存儲海量數據在Hadoop系統中,提供了一套類數據庫的數據存儲和處理機制。它採用類SQL語言對數據進行自動化管理和處理,通過語句解析和轉換,最終生成基於Hadoop的MapReduce任務,經過執行這些任務完成數據處理。下圖顯示了Hive數據倉庫的系統結構。
Facebook在數據倉庫上遇到的存儲可擴展性的挑戰是獨一無二的。他們在基於Hive的數據倉庫中存儲了超過300PB的數據,而且以每日新增600TB的速度增加。去年這個數據倉庫所存儲的數據量增加了3倍。考慮到這個增加趨勢,存儲效率問題是facebook數據倉庫基礎設施方面目前乃至未來一段時間內最須要關注的。facebook工程師發表的RCFile: A Fast and Spaceefficient Data Placement Structure in MapReducebased Warehouse Systems一文,介紹了一種高效的數據存儲結構——RCFile(Record Columnar File),並將其應用於Facebook的數據倉庫Hive中。與傳統數據庫的數據存儲結構相比,RCFile更有效地知足了基於MapReduce的數據倉庫的四個關鍵需求,即Fast data loading、Fast query processing、Highly efficient storage space utilization和Strong adaptivity to highly dynamic workload patterns。RCFile 普遍應用於Facebook公司的數據分析系統Hive中。首先,RCFile具有至關於行存儲的數據加載速度和負載適應能力;其次,RCFile的讀優化能夠在掃描表格時避免沒必要要的列讀取,測試顯示在多數狀況下,它比其餘結構擁有更好的性能;再次,RCFile使用列維度的壓縮,所以可以有效提高存儲空間利用率。
爲了提升存儲空間利用率,Facebook各產品線應用產生的數據從2010年起均採用RCFile結構存儲,按行存儲(SequenceFile/TextFile)結構保存的數據集也轉存爲RCFile格式。此外,Yahoo公司也在Pig數據分析系統中集成了RCFile,RCFile正在用於另外一個基於Hadoop的數據管理系統Howl(http://wiki.apache.org/pig/Howl)。並且,根據Hive開發社區的交流,RCFile也成功整合加入其餘基於MapReduce的數據分析平臺。有理由相信,做爲數據存儲標準的RCFile,將繼續在MapReduce環境下的大規模數據分析中扮演重要角色。
facebook 的數據倉庫中數據被加載到表裏面時首先使用的存儲格式是Facebook本身開發的Record-Columnar File Format(RCFile)。RCFile是一種「容許按行查詢,提供了列存儲的壓縮效率」的混合列存儲格式。它的核心思想是首先把Hive表水平切分紅多個行組(row groups),而後組內按照列垂直切分,這樣列與列的數據在磁盤上就是連續的存儲塊了。
當一個行組內的全部列寫到磁盤時,RCFile就會以列爲單位對數據使用相似zlib/lzo的算法進行壓縮。當讀取列數據的時候使用惰性解壓策略( lazy decompression),也就是說用戶的某個查詢若是隻是涉及到一個表中的部分列的時候,RCFile會跳過不須要的列的解壓縮和反序列化的過程。經過在facebook的數據倉庫中選取有表明性的例子實驗,RCFile可以提供5倍的壓縮比。
隨着數據倉庫中存儲的數據量持續增加,FB組內的工程師開始研究提升壓縮效率的技術和方法。研究的焦點集中在列級別的編碼方法,例如行程長度編碼(run-length encoding)、詞典編碼(dictionary encoding)、參考幀編碼(frame of reference encoding)、可以在通用壓縮過程以前更好的在列級別下降邏輯冗餘的數值編碼方法。FB也嘗試過新的列類型(例如JSON是在Facebook內部普遍使用的格式,把JSON格式的數據按照結構化的方式存儲既能夠知足高效查詢的需求,同時也下降了JSON元數據存儲的冗餘)。FB的實驗代表列級別的編碼若是使用得當的話可以顯著提升RCFile的壓縮比。
與此同時,Hortonworks也在嘗試相似的思路去改進Hive的存儲格式。Hortonworks的工程團隊設計和實現了ORCFile(包括存儲格式和讀寫接口),這幫助Facebook的數據倉庫設計和實現新的存儲格式提供了一個很好的開始。
關於 ORCFile 的介紹請見這裏:http://yanbohappy.sinaapp.com/?p=478
關於性能評測,筆者這裏暫時沒有條件,貼一張某次 hive 技術峯會演講嘉賓的截圖:
上面說了這麼多,想必你已經知道 RCFile 主要用於提高 hive 的查詢效率,那如何生成這種格式的文件呢?
例如:
insert overwrite table http_RCTable partition(dt='2013-09-30') select p_id,tm,idate,phone from tmp_testp where dt='2013-09-30';
目前爲止,mapreduce 並無提供內置 API 對 RCFile 進行支持,卻是 pig、hive、hcatalog 等 hadoop生態圈裏的其餘項目進行了支持,究其緣由是由於 RCFile 相比 textfile 等其它文件格式,對於 mapreduce 的應用場景來講沒有顯著的優點。
爲了不重複造輪子,下面的生成 RCFile 的 mapreduce 代碼調用了 hive 和 hcatalog 的相關類,注意你在測試下面的代碼時,你的 hadoop、hive、hcatalog 版本要一致,不然。。。你懂的。。。
好比我用的 hive-0.10.0+198-1.cdh4.4.0,那麼就應該下載對應的版本:http://archive.cloudera.com/cdh4/cdh/4/
PS:下面的代碼已經測試經過,木有問題。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hcatalog.rcfile.RCFileMapReduceInputFormat; import org.apache.hcatalog.rcfile.RCFileMapReduceOutputFormat; public class TextToRCFile extends Configured implements Tool{ public static class Map extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable>{ private byte[] fieldData; private int numCols; private BytesRefArrayWritable bytes; @Override protected void setup(Context context) throws IOException, InterruptedException { numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0); bytes = new BytesRefArrayWritable(numCols); } public void map(Object key, Text line, Context context ) throws IOException, InterruptedException { bytes.clear(); String[] cols = line.toString().split("\\|"); System.out.println("SIZE : "+cols.length); for (int i=0; i<numCols; i++){ fieldData = cols[i].getBytes("UTF-8"); BytesRefWritable cu = null; cu = new BytesRefWritable(fieldData, 0, fieldData.length); bytes.set(i, cu); } context.write(NullWritable.get(), bytes); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length < 2){ System.out.println("Usage: " + "hadoop jar RCFileLoader.jar <main class> " + "-tableName <tableName> -numCols <numberOfColumns> -input <input path> " + "-output <output path> -rowGroupSize <rowGroupSize> -ioBufferSize <ioBufferSize>"); System.out.println("For test"); System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " + "-tableName test1 -numCols 10 -input RCFileLoaderTest/test1 " + "-output RCFileLoaderTest/RCFile_test1"); System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " + "-tableName test2 -numCols 5 -input RCFileLoaderTest/test2 " + "-output RCFileLoaderTest/RCFile_test2"); return 2; } /* For test */ String tableName = ""; int numCols = 0; String inputPath = ""; String outputPath = ""; int rowGroupSize = 16 *1024*1024; int ioBufferSize = 128*1024; for (int i=0; i<otherArgs.length - 1; i++){ if("-tableName".equals(otherArgs[i])){ tableName = otherArgs[i+1]; }else if ("-numCols".equals(otherArgs[i])){ numCols = Integer.parseInt(otherArgs[i+1]); }else if ("-input".equals(otherArgs[i])){ inputPath = otherArgs[i+1]; }else if("-output".equals(otherArgs[i])){ outputPath = otherArgs[i+1]; }else if("-rowGroupSize".equals(otherArgs[i])){ rowGroupSize = Integer.parseInt(otherArgs[i+1]); }else if("-ioBufferSize".equals(otherArgs[i])){ ioBufferSize = Integer.parseInt(otherArgs[i+1]); } } conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize); conf.setInt("io.file.buffer.size", ioBufferSize); Job job = new Job(conf, "RCFile loader: loading table " + tableName + " with " + numCols + " columns"); job.setJarByClass(TextToRCFile.class); job.setMapperClass(Map.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(BytesRefArrayWritable.class); // job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(inputPath)); job.setOutputFormatClass(RCFileMapReduceOutputFormat.class); RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols); RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath)); RCFileMapReduceOutputFormat.setCompressOutput(job, false); System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath); System.out.println("number of columns:" + job.getConfiguration().get("hive.io.rcfile.column.number.conf")); System.out.println("RCFile row group size:" + job.getConfiguration().get("hive.io.rcfile.record.buffer.size")); System.out.println("io bufer size:" + job.getConfiguration().get("io.file.buffer.size")); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new TextToRCFile(), args); System.exit(res); } }
(1)淺析Hadoop文件格式 http://www.infoq.com/cn/articles/hadoop-file-format
(2)Facebook數據倉庫揭祕:RCFile高效存儲結構 http://www.csdn.net/article/2011-04-29/296900
(3)Facebook的數據倉庫是如何擴展到300PB的 http://yanbohappy.sinaapp.com/?p=478
(4)Hive架構 http://www.jdon.com/bigdata/hive.html
(5)Hive:ORC File Format存儲格式詳解 http://www.iteblog.com/archives/1014
(6)普通文本壓縮成RcFile的通用類 https://github.com/ysmart-xx/ysmart/blob/master/javatest/TextToRCFile.java
http://hugh-wangp.iteye.com/blog/1405804 基於HIVE文件格式的map reduce代碼編寫
http://smallboby.iteye.com/blog/1596776 普通文本壓縮成RcFile的通用類
http://smallboby.iteye.com/blog/1592531 RcFile存儲和讀取操做
https://github.com/kevinweil/elephant-bird/blob/master/rcfile/src/main/java/com/twitter/elephantbird/mapreduce/output/RCFileOutputFormat.java