版權聲明:本文爲博主原創文章,未經博主容許不得轉載。html
手動碼字不易,請你們尊重勞動成果,謝謝java
雖然標題寫着是Spark SQL可用函數,可是我並不想直接把它們貼出來。代碼是最好的老師,所以我要教你們本身從源碼中得到這些函數。由於隨着Spark的版本更新,內置函數會愈來愈多,單純把一個版本的函數列出來可能會誤導他人。因此本文所介紹的內容是各版本通用的。express
我這裏所指的表達式代碼中的函數即sin
、cos
、isnull
這種能夠在表達式中編寫的函數:apache
df.select(sin($"a").as("sin_a"), cos($"a").as("cos_a")).filter(!isnull($"sin_a"))
這個類型的函數是定義在org.apache.spark.sql.functions伴生對象中。api
在使用時,只用import org.apache.spark.sql.functions._便可使用其中的全部表達式函數。在須要使用這種類型的函數時,只須要打開這個類便可查找你所須要的函數。session
好比sin
函數的定義爲:app
/** * @param e angle in radians * @return sine of the angle, as if computed by `java.lang.Math.sin` * * @group math_funcs * @since 1.4.0 */
def sin(e: Column): Column = withExpr { Sin(e.expr) }
/** * @param columnName angle in radians * @return sine of the angle, as if computed by `java.lang.Math.sin` * * @group math_funcs * @since 1.4.0 */
def sin(columnName: String): Column = sin(Column(columnName))
有兩個重載版本,第一個重載版本sin(e: Column)
須要填入一個Column
對象,第二個重載版本sin(columnName: String)
就能夠直接填入列名字符串了。函數
若是咱們須要把列名轉換成Column
對象,咱們可使用如下方法:this
一、使用這個列所在的DataFrame
對象獲取,即調用df("列名")
或者df.col("列名")
來精確獲取某個DataFrame中的列,使用這種方式能夠用來處理兩個DataFrame進行join
合併後出現列名重複的狀況:
/** * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. * * @group untypedrel * @since 2.0.0 */
def apply(colName: String): Column = col(colName)
/** * Selects column based on the column name and returns it as a [[Column]]. * * @note The column name can also reference to a nested column like `a.b`. * * @group untypedrel * @since 2.0.0 */
def col(colName: String): Column = colName match {
case "*" =>
Column(ResolvedStar(queryExecution.analyzed.output))
case _ =>
if (sqlContext.conf.supportQuotedRegexColumnName) {
colRegex(colName)
} else {
val expr = resolve(colName)
Column(expr)
}
}
二、使用org.apache.spark.sql.functions中定義的col
或者column
方法來獲取列,若是一個DataFrame裏有列名重複,則會拋異常:
/** * Returns a [[Column]] based on the given column name. * * @group normal_funcs * @since 1.3.0 */
def col(colName: String): Column = Column(colName)
/** * Returns a [[Column]] based on the given column name. Alias of [[col]]. * * @group normal_funcs * @since 1.3.0 */
def column(colName: String): Column = Column(colName)
三、經過聲明import sqlContext.implicits._
來使用$"a"
獲取表示a
列的Column對象:
import sparkSession.sqlContext.implicits._
使用表達式字符串有如下幾種方式:
一、使用DataFrame
中的selectExpr
方法
二、使用使用org.apache.spark.sql.functions中定義的expr
方法包裹表達式字符串
// 下面兩個表達式是等價的:
// 最後的abs爲表達式字符串
ds.selectExpr("colA", "colB as newName", "abs(colC)")
ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
三、使用DataFrame
中的def filter(conditionExpr: String)
或def where(conditionExpr: String)
方法
// 其中 '>' 可看成表達式字符串
peopleDs.filter("age > 15")
peopleDs.where("age > 15")
四、將一個DataFrame註冊到一個表名上。以後就可使用sparkSession.sql或者sqlContext.sql對該表進行處理。Spark 2.0以後能夠運行全部99個TPC-DS查詢。
Spark 2.0以前使用dataFrame.registerTempTable
Spark 2.0以後使用dataFrame.createOrReplaceTempView
如下例子中沒有涉及表達式字符串
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
表達式字符串函數的定義是在org.apache.spark.sql.catalyst.analysis.FunctionRegistry伴生對象中,其中expressions變量定義了全部可用的表達式。
舉例說expression[Substring]("substr")
聲明,即聲明瞭你能夠在表達式字符串中使用substr函數。具體使用方法能夠進到前面的Substring類中來,如下就是Substring類的聲明:
/** * A function that takes a substring of its first argument starting at a given position. * Defined for String and Binary types. * * NOTE: that this is not zero based, but 1-based index. The first character in str has index 1. */
@ExpressionDescription(
usage = "_FUNC_(str, pos[, len]) - Returns the substring of `str` that starts at `pos` and is of length `len`, or the slice of byte array that starts at `pos` and is of length `len`.",
examples = """ Examples: > SELECT _FUNC_('Spark SQL', 5); k SQL > SELECT _FUNC_('Spark SQL', -3); SQL > SELECT _FUNC_('Spark SQL', 5, 1); k """)
case class Substring(str: Expression, pos: Expression, len: Expression)
經過閱讀類上的介紹咱們就能夠了解其用法。
從Spark 2.3.0開始,官方文檔給出了可用函數列表: http://spark.apache.org/docs/latest/api/sql/index.html 即對應着這裏所聲明的函數。
可是要注意這些函數只適用於Spark 2.3.0及之後版本,裏面介紹的部分函數可能在較低Spark版本中不可用。
Spark SQL提供了可自定義字符串函數的功能,定義的該字符串函數只能在定義該函數的sparkSession
內使用,不然會報解析失敗:
/** * A collection of methods for registering user-defined functions (UDF). * * The following example registers a Scala closure as UDF: * {{{ * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1) * }}} * * The following example registers a UDF in Java: * {{{ * sparkSession.udf().register("myUDF", * (Integer arg1, String arg2) -> arg2 + arg1, * DataTypes.StringType); * }}} * * @note The user-defined functions must be deterministic. Due to optimization, * duplicate invocations may be eliminated or the function may even be invoked more times than * it is present in the query. * * @since 2.0.0 */
def udf: UDFRegistration = sessionState.udfRegistration