Spark SQL表達式內部可用函數與相關源碼

Spark SQL表達式內部可用函數與相關源碼

版權聲明:本文爲博主原創文章,未經博主容許不得轉載。html

手動碼字不易,請你們尊重勞動成果,謝謝java

做者:http://blog.csdn.net/wang_wbqsql

雖然標題寫着是Spark SQL可用函數,可是我並不想直接把它們貼出來。代碼是最好的老師,所以我要教你們本身從源碼中得到這些函數。由於隨着Spark的版本更新,內置函數會愈來愈多,單純把一個版本的函數列出來可能會誤導他人。因此本文所介紹的內容是各版本通用的。express

代碼中的表達式函數

使用代碼中的表達式函數

我這裏所指的表達式代碼中的函數即sincosisnull這種能夠在表達式中編寫的函數:apache

df.select(sin($"a").as("sin_a"), cos($"a").as("cos_a")).filter(!isnull($"sin_a"))

獲取當前使用版本的表達式函數集合

這個類型的函數是定義在org.apache.spark.sql.functions伴生對象中。api

在使用時,只用import org.apache.spark.sql.functions._便可使用其中的全部表達式函數。在須要使用這種類型的函數時,只須要打開這個類便可查找你所須要的函數。session

好比sin函數的定義爲:app

/** * @param e angle in radians * @return sine of the angle, as if computed by `java.lang.Math.sin` * * @group math_funcs * @since 1.4.0 */
  def sin(e: Column): Column = withExpr { Sin(e.expr) }

  /** * @param columnName angle in radians * @return sine of the angle, as if computed by `java.lang.Math.sin` * * @group math_funcs * @since 1.4.0 */
  def sin(columnName: String): Column = sin(Column(columnName))

有兩個重載版本,第一個重載版本sin(e: Column)須要填入一個Column對象,第二個重載版本sin(columnName: String)就能夠直接填入列名字符串了。函數

獲取Column對象

若是咱們須要把列名轉換成Column對象,咱們可使用如下方法:this

一、使用這個列所在的DataFrame對象獲取,即調用df("列名")或者df.col("列名")來精確獲取某個DataFrame中的列,使用這種方式能夠用來處理兩個DataFrame進行join合併後出現列名重複的狀況:

/** * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. * * @group untypedrel * @since 2.0.0 */
  def apply(colName: String): Column = col(colName)

  /** * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. * * @group untypedrel * @since 2.0.0 */
  def col(colName: String): Column = colName match {
    case "*" =>
      Column(ResolvedStar(queryExecution.analyzed.output))
    case _ =>
      if (sqlContext.conf.supportQuotedRegexColumnName) {
        colRegex(colName)
      } else {
        val expr = resolve(colName)
        Column(expr)
      }
  }

二、使用org.apache.spark.sql.functions中定義的col或者column方法來獲取列,若是一個DataFrame裏有列名重複,則會拋異常:

/** * Returns a [[Column]] based on the given column name. * * @group normal_funcs * @since 1.3.0 */
  def col(colName: String): Column = Column(colName)

  /** * Returns a [[Column]] based on the given column name. Alias of [[col]]. * * @group normal_funcs * @since 1.3.0 */
  def column(colName: String): Column = Column(colName)

三、經過聲明import sqlContext.implicits._來使用$"a"獲取表示a列的Column對象:

import sparkSession.sqlContext.implicits._

表達式字符串中的函數

使用表達式字符串

使用表達式字符串有如下幾種方式:

一、使用DataFrame中的selectExpr方法
二、使用使用org.apache.spark.sql.functions中定義的expr方法包裹表達式字符串

// 下面兩個表達式是等價的:
   // 最後的abs爲表達式字符串
   ds.selectExpr("colA", "colB as newName", "abs(colC)")
   ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))

三、使用DataFrame中的def filter(conditionExpr: String)def where(conditionExpr: String)方法

// 其中 '>' 可看成表達式字符串
   peopleDs.filter("age > 15")
   peopleDs.where("age > 15")

四、將一個DataFrame註冊到一個表名上。以後就可使用sparkSession.sql或者sqlContext.sql對該表進行處理。Spark 2.0以後能夠運行全部99個TPC-DS查詢。

Spark 2.0以前使用dataFrame.registerTempTable
Spark 2.0以後使用dataFrame.createOrReplaceTempView

如下例子中沒有涉及表達式字符串

// Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    // Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age| name|
    // +----+-------+
    // |null|Michael|
    // | 30| Andy|
    // | 19| Justin|
    // +----+-------+

    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age| name|
    // +----+-------+
    // |null|Michael|
    // | 30| Andy|
    // | 19| Justin|
    // +----+-------+

獲取當前使用版本的字符串函數集合

表達式字符串函數的定義是在org.apache.spark.sql.catalyst.analysis.FunctionRegistry伴生對象中,其中expressions變量定義了全部可用的表達式。

舉例說expression[Substring]("substr")聲明,即聲明瞭你能夠在表達式字符串中使用substr函數。具體使用方法能夠進到前面的Substring類中來,如下就是Substring類的聲明:

/** * A function that takes a substring of its first argument starting at a given position. * Defined for String and Binary types. * * NOTE: that this is not zero based, but 1-based index. The first character in str has index 1. */
@ExpressionDescription(
  usage = "_FUNC_(str, pos[, len]) - Returns the substring of `str` that starts at `pos` and is of length `len`, or the slice of byte array that starts at `pos` and is of length `len`.",
  examples = """ Examples: > SELECT _FUNC_('Spark SQL', 5); k SQL > SELECT _FUNC_('Spark SQL', -3); SQL > SELECT _FUNC_('Spark SQL', 5, 1); k """)
case class Substring(str: Expression, pos: Expression, len: Expression)

經過閱讀類上的介紹咱們就能夠了解其用法。

從Spark 2.3.0開始,官方文檔給出了可用函數列表: http://spark.apache.org/docs/latest/api/sql/index.html 即對應着這裏所聲明的函數。

可是要注意這些函數只適用於Spark 2.3.0及之後版本,裏面介紹的部分函數可能在較低Spark版本中不可用

自定義函數

Spark SQL提供了可自定義字符串函數的功能,定義的該字符串函數只能在定義該函數的sparkSession內使用,不然會報解析失敗:

/** * A collection of methods for registering user-defined functions (UDF). * * The following example registers a Scala closure as UDF: * {{{ * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1) * }}} * * The following example registers a UDF in Java: * {{{ * sparkSession.udf().register("myUDF", * (Integer arg1, String arg2) -> arg2 + arg1, * DataTypes.StringType); * }}} * * @note The user-defined functions must be deterministic. Due to optimization, * duplicate invocations may be eliminated or the function may even be invoked more times than * it is present in the query. * * @since 2.0.0 */
  def udf: UDFRegistration = sessionState.udfRegistration
相關文章
相關標籤/搜索