Spark實戰電影點評系統(二)

2、經過DataFrame實戰電影點評系統

  DataFrameAPI是從Spark 1.3開始就有的,它是一種以RDD爲基礎的分佈式無類型數據集,它的出現大幅度下降了普通Spark用戶的學習門檻。git

  DataFrame相似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以解析到具體數據的結構信息,從而對DataFrame中的數據源以及對DataFrame的操做進行了很是有效的優化,從而大幅提高了運行效率。github

  如今咱們經過實現幾個功能來了解DataFrame的具體用法。先來看第一個功能:經過DataFrame實現某部電影觀看者中男性和女性不一樣年齡分別有多少人。sql

    println("========================================")
    println("功能一:經過DataFrame實現某部電影觀看者中男性和女性不一樣年齡人數")
    // 首先把User的數據格式化,即在RDD的基礎上增長數據的元數據信息
    val schemaForUsers = StructType(
        "UserID::Gender::Age::OccupationID::Zip-code".split("::")
        .map(column => StructField(column,StringType,true))
    )
    // 而後把咱們的每一條數據變成以Row爲單位的數據
    val usersRDDRows = usersRDD.map(_.split("::")).map(
        line => Row(line(0).trim(),line(1).trim(),line(2).trim(),line(3).trim(),line(4).trim())    
    )
    // 使用SparkSession的createDataFrame方法,結合Row和StructType的元數據信息 基於RDD建立DataFrame,
    // 這時RDD就有了元數據信息的描述
    val usersDataFrame = spark.createDataFrame(usersRDDRows, schemaForUsers)
    // 也能夠對StructType調用add方法來對不一樣的StructField賦予不一樣的類型
    val schemaforratings = StructType(
        "UserID::MovieID".split("::")
        .map(column => StructField(column,StringType,true)))
        .add("Rating",DoubleType,true)
        .add("Timestamp",StringType,true)
     val ratingsRDDRows = ratingsRDD.map(_.split("::")).map(
         line => Row(line(0).trim(),line(1).trim(),line(2).trim().toDouble,line(3).trim())    
     )
     val ratingsDataFrame = spark.createDataFrame(ratingsRDDRows, schemaforratings)
     // 接着構建movies的DataFrame
     val schemaformovies = StructType(
         "MovieID::Title::Genres".split("::")
         .map(column => StructField(column,StringType,true))
     )
     val moviesRDDRows = moviesRDD.map(_.split("::")).map(line => Row(line(0).trim(),line(1).trim(),line(2).trim()))
     val moviesDataFrame = spark.createDataFrame(moviesRDDRows, schemaformovies)
    // 這裏可以直接經過列名MovieID爲1193過濾出這部電影,這些列名都是在上面指定的
    /* 
     * Join的時候直接指定基於UserID進行Join,這相對於原生的RDD操做而言更加方便快捷
     * 直接經過元數據信息中的Gender和Age進行數據的篩選
     * 直接經過元數據信息中的Gender和Age進行數據的groupBy操做
     * 基於groupBy分組信息進行count統計操做,並顯示出分組統計後的前10條信息
     */
    ratingsDataFrame.filter(s"MovieID==1193")
        .join(usersDataFrame,"UserID")
        .select("Gender", "Age")
        .groupBy("Gender", "Age")
        .count().show(10)

   

  上面案例中的代碼不管是從思路上,仍是從結構上都和SQL語句十分相似,下面經過寫SQL語句的方式來實現上面的案例。數據庫

    println("========================================")
    println("功能二:用LocalTempView實現某部電影觀看者中不一樣性別不一樣年齡分別有多少人")
    // 既然使用SQL語句,那麼表確定是要有的,因此須要先把DataFrame註冊爲臨時表
    ratingsDataFrame.createTempView("ratings")
    usersDataFrame.createTempView("users")
    // 而後寫SQL語句,直接使用SparkSession的sql方法執行SQL語句便可。
    val sql_local = "SELECT Gender,Age,count(*) from users u join ratings as r on u.UserID=r.UserID where MovieID=1193 group by Gender,Age"
    spark.sql(sql_local).show(10)

   

  這篇博文主要來自《Spark大數據商業實戰三部曲》這本書裏面的第一章,內容有刪減,還有本書的一些代碼的實驗結果。隨書附贈的代碼庫連接爲:https://github.com/duanzhihua/code-of-spark-big-data-business-trilogy分佈式

相關文章
相關標籤/搜索