二、建立DataFrame的方式

#1:DataFrame其實它是經過RDD的map方法讀取每一條數據,而後把他存到「case class」中。最後經過這個RDD的toDF方法產生的。 #因爲case class 中有屬性字段,並且這些字段的類型都有了,是否是就是表結構??sql

#2:能夠經過sqlContext.read.format("jdbc").options()建立DataFrame(詳情筆記三)apache

#如何建立?代碼以下json

package com.liufu.org.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by liufu on 2016/11/18.
  */
object DataFrameTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("dataframTest").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    //建立SQLContext
    val sqlContext = new SQLContext(sc)

    //讀取外部數據,而且切割成每一條信息,造成arr數組。
    val spilted: RDD[Array[String]] = sc.textFile("file:///E:/words.txt").map(_.split(" "))

    //這裏的操做很容易出錯,
    //當map中的邏輯很複雜的時候,通常不建議使用「_」來代替每個數據,
    // 而是使用「函數體」的形式來一步一步的操做實現。
    val personRDD: RDD[Person] = spilted.map(valueArr => Person(valueArr(0),valueArr(1).toInt))

    /**
      * 導入隱式轉換,若是不到人沒法將RDD轉換成DataFrame
      * 將上述有RDD和case class產生的personSet變化成DataFrame。
      */
    import sqlContext.implicits._
    val dF: DataFrame = personRDD.toDF()

    //將DataFrame註冊成表結構纔可以使用SQL進行操做。
    dF.registerTempTable("person")

    //使用sqlContext執行sql語句進行查詢操做。
    val selectDF: DataFrame = sqlContext.sql("select * from person")
    selectDF.show()

    sc.stop()
  }
}

//經過樣例類來定義表結構。還能夠經過structType產生一個shema的方式,可是太麻煩通常不怎麼用
case class Person(name:String,age:Int)

#總結: 使用DataFrame的流程數組

  • 1:RDD的map方法中將信息寫入到Person樣例類中,這樣數據就和表結構關聯起來了。函數

    val personRDD: RDD[Person] = spilted.map(valueArr => Person(valueArr(0),valueArr(1).toInt))
  • 2:將RDD轉化爲DataFrame。注意:必定要引入sqlContext的隱式轉換。spa

    /**
        * 導入隱式轉換,若是不到人沒法將RDD轉換成DataFrame
        * 將上述有RDD和case class產生的personSet變化成DataFrame。
        */
      import sqlContext.implicits._
      val dF: DataFrame = personRDD.toDF()
  • 3:將DataFrame註冊成表,由於只有註冊成表以後纔可以使用sql語句來查詢code

    dF.registerTempTable("person")
  • 4:利用sqlContext對象來執行SQL語句來對錶進行操做。orm

    val selectDF: DataFrame = sqlContext.sql("select * from person")
      //展現。 
      selectDF.show()
  • 5://將結果以JSON的方式存儲到指定位置對象

    df.write.json("file:///E:/output1")
相關文章
相關標籤/搜索