[Original] Uncle's troubleshooting series (15): Spark fails writing Parquet data with ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead

Spark 2.1.1

 

Running the following SQL in Spark fails:

insert overwrite table test_parquet_table select * from dummy

The error is as follows:

org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:333)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Parquet record is malformed: empty fields are illegal, the field should be ommited completely instead
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:64)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:121)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:123)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:42)
at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:111)
at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:124)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:321)
... 8 more
Caused by: parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:244)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
... 16 more

Stepping into the code:

org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter

    private void writeMap(Object value, MapObjectInspector inspector, GroupType type) {
        GroupType repeatedType = type.getType(0).asGroupType();
        this.recordConsumer.startGroup();
        this.recordConsumer.startField(repeatedType.getName(), 0);
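        // If the map is empty, the loop below never executes and nothing is added between
        // this startField() and the endField() at the bottom of this method, so endField() throws.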
        Map<?, ?> mapValues = inspector.getMap(value);
        Type keyType = repeatedType.getType(0);
        String keyName = keyType.getName();
        ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
        Type valuetype = repeatedType.getType(1);
        String valueName = valuetype.getName();
        ObjectInspector valueInspector = inspector.getMapValueObjectInspector();

        for(Iterator i$ = mapValues.entrySet().iterator(); i$.hasNext(); this.recordConsumer.endGroup()) {
            Entry<?, ?> keyValue = (Entry)i$.next();
            this.recordConsumer.startGroup();
            if (keyValue != null) {
                Object keyElement = keyValue.getKey();
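                // The key field is started unconditionally below; if keyElement is null,
                // writePrimitive() writes nothing, emptyField stays true, and endField(keyName, 0) throws.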
                this.recordConsumer.startField(keyName, 0);
                this.writeValue(keyElement, keyInspector, keyType);
                this.recordConsumer.endField(keyName, 0);
                Object valueElement = keyValue.getValue();
                if (valueElement != null) {
                    this.recordConsumer.startField(valueName, 1);
                    this.writeValue(valueElement, valueInspector, valuetype);
                    this.recordConsumer.endField(valueName, 1);
                }
            }
        }

        this.recordConsumer.endField(repeatedType.getName(), 0);
        this.recordConsumer.endGroup();
    }

    private void writeValue(Object value, ObjectInspector inspector, Type type) {
        if (type.isPrimitive()) {
            this.checkInspectorCategory(inspector, Category.PRIMITIVE);
            this.writePrimitive(value, (PrimitiveObjectInspector)inspector);
        } else {
            GroupType groupType = type.asGroupType();
            OriginalType originalType = type.getOriginalType();
            if (originalType != null && originalType.equals(OriginalType.LIST)) {
                this.checkInspectorCategory(inspector, Category.LIST);
                this.writeArray(value, (ListObjectInspector)inspector, groupType);
            } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
                this.checkInspectorCategory(inspector, Category.MAP);
                this.writeMap(value, (MapObjectInspector)inspector, groupType);
            } else {
                this.checkInspectorCategory(inspector, Category.STRUCT);
                this.writeGroup(value, (StructObjectInspector)inspector, groupType);
            }
        }

    }
    
    private void writePrimitive(Object value, PrimitiveObjectInspector inspector) {
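        // Nothing is emitted for a null value (and VOID returns early), so emptyField is left
        // exactly as startField() set it, which is what ultimately triggers the exception.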
        if (value != null) {
            switch(inspector.getPrimitiveCategory()) {
            case VOID:
                return;
            case DOUBLE:
                this.recordConsumer.addDouble(((DoubleObjectInspector)inspector).get(value));
                break;
            case BOOLEAN:
                this.recordConsumer.addBoolean(((BooleanObjectInspector)inspector).get(value));
                break;
            case FLOAT:
                this.recordConsumer.addFloat(((FloatObjectInspector)inspector).get(value));
                break;
            case BYTE:
                this.recordConsumer.addInteger(((ByteObjectInspector)inspector).get(value));
                break;
            case INT:
                this.recordConsumer.addInteger(((IntObjectInspector)inspector).get(value));
                break;
            case LONG:
                this.recordConsumer.addLong(((LongObjectInspector)inspector).get(value));
                break;
            case SHORT:
                this.recordConsumer.addInteger(((ShortObjectInspector)inspector).get(value));
                break;
            case STRING:
                String v = ((StringObjectInspector)inspector).getPrimitiveJavaObject(value);
                this.recordConsumer.addBinary(Binary.fromString(v));
                break;
            case CHAR:
                String vChar = ((HiveCharObjectInspector)inspector).getPrimitiveJavaObject(value).getStrippedValue();
                this.recordConsumer.addBinary(Binary.fromString(vChar));
                break;
            case VARCHAR:
                String vVarchar = ((HiveVarcharObjectInspector)inspector).getPrimitiveJavaObject(value).getValue();
                this.recordConsumer.addBinary(Binary.fromString(vVarchar));
                break;
            case BINARY:
                byte[] vBinary = ((BinaryObjectInspector)inspector).getPrimitiveJavaObject(value);
                this.recordConsumer.addBinary(Binary.fromByteArray(vBinary));
                break;
            case TIMESTAMP:
                Timestamp ts = ((TimestampObjectInspector)inspector).getPrimitiveJavaObject(value);
                this.recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
                break;
            case DECIMAL:
                HiveDecimal vDecimal = (HiveDecimal)inspector.getPrimitiveJavaObject(value);
                DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
                this.recordConsumer.addBinary(this.decimalToBinary(vDecimal, decTypeInfo));
                break;
            case DATE:
                Date vDate = ((DateObjectInspector)inspector).getPrimitiveJavaObject(value);
                this.recordConsumer.addInteger(DateWritable.dateToDays(vDate));
                break;
            default:
                throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
            }

        }
    }

 

parquet.io.MessageColumnIO.MessageColumnIORecordConsumer

        public void startField(String field, int index) {
            try {
                if (MessageColumnIO.DEBUG) {
                    this.log("startField(" + field + ", " + index + ")");
                }

                this.currentColumnIO = ((GroupColumnIO)this.currentColumnIO).getChild(index);
                this.emptyField = true;
                if (MessageColumnIO.DEBUG) {
                    this.printState();
                }

            } catch (RuntimeException var4) {
                throw new ParquetEncodingException("error starting field " + field + " at " + index, var4);
            }
        }

        public void endField(String field, int index) {
            if (MessageColumnIO.DEBUG) {
                this.log("endField(" + field + ", " + index + ")");
            }

            this.currentColumnIO = this.currentColumnIO.getParent();
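            // emptyField was set to true in startField() and is only cleared when an add*() call
            // actually writes a value; if nothing was written for this field, the check below fails.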
            if (this.emptyField) {
                throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead");
            } else {
                this.fieldsWritten[this.currentLevel].markWritten(index);
                this.r[this.currentLevel] = this.currentLevel == 0 ? 0 : this.r[this.currentLevel - 1];
                if (MessageColumnIO.DEBUG) {
                    this.printState();
                }

            }
        }
        
        public void addInteger(int value) {
            if (MessageColumnIO.DEBUG) {
                this.log("addInt(" + value + ")");
            }

            this.emptyField = false;
            this.getColumnWriter().write(value, this.r[this.currentLevel], this.currentColumnIO.getDefinitionLevel());
            this.setRepetitionLevel();
            if (MessageColumnIO.DEBUG) {
                this.printState();
            }

        }

 

The key lines in DataWritableWriter that lead to the error are:

                Object keyElement = keyValue.getKey();
                this.recordConsumer.startField(keyName, 0);
                this.writeValue(keyElement, keyInspector, keyType);
                this.recordConsumer.endField(keyName, 0);

The code flow works out as follows:

DataWritableWriter.writeMap
        MessageColumnIORecordConsumer.startField
                note: this.emptyField = true;
        iterate over the map entries
                handle the key
                        Object keyElement = keyValue.getKey();
                        MessageColumnIORecordConsumer.startField
                        DataWritableWriter.writeValue
                                type.isPrimitive()
                                        DataWritableWriter.writePrimitive
                                                1) if value == null, or the type is VOID
                                                        note: this.emptyField is still true
                                                2) if (value != null): MessageColumnIORecordConsumer.addInteger
                                                        note: this.emptyField = false;
                        MessageColumnIORecordConsumer.endField
        MessageColumnIORecordConsumer.endField
                note: if (this.emptyField) { throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead"); }

This error is triggered whenever a map<?,?> or array<?> column is written with an empty collection, or when the map contains an entry whose key is null.

It turns out this has already been discussed upstream: https://issues.apache.org/jira/browse/HIVE-11625
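For reference, a minimal sketch of how this can be reproduced from spark-shell, assuming a Hive-backed Parquet table with a map column (the schema and the map() literal below are illustrative, not taken from the original job):

// hypothetical table with a map<string,string> column, stored as parquet
spark.sql("create table test_parquet_table (id int, props map<string,string>) stored as parquet")

// inserting an empty map (or a map containing a null key) goes through the Hive parquet writer,
// leaves emptyField == true, and endField() throws ParquetEncodingException
spark.sql("insert overwrite table test_parquet_table select 1 as id, map() as props")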

 

There are two ways to avoid the problem:

1. Run the SQL in Hive instead;

2. Add a UDF, filter_map, that returns null when the map is empty and otherwise drops every entry whose key is null, as shown below:

spark.udf.register("filter_map", (map: Map[String, String]) =>
  if (map != null && !map.isEmpty) map.filter(_._1 != null) else null)
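With the UDF registered, the failing statement can list its columns explicitly and wrap each map column; the column names id and props below are hypothetical:

spark.sql("insert overwrite table test_parquet_table select id, filter_map(props) from dummy")

Returning null for an empty map works because the writer simply skips null column values, so the field is omitted entirely, which is exactly what the "should be ommited completely" message asks for.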