《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南

《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南

spark-1.6.0 [原文地址]html

Spark SQL, DataFrames 以及 Datasets 編程指南

概要

Spark SQL是Spark中處理結構化數據的模塊。與基礎的Spark RDD API不一樣,Spark SQL的接口提供了更多關於數據的結構信息和計算任務的運行時信息。在Spark內部,Spark SQL會可以用於作優化的信息比RDD API更多一些。Spark SQL現在有了三種不一樣的API:SQL語句、DataFrame API和最新的Dataset API。不過真正運行計算的時候,不管你使用哪一種API或語言,Spark SQL使用的執行引擎都是同一個。這種底層的統一,使開發者能夠在不一樣的API之間來回切換,你能夠選擇一種最天然的方式,來表達你的需求。java

 

本文中全部的示例都使用Spark發佈版本中自帶的示例數據,而且能夠在spark-shell、pyspark shell以及sparkR shell中運行。python

 

SQL

Spark SQL的一種用法是直接執行SQL查詢語句,你可以使用最基本的SQL語法,也能夠選擇HiveQL語法。Spark SQL能夠從已有的Hive中讀取數據。更詳細的請參考Hive Tables 這一節。若是用其餘編程語言運行SQL,Spark SQL將以DataFrame返回結果。你還能夠經過命令行command-line 或者 JDBC/ODBC 使用Spark SQL。mysql

DataFrames

DataFrame是一種分佈式數據集合,每一條數據都由幾個命名字段組成。概念上來講,她和關係型數據庫的表 或者 R和Python中的data frame等價,只不過在底層,DataFrame採用了更多優化。DataFrame能夠從不少數據源(sources)加載數據並構造獲得,如:結構化數據文件,Hive中的表,外部數據庫,或者已有的RDD。sql

DataFrame API支持ScalaJavaPython, and Rshell

Datasets

Dataset是Spark-1.6新增的一種API,目前仍是實驗性的。Dataset想要把RDD的優點(強類型,可使用lambda表達式函數)和Spark SQL的優化執行引擎的優點結合到一塊兒。Dataset能夠由JVM對象構建(constructed )獲得,然後Dataset上可使用各類transformation算子(map,flatMap,filter 等)。數據庫

Dataset API 對 Scala 和 Java的支持接口是一致的,但目前還不支持Python,不過Python自身就有語言動態特性優點(例如,你可使用字段名來訪問數據,row.columnName)。對Python的完整支持在將來的版本會增長進來。apache

入門

入口:SQLContext

Spark SQL全部的功能入口都是SQLContext 類,及其子類。不過要建立一個SQLContext對象,首先須要有一個SparkContext對象。編程

val sc: SparkContext // 假設已經有一個 SparkContext 對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 用於包含RDD到DataFrame隱式轉換操做
import sqlContext.implicits._

除了SQLContext以外,你也能夠建立HiveContext,HiveContext是SQLContext 的超集。json

除了SQLContext的功能以外,HiveContext還提供了完整的HiveQL語法,UDF使用,以及對Hive表中數據的訪問。要使用HiveContext,你並不須要安裝Hive,並且SQLContext能用的數據源,HiveContext也同樣能用。HiveContext是單獨打包的,從而避免了在默認的Spark發佈版本中包含全部的Hive依賴。若是這些依賴對你來講不是問題(不會形成依賴衝突等),建議你在Spark-1.3以前使用HiveContext。然後續的Spark版本,將會逐漸把SQLContext升級到和HiveContext功能差很少的狀態。

spark.sql.dialect選項能夠指定不一樣的SQL變種(或者叫SQL方言)。這個參數能夠在SparkContext.setConf裏指定,也能夠經過 SQL語句的SET key=value命令指定。對於SQLContext,該配置目前惟一的可選值就是」sql」,這個變種使用一個Spark SQL自帶的簡易SQL解析器。而對於HiveContext,spark.sql.dialect 默認值爲」hiveql」,固然你也能夠將其值設回」sql」。僅就目前而言,HiveSQL解析器支持更加完整的SQL語法,因此大部分狀況下,推薦使用HiveContext。

建立DataFrame

Spark應用能夠用SparkContext建立DataFrame,所需的數據來源能夠是已有的RDD(existing RDD),或者Hive表,或者其餘數據源(data sources.)

如下是一個從JSON文件建立DataFrame的小栗子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 將DataFrame內容打印到stdout
df.show()

DataFrame操做

DataFrame提供告終構化數據的領域專用語言支持,包括ScalaJavaPython and R.

這裏咱們給出一個結構化數據處理的基本示例:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 建立一個 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 展現 DataFrame 的內容
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印數據樹形結構
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展現全部人,但全部人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 篩選出年齡大於21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 計算各個年齡的人數
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

DataFrame的完整API列表請參考這裏:API Documentation

除了簡單的字段引用和表達式支持以外,DataFrame還提供了豐富的工具函數庫,包括字符串組裝,日期處理,常見的數學函數等。完整列表見這裏:DataFrame Function Reference.

編程方式執行SQL查詢

SQLContext.sql能夠執行一個SQL查詢,並返回DataFrame結果。

val sqlContext = ... // 已有一個 SQLContext 對象
val df = sqlContext.sql("SELECT * FROM table")

建立Dataset

Dataset API和RDD相似,不過Dataset不使用Java序列化或者Kryo,而是使用專用的編碼器(Encoder )來序列化對象和跨網絡傳輸通訊。若是這個編碼器和標準序列化都能把對象轉字節,那麼編碼器就能夠根據代碼動態生成,並使用一種特殊數據格式,這種格式下的對象不須要反序列化回來,就能容許Spark進行操做,如過濾、排序、哈希等。

// 對普通類型數據的Encoder是由 importing sqlContext.implicits._ 自動提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 如下這行不只定義了case class,同時也自動爲其建立了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrame 只需提供一個和數據schema對應的class便可轉換爲 Dataset。Spark會根據字段名進行映射。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

和RDD互操做

Spark SQL有兩種方法將RDD轉爲DataFrame。

1. 使用反射機制,推導包含指定類型對象RDD的schema。這種基於反射機制的方法使代碼更簡潔,並且若是你事先知道數據schema,推薦使用這種方式;

2. 編程方式構建一個schema,而後應用到指定RDD上。這種方式更囉嗦,但若是你事先不知道數據有哪些字段,或者數據schema是運行時讀取進來的,那麼你極可能須要用這種方式。

利用反射推導schema

Spark SQL的Scala接口支持自動將包含case class對象的RDD轉爲DataFrame。對應的case class定義了表的schema。case class的參數名經過反射,映射爲表的字段名。case class還能夠嵌套一些複雜類型,如Seq和Array。RDD隱式轉換成DataFrame後,能夠進一步註冊成表。隨後,你就能夠對錶中數據使用SQL語句查詢了。

// sc 是已有的 SparkContext 對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 爲了支持RDD到DataFrame的隱式轉換
import sqlContext.implicits._

// 定義一個case class.
// 注意:Scala 2.10的case class最多支持22個字段,要繞過這一限制,
// 你可使用自定義class,並實現Product接口。固然,你也能夠改用編程方式定義schema
case class Person(name: String, age: Int)

// 建立一個包含Person對象的RDD,並將其註冊成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法能夠直接執行SQL語句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查詢的返回結果是一個DataFrame,且可以支持全部常見的RDD算子
// 查詢結果中每行的字段能夠按字段索引訪問:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名訪問:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 會一次性返回多列,並以Map[String, T]爲返回結果類型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回結果: Map("name" -> "Justin", "age" -> 19)

編程方式定義Schema

若是不能事先經過case class定義schema(例如,記錄的字段結構是保存在一個字符串,或者其餘文本數據集中,須要先解析,又或者字段對不一樣用戶有所不一樣),那麼你可能須要按如下三個步驟,以編程方式的建立一個DataFrame:

  1. 從已有的RDD建立一個包含Row對象的RDD
  2. 用StructType建立一個schema,和步驟1中建立的RDD的結構相匹配
  3. 把獲得的schema應用於包含Row對象的RDD,調用這個方法來實現這一步:SQLContext.createDataFrame

For example:

例如:

// sc 是已有的SparkContext對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 建立一個RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// 數據的schema被編碼與一個字符串中
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL 各個數據類型
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基於前面的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 將RDD[people]的各個記錄轉換爲Rows,即:獲得一個包含Row對象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 將schema應用到包含Row對象的RDD上,獲得一個DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 將DataFrame註冊爲table
peopleDataFrame.registerTempTable("people")

// 執行SQL語句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查詢的結果是DataFrame,且可以支持全部常見的RDD算子
// 而且其字段能夠以索引訪問,也能夠用字段名訪問
results.map(t => "Name: " + t(0)).collect().foreach(println)

數據源

Spark SQL支持基於DataFrame操做一系列不一樣的數據源。DataFrame既能夠當成一個普通RDD來操做,也能夠將其註冊成一個臨時表來查詢。把DataFrame註冊爲table以後,你就能夠基於這個table執行SQL語句了。本節將描述加載和保存數據的一些通用方法,包含了不一樣的Spark數據源,而後深刻介紹一下內建數據源可用選項。

通用加載/保存函數

在最簡單的狀況下,全部操做都會以默認類型數據源來加載數據(默認是Parquet,除非修改了spark.sql.sources.default 配置)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手動指定選項

你也能夠手動指定數據源,並設置一些額外的選項參數。數據源可由其全名指定(如,org.apache.spark.sql.parquet),而對於內建支持的數據源,可使用簡寫名(json, parquet, jdbc)。任意類型數據源建立的DataFrame均可以用下面這種語法轉成其餘類型數據格式。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

直接對文件使用SQL

Spark SQL還支持直接對文件使用SQL查詢,不須要用read方法把文件加載進來。

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式

Save操做有一個可選參數SaveMode,用這個參數能夠指定如何處理數據已經存在的狀況。很重要的一點是,這些保存模式都沒有加鎖,因此其操做也不是原子性的。另外,若是使用Overwrite模式,實際操做是,先刪除數據,再寫新數據。

僅Scala/Java 全部支持的語言 含義
SaveMode.ErrorIfExists (default) "error" (default) (默認模式)從DataFrame向數據源保存數據時,若是數據已經存在,則拋異常。
SaveMode.Append "append" 若是數據或表已經存在,則將DataFrame的數據追加到已有數據的尾部。
SaveMode.Overwrite "overwrite" 若是數據或表已經存在,則用DataFrame數據覆蓋之。
SaveMode.Ignore "ignore" 若是數據已經存在,那就放棄保存DataFrame數據。這和SQL裏CREATE TABLE IF NOT EXISTS有點相似。

保存到持久化表

在使用HiveContext的時候,DataFrame能夠用saveAsTable方法,將數據保存成持久化的表。與registerTempTable不一樣,saveAsTable會將DataFrame的實際數據內容保存下來,而且在HiveMetastore中建立一個遊標指針。持久化的表會一直保留,即便Spark程序重啓也沒有影響,只要你鏈接到同一個metastore就能夠讀取其數據。讀取持久化表時,只須要用用表名做爲參數,調用SQLContext.table方法便可獲得對應DataFrame。

默認狀況下,saveAsTable會建立一個」managed table「,也就是說這個表數據的位置是由metastore控制的。一樣,若是刪除表,其數據也會同步刪除。

Parquet文件

Parquet 是一種流行的列式存儲格式。Spark SQL提供對Parquet文件的讀寫支持,並且Parquet文件可以自動保存原始數據的schema。寫Parquet文件的時候,全部的字段都會自動轉成nullable,以便向後兼容。

編程方式加載數據

仍然使用上面例子中的數據:

// 咱們繼續沿用以前例子中的sqlContext對象
// 爲了支持RDD隱式轉成DataFrame
import sqlContext.implicits._

val people: RDD[Person] = ... // 和上面例子中相同,一個包含case class對象的RDD

// 該RDD將隱式轉成DataFrame,而後保存爲parquet文件
people.write.parquet("people.parquet")

// 讀取上面保存的Parquet文件(多個文件 - Parquet保存完實際上是不少個文件)。Parquet文件是自描述的,文件中保存了schema信息
// 加載Parquet文件,並返回DataFrame結果
val parquetFile = sqlContext.read.parquet("people.parquet")

// Parquet文件(多個)能夠註冊爲臨時表,而後在SQL語句中直接查詢
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分區發現

像Hive這樣的系統,一個很經常使用的優化手段就是表分區。在一個支持分區的表中,數據是保存在不一樣的目錄中的,而且將分區鍵以編碼方式保存在各個分區目錄路徑中。Parquet數據源如今也支持自動發現和推導分區信息。例如,咱們能夠把以前用的人口數據存到一個分區表中,其目錄結構以下所示,其中有2個額外的字段,gender和country,做爲分區鍵:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

在這個例子中,若是須要讀取Parquet文件數據,咱們只須要把 path/to/table 做爲參數傳給 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL可以自動的從路徑中提取出分區信息,隨後返回的DataFrame的schema以下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意,分區鍵的數據類型將是自動推導出來的。目前,只支持數值類型和字符串類型數據做爲分區鍵。

有的用戶可能不想要自動推導出來的分區鍵數據類型。這種狀況下,你能夠經過 spark.sql.sources.partitionColumnTypeInference.enabled (默認是true)來禁用分區鍵類型推導。禁用以後,分區鍵老是被當成字符串類型。

從Spark-1.6.0開始,分區發現默認只在指定目錄的子目錄中進行。以上面的例子來講,若是用戶把 path/to/table/gender=male 做爲參數傳給 SQLContext.read.parquet 或者 SQLContext.read.load,那麼gender就不會被做爲分區鍵。若是用戶想要指定分區發現的基礎目錄,能夠經過basePath選項指定。例如,若是把 path/to/table/gender=male做爲數據目錄,而且將basePath設爲 path/to/table,那麼gender仍然會最爲分區鍵。

Schema合併

像ProtoBuffer、Avro和Thrift同樣,Parquet也支持schema演變。用戶從一個簡單的schema開始,逐漸增長所需的新字段。這樣的話,用戶最終會獲得多個schema不一樣但互相兼容的Parquet文件。目前,Parquet數據源已經支持自動檢測這種狀況,併合並全部文件的schema。

由於schema合併相對代價比較大,而且在多數狀況下不是必要的,因此從Spark-1.5.0以後,默認是被禁用的。你能夠這樣啓用這一功能:

  1. 讀取Parquet文件時,將選項mergeSchema設爲true(見下面的示例代碼)
  2. 或者,將全局選項spark.sql.parquet.mergeSchema設爲true
// 繼續沿用以前的sqlContext對象
// 爲了支持RDD隱式轉換爲DataFrame
import sqlContext.implicits._

// 建立一個簡單的DataFrame,存到一個分區目錄中
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// 建立另外一個DataFrame放到新的分區目錄中,
// 並增長一個新字段,丟棄一個老字段
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// 讀取分區表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// 最終的schema將由3個字段組成(single,double,triple)
// 而且分區鍵出如今目錄路徑中
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Hive metastore Parquet table轉換

在讀寫Hive metastore Parquet 表時,Spark SQL用的是內部的Parquet支持庫,而不是Hive SerDe,由於這樣性能更好。這一行爲是由spark.sql.hive.convertMetastoreParquet 配置項來控制的,並且默認是啓用的。

Hive/Parquet schema調和

Hive和Parquet在表結構處理上主要有2個不一樣點:

  1. Hive大小寫敏感,而Parquet不是
  2. Hive全部字段都是nullable的,而Parquet須要顯示設置

因爲以上緣由,咱們必須在Hive metastore Parquet table轉Spark SQL Parquet table的時候,對Hive metastore schema作調整,調整規則以下:

  1. 兩種schema中字段名和字段類型必須一致(不考慮nullable)。調和後的字段類型必須在Parquet格式中有相對應的數據類型,因此nullable是也是須要考慮的。
  2. 調和後Spark SQL Parquet table schema將包含如下字段:
    • 只出如今Parquet schema中的字段將被丟棄
    • 只出如今Hive metastore schema中的字段將被添加進來,並顯式地設爲nullable。

刷新元數據

Spark SQL會緩存Parquet元數據以提升性能。若是Hive metastore Parquet table轉換被啓用的話,那麼轉換過來的schema也會被緩存。這時候,若是這些表由Hive或其餘外部工具更新了,你必須手動刷新元數據。

// 注意,這裏sqlContext是一個HiveContext
sqlContext.refreshTable("my_table")

配置

Parquet配置能夠經過 SQLContext.setConf 或者 SQL語句中 SET key=value來指定。

屬性名 默認值 含義
spark.sql.parquet.binaryAsString false 有些老系統,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不區分二進制數據和字符串類型數據。這個標誌的意思是,讓Spark SQL把二進制數據當字符串處理,以兼容老系統。
spark.sql.parquet.int96AsTimestamp true 有些老系統,如:特定版本的Impala,Hive,把時間戳存成INT96。這個配置的做用是,讓Spark SQL把這些INT96解釋爲timestamp,以兼容老系統。
spark.sql.parquet.cacheMetadata true 緩存Parquet schema元數據。能夠提高查詢靜態數據的速度。
spark.sql.parquet.compression.codec gzip 設置Parquet文件的壓縮編碼格式。可接受的值有:uncompressed, snappy, gzip(默認), lzo
spark.sql.parquet.filterPushdown true 啓用過濾器下推優化,能夠講過濾條件儘可能推導最下層,已取得性能提高
spark.sql.hive.convertMetastoreParquet true 若是禁用,Spark SQL將使用Hive SerDe,而不是內建的對Parquet tables的支持
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.
ParquetOutputCommitter
Parquet使用的數據輸出類。這個類必須是 org.apache.hadoop.mapreduce.OutputCommitter的子類。通常來講,它也應該是 org.apache.parquet.hadoop.ParquetOutputCommitter的子類。注意:1. 若是啓用spark.speculation, 這個選項將被自動忽略

 

2. 這個選項必須用hadoop configuration設置,而不是Spark SQLConf

3. 這個選項會覆蓋 spark.sql.sources.outputCommitterClass

Spark SQL有一個內建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 這個類的在輸出到S3的時候比默認的ParquetOutputCommitter類效率高。

spark.sql.parquet.mergeSchema false 若是設爲true,那麼Parquet數據源將會merge 全部數據文件的schema,不然,schema是從summary file獲取的(若是summary file沒有設置,則隨機選一個)

JSON數據集

Spark SQL在加載JSON數據的時候,能夠自動推導其schema並返回DataFrame。用SQLContext.read.json讀取一個包含String的RDD或者JSON文件,便可實現這一轉換。

注意,一般所說的json文件只是包含一些json數據的文件,而不是咱們所須要的JSON格式文件。JSON格式文件必須每一行是一個獨立、完整的的JSON對象。所以,一個常規的多行json文件常常會加載失敗。

// sc是已有的SparkContext對象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 數據集是由路徑指定的
// 路徑既能夠是單個文件,也能夠仍是存儲文本文件的目錄
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// 推導出來的schema,可由printSchema打印出來
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// 將DataFrame註冊爲table
people.registerTempTable("people")

// 跑SQL語句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 另外一種方法是,用一個包含JSON字符串的RDD來建立DataFrame
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive表

Spark SQL支持從Apache Hive讀寫數據。然而,Hive依賴項太多,因此沒有把Hive包含在默認的Spark發佈包裏。要支持Hive,須要在編譯spark的時候增長-Phive和-Phive-thriftserver標誌。這樣編譯打包的時候將會把Hive也包含進來。注意,hive的jar包也必須出如今全部的worker節點上,訪問Hive數據時候會用到(如:使用hive的序列化和反序列化SerDes時)。

Hive配置在conf/目錄下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。請注意,若是在YARN cluster(yarn-cluster mode)模式下執行一個查詢的話,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必須在驅動器(driver)和全部執行器(executor)均可用。一種簡便的方法是,經過spark-submit命令的–jars和–file選項來提交這些文件。

若是使用Hive,則必須構建一個HiveContext,HiveContext是派生於SQLContext的,添加了在Hive Metastore裏查詢表的支持,以及對HiveQL的支持。用戶沒有現有的Hive部署,也能夠建立一個HiveContext。若是沒有在hive-site.xml裏配置,那麼HiveContext將會自動在當前目錄下建立一個metastore_db目錄,再根據HiveConf設置建立一個warehouse目錄(默認/user/hive/warehourse)。因此請注意,你必須把/user/hive/warehouse的寫權限賦予啓動spark應用程序的用戶。

// sc是一個已有的SparkContext對象
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 這裏用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

和不一樣版本的Hive Metastore交互

Spark SQL對Hive最重要的支持之一就是和Hive metastore進行交互,這使得Spark SQL能夠訪問Hive表的元數據。從Spark-1.4.0開始,Spark SQL有專門單獨的二進制build版本,能夠用來訪問不一樣版本的Hive metastore,其配置表以下。注意,無論所訪問的hive是什麼版本,Spark SQL內部都是以Hive 1.2.1編譯的,並且內部使用的Hive類也是基於這個版本(serdes,UDFs,UDAFs等)

如下選項可用來配置Hive版本以便訪問其元數據:

屬性名 默認值 含義
spark.sql.hive.metastore.version 1.2.1 Hive metastore版本,可選的值爲0.12.0 到 1.2.1
spark.sql.hive.metastore.jars builtin 初始化HiveMetastoreClient的jar包。這個屬性能夠是如下三者之一:

 

  1. builtin

目前內建爲使用Hive-1.2.1,編譯的時候啓用-Phive,則會和spark一塊兒打包。若是沒有-Phive,那麼spark.sql.hive.metastore.version要麼是1.2.1,要就是未定義

  1. maven

使用maven倉庫下載的jar包版本。這個選項建議不要再生產環境中使用

  1. JVM格式的classpath。這個classpath必須包含全部Hive及其依賴的jar包,且包含正確版本的hadoop。這些jar包必須部署在driver節點上,若是你使用yarn-cluster模式,那麼必須確保這些jar包也隨你的應用程序一塊兒打包
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc
一個逗號分隔的類名前綴列表,這些類使用classloader加載,且能夠在Spark SQL和特定版本的Hive間共享。例如,用來訪問hive metastore 的JDBC的driver就須要這種共享。其餘須要共享的類,是與某些已經共享的類有交互的類。例如,自定義的log4j appender
spark.sql.hive.metastore.barrierPrefixes (empty) 一個逗號分隔的類名前綴列表,這些類在每一個Spark SQL所訪問的Hive版本中都會被顯式的reload。例如,某些在共享前綴列表(spark.sql.hive.metastore.sharedPrefixes)中聲明爲共享的Hive UD函數

用JDBC鏈接其餘數據庫

Spark SQL也能夠用JDBC訪問其餘數據庫。這一功能應該優先於使用JdbcRDD。由於它返回一個DataFrame,而DataFrame在Spark SQL中操做更簡單,且更容易和來自其餘數據源的數據進行交互關聯。JDBC數據源在java和python中用起來也很簡單,不須要用戶提供額外的ClassTag。(注意,這與Spark SQL JDBC server不一樣,Spark SQL JDBC server容許其餘應用執行Spark SQL查詢)

首先,你須要在spark classpath中包含對應數據庫的JDBC driver,下面這行包括了用於訪問postgres的數據庫driver

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

遠程數據庫的表能夠經過Data Sources API,用DataFrame或者SparkSQL 臨時表來裝載。如下是選項列表:

屬性名 含義
url 須要鏈接的JDBC URL
dbtable 須要讀取的JDBC表。注意,任何能夠填在SQL的where子句中的東西,均可以填在這裏。(既能夠填完整的表名,也可填括號括起來的子查詢語句)
driver JDBC driver的類名。這個類必須在master和worker節點上均可用,這樣各個節點才能將driver註冊到JDBC的子系統中。
partitionColumn, lowerBound, upperBound, numPartitions 這幾個選項,若是指定其中一個,則必須所有指定。他們描述了多個worker如何並行的讀入數據,並將表分區。partitionColumn必須是所查詢的表中的一個數值字段。注意,lowerBound和upperBound只是用於決定分區跨度的,而不是過濾表中的行。所以,表中全部的行都會被分區而後返回。
fetchSize JDBC fetch size,決定每次獲取多少行數據。在JDBC驅動上設成較小的值有利於性能優化(如,Oracle上設爲10)
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

疑難解答

  • JDBC driver class必須在全部client session或者executor上,對java的原生classloader可見。這是由於Java的DriverManager在打開一個鏈接以前,會作安全檢查,並忽略全部對原聲classloader不可見的driver。最簡單的一種方法,就是在全部worker節點上修改compute_classpath.sh,幷包含你所需的driver jar包。
  • 一些數據庫,如H2,會把全部的名字轉大寫。對於這些數據庫,在Spark SQL中必須也使用大寫。

性能調整

對於有必定計算量的Spark做業來講,可能的性能改進的方式,不是把數據緩存在內存裏,就是調整一些開銷較大的選項參數。

內存緩存

Spark SQL能夠經過調用SQLContext.cacheTable(「tableName」)或者DataFrame.cache()把tables以列存儲格式緩存到內存中。隨後,Spark SQL將會掃描必要的列,並自動調整壓縮比例,以減小內存佔用和GC壓力。你也能夠用SQLContext.uncacheTable(「tableName」)來刪除內存中的table。

你還可使用SQLContext.setConf 或在SQL語句中運行SET key=value命令,來配置內存中的緩存。

屬性名 默認值 含義
spark.sql.inMemoryColumnarStorage.compressed true 若是設置爲true,Spark SQL將會根據數據統計信息,自動爲每一列選擇單獨的壓縮編碼方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式緩存批量的大小。增大批量大小能夠提升內存利用率和壓縮率,但同時也會帶來OOM(Out Of Memory)的風險。

其餘配置選項

如下選項一樣也能夠用來給查詢任務調性能。不過這些選項在將來可能被放棄,由於spark將支持愈來愈多的自動優化。

屬性名 默認值 含義
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置join操做時,可以做爲廣播變量的最大table的大小。設置爲-1,表示禁用廣播。注意,目前的元數據統計僅支持Hive metastore中的表,而且須要運行這個命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan
spark.sql.tungsten.enabled true 設爲true,則啓用優化的Tungsten物理執行後端。Tungsten會顯式的管理內存,並動態生成表達式求值的字節碼
spark.sql.shuffle.partitions 200 配置數據混洗(shuffle)時(join或者聚合操做),使用的分區數。

分佈式SQL引擎

Spark SQL能夠做爲JDBC/ODBC或者命令行工具的分佈式查詢引擎。在這種模式下,終端用戶或應用程序,無需寫任何代碼,就能夠直接在Spark SQL中運行SQL查詢。

運行Thrift JDBC/ODBC server

這裏實現的Thrift JDBC/ODBC server和Hive-1.2.1中的HiveServer2是相同的。你可使用beeline腳原本測試Spark或者Hive-1.2.1的JDBC server。

在Spark目錄下運行下面這個命令,啓動一個JDBC/ODBC server

./sbin/start-thriftserver.sh

這個腳本能接受全部 bin/spark-submit 命令支持的選項參數,外加一個 –hiveconf 選項,來指定Hive屬性。運行./sbin/start-thriftserver.sh –help能夠查看完整的選項列表。默認狀況下,啓動的server將會在localhost:10000端口上監聽。要改變監聽主機名或端口,能夠用如下環境變量:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

或者Hive系統屬性 來指定

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

接下來,你就能夠開始在beeline中測試這個Thrift JDBC/ODBC server:

./bin/beeline

下面的指令,能夠鏈接到一個JDBC/ODBC server

beeline> !connect jdbc:hive2://localhost:10000

可能須要輸入用戶名和密碼。在非安全模式下,只要輸入你本機的用戶名和一個空密碼便可。對於安全模式,請參考beeline documentation.

Hive的配置是在conf/目錄下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。

你也能夠在beeline的腳本中指定。

Thrift JDBC server也支持經過HTTP傳輸Thrift RPC消息。如下配置(在conf/hive-site.xml中)將啓用HTTP模式:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

一樣,在beeline中也能夠用HTTP模式鏈接JDBC/ODBC server:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

使用Spark SQL命令行工具

Spark SQL CLI是一個很方便的工具,它能夠用local mode運行hive metastore service,而且在命令行中執行輸入的查詢。注意Spark SQL CLI目前還不支持和Thrift JDBC server通訊。

用以下命令,在spark目錄下啓動一個Spark SQL CLI

./bin/spark-sql

Hive配置在conf目錄下hive-site.xml,core-site.xml,hdfs-site.xml中設置。你能夠用這個命令查看完整的選項列表:./bin/spark-sql –help

升級指南

1.5升級到1.6

  • 從Spark-1.6.0起,默認Thrift server 將運行於多會話並存模式下(multi-session)。這意味着,每一個JDBC/ODBC鏈接有其獨立的SQL配置和臨時函數註冊表。table的緩存仍然是公用的。若是你更喜歡老的單會話模式,只需設置spark.sql.hive.thriftServer.singleSession爲true便可。固然,你也可在spark-defaults.conf中設置,或者將其值傳給start-thriftserver.sh –conf(以下):
./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...

1.4升級到1.5

  • Tungsten引擎如今默認是啓用的,Tungsten是經過手動管理內存優化執行計劃,同時也優化了表達式求值的代碼生成。這兩個特性均可以經過把spark.sql.tungsten.enabled設爲false來禁用。
  • Parquet schema merging默認不啓用。須要啓用的話,設置spark.sql.parquet.mergeSchema爲true便可
  • Python接口支持用點(.)來訪問字段內嵌值,例如df[‘table.column.nestedField’]。但這也意味着,若是你的字段名包含點號(.)的話,你就必須用重音符來轉義,如:table.`column.with.dots`.nested。
  • 列式存儲內存分區剪枝默認是啓用的。要禁用,設置spark.sql.inMemoryColumarStorage.partitionPruning爲false便可
  • 再也不支持無精度限制的decimal。Spark SQL如今強制最大精度爲38位。對於BigDecimal對象,類型推導將會使用(38,18)精度的decimal類型。若是DDL中沒有指明精度,默認使用的精度是(10,0)
  • 時間戳精確到1us(微秒),而不是1ns(納秒)
  • 在「sql」這個SQL變種設置中,浮點數將被解析爲decimal。HiveQL解析保持不變。
  • 標準SQL/DataFrame函數均爲小寫,例如:sum vs SUM。
  • 當推測任務被啓用是,使用DirectOutputCommitter是不安全的,所以,DirectOutputCommitter在推測任務啓用時,將被自動禁用,且忽略相關配置。
  • JSON數據源再也不自動加載其餘程序產生的新文件(例如,不是Spark SQL插入到dataset中的文件)。對於一個JSON的持久化表(如:Hive metastore中保存的表),用戶可使用REFRESH TABLE這個SQL命令或者HiveContext.refreshTable來把新文件包括進來。

1.3升級到1.4

DataFrame數據讀寫接口

根據用戶的反饋,咱們提供了一個新的,更加流暢的API,用於數據讀(SQLContext.read)寫(DataFrame.write),同時老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)將被廢棄。

有關SQLContext.read和DataFrame.write的更詳細信息,請參考API文檔。

DataFrame.groupBy保留分組字段

根據用戶的反饋,咱們改變了DataFrame.groupBy().agg()的默認行爲,在返回的DataFrame結果中保留了分組字段。若是你想保持1.3中的行爲,設置spark.sql.retainGroupColumns爲false便可。

// 在1.3.x中,若是要保留分組字段"department", 你必須顯式的在agg聚合時包含這個字段
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// 而在1.4+,分組字段"department"默認就會包含在返回的DataFrame中
df.groupBy("department").agg(max("age"), sum("expense"))

// 要回滾到1.3的行爲(不包含分組字段),按以下設置便可:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

1.2升級到1.3

在Spark 1.3中,咱們去掉了Spark SQL的」Alpha「標籤,並清理了可用的API。從Spark 1.3起,Spark SQL將對1.x系列二進制兼容。這個兼容性保證不包括顯式的標註爲」unstable(如:DeveloperAPI或Experimental)「的API。

SchemaRDD重命名爲DataFrame

對於用戶來講,Spark SQL 1.3最大的改動就是SchemaRDD更名爲DataFrame。主要緣由是,DataFrame再也不直接由RDD派生,而是經過本身的實現提供RDD的功能。DataFrame只須要調用其rdd方法就能轉成RDD。

在Scala中仍然有SchemaRDD,只不過這是DataFrame的一個別名,以便兼容一些現有代碼。但仍然建議用戶改用DataFrame。Java和Python用戶就沒這個福利了,他們必須改代碼。

統一Java和Scala API

在Spark 1.3以前,有單獨的java兼容類(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的鏡像。Spark 1.3中將Java API和Scala API統一。兩種語言的用戶都應該使用SQLContext和DataFrame。通常這些類中都會使用兩種語言中都有的類型(如:Array取代各語言獨有的集合)。有些狀況下,沒有通用的類型(例如:閉包或者maps),將會使用函數重載來解決這個問題。

另外,java特有的類型API被刪除了。Scala和java用戶都應該用org.apache.spark.sql.types來編程描述一個schema。

隱式轉換隔離,DSL包移除 – 僅針對scala

Spark 1.3以前的不少示例代碼,都在開頭用 import sqlContext._,這行將會致使全部的sqlContext的函數都被引入進來。所以,在Spark 1.3咱們把RDDs到DataFrames的隱式轉換隔離出來,單獨放到SQLContext.implicits對象中。用戶如今應該這樣寫:import sqlContext.implicits._

另外,隱式轉換也支持由Product(如:case classes或tuples)組成的RDD,但須要調用一個toDF方法,而不是自動轉換。

若是須要使用DSL(被DataFrame取代的API)中的方法,用戶以前須要導入DSL(import org.apache.spark.sql.catalyst.dsl), 而如今應該要導入 DataFrame API(import org.apache.spark.sql.functions._)

移除org.apache.spark.sql中DataType別名 – 僅針對scala

Spark 1.3刪除了sql包中的DataType類型別名。如今,用戶應該使用 org.apache.spark.sql.types中的類。

UDF註冊挪到sqlContext.udf中 – 針對java和scala

註冊UDF的函數,無論是DataFrame,DSL或者SQL中用到的,都被挪到SQLContext.udf中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF註冊保持不變。

Python DataTypes再也不是單例

在python中使用DataTypes,你須要先構造一個對象(如:StringType()),而不是引用一個單例。

Shark用戶遷移指南

調度

用戶能夠經過以下命令,爲JDBC客戶端session設定一個Fair Scheduler pool。

SET spark.sql.thriftserver.scheduler.pool=accounting;

Reducer個數

在Shark中,默認的reducer個數是1,而且由mapred.reduce.tasks設定。Spark SQL廢棄了這個屬性,改成 spark.sql.shuffle.partitions, 而且默認200,用戶可經過以下SET命令來自定義:

SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;

你也能夠把這個屬性放到hive-site.xml中來覆蓋默認值。

目前,mapred.reduce.tasks屬性仍然能被識別,而且自動轉成spark.sql.shuffle.partitions

緩存

shark.cache表屬性已經不存在了,而且以」_cached」結尾命名的表也再也不會自動緩存。取而代之的是,CACHE TABLE和UNCACHE TABLE語句,用以顯式的控制表的緩存:

CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;

注意:CACHE TABLE tbl 如今默認是飢餓模式,而非懶惰模式。不再須要手動調用其餘action來觸發cache了!

從Spark-1.2.0開始,Spark SQL新提供了一個語句,讓用戶本身控制表緩存是不是懶惰模式

CACHE [LAZY] TABLE [AS SELECT] ...

如下幾個緩存相關的特性再也不支持:

  • 用戶定義分區級別的緩存逐出策略
  • RDD 重加載
  • 內存緩存直接寫入策略

兼容Apache Hive

Spark SQL設計時考慮了和Hive metastore,SerDes以及UDF的兼容性。目前這些兼容性鬥是基於Hive-1.2.1版本,而且Spark SQL能夠連到不一樣版本的Hive metastore(從0.12.0到1.2.1,參考:http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore

部署在已有的Hive倉庫之上

Spark SQL Thrift JDBC server採用了」out of the box」(開箱即用)的設計,使用很方便,併兼容已有的Hive安裝版本。你不須要修改已有的Hive metastore或者改變數據的位置,或者表分區。

支持的Hive功能

Spark SQL 支持絕大部分Hive功能,如:

  • Hive查詢語句:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 全部的Hive操做符:
    • Relational operators (===<><>>=<=, etc)
    • Arithmetic operators (+-*/%, etc)
    • Logical operators (AND&&OR||, etc)
    • Complex type constructors
    • Mathematical functions (signlncos, etc)
    • String functions (instrlengthprintf, etc)
  • 用戶定義函數(UDF)
  • 用戶定義聚合函數(UDAF)
  • 用戶定義序列化、反序列化(SerDes)
  • 窗口函數(Window functions)
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 查詢子句
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • 採樣
  • 執行計劃詳細(Explain)
  • 分區表,包括動態分區插入
  • 視圖
  • 全部Hive DDL(data definition language):
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 絕大部分Hive數據類型:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

不支持的Hive功能

如下是目前不支持的Hive特性的列表。多數是不經常使用的。

不支持的Hive常見功能

  • bucket表:butcket是Hive表的一個哈希分區

不支持的Hive高級功能

  • UNION類操做
  • 去重join
  • 字段統計信息收集:Spark SQL不支持同步的字段統計收集

Hive輸入、輸出格式

  • CLI文件格式:對於須要回顯到CLI中的結果,Spark SQL僅支持TextOutputFormat。
  • Hadoop archive — Hadoop歸檔

Hive優化

一些比較棘手的Hive優化目前尚未在Spark中提供。有一些(如索引)對應Spark SQL這種內存計算模型來講並不重要。另一些,在Spark SQL將來的版本中會支持。

  • 塊級別位圖索引和虛擬字段(用來建索引)
  • 自動計算reducer個數(join和groupBy算子):目前在Spark SQL中你須要這樣控制混洗後(post-shuffle)併發程度:」SET spark.sql.shuffle.partitions=[num_tasks];」
  • 元數據查詢:只查詢元數據的請求,Spark SQL仍須要啓動任務來計算結果
  • 數據傾斜標誌:Spark SQL不會理會Hive中的數據傾斜標誌
  • STREAMTABLE join提示:Spark SQL裏沒有這玩藝兒
  • 返回結果時合併小文件:若是返回的結果有不少小文件,Hive有個選項設置,來合併小文件,以免超過HDFS的文件數額度限制。Spark SQL不支持這個。

參考

數據類型

Spark SQL和DataFrames支持以下數據類型:

  • Numeric types(數值類型)
    • ByteType: 1字節長的有符號整型,範圍:-128 到 127.
    • ShortType: 2字節長有符號整型,範圍:-32768 到 32767.
    • IntegerType: 4字節有符號整型,範圍:-2147483648 到 2147483647.
    • LongType: 8字節有符號整型,範圍: -9223372036854775808 to 9223372036854775807.
    • FloatType: 4字節單精度浮點數。
    • DoubleType: 8字節雙精度浮點數
    • DecimalType: 任意精度有符號帶小數的數值。內部使用java.math.BigDecimal, BigDecimal包含任意精度的不縮放整型,和一個32位的縮放整型
  • String type(字符串類型)
    • StringType: 字符串
  • Binary type(二進制類型)
    • BinaryType: 字節序列
  • Boolean type(布爾類型)
    • BooleanType: 布爾類型
  • Datetime type(日期類型)
    • TimestampType: 表示包含年月日、時分秒等字段的日期
    • DateType: 表示包含年月日字段的日期
  • Complex types(複雜類型)
    • ArrayType(elementType, containsNull):數組類型,表達一系列的elementType類型的元素組成的序列,containsNull表示數組可否包含null值
    • MapType(keyType, valueType, valueContainsNull):映射集合類型,表示一個鍵值對的集合。鍵的類型是keyType,值的類型則由valueType指定。對應MapType來講,鍵是不能爲null的,而值可否爲null則取決於valueContainsNull。
    • StructType(fields):表示包含StructField序列的結構體。
      • StructField(name, datatype, nullable): 表示StructType中的一個字段,name是字段名,datatype是數據類型,nullable表示該字段是否能夠爲空

全部Spark SQL支持的數據類型都在這個包裏:org.apache.spark.sql.types,你能夠這樣導入之:

import  org.apache.spark.sql.types._
Data type Value type in Scala API to access or create a data type
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])注意:默認containsNull爲true
MapType scala.collection.Map MapType(keyTypevalueType, [valueContainsNull])注意:默認valueContainsNull爲true
StructType org.apache.spark.sql.Row StructType(fields)注意:fields是一個StructFields的序列,而且同名的字段是不容許的。
StructField 定義字段的數據對應的Scala類型(例如,若是StructField的dataType爲IntegerType,則其數據對應的scala類型爲Int) StructField(namedataTypenullable)

NaN語義

這是Not-a-Number的縮寫,某些float或double類型不符合標準浮點數語義,須要對其特殊處理:

  • NaN == NaN,即:NaN和NaN老是相等
  • 在聚合函數中,全部NaN分到同一組
  • NaN在join操做中能夠當作一個普通的join key
  • NaN在升序排序中排到最後,比任何其餘數值都大

原創文章,轉載請註明: 轉載自併發編程網 – ifeve.com本文連接地址: 《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南

Favorite添加本文到個人收藏

Related Posts:

  1. 《Spark 官方文檔》Spark快速入門
  2. 《Spark 官方文檔》
  3. 《Spark 官方文檔》Spark安全性
  4. 《Spark 官方文檔》硬件配置
  5. 《Spark 官方文檔》監控和工具
  6. 《Spark 官方文檔》Spark做業調度
  7. 《Spark 官方文檔》在Mesos上運行Spark
  8. 《Spark 官方文檔》Spark編程指南
  9. 《Spark 官方文檔》Spark獨立模式
  10. 《Spark 官方文檔》在YARN上運行Spark
  11. 《Spark 官方文檔》Spark配置
  12. 《Spark 官方文檔》在Amazon EC2上運行Spark
  13. 《Spark 官方文檔》Spark調優
  14. 《Spark官方文檔》Spark Streaming編程指南
  15. Apache Storm 官方文檔 —— 使用非 JVM 語言開發
相關文章
相關標籤/搜索