spark-1.6.0 [原文地址]html
Spark SQL是Spark中處理結構化數據的模塊。與基礎的Spark RDD API不一樣,Spark SQL的接口提供了更多關於數據的結構信息和計算任務的運行時信息。在Spark內部,Spark SQL會可以用於作優化的信息比RDD API更多一些。Spark SQL現在有了三種不一樣的API:SQL語句、DataFrame API和最新的Dataset API。不過真正運行計算的時候,不管你使用哪一種API或語言,Spark SQL使用的執行引擎都是同一個。這種底層的統一,使開發者能夠在不一樣的API之間來回切換,你能夠選擇一種最天然的方式,來表達你的需求。java
本文中全部的示例都使用Spark發佈版本中自帶的示例數據,而且能夠在spark-shell、pyspark shell以及sparkR shell中運行。python
Spark SQL的一種用法是直接執行SQL查詢語句,你可以使用最基本的SQL語法,也能夠選擇HiveQL語法。Spark SQL能夠從已有的Hive中讀取數據。更詳細的請參考Hive Tables 這一節。若是用其餘編程語言運行SQL,Spark SQL將以DataFrame返回結果。你還能夠經過命令行command-line 或者 JDBC/ODBC 使用Spark SQL。mysql
DataFrame是一種分佈式數據集合,每一條數據都由幾個命名字段組成。概念上來講,她和關係型數據庫的表 或者 R和Python中的data frame等價,只不過在底層,DataFrame採用了更多優化。DataFrame能夠從不少數據源(sources)加載數據並構造獲得,如:結構化數據文件,Hive中的表,外部數據庫,或者已有的RDD。sql
DataFrame API支持Scala, Java, Python, and R。shell
Dataset是Spark-1.6新增的一種API,目前仍是實驗性的。Dataset想要把RDD的優點(強類型,可使用lambda表達式函數)和Spark SQL的優化執行引擎的優點結合到一塊兒。Dataset能夠由JVM對象構建(constructed )獲得,然後Dataset上可使用各類transformation算子(map,flatMap,filter 等)。數據庫
Dataset API 對 Scala 和 Java的支持接口是一致的,但目前還不支持Python,不過Python自身就有語言動態特性優點(例如,你可使用字段名來訪問數據,row.columnName)。對Python的完整支持在將來的版本會增長進來。apache
Spark SQL全部的功能入口都是SQLContext
類,及其子類。不過要建立一個SQLContext對象,首先須要有一個SparkContext對象。編程
val sc: SparkContext // 假設已經有一個 SparkContext 對象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 用於包含RDD到DataFrame隱式轉換操做 import sqlContext.implicits._
除了SQLContext以外,你也能夠建立HiveContext,HiveContext是SQLContext 的超集。json
除了SQLContext的功能以外,HiveContext還提供了完整的HiveQL語法,UDF使用,以及對Hive表中數據的訪問。要使用HiveContext,你並不須要安裝Hive,並且SQLContext能用的數據源,HiveContext也同樣能用。HiveContext是單獨打包的,從而避免了在默認的Spark發佈版本中包含全部的Hive依賴。若是這些依賴對你來講不是問題(不會形成依賴衝突等),建議你在Spark-1.3以前使用HiveContext。然後續的Spark版本,將會逐漸把SQLContext升級到和HiveContext功能差很少的狀態。
spark.sql.dialect選項能夠指定不一樣的SQL變種(或者叫SQL方言)。這個參數能夠在SparkContext.setConf裏指定,也能夠經過 SQL語句的SET key=value命令指定。對於SQLContext,該配置目前惟一的可選值就是」sql」,這個變種使用一個Spark SQL自帶的簡易SQL解析器。而對於HiveContext,spark.sql.dialect 默認值爲」hiveql」,固然你也能夠將其值設回」sql」。僅就目前而言,HiveSQL解析器支持更加完整的SQL語法,因此大部分狀況下,推薦使用HiveContext。
Spark應用能夠用SparkContext建立DataFrame,所需的數據來源能夠是已有的RDD(existing RDD
),或者Hive表,或者其餘數據源(data sources.)
如下是一個從JSON文件建立DataFrame的小栗子:
val sc: SparkContext // 已有的 SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("examples/src/main/resources/people.json") // 將DataFrame內容打印到stdout df.show()
DataFrame提供告終構化數據的領域專用語言支持,包括Scala, Java, Python and R.
這裏咱們給出一個結構化數據處理的基本示例:
val sc: SparkContext // 已有的 SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 建立一個 DataFrame val df = sqlContext.read.json("examples/src/main/resources/people.json") // 展現 DataFrame 的內容 df.show() // age name // null Michael // 30 Andy // 19 Justin // 打印數據樹形結構 df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // select "name" 字段 df.select("name").show() // name // Michael // Andy // Justin // 展現全部人,但全部人的 age 都加1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Michael null // Andy 31 // Justin 20 // 篩選出年齡大於21的人 df.filter(df("age") > 21).show() // age name // 30 Andy // 計算各個年齡的人數 df.groupBy("age").count().show() // age count // null 1 // 19 1 // 30 1
DataFrame的完整API列表請參考這裏:API Documentation
除了簡單的字段引用和表達式支持以外,DataFrame還提供了豐富的工具函數庫,包括字符串組裝,日期處理,常見的數學函數等。完整列表見這裏:DataFrame Function Reference.
SQLContext.sql能夠執行一個SQL查詢,並返回DataFrame結果。
val sqlContext = ... // 已有一個 SQLContext 對象 val df = sqlContext.sql("SELECT * FROM table")
Dataset API和RDD相似,不過Dataset不使用Java序列化或者Kryo,而是使用專用的編碼器(Encoder )來序列化對象和跨網絡傳輸通訊。若是這個編碼器和標準序列化都能把對象轉字節,那麼編碼器就能夠根據代碼動態生成,並使用一種特殊數據格式,這種格式下的對象不須要反序列化回來,就能容許Spark進行操做,如過濾、排序、哈希等。
// 對普通類型數據的Encoder是由 importing sqlContext.implicits._ 自動提供的 val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // 返回: Array(2, 3, 4) // 如下這行不只定義了case class,同時也自動爲其建立了Encoder case class Person(name: String, age: Long) val ds = Seq(Person("Andy", 32)).toDS() // DataFrame 只需提供一個和數據schema對應的class便可轉換爲 Dataset。Spark會根據字段名進行映射。 val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path).as[Person]
Spark SQL有兩種方法將RDD轉爲DataFrame。
1. 使用反射機制,推導包含指定類型對象RDD的schema。這種基於反射機制的方法使代碼更簡潔,並且若是你事先知道數據schema,推薦使用這種方式;
2. 編程方式構建一個schema,而後應用到指定RDD上。這種方式更囉嗦,但若是你事先不知道數據有哪些字段,或者數據schema是運行時讀取進來的,那麼你極可能須要用這種方式。
Spark SQL的Scala接口支持自動將包含case class對象的RDD轉爲DataFrame。對應的case class定義了表的schema。case class的參數名經過反射,映射爲表的字段名。case class還能夠嵌套一些複雜類型,如Seq和Array。RDD隱式轉換成DataFrame後,能夠進一步註冊成表。隨後,你就能夠對錶中數據使用SQL語句查詢了。
// sc 是已有的 SparkContext 對象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 爲了支持RDD到DataFrame的隱式轉換 import sqlContext.implicits._ // 定義一個case class. // 注意:Scala 2.10的case class最多支持22個字段,要繞過這一限制, // 你可使用自定義class,並實現Product接口。固然,你也能夠改用編程方式定義schema case class Person(name: String, age: Int) // 建立一個包含Person對象的RDD,並將其註冊成table val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") // sqlContext.sql方法能夠直接執行SQL語句 val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // SQL查詢的返回結果是一個DataFrame,且可以支持全部常見的RDD算子 // 查詢結果中每行的字段能夠按字段索引訪問: teenagers.map(t => "Name: " + t(0)).collect().foreach(println) // 或者按字段名訪問: teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println) // row.getValuesMap[T] 會一次性返回多列,並以Map[String, T]爲返回結果類型 teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println) // 返回結果: Map("name" -> "Justin", "age" -> 19)
若是不能事先經過case class定義schema(例如,記錄的字段結構是保存在一個字符串,或者其餘文本數據集中,須要先解析,又或者字段對不一樣用戶有所不一樣),那麼你可能須要按如下三個步驟,以編程方式的建立一個DataFrame:
For example:
例如:
// sc 是已有的SparkContext對象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 建立一個RDD val people = sc.textFile("examples/src/main/resources/people.txt") // 數據的schema被編碼與一個字符串中 val schemaString = "name age" // Import Row. import org.apache.spark.sql.Row; // Import Spark SQL 各個數據類型 import org.apache.spark.sql.types.{StructType,StructField,StringType}; // 基於前面的字符串生成schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // 將RDD[people]的各個記錄轉換爲Rows,即:獲得一個包含Row對象的RDD val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // 將schema應用到包含Row對象的RDD上,獲得一個DataFrame val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) // 將DataFrame註冊爲table peopleDataFrame.registerTempTable("people") // 執行SQL語句 val results = sqlContext.sql("SELECT name FROM people") // SQL查詢的結果是DataFrame,且可以支持全部常見的RDD算子 // 而且其字段能夠以索引訪問,也能夠用字段名訪問 results.map(t => "Name: " + t(0)).collect().foreach(println)
Spark SQL支持基於DataFrame操做一系列不一樣的數據源。DataFrame既能夠當成一個普通RDD來操做,也能夠將其註冊成一個臨時表來查詢。把DataFrame註冊爲table以後,你就能夠基於這個table執行SQL語句了。本節將描述加載和保存數據的一些通用方法,包含了不一樣的Spark數據源,而後深刻介紹一下內建數據源可用選項。
在最簡單的狀況下,全部操做都會以默認類型數據源來加載數據(默認是Parquet,除非修改了spark.sql.sources.default 配置)。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
你也能夠手動指定數據源,並設置一些額外的選項參數。數據源可由其全名指定(如,org.apache.spark.sql.parquet),而對於內建支持的數據源,可使用簡寫名(json, parquet, jdbc)。任意類型數據源建立的DataFrame均可以用下面這種語法轉成其餘類型數據格式。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Spark SQL還支持直接對文件使用SQL查詢,不須要用read方法把文件加載進來。
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Save操做有一個可選參數SaveMode,用這個參數能夠指定如何處理數據已經存在的狀況。很重要的一點是,這些保存模式都沒有加鎖,因此其操做也不是原子性的。另外,若是使用Overwrite模式,實際操做是,先刪除數據,再寫新數據。
僅Scala/Java | 全部支持的語言 | 含義 |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" (default) |
(默認模式)從DataFrame向數據源保存數據時,若是數據已經存在,則拋異常。 |
SaveMode.Append |
"append" |
若是數據或表已經存在,則將DataFrame的數據追加到已有數據的尾部。 |
SaveMode.Overwrite |
"overwrite" |
若是數據或表已經存在,則用DataFrame數據覆蓋之。 |
SaveMode.Ignore |
"ignore" |
若是數據已經存在,那就放棄保存DataFrame數據。這和SQL裏CREATE TABLE IF NOT EXISTS有點相似。 |
在使用HiveContext的時候,DataFrame能夠用saveAsTable方法,將數據保存成持久化的表。與registerTempTable不一樣,saveAsTable會將DataFrame的實際數據內容保存下來,而且在HiveMetastore中建立一個遊標指針。持久化的表會一直保留,即便Spark程序重啓也沒有影響,只要你鏈接到同一個metastore就能夠讀取其數據。讀取持久化表時,只須要用用表名做爲參數,調用SQLContext.table方法便可獲得對應DataFrame。
默認狀況下,saveAsTable會建立一個」managed table「,也就是說這個表數據的位置是由metastore控制的。一樣,若是刪除表,其數據也會同步刪除。
Parquet 是一種流行的列式存儲格式。Spark SQL提供對Parquet文件的讀寫支持,並且Parquet文件可以自動保存原始數據的schema。寫Parquet文件的時候,全部的字段都會自動轉成nullable,以便向後兼容。
仍然使用上面例子中的數據:
// 咱們繼續沿用以前例子中的sqlContext對象 // 爲了支持RDD隱式轉成DataFrame import sqlContext.implicits._ val people: RDD[Person] = ... // 和上面例子中相同,一個包含case class對象的RDD // 該RDD將隱式轉成DataFrame,而後保存爲parquet文件 people.write.parquet("people.parquet") // 讀取上面保存的Parquet文件(多個文件 - Parquet保存完實際上是不少個文件)。Parquet文件是自描述的,文件中保存了schema信息 // 加載Parquet文件,並返回DataFrame結果 val parquetFile = sqlContext.read.parquet("people.parquet") // Parquet文件(多個)能夠註冊爲臨時表,而後在SQL語句中直接查詢 parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
像Hive這樣的系統,一個很經常使用的優化手段就是表分區。在一個支持分區的表中,數據是保存在不一樣的目錄中的,而且將分區鍵以編碼方式保存在各個分區目錄路徑中。Parquet數據源如今也支持自動發現和推導分區信息。例如,咱們能夠把以前用的人口數據存到一個分區表中,其目錄結構以下所示,其中有2個額外的字段,gender和country,做爲分區鍵:
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
在這個例子中,若是須要讀取Parquet文件數據,咱們只須要把 path/to/table 做爲參數傳給 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL可以自動的從路徑中提取出分區信息,隨後返回的DataFrame的schema以下:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
注意,分區鍵的數據類型將是自動推導出來的。目前,只支持數值類型和字符串類型數據做爲分區鍵。
有的用戶可能不想要自動推導出來的分區鍵數據類型。這種狀況下,你能夠經過 spark.sql.sources.partitionColumnTypeInference.enabled (默認是true)來禁用分區鍵類型推導。禁用以後,分區鍵老是被當成字符串類型。
從Spark-1.6.0開始,分區發現默認只在指定目錄的子目錄中進行。以上面的例子來講,若是用戶把 path/to/table/gender=male 做爲參數傳給 SQLContext.read.parquet 或者 SQLContext.read.load,那麼gender就不會被做爲分區鍵。若是用戶想要指定分區發現的基礎目錄,能夠經過basePath選項指定。例如,若是把 path/to/table/gender=male做爲數據目錄,而且將basePath設爲 path/to/table,那麼gender仍然會最爲分區鍵。
像ProtoBuffer、Avro和Thrift同樣,Parquet也支持schema演變。用戶從一個簡單的schema開始,逐漸增長所需的新字段。這樣的話,用戶最終會獲得多個schema不一樣但互相兼容的Parquet文件。目前,Parquet數據源已經支持自動檢測這種狀況,併合並全部文件的schema。
由於schema合併相對代價比較大,而且在多數狀況下不是必要的,因此從Spark-1.5.0以後,默認是被禁用的。你能夠這樣啓用這一功能:
// 繼續沿用以前的sqlContext對象 // 爲了支持RDD隱式轉換爲DataFrame import sqlContext.implicits._ // 建立一個簡單的DataFrame,存到一個分區目錄中 val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // 建立另外一個DataFrame放到新的分區目錄中, // 並增長一個新字段,丟棄一個老字段 val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // 讀取分區表 val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // 最終的schema將由3個字段組成(single,double,triple) // 而且分區鍵出如今目錄路徑中 // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)
在讀寫Hive metastore Parquet 表時,Spark SQL用的是內部的Parquet支持庫,而不是Hive SerDe,由於這樣性能更好。這一行爲是由spark.sql.hive.convertMetastoreParquet 配置項來控制的,並且默認是啓用的。
Hive和Parquet在表結構處理上主要有2個不一樣點:
因爲以上緣由,咱們必須在Hive metastore Parquet table轉Spark SQL Parquet table的時候,對Hive metastore schema作調整,調整規則以下:
Spark SQL會緩存Parquet元數據以提升性能。若是Hive metastore Parquet table轉換被啓用的話,那麼轉換過來的schema也會被緩存。這時候,若是這些表由Hive或其餘外部工具更新了,你必須手動刷新元數據。
// 注意,這裏sqlContext是一個HiveContext sqlContext.refreshTable("my_table")
Parquet配置能夠經過 SQLContext.setConf 或者 SQL語句中 SET key=value來指定。
屬性名 | 默認值 | 含義 |
---|---|---|
spark.sql.parquet.binaryAsString |
false | 有些老系統,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不區分二進制數據和字符串類型數據。這個標誌的意思是,讓Spark SQL把二進制數據當字符串處理,以兼容老系統。 |
spark.sql.parquet.int96AsTimestamp |
true | 有些老系統,如:特定版本的Impala,Hive,把時間戳存成INT96。這個配置的做用是,讓Spark SQL把這些INT96解釋爲timestamp,以兼容老系統。 |
spark.sql.parquet.cacheMetadata |
true | 緩存Parquet schema元數據。能夠提高查詢靜態數據的速度。 |
spark.sql.parquet.compression.codec |
gzip | 設置Parquet文件的壓縮編碼格式。可接受的值有:uncompressed, snappy, gzip(默認), lzo |
spark.sql.parquet.filterPushdown |
true | 啓用過濾器下推優化,能夠講過濾條件儘可能推導最下層,已取得性能提高 |
spark.sql.hive.convertMetastoreParquet |
true | 若是禁用,Spark SQL將使用Hive SerDe,而不是內建的對Parquet tables的支持 |
spark.sql.parquet.output.committer.class |
org.apache.parquet.hadoop. |
Parquet使用的數據輸出類。這個類必須是 org.apache.hadoop.mapreduce.OutputCommitter的子類。通常來講,它也應該是 org.apache.parquet.hadoop.ParquetOutputCommitter的子類。注意:1. 若是啓用spark.speculation, 這個選項將被自動忽略
2. 這個選項必須用hadoop configuration設置,而不是Spark SQLConf 3. 這個選項會覆蓋 spark.sql.sources.outputCommitterClass Spark SQL有一個內建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 這個類的在輸出到S3的時候比默認的ParquetOutputCommitter類效率高。 |
spark.sql.parquet.mergeSchema |
false |
若是設爲true,那麼Parquet數據源將會merge 全部數據文件的schema,不然,schema是從summary file獲取的(若是summary file沒有設置,則隨機選一個) |
Spark SQL在加載JSON數據的時候,能夠自動推導其schema並返回DataFrame。用SQLContext.read.json讀取一個包含String的RDD或者JSON文件,便可實現這一轉換。
注意,一般所說的json文件只是包含一些json數據的文件,而不是咱們所須要的JSON格式文件。JSON格式文件必須每一行是一個獨立、完整的的JSON對象。所以,一個常規的多行json文件常常會加載失敗。
// sc是已有的SparkContext對象 val sqlContext = new org.apache.spark.sql.SQLContext(sc) // 數據集是由路徑指定的 // 路徑既能夠是單個文件,也能夠仍是存儲文本文件的目錄 val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // 推導出來的schema,可由printSchema打印出來 people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // 將DataFrame註冊爲table people.registerTempTable("people") // 跑SQL語句吧! val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // 另外一種方法是,用一個包含JSON字符串的RDD來建立DataFrame val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Spark SQL支持從Apache Hive讀寫數據。然而,Hive依賴項太多,因此沒有把Hive包含在默認的Spark發佈包裏。要支持Hive,須要在編譯spark的時候增長-Phive和-Phive-thriftserver標誌。這樣編譯打包的時候將會把Hive也包含進來。注意,hive的jar包也必須出如今全部的worker節點上,訪問Hive數據時候會用到(如:使用hive的序列化和反序列化SerDes時)。
Hive配置在conf/目錄下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。請注意,若是在YARN cluster(yarn-cluster mode)模式下執行一個查詢的話,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必須在驅動器(driver)和全部執行器(executor)均可用。一種簡便的方法是,經過spark-submit命令的–jars和–file選項來提交這些文件。
若是使用Hive,則必須構建一個HiveContext,HiveContext是派生於SQLContext的,添加了在Hive Metastore裏查詢表的支持,以及對HiveQL的支持。用戶沒有現有的Hive部署,也能夠建立一個HiveContext。若是沒有在hive-site.xml裏配置,那麼HiveContext將會自動在當前目錄下建立一個metastore_db目錄,再根據HiveConf設置建立一個warehouse目錄(默認/user/hive/warehourse)。因此請注意,你必須把/user/hive/warehouse的寫權限賦予啓動spark應用程序的用戶。
// sc是一個已有的SparkContext對象 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // 這裏用的是HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
Spark SQL對Hive最重要的支持之一就是和Hive metastore進行交互,這使得Spark SQL能夠訪問Hive表的元數據。從Spark-1.4.0開始,Spark SQL有專門單獨的二進制build版本,能夠用來訪問不一樣版本的Hive metastore,其配置表以下。注意,無論所訪問的hive是什麼版本,Spark SQL內部都是以Hive 1.2.1編譯的,並且內部使用的Hive類也是基於這個版本(serdes,UDFs,UDAFs等)
如下選項可用來配置Hive版本以便訪問其元數據:
屬性名 | 默認值 | 含義 |
---|---|---|
spark.sql.hive.metastore.version |
1.2.1 |
Hive metastore版本,可選的值爲0.12.0 到 1.2.1 |
spark.sql.hive.metastore.jars |
builtin |
初始化HiveMetastoreClient的jar包。這個屬性能夠是如下三者之一:
目前內建爲使用Hive-1.2.1,編譯的時候啓用-Phive,則會和spark一塊兒打包。若是沒有-Phive,那麼spark.sql.hive.metastore.version要麼是1.2.1,要就是未定義
使用maven倉庫下載的jar包版本。這個選項建議不要再生產環境中使用
|
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
一個逗號分隔的類名前綴列表,這些類使用classloader加載,且能夠在Spark SQL和特定版本的Hive間共享。例如,用來訪問hive metastore 的JDBC的driver就須要這種共享。其餘須要共享的類,是與某些已經共享的類有交互的類。例如,自定義的log4j appender |
spark.sql.hive.metastore.barrierPrefixes |
(empty) |
一個逗號分隔的類名前綴列表,這些類在每一個Spark SQL所訪問的Hive版本中都會被顯式的reload。例如,某些在共享前綴列表(spark.sql.hive.metastore.sharedPrefixes)中聲明爲共享的Hive UD函數 |
Spark SQL也能夠用JDBC訪問其餘數據庫。這一功能應該優先於使用JdbcRDD。由於它返回一個DataFrame,而DataFrame在Spark SQL中操做更簡單,且更容易和來自其餘數據源的數據進行交互關聯。JDBC數據源在java和python中用起來也很簡單,不須要用戶提供額外的ClassTag。(注意,這與Spark SQL JDBC server不一樣,Spark SQL JDBC server容許其餘應用執行Spark SQL查詢)
首先,你須要在spark classpath中包含對應數據庫的JDBC driver,下面這行包括了用於訪問postgres的數據庫driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
遠程數據庫的表能夠經過Data Sources API,用DataFrame或者SparkSQL 臨時表來裝載。如下是選項列表:
屬性名 | 含義 |
---|---|
url |
須要鏈接的JDBC URL |
dbtable |
須要讀取的JDBC表。注意,任何能夠填在SQL的where子句中的東西,均可以填在這裏。(既能夠填完整的表名,也可填括號括起來的子查詢語句) |
driver |
JDBC driver的類名。這個類必須在master和worker節點上均可用,這樣各個節點才能將driver註冊到JDBC的子系統中。 |
partitionColumn, lowerBound, upperBound, numPartitions |
這幾個選項,若是指定其中一個,則必須所有指定。他們描述了多個worker如何並行的讀入數據,並將表分區。partitionColumn必須是所查詢的表中的一個數值字段。注意,lowerBound和upperBound只是用於決定分區跨度的,而不是過濾表中的行。所以,表中全部的行都會被分區而後返回。 |
fetchSize |
JDBC fetch size,決定每次獲取多少行數據。在JDBC驅動上設成較小的值有利於性能優化(如,Oracle上設爲10) |
val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
對於有必定計算量的Spark做業來講,可能的性能改進的方式,不是把數據緩存在內存裏,就是調整一些開銷較大的選項參數。
Spark SQL能夠經過調用SQLContext.cacheTable(「tableName」)或者DataFrame.cache()把tables以列存儲格式緩存到內存中。隨後,Spark SQL將會掃描必要的列,並自動調整壓縮比例,以減小內存佔用和GC壓力。你也能夠用SQLContext.uncacheTable(「tableName」)來刪除內存中的table。
你還可使用SQLContext.setConf 或在SQL語句中運行SET key=value命令,來配置內存中的緩存。
屬性名 | 默認值 | 含義 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 若是設置爲true,Spark SQL將會根據數據統計信息,自動爲每一列選擇單獨的壓縮編碼方式。 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式緩存批量的大小。增大批量大小能夠提升內存利用率和壓縮率,但同時也會帶來OOM(Out Of Memory)的風險。 |
如下選項一樣也能夠用來給查詢任務調性能。不過這些選項在將來可能被放棄,由於spark將支持愈來愈多的自動優化。
屬性名 | 默認值 | 含義 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) | 配置join操做時,可以做爲廣播變量的最大table的大小。設置爲-1,表示禁用廣播。注意,目前的元數據統計僅支持Hive metastore中的表,而且須要運行這個命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan |
spark.sql.tungsten.enabled |
true | 設爲true,則啓用優化的Tungsten物理執行後端。Tungsten會顯式的管理內存,並動態生成表達式求值的字節碼 |
spark.sql.shuffle.partitions |
200 | 配置數據混洗(shuffle)時(join或者聚合操做),使用的分區數。 |
Spark SQL能夠做爲JDBC/ODBC或者命令行工具的分佈式查詢引擎。在這種模式下,終端用戶或應用程序,無需寫任何代碼,就能夠直接在Spark SQL中運行SQL查詢。
這裏實現的Thrift JDBC/ODBC server和Hive-1.2.1中的HiveServer2
是相同的。你可使用beeline腳原本測試Spark或者Hive-1.2.1的JDBC server。
在Spark目錄下運行下面這個命令,啓動一個JDBC/ODBC server
./sbin/start-thriftserver.sh
這個腳本能接受全部 bin/spark-submit 命令支持的選項參數,外加一個 –hiveconf 選項,來指定Hive屬性。運行./sbin/start-thriftserver.sh –help能夠查看完整的選項列表。默認狀況下,啓動的server將會在localhost:10000端口上監聽。要改變監聽主機名或端口,能夠用如下環境變量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...
或者Hive系統屬性 來指定
./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...
接下來,你就能夠開始在beeline中測試這個Thrift JDBC/ODBC server:
./bin/beeline
下面的指令,能夠鏈接到一個JDBC/ODBC server
beeline> !connect jdbc:hive2://localhost:10000
可能須要輸入用戶名和密碼。在非安全模式下,只要輸入你本機的用戶名和一個空密碼便可。對於安全模式,請參考beeline documentation.
Hive的配置是在conf/目錄下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。
你也能夠在beeline的腳本中指定。
Thrift JDBC server也支持經過HTTP傳輸Thrift RPC消息。如下配置(在conf/hive-site.xml中)將啓用HTTP模式:
hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice
一樣,在beeline中也能夠用HTTP模式鏈接JDBC/ODBC server:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
Spark SQL CLI是一個很方便的工具,它能夠用local mode運行hive metastore service,而且在命令行中執行輸入的查詢。注意Spark SQL CLI目前還不支持和Thrift JDBC server通訊。
用以下命令,在spark目錄下啓動一個Spark SQL CLI
./bin/spark-sql
Hive配置在conf目錄下hive-site.xml,core-site.xml,hdfs-site.xml中設置。你能夠用這個命令查看完整的選項列表:./bin/spark-sql –help
./sbin/start-thriftserver.sh \ --conf spark.sql.hive.thriftServer.singleSession=true \ ...
根據用戶的反饋,咱們提供了一個新的,更加流暢的API,用於數據讀(SQLContext.read)寫(DataFrame.write),同時老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)將被廢棄。
有關SQLContext.read和DataFrame.write的更詳細信息,請參考API文檔。
根據用戶的反饋,咱們改變了DataFrame.groupBy().agg()的默認行爲,在返回的DataFrame結果中保留了分組字段。若是你想保持1.3中的行爲,設置spark.sql.retainGroupColumns爲false便可。
// 在1.3.x中,若是要保留分組字段"department", 你必須顯式的在agg聚合時包含這個字段 df.groupBy("department").agg($"department", max("age"), sum("expense")) // 而在1.4+,分組字段"department"默認就會包含在返回的DataFrame中 df.groupBy("department").agg(max("age"), sum("expense")) // 要回滾到1.3的行爲(不包含分組字段),按以下設置便可: sqlContext.setConf("spark.sql.retainGroupColumns", "false")
在Spark 1.3中,咱們去掉了Spark SQL的」Alpha「標籤,並清理了可用的API。從Spark 1.3起,Spark SQL將對1.x系列二進制兼容。這個兼容性保證不包括顯式的標註爲」unstable(如:DeveloperAPI或Experimental)「的API。
對於用戶來講,Spark SQL 1.3最大的改動就是SchemaRDD更名爲DataFrame。主要緣由是,DataFrame再也不直接由RDD派生,而是經過本身的實現提供RDD的功能。DataFrame只須要調用其rdd方法就能轉成RDD。
在Scala中仍然有SchemaRDD,只不過這是DataFrame的一個別名,以便兼容一些現有代碼。但仍然建議用戶改用DataFrame。Java和Python用戶就沒這個福利了,他們必須改代碼。
在Spark 1.3以前,有單獨的java兼容類(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的鏡像。Spark 1.3中將Java API和Scala API統一。兩種語言的用戶都應該使用SQLContext和DataFrame。通常這些類中都會使用兩種語言中都有的類型(如:Array取代各語言獨有的集合)。有些狀況下,沒有通用的類型(例如:閉包或者maps),將會使用函數重載來解決這個問題。
另外,java特有的類型API被刪除了。Scala和java用戶都應該用org.apache.spark.sql.types來編程描述一個schema。
Spark 1.3以前的不少示例代碼,都在開頭用 import sqlContext._,這行將會致使全部的sqlContext的函數都被引入進來。所以,在Spark 1.3咱們把RDDs到DataFrames的隱式轉換隔離出來,單獨放到SQLContext.implicits對象中。用戶如今應該這樣寫:import sqlContext.implicits._
另外,隱式轉換也支持由Product(如:case classes或tuples)組成的RDD,但須要調用一個toDF方法,而不是自動轉換。
若是須要使用DSL(被DataFrame取代的API)中的方法,用戶以前須要導入DSL(import org.apache.spark.sql.catalyst.dsl), 而如今應該要導入 DataFrame API(import org.apache.spark.sql.functions._)
Spark 1.3刪除了sql包中的DataType類型別名。如今,用戶應該使用 org.apache.spark.sql.types中的類。
註冊UDF的函數,無論是DataFrame,DSL或者SQL中用到的,都被挪到SQLContext.udf中。
sqlContext.udf.register("strLen", (s: String) => s.length())
Python UDF註冊保持不變。
在python中使用DataTypes,你須要先構造一個對象(如:StringType()),而不是引用一個單例。
用戶能夠經過以下命令,爲JDBC客戶端session設定一個Fair Scheduler pool。
SET spark.sql.thriftserver.scheduler.pool=accounting;
在Shark中,默認的reducer個數是1,而且由mapred.reduce.tasks設定。Spark SQL廢棄了這個屬性,改成 spark.sql.shuffle.partitions, 而且默認200,用戶可經過以下SET命令來自定義:
SET spark.sql.shuffle.partitions=10; SELECT page, count(*) c FROM logs_last_month_cached GROUP BY page ORDER BY c DESC LIMIT 10;
你也能夠把這個屬性放到hive-site.xml中來覆蓋默認值。
目前,mapred.reduce.tasks屬性仍然能被識別,而且自動轉成spark.sql.shuffle.partitions
shark.cache表屬性已經不存在了,而且以」_cached」結尾命名的表也再也不會自動緩存。取而代之的是,CACHE TABLE和UNCACHE TABLE語句,用以顯式的控制表的緩存:
CACHE TABLE logs_last_month; UNCACHE TABLE logs_last_month;
注意:CACHE TABLE tbl 如今默認是飢餓模式,而非懶惰模式。不再須要手動調用其餘action來觸發cache了!
從Spark-1.2.0開始,Spark SQL新提供了一個語句,讓用戶本身控制表緩存是不是懶惰模式
CACHE [LAZY] TABLE [AS SELECT] ...
如下幾個緩存相關的特性再也不支持:
Spark SQL設計時考慮了和Hive metastore,SerDes以及UDF的兼容性。目前這些兼容性鬥是基於Hive-1.2.1版本,而且Spark SQL能夠連到不一樣版本的Hive metastore(從0.12.0到1.2.1,參考:http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore)
Spark SQL Thrift JDBC server採用了」out of the box」(開箱即用)的設計,使用很方便,併兼容已有的Hive安裝版本。你不須要修改已有的Hive metastore或者改變數據的位置,或者表分區。
Spark SQL 支持絕大部分Hive功能,如:
SELECT
GROUP BY
ORDER BY
CLUSTER BY
SORT BY
=
, ⇔
, ==
, <>
, <
, >
, >=
, <=
, etc)+
, -
, *
, /
, %
, etc)AND
, &&
, OR
, ||
, etc)sign
, ln
, cos
, etc)instr
, length
, printf
, etc)JOIN
{LEFT|RIGHT|FULL} OUTER JOIN
LEFT SEMI JOIN
CROSS JOIN
SELECT col FROM ( SELECT a + b AS col from t1) t2
CREATE TABLE
CREATE TABLE AS SELECT
ALTER TABLE
TINYINT
SMALLINT
INT
BIGINT
BOOLEAN
FLOAT
DOUBLE
STRING
BINARY
TIMESTAMP
DATE
ARRAY<>
MAP<>
STRUCT<>
如下是目前不支持的Hive特性的列表。多數是不經常使用的。
不支持的Hive常見功能
不支持的Hive高級功能
Hive輸入、輸出格式
Hive優化
一些比較棘手的Hive優化目前尚未在Spark中提供。有一些(如索引)對應Spark SQL這種內存計算模型來講並不重要。另一些,在Spark SQL將來的版本中會支持。
STREAMTABLE
join提示:Spark SQL裏沒有這玩藝兒Spark SQL和DataFrames支持以下數據類型:
ByteType
: 1字節長的有符號整型,範圍:-128
到 127
.ShortType
: 2字節長有符號整型,範圍:-32768
到 32767
.IntegerType
: 4字節有符號整型,範圍:-2147483648
到 2147483647
.LongType
: 8字節有符號整型,範圍: -9223372036854775808
to 9223372036854775807
.FloatType
: 4字節單精度浮點數。DoubleType
: 8字節雙精度浮點數DecimalType
: 任意精度有符號帶小數的數值。內部使用java.math.BigDecimal, BigDecimal包含任意精度的不縮放整型,和一個32位的縮放整型StringType
: 字符串BinaryType
: 字節序列BooleanType
: 布爾類型TimestampType
: 表示包含年月日、時分秒等字段的日期DateType
: 表示包含年月日字段的日期ArrayType(elementType, containsNull)
:數組類型,表達一系列的elementType類型的元素組成的序列,containsNull表示數組可否包含null值MapType(keyType, valueType, valueContainsNull)
:映射集合類型,表示一個鍵值對的集合。鍵的類型是keyType,值的類型則由valueType指定。對應MapType來講,鍵是不能爲null的,而值可否爲null則取決於valueContainsNull。StructType(fields):
表示包含StructField序列的結構體。
全部Spark SQL支持的數據類型都在這個包裏:org.apache.spark.sql.types,你能夠這樣導入之:
import org.apache.spark.sql.types._
Data type | Value type in Scala | API to access or create a data type |
---|---|---|
ByteType | Byte | ByteType |
ShortType | Short | ShortType |
IntegerType | Int | IntegerType |
LongType | Long | LongType |
FloatType | Float | FloatType |
DoubleType | Double | DoubleType |
DecimalType | java.math.BigDecimal | DecimalType |
StringType | String | StringType |
BinaryType | Array[Byte] | BinaryType |
BooleanType | Boolean | BooleanType |
TimestampType | java.sql.Timestamp | TimestampType |
DateType | java.sql.Date | DateType |
ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull])注意:默認containsNull爲true |
MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull])注意:默認valueContainsNull爲true |
StructType | org.apache.spark.sql.Row | StructType(fields)注意:fields是一個StructFields的序列,而且同名的字段是不容許的。 |
StructField | 定義字段的數據對應的Scala類型(例如,若是StructField的dataType爲IntegerType,則其數據對應的scala類型爲Int) | StructField(name, dataType, nullable) |
這是Not-a-Number的縮寫,某些float或double類型不符合標準浮點數語義,須要對其特殊處理:
原創文章,轉載請註明: 轉載自併發編程網 – ifeve.com本文連接地址: 《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 編程指南