Joining Spark Datasets that share column names

When joining two Datasets whose join keys partly share the same column names and partly differ, there are several ways to express the equality condition:

val df1 = Seq((1, 2, 3),(1, 1, 1)).toDF("a", "b", "c")
val df2 = Seq((1, 2, 4),(2, 2, 2)).toDF("a", "b1", "d")

df1.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  1|  1|  1|
+---+---+---+
df2.show
+---+---+---+
|  a| b1|  d|
+---+---+---+
|  1|  2|  4|
|  2|  2|  2|
+---+---+---+
// Join condition: df1("a") == df2("a") && df1("b") == df2("b1")

// Joining directly fails with: org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be:...
df1.join(df2, col("a") === col("a") && col("b") === col("b1"), "outer").show
// It works if the ambiguous column is qualified with its DataFrame:
df1.join(df2, df1("a") === df2("a") && col("b") === col("b1"), "outer").show
+----+----+----+----+----+----+
|   a|   b|   c|   a|  b1|   d|
+----+----+----+----+----+----+
|null|null|null|   2|   2|   2|
|   1|   2|   3|   1|   2|   4|
|   1|   1|   1|null|null|null|
+----+----+----+----+----+----+
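Note that b and b1 each occur in only one input, so they resolve without qualification; only the shared name a has to be qualified. For symmetry you can qualify every column as well. A minimal sketch, equivalent to the call above:

```scala
// Fully qualified join condition: every column names the DataFrame
// it comes from, which avoids any ambiguity up front.
df1.join(df2, df1("a") === df2("a") && df1("b") === df2("b1"), "outer").show
```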

// Alternatively, rename columns in one Dataset so the join keys are either all identical or all distinct, then join as above:
df1.join(df2.withColumnRenamed("b1", "b"), Seq("a", "b"), "outer").show
+---+---+----+----+
|  a|  b|   c|   d|
+---+---+----+----+
|  2|  2|null|   2|
|  1|  2|   3|   4|
|  1|  1|   1|null|
+---+---+----+----+

// You can also use the Dataset as method (equivalent to alias) to name each Dataset and resolve the ambiguity. (A Dataset alias works like a table alias in SQL.)
df1.alias("df1")
    .join(df2.as("df2"), col("df1.a") === col("df2.a") && col("b") === col("b1"), "outer")
    .show
+----+----+----+----+----+----+
|   a|   b|   c|   a|  b1|   d|
+----+----+----+----+----+----+
|null|null|null|   2|   2|   2|
|   1|   2|   3|   1|   2|   4|
|   1|   1|   1|null|null|null|
+----+----+----+----+----+----+
// To keep only df2's a column, drop df1's. (show returns Unit, so there is no point assigning the result.)
df1.alias("df1")
    .join(df2.as("df2"), col("df1.a") === col("df2.a") && col("b") === col("b1"), "outer")
    .drop(col("df1.a")).show
+----+----+----+----+----+
|   b|   c|   a|  b1|   d|
+----+----+----+----+----+
|null|null|   2|   2|   2|
|   2|   3|   1|   2|   4|
|   1|   1|null|null|null|
+----+----+----+----+----+
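Symmetrically, to keep only df1's a column instead, drop the one coming from df2 (assuming the same aliases as above):

```scala
df1.alias("df1")
    .join(df2.as("df2"), col("df1.a") === col("df2.a") && col("b") === col("b1"), "outer")
    .drop(col("df2.a"))  // keep df1's a, discard df2's
    .show
```

Be aware that in a full outer join each kept key column is null on the rows contributed only by the other side, as the outputs above show.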

Addendum:
The Dataset as method (equivalent to alias) gives a Dataset an alias, similar to a table alias in SQL.

val df = Seq((1, 2),(1, 1)).toDF("a", "b")
df.select("a").show
+---+
|  a|
+---+
|  1|
|  1|
+---+

df.select("df.a").show
// Fails with: org.apache.spark.sql.AnalysisException: cannot resolve '`df.a`' given input columns: [a, b];

df.as("df").select("df.a").show
+---+
|  a|
+---+
|  1|
|  1|
+---+
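Once a Dataset has an alias, qualified and unqualified references can be mixed in the same select, so only the columns that would otherwise be ambiguous need the prefix. A small sketch:

```scala
// Both references resolve against the same aliased Dataset.
df.as("df").select(col("df.a"), col("b")).show
```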