平常的數據來源有不少渠道,如網絡爬蟲、網頁埋點、系統日誌等。下面的案例中使用的是用戶觀看電影和點評電影的行爲數據,數據來源於網絡上的公開數據,共有3個數據文件:uers.dat、ratings.dat和movies.dat。數組
其中,uers.dat的格式以下: UserID::Gender::Age::Occupation::Zip-code ,這個文件裏共有6040個用戶的信息,每行中用「::」隔開的詳細信息包括ID、性別(F、M分別表示女性、男性)、年齡(使用7個年齡段標記)、職業和郵編。緩存
ratings.dat的格式以下: UserID::MovieID::Rating::Timestamp ,這個文件共有一百萬多條記錄,記錄的是評分信息,即用戶ID、電影ID、評分(滿分是5分)和時間戳。網絡
movies.dat的格式以下: MovieID::Title::Genres ,這個文件記錄的是電影信息,即電影ID、電影名稱和電影類型。分佈式
首先初始化Spark,以及讀取文件。建立一個Scala的object類,在main方法中配置SparkConf和SparkContext,這裏指定程序在本地運行,而且把程序名字設置爲「RDD_Movie_Users_Analyzer」。ide
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_Movie_User_Analyzer") /** * Spark2.0引入SparkSession封裝了SparkContext和SQLContext,而且會在builder的getOrCreate方法中判斷是否 * 含有符合要求的SparkSession存在,有則使用,沒有則進行建立 */ val spark = SparkSession.builder.config(conf).getOrCreate() // 獲取SparkSession的SparkContext val sc = spark.sparkContext // 把Spark程序運行時的日誌設置爲warn級別,以方便查看運行結果 sc.setLogLevel("WARN") // 把用到的數據加載進來轉換爲RDD,此時使用sc.textFile並不會讀取文件,而是標記了有這個操做,遇到Action級算子時纔回真正去讀取文件 val usersRDD = sc.textFile("./src/test1/users.dat") val moviesRDD = sc.textFile("./src/test1/movies.dat") val ratingsRDD = sc.textFile("./src/test1/ratings.dat")
首先咱們來寫一個案例計算,並打印出全部電影中評分最高的前10個電影名和平均評分。ui
第一步:從ratingsRDD中取出MovieID和rating,從moviesRDD中取出MovieID和Name,若是後面的代碼重複使用這些數據,則能夠把它們緩存起來。首先把使用map算子上面的RDD中的每個元素(即文件中的每一行)以「::」爲分隔符進行拆分,而後再使用map算子從拆分後獲得的數組中取出須要用到的元素,並把獲得的RDD緩存起來this
第二步:從ratings的數據中使用map算子獲取到形如(movieID,(rating,1))格式的RDD,而後使用reduceByKey把每一個電影的總評分以及點評人數算出來。此時獲得的RDD格式爲(movieID,Sum(ratings),Count(ratings)))。spa
第三步:把每一個電影的Sum(ratings)和Count(ratings)相除,獲得包含了電影ID和平均評分的RDD:scala
第四步:把avgRatings與movieInfo經過關鍵字(key)鏈接到一塊兒,獲得形如(movieID, (MovieName,AvgRating))的RDD,而後格式化爲(AvgRating,MovieName),並按照key(也就是平均評分)降序排列,最終取出前10個並打印出來。日誌
println("全部電影中平均得分最高(口碑最好)的電影:") val movieInfo = moviesRDD.map(_.split("::")).map(x=>(x(0),x(1))).cache() val ratings = ratingsRDD.map(_.split("::")).map(x=>(x(0),x(1),x(2))).cache() val moviesAndRatings = ratings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)) val avgRatings = moviesAndRatings.map(x=>(x._1,x._2._1.toDouble/x._2._2)) avgRatings.join(movieInfo).map(item=>(item._2._1,item._2._2)) .sortByKey(false).take(10) .foreach(record=>println(record._2+"評分爲:"+record._1))
接下來咱們來看另一個功能的實現:分析最受男性喜好的電影Top10和最受女性喜好的電影Top10。
首先來分析一下:單從ratings中沒法計算出最受男性或者女性喜好的電影Top10,由於該RDD中沒有Gender信息,若是須要使用Gender信息進行Gender的分類,此時必定須要聚合。固然,咱們力求聚合使用的是mapjoin(分佈式計算的一大痛點是數據傾斜,map端的join必定不會數據傾斜),這裏是否可以使用mapjoin?不能夠,由於map端的join是使用broadcast把相對小得多的變量廣播出去,這樣能夠減小一次shuffle,這裏,用戶的數據很是多,因此要使用正常的join。
使用join鏈接ratings和users以後,對分別過濾出男性和女性的記錄進行處理:
println("========================================") println("全部電影中最受男性喜好的電影Top10:") val usersGender = usersRDD.map(_.split("::")).map(x=>(x(0),x(1))) val genderRatings = ratings.map(x=>(x._1,(x._1,x._2,x._3))).join(usersGender).cache() // genderRatings.take(10).foreach(println) val maleFilteredRatings = genderRatings.filter(x=>x._2._2.equals("M")).map(x=>x._2._1) val femaleFilteredRatings = genderRatings.filter(x=>x._2._2.equals("F")).map(x=>x._2._1) maleFilteredRatings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)) .map(x=>(x._1,x._2._1.toDouble/x._2._2)) .join(movieInfo) .map(item=>(item._2._1,item._2._2)) .sortByKey(false) .take(10) .foreach(record=>println(record._2+"評分爲:"+record._1)) println("========================================") println("全部電影中最受女性喜好的電影Top10:") femaleFilteredRatings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)) .map(x=>(x._1,x._2._1.toDouble/x._2._2)) .join(movieInfo) .map(item=>(item._2._1,item._2._2)) .sortByKey(false) .take(10) .foreach(record=>println(record._2+"評分爲:"+record._1))
在現實業務場景中,二次排序很是重要,而且常常遇到。下面來模擬一下這些場景,實現對電影評分數據進行二次排序,以Timestamp和Rating兩個維度降序排列,值得一提的是,Java版本的二次排序代碼很是煩瑣,而使用Scala實現就會很簡捷,首先咱們須要一個繼承自Ordered和Serializable的類。
class SecondarySortKey(val first:Double,val second:Double) extends Ordered[SecondarySortKey] with Serializable{ // 在這個類中重寫compare方法 override def compare(other:SecondarySortKey):Int={ // 既然是二次排序,那麼首先要判斷第一個排序字段是否相等,若是不相等,就直接排序 if(this.first-other.first!=0){ (this.first-other.first).toInt }else { // 若是第一個字段相等,則比較第二個字段,若想實現屢次排序,也能夠按照這個模式繼續比較下去 if(this.second-other.second>0){ Math.ceil(this.second-other.second).toInt }else if (this.second-other.second<0) { Math.floor(this.second-other.second).toInt }else { (this.second-other.second).toInt } } } }
而後再把RDD的每條記錄裏想要排序的字段封裝到上面定義的類中做爲key,把該條記錄總體做爲value。
println("========================================") println("對電影評分數據以Timestamp和Rating兩個維度進行二次降序排列:") val pairWithSortkey = ratingsRDD.map(line=>{ val spilted = line.split("::") (new SecondarySortKey(spilted(3).toDouble,spilted(2).toDouble),line) }) // 直接調用sortByKey,此時會按照以前實現的compare方法排序 val sorted = pairWithSortkey.sortByKey(false) val sortedResult = sorted.map(sortedline => sortedline._2) sortedResult.take(10).foreach(println)
取出排序後的RDD的value,此時這些記錄已是按照時間戳和評分排好序的,最終打印出的結果如圖所示,從圖中能夠看到已經按照timestamp和評分降序排列了。