u1,item1,0.0913375062480507 u2,item1,0.4061823571029518 u3,item1,0.021727289053235843 u4,item1,0.24172510761164112 u5,item1,0.7898802150245668 u6,item1,0.2166110282064876
(u9,List((item74540,0.9999953894581668), (item76768,0.9999930103445652), (item21169,0.9999889286058848), (item4820,0.9999782306748293), (item85543,0.9999663834573093), (item8372,0.9999487766494871), (item99252,0.9999365275502845), (item23653,0.9999347307884792), (item19615,0.9999236599402785), (item30399,0.999918672799968))) (u18,List((item48113,0.9999984432903763), (item44728,0.9999823700583934), (item65298,0.9999721951269472), (item11426,0.9999686624512639), (item72669,0.9999525503292274), (item36801,0.9999334853565013), (item49233,0.9999283335977657), (item67481,0.9999041428222344), (item47549,0.9998546064810947), (item66968,0.999842604478957)))
/**
 * Merge every user's items, sort each user's list by score (descending),
 * and keep the top k entries per user.
 * @param sc     active SparkContext
 * @param input  input path; each line is "user,item,score"
 * @param output output path for the (user, top-k item list) pairs
 * @param k      number of items to keep per user
 * @param num    minimum number of partitions for the input file
 */
def fun1(sc: SparkContext, input: String, output: String, k: Int, num: Int) = {
  val start = System.currentTimeMillis
  sc.textFile(input, num)
    .map { x => val f = x.split(","); (f(0), (f(1), f(2).toDouble)) }
    .combineByKey(
      (x: (String, Double)) => List(x),
      // PERF FIX: prepend (O(1)) instead of `c :+ x`, which is O(|c|) per
      // element and O(n^2) per key overall; order is irrelevant before sorting.
      (c: List[(String, Double)], x: (String, Double)) => x :: c,
      (c1: List[(String, Double)], c2: List[(String, Double)]) => c1 ::: c2)
    // BUG FIX: the original used take(10), silently ignoring the k parameter.
    .map(x => (x._1, x._2.sortWith((a, b) => a._2 > b._2).take(k)))
    .saveAsTextFile(output)
  println("fun1,k: "+k+",input:"+input+",num:"+num+"---> time:"+ (System.currentTimeMillis - start )*1.0 /1000 +" s")
}
/**
 * Merge every user's items, then take each user's top k with a bounded heap
 * (MyUtils.top10) instead of fully sorting the merged list.
 * @param sc     active SparkContext
 * @param input  input path; each line is "user,item,score"
 * @param output output path for the (user, top-k item list) pairs
 * @param k      number of items to keep per user
 * @param num    minimum number of partitions for the input file
 */
def fun2(sc: SparkContext, input: String, output: String, k: Int, num: Int) = {
  val start = System.currentTimeMillis
  sc.textFile(input, num)
    .map { x => val f = x.split(","); (f(0), (f(1), f(2).toDouble)) }
    .combineByKey(
      (x: (String, Double)) => List(x),
      // PERF FIX: prepend (O(1)) instead of `c :+ x`, which is O(|c|) per
      // element and O(n^2) per key overall; the heap does not need any order.
      (c: List[(String, Double)], x: (String, Double)) => x :: c,
      (c1: List[(String, Double)], c2: List[(String, Double)]) => c1 ::: c2)
    .map(x => (x._1, MyUtils.top10(x._2, k)))
    .saveAsTextFile(output)
  println("fun2,k: "+k+",input:"+input+",num:"+num+"---> time:"+ (System.currentTimeMillis - start )*1.0 /1000 +" s")
}
這裏使用到了MyUtils,這個類是參考Spark裏面的實現的,使用堆棧求前topk個值,以下:
package org.apache.spark.util

import org.apache.spark.util

/**
 * Heap-based top-k helper, modeled on Spark's own utilities.
 * Created by fansy on 2016/12/13.
 */
object MyUtils {
  // Orders (item, score) pairs by descending score.
  // BUG FIX: the original used (b._2 - a._2).toInt, which truncates the
  // difference toward zero — any two scores less than 1.0 apart compared as
  // equal. That is fatal here, where scores come from Math.random in [0, 1).
  implicit val sortStringDoubleByDouble = new Ordering[(String, Double)] {
    override def compare(a: (String, Double), b: (String, Double)) = b._2.compare(a._2)
  }

  /**
   * Take the k highest-scored items using a bounded priority queue.
   * @param items full (item, score) list for one user
   * @param k     number of entries to keep
   * @return up to k entries with the largest scores
   */
  def top10(items: List[(String, Double)], k: Int): List[(String, Double)] = {
    // NOTE(review): takeOrdered already yields at most k elements under the
    // ordering, so re-feeding them through BoundedPriorityQueue is redundant;
    // kept to preserve the original structure and return type.
    val queue = new BoundedPriorityQueue[(String, Double)](k)(sortStringDoubleByDouble)
    queue ++= util.collection.Utils.takeOrdered(items.iterator, k)(sortStringDoubleByDouble)
    queue.toList
  }
}定義此類須要注意:
/**
 * Composite key of user id and score. The ordering compares only the score
 * (descending): every partition holds a single user's records, so the user
 * component never needs to take part in the sort.
 * @param s user id
 * @param d score / preference value
 */
case class StringDoubleKey(s: String, d: Double)

object StringDoubleKey {
  // Descending on d: ascending order of the negated score is exactly
  // descending order of the score itself.
  implicit def orderingByDouble[A <: StringDoubleKey]: Ordering[A] =
    Ordering.by[A, Double](key => -key.d)
}
/* 原文說明: key類型是一個組合key,包含用戶和評分,這裏定義比較的方法,因爲是按照評分從到到小的,因此這裏只比較評分;同時須要注意,這裏不須要把用戶也加進來,由於這裏排序的是針對每一個分區的數據進行排序的,並且每一個分區就只有一個用戶(因此這裏就要特別注意partition的設計)。 */
/**
 * Partitioner that routes each record by the user field of its key.
 * Every user must be mapped to its own partition id.
 * @param userNum mapping from user id to partition id (one entry per user)
 */
case class partition(userNum: Map[String, Int]) extends Partitioner {

  override def numPartitions: Int = userNum.size

  // Route by the user component only; the score part of the key drives the
  // in-partition sort, not the partition assignment.
  override def getPartition(key: Any): Int = key match {
    case StringDoubleKey(user, _) => userNum(user)
  }
}因爲每一個用戶須要映射到一個partition中,因此這裏傳進來一個用戶和id的映射(這個映射中的用戶是惟一的),把key轉換爲StringDoubleKey類型後,只取其s也就是user字段進行映射便可;
/**
 * Give each user its own partition (records sorted by descending score within
 * the partition), then emit the first k records of every partition.
 * @param sc     active SparkContext
 * @param input  input path; each line is "user,item,score"
 * @param output output path for the (user, top-k item list) records
 * @param k      number of items to keep per user
 * @param num    unused: the partition count is fixed to the number of users
 */
def fun3(sc: SparkContext, input: String, output: String, k: Int, num: Int) = {
  var start = System.currentTimeMillis
  val data = sc.textFile(input).map { line =>
    val f = line.split(",")
    (StringDoubleKey(f(0), f(2).toDouble), f(1))
  }
  // Collect the distinct users to build the user -> partition-id mapping.
  val userNum = data.map(_._1.s).distinct.collect
  println("fun3,k: "+k+",input:"+input+",num:"+num+"---> time:"+ (System.currentTimeMillis - start )*1.0 /1000 +" s")
  start = System.currentTimeMillis()
  val byUser = partition(userNum.zipWithIndex.toMap)
  data.repartitionAndSortWithinPartitions(byUser)
    .mapPartitions(topK(k))
    .saveAsTextFile(output)
  println("fun3,k: "+k+",input:"+input+",num:"+num+"---> time:"+ (System.currentTimeMillis - start )*1.0 /1000 +" s")
}
/**
 * Reduce one partition (all records of a single user, already sorted by
 * descending score) to a single (user, List((item, pref), ...)) record.
 * @param k    number of items to keep
 * @param iter partition iterator of (key, item) pairs
 * @return a one-element iterator, or an empty iterator for an empty partition
 */
def topK(k: Int)(iter: Iterator[(StringDoubleKey, String)]): Iterator[(String, List[(String, Double)])] = {
  // BUG FIX: the original called iter.next unconditionally and threw
  // NoSuchElementException on an empty partition. It also scanned the whole
  // iterator: the `if` guard in its for-comprehension filters each element
  // rather than terminating, costing O(n*k); take(k) stops after k elements.
  // The returned list is now in descending-score order (the original
  // prepended, which reversed it); top-k membership is unchanged.
  if (!iter.hasNext) Iterator.empty
  else {
    val top = iter.take(k).toList
    val user = top.head._1.s
    Iterator.single((user, top.map { case (key, item) => (item, key.d) }))
  }
}
// Test-data generators (spark-shell snippets): each writes "user,item,score"
// lines with random scores to a /tmp path; the for-comprehension builds the
// full user x item cross product on the driver before parallelizing.
sc.parallelize(for( i <- 1 to 100000; j <- 1 to 50) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user1000_item3w")
sc.parallelize(for( i <- 1 to 30000; j <- 1 to 100) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user300_item3w")
sc.parallelize(for( i <- 1 to 300000; j <- 1 to 10) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user10_item30w")
sc.parallelize(for( i <- 1 to 500000; j <- 1 to 10) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user10_item50w")
sc.parallelize(for( i <- 1 to 1000000; j <- 1 to 10) yield ("u"+j,"item"+i,Math.random),4).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user10_item100w")
使用下面的方式,效率更快:
// Faster generator: parallelize only the user ids (32 partitions) and expand
// each user's item rows with flatMap on the executors, so the cross product
// is never materialized on the driver.
sc.parallelize(for( j <- 1 to 100000) yield "u"+j,32).flatMap(x => for(j <- 1 to 1000) yield (x,"item"+j,Math.random)).map(x => x._1+","+x._2+","+x._3).saveAsTextFile("/tmp/user10w_item1k")
#!/bin/bash
# Benchmark fun1/fun2 over the small inputs for several partition counts.
echo "Start..."
input=("user_100_item_3w" "user_20_item_10w" "user_50_item_10w")
method=(1 2)
partition=(4 8 12 16 20 24 28 32)
# ROBUSTNESS FIX: quote array expansions ("${input[@]}") so elements survive
# word splitting and globbing intact.
for i in "${input[@]}"; do
  for m in "${method[@]}"; do
    for p in "${partition[@]}"; do
      echo "input:_$i ,method: $m ,par: _$p,"
      spark-submit --name "input :$i,method:fun$m,partition:$p" \
        --class topk.TopK --master yarn --deploy-mode cluster \
        --driver-memory 3G --executor-memory 3G --num-executors 8 \
        top.jar "/tmp/$i" "/tmp/fun${m}_${i}_par${p}" "$m" 10 "$p" \
        1>/dev/null 2>/dev/null
    done
  done
done
#!/bin/bash
# Benchmark fun1/fun2 over the large inputs for several partition counts.
echo "Start..."
input=("user10_item30w" "user10_item50w" "user10_item100w")
method=(1 2)
partition=(16 20 24 28 32)
# ROBUSTNESS FIX: quote array expansions ("${input[@]}") so elements survive
# word splitting and globbing intact.
for i in "${input[@]}"; do
  for m in "${method[@]}"; do
    for p in "${partition[@]}"; do
      echo "input:_$i ,method: $m ,par: _$p,"
      spark-submit --name "input :$i,method:fun$m,partition:$p" \
        --class topk.TopK --master yarn --deploy-mode cluster \
        --driver-memory 3G --executor-memory 3G --num-executors 8 \
        top.jar "/tmp/$i" "/tmp/fun${m}_${i}_par${p}" "$m" 10 "$p" \
        1>/dev/null 2>/dev/null
    done
  done
done
#!/bin/bash
# Benchmark fun3 over every input; the partition argument is ignored by fun3
# (it always uses one partition per user), so a single value suffices.
echo "Start..."
input=("user_100_item_3w" "user_20_item_10w" "user_50_item_10w" "user10_item30w" "user10_item50w" "user10_item100w")
method=(3)
partition=(4) # partition個數沒有影響
# ROBUSTNESS FIX: quote array expansions ("${input[@]}") so elements survive
# word splitting and globbing intact.
for i in "${input[@]}"; do
  for m in "${method[@]}"; do
    for p in "${partition[@]}"; do
      echo "input:_$i ,method: $m ,par: _$p,"
      spark-submit --name "input :$i,method:fun$m,partition:$p" \
        --class topk.TopK --master yarn --deploy-mode cluster \
        --driver-memory 3G --executor-memory 3G --num-executors 8 \
        top.jar "/tmp/$i" "/tmp/fun${m}_${i}_par${p}" "$m" 10 "$p" \
        1>/dev/null 2>/dev/null
    done
  done
done
若是您以爲lz的文章還行,而且您願意動動手指,能夠爲我投上您的寶貴一票!謝謝!
http://blog.csdn.net/vote/candidate.html?username=fansy1990