Spark Advanced Topics: RDD/DataSet/DataFrame (Part 2)

This topic is split into two articles, covering RDD and Dataset/DataFrame respectively:

1. RDD

2. DataSet/DataFrame

 

This article covers DataSet and DataFrame.

1. Creating a DataFrame

1.1. Constructing a DataFrame from a case class

package com.personal.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DataFrameTest {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val dataPath = "hdfs://192.168.60.164:9000/user/name/input"

    val spark = SparkSession
      .builder()
      .appName("DataFrameTest")
      .getOrCreate()

    import spark.implicits._
    val caseClassDF = Seq(Person(1, "lily", 18), Person(2, "lucy", 20)).toDF("id", "name", "age")
    println("=========== show: ============")
    caseClassDF.show()
    println("=========== schema: ===========")
    caseClassDF.printSchema()

    spark.stop()
  }
}

Here "import spark.implicits._" brings the implicit Encoders into scope; without it, ".toDF()" cannot be called. Note that the spark in that import is the SparkSession variable defined above, not the "org.apache.spark" package, so take care not to confuse the two.

Output:

(screenshot: output of show() and printSchema())

As you can see, the two Person instances have been wrapped in a DataFrame, which can then be processed much like a table or view.

1.2. Constructing a DataFrame from values

package com.personal.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DataFrameTest {
  def main(args: Array[String]): Unit = {
    val dataPath = "hdfs://192.168.60.164:9000/user/name/input"

    val spark = SparkSession
      .builder()
      .appName("DataFrameTest")
      .getOrCreate()

    import spark.implicits._
    val valueDF = Seq(1, 2, 3).toDF("id")
    println("=========== show: ============")
    valueDF.show()
    println("=========== schema: ===========")
    valueDF.printSchema()

    spark.stop()
  }
}

From this and the previous example you can see that Seq lets us turn any sequence of value-type objects into a DataFrame; Seq is similar to Java's List.

Output:

(screenshot: output of show() and printSchema())
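Beyond plain Ints, a Seq of tuples converts just as easily. Here is a minimal sketch (assuming the same SparkSession with "import spark.implicits._" in scope; the data is made up):

    import spark.implicits._

    // Each tuple element becomes a column; the names are supplied to toDF.
    val tupleDF = Seq((1, "lily"), (2, "lucy")).toDF("id", "name")
    tupleDF.show()
    tupleDF.printSchema()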

1.3. Reading data via SparkSession

package com.personal.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DataFrameTest {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val dataPath = "hdfs://192.168.60.164:9000/user/name/input"

    val spark = SparkSession
      .builder()
      .appName("DataFrameTest")
      .getOrCreate()

    // By default each line is read into a single nullable column named value of type String
    val hdfsDF = spark.read.text(dataPath)

    println("=========== show: ============")
    hdfsDF.show()
    println("=========== schema: ===========")
    hdfsDF.printSchema()

    val personEncoder: Encoder[Person] = Encoders.product

    val personDF = hdfsDF.map(row => {
      //val value = row.getAs[String]("value")
      val value = row.getString(0)
      val fields = value.trim.split(",")
      Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2)))
    })(personEncoder).toDF()

    println("=========== show: ============")
    personDF.show()
    println("=========== schema: ===========")
    personDF.printSchema()

    spark.stop()
  }
}

Output:

As you can see, "spark.read.text(dataPath)" by default maps each line of the file to a String field named value, whose content can be retrieved with get(0), getString(0), or getAs[String]("value").

Here we did not use "import spark.implicits._" to obtain an implicit Encoder for Person. Instead we explicitly defined a variable of type Encoder[Person] via "val personEncoder: Encoder[Person] = Encoders.product" and passed it explicitly in "hdfsDF.map(...)(personEncoder)". After the map we call ".toDF" to convert the result into a DataFrame: the result of map is a DataSet, and a DataSet can be converted to a DataFrame directly with ".toDF". If "import spark.implicits._" were used, there would be no need to define "personEncoder" or to supply the last argument of map.
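For comparison, here is a minimal sketch of the implicit variant (assuming the same hdfsDF and Person case class as above): with "import spark.implicits._" in scope, neither the personEncoder variable nor the explicit encoder argument is needed:

    import spark.implicits._

    // The implicit Encoder for the Person case class is resolved automatically.
    val personDF2 = hdfsDF.map(row => {
      val fields = row.getString(0).trim.split(",")
      Person(fields(0).toInt, fields(1), fields(2).toInt)
    }).toDF()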

1.4. Converting an RDD to a DataFrame

package com.personal.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DataFrameTest {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val dataPath = "hdfs://192.168.60.164:9000/user/name/input"

    val spark = SparkSession
      .builder()
      .appName("DataFrameTest")
      .getOrCreate()

    import spark.implicits._
    val rddDF = spark.sparkContext.textFile(dataPath)
      .map(row => row.split(","))
      .map(fields => Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2))))
      .toDF("id", "name", "age")

    println("=========== show: ============")
    rddDF.show()
    println("=========== schema: ===========")
    rddDF.printSchema()

    spark.stop()
  }
}

As you can see, converting an RDD to a DataFrame, like building one from a Seq, requires "import spark.implicits._"; otherwise ".toDF" cannot be called.

Output:

(screenshot: output of show() and printSchema())

2. Creating a DataSet

2.1. Constructing a DataSet from a case class

package com.personal.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DataSetTest {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val dataPath = "hdfs://192.168.60.164:9000/user/name/input"

    val spark = SparkSession
      .builder()
      .appName("DataSetTest")
      .getOrCreate()

    import spark.implicits._
    val caseClassDs = Seq(Person(1, "lily", 18), Person(2, "lucy", 20))
      .toDS()

    println("=========== show: ============")
    caseClassDs.show()
    println("=========== schema: ===========")
    caseClassDs.printSchema()

    spark.stop()
  }
}

As you can see, this mirrors 1.1: replacing ".toDF" with ".toDS" is all it takes to get a result of type DataSet.

Output:

(screenshot: output of show() and printSchema())

As you can see, its structure is identical to that of the DataFrame.

2.2. Constructing a DataSet from values

package com.personal.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DataSetTest {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val dataPath = "hdfs://192.168.60.164:9000/user/name/input"

    val spark = SparkSession
      .builder()
      .appName("DataSetTest")
      .getOrCreate()

    import spark.implicits._

    val valueDs = Seq(1, 2, 3).toDS()

    println("=========== show: ============")
    valueDs.show()
    println("=========== schema: ===========")
    valueDs.printSchema()

    spark.stop()
  }
}

Output:

(screenshot: output of show() and printSchema())

As you can see, its structure is similar to the DataFrame's, but the column name defaults to value.

2.3. Reading data via SparkSession

package com.personal.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DataSetTest {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val dataPath = "hdfs://192.168.60.164:9000/user/name/input"

    val spark = SparkSession
      .builder()
      .appName("DataSetTest")
      .getOrCreate()

    
    // By default each line is read into a single nullable column named value of type String
    val hdfsDF = spark.read.text(dataPath)

    println("=========== show: ============")
    hdfsDF.show()
    println("=========== schema: ===========")
    hdfsDF.printSchema()

    val personEncoder: Encoder[Person] = Encoders.product

    val personDs = hdfsDF.map(row => {
      val value = row.getAs[String]("value")
      //val value = row.getString(0)
      val fields = value.trim.split(",")
      Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2)))
    })(personEncoder)

    println("=========== show: ============")
    personDs.show()
    println("=========== schema: ===========")
    personDs.printSchema()

    spark.stop()
  }
}

Comparing 1.3 with 2.3, you can see that reading a file with SparkSession (spark.read.*) yields a DataFrame, that applying map, filter, and similar operations to a DataFrame yields a DataSet, and that a DataSet can be turned back into a DataFrame with ".toDF". This matches the official definition of DataFrame:

type DataFrame = Dataset[Row]
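A minimal sketch of that round trip, reusing dataPath, the Person case class, and personEncoder from 1.3:

    import org.apache.spark.sql.{DataFrame, Dataset}

    val df: DataFrame = spark.read.text(dataPath)    // DataFrame, i.e. Dataset[Row]
    val ds: Dataset[Person] = df.map(row => {        // map yields a typed Dataset
      val fields = row.getString(0).trim.split(",")
      Person(fields(0).toInt, fields(1), fields(2).toInt)
    })(personEncoder)
    val backToDF: DataFrame = ds.toDF()              // toDF goes back to Dataset[Row]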

2.4. Converting an RDD to a DataSet

package com.personal.test

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object DataSetTest {

  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val dataPath = "hdfs://192.168.60.164:9000/user/name/input"

    val spark = SparkSession
      .builder()
      .appName("DataSetTest")
      .getOrCreate()

    import spark.implicits._
    val rddDS = spark.sparkContext.textFile(dataPath)
      .map(row => row.split(","))
      .map(fields => Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2))))
      .toDS()

    println("=========== show: ============")
    rddDS.show()
    println("=========== schema: ===========")
    rddDS.printSchema()

    spark.stop()
  }
}

As in 1.4, replacing ".toDF" with ".toDS" is all it takes to get a result of type DataSet.

 

Sections (1) and (2) above showed how to turn files, plain values, objects, and RDDs into a DataSet/DataFrame, how DataSet and DataFrame convert into each other, and how to use Encoders implicitly or explicitly.

The next section covers the operations available on a DataSet/DataFrame.

3. Operations on DataSet/DataFrame

Besides the map used in the examples above, operations that also exist on RDDs, such as filter, count, first, foreach, and reduce, are available as well. In addition, there are a few operations worth calling out:
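A minimal sketch of a few of the shared operations, using the personDs Dataset[Person] from 2.3:

    val adults = personDs.filter(p => p.age >= 18)   // keep rows matching a predicate
    val total = personDs.count()                     // number of rows
    val firstPerson = personDs.first()               // first element
    personDs.foreach(p => println(p))                // note: runs on the executors, not the driver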

3.1. select

select accepts one or more arguments and returns the specified fields of the DataSet/DataFrame.
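For example, against the caseClassDF from 1.1:

    // Pick one or several columns; the result is again a DataFrame.
    caseClassDF.select("name").show()
    caseClassDF.select("id", "name").show()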

3.2. createOrReplaceTempView

createOrReplaceTempView registers the data of a DataSet/DataFrame as a temporary view for later querying with SparkSession.sql(); the view's lifetime ends when the session ends.
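A minimal sketch, again using the caseClassDF and the spark session from 1.1:

    // Register the data as a temporary view and query it with SQL.
    // The view disappears when the SparkSession ends.
    caseClassDF.createOrReplaceTempView("person")
    spark.sql("select name, age from person where age >= 18").show()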

3.3. printSchema

As shown in the examples in sections (1) and (2), printSchema prints the tree-structured schema of a DataSet/DataFrame.

3.4. withColumnRenamed

withColumnRenamed renames a column, much like the "as" in the SQL statement "select a as aa, b as bb from table".
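A minimal sketch using the caseClassDF from 1.1:

    // Equivalent to "select id as person_id, ..." in SQL.
    val renamedDF = caseClassDF.withColumnRenamed("id", "person_id")
    renamedDF.printSchema()   // the id column now appears as person_id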

3.5. join

join merges another DataSet/DataFrame with the current one according to the given join expression and join type (inner join by default).
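A minimal sketch (scoreDF is a hypothetical second DataFrame, built with the same implicits as in 1.2):

    // A hypothetical DataFrame to join against.
    val scoreDF = Seq((1, 90), (2, 85)).toDF("id", "score")

    // Inner join (the default) on the id column:
    val joinedDF = caseClassDF.join(scoreDF, Seq("id"))

    // The join type can be given explicitly, e.g. a left outer join:
    val leftJoinedDF = caseClassDF.join(scoreDF, Seq("id"), "left_outer")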

 

Below is an (incomplete) example that demonstrates these DataSet/DataFrame operations.

    val exposureLogEncoder: Encoder[ExposureLog] = Encoders.product
    val exposureLogTupleEncoder: Encoder[Tuple1[ExposureLog]] = Encoders.product
    // Counters (accumulators)
    val dataCounter = spark.sparkContext.longAccumulator("dataCounter")
    val legalDataCounter = spark.sparkContext.longAccumulator("legalDataCounter")
    val illegalAvCounter = spark.sparkContext.longAccumulator("illegalAvCounter")
    val illegalKvCounter = spark.sparkContext.longAccumulator("illegalKvCounter")
    val illegalReportkeyCounter = spark.sparkContext.longAccumulator("illegalReportkeyCounter")

    // 1. Exposure log: select id,ei,av,ui,kv from mta_t_boss_v1_5362
    val exposureLogDF = HiveUtil.readTableByPartition(tdw, exposure, spark, partition)
    if (exposureLogDF == null) {
      System.exit(2)
    }
    val exposureLogDS = exposureLogDF.select("id", "ei", "av", "ui", "kv", "log_time")
      .filter(row => {
        dataCounter.add(1)
        val av = row.getAs[String]("av")
        if (av == null
          || av.trim.startsWith("1.6.2")
          || av.compareTo("0.9.5") < 0) {
          illegalAvCounter.add(1)
          false
        }
        else true
      })
      .filter(row => {
        val kv = row.getAs[String]("kv")
        if (kv == null || kv.trim.length == 0) {
          illegalKvCounter.add(1)
          false
        }
        else true
      })
      .map(row => parseExposure(row))(exposureLogTupleEncoder)
      .filter(exposure => {
        val obj = exposure._1
        if (obj == null) {
          illegalReportkeyCounter.add(1)
          false
        }
        else {
          legalDataCounter.add(1)
          true
        }
      })
      .map(row => row._1)(exposureLogEncoder)

    val resultCount = exposureLogDS.persist().count()
    println(s"Data count: ${dataCounter.value}")
    println(s"Legal data count: ${legalDataCounter.value}")
    println(s"Result count: ${resultCount}")
    println(s"Illegal av count: ${illegalAvCounter.value}")
    println(s"Illegal kv count: ${illegalKvCounter.value}")
    println(s"Illegal reportKey count: ${illegalReportkeyCounter.value}")


    // 1.save log info to hdfs
    exposureLogDS.persist().map(exposure => exposure.toString())(Encoders.STRING)
      .write.mode(SaveMode.Overwrite).text(logSavePath)
    println(s"[INFO] save log to ${logSavePath} success.")

    exposureLogDS.persist().select("sign", "channel").createOrReplaceTempView("log")
    val sql = "select sign, channel, count(*) as data_count " +
      "from log " +
      "group by sign, channel"
    val aggDF = spark.sql(sql)

    aggDF.printSchema()
    // 2.save log statistics info to hdfs
    aggDF.map(row => row.mkString(","))(Encoders.STRING)
      .repartition(1)
      .write.mode(SaveMode.Overwrite)
      .text(logStatisticsInfoSavePath)
    println(s"[INFO] save statistics info to ${logStatisticsInfoSavePath} success.")

Note: HiveUtil.readTableByPartition() is a custom helper that reads the specified database/table/partition from Hive and returns a DataFrame.

The example reads data from Hive, uses select to pick the required fields, uses filter to drop invalid records based on those fields, then calls map to preprocess and parse the data, calls filter again to drop empty records, and finally uses map to unbox the Tuple1. The processed result is then converted to strings with map and saved to HDFS; at the same time createOrReplaceTempView creates a temporary view, which SparkSession.sql aggregates to count records by sign and channel. printSchema prints the schema of the aggregated dataset, and the aggregated statistics are converted to strings with map and saved to HDFS. Requirements like this come up constantly in day-to-day data processing.

One more note: when using an Accumulator, make sure only one action is executed; otherwise you need cache or persist to keep the counters from counting twice. The example above reuses exposureLogDS, and without persist/cache the accumulators would double-count.
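A minimal sketch of the pattern (someDS is a hypothetical Dataset; only the accumulator and caching behaviour matter here):

    val counter = spark.sparkContext.longAccumulator("counter")

    val filtered = someDS.filter(x => { counter.add(1); true })
      .persist()                // or .cache(); without this, every action below
                                // re-runs the filter and the accumulator double-counts

    filtered.count()            // first action: counter now holds the row count
    filtered.foreach(_ => ())   // second action reuses the cached data; counter is unchanged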

Also note that the first map in the example returns Tuple1[ExposureLog]. ExposureLog is wrapped in an extra layer because "Product type is represented as a row, and the entire row can not be null in Spark SQL like normal databases". So when a null object may need to be returned, it has to be boxed so that the return value itself is non-null, and then unboxed in a later step (here, the final filter and map).
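A minimal sketch of the boxing pattern, reusing the names from the example above (rawDF stands in for the DataFrame after the select and filter steps):

    // parseExposure returns Tuple1[ExposureLog]; the payload may be null for bad records,
    // but the Tuple1 wrapper keeps the row itself non-null, as Spark SQL requires for Product types.
    val boxed = rawDF.map(row => parseExposure(row))(exposureLogTupleEncoder)

    // Drop null payloads, then unbox back to ExposureLog.
    val cleaned = boxed
      .filter(t => t._1 != null)
      .map(t => t._1)(exposureLogEncoder)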

Other operations can be found in the official API documentation:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
