Spark學習之Spark SQL

Spark SQL

1、Spark SQL基礎

1Spark SQL簡介

Spark SQLSpark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫作DataFrame而且做爲分佈式SQL查詢引擎的做用。
http://spark.apache.org/sql/
html

爲何要學習Spark SQL咱們已經學習了Hive,它是將Hive SQL轉換成MapReduce而後提交到集羣上執行,大大簡化了編寫MapReduce的程序的複雜性,因爲MapReduce這種計算模型執行效率比較慢。因此Spark SQL的應運而生,它是將Spark SQL轉換成RDD,而後提交到集羣執行,執行效率很是快!同時Spark SQL也支持從Hive中讀取數據。java

 

Spark SQL的特色:mysql

1.容易整合(集成) 
sql

2.統一的數據訪問方式
shell

3.兼容Hive
 數據庫

4.標準的數據鏈接
apache

2、基本概念:DatasetsDataFrames

  DataFrame編程

  DataFrame是組織成命名列的數據集。它在概念上等同於關係數據庫中的,但在底層具備更豐富的優化。DataFrames能夠從各類來源構建,json

例如:api

  結構化數據文件

  hive中的表

  外部數據庫或現有RDDs

DataFrame API支持的語言有ScalaJavaPythonR

從上圖能夠看出,DataFrame多了數據的結構信息,schemaRDD是分佈式的 Java對象的集合。DataFrame是分佈式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子之外,更重要的特色是提高執行效率、減小數據讀取以及執行計劃的優化

  Datasets

  Dataset是數據的分佈式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象。它提供了RDD的優勢(強類型化,使用強大的lambda函數的能力)以及Spark SQL優化後的執行引擎的優勢。一個Dataset 能夠從JVM對象構造,而後使用函數轉換(mapflatMapfilter等)去操做。 Dataset API 支持ScalaJavaPython不支持Dataset API

3、測試數據

使用員工表的數據,並已經將其保存到了HDFS上。
emp.csv

dept.csv

4、建立DataFrames

*)經過Case Class建立DataFrames

① 定義case class(至關於表的結構:Schema

注意:因爲mgrcomm列中包含null值,簡單起見,將對應的case class類型定義爲String

② HDFS上的數據讀入RDD,並將RDDcase Class關聯 
 

③ RDD轉換成DataFrames

④ 經過DataFrames查詢數據
 

*)使用SparkSession

① 什麼是SparkSession

Apache Spark 2.0引入了SparkSession,其爲用戶提供了一個統一的切入點來使用Spark的各項功能,而且容許用戶經過它調用DataFrameDataset相關API來編寫Spark程序。最重要的是,它減小了用戶須要瞭解的一些概念,使得咱們能夠很容易地與Spark交互。

2.0版本以前,與Spark交互以前必須先建立SparkConfSparkContext然而在Spark 2.0中,咱們能夠經過SparkSession來實現一樣的功能,而不須要顯式地建立SparkConf, SparkContext 以及 SQLContext,由於這些對象已經封裝在SparkSession中。
 

② 建立StructType,來定義Schema結構信息
 

注意,須要:import org.apache.spark.sql.types._

③ 讀入數據而且切分數據
 

④ RDD中的數據映射成Row

注意,須要:import org.apache.spark.sql.Row

⑤ 建立DataFrames

val df = spark.createDataFrame(rowRDD,myschema)

 

再舉一個例子,使用JSon文件來建立DataFame

① 源文件:$SPARK_HOME/examples/src/main/resources/people.json

② val df = spark.read.json("源文件")

③ 查看數據和Schema信息
 

5DataFrame操做

DataFrame操做也稱爲無類型的Dataset操做

*)查詢全部的員工姓名

*)查詢全部的員工姓名和薪水,並給薪水加100塊錢

*)查詢工資大於2000的員工

*)求每一個部門的員工人數
 

完整的例子,請參考:

http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset 

*)在DataFrame中使用SQL語句

① 將DataFrame註冊成表(視圖):df.createOrReplaceTempView("emp")

② 執行查詢:spark.sql("select * from emp").show

              spark.sql("select * from emp where deptno=10").show

              spark.sql("select deptno,sum(sal) from emp group by deptno").show

 

6Global Temporary View

上面使用的是一個在Session生命週期中的臨時views。在Spark SQL中,若是你想擁有一個臨時的view,並想在不一樣的Session中共享,並且在application的運行週期內可用,那麼就須要建立一個全局的臨時view。並記得使用的時候加上global_temp做爲前綴來引用它,由於全局的臨時view是綁定到系統保留的數據庫global_temp上。

① 建立一個普通的view和一個全局的view

df.createOrReplaceTempView("emp1")

df.createGlobalTempView("emp2")

 

② 在當前會話中執行查詢,都可查詢出結果。

spark.sql("select * from emp1").show

spark.sql("select * from global_temp.emp2").show

 

③ 開啓一個新的會話,執行一樣的查詢

spark.newSession.sql("select * from emp1").show     (運行出錯)

spark.newSession.sql("select * from global_temp.emp2").show

 

7、建立Datasets

DataFrame的引入,可讓Spark更好的處理結構數據的計算,但其中一個主要的問題是:缺少編譯時類型安全。爲了解決這個問題,Spark採用新的Dataset API (DataFrame API的類型擴展)
 

Dataset是一個分佈式的數據收集器。這是在Spark1.6以後新加的一個接口,兼顧了RDD的優勢(強類型,可使用功能強大的lambda)以及Spark SQL的執行器高效性的優勢。因此能夠把DataFrames當作是一種特殊的Datasets,即:Dataset(Row)

 

*)建立DataSet,方式一:使用序列

1、定義case class

    case class MyData(a:Int,b:String)

 

2、生成序列,並建立DataSet

   val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS

 

3、查看結果
 

*)建立DataSet,方式二:使用JSON數據

1、定義case class

             case class Person(name: String, gender: String)

 

2、經過JSON數據生成DataFrame

             val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))

 

3、將DataFrame轉成DataSet

               df.as[Person].show

               df.as[Person].collect

 

*)建立DataSet,方式三:使用HDFS數據

1、讀取HDFS數據,並建立DataSet

                val linesDS = spark.read.text("hdfs://hadoop111:9000/data/data.txt").as[String]

 

2、對DataSet進行操做:分詞後,查詢長度大於3的單詞

                val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

                words.show

                words.collect

 

            3、執行WordCount程序

             val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count

             result.show

             排序:result.orderBy($"value").show

 

8Datasets的操做案例
  emp.json

*)使用emp.json 生成DataFrame

val empDF = spark.read.json("/root/resources/emp.json")

            查詢工資大於3000的員工

            empDF.where($"sal" >= 3000).show

 

*)建立case class

case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)

 

*)生成DataSets,並查詢數據

     val empDS = empDF.as[Emp]

 

     查詢工資大於3000的員工

     empDS.filter(_.sal > 3000).show

 

     查看10號部門的員工

     empDS.filter(_.deptno == 10).show

 

*)多表查詢

1、建立部門表

val deptRDD=sc.textFile("/root/temp/dept.csv").map(_.split(","))

case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS

 

2、建立員工表

case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)

val empRDD = sc.textFile("/root/temp/emp.csv").map(_.split(","))

val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS

 

3、執行多表查詢:等值連接

    val result = deptDS.join(empDS,"deptno")

    

    另外一種寫法:注意有三個等號

    val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))

    joinWithjoin的區別是鏈接後的新Datasetschema會不同

 

*)查看執行計劃:result.explain

 

2、使用數據源 

1、通用的Load/Save函數

*)什麼是parquet文件?

Parquet是列式存儲格式的一種文件類型,列式存儲有如下的核心:

能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。

壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length EncodingDelta Encoding)進一步節約存儲空間。

l 只讀取須要的列,支持向量運算,可以獲取更好的掃描性能。

l Parquet格式是Spark SQL的默認數據源,可經過spark.sql.sources.default配置

 

*)通用的Load/Save函數

讀取Parquet文件

val usersDF = spark.read.load("/root/resources/users.parquet")

查詢Schema和數據

查詢用戶的name和喜好顏色,並保存

usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")

 

l 驗證結果

*)顯式指定文件格式:加載json格式

直接加載:val usersDF = spark.read.load("/root/resources/people.json")

                      會出錯

l val usersDF = spark.read.format("json").load("/root/resources/people.json")

 

 

*)存儲模式(Save Modes

能夠採用SaveMode執行存儲操做,SaveMode定義了對數據的處理模式。須要注意的是,這些保存模式不使用任何鎖定,不是原子操做。此外,當使用Overwrite方式執行時,在輸出新數據以前原數據就已經被刪除。SaveMode詳細介紹以下表:
 

Demo

  usersDF.select($"name").write.save("/root/result/parquet1")

--> 出錯:由於/root/result/parquet1已經存在

 

  usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")

 

*)將結果保存爲表

  usersDF.select($"name").write.saveAsTable("table1")

 

也能夠進行分區、分桶等操做:partitionBybucketBy

2Parquet文件

Parquet是一個列格式並且用於多個數據處理系統中。Spark SQL提供支持對於Parquet文件的讀寫,也就是自動保存原始數據的schema。當寫Parquet文件時,全部的列被自動轉化爲nullable,由於兼容性的緣故。

*)案例:

讀入json格式的數據,將其轉換成parquet格式,並建立相應的表來使用SQL進行查詢。

*Schema的合併:

Parquet支持Schema evolutionSchema演變,即:合併)。用戶能夠先定義一個簡單的Schema,而後逐漸的向Schema中增長列描述。經過這種方式,用戶能夠獲取多個有不一樣Schema但相互兼容的Parquet文件。

Demo:
 

3JSON Datasets

Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集爲DataFrame格式。讀取JSON數據集方法爲SQLContext.read().json()。該方法將String格式的RDDJSON文件轉換爲DataFrame

須要注意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自知足有效的JSON對象。若是用多行描述一個JSON對象,會致使讀取出錯。讀取JSON數據集示例以下:

*Demo1:使用Spark自帶的示例文件 --> people.json 文件

定義路徑:

val path ="/root/resources/people.json"

 

讀取Json文件,生成DataFrame

val peopleDF = spark.read.json(path)

 

打印Schema結構信息:

peopleDF.printSchema()

 

建立臨時視圖:

peopleDF.createOrReplaceTempView("people")

 

執行查詢

spark.sql("SELECT name FROM people WHERE age=19").show

 

4、使用JDBC

Spark SQL一樣支持經過JDBC讀取其餘數據庫的數據做爲數據源。

Demo演示:使用Spark SQL讀取Oracle數據庫中的表。

啓動Spark Shell的時候,指定Oracle數據庫的驅動

spark-shell --master spark://spark81:7077            \\

         --jars /root/temp/ojdbc6.jar              \\

         --driver-class-path /root/temp/ojdbc6.jar

 

  讀取Oracle數據庫中的數據

 

 

 

*)方式一:

            val oracleDF = spark.read.format("jdbc").

         option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com").

         option("dbtable","scott.emp").

         option("user","scott").

         option("password","tiger").

         load

 

*)方式二:

導入須要的類:

import java.util.Properties   

 

定義屬性:               

val oracleprops = new Properties()

oracleprops.setProperty("user","scott")

oracleprops.setProperty("password","tiger")

 

 

讀取數據:

val oracleEmpDF =

   spark.read.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com",

   "scott.emp",oracleprops)

 

注意:下面是讀取Oracle 10gWindows 上)的步驟

5、使用Hive Table

首先,搭建好Hive的環境(須要Hadoop

配置Spark SQL支持Hive

  只須要將如下文件拷貝到$SPARK_HOME/conf的目錄下,便可

  $HIVE_HOME/conf/hive-site.xml

  $HADOOP_CONF_DIR/core-site.xml

  $HADOOP_CONF_DIR/hdfs-site.xml

 

使用Spark Shell操做Hive

  啓動Spark Shell的時候,須要使用--jars指定mysql的驅動程序

  建立表

  spark.sql("create table src (key INT, value STRING) row format delimited fields terminated by ','")

 

  導入數據

spark.sql("load data local path '/root/temp/data.txt' into table src")

 

  查詢數據

spark.sql("select * from src").show

 

使用spark-sql操做Hive

  啓動spark-sql的時候,須要使用--jars指定mysql的驅動程序

  操做Hive

  show tables;

  select * from ;

 

 

3、性能優化

1、在內存中緩存數據

性能調優主要是將數據放入內存中操做。經過spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")來從內存中去除table

Demo案例:

*)從Oracle數據庫中讀取數據,生成DataFrame

     val oracleDF = spark.read.format("jdbc")

        .option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com")

        .option("dbtable","scott.emp")

        .option("user","scott")

        .option("password","tiger").load

 

*)將DataFrame註冊成表:    oracleDF.registerTempTable("emp")

 

*)執行查詢,並經過Web Console監控執行的時間

                 spark.sql("select * from emp").show
 

*)將表進行緩存,並查詢兩次,並經過Web Console監控執行的時間

     spark.sqlContext.cacheTable("emp")
 

*)清空緩存:

      spark.sqlContext.cacheTable("emp")

      spark.sqlContext.clearCache

2、性能優化相關參數

1.將數據緩存到內存中的相關優化參數

  (1)spark.sql.inMemoryColumnarStorage.compressed

  默認爲 true

  Spark SQL 將會基於統計信息自動地爲每一列選擇一種壓縮編碼方式。

 

  (2)spark.sql.inMemoryColumnarStorage.batchSize

  默認值:10000

  緩存批處理大小。緩存數據時, 較大的批處理大小能夠提升內存利用率和壓縮率,但同時也會帶來 OOMOut Of Memory)的風險。

 

2.其餘性能相關的配置選項(不過不推薦手動修改,可能在後續版本自動的自適應修改)

  (1)spark.sql.files.maxPartitionBytes

  默認值:128 MB

  讀取文件時單個分區可容納的最大字節數

 

  (2)spark.sql.files.openCostInBytes

  默認值:4M

  打開文件的估算成本, 按照同一時間可以掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)

 

  (3)spark.sql.autoBroadcastJoinThreshold

  默認值:10M

  用於配置一個表在執行 join 操做時可以廣播給全部 worker 節點的最大字節大小。經過將這個值設置爲 -1 能夠禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

 

  (4)spark.sql.shuffle.partitions

  默認值:200

  用於配置 join 或聚合操做混洗(shuffle)數據時使用的分區數。

4、在IDEA中開發Spark SQL程序

 

1、指定Schema格式

package sparksql

import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

object SpecifyingSchema {
  def main(args: Array[String]) {
      //建立Spark Session對象
    val spark = SparkSession.builder().master("local").appName("UnderstandingSparkSession").getOrCreate()

    //從指定的地址建立RDD
    val personRDD = spark.sparkContext.textFile("D:\\temp\\student.txt").map(_.split(" "))

    //經過StructType直接指定每一個字段的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //將RDD映射到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

    //將schema信息應用到rowRDD上
    val personDataFrame = spark.createDataFrame(rowRDD, schema)

    //註冊表
    personDataFrame.createOrReplaceTempView("t_person")

    //執行SQL
    val df = spark.sql("select * from t_person order by age desc limit 4")

    //顯示結果
    df.show()

    //中止Spark Context
    spark.stop()
  }
}

 

2、使用case class 

package demo

import org.apache.spark.sql.SparkSession

//使用case class
object Demo2 {

  def main(args: Array[String]): Unit = {
    //建立SparkSession
    val spark = SparkSession.builder().master("local").appName("My Demo 1").getOrCreate()

    //從指定的文件中讀取數據,生成對應的RDD
    val lineRDD = spark.sparkContext.textFile("d:\\temp\\student.txt").map(_.split(" "))

    //將RDD和case class 關聯
    val studentRDD = lineRDD.map( x => Student(x(0).toInt,x(1),x(2).toInt))

    //生成 DataFrame,經過RDD 生成DF,導入隱式轉換
    import spark.sqlContext.implicits._
    val studentDF = studentRDD.toDF

    //註冊表 視圖
    studentDF.createOrReplaceTempView("student")

    //執行SQL
    spark.sql("select * from student").show()

    spark.stop()
  }
}

//case class 必定放在外面
case class Student(stuID:Int,stuName:String,stuAge:Int)

 

 

3、就數據保存到數據庫

package demo

import java.util.Properties

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


//做用:讀取本地一個文件, 生成對應 DataFrame,註冊表
object Demo1 {

  def main(args: Array[String]): Unit = {
    //建立SparkSession
    val spark = SparkSession.builder().master("local").appName("My Demo 1").getOrCreate()

    //從指定的文件中讀取數據,生成對應的RDD
    val personRDD = spark.sparkContext.textFile("d:\\temp\\student.txt").map(_.split(" "))

    //建立schema ,經過StructType
    val schema = StructType(
        List(
          StructField("personID",IntegerType,true),
          StructField("personName",StringType,true),
          StructField("personAge",IntegerType,true)
        )
    )

    //將RDD映射到Row RDD 行的數據上
    val rowRDD = personRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).toInt))

    //生成DataFrame
    val personDF = spark.createDataFrame(rowRDD,schema)

    //將DF註冊成表
    personDF.createOrReplaceTempView("myperson")

    //執行SQL
    val result = spark.sql("select * from myperson")

    //顯示
    //result.show()

    //將結果保存到oracle中
    val props = new Properties()
    props.setProperty("user","scott")
    props.setProperty("password","tiger")

    result.write.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com","scott.myperson",props)

    //若是表已經存在,append的方式數據
    //result.write.mode("append").jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com","scott.myperson",props)


    //中止spark context
    spark.stop()
  }
}
相關文章
相關標籤/搜索