Spark學習之Spark SQL

1、簡介

  Spark SQL 提供瞭如下三大功能。
   (1) Spark SQL 能夠從各類結構化數據源(例如 JSON、Hive、Parquet 等)中讀取數據。
   (2) Spark SQL 不只支持在 Spark 程序內使用 SQL 語句進行數據查詢,也支持從相似商業智能軟件 Tableau 這樣的外部工具中經過標準數據庫鏈接器(JDBC/ODBC)鏈接 SparkSQL 進行查詢。
   (3) 當在 Spark 程序內使用 Spark SQL 時,Spark SQL 支持 SQL 與常規的 Python/Java/Scala代碼高度整合,包括鏈接 RDD 與 SQL 表、公開的自定義 SQL 函數接口等。這樣一來,許多工做都更容易實現了。html

2、Spark SQL基本示例

  

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

object Test {
  def main(args: Array[String]): Unit = {
    
    val conf = new SparkConf().setAppName("test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 設置日誌顯示級別
    val hiveCtx = new HiveContext(sc)
    val input = hiveCtx.jsonFile("tweets.json")
    input.registerTempTable("tweets")
    // hiveCtx.cacheTable("tweets") 緩存表
    // input.printSchema()  // 輸出結構信息
    val topTweets = hiveCtx.sql("SELECT user.name,text FROM tweets")
    topTweets.collect().foreach(println)
    
  }
}

  

3、SchemaRDD

  讀取數據和執行查詢都會返回 SchemaRDD。SchemaRDD 和傳統數據庫中的表的概念相似。從內部機理來看,SchemaRDD 是一個由 Row 對象組成的 RDD,附帶包含每列數據類型的結構信息。 Row 對象只是對基本數據類型(如整型和字符串型等)的數組的封裝。在 Spark 1.3 版本之後,SchemaRDD 這個名字被改成 DataFrame。SchemaRDD 仍然是 RDD,因此你能夠對其應用已有的 RDD 轉化操做,好比 map() 和filter() 。sql

  SchemaRDD 能夠存儲一些基本數據類型,也能夠存儲由這些類型組成的結構體和數組。數據庫

  

    

4、緩存

  Spark SQL 的緩存機制與 Spark 中的稍有不一樣。因爲咱們知道每一個列的類型信息,因此Spark 能夠更加高效地存儲數據。爲了確保使用更節約內存的表示方式進行緩存而不是儲存整個對象,應當使用專門的 hiveCtx.cacheTable("tableName") 方法。當緩存數據表時,Spark SQL 使用一種列式存儲格式在內存中表示數據。這些緩存下來的表只會在驅動器程序的生命週期裏保留在內存中,因此若是驅動器進程退出,就須要從新緩存數據。和緩存RDD 時的動機同樣,若是想在一樣的數據上屢次運行任務或查詢時,就應把這些數據表緩存起來。apache

5、讀取和存儲數據

一、 Apache Hive  

  當從 Hive 中讀取數據時,Spark SQL 支持任何 Hive 支持的存儲格式,包括文本文件、RCFiles、ORC、Parquet、Avro,以及 Protocol Buffer。要把 Spark SQL 鏈接到已經部署好的 Hive 上,你須要提供一份 Hive 配置。你只須要把你的 hive-site.xml 文件複製到 Spark 的 ./conf/ 目錄下便可。若是你只是想探索一下 Spark SQL而沒有配置 hive-site.xml 文件,那麼 Spark SQL 則會使用本地的 Hive 元數據倉,而且一樣能夠輕鬆地將數據讀取到 Hive 表中進行查詢。編程

// 使用 Scala 從 Hive 讀取
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key, value FROM mytable")
val keys = rows.map(row => row.getInt(0))

二、Parquet

  Parquet(http://parquet.apache.org/)是一種流行的列式存儲格式,能夠高效地存儲具備嵌套字段的記錄。Parquet 格式常常在 Hadoop 生態圈中被使用,它也支持 Spark SQL 的所有數據類型。Spark SQL 提供了直接讀取和存儲 Parquet 格式文件的方法。能夠經過 HiveContext.parquetFile 或者 SQLContext.parquetFile 來讀取數據。json

三、JSON

  要讀取 JSON 數據,只要調用 hiveCtx 中的 jsonFile() 方法便可。若是你想得到從數據中推斷出來的結構信息,能夠在生成的 SchemaRDD 上調用printSchema 方法。數組

四、基於RDD

  除了讀取數據,也能夠基於 RDD 建立 DataFrame。緩存

/**SQLContext 建立 DataFrame **/
  def createDtaFrame(sparkCtx:SparkContext,sqlCtx:SQLContext):Unit = {
    val rowRDD = sparkCtx.textFile("D://TxtData/studentInfo.txt").map(_.split(",")).map(p => Row(p(0),p(1).toInt,p(2)))
    val schema = StructType(
      Seq(
        StructField("name",StringType,true),
        StructField("age",IntegerType,true),
        StructField("studentNo",StringType,true)
      )
    )
    val dataDF = sqlCtx.createDataFrame(rowRDD,schema)
 
    //df註冊到內存表
    dataDF.registerTempTable("Student")
    val result = sqlCtx.sql("select * from Student")
    result.show()
 
    //    dataDF.select("name").show()
    //    dataDF.filter(dataDF("age") <14).show()
    //    dataDF.where("age <> ''").show()
  }

五、Spark SQL UDF

  用戶自定義函數,也叫 UDF,可讓咱們使用 Python/Java/Scala 註冊自定義函數,並在 SQL中調用。這種方法很經常使用,一般用來給機構內的 SQL 用戶們提供高級功能支持,這樣這些用戶就能夠直接調用註冊的函數而無需本身去經過編程來實現了。在 Spark SQL 中,編寫UDF 尤其簡單。Spark SQL 不只有本身的 UDF 接口,也支持已有的 Apache Hive UDF。ide

    //  寫一個求字符串長度的UDF  原書代碼會報錯,查閱官方文檔得知  
    hiveCtx.udf.register("strLenScala", (_: String).length)
    val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")

  注意:由於Spark版本的緣由,原書的代碼會報錯,有一個好的解決方式,就是查閱官方文檔。函數

  

6、Spark SQL性能

  Spark SQL 提供的高級查詢語言及附加的類型信息可使 SparkSQL 數據查詢更加高效。

  Spark SQL 的性能調優選項以下表所示

  

  在一個傳統的 Spark SQL 應用中,能夠在 Spark 配置中設置這些 Spark 屬性。

// 在 Scala 中打開 codegen 選項的代碼
conf.set("spark.sql.codegen", "true")

  Spark Sql官方文檔

  這篇博文主要來自《Spark快速大數據分析》這本書裏面的第九章,內容有刪減,還有本書的一些代碼的實驗結果。還要注意一點,在之後的學習中要養成查閱官方文檔的習慣。

相關文章
相關標籤/搜索