RCFile在平臺的應用場景中多數用於存儲須要「長期留存」的數據文件,在咱們的實踐過程當中,RCFile的數據壓縮比一般能夠達到8 : 1或者10 : 1,特別適用於存儲用戶經過Hive(MapReduce)分析的結果。目前平臺的計算引擎正逐步由Hadoop MapReduce遷移至Spark,存儲方面咱們依然想利用RCFile的優點,可是具體實踐中遇到那麼幾個「坑」。
數據分析師使用PySpark構建Spark分析程序,源數據是按行存儲的文本文件(可能有壓縮),結果數據爲Python list,list的元素類型爲tuple,而tuple的元素類型爲unicode(Python2,爲了保持你們對數據認知的一致性,源數據是文本,咱們要求用戶的處理結果也爲文本),能夠看出list實際能夠理解爲一張表(Table),list的元素tuple爲行(Row),tuple的元素爲列(Column),所以可以很好的利用RCFile的列式存儲特性。
RCFile擴展自Hadoop InputFormat、OutputFormat、Writable:
org.apache.hadoop.hive.ql.io.RCFileInputFormat
org.apache.hadoop.hive.ql.io.RCFileOutputFormat
org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable
注意:RCFile的使用須要依賴於Hive的Jar。
使用RCFileOutputFormat時咱們須要處理tuple => BytesRefArrayWritable(Object[] => BytesRefArrayWritable)的數據類型轉換,使用RCFileInputFormat時咱們須要處理BytesRefArrayWritable => tuple(BytesRefArrayWritable => Object[])的數據類型轉換,也就是說咱們須要擴展兩個Converter:
ObjectArrayToBytesRefArrayWritableConverter:用於Object[] => BytesRefArrayWritable的數據類型轉換;
BytesRefArrayWritableToObjectArrayConverter:用於BytesRefArrayWritable => Object[]的數據類型轉換;
(1)ObjectArrayToBytesRefArrayWritableConverter;
convert的參數類型爲Object[],返回值類型爲BytesRefArrayWritable。
(2)BytesRefArrayWritableToObjectArrayConverter;
convert的參數類型爲BytesRefArrayWritable,返回值類型爲Object[]。
1. 模擬數據(用戶分析結果),將其以RCFile的形式保存至HDFS;
咱們模擬的數據爲三行三列,數據類型均爲文本,須要注意的是RCFile在保存數據時須要經過Hadoop Configuration指定「列數」,不然會出現如下異常:
此外RCFileOutputFormat RecordWriter會丟棄「key」:
所以「key」能夠是任意值,只要兼容Hadoop Writable便可,此處咱們將「key」處理爲None,並設置keyClass爲org.apache.hadoop.io.NullWritable。
並且運行上述程序以前,還須要將com.sina.dip.spark.converter.ObjectArrayToBytesRefArrayWritableConverter編譯打包爲獨立的Jar:rcfile.jar,運行命令以下:
spark-submit --jars rcfile.jar 1.5.1/examples/app/spark_app_save_data_to_rcfile.py
出乎意料,異常信息出現:
引起異常的代碼並非咱們自定義擴展的ObjectArrayToBytesRefArrayWritableConverter,而是RCFileOutputFormat,怎麼可能,這不是官方提供的代碼麼?根據異常堆棧可知,RCFileOutputFormat第79行(不一樣版本的Hive可能代碼行數不一樣)代碼出現空指針異常:
該行可能引起空指針異常的惟一緣由就是outputPath == null,而outputPath的值由方法getWorkOutputPath計算而得:
其中JobContext.TASK_OUTPUT_DIR的值爲mapreduce.task.output.dir。
熟悉Hadoop的同窗可能已經想到,方法getWorkOutputPath是用來計算Map或Reduce Task臨時輸出目錄的,JobContext.TASK_OUTPUT_DIR屬性也是之前綴「mapreduce」開頭的,「
Spark運行時是不會爲該屬性設置值的」,因此outputPath == null,那麼咱們應該如何計算outputPath呢?
困惑之餘,咱們聯想到當初調研Spark時是以文本爲基礎進行功能測試的,也就是說在Spark中使用TextInputFormat、TextOutputFormat是沒有任何問題的,果斷參考一下TextOutputFormat是如何實現的?
FileOutputFormat是一個基礎類,這意味着咱們能夠重寫RCFileOutputFormat getRecordWriter,使用FileOutputFormat.getTaskOutputPath替換getWorkOutputPath:
能夠看出,重寫後的getRecordWriter僅僅是改變了outputPath的計算方式,其它邏輯並無改變,咱們將重寫後的類命名爲com.sina.dip.spark.output.DipRCFileOutputFormat,並將其一併編譯打包爲獨立的Jar:rcfile.jar。
從新修改Spark代碼:
咱們做出了兩個地方的修改:
(1)parallelize numSlices:1,考慮到模擬的數據量比較小,爲了便於查看結果,咱們將「分區數」設置爲1,這樣最終僅有一個數據文件;
(2)outputFormatClass:com.sina.dip.spark.output.DipRCFileOutputFormat;
再次運行命令:
spark-submit --jars rcfile.jar 1.5.1/examples/app/spark_app_save_data_to_rcfile.py
程序執行結果以後,咱們經過HDFS FS命令查看目錄:hdfs://dip.dev.cdh5:8020/user/yurun/rcfile/:
數據文件已成功生成,爲了確認寫入的正確性,咱們經過Hive RCFileCat命令查看文件:hdfs://dip.dev.cdh5:8020/user/yurun/rcfile/part-00000:
可見寫入文件的數據與咱們模擬的數據是一致的。
2. 讀取上一步寫入的數據;
運行上述程序以前,還須要將com.sina.dip.spark.converter.BytesRefArrayWritableToObjectArrayConverter編譯打包爲獨立的Jar:rcfile.jar,運行命令以下:
spark-submit --jars rcfile.jar 1.5.1/examples/app/spark_app_read_data_from_rcfile.py
輸出結果:
咱們使用Hive原生的RCFileInputFormat,以及咱們本身擴展的BytesRefArrayWritableToObjectArrayConverter正確完成了RCFile數據的讀取,實際上pair[0]能夠理解爲「行數」(注意keyClass的設置),一般狀況下沒有實際意義,能夠選擇忽略。
綜上所述,Spark(PySpark)使用RCFile的過程當中會遇到三個「坑」:
(1)須要重寫RCFileOutputFormat getRecordWriter;
(2)須要擴展Converter支持tuple(Object[]) => BytesRefArrayWritable的數據類型轉換;
(3)須要擴展Converter支持BytesRefArrayWritable => tuple (Object[])的數據類型轉換。