----本節內容-------
1.概覽
1.1 Spark SQL
1.2 DatSets和DataFrame
2.動手幹活
2.1 契入點:SparkSession
2.2 建立DataFrames
2.3 非強類型結果集操做
2.4 程序化執行SQL查詢
2.5 全局臨時視圖
2.6 建立DataSets
2.7 與RDD交互操做
2.8 彙集函數
3.Spark數據源
3.1 通用Load/Save函數
3.2 Parquets文件格式
3.2.1 讀取Parquet文件
3.2.2 解析分區信息
3.2.3 Schema合併
3.2.4 Hive元與Parquet錶轉換
3.3 JSON數據集
3.4 Hive表
3.5 JDBC鏈接其餘庫
4 性能調優
4.1 緩存數據至內存
4.2 調優參數
5 分佈式SQL引擎
5.1 運行Thrift JDBC/ODBC服務
5.2 運行Spark SQL CLI
6.參考資料
---------------------html
最近好幾個好友和Spark官方文檔槓上了,準備共同整理一下Spark官方文檔,互相分享研究心得故有此一篇。本次參考官網的說明,着重介紹SparkSQL,結合官網提供的重要內容以及本身的理解作一次學習筆記,主要是針對spark2.0的官方文檔,本文不是對官網文檔的翻譯,可是主要參考內容來自官方文檔。java
1.概覽python
Spark SQL是Spark的一個組件,可以很好的處理結構化數據。Spark SQL記錄了更多數據結構化信息, 因此相比RDD,能夠更好的處理結構化數據,而且具備更好的性能(Spark SQL都記錄了啥信息這麼能幹,舉個簡單的例子,通常的數據庫讀表中某個字段的數據,先拿到字段內容,而後還要去元數據表得到這個字段表示什麼含義,來來回回的查,效率低,Spark SQL就不須要這樣,他一拿到字段內容就知道是什麼意思,由於他記錄了字段的含義,不須要去查元數據表來感知這個字段什麼含義,效率就提高了)。可使用 SQL或者DataSet與Spark SQL進行交互。無論你使用Java、Scala仍是python語言,Spark SQL底層計算引擎都是同樣的,因此支持不少語言開發,隨你挑,隨你用什麼語言開發。
1.1 Spark SQL
Spark SQL無縫集成Hive的sql語法,只須要作一些簡單的配置,怎麼配置,自行百度。經過SQL查詢返回的結果是Dataset/DataFrame,也支持命令行或者JDBC的方式鏈接Spark SQL.
1.2 DatSets和DataFrame
官網真的很囉嗦,嘮嘮叨叨的,這一段其實就講了這麼幾個要點
1)DataSet在spark1.6開始支持
2)DataFrame是一個分佈式的數據集合,該數據集合以命名列的方式進行整合。DataFrame能夠理解爲關係數據庫中的一張表,也能夠理解爲R/Python中的一個data frame。DataFrames能夠經過多種數據構造,例如:結構化的數據文件、hive中的表、外部數據庫、Spark計算過程當中生成的RDD等。
3)DataFrame的API支持4種語言:Scala、Java、Python、R。可是對python和R支持的不是很好,對Java和scala支持很好,mysql
2.動手幹活算法
本例子演示如何建立SparkSession,建立DataFrames,操做Dataset
啓動Spark shell,
命令: bin/spark-shell --master spark://master01:7077
----------------------
import org.apache.spark.sql.SparkSession
import spark.implicits._
val spark=SparkSession.builder().appName("test").config("spark.some.config.option","some-value").getOrCreate();
val df = spark.read.json("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json");
df.show()
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
----------------------
2.1 契入點:SparkSession
入口點是SparkSession類,使用SparkSession.builder()能夠得到SparkSession對象2.2 建立DataFrames
使用建立好的sparkSession來建立DataFrames,DataFrame能夠來自RDD,或者Spark數據源
如hive,或則其餘數據源,下面這個例子是讀取json數據,數據源格式,people.json在spark自帶文件夾內examples/src/main/resources/people.json
--------------------------
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
--------------------------2.3 非強類型結果集操做
DataFrame在Scala、Java、Python和R中爲結構化數據操做提供了一個特定領域語言支持。在Spark2.0中,在Scala和Java的API中,DataFrame僅僅是Dataset的RowS表示。與Scala/Java中的強類型的「帶類型轉換操做」相比,這些操做也能夠看作「無類型轉換操做」。
打印出來表結構打印列名爲name的全部列內容
sql
選擇name和age列,而且age都加1過濾出age大於21的記錄
shell
根據年齡分組,並統計個數 數據庫
除了簡單的列引用和表達式外,Dataset同時有豐富的函數庫,包括字符串操做、日期算法、經常使用數學操做等。完整的列表可參考DataFrame Function Reference。
2.4 程序化執行SQL查詢
Sparksession中的sql函數使得應用能夠編程式執行SQL查詢語句而且已DataFrame形式返回:
2.5 全局臨時視圖
臨時視圖是基於session級別的,建立視圖的session一旦掛掉臨時視圖的生命也就到此爲止了,使用全局視圖,能夠避免這樣的慘劇發生。
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()
2.6 建立DataSets
Dataset與RDD很像,不一樣的是它並不使用Java序列化或者Kryo,而是使用特殊的編碼器來爲網絡間的處理或傳輸的對象進行序列化。對轉換一個對象爲字節的過程來講編碼器和標準系列化器都是可靠的,編碼器的代碼是自動生成而且使用了一種格式,這種格式容許Spark在不須要將字節解碼成對象的狀況下執行不少操做,如filtering、sorting和hashing等。
case class Person(name: String, age: Long)
//建立一個Person而後轉化爲DataSet
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect()express
//換成你本地spark安裝路徑便可
val path = "file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show()備註說明:
1)case 類在scala2.1.0最多支持22個字段
2)編碼器默認導入spark.implicits._
3)經過制定類名,DataFrame能夠自動轉爲DataSet
2.7 與RDD交互操做
Spark SQL支持兩種將已存在的RDD轉化爲Dataset的方法。第一種方法:使用反射推斷包含特定類型對象的RDD的結構。這種基於反射的方法代碼更加簡潔,而且當你在寫Spark程序的時候已經知道RDD的結構的狀況下效果很好。第二種方法:建立Dataset的方法是經過編程接口創建一個結構,而後將它應用於一個存在的RDD。雖然這種方法更加繁瑣,但它容許你在運行以前不知道其中的列和對應的類型的狀況下構建Dataset。
使用反射推斷結構
Spark SQL的Scala接口支持自動的將一個包含case class的RDD轉換爲DataFrame。這個case class定義了表結構。Caseclass的參數名是經過反射機制讀取,而後變成列名。Caseclass能夠嵌套或者包含像Seq或Array之類的複雜類型。這個RDD能夠隱式的轉換爲一個DataFrame,而後被註冊爲一張表。這個表能夠隨後被SQL的statement使用。
-----------------------
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.Encoder
import spark.implicits._
//讀取txt文件->切分文件->將切分後的內容做爲參數傳遞給Person類構建對象->轉爲dataset
val peopleDF=spark.sparkContext.textFile("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
//註冊一個dataset臨時視圖
peopleDF.createOrReplaceTempView("people")
//使用sql執行標準sql語句,這裏的name和age是Person類的成員對象,要保持一致
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 20")
//使用map進行轉換,teenager 是一個數組,根據下標取得數據
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
teenagersDF.map(teenager => "Age: " + teenager(1)).show()
//根據列名取數
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
//dataset要進行map操做,要先定義一個Encoder,不支持map會給升級帶來較大麻煩
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
-----------people.txt數據-----------apache
經過編程接口指定Schema
當JavaBean不能被預先定義的時候(好比不一樣用戶解析同一行,解析結果字段可能就不一樣),編程建立DataFrame分爲三步:
● 從原來的RDD建立一個Row格式的RDD
● 建立與RDD中Rows結構匹配的StructType,經過該StructType建立表示RDD的Schema
● 經過SparkSession提供的createDataFrame方法建立DataFrame,方法參數爲RDD的Schema
----------------------------------
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
//定義一個shema列名,string類型
val schemaString = "name age"
//根據schema列名生成schema,經過StructType方式生成schema
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields)
//將RDD記錄轉爲RowS 形式
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
// 建立dataFrame,將schema和文件內容RDD結合在一塊兒了 val peopleDF = spark.createDataFrame(rowRDD, schema)
//建立臨時視圖
peopleDF.createOrReplaceTempView("people")
//執行sql
val results = spark.sql("SELECT name FROM people")
results.map(attributes => "Name: " + attributes(0)).show()
2.8 彙集函數
DataFrames內置了常見的聚合函數,如min,max,count,distinct等,都是爲DataFrame,用戶也能夠定義本身的彙集函數
------------------
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object MyAverage extends UserDefinedAggregateFunction {
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
def dataType: DataType = DoubleType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble/buffer.getLong(1)
}
//////////////////////////////////////////////////
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/employe.json")
df.createOrReplaceTempView("employees") df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show()
備註:
1)employe.json,這個文件在個人網盤下,spark沒有自帶該文件
連接:http://pan.baidu.com/s/1bpqzII7 密碼:kuyv
2)若是你是file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/employe.json方式讀取本地文件,你得將employe.json分發到各個節點指定的目錄examples/src/main/resources/employe下面,不然會報錯。
3.Spark數據源
Spark SQL經過DataFrame接口,能夠支持對多種數據源的操做。DataFrame可使用關係轉換來進行操做,並且能夠用來建立臨時視圖。將DataFrame註冊爲臨時視圖能夠容許你在數據上運行SQL查詢語句。本節講解使用SparkData Source加載數據和保存數據的通用方法,而後
詳細講述內部支持的數據源可用的特定操做。
3.1 通用Load/Save函數
Spark默認數據源格式將被用於全部的操做,默認是parquet文件格式,使用spark.sql.sources.default指定默認文件格式
--------------------
val usersDF = spark.read.load("/tmp/namesAndAges.parquet") usersDF.select("name", "age").write.save("namesAndAgestest.parquet")
--------------------
手動指定文件格式
你能夠手動指定數據源以及數據源附帶的額外選項。數據源被他們的徹底限定名來指定(如,org.apache.spark.sql.parquet),但對於內部支持的數據源,你可使用短名(json,parquet,jdbc)。DataFrame可使用這種語法從任何能夠轉換爲其餘類型的數據源加載數據。
val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("/tmp/namesAndAges.parquet")
在文件上直接執行SQL
除了使用讀取API加載一個文件到DATAFrame而後查詢它的方式,你一樣能夠經過SQL直接查詢文件。
val sqlDF=spark.sql("SELECT name FROM parquet.`/tmp/namesAndAges.parquet`")保存模式
保存操做可選SaveMode,它指定了如何處理現有的數據。須要重視的一點是這些保存模式沒有使用任何的鎖,而且不具備原子性。此外,當執行Overwrite時,數據將先被刪除,而後寫出新數據。
(1)Overwrite
若是有文件存在,新內容會覆蓋原有內容
---------------
import org.apache.spark.sql.SaveMode
val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
//覆蓋模式,原來有文件存在會先刪除,在寫入
peopleDF.select("name", "age").write.mode(SaveMode.Overwrite).save("/tmp/test1.parquet")
//讀取剛剛寫入程序
val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()(2)Append
若是文件存在,就在原有的文件中追加新增內容
----------------------
import org.apache.spark.sql.SaveMode
val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
//覆蓋模式,原來有文件存在會先刪除,在寫入
peopleDF.select("name", "age").write.mode(SaveMode.Append).save("/tmp/test1.parquet")
//讀取剛剛寫入程序
val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()(3)Ignore
若是有文件存在, 則不發生任何事情,和create table if not exists 同樣的功能
peopleDF.select("name", "age").write.mode(SaveMode.Ignore).save("/tmp/test1.parquet")
//讀取剛剛寫入程序
val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()(4)ErrorIfExists
若是文件存在,就報錯,默認就是這個模式
若是有文件存在, 則不發生任何事情,和create table if not exists 同樣的功能
peopleDF.select("name", "age").write.mode(SaveMode.ErrorIfExists).save("/tmp/test1.parquet")
//讀取剛剛寫入程序
val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()保存數據到hive表
能夠經過saveAsTable方法將DataFrames存儲到表中,現有的hive版本不支持該功能。與registerTempTable方法不一樣的是,saveAsTable將DataFrame中的內容持久化到表中,並在HiveMetastore中存儲元數據。存儲一個DataFrame,可使用SQLContext的table方法。table先建立一個表,方法參數爲要建立的表的表名,而後將DataFrame持久化到這個表中。
默認的saveAsTable方法將建立一個「managed table」,表示數據的位置能夠經過metastore得到。當存儲數據的表被刪除時,managed table也將自動刪除。
目前,saveAsTable不支持一外表的方式將dataframe內容保存到外表,須要打一個patch才能實現
從spark2.1開始,持久化源數據到表中的元數據,在hive中也能夠進行分區存儲,這樣
· 查詢時只須要返回須要的分區數據,不須要查詢所有分區數據
· DDL語句,如ALTER TABLE PARTITION ... SET LOCATION,這樣的語句可使用Datasource API來實現。
3.2 Parquets文件格式
3.2.1 讀取Parquet文件(Loading Data Programmatically)
Parquet是一種支持多種數據處理列存儲數據格式,Parquet文件中保留了原始數據的模式。Spark SQL提供了Parquet文件的讀寫功能。
讀取Parquet文件示例以下:
---------------
import spark.implicits._
val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
peopleDF.write.parquet("/tmp/people.parquet")
val parquetFileDF = spark.read.parquet("people.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show()--------------------
3.2.2 解析分區信息
對錶進行分區是對數據進行優化的方式之一。在分區的表內,數據經過分區列將數據存儲在不一樣的目錄下。Parquet數據源如今可以自動發現並解析分區信息。例如,對人口數據進行分區存儲,分區列爲gender和country,使用下面的目錄結構:
經過傳遞path/to/table給SparkSession.read.parquet or SparkSession.read.load,Spark SQL將自動解析分區信息。返回的DataFrame的Schema以下:
須要注意的是,數據的分區列的數據類型是自動解析的。當前,支持數值類型和字符串類型。自動解析分區類型的參數爲:spark.sql.sources.partitionColumnTypeInference.enabled,默認值爲true。若是想關閉該功能,直接將該參數設置爲disabled。此時,分區列數據格式將被默認設置爲string類型,再也不進行類型解析。注意要解析的路徑寫法問題,是寫相對路徑仍是絕對路徑,
3.2.3 Schema合併
像ProtocolBuffer、Avro和Thrift那樣,Parquet也支持Schema evolution(Schema演變)。用戶能夠先定義一個簡單的Schema,而後逐漸的向Schema中增長列描述。經過這種方式,用戶能夠獲取多個有不一樣Schema但相互兼容的Parquet文件。如今Parquet數據源能自動檢測這種狀況,併合並這些文件的schemas。
由於Schema合併是一個高消耗的操做,在大多數狀況下並不須要,因此Spark SQL從1.5.0開始默認關閉了該功能。能夠經過下面兩種方式開啓該功能:
● 當數據源爲Parquet文件時,將數據源選項mergeSchema設置爲true
● 設置全局SQL選項spark.sql.parquet.mergeSchema爲true
----------------
import spark.implicits._
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("/tmp/test_table1/key=1")
squaresDF .printSchema()
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("/tmp/test_table1/key=2")
cubesDF.printSchema()
val mergedDF = spark.read.option("mergeSchema", "true").parquet("/tmp/test_table1")
mergedDF.printSchema()
3.2.4 Hive metastore Parquet錶轉換
Spark SQL緩存了Parquet元數據以達到良好的性能。當Hive metastore Parquet錶轉換爲enabled時,表修改後緩存的元數據並不能刷新。因此,當表被Hive或其它工具修改時,則必須手動刷新元數據,以保證元數據的一致性。示例以下:
--------------
sqlContext.refreshTable("my_table")
--------------
配置Parquet可使用SparkSession的setConf方法或使用SQL執行SET key=value命令。詳細參數說明以下:3.3 JSON數據集
Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集爲DataFrame格式。讀取JSON數據集方法爲SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換爲DataFrame。
須要注意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自知足有效的JSON對象。若是用多行描述一個JSON對象,會致使讀取出錯。讀取JSON數據集示例以下:
--------------- val path = "file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) peopleDF.printSchema() peopleDF.createOrReplaceTempView("people") val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show()val otherPeopleRDD = spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleRDD)
otherPeople .printSchema() otherPeople.show()
3.4 Hive表
------------------
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) val warehouseLocation = "/user/hive/warehouse" val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate() import spark.implicits._
import spark.sql sql("CREATE TABLE IF NOT EXISTS src(key INT, value STRING)") sql("LOAD DATA LOCAL INPATH 'file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src") sql("SELECT * FROM src").show()經過hive能夠發下新增的src表
sql("SELECT COUNT(*) FROM src").show() val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
sqlDF .show() // The items in DaraFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()// You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
-------------
備註:
1)將hive-site.xml拷貝到spark conf目錄下,全部節點都要有
2)將mysql驅動包分發到spark全部節點的jar目錄下
3.4.1 訪問不一樣版本的Hive Metastore
Spark SQL常常須要訪問Hive metastore,Spark SQL能夠經過Hive metastore獲取Hive表的元數據。從Spark 1.4.0開始,Spark SQL只需簡單的配置,就支持各版本Hive metastore的訪問。注意,涉及到metastore時Spar SQL忽略了Hive的版本。Spark SQL內部將Hive反編譯至Hive 1.2.1版本,Spark SQL的內部操做(serdes, UDFs, UDAFs, etc)都調用Hive 1.2.1版本的class。版本配置項見下面表格:3.5 JDBC鏈接其餘庫
bin/spark-shell --driver-class-path jars/ojdbc14.jar --jars jars/ojdbc14.jar --master spark://master01:7077
----------------------
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.1.121:3306/sjh") .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "dataArrive").option("user", "maptest") .option("password", "maptest789").load()
jdbcDF.show()//建立臨時視圖
jdbcDF.createOrReplaceTempView("dataArrvie")
//執行sql
val results = spark.sql("SELECT * FROM dataArrvie")
results.map(attributes => "JobaName: " + attributes(0)).show()----------------------寫數據到mysql
import java.util.Properties
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.1.121:3306/sjh") .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "ApplyJob").option("user", "maptest") .option("password", "maptest789").load()
jdbcDF.createOrReplaceTempView("ApplyJob")
//執行sql
val results = spark.sql("SELECT * FROM ApplyJob")
results.write.mode("append").jdbc("jdbc:mysql://192.168.1.121:3306/sjh", "ApplyJob2", connectionProperties)
故障排除(Troubleshooting)
● 在客戶端session和全部的executors上,JDBC driver必須對啓動類加載器(primordial class loader)設置爲visible。由於當建立一個connection時,Java的DriverManager類會執行安全驗證,安全驗證將忽略全部對啓動類加載器爲非visible的driver。一個很方便的解決方法是,修改全部worker節點上的compute_classpath.sh腳本,將driver JARs添加至腳本。
● 有些數據庫(例:H2)將全部的名字轉換爲大寫,因此在這些數據庫中,Spark SQL也須要將名字所有大寫。
4 性能調優
4.1 緩存數據至內存
Spark SQL能夠經過調用spark.catalog.cacheTable("tableName") 或者dataFrame.cache(),將表用列式存儲格式( an in­memory columnar format)緩存至內存中。而後Spark SQL在執行查詢任務時,只需掃描必需的列,從而以減小掃描數據量、提升性能。經過緩存數據,Spark SQL還能夠自動調節壓縮,從而達到最小化內存使用率和下降GC壓力的目的。調用sqlContext.uncacheTable("tableName")可將緩存的數據移出內存。可經過兩種配置方式開啓緩存數據功能:
● 使用SparkSession的setConf方法
● 執行SQL命令 SET key=value4.2 調優參數
能夠經過配置下表中的參數調節Spark SQL的性能。在後續的Spark版本中將逐漸加強自動調優功能,下表中的參數在後續的版本中或許將再也不須要配置。
5 分佈式SQL引擎
使用Spark SQL的JDBC/ODBC或者CLI,能夠將Spark SQL做爲一個分佈式查詢引擎。終端用戶或應用不須要編寫額外的代碼,能夠直接使用Spark SQL執行SQL查詢。
5.1 運行Thrift JDBC/ODBC服務
這裏運行的Thrift JDBC/ODBC服務與Hive 1.2.1中的HiveServer2一致。能夠在Spark目錄下執行以下命令來啓動JDBC/ODBC服務,
命令:./sbin/start-thriftserver.sh
這個命令接收全部 bin/spark-submit 命令行參數,添加一個 --hiveconf 參數來指定Hive的屬性。詳細的參數說明請執行
命令: ./sbin/start-thriftserver.sh --help
服務默認監聽端口爲localhost:10000。有兩種方式修改默認監聽端口:
● 修改環境變量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \ ...
● 修改系統屬性
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri> ...
使用 beeline 來測試Thrift JDBC/ODBC服務:
./bin/beeline
鏈接到Thrift JDBC/ODBC服務
beeline> !connect jdbc:hive2://localhost:10000
在非安全模式下,只須要輸入機器上的一個用戶名便可,無需密碼。在安全模式下,beeline會要求輸入用戶名和密碼。安全模式下的詳細要求,請閱讀beeline documentation的說明。
配置Hive須要替換 conf/ 目錄下的 hive-site.xml。
Thrift JDBC服務也支持經過HTTP傳輸發送thrift RPC messages。開啓HTTP模式須要將下面的配參數配置到系統屬性或 conf/: 下的 hive-site.xml中
hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
測試http模式,可使用beeline連接JDBC/ODBC服務:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
5.2 運行Spark SQL CLI
Spark SQL CLI能夠很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。須要注意的是,Spark SQL CLI不能與Thrift JDBC服務交互。
在Spark目錄下執行以下命令啓動Spark SQL CLI:
./bin/spark-sql
配置Hive須要替換 conf/ 下的 hive-site.xml 。
執行 ./bin/spark-sql --help 可查看詳細的參數說明 。
6.參考資料
1)http://spark.apache.org/docs/latest/sql-programming-guide.html
Spark SQL官方網站
2) http://www.cnblogs.com/BYRans/p/5057110.html3).http://www.cnblogs.com/BYRans/p/5057110.html4).http://blog.csdn.net/yhao2014/article/details/522159665).http://www.tuicool.com/articles/yEZr6ve case class與普通class區別