Spark SQL, DataFrames and Datasets Guide
html
Overviewjava
SQLpython
開始入門git
起始點: SparkSessiongithub
無類型的Dataset操做 (aka DataFrame 操做)shell
Running SQL Queries Programmatically數據庫
全局臨時視圖express
Untyped User-Defined Aggregate Functions
Type-Safe User-Defined Aggregate Functions
Generic Load/Save Functions (通用 加載/保存 功能)
Manually Specifying Options (手動指定選項)
Run SQL on files directly (直接在文件上運行 SQL)
Saving to Persistent Tables (保存到持久表)
Bucketing, Sorting and Partitioning (分桶, 排序和分區)
Loading Data Programmatically (以編程的方式加載數據)
Hive metastore Parquet table conversion (Hive metastore Parquet table 轉換)
Hive/Parquet Schema Reconciliation
DataFrame data reader/writer interface
DataFrame.groupBy 保留 grouping columns(分組的列)
針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限於 Scala)
UDF 註冊遷移到sqlContext.udf中 (Java & Scala)
Python DataTypes 再也不是 Singletons(單例的)
Overview
Spark SQL 是 Spark 處理結構化數據的一個模塊.與基礎的 Spark RDD API 不一樣, Spark SQL 提供了查詢結構化數據及計算結果等信息的接口.在內部, Spark SQL 使用這個額外的信息去執行額外的優化.有幾種方式能夠跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執行引擎進行計算時, 不管使用哪一種 API / 語言均可以快速的計算.這種統一意味着開發人員可以在基於提供最天然的方式來表達一個給定的 transformation API 之間實現輕鬆的來回切換不一樣的 .
該頁面全部例子使用的示例數據都包含在 Spark 的發佈中, 而且可使用spark-shell,pysparkshell, 或者sparkRshell來運行.
SQL
Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也可以被用於從已存在的 Hive 環境中讀取數據.更多關於如何配置這個特性的信息, 請參考Hive 表這部分. 當以另外的編程語言運行SQL 時, 查詢結果將以Dataset/DataFrame的形式返回.您也可使用命令行或者經過JDBC/ODBC與 SQL 接口交互.
Datasets and DataFrames
一個 Dataset 是一個分佈式的數據集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的優勢(強類型化, 可以使用強大的 lambda 函數)與Spark SQL執行引擎的優勢.一個 Dataset 能夠從 JVM 對象來構造而且使用轉換功能(map, flatMap, filter, 等等). Dataset API 在Scala和Java是可用的.Python 不支持 Dataset API.可是因爲 Python 的動態特性, 許多 Dataset API 的優勢已經可用了 (也就是說, 你可能經過 name 天生的row.columnName屬性訪問一行中的字段).這種狀況和 R 類似.
一個 DataFrame 是一個Dataset組成的指定列.它的概念與一個在關係型數據庫或者在 R/Python 中的表是相等的, 可是有不少優化. DataFrames 能夠從大量的sources中構造出來, 好比: 結構化的文本文件, Hive中的表, 外部數據庫, 或者已經存在的 RDDs. DataFrame API 能夠在 Scala, Java,Python, 和R中實現. 在 Scala 和 Java中, 一個 DataFrame 所表明的是一個多個Row(行)的的 Dataset(數據集合). 在the Scala API中,DataFrame僅僅是一個Dataset[Row]類型的別名. 然而, 在Java API中, 用戶須要去使用Dataset去表明一個DataFrame.
在此文檔中, 咱們將經常會引用 Scala/Java Datasets 的Rows 做爲 DataFrames.
開始入門
起始點: SparkSession
Spark SQL中全部功能的入口點是SparkSession類. 要建立一個SparkSession, 僅使用SparkSession.builder()就能夠了:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option","some-value").getOrCreate()// For implicit conversions like converting RDDs to DataFramesimportspark.implicits._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Spark 2.0 中的SparkSession爲 Hive 特性提供了內嵌的支持, 包括使用 HiveQL 編寫查詢的能力, 訪問 Hive UDF,以及從 Hive 表中讀取數據的能力.爲了使用這些特性, 你不須要去有一個已存在的 Hive 設置.
建立 DataFrames
在一個SparkSession中, 應用程序能夠從一個已經存在的RDD, 從hive表, 或者從Spark數據源中建立一個DataFrames.
舉個例子, 下面就是基於一個JSON文件建立一個DataFrame:
valdf=spark.read.json("examples/src/main/resources/people.json")// Displays the content of the DataFrame to stdoutdf.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
無類型的Dataset操做 (aka DataFrame 操做)
DataFrames 提供了一個特定的語法用在Scala,Java,PythonandR中機構化數據的操做.
正如上面提到的同樣, Spark 2.0中, DataFrames在Scala 和 Java API中, 僅僅是多個Rows的Dataset. 這些操做也參考了與強類型的Scala/Java Datasets中的」類型轉換」 對應的」無類型轉換」 .
這裏包括一些使用 Dataset 進行結構化數據處理的示例 :
// This import is needed to use the $-notationimportspark.implicits._// Print the schema in a tree formatdf.printSchema()// root// |-- age: long (nullable = true)// |-- name: string (nullable = true)// Select only the "name" columndf.select("name").show()// +-------+// | name|// +-------+// |Michael|// | Andy|// | Justin|// +-------+// Select everybody, but increment the age by 1df.select($"name",$"age"+1).show()// +-------+---------+// | name|(age + 1)|// +-------+---------+// |Michael| null|// | Andy| 31|// | Justin| 20|// +-------+---------+// Select people older than 21df.filter($"age">21).show()// +---+----+// |age|name|// +---+----+// | 30|Andy|// +---+----+// Count people by agedf.groupBy("age").count().show()// +----+-----+// | age|count|// +----+-----+// | 19| 1|// |null| 1|// | 30| 1|// +----+-----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
可以在 DataFrame 上被執行的操做類型的完整列表請參考API 文檔.
除了簡單的列引用和表達式以外, DataFrame 也有豐富的函數庫, 包括 string 操做, date 算術, 常見的 math 操做以及更多.可用的完整列表請參考DataFrame 函數指南.
Running SQL Queries Programmatically
SparkSession的sql函數可讓應用程序以編程的方式運行 SQL 查詢, 並將結果做爲一個DataFrame返回.
// Register the DataFrame as a SQL temporary viewdf.createOrReplaceTempView("people")valsqlDF=spark.sql("SELECT * FROM people")sqlDF.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
全局臨時視圖
Spark SQL中的臨時視圖是session級別的, 也就是會隨着session的消失而消失. 若是你想讓一個臨時視圖在全部session中相互傳遞而且可用, 直到Spark 應用退出, 你能夠創建一個全局的臨時視圖.全局的臨時視圖存在於系統數據庫global_temp中, 咱們必須加上庫名去引用它, 好比.SELECT * FROM global_temp.view1.
// Register the DataFrame as a global temporary viewdf.createGlobalTempView("people")// Global temporary view is tied to a system preserved database `global_temp`spark.sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+// Global temporary view is cross-sessionspark.newSession().sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
建立Datasets
Dataset 與 RDD 類似, 然而, 並非使用 Java 序列化或者 Kryo編碼器來序列化用於處理或者經過網絡進行傳輸的對象. 雖然編碼器和標準的序列化都負責將一個對象序列化成字節, 編碼器是動態生成的代碼, 而且使用了一種容許 Spark 去執行許多像 filtering, sorting 以及 hashing 這樣的操做, 不須要將字節反序列化成對象的格式.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,// you can use custom classes that implement the Product interfacecaseclassPerson(name:String,age:Long)// Encoders are created for case classesvalcaseClassDS=Seq(Person("Andy",32)).toDS()caseClassDS.show()// +----+---+// |name|age|// +----+---+// |Andy| 32|// +----+---+// Encoders for most common types are automatically provided by importing spark.implicits._valprimitiveDS=Seq(1,2,3).toDS()primitiveDS.map(_+1).collect()// Returns: Array(2, 3, 4)// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by namevalpath="examples/src/main/resources/people.json"valpeopleDS=spark.read.json(path).as[Person]peopleDS.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
RDD的互操做性
Spark SQL 支持兩種不一樣的方法用於轉換已存在的 RDD 成爲 Dataset.第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基於方法的反射可讓你的代碼更簡潔.
第二種用於建立 Dataset 的方法是經過一個容許你構造一個 Schema 而後把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它容許你去構造 Dataset.
使用反射推斷Schema
Spark SQL 的 Scala 接口支持自動轉換一個包含 case classes 的 RDD 爲 DataFrame.Case class 定義了表的 Schema.Case class 的參數名使用反射讀取而且成爲了列名.Case class 也能夠是嵌套的或者包含像Seq或者Array這樣的複雜類型.這個 RDD 可以被隱式轉換成一個 DataFrame 而後被註冊爲一個表.表能夠用於後續的 SQL 語句.
// For implicit conversions from RDDs to DataFramesimportspark.implicits._// Create an RDD of Person objects from a text file, convert it to a DataframevalpeopleDF=spark.sparkContext.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(attributes=>Person(attributes(0),attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by SparkvalteenagersDF=spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager=>"Name: "+teenager(0)).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+// or by field nameteenagersDF.map(teenager=>"Name: "+teenager.getAs[String]("name")).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicitvalmapEncoder=org.apache.spark.sql.Encoders.kryo[Map[String,Any]]// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager=>teenager.getValuesMap[Any](List("name","age"))).collect()// Array(Map("name" -> "Justin", "age" -> 19))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
以編程的方式指定Schema
當 case class 不可以在執行以前被定義(例如, records 記錄的結構在一個 string 字符串中被編碼了, 或者一個 text 文本 dataset 將被解析而且不一樣的用戶投影的字段是不同的).一個DataFrame可使用下面的三步以編程的方式來建立.
從原始的 RDD 建立 RDD 的Row(行);
Step 1 被建立後, 建立 Schema 表示一個StructType匹配 RDD 中的Row(行)的結構.
經過SparkSession提供的createDataFrame方法應用 Schema 到 RDD 的 RowS(行).
例如:
importorg.apache.spark.sql.types._// Create an RDDvalpeopleRDD=spark.sparkContext.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a stringvalschemaString="name age"// Generate the schema based on the string of schemavalfields=schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,nullable=true))valschema=StructType(fields)// Convert records of the RDD (people) to RowsvalrowRDD=peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0),attributes(1).trim))// Apply the schema to the RDDvalpeopleDF=spark.createDataFrame(rowRDD,schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL can be run over a temporary view created using DataFramesvalresults=spark.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes=>"Name: "+attributes(0)).show()// +-------------+// | value|// +-------------+// |Name: Michael|// | Name: Andy|// | Name: Justin|// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Aggregations
Thebuilt-in DataFrames functionsprovide common aggregations such ascount(),countDistinct(),avg(),max(),min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them inScalaandJavato work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
Untyped User-Defined Aggregate Functions
Users have to extend theUserDefinedAggregateFunctionabstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
importorg.apache.spark.sql.expressions.MutableAggregationBufferimportorg.apache.spark.sql.expressions.UserDefinedAggregateFunctionimportorg.apache.spark.sql.types._importorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessionobjectMyAverageextendsUserDefinedAggregateFunction{// Data types of input arguments of this aggregate functiondefinputSchema:StructType=StructType(StructField("inputColumn",LongType)::Nil)// Data types of values in the aggregation bufferdefbufferSchema:StructType={StructType(StructField("sum",LongType)::StructField("count",LongType)::Nil)}// The data type of the returned valuedefdataType:DataType=DoubleType// Whether this function always returns the same output on the identical inputdefdeterministic:Boolean=true// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides// the opportunity to update its values. Note that arrays and maps inside the buffer are still// immutable.definitialize(buffer:MutableAggregationBuffer):Unit={buffer(0)=0Lbuffer(1)=0L}// Updates the given aggregation buffer `buffer` with new input data from `input`defupdate(buffer:MutableAggregationBuffer,input:Row):Unit={if(!input.isNullAt(0)){buffer(0)=buffer.getLong(0)+input.getLong(0)buffer(1)=buffer.getLong(1)+1}}// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`defmerge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit={buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)}// Calculates the final resultdefevaluate(buffer:Row):Double=buffer.getLong(0).toDouble/buffer.getLong(1)}// Register the function to access itspark.udf.register("myAverage",MyAverage)valdf=spark.read.json("examples/src/main/resources/employees.json")df.createOrReplaceTempView("employees")df.show()// +-------+------+// | name|salary|// +-------+------+// |Michael| 3000|// | Andy| 4500|// | Justin| 3500|// | Berta| 4000|// +-------+------+valresult=spark.sql("SELECT myAverage(salary) as average_salary FROM employees")result.show()// +--------------+// |average_salary|// +--------------+// | 3750.0|// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.
Type-Safe User-Defined Aggregate Functions
User-defined aggregations for strongly typed Datasets revolve around theAggregatorabstract class. For example, a type-safe user-defined average can look like:
importorg.apache.spark.sql.expressions.Aggregatorimportorg.apache.spark.sql.Encoderimportorg.apache.spark.sql.Encodersimportorg.apache.spark.sql.SparkSessioncaseclassEmployee(name:String,salary:Long)caseclassAverage(varsum:Long,varcount:Long)objectMyAverageextendsAggregator[Employee,Average,Double]{// A zero value for this aggregation. Should satisfy the property that any b + zero = bdefzero:Average=Average(0L,0L)// Combine two values to produce a new value. For performance, the function may modify `buffer`// and return it instead of constructing a new objectdefreduce(buffer:Average,employee:Employee):Average={buffer.sum+=employee.salarybuffer.count+=1buffer}// Merge two intermediate valuesdefmerge(b1:Average,b2:Average):Average={b1.sum+=b2.sumb1.count+=b2.countb1}// Transform the output of the reductiondeffinish(reduction:Average):Double=reduction.sum.toDouble/reduction.count// Specifies the Encoder for the intermediate value typedefbufferEncoder:Encoder[Average]=Encoders.product// Specifies the Encoder for the final output value typedefoutputEncoder:Encoder[Double]=Encoders.scalaDouble}valds=spark.read.json("examples/src/main/resources/employees.json").as[Employee]ds.show()// +-------+------+// | name|salary|// +-------+------+// |Michael| 3000|// | Andy| 4500|// | Justin| 3500|// | Berta| 4000|// +-------+------+// Convert the function to a `TypedColumn` and give it a namevalaverageSalary=MyAverage.toColumn.name("average_salary")valresult=ds.select(averageSalary)result.show()// +--------------+// |average_salary|// +--------------+// | 3750.0|// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.
Data Sources (數據源)
Spark SQL 支持經過 DataFrame 接口對各類 data sources (數據源)進行操做. DataFrame 可使用 relational transformations (關係轉換)操做, 也可用於建立 temporary view (臨時視圖). 將 DataFrame 註冊爲 temporary view (臨時視圖)容許您對其數據運行 SQL 查詢. 本節 描述了使用 Spark Data Sources 加載和保存數據的通常方法, 而後涉及可用於 built-in data sources (內置數據源)的 specific options (特定選項).
Generic Load/Save Functions (通用 加載/保存 功能)
在最簡單的形式中, 默認數據源(parquet, 除非另有配置spark.sql.sources.default)將用於全部操做.
valusersDF=spark.read.load("examples/src/main/resources/users.parquet")usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Manually Specifying Options (手動指定選項)
您還能夠 manually specify (手動指定)將與任何你想傳遞給 data source 的其餘選項一塊兒使用的 data source . Data sources 由其 fully qualified name (徹底限定名稱)(即org.apache.spark.sql.parquet), 可是對於 built-in sources (內置的源), 你也可使用它們的 shortnames (短名稱)(json,parquet,jdbc,orc,libsvm,csv,text).從任何 data source type (數據源類型)加載 DataFrames 可使用此 syntax (語法)轉換爲其餘類型.
valpeopleDF=spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF.select("name","age").write.format("parquet").save("namesAndAges.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Run SQL on files directly (直接在文件上運行 SQL)
不使用讀取 API 將文件加載到 DataFrame 並進行查詢, 也能夠直接用 SQL 查詢該文件.
valsqlDF=spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Save Modes (保存模式)
Save operations (保存操做)能夠選擇使用SaveMode, 它指定如何處理現有數據若是存在的話. 重要的是要意識到, 這些 save modes (保存模式)不使用任何 locking (鎖定)而且不是 atomic (原子). 另外, 當執行Overwrite時, 數據將在新數據寫出以前被刪除.
Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)"error"(default)將 DataFrame 保存到 data source (數據源)時, 若是數據已經存在, 則會拋出異常.
SaveMode.Append"append"將 DataFrame 保存到 data source (數據源)時, 若是 data/table 已存在, 則 DataFrame 的內容將被 append (附加)到現有數據中.
SaveMode.Overwrite"overwrite"Overwrite mode (覆蓋模式)意味着將 DataFrame 保存到 data source (數據源)時, 若是 data/table 已經存在, 則預期 DataFrame 的內容將 overwritten (覆蓋)現有數據.
SaveMode.Ignore"ignore"Ignore mode (忽略模式)意味着當將 DataFrame 保存到 data source (數據源)時, 若是數據已經存在, 則保存操做預期不會保存 DataFrame 的內容, 而且不更改現有數據. 這與 SQL 中的CREATE TABLE IF NOT EXISTS相似.
Saving to Persistent Tables (保存到持久表)
DataFrames也可使用saveAsTable命令做爲 persistent tables (持久表)保存到 Hive metastore 中. 請注意, existing Hive deployment (現有的 Hive 部署)不須要使用此功能. Spark 將爲您建立默認的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 與createOrReplaceTempView命令不一樣,saveAsTable將 materialize (實現) DataFrame 的內容, 並建立一個指向 Hive metastore 中數據的指針. 即便您的 Spark 程序從新啓動, Persistent tables (持久性表)仍然存在, 由於您保持與同一個 metastore 的鏈接. 能夠經過使用表的名稱在SparkSession上調用table方法來建立 persistent tabl (持久表)的 DataFrame .
對於 file-based (基於文件)的 data source (數據源), 例如 text, parquet, json等, 您能夠經過path選項指定 custom table path (自定義表路徑), 例如df.write.option("path", "/some/path").saveAsTable("t"). 當表被 dropped (刪除)時, custom table path (自定義表路徑)將不會被刪除, 而且表數據仍然存在. 若是未指定自定義表路徑, Spark 將把數據寫入 warehouse directory (倉庫目錄)下的默認表路徑. 當表被刪除時, 默認的表路徑也將被刪除.
從 Spark 2.1 開始, persistent datasource tables (持久性數據源表)將 per-partition metadata (每一個分區元數據)存儲在 Hive metastore 中. 這帶來了幾個好處:
因爲 metastore 只能返回查詢的必要 partitions (分區), 所以再也不須要將第一個查詢上的全部 partitions discovering 到表中.
Hive DDLs 如ALTER TABLE PARTITION ... SET LOCATION如今可用於使用 Datasource API 建立的表.
請注意, 建立 external datasource tables (外部數據源表)(帶有path選項)的表時, 默認狀況下不會收集 partition information (分區信息). 要 sync (同步) metastore 中的分區信息, 能夠調用MSCK REPAIR TABLE.
Bucketing, Sorting and Partitioning (分桶, 排序和分區)
對於 file-based data source (基於文件的數據源), 也能夠對 output (輸出)進行 bucket 和 sort 或者 partition . Bucketing 和 sorting 僅適用於 persistent tables :
peopleDF.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
在使用 Dataset API 時, partitioning 能夠同時與save和saveAsTable一塊兒使用.
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
能夠爲 single table (單個表)使用 partitioning 和 bucketing:
peopleDF.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
partitionBy建立一個 directory structure (目錄結構), 如Partition Discovery部分所述. 所以, 對 cardinality (基數)較高的 columns 的適用性有限. 相反,bucketBy能夠在固定數量的 buckets 中分配數據, 而且能夠在 a number of unique values is unbounded (多個惟一值無界時)使用數據.
Parquet Files
Parquet是許多其餘數據處理系統支持的 columnar format (柱狀格式). Spark SQL 支持讀寫 Parquet 文件, 可自動保留 schema of the original data (原始數據的模式). 當編寫 Parquet 文件時, 出於兼容性緣由, 全部 columns 都將自動轉換爲可空.
Loading Data Programmatically (以編程的方式加載數據)
使用上面例子中的數據:
// Encoders for most common types are automatically provided by importing spark.implicits._importspark.implicits._valpeopleDF=spark.read.json("examples/src/main/resources/people.json")// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write.parquet("people.parquet")// Read in the parquet file created above// Parquet files are self-describing so the schema is preserved// The result of loading a Parquet file is also a DataFramevalparquetFileDF=spark.read.parquet("people.parquet")// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile")valnamesDF=spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")namesDF.map(attributes=>"Name: "+attributes(0)).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Partition Discovery (分區發現)
Table partitioning (表分區)是在像 Hive 這樣的系統中使用的常見的優化方法. 在 partitioned table (分區表)中, 數據一般存儲在不一樣的目錄中, partitioning column values encoded (分區列值編碼)在每一個 partition directory (分區目錄)的路徑中. Parquet data source (Parquet 數據源)如今能夠自動 discover (發現)和 infer (推斷)分區信息. 例如, 咱們可使用如下 directory structure (目錄結構)將全部之前使用的 population data (人口數據)存儲到 partitioned table (分區表)中, 其中有兩個額外的列gender和country做爲 partitioning columns (分區列):
path└── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
經過將path/to/table傳遞給SparkSession.read.parquet或SparkSession.read.load, Spark SQL 將自動從路徑中提取 partitioning information (分區信息). 如今返回的 DataFrame 的 schema (模式)變成:
root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)
請注意, 會自動 inferred (推斷) partitioning columns (分區列)的 data types (數據類型).目前, 支持 numeric data types (數字數據類型)和 string type (字符串類型).有些用戶可能不想自動推斷 partitioning columns (分區列)的數據類型.對於這些用例, automatic type inference (自動類型推斷)能夠由spark.sql.sources.partitionColumnTypeInference.enabled配置, 默認爲true.當禁用 type inference (類型推斷)時, string type (字符串類型)將用於 partitioning columns (分區列).
從 Spark 1.6.0 開始, 默認狀況下, partition discovery (分區發現)只能找到給定路徑下的 partitions (分區).對於上述示例, 若是用戶將path/to/table/gender=male傳遞給SparkSession.read.parquet或SparkSession.read.load, 則gender將不被視爲 partitioning column (分區列).若是用戶須要指定 partition discovery (分區發現)應該開始的基本路徑, 則能夠在數據源選項中設置basePath.例如, 當path/to/table/gender=male是數據的路徑而且用戶將basePath設置爲path/to/table/,gender將是一個 partitioning column (分區列).
Schema Merging (模式合併)
像 ProtocolBuffer , Avro 和 Thrift 同樣, Parquet 也支持 schema evolution (模式演進). 用戶能夠從一個 simple schema (簡單的架構)開始, 並根據須要逐漸向 schema 添加更多的 columns (列). 以這種方式, 用戶可能會使用不一樣但相互兼容的 schemas 的 multiple Parquet files (多個 Parquet 文件). Parquet data source (Parquet 數據源)如今可以自動檢測這種狀況並 merge (合併)全部這些文件的 schemas .
因爲 schema merging (模式合併)是一個 expensive operation (相對昂貴的操做), 而且在大多數狀況下不是必需的, 因此默認狀況下從 1.5.0 開始. 你能夠按照以下的方式啓用它:
讀取 Parquet 文件時, 將 data source option (數據源選項)mergeSchema設置爲true(以下面的例子所示), 或
將 global SQL option (全局 SQL 選項)spark.sql.parquet.mergeSchema設置爲true.
// This is used to implicitly convert an RDD to a DataFrame.importspark.implicits._// Create a simple DataFrame, store into a partition directoryvalsquaresDF=spark.sparkContext.makeRDD(1to5).map(i=>(i,i*i)).toDF("value","square")squaresDF.write.parquet("data/test_table/key=1")// Create another DataFrame in a new partition directory,// adding a new column and dropping an existing columnvalcubesDF=spark.sparkContext.makeRDD(6to10).map(i=>(i,i*i*i)).toDF("value","cube")cubesDF.write.parquet("data/test_table/key=2")// Read the partitioned tablevalmergedDF=spark.read.option("mergeSchema","true").parquet("data/test_table")mergedDF.printSchema()// The final schema consists of all 3 columns in the Parquet files together// with the partitioning column appeared in the partition directory paths// root// |-- value: int (nullable = true)// |-- square: int (nullable = true)// |-- cube: int (nullable = true)// |-- key: int (nullable = true)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Hive metastore Parquet table conversion (Hive metastore Parquet table 轉換)
當讀取和寫入 Hive metastore Parquet 表時, Spark SQL 將嘗試使用本身的 Parquet support (Parquet 支持), 而不是 Hive SerDe 來得到更好的性能. 此 behavior (行爲)由spark.sql.hive.convertMetastoreParquet配置控制, 默認狀況下 turned on (打開).
Hive/Parquet Schema Reconciliation
從 table schema processing (表格模式處理)的角度來講, Hive 和 Parquet 之間有兩個關鍵的區別.
Hive 不區分大小寫, 而 Parquet 不是
Hive 認爲全部 columns (列)均可覺得空, 而 Parquet 中的可空性是 significant (重要)的.
因爲這個緣由, 當將 Hive metastore Parquet 錶轉換爲 Spark SQL Parquet 表時, 咱們必須調整 metastore schema 與 Parquet schema. reconciliation 規則是:
在兩個 schema 中具備 same name (相同名稱)的 Fields (字段)必須具備 same data type (相同的數據類型), 而無論 nullability (可空性). reconciled field 應具備 Parquet 的數據類型, 以便 nullability (可空性)獲得尊重.
reconciled schema (調和模式)正好包含 Hive metastore schema 中定義的那些字段.
只出如今 Parquet schema 中的任何字段將被 dropped (刪除)在 reconciled schema 中.
僅在 Hive metastore schema 中出現的任何字段在 reconciled schema 中做爲 nullable field (可空字段)添加.
Metadata Refreshing (元數據刷新)
Spark SQL 緩存 Parquet metadata 以得到更好的性能. 當啓用 Hive metastore Parquet table conversion (轉換)時, 這些 converted tables (轉換表)的 metadata (元數據)也被 cached (緩存). 若是這些表由 Hive 或其餘外部工具更新, 則須要手動刷新以確保 consistent metadata (一致的元數據).
// spark is an existing SparkSessionspark.catalog.refreshTable("my_table")
Configuration (配置)
可使用SparkSession上的setConf方法或使用 SQL 運行SET key = value命令來完成 Parquet 的配置.
Property Name (參數名稱)Default(默認)Meaning(含義)
spark.sql.parquet.binaryAsStringfalse一些其餘 Parquet-producing systems (Parquet 生產系統), 特別是 Impala, Hive 和舊版本的 Spark SQL , 在 writing out (寫出) Parquet schema 時, 不區分 binary data (二進制數據)和 strings (字符串). 該 flag 告訴 Spark SQL 將 binary data (二進制數據)解釋爲 string (字符串)以提供與這些系統的兼容性.
spark.sql.parquet.int96AsTimestamptrue一些 Parquet-producing systems , 特別是 Impala 和 Hive , 將 Timestamp 存入INT96 . 該 flag 告訴 Spark SQL 將 INT96 數據解析爲 timestamp 以提供與這些系統的兼容性.
spark.sql.parquet.cacheMetadatatrue打開 Parquet schema metadata 的緩存. 能夠加快查詢靜態數據.
spark.sql.parquet.compression.codecsnappy在編寫 Parquet 文件時設置 compression codec (壓縮編解碼器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdowntrue設置爲 true 時啓用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquettrue當設置爲 false 時, Spark SQL 將使用 Hive SerDe 做爲 parquet tables , 而不是內置的支持.
spark.sql.parquet.mergeSchemafalse當爲 true 時, Parquet data source (Parquet 數據源) merges (合併)從全部 data files (數據文件)收集的 schemas , 不然若是沒有可用的 summary file , 則從 summary file 或 random data file 中挑選 schema .
spark.sql.optimizer.metadataOnlytrue若是爲 true , 則啓用使用表的 metadata 的 metadata-only query optimization 來生成 partition columns (分區列)而不是 table scans (表掃描). 當 scanned (掃描)的全部 columns (列)都是 partition columns (分區列)而且 query (查詢)具備知足 distinct semantics (不一樣語義)的 aggregate operator (聚合運算符)時, 它將適用.
JSON Datasets (JSON 數據集)
Spark SQL 能夠 automatically infer (自動推斷)JSON dataset 的 schema, 並將其做爲Dataset[Row]加載. 這個 conversion (轉換)能夠在Dataset[String]上使用SparkSession.read.json()來完成, 或 JSON 文件.
請注意, 以a json file提供的文件不是典型的 JSON 文件. 每行必須包含一個 separate (單獨的), self-contained valid (獨立的有效的)JSON 對象. 有關更多信息, 請參閱JSON Lines text format, also called newline-delimited JSON.
對於 regular multi-line JSON file (常規的多行 JSON 文件), 將multiLine選項設置爲true.
// Primitive types (Int, String, etc) and Product types (case classes) encoders are// supported by importing this when creating a Dataset.importspark.implicits._// A JSON dataset is pointed to by path.// The path can be either a single text file or a directory storing text filesvalpath="examples/src/main/resources/people.json"valpeopleDF=spark.read.json(path)// The inferred schema can be visualized using the printSchema() methodpeopleDF.printSchema()// root// |-- age: long (nullable = true)// |-- name: string (nullable = true)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by sparkvalteenagerNamesDF=spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")teenagerNamesDF.show()// +------+// | name|// +------+// |Justin|// +------+// Alternatively, a DataFrame can be created for a JSON dataset represented by// a Dataset[String] storing one JSON object per stringvalotherPeopleDataset=spark.createDataset("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""::Nil)valotherPeople=spark.read.json(otherPeopleDataset)otherPeople.show()// +---------------+----+// | address|name|// +---------------+----+// |[Columbus,Ohio]| Yin|// +---------------+----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Hive 表
Spark SQL 還支持讀取和寫入存儲在Apache Hive中的數據。 可是,因爲 Hive 具備大量依賴關係,所以這些依賴關係不包含在默認 Spark 分發中。 若是在類路徑中找到 Hive 依賴項,Spark 將自動加載它們。 請注意,這些 Hive 依賴關係也必須存在於全部工做節點上,由於它們將須要訪問 Hive 序列化和反序列化庫 (SerDes),以訪問存儲在 Hive 中的數據。
經過將hive-site.xml,core-site.xml(用於安全配置)和hdfs-site.xml(用於 HDFS 配置)文件放在conf/中來完成配置。
當使用 Hive 時,必須用 Hive 支持實例化SparkSession,包括鏈接到持續的 Hive 轉移,支持 Hive serdes 和 Hive 用戶定義的功能。 沒有現有 Hive 部署的用戶仍然能夠啓用 Hive 支持。 當hive-site.xml未配置時,上下文會自動在當前目錄中建立metastore_db,並建立由spark.sql.warehouse.dir配置的目錄,該目錄默認爲Spark應用程序當前目錄中的spark-warehouse目錄 開始了 請注意,自從2.0.0以來,hive-site.xml中的hive.metastore.warehouse.dir屬性已被棄用。 而是使用spark.sql.warehouse.dir來指定倉庫中數據庫的默認位置。 您可能須要向啓動 Spark 應用程序的用戶授予寫權限。å
importjava.io.Fileimportorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessioncaseclassRecord(key:Int,value:String)// warehouseLocation points to the default location for managed databases and tablesvalwarehouseLocation=newFile("spark-warehouse").getAbsolutePathvalspark=SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir",warehouseLocation).enableHiveSupport().getOrCreate()importspark.implicits._importspark.sqlsql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQLsql("SELECT * FROM src").show()// +---+-------+// |key| value|// +---+-------+// |238|val_238|// | 86| val_86|// |311|val_311|// ...// Aggregation queries are also supported.sql("SELECT COUNT(*) FROM src").show()// +--------+// |count(1)|// +--------+// | 500 |// +--------+// The results of SQL queries are themselves DataFrames and support all normal functions.valsqlDF=sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")// The items in DataFrames are of type Row, which allows you to access each column by ordinal.valstringsDS=sqlDF.map{caseRow(key:Int,value:String)=>s"Key:$key, Value:$value"}stringsDS.show()// +--------------------+// | value|// +--------------------+// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// ...// You can also use DataFrames to create temporary views within a SparkSession.valrecordsDF=spark.createDataFrame((1to100).map(i=>Record(i,s"val_$i")))recordsDF.createOrReplaceTempView("records")// Queries can then join DataFrame data with data stored in Hive.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()// +---+------+---+------+// |key| value|key| value|// +---+------+---+------+// | 2| val_2| 2| val_2|// | 4| val_4| 4| val_4|// | 5| val_5| 5| val_5|// ...
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.
指定 Hive 表的存儲格式
建立 Hive 表時,須要定義如何 從/向 文件系統 read/write 數據,即 「輸入格式」 和 「輸出格式」。 您還須要定義該表如何將數據反序列化爲行,或將行序列化爲數據,即 「serde」。 如下選項可用於指定存儲格式 (「serde」, 「input format」, 「output format」),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。 默認狀況下,咱們將以純文本形式讀取表格文件。 請注意,Hive 存儲處理程序在建立表時不受支持,您可使用 Hive 端的存儲處理程序建立一個表,並使用 Spark SQL 來讀取它。
Property NameMeaning
fileFormatfileFormat是一種存儲格式規範的包,包括 "serde","input format" 和 "output format"。 目前咱們支持6個文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定爲字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 這兩個選項必須成對出現,若是您已經指定了 "fileFormat" 選項,則沒法指定它們。
serde此選項指定 serde 類的名稱。 當指定 `fileFormat` 選項時,若是給定的 `fileFormat` 已經包含 serde 的信息,那麼不要指定這個選項。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可使用這3個文件格式的這個選項。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim這些選項只能與 "textfile" 文件格式一塊兒使用。它們定義如何將分隔的文件讀入行。
使用OPTIONS定義的全部其餘屬性將被視爲 Hive serde 屬性。
與不一樣版本的 Hive Metastore 進行交互
Spark SQL 的 Hive 支持的最重要的部分之一是與 Hive metastore 進行交互,這使得 Spark SQL 可以訪問 Hive 表的元數據。 從 Spark 1.4.0 開始,使用 Spark SQL 的單一二進制構建可使用下面所述的配置來查詢不一樣版本的 Hive 轉移。 請注意,獨立於用於與轉移點通訊的 Hive 版本,內部 Spark SQL 將針對 Hive 1.2.1 進行編譯,並使用這些類進行內部執行(serdes,UDF,UDAF等)。
如下選項可用於配置用於檢索元數據的 Hive 版本:
屬性名稱默認值含義
spark.sql.hive.metastore.version1.2.1Hive metastore 版本。 可用選項爲0.12.0至1.2.1。
spark.sql.hive.metastore.jarsbuiltin當啓用-Phive時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一塊兒。選擇此選項時,spark.sql.hive.metastore.version 必須爲1.2.1或未定義。 行家 使用從Maven存儲庫下載的指定版本的Hive jar。 一般不建議在生產部署中使用此配置。 ***** 應用於實例化 HiveMetastoreClient 的 jar 的位置。該屬性能夠是三個選項之一:
builtin當啓用-Phive時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一塊兒。選擇此選項時,spark.sql.hive.metastore.version必須爲1.2.1或未定義。
maven使用從 Maven 存儲庫下載的指定版本的 Hive jar。一般不建議在生產部署中使用此配置。
JVM 的標準格式的 classpath。 該類路徑必須包含全部 Hive 及其依賴項,包括正確版本的 Hadoop。這些罐只須要存在於 driver 程序中,但若是您正在運行在 yarn 集羣模式,那麼您必須確保它們與應用程序一塊兒打包。
spark.sql.hive.metastore.sharedPrefixescom.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc使用逗號分隔的類前綴列表,應使用在 Spark SQL 和特定版本的 Hive 之間共享的類加載器來加載。 一個共享類的示例就是用來訪問 Hive metastore 的 JDBC driver。 其它須要共享的類,是須要與已經共享的類進行交互的。 例如,log4j 使用的自定義 appender。
spark.sql.hive.metastore.barrierPrefixes(empty)一個逗號分隔的類前綴列表,應該明確地爲 Spark SQL 正在通訊的 Hive 的每一個版本從新加載。 例如,在一般將被共享的前綴中聲明的 Hive UDF (即: �org.apache.spark.*)。
JDBC 鏈接其它數據庫
Spark SQL 還包括可使用 JDBC 從其餘數據庫讀取數據的數據源。此功能應優於使用JdbcRDD。 這是由於結果做爲 DataFrame 返回,而且能夠輕鬆地在 Spark SQL 中處理或與其餘數據源鏈接。 JDBC 數據源也更容易從 Java 或 Python 使用,由於它不須要用戶提供 ClassTag。(請注意,這不一樣於 Spark SQL JDBC 服務器,容許其餘應用程序使用 Spark SQL 運行查詢)。
要開始使用,您須要在 Spark 類路徑中包含特定數據庫的 JDBC driver 程序。 例如,要從 Spark Shell 鏈接到 postgres,您將運行如下命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
可使用 Data Sources API 未來自遠程數據庫的表做爲 DataFrame 或 Spark SQL 臨時視圖進行加載。 用戶能夠在數據源選項中指定 JDBC 鏈接屬性。用戶和密碼一般做爲登陸數據源的鏈接屬性提供。 除了鏈接屬性外,Spark 還支持如下不區分大小寫的選項:
�屬性名稱含義
url要鏈接的JDBC URL。 源特定的鏈接屬性能夠在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable應該讀取的 JDBC 表。請注意,可使用在SQL查詢的FROM子句中有效的任何內容。 例如,您可使用括號中的子查詢代替完整表。
driver用於鏈接到此 URL 的 JDBC driver 程序的類名。
partitionColumn, lowerBound, upperBound若是指定了這些選項,則必須指定這些選項。 另外,必須指定numPartitions. 他們描述如何從多個 worker 並行讀取數據時將表給分區。partitionColumn必須是有問題的表中的數字列。 請注意,lowerBound和upperBound僅用於決定分區的大小,而不是用於過濾表中的行。 所以,表中的全部行將被分區並返回。此選項僅適用於讀操做。
numPartitions在表讀寫中能夠用於並行度的最大分區數。這也肯定併發JDBC鏈接的最大數量。 若是要寫入的分區數超過此限制,則在寫入以前經過調用coalesce(numPartitions)將其減小到此限制。
fetchsizeJDBC 抓取的大小,用於肯定每次數據往返傳遞的行數。 這有利於提高 JDBC driver 的性能,它們的默認值較小(例如: Oracle 是 10 行)。 該選項僅適用於讀取操做。
batchsizeJDBC 批處理的大小,用於肯定每次數據往返傳遞的行數。 這有利於提高 JDBC driver 的性能。 該選項僅適用於寫操做。默認值爲1000.
isolationLevel事務隔離級別,適用於當前鏈接。 它能夠是NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ, 或SERIALIZABLE之一,對應於 JDBC 鏈接對象定義的標準事務隔離級別,默認爲READ_UNCOMMITTED。 此選項僅適用於寫操做。請參考java.sql.Connection中的文檔。
truncate這是一個與 JDBC 相關的選項。 啓用SaveMode.Overwrite時,此選項會致使 Spark 截斷現有表,而不是刪除並從新建立。 這能夠更有效,而且防止表元數據(例如,索引)被移除。 可是,在某些狀況下,例如當新數據具備不一樣的模式時,它將沒法工做。 它默認爲false。 此選項僅適用於寫操做。
createTableOptions這是一個與JDBC相關的選項。 若是指定,此選項容許在建立表時設置特定於數據庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB.)。此選項僅適用於寫操做。
createTableColumnTypes使用數據庫列數據類型而不是默認值,建立表時。 數據類型信息應以與 CREATE TABLE 列語法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。 指定的類型應該是有效的 spark sql 數據類型。此選項僅適用於寫操做。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods// Loading data from a JDBC sourcevaljdbcDF=spark.read.format("jdbc").option("url","jdbc:postgresql:dbserver").option("dbtable","schema.tablename").option("user","username").option("password","password").load()valconnectionProperties=newProperties()connectionProperties.put("user","username")connectionProperties.put("password","password")valjdbcDF2=spark.read.jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)// Saving data to a JDBC sourcejdbcDF.write.format("jdbc").option("url","jdbc:postgresql:dbserver").option("dbtable","schema.tablename").option("user","username").option("password","password").save()jdbcDF2.write.jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)// Specifying create table column data types on writejdbcDF.write.option("createTableColumnTypes","name CHAR(64), comments VARCHAR(1024)").jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
故障排除
JDBC driver 程序類必須對客戶端會話和全部執行程序上的原始類加載器可見。 這是由於 Java 的 DriverManager 類執行安全檢查,致使它忽略原始類加載器不可見的全部 driver 程序,當打開鏈接時。一個方便的方法是修改全部工做節點上的compute_classpath.sh 以包含您的 driver 程序 JAR。
一些數據庫,例如 H2,將全部名稱轉換爲大寫。 您須要使用大寫字母來引用 Spark SQL 中的這些名稱。
性能調優
對於某些工做負載,能夠經過緩存內存中的數據或打開一些實驗選項來提升性能。
在內存中緩存數據
Spark SQL 能夠經過調用spark.catalog.cacheTable("tableName")或dataFrame.cache()來使用內存中的列格式來緩存表。 而後,Spark SQL 將只掃描所需的列,並將自動調整壓縮以最小化內存使用量和 GC 壓力。 您能夠調用spark.catalog.uncacheTable("tableName")從內存中刪除該表。
內存緩存的配置可使用SparkSession上的setConf方法或使用 SQL 運行SET key=value命令來完成。
屬性名稱默認含義
spark.sql.inMemoryColumnarStorage.compressedtrue當設置爲 true 時,Spark SQL 將根據數據的統計信息爲每一個列自動選擇一個壓縮編解碼器。
spark.sql.inMemoryColumnarStorage.batchSize10000控制批量的柱狀緩存的大小。更大的批量大小能夠提升內存利用率和壓縮率,可是在緩存數據時會冒出 OOM 風險。
其餘配置選項
如下選項也可用於調整查詢執行的性能。這些選項可能會在未來的版本中被廢棄,由於更多的優化是自動執行的。
屬性名稱默認值含義
spark.sql.files.maxPartitionBytes134217728 (128 MB)在讀取文件時,將單個分區打包的最大字節數。
spark.sql.files.openCostInBytes4194304 (4 MB)按照字節數來衡量的打開文件的估計費用能夠在同一時間進行掃描。 將多個文件放入分區時使用。最好過分估計,那麼具備小文件的分區將比具備較大文件的分區(首先計劃的)更快。
spark.sql.broadcastTimeout300廣播鏈接中的廣播等待時間超時(秒)
spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)配置執行鏈接時將廣播給全部工做節點的表的最大大小(以字節爲單位)。 經過將此值設置爲-1能夠禁用廣播。 請注意,目前的統計信息僅支持 Hive Metastore 表,其中已運行命令ANALYZE TABLE COMPUTE STATISTICS noscan。
spark.sql.shuffle.partitions200Configures the number of partitions to use when shuffling data for joins or aggregations.
分佈式 SQL 引擎
Spark SQL 也能夠充當使用其 JDBC/ODBC 或命令行界面的分佈式查詢引擎。 在這種模式下,最終用戶或應用程序能夠直接與 Spark SQL 交互運行 SQL 查詢,而不須要編寫任何代碼。
運行 Thrift JDBC/ODBC 服務器
這裏實現的 Thrift JDBC/ODBC 服務器對應於 Hive 1.2 中的HiveServer2。 您可使用 Spark 或 Hive 1.2.1 附帶的直線腳本測試 JDBC 服務器。
要啓動 JDBC/ODBC 服務器,請在 Spark 目錄中運行如下命令:
./sbin/start-thriftserver.sh
此腳本接受全部bin/spark-submit命令行選項,以及--hiveconf選項來指定 Hive 屬性。 您能夠運行./sbin/start-thriftserver.sh --help查看全部可用選項的完整列表。 默認狀況下,服務器監聽 localhost:10000. 您能夠經過環境變量覆蓋此行爲,即:
exportHIVE_SERVER2_THRIFT_PORT=exportHIVE_SERVER2_THRIFT_BIND_HOST=./sbin/start-thriftserver.sh\--master \...
or system properties:
./sbin/start-thriftserver.sh\--hiveconf hive.server2.thrift.port=\--hiveconf hive.server2.thrift.bind.host=\--master ...
如今,您可使用 beeline 來測試 Thrift JDBC/ODBC 服務器:
./bin/beeline
使用 beeline 方式鏈接到 JDBC/ODBC 服務器:
beeline> !connect jdbc:hive2://localhost:10000
Beeline 將要求您輸入用戶名和密碼。 在非安全模式下,只需輸入機器上的用戶名和空白密碼便可。 對於安全模式,請按照beeline 文檔中的說明進行操做。
配置Hive是經過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中完成的。
您也可使用 Hive 附帶的 beeline 腳本。
Thrift JDBC 服務器還支持經過 HTTP 傳輸發送 thrift RPC 消息。 使用如下設置啓用 HTTP 模式做爲系統屬性或在conf/中的hive-site.xml文件中啓用:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
要測試,請使用 beeline 以 http 模式鏈接到 JDBC/ODBC 服務器:
beeline> !connect jdbc:hive2://:/?hive.server2.transport.mode=http;hive.server2.thrift.http.path=
運行 Spark SQL CLI
Spark SQL CLI 是在本地模式下運行 Hive 轉移服務並執行從命令行輸入的查詢的方便工具。 請注意,Spark SQL CLI 不能與 Thrift JDBC 服務器通訊。
要啓動 Spark SQL CLI,請在 Spark 目錄中運行如下命令:
./bin/spark-sql
配置 Hive 是經過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中完成的。 您能夠運行./bin/spark-sql --help獲取全部可用選項的完整列表。
遷移指南
從 Spark SQL 2.1 升級到 2.2
Spark 2.1.1 介紹了一個新的配置 key:spark.sql.hive.caseSensitiveInferenceMode. 它的默認設置是NEVER_INFER, 其行爲與 2.1.0 保持一致. 可是,Spark 2.2.0 將此設置的默認值更改成 「INFER_AND_SAVE」,以恢復與底層文件 schema(模式)具備大小寫混合的列名稱的 Hive metastore 表的兼容性。使用INFER_AND_SAVE配置的 value, 在第一次訪問 Spark 將對其還沒有保存推測 schema(模式)的任何 Hive metastore 表執行 schema inference(模式推斷). 請注意,對於具備數千個 partitions(分區)的表,模式推斷多是很是耗時的操做。若是不兼容大小寫混合的列名,您能夠安全地將spark.sql.hive.caseSensitiveInferenceMode設置爲NEVER_INFER,以免模式推斷的初始開銷。請注意,使用新的默認INFER_AND_SAVE設置,模式推理的結果被保存爲 metastore key 以供未來使用。所以,初始模式推斷僅發生在表的第一次訪問。
從 Spark SQL 2.0 升級到 2.1
Datasource tables(數據源表)如今存儲了 Hive metastore 中的 partition metadata(分區元數據). 這意味着諸如ALTER TABLE PARTITION ... SET LOCATION這樣的 Hive DDLs 如今使用 Datasource API 可用於建立 tables(表).
遺留的數據源表能夠經過MSCK REPAIR TABLE命令遷移到這種格式。建議遷移遺留表利用 Hive DDL 的支持和提供的計劃性能。
要肯定表是否已遷移,當在表上發出DESCRIBE FORMATTED命令時請查找PartitionProvider: Catalog屬性.
Datasource tables(數據源表)的INSERT OVERWRITE TABLE ... PARTITION ...行爲的更改。
在之前的 Spark 版本中,INSERT OVERWRITE覆蓋了整個 Datasource table,即便給出一個指定的 partition. 如今只有匹配規範的 partition 被覆蓋。
請注意,這仍然與 Hive 表的行爲不一樣,Hive 表僅覆蓋與新插入數據重疊的分區。
從 Spark SQL 1.6 升級到 2.0
SparkSession如今是 Spark 新的切入點, 它替代了老的SQLContext和HiveContext。注意 : 爲了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。能夠從SparkSession獲取一個新的catalog接口 — 現有的訪問數據庫和表的 API,如listTables,createExternalTable,dropTempView,cacheTable都被移到該接口。
Dataset API 和 DataFrame API 進行了統一。在 Scala 中,DataFrame變成了Dataset[Row]類型的一個別名,而 Java API 使用者必須將DataFrame替換成Dataset。Dataset 類既提供了強類型轉換操做(如map,filter以及groupByKey)也提供了非強類型轉換操做(如select和groupBy)。因爲編譯期的類型安全不是 Python 和 R 語言的一個特性,Dataset 的概念並不適用於這些語言的 API。相反,DataFrame仍然是最基本的編程抽象, 就相似於這些語言中單節點 data frame 的概念。
Dataset 和 DataFrame API 中 unionAll 已通過時而且由union替代。
Dataset 和 DataFrame API 中 explode 已通過時,做爲選擇,能夠結合 select 或 flatMap 使用functions.explode()。
Dataset 和 DataFrame API 中registerTempTable已通過時而且由createOrReplaceTempView替代。
對 Hive tablesCREATE TABLE ... LOCATION行爲的更改.
從 Spark 2.0 開始,CREATE TABLE ... LOCATION與CREATE EXTERNAL TABLE ... LOCATION是相同的,以防止意外丟棄用戶提供的 locations(位置)中的現有數據。這意味着,在用戶指定位置的 Spark SQL 中建立的 Hive 表始終是 Hive 外部表。刪除外部表將不會刪除數據。 用戶不能指定 Hive managed tables(管理表)的位置. 請注意,這與Hive行爲不一樣。
所以,這些表上的 「DROP TABLE」 語句不會刪除數據。
從 Spark SQL 1.5 升級到 1.6
從 Spark 1.6 開始,默認狀況下服務器在多 session(會話)模式下運行。這意味着每一個 JDBC/ODBC 鏈接擁有一份本身的 SQL 配置和臨時函數註冊。緩存表仍在並共享。若是您但願以舊的單會話模式運行 Thrift server,請設置選項spark.sql.hive.thriftServer.singleSession爲true。您既能夠將此選項添加到spark-defaults.conf,或者經過--conf將它傳遞給start-thriftserver.sh。
./sbin/start-thriftserver.sh\--conf spark.sql.hive.thriftServer.singleSession=true\...
從 1.6.1 開始,在 sparkR 中 withColumn 方法支持添加一個新列或更換 DataFrame 同名的現有列。
從 Spark 1.6 開始,LongType 強制轉換爲 TimestampType 指望是秒,而不是微秒。這種更改是爲了匹配 Hive 1.2 的行爲,以便從 numeric(數值)類型進行更一致的類型轉換到 TimestampType。更多詳情請參閱SPARK-11724。
從 Spark SQL 1.4 升級到 1.5
使用手動管理的內存優化執行,如今是默認啓用的,以及代碼生成表達式求值。這些功能既能夠經過設置spark.sql.tungsten.enabled爲false來禁止使用。
Parquet 的模式合併默認狀況下再也不啓用。它能夠經過設置spark.sql.parquet.mergeSchema到true以從新啓用。
字符串在 Python 列的 columns(列)如今支持使用點(.)來限定列或訪問嵌套值。例如df['table.column.nestedField']。可是,這意味着若是你的列名中包含任何圓點,你如今必須避免使用反引號(如table.column.with.dots.nested)。
在內存中的列存儲分區修剪默認是開啓的。它能夠經過設置spark.sql.inMemoryColumnarStorage.partitionPruning爲false來禁用。
無限精度的小數列再也不支持,而不是 Spark SQL 最大精度爲 38 。當從BigDecimal對象推斷模式時,如今使用(38,18)。在 DDL 沒有指定精度時,則默認保留Decimal(10, 0)。
時間戳如今存儲在 1 微秒的精度,而不是 1 納秒的。
在 sql 語句中,floating point(浮點數)如今解析爲 decimal。HiveQL 解析保持不變。
SQL / DataFrame 函數的規範名稱如今是小寫(例如 sum vs SUM)。
JSON 數據源不會自動加載由其餘應用程序(未經過 Spark SQL 插入到數據集的文件)建立的新文件。對於 JSON 持久表(即表的元數據存儲在 Hive Metastore),用戶可使用REFRESH TABLESQL 命令或HiveContext的refreshTable方法,把那些新文件列入到表中。對於表明一個 JSON dataset 的 DataFrame,用戶須要從新建立 DataFrame,同時 DataFrame 中將包括新的文件。
PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替換現有的同名列。
從 Spark SQL 1.3 升級到 1.4
DataFrame data reader/writer interface
基於用戶反饋,咱們建立了一個新的更流暢的 API,用於讀取 (SQLContext.read) 中的數據並寫入數據 (DataFrame.write), 而且舊的 API 將過期(例如,SQLContext.parquetFile,SQLContext.jsonFile).
針對SQLContext.read(Scala,Java,Python) 和DataFrame.write(Scala,Java,Python) 的更多細節,請看 API 文檔.
DataFrame.groupBy 保留 grouping columns(分組的列)
根據用戶的反饋, 咱們更改了DataFrame.groupBy().agg()的默認行爲以保留DataFrame結果中的 grouping columns(分組列). 爲了在 1.3 中保持該行爲,請設置spark.sql.retainGroupColumns爲false.
// In 1.3.x, in order for the grouping column "department" to show up,// it must be included explicitly as part of the agg function call.df.groupBy("department").agg($"department",max("age"),sum("expense"))// In 1.4+, grouping column "department" is included automatically.df.groupBy("department").agg(max("age"),sum("expense"))// Revert to 1.3 behavior (not retaining grouping column) by:sqlContext.setConf("spark.sql.retainGroupColumns","false")
DataFrame.withColumn 上的行爲更改
以前 1.4 版本中,DataFrame.withColumn() 只支持添加列。該列將始終在 DateFrame 結果中被加入做爲新的列,即便現有的列可能存在相同的名稱。從 1.4 版本開始,DataFrame.withColumn() 支持添加與全部現有列的名稱不一樣的列或替換現有的同名列。
請注意,這一變化僅適用於 Scala API,並不適用於 PySpark 和 SparkR。
從 Spark SQL 1.0-1.2 升級到 1.3
在 Spark 1.3 中,咱們從 Spark SQL 中刪除了 「Alpha」 的標籤,做爲一部分已經清理過的可用的 API 。從 Spark 1.3 版本以上,Spark SQL 將提供在 1.X 系列的其餘版本的二進制兼容性。這種兼容性保證不包括被明確標記爲不穩定的(即 DeveloperApi 類或 Experimental) API。
重命名 DataFrame 的 SchemaRDD
升級到 Spark SQL 1.3 版本時,用戶會發現最大的變化是,SchemaRDD已改名爲DataFrame。這主要是由於 DataFrames 再也不從 RDD 直接繼承,而是由 RDDS 本身來實現這些功能。DataFrames 仍然能夠經過調用.rdd方法轉換爲 RDDS 。
在 Scala 中,有一個從SchemaRDD到DataFrame類型別名,能夠爲一些狀況提供源代碼兼容性。它仍然建議用戶更新他們的代碼以使用DataFrame來代替。Java 和 Python 用戶須要更新他們的代碼。
Java 和 Scala APIs 的統一
此前 Spark 1.3 有單獨的Java兼容類(JavaSQLContext和JavaSchemaRDD),借鑑於 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已經統一。兩種語言的用戶可使用SQLContext和DataFrame。通常來講論文類嘗試使用兩種語言的共有類型(如Array替代了一些特定集合)。在某些狀況下不通用的類型狀況下,(例如,passing in closures 或 Maps)使用函數重載代替。
此外,該 Java 的特定類型的 API 已被刪除。Scala 和 Java 的用戶可使用存在於org.apache.spark.sql.types類來描述編程模式。
隔離隱式轉換和刪除 dsl 包(僅Scala)
許多 Spark 1.3 版本之前的代碼示例都以import sqlContext._開始,這提供了從 sqlContext 範圍的全部功能。在 Spark 1.3 中,咱們移除了從RDDs 到DateFrame再到SQLContext內部對象的隱式轉換。用戶如今應該寫成import sqlContext.implicits._.
此外,隱式轉換如今只能使用方法toDF來增長由Product(即 case classes or tuples)構成的RDD,而不是自動應用。
當使用 DSL 內部的函數時(如今使用DataFrameAPI 來替換), 用戶習慣導入org.apache.spark.sql.catalyst.dsl. 相反,應該使用公共的 dataframe 函數 API:import org.apache.spark.sql.functions._.
針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限於 Scala)
Spark 1.3 移除存在於基本 SQL 包的DataType類型別名。開發人員應改成導入類org.apache.spark.sql.types。
UDF 註冊遷移到sqlContext.udf中 (Java & Scala)
用於註冊 UDF 的函數,無論是 DataFrame DSL 仍是 SQL 中用到的,都被遷移到SQLContext中的 udf 對象中。
sqlContext.udf.register("strLen",(s:String)=>s.length())
Python UDF 註冊保持不變。
Python DataTypes 再也不是 Singletons(單例的)
在 Python 中使用 DataTypes 時,你須要先構造它們(如:StringType()),而不是引用一個單例對象。
與 Apache Hive 的兼容
Spark SQL 在設計時就考慮到了和 Hive metastore,SerDes 以及 UDF 之間的兼容性。目前 Hive SerDes 和 UDF 都是基於 Hive 1.2.1 版本,而且Spark SQL 能夠鏈接到不一樣版本的Hive metastore(從 0.12.0 到 1.2.1,能夠參考與不一樣版本的 Hive Metastore 交互)
在現有的 Hive Warehouses 中部署
Spark SQL Thrift JDBC server 採用了開箱即用的設計以兼容已有的 Hive 安裝版本。你不須要修改現有的 Hive Metastore , 或者改變數據的位置和表的分區。
所支持的 Hive 特性
Spark SQL 支持絕大部分的 Hive 功能,如:
Hive query(查詢)語句, 包括:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
全部 Hive 操做, 包括:
關係運算符 (=,⇔,==,<>,<,>,>=,<=, 等等)
算術運算符 (+,-,*,/,%, 等等)
邏輯運算符 (AND,&&,OR,||, 等等)
複雜類型的構造
數學函數 (sign,ln,cos, 等等)
String 函數 (instr,length,printf, 等等)
用戶定義函數 (UDF)
用戶定義聚合函數 (UDAF)
用戶定義 serialization formats (SerDes)
窗口函數
Joins
JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
Unions
Sub-queries(子查詢)
SELECT col FROM ( SELECT a + b AS col from t1) t2
Sampling
Explain
Partitioned tables including dynamic partition insertion
View
全部的 Hive DDL 函數, 包括:
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
大部分的 Hive Data types(數據類型), 包括:
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
未支持的 Hive 函數
如下是目前還不支持的 Hive 函數列表。在 Hive 部署中這些功能大部分都用不到。
主要的 Hive 功能
Tables 使用 buckets 的 Tables: bucket 是 Hive table partition 中的 hash partitioning. Spark SQL 還不支持 buckets.
Esoteric Hive 功能
UNION類型
Unique join
Column 統計信息的收集: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.
Hive Input/Output Formats
File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
Hadoop archive
Hive 優化
有少數 Hive 優化尚未包含在 Spark 中。其中一些(好比 indexes 索引)因爲 Spark SQL 的這種內存計算模型而顯得不那麼重要。另一些在 Spark SQL 將來的版本中會持續跟蹤。
Block 級別的 bitmap indexes 和虛擬 columns (用於構建 indexes)
自動爲 join 和 groupBy 計算 reducer 個數 : 目前在 Spark SQL 中, 你須要使用 「SET spark.sql.shuffle.partitions=[num_tasks];」 來控制 post-shuffle 的並行度.
僅 Meta-data 的 query: 對於只使用 metadata 就能回答的查詢,Spark SQL 仍然會啓動計算結果的任務.
Skew data flag: Spark SQL 不遵循 Hive 中 skew 數據的標記.
STREAMTABLEhint in join: Spark SQL 不遵循STREAMTABLEhint.
對於查詢結果合併多個小文件: 若是輸出的結果包括多個小文件, Hive 能夠可選的合併小文件到一些大文件中去,以免溢出 HDFS metadata. Spark SQL 還不支持這樣.
參考
數據類型
Spark SQL 和 DataFrames 支持下面的數據類型:
Numeric types
ByteType: Represents 1-byte signed integer numbers. The range of numbers is from-128to127.
ShortType: Represents 2-byte signed integer numbers. The range of numbers is from-32768to32767.
IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from-2147483648to2147483647.
LongType: Represents 8-byte signed integer numbers. The range of numbers is from-9223372036854775808to9223372036854775807.
FloatType: Represents 4-byte single-precision floating point numbers.
DoubleType: Represents 8-byte double-precision floating point numbers.
DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally byjava.math.BigDecimal. ABigDecimalconsists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
String type
StringType: Represents character string values.
Binary type
BinaryType: Represents byte sequence values.
Boolean type
BooleanType: Represents boolean values.
Datetime type
TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
DateType: Represents values comprising values of fields year, month, day.
Complex types
ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type ofelementType.containsNullis used to indicate if elements in aArrayTypevalue can havenullvalues.
MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described bykeyTypeand the data type of values are described byvalueType. For aMapTypevalue, keys are not allowed to havenullvalues.valueContainsNullis used to indicate if values of aMapTypevalue can havenullvalues.
StructType(fields): Represents values with the structure described by a sequence ofStructFields (fields).
StructField(name, dataType, nullable): Represents a field in aStructType. The name of a field is indicated byname. The data type of a field is indicated bydataType.nullableis used to indicate if values of this fields can havenullvalues.
Spark SQL 的全部數據類型都在包org.apache.spark.sql.types中. 你能夠用下示例示例來訪問它們.
importorg.apache.spark.sql.types._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Data type(數據類型)Scala 中的 Value 類型訪問或建立數據類型的 API
ByteTypeByteByteType
ShortTypeShortShortType
IntegerTypeIntIntegerType
LongTypeLongLongType
FloatTypeFloatFloatType
DoubleTypeDoubleDoubleType
DecimalTypejava.math.BigDecimalDecimalType
StringTypeStringStringType
BinaryTypeArray[Byte]BinaryType
BooleanTypeBooleanBooleanType
TimestampTypejava.sql.TimestampTimestampType
DateTypejava.sql.DateDateType
ArrayTypescala.collection.SeqArrayType(elementType, [containsNull])
Note(注意):containsNull的默認值是true.
MapTypescala.collection.MapMapType(keyType,valueType, [valueContainsNull])
Note(注意):valueContainsNull的默認值是true.
StructTypeorg.apache.spark.sql.RowStructType(fields)
Note(注意):fields是 StructFields 的 Seq. 全部, 兩個 fields 擁有相同的名稱是不被容許的.
StructField該 field(字段)數據類型的 Scala 中的 value 類型 (例如, 數據類型爲 IntegerType 的 StructField 是 Int)StructField(name,dataType, [nullable])
Note:nullable的默認值是true.
NaN Semantics
當處理一些不符合標準浮點數語義的float或double類型時,對於 Not-a-Number(NaN) 須要作一些特殊處理. 具體以下:
NaN = NaN 返回 true.
在 aggregations(聚合)操做中,全部的 NaN values 將被分到同一個組中.
在 join key 中 NaN 能夠當作一個普通的值.
NaN 值在升序排序中排到最後,比任何其餘數值都大.
Spark SQL, DataFrames and Datasets Guide
無類型的Dataset操做 (aka DataFrame 操做)
Running SQL Queries Programmatically
Untyped User-Defined Aggregate Functions
Type-Safe User-Defined Aggregate Functions
Generic Load/Save Functions (通用 加載/保存 功能)
Manually Specifying Options (手動指定選項)
Run SQL on files directly (直接在文件上運行 SQL)
Saving to Persistent Tables (保存到持久表)
Bucketing, Sorting and Partitioning (分桶, 排序和分區)
Loading Data Programmatically (以編程的方式加載數據)
Hive metastore Parquet table conversion (Hive metastore Parquet table 轉換)
Hive/Parquet Schema Reconciliation
DataFrame data reader/writer interface
DataFrame.groupBy 保留 grouping columns(分組的列)
針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限於 Scala)
UDF 註冊遷移到sqlContext.udf中 (Java & Scala)
Python DataTypes 再也不是 Singletons(單例的)
Overview
Spark SQL 是 Spark 處理結構化數據的一個模塊.與基礎的 Spark RDD API 不一樣, Spark SQL 提供了查詢結構化數據及計算結果等信息的接口.在內部, Spark SQL 使用這個額外的信息去執行額外的優化.有幾種方式能夠跟 Spark SQL 進行交互, 包括 SQL 和 Dataset API.當使用相同執行引擎進行計算時, 不管使用哪一種 API / 語言均可以快速的計算.這種統一意味着開發人員可以在基於提供最天然的方式來表達一個給定的 transformation API 之間實現輕鬆的來回切換不一樣的 .
該頁面全部例子使用的示例數據都包含在 Spark 的發佈中, 而且可使用spark-shell,pysparkshell, 或者sparkRshell來運行.
SQL
Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也可以被用於從已存在的 Hive 環境中讀取數據.更多關於如何配置這個特性的信息, 請參考Hive 表這部分. 當以另外的編程語言運行SQL 時, 查詢結果將以Dataset/DataFrame的形式返回.您也可使用命令行或者經過JDBC/ODBC與 SQL 接口交互.
Datasets and DataFrames
一個 Dataset 是一個分佈式的數據集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的優勢(強類型化, 可以使用強大的 lambda 函數)與Spark SQL執行引擎的優勢.一個 Dataset 能夠從 JVM 對象來構造而且使用轉換功能(map, flatMap, filter, 等等). Dataset API 在Scala和Java是可用的.Python 不支持 Dataset API.可是因爲 Python 的動態特性, 許多 Dataset API 的優勢已經可用了 (也就是說, 你可能經過 name 天生的row.columnName屬性訪問一行中的字段).這種狀況和 R 類似.
一個 DataFrame 是一個Dataset組成的指定列.它的概念與一個在關係型數據庫或者在 R/Python 中的表是相等的, 可是有不少優化. DataFrames 能夠從大量的sources中構造出來, 好比: 結構化的文本文件, Hive中的表, 外部數據庫, 或者已經存在的 RDDs. DataFrame API 能夠在 Scala, Java,Python, 和R中實現. 在 Scala 和 Java中, 一個 DataFrame 所表明的是一個多個Row(行)的的 Dataset(數據集合). 在the Scala API中,DataFrame僅僅是一個Dataset[Row]類型的別名. 然而, 在Java API中, 用戶須要去使用Dataset去表明一個DataFrame.
在此文檔中, 咱們將經常會引用 Scala/Java Datasets 的Rows 做爲 DataFrames.
開始入門
起始點: SparkSession
Spark SQL中全部功能的入口點是SparkSession類. 要建立一個SparkSession, 僅使用SparkSession.builder()就能夠了:
importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option","some-value").getOrCreate()// For implicit conversions like converting RDDs to DataFramesimportspark.implicits._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Spark 2.0 中的SparkSession爲 Hive 特性提供了內嵌的支持, 包括使用 HiveQL 編寫查詢的能力, 訪問 Hive UDF,以及從 Hive 表中讀取數據的能力.爲了使用這些特性, 你不須要去有一個已存在的 Hive 設置.
建立 DataFrames
在一個SparkSession中, 應用程序能夠從一個已經存在的RDD, 從hive表, 或者從Spark數據源中建立一個DataFrames.
舉個例子, 下面就是基於一個JSON文件建立一個DataFrame:
valdf=spark.read.json("examples/src/main/resources/people.json")// Displays the content of the DataFrame to stdoutdf.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
無類型的Dataset操做 (aka DataFrame 操做)
DataFrames 提供了一個特定的語法用在Scala,Java,PythonandR中機構化數據的操做.
正如上面提到的同樣, Spark 2.0中, DataFrames在Scala 和 Java API中, 僅僅是多個Rows的Dataset. 這些操做也參考了與強類型的Scala/Java Datasets中的」類型轉換」 對應的」無類型轉換」 .
這裏包括一些使用 Dataset 進行結構化數據處理的示例 :
// This import is needed to use the $-notationimportspark.implicits._// Print the schema in a tree formatdf.printSchema()// root// |-- age: long (nullable = true)// |-- name: string (nullable = true)// Select only the "name" columndf.select("name").show()// +-------+// | name|// +-------+// |Michael|// | Andy|// | Justin|// +-------+// Select everybody, but increment the age by 1df.select($"name",$"age"+1).show()// +-------+---------+// | name|(age + 1)|// +-------+---------+// |Michael| null|// | Andy| 31|// | Justin| 20|// +-------+---------+// Select people older than 21df.filter($"age">21).show()// +---+----+// |age|name|// +---+----+// | 30|Andy|// +---+----+// Count people by agedf.groupBy("age").count().show()// +----+-----+// | age|count|// +----+-----+// | 19| 1|// |null| 1|// | 30| 1|// +----+-----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
可以在 DataFrame 上被執行的操做類型的完整列表請參考API 文檔.
除了簡單的列引用和表達式以外, DataFrame 也有豐富的函數庫, 包括 string 操做, date 算術, 常見的 math 操做以及更多.可用的完整列表請參考DataFrame 函數指南.
Running SQL Queries Programmatically
SparkSession的sql函數可讓應用程序以編程的方式運行 SQL 查詢, 並將結果做爲一個DataFrame返回.
// Register the DataFrame as a SQL temporary viewdf.createOrReplaceTempView("people")valsqlDF=spark.sql("SELECT * FROM people")sqlDF.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
全局臨時視圖
Spark SQL中的臨時視圖是session級別的, 也就是會隨着session的消失而消失. 若是你想讓一個臨時視圖在全部session中相互傳遞而且可用, 直到Spark 應用退出, 你能夠創建一個全局的臨時視圖.全局的臨時視圖存在於系統數據庫global_temp中, 咱們必須加上庫名去引用它, 好比.SELECT * FROM global_temp.view1.
// Register the DataFrame as a global temporary viewdf.createGlobalTempView("people")// Global temporary view is tied to a system preserved database `global_temp`spark.sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+// Global temporary view is cross-sessionspark.newSession().sql("SELECT * FROM global_temp.people").show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
建立Datasets
Dataset 與 RDD 類似, 然而, 並非使用 Java 序列化或者 Kryo編碼器來序列化用於處理或者經過網絡進行傳輸的對象. 雖然編碼器和標準的序列化都負責將一個對象序列化成字節, 編碼器是動態生成的代碼, 而且使用了一種容許 Spark 去執行許多像 filtering, sorting 以及 hashing 這樣的操做, 不須要將字節反序列化成對象的格式.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,// you can use custom classes that implement the Product interfacecaseclassPerson(name:String,age:Long)// Encoders are created for case classesvalcaseClassDS=Seq(Person("Andy",32)).toDS()caseClassDS.show()// +----+---+// |name|age|// +----+---+// |Andy| 32|// +----+---+// Encoders for most common types are automatically provided by importing spark.implicits._valprimitiveDS=Seq(1,2,3).toDS()primitiveDS.map(_+1).collect()// Returns: Array(2, 3, 4)// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by namevalpath="examples/src/main/resources/people.json"valpeopleDS=spark.read.json(path).as[Person]peopleDS.show()// +----+-------+// | age| name|// +----+-------+// |null|Michael|// | 30| Andy|// | 19| Justin|// +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
RDD的互操做性
Spark SQL 支持兩種不一樣的方法用於轉換已存在的 RDD 成爲 Dataset.第一種方法是使用反射去推斷一個包含指定的對象類型的 RDD 的 Schema.在你的 Spark 應用程序中當你已知 Schema 時這個基於方法的反射可讓你的代碼更簡潔.
第二種用於建立 Dataset 的方法是經過一個容許你構造一個 Schema 而後把它應用到一個已存在的 RDD 的編程接口.然而這種方法更繁瑣, 當列和它們的類型知道運行時都是未知時它容許你去構造 Dataset.
使用反射推斷Schema
Spark SQL 的 Scala 接口支持自動轉換一個包含 case classes 的 RDD 爲 DataFrame.Case class 定義了表的 Schema.Case class 的參數名使用反射讀取而且成爲了列名.Case class 也能夠是嵌套的或者包含像Seq或者Array這樣的複雜類型.這個 RDD 可以被隱式轉換成一個 DataFrame 而後被註冊爲一個表.表能夠用於後續的 SQL 語句.
// For implicit conversions from RDDs to DataFramesimportspark.implicits._// Create an RDD of Person objects from a text file, convert it to a DataframevalpeopleDF=spark.sparkContext.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(attributes=>Person(attributes(0),attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by SparkvalteenagersDF=spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager=>"Name: "+teenager(0)).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+// or by field nameteenagersDF.map(teenager=>"Name: "+teenager.getAs[String]("name")).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicitvalmapEncoder=org.apache.spark.sql.Encoders.kryo[Map[String,Any]]// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager=>teenager.getValuesMap[Any](List("name","age"))).collect()// Array(Map("name" -> "Justin", "age" -> 19))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
以編程的方式指定Schema
當 case class 不可以在執行以前被定義(例如, records 記錄的結構在一個 string 字符串中被編碼了, 或者一個 text 文本 dataset 將被解析而且不一樣的用戶投影的字段是不同的).一個DataFrame可使用下面的三步以編程的方式來建立.
從原始的 RDD 建立 RDD 的Row(行);
Step 1 被建立後, 建立 Schema 表示一個StructType匹配 RDD 中的Row(行)的結構.
經過SparkSession提供的createDataFrame方法應用 Schema 到 RDD 的 RowS(行).
例如:
importorg.apache.spark.sql.types._// Create an RDDvalpeopleRDD=spark.sparkContext.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a stringvalschemaString="name age"// Generate the schema based on the string of schemavalfields=schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,nullable=true))valschema=StructType(fields)// Convert records of the RDD (people) to RowsvalrowRDD=peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0),attributes(1).trim))// Apply the schema to the RDDvalpeopleDF=spark.createDataFrame(rowRDD,schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL can be run over a temporary view created using DataFramesvalresults=spark.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes=>"Name: "+attributes(0)).show()// +-------------+// | value|// +-------------+// |Name: Michael|// | Name: Andy|// | Name: Justin|// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Aggregations
Thebuilt-in DataFrames functionsprovide common aggregations such ascount(),countDistinct(),avg(),max(),min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them inScalaandJavato work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
Untyped User-Defined Aggregate Functions
Users have to extend theUserDefinedAggregateFunctionabstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
importorg.apache.spark.sql.expressions.MutableAggregationBufferimportorg.apache.spark.sql.expressions.UserDefinedAggregateFunctionimportorg.apache.spark.sql.types._importorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessionobjectMyAverageextendsUserDefinedAggregateFunction{// Data types of input arguments of this aggregate functiondefinputSchema:StructType=StructType(StructField("inputColumn",LongType)::Nil)// Data types of values in the aggregation bufferdefbufferSchema:StructType={StructType(StructField("sum",LongType)::StructField("count",LongType)::Nil)}// The data type of the returned valuedefdataType:DataType=DoubleType// Whether this function always returns the same output on the identical inputdefdeterministic:Boolean=true// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides// the opportunity to update its values. Note that arrays and maps inside the buffer are still// immutable.definitialize(buffer:MutableAggregationBuffer):Unit={buffer(0)=0Lbuffer(1)=0L}// Updates the given aggregation buffer `buffer` with new input data from `input`defupdate(buffer:MutableAggregationBuffer,input:Row):Unit={if(!input.isNullAt(0)){buffer(0)=buffer.getLong(0)+input.getLong(0)buffer(1)=buffer.getLong(1)+1}}// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`defmerge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit={buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)}// Calculates the final resultdefevaluate(buffer:Row):Double=buffer.getLong(0).toDouble/buffer.getLong(1)}// Register the function to access itspark.udf.register("myAverage",MyAverage)valdf=spark.read.json("examples/src/main/resources/employees.json")df.createOrReplaceTempView("employees")df.show()// +-------+------+// | name|salary|// +-------+------+// |Michael| 3000|// | Andy| 4500|// | Justin| 3500|// | Berta| 4000|// +-------+------+valresult=spark.sql("SELECT myAverage(salary) as average_salary FROM employees")result.show()// +--------------+// |average_salary|// +--------------+// | 3750.0|// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.
Type-Safe User-Defined Aggregate Functions
User-defined aggregations for strongly typed Datasets revolve around theAggregatorabstract class. For example, a type-safe user-defined average can look like:
importorg.apache.spark.sql.expressions.Aggregatorimportorg.apache.spark.sql.Encoderimportorg.apache.spark.sql.Encodersimportorg.apache.spark.sql.SparkSessioncaseclassEmployee(name:String,salary:Long)caseclassAverage(varsum:Long,varcount:Long)objectMyAverageextendsAggregator[Employee,Average,Double]{// A zero value for this aggregation. Should satisfy the property that any b + zero = bdefzero:Average=Average(0L,0L)// Combine two values to produce a new value. For performance, the function may modify `buffer`// and return it instead of constructing a new objectdefreduce(buffer:Average,employee:Employee):Average={buffer.sum+=employee.salarybuffer.count+=1buffer}// Merge two intermediate valuesdefmerge(b1:Average,b2:Average):Average={b1.sum+=b2.sumb1.count+=b2.countb1}// Transform the output of the reductiondeffinish(reduction:Average):Double=reduction.sum.toDouble/reduction.count// Specifies the Encoder for the intermediate value typedefbufferEncoder:Encoder[Average]=Encoders.product// Specifies the Encoder for the final output value typedefoutputEncoder:Encoder[Double]=Encoders.scalaDouble}valds=spark.read.json("examples/src/main/resources/employees.json").as[Employee]ds.show()// +-------+------+// | name|salary|// +-------+------+// |Michael| 3000|// | Andy| 4500|// | Justin| 3500|// | Berta| 4000|// +-------+------+// Convert the function to a `TypedColumn` and give it a namevalaverageSalary=MyAverage.toColumn.name("average_salary")valresult=ds.select(averageSalary)result.show()// +--------------+// |average_salary|// +--------------+// | 3750.0|// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.
Data Sources (數據源)
Spark SQL 支持經過 DataFrame 接口對各類 data sources (數據源)進行操做. DataFrame 可使用 relational transformations (關係轉換)操做, 也可用於建立 temporary view (臨時視圖). 將 DataFrame 註冊爲 temporary view (臨時視圖)容許您對其數據運行 SQL 查詢. 本節 描述了使用 Spark Data Sources 加載和保存數據的通常方法, 而後涉及可用於 built-in data sources (內置數據源)的 specific options (特定選項).
Generic Load/Save Functions (通用 加載/保存 功能)
在最簡單的形式中, 默認數據源(parquet, 除非另有配置spark.sql.sources.default)將用於全部操做.
valusersDF=spark.read.load("examples/src/main/resources/users.parquet")usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Manually Specifying Options (手動指定選項)
您還能夠 manually specify (手動指定)將與任何你想傳遞給 data source 的其餘選項一塊兒使用的 data source . Data sources 由其 fully qualified name (徹底限定名稱)(即org.apache.spark.sql.parquet), 可是對於 built-in sources (內置的源), 你也可使用它們的 shortnames (短名稱)(json,parquet,jdbc,orc,libsvm,csv,text).從任何 data source type (數據源類型)加載 DataFrames 可使用此 syntax (語法)轉換爲其餘類型.
valpeopleDF=spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF.select("name","age").write.format("parquet").save("namesAndAges.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Run SQL on files directly (直接在文件上運行 SQL)
不使用讀取 API 將文件加載到 DataFrame 並進行查詢, 也能夠直接用 SQL 查詢該文件.
valsqlDF=spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Save Modes (保存模式)
Save operations (保存操做)能夠選擇使用SaveMode, 它指定如何處理現有數據若是存在的話. 重要的是要意識到, 這些 save modes (保存模式)不使用任何 locking (鎖定)而且不是 atomic (原子). 另外, 當執行Overwrite時, 數據將在新數據寫出以前被刪除.
Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)"error"(default)將 DataFrame 保存到 data source (數據源)時, 若是數據已經存在, 則會拋出異常.
SaveMode.Append"append"將 DataFrame 保存到 data source (數據源)時, 若是 data/table 已存在, 則 DataFrame 的內容將被 append (附加)到現有數據中.
SaveMode.Overwrite"overwrite"Overwrite mode (覆蓋模式)意味着將 DataFrame 保存到 data source (數據源)時, 若是 data/table 已經存在, 則預期 DataFrame 的內容將 overwritten (覆蓋)現有數據.
SaveMode.Ignore"ignore"Ignore mode (忽略模式)意味着當將 DataFrame 保存到 data source (數據源)時, 若是數據已經存在, 則保存操做預期不會保存 DataFrame 的內容, 而且不更改現有數據. 這與 SQL 中的CREATE TABLE IF NOT EXISTS相似.
Saving to Persistent Tables (保存到持久表)
DataFrames也可使用saveAsTable命令做爲 persistent tables (持久表)保存到 Hive metastore 中. 請注意, existing Hive deployment (現有的 Hive 部署)不須要使用此功能. Spark 將爲您建立默認的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 與createOrReplaceTempView命令不一樣,saveAsTable將 materialize (實現) DataFrame 的內容, 並建立一個指向 Hive metastore 中數據的指針. 即便您的 Spark 程序從新啓動, Persistent tables (持久性表)仍然存在, 由於您保持與同一個 metastore 的鏈接. 能夠經過使用表的名稱在SparkSession上調用table方法來建立 persistent tabl (持久表)的 DataFrame .
對於 file-based (基於文件)的 data source (數據源), 例如 text, parquet, json等, 您能夠經過path選項指定 custom table path (自定義表路徑), 例如df.write.option("path", "/some/path").saveAsTable("t"). 當表被 dropped (刪除)時, custom table path (自定義表路徑)將不會被刪除, 而且表數據仍然存在. 若是未指定自定義表路徑, Spark 將把數據寫入 warehouse directory (倉庫目錄)下的默認表路徑. 當表被刪除時, 默認的表路徑也將被刪除.
從 Spark 2.1 開始, persistent datasource tables (持久性數據源表)將 per-partition metadata (每一個分區元數據)存儲在 Hive metastore 中. 這帶來了幾個好處:
因爲 metastore 只能返回查詢的必要 partitions (分區), 所以再也不須要將第一個查詢上的全部 partitions discovering 到表中.
Hive DDLs 如ALTER TABLE PARTITION ... SET LOCATION如今可用於使用 Datasource API 建立的表.
請注意, 建立 external datasource tables (外部數據源表)(帶有path選項)的表時, 默認狀況下不會收集 partition information (分區信息). 要 sync (同步) metastore 中的分區信息, 能夠調用MSCK REPAIR TABLE.
Bucketing, Sorting and Partitioning (分桶, 排序和分區)
對於 file-based data source (基於文件的數據源), 也能夠對 output (輸出)進行 bucket 和 sort 或者 partition . Bucketing 和 sorting 僅適用於 persistent tables :
peopleDF.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
在使用 Dataset API 時, partitioning 能夠同時與save和saveAsTable一塊兒使用.
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
能夠爲 single table (單個表)使用 partitioning 和 bucketing:
peopleDF.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
partitionBy建立一個 directory structure (目錄結構), 如Partition Discovery部分所述. 所以, 對 cardinality (基數)較高的 columns 的適用性有限. 相反,bucketBy能夠在固定數量的 buckets 中分配數據, 而且能夠在 a number of unique values is unbounded (多個惟一值無界時)使用數據.
Parquet Files
Parquet是許多其餘數據處理系統支持的 columnar format (柱狀格式). Spark SQL 支持讀寫 Parquet 文件, 可自動保留 schema of the original data (原始數據的模式). 當編寫 Parquet 文件時, 出於兼容性緣由, 全部 columns 都將自動轉換爲可空.
Loading Data Programmatically (以編程的方式加載數據)
使用上面例子中的數據:
// Encoders for most common types are automatically provided by importing spark.implicits._importspark.implicits._valpeopleDF=spark.read.json("examples/src/main/resources/people.json")// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write.parquet("people.parquet")// Read in the parquet file created above// Parquet files are self-describing so the schema is preserved// The result of loading a Parquet file is also a DataFramevalparquetFileDF=spark.read.parquet("people.parquet")// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile")valnamesDF=spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")namesDF.map(attributes=>"Name: "+attributes(0)).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Partition Discovery (分區發現)
Table partitioning (表分區)是在像 Hive 這樣的系統中使用的常見的優化方法. 在 partitioned table (分區表)中, 數據一般存儲在不一樣的目錄中, partitioning column values encoded (分區列值編碼)在每一個 partition directory (分區目錄)的路徑中. Parquet data source (Parquet 數據源)如今能夠自動 discover (發現)和 infer (推斷)分區信息. 例如, 咱們可使用如下 directory structure (目錄結構)將全部之前使用的 population data (人口數據)存儲到 partitioned table (分區表)中, 其中有兩個額外的列gender和country做爲 partitioning columns (分區列):
path└── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
經過將path/to/table傳遞給SparkSession.read.parquet或SparkSession.read.load, Spark SQL 將自動從路徑中提取 partitioning information (分區信息). 如今返回的 DataFrame 的 schema (模式)變成:
root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)
請注意, 會自動 inferred (推斷) partitioning columns (分區列)的 data types (數據類型).目前, 支持 numeric data types (數字數據類型)和 string type (字符串類型).有些用戶可能不想自動推斷 partitioning columns (分區列)的數據類型.對於這些用例, automatic type inference (自動類型推斷)能夠由spark.sql.sources.partitionColumnTypeInference.enabled配置, 默認爲true.當禁用 type inference (類型推斷)時, string type (字符串類型)將用於 partitioning columns (分區列).
從 Spark 1.6.0 開始, 默認狀況下, partition discovery (分區發現)只能找到給定路徑下的 partitions (分區).對於上述示例, 若是用戶將path/to/table/gender=male傳遞給SparkSession.read.parquet或SparkSession.read.load, 則gender將不被視爲 partitioning column (分區列).若是用戶須要指定 partition discovery (分區發現)應該開始的基本路徑, 則能夠在數據源選項中設置basePath.例如, 當path/to/table/gender=male是數據的路徑而且用戶將basePath設置爲path/to/table/,gender將是一個 partitioning column (分區列).
Schema Merging (模式合併)
像 ProtocolBuffer , Avro 和 Thrift 同樣, Parquet 也支持 schema evolution (模式演進). 用戶能夠從一個 simple schema (簡單的架構)開始, 並根據須要逐漸向 schema 添加更多的 columns (列). 以這種方式, 用戶可能會使用不一樣但相互兼容的 schemas 的 multiple Parquet files (多個 Parquet 文件). Parquet data source (Parquet 數據源)如今可以自動檢測這種狀況並 merge (合併)全部這些文件的 schemas .
因爲 schema merging (模式合併)是一個 expensive operation (相對昂貴的操做), 而且在大多數狀況下不是必需的, 因此默認狀況下從 1.5.0 開始. 你能夠按照以下的方式啓用它:
讀取 Parquet 文件時, 將 data source option (數據源選項)mergeSchema設置爲true(以下面的例子所示), 或
將 global SQL option (全局 SQL 選項)spark.sql.parquet.mergeSchema設置爲true.
// This is used to implicitly convert an RDD to a DataFrame.importspark.implicits._// Create a simple DataFrame, store into a partition directoryvalsquaresDF=spark.sparkContext.makeRDD(1to5).map(i=>(i,i*i)).toDF("value","square")squaresDF.write.parquet("data/test_table/key=1")// Create another DataFrame in a new partition directory,// adding a new column and dropping an existing columnvalcubesDF=spark.sparkContext.makeRDD(6to10).map(i=>(i,i*i*i)).toDF("value","cube")cubesDF.write.parquet("data/test_table/key=2")// Read the partitioned tablevalmergedDF=spark.read.option("mergeSchema","true").parquet("data/test_table")mergedDF.printSchema()// The final schema consists of all 3 columns in the Parquet files together// with the partitioning column appeared in the partition directory paths// root// |-- value: int (nullable = true)// |-- square: int (nullable = true)// |-- cube: int (nullable = true)// |-- key: int (nullable = true)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Hive metastore Parquet table conversion (Hive metastore Parquet table 轉換)
當讀取和寫入 Hive metastore Parquet 表時, Spark SQL 將嘗試使用本身的 Parquet support (Parquet 支持), 而不是 Hive SerDe 來得到更好的性能. 此 behavior (行爲)由spark.sql.hive.convertMetastoreParquet配置控制, 默認狀況下 turned on (打開).
Hive/Parquet Schema Reconciliation
從 table schema processing (表格模式處理)的角度來講, Hive 和 Parquet 之間有兩個關鍵的區別.
Hive 不區分大小寫, 而 Parquet 不是
Hive 認爲全部 columns (列)均可覺得空, 而 Parquet 中的可空性是 significant (重要)的.
因爲這個緣由, 當將 Hive metastore Parquet 錶轉換爲 Spark SQL Parquet 表時, 咱們必須調整 metastore schema 與 Parquet schema. reconciliation 規則是:
在兩個 schema 中具備 same name (相同名稱)的 Fields (字段)必須具備 same data type (相同的數據類型), 而無論 nullability (可空性). reconciled field 應具備 Parquet 的數據類型, 以便 nullability (可空性)獲得尊重.
reconciled schema (調和模式)正好包含 Hive metastore schema 中定義的那些字段.
只出如今 Parquet schema 中的任何字段將被 dropped (刪除)在 reconciled schema 中.
僅在 Hive metastore schema 中出現的任何字段在 reconciled schema 中做爲 nullable field (可空字段)添加.
Metadata Refreshing (元數據刷新)
Spark SQL 緩存 Parquet metadata 以得到更好的性能. 當啓用 Hive metastore Parquet table conversion (轉換)時, 這些 converted tables (轉換表)的 metadata (元數據)也被 cached (緩存). 若是這些表由 Hive 或其餘外部工具更新, 則須要手動刷新以確保 consistent metadata (一致的元數據).
// spark is an existing SparkSessionspark.catalog.refreshTable("my_table")
Configuration (配置)
可使用SparkSession上的setConf方法或使用 SQL 運行SET key = value命令來完成 Parquet 的配置.
Property Name (參數名稱)Default(默認)Meaning(含義)
spark.sql.parquet.binaryAsStringfalse一些其餘 Parquet-producing systems (Parquet 生產系統), 特別是 Impala, Hive 和舊版本的 Spark SQL , 在 writing out (寫出) Parquet schema 時, 不區分 binary data (二進制數據)和 strings (字符串). 該 flag 告訴 Spark SQL 將 binary data (二進制數據)解釋爲 string (字符串)以提供與這些系統的兼容性.
spark.sql.parquet.int96AsTimestamptrue一些 Parquet-producing systems , 特別是 Impala 和 Hive , 將 Timestamp 存入INT96 . 該 flag 告訴 Spark SQL 將 INT96 數據解析爲 timestamp 以提供與這些系統的兼容性.
spark.sql.parquet.cacheMetadatatrue打開 Parquet schema metadata 的緩存. 能夠加快查詢靜態數據.
spark.sql.parquet.compression.codecsnappy在編寫 Parquet 文件時設置 compression codec (壓縮編解碼器)的使用. 可接受的值包括: uncompressed, snappy, gzip, lzo .
spark.sql.parquet.filterPushdowntrue設置爲 true 時啓用 Parquet filter push-down optimization .
spark.sql.hive.convertMetastoreParquettrue當設置爲 false 時, Spark SQL 將使用 Hive SerDe 做爲 parquet tables , 而不是內置的支持.
spark.sql.parquet.mergeSchemafalse當爲 true 時, Parquet data source (Parquet 數據源) merges (合併)從全部 data files (數據文件)收集的 schemas , 不然若是沒有可用的 summary file , 則從 summary file 或 random data file 中挑選 schema .
spark.sql.optimizer.metadataOnlytrue若是爲 true , 則啓用使用表的 metadata 的 metadata-only query optimization 來生成 partition columns (分區列)而不是 table scans (表掃描). 當 scanned (掃描)的全部 columns (列)都是 partition columns (分區列)而且 query (查詢)具備知足 distinct semantics (不一樣語義)的 aggregate operator (聚合運算符)時, 它將適用.
JSON Datasets (JSON 數據集)
Spark SQL 能夠 automatically infer (自動推斷)JSON dataset 的 schema, 並將其做爲Dataset[Row]加載. 這個 conversion (轉換)能夠在Dataset[String]上使用SparkSession.read.json()來完成, 或 JSON 文件.
請注意, 以a json file提供的文件不是典型的 JSON 文件. 每行必須包含一個 separate (單獨的), self-contained valid (獨立的有效的)JSON 對象. 有關更多信息, 請參閱JSON Lines text format, also called newline-delimited JSON.
對於 regular multi-line JSON file (常規的多行 JSON 文件), 將multiLine選項設置爲true.
// Primitive types (Int, String, etc) and Product types (case classes) encoders are// supported by importing this when creating a Dataset.importspark.implicits._// A JSON dataset is pointed to by path.// The path can be either a single text file or a directory storing text filesvalpath="examples/src/main/resources/people.json"valpeopleDF=spark.read.json(path)// The inferred schema can be visualized using the printSchema() methodpeopleDF.printSchema()// root// |-- age: long (nullable = true)// |-- name: string (nullable = true)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by sparkvalteenagerNamesDF=spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")teenagerNamesDF.show()// +------+// | name|// +------+// |Justin|// +------+// Alternatively, a DataFrame can be created for a JSON dataset represented by// a Dataset[String] storing one JSON object per stringvalotherPeopleDataset=spark.createDataset("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""::Nil)valotherPeople=spark.read.json(otherPeopleDataset)otherPeople.show()// +---------------+----+// | address|name|// +---------------+----+// |[Columbus,Ohio]| Yin|// +---------------+----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Hive 表
Spark SQL 還支持讀取和寫入存儲在Apache Hive中的數據。 可是,因爲 Hive 具備大量依賴關係,所以這些依賴關係不包含在默認 Spark 分發中。 若是在類路徑中找到 Hive 依賴項,Spark 將自動加載它們。 請注意,這些 Hive 依賴關係也必須存在於全部工做節點上,由於它們將須要訪問 Hive 序列化和反序列化庫 (SerDes),以訪問存儲在 Hive 中的數據。
經過將hive-site.xml,core-site.xml(用於安全配置)和hdfs-site.xml(用於 HDFS 配置)文件放在conf/中來完成配置。
當使用 Hive 時,必須用 Hive 支持實例化SparkSession,包括鏈接到持續的 Hive 轉移,支持 Hive serdes 和 Hive 用戶定義的功能。 沒有現有 Hive 部署的用戶仍然能夠啓用 Hive 支持。 當hive-site.xml未配置時,上下文會自動在當前目錄中建立metastore_db,並建立由spark.sql.warehouse.dir配置的目錄,該目錄默認爲Spark應用程序當前目錄中的spark-warehouse目錄 開始了 請注意,自從2.0.0以來,hive-site.xml中的hive.metastore.warehouse.dir屬性已被棄用。 而是使用spark.sql.warehouse.dir來指定倉庫中數據庫的默認位置。 您可能須要向啓動 Spark 應用程序的用戶授予寫權限。å
importjava.io.Fileimportorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessioncaseclassRecord(key:Int,value:String)// warehouseLocation points to the default location for managed databases and tablesvalwarehouseLocation=newFile("spark-warehouse").getAbsolutePathvalspark=SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir",warehouseLocation).enableHiveSupport().getOrCreate()importspark.implicits._importspark.sqlsql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQLsql("SELECT * FROM src").show()// +---+-------+// |key| value|// +---+-------+// |238|val_238|// | 86| val_86|// |311|val_311|// ...// Aggregation queries are also supported.sql("SELECT COUNT(*) FROM src").show()// +--------+// |count(1)|// +--------+// | 500 |// +--------+// The results of SQL queries are themselves DataFrames and support all normal functions.valsqlDF=sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")// The items in DataFrames are of type Row, which allows you to access each column by ordinal.valstringsDS=sqlDF.map{caseRow(key:Int,value:String)=>s"Key:$key, Value:$value"}stringsDS.show()// +--------------------+// | value|// +--------------------+// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// |Key: 0, Value: val_0|// ...// You can also use DataFrames to create temporary views within a SparkSession.valrecordsDF=spark.createDataFrame((1to100).map(i=>Record(i,s"val_$i")))recordsDF.createOrReplaceTempView("records")// Queries can then join DataFrame data with data stored in Hive.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()// +---+------+---+------+// |key| value|key| value|// +---+------+---+------+// | 2| val_2| 2| val_2|// | 4| val_4| 4| val_4|// | 5| val_5| 5| val_5|// ...
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.
指定 Hive 表的存儲格式
建立 Hive 表時,須要定義如何 從/向 文件系統 read/write 數據,即 「輸入格式」 和 「輸出格式」。 您還須要定義該表如何將數據反序列化爲行,或將行序列化爲數據,即 「serde」。 如下選項可用於指定存儲格式 (「serde」, 「input format」, 「output format」),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。 默認狀況下,咱們將以純文本形式讀取表格文件。 請注意,Hive 存儲處理程序在建立表時不受支持,您可使用 Hive 端的存儲處理程序建立一個表,並使用 Spark SQL 來讀取它。
Property NameMeaning
fileFormatfileFormat是一種存儲格式規範的包,包括 "serde","input format" 和 "output format"。 目前咱們支持6個文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定爲字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 這兩個選項必須成對出現,若是您已經指定了 "fileFormat" 選項,則沒法指定它們。
serde此選項指定 serde 類的名稱。 當指定 `fileFormat` 選項時,若是給定的 `fileFormat` 已經包含 serde 的信息,那麼不要指定這個選項。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可使用這3個文件格式的這個選項。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim這些選項只能與 "textfile" 文件格式一塊兒使用。它們定義如何將分隔的文件讀入行。
使用OPTIONS定義的全部其餘屬性將被視爲 Hive serde 屬性。
與不一樣版本的 Hive Metastore 進行交互
Spark SQL 的 Hive 支持的最重要的部分之一是與 Hive metastore 進行交互,這使得 Spark SQL 可以訪問 Hive 表的元數據。 從 Spark 1.4.0 開始,使用 Spark SQL 的單一二進制構建可使用下面所述的配置來查詢不一樣版本的 Hive 轉移。 請注意,獨立於用於與轉移點通訊的 Hive 版本,內部 Spark SQL 將針對 Hive 1.2.1 進行編譯,並使用這些類進行內部執行(serdes,UDF,UDAF等)。
如下選項可用於配置用於檢索元數據的 Hive 版本:
屬性名稱默認值含義
spark.sql.hive.metastore.version1.2.1Hive metastore 版本。 可用選項爲0.12.0至1.2.1。
spark.sql.hive.metastore.jarsbuiltin當啓用-Phive時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一塊兒。選擇此選項時,spark.sql.hive.metastore.version 必須爲1.2.1或未定義。 行家 使用從Maven存儲庫下載的指定版本的Hive jar。 一般不建議在生產部署中使用此配置。 ***** 應用於實例化 HiveMetastoreClient 的 jar 的位置。該屬性能夠是三個選項之一:
builtin當啓用-Phive時,使用 Hive 1.2.1,它與 Spark 程序集捆綁在一塊兒。選擇此選項時,spark.sql.hive.metastore.version必須爲1.2.1或未定義。
maven使用從 Maven 存儲庫下載的指定版本的 Hive jar。一般不建議在生產部署中使用此配置。
JVM 的標準格式的 classpath。 該類路徑必須包含全部 Hive 及其依賴項,包括正確版本的 Hadoop。這些罐只須要存在於 driver 程序中,但若是您正在運行在 yarn 集羣模式,那麼您必須確保它們與應用程序一塊兒打包。
spark.sql.hive.metastore.sharedPrefixescom.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc使用逗號分隔的類前綴列表,應使用在 Spark SQL 和特定版本的 Hive 之間共享的類加載器來加載。 一個共享類的示例就是用來訪問 Hive metastore 的 JDBC driver。 其它須要共享的類,是須要與已經共享的類進行交互的。 例如,log4j 使用的自定義 appender。
spark.sql.hive.metastore.barrierPrefixes(empty)一個逗號分隔的類前綴列表,應該明確地爲 Spark SQL 正在通訊的 Hive 的每一個版本從新加載。 例如,在一般將被共享的前綴中聲明的 Hive UDF (即: �org.apache.spark.*)。
JDBC 鏈接其它數據庫
Spark SQL 還包括可使用 JDBC 從其餘數據庫讀取數據的數據源。此功能應優於使用JdbcRDD。 這是由於結果做爲 DataFrame 返回,而且能夠輕鬆地在 Spark SQL 中處理或與其餘數據源鏈接。 JDBC 數據源也更容易從 Java 或 Python 使用,由於它不須要用戶提供 ClassTag。(請注意,這不一樣於 Spark SQL JDBC 服務器,容許其餘應用程序使用 Spark SQL 運行查詢)。
要開始使用,您須要在 Spark 類路徑中包含特定數據庫的 JDBC driver 程序。 例如,要從 Spark Shell 鏈接到 postgres,您將運行如下命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
可使用 Data Sources API 未來自遠程數據庫的表做爲 DataFrame 或 Spark SQL 臨時視圖進行加載。 用戶能夠在數據源選項中指定 JDBC 鏈接屬性。用戶和密碼一般做爲登陸數據源的鏈接屬性提供。 除了鏈接屬性外,Spark 還支持如下不區分大小寫的選項:
�屬性名稱含義
url要鏈接的JDBC URL。 源特定的鏈接屬性能夠在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable應該讀取的 JDBC 表。請注意,可使用在SQL查詢的FROM子句中有效的任何內容。 例如,您可使用括號中的子查詢代替完整表。
driver用於鏈接到此 URL 的 JDBC driver 程序的類名。
partitionColumn, lowerBound, upperBound若是指定了這些選項,則必須指定這些選項。 另外,必須指定numPartitions. 他們描述如何從多個 worker 並行讀取數據時將表給分區。partitionColumn必須是有問題的表中的數字列。 請注意,lowerBound和upperBound僅用於決定分區的大小,而不是用於過濾表中的行。 所以,表中的全部行將被分區並返回。此選項僅適用於讀操做。
numPartitions在表讀寫中能夠用於並行度的最大分區數。這也肯定併發JDBC鏈接的最大數量。 若是要寫入的分區數超過此限制,則在寫入以前經過調用coalesce(numPartitions)將其減小到此限制。
fetchsizeJDBC 抓取的大小,用於肯定每次數據往返傳遞的行數。 這有利於提高 JDBC driver 的性能,它們的默認值較小(例如: Oracle 是 10 行)。 該選項僅適用於讀取操做。
batchsizeJDBC 批處理的大小,用於肯定每次數據往返傳遞的行數。 這有利於提高 JDBC driver 的性能。 該選項僅適用於寫操做。默認值爲1000.
isolationLevel事務隔離級別,適用於當前鏈接。 它能夠是NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ, 或SERIALIZABLE之一,對應於 JDBC 鏈接對象定義的標準事務隔離級別,默認爲READ_UNCOMMITTED。 此選項僅適用於寫操做。請參考java.sql.Connection中的文檔。
truncate這是一個與 JDBC 相關的選項。 啓用SaveMode.Overwrite時,此選項會致使 Spark 截斷現有表,而不是刪除並從新建立。 這能夠更有效,而且防止表元數據(例如,索引)被移除。 可是,在某些狀況下,例如當新數據具備不一樣的模式時,它將沒法工做。 它默認爲false。 此選項僅適用於寫操做。
createTableOptions這是一個與JDBC相關的選項。 若是指定,此選項容許在建立表時設置特定於數據庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB.)。此選項僅適用於寫操做。
createTableColumnTypes使用數據庫列數據類型而不是默認值,建立表時。 數據類型信息應以與 CREATE TABLE 列語法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。 指定的類型應該是有效的 spark sql 數據類型。此選項僅適用於寫操做。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods// Loading data from a JDBC sourcevaljdbcDF=spark.read.format("jdbc").option("url","jdbc:postgresql:dbserver").option("dbtable","schema.tablename").option("user","username").option("password","password").load()valconnectionProperties=newProperties()connectionProperties.put("user","username")connectionProperties.put("password","password")valjdbcDF2=spark.read.jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)// Saving data to a JDBC sourcejdbcDF.write.format("jdbc").option("url","jdbc:postgresql:dbserver").option("dbtable","schema.tablename").option("user","username").option("password","password").save()jdbcDF2.write.jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)// Specifying create table column data types on writejdbcDF.write.option("createTableColumnTypes","name CHAR(64), comments VARCHAR(1024)").jdbc("jdbc:postgresql:dbserver","schema.tablename",connectionProperties)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
故障排除
JDBC driver 程序類必須對客戶端會話和全部執行程序上的原始類加載器可見。 這是由於 Java 的 DriverManager 類執行安全檢查,致使它忽略原始類加載器不可見的全部 driver 程序,當打開鏈接時。一個方便的方法是修改全部工做節點上的compute_classpath.sh 以包含您的 driver 程序 JAR。
一些數據庫,例如 H2,將全部名稱轉換爲大寫。 您須要使用大寫字母來引用 Spark SQL 中的這些名稱。
性能調優
對於某些工做負載,能夠經過緩存內存中的數據或打開一些實驗選項來提升性能。
在內存中緩存數據
Spark SQL 能夠經過調用spark.catalog.cacheTable("tableName")或dataFrame.cache()來使用內存中的列格式來緩存表。 而後,Spark SQL 將只掃描所需的列,並將自動調整壓縮以最小化內存使用量和 GC 壓力。 您能夠調用spark.catalog.uncacheTable("tableName")從內存中刪除該表。
內存緩存的配置可使用SparkSession上的setConf方法或使用 SQL 運行SET key=value命令來完成。
屬性名稱默認含義
spark.sql.inMemoryColumnarStorage.compressedtrue當設置爲 true 時,Spark SQL 將根據數據的統計信息爲每一個列自動選擇一個壓縮編解碼器。
spark.sql.inMemoryColumnarStorage.batchSize10000控制批量的柱狀緩存的大小。更大的批量大小能夠提升內存利用率和壓縮率,可是在緩存數據時會冒出 OOM 風險。
其餘配置選項
如下選項也可用於調整查詢執行的性能。這些選項可能會在未來的版本中被廢棄,由於更多的優化是自動執行的。
屬性名稱默認值含義
spark.sql.files.maxPartitionBytes134217728 (128 MB)在讀取文件時,將單個分區打包的最大字節數。
spark.sql.files.openCostInBytes4194304 (4 MB)按照字節數來衡量的打開文件的估計費用能夠在同一時間進行掃描。 將多個文件放入分區時使用。最好過分估計,那麼具備小文件的分區將比具備較大文件的分區(首先計劃的)更快。
spark.sql.broadcastTimeout300廣播鏈接中的廣播等待時間超時(秒)
spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)配置執行鏈接時將廣播給全部工做節點的表的最大大小(以字節爲單位)。 經過將此值設置爲-1能夠禁用廣播。 請注意,目前的統計信息僅支持 Hive Metastore 表,其中已運行命令ANALYZE TABLE COMPUTE STATISTICS noscan。
spark.sql.shuffle.partitions200Configures the number of partitions to use when shuffling data for joins or aggregations.
分佈式 SQL 引擎
Spark SQL 也能夠充當使用其 JDBC/ODBC 或命令行界面的分佈式查詢引擎。 在這種模式下,最終用戶或應用程序能夠直接與 Spark SQL 交互運行 SQL 查詢,而不須要編寫任何代碼。
運行 Thrift JDBC/ODBC 服務器
這裏實現的 Thrift JDBC/ODBC 服務器對應於 Hive 1.2 中的HiveServer2。 您可使用 Spark 或 Hive 1.2.1 附帶的直線腳本測試 JDBC 服務器。
要啓動 JDBC/ODBC 服務器,請在 Spark 目錄中運行如下命令:
./sbin/start-thriftserver.sh
此腳本接受全部bin/spark-submit命令行選項,以及--hiveconf選項來指定 Hive 屬性。 您能夠運行./sbin/start-thriftserver.sh --help查看全部可用選項的完整列表。 默認狀況下,服務器監聽 localhost:10000. 您能夠經過環境變量覆蓋此行爲,即:
exportHIVE_SERVER2_THRIFT_PORT=exportHIVE_SERVER2_THRIFT_BIND_HOST=./sbin/start-thriftserver.sh\--master \...
or system properties:
./sbin/start-thriftserver.sh\--hiveconf hive.server2.thrift.port=\--hiveconf hive.server2.thrift.bind.host=\--master ...
如今,您可使用 beeline 來測試 Thrift JDBC/ODBC 服務器:
./bin/beeline
使用 beeline 方式鏈接到 JDBC/ODBC 服務器:
beeline> !connect jdbc:hive2://localhost:10000
Beeline 將要求您輸入用戶名和密碼。 在非安全模式下,只需輸入機器上的用戶名和空白密碼便可。 對於安全模式,請按照beeline 文檔中的說明進行操做。
配置Hive是經過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中完成的。
您也可使用 Hive 附帶的 beeline 腳本。
Thrift JDBC 服務器還支持經過 HTTP 傳輸發送 thrift RPC 消息。 使用如下設置啓用 HTTP 模式做爲系統屬性或在conf/中的hive-site.xml文件中啓用:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
要測試,請使用 beeline 以 http 模式鏈接到 JDBC/ODBC 服務器:
beeline> !connect jdbc:hive2://:/?hive.server2.transport.mode=http;hive.server2.thrift.http.path=
運行 Spark SQL CLI
Spark SQL CLI 是在本地模式下運行 Hive 轉移服務並執行從命令行輸入的查詢的方便工具。 請注意,Spark SQL CLI 不能與 Thrift JDBC 服務器通訊。
要啓動 Spark SQL CLI,請在 Spark 目錄中運行如下命令:
./bin/spark-sql
配置 Hive 是經過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf/中完成的。 您能夠運行./bin/spark-sql --help獲取全部可用選項的完整列表。
遷移指南
從 Spark SQL 2.1 升級到 2.2
Spark 2.1.1 介紹了一個新的配置 key:spark.sql.hive.caseSensitiveInferenceMode. 它的默認設置是NEVER_INFER, 其行爲與 2.1.0 保持一致. 可是,Spark 2.2.0 將此設置的默認值更改成 「INFER_AND_SAVE」,以恢復與底層文件 schema(模式)具備大小寫混合的列名稱的 Hive metastore 表的兼容性。使用INFER_AND_SAVE配置的 value, 在第一次訪問 Spark 將對其還沒有保存推測 schema(模式)的任何 Hive metastore 表執行 schema inference(模式推斷). 請注意,對於具備數千個 partitions(分區)的表,模式推斷多是很是耗時的操做。若是不兼容大小寫混合的列名,您能夠安全地將spark.sql.hive.caseSensitiveInferenceMode設置爲NEVER_INFER,以免模式推斷的初始開銷。請注意,使用新的默認INFER_AND_SAVE設置,模式推理的結果被保存爲 metastore key 以供未來使用。所以,初始模式推斷僅發生在表的第一次訪問。
從 Spark SQL 2.0 升級到 2.1
Datasource tables(數據源表)如今存儲了 Hive metastore 中的 partition metadata(分區元數據). 這意味着諸如ALTER TABLE PARTITION ... SET LOCATION這樣的 Hive DDLs 如今使用 Datasource API 可用於建立 tables(表).
遺留的數據源表能夠經過MSCK REPAIR TABLE命令遷移到這種格式。建議遷移遺留表利用 Hive DDL 的支持和提供的計劃性能。
要肯定表是否已遷移,當在表上發出DESCRIBE FORMATTED命令時請查找PartitionProvider: Catalog屬性.
Datasource tables(數據源表)的INSERT OVERWRITE TABLE ... PARTITION ...行爲的更改。
在之前的 Spark 版本中,INSERT OVERWRITE覆蓋了整個 Datasource table,即便給出一個指定的 partition. 如今只有匹配規範的 partition 被覆蓋。
請注意,這仍然與 Hive 表的行爲不一樣,Hive 表僅覆蓋與新插入數據重疊的分區。
從 Spark SQL 1.6 升級到 2.0
SparkSession如今是 Spark 新的切入點, 它替代了老的SQLContext和HiveContext。注意 : 爲了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。能夠從SparkSession獲取一個新的catalog接口 — 現有的訪問數據庫和表的 API,如listTables,createExternalTable,dropTempView,cacheTable都被移到該接口。
Dataset API 和 DataFrame API 進行了統一。在 Scala 中,DataFrame變成了Dataset[Row]類型的一個別名,而 Java API 使用者必須將DataFrame替換成Dataset。Dataset 類既提供了強類型轉換操做(如map,filter以及groupByKey)也提供了非強類型轉換操做(如select和groupBy)。因爲編譯期的類型安全不是 Python 和 R 語言的一個特性,Dataset 的概念並不適用於這些語言的 API。相反,DataFrame仍然是最基本的編程抽象, 就相似於這些語言中單節點 data frame 的概念。
Dataset 和 DataFrame API 中 unionAll 已通過時而且由union替代。
Dataset 和 DataFrame API 中 explode 已通過時,做爲選擇,能夠結合 select 或 flatMap 使用functions.explode()。
Dataset 和 DataFrame API 中registerTempTable已通過時而且由createOrReplaceTempView替代。
對 Hive tablesCREATE TABLE ... LOCATION行爲的更改.
從 Spark 2.0 開始,CREATE TABLE ... LOCATION與CREATE EXTERNAL TABLE ... LOCATION是相同的,以防止意外丟棄用戶提供的 locations(位置)中的現有數據。這意味着,在用戶指定位置的 Spark SQL 中建立的 Hive 表始終是 Hive 外部表。刪除外部表將不會刪除數據。 用戶不能指定 Hive managed tables(管理表)的位置. 請注意,這與Hive行爲不一樣。
所以,這些表上的 「DROP TABLE」 語句不會刪除數據。
從 Spark SQL 1.5 升級到 1.6
從 Spark 1.6 開始,默認狀況下服務器在多 session(會話)模式下運行。這意味着每一個 JDBC/ODBC 鏈接擁有一份本身的 SQL 配置和臨時函數註冊。緩存表仍在並共享。若是您但願以舊的單會話模式運行 Thrift server,請設置選項spark.sql.hive.thriftServer.singleSession爲true。您既能夠將此選項添加到spark-defaults.conf,或者經過--conf將它傳遞給start-thriftserver.sh。
./sbin/start-thriftserver.sh\--conf spark.sql.hive.thriftServer.singleSession=true\...
從 1.6.1 開始,在 sparkR 中 withColumn 方法支持添加一個新列或更換 DataFrame 同名的現有列。
從 Spark 1.6 開始,LongType 強制轉換爲 TimestampType 指望是秒,而不是微秒。這種更改是爲了匹配 Hive 1.2 的行爲,以便從 numeric(數值)類型進行更一致的類型轉換到 TimestampType。更多詳情請參閱SPARK-11724。
從 Spark SQL 1.4 升級到 1.5
使用手動管理的內存優化執行,如今是默認啓用的,以及代碼生成表達式求值。這些功能既能夠經過設置spark.sql.tungsten.enabled爲false來禁止使用。
Parquet 的模式合併默認狀況下再也不啓用。它能夠經過設置spark.sql.parquet.mergeSchema到true以從新啓用。
字符串在 Python 列的 columns(列)如今支持使用點(.)來限定列或訪問嵌套值。例如df['table.column.nestedField']。可是,這意味着若是你的列名中包含任何圓點,你如今必須避免使用反引號(如table.column.with.dots.nested)。
在內存中的列存儲分區修剪默認是開啓的。它能夠經過設置spark.sql.inMemoryColumnarStorage.partitionPruning爲false來禁用。
無限精度的小數列再也不支持,而不是 Spark SQL 最大精度爲 38 。當從BigDecimal對象推斷模式時,如今使用(38,18)。在 DDL 沒有指定精度時,則默認保留Decimal(10, 0)。
時間戳如今存儲在 1 微秒的精度,而不是 1 納秒的。
在 sql 語句中,floating point(浮點數)如今解析爲 decimal。HiveQL 解析保持不變。
SQL / DataFrame 函數的規範名稱如今是小寫(例如 sum vs SUM)。
JSON 數據源不會自動加載由其餘應用程序(未經過 Spark SQL 插入到數據集的文件)建立的新文件。對於 JSON 持久表(即表的元數據存儲在 Hive Metastore),用戶可使用REFRESH TABLESQL 命令或HiveContext的refreshTable方法,把那些新文件列入到表中。對於表明一個 JSON dataset 的 DataFrame,用戶須要從新建立 DataFrame,同時 DataFrame 中將包括新的文件。
PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替換現有的同名列。
從 Spark SQL 1.3 升級到 1.4
DataFrame data reader/writer interface
基於用戶反饋,咱們建立了一個新的更流暢的 API,用於讀取 (SQLContext.read) 中的數據並寫入數據 (DataFrame.write), 而且舊的 API 將過期(例如,SQLContext.parquetFile,SQLContext.jsonFile).
針對SQLContext.read(Scala,Java,Python) 和DataFrame.write(Scala,Java,Python) 的更多細節,請看 API 文檔.
DataFrame.groupBy 保留 grouping columns(分組的列)
根據用戶的反饋, 咱們更改了DataFrame.groupBy().agg()的默認行爲以保留DataFrame結果中的 grouping columns(分組列). 爲了在 1.3 中保持該行爲,請設置spark.sql.retainGroupColumns爲false.
// In 1.3.x, in order for the grouping column "department" to show up,// it must be included explicitly as part of the agg function call.df.groupBy("department").agg($"department",max("age"),sum("expense"))// In 1.4+, grouping column "department" is included automatically.df.groupBy("department").agg(max("age"),sum("expense"))// Revert to 1.3 behavior (not retaining grouping column) by:sqlContext.setConf("spark.sql.retainGroupColumns","false")
DataFrame.withColumn 上的行爲更改
以前 1.4 版本中,DataFrame.withColumn() 只支持添加列。該列將始終在 DateFrame 結果中被加入做爲新的列,即便現有的列可能存在相同的名稱。從 1.4 版本開始,DataFrame.withColumn() 支持添加與全部現有列的名稱不一樣的列或替換現有的同名列。
請注意,這一變化僅適用於 Scala API,並不適用於 PySpark 和 SparkR。
從 Spark SQL 1.0-1.2 升級到 1.3
在 Spark 1.3 中,咱們從 Spark SQL 中刪除了 「Alpha」 的標籤,做爲一部分已經清理過的可用的 API 。從 Spark 1.3 版本以上,Spark SQL 將提供在 1.X 系列的其餘版本的二進制兼容性。這種兼容性保證不包括被明確標記爲不穩定的(即 DeveloperApi 類或 Experimental) API。
重命名 DataFrame 的 SchemaRDD
升級到 Spark SQL 1.3 版本時,用戶會發現最大的變化是,SchemaRDD已改名爲DataFrame。這主要是由於 DataFrames 再也不從 RDD 直接繼承,而是由 RDDS 本身來實現這些功能。DataFrames 仍然能夠經過調用.rdd方法轉換爲 RDDS 。
在 Scala 中,有一個從SchemaRDD到DataFrame類型別名,能夠爲一些狀況提供源代碼兼容性。它仍然建議用戶更新他們的代碼以使用DataFrame來代替。Java 和 Python 用戶須要更新他們的代碼。
Java 和 Scala APIs 的統一
此前 Spark 1.3 有單獨的Java兼容類(JavaSQLContext和JavaSchemaRDD),借鑑於 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已經統一。兩種語言的用戶可使用SQLContext和DataFrame。通常來講論文類嘗試使用兩種語言的共有類型(如Array替代了一些特定集合)。在某些狀況下不通用的類型狀況下,(例如,passing in closures 或 Maps)使用函數重載代替。
此外,該 Java 的特定類型的 API 已被刪除。Scala 和 Java 的用戶可使用存在於org.apache.spark.sql.types類來描述編程模式。
隔離隱式轉換和刪除 dsl 包(僅Scala)
許多 Spark 1.3 版本之前的代碼示例都以import sqlContext._開始,這提供了從 sqlContext 範圍的全部功能。在 Spark 1.3 中,咱們移除了從RDDs 到DateFrame再到SQLContext內部對象的隱式轉換。用戶如今應該寫成import sqlContext.implicits._.
此外,隱式轉換如今只能使用方法toDF來增長由Product(即 case classes or tuples)構成的RDD,而不是自動應用。
當使用 DSL 內部的函數時(如今使用DataFrameAPI 來替換), 用戶習慣導入org.apache.spark.sql.catalyst.dsl. 相反,應該使用公共的 dataframe 函數 API:import org.apache.spark.sql.functions._.
針對 DataType 刪除在 org.apache.spark.sql 包中的一些類型別名(僅限於 Scala)
Spark 1.3 移除存在於基本 SQL 包的DataType類型別名。開發人員應改成導入類org.apache.spark.sql.types。
UDF 註冊遷移到sqlContext.udf中 (Java & Scala)
用於註冊 UDF 的函數,無論是 DataFrame DSL 仍是 SQL 中用到的,都被遷移到SQLContext中的 udf 對象中。
sqlContext.udf.register("strLen",(s:String)=>s.length())
Python UDF 註冊保持不變。
Python DataTypes 再也不是 Singletons(單例的)
在 Python 中使用 DataTypes 時,你須要先構造它們(如:StringType()),而不是引用一個單例對象。
與 Apache Hive 的兼容
Spark SQL 在設計時就考慮到了和 Hive metastore,SerDes 以及 UDF 之間的兼容性。目前 Hive SerDes 和 UDF 都是基於 Hive 1.2.1 版本,而且Spark SQL 能夠鏈接到不一樣版本的Hive metastore(從 0.12.0 到 1.2.1,能夠參考與不一樣版本的 Hive Metastore 交互)
在現有的 Hive Warehouses 中部署
Spark SQL Thrift JDBC server 採用了開箱即用的設計以兼容已有的 Hive 安裝版本。你不須要修改現有的 Hive Metastore , 或者改變數據的位置和表的分區。
所支持的 Hive 特性
Spark SQL 支持絕大部分的 Hive 功能,如:
Hive query(查詢)語句, 包括:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
全部 Hive 操做, 包括:
關係運算符 (=,⇔,==,<>,<,>,>=,<=, 等等)
算術運算符 (+,-,*,/,%, 等等)
邏輯運算符 (AND,&&,OR,||, 等等)
複雜類型的構造
數學函數 (sign,ln,cos, 等等)
String 函數 (instr,length,printf, 等等)
用戶定義函數 (UDF)
用戶定義聚合函數 (UDAF)
用戶定義 serialization formats (SerDes)
窗口函數
Joins
JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
Unions
Sub-queries(子查詢)
SELECT col FROM ( SELECT a + b AS col from t1) t2
Sampling
Explain
Partitioned tables including dynamic partition insertion
View
全部的 Hive DDL 函數, 包括:
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
大部分的 Hive Data types(數據類型), 包括:
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
未支持的 Hive 函數
如下是目前還不支持的 Hive 函數列表。在 Hive 部署中這些功能大部分都用不到。
主要的 Hive 功能
Tables 使用 buckets 的 Tables: bucket 是 Hive table partition 中的 hash partitioning. Spark SQL 還不支持 buckets.
Esoteric Hive 功能
UNION類型
Unique join
Column 統計信息的收集: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.
Hive Input/Output Formats
File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
Hadoop archive
Hive 優化
有少數 Hive 優化尚未包含在 Spark 中。其中一些(好比 indexes 索引)因爲 Spark SQL 的這種內存計算模型而顯得不那麼重要。另一些在 Spark SQL 將來的版本中會持續跟蹤。
Block 級別的 bitmap indexes 和虛擬 columns (用於構建 indexes)
自動爲 join 和 groupBy 計算 reducer 個數 : 目前在 Spark SQL 中, 你須要使用 「SET spark.sql.shuffle.partitions=[num_tasks];」 來控制 post-shuffle 的並行度.
僅 Meta-data 的 query: 對於只使用 metadata 就能回答的查詢,Spark SQL 仍然會啓動計算結果的任務.
Skew data flag: Spark SQL 不遵循 Hive 中 skew 數據的標記.
STREAMTABLEhint in join: Spark SQL 不遵循STREAMTABLEhint.
對於查詢結果合併多個小文件: 若是輸出的結果包括多個小文件, Hive 能夠可選的合併小文件到一些大文件中去,以免溢出 HDFS metadata. Spark SQL 還不支持這樣.
參考
數據類型
Spark SQL 和 DataFrames 支持下面的數據類型:
Numeric types
ByteType: Represents 1-byte signed integer numbers. The range of numbers is from-128to127.
ShortType: Represents 2-byte signed integer numbers. The range of numbers is from-32768to32767.
IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from-2147483648to2147483647.
LongType: Represents 8-byte signed integer numbers. The range of numbers is from-9223372036854775808to9223372036854775807.
FloatType: Represents 4-byte single-precision floating point numbers.
DoubleType: Represents 8-byte double-precision floating point numbers.
DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally byjava.math.BigDecimal. ABigDecimalconsists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
String type
StringType: Represents character string values.
Binary type
BinaryType: Represents byte sequence values.
Boolean type
BooleanType: Represents boolean values.
Datetime type
TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
DateType: Represents values comprising values of fields year, month, day.
Complex types
ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type ofelementType.containsNullis used to indicate if elements in aArrayTypevalue can havenullvalues.
MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described bykeyTypeand the data type of values are described byvalueType. For aMapTypevalue, keys are not allowed to havenullvalues.valueContainsNullis used to indicate if values of aMapTypevalue can havenullvalues.
StructType(fields): Represents values with the structure described by a sequence ofStructFields (fields).
StructField(name, dataType, nullable): Represents a field in aStructType. The name of a field is indicated byname. The data type of a field is indicated bydataType.nullableis used to indicate if values of this fields can havenullvalues.
Spark SQL 的全部數據類型都在包org.apache.spark.sql.types中. 你能夠用下示例示例來訪問它們.
importorg.apache.spark.sql.types._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Data type(數據類型)Scala 中的 Value 類型訪問或建立數據類型的 API
ByteTypeByteByteType
ShortTypeShortShortType
IntegerTypeIntIntegerType
LongTypeLongLongType
FloatTypeFloatFloatType
DoubleTypeDoubleDoubleType
DecimalTypejava.math.BigDecimalDecimalType
StringTypeStringStringType
BinaryTypeArray[Byte]BinaryType
BooleanTypeBooleanBooleanType
TimestampTypejava.sql.TimestampTimestampType
DateTypejava.sql.DateDateType
ArrayTypescala.collection.SeqArrayType(elementType, [containsNull])
Note(注意):containsNull的默認值是true.
MapTypescala.collection.MapMapType(keyType,valueType, [valueContainsNull])
Note(注意):valueContainsNull的默認值是true.
StructTypeorg.apache.spark.sql.RowStructType(fields)
Note(注意):fields是 StructFields 的 Seq. 全部, 兩個 fields 擁有相同的名稱是不被容許的.
StructField該 field(字段)數據類型的 Scala 中的 value 類型 (例如, 數據類型爲 IntegerType 的 StructField 是 Int)StructField(name,dataType, [nullable])
Note:nullable的默認值是true.
NaN Semantics
當處理一些不符合標準浮點數語義的float或double類型時,對於 Not-a-Number(NaN) 須要作一些特殊處理. 具體以下:
NaN = NaN 返回 true.
在 aggregations(聚合)操做中,全部的 NaN values 將被分到同一個組中.
在 join key 中 NaN 能夠當作一個普通的值.
NaN 值在升序排序中排到最後,比任何其餘數值都大.
原文地址: http://spark.apachecn.org/docs/cn/2.2.0/sql-programming-guide.html
網頁地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(以爲不錯麻煩給個 Star,謝謝!~)