Spark SQL, DataFrames and Datasets Guide
- Overview
- 開始入門
- Data Sources (數據源)
- 性能調優
- 分佈式 SQL 引擎
- 遷移指南
- 參考
Overview
Spark SQL 是 Spark 處理結構化數據的一個模塊.與基礎的 Spark RDD API 不一樣, Spark SQL 提供了查詢結構化數據及計算結果等信息的接口.在內部, Spark SQL 使用這個額外的信息去執行額外的優化.有幾種方式能夠跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執行引擎進行計算時, 不管使用哪一種 API / 語言均可以快速的計算.這種統一意味着開發人員可以在基於提供最天然的方式來表達一個給定的 transformation API 之間實現輕鬆的來回切換不一樣的 .html
該頁面全部例子使用的示例數據都包含在 Spark 的發佈中, 而且可使用 spark-shell
, pyspark
shell, 或者 sparkR
shell來運行.java
SQL
Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也可以被用於從已存在的 Hive 環境中讀取數據.更多關於如何配置這個特性的信息, 請參考 Hive 表 這部分. 當以另外的編程語言運行SQL 時, 查詢結果將以 Dataset/DataFrame的形式返回.您也可使用 命令行或者經過 JDBC/ODBC與 SQL 接口交互.python
Datasets and DataFrames
一個 Dataset 是一個分佈式的數據集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的優勢(強類型化, 可以使用強大的 lambda 函數)與Spark SQL執行引擎的優勢.一個 Dataset 能夠從 JVM 對象來 構造 而且使用轉換功能(map, flatMap, filter, 等等). Dataset API 在Scala 和Java是可用的.Python 不支持 Dataset API.可是因爲 Python 的動態特性, 許多 Dataset API 的優勢已經可用了 (也就是說, 你可能經過 name 天生的row.columnName
屬性訪問一行中的字段).這種狀況和 R 類似.mysql
一個 DataFrame 是一個 Dataset 組成的指定列.它的概念與一個在關係型數據庫或者在 R/Python 中的表是相等的, 可是有不少優化. DataFrames 能夠從大量的 sources 中構造出來, 好比: 結構化的文本文件, Hive中的表, 外部數據庫, 或者已經存在的 RDDs. DataFrame API 能夠在 Scala, Java, Python, 和 R中實現. 在 Scala 和 Java中, 一個 DataFrame 所表明的是一個多個 Row
(行)的的 Dataset(數據集合). 在 the Scala API中, DataFrame
僅僅是一個 Dataset[Row]
類型的別名. 然而, 在 Java API中, 用戶須要去使用 Dataset<Row>
去表明一個 DataFrame
.git
在此文檔中, 咱們將經常會引用 Scala/Java Datasets 的 Row
s 做爲 DataFrames.github
開始入門
起始點: SparkSession
Spark SQL中全部功能的入口點是 SparkSession
類. 要建立一個 SparkSession
, 僅使用 SparkSession.builder()
就能夠了:sql
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._
Spark 2.0 中的SparkSession
爲 Hive 特性提供了內嵌的支持, 包括使用 HiveQL 編寫查詢的能力, 訪問 Hive UDF,以及從 Hive 表中讀取數據的能力.爲了使用這些特性, 你不須要去有一個已存在的 Hive 設置.shell
建立 DataFrames
在一個 SparkSession
中, 應用程序能夠從一個 已經存在的 RDD
, 從hive表, 或者從 Spark數據源中建立一個DataFrames.數據庫
舉個例子, 下面就是基於一個JSON文件建立一個DataFrame:express
val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
無類型的Dataset操做 (aka DataFrame 操做)
DataFrames 提供了一個特定的語法用在 Scala, Java, Python and R中機構化數據的操做.
正如上面提到的同樣, Spark 2.0中, DataFrames在Scala 和 Java API中, 僅僅是多個 Row
s的Dataset. 這些操做也參考了與強類型的Scala/Java Datasets中的」類型轉換」 對應的」無類型轉換」 .
這裏包括一些使用 Dataset 進行結構化數據處理的示例 :
// This import is needed to use the $-notation
import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
可以在 DataFrame 上被執行的操做類型的完整列表請參考 API 文檔.
除了簡單的列引用和表達式以外, DataFrame 也有豐富的函數庫, 包括 string 操做, date 算術, 常見的 math 操做以及更多.可用的完整列表請參考 DataFrame 函數指南.
Running SQL Queries Programmatically
SparkSession
的 sql
函數可讓應用程序以編程的方式運行 SQL 查詢, 並將結果做爲一個 DataFrame
返回.
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
全局臨時視圖
Spark SQL中的臨時視圖是session級別的, 也就是會隨着session的消失而消失. 若是你想讓一個臨時視圖在全部session中相互傳遞而且可用, 直到Spark 應用退出, 你能夠創建一個全局的臨時視圖.全局的臨時視圖存在於系統數據庫 global_temp
中, 咱們必須加上庫名去引用它, 好比. SELECT * FROM global_temp.view1
.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
建立Datasets
Dataset 與 RDD 類似, 然而, 並非使用 Java 序列化或者 Kryo 編碼器 來序列化用於處理或者經過網絡進行傳輸的對象. 雖然編碼器和標準的序列化都負責將一個對象序列化成字節, 編碼器是動態生成的代碼, 而且使用了一種容許 Spark 去執行許多像 filtering, sorting 以及 hashing 這樣的操做, 不須要將字節反序列化成對象的格式.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
RDD的互操做性
Spark SQL 支持兩種不一樣的方法用於轉換已存在的 RDD 成爲 Dataset.第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基於方法的反射可讓你的代碼更簡潔.
第二種用於建立 Dataset 的方法是經過一個容許你構造一個 Schema 而後把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它容許你去構造 Dataset.
使用反射推斷Schema
Spark SQL 的 Scala 接口支持自動轉換一個包含 case classes 的 RDD 爲 DataFrame.Case class 定義了表的 Schema.Case class 的參數名使用反射讀取而且成爲了列名.Case class 也能夠是嵌套的或者包含像 Seq
或者 Array
這樣的複雜類型.這個 RDD 可以被隱式轉換成一個 DataFrame 而後被註冊爲一個表.表能夠用於後續的 SQL 語句.
// For implicit conversions from RDDs to DataFrames
import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))
以編程的方式指定Schema
當 case class 不可以在執行以前被定義(例如, records 記錄的結構在一個 string 字符串中被編碼了, 或者一個 text 文本 dataset 將被解析而且不一樣的用戶投影的字段是不同的).一個 DataFrame
可使用下面的三步以編程的方式來建立.
- 從原始的 RDD 建立 RDD 的
Row
(行); - Step 1 被建立後, 建立 Schema 表示一個
StructType
匹配 RDD 中的Row
(行)的結構. - 經過
SparkSession
提供的createDataFrame
方法應用 Schema 到 RDD 的 RowS(行).
例如:
import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+
Aggregations
The built-in DataFrames functions provide common aggregations such as count()
, countDistinct()
, avg()
, max()
, min()
, etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
Untyped User-Defined Aggregate Functions
Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register(