【Spark 2.0官方文檔】Spark SQL、DataFrames以及Datasets指南

#文檔說明html

#概述 Spark SQL 是 Spark 用於處理結構化數據的一個模塊。不一樣於基礎的 Spark RDD API,Spark SQL 提供的接口提供了更多關於數據和執行的計算任務的結構信息。Spark SQL 內部使用這些額外的信息來執行一些額外的優化操做。有幾種方式能夠與 Spark SQL 進行交互,其中包括 SQL 和 Dataset API。當計算一個結果時 Spark SQL 使用的執行引擎是同樣的, 它跟你使用哪一種 API 或編程語言來表達計算無關。這種統一意味着開發人員能夠很容易地在不一樣的 API 之間來回切換,基於哪一種 API 可以提供一種最天然的方式來表達一個給定的變換 (transformation)。java

本文中全部的示例程序都使用 Spark 發行版本中自帶的樣本數據,而且能夠在 spark-shell、pyspark shell 以及 sparkR shell 中運行。python

##SQLmysql

Spark SQL 的用法之一是執行 SQL 查詢,它也能夠從現有的 Hive 中讀取數據,想要了解更多關於如何配置這個特性的細節, 請參考 Hive表 這節。若是從其它編程語言內部運行 SQL,查詢結果將做爲一個 Dataset/DataFrame 返回。你還可使用命令行或者經過 JDBC/ODBC 來與 SQL 接口進行交互。sql

##Dataset和DataFrameshell

Dataset 是一個分佈式數據集,它是 Spark 1.6 版本中新增的一個接口, 它結合了 RDD(強類型,可使用強大的 lambda 表達式函數)和 Spark SQL 的優化執行引擎的好處。Dataset 能夠從 JVM 對象構造獲得,隨後可使用函數式的變換(map,flatMap,filter 等)進行操做。Dataset API 目前支持 Scala 和 Java 語言,還不支持 Python, 不過因爲 Python 語言的動態性, Dataset API 的許多好處早就已經可用了(例如,你可使用 row.columnName 來訪問數據行的某個字段)。這種場景對於 R 語言來講是相似的。數據庫

DataFrame 是按命名列方式組織的一個 Dataset。從概念上來說,它等同於關係型數據庫中的一張表或者 R 和 Python 中的一個 data frame,只不過在底層進行了更多的優化。DataFrame 能夠從不少數據源構造獲得,好比:結構化的數據文件,Hive 表,外部數據庫或現有的 RDD。DataFrame API 支持 Scala, Java, Python 以及 R 語言。在 Scala 和 Java 語言中, DataFrame 由 Row 的 Dataset 來表示的。在 Scala API 中, DataFrame 僅僅只是 Dataset[Row] 的一個類型別名,而在 Java API 中, 開發人員須要使用 Dataset<Row> 來表示一個 DataFrame。apache

在本文中, 咱們每每把 Scala/Java 中 Row 的 Dataset 當作 DataFrame。編程

#入門json

##入口:SparkSession

Spark 中全部的功能入口都是 SparkSession 類。要建立一個基本的 SparkSession 對象,只須要使用 SparkSession.builder() 方法。

Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL Example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// 用於隱式轉換,像將RDD轉換成DataFrame
import spark.implicits._

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。

#####Java

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL Example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。

#####Python

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/python/sql.py" 文件。

Spark 2.0 版本中的 SparkSession 對於 Hive 的一些功能特性提供了內置支持, 包括使用 HiveQL 編寫查詢語句, 訪問 Hive UDF 以及從 Hive 表中讀取數據。想要使用這些特性, 須要確保你已經安裝了 Hive。

##建立DataFrame

Spark 應用程序可使用 SparkSession 從現有的RDD、Hive 表或 Spark 數據源建立DataFrame。

下面這個示例基於一個 JSON 文件內容建立了一個 DataFrame:

#####Scala

val df = spark.read.json("examples/src/main/resources/people.json")

// 將 DataFrame 內容展現到標準輸出
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。

#####Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// 將 DataFrame 內容展現到標準輸出
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。

#####Python

# spark是已存在的 SparkSession 對象
df = spark.read.json("examples/src/main/resources/people.json")

# 將 DataFrame 內容展現到標準輸出
df.show()

##非強類型Dataset操做(即DataFrame操做)

DataFrame 爲 Scala, Java, Python 以及 R 語言中的結構化數據操做提供了一個領域特定語言。

上面咱們也提到過, Spark 2.0 版本中, Scala 和 Java API 中的 DataFrame 只是 Row 的 Dataset。與強類型 Scala/Java Dataset 提供的"強類型變換(typed transformations)"相比,這些操做也被稱爲"非強類型變換(untyped transformations)" 。

下面提供了幾個使用 Dataset 處理結構化數據的基礎示例:

#####Scala

// import語句須要使用 $ 符號
import spark.implicits._
// 按照tree格式打印schema
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// 只選擇"name"列
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// 選擇全部人, 可是age加 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// 選擇年齡大於21歲的人
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// 按年齡統計人數
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。

DataFrame 上可執行的操做類型的一個完整列表能夠參考:API文檔

除了簡單的列引用和表達式以外,DataFrame 還提供了豐富的函數庫,包括字符串操做,日期計算,常見的數學操做等。完整列表能夠參見:DataFrame函數參考文檔

#####Java

// col("...") 比 df.col("...") 更好
import static org.apache.spark.sql.functions.col;

// 按照tree格式打印schema
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// 只選擇"name"列
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// 選擇全部人, 可是age加 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// 選擇年齡大於21歲的人
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// 按年齡統計人數
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。

DataFrame 上可執行的操做類型的一個完整列表能夠參考:API文檔

除了簡單的列引用和表達式以外,DataFrame 還提供了豐富的函數庫,包括字符串操做,日期計算,常見的數學操做等。完整列表能夠參見:DataFrame函數參考文檔

#####Python

在 Python 中, 可使用屬性 (df.age) 或者經過索引 (df['age']) 來訪問 DataFrame 的列。 雖然前者對於交互式數據檢索很是方便, 可是咱們仍是很是建議開發人員使用後者, 由於後者更面向於將來,而且不會與也是 DataFrame 類上的屬性的列名衝突。

# spark是一個已存在的 SparkSession 對象

# 建立 DataFrame
df = spark.read.json("examples/src/main/resources/people.json")

# 將 DataFrame 內容展現到標準輸出
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# 按照tree格式打印schema
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# 只選擇"name"列
df.select("name").show()
## name
## Michael
## Andy
## Justin

# 選擇全部人, 可是age加 1
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# 選擇年齡大於21歲的人
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# 按年齡統計人數
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1

DataFrame 上可執行的操做類型的一個完整列表能夠參考:API文檔

除了簡單的列引用和表達式以外,DataFrame 還提供了豐富的函數庫,包括字符串操做,日期計算,常見的數學操做等。完整列表能夠參見:DataFrame函數參考文檔

##編程方式運行SQL查詢

#####Scala

SparkSession 中的 sql 函數可使應用程序可以以編程方式運行 SQL 查詢並將查詢結果做爲一個 DataFrame 返回。

// 將 DataFrame 註冊爲一個SQL臨時視圖
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。

#####Java

SparkSession 中的 sql 函數可使應用程序可以以編程方式運行 SQL 查詢並將結果做爲一個 Dataset<Row> 返回。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// 將 DataFrame 註冊爲一個SQL臨時視圖
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。

#####Python

SparkSession 中的 sql 函數可使應用程序可以以編程方式運行 SQL 查詢並將結果做爲一個 DataFrame 返回。

# spark 是一個已存在的 SparkSession 對象
df = spark.sql("SELECT * FROM table")

##建立Dataset

Dataset 和 RDD 相似, 可是 Dataset 使用的是一個專門的編碼器(Encoder )來序列化對象以進行跨網絡的數據處理和傳輸, 而不是使用 Java 序列化或者 Kryo 。 雖然編碼器和標準的序列化均可以將對象轉化成字節, 可是編碼器能夠根據代碼動態生成而且使用一種能夠容許 Spark 執行不少像過濾、排序、哈希等操做而不須要將字節反序列化成一個對象的特殊的數據格式。

#####Scala

// 注意: Scala 2.10 中的Case classes 最多隻能支持到22個字段,想要突破這個限制,
// 你可使用實現了Product接口的自定義類
case class Person(name: String, age: Long)

// 爲 case classes 建立Encoders
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// 經過引入spark.implicits._來自動地提供可用於大多數經常使用類型的Encoders
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// 經過提供一個類,DataFrames能夠轉換成一個 Dataset。映射是基於名稱的。
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。

#####Java

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// 建立一個Person類的實例
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// 建立用於Java bean的Encoders
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// 大多數經常使用類型的Encoders是由 Encoder 類提供的
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer call(Integer value) throws Exception {
    return value + 1;
  }
}, integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// 經過提供一個類,DataFrames能夠轉換成一個 Dataset。映射是基於名稱的。
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。

##與RDD互操做

Spark SQL 支持兩種不一樣的方法來將現有 RDD 轉化成 Dataset. 第一種方法是使用反射來推導包含特定類型對象的 RDD 的 schema。當你編寫 Spark 應用程序的時候若是已經知道了 schema, 那麼這種基於反射機制的方法可以使代碼更加簡潔而且運行良好。

第二種用於建立 Dataset 的方法是經過一個容許你構造一個 schema 並將其應用到一個現有 RDD 上的編程接口。儘管這種方法代碼很是冗長, 可是它容許你在不知道列和列類型的狀況下構造 Dataset。

###使用反射推導Schema

#####Scala

Spark SQL 的 Scala 接口支持自動地將包含 case class 的 RDD 轉化爲一個 DataFrame, 其中 case class 定義了表的 schema。case class 的參數名稱使用反射讀取並映射成表的列名。Case classes能夠是嵌套的或者包含一些像 Seq 或 Array 這樣的複雜類型。這個 RDD 能夠隱式地轉換成一個 DataFrame 而後註冊成一個表。 能夠在後續的 SQL 語句中使用這些表。

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder

// 用於從RDD到DataFrame的隱式轉換
import spark.implicits._

// 從一個文本文件中建立一個Person對象的RDD,將其轉換成一個Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// 將DataFrame註冊成一個臨時視圖
peopleDF.createOrReplaceTempView("people")

// SQL語句能夠經過Spark的sql方法來運行
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// 查詢結果中某一行的列能夠經過字段索引來訪問 
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// 或者經過字段名稱
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// 沒有用於Dataset[Map[K,V]]的預約義 encoders , 須要顯示地定義
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// 基本類型和 case classes 也能夠定義成:
implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()

// row.getValuesMap[T] 一次檢索多個列到一個 Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。

#####Java

Spark SQL 支持自動地將一個 JavaBean 的 RDD 轉換成一個 DataFrame。使用反射得到的 BeanInfo 對象定義了表的 schema。目前 Spark SQL 不支持包含 Map 字段的 JavaBean,可是支持嵌套的 JavaBean 和 List 或 Array 字段。你能夠建立一個實現了 Serializable 接口的JavaBean,而且爲它全部的字段生成 getters 和 setters 方法。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// 從一個文本文件中建立Person對象的RDD
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(new Function<String, Person>() {
    @Override
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");
      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));
      return person;
    }
  });

// 將 schema 應用到 JavaBean 的 RDD 上以獲取一個 DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// 將DataFrame註冊成一個臨時視圖
peopleDF.createOrReplaceTempView("people");

// SQL語句能夠經過Spark的sql方法來運行
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// 查詢結果中某一行的列能夠經過字段索引來訪問 
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// 或者經過字段名稱
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.<String>getAs("name");
  }
}, stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。

#####Python

Spark SQL 能夠將 Row 對象的 RDD 轉化成一個 DataFrame, 並推導其數據類型。Row 是經過將 key/value 鍵值對列表做爲 kwargs 參數傳遞給 Row 類構造而來。這個列表的鍵定義了表的列名, 而且其類型是經過抽樣整個數據集推導而來, 相似於 JSON 文件上執行的推導。

# spark是一個已存在的SparkSession對象.
from pyspark.sql import Row
sc = spark.sparkContext

# 加載一個文本文件並將每一行轉換成一個Row對象.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# 推導 schema , 並將DataFrame註冊成一個表.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL 能夠運行在已經註冊成表的DataFrame上
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# SQL查詢的結果是RDD並支持全部標準的RDD操做
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)

###編程方式指定Schema

#####Scala

若是不能事先定義 case class (例如, 要解析編碼於一個字符串或是一個文本數據集中的記錄結構, 並且字段對於不一樣的用戶字段映射不一樣), 那麼按照下面三個步驟能夠以編程方式建立一個 DataFrame :

  1. 從原始的 RDD 建立一個Row 的 RDD;
  2. 建立由 StructType 表示的 schema, 它和步驟1中建立的 RDD 中的 Row 結構相匹配;
  3. 經過 SparkSession 提供的 createDataFrame 方法將獲得的 schema 應用到 Row 的 RDD 上。

例如:

import org.apache.spark.sql.types._

// 建立一個 RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// schema編碼於一個字符串中
val schemaString = "name age"

// 基於schema的字符串生成schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

//   將RDD (people) 的記錄轉換成 Row
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

//   將schema應用到RDD上
val peopleDF = spark.createDataFrame(rowRDD, schema)

// 使用DataFrame建立一個臨時視圖
peopleDF.createOrReplaceTempView("people")

// SQL能夠運行在一個使用DataFrame建立的臨時視圖上
val results = spark.sql("SELECT name FROM people")

// SQL查詢的結果是DataFrame並支持全部標準的RDD操做
// 查詢結果中某一行的列能夠經過字段索引或字段名稱來訪問
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|

// +-------------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。

#####Java

若是不能事先定義 JavaBean (例如, 要解析編碼於一個字符串或是一個文本數據集中的記錄結構, 並且字段對於不一樣的用戶字段映射不一樣), 那麼按照下面三個步驟能夠以編程方式建立一個 Dataset<Row>:

  1. 從原始的 RDD 建立一個 Row的 RDD;
  2. 建立由 StructType 表示的 schema, 它和步驟1中建立的 RDD 中的 Row 結構相匹配;
  3. 經過 SparkSession 提供的 createDataFrame 方法將獲得的 schema 應用到 Row的 RDD 上。

例如:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// 建立一個 RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// schema編碼於一個字符串中
String schemaString = "name age";

// 基於schema的字符串生成schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// 將RDD (people) 的記錄轉換成 Row
JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
  @Override
  public Row call(String record) throws Exception {
    String[] attributes = record.split(",");
    return RowFactory.create(attributes[0], attributes[1].trim());
  }
});

// 將schema應用到RDD上
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// 使用DataFrame建立一個臨時視圖
peopleDataFrame.createOrReplaceTempView("people");

// SQL能夠運行在一個使用DataFrame建立的臨時視圖上
Dataset<Row> results = spark.sql("SELECT name FROM people");

// SQL查詢的結果是DataFrame並支持全部標準的RDD操做
// 查詢結果中某一行的列能夠經過字段索引或字段名稱來訪問
Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 文件。

#####Python

若是不能事先定義 kwargs 字典 (例如, 要解析編碼於一個字符串或是一個文本數據集中的記錄結構, 並且字段對於不一樣的用戶字段映射不一樣), 那麼按照下面三個步驟能夠以編程方式建立一個 DataFrame:

  1. 從原始的 RDD 建立一個 tuple 或 list 的 RDD;
  2. 建立由 StructType 表示的 schema, 它和步驟1中建立的 RDD 中的 tuple 或 list 結構相匹配;
  3. 經過 SparkSession 提供的 createDataFrame 方法將獲得的 schema 應用到 RDD 上。

例如:

# 引入 SparkSession 和數據類型
from pyspark.sql.types import *

# spark 是一個已存在的SparkSession對象
sc = spark.sparkContext

# 加載一個文本文件並將每一行轉換成一個tuple
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# schema編碼於一個字符串中
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# 將schema應用到RDD上
schemaPeople = spark.createDataFrame(people, schema)

# 使用DataFrame建立一個臨時視圖
schemaPeople.createOrReplaceTempView("people")

# SQL能夠運行在一個已經註冊成表的DataFrame上
results = spark.sql("SELECT name FROM people")

# SQL查詢的結果是DataFrame並支持全部標準的RDD操做
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print(name)

#數據源

經過統一的 DataFrame 接口, Spark SQL 支持在不一樣的數據源上進行操做。既能夠在 DataFrame 上使用關係型的變換 (transformations) 進行操做,也能夠用其建立一個臨時視圖。將 DataFrame 註冊成一個臨時視圖能夠容許你在它的數據上運行SQL查詢。本節將描述使用 Spark 數據源加載和保存數據的一些通用方法,而後深刻介紹一下專門用於內置數據源的一些選項。

##通用的加載/保存函數

最簡單的方式就是全部操做都使用默認的數據源(除非使用了spark.sql.sources.default 進行配置,不然默認值是 parquet)。

#####Scala

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。

#####Java

Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"文件。

#####Python

df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

###手動指定選項

你也能夠手動指定要使用的數據源,並設置一些你想傳遞給數據源的額外選項。數據源可由其全限定名指定(例如,org.apache.spark.sql.parquet),而對於內置數據源,你可使用簡寫(json, parquet, jdbc)。使用下面的語法能夠將從任意類型的數據源中加載的 DataFrame 轉換成其它的類型。

#####Scala

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。

#####Java

Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 文件。

#####Python

df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

###直接在文件上運行SQL

除了使用讀取 API 將文件加載到 DataFrame 而後執行查詢,你還能夠直接使用 SQL 查詢文件。

#####Scala

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。

#####Java

Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 文件。

####Python

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

###保存模式

Save 操做能夠選擇使用一個 SaveMode,它指定了如何處理數據已經存在的狀況。很重要的一點咱們須要意識到的是,這些保存模式都沒有加鎖而且都不是原子操做。另外,當執行 Overwrite 操做時,寫入新數據以前原有的數據會被刪除。

Scala/Java 任何語言 含義
SaveMode.ErrorIfExists (默認) "error" (默認) 當保存 DataFrame 到數據源時,若是數據已經存在,將會拋出異常。
SaveMode.Append "append" 當保存 DataFrame 到數據源時,若是數據或表已經存在,則將 DataFrame 內容追加到現有數據末尾。
SaveMode.Overwrite "overwrite" Overwrite 模式意味着保存 DataFrame 到數據源時,若是數據或表已經存在,則使用 DataFrame 內容覆蓋已有數據。
SaveMode.Ignore "ignore" Ignore 模式意味着保存 DataFrame 到數據源時,若是數據已經存在,那麼保存操做將不會保存 DataFrame 內容而且不會改變現有數據。這和 SQL 裏的 CREATE TABLE IF NOT EXISTS 相似。

###保存到持久化表

可使用 saveAsTable 命令將 DataFrame 做爲持久化表保存在 Hive metastore 中。須要注意的是現有的 Hive 部署不必使用這個特性。Spark 將會建立一個默認的本地 Hive metastore (使用 Derby)。不一樣於 createOrReplaceTempView 命令,saveAsTable 將會物化 DataFrame 內容並建立一個指向 Hive metastore 中數據的指針。即便 Spark 應用程序重啓, 持久化表也會一直存在,只要你維持到同一個 metastore 的鏈接。經過調用 SparkSession 上帶有表名參數的 table 方法就能夠建立 DataFrame 的一個持久化表。

默認狀況下,saveAsTable 會建立一個 "managed table (託管表或內部表)", 意味着數據的位置將由 metastore 控制。當表被刪除時, 託管表(或內部表)也會自動刪除它們的數據。

##Parquet文件

Parquet 是一種列式存儲格式,不少其它的數據處理系統都支持它。Spark SQL 提供了對可以自動保存原始數據的 schema 的 Parquet 文件的讀寫支持。寫 Parquet 文件時,處於兼容性考慮,全部列都自動地轉換爲nullable。

###編程方式加載數據

仍然使用上面例子中的數據:

#####Scala

// 引入spark.implicits._以自動提供用於大多數經常使用類型的Encoder
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrame能夠保存爲Parquet文件,以維護schema信息
peopleDF.write.parquet("people.parquet")

// 讀入上面建立的parquet文件
// Parquet文件是自描述的,所以schema得以保存
// Parquet文件的加載結果也是一個DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet文件也能夠用於建立一個臨時視圖接着能夠用在SQL語句中
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。

#####Java

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
// import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
// DataFrame能夠保存爲Parquet文件, 以維護schema信息
peopleDF.write().parquet("people.parquet");
// 讀入上面建立的parquet文件
// Parquet文件是自描述的,所以schema得以保存
// Parquet文件的加載結果也是一個DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet文件也能夠用於建立一個臨時視圖接着能夠用在SQL語句中
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中。

#####Python

# 以前示例使用的spark變量用在這個示例中

schemaPeople # 以前示例使用的 DataFrame

# DataFrame能夠保存爲Parquet文件,以維護schema信息
schemaPeople.write.parquet("people.parquet")

# 讀入上面建立的parquet文件
# Parquet文件是自描述的,所以schema得以保存
# Parquet文件的加載結果也是一個DataFrame
parquetFile = spark.read.parquet("people.parquet")

# Parquet文件也能夠用於建立一個臨時視圖接着能夠用在SQL語句中
parquetFile.createOrReplaceTempView("parquetFile");
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)

#####Sql

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

###分區發現(Partition Discovery)

像 Hive 這樣的系統中,表分區是一個經常使用的優化方法。在一個分區表中,數據一般存儲在不一樣的目錄下,分區列值編碼於各個分區目錄的路徑中。Parquet 數據源如今能夠自動地發現和推導分區信息。例如,咱們可使用下面的目錄結構把以前使用的人口數據存儲到一個分區表中,並使用2個額外的列gender和country來做爲分區列:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

經過將 path/to/table 傳遞給 SparkSession.read.parquet 或 SparkSession.read.load 方法, Spark SQL 將會自動從路徑中提取分區信息。如今返回的 DataFrame 的 schema 以下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意:分區列的數據類型是自動推導出來的。目前,分區列只支持數值類型和字符串類型。有時候用戶可能不想要自動推導分區列的數據類型,對於這些使用案例,自動類型推導能夠經過 spark.sql.sources.partitionColumnTypeInference.enabled 來配置是否開啓,其默認值是true。當禁用類型推導後,字符串類型將用於分區列類型。

從Spark 1.6.0 版本開始,分區發現(partition discovery)默認只查找給定路徑下的分區。拿上面的例子來講,若是用戶將 path/to/table/gender=male 傳遞給 SparkSession.read.parquet 或者 SparkSession.read.load 方法,那麼 gender 將不會被看成分區列。若是用戶想要指定分區發現(partition discovery)開始的基路徑,能夠在數據源選項中設置 basePath。例如,若是 path/to/table/gender=male 是數據路徑,而且用戶將 basePath 設置爲 path/to/table,那麼 gender 將是一個分區列。

###Schema合併(Schema Merging)

和 ProtocolBuffer、Avro 以及 Thrift 同樣,Parquet 也支持 schema 演變。用戶能夠從一個簡單的 schema 開始,逐漸往 schema 中增長所須要的列。經過這種方式,用戶最終可能會獲得多個有着不一樣 schema 但互相兼容 的 Parquet 文件。Parquet 數據源如今可以自動檢測這種狀況併合並全部這些文件的 schema。

由於 schema 合併相對來講是一個代價高昂的操做,而且在大多數狀況下都不須要,因此從Spark 1.5.0 版本開始,默認關閉了Schema合併。你能夠這樣啓用這一功能:

  • 讀取 Parquet 文件時,將數據源選項 mergeSchema 設置爲 true(見下面的示例代碼)
  • 或者,將全局的 SQL 選項 spark.sql.parquet.mergeSchema 設置爲true。

#####Scala

// 用於隱式地將RDD轉換成DataFrame
import spark.implicits._

// 建立一個簡單的DataFrame,將其存儲到一個分區目錄下
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// 在一個新的分區目錄下建立另外一個DataFrame,
// 增長一個新列並刪除一個現有的列
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// 讀取分區表
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// 最終的schema由Parquet文件中全部3列以及出如今分區目錄路徑中的分區列組成
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key : int (nullable = true)

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。

#####Java

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static class Square implements Serializable {
  private int value;
  private int square;

  // Getters and setters...

}

public static class Cube implements Serializable {
  private int value;
  private int cube;

  // Getters and setters...

}

List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
  Square square = new Square();
  square.setValue(value);
  square.setSquare(value * value);
  squares.add(square);
}

// 建立一個簡單的DataFrame,將其存儲到一個分區目錄下
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");

List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
  Cube cube = new Cube();
  cube.setValue(value);
  cube.setCube(value * value * value);
  cubes.add(cube);
}

// 在一個新的分區目錄下建立另外一個DataFrame,
// 增長一個新列並刪除一個現有的列
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");

// 讀取分區表
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();

// 最終的schema由Parquet文件中全部3列以及出如今分區目錄路徑中的分區列組成
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key : int (nullable = true)

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 文件。

#####Python

# 以前示例使用的spark變量用在這個示例中

# 建立一個簡單的DataFrame,將其存儲到一個分區目錄下
df1 = spark.createDataFrame(sc.parallelize(range(1, 6))\
                                   .map(lambda i: Row(single=i, double=i * 2)))
df1.write.parquet("data/test_table/key=1")

# 在一個新的分區目錄下建立另外一個DataFrame,
# 增長一個新列並刪除一個現有的列
df2 = spark.createDataFrame(sc.parallelize(range(6, 11))
                                   .map(lambda i: Row(single=i, triple=i * 3)))
df2.write.parquet("data/test_table/key=2")

# 讀取分區表
df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

# 最終的schema由Parquet文件中全部3列以及出如今分區目錄路徑中的分區列組成
# root
# |-- single: int (nullable = true)
# |-- double: int (nullable = true)
# |-- triple: int (nullable = true)

# |-- key : int (nullable = true)

###Hive metastore Parquet錶轉換

當讀寫 Hive metastore Parquet 表時,Spark SQL 會使用自帶的 Parquet 支持而不是 Hive SerDe,以便得到更好的性能。這是由 spark.sql.hive.convertMetastoreParquet 配置項控制的,而且默認是開啓的。

####Hive/Parquet Schema調整

從表的 schema 處理的角度來看, Hive 和 Parquet 有2個關鍵的不一樣點:

  1. Hive 是非大小寫敏感的,而 Parquet 是大小寫敏感的。
  2. Hive 認爲全部列都是 nullable,而 Parquet 中爲空性是很重要的。

因爲這個緣由,在將一個 Hive metastore Parquet 錶轉換成一個 Spark SQL Parquet 表時,必需要使 Hive metastore schema 和 Parquet schema 協調一致。調整規則以下:

  1. 無論爲空性如何, 若是兩個 schema 中字段的名稱相同,那麼字段的數據類型也必須相同。調整後的字段應該使用Parquet schema 的數據類型,因此爲空性須要重視。
  2. 調整後的 schema 必須徹底包含 Hive metastore schema 中定義的字段。
    • 只出如今 Parquet schema 中的字段將在調整後的 schema 中丟棄。
    • 只出如今 Hive metastore schema 中的字段將做爲 nullable 字段添加到調整後的 schema 中。

####元數據刷新(Metadata Refreshing)

Spark SQL 會緩存 Parquet 元數據以提升性能。若是啓用了Hive metastore Parquet錶轉換,那麼那些轉換後的表的 schema 也會被緩存起來。若是這些表被 Hive 或其它外部工具更新, 那麼你須要手動地刷新它們以確保元數據的一致性。

#####Scala

// spark是一個已存在的SparkSession對象

spark.catalog.refreshTable("my_table")

#####Java

// spark是一個已存在的SparkSession對象
spark.catalog().refreshTable("my_table");

#####Python

# spark是一個已存在的 HiveContext
spark.refreshTable("my_table")

#####Sql

REFRESH TABLE my_table;

###配置

Parquet 配置可使用 SparkSession 上的 setConf 方法或者經過使用 SQL語句運行 SET key=value 命令來進行設置。

屬性名 默認值 含義
spark.sql.parquet.binaryAsString false 其它一些使用 Parquet 的系統, 特別是 Impala,Hive 以及老版本的 Spark SQL,當寫 Parquet schema 時都不區分二進制數據和字符串。這個標識告訴 Spark SQL 把二進制數據當字符串處理,以兼容這些系統。
spark.sql.parquet.int96AsTimestamp true 一些使用 Parquet 的系統, 特別是 Impala 和 Hive,將 Timestamp 存儲成 INT96。這個標識告訴 Spark SQL 將 INT96 數據解析成 Timestamp,以兼容這些系統。
spark.sql.parquet.cacheMetadata true 開啓 Parquet schema 元數據緩存。能夠提高查詢靜態數據的速度。
spark.sql.parquet.compression.codec gzip 設置寫 Parquet 文件使用的壓縮編碼格式。可接受的值有:uncompressed, snappy, gzip, lzo
spark.sql.parquet.filterPushdown true 設置爲 true 時啓用 Parquet 過濾器 push-down 優化
spark.sql.hive.convertMetastoreParquet true 設置爲false時,Spark SQL將使用 Hive SerDe 而不是內置的 parquet 表支持
spark.sql.parquet.mergeSchema false 若是設爲 true,Parquet 數據源將會合並全部數據文件的 schema,不然,從彙總文件或隨機選取一個數據文件(若是沒有彙總文件)選取schema

##JSON數據集

#####Scala

Spark SQL 能夠自動推導一個 JSON 數據集的 schema 並將其加載爲一個 Dataset[Row]。這種轉換能夠經過在一個 String 的 RDD 或一個 JSON 文件上使用 SparkSession.read.json() 方法來完成。

注意: 做爲_json文件_提供的文件並非一個典型的JSON文件。JSON 文件的每一行必須包含一個獨立的、完整有效的 JSON 對象。所以,一個常規的多行 json 文件常常會加載失敗。

// 由路徑指向的JSON數據集,這個路徑既能夠是一個簡單文本文件也能夠是一個存儲文本文件的目錄
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// 推導出的schema可使用printSchema()方法可視化
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// 使用DataFrame建立一個臨時視圖
peopleDF.createOrReplaceTempView("people")

// 使用spark提供的sql方法能夠運行SQL語句
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// 或者, 能夠爲JSON數據集建立一個由RDD[String]表示的DataFrame,每一個字符串存儲一個JSON對象
val otherPeopleRDD = spark.sparkContext.makeRDD(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 文件。

#####Java

Spark SQL 能夠自動推導一個 JSON 數據集的 schema 並將其加載爲一個 Dataset<Row>。這種轉換能夠經過在一個 String 的 RDD 或一個 JSON 文件上使用 SparkSession.read.json() 方法來完成。

注意: 做爲_json文件_提供的文件並非一個典型的JSON文件。JSON 文件的每一行必須包含一個獨立的、完整有效的 JSON 對象。所以,一個常規的多行 json 文件常常會加載失敗。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// 由路徑指向的JSON數據集,這個路徑既能夠是一個簡單文本文件也能夠是一個存儲文本文件的目錄
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// 推導出的schema可使用printSchema()方法可視化
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// 使用DataFrame建立一個臨時視圖
people.createOrReplaceTempView("people");

// 使用spark提供的sql方法能夠運行SQL語句
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 文件。

#####Python

Spark SQL 能夠自動推導一個 JSON 數據集的 schema 並將其加載爲一個 DataFrame。這種轉換能夠經過在一個 JSON 文件上使用 SparkSession.read.json 方法來完成。

注意: 做爲_json文件_提供的文件並非一個典型的JSON文件。JSON 文件的每一行必須包含一個獨立的、完整有效的 JSON 對象。所以,一個常規的多行 json 文件常常會加載失敗。

# spark是一個已存在的SparkSession對象

# 由路徑指向的JSON數據集,這個路徑既能夠是一個簡單文本文件也能夠是一個存儲文本文件的目錄
people = spark.read.json("examples/src/main/resources/people.json")

# 推導出的schema可使用printSchema()方法可視化
people.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# 使用DataFrame建立一個臨時視圖
people.createOrReplaceTempView("people")

# 使用spark提供的sql方法能夠運行SQL語句
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 或者, 能夠爲JSON數據集建立一個由RDD[String]表示的DataFrame,每一個字符串存儲一個JSON對象
anotherPeopleRDD = sc.parallelize([
  '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
anotherPeople = spark.jsonRDD(anotherPeopleRDD)

#####Sql

CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable

##Hive表

Spark SQL 還支持讀寫存儲於 Apache Hive 中的數據。然而因爲 Hive 的依賴項太多,在默認的 Spark 發行版本中並無包含這些依賴。Spark 會自動加載 classpath 上的 Hive 依賴。注意:這些 Hive 依賴也必須放到全部的 worker 節點上,由於若是要訪問 Hive 中的數據它們須要訪問Hive 序列化和反序列化庫(SerDes)。

能夠將 hive-site.xml,core-site.xml(用於安全配置)以及 hdfs-site.xml(用於HDFS配置)文件放置在 conf/ 目錄下來完成 Hive 配置。

使用 Hive 時, 你必需要實例化一個支持 Hive 的 SparkSession, 包括鏈接到一個持久化的 Hive metastore, Hive serdes 以及 Hive 用戶自定義函數。即便用戶沒有安裝部署 Hive 也仍然能夠啓用Hive支持。若是沒有在 hive-site.xml 文件中配置, Spark應用程序啓動以後,上下文會自動在當前目錄下建立一個 metastore_db 目錄並建立一個由 spark.sql.warehouse.dir 配置的、默認值是當前目錄下的 spark-warehouse 目錄的目錄。注意: 從 Spark 2.0.0 版本開始, hive-site.xml 中的 hive.metastore.warehouse.dir 屬性已通過時, 你可使用 spark.sql.warehouse.dir 來指定 Hive倉庫中數據庫的默認存儲位置。你可能還須要給啓動 Spark 應用程序的用戶賦予寫權限。

#####Scala

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation 指向用於管理數據庫和表的默認路徑
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// HiveQL表示的查詢語句
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// 還支持聚合查詢
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// SQL查詢結果自己就是DataFrame並支持全部標準函數
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// DaraFrame中數據項的類型是Row,它容許你有序的訪問每一列
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// 你也能夠在HiveContext內部使用DataFrame來建立臨時視圖
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// 能夠關聯查詢DataFrame數據和存儲在Hive中的數據
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" 文件。

#####Java

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation 指向用於管理數據庫和表的默認路徑
String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse";
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// HiveQL表示的查詢語句
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// 還支持聚合查詢
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// SQL查詢結果自己就是DataFrame並支持全部標準函數
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// DaraFrame中數據項的類型是Row,它容許你有序的訪問每一列
Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Key: " + row.get(0) + ", Value: " + row.get(1);
  }
}, Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// 你也能夠在HiveContext內部使用DataFrame來建立臨時視圖
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// 能夠關聯查詢DataFrame數據和存儲在Hive中的數據
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java" 文件。

#####Python

# spark是一個已存在的SparkSession對象

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# HiveQL表示的查詢語句

results = spark.sql("FROM src SELECT key, value").collect()

###與不一樣版本的Hive Metastore交互

Spark SQL 對 Hive 最重要的一個支持就是能夠和 Hive metastore 進行交互,這使得 Spark SQL 能夠訪問 Hive 表的元數據。從 Spark 1.4.0 版本開始,經過使用下面描述的配置, Spark SQL一個簡單的二進制編譯版本能夠用來查詢不一樣版本的 Hive metastore。注意:無論用於訪問 metastore 的 Hive 是什麼版本,Spark SQL 內部都使用 Hive 1.2.1 版本進行編譯, 而且使用這個版本的一些類用於內部執行(serdes,UDFs,UDAFs等)。

下面的選項可用來配置用於檢索元數據的 Hive 版本:

屬性名 默認值 含義
spark.sql.hive.metastore.version 1.2.1 Hive metastore版本,可用選項從0.12.0 到 1.2.1
spark.sql.hive.metastore.jars builtin 存放用於實例化 HiveMetastoreClient 的 jar 包位置。這個屬性能夠是下面三個選項之一:1. builtin。使用 Hive 1.2.1 版本,當啓用 -Phive 時會和 Spark一塊兒打包。若是使用了這個選項, 那麼 spark.sql.hive.metastore.version 要麼是1.2.1,要麼就不定義。2. maven。使用從 Maven 倉庫下載的指定版本的 Hive jar包。生產環境部署一般不建議使用這個選項。3. 標準格式的 JVM classpath。這個 classpath 必須包含全部 Hive 及其依賴的 jar包,包括正確版本的 Hadoop。這些 jar 包只須要部署在 driver 節點上,可是若是你使用 yarn cluster模式運行,那麼你必需要確保這些 jar 包是和應用程序一塊兒打包的。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc 使用類加載器加載的一個逗號分隔的類名前綴列表,它能夠在 Spark SQL 和特定版本的 Hive 間共享。一個共享類的示例就是用來訪問 Hive metastore 的 JDBC driver。其它須要共享的類,是須要與已經共享的類進行交互的類。例如,log4j 使用的自定義 appender
spark.sql.hive.metastore.barrierPrefixes (empty) 一個逗號分隔的類名前綴列表,對於 Spark SQL 訪問的每一個 Hive 版本都須要顯式地從新加載這個列表。例如,在一個共享前綴列表(org.apache.spark.*)中聲明的 Hive UDF一般須要被共享。

##使用JDBC鏈接其它數據庫

Spark SQL 還包括一個可使用 JDBC 從其它數據庫讀取數據的數據源。該功能比使用 JdbcRDD 更好,由於它的返回結果是一個 DataFrame,這樣能夠很容易地在 Spark SQL 中進行處理或和其它數據源進行關聯操做。JDBC 數據源在 Java 和 Python 中用起來很簡單,由於不須要用戶提供一個 ClassTag。(注意,這和 Spark SQL JDBC server 不一樣,Spark SQL JDBC server 容許其它應用程序使用 Spark SQL 執行查詢)

首先,你須要在 Spark classpath 中包含特定數據庫的 JDBC driver。例如,爲了從 Spark Shell鏈接到 postgres 數據庫,你須要運行下面的命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

經過使用數據源 API, 遠程數據庫的表能夠加載爲一個 DataFrame 或 Spark SQL 臨時表。支持的一些選項以下:

屬性名 含義
url 鏈接的 JDBC URL
dbtable 須要讀取的 JDBC 表。注意:可使用一個 SQL 查詢語句的 FROM 子句中任何有效內容。例如, 除使用一個完整的表名以外,你還可使用括號中的子查詢。
driver 用來鏈接到 JDBC URL 的 JDBC driver 類名。
partitionColumn, lowerBound, upperBound, numPartitions 這幾個選項,若是指定了其中一個,那麼其它選項必須所有指定。它們描述了從多個 worker 並行讀入數據時如何進行表分區。partitionColumn 必須是所查詢表中的一個數值類型的列。注意,lowerBound 和 upperBound 只是用來決定分區跨度的,而不是過濾表中的行。所以,表中全部的行都會被分區而後返回。
fetchSize JDBC fetch size,用來限制每次獲取多少行數據。默認設置成一個比較小的 fetch size (如,Oracle上設爲10)有助於 JDBC驅動上的性能提高。

#####Scala

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

#####Java

Map<String, String> options = new HashMap<>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");
Dataset<Row> jdbcDF = spark.read().format("jdbc"). options(options).load();

#####Python

df = spark.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load()

#####Sql

CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename"
)

##故障排查

  • 在客戶端會話以及全部的executor上,JDBC 驅動器類必須對啓動類加載器可見,這是由於 Java 的 DriverManager 類在打開一個鏈接以前會作一個安全檢查,這樣就致使它忽略了對於啓動類加載器不可見的全部驅動器。一種簡單的方法就是修改全部 worker 節點上的 compute_classpath.sh 以包含你驅動器的 jar 包。

  • 有些數據庫,好比H2,會把全部的名稱轉換成大寫。在Spark SQL中你也須要使用大寫來引用這些名稱。

#性能優化

對於有些負載的 Spark 任務,能夠將數據放入內存進行緩存或開啓一些試驗選項來提高性能。

##在內存中緩存數據

經過調用 spark.cacheTable(「tableName」) 或者 dataFrame.cache() 方法, Spark SQL 可使用一種內存列存儲格式緩存表。而後 Spark SQL 只掃描必要的列,而且自動調整壓縮比例,以最小化內存佔用和GC壓力。你能夠調用 spark.uncacheTable(「tableName」) 方法刪除內存中的表。

可使用 SparkSession 類中的 setConf 方法或經過 SQL 語句運行 SET key=value 命令來完成內存緩存配置。

屬性名 默認值 含義
spark.sql.inMemoryColumnarStorage.compressed true 若是設置爲 true,Spark SQL 將會基於統計數據自動地爲每一列選擇一種壓縮編碼方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式緩存批處理大小。緩存數據時, 較大的批處理大小能夠提升內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。

##其它配置選項

下面的選項也能夠用來提高執行的查詢性能。隨着 Spark 自動地執行愈來愈多的優化操做, 這些選項在將來的發佈版本中可能會廢棄。

屬性名 默認值 含義
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 讀取文件時單個分區可容納的最大字節數
spark.sql.files.openCostInBytes 4194304 (4 MB) 打開文件的估算成本, 按照同一時間可以掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 用於配置一個表在執行 join 操做時可以廣播給全部 worker 節點的最大字節大小。經過將這個值設置爲 -1 能夠禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
spark.sql.shuffle.partitions 200 用於配置 join 或聚合操做混洗(shuffle)數據時使用的分區數。

#分佈式SQL引擎

經過使用 JDBC/ODBC 或者命令行接口,Spark SQL 還能夠做爲一個分佈式查詢引擎。在這種模式下,終端用戶或應用程序能夠運行 SQL 查詢來直接與 Spark SQL 交互,而不須要編寫任何代碼。

##運行Thrift JDBC/ODBC server

這裏實現的Thrift JDBC/ODBC server 對應於 Hive 1.2.1 版本中的 HiveServer2。你可使用Spark 或者 Hive 1.2.1 自帶的 beeline 腳原本測試這個 JDBC server。

要啓動 JDBC/ODBC server, 須要在 Spark 安裝目錄下運行下面這個命令:

./sbin/start-thriftserver.sh

這個腳本能接受全部 bin/spark-submit 命令行選項,外加一個用於指定 Hive 屬性的 --hiveconf 選項。你能夠運行 ./sbin/start-thriftserver.sh --help 來查看全部可用選項的一個完整列表。默認狀況下,啓動的 server 將會在 localhost:10000 上進行監聽。你能夠覆蓋該行爲, 好比使用如下環境變量:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

或者系統屬性:

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

如今你可使用 beeline 來測試這個 Thrift JDBC/ODBC server:

./bin/beeline

在 beeline 中使用如下命令鏈接到 JDBC/ODBC server:

beeline> !connect jdbc:hive2://localhost:10000

Beeline 會要求你輸入用戶名和密碼。在非安全模式下,只須要輸入你本機的用戶名和一個空密碼便可。對於安全模式,請參考 beeline 文檔中的指示.

將 hive-site.xml,core-site.xml 以及 hdfs-site.xml 文件放置在 conf/ 目錄下能夠完成 Hive 配置。

你也可使用Hive 自帶的 beeline 的腳本。

Thrift JDBC server 還支持經過 HTTP 傳輸來發送 Thrift RPC 消息。使用下面的設置來啓用 HTTP 模式:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

爲了測試,下面在 HTTP 模式中使用 beeline 鏈接到 JDBC/ODBC server:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

##運行Spark SQL CLI

Spark SQL CLI 是一個很方便的工具,它能夠在本地模式下運行 Hive metastore 服務,而且執行從命令行中輸入的查詢語句。注意:Spark SQL CLI 沒法與 Thrift JDBC server 通訊。

要啓動用Spark SQL CLI, 能夠在Spark安裝目錄運行下面的命令:

./bin/spark-sql

將 hive-site.xml,core-site.xml 以及 hdfs-site.xml 文件放置在 conf/ 目錄下能夠完成 Hive 配置。你能夠運行 ./bin/spark-sql –help 來獲取全部可用選項的一個完整列表。

#遷移指南

##Spark SQL從1.6版本升級到2.0版本

  • SparkSession 如今是 Spark 新的切入點, 它替代了老的 SQLContext 和 HiveContext。注意:爲了向下兼容, 老的 SQLContext 和 HiveContext 仍然保留。能夠從 SparkSession 獲取一個新的 catalog 接口- 現有的訪問數據庫和表的API, 如 listTables, createExternalTable, dropTempView, cacheTable 都被移到該接口。

  • Dataset API 和 DataFrame API 進行了統一。在 Scala 中, DataFrame 變成了 Dataset[Row] 的一個類型別名, 而 Java API 使用者必須將 DataFrame 替換成 Dataset<Row>。Dataset 類既提供了強類型變換操做 (如 map, filter 以及 groupByKey) 也提供了非強類型變換操做 (如 select 和 groupBy) 。因爲編譯期的類型安全不是 Python 和 R 語言的一個特性, Dataset 的概念並不適用於這些語言的 API。相反, DataFrame 仍然是最基本的編程抽象, 就相似於這些語言中單節點數據幀的概念。

  • Dataset 和 DataFrame API 中 unionAll 已通過時而且由union替代。

  • Dataset 和 DataFrame API 中 explode 已通過時。或者 functions.explode() 能夠結合 select 或 flatMap 一塊兒使用。

  • Dataset 和 DataFrame API 中 registerTempTable 已通過時而且由 createOrReplaceTempView替代。

##Spark SQL從1.5版本升級到1.6版本

  • 從 Spark 1.6 版本開始,Thrift server 默認運行於多會話模式下, 這意味着每一個 JDBC/ODBC 鏈接都有獨有一份 SQL 配置和臨時函數註冊表的拷貝。儘管如此, 緩存的表仍然能夠共享。若是你更喜歡在老的單會話模式中運行 Thrift server,只須要將 spark.sql.hive.thriftServer.singleSession 選項設置爲 true 便可。固然,你也可在 spark-defaults.conf 文件中添加這個選項,或者經過 --conf 將其傳遞給 start-thriftserver.sh:

     

./sbin/start-thriftserver.sh
--conf spark.sql.hive.thriftServer.singleSession=true
... ```

  • 從1.6.1版本開始, sparkR 中的 withColumn 方法支持向 DataFrame 新增一列 或 替換已有的名稱相同的列。

  • 從 Spark 1.6 版本開始, LongType 轉換成 TimestampType 將源值以秒而不是毫秒做爲單位處理。作出這個變動是爲了的匹配 Hive 1.2 版本中從數值類型轉換成TimestampType的這個行爲以得到更一致的類型。更多細節請參見 SPARK-11724

##Spark SQL從1.4版本升級到1.5版本

  • 使用手動管理內存(Tungsten引擎)的執行優化以及用於表達式求值的代碼自動生成如今默認是啓用的。這些特性能夠經過將 spark.sql.tungsten.enabled 的值設置爲 false 來同時禁用。

  • 默認不啓用 Parquet schema 合併。能夠將 spark.sql.parquet.mergeSchema 的值設置爲 true 來從新啓用。

  • Python 中對於列的字符串分解如今支持使用點號(.)來限定列或訪問內嵌值,例如 df[‘table.column.nestedField’]。然而這也意味着若是你的列名包含任何點號(.)的話,你就必需要使用反引號來轉義它們(例如:table.column.with.dots.nested)。

  • 默認啓用內存中列式存儲分區修剪。能夠經過設置 spark.sql.inMemoryColumarStorage.partitionPruning 值爲 false 來禁用它。

  • 再也不支持無精度限制的 decimal,相反, Spark SQL 如今強制限制最大精度爲38位。從 BigDecimal 對象推導 schema 時會使用(38,18)這個精度。若是在 DDL 中沒有指定精度,則默認使用精度 Decimal(10,0)。

  • 存儲的時間戳(Timestamp)如今精確到1us(微秒),而不是1ns(納秒)

  • 在 sql 方言中,浮點數如今被解析成 decimal。HiveQL 的解析保持不變。

  • SQL/DataFrame 函數的規範名稱均爲小寫(例如:sum vs SUM)。

  • JSON 數據源不會再自動地加載其餘應用程序建立的新文件(例如,不是由 Spark SQL 插入到dataset中的文件)。對於一個 JSON 持久化表(例如:存儲在 Hive metastore 中的表的元數據),用戶可使用 REFRESH TABLE 這個 SQL 命令或者 HiveContext 的 refreshTable 方法來把新文件添加進表。對於一個表示 JSON 數據集的 DataFrame, 用戶須要重建這個 DataFrame, 這樣新的 DataFrame 就會包含新的文件。

  • pySpark 中的 DataFrame.withColumn 方法支持新增一列或是替換名稱相同列。

##Spark SQL從1.3版本升級到1.4版本

###DataFrame數據讀寫接口

根據用戶的反饋,咱們提供了一個用於數據讀入(SQLContext.read)和數據寫出(DataFrame.write)的新的、更加流暢的API,同時老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)將被廢棄。

有關 SQLContext.read ( Scala, Java, Python ) 和 DataFrame.write ( Scala, Java, Python ) 的更多信息,請參考API文檔。

###DataFrame.groupBy保留分組列

根據用戶的反饋,咱們改變了 DataFrame.groupBy().agg() 的默認行爲,就是在返回的 DataFrame 結果中保留分組的列。若是你想保持1.3版本中的行爲,能夠將 spark.sql.retainGroupColumns 設置爲 false。

#####Scala

// 在1.3.x版本中, 爲了顯示分組列 "department", 它必需要做爲 agg 函數調用的一部分顯示地包含進來
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// 在1.4+版本中, 分組列 "department"自動包含進來
df.groupBy("department").agg(max("age"), sum("expense"))

// 要回滾到 1.3 的行爲 (不包含分組列), 能夠進行以下設置:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

#####Java

// 在1.3.x版本中, 爲了顯示分組列 "department", 它必需要做爲 agg 函數調用的一部分顯示地包含進來
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// 在1.4+版本中, 分組列 "department"自動包含進來
df.groupBy("department").agg(max("age"), sum("expense"));

// 要回滾到 1.3 的行爲 (不包含分組列), 能夠進行以下設置:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");

#####Python

import pyspark.sql.functions as func

# 在1.3.x版本中, 爲了顯示分組列 "department", 它必需要做爲 agg 函數調用的一部分顯示地包含進來
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# 在1.4+版本中, 分組列 "department"自動包含進來
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# 要回滾到 1.3 的行爲 (不包含分組列), 能夠進行以下設置:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

###DataFrame.withColumn行爲變動

1.4版本以前, DataFrame.withColumn() 只支持新增一列。在 DataFrame 結果中指定名稱的列老是做爲一個新列添加進來,即便已經存在了相同名稱的列。從1.4版本開始, DataFrame.withColumn() 支持新增一個和現有列名不重複的新列和替換有相同名稱的列。

注意:這個變動只針對 Scala API, 不針對 PySpark 和 SparkR。

##Spark SQL從1.0-1.2版本升級到1.3版本

Spark 1.3 版本咱們去掉了 Spark SQL 的 "Alpha" 標籤而且清理了現有的API。從 Spark 1.3 版本開始,Spark SQL 將提供 1.x 系列中其它發行版本的二進制兼容。這個兼容性保證不包括顯式地標註爲不穩定(例如:DeveloperAPI 或 Experimental)的 API。

###SchemaRDD重命名爲DataFrame

升級到 Spark SQL 1.3 後,用戶將會注意到最大的改動就是 SchemaRDD 更名爲 DataFrame。主要緣由是 DataFrame 再也不直接繼承於 RDD,而是經過本身的實現來提供 RDD 中提供的絕大多數功能。經過調用 .rdd 方法 DataFrame 仍然能夠轉換成 RDD。

在 Scala 中有一個從 SchemaRDD 到 DataFrame 的類型別名來提供某些使用場景下的代碼兼容性。但仍然建議用戶在代碼中改用 DataFrame。Java 和 Python 用戶必需要修改代碼。

###統一Java和Scala API

Spark 1.3 以前的版本中有兩個單獨的 Java 兼容類(JavaSQLContext 和 JavaSchemaRDD)能夠映射到 Scala API。Spark 1.3 版本將 Java API 和 Scala API 進行了統一。兩種語言的用戶都應該使用 SQLContext 和 DataFrame。一般狀況下這些類都會使用兩種語言中都支持的類型(例如:使用 Array 來取代語言特有的集合)。有些狀況下沒有通用的類型(例如:閉包或maps中用於傳值),則會使用函數重載。

另外,移除了 Java 特有的類型 API。Scala 和 Java 用戶都應該使用 org.apache.spark.sql.types 包中的類來編程式地描述 schema。

###隔離隱式轉換並移除dsl包 (只針對Scala)

Spark 1.3 版本以前的不少示例代碼都以 import sqlContext._ 語句做爲開頭,這樣會引入 sqlContext的全部函數。在Spark 1.3 版本中咱們隔離了 RDD 到 DataFrame的隱式轉換,將其單獨放到 SQLContext 內部的一個對象中。用戶如今應該這樣寫:import sqlContext.implicits._。

另外,隱式轉換如今也只能使用 toDF 方法來增長由 Product(例如:case classes 或 元祖)組成的 RDD,而不是自動轉換。

使用 DSL(如今被 DataFrame API取代)的內部方法時,用戶須要引入 import org.apache.spark.sql.catalyst.dsl。而如今應該要使用公用的 DataFrame 函數 API:import org.apache.spark.sql.functions._

###移除org.apache.spark.sql包中DataType的類型別名 (只針對Scala)

Spark 1.3 版本刪除了基礎 sql 包中 DataType 的類型別名。開發人員應該引入 org.apache.spark.sql.types 中的類。

###UDF註冊遷移到sqlContext.udf中 (針對Java和Scala)

用於註冊 UDF 的函數,無論是 DataFrame DSL 仍是 SQL 中用到的,都被遷移到 SQLContext中的 udf 對象中。

#####Scala

sqlContext.udf.register("strLen", (s: String) => s.length())

#####Java

sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python UDF註冊保持不變。

###Python的DataType再也不是單例的

在 Python 中使用 DataTypes 時,你須要先構造它們(如:StringType()),而不是引用一個單例對象。

##兼容Apache Hive

Spark SQL 在設計時就考慮到了和 Hive metastore,SerDes 以及 UDF 之間的兼容性。目前 Hive SerDes 和 UDF 都是基於 Hive 1.2.1 版本,而且 Spark SQL 能夠鏈接到不一樣版本的 Hive metastore(從0.12.0到1.2.1,能夠參考[與不一樣版本的Hive Metastore交互])

###在現有的Hive倉庫中部署

Spark SQL Thrift JDBC server 採用了開箱即用的設計以兼容已有的 Hive 安裝版本。你不須要修改現有的 Hive Metastore , 或者改變數據的位置和表的分區。

###支持的Hive特性

Spark SQL 支持絕大部分的 Hive 功能,如:

  • Hive 查詢語句, 包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部的 Hive 運算符, 包括:
    • 關係運算符 (=, ⇔, ==, <>, <, >, >=, <=, etc)
    • 算術運算符 (+, -, *, /, %, etc)
    • 邏輯運算符 (AND, &&, OR, ||, etc)
    • 複雜類型構造器
    • 數學函數 (sign, ln, cos等)
    • String 函數 (instr, length, printf等)
  • 用戶自定義函數(UDF)
  • 用戶自定義聚合函數(UDAF)
  • 用戶自定義序列化格式(SerDes)
  • 窗口函數
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 子查詢
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • 採樣
  • Explain
  • 分區表,包括動態分區插入
  • 視圖
  • 全部Hive DDL功能, 包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 絕大多數 Hive 數據類型,包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

###不支持的Hive功能

如下是目前還不支持的 Hive 功能列表。在Hive部署中這些功能大部分都用不到。

####Hive核心功能

  • bucket:bucket 是 Hive表分區內的一個哈希分區,Spark SQL 目前還不支持 bucket。

####Hive高級功能

  • UNION 類型
  • Unique join
  • 列統計數據收集:Spark SQL 目前不依賴掃描來收集列統計數據而且僅支持填充 Hive metastore 的 sizeInBytes 字段。

####Hive輸入輸出格式

  • CLI文件格式:對於回顯到CLI中的結果,Spark SQL 僅支持 TextOutputFormat。
  • Hadoop archive

####Hive優化

有少數Hive優化尚未包含在 Spark 中。其中一些(好比索引)因爲 Spark SQL 的這種內存計算模型而顯得不那麼重要。另一些在 Spark SQL 將來的版本中會持續跟蹤。

  • 塊級別位圖索引和虛擬列(用來建索引)
  • 自動爲 join 和 groupBy 計算 reducer 個數:目前在 Spark SQL 中,你須要使用 "SET spark.sql.shuffle.partitions=[num_tasks];" 來控制後置混洗的並行程度。
  • 僅查詢元數據:對於只須要使用元數據的查詢請求,Spark SQL 仍須要啓動任務來計算結果
  • 數據傾斜標誌:Spark SQL 不遵循 Hive 中的數據傾斜標誌
  • STREAMTABLE join 操做提示:Spark SQL不遵循 STREAMTABLE 提示。
  • 對於查詢結果合併多個小文件:若是返回的結果有不少小文件,Hive 有個選項設置,來合併小文件,以免超過 HDFS 的文件數額度限制。Spark SQL 不支持這個。

#參考

##數據類型

Spark SQL和 DataFrames 支持下面的數據類型:

  • 數值類型

    • ByteType: 表示1字節長的有符號整型,數值範圍:-128 到 127.
    • ShortType: 表示2字節長的有符號整型,數值範圍:-32768 到 32767.
    • IntegerType: 表示4字節長的有符號整型,數值範圍:-2147483648 到 2147483647.
    • LongType: 表示8字節長的有符號整型,數值範圍: -9223372036854775808 to 9223372036854775807.
    • FloatType: 表示4字節長的單精度浮點數。
    • DoubleType: 表示8字節長的雙精度浮點數
    • DecimalType: 表示任意精度有符號帶小數的數值。內部使用java.math.BigDecimal, 一個BigDecimal 由一個任意精度的整數非標度值和一個32位的整數標度 (scale) 組成。
  • 字符串類型

    • StringType: 表示字符串值
  • 二進制類型

    • BinaryType: 表示字節序列值
  • 布爾類型

    • BooleanType: 表示布爾值
  • 日期類型

    • TimestampType: 表示包含年月日、時分秒等字段的日期值
    • DateType: 表示包含年月日字段的日期值
  • Complex types(複雜類型)

    • ArrayType(elementType, containsNull):數組類型,表示一個由類型爲elementType的元素組成的序列,containsNull用來表示ArrayType中的元素是否能爲null值。
    • MapType(keyType, valueType, valueContainsNull):映射類型,表示一個鍵值對的集合。鍵的類型由keyType表示,值的類型則由valueType表示。對於一個MapType值,鍵是不容許爲null值。valueContainsNull用來表示一個MapType的值是否能爲null值。
  • StructType(fields):表示由StructField序列描述的結構。

    • StructField(name, datatype, nullable): 表示StructType中的一個字段,name表示字段名,datatype是字段的數據類型,nullable用來表示該字段是否能夠爲空值。

#####Scala

Spark SQL 全部的數據類型都放在 org.apache.spark.sql.types 這個包下。你能夠這樣獲取它們:

import org.apache.spark.sql.types._

完整示例代碼參見 Spark 倉庫中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 文件。

數據類型 Scala中值類型 用於獲取或建立一個數據類型的API
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])注意:containsNull 的默認值是true
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull])注意:valueContainsNull 的默認值是true
StructType org.apache.spark.sql.Row StructType(fields)注意:fields是一個StructField序列,另外不容許出現名稱重複的字段。
StructField Scala中該字段的數據類型對應的值類型(例如,若是StructField的數據類型爲IntegerType,則Scala中其值類型爲Int) StructField(name, dataType, nullable)

#####Java

Spark SQL全部的數據類型都放在 org.apache.spark.sql.types 這個包下。爲了獲取或建立一個數據類型, 請使用 org.apache.spark.sql.types.DataTypes 中提供的工廠方法。

數據類型 Java中值類型 用於獲取或建立一個數據類型的API
ByteType byte 或 Byte DataTypes.ByteType
ShortType short 或 Short DataTypes.ShortType
IntegerType int 或 Integer DataTypes.IntegerType
LongType long 或 Long DataTypes.LongType
FloatType float 或 Float DataTypes.FloatType
DoubleType double 或 Double DataTypes.DoubleType
DecimalType java.math.BigDecimal DataTypes.createDecimalType(),DataTypes.createDecimalType(precision, scale).
StringType String DataTypes.StringType
BinaryType byte[] DataTypes.BinaryType
BooleanType boolean 或 Boolean DataTypes.BooleanType
TimestampType java.sql.Timestamp DataTypes.TimestampType
DateType java.sql.Date DataTypes.DateType
ArrayType java.util.List DataTypes.createArrayType(elementType) 注意:containsNull 的默認值是true, DataTypes.createArrayType(elementType, containsNull)
MapType java.util.Map DataTypes.createMapType(keyType, valueType) 注意:valueContainsNull 的默認值是true, DataTypes.createMapType(keyType, valueType,valueContainsNull)
StructType org.apache.spark.sql.Row DataTypes.createStructType(fields) 注意:fields 是一個StructField 列表或數組,另外不容許出現名稱重複的字段。
StructField Java中該字段的數據類型對應的值類型 (例如,若是StructField的數據類型爲IntegerType,則Java中其值類型爲 int) DataTypes.createStructField(name, dataType, nullable)

#####Python

Spark SQL全部的數據類型都放在 pyspark.sql.types 這個包下. 你能夠這樣獲取它們:

from pyspark.sql.types import *
數據類型 Python中值類型 用於獲取或建立一個數據類型的API
ByteType int 或 long,注意: 數字在運行時會被轉化成1字節的有符號整數。請確保數字是在 -128 到 127這個範圍內 ByteType()
ShortType int 或 long,注意: 數字在運行時會被轉化成2字節的有符號整數。請確保數字是在 -32768 到 32767這個範圍內。 ShortType()
IntegerType int 或 long IntegerType()
LongType long,注意: 數字在運行時會被轉化成8字節的有符號整數。請確保數字是在 -9223372036854775808 到 9223372036854775807這個範圍內.不然, 請將數據轉化成 decimal.Decimal 並使用 DecimalType LongType()
FloatType float,注意: 數字在運行時會被轉化成4字節的單精度浮點數。 FloatType()
DoubleType float DoubleType()
DecimalType decimal.Decimal DecimalType()
StringType string StringType()
BinaryType bytearray BinaryType()
BooleanType bool BooleanType()
TimestampType datetime.datetime TimestampType()
DateType datetime.date DateType()
ArrayType list, tuple 或 array ArrayType(elementType, [containsNull]) 注意:containsNull 的默認值是true
MapType dict MapType(keyType, valueType, [valueContainsNull]) 注意:valueContainsNull 的默認值是True
StructType list 或 tuple StructType(fields) 注意:fields 是一個StructFields序列,另外不容許出現名稱重複的字段。
StructField Python中該字段的數據類型對應的值類型 (例如,若是StructField的數據類型爲IntegerType,則Python中其值類型爲 Int) StructField(name, dataType, nullable)

##NaN語義

當處理一些不符合標準浮點數語義的 float 或 double 類型時,對於 Not-a-Number(NaN) 須要作一些特殊處理。具體以下:

  • NaN = NaN 返回 true。
  • 在聚合操做中,全部 NaN 值都被分到同一組。
  • 在join key中NaN能夠當作一個普通的值。
  • NaN值在升序排序中排到最後,比任何其餘數值都大。
相關文章
相關標籤/搜索