reduceByKey的做用對像是(key, value)形式的rdd,而reduce有減小、壓縮之意,reduceByKey的做用就是對相同key的數據進行處理,最終每一個key只保留一條記錄,保留一條記錄一般,有兩種結果:一種是隻保留咱們但願的信息,好比每一個key出現的次數;第二種是把value聚合在一塊兒造成列表,這樣後續能夠對value作進一步的操做,好比排序。sql
好比如今咱們有數據goods Sale:RDD[(String, String)],兩個字段分別是goodsid、單個訂單中的銷售額,如今咱們須要統計每一個goodsid的銷售額。咱們只須要保留每一個goodsid的累記銷售額,可使用以下語句來實現:函數
val goodsSaleSum = goodsSale.reduceByKey((x,y) => x+y)
簡化方式爲:
val goodsSaleSum = goodsSale.reduceByKey(_+_)
reduceByKey會尋找相同key的數據,當找到這樣的兩條記錄時會對其value(分別記爲x,y)作(x,y) => x+y
的處理,即只保留求和以後的數據做爲value。反覆執行這個操做直至每一個key只留下一條記錄。spa
如今假設goodsSaleSum還有一個字段類目id,即 RDD[(String, String, String)] 形式,三個字段分別是類目id、goodsid、總銷量,如今咱們要得到第個類目id下銷量最高的一個商品。上一步聚是保留value求和以後的數據,而這裏其實咱們只須要保留銷量更高的那條記錄。不過咱們不能直接對RDD[(String, String, String)]類型的數據使用reduceByKey方法,由於這並非一個(key, value)形式的數據,因此須要使用map方法轉化一下類型。code
val catGmvTopGoods = goodsSaleSum.map(x => (x._1, (x._2, x._3))).reduceByKey((x, y) => if (x._2.toDouble > y._2.toDouble) x else y).map(x => (x._1, x._2._1, x._2._2)
再進一步,假設如今咱們有一個任務:推薦5個銷售額最高的類目,併爲每一個類目推薦一個銷售額最高的商品,而咱們的數據就是上述RDD[(String, String, String)類型的goodsSaleSum。這須要兩步,一是計算每一個類目的銷售額,這和舉的第一個例子同樣。二是找出每一個類目下銷量最高的商品,這和第二個例子同樣。實際上,咱們能夠只實用一個reduceByKey就達到上面的目的。blog
val catIdGmvTopGoods = goodsSaleSum.map(x => (x._1, (x._2, x._3, x._3))) .reduceByKey((x, y) => if (x._2 > y._2) (x._1, x._2, x._3+y._3) else (y._1, y._2, x._3+y._3)) .map( x => (x._1, x._2._1, x._2._2, x._2._3).sortBy(_._3, false).take(5)
因爲咱們須要計算每一個類目的總銷售額,同時須要保留商品的銷售額,因此先使用map增長一個字段用來記錄類目的總銷售額。這樣一來,咱們就可使用reduceByKey同時完成前兩個例子的操做。剩下的就是進行排序並獲取前5條記錄。排序
上述的三個例子都是隻保留須要的信息,但有時咱們須要將value聚合在一塊兒進行排序操做,好比對每一個類目下的商品按銷售額進行排序。假設咱們的數據是 RDD[(String, String, String)],三個字段分別是類目id、goodsid、銷售額。如果使用sql,那咱們直接用row_number函數就能夠很簡單的使用分類目排序這個任務。但因爲spark-sql佔用的資源會比RDD多很多,在開發任務時並不建議使用spark-sql。咱們的方法是經過reduceByKey把商品聚合成一個List,而後對這個List進行排序,再使用flatMapValues攤平數據。咱們在使用reduceyByKey時會注意到,兩個value聚合後的數據類型必須和以前一致。因此在聚合商品時咱們也須要保證這點,一般有兩種方法,一是使用ListBuffer,便可變長度的List。二是使用String,以分隔符來區分商品和銷售額。下面咱們使用第一種方式完成這個任務。ip
val catIdGoodsIdSorted = goodsGmvSum.map(x => (x._1, ListBuffer(x._2, x._3.toDouble))) .reduceByKey((x, y) => x++y).flatMapValues( x => x.toList.sortBy(_._2).reverse.zipWithIndex)
上述zipWithIndex給列表增長一個字段,用來記錄元素的位置信息。而flatMapValues能夠把List的每一個元素單獨拆成一條記錄資源
本文中介紹了reduceByKey的三種做用:開發
求和彙總it
得到每一個key下value最大的記錄
聚合value造成一個List以後進行排序
轉自: https://www.jianshu.com/p/af175e66ce99