Spark SQL

1. Introduction to Spark SQL

    1. Overview:
    Spark SQL is the Spark module for processing structured data.
    Spark SQL provides a programming abstraction called DataFrame and also acts as a distributed SQL query engine.

    2. Purpose: processing structured data; unstructured data must first be converted into structured data.

    3. Spark SQL offers two programming models (see the sketch after this list):
    1) The SQL style: select * from user;
    2) The DataFrame style (DSL)
    HQL (Hive): translates SQL into MapReduce jobs
    Spark SQL: translates SQL into RDD operations, which is faster

    4. Features:
    1) Easy to integrate with Spark
    2) Unified data access
    3) Standard data connectivity
    Supports JDBC/ODBC and can connect to BI tools
    4) Compatible with Hive
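
A minimal sketch contrasting the two models, assuming a SparkSession named spark and a DataFrame userDF with columns id, name and age (built the same way as in section 3 below):

// SQL style: register a temporary view and query it with a SQL string
userDF.createOrReplaceTempView("user_t")
spark.sql("select name, age from user_t where age > 18").show()

// DSL (DataFrame) style: call the operators directly
import spark.implicits._
userDF.select("name", "age").where($"age" > 18).show()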

2. Introduction to DataFrame

    Like an RDD, a DataFrame is a distributed data container.
    Spark SQL is a SQL parsing engine: in Spark, SQL is parsed into RDD operations. Note that the resulting RDD is special, because it carries schema information.
    Such an RDD is called a DataFrame.
    A DataFrame is like a two-dimensional table in a database (rows plus column descriptions); besides the data itself it also records the structure of the data (the schema).

    Difference from an RDD:
    DataFrame: also stores a description of the structure of the data it holds
    RDD: stores text, binary, audio, video, ...
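
A small sketch of the difference, assuming the userDF built in section 3 below: the DataFrame can describe its own structure, while the underlying RDD only holds Row objects and the schema has to be tracked by hand.

// The DataFrame knows its column names and types
userDF.printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = true)

// Dropping back down to the RDD keeps the rows but loses the self-describing table view
val rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = userDF.rdd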

3. SQL Style

1. SqlTest1

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * Spark 2.x
  * SQL style
  */
object SqlTest1 {
  def main(args: Array[String]): Unit = {
    //1. Build the SparkSession
    val sparkSession = SparkSession.builder().appName("SqlTest1")
      .master("local[2]")
      .getOrCreate()

    //2. Create the RDD
    val dataRdd: RDD[String] = sparkSession.sparkContext
      .textFile("hdfs://192.168.146.111:9000/user.txt")

    //3. Split the data
    val splitRdd: RDD[Array[String]] = dataRdd.map(_.split("\t"))

    //4. Wrap the data into Rows
    val rowRdd = splitRdd.map(x => {
      val id = x(0).toInt
      val name = x(1).toString
      val age = x(2).toInt
      //wrap one row of data
      Row(id, name, age)
    })

    //5. Create the schema (describes the structure of the DataFrame, like a table definition in SQL)
    val schema: StructType = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))

    //6. Create the DataFrame
    val userDF: DataFrame = sparkSession.createDataFrame(rowRdd, schema)

    //7. Register a temporary table (registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is the modern equivalent)
    userDF.registerTempTable("user_t")

    //8. Write the SQL
    val uSql: DataFrame = sparkSession.sql("select * from user_t order by age")

    //9. Show the result
    uSql.show()

    //10. Release resources
    sparkSession.stop()
  }
}

2. user.txt

1    zhangsan    18
2    lisi    23
3    tom    26
4    mary    16
5    zhangsanfeng    128

3. Result (output of uSql.show(): the user rows ordered by age)

4. Using toDF

scala> val rdd = sc.textFile("hdfs://192.168.146.111:9000/user.txt").map(_.split("\t"))
rdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24

scala> case class User(id:Int,name:String,age:Int)
defined class User

scala> val userRdd = rdd.map(x => User(x(0).toInt,x(1),x(2).toInt))
userRdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[4] at map at <console>:28

scala> val udf = userRdd.toDF
udf: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> udf.show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  1|    zhangsan| 18|
|  2|        lisi| 23|
|  3|         tom| 26|
|  4|        mary| 16|
|  5|zhangsanfeng|128|
+---+------------+---+


scala> udf.select("name","age").show()
+------------+---+
|        name|age|
+------------+---+
|    zhangsan| 18|
|        lisi| 23|
|         tom| 26|
|        mary| 16|
|zhangsanfeng|128|
+------------+---+


scala> udf.filter(col("id") <= 3).show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 18|
|  2|    lisi| 23|
|  3|     tom| 26|
+---+--------+---+


scala> udf.filter(col("id") > 3).show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  4|        mary| 16|
|  5|zhangsanfeng|128|
+---+------------+---+


scala> udf.groupBy(("name")).count.show()
+------------+-----+                                                            
|        name|count|
+------------+-----+
|zhangsanfeng|    1|
|        mary|    1|
|    zhangsan|    1|
|         tom|    1|
|        lisi|    1|
+------------+-----+


scala> udf.sort("age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  4|        mary| 16|
|  1|    zhangsan| 18|
|  2|        lisi| 23|
|  3|         tom| 26|
|  5|zhangsanfeng|128|
+---+------------+---+


scala> udf.orderBy("age").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  4|        mary| 16|
|  1|    zhangsan| 18|
|  2|        lisi| 23|
|  3|         tom| 26|
|  5|zhangsanfeng|128|
+---+------------+---+


scala> udf.registerTempTable("user_t")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sqlContext.sql("select * from user_t").show()
+---+------------+---+
| id|        name|age|
+---+------------+---+
|  1|    zhangsan| 18|
|  2|        lisi| 23|
|  3|         tom| 26|
|  4|        mary| 16|
|  5|zhangsanfeng|128|
+---+------------+---+


scala> spark.sqlContext.sql("select name,age from user_t where age>18").show()
+------------+---+
|        name|age|
+------------+---+
|        lisi| 23|
|         tom| 26|
|zhangsanfeng|128|
+------------+---+


scala> 
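
The toDF conversion above relies on spark-shell pre-importing the SparkSession implicits. In a compiled application the same pattern needs the import written explicitly; a minimal sketch, assuming the same user.txt and the User case class defined at the top level:

import org.apache.spark.sql.{DataFrame, SparkSession}

case class User(id: Int, name: String, age: Int)

object ToDfDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ToDfDemo").master("local[2]").getOrCreate()
    //brings toDF into scope for RDDs of case classes
    import spark.implicits._

    val userDF: DataFrame = spark.sparkContext
      .textFile("hdfs://192.168.146.111:9000/user.txt")
      .map(_.split("\t"))
      .map(x => User(x(0).toInt, x(1), x(2).toInt))
      .toDF()

    userDF.show()
    spark.stop()
  }
}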

5. DSL Style

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * DSL style
  */
object SqlTest2 {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("SqlTest2")
      .master("local[2]")
      .getOrCreate()

    //2. Create the RDD
    val dataRDD: RDD[String] = sparkSession.sparkContext
      .textFile("hdfs://192.168.146.111:9000/user.txt")

    //3. Split the data
    val splitRDD: RDD[Array[String]] = dataRDD.map(_.split("\t"))
    val rowRDD: RDD[Row] = splitRDD.map(x => {
      val id = x(0).toInt
      val name = x(1).toString
      val age = x(2).toInt
      //Row represents one row of data
      Row(id, name, age)
    })

    val schema: StructType = StructType(List(
      //structure fields
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))

    //4. Convert the RDD to a DataFrame
    val userDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

    //5. DSL style: query the rows where age is greater than 18 (DataFrame/Dataset API)
    import sparkSession.implicits._
    val user1DF: Dataset[Row] = userDF.where($"age" > 18)
    user1DF.show()

    //6. Release resources
    sparkSession.stop()
  }
}

Result: output of user1DF.show(), the rows with age > 18.
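
A DataFrame (which in Spark 2.x is a Dataset[Row]) can also be converted into a strongly typed Dataset with as[...]; a minimal sketch, assuming the userDF built above and a top-level case class User(id: Int, name: String, age: Int):

//the implicits import provides the encoder for the case class
import sparkSession.implicits._
val userDS: Dataset[User] = userDF.as[User]
//typed filter: the lambda receives User objects instead of generic Rows
userDS.filter(_.age > 18).show()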

6. WordCount

1. SqlWordCount

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SqlWordCount {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("SqlWordCount")
      .master("local[2]")
      .getOrCreate()

    //2. Load the data as a Dataset: a Dataset is a smarter RDD and, when read from text, has a single default column named value
    val datas: Dataset[String] = sparkSession.read
      .textFile("hdfs://192.168.146.111:9000/words.txt")

    //3. Split each line into words (the Dataset equivalent of rdd.flatMap)
    import sparkSession.implicits._
    val word: Dataset[String] = datas.flatMap(_.split("\t"))

    //4. Register a temporary view
    word.createTempView("wc_t")

    //5. Run the word-count SQL
    val r: DataFrame = sparkSession
      .sql("select value as word,count(*) sum from wc_t group by value order by sum desc")

    r.show()
    sparkSession.stop()
  }
}

2. words.txt

hello    world
hello    China
hello    Beijing
haha    heihei

3. Result (output of r.show(): each word with its count, ordered by count descending)
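
The same word count can also be written in the DSL style from section 5, without registering a view; a minimal sketch, assuming the word Dataset built in step 3 above:

import org.apache.spark.sql.functions.desc

word.groupBy("value")      //group by the default text column
  .count()                 //adds a "count" column
  .orderBy(desc("count"))  //order by count, descending
  .show()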

7. Join Operations

1. JoinDemo

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * SQL-style join
  */
object JoinDemo {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("JoinDemo")
      .master("local[2]").getOrCreate()

    import sparkSession.implicits._

    //2. Create a Dataset directly
    val datas1: Dataset[String] = sparkSession
      .createDataset(List("1 Tom 18", "2 John 22", "3 Mary 16"))

    //3. Parse the data
    val dataDS1: Dataset[(Int, String, Int)] = datas1.map(x => {
      val fields: Array[String] = x.split(" ")
      val id = fields(0).toInt
      val name = fields(1).toString
      val age = fields(2).toInt
      //emit a tuple
      (id, name, age)
    })

    val dataDF1: DataFrame = dataDS1.toDF("id", "name", "age")


    //4. Create the second dataset
    val datas2: Dataset[String] = sparkSession
      .createDataset(List("18 young", "22 old"))

    val dataDS2: Dataset[(Int, String)] = datas2.map(x => {
      val fields: Array[String] = x.split(" ")
      val age = fields(0).toInt
      val desc = fields(1).toString
      //emit a tuple
      (age, desc)
    })

    //5. Convert it to a DataFrame
    val dataDF2: DataFrame = dataDS2.toDF("dage", "desc")

    //6. Register temporary views
    dataDF1.createTempView("d1_t")
    dataDF2.createTempView("d2_t")

    //7. Write the join SQL
    val r = sparkSession.sql("select name,desc from d1_t join d2_t on age = dage")

    //8. Trigger the job
    r.show()

    //9. Release resources
    sparkSession.stop()
  }
}

2. Result (output of r.show(): each name with its matching desc)

3. JoinDemo1

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object JoinDemo1 {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("JoinDemo1")
      .master("local[2]").getOrCreate()

    import sparkSession.implicits._

    //2. Create a Dataset directly
    val datas1: Dataset[String] = sparkSession
      .createDataset(List("1 Tom 18", "2 John 22", "3 Mary 16"))

    //3. Parse the data
    val dataDS1: Dataset[(Int, String, Int)] = datas1.map(x => {
      val fields: Array[String] = x.split(" ")
      val id = fields(0).toInt
      val name = fields(1).toString
      val age = fields(2).toInt
      //emit a tuple
      (id, name, age)
    })

    val dataDF1: DataFrame = dataDS1.toDF("id", "name", "age")

    //4. Create the second dataset
    val datas2: Dataset[String] = sparkSession
      .createDataset(List("18 young", "22 old"))

    val dataDS2: Dataset[(Int, String)] = datas2.map(x => {
      val fields: Array[String] = x.split(" ")
      val age = fields(0).toInt
      val desc = fields(1).toString
      //emit a tuple
      (age, desc)
    })

    //5. Convert it to a DataFrame
    val dataDF2: DataFrame = dataDS2.toDF("dage", "desc")

    //The join type defaults to inner; the commented lines below show the other join types
    //val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage")
    //val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "left")
    //val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "right")
    //val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "left_outer")
    val r: DataFrame = dataDF1.join(dataDF2, $"age" === $"dage", "cross")

    r.show()

    //6. Release resources
    sparkSession.stop()
  }
}

4. Result (output of r.show() for the selected join type)
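
A quick way to compare the join types with this data; a minimal sketch, assuming the dataDF1 and dataDF2 built above (crossJoin is available since Spark 2.1):

//inner: only ages 18 and 22 have a match, so Mary (16) is dropped
dataDF1.join(dataDF2, $"age" === $"dage").show()

//left: keeps all three rows of dataDF1; Mary gets null dage/desc
dataDF1.join(dataDF2, $"age" === $"dage", "left").show()

//crossJoin: full cartesian product, 3 x 2 = 6 rows
dataDF1.crossJoin(dataDF2).show()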
