Spark SQL和DataFrame指南[中]

翻譯自: http://spark.apache.org/docs/1.3.0/sql-programming-guide.htmlhtml

概述(Overview)

Spark SQL是Spark的一個模塊,用於結構化數據處理。它提供了一個編程的抽象被稱爲DataFrames,也能夠做爲分佈式SQL查詢引擎。java

DataFrames

DataFrame是一種以命名列方式組織的分佈式數據集。它概念上至關於關係型數據庫中的表,或者R/Python中的數據幀,可是具備更豐富的優化。有不少方式能夠構造出一個DataFrame,例如:結構化數據文件,Hive中的tables,外部數據庫或者存在的RDDs。node

DataFrame的API適用於Scala、Java和Python。sql

該頁上全部的例子使用Spark分佈式中的樣本數據,能夠運行在spark-shell或者pyspark shell中。shell

入口點: SQLContext

Spark SQL中全部功能的入口點是SQLContext類,或者它子類中的一個。爲了建立一個基本的SQLContext,你所須要的是一個SparkContext。數據庫

 

val sc: SparkContext // An existing SparkContext.express

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.apache

import sqlContext.implicits._編程

 

除了基本的SQLContext,你還能夠建立一個HiveContext,它提供了基本的SQLContext的所提供的功能的超集。這些功能中包括附加的特性,能夠編寫查詢,使用更徹底的HiveQL解析器,訪問Hive UDFs,可以從Hive表中讀取數據。想要使用HiveContext,你不須要有一個存在的Hive步驟,而且全部SQLContext可用的數據源仍舊可用。HiveContext只是單獨打包,以免包含默認Spark build中的全部Hive依賴。若是這些依賴對於你的應用不是一個問題,那麼推薦使用Spark 1.3版本的HiveContext。json

使用spark.sql.dialect選項,能夠選擇SQL的具體變種,用它來解析查詢。這個參數可使用SQLContext上的setConf方法或者在SQL中使用一組key=value命令。對於SQLContext,惟一能夠的dialect是「sql」,它可使用SparkSQL提供的一個簡單的SQL解析器。在HiveContext中,默認的是「hiveql」,儘管「sql」也是可用的。由於HiveOL解析器更加完整,在大多數狀況下, 推薦使用這個。

建立DataFrames

使用SQLContext,應用能夠從一個已經存在的RDD、Hive表或者數據源中建立DataFrames。

例如,如下根據一個JSON文件建立出一個DataFrame:

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// Create the DataFrame

val df = sqlContext.jsonFile("examples/src/main/resources/people.json")// Show the content of the DataFrame

df.show()

// age  name

// null Michael

// 30   Andy

// 19   Justin

 // Print the schema in a tree format

df.printSchema()

// root// |-- age: long (nullable = true)

// |-- name: string (nullable = true)

 // Select only the "name" column

df.select("name").show()

// name

// Michael

// Andy

// Justin

 // Select everybody, but increment the age by 1

// df.select("name", df("age") + 1).show() //官方文檔這樣的,可是測試時發現這樣編譯不經過。下面的形式能夠

df.select(df("name"),df("age")+1).show()

// name    (age + 1)

// Michael null

// Andy    31

// Justin  20

 // Select people older than 21

df.filter(df("age") > 21).show()

// age name

// 30  Andy

 // Count people by age

df.groupBy("age").count().show()

// age  count

// null 1

// 19   1

// 30   1

以編程方式運行SQL查詢

SQLContext中的sql函數使應用能夠以編程方式運行SQL查詢,而且將結果以DataFrame形式返回。

val sqlContext = ...  // An existing SQLContext

val df = sqlContext.sql("SELECT * FROM table")

RRDs之間的互操做(Interoperating with RDDs)

Spark SQL支持兩種不一樣的方法,用於將存在的RDDs轉換成DataFrames。第一種方法使用反射來推斷包含特定類型的對象的RDD的模式。在寫Spark應用時,當你已知schema的狀況下,這種基於反射的方式使得代碼更加簡介,而且效果更好。

建立DataFrames的第二種方法是經過編程接口,它容許你構建一個模式,而後將其應用到現有的RDD上。這種方式更加的繁瑣,它容許你構建一個DataFrame當列以及類型未知,直到運行時才能知道時。

使用反射推斷模式

Spark SQL中的Scala接口支持自動地將包含case類的RDD轉換成DataFrame。case類定義了表的模式,case類的參數的名稱使用反射來讀取,而後稱爲列的名稱。case類還能夠嵌套或者包含複雜的類型,例如Sequences或者Arrays。這個RDD能夠隱式地轉換爲DataFrame,而後註冊成表,表能夠在後續SQL語句中使用

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._// Define the schema using a case class.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,

// you can use custom classes that implement the Product interface.

case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

一、使用case類定義schema

二、建立一個SQLContext

三、導入sqlContext.implicits._,用於隱式地將RDD轉換成DataFrame

四、建立一個DataFrame,並將它註冊成表。

五、使用sqlContext提供的sql方法,就可使用SQL語句來查詢了。查詢後返回的結果是DataFrame,它支持全部的RDD操做

以編程方式指定模式

當case類不能提早定義時(例如,記錄的結構被編碼在一個String中,或者不一樣的用戶會將文本數據集和字段進行不一樣的解析和投影),DataFrame可使用如下三步,以編程的方式實現:

1.Create an RDD of Rows from the original RDD;

2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.

3.Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.

1.從原有的RDD中建立行的RDD。

2.建立一個由StructType表示的模式,StructType符合由步驟1建立的RDD的行的結構。

3.經過SQLContext提供的createDataFrame方法,將模式應用於行的RDD。

For example:

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// Create an RDD

val people = sc.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a string

val schemaString = "name age"// Import Spark SQL data types and Row.

import org.apache.spark.sql._// Generate the schema based on the string of schema

val schema =  StructType(    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))// Convert records of the RDD (people) to Rows.

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))// Apply the schema to the RDD.

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)// Register the DataFrames as a table.

peopleDataFrame.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val results = sqlContext.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

results.map(t => "Name: " + t(0)).collect().foreach(println)

 

-----------------------------------------------------------

    //my code

    import org.apache.spark._

    import org.apache.spark.sql._

    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    val conf = new SparkConf().setMaster("local").setAppName("XX")

    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val schemaString = "fullName age"

    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)))

    val rowRDD = sc.textFile("data/people.txt").map(_.split(" ")).map(p=> Row(p(0),p(1).trim))

    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    peopleDataFrame.registerTempTable("people")

    val young = sqlContext.sql("select * from people where age <25")

    young.show()

數據源(Data Sources)

Spark SQL支持經過DataFrame接口在多種數據源上進行操做。一個DataFrame能夠如同一個標準的RDDs那樣進行操做,還能夠註冊成臨時的表。將一個DataFrame註冊成臨時表容許你在它的數據上運行SQL查詢。本節介紹使用Spark數據源裝載和保存數據的經常使用方法,使用Spark數據源保存數據。而後進入可用於內置數據源的特定選項。

通用的加載/保存功能

在最簡單的形式中,默認的數據源(parquet除非經過spark.sql.sources.default另外進行配置)將被用於全部的操做。

val df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")

手動指定選項

你還能夠手動指定數據源,這些數據源將與任何額外的選項一同使用,你但願將這些選項傳入到數據源中。數據源是經過它們的全名來指定的(如org.apache.spark.sql.parquet),可是對於內置的數據源,你也可使用簡短的名稱(json, parquet, jdbc)。任何類型的DataFrames使用這些語法能夠轉化成其餘的數據源:

val df = sqlContext.load("people.json", "json")

df.select("name", "age").save("namesAndAges.parquet", "parquet")

保存模式

Save操做能夠可選擇性地接收一個SaveModel,若是數據已經存在了,指定如何處理已經存在的數據。意識到這些保存模式沒有利用任何鎖,也不是原子的,這很重要。所以,若是有多個寫入者試圖往同一個地方寫入,這是不安全的。此外,當執行一個Overwrite,在寫入新的數據以前會將原來的數據進行刪除。

 

Scala/Java

Python

Meaning

SaveMode.ErrorIfExists (default)

"error" (default)

When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. 

當往一個數據源中保存一個DataFrame,若是數據已經存在,會拋出一個異常。

SaveMode.Append

"append"

When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. 

當往一個數據源中保存一個DataFrame,若是data/table已經存在,DataFrame的內容會追加到已經存在的數據後面。

SaveMode.Overwrite

"overwrite"

Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. 

Overwrite模式意味着當向數據源中保存一個DataFrame時,若是data/table已經存在了,已經存在的數據會被DataFrame中內容覆蓋掉。

SaveMode.Ignore

"ignore"

Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. 

Ignore模式意味着當向數據源中保存一個DataFrame時,若是數據已經存在,save操做不會將DataFrame的內容進行保存,也不會修改已經存在的數據。這與SQL中的`CREATE TABLE IF NOT EXISTS`類似。

 

保存爲持久化表

當與HiveContext一塊兒工做時,DataFrames也可使用saveAsTable命令保存爲持久化的表。不像registerTempTable命令,saveAsTable會將DataFrame的內容進行物化,而且在HiveMetastore中建立一個指向數據的指針。持久化表會仍舊存在即便你的Spark程序從新啓動。只要你保持鏈接到相同的元存儲( metastore)。一個持久化表的DataFrame能夠經過調用SQLContext上的帶有表的名稱的table方法來建立。

默認狀況下,saveAsTable會建立一個「管理表(managed table)」,意味着元存儲控制數據的位置。當一個表被刪除後,managed table會自動地刪除它們的數據。

Parquet Files

Parquet 是一種柱狀的格式,被許多其餘數據處理系統所支持。Spark SQL支持度對Parquet文件的讀和寫,自動保存原有數據的模式。

以編程方式加載數據

Loading Data Programmatically

Using the data from the above example:

使用上面例子中的數據:

// sqlContext from the previous example is used in this example.

// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._val people: RDD[Person] = ...

 // An RDD of case class objects, from the previous example.// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.

people.saveAsParquetFile("people.parquet")// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.

// The result of loading a Parquet file is also a DataFrame.

val parquetFile = sqlContext.parquetFile("people.parquet")//Parquet files can also be registered as tables and then used in SQL statements.

parquetFile.registerTempTable("parquetFile")

val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分區發現

在系統中,如Hive,使用表分區是一個常見的優化途徑。在一個分區表中,數據常常存儲在不一樣的目錄中,對每個分區目錄中的路徑中,對分區列的值進行編碼。Parquet數據源如今能夠自動地發現而且推斷出分區的信息。例如,咱們能夠將以前使用的人口數據存儲成下列目錄結構的分區表,兩個額外的列,gender和country做爲分區列:

path└── to    └── table        ├── gender=male        │   ├── ...        │   │        │   ├── country=US        │   │   └── data.parquet        │   ├── country=CN        │   │   └── data.parquet        │   └── ...        └── gender=female            ├── ...            │            ├── country=US            │   └── data.parquet            ├── country=CN            │   └── data.parquet            └── ...

經過向SQLContext.parquetFile或者 SQLContext.load中傳入path/to/table,Spark SQL會自動地從路徑中提取分區信息。如今返回的DataFrame模式變成:

root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)

注意到分區列的數據類型自動被推斷出來。目前支持數字的數據類型和string類型。

模式合併

像ProtocolBuffer, Avro和Thrift那樣,Parquet還支持模式演化。用戶能夠從一個簡單的模式開始,而且根據須要逐漸地向模式中添加更多的列。這樣,用戶最終可能會有多個不一樣可是具備相互兼容的模式的Parquet文件。Parquet數據源如今能夠自動地發現這種狀況,而且將全部這些文件的模式進行合併。

// sqlContext from the previous example is used in this example

.// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._// Create a simple DataFrame, stored into a partition directory

val df1 = sparkContext.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")

df1.saveAsParquetFile("data/test_table/key=1")// Create another DataFrame in a new partition directory,

// adding a new column and dropping an existing column

val df2 = sparkContext.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")

df2.saveAsParquetFile("data/test_table/key=2")// Read the partitioned table

val df3 = sqlContext.parquetFile("data/test_table")

df3.printSchema()// The final schema consists of all 3 columns in the Parquet files together

// with the partiioning column appeared in the partition directory paths.

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

配置

Parquet的配置可使用SQLContext的setConf來設置或者經過使用SQL運行SET key=value命令

Property Name

Default

Meaning

spark.sql.parquet.binaryAsString

false

Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.

 其餘的一些產生Parquet的系統,特別是Impala和SparkSQL的老版本,當將Parquet模式寫出時不會區分二進制數據和字符串。這個標誌告訴Spark SQL將二進制數據解析成字符串,以提供對這些系統的兼容。

spark.sql.parquet.int96AsTimestamp

true

Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. 

其餘的一些產生Parquet的系統,特別是Impala,將時間戳存儲爲INT96的形式。Spark也將時間戳存儲爲INT96,由於咱們要避免納秒級字段的精度的損失。這個標誌告訴Spark SQL將INT96數據解析爲一個時間戳,以提供對這些系統的兼容。

spark.sql.parquet.cacheMetadata

true

Turns on caching of Parquet schema metadata. Can speed up querying of static data. 

打開Parquet模式的元數據的緩存。可以加快對靜態數據的查詢。

spark.sql.parquet.compression.codec

gzip

Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo. 

設置壓縮編碼解碼器,當寫入一個Parquet文件時。可接收的值包括:uncompressed, snappy, gzip, lzo

spark.sql.parquet.filterPushdown

false

Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known bug in Paruet 1.6.0rc3 (PARQUET-136). However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn this feature on. 

打開Parquet過濾器的後進先出存儲的優化。這個功能默認是被關閉的,由於一個Parquet中的一個已知的bug 1.6.0rc3 (PARQUET-136)。然而,若是你的表中不包含任何的可爲空的(nullable)字符串或者二進制列,那麼打開這個功能是安全的。

spark.sql.hive.convertMetastoreParquet

true

When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support. 

當設置成false,Spark SQL會爲parquet表使用Hive SerDe(Serialize/Deserilize)而不是內置的支持。

 

JSON數據集

Spark SQL能夠自動推斷出JSON數據集的模式,將它做爲DataFrame進行加載。這個轉換能夠經過使用SQLContext中的下面兩個方法中的任意一個來完成。

• jsonFile - 從一個JSON文件的目錄中加載數據,文件中的每個行都是一個JSON對象。

• jsonRDD - 從一個已經存在的RDD中加載數據,每個RDD的元素是一個包含一個JSON對象的字符串。

注意,做爲jsonFile提供deep文件不是一個典型的JSON文件。每一行必須包含一個分開的獨立的有效JSON對象。所以,常規的多行JSON文件一般會失敗。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// A JSON dataset is pointed to by path.

// The path can be either a single text file or a directory storing text files.

val path = "examples/src/main/resources/people.json"

// Create a DataFrame from the file(s) pointed to by path

val people = sqlContext.jsonFile(path)// The inferred schema can be visualized using the printSchema() method.

people.printSchema()

// root

//  |-- age: integer (nullable = true)

//  |-- name: string (nullable = true)// Register this DataFrame as a table.

people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// Alternatively, a DataFrame can be created for a JSON dataset represented by

// an RDD[String] storing one JSON object per string.

val anotherPeopleRDD = sc.parallelize(  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

Hive表

Spark SQL還支持對存儲在Apache Hive中的數據的讀和寫。可是,由於Hive有大量的依賴,它不包含在默認的Spark assembly中。對Hive的支持是經過在Spark的build中添加 -Phive 和 -Phive-thriftserver 標誌來完成。這個命令構建了一個新的assembly jar ,它包含Hive。注意,這個HIve assembly jar還必須出如今全部的worker節點,由於爲了訪問存儲在Hive中的數據,它們會訪問Hive的序列化和反序列化庫(SerDes) 。

Hive的配置是經過將你的hive-site.xml文件放到conf/下來完成。

當使用Hive時,必須構建一個HiveContext,它繼承自SQLContext,對尋找MetaStore中的表和使用HiveQL編寫查詢提供了支持。沒有部署Hive的用戶也能夠建立一個HiveContext。當沒有經過hive-site.xml進行配置,context會在當前目錄下自動建立一個metastore_db和warehouse。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

JDBC To Other Databases

Spark SQL還包含了一個數據源,它可使用JDBC從其餘數據庫中讀取數據。這個功能應該優先使用JdbcRDD。這是由於結果是做爲DataFrame返回的,而且在Spark SQL中能夠簡單的被使用或加入其餘數據源。JDBC數據源也可以輕鬆地被Java或者Python來使用,由於它不須要用戶提供一個ClassTag(注意,這不一樣於Spark SQL JDBC服務,它容許其餘應用使用Spark SQL運行查詢)。

要開始使用時,你須要爲你的特定數據塊在Spark的類路徑中添加JDBC驅動。例如,從Spark Shell中鏈接到postgres,你須要運行如下的命令:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

經過使用數據源的API,能夠將遠程數據庫中的表加載爲DataFrame或者Spark SQL 臨時表。如下的選項是被支持的:

Property Name

Meaning

url

The JDBC URL to connect to. 

要鏈接到的JDBC URL

dbtable

The JDBC table that should be read. Note that anything that is valid in a `FROM` clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. 

須要讀取的JDBC表。注意,SQL查詢中的「From」子句中的任何部分都是可使用的。例如,你能夠在括號中使用子查詢,而不是一個完整的表。

driver

The class name of the JDBC driver needed to connect to this URL. This class with be loaded on the master and workers before running an JDBC commands to allow the driver to register itself with the JDBC subsystem. 

須要鏈接到的URL的JDBC驅動的類名。這個類會在運行一個JDBC命令以前被加載到master和workers上,容許驅動註冊本身和JDBC子系統。

partitionColumn, lowerBound, upperBound, numPartitions

These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. 

若是這些選項中的一個被指定了,那麼全部的選項必須被指定。它們描述了當從多個workers上並行讀取時如何進行分區。partitionColumn必須是表中的一個數字列。

 

val jdbcDF = sqlContext.load("jdbc", Map(  "url" -> "jdbc:postgresql:dbserver",  "dbtable" -> "schema.tablename"))

故障排除

* 在客戶端session以及全部executors上,JDBC驅動類必須對原來的類加載器是可見的。這是由於Java的DriverManager 類進行安全檢查,這致使打開一個鏈接時,它會忽略全部對於原始來加載器不可見的驅動。一個方便的方法是在全部的worker節點上修改compute_classpath.sh以包含驅動的JARs。

* 一些數據庫,例如H2,將全部的名稱轉化成大寫的。在Spark SQL中你須要使用大寫來指定那些名稱。

性能調節(Performance Tuning)

對於一些工做負載來講,經過將數據緩存到內存中或者打開一些實驗性的選項,可能會提升性能。

在內存中緩存數據

Spark SQL可使用內存中的柱狀格式來對錶進行緩存,經過調用sqlContext.cacheTable("tableName") 或者rdataFrame.cache()。而後Spark SQL會掃描僅僅須要的列以及自動地調節壓縮,以減小內存使用和GC壓力。你能夠調用sqlContext.uncacheTable("tableName") 來將表從內存中移除。內存緩存的配置可使用SQLContext上的setConf或者經過使用SQL執行SET key=value命令的方式來完成。

Property Name

Default

Meaning

spark.sql.inMemoryColumnarStorage.compressed

true

When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data. 

當設置爲true時,SparkSQL會根據統計出的各項數據爲每個列選擇一種壓縮編解碼器。

spark.sql.inMemoryColumnarStorage.batchSize

10000

Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. 

爲列狀緩存控制批處理大小。較大的批大小能夠提升內存的利用率和壓縮,可是緩存數據時有OOMs的風險。

其餘的配置選項

如下選項也能夠用來調節執行查詢時的性能。隨着更多的優化被自動地執行,這些選項有可能會在未來的版本中被棄用,

Property Name

Default

Meaning

spark.sql.autoBroadcastJoinThreshold

10485760 (10 MB)

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run. 

spark.sql.codegen

false

When true, code will be dynamically generated at runtime for expression evaluation in a specific query. For some queries with complicated expression this option can lead to significant speed-ups. However, for simple queries this can actually slow down query execution. 

spark.sql.shuffle.partitions

200

Configures the number of partitions to use when shuffling data for joins or aggregations. 

分佈式SQL引擎

Spark SQL也可使用它的JDBC/ODBC或者命令行接口做爲分佈式查詢引擎。以這種模式,終端用戶或者應用能夠直接與Spark SQL交互來運行SQL查詢,而不須要寫任何代碼。

運行Thrift JDBC/ODBC 服務

這裏實現的Thrift JDBC/ODBC至關於Hive0.13中的HiveServer2.你能夠用Spark或者Hive0.13自帶的beeline腳原本測試JDBC服務。

在Spark目錄中運行下面的命令來啓動JDBC/ODBC server:

./sbin/start-thriftserver.sh

該腳本接受全部的bin/spark-submit命令行選項,外加一個--hiveconf選項來指定Hive屬性。可能執行./sbin/start-thriftserver.sh --help來查看完整的可用選項列表。默認狀況下,該服務會監聽localhost:10000。你能夠經過任一環境變量覆蓋這個bahaviour,也就是:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>

export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>./sbin/start-thriftserver.sh \  --master <master-uri> \  ...

或者系統屬性

./sbin/start-thriftserver.sh \  --hiveconf hive.server2.thrift.port=<listening-port> \  --hiveconf hive.server2.thrift.bind.host=<listening-host> \  --master <master-uri>  ...

如今你可使用beeline來測試Thrift JDBC/ODBC服務:

./bin/beeline

使用beeline來鏈接JDBC/ODBC:

beeline> !connect jdbc:hive2://localhost:10000

Beeline會向你詢問用戶名和密碼。在非安全模式下,僅僅輸入你機子的用戶名以及一個空白的密碼。對於安全模式,請按照beeline文檔給出的指示。

經過將hive-site.xml文件放到conf/下來完成Hive的配置。

你也可使用Hive自帶的beeline腳本。

Thrift JDBC服務還支持經過HTTP傳輸發送Thrift RPC消息。使用下列設置做爲系統屬性或者經過conf/中hive-site.xml文件來啓用HTTP模式:

hive.server2.transport.mode - Set this to value: httphive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001hive.server2.http.endpoint - HTTP endpoint; default is cliservice

爲了測試,使用beeline以http模式鏈接JDBC/ODBC服務:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

運行Spark SQL CLI

Spark SQL CLI是一種方便的工具用來以本地模式運行Hive metastore服務,而且從命令行輸入執行查詢。注意,Spark SQL CLI不能和Thrift JDBC服務進行會話。

在Spark目錄下,執行如下命令來啓動Spark SQL CLI:

./bin/spark-sql

Hive的配置已經完成,經過將 hive-site.xml 文件放入conf/中。你能夠運行 ./bin/spark-sql --help來查看完整的可用選項的列表。

遷移指南

從Spark SQL1.0-1.2升級到1.3Upgrading from Spark SQL 1.0-1.2 to 1.3。

在Spark 1.3中,咱們從Spark SQL中移除了「Alpha」標籤,做爲其中的一部分,對可用的APIs進行了清理。從Spark 1.3之後,Spark SQL會與1.X系列中的其餘版本提供二進制兼容。這種兼容性保證不包括的APIs明確地標記爲不穩定。(也就是 DeveloperAPI 或者rExperimental)

重命名SchemaRDD爲DataFrame

當升級到Spark SQL 1.3後,用戶能夠發現最大的改變是SchemaRDD重命名爲DataFrame。這主要是由於DataFrame再也不直接繼承自RDD。而是本身實現了RDDs提供的大多數功能。DataFrames仍舊能夠經過調用.rdd方法來轉化成RDDs。

Scala中有一種類型別名,從 SchemaRDD到DataFrame爲一些用例提供了源的兼容性。但仍是建議用戶更新它們的代碼來使用DataFrame。Java和Python 用戶須要更新它們的代碼。

Java和Scala APIs的統一

Spark1.3以前,有單獨的Java兼容性類(JavaSQLContext 和 JavaSchemaRDD)映射成Scala API。在Spark1.3中,Java API和Scala API進行了統一。任意語言的用戶應該使用SQLContext和DataFrame。一般,這些類會嘗試使用兩種語言中均可用的類型(即,Array而不是語言特定的集合)。在有些狀況下,若是沒有相同的類型存在,則會使用函數重載來替代。

此外,Java指定的類型API被移除了。Scala和Java的用戶應該使用inorg.apache.spark.sql.types中的類來以編程方式描述模式。

隱式轉換的隔離以及dsl包的刪除(只有Scala)

在Spark 1.3以前許多代碼的例子以import sqlContext._開始,它會將sqlContext中全部的函數引入到scope中。在Spark 1.3中,咱們將在SQLContext內部的RDDs轉換成DataFrames轉成成對象進行了隱式轉化的隔離。用戶如今須要寫 import sqlContext.implicits._。

此外,隱式轉換如今只有組成Rroducts的RDDs參數帶有一個toDF方法,而不是自動地應用。

當使用DSL(如今替代爲 DataFrame API)內部的方法時,用戶以前會import org.apache.spark.sql.catalyst.dsl。如今使用公共的dataframe方法API應該用import org.apache.spark.sql.functions._。

爲DataType刪除在org.apache.spark.sql中的類型別名(只有Scala)

Spark 1.3爲DataType刪除了出如今根本的sql包中的類型別名。如今,用戶應該引入org.apache.spark.sql.types中的類。

UDF註冊移到sqlContext.udf 中(Java 和 Scala)

用於註冊UDFs的函數,不是用於DataFrame DSL就是SQL,已經被移動了SQLContext中的udf對象中。

sqlCtx.udf.register("strLen", (s: String) => s.length())

Python的UDF註冊沒有改變。

Python中的DataTypes再也不是單例了(Python DataTypes No Longer Singletons)。

當使用Python中的DataTypes,你須要建立它們(i.e. StringType()),而不是引用一個單例。

與Apache Hive的兼容性

Spark SQL被設計出來用於兼容Hive Metastore, SerDes 以及 UDFs。目前的Spark SQL是基於Hive 0.12.0和0.13.1。

部署在現有的Hive倉庫中(Deploying in Existing Hive Warehouses)

The Spark SQL Thrigt JDBC服務被設計出來「當即可用」 的兼容現有的Hive安裝。你不須要修改已經存在的Hive Metastore 或者更改數據位置或者表分區。

支持的Hive特性

Spark SQL 支持大量的Hive特性,例如:

· Hive query statements, including:

o SELECT

o GROUP BY

o ORDER BY

o CLUSTER BY

o SORT BY

· All Hive operators, including:

o Relational operators (=, ⇔, ==, <>, <, >, >=, <=, etc)

o Arithmetic operators (+, -, *, /, %, etc)

o Logical operators (AND, &&, OR, ||, etc)

o Complex type constructors

o Mathematical functions (sign, ln, cos, etc)

o String functions (instr, length, printf, etc)

· User defined functions (UDF)

· User defined aggregation functions (UDAF)

· User defined serialization formats (SerDes)

· Joins

o JOIN

o {LEFT|RIGHT|FULL} OUTER JOIN

o LEFT SEMI JOIN

o CROSS JOIN

· Unions

· Sub-queries

o SELECT col FROM ( SELECT a + b AS col from t1) t2

· Sampling

· Explain

· Partitioned tables

· View

· All Hive DDL Functions, including:

o CREATE TABLE

o CREATE TABLE AS SELECT

o ALTER TABLE

· Most Hive Data types, including:

o TINYINT

o SMALLINT

o INT

o BIGINT

o BOOLEAN

o FLOAT

o DOUBLE

o STRING

o BINARY

o TIMESTAMP

o DATE

o ARRAY<>

o MAP<>

o STRUCT<>

不支持的Hive功能

下面是不支持的Hive特性的列表。大多數特性不多會在Hive部署中用到。

Major Hive Features

· Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL doesn’t support buckets yet.

· 帶有buckets的Table:bucket是Hive表分區中的哈希分區。Spark SQL不支持buckets。

Esoteric Hive Features 

* UNION type * Unique join * Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

· File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.

· Hadoop archive

· CLI文件格式:對於結果,顯示回CLI,Spark SQL 只支持TextOutputFormat

· Hadoop存檔

Hive Optimizations

少許的Hive優化沒有包含在Spark中。因爲Spark SQL的內存計算模型它們中的有一些(例如索引)是次要的。其餘的一些會來的Spark SQL版本中加入。

· Block level bitmap indexes and virtual columns 

· Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using 「SET spark.sql.shuffle.partitions=[num_tasks];」.

· Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result.

· Skew data flag: Spark SQL does not follow the skew data flags in Hive.

· STREAMTABLE hint in join: Spark SQL does not follow the STREAMTABLE hint.

· Merge multiple small files for query results: if the result output contains multiple small files, Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS metadata. Spark SQL does not support that.

· 塊級別的位圖索引和虛擬列(用來建立索引)

· 自動爲joins和groupbys決定reducers的數量:目前在Spark SQL中,你須要使用「SET spark.sql.shuffle.partitions=[num_tasks];」來控制並行的post-shuffle的度。

· 僅元數據查詢:對於僅使用元數據來回答的查詢,Spark SQL仍是啓動任務來計算結果。

· 偏斜數據標誌:Spark SQL不遵循Hive中的偏斜數據標誌。

· join中的STREAMTABLE hint:Spark SQL不遵循STREAMTABLE hint。

· 爲查詢結果合併多個小文件:若是結果輸出中包含多個小文件,Hive能夠選擇性地將多個小文件合併成更少的更大的文件,避免HDFS元數據的溢出。Spark SQL不支持這些。

數據類型

Spark SQL 和 DataFrames支持如下的數據類型:

· Numeric types

o ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.

o ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.

o IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.

o LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.

o FloatType: Represents 4-byte single-precision floating point numbers.

o DoubleType: Represents 8-byte double-precision floating point numbers.

o DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.

· String type

o StringType: Represents character string values.

· Binary type

o BinaryType: Represents byte sequence values.

· Boolean type

o BooleanType: Represents boolean values.

· Datetime type

o TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.

o DateType: Represents values comprising values of fields year, month, day.

· Complex types

o ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType.containsNull is used to indicate if elements in a ArrayType value can have null values.

o MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have nullvalues. valueContainsNull is used to indicate if values of a MapType value can have null values.

o StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).

§ StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this fields can have null values.

Spark SQL中全部的數據類型的都在包 package org.apache.spark.sql.types中,你能夠這樣訪問它們:

import  org.apache.spark.sql.types._

 

Data type

Value type in Scala

API to access or create a data type

ByteType

Byte

ByteType

ShortType

Short

ShortType

IntegerType

Int

IntegerType

LongType

Long

LongType

FloatType

Float

FloatType

DoubleType

Double

DoubleType

DecimalType

java.math.BigDecimal

DecimalType

StringType

String

StringType

BinaryType

Array[Byte]

BinaryType

BooleanType

Boolean

BooleanType

TimestampType

java.sql.Timestamp

TimestampType

DateType

java.sql.Date

DateType

ArrayType

scala.collection.Seq

ArrayType(elementType, [containsNull])
Note: The default value of containsNull is true.

MapType

scala.collection.Map

MapType(keyType, valueType, [valueContainsNull])
Note: The default value of valueContainsNull is true.

StructType

org.apache.spark.sql.Row

StructType(fields)
Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.

StructField

The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType)

StructField(name, dataType, nullable)

相關文章
相關標籤/搜索