[Spark SQL] SparkSession、DataFrame 和 DataSet 練習

本課主題

  • DataSet 實戰

 

DataSet 實戰

SparkSession 是 SparkSQL 的入口,而後能夠基於 sparkSession 來獲取或者是讀取源數據來生存 DataFrameReader,在 Spark 2.x 版本中已經沒有 DataFrame 的 API,它變成了 DataSet[Row] 類型的數據。html

  1. 建立 SparkSession
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL Basic Examples")
      .getOrCreate()
  2. 導入隱式轉換的方法
    import spark.implicits._
    import org.apache.spark.sql.types._ // 自定義schema時導入
  3. 建立 DataFrame 即 DataSet[Row] 類型數據。
    sql

    val df = spark.read.json("src/main/resources/general/people.json")
    • 能夠直接調用 DataFrame 不少很好用的方法,好比 select( ),filter( ),groupBy( )
      df.show() //打印數據,默認是前20條數據
      df.printSchema()
      df.select("name").show() //提取column是name的數據
      df.select($"name",$"age" + 1).show() //提取column是name和age+1的數據
      df.filter($"age" > 25).select("name").show() 
      df.groupBy($"age").count().show() 
  4. 也能夠自定義 case class 來建立 DataSet[Row] 類型
    val personDF = sc.textFile("src/main/resources/general/people.txt") //personRDD
      .map(x => x.split(",")) //Array[String] = Array(name, age)
      .map(attr => Person(attr(0),attr(1).trim().toInt))
      .toDF()
  5. 或者用自定義 schema 的方式
    val schemaString = "name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    
    val personRDD = sc.textFile("src/main/resources/general/people.txt") //personRDD
    val rowRDD = personRDD.map(_.split(",")).map(attr => Row(attr(0),attr(1).trim()))
    val personDF = spark.createDataFrame(rowRDD,schema)
    • 或者是調用 createOrReplaceTempView 方法來建立臨時表運行 SQL
      apache

      personDF.createOrReplaceTempView("people")
      val sqlDF = spark.sql("SELECT * FROM people")
      sqlDF.map(people => "Name: " + people(0)).show()
      

 

 

 

參考資料 

資料來源來至 Spark 官方網站json

相關文章
相關標籤/搜索