DataFrameAPI是從Spark 1.3開始就有的,它是一種以RDD爲基礎的分佈式無類型數據集,它的出現大幅度下降了普通Spark用戶的學習門檻。git
DataFrame相似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以解析到具體數據的結構信息,從而對DataFrame中的數據源以及對DataFrame的操做進行了很是有效的優化,從而大幅提高了運行效率。github
如今咱們經過實現幾個功能來了解DataFrame的具體用法。先來看第一個功能:經過DataFrame實現某部電影觀看者中男性和女性不一樣年齡分別有多少人。sql
println("========================================") println("功能一:經過DataFrame實現某部電影觀看者中男性和女性不一樣年齡人數") // 首先把User的數據格式化,即在RDD的基礎上增長數據的元數據信息 val schemaForUsers = StructType( "UserID::Gender::Age::OccupationID::Zip-code".split("::") .map(column => StructField(column,StringType,true)) ) // 而後把咱們的每一條數據變成以Row爲單位的數據 val usersRDDRows = usersRDD.map(_.split("::")).map( line => Row(line(0).trim(),line(1).trim(),line(2).trim(),line(3).trim(),line(4).trim()) ) // 使用SparkSession的createDataFrame方法,結合Row和StructType的元數據信息 基於RDD建立DataFrame, // 這時RDD就有了元數據信息的描述 val usersDataFrame = spark.createDataFrame(usersRDDRows, schemaForUsers) // 也能夠對StructType調用add方法來對不一樣的StructField賦予不一樣的類型 val schemaforratings = StructType( "UserID::MovieID".split("::") .map(column => StructField(column,StringType,true))) .add("Rating",DoubleType,true) .add("Timestamp",StringType,true) val ratingsRDDRows = ratingsRDD.map(_.split("::")).map( line => Row(line(0).trim(),line(1).trim(),line(2).trim().toDouble,line(3).trim()) ) val ratingsDataFrame = spark.createDataFrame(ratingsRDDRows, schemaforratings) // 接着構建movies的DataFrame val schemaformovies = StructType( "MovieID::Title::Genres".split("::") .map(column => StructField(column,StringType,true)) ) val moviesRDDRows = moviesRDD.map(_.split("::")).map(line => Row(line(0).trim(),line(1).trim(),line(2).trim())) val moviesDataFrame = spark.createDataFrame(moviesRDDRows, schemaformovies) // 這裏可以直接經過列名MovieID爲1193過濾出這部電影,這些列名都是在上面指定的 /* * Join的時候直接指定基於UserID進行Join,這相對於原生的RDD操做而言更加方便快捷 * 直接經過元數據信息中的Gender和Age進行數據的篩選 * 直接經過元數據信息中的Gender和Age進行數據的groupBy操做 * 基於groupBy分組信息進行count統計操做,並顯示出分組統計後的前10條信息 */ ratingsDataFrame.filter(s"MovieID==1193") .join(usersDataFrame,"UserID") .select("Gender", "Age") .groupBy("Gender", "Age") .count().show(10)
上面案例中的代碼不管是從思路上,仍是從結構上都和SQL語句十分相似,下面經過寫SQL語句的方式來實現上面的案例。數據庫
println("========================================") println("功能二:用LocalTempView實現某部電影觀看者中不一樣性別不一樣年齡分別有多少人") // 既然使用SQL語句,那麼表確定是要有的,因此須要先把DataFrame註冊爲臨時表 ratingsDataFrame.createTempView("ratings") usersDataFrame.createTempView("users") // 而後寫SQL語句,直接使用SparkSession的sql方法執行SQL語句便可。 val sql_local = "SELECT Gender,Age,count(*) from users u join ratings as r on u.UserID=r.UserID where MovieID=1193 group by Gender,Age" spark.sql(sql_local).show(10)
這篇博文主要來自《Spark大數據商業實戰三部曲》這本書裏面的第一章,內容有刪減,還有本書的一些代碼的實驗結果。隨書附贈的代碼庫連接爲:https://github.com/duanzhihua/code-of-spark-big-data-business-trilogy分佈式