Apache Spark 2.2.0 中文文檔 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide

Overview

Spark SQL 是 Spark 處理結構化數據的一個模塊.與基礎的 Spark RDD API 不一樣, Spark SQL 提供了查詢結構化數據及計算結果等信息的接口.在內部, Spark SQL 使用這個額外的信息去執行額外的優化.有幾種方式能夠跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執行引擎進行計算時, 不管使用哪一種 API / 語言均可以快速的計算.這種統一意味着開發人員可以在基於提供最天然的方式來表達一個給定的 transformation API 之間實現輕鬆的來回切換不一樣的 .html

該頁面全部例子使用的示例數據都包含在 Spark 的發佈中, 而且可使用 spark-shellpyspark shell, 或者 sparkR shell來運行.java

SQL

Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也可以被用於從已存在的 Hive 環境中讀取數據.更多關於如何配置這個特性的信息, 請參考 Hive 表 這部分. 當以另外的編程語言運行SQL 時, 查詢結果將以 Dataset/DataFrame的形式返回.您也可使用 命令行或者經過 JDBC/ODBC與 SQL 接口交互.python

Datasets and DataFrames

一個 Dataset 是一個分佈式的數據集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的優勢(強類型化, 可以使用強大的 lambda 函數)與Spark SQL執行引擎的優勢.一個 Dataset 能夠從 JVM 對象來 構造 而且使用轉換功能(map, flatMap, filter, 等等). Dataset API 在Scala 和Java是可用的.Python 不支持 Dataset API.可是因爲 Python 的動態特性, 許多 Dataset API 的優勢已經可用了 (也就是說, 你可能經過 name 天生的row.columnName屬性訪問一行中的字段).這種狀況和 R 類似.mysql

一個 DataFrame 是一個 Dataset 組成的指定列.它的概念與一個在關係型數據庫或者在 R/Python 中的表是相等的, 可是有不少優化. DataFrames 能夠從大量的 sources 中構造出來, 好比: 結構化的文本文件, Hive中的表, 外部數據庫, 或者已經存在的 RDDs. DataFrame API 能夠在 Scala, Java, Python, 和 R中實現. 在 Scala 和 Java中, 一個 DataFrame 所表明的是一個多個 Row(行)的的 Dataset(數據集合). 在 the Scala API中, DataFrame僅僅是一個 Dataset[Row]類型的別名. 然而, 在 Java API中, 用戶須要去使用 Dataset<Row> 去表明一個 DataFrame.git

在此文檔中, 咱們將經常會引用 Scala/Java Datasets 的 Rows 做爲 DataFrames.github

開始入門

起始點: SparkSession

Spark SQL中全部功能的入口點是 SparkSession 類. 要建立一個 SparkSession, 僅使用 SparkSession.builder()就能夠了:sql

import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Spark 2.0 中的SparkSession 爲 Hive 特性提供了內嵌的支持, 包括使用 HiveQL 編寫查詢的能力, 訪問 Hive UDF,以及從 Hive 表中讀取數據的能力.爲了使用這些特性, 你不須要去有一個已存在的 Hive 設置.shell

建立 DataFrames

在一個 SparkSession中, 應用程序能夠從一個 已經存在的 RDD, 從hive表, 或者從 Spark數據源中建立一個DataFrames.數據庫

舉個例子, 下面就是基於一個JSON文件建立一個DataFrame:express

val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

無類型的Dataset操做 (aka DataFrame 操做)

DataFrames 提供了一個特定的語法用在 ScalaJavaPython and R中機構化數據的操做.

正如上面提到的同樣, Spark 2.0中, DataFrames在Scala 和 Java API中, 僅僅是多個 Rows的Dataset. 這些操做也參考了與強類型的Scala/Java Datasets中的」類型轉換」 對應的」無類型轉換」 .

這裏包括一些使用 Dataset 進行結構化數據處理的示例 :

// This import is needed to use the $-notation
import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

可以在 DataFrame 上被執行的操做類型的完整列表請參考 API 文檔.

除了簡單的列引用和表達式以外, DataFrame 也有豐富的函數庫, 包括 string 操做, date 算術, 常見的 math 操做以及更多.可用的完整列表請參考  DataFrame 函數指南.

Running SQL Queries Programmatically

SparkSession 的 sql 函數可讓應用程序以編程的方式運行 SQL 查詢, 並將結果做爲一個 DataFrame 返回.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

全局臨時視圖

Spark SQL中的臨時視圖是session級別的, 也就是會隨着session的消失而消失. 若是你想讓一個臨時視圖在全部session中相互傳遞而且可用, 直到Spark 應用退出, 你能夠創建一個全局的臨時視圖.全局的臨時視圖存在於系統數據庫 global_temp中, 咱們必須加上庫名去引用它, 好比. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

建立Datasets

Dataset 與 RDD 類似, 然而, 並非使用 Java 序列化或者 Kryo 編碼器 來序列化用於處理或者經過網絡進行傳輸的對象. 雖然編碼器和標準的序列化都負責將一個對象序列化成字節, 編碼器是動態生成的代碼, 而且使用了一種容許 Spark 去執行許多像 filtering, sorting 以及 hashing 這樣的操做, 不須要將字節反序列化成對象的格式.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

RDD的互操做性

Spark SQL 支持兩種不一樣的方法用於轉換已存在的 RDD 成爲 Dataset.第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基於方法的反射可讓你的代碼更簡潔.

第二種用於建立 Dataset 的方法是經過一個容許你構造一個 Schema 而後把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它容許你去構造 Dataset.

使用反射推斷Schema

Spark SQL 的 Scala 接口支持自動轉換一個包含 case classes 的 RDD 爲 DataFrame.Case class 定義了表的 Schema.Case class 的參數名使用反射讀取而且成爲了列名.Case class 也能夠是嵌套的或者包含像 Seq 或者 Array 這樣的複雜類型.這個 RDD 可以被隱式轉換成一個 DataFrame 而後被註冊爲一個表.表能夠用於後續的 SQL 語句.

// For implicit conversions from RDDs to DataFrames
import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19)) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

以編程的方式指定Schema

當 case class 不可以在執行以前被定義(例如, records 記錄的結構在一個 string 字符串中被編碼了, 或者一個 text 文本 dataset 將被解析而且不一樣的用戶投影的字段是不同的).一個 DataFrame 可使用下面的三步以編程的方式來建立.

  1. 從原始的 RDD 建立 RDD 的 Row(行);
  2. Step 1 被建立後, 建立 Schema 表示一個 StructType 匹配 RDD 中的 Row(行)的結構.
  3. 經過 SparkSession 提供的 createDataFrame 方法應用 Schema 到 RDD 的 RowS(行).

例如:

import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Aggregations

The built-in DataFrames functions provide common aggregations such as count()countDistinct()avg()max()min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.

Untyped User-Defined Aggregate Functions

Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:

import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.Encoder import org.apache.spark.sql.Encoders import org.apache.spark.sql.SparkSession case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble } val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee] ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ // Convert the function to a `TypedColumn` and give it a name val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.

Data Sources (數據源)

Spark SQL 支持經過 DataFrame 接口對各類 data sources (數據源)進行操做. DataFrame 可使用 relational transformations (關係轉換)操做, 也可用於建立 temporary view (臨時視圖). 將 DataFrame 註冊爲 temporary view (臨時視圖)容許您對其數據運行 SQL 查詢. 本節 描述了使用 Spark Data Sources 加載和保存數據的通常方法, 而後涉及可用於 built-in data sources (內置數據源)的 specific options (特定選項).

Generic Load/Save Functions (通用 加載/保存 功能)

在最簡單的形式中, 默認數據源(parquet, 除非另有配置 spark.sql.sources.default )將用於全部操做.

val usersDF = spark.read.load("examples/src/main/resources/users.parquet") usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Manually Specifying Options (手動指定選項)

您還能夠 manually specify (手動指定)將與任何你想傳遞給 data source 的其餘選項一塊兒使用的 data source . Data sources 由其 fully qualified name (徹底限定名稱)(即 org.apache.spark.sql.parquet ), 可是對於 built-in sources (內置的源), 你也可使用它們的 shortnames (短名稱)(jsonparquetjdbcorclibsvmcsvtext).從任何 data source type (數據源類型)加載 DataFrames 可使用此 syntax (語法)轉換爲其餘類型.

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Run SQL on files directly (直接在文件上運行 SQL)

不使用讀取 API 將文件加載到 DataFrame 並進行查詢, 也能夠直接用 SQL 查詢該文件.

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Save Modes (保存模式)

Save operations (保存操做)能夠選擇使用 SaveMode , 它指定如何處理現有數據若是存在的話. 重要的是要意識到, 這些 save modes (保存模式)不使用任何 locking (鎖定)而且不是 atomic (原子). 另外, 當執行 Overwrite 時, 數據將在新數據寫出以前被刪除.

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) "error"(default) 將 DataFrame 保存到 data source (數據源)時, 若是數據已經存在, 則會拋出異常.
SaveMode.Append "append" 將 DataFrame 保存到 data source (數據源)時, 若是 data/table 已存在, 則 DataFrame 的內容將被 append (附加)到現有數據中.
SaveMode.Overwrite "overwrite" Overwrite mode (覆蓋模式)意味着將 DataFrame 保存到 data source (數據源)時, 若是 data/table 已經存在, 則預期 DataFrame 的內容將 overwritten (覆蓋)現有數據.
SaveMode.Ignore "ignore" Ignore mode (忽略模式)意味着當將 DataFrame 保存到 data source (數據源)時, 若是數據已經存在, 則保存操做預期不會保存 DataFrame 的內容, 而且不更改現有數據. 這與 SQL 中的 CREATE TABLE IF NOT EXISTS 相似.

Saving to Persistent Tables (保存到持久表)

DataFrames 也可使用 saveAsTable 命令做爲 persistent tables (持久表)保存到 Hive metastore 中. 請注意, existing Hive deployment (現有的 Hive 部署)不須要使用此功能. Spark 將爲您建立默認的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 與 createOrReplaceTempView 命令不一樣, saveAsTable 將 materialize (實現) DataFrame 的內容, 並建立一個指向 Hive metastore 中數據的指針. 即便您的 Spark 程序從新啓動, Persistent tables (持久性表)仍然存在, 由於您保持與同一個 metastore 的鏈接. 能夠經過使用表的名稱在 SparkSession上調用 table 方法來建立 persistent tabl (持久表)的 DataFrame .

對於 file-based (基於文件)的 data source (數據源), 例如 text, parquet, json等, 您能夠經過 path 選項指定 custom table path (自定義表路徑), 例如 df.write.option("path", "/some/path").saveAsTable("t") . 當表被 dropped (刪除)時, custom table path (自定義表路徑)將不會被刪除, 而且表數據仍然存在. 若是未指定自定義表路徑, Spark 將把數據寫入 warehouse directory (倉庫目錄)下的默認表路徑. 當表被刪除時, 默認的表路徑也將被刪除.

從 Spark 2.1 開始, persistent datasource tables (持久性數據源表)將 per-partition metadata (每一個分區元數據)存儲在 Hive metastore 中. 這帶來了幾個好處:

  • 因爲 metastore 只能返回查詢的必要 partitions (分區), 所以再也不須要將第一個查詢上的全部 partitions discovering 到表中.
  • Hive DDLs 如 ALTER TABLE PARTITION ... SET LOCATION 如今可用於使用 Datasource API 建立的表.

請注意, 建立 external datasource tables (外部數據源表)(帶有 path 選項)的表時, 默認狀況下不會收集 partition information (分區信息). 要 sync (同步) metastore 中的分區信息, 能夠調用 MSCK REPAIR TABLE .

Bucketing, Sorting and Partitioning (分桶, 排序和分區)

對於 file-based data source (基於文件的數據源), 也能夠對 output (輸出)進行 bucket 和 sort 或者 partition . Bucketing 和 sorting 僅適用於 persistent tables :

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

在使用 Dataset API 時, partitioning 能夠同時與 save 和 saveAsTable 一塊兒使用.

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

能夠爲 single table (單個表)使用 partitioning 和 bucketing:

peopleDF
  .write .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

partitionBy 建立一個 directory structure (目錄結構), 如 Partition Discovery 部分所述. 所以, 對 cardinality (基數)較高的 columns 的適用性有限. 相反, bucketBy 能夠在固定數量的 buckets 中分配數據, 而且能夠在 a number of unique values is unbounded (多個惟一值無界時)使用數據.

Parquet Files

Parquet 是許多其餘數據處理系統支持的 columnar format (柱狀格式). Spark SQL 支持讀寫 Parquet 文件, 可自動保留 schema of the original data (原始數據的模式). 當編寫 Parquet 文件時, 出於兼容性緣由, 全部 columns 都將自動轉換爲可空.

Loading Data Programmatically (以編程的方式加載數據)

使用上面例子中的數據:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._ val peopleDF = spark.read.json("examples/src/main/resources/people.json") // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write.parquet("people.parquet") // Read in the parquet file created above // Parquet files are self-describing so the schema is preserved // The result of loading a Parquet file is also a DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Partition Discovery (分區發現)

Table partitioning (表分區)是在像 Hive 這樣的系統中使用的常見的優化方法. 在 partitioned table (分區表)中, 數據一般存儲在不一樣的目錄中, partitioning column values encoded (分區列值編碼)在每一個 partition directory (分區目錄)的路徑中. Parquet data source (Parquet 數據源)如今能夠自動 discover (發現)和 infer (推斷)分區信息. 例如, 咱們可使用如下 directory structure (目錄結構)將全部之前使用的 population data (人口數據)存儲到 partitioned table (分區表)中, 其中有兩個額外的列 gender 和 country 做爲 partitioning columns (分區列):

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

經過將 path/to/table 傳遞給 SparkSession.read.parquet 或 SparkSession.read.load , Spark SQL 將自動從路徑中提取 partitioning information (分區信息). 如今返回的 DataFrame 的 schema (模式)變成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

請注意, 會自動 inferred (推斷) partitioning columns (分區列)的 data types (數據類型).目前, 支持 numeric data types (數字數據類型)和 string type (字符串類型).有些用戶可能不想自動推斷 partitioning columns (分區列)的數據類型.對於這些用例, automatic type inference (自動類型推斷)能夠由 spark.sql.sources.partitionColumnTypeInference.enabled 配置, 默認爲 true .當禁用 type inference (類型推斷)時, string type (字符串類型)將用於 partitioning columns (分區列).

從 Spark 1.6.0 開始, 默認狀況下, partition discovery (分區發現)只能找到給定路徑下的 partitions (分區).對於上述示例, 若是用戶將 path/to/table/gender=male 傳遞給 SparkSession.read.parquet 或 SparkSession.read.load , 則 gender 將不被視爲 partitioning column (分區列).若是用戶須要指定 partition discovery (分區發現)應該開始的基本路徑, 則能夠在數據源選項中設置 basePath.例如, 當 path/to/table/gender=male 是數據的路徑而且用戶將 basePath 設置爲 path/to/table/gender 將是一個 partitioning column (分區列).

Schema Merging (模式合併)

像 ProtocolBuffer , Avro 和 Thrift 同樣, Parquet 也支持 schema evolution (模式演進). 用戶能夠從一個 simple schema (簡單的架構)開始, 並根據須要逐漸向 schema 添加更多的 columns (列). 以這種方式, 用戶可能會使用不一樣但相互兼容的 schemas 的 multiple Parquet files (多個 Parquet 文件). Parquet data source (Parquet 數據源)如今可以自動檢測這種狀況並 merge (合併)全部這些文件的 schemas .

因爲 schema merging (模式合併)是一個 expensive operation (相對昂貴的操做), 而且在大多數狀況下不是必需的, 因此默認狀況下從 1.5.0 開始. 你能夠按照以下的方式啓用它:

  1. 讀取 Parquet 文件時, 將 data source option (數據源選項) mergeSchema 設置爲 true (以下面的例子所示), 或
  2. 將 global SQL option (全局 SQL 選項) spark.sql.parquet.mergeSchema 設置爲 true .
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._ // Create a simple DataFrame, store into a partition directory val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // Read the partitioned table val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Hive metastore Parquet table conversion (Hive metastore Parquet table 轉換)

當讀取和寫入 Hive metastore Parquet 表時, Spark SQL 將嘗試使用本身的 Parquet support (Parquet 支持), 而不是 Hive SerDe 來得到更好的性能. 此 behavior (行爲)由 spark.sql.hive.convertMetastoreParquet 配置控制, 默認狀況下 turned on (打開).

Hive/Parquet Schema Reconciliation

從 table schema processing (表格模式處理)的角度來講, Hive 和 Parquet 之間有兩個關鍵的區別.

  1. Hive 不區分大小寫, 而 Parquet 不是
  2. Hive 認爲全部 columns (列)均可覺得空, 而 Parquet 中的可空性是 significant (重要)的.

因爲這個緣由, 當將 Hive metastore Parquet 錶轉換爲 Spark SQL Parquet 表時, 咱們必須調整 metastore schema 與 Parquet schema. reconciliation 規則是:

  1. 在兩個 schema 中具備 same name (相同名稱)的 Fields (字段)必須具備 same data type (相同的數據類型), 而無論 nullability (可空性). reconciled field 應具備 Parquet 的數據類型, 以便 nullability (可空性)獲得尊重.

  2. reconciled schema (調和模式)正好包含 Hive metastore schema 中定義的那些字段.

    • 只出如今 Parquet schema 中的任何字段將被 dropped (刪除)在 reconciled schema 中.
    • 僅在 Hive metastore schema 中出現的任何字段在 reconciled schema 中做爲 nullable field (可空字段)添加.

Metadata Refreshing (元數據刷新)

Spark SQL 緩存 Parquet metadata 以得到更好的性能. 當啓用 Hive metastore Parquet table conversion (轉換)時, 這些 converted tables (轉換表)的 metadata (元數據)也被 cached (緩存). 若是這些表由 Hive 或其餘外部工具更新, 則須要手動刷新以確保 consistent metadata (一致的元數據).

// spark is an existing SparkSession spark.catalog.refreshTable("my_table")

Configuration (配置)

可使用 SparkSession 上的 setConf 方法或使用 SQL 運行 SET key = value 命令來完成 Parquet 的配置.

Property Name (參數名稱) Default(默認) Meaning(含義)
spark.sql.parquet.binaryAsString false 一些其餘 Parquet-producing systems (Parquet 生產系統), 特別是 Impala, Hive 和舊版本的 Spark SQL , 在 writing out (寫出) Parquet schema 時, 不區分 binary data (二進制數據)和 strings (字符串). 該 flag 告訴 Spark SQL 將 binary data (二進制數據)解釋爲 string (字符串)以提供與這些系統的兼容性.
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems , 特別是 Impala 和 Hive , 將 Timestamp 存入INT96 . 該 flag 告訴 Spark SQL 將 INT96 數據解析爲 timestamp 以提供與這些系統的兼容性.
spark.sql.parquet.cacheMetadata true 打開 Parquet schema metadata 的緩存. 能夠加快查詢靜態數據.
spark.sql.parquet.compression.codec snappy 在編寫 Parquet 文件時設置 compression codec (壓縮編解碼器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdown true 設置爲 true 時啓用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquet true 當設置爲 false 時, Spark SQL 將使用 Hive SerDe 做爲 parquet tables , 而不是內置的支持.
spark.sql.parquet.mergeSchema false

當爲 true 時, Parquet data source (Parquet 數據源) merges (合併)從全部 data files (數據文件)收集的 schemas , 不然若是沒有可用的 summary file , 則從 summary file 或 random data file 中挑選 schema .

spark.sql.optimizer.metadataOnly true

若是爲 true , 則啓用使用表的 metadata 的 metadata-only query optimization 來生成 partition columns (分區列)而不是 table scans (表掃描). 當 scanned (掃描)的全部 columns (列)都是 partition columns (分區列)而且 query (查詢)具備知足 distinct semantics (不一樣語義)的 aggregate operator (聚合運算符)時, 它將適用.

JSON Datasets (JSON 數據集)

Spark SQL 能夠 automatically infer (自動推斷)JSON dataset 的 schema, 並將其做爲 Dataset[Row] 加載. 這個 conversion (轉換)能夠在 Dataset[String] 上使用 SparkSession.read.json() 來完成, 或 JSON 文件.

請注意, 以 a json file 提供的文件不是典型的 JSON 文件. 每行必須包含一個 separate (單獨的), self-contained valid (獨立的有效的)JSON 對象. 有關更多信息, 請參閱 JSON Lines text format, also called newline-delimited JSON .

對於 regular multi-line JSON file (常規的多行 JSON 文件), 將 multiLine 選項設置爲 true .

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset. import spark.implicits._ // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) // The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by spark val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() // +------+ // | name| // +------+ // |Justin| // +------+ // Alternatively, a DataFrame can be created for a JSON dataset represented by // a Dataset[String] storing one JSON object per string val otherPeopleDataset = spark.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleDataset) otherPeople.show() // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Hive 表

Spark SQL 還支持讀取和寫入存儲在 Apache Hive 中的數據。 可是,因爲 Hive 具備大量依賴關係,所以這些依賴關係不包含在默認 Spark 分發中。 若是在類路徑中找到 Hive 依賴項,Spark 將自動加載它們。 請注意,這些 Hive 依賴關係也必須存在於全部工做節點上,由於它們將須要訪問 Hive 序列化和反序列化庫 (SerDes),以訪問存儲在 Hive 中的數據。

經過將 hive-site.xmlcore-site.xml(用於安全配置)和 hdfs-site.xml (用於 HDFS 配置)文件放在 conf/ 中來完成配置。

當使用 Hive 時,必須用 Hive 支持實例化 SparkSession,包括鏈接到持續的 Hive 轉移,支持 Hive serdes 和 Hive 用戶定義的功能。 沒有現有 Hive 部署的用戶仍然能夠啓用 Hive 支持。 當 hive-site.xml 未配置時,上下文會自動在當前目錄中建立 metastore_db,並建立由 spark.sql.warehouse.dir 配置的目錄,該目錄默認爲Spark應用程序當前目錄中的 spark-warehouse 目錄 開始了 請注意,自從2.0.0以來,hive-site.xml 中的 hive.metastore.warehouse.dir 屬性已被棄用。 而是使用 spark.sql.warehouse.dir 來指定倉庫中數據庫的默認位置。 您可能須要向啓動 Spark 應用程序的用戶授予寫權限。å

import java.io.File import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) // warehouseLocation points to the default location for managed databases and tables val warehouseLocation = new File("spark-warehouse").getAbsolutePath val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sql("SELECT * FROM src").show() // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // Aggregation queries are also supported. sql("SELECT COUNT(*) FROM src").show() // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") // The items in DataFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } stringsDS.show() // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") // Queries can then join DataFrame data with data stored in Hive. sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // | 5| val_5| 5| val_5| // ... 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.

指定 Hive 表的存儲格式

建立 Hive 表時,須要定義如何 從/向 文件系統 read/write 數據,即 「輸入格式」 和 「輸出格式」。 您還須要定義該表如何將數據反序列化爲行,或將行序列化爲數據,即 「serde」。 如下選項可用於指定存儲格式 (「serde」, 「input format」, 「output format」),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。 默認狀況下,咱們將以純文本形式讀取表格文件。 請注意,Hive 存儲處理程序在建立表時不受支持,您可使用 Hive 端的存儲處理程序建立一個表,並使用 Spark SQL 來讀取它。

Property Name Meaning
fileFormat fileFormat是一種存儲格式規範的包,包括 "serde","input format" 和 "output format"。 目前咱們支持6個文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat 這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定爲字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 這兩個選項必須成對出現,若是您已經指定了 "fileFormat" 選項,則沒法指定它們。
serde 此選項指定 serde 類的名稱。 當指定 `fileFormat` 選項時,若是給定的 `fileFormat` 已經包含 serde 的信息,那麼不要指定這個選項。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可使用這3個文件格式的這個選項。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 這些選項只能與 "textfile" 文件格式一塊兒使用。它們定義如何將分隔的文件讀入行。

使用 OPTIONS 定義的全部其餘屬性將被視爲 Hive serde 屬性。

與不一樣版本的 Hive Metastore 進行交互

Spark SQL 的 Hive 支持的最重要的部分之一是與 Hive metastore 進行交互,這使得 Spark SQL 可以訪問 Hive 表的元數據。 從 Spark 1.4.0 開始,使用 Spark SQL 的單一二進制構建可使用下面所述的配置來查詢不一樣版本的 Hive 轉移。 請注意,獨立於用於與轉移點通訊的 Hive 版本,內部 Spark SQL 將針對 Hive 1.2.1 進行編譯,並使用這些類進行內部執行(serdes,UDF,UDAF等)。

如下選項可用於配置用於檢索元數據的 Hive 版本:

屬性名稱 默認值 含義
spark.sql.hive.metastore.version 1.2.1 Hive metastore 版本。 可用選項爲 0.12.0 至 1.2.1
spark.sql.hive.metastore.jars builtin 當啓用 -Phive 時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一塊兒。選擇此選項時,spark.sql.hive.metastore.version 必須爲 1.2.1 或未定義。 行家 使用從Maven存儲庫下載的指定版本的Hive jar。 一般不建議在生產部署中使用此配置。 ***** 應用於實例化 HiveMetastoreClient 的 jar 的位置。該屬性能夠是三個選項之一:
  1. builtin當啓用 -Phive 時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一塊兒。選擇此選項時,spark.sql.hive.metastore.version 必須爲 1.2.1 或未定義。
  2. maven使用從 Maven 存儲庫下載的指定版本的 Hive jar。一般不建議在生產部署中使用此配置。
  3. JVM 的標準格式的 classpath。 該類路徑必須包含全部 Hive 及其依賴項,包括正確版本的 Hadoop。這些罐只須要存在於 driver 程序中,但若是您正在運行在 yarn 集羣模式,那麼您必須確保它們與應用程序一塊兒打包。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

使用逗號分隔的類前綴列表,應使用在 Spark SQL 和特定版本的 Hive 之間共享的類加載器來加載。 一個共享類的示例就是用來訪問 Hive metastore 的 JDBC driver。 其它須要共享的類,是須要與已經共享的類進行交互的。 例如,log4j 使用的自定義 appender。

spark.sql.hive.metastore.barrierPrefixes (empty)

一個逗號分隔的類前綴列表,應該明確地爲 Spark SQL 正在通訊的 Hive 的每一個版本從新加載。 例如,在一般將被共享的前綴中聲明的 Hive UDF (即: org.apache.spark.*)。

JDBC 鏈接其它數據庫

Spark SQL 還包括可使用 JDBC 從其餘數據庫讀取數據的數據源。此功能應優於使用 JdbcRDD。 這是由於結果做爲 DataFrame 返回,而且能夠輕鬆地在 Spark SQL 中處理或與其餘數據源鏈接。 JDBC 數據源也更容易從 Java 或 Python 使用,由於它不須要用戶提供 ClassTag。(請注意,這不一樣於 Spark SQL JDBC 服務器,容許其餘應用程序使用 Spark SQL 運行查詢)。

要開始使用,您須要在 Spark 類路徑中包含特定數據庫的 JDBC driver 程序。 例如,要從 Spark Shell 鏈接到 postgres,您將運行如下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

可使用 Data Sources API 未來自遠程數據庫的表做爲 DataFrame 或 Spark SQL 臨時視圖進行加載。 用戶能夠在數據源選項中指定 JDBC 鏈接屬性。用戶 和 密碼一般做爲登陸數據源的鏈接屬性提供。 除了鏈接屬性外,Spark 還支持如下不區分大小寫的選項:

屬性名稱 含義
url 要鏈接的JDBC URL。 源特定的鏈接屬性能夠在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 應該讀取的 JDBC 表。請注意,可使用在SQL查詢的 FROM 子句中有效的任何內容。 例如,您可使用括號中的子查詢代替完整表。
driver 用於鏈接到此 URL 的 JDBC driver 程序的類名。
partitionColumn, lowerBound, upperBound 若是指定了這些選項,則必須指定這些選項。 另外,必須指定 numPartitions. 他們描述如何從多個 worker 並行讀取數據時將表給分區。partitionColumn 必須是有問題的表中的數字列。 請注意,lowerBound 和 upperBound 僅用於決定分區的大小,而不是用於過濾表中的行。 所以,表中的全部行將被分區並返回。此選項僅適用於讀操做。
numPartitions 在表讀寫中能夠用於並行度的最大分區數。這也肯定併發JDBC鏈接的最大數量。 若是要寫入的分區數超過此限制,則在寫入以前經過調用 coalesce(numPartitions) 將其減小到此限制。
fetchsize JDBC 抓取的大小,用於肯定每次數據往返傳遞的行數。 這有利於提高 JDBC driver 的性能,它們的默認值較小(例如: Oracle 是 10 行)。 該選項僅適用於讀取操做。
batchsize JDBC 批處理的大小,用於肯定每次數據往返傳遞的行數。 這有利於提高 JDBC driver 的性能。 該選項僅適用於寫操做。默認值爲 1000.
isolationLevel 事務隔離級別,適用於當前鏈接。 它能夠是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ, 或 SERIALIZABLE 之一,對應於 JDBC 鏈接對象定義的標準事務隔離級別,默認爲 READ_UNCOMMITTED。 此選項僅適用於寫操做。請參考 java.sql.Connection 中的文檔。
truncate 這是一個與 JDBC 相關的選項。 啓用 SaveMode.Overwrite 時,此選項會致使 Spark 截斷現有表,而不是刪除並從新建立。 這能夠更有效,而且防止表元數據(例如,索引)被移除。 可是,在某些狀況下,例如當新數據具備不一樣的模式時,它將沒法工做。 它默認爲 false。 此選項僅適用於寫操做。
createTableOptions 這是一個與JDBC相關的選項。 若是指定,此選項容許在建立表時設置特定於數據庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此選項僅適用於寫操做。
createTableColumnTypes 使用數據庫列數據類型而不是默認值,建立表時。 數據類型信息應以與 CREATE TABLE 列語法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。 指定的類型應該是有效的 spark sql 數據類型。此選項僅適用於寫操做。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") val jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

故障排除

  • JDBC driver 程序類必須對客戶端會話和全部執行程序上的原始類加載器可見。 這是由於 Java 的 DriverManager 類執行安全檢查,致使它忽略原始類加載器不可見的全部 driver 程序,當打開鏈接時。一個方便的方法是修改全部工做節點上的compute_classpath.sh 以包含您的 driver 程序 JAR。
  • 一些數據庫,例如 H2,將全部名稱轉換爲大寫。 您須要使用大寫字母來引用 Spark SQL 中的這些名稱。

性能調優

對於某些工做負載,能夠經過緩存內存中的數據或打開一些實驗選項來提升性能。

在內存中緩存數據

Spark SQL 能夠經過調用 spark.catalog.cacheTable("tableName") 或 dataFrame.cache() 來使用內存中的列格式來緩存表。 而後,Spark SQL 將只掃描所需的列,並將自動調整壓縮以最小化內存使用量和 GC 壓力。 您能夠調用 spark.catalog.uncacheTable("tableName") 從內存中刪除該表。

內存緩存的配置可使用 SparkSession 上的 setConf 方法或使用 SQL 運行 SET key=value 命令來完成。

屬性名稱 默認 含義
spark.sql.inMemoryColumnarStorage.compressed true 當設置爲 true 時,Spark SQL 將根據數據的統計信息爲每一個列自動選擇一個壓縮編解碼器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的柱狀緩存的大小。更大的批量大小能夠提升內存利用率和壓縮率,可是在緩存數據時會冒出 OOM 風險。

其餘配置選項

如下選項也可用於調整查詢執行的性能。這些選項可能會在未來的版本中被廢棄,由於更多的優化是自動執行的。

屬性名稱 默認值 含義
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在讀取文件時,將單個分區打包的最大字節數。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照字節數來衡量的打開文件的估計費用能夠在同一時間進行掃描。 將多個文件放入分區時使用。最好過分估計,那麼具備小文件的分區將比具備較大文件的分區(首先計劃的)更快。
spark.sql.broadcastTimeout 300

廣播鏈接中的廣播等待時間超時(秒)

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置執行鏈接時將廣播給全部工做節點的表的最大大小(以字節爲單位)。 經過將此值設置爲-1能夠禁用廣播。 請注意,目前的統計信息僅支持 Hive Metastore 表,其中已運行命令 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

分佈式 SQL 引擎

Spark SQL 也能夠充當使用其 JDBC/ODBC 或命令行界面的分佈式查詢引擎。 在這種模式下,最終用戶或應用程序能夠直接與 Spark SQL 交互運行 SQL 查詢,而不須要編寫任何代碼。

運行 Thrift JDBC/ODBC 服務器

這裏實現的 Thrift JDBC/ODBC 服務器對應於 Hive 1.2 中的 HiveServer2。 您可使用 Spark 或 Hive 1.2.1 附帶的直線腳本測試 JDBC 服務器。

要啓動 JDBC/ODBC 服務器,請在 Spark 目錄中運行如下命令:

./sbin/start-thriftserver.sh

此腳本接受全部 bin/spark-submit 命令行選項,以及 --hiveconf 選項來指定 Hive 屬性。 您能夠運行 ./sbin/start-thriftserver.sh --help 查看全部可用選項的完整列表。 默認狀況下,服務器監聽 localhost:10000. 您能夠經過環境變量覆蓋此行爲,即:

export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...

or system properties:

./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...

如今,您可使用 beeline 來測試 Thrift JDBC/ODBC 服務器:

./bin/beeline

使用 beeline 方式鏈接到 JDBC/ODBC 服務器:

beeline> !connect jdbc:hive2://localhost:10000

Beeline 將要求您輸入用戶名和密碼。 在非安全模式下,只需輸入機器上的用戶名和空白密碼便可。 對於安全模式,請按照 beeline 文檔 中的說明進行操做。

配置Hive是經過將 hive-site.xmlcore-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。

您也可使用 Hive 附帶的 beeline 腳本。

Thrift JDBC 服務器還支持經過 HTTP 傳輸發送 thrift RPC 消息。 使用如下設置啓用 HTTP 模式做爲系統屬性或在 conf/ 中的 hive-site.xml 文件中啓用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要測試,請使用 beeline 以 http 模式鏈接到 JDBC/ODBC 服務器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

運行 Spark SQL CLI

Spark SQL CLI 是在本地模式下運行 Hive 轉移服務並執行從命令行輸入的查詢的方便工具。 請注意,Spark SQL CLI 不能與 Thrift JDBC 服務器通訊。

要啓動 Spark SQL CLI,請在 Spark 目錄中運行如下命令:

./bin/spark-sql

配置 Hive 是經過將 hive-site.xmlcore-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。 您能夠運行 ./bin/spark-sql --help 獲取全部可用選項的完整列表。

遷移指南

從 Spark SQL 2.1 升級到 2.2

  • Spark 2.1.1 介紹了一個新的配置 key: spark.sql.hive.caseSensitiveInferenceMode. 它的默認設置是 NEVER_INFER, 其行爲與 2.1.0 保持一致. 可是,Spark 2.2.0 將此設置的默認值更改成 「INFER_AND_SAVE」,以恢復與底層文件 schema(模式)具備大小寫混合的列名稱的 Hive metastore 表的兼容性。使用 INFER_AND_SAVE 配置的 value, 在第一次訪問 Spark 將對其還沒有保存推測 schema(模式)的任何 Hive metastore 表執行 schema inference(模式推斷). 請注意,對於具備數千個 partitions(分區)的表,模式推斷多是很是耗時的操做。若是不兼容大小寫混合的列名,您能夠安全地將spark.sql.hive.caseSensitiveInferenceMode 設置爲 NEVER_INFER,以免模式推斷的初始開銷。請注意,使用新的默認INFER_AND_SAVE 設置,模式推理的結果被保存爲 metastore key 以供未來使用。所以,初始模式推斷僅發生在表的第一次訪問。

從 Spark SQL 2.0 升級到 2.1

  • Datasource tables(數據源表)如今存儲了 Hive metastore 中的 partition metadata(分區元數據). 這意味着諸如 ALTER TABLE PARTITION ... SET LOCATION 這樣的 Hive DDLs 如今使用 Datasource API 可用於建立 tables(表).
    • 遺留的數據源表能夠經過 MSCK REPAIR TABLE 命令遷移到這種格式。建議遷移遺留表利用 Hive DDL 的支持和提供的計劃性能。
    • 要肯定表是否已遷移,當在表上發出 DESCRIBE FORMATTED 命令時請查找 PartitionProvider: Catalog 屬性.
  • Datasource tables(數據源表)的 INSERT OVERWRITE TABLE ... PARTITION ... 行爲的更改。
    • 在之前的 Spark 版本中,INSERT OVERWRITE 覆蓋了整個 Datasource table,即便給出一個指定的 partition. 如今只有匹配規範的 partition 被覆蓋。
    • 請注意,這仍然與 Hive 表的行爲不一樣,Hive 表僅覆蓋與新插入數據重疊的分區。

從 Spark SQL 1.6 升級到 2.0

  • SparkSession 如今是 Spark 新的切入點, 它替代了老的 SQLContext 和 HiveContext。注意 : 爲了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。能夠從 SparkSession 獲取一個新的 catalog 接口 — 現有的訪問數據庫和表的 API,如 listTablescreateExternalTabledropTempViewcacheTable 都被移到該接口。

  • Dataset API 和 DataFrame API 進行了統一。在 Scala 中,DataFrame 變成了 Dataset[Row] 類型的一個別名,而 Java API 使用者必須將 DataFrame 替換成 Dataset<Row>。Dataset 類既提供了強類型轉換操做(如 mapfilter 以及 groupByKey)也提供了非強類型轉換操做(如 select 和 groupBy)。因爲編譯期的類型安全不是 Python 和 R 語言的一個特性,Dataset 的概念並不適用於這些語言的 API。相反,DataFrame仍然是最基本的編程抽象, 就相似於這些語言中單節點 data frame 的概念。

  • Dataset 和 DataFrame API 中 unionAll 已通過時而且由 union 替代。
  • Dataset 和 DataFrame API 中 explode 已通過時,做爲選擇,能夠結合 select 或 flatMap 使用 functions.explode() 。
  • Dataset 和 DataFrame API 中 registerTempTable 已通過時而且由 createOrReplaceTempView 替代。

  • 對 Hive tables CREATE TABLE ... LOCATION 行爲的更改.
    • 從 Spark 2.0 開始,CREATE TABLE ... LOCATION 與 CREATE EXTERNAL TABLE ... LOCATION 是相同的,以防止意外丟棄用戶提供的 locations(位置)中的現有數據。這意味着,在用戶指定位置的 Spark SQL 中建立的 Hive 表始終是 Hive 外部表。刪除外部表將不會刪除數據。 用戶不能指定 Hive managed tables(管理表)的位置. 請注意,這與Hive行爲不一樣。
    • 所以,這些表上的 「DROP TABLE」 語句不會刪除數據。

從 Spark SQL 1.5 升級到 1.6

  • 從 Spark 1.6 開始,默認狀況下服務器在多 session(會話)模式下運行。這意味着每一個 JDBC/ODBC 鏈接擁有一份本身的 SQL 配置和臨時函數註冊。緩存表仍在並共享。若是您但願以舊的單會話模式運行 Thrift server,請設置選項 spark.sql.hive.thriftServer.singleSession 爲true。您既能夠將此選項添加到 spark-defaults.conf,或者經過 --conf 將它傳遞給 start-thriftserver.sh
./sbin/start-thriftserver.sh \ --conf spark.sql.hive.thriftServer.singleSession=true \ ... 
  • 從 1.6.1 開始,在 sparkR 中 withColumn 方法支持添加一個新列或更換 DataFrame 同名的現有列。

  • 從 Spark 1.6 開始,LongType 強制轉換爲 TimestampType 指望是秒,而不是微秒。這種更改是爲了匹配 Hive 1.2 的行爲,以便從 numeric(數值)類型進行更一致的類型轉換到 TimestampType。更多詳情請參閱 SPARK-11724 。

從 Spark SQL 1.4 升級到 1.5

  • 使用手動管理的內存優化執行,如今是默認啓用的,以及代碼生成表達式求值。這些功能既能夠經過設置 spark.sql.tungsten.enabled 爲 false 來禁止使用。
  • Parquet 的模式合併默認狀況下再也不啓用。它能夠經過設置 spark.sql.parquet.mergeSchema 到 true 以從新啓用。
  • 字符串在 Python 列的 columns(列)如今支持使用點(.)來限定列或訪問嵌套值。例如 df['table.column.nestedField']。可是,這意味着若是你的列名中包含任何圓點,你如今必須避免使用反引號(如 table.column.with.dots.nested)。
  • 在內存中的列存儲分區修剪默認是開啓的。它能夠經過設置 spark.sql.inMemoryColumnarStorage.partitionPruning 爲 false 來禁用。
  • 無限精度的小數列再也不支持,而不是 Spark SQL 最大精度爲 38 。當從 BigDecimal 對象推斷模式時,如今使用(38,18)。在 DDL 沒有指定精度時,則默認保留 Decimal(10, 0)
  • 時間戳如今存儲在 1 微秒的精度,而不是 1 納秒的。
  • 在 sql 語句中,floating point(浮點數)如今解析爲 decimal。HiveQL 解析保持不變。
  • SQL / DataFrame 函數的規範名稱如今是小寫(例如 sum vs SUM)。
  • JSON 數據源不會自動加載由其餘應用程序(未經過 Spark SQL 插入到數據集的文件)建立的新文件。對於 JSON 持久表(即表的元數據存儲在 Hive Metastore),用戶可使用 REFRESH TABLE SQL 命令或 HiveContext 的 refreshTable 方法,把那些新文件列入到表中。對於表明一個 JSON dataset 的 DataFrame,用戶須要從新建立 DataFrame,同時 DataFrame 中將包括新的文件。
  • PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替換現有的同名列。

從 Spark SQL 1.3 升級到 1.4

DataFrame data reader/writer interface

基於用戶反饋,咱們建立了一個新的更流暢的 API,用於讀取 (SQLContext.read) 中的數據並寫入數據 (DataFrame.write), 而且舊的 API 將過期(例如,SQLContext.parquetFileSQLContext.jsonFile).

針對 SQLContext.read ( ScalaJavaPython ) 和 DataFrame.write ( ScalaJavaPython ) 的更多細節,請看 API 文檔.

DataFrame.groupBy 保留 grouping columns(分組的列)

根據用戶的反饋, 咱們更改了 DataFrame.groupBy().agg() 的默認行爲以保留 DataFrame 結果中的 grouping columns(分組列). 爲了在 1.3 中保持該行爲,請設置 spark.sql.retainGroupColumns 爲 false.

// In 1.3.x, in order for the grouping column "department" to show up, // it must be included explicitly as part of the agg function call. df.groupBy("department").agg($"department", max("age"), sum("expense")) // In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(max("age"), sum("expense")) // Revert to 1.3 behavior (not retaining grouping column) by: sqlContext.setConf("spark.sql.retainGroupColumns", "false")

DataFrame.withColumn 上的行爲更改

以前 1.4 版本中,DataFrame.withColumn() 只支持添加列。該列將始終在 DateFrame 結果中被加入做爲新的列,即便現有的列可能存在相同的名稱。從 1.4 版本開始,DataFrame.withColumn() 支持添加與全部現有列的名稱不一樣的列或替換現有的同名列。

請注意,這一變化僅適用於 Scala API,並不適用於 PySpark 和 SparkR。

從 Spark SQL 1.0-1.2 升級到 1.3

在 Spark 1.3 中,咱們從 Spark SQL 中刪除了 「Alpha」 的標籤,做爲一部分已經清理過的可用的 API 。從 Spark 1.3 版本以上,Spark SQL 將提供在 1.X 系列的其餘版本的二進制兼容性。這種兼容性保證不包括被明確標記爲不穩定的(即 DeveloperApi 類或 Experimental) API。

重命名 DataFrame 的 SchemaRDD

升級到 Spark SQL 1.3 版本時,用戶會發現最大的變化是,SchemaRDD 已改名爲 DataFrame。這主要是由於 DataFrames 再也不從 RDD 直接繼承,而是由 RDDS 本身來實現這些功能。DataFrames 仍然能夠經過調用 .rdd 方法轉換爲 RDDS 。

在 Scala 中,有一個從 SchemaRDD 到 DataFrame 類型別名,能夠爲一些狀況提供源代碼兼容性。它仍然建議用戶更新他們的代碼以使用 DataFrame來代替。Java 和 Python 用戶須要更新他們的代碼。

Java 和 Scala APIs 的統一

此前 Spark 1.3 有單獨的Java兼容類(JavaSQLContext 和 JavaSchemaRDD),借鑑於 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已經統一。兩種語言的用戶可使用 SQLContext 和 DataFrame。通常來講論文類嘗試使用兩種語言的共有類型(如 Array 替代了一些特定集合)。在某些狀況下不通用的類型狀況下,(例如,passing in closures 或 Maps)使用函數重載代替。

此外,該 Java 的特定類型的 API 已被刪除。Scala 和 Java 的用戶可使用存在於 org.apache.spark.sql.types 類來描述編程模式。

隔離隱式轉換和刪除 dsl 包(僅Scala)

許多 Spark 1.3 版本之前的代碼示例都以 import sqlContext._ 開始,這提供了從 sqlContext 範圍的全部功能。在 Spark 1.3 中,咱們移除了從 RDDs 到 DateFrame 再到 SQLContext 內部對象的隱式轉換。用戶如今應該寫成 import sqlContext.implicits._.

此外,隱式轉換如今只能使用方法 toDF 來增長由 Product(即 case classes or tuples)構成的 RDD,而不是自動應用。

當使用 DSL 內部的函數時(如今使用 DataFrame API 來替換), 用戶習慣導入 org.apache.spark.sql.catalyst.dsl. 相反,應該使用公共的 dataframe 函數 API: import org.apache.spark.sql.functions._.

針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限於 Scala)

Spark 1.3 移除存在於基本 SQL 包的 DataType 類型別名。開發人員應改成導入類 org.apache.spark.sql.types

UDF 註冊遷移到 sqlContext.udf 中 (Java & Scala)

用於註冊 UDF 的函數,無論是 DataFrame DSL 仍是 SQL 中用到的,都被遷移到 SQLContext 中的 udf 對象中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF 註冊保持不變。

Python DataTypes 再也不是 Singletons(單例的)

在 Python 中使用 DataTypes 時,你須要先構造它們(如:StringType()),而不是引用一個單例對象。

與 Apache Hive 的兼容

Spark SQL 在設計時就考慮到了和 Hive metastore,SerDes 以及 UDF 之間的兼容性。目前 Hive SerDes 和 UDF 都是基於 Hive 1.2.1 版本,而且Spark SQL 能夠鏈接到不一樣版本的Hive metastore(從 0.12.0 到 1.2.1,能夠參考 與不一樣版本的 Hive Metastore 交互

在現有的 Hive Warehouses 中部署

Spark SQL Thrift JDBC server 採用了開箱即用的設計以兼容已有的 Hive 安裝版本。你不須要修改現有的 Hive Metastore , 或者改變數據的位置和表的分區。

所支持的 Hive 特性

Spark SQL 支持絕大部分的 Hive 功能,如:

  • Hive query(查詢)語句, 包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部 Hive 操做, 包括:
    • 關係運算符 (===<><>>=<=, 等等)
    • 算術運算符 (+-*/%, 等等)
    • 邏輯運算符 (AND&&OR||, 等等)
    • 複雜類型的構造
    • 數學函數 (signlncos, 等等)
    • String 函數 (instrlengthprintf, 等等)
  • 用戶定義函數 (UDF)
  • 用戶定義聚合函數 (UDAF)
  • 用戶定義 serialization formats (SerDes)
  • 窗口函數
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries(子查詢)
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • 全部的 Hive DDL 函數, 包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 大部分的 Hive Data types(數據類型), 包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

未支持的 Hive 函數

如下是目前還不支持的 Hive 函數列表。在 Hive 部署中這些功能大部分都用不到。

主要的 Hive 功能

  • Tables 使用 buckets 的 Tables: bucket 是 Hive table partition 中的 hash partitioning. Spark SQL 還不支持 buckets.

Esoteric Hive 功能

  • UNION 類型
  • Unique join
  • Column 統計信息的收集: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

  • File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
  • Hadoop archive

Hive 優化

有少數 Hive 優化尚未包含在 Spark 中。其中一些(好比 indexes 索引)因爲 Spark SQL 的這種內存計算模型而顯得不那麼重要。另一些在 Spark SQL 將來的版本中會持續跟蹤。

  • Block 級別的 bitmap indexes 和虛擬 columns (用於構建 indexes)
  • 自動爲 join 和 groupBy 計算 reducer 個數 : 目前在 Spark SQL 中, 你須要使用 「SET spark.sql.shuffle.partitions=[num_tasks];」 來控制 post-shuffle 的並行度.
  • 僅 Meta-data 的 query: 對於只使用 metadata 就能回答的查詢,Spark SQL 仍然會啓動計算結果的任務.
  • Skew data flag: Spark SQL 不遵循 Hive 中 skew 數據的標記.
  • STREAMTABLE hint in join: Spark SQL 不遵循 STREAMTABLE hint.
  • 對於查詢結果合併多個小文件: 若是輸出的結果包括多個小文件, Hive 能夠可選的合併小文件到一些大文件中去,以免溢出 HDFS metadata. Spark SQL 還不支持這樣.

參考

數據類型

Spark SQL 和 DataFrames 支持下面的數據類型:

  • Numeric types
    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimalconsists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementTypecontainsNull is used to indicate if elements in a ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have nullvalues. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataTypenullable is used to indicate if values of this fields can have null values.

Spark SQL 的全部數據類型都在包 org.apache.spark.sql.types 中. 你能夠用下示例示例來訪問它們.

import org.apache.spark.sql.types._ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Data type(數據類型) Scala 中的 Value 類型 訪問或建立數據類型的 API
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])
Note(注意): containsNull 的默認值是 true.
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull])
Note(注意): valueContainsNull 的默認值是 true.
StructType org.apache.spark.sql.Row StructType(fields)
Note(注意): fields 是 StructFields 的 Seq. 全部, 兩個 fields 擁有相同的名稱是不被容許的.
StructField 該 field(字段)數據類型的 Scala 中的 value 類型 (例如, 數據類型爲 IntegerType 的 StructField 是 Int) StructField(name, dataType, [nullable])
Note: nullable 的默認值是 true.

NaN Semantics

當處理一些不符合標準浮點數語義的 float 或 double 類型時,對於 Not-a-Number(NaN) 須要作一些特殊處理. 具體以下:

  • NaN = NaN 返回 true.
  • 在 aggregations(聚合)操做中,全部的 NaN values 將被分到同一個組中.
  • 在 join key 中 NaN 能夠當作一個普通的值.
  • NaN 值在升序排序中排到最後,比任何其餘數值都大.

 


Spark SQL, DataFrames and Datasets Guide

Overview

Spark SQL 是 Spark 處理結構化數據的一個模塊.與基礎的 Spark RDD API 不一樣, Spark SQL 提供了查詢結構化數據及計算結果等信息的接口.在內部, Spark SQL 使用這個額外的信息去執行額外的優化.有幾種方式能夠跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執行引擎進行計算時, 不管使用哪一種 API / 語言均可以快速的計算.這種統一意味着開發人員可以在基於提供最天然的方式來表達一個給定的 transformation API 之間實現輕鬆的來回切換不一樣的 .

該頁面全部例子使用的示例數據都包含在 Spark 的發佈中, 而且可使用 spark-shellpyspark shell, 或者 sparkR shell來運行.

SQL

Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也可以被用於從已存在的 Hive 環境中讀取數據.更多關於如何配置這個特性的信息, 請參考 Hive 表 這部分. 當以另外的編程語言運行SQL 時, 查詢結果將以 Dataset/DataFrame的形式返回.您也可使用 命令行或者經過 JDBC/ODBC與 SQL 接口交互.

Datasets and DataFrames

一個 Dataset 是一個分佈式的數據集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的優勢(強類型化, 可以使用強大的 lambda 函數)與Spark SQL執行引擎的優勢.一個 Dataset 能夠從 JVM 對象來 構造 而且使用轉換功能(map, flatMap, filter, 等等). Dataset API 在Scala 和Java是可用的.Python 不支持 Dataset API.可是因爲 Python 的動態特性, 許多 Dataset API 的優勢已經可用了 (也就是說, 你可能經過 name 天生的row.columnName屬性訪問一行中的字段).這種狀況和 R 類似.

一個 DataFrame 是一個 Dataset 組成的指定列.它的概念與一個在關係型數據庫或者在 R/Python 中的表是相等的, 可是有不少優化. DataFrames 能夠從大量的 sources 中構造出來, 好比: 結構化的文本文件, Hive中的表, 外部數據庫, 或者已經存在的 RDDs. DataFrame API 能夠在 Scala, Java, Python, 和 R中實現. 在 Scala 和 Java中, 一個 DataFrame 所表明的是一個多個 Row(行)的的 Dataset(數據集合). 在 the Scala API中, DataFrame僅僅是一個 Dataset[Row]類型的別名. 然而, 在 Java API中, 用戶須要去使用 Dataset<Row> 去表明一個 DataFrame.

在此文檔中, 咱們將經常會引用 Scala/Java Datasets 的 Rows 做爲 DataFrames.

開始入門

起始點: SparkSession

Spark SQL中全部功能的入口點是 SparkSession 類. 要建立一個 SparkSession, 僅使用 SparkSession.builder()就能夠了:

import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Spark 2.0 中的SparkSession 爲 Hive 特性提供了內嵌的支持, 包括使用 HiveQL 編寫查詢的能力, 訪問 Hive UDF,以及從 Hive 表中讀取數據的能力.爲了使用這些特性, 你不須要去有一個已存在的 Hive 設置.

建立 DataFrames

在一個 SparkSession中, 應用程序能夠從一個 已經存在的 RDD, 從hive表, 或者從 Spark數據源中建立一個DataFrames.

舉個例子, 下面就是基於一個JSON文件建立一個DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

無類型的Dataset操做 (aka DataFrame 操做)

DataFrames 提供了一個特定的語法用在 ScalaJavaPython and R中機構化數據的操做.

正如上面提到的同樣, Spark 2.0中, DataFrames在Scala 和 Java API中, 僅僅是多個 Rows的Dataset. 這些操做也參考了與強類型的Scala/Java Datasets中的」類型轉換」 對應的」無類型轉換」 .

這裏包括一些使用 Dataset 進行結構化數據處理的示例 :

// This import is needed to use the $-notation
import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

可以在 DataFrame 上被執行的操做類型的完整列表請參考 API 文檔.

除了簡單的列引用和表達式以外, DataFrame 也有豐富的函數庫, 包括 string 操做, date 算術, 常見的 math 操做以及更多.可用的完整列表請參考  DataFrame 函數指南.

Running SQL Queries Programmatically

SparkSession 的 sql 函數可讓應用程序以編程的方式運行 SQL 查詢, 並將結果做爲一個 DataFrame 返回.

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

全局臨時視圖

Spark SQL中的臨時視圖是session級別的, 也就是會隨着session的消失而消失. 若是你想讓一個臨時視圖在全部session中相互傳遞而且可用, 直到Spark 應用退出, 你能夠創建一個全局的臨時視圖.全局的臨時視圖存在於系統數據庫 global_temp中, 咱們必須加上庫名去引用它, 好比. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

建立Datasets

Dataset 與 RDD 類似, 然而, 並非使用 Java 序列化或者 Kryo 編碼器 來序列化用於處理或者經過網絡進行傳輸的對象. 雖然編碼器和標準的序列化都負責將一個對象序列化成字節, 編碼器是動態生成的代碼, 而且使用了一種容許 Spark 去執行許多像 filtering, sorting 以及 hashing 這樣的操做, 不須要將字節反序列化成對象的格式.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

RDD的互操做性

Spark SQL 支持兩種不一樣的方法用於轉換已存在的 RDD 成爲 Dataset.第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基於方法的反射可讓你的代碼更簡潔.

第二種用於建立 Dataset 的方法是經過一個容許你構造一個 Schema 而後把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它容許你去構造 Dataset.

使用反射推斷Schema

Spark SQL 的 Scala 接口支持自動轉換一個包含 case classes 的 RDD 爲 DataFrame.Case class 定義了表的 Schema.Case class 的參數名使用反射讀取而且成爲了列名.Case class 也能夠是嵌套的或者包含像 Seq 或者 Array 這樣的複雜類型.這個 RDD 可以被隱式轉換成一個 DataFrame 而後被註冊爲一個表.表能夠用於後續的 SQL 語句.

// For implicit conversions from RDDs to DataFrames
import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19)) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

以編程的方式指定Schema

當 case class 不可以在執行以前被定義(例如, records 記錄的結構在一個 string 字符串中被編碼了, 或者一個 text 文本 dataset 將被解析而且不一樣的用戶投影的字段是不同的).一個 DataFrame 可使用下面的三步以編程的方式來建立.

  1. 從原始的 RDD 建立 RDD 的 Row(行);
  2. Step 1 被建立後, 建立 Schema 表示一個 StructType 匹配 RDD 中的 Row(行)的結構.
  3. 經過 SparkSession 提供的 createDataFrame 方法應用 Schema 到 RDD 的 RowS(行).

例如:

import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Aggregations

The built-in DataFrames functions provide common aggregations such as count()countDistinct()avg()max()min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.

Untyped User-Defined Aggregate Functions

Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:

import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.Encoder import org.apache.spark.sql.Encoders import org.apache.spark.sql.SparkSession case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble } val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee] ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ // Convert the function to a `TypedColumn` and give it a name val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.

Data Sources (數據源)

Spark SQL 支持經過 DataFrame 接口對各類 data sources (數據源)進行操做. DataFrame 可使用 relational transformations (關係轉換)操做, 也可用於建立 temporary view (臨時視圖). 將 DataFrame 註冊爲 temporary view (臨時視圖)容許您對其數據運行 SQL 查詢. 本節 描述了使用 Spark Data Sources 加載和保存數據的通常方法, 而後涉及可用於 built-in data sources (內置數據源)的 specific options (特定選項).

Generic Load/Save Functions (通用 加載/保存 功能)

在最簡單的形式中, 默認數據源(parquet, 除非另有配置 spark.sql.sources.default )將用於全部操做.

val usersDF = spark.read.load("examples/src/main/resources/users.parquet") usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Manually Specifying Options (手動指定選項)

您還能夠 manually specify (手動指定)將與任何你想傳遞給 data source 的其餘選項一塊兒使用的 data source . Data sources 由其 fully qualified name (徹底限定名稱)(即 org.apache.spark.sql.parquet ), 可是對於 built-in sources (內置的源), 你也可使用它們的 shortnames (短名稱)(jsonparquetjdbcorclibsvmcsvtext).從任何 data source type (數據源類型)加載 DataFrames 可使用此 syntax (語法)轉換爲其餘類型.

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Run SQL on files directly (直接在文件上運行 SQL)

不使用讀取 API 將文件加載到 DataFrame 並進行查詢, 也能夠直接用 SQL 查詢該文件.

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Save Modes (保存模式)

Save operations (保存操做)能夠選擇使用 SaveMode , 它指定如何處理現有數據若是存在的話. 重要的是要意識到, 這些 save modes (保存模式)不使用任何 locking (鎖定)而且不是 atomic (原子). 另外, 當執行 Overwrite 時, 數據將在新數據寫出以前被刪除.

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) "error"(default) 將 DataFrame 保存到 data source (數據源)時, 若是數據已經存在, 則會拋出異常.
SaveMode.Append "append" 將 DataFrame 保存到 data source (數據源)時, 若是 data/table 已存在, 則 DataFrame 的內容將被 append (附加)到現有數據中.
SaveMode.Overwrite "overwrite" Overwrite mode (覆蓋模式)意味着將 DataFrame 保存到 data source (數據源)時, 若是 data/table 已經存在, 則預期 DataFrame 的內容將 overwritten (覆蓋)現有數據.
SaveMode.Ignore "ignore" Ignore mode (忽略模式)意味着當將 DataFrame 保存到 data source (數據源)時, 若是數據已經存在, 則保存操做預期不會保存 DataFrame 的內容, 而且不更改現有數據. 這與 SQL 中的 CREATE TABLE IF NOT EXISTS 相似.

Saving to Persistent Tables (保存到持久表)

DataFrames 也可使用 saveAsTable 命令做爲 persistent tables (持久表)保存到 Hive metastore 中. 請注意, existing Hive deployment (現有的 Hive 部署)不須要使用此功能. Spark 將爲您建立默認的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 與 createOrReplaceTempView 命令不一樣, saveAsTable 將 materialize (實現) DataFrame 的內容, 並建立一個指向 Hive metastore 中數據的指針. 即便您的 Spark 程序從新啓動, Persistent tables (持久性表)仍然存在, 由於您保持與同一個 metastore 的鏈接. 能夠經過使用表的名稱在 SparkSession上調用 table 方法來建立 persistent tabl (持久表)的 DataFrame .

對於 file-based (基於文件)的 data source (數據源), 例如 text, parquet, json等, 您能夠經過 path 選項指定 custom table path (自定義表路徑), 例如 df.write.option("path", "/some/path").saveAsTable("t") . 當表被 dropped (刪除)時, custom table path (自定義表路徑)將不會被刪除, 而且表數據仍然存在. 若是未指定自定義表路徑, Spark 將把數據寫入 warehouse directory (倉庫目錄)下的默認表路徑. 當表被刪除時, 默認的表路徑也將被刪除.

從 Spark 2.1 開始, persistent datasource tables (持久性數據源表)將 per-partition metadata (每一個分區元數據)存儲在 Hive metastore 中. 這帶來了幾個好處:

  • 因爲 metastore 只能返回查詢的必要 partitions (分區), 所以再也不須要將第一個查詢上的全部 partitions discovering 到表中.
  • Hive DDLs 如 ALTER TABLE PARTITION ... SET LOCATION 如今可用於使用 Datasource API 建立的表.

請注意, 建立 external datasource tables (外部數據源表)(帶有 path 選項)的表時, 默認狀況下不會收集 partition information (分區信息). 要 sync (同步) metastore 中的分區信息, 能夠調用 MSCK REPAIR TABLE .

Bucketing, Sorting and Partitioning (分桶, 排序和分區)

對於 file-based data source (基於文件的數據源), 也能夠對 output (輸出)進行 bucket 和 sort 或者 partition . Bucketing 和 sorting 僅適用於 persistent tables :

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

在使用 Dataset API 時, partitioning 能夠同時與 save 和 saveAsTable 一塊兒使用.

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

能夠爲 single table (單個表)使用 partitioning 和 bucketing:

peopleDF
  .write .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed") 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

partitionBy 建立一個 directory structure (目錄結構), 如 Partition Discovery 部分所述. 所以, 對 cardinality (基數)較高的 columns 的適用性有限. 相反, bucketBy 能夠在固定數量的 buckets 中分配數據, 而且能夠在 a number of unique values is unbounded (多個惟一值無界時)使用數據.

Parquet Files

Parquet 是許多其餘數據處理系統支持的 columnar format (柱狀格式). Spark SQL 支持讀寫 Parquet 文件, 可自動保留 schema of the original data (原始數據的模式). 當編寫 Parquet 文件時, 出於兼容性緣由, 全部 columns 都將自動轉換爲可空.

Loading Data Programmatically (以編程的方式加載數據)

使用上面例子中的數據:

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._ val peopleDF = spark.read.json("examples/src/main/resources/people.json") // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write.parquet("people.parquet") // Read in the parquet file created above // Parquet files are self-describing so the schema is preserved // The result of loading a Parquet file is also a DataFrame val parquetFileDF = spark.read.parquet("people.parquet") // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Partition Discovery (分區發現)

Table partitioning (表分區)是在像 Hive 這樣的系統中使用的常見的優化方法. 在 partitioned table (分區表)中, 數據一般存儲在不一樣的目錄中, partitioning column values encoded (分區列值編碼)在每一個 partition directory (分區目錄)的路徑中. Parquet data source (Parquet 數據源)如今能夠自動 discover (發現)和 infer (推斷)分區信息. 例如, 咱們可使用如下 directory structure (目錄結構)將全部之前使用的 population data (人口數據)存儲到 partitioned table (分區表)中, 其中有兩個額外的列 gender 和 country 做爲 partitioning columns (分區列):

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

經過將 path/to/table 傳遞給 SparkSession.read.parquet 或 SparkSession.read.load , Spark SQL 將自動從路徑中提取 partitioning information (分區信息). 如今返回的 DataFrame 的 schema (模式)變成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

請注意, 會自動 inferred (推斷) partitioning columns (分區列)的 data types (數據類型).目前, 支持 numeric data types (數字數據類型)和 string type (字符串類型).有些用戶可能不想自動推斷 partitioning columns (分區列)的數據類型.對於這些用例, automatic type inference (自動類型推斷)能夠由 spark.sql.sources.partitionColumnTypeInference.enabled 配置, 默認爲 true .當禁用 type inference (類型推斷)時, string type (字符串類型)將用於 partitioning columns (分區列).

從 Spark 1.6.0 開始, 默認狀況下, partition discovery (分區發現)只能找到給定路徑下的 partitions (分區).對於上述示例, 若是用戶將 path/to/table/gender=male 傳遞給 SparkSession.read.parquet 或 SparkSession.read.load , 則 gender 將不被視爲 partitioning column (分區列).若是用戶須要指定 partition discovery (分區發現)應該開始的基本路徑, 則能夠在數據源選項中設置 basePath.例如, 當 path/to/table/gender=male 是數據的路徑而且用戶將 basePath 設置爲 path/to/table/gender 將是一個 partitioning column (分區列).

Schema Merging (模式合併)

像 ProtocolBuffer , Avro 和 Thrift 同樣, Parquet 也支持 schema evolution (模式演進). 用戶能夠從一個 simple schema (簡單的架構)開始, 並根據須要逐漸向 schema 添加更多的 columns (列). 以這種方式, 用戶可能會使用不一樣但相互兼容的 schemas 的 multiple Parquet files (多個 Parquet 文件). Parquet data source (Parquet 數據源)如今可以自動檢測這種狀況並 merge (合併)全部這些文件的 schemas .

因爲 schema merging (模式合併)是一個 expensive operation (相對昂貴的操做), 而且在大多數狀況下不是必需的, 因此默認狀況下從 1.5.0 開始. 你能夠按照以下的方式啓用它:

  1. 讀取 Parquet 文件時, 將 data source option (數據源選項) mergeSchema 設置爲 true (以下面的例子所示), 或
  2. 將 global SQL option (全局 SQL 選項) spark.sql.parquet.mergeSchema 設置爲 true .
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._ // Create a simple DataFrame, store into a partition directory val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // Read the partitioned table val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Hive metastore Parquet table conversion (Hive metastore Parquet table 轉換)

當讀取和寫入 Hive metastore Parquet 表時, Spark SQL 將嘗試使用本身的 Parquet support (Parquet 支持), 而不是 Hive SerDe 來得到更好的性能. 此 behavior (行爲)由 spark.sql.hive.convertMetastoreParquet 配置控制, 默認狀況下 turned on (打開).

Hive/Parquet Schema Reconciliation

從 table schema processing (表格模式處理)的角度來講, Hive 和 Parquet 之間有兩個關鍵的區別.

  1. Hive 不區分大小寫, 而 Parquet 不是
  2. Hive 認爲全部 columns (列)均可覺得空, 而 Parquet 中的可空性是 significant (重要)的.

因爲這個緣由, 當將 Hive metastore Parquet 錶轉換爲 Spark SQL Parquet 表時, 咱們必須調整 metastore schema 與 Parquet schema. reconciliation 規則是:

  1. 在兩個 schema 中具備 same name (相同名稱)的 Fields (字段)必須具備 same data type (相同的數據類型), 而無論 nullability (可空性). reconciled field 應具備 Parquet 的數據類型, 以便 nullability (可空性)獲得尊重.

  2. reconciled schema (調和模式)正好包含 Hive metastore schema 中定義的那些字段.

    • 只出如今 Parquet schema 中的任何字段將被 dropped (刪除)在 reconciled schema 中.
    • 僅在 Hive metastore schema 中出現的任何字段在 reconciled schema 中做爲 nullable field (可空字段)添加.

Metadata Refreshing (元數據刷新)

Spark SQL 緩存 Parquet metadata 以得到更好的性能. 當啓用 Hive metastore Parquet table conversion (轉換)時, 這些 converted tables (轉換表)的 metadata (元數據)也被 cached (緩存). 若是這些表由 Hive 或其餘外部工具更新, 則須要手動刷新以確保 consistent metadata (一致的元數據).

// spark is an existing SparkSession spark.catalog.refreshTable("my_table")

Configuration (配置)

可使用 SparkSession 上的 setConf 方法或使用 SQL 運行 SET key = value 命令來完成 Parquet 的配置.

Property Name (參數名稱) Default(默認) Meaning(含義)
spark.sql.parquet.binaryAsString false 一些其餘 Parquet-producing systems (Parquet 生產系統), 特別是 Impala, Hive 和舊版本的 Spark SQL , 在 writing out (寫出) Parquet schema 時, 不區分 binary data (二進制數據)和 strings (字符串). 該 flag 告訴 Spark SQL 將 binary data (二進制數據)解釋爲 string (字符串)以提供與這些系統的兼容性.
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems , 特別是 Impala 和 Hive , 將 Timestamp 存入INT96 . 該 flag 告訴 Spark SQL 將 INT96 數據解析爲 timestamp 以提供與這些系統的兼容性.
spark.sql.parquet.cacheMetadata true 打開 Parquet schema metadata 的緩存. 能夠加快查詢靜態數據.
spark.sql.parquet.compression.codec snappy 在編寫 Parquet 文件時設置 compression codec (壓縮編解碼器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdown true 設置爲 true 時啓用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquet true 當設置爲 false 時, Spark SQL 將使用 Hive SerDe 做爲 parquet tables , 而不是內置的支持.
spark.sql.parquet.mergeSchema false

當爲 true 時, Parquet data source (Parquet 數據源) merges (合併)從全部 data files (數據文件)收集的 schemas , 不然若是沒有可用的 summary file , 則從 summary file 或 random data file 中挑選 schema .

spark.sql.optimizer.metadataOnly true

若是爲 true , 則啓用使用表的 metadata 的 metadata-only query optimization 來生成 partition columns (分區列)而不是 table scans (表掃描). 當 scanned (掃描)的全部 columns (列)都是 partition columns (分區列)而且 query (查詢)具備知足 distinct semantics (不一樣語義)的 aggregate operator (聚合運算符)時, 它將適用.

JSON Datasets (JSON 數據集)

Spark SQL 能夠 automatically infer (自動推斷)JSON dataset 的 schema, 並將其做爲 Dataset[Row] 加載. 這個 conversion (轉換)能夠在 Dataset[String] 上使用 SparkSession.read.json() 來完成, 或 JSON 文件.

請注意, 以 a json file 提供的文件不是典型的 JSON 文件. 每行必須包含一個 separate (單獨的), self-contained valid (獨立的有效的)JSON 對象. 有關更多信息, 請參閱 JSON Lines text format, also called newline-delimited JSON .

對於 regular multi-line JSON file (常規的多行 JSON 文件), 將 multiLine 選項設置爲 true .

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset. import spark.implicits._ // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) // The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by spark val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() // +------+ // | name| // +------+ // |Justin| // +------+ // Alternatively, a DataFrame can be created for a JSON dataset represented by // a Dataset[String] storing one JSON object per string val otherPeopleDataset = spark.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleDataset) otherPeople.show() // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Hive 表

Spark SQL 還支持讀取和寫入存儲在 Apache Hive 中的數據。 可是,因爲 Hive 具備大量依賴關係,所以這些依賴關係不包含在默認 Spark 分發中。 若是在類路徑中找到 Hive 依賴項,Spark 將自動加載它們。 請注意,這些 Hive 依賴關係也必須存在於全部工做節點上,由於它們將須要訪問 Hive 序列化和反序列化庫 (SerDes),以訪問存儲在 Hive 中的數據。

經過將 hive-site.xmlcore-site.xml(用於安全配置)和 hdfs-site.xml (用於 HDFS 配置)文件放在 conf/ 中來完成配置。

當使用 Hive 時,必須用 Hive 支持實例化 SparkSession,包括鏈接到持續的 Hive 轉移,支持 Hive serdes 和 Hive 用戶定義的功能。 沒有現有 Hive 部署的用戶仍然能夠啓用 Hive 支持。 當 hive-site.xml 未配置時,上下文會自動在當前目錄中建立 metastore_db,並建立由 spark.sql.warehouse.dir 配置的目錄,該目錄默認爲Spark應用程序當前目錄中的 spark-warehouse 目錄 開始了 請注意,自從2.0.0以來,hive-site.xml 中的 hive.metastore.warehouse.dir 屬性已被棄用。 而是使用 spark.sql.warehouse.dir 來指定倉庫中數據庫的默認位置。 您可能須要向啓動 Spark 應用程序的用戶授予寫權限。å

import java.io.File import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) // warehouseLocation points to the default location for managed databases and tables val warehouseLocation = new File("spark-warehouse").getAbsolutePath val spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sql("SELECT * FROM src").show() // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // Aggregation queries are also supported. sql("SELECT COUNT(*) FROM src").show() // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") // The items in DataFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } stringsDS.show() // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") // Queries can then join DataFrame data with data stored in Hive. sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // | 5| val_5| 5| val_5| // ... 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.

指定 Hive 表的存儲格式

建立 Hive 表時,須要定義如何 從/向 文件系統 read/write 數據,即 「輸入格式」 和 「輸出格式」。 您還須要定義該表如何將數據反序列化爲行,或將行序列化爲數據,即 「serde」。 如下選項可用於指定存儲格式 (「serde」, 「input format」, 「output format」),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。 默認狀況下,咱們將以純文本形式讀取表格文件。 請注意,Hive 存儲處理程序在建立表時不受支持,您可使用 Hive 端的存儲處理程序建立一個表,並使用 Spark SQL 來讀取它。

Property Name Meaning
fileFormat fileFormat是一種存儲格式規範的包,包括 "serde","input format" 和 "output format"。 目前咱們支持6個文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat 這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定爲字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 這兩個選項必須成對出現,若是您已經指定了 "fileFormat" 選項,則沒法指定它們。
serde 此選項指定 serde 類的名稱。 當指定 `fileFormat` 選項時,若是給定的 `fileFormat` 已經包含 serde 的信息,那麼不要指定這個選項。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可使用這3個文件格式的這個選項。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 這些選項只能與 "textfile" 文件格式一塊兒使用。它們定義如何將分隔的文件讀入行。

使用 OPTIONS 定義的全部其餘屬性將被視爲 Hive serde 屬性。

與不一樣版本的 Hive Metastore 進行交互

Spark SQL 的 Hive 支持的最重要的部分之一是與 Hive metastore 進行交互,這使得 Spark SQL 可以訪問 Hive 表的元數據。 從 Spark 1.4.0 開始,使用 Spark SQL 的單一二進制構建可使用下面所述的配置來查詢不一樣版本的 Hive 轉移。 請注意,獨立於用於與轉移點通訊的 Hive 版本,內部 Spark SQL 將針對 Hive 1.2.1 進行編譯,並使用這些類進行內部執行(serdes,UDF,UDAF等)。

如下選項可用於配置用於檢索元數據的 Hive 版本:

屬性名稱 默認值 含義
spark.sql.hive.metastore.version 1.2.1 Hive metastore 版本。 可用選項爲 0.12.0 至 1.2.1
spark.sql.hive.metastore.jars builtin 當啓用 -Phive 時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一塊兒。選擇此選項時,spark.sql.hive.metastore.version 必須爲 1.2.1 或未定義。 行家 使用從Maven存儲庫下載的指定版本的Hive jar。 一般不建議在生產部署中使用此配置。 ***** 應用於實例化 HiveMetastoreClient 的 jar 的位置。該屬性能夠是三個選項之一:
  1. builtin當啓用 -Phive 時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一塊兒。選擇此選項時,spark.sql.hive.metastore.version 必須爲 1.2.1 或未定義。
  2. maven使用從 Maven 存儲庫下載的指定版本的 Hive jar。一般不建議在生產部署中使用此配置。
  3. JVM 的標準格式的 classpath。 該類路徑必須包含全部 Hive 及其依賴項,包括正確版本的 Hadoop。這些罐只須要存在於 driver 程序中,但若是您正在運行在 yarn 集羣模式,那麼您必須確保它們與應用程序一塊兒打包。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

使用逗號分隔的類前綴列表,應使用在 Spark SQL 和特定版本的 Hive 之間共享的類加載器來加載。 一個共享類的示例就是用來訪問 Hive metastore 的 JDBC driver。 其它須要共享的類,是須要與已經共享的類進行交互的。 例如,log4j 使用的自定義 appender。

spark.sql.hive.metastore.barrierPrefixes (empty)

一個逗號分隔的類前綴列表,應該明確地爲 Spark SQL 正在通訊的 Hive 的每一個版本從新加載。 例如,在一般將被共享的前綴中聲明的 Hive UDF (即: org.apache.spark.*)。

JDBC 鏈接其它數據庫

Spark SQL 還包括可使用 JDBC 從其餘數據庫讀取數據的數據源。此功能應優於使用 JdbcRDD。 這是由於結果做爲 DataFrame 返回,而且能夠輕鬆地在 Spark SQL 中處理或與其餘數據源鏈接。 JDBC 數據源也更容易從 Java 或 Python 使用,由於它不須要用戶提供 ClassTag。(請注意,這不一樣於 Spark SQL JDBC 服務器,容許其餘應用程序使用 Spark SQL 運行查詢)。

要開始使用,您須要在 Spark 類路徑中包含特定數據庫的 JDBC driver 程序。 例如,要從 Spark Shell 鏈接到 postgres,您將運行如下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

可使用 Data Sources API 未來自遠程數據庫的表做爲 DataFrame 或 Spark SQL 臨時視圖進行加載。 用戶能夠在數據源選項中指定 JDBC 鏈接屬性。用戶 和 密碼一般做爲登陸數據源的鏈接屬性提供。 除了鏈接屬性外,Spark 還支持如下不區分大小寫的選項:

屬性名稱 含義
url 要鏈接的JDBC URL。 源特定的鏈接屬性能夠在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 應該讀取的 JDBC 表。請注意,可使用在SQL查詢的 FROM 子句中有效的任何內容。 例如,您可使用括號中的子查詢代替完整表。
driver 用於鏈接到此 URL 的 JDBC driver 程序的類名。
partitionColumn, lowerBound, upperBound 若是指定了這些選項,則必須指定這些選項。 另外,必須指定 numPartitions. 他們描述如何從多個 worker 並行讀取數據時將表給分區。partitionColumn 必須是有問題的表中的數字列。 請注意,lowerBound 和 upperBound 僅用於決定分區的大小,而不是用於過濾表中的行。 所以,表中的全部行將被分區並返回。此選項僅適用於讀操做。
numPartitions 在表讀寫中能夠用於並行度的最大分區數。這也肯定併發JDBC鏈接的最大數量。 若是要寫入的分區數超過此限制,則在寫入以前經過調用 coalesce(numPartitions) 將其減小到此限制。
fetchsize JDBC 抓取的大小,用於肯定每次數據往返傳遞的行數。 這有利於提高 JDBC driver 的性能,它們的默認值較小(例如: Oracle 是 10 行)。 該選項僅適用於讀取操做。
batchsize JDBC 批處理的大小,用於肯定每次數據往返傳遞的行數。 這有利於提高 JDBC driver 的性能。 該選項僅適用於寫操做。默認值爲 1000.
isolationLevel 事務隔離級別,適用於當前鏈接。 它能夠是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ, 或 SERIALIZABLE 之一,對應於 JDBC 鏈接對象定義的標準事務隔離級別,默認爲 READ_UNCOMMITTED。 此選項僅適用於寫操做。請參考 java.sql.Connection 中的文檔。
truncate 這是一個與 JDBC 相關的選項。 啓用 SaveMode.Overwrite 時,此選項會致使 Spark 截斷現有表,而不是刪除並從新建立。 這能夠更有效,而且防止表元數據(例如,索引)被移除。 可是,在某些狀況下,例如當新數據具備不一樣的模式時,它將沒法工做。 它默認爲 false。 此選項僅適用於寫操做。
createTableOptions 這是一個與JDBC相關的選項。 若是指定,此選項容許在建立表時設置特定於數據庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此選項僅適用於寫操做。
createTableColumnTypes 使用數據庫列數據類型而不是默認值,建立表時。 數據類型信息應以與 CREATE TABLE 列語法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。 指定的類型應該是有效的 spark sql 數據類型。此選項僅適用於寫操做。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") val jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

故障排除

  • JDBC driver 程序類必須對客戶端會話和全部執行程序上的原始類加載器可見。 這是由於 Java 的 DriverManager 類執行安全檢查,致使它忽略原始類加載器不可見的全部 driver 程序,當打開鏈接時。一個方便的方法是修改全部工做節點上的compute_classpath.sh 以包含您的 driver 程序 JAR。
  • 一些數據庫,例如 H2,將全部名稱轉換爲大寫。 您須要使用大寫字母來引用 Spark SQL 中的這些名稱。

性能調優

對於某些工做負載,能夠經過緩存內存中的數據或打開一些實驗選項來提升性能。

在內存中緩存數據

Spark SQL 能夠經過調用 spark.catalog.cacheTable("tableName") 或 dataFrame.cache() 來使用內存中的列格式來緩存表。 而後,Spark SQL 將只掃描所需的列,並將自動調整壓縮以最小化內存使用量和 GC 壓力。 您能夠調用 spark.catalog.uncacheTable("tableName") 從內存中刪除該表。

內存緩存的配置可使用 SparkSession 上的 setConf 方法或使用 SQL 運行 SET key=value 命令來完成。

屬性名稱 默認 含義
spark.sql.inMemoryColumnarStorage.compressed true 當設置爲 true 時,Spark SQL 將根據數據的統計信息爲每一個列自動選擇一個壓縮編解碼器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的柱狀緩存的大小。更大的批量大小能夠提升內存利用率和壓縮率,可是在緩存數據時會冒出 OOM 風險。

其餘配置選項

如下選項也可用於調整查詢執行的性能。這些選項可能會在未來的版本中被廢棄,由於更多的優化是自動執行的。

屬性名稱 默認值 含義
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在讀取文件時,將單個分區打包的最大字節數。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照字節數來衡量的打開文件的估計費用能夠在同一時間進行掃描。 將多個文件放入分區時使用。最好過分估計,那麼具備小文件的分區將比具備較大文件的分區(首先計劃的)更快。
spark.sql.broadcastTimeout 300

廣播鏈接中的廣播等待時間超時(秒)

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置執行鏈接時將廣播給全部工做節點的表的最大大小(以字節爲單位)。 經過將此值設置爲-1能夠禁用廣播。 請注意,目前的統計信息僅支持 Hive Metastore 表,其中已運行命令 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

分佈式 SQL 引擎

Spark SQL 也能夠充當使用其 JDBC/ODBC 或命令行界面的分佈式查詢引擎。 在這種模式下,最終用戶或應用程序能夠直接與 Spark SQL 交互運行 SQL 查詢,而不須要編寫任何代碼。

運行 Thrift JDBC/ODBC 服務器

這裏實現的 Thrift JDBC/ODBC 服務器對應於 Hive 1.2 中的 HiveServer2。 您可使用 Spark 或 Hive 1.2.1 附帶的直線腳本測試 JDBC 服務器。

要啓動 JDBC/ODBC 服務器,請在 Spark 目錄中運行如下命令:

./sbin/start-thriftserver.sh

此腳本接受全部 bin/spark-submit 命令行選項,以及 --hiveconf 選項來指定 Hive 屬性。 您能夠運行 ./sbin/start-thriftserver.sh --help 查看全部可用選項的完整列表。 默認狀況下,服務器監聽 localhost:10000. 您能夠經過環境變量覆蓋此行爲,即:

export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...

or system properties:

./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...

如今,您可使用 beeline 來測試 Thrift JDBC/ODBC 服務器:

./bin/beeline

使用 beeline 方式鏈接到 JDBC/ODBC 服務器:

beeline> !connect jdbc:hive2://localhost:10000

Beeline 將要求您輸入用戶名和密碼。 在非安全模式下,只需輸入機器上的用戶名和空白密碼便可。 對於安全模式,請按照 beeline 文檔 中的說明進行操做。

配置Hive是經過將 hive-site.xmlcore-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。

您也可使用 Hive 附帶的 beeline 腳本。

Thrift JDBC 服務器還支持經過 HTTP 傳輸發送 thrift RPC 消息。 使用如下設置啓用 HTTP 模式做爲系統屬性或在 conf/ 中的 hive-site.xml 文件中啓用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要測試,請使用 beeline 以 http 模式鏈接到 JDBC/ODBC 服務器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

運行 Spark SQL CLI

Spark SQL CLI 是在本地模式下運行 Hive 轉移服務並執行從命令行輸入的查詢的方便工具。 請注意,Spark SQL CLI 不能與 Thrift JDBC 服務器通訊。

要啓動 Spark SQL CLI,請在 Spark 目錄中運行如下命令:

./bin/spark-sql

配置 Hive 是經過將 hive-site.xmlcore-site.xml 和 hdfs-site.xml 文件放在 conf/ 中完成的。 您能夠運行 ./bin/spark-sql --help 獲取全部可用選項的完整列表。

遷移指南

從 Spark SQL 2.1 升級到 2.2

  • Spark 2.1.1 介紹了一個新的配置 key: spark.sql.hive.caseSensitiveInferenceMode. 它的默認設置是 NEVER_INFER, 其行爲與 2.1.0 保持一致. 可是,Spark 2.2.0 將此設置的默認值更改成 「INFER_AND_SAVE」,以恢復與底層文件 schema(模式)具備大小寫混合的列名稱的 Hive metastore 表的兼容性。使用 INFER_AND_SAVE 配置的 value, 在第一次訪問 Spark 將對其還沒有保存推測 schema(模式)的任何 Hive metastore 表執行 schema inference(模式推斷). 請注意,對於具備數千個 partitions(分區)的表,模式推斷多是很是耗時的操做。若是不兼容大小寫混合的列名,您能夠安全地將spark.sql.hive.caseSensitiveInferenceMode 設置爲 NEVER_INFER,以免模式推斷的初始開銷。請注意,使用新的默認INFER_AND_SAVE 設置,模式推理的結果被保存爲 metastore key 以供未來使用。所以,初始模式推斷僅發生在表的第一次訪問。

從 Spark SQL 2.0 升級到 2.1

  • Datasource tables(數據源表)如今存儲了 Hive metastore 中的 partition metadata(分區元數據). 這意味着諸如 ALTER TABLE PARTITION ... SET LOCATION 這樣的 Hive DDLs 如今使用 Datasource API 可用於建立 tables(表).
    • 遺留的數據源表能夠經過 MSCK REPAIR TABLE 命令遷移到這種格式。建議遷移遺留表利用 Hive DDL 的支持和提供的計劃性能。
    • 要肯定表是否已遷移,當在表上發出 DESCRIBE FORMATTED 命令時請查找 PartitionProvider: Catalog 屬性.
  • Datasource tables(數據源表)的 INSERT OVERWRITE TABLE ... PARTITION ... 行爲的更改。
    • 在之前的 Spark 版本中,INSERT OVERWRITE 覆蓋了整個 Datasource table,即便給出一個指定的 partition. 如今只有匹配規範的 partition 被覆蓋。
    • 請注意,這仍然與 Hive 表的行爲不一樣,Hive 表僅覆蓋與新插入數據重疊的分區。

從 Spark SQL 1.6 升級到 2.0

  • SparkSession 如今是 Spark 新的切入點, 它替代了老的 SQLContext 和 HiveContext。注意 : 爲了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。能夠從 SparkSession 獲取一個新的 catalog 接口 — 現有的訪問數據庫和表的 API,如 listTablescreateExternalTabledropTempViewcacheTable 都被移到該接口。

  • Dataset API 和 DataFrame API 進行了統一。在 Scala 中,DataFrame 變成了 Dataset[Row] 類型的一個別名,而 Java API 使用者必須將 DataFrame 替換成 Dataset<Row>。Dataset 類既提供了強類型轉換操做(如 mapfilter 以及 groupByKey)也提供了非強類型轉換操做(如 select 和 groupBy)。因爲編譯期的類型安全不是 Python 和 R 語言的一個特性,Dataset 的概念並不適用於這些語言的 API。相反,DataFrame仍然是最基本的編程抽象, 就相似於這些語言中單節點 data frame 的概念。

  • Dataset 和 DataFrame API 中 unionAll 已通過時而且由 union 替代。
  • Dataset 和 DataFrame API 中 explode 已通過時,做爲選擇,能夠結合 select 或 flatMap 使用 functions.explode() 。
  • Dataset 和 DataFrame API 中 registerTempTable 已通過時而且由 createOrReplaceTempView 替代。

  • 對 Hive tables CREATE TABLE ... LOCATION 行爲的更改.
    • 從 Spark 2.0 開始,CREATE TABLE ... LOCATION 與 CREATE EXTERNAL TABLE ... LOCATION 是相同的,以防止意外丟棄用戶提供的 locations(位置)中的現有數據。這意味着,在用戶指定位置的 Spark SQL 中建立的 Hive 表始終是 Hive 外部表。刪除外部表將不會刪除數據。 用戶不能指定 Hive managed tables(管理表)的位置. 請注意,這與Hive行爲不一樣。
    • 所以,這些表上的 「DROP TABLE」 語句不會刪除數據。

從 Spark SQL 1.5 升級到 1.6

  • 從 Spark 1.6 開始,默認狀況下服務器在多 session(會話)模式下運行。這意味着每一個 JDBC/ODBC 鏈接擁有一份本身的 SQL 配置和臨時函數註冊。緩存表仍在並共享。若是您但願以舊的單會話模式運行 Thrift server,請設置選項 spark.sql.hive.thriftServer.singleSession 爲true。您既能夠將此選項添加到 spark-defaults.conf,或者經過 --conf 將它傳遞給 start-thriftserver.sh
./sbin/start-thriftserver.sh \ --conf spark.sql.hive.thriftServer.singleSession=true \ ... 
  • 從 1.6.1 開始,在 sparkR 中 withColumn 方法支持添加一個新列或更換 DataFrame 同名的現有列。

  • 從 Spark 1.6 開始,LongType 強制轉換爲 TimestampType 指望是秒,而不是微秒。這種更改是爲了匹配 Hive 1.2 的行爲,以便從 numeric(數值)類型進行更一致的類型轉換到 TimestampType。更多詳情請參閱 SPARK-11724 。

從 Spark SQL 1.4 升級到 1.5

  • 使用手動管理的內存優化執行,如今是默認啓用的,以及代碼生成表達式求值。這些功能既能夠經過設置 spark.sql.tungsten.enabled 爲 false 來禁止使用。
  • Parquet 的模式合併默認狀況下再也不啓用。它能夠經過設置 spark.sql.parquet.mergeSchema 到 true 以從新啓用。
  • 字符串在 Python 列的 columns(列)如今支持使用點(.)來限定列或訪問嵌套值。例如 df['table.column.nestedField']。可是,這意味着若是你的列名中包含任何圓點,你如今必須避免使用反引號(如 table.column.with.dots.nested)。
  • 在內存中的列存儲分區修剪默認是開啓的。它能夠經過設置 spark.sql.inMemoryColumnarStorage.partitionPruning 爲 false 來禁用。
  • 無限精度的小數列再也不支持,而不是 Spark SQL 最大精度爲 38 。當從 BigDecimal 對象推斷模式時,如今使用(38,18)。在 DDL 沒有指定精度時,則默認保留 Decimal(10, 0)
  • 時間戳如今存儲在 1 微秒的精度,而不是 1 納秒的。
  • 在 sql 語句中,floating point(浮點數)如今解析爲 decimal。HiveQL 解析保持不變。
  • SQL / DataFrame 函數的規範名稱如今是小寫(例如 sum vs SUM)。
  • JSON 數據源不會自動加載由其餘應用程序(未經過 Spark SQL 插入到數據集的文件)建立的新文件。對於 JSON 持久表(即表的元數據存儲在 Hive Metastore),用戶可使用 REFRESH TABLE SQL 命令或 HiveContext 的 refreshTable 方法,把那些新文件列入到表中。對於表明一個 JSON dataset 的 DataFrame,用戶須要從新建立 DataFrame,同時 DataFrame 中將包括新的文件。
  • PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替換現有的同名列。

從 Spark SQL 1.3 升級到 1.4

DataFrame data reader/writer interface

基於用戶反饋,咱們建立了一個新的更流暢的 API,用於讀取 (SQLContext.read) 中的數據並寫入數據 (DataFrame.write), 而且舊的 API 將過期(例如,SQLContext.parquetFileSQLContext.jsonFile).

針對 SQLContext.read ( ScalaJavaPython ) 和 DataFrame.write ( ScalaJavaPython ) 的更多細節,請看 API 文檔.

DataFrame.groupBy 保留 grouping columns(分組的列)

根據用戶的反饋, 咱們更改了 DataFrame.groupBy().agg() 的默認行爲以保留 DataFrame 結果中的 grouping columns(分組列). 爲了在 1.3 中保持該行爲,請設置 spark.sql.retainGroupColumns 爲 false.

// In 1.3.x, in order for the grouping column "department" to show up, // it must be included explicitly as part of the agg function call. df.groupBy("department").agg($"department", max("age"), sum("expense")) // In 1.4+, grouping column "department" is included automatically. df.groupBy("department").agg(max("age"), sum("expense")) // Revert to 1.3 behavior (not retaining grouping column) by: sqlContext.setConf("spark.sql.retainGroupColumns", "false")

DataFrame.withColumn 上的行爲更改

以前 1.4 版本中,DataFrame.withColumn() 只支持添加列。該列將始終在 DateFrame 結果中被加入做爲新的列,即便現有的列可能存在相同的名稱。從 1.4 版本開始,DataFrame.withColumn() 支持添加與全部現有列的名稱不一樣的列或替換現有的同名列。

請注意,這一變化僅適用於 Scala API,並不適用於 PySpark 和 SparkR。

從 Spark SQL 1.0-1.2 升級到 1.3

在 Spark 1.3 中,咱們從 Spark SQL 中刪除了 「Alpha」 的標籤,做爲一部分已經清理過的可用的 API 。從 Spark 1.3 版本以上,Spark SQL 將提供在 1.X 系列的其餘版本的二進制兼容性。這種兼容性保證不包括被明確標記爲不穩定的(即 DeveloperApi 類或 Experimental) API。

重命名 DataFrame 的 SchemaRDD

升級到 Spark SQL 1.3 版本時,用戶會發現最大的變化是,SchemaRDD 已改名爲 DataFrame。這主要是由於 DataFrames 再也不從 RDD 直接繼承,而是由 RDDS 本身來實現這些功能。DataFrames 仍然能夠經過調用 .rdd 方法轉換爲 RDDS 。

在 Scala 中,有一個從 SchemaRDD 到 DataFrame 類型別名,能夠爲一些狀況提供源代碼兼容性。它仍然建議用戶更新他們的代碼以使用 DataFrame來代替。Java 和 Python 用戶須要更新他們的代碼。

Java 和 Scala APIs 的統一

此前 Spark 1.3 有單獨的Java兼容類(JavaSQLContext 和 JavaSchemaRDD),借鑑於 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已經統一。兩種語言的用戶可使用 SQLContext 和 DataFrame。通常來講論文類嘗試使用兩種語言的共有類型(如 Array 替代了一些特定集合)。在某些狀況下不通用的類型狀況下,(例如,passing in closures 或 Maps)使用函數重載代替。

此外,該 Java 的特定類型的 API 已被刪除。Scala 和 Java 的用戶可使用存在於 org.apache.spark.sql.types 類來描述編程模式。

隔離隱式轉換和刪除 dsl 包(僅Scala)

許多 Spark 1.3 版本之前的代碼示例都以 import sqlContext._ 開始,這提供了從 sqlContext 範圍的全部功能。在 Spark 1.3 中,咱們移除了從 RDDs 到 DateFrame 再到 SQLContext 內部對象的隱式轉換。用戶如今應該寫成 import sqlContext.implicits._.

此外,隱式轉換如今只能使用方法 toDF 來增長由 Product(即 case classes or tuples)構成的 RDD,而不是自動應用。

當使用 DSL 內部的函數時(如今使用 DataFrame API 來替換), 用戶習慣導入 org.apache.spark.sql.catalyst.dsl. 相反,應該使用公共的 dataframe 函數 API: import org.apache.spark.sql.functions._.

針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限於 Scala)

Spark 1.3 移除存在於基本 SQL 包的 DataType 類型別名。開發人員應改成導入類 org.apache.spark.sql.types

UDF 註冊遷移到 sqlContext.udf 中 (Java & Scala)

用於註冊 UDF 的函數,無論是 DataFrame DSL 仍是 SQL 中用到的,都被遷移到 SQLContext 中的 udf 對象中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF 註冊保持不變。

Python DataTypes 再也不是 Singletons(單例的)

在 Python 中使用 DataTypes 時,你須要先構造它們(如:StringType()),而不是引用一個單例對象。

與 Apache Hive 的兼容

Spark SQL 在設計時就考慮到了和 Hive metastore,SerDes 以及 UDF 之間的兼容性。目前 Hive SerDes 和 UDF 都是基於 Hive 1.2.1 版本,而且Spark SQL 能夠鏈接到不一樣版本的Hive metastore(從 0.12.0 到 1.2.1,能夠參考 與不一樣版本的 Hive Metastore 交互

在現有的 Hive Warehouses 中部署

Spark SQL Thrift JDBC server 採用了開箱即用的設計以兼容已有的 Hive 安裝版本。你不須要修改現有的 Hive Metastore , 或者改變數據的位置和表的分區。

所支持的 Hive 特性

Spark SQL 支持絕大部分的 Hive 功能,如:

  • Hive query(查詢)語句, 包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部 Hive 操做, 包括:
    • 關係運算符 (===<><>>=<=, 等等)
    • 算術運算符 (+-*/%, 等等)
    • 邏輯運算符 (AND&&OR||, 等等)
    • 複雜類型的構造
    • 數學函數 (signlncos, 等等)
    • String 函數 (instrlengthprintf, 等等)
  • 用戶定義函數 (UDF)
  • 用戶定義聚合函數 (UDAF)
  • 用戶定義 serialization formats (SerDes)
  • 窗口函數
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries(子查詢)
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • 全部的 Hive DDL 函數, 包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 大部分的 Hive Data types(數據類型), 包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

未支持的 Hive 函數

如下是目前還不支持的 Hive 函數列表。在 Hive 部署中這些功能大部分都用不到。

主要的 Hive 功能

  • Tables 使用 buckets 的 Tables: bucket 是 Hive table partition 中的 hash partitioning. Spark SQL 還不支持 buckets.

Esoteric Hive 功能

  • UNION 類型
  • Unique join
  • Column 統計信息的收集: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

  • File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
  • Hadoop archive

Hive 優化

有少數 Hive 優化尚未包含在 Spark 中。其中一些(好比 indexes 索引)因爲 Spark SQL 的這種內存計算模型而顯得不那麼重要。另一些在 Spark SQL 將來的版本中會持續跟蹤。

  • Block 級別的 bitmap indexes 和虛擬 columns (用於構建 indexes)
  • 自動爲 join 和 groupBy 計算 reducer 個數 : 目前在 Spark SQL 中, 你須要使用 「SET spark.sql.shuffle.partitions=[num_tasks];」 來控制 post-shuffle 的並行度.
  • 僅 Meta-data 的 query: 對於只使用 metadata 就能回答的查詢,Spark SQL 仍然會啓動計算結果的任務.
  • Skew data flag: Spark SQL 不遵循 Hive 中 skew 數據的標記.
  • STREAMTABLE hint in join: Spark SQL 不遵循 STREAMTABLE hint.
  • 對於查詢結果合併多個小文件: 若是輸出的結果包括多個小文件, Hive 能夠可選的合併小文件到一些大文件中去,以免溢出 HDFS metadata. Spark SQL 還不支持這樣.

參考

數據類型

Spark SQL 和 DataFrames 支持下面的數據類型:

  • Numeric types
    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimalconsists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementTypecontainsNull is used to indicate if elements in a ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have nullvalues. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataTypenullable is used to indicate if values of this fields can have null values.

Spark SQL 的全部數據類型都在包 org.apache.spark.sql.types 中. 你能夠用下示例示例來訪問它們.

import org.apache.spark.sql.types._ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Data type(數據類型) Scala 中的 Value 類型 訪問或建立數據類型的 API
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])
Note(注意): containsNull 的默認值是 true.
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull])
Note(注意): valueContainsNull 的默認值是 true.
StructType org.apache.spark.sql.Row StructType(fields)
Note(注意): fields 是 StructFields 的 Seq. 全部, 兩個 fields 擁有相同的名稱是不被容許的.
StructField 該 field(字段)數據類型的 Scala 中的 value 類型 (例如, 數據類型爲 IntegerType 的 StructField 是 Int) StructField(name, dataType, [nullable])
Note: nullable 的默認值是 true.

NaN Semantics

當處理一些不符合標準浮點數語義的 float 或 double 類型時,對於 Not-a-Number(NaN) 須要作一些特殊處理. 具體以下:

  • NaN = NaN 返回 true.
  • 在 aggregations(聚合)操做中,全部的 NaN values 將被分到同一個組中.
  • 在 join key 中 NaN 能夠當作一個普通的值.
  • NaN 值在升序排序中排到最後,比任何其餘數值都大.


相關文章
相關標籤/搜索