SparkSQL與Hive metastore Parquet轉換

Spark SQL爲了更好的性能,在讀寫Hive metastore parquet格式的表時,會默認使用本身的Parquet SerDe,而不是採用Hive的SerDe進行序列化和反序列化。該行爲能夠經過配置參數spark.sql.hive.convertMetastoreParquet進行控制,默認true。

這裏從表schema的處理角度而言,就必須注意Hive和Parquet兼容性,主要有兩個區別:
css

1.Hive是大小寫敏感的,但Parquet相反
java

2.Hive會將全部列視爲nullable,可是nullability在parquet裏有獨特的意義
git

因爲上面的緣由,在將Hive metastore parquet轉化爲Spark SQL parquet時,須要兼容處理一下Hive和Parquet的schema,即須要對兩者的結構進行一致化。主要處理規則是:

1.有相同名字的字段必需要有相同的數據類型,忽略nullability。兼容處理的字段應該保持Parquet側的數據類型,這樣就能夠處理到nullability類型了(空值問題)
sql

2.兼容處理的schema應只包含在Hive元數據裏的schema信息,主要體如今如下兩個方面:
express

(1)只出如今Parquet schema的字段會被忽略
apache

(2)只出如今Hive元數據裏的字段將會被視爲nullable,並處理到兼容後的schema中
關於schema(或者說元數據metastore),Spark SQL在處理Parquet表時,一樣爲了更好的性能,會緩存Parquet的元數據信息。此時,若是咱們直接經過Hive或者其餘工具對該Parquet表進行修改致使了元數據的變化,那麼Spark SQL緩存的元數據並不能同步更新,此時須要手動刷新Spark SQL緩存的元數據,來確保元數據的一致性,方式以下:
// 第一種方式應用的比較多1. sparkSession.catalog.refreshTable(s"${dbName.tableName}")2. sparkSession.catalog.refreshByPath(s"${path}")

最後說一下最近後臺小夥伴在生產中遇到的一個問題,你們若是在業務處理中遇到相似的問題,提供一個思路。
緩存

在說問題以前首先了解一個參數spark.sql.parquet.writeLegacyFormat(默認false)的做用:
ruby

設置爲true時,數據會以Spark1.4和更早的版本的格式寫入。好比decimal類型的值會被以Apache Parquet的fixed-length byte array格式寫出,該格式是其餘系統例如Hive、Impala等使用的
設置爲false時,會使用parquet的新版格式。例如,decimals會以int-based格式寫出。若是Spark SQL要以Parquet輸出而且結果會被不支持新格式的其餘系統使用的話,須要設置爲true。

好比,對於decimal數據類型的兼容處理,不設置true時,常常會報相似以下的錯誤:
微信

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://hadoop/data/test_decimal/dt=20200515000000/part-00000-9820eba2-8a40-446d-8c28-37027a1b1f2d-c000.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85) at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)... Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)...

此時咱們須要將spark.sql.parquet.writeLegacyFormat設置爲true來解決上述的異常問題。app

但若是同時設置spark.sql.hive.convertMetastoreParquet爲false時,要注意一些數據類型以及精度的處理,好比對於decimal類型的處理。經過一個例子復原一下當時的場景:

1.建立Hive外部表testdb.test_decimal,其中字段fee_rate爲decimal(10,10)

CREATE EXTERNAL TABLE `testdb`.`test_decimal`(`no` STRING , `fee_rate` DECIMAL(10,10)) PARTITIONED BY (`dt` STRING ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://hadoop/data/test_decimal' TBLPROPERTIES ( 'transient_lastDdlTime' = '1589160440' ) ;

2.將testdb.item中的數據處理後保存到testdb.test_decimal中

// 這裏爲了展現方便,直接查詢testdb.item中的數據// 注意: 字段fee_rate的類型爲decimal(10,6)select no, fee_rate from testdb.item  where dt=20190528;
// testdb.item中數據示例以下+-------------------+----------------+| no| fee_rate|+-------------------+----------------+| 1| 0.000000|| 2| 0.000000||        3|        0.000000|+-------------------+----------------+
3.將testdb.item中的數據保存到testdb.test_decimal中
// tmp是上述查詢testdb.item得到的臨時表// 以parquet格式保存到test_decimal的20200529分區中save overwrite tmp as parquet.`/data/test_decimal/dt=20200529`; msck repair TABLE testdb.item;

上述1-3都能成功執行,數據也能保存到testdb.test_decimal中,可是當查詢testdb.test_decimal中的數據時,好比執行sql:

select * from testdb.test_decimal where dt = 20200529;
會報以下空指針的異常:
Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)  ...

究其緣由是由於按照上述兩個參數的配置,testdb.item中fee_rate字段類型爲decimal(10,6),數據爲0.000000,通過一系列處理0.000000最終會被處理爲0,看下邊最終致使空指針異常的部分,就會一目瞭然。

public static BigDecimal enforcePrecisionScale(BigDecimal bd, int maxPrecision, int maxScale) { if (bd == null) { return null; } else { bd = trim(bd); if (bd.scale() > maxScale) { bd = bd.setScale(maxScale, RoundingMode.HALF_UP); }            // testdb.test_decimal中fee_rate的類型decimal(10,10),即precision爲10,scale也爲10            // 對應這裏即maxPrecision和maxScale分別爲10,則maxIntDigits爲0            int maxIntDigits = maxPrecision - maxScale;                        // bd對應0。對於0而言,precision爲1,scale爲0            // 處理以後 intDigits爲1            int intDigits = bd.precision() - bd.scale(); return intDigits > maxIntDigits ? null : bd; }}

解決辦法也很簡單,就是將testdb.test_decimal中的fee_rate數據類型和依賴的表testdb.item中的fee_rate保持徹底一致,即也爲decimal(10,6)。

這個現象在實際應用環境中常常遇到,通用的解決辦法就是將要保存的表中的數據類型與依賴的表(物理表或者臨時表)的字段類型保持徹底一致。


阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,按期推送精彩案例,技術專家直播,問答區近萬人Spark技術同窗在線提問答疑,只爲營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

對開源大數據和感興趣的同窗能夠加小編微信(下圖二維碼,備註「進羣」)進入技術交流微信羣。

Apache Spark技術交流社區公衆號,微信掃一掃關注


本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索