Handling Column Names Containing Dots in Spark SQL

Copyright notice: this is an original post by the author. Please do not repost without permission.

Typing all of this up by hand takes effort; please respect the work. Thanks.

Author: http://blog.csdn.net/wang_wbq

Since this question comes up fairly often, I have extracted this section from another post of mine: https://blog.csdn.net/wang_wbq/article/details/79672768

Here we look at how to access fields of struct data, including paths that contain special characters such as the dot "." and the backtick "`". First, let's create a basic DataFrame:

scala> case class A(a: Int, b: String, c: Boolean, d: java.sql.Date, e: java.sql.Timestamp, f: Array[Byte])
defined class A

scala> case class B(`b.g`: Byte, `b.h`: Short, `b.i`: Long, `b.j`: Float)
defined class B

scala> case class C(x: A, y: A, z: B)
defined class C

scala> val ss = (1 to 10).map(i => C(A(i, s"str_$i", i % 2 == 0, new java.sql.Date(i * 10000000), new java.sql.Timestamp(i * 10000000), s"str_$i".getBytes), A(i, s"str_$i", i % 2 == 0, new java.sql.Date(i * 10000000), new java.sql.Timestamp(i * 10000000), s"str_$i".getBytes), B(i.toByte, i.toShort, i, i)))
ss: scala.collection.immutable.IndexedSeq[C] = Vector(C(A(1,str_1,false,1970-01-01,1970-01-01 10:46:40.0,[B@6f558842),A(1,str_1,false,1970-01-01,1970-01-01 10:46:40.0,[B@6a7fd5ba),B(1,1,1,1.0)), C(A(2,str_2,true,1970-01-01,1970-01-01 13:33:20.0,[B@38726836),A(2,str_2,true,1970-01-01,1970-01-01 13:33:20.0,[B@43a5e5a6),B(2,2,2,2.0)), C(A(3,str_3,false,1970-01-01,1970-01-01 16:20:00.0,[B@f17208),A(3,str_3,false,1970-01-01,1970-01-01 16:20:00.0,[B@1437b5a9),B(3,3,3,3.0)), C(A(4,str_4,true,1970-01-01,1970-01-01 19:06:40.0,[B@3f2f4cc1),A(4,str_4,true,1970-01-01,1970-01-01 19:06:40.0,[B@534b0657),B(4,4,4,4.0)), C(A(5,str_5,false,1970-01-01,1970-01-01 21:53:20.0,[B@58215f04),A(5,str_5,false,1970-01-01,1970-01-01 21:53:20.0,[B@71923354),B(5,5,5,5.0)), C(A(6,str_6,true,1970-01-02,1970-01-02 00:40...

scala> val df = spark.createDataFrame(ss).select(col("x").as("`x`"), col("y"), col("z").as("c.z"))
df: org.apache.spark.sql.DataFrame = [`x`: struct<a: int, b: string ... 4 more fields>, y: struct<a: int, b: string ... 4 more fields> ... 1 more field]

scala> df.show(10, false)
+-----------------------------------------------------------+-----------------------------------------------------------+------------------+
|`x`                                                        |y                                                          |c.z               |
+-----------------------------------------------------------+-----------------------------------------------------------+------------------+
|[1, str_1, false, 1970-01-01, 1970-01-01 10:46:40, str_1]  |[1, str_1, false, 1970-01-01, 1970-01-01 10:46:40, str_1]  |[1, 1, 1, 1.0]    |
|[2, str_2, true, 1970-01-01, 1970-01-01 13:33:20, str_2]   |[2, str_2, true, 1970-01-01, 1970-01-01 13:33:20, str_2]   |[2, 2, 2, 2.0]    |
|[3, str_3, false, 1970-01-01, 1970-01-01 16:20:00, str_3]  |[3, str_3, false, 1970-01-01, 1970-01-01 16:20:00, str_3]  |[3, 3, 3, 3.0]    |
|[4, str_4, true, 1970-01-01, 1970-01-01 19:06:40, str_4]   |[4, str_4, true, 1970-01-01, 1970-01-01 19:06:40, str_4]   |[4, 4, 4, 4.0]    |
|[5, str_5, false, 1970-01-01, 1970-01-01 21:53:20, str_5]  |[5, str_5, false, 1970-01-01, 1970-01-01 21:53:20, str_5]  |[5, 5, 5, 5.0]    |
|[6, str_6, true, 1970-01-02, 1970-01-02 00:40:00, str_6]   |[6, str_6, true, 1970-01-02, 1970-01-02 00:40:00, str_6]   |[6, 6, 6, 6.0]    |
|[7, str_7, false, 1970-01-02, 1970-01-02 03:26:40, str_7]  |[7, str_7, false, 1970-01-02, 1970-01-02 03:26:40, str_7]  |[7, 7, 7, 7.0]    |
|[8, str_8, true, 1970-01-02, 1970-01-02 06:13:20, str_8]   |[8, str_8, true, 1970-01-02, 1970-01-02 06:13:20, str_8]   |[8, 8, 8, 8.0]    |
|[9, str_9, false, 1970-01-02, 1970-01-02 09:00:00, str_9]  |[9, str_9, false, 1970-01-02, 1970-01-02 09:00:00, str_9]  |[9, 9, 9, 9.0]    |
|[10, str_10, true, 1970-01-02, 1970-01-02 11:46:40, str_10]|[10, str_10, true, 1970-01-02, 1970-01-02 11:46:40, str_10]|[10, 10, 10, 10.0]|
+-----------------------------------------------------------+-----------------------------------------------------------+------------------+


scala> df.printSchema
root
 |-- `x`: struct (nullable = true)
 |    |-- a: integer (nullable = false)
 |    |-- b: string (nullable = true)
 |    |-- c: boolean (nullable = false)
 |    |-- d: date (nullable = true)
 |    |-- e: timestamp (nullable = true)
 |    |-- f: binary (nullable = true)
 |-- y: struct (nullable = true)
 |    |-- a: integer (nullable = false)
 |    |-- b: string (nullable = true)
 |    |-- c: boolean (nullable = false)
 |    |-- d: date (nullable = true)
 |    |-- e: timestamp (nullable = true)
 |    |-- f: binary (nullable = true)
 |-- c.z: struct (nullable = true)
 |    |-- b.g: byte (nullable = false)
 |    |-- b.h: short (nullable = false)
 |    |-- b.i: long (nullable = false)
 |    |-- b.j: float (nullable = false)

When a column name contains neither a dot "." nor a backtick "`", we can use the dot separator directly to reach nested values. Since we use the renaming expression name AS alias here, we go through selectExpr:

scala> df.selectExpr("y.a", "y.b", "y.c as boolean_value", "y.d as date_value", "y.e as timestamp_value", "y.f").show
+---+------+-------------+----------+-------------------+-------------------+
|  a|     b|boolean_value|date_value|    timestamp_value|                  f|
+---+------+-------------+----------+-------------------+-------------------+
|  1| str_1|        false|1970-01-01|1970-01-01 10:46:40|   [73 74 72 5F 31]|
|  2| str_2|         true|1970-01-01|1970-01-01 13:33:20|   [73 74 72 5F 32]|
|  3| str_3|        false|1970-01-01|1970-01-01 16:20:00|   [73 74 72 5F 33]|
|  4| str_4|         true|1970-01-01|1970-01-01 19:06:40|   [73 74 72 5F 34]|
|  5| str_5|        false|1970-01-01|1970-01-01 21:53:20|   [73 74 72 5F 35]|
|  6| str_6|         true|1970-01-02|1970-01-02 00:40:00|   [73 74 72 5F 36]|
|  7| str_7|        false|1970-01-02|1970-01-02 03:26:40|   [73 74 72 5F 37]|
|  8| str_8|         true|1970-01-02|1970-01-02 06:13:20|   [73 74 72 5F 38]|
|  9| str_9|        false|1970-01-02|1970-01-02 09:00:00|   [73 74 72 5F 39]|
| 10|str_10|         true|1970-01-02|1970-01-02 11:46:40|[73 74 72 5F 31 30]|
+---+------+-------------+----------+-------------------+-------------------+


scala> df.selectExpr("y.a", "y.b", "y.c as boolean_value", "y.d as date_value", "y.e as timestamp_value", "y.f").printSchema
root
 |-- a: integer (nullable = true)
 |-- b: string (nullable = true)
 |-- boolean_value: boolean (nullable = true)
 |-- date_value: date (nullable = true)
 |-- timestamp_value: timestamp (nullable = true)
 |-- f: binary (nullable = true)
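
For names without special characters, the same nested access also works through the Column API rather than an expression string. A minimal sketch of the equivalent call (not from the original transcript; it assumes the spark-shell defaults, where col and $ are already in scope):

// Column-API equivalent of the selectExpr above; Column.as replaces SQL's AS
df.select(col("y.a"), $"y.b", col("y.c").as("boolean_value")).printSchema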

If the path name itself contains a dot ".", using the dot directly raises an error. The message can be puzzling: the column clearly appears among the listed input columns, yet Spark claims it cannot be resolved:

scala> df.selectExpr("c.z")
org.apache.spark.sql.AnalysisException: cannot resolve '`c.z`' given input columns: [`x`, y, c.z]; line 1 pos 0;
'Project ['c.z]
+- AnalysisBarrier
      +- Project [x#142 AS `x`#148, y#143, z#144 AS c.z#149]
         +- LocalRelation [x#142, y#143, z#144]

When a path name contains a dot ".", we must wrap the complete name in backticks "`" so that Spark SQL treats it as a single identifier rather than a two-level path:

scala> df.select("`c.z`").show
+------------------+
|               c.z|
+------------------+
|    [1, 1, 1, 1.0]|
|    [2, 2, 2, 2.0]|
|    [3, 3, 3, 3.0]|
|    [4, 4, 4, 4.0]|
|    [5, 5, 5, 5.0]|
|    [6, 6, 6, 6.0]|
|    [7, 7, 7, 7.0]|
|    [8, 8, 8, 8.0]|
|    [9, 9, 9, 9.0]|
|[10, 10, 10, 10.0]|
+------------------+

scala> df.select("`c.z`").printSchema
root
 |-- c.z: struct (nullable = true)
 |    |-- b.g: byte (nullable = false)
 |    |-- b.h: short (nullable = false)
 |    |-- b.i: long (nullable = false)
 |    |-- b.j: float (nullable = false)
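
Once the dotted name is quoted, it can also be aliased away so that downstream code no longer needs backticks. A small sketch against the df above (the new names z and c_z are hypothetical):

// Alias inside selectExpr, using the same backtick quoting
df.selectExpr("`c.z` AS z").printSchema

// withColumnRenamed matches top-level field names literally,
// so no backticks are needed here
df.withColumnRenamed("c.z", "c_z").printSchema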

// Queries across different nesting levels still use the dot separator

scala> df.select(col("`c.z`.`b.g`"), expr("`c.z`.`b.g` AS czbg"), col("`c.z`.`b.i`").as("czbi"), $"`c.z`.`b.j`").show
+---+----+----+----+
|b.g|czbg|czbi| b.j|
+---+----+----+----+
|  1|   1|   1| 1.0|
|  2|   2|   2| 2.0|
|  3|   3|   3| 3.0|
|  4|   4|   4| 4.0|
|  5|   5|   5| 5.0|
|  6|   6|   6| 6.0|
|  7|   7|   7| 7.0|
|  8|   8|   8| 8.0|
|  9|   9|   9| 9.0|
| 10|  10|  10|10.0|
+---+----+----+----+


scala> df.select(col("`c.z`.`b.g`"), expr("`c.z`.`b.g` AS czbg"), col("`c.z`.`b.i`").as("czbi"), $"`c.z`.`b.j`").printSchema
root
 |-- b.g: byte (nullable = true)
 |-- czbg: byte (nullable = true)
 |-- czbi: long (nullable = true)
 |-- b.j: float (nullable = true)

// You can also use square brackets or parentheses to access the next level

scala> df.select(expr("`c.z`['b.g'] As czbg"), col("`c.z`")("b.i").as("czbi")).printSchema
root
 |-- czbg: byte (nullable = true)
 |-- czbi: long (nullable = true)
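
The parenthesis form above is Column.apply, which for a struct column extracts a field just like Column.getField. A short sketch making that explicit (the alias czbg is hypothetical):

// col("`c.z`")("b.g") is shorthand for getField on a struct column
df.select(col("`c.z`").getField("b.g").as("czbg")).printSchema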

When a path name contains a backtick "`", we must write two backticks in place of each single backtick:

scala> df.select(expr("```x```.a"), expr("```x```")("b")).show
+---+------+
|  a| `x`.b|
+---+------+
|  1| str_1|
|  2| str_2|
|  3| str_3|
|  4| str_4|
|  5| str_5|
|  6| str_6|
|  7| str_7|
|  8| str_8|
|  9| str_9|
| 10|str_10|
+---+------+


scala> df.select(expr("```x```.a"), expr("```x```")("b")).printSchema
root
 |-- a: integer (nullable = true)
 |-- `x`.b: string (nullable = true)

The code behind this behavior is in the class org.apache.spark.sql.catalyst.parser.PostProcessor (which extends SqlBaseBaseListener), where we can see that Spark SQL overrides an Antlr 4 listener method to replace each pair of backticks with a single backtick. Since this is defined in the expression parser, we have to go through the expr function to use it:

override def exitQuotedIdentifier(ctx: SqlBaseParser.QuotedIdentifierContext): Unit = {
  replaceTokenByIdentifier(ctx, 1) { token =>
    // Remove the double back ticks in the string.
    token.setText(token.getText.replace("``", "`"))
    token
  }
}
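
Since this rewrite lives in the SQL parser itself, the same double-backtick escaping should apply to full SQL statements as well, not only to expr. A sketch, assuming a hypothetical temp view named t:

// Register the DataFrame and query it with the same escaping rules
df.createOrReplaceTempView("t")
spark.sql("SELECT ```x```.a, `c.z`.`b.g` FROM t").printSchema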

In the Antlr 4 grammar file, we can see that quotedIdentifier matches strings wrapped in backticks:

quotedIdentifier
    : BACKQUOTED_IDENTIFIER ;

BACKQUOTED_IDENTIFIER
    : '`' ( ~'`' | '``' )* '`' ;

The following post describes how to locate Spark SQL's Antlr 4 grammar file in versions after Spark 2.0; if you are interested, you can use it to dig further into Antlr 4 and the Spark SQL source: http://www.javashuo.com/article/p-fnnqcoao-cz.html
