Spark存儲Parquet數據到Hive,對map、array、struct字段類型的處理

利用Spark往Hive中存儲parquet數據,針對一些複雜數據類型如map、array、struct的處理遇到的問題?java

爲了更好的說明致使問題的緣由、現象以及解決方案,首先看下述示例:apache

-- 建立存儲格式爲parquet的Hive非分區表
CREATE EXTERNAL TABLE `t1`(
`id` STRING,
`map_col` MAP<STRING, STRING>,
`arr_col` ARRAY<STRING>,
`struct_col` STRUCT<A:STRING,B:STRING>)
STORED AS PARQUET
LOCATION '/home/spark/test/tmp/t1';

-- 建立存儲格式爲parquet的Hive分區表
CREATE EXTERNAL TABLE `t2`(
`id` STRING,
`map_col` MAP<STRING, STRING>,
`arr_col` ARRAY<STRING>,
`struct_col` STRUCT<A:STRING,B:STRING>)
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/home/spark/test/tmp/t2';

 

分別向t一、t2執行insert into(insert overwrite..select也會致使下列問題)語句,列map_col都存儲爲空map:微信

 

insert into table t1 values(1,map(),array('1,1,1'),named_struct('A','1','B','1'));

insert into table t2 partition(dt='20200101') values(1,map(),array('1,1,1'),named_struct('A','1','B','1'));

 

t1表正常執行,但對t2執行上述insert語句時,報以下異常:app

Caused by: parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:244)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
    ... 23 more

 

t1和t2從建表看惟一的區別就是t1不是分區表而t2是分區表,僅僅從報錯信息是沒法看出表分區產生這種問題的緣由,看看源碼是作了哪些不一樣的處理(這裏爲了方便,筆者這裏直接給出分析這個問題的源碼思路圖):oop

t1底層存儲指定的是ParquetFilemat,t2底層存儲指定的是HiveFileFormat。這裏主要分析一下存儲空map到t2時,爲何出問題,以及如何處理,看幾個核心的代碼(具體的能夠參考上述源碼圖):學習

從拋出的異常信息empty fields are illegal,關鍵看empty fields在哪裏拋出,作了哪些處理,這要看MessageColumnIO中startField和endField是作了哪些處理:大數據

public void startField(String field, int index) {
  try {
  if (MessageColumnIO.DEBUG) {
    this.log("startField(" + field + ", " + index + ")");
  }

  this.currentColumnIO = ((GroupColumnIO)this.currentColumnIO).getChild(index);
  //MessageColumnIO中,startField方法中首先會將emptyField設置爲true
  this.emptyField = true;
  if (MessageColumnIO.DEBUG) {
      this.printState();
  }

  } catch (RuntimeException var4) {
 throw new ParquetEncodingException("error starting field " + field + " at " + index, var4);
    }
}

//endField方法中會針對emptyField是否爲true來決定是否拋出異常
public void endField(String field, int index) {
   if (MessageColumnIO.DEBUG) {
       this.log("endField(" + field + ", " + index + ")");
   }

   this.currentColumnIO = this.currentColumnIO.getParent();
   //若是到這裏仍爲true,則拋異常
   if (this.emptyField) {
       throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead");
     } else {
         this.fieldsWritten[this.currentLevel].markWritten(index);
         this.r[this.currentLevel] = this.currentLevel == 0 ? 0 : this.r[this.currentLevel - 1];
        if (MessageColumnIO.DEBUG) {
            this.printState();
        }

    }
}

 

針對map作處理的一些源碼:this

private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
    // Get the internal map structure (MAP_KEY_VALUE)
    GroupType repeatedType = type.getType(0).asGroupType();

    recordConsumer.startGroup();
    recordConsumer.startField(repeatedType.getName(), 0);

    Map<?, ?> mapValues = inspector.getMap(value);

    Type keyType = repeatedType.getType(0);
    String keyName = keyType.getName();
    ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();

    Type valuetype = repeatedType.getType(1);
    String valueName = valuetype.getName();
    ObjectInspector valueInspector = inspector.getMapValueObjectInspector();

    for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
      recordConsumer.startGroup();
      if (keyValue != null) {
        // write key element
        Object keyElement = keyValue.getKey();
        //recordConsumer此處對應的是MessageColumnIO中的MessageColumnIORecordConsumer
        //查看其中的startField和endField的處理
        recordConsumer.startField(keyName, 0);
        //查看writeValue中對原始數據類型的處理,如int、boolean、varchar
        writeValue(keyElement, keyInspector, keyType);
        recordConsumer.endField(keyName, 0);

        // write value element
        Object valueElement = keyValue.getValue();
        if (valueElement != null) {
          //同上
          recordConsumer.startField(valueName, 1);
          writeValue(valueElement, valueInspector, valuetype);
          recordConsumer.endField(valueName, 1);
        }
      }
      recordConsumer.endGroup();
    }

    recordConsumer.endField(repeatedType.getName(), 0);
    recordConsumer.endGroup();
}

private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
  //value爲null,則return
  if (value == null) {
    return;
  }

  switch (inspector.getPrimitiveCategory()) {
    //PrimitiveCategory爲VOID,則return
    case VOID:
      return;
    case DOUBLE:
      recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));

break;

//下面是對double、boolean、float、byte、int等數據類型作的處理,這裏不在貼出

....

 

 

能夠看到在startFiled中首先對emptyField設置爲true,只有在結束時好比endField方法中將emptyField設置爲false,纔不會拋出上述異常。而存儲字段類型爲map時,有幾種狀況會致使這種異常的發生,好比map爲空或者map的key爲null。spa

這裏只是以map爲例,對於array、struct都有相似問題,看源碼HiveFileFormat -> DataWritableWriter對這三者處理方式相似。相似的問題,在Hive的issue中https://issues.apache.org/jira/browse/HIVE-11625也有討論。3d

分析出問題解決就比較簡單了,以存儲map類型字段爲例:

1. 若是沒法改變建表schema,或者存儲時底層用的就是HiveFileFormat

若是沒法肯定存儲的map字段是否爲空,存儲以前判斷一下map是否爲空,能夠寫個udf或者用size判斷一下,同時要保證key不能爲null

2. 建表時使用Spark的DataSource表

-- 這種方式本質上仍是用ParquetFileFormat,而且是內部表,生產中不建議直接使用這種方式

CREATE TABLE `test`(

`id` STRING,
`map_col` MAP<STRING, STRING>,
`arr_col` ARRAY<STRING>,
`struct_col` STRUCT<A:STRING,B:STRING>)
USING parquet
OPTIONS(`serialization.format` '1');

 

3. 存儲時指定ParquetFileFormat

好比,ds.write.format("parquet").save("/tmp/test")其實像這類問題,相信不少人都遇到過而且解決了。這裏是爲了給出當遇到問題時,解決的一種思路。不只要知道如何解決,更要知道發生問題是什麼緣由致使的、如何避免這種問題、解決了問題是怎麼解決的(爲何這種方式能解決,有沒有更優的方法)等。

 

近期文章:

Spark SQL解析查詢parquet格式Hive表獲取分區字段和查詢條件

Spark SQL

Apache Hive


 

關注微信公衆號:大數據學習與分享,獲取更對技術乾貨

相關文章
相關標籤/搜索