Spark SQL Basic Operations and Function Usage

Introduction:

    This post covers basic Spark SQL operations such as filtering, deduplication, and set operations, along with the use of common date, random-number, and string functions. Example code is provided for each operation, and the data files used in the code are listed at the very end of the post.

Spark SQL Overview

    Spark SQL is a very important component of the Spark ecosystem. Its predecessor was Shark, a data warehouse built on Spark that was originally designed to be compatible with Hive; that project was discontinued in 2014 in favor of Spark SQL, which carries Shark's functionality forward with further optimizations. Spark SQL introduced SchemaRDD (an RDD carrying schema information, the predecessor of today's DataFrame), letting users execute SQL statements in Spark SQL over data coming from RDDs as well as from external sources such as Hive, HDFS, and Cassandra, or from JSON-formatted data. Spark SQL currently supports Scala, Java, and Python, and supports the SQL-92 standard.
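    As a minimal sketch of that last point, a JSON file can be queried with plain SQL by registering it as a temporary view (this reuses the employee.json data listed at the end of the post):

package spark2x

import org.apache.spark.sql.SparkSession

object SqlOverJsonDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SqlOverJsonDemo")
      .getOrCreate()

    // Load the JSON file as a DataFrame and expose it to SQL as a temp view
    spark.read.json("E://temp/employee.json").createOrReplaceTempView("employee")

    // A standard SQL statement; the result comes back as a DataFrame
    spark.sql("SELECT name, salary FROM employee WHERE salary > 20000").show()

    spark.stop()
  }
}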

Advantages of Spark SQL

    Spark SQL supports SQL queries well in two ways: you can write Spark applications that query data with SQL statements, or you can connect to Spark through a standard database connector (such as JDBC or ODBC) and issue SQL queries, as sketched below.
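    For the connector route, Spark ships a Thrift JDBC/ODBC server (started with sbin/start-thriftserver.sh). The following is a minimal sketch, assuming the server is running on the default port 10000 and the Hive JDBC driver is on the classpath:

import java.sql.DriverManager

object JdbcQueryDemo {
  def main(args: Array[String]): Unit = {
    // Register the Hive JDBC driver (assumption: hive-jdbc is on the classpath)
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "", "")
    val stmt = conn.createStatement()
    // Any SQL the server understands can be issued through plain JDBC
    val rs = stmt.executeQuery("SHOW TABLES")
    while (rs.next()) println(rs.getString(1))
    rs.close(); stmt.close(); conn.close()
  }
}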

Spark SQL Basic Operations

    Deduplication

        distinct: deduplicates on the entire row.

        dropDuplicates: deduplicates on the specified field(s).

package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   DistinctDemo
  * Author:  彭三青
  * Created: 2018-11-29 15:02
  * Version: 1.0
  * Notes:   Deduplication: distinct, dropDuplicates
  */

object DistinctDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Operations")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    println("--------------------distinct---------------------")
    // Deduplicate on the entire row
    employeeDS.distinct().show()

    println("--------------------dropDuplicates---------------------")
    // Deduplicate on the given field(s)
    employeeDS.dropDuplicates(Seq("name")).show()

    spark.stop()
  }
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
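    Tip: dropDuplicates also accepts several columns at once. A one-line sketch that could be appended to the main method above:

employeeDS.dropDuplicates(Seq("name", "age")).show()  // deduplicate on the (name, age) pair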

    Filtering

        filter(): the argument can be a function returning a Boolean (rows for which it returns true are kept; false means the row is dropped), a Column, or an expression string.

        except: returns the rows that exist in the current Dataset but not in the other Dataset.

        intersect: returns the intersection of the two Datasets.

    Note: except and intersect require both sides to have the same element type. Replacing one side's Employee with a Person class that has exactly the same fields will fail to compile (see the sketch after the listing below).

package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   FilterDemo
  * Author:  彭三青
  * Created: 2018-11-29 15:09
  * Version: 1.0
  * Notes:   filter, except, intersect
  */

object FilterDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("FilterDemo")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
    val employee2DF: DataFrame = spark.read.json("E://temp/employee2.json")
    val employee2DS: Dataset[Employee] = employee2DF.as[Employee]

    println("--------------------employee--------------------")
    employeeDS.show()

    println("--------------------employee2--------------------")
    employee2DS.show()

    println(
      "       ┏┓   ┏┓\n" +
      "     ┏┛┻━━━┛┻┓\n" +
      "   ┃       ┃\n" +
      "   ┃   ━   ┃\n" +
      "   ┃ ┳┛ ┗┳ ┃\n" +
      "   ┃       ┃\n" +
      "   ┃   ┻   ┃\n" +
      "   ┃       ┃\n" +
      "   ┗━┓   ┏━┛\n" +
      "     ┃   ┃\n" +
      "      ┃   ┃\n" +
      "     ┃   ┗━━━┓\n" +
      "     ┃       ┣┓\n" +
      "     ┃       ┏┛\n" +
      "     ┗┓┓┏━┳┓┏┛\n" +
      "      ┃┫┫ ┃┫┫\n" +
      "      ┗┻┛ ┗┻┛\n"
    )

    println("-------------------------------------------------")

    // Keep the row if the predicate returns true; otherwise drop it
    employeeDS.filter(employee => employee.age == 35).show()
    employeeDS.filter(employee => employee.age > 30).show()
    // Rows present in this Dataset but absent from the other one
    employeeDS.except(employee2DS).show()
    // The intersection of the two Datasets
    employeeDS.intersect(employee2DS).show()

    spark.stop()
  }
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
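    The listing above only uses the predicate form of filter(); the Column and expression-string forms mentioned earlier behave the same way. The lines below are a sketch that could be appended to FilterDemo's main method, and also illustrate the type requirement of except/intersect noted above:

// Column-expression form: rows where the condition evaluates to true are kept
employeeDS.filter($"age" > 30).show()
// Expression-string form of the same condition
employeeDS.filter("age > 30").show()

// except/intersect are typed: both sides must be Dataset[Employee].
// A Person case class with identical fields would not compile:
//   case class Person(name: String, age: Long, depId: Long, gender: String, salary: Double)
//   employeeDS.except(employee2DF.as[Person])  // type mismatch at compile time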

    Collection functions

        collect_set: gathers the values of a field within each group into one collection, removing duplicates.

        collect_list: gathers the values of a field within each group into one collection, keeping duplicates.

package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   CollectSetAndList
  * Author:  彭三青
  * Created: 2018-11-29 15:24
  * Version: 1.0
  * Notes:   collect_list, collect_set
  */

object CollectSetAndList {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("CollectSetAndList")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    // collect_list: gathers a field's values within each group, keeping duplicates
    // collect_set: same, except duplicates are removed
    employeeDS
      .groupBy(employeeDS("depId"))
      .agg(collect_set(employeeDS("name")), collect_list(employeeDS("name")))
      .show()

    spark.stop()
  }
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
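    Tip: the auto-generated column names (collect_set(name), collect_list(name)) can be given readable names with alias(). A sketch that could replace the agg call above:

employeeDS
  .groupBy(employeeDS("depId"))
  .agg(collect_set(employeeDS("name")).alias("distinct_names"),
    collect_list(employeeDS("name")).alias("all_names"))
  .show()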

    joinWith and sort

package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   JoinAndSort
  * Author:  彭三青
  * Created: 2018-11-29 15:19
  * Version: 1.0
  * Notes:   joinWith, sort
  */

object JoinAndSort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("JoinAndSort")
      .getOrCreate()
    import spark.implicits._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
    val departmentDF: DataFrame = spark.read.json("E://temp/department.json")
    val departmentDS: Dataset[Department] = departmentDF.as[Department]

    println("----------------------employeeDS----------------------")
    employeeDS.show()
    println("----------------------departmentDS----------------------")
    departmentDS.show()
    println("------------------------------------------------------------")

    // Equi-join on the department id
    employeeDS.joinWith(departmentDS, $"depId" === $"id").show()
    // Sort by age in descending order
    employeeDS.sort($"age".desc).show()

    spark.stop()
  }
}
case class Department(id: Long, name: String)
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
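    joinWith is the typed variant of a join: it returns a Dataset[(Employee, Department)] of pairs instead of flattened rows. For contrast, the untyped join merges the columns of both sides into a single DataFrame. A sketch that could be added to the main method above:

// Untyped join: columns of both sides are flattened into one DataFrame
employeeDS.join(departmentDS, employeeDS("depId") === departmentDS("id")).show()

// Typed join: each resulting row is an (Employee, Department) pair
val pairs: Dataset[(Employee, Department)] =
  employeeDS.joinWith(departmentDS, employeeDS("depId") === departmentDS("id"))
pairs.show()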

Function Usage

    Date functions:

        current_date(): returns the current date.

        current_timestamp(): returns the current timestamp.

    Math functions

        rand(): generates a random number between 0 and 1.

        round(e: Column, scale: Int): e is the column to round; scale is the number of decimal places to keep.

        round(e: Column): the one-argument form rounds to the nearest whole number (scale 0).

    String functions

        concat_ws(sep: String, exprs: Column*): string concatenation. sep is the separator placed between values; exprs are the columns to concatenate, with multiple columns separated by commas.

package spark2x

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Class:   FunctionsDemo
  * Author:  彭三青
  * Created: 2018-11-29 15:56
  * Version: 1.0
  * Notes:   current_date, current_timestamp, rand, round, concat, concat_ws
  */

object FunctionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Operations")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")
    val employeeDS: Dataset[Employee] = employeeDF.as[Employee]

    employeeDS
      .select(employeeDS("name"),
        current_date(),                                           // current date
        current_timestamp(),                                      // current timestamp
        rand(),                                                   // random number in [0, 1)
        round(employeeDS("salary"), 2),                           // salary rounded to 2 decimal places
        concat(employeeDS("gender"), employeeDS("age")),          // plain concatenation
        concat_ws("|", employeeDS("gender"), employeeDS("age")))  // concatenation with a separator
      .show()

    spark.stop()
  }
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

    Data

        employee.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}

        employee2.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}

        department.json

{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}