Spark(十二)SparkSQL簡單使用

1、SparkSQL的進化之路

1.0之前:   Sharkjava

1.1.x開始:SparkSQL(只是測試性的)  SQLmysql

1.3.x:          SparkSQL(正式版本)+Dataframesql

1.5.x:          SparkSQL 鎢絲計劃數據庫

1.6.x:       SparkSQL+DataFrame+DataSet(測試版本)apache

 2.x:編程

  •      SparkSQL+DataFrame+DataSet(正式版本)
  •      SparkSQL:還有其餘的優化
  •      StructuredStreaming(DataSet)

Spark on Hive和Hive on Sparkjson

  • Spark on Hive: Hive只做爲儲存角色,Spark負責sql解析優化,執行。
  • Hive on Spark:Hive即做爲存儲又負責sql的解析優化,Spark負責執行。

2、認識SparkSQL

2.1 什麼是SparkSQL?

spark SQL是spark的一個模塊,主要用於進行結構化數據的處理。它提供的最核心的編程抽象就是DataFrame。服務器

2.2 SparkSQL的做用

提供一個編程抽象(DataFrame) 而且做爲分佈式 SQL 查詢引擎分佈式

DataFrame:它能夠根據不少源進行構建,包括:結構化的數據文件,hive中的表,外部的關係型數據庫,以及RDDoop

2.3 運行原理

將 Spark SQL 轉化爲 RDD, 而後提交到集羣執行

2.4 特色

(1)容易整合

(2)統一的數據訪問方式

(3)兼容 Hive

(4)標準的數據鏈接

2.5 SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession爲用戶提供了統一的切入點,來讓用戶學習spark的各項功能。 
  在spark的早期版本中,SparkContext是spark的主要切入點,因爲RDD是主要的API,咱們經過sparkcontext來建立和操做RDD。對於每一個其餘的API,咱們須要使用不一樣的context。例如,對於Streming,咱們須要使用StreamingContext;對於sql,使用sqlContext;對於Hive,使用hiveContext。可是隨着DataSet和DataFrame的API逐漸成爲標準的API,就須要爲他們創建接入點。因此在spark2.0中,引入SparkSession做爲DataSet和DataFrame API的切入點,SparkSession封裝了SparkConf、SparkContext和SQLContext。爲了向後兼容,SQLContext和HiveContext也被保存下來。 
   
  SparkSession實質上是SQLContext和HiveContext的組合(將來可能還會加上StreamingContext),因此在SQLContext和HiveContext上可用的API在SparkSession上一樣是可使用的。SparkSession內部封裝了sparkContext,因此計算其實是由sparkContext完成的。

特色:

   ---- 爲用戶提供一個統一的切入點使用Spark 各項功能

        ---- 容許用戶經過它調用 DataFrame 和 Dataset 相關 API 來編寫程序

        ---- 減小了用戶須要瞭解的一些概念,能夠很容易的與 Spark 進行交互

        ---- 與 Spark 交互之時不須要顯示的建立 SparkConf, SparkContext 以及 SQlContext,這些對象已經封閉在 SparkSession 中

2.7 DataFrames   

在Spark中,DataFrame是一種以RDD爲基礎的分佈式數據集,相似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背後的數據源以及做用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提高運行時效率的目標。反觀RDD,因爲無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。

3、RDD轉換成爲DataFrame

使用spark1.x版本的方式

測試數據目錄:spark/examples/src/main/resources(spark的安裝目錄裏面)

people.txt

3.1 經過 case class 建立 DataFrames(反射)

//定義case class,至關於表結構
case class People(var name:String,var age:Int)
object TestDataFrame1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val context = new SQLContext(sc)
    // 將本地的數據讀入 RDD, 並將 RDD 與 case class 關聯
    val peopleRDD = sc.textFile("E:\\666\\people.txt")
      .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
    import context.implicits._
    // 將RDD 轉換成 DataFrames
    val df = peopleRDD.toDF
    //將DataFrames建立成一個臨時的視圖
    df.createOrReplaceTempView("people")
    //使用SQL語句進行查詢
    context.sql("select * from people").show()
  }
}

運行結果

3.2 經過 structType 建立 DataFrames(編程接口)

object TestDataFrame2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val fileRDD = sc.textFile("E:\\666\\people.txt")
    // 將 RDD 數據映射成 Row,須要 import org.apache.spark.sql.Row
    val rowRDD: RDD[Row] = fileRDD.map(line => {
      val fields = line.split(",")
      Row(fields(0), fields(1).trim.toInt)
    })
    // 建立 StructType 來定義結構
    val structType: StructType = StructType(
      //字段名,字段類型,是否能夠爲空
      StructField("name", StringType, true) ::
      StructField("age", IntegerType, true) :: Nil
    )
    /**
      * rows: java.util.List[Row],
      * schema: StructType
      * */
    val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()
  }
}

運行結果

3.3 經過 json 文件建立 DataFrames

object TestDataFrame3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df: DataFrame = sqlContext.read.json("E:\\666\\people.json")
    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()
  }
}

4、DataFrame的read和save和savemode

4.1 數據的讀取

object TestRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //方式一
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")
    //方式二
    val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")
    val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")
    //方式三,默認是parquet格式
    val df5 = sqlContext.load("E:\\666\\users.parquet")
  }
}

4.2 數據的保存

object TestSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    //方式一
    df1.write.json("E:\\111")
    df1.write.parquet("E:\\222")
    //方式二
    df1.write.format("json").save("E:\\333")
    df1.write.format("parquet").save("E:\\444")
    //方式三
    df1.write.save("E:\\555")

  }
}

4.3 數據的保存模式

使用mode

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

5、數據源

5.1 數據源只json

參考4.1

5.2 數據源之parquet

參考4.1

5.3 數據源之Mysql

object TestMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestMysql").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
    val table = "dbs"
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","root")
    //須要傳入Mysql的URL、代表、properties(鏈接數據庫的用戶名密碼)
    val df = sqlContext.read.jdbc(url,table,properties)
    df.createOrReplaceTempView("dbs")
    sqlContext.sql("select * from dbs").show()

  }
}

運行結果

5.4 數據源之Hive

(1)準備工做

在pom.xml文件中添加依賴

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

開發環境則把resource文件夾下添加hive-site.xml文件,集羣環境把hive的配置文件要發到$SPARK_HOME/conf目錄下

<configuration>
        <property>
                <name>javax.jdo.option.ConnectionURL</name>
                <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value>
                <description>JDBC connect string for a JDBC metastore</description>
                <!-- 若是 mysql 和 hive 在同一個服務器節點,那麼請更改 hadoop02 爲 localhost -->
        </property>
        <property>
                <name>javax.jdo.option.ConnectionDriverName</name>
                <value>com.mysql.jdbc.Driver</value>
                <description>Driver class name for a JDBC metastore</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionUserName</name>
                <value>root</value>
                <description>username to use against metastore database</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionPassword</name>
                <value>root</value>
        <description>password to use against metastore database</description>
        </property>
    <property>
                <name>hive.metastore.warehouse.dir</name>
                <value>/hive/warehouse</value>
                <description>hive default warehouse, if nessecory, change it</description>
        </property>  
</configuration>

(2)測試代碼

object TestHive {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("select * from myhive.student").show()
  }
}

運行結果

6、SparkSQL 的元數據

1.1元數據的狀態

SparkSQL 的元數據的狀態有兩種:

一、in_memory,用完了元數據也就丟了

二、hive , 經過hive去保存的,也就是說,hive的元數據存在哪兒,它的元數據也就存在哪兒。

換句話說,SparkSQL的數據倉庫在創建在Hive之上實現的。咱們要用SparkSQL去構建數據倉庫的時候,必須依賴於Hive。

2.2Spark-SQL腳本

若是用戶直接運行bin/spark-sql命令。會致使咱們的元數據有兩種狀態:

一、in-memory狀態:若是SPARK-HOME/conf目錄下沒有放置hive-site.xml文件,元數據的狀態就是in-memory

二、hive狀態:若是咱們在SPARK-HOME/conf目錄下放置了,hive-site.xml文件,那麼默認狀況下,spark-sql的元數據的狀態就是hive.

相關文章
相關標籤/搜索