Spark TopK問題解法

軟件版本及平臺:

CDH5.八、四子節點(cpu:2核、內存:4G)、JDK:1.七、IDEA14 、 Spark:1.6.0-cdh5.7.3;
代碼下載:

問題描述:

現有用戶項目評分數據,其格式以下所示:
u1,item1,0.0913375062480507
u2,item1,0.4061823571029518
u3,item1,0.021727289053235843
u4,item1,0.24172510761164112
u5,item1,0.7898802150245668
u6,item1,0.2166110282064876

現須要求得每一個用戶對項目的評分前10個的數據,以下所示:
(u9,List((item74540,0.9999953894581668), (item76768,0.9999930103445652), (item21169,0.9999889286058848), (item4820,0.9999782306748293), (item85543,0.9999663834573093), (item8372,0.9999487766494871), (item99252,0.9999365275502845), (item23653,0.9999347307884792), (item19615,0.9999236599402785), (item30399,0.999918672799968)))
(u18,List((item48113,0.9999984432903763), (item44728,0.9999823700583934), (item65298,0.9999721951269472), (item11426,0.9999686624512639), (item72669,0.9999525503292274), (item36801,0.9999334853565013), (item49233,0.9999283335977657), (item67481,0.9999041428222344), (item47549,0.9998546064810947), (item66968,0.999842604478957)))

思路:

思路有三種:
  1. a. 讀取數據後,使用把數據轉換爲(user,(item,pref))這樣的鍵值對;b. 而後使用combineByKey把相同user的數據整合起來;c. 最後調用map,針對每一個(user,List((item,pref)))這樣的數據中的List進行排序,而後取其前k個;
  2. 參考思路1中的a.b.c步驟,只是在c步驟不是對List直接排序取前k個,而是使用一個堆棧,獲得List中pref最大的k個值返回;
  3. a. 讀取數據後,構造(StringDouble(user,pref),item)這樣的鍵值對,同時須要對StringDouble這樣的類構造比較方法;b. 調用repartitionAndSortWithinPartitions方法來對RDD進行分區,分區規則根據key來,把相同的用戶分爲一個區,因此這裏要定義一個分區器;c. 調用mapPartition對每一個partition也就是每一個用戶取其前k個值返回;

思路1:

對整合後的每一個用戶的項目按照其評分進行排序,取其前k個;
代碼以下:
/**
   *先合併全部用戶的item,而後對每一個item排序,而後取前k個
   * @param input
   * @param output
   * @param k
   */
  def fun1(sc:SparkContext,input:String ,output:String, k :Int,num:Int) ={
    val start = System.currentTimeMillis;
    sc.textFile(input,num).map{x => val f = x.split(",");(f(0),(f(1),f(2).toDouble))}.
      combineByKey((x:(String,Double)) => List(x),
      (c:List[(String,Double)], x:(String,Double)) => c :+ x ,
      (c1:List[(String,Double)], c2:List[(String,Double)]) => c1 ::: c2).
      map(x => (x._1 , x._2.sortWith((x,y) => x._2 > y._2).take(10))).
      saveAsTextFile(output)
    println("fun1,k: "+k+",input:"+input+",num:"+num+"---> time:"+
      (System.currentTimeMillis - start )*1.0 /1000 +" s")
  }

思路2:

對整合後的每一個用戶的項目按照其評分使用堆棧求其前k個;
代碼以下:
/**
   * 先合併全部用戶item,而後使用堆棧獲取k個top值
   * @param sc
   * @param input
   * @param output
   * @param k
   */
  def fun2(sc:SparkContext,input:String ,output:String, k :Int,num:Int) ={
    val start = System.currentTimeMillis
    sc.textFile(input,num).map{x => val f = x.split(",");(f(0),(f(1),f(2).toDouble))}.
      combineByKey((x:(String,Double)) => List(x),(c:List[(String,Double)], x:(String,Double)) => c :+ x ,
        (c1:List[(String,Double)], c2:List[(String,Double)]) => c1 ::: c2).
      map(x => (x._1 ,MyUtils.top10(x._2,k))).saveAsTextFile(output)
    println("fun2,k: "+k+",input:"+input+",num:"+num+"---> time:"+
      (System.currentTimeMillis - start )*1.0 /1000 +" s")

  }
這裏使用到了MyUtils,這個類是參考Spark裏面的實現的,使用堆棧求前topk個值,以下:
package org.apache.spark.util

import org.apache.spark.util

/**
 * Created by fansy on 2016/12/13.
 */
object MyUtils {
  implicit val sortStringDoubleByDouble = new Ordering[(String,Double)] {
    override def compare(a: (String,Double), b: (String,Double)) = (b._2 - a._2 ).toInt
  }
  /**
   * 使用堆棧獲取最大值k個
   * @param items
   * @param k
   * @return
   */
  def top10(items : List[(String,Double)], k:Int):List[(String,Double)] ={
    val queue = new BoundedPriorityQueue[(String,Double)](k)(sortStringDoubleByDouble)
    queue ++= util.collection.Utils.takeOrdered(items.iterator, k)(sortStringDoubleByDouble)
    queue.toList
  }
}
定義此類須要注意:
1)這個類必定要放在org.apache.spark.util這個包下面;
2)定義的比較器sortStringDoubleByDouble,能夠根據本身的須要,這裏因爲須要取得評分比較大的,因此須要是b._2 - a._2;

思路3:

使用自定義Key類型,使用repartitionAndSort,自定義分區器,mapPartition來獲取topk;
1. 首先定義key類型:
/**
 * 根據d排序,因爲每一個分區的s是同樣的,因此只須要根據d排序便可
 * @param s
 * @param d
 */
case class StringDoubleKey(s:String,d:Double)
object StringDoubleKey {
  implicit def orderingByDouble[A <: StringDoubleKey] : Ordering[A] = {
    Ordering.by(x => - x.d)
  }
}
key類型是一個組合key,包含用戶和評分,這裏定義比較的方法,因爲是按照評分從到到小的,因此這裏只比較評分;同時須要注意,這裏不須要把用戶也加進來,由於這裏排序的是針對每一個分區的數據進行排序的,並且每一個分區就只有一個用戶(因此這裏就要特別注意partition的設計)。
2. 定義partition
/**
 * partition根據 鍵值對的s進行分區
 * 須要保證每一個用戶放到一個分區裏面
 * @param userNum
 */
case class partition(userNum:Map[String,Int]) extends Partitioner{
  override def getPartition(key: Any): Int ={
    userNum(key.asInstanceOf[StringDoubleKey].s)
  }
  override def numPartitions: Int = userNum.size
}
因爲每一個用戶須要映射到一個partition中,因此這裏傳進來一個用戶和id的映射(這個映射中的用戶是惟一的),把key轉換爲StringDoubleKey類型後,只取其s也就是user字段進行映射便可;
3. 定義 fun3 主函數
/**
   * 每一個用戶的數據組成一個partition,而後每一個partition獲取topk
   * @param sc
   * @param input
   * @param output
   * @param k
   * @param num  partition的個數沒有用
   */
  def fun3(sc:SparkContext,input:String ,output:String, k :Int,num:Int) ={
    var start = System.currentTimeMillis
    val data = sc.textFile(input).map{x => val f = x.split(",");(StringDoubleKey(f(0),f(2).toDouble),f(1))}
    val userNum = data.map(x => x._1.s).distinct.collect
    println("fun3,k: "+k+",input:"+input+",num:"+num+"---> time:"+
      (System.currentTimeMillis - start )*1.0 /1000 +" s")
    start = System.currentTimeMillis()
    data.repartitionAndSortWithinPartitions( partition(userNum.zip(0 until userNum.size).toMap)).mapPartitions(topK(k)).saveAsTextFile(output)
    println("fun3,k: "+k+",input:"+input+",num:"+num+"---> time:"+
      (System.currentTimeMillis - start )*1.0 /1000 +" s")
  }

主函數裏面先求得全部用戶的distinct值(去重),而後把各個用戶使用0到用戶個數-1進行映射;接着調用repartitionAndSortWithInPartitions方法進行每一個partition內部排序,其排序邏輯就採用StringDoubleKey的排序邏輯;最後,調用mapPartition對每一個partition取前k個值就能夠了,這裏topK返回的一樣是一個函數;
4. 定義topK函數
/**
   * 每一個partition返回一條記錄
   * topk ,返回用戶,list((item,pref),(item,pref))
   * @param k
   * @param iter
   * @return
   */
  def topK(k :Int )(iter : Iterator[(StringDoubleKey,String)]):Iterator[(String,List[(String,Double)])] ={
    val pre = iter.next
    val user = pre._1.s
    var items_pref = List[(String, Double)]((pre._2,pre._1.d))
    for (cur <- iter if items_pref.length < k)
    {
      items_pref .::= (cur._2, cur._1.d)
    }
    Array((user,items_pref)).iterator
  }

topK實現採用柯里化方式返回一個函數,這裏須要注意的是接收的參數以及返回的參數,因爲每一個partition傳過來的參數是Iterator[(StringDoubleKey,String)],而且輸出的應該是(user,List(Item,pref),後面的list是前k個;同時在處理iterator的時候,也要注意其遍歷一次的特性,取了next後,再接着訪問的就是iter下面的一個值了;

實驗:

1. 生成數據:
生成數據一共分爲多個用戶多個項目構成,其代碼以下所示:
sc.parallelize(for( i <- 1 to 100000; j <- 1 to 50) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user1000_item3w")
sc.parallelize(for( i <- 1 to 30000; j <- 1 to 100) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user300_item3w")
sc.parallelize(for( i <- 1 to 300000; j <- 1 to 10) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user10_item30w")
sc.parallelize(for( i <- 1 to 500000; j <- 1 to 10) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user10_item50w")
sc.parallelize(for( i <- 1 to 1000000; j <- 1 to 10) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user10_item100w")
使用下面的方式,效率更快:
sc.parallelize(for( j <- 1 to 100000) yield "u"+j,32).flatMap(x => for(j <- 1 to 1000) yield (x,"item"+j,Math.random)).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user10w_item1k")


其在HDFS上的截圖以下所示:

2. 調用代碼:
編寫一個for循環來運行任務,代碼以下:
#!/bin/bash

echo "Start..."
input=("user_100_item_3w" "user_20_item_10w" "user_50_item_10w")
method=(1 2)
partition=(4 8 12 16 20 24 28 32)
for i in ${input[@]}
do
	for m in ${method[@]}
	do
		for p in ${partition[@]}
		do
			echo "input:_$i ,method:  $m ,par:  _$p," 
spark-submit --name "input :$i,method:fun$m,partition:$p" --class topk.TopK --master yarn --deploy-mode cluster --driver-memory 3G --executor-memory 3G --num-executors 8 top.jar "/tmp/$i" "/tmp/fun${m}_${i}_par${p}" $m 10 $p 1>/dev/null 2>/dev/null ; 

		done
	done
done

#!/bin/bash

echo "Start..."
input=("user10_item30w" "user10_item50w" "user10_item100w")
method=(1 2)
partition=(16 20 24 28 32)
for i in ${input[@]}
do
	for m in ${method[@]}
	do
		for p in ${partition[@]}
		do
			echo "input:_$i ,method:  $m ,par:  _$p," 
spark-submit --name "input :$i,method:fun$m,partition:$p" --class topk.TopK --master yarn --deploy-mode cluster --driver-memory 3G --executor-memory 3G --num-executors 8 top.jar "/tmp/$i" "/tmp/fun${m}_${i}_par${p}" $m 10 $p 1>/dev/null 2>/dev/null ; 

		done
	done
done

#!/bin/bash

echo "Start..."
input=("user_100_item_3w" "user_20_item_10w" "user_50_item_10w" "user10_item30w" "user10_item50w" "user10_item100w")
method=(3)
partition=(4) # partition個數沒有影響
for i in ${input[@]}
do
	for m in ${method[@]}
	do
		for p in ${partition[@]}
		do
			echo "input:_$i ,method:  $m ,par:  _$p," 
spark-submit --name "input :$i,method:fun$m,partition:$p" --class topk.TopK --master yarn --deploy-mode cluster --driver-memory 3G --executor-memory 3G --num-executors 8 top.jar "/tmp/$i" "/tmp/fun${m}_${i}_par${p}" $m 10 $p 1>/dev/null 2>/dev/null ; 

		done
	done
done

固然上面的代碼第一個和第二個能夠合併,第三個代碼是測試第三種方法的代碼;
3. 測試結果
等待全部程序運行完成,能夠看到任務監控以及HDFS監控,以下:



查找日誌,能夠看到具體時間,以下:

各個時間整理,以下表所示:

使用10w用戶,1k的數據量進行TopK操做,獲得的結果分別以下:



總結

1. 思路1和思路2整體時間相差很少,可是從直觀的想法來看,通常狀況下思路2要比思路1的效率高,可是從實驗中並無看到這樣的效果;
2. 思路1和思路2其partition的個數從數據來看是越大越好,可是這個應該是和數據量比較小有關,當數據量很大的時候partition在必定範圍其效果最好;
3. 思路3整體看來會比思路1/思路2的效果好,可是當用戶多的時候,使用思路3其效率應該會降低不少;



若是您以爲lz的文章還行,而且您願意動動手指,能夠爲我投上您的寶貴一票!謝謝!html

http://blog.csdn.net/vote/candidate.html?username=fansy1990java