SparkSQL簡單使用

==> 什麼是 Spark SQL?sql

    ---> Spark SQL 是 Spark 用來處理結構化數據的一個模塊
數據庫

    ---> 做用:提供一個編程抽象(DataFrame) 而且做爲分佈式 SQL 查詢引擎apache

    ---> 運行原理:將 Spark SQL 轉化爲 RDD, 而後提交到集羣執行
編程

    ---> 特色:json

        ---- 容易整合
分佈式

        ---- 統一的數據訪問方式ide

        ---- 兼容 Hive函數

        ---- 標準的數據鏈接優化

==> SparkSessionspa

    ---> 特色:(2.0引用 SparkSession)

        ---- 爲用戶提供一個統一的切入點使用Spark 各項功能

        ---- 容許用戶經過它調用 DataFrame 和 Dataset 相關 API 來編寫程序

        ---- 減小了用戶須要瞭解的一些概念,能夠很容易的與 Spark 進行交互

        ---- 與 Spark 交互之時不須要顯示的建立 SparkConf, SparkContext 以及 SQlContext,這些對象已經封閉在 SparkSession 中

==> DataFrames   組織成命名列的數據集,等同於數據庫中的表

    ---> 與 RDD 相比較:

        ---- RDD                是分佈式的 Java 對象 的集合

        ---- DataFrame     是分佈式 Row 對象的集合

    ---> 建立 DataFrames

        ---- 經過 case class 建立 DataFrames

// 定義 case class (至關於表的結構)
case class Emp(Empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int)   

// 將 HDFS 上的數據讀入 RDD, 並將 RDD 與 case class 關聯
val lines = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(","))
val emp = lines.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)) `

// 將RDD 轉換成 DataFrames
val empDF = emp.toDF

// 經過 DataFrames 查詢數據
empDF.show

        ---- 經過 SparkSession 建立 DataFrames

// 建立 StructType 來定義結構,注意,須要先導入模塊
import org.apache.spark.sql.types._
val myschema = StructType(List(
                StructField("empno", DataTypes.IntegerType), 
                StructField("ename", DataTypes.StringType),
                StructField("job", DataTypes.StringType),
                StructField("mgr", DataTypes.StringType),
                StructField("hiredate", DataTypes.StringType),
                StructField("sal", DataTypes.IntegerType),
                StructField("comm", DataTypes.StringType),
                StructField("deptno", DataTypes.IntegerType)
                ))
                
// 讀入數據且切分數據
val empcsvRDD = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(","))
// 將 RDD 數據映射成 Row,須要 import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
val rowRDD = empcsvRDD.map(line=> Row(line(0).toInt, line(1), line(2), line(3),line(4), line(5).toInt, line(6), line(7).toInt)
// 建立 DataFrames
val df = spark.createDataFrame(rowRDD, myschema)
// 查看錶
df.show

        ---- 使用 Json 文件來建立 DataFrame

val df = spark.read.json("Json 文件")
// 查看數據
df.show

    ---> DataFrame 操做  DataFrame 操做也稱爲無類型的 Dataset操做

        ---- 查詢全部員工姓名

df.select("ename").show


        ---- 查詢全部員工姓名和薪水,並給薪水加 100 元

df.select($"ename", $"sal", $"sal"+ 100).show


        ---- 查詢工資大於2000的員工

df.select($"sal" > 2000).show


        ---- 求每一個部門員工數

df.groupBy($"deptno").count.show

        ---- 在 DataFrame 中使用 SQL 語句    注: 須要首先將 DataFrame 註冊成表(視圖)

df.createOrReplaceTempView("emp")
// 執行查詢
spark.sql("select * from emp").show


    ---> 臨時視圖(2種):

        ---- 只在當前會話中有效            df.createOrReplaceTempView("emp1")

        ---- 在全局有效                    df.createGlobalTempView("emp2")



==> Datasets

    ---> 數據的分佈式集合

    --->特色:  

        ---- Spark1.6中添加的新接口,是DataFrame之上更高一級的抽象

        ---- 提供了 RDD的優勢(強類型化,使用 lambda函數的能力)

        ---- Spark SQL 優化後的執行引擎

        ---- 能夠從 JVM 對象構造,而後使用函數轉換(map, flatMap, filter等)去操做

        ---- 支持 Scala 和 Java,不支持 Python

    ---> 建立 DataSet

        ---- 使用序列

// 定義 case class
case class MyData(a:String, b:String)
// 生成序列並建立 DataSet
val ds = Seq(MyData(1, "Tom"), MyData(2, "Mary")).toDS
// 查看結果
ds.show


        ---- 使用 Json 數據

// 定義 case class 
case class Person(name:String, gender:String)
//經過 Json 數據生成 DataFrame
val df = spark.read.json(sc.parallelize("""{"gender":"Male", "name": "Tom"}""" ::Nil))
// 將 DataFrame 轉成 DataSet
df.as[Person].show
df.as[Person].collect


        ---- 經過使用 DHFS 執行 WordCount 程序 

// 讀取 HDFS 數據,並建立 DataSet
val linesDS = spark.read.text("hdfs://bigdata0:9000/input/data.txt").as[String]
// 對DataSet 進行操做:分詞後, 查詢長度大於3 的單詞
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
// 查看結果
words.show
words.collect

// 執行wordcount 程序
val result = linesDS.flatMap(_.split(" ").map((_.1)).groupByKey(x=> x._1).count)
result.show
// 排序
result.orderBy($"value").show

==> Datasets 操做

    ---> joinWith 和 join 的區別是鏈接後的新 Dataset 的 schema 會不同

// 使用 emp.json 生成 DataFrame
val empDF = spark.read.json("/root/resources/emp.json")
// 查詢工資大於 3000 的員工
empDF.where($"sal" > 3000).show
// 建立 case class
case class Emp(empno:Lone, ename:String, job:String, hiredate:String, mgr:String, sal:Long, comm:String, deptno:Long)
// 生成 DataSets,並查詢數據
val empDS = empDF.as[Emp]
// 查詢工資大於 3000 的員工
empDS.filter(_.sal > 3000).show
// 查看 10 號部門的員工
empDS.filter(_.deptno == 10)
// 多表查詢
// 1.建立部門表
val deptRDD = sc.textFile("/test/dept.csv").map(_.split(","))
case class Dept(deptno:Int, dname:String, loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt, x(1), x(2))).toDS

// 2.建立員工表
case class Emp(empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int)
val empRDD = sc.textFile("/test/emp.csv").map(_.split(","))
val empDS = empRDD.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))

// 3.執行多表查詢: 等值連接
val result = deptDF.join(empDS, "deptno")

// 另外一種寫法: 注意有三個等號
val result = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno"))
// 查看執行計劃
result.explain
相關文章
相關標籤/搜索