spark union 特別注意

今天遇到一個很詭異的問題。java

表Aredis

userid housecode res ctime
u1 code1 1 1301

表Bide

userid housecode res ctime
u2 code2 0 1302

表C函數

userid name type time
u1 大海 0 1303

而後對錶A進行處理操做大數據

表A.createOrReplaceTempView("t1");
JavaRDD<HistoryModelExt> rdd=removeDuplicateData(t1);
t1= s.createDataFrame(rdd, HistoryModelExt.class);code

而後查看t1, t1.show()對象

u1 code1 1 1301
.. .. .. ..

數據還在,而後 B union A 而後 join C(經過userid), 理論上應該是有結果的,感受就像1+1=2 這麼確定,可是還真沒有數據,很是詫異。rem

剛開始覺得是本身程序哪裏有問題,苦苦尋找,發現一切正常, 最後回到 union這個方法上。get

爲了看清楚來龍去脈, 我把B union A的數據打印了出來,發現了一個奇怪的事情it

userid housecode res ctime
u2 code2 0 1302
1301 code1 1 u1

當時一會兒就明白爲何join 沒有數據了, A的schema已經與B不一致了。
原來 union函數並非按照列名合併,而是按照位置合併。
可是在JavaRDD<HistoryModelExt> rdd=removeDuplicateData(t1); 這步以前仍是一致的,爲何轉成java對象後,schema就變了呢

查看源代碼

/**
   * Applies a schema to an RDD of Java Beans.
   *
   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
   * SELECT * queries will return the columns in an undefined order.
   *
   * @since 2.0.0
   */

 def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
    val attributeSeq: Seq[AttributeReference] = getSchema(beanClass)
    val className = beanClass.getName
    val rowRdd = rdd.mapPartitions { iter =>
    // BeanInfo is not serializable so we must rediscover it remotely for each partition.
      SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
    }
    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd.setName(rdd.name))(self))
  }

看註釋,fields的順序是不保證的, 原來如此。

這樣你在union前乖乖的執行

t1.select("userId","houseCode","res","ctime");

這樣順序就又恢復了,大數據排查問題特別麻煩,感受是一個很大的坑,但願能幫到後來人。

相關文章
相關標籤/搜索