Spark機器學習之推薦引擎

一. 最小二乘法創建模型

 

 

 

關於最小二乘法矩陣分解,咱們能夠參閱:html

1、矩陣分解模型。es6

用戶對物品的打分行爲能夠表示成一個評分矩陣A(m*n),表示m個用戶對n各物品的打分狀況。以下圖所示:算法


其中,A(i,j)表示用戶user i對物品item j的打分。可是,ALS 的核心就是下面這個假設:的打分矩陣 A 能夠用兩個小矩陣的乘積來近似:。這樣咱們就把整個系統的自由度從一降低到了咱們接下來就聊聊爲何 ALS 的低秩假設是合理的。世上萬千事物,人們的喜愛各不相同。但。舉個例子,我喜歡看略帶黑色幽默的警匪電影,那麼你們根據這個描述就知道我大概會喜歡昆汀的《低俗小說》、《落水狗》和韋家輝的《一個字頭的誕生》。這些電影都符合我對本身喜愛的描述,也就是說他們在這個抽象的低維空間的投影和個人喜愛類似。再抽象一些,把人們的喜愛和電影的特徵都投到這個低維空間,一我的的喜愛映射到了一個低維向量,一個電影的特徵變成了緯度相同的向量,那麼這我的和這個電影的類似度就能夠表述成這兩個向量之間的內積。 咱們把打分理解成類似度,那麼「打分矩陣A(m*n)」就能夠由「用戶喜愛特徵矩陣U(m*k)」「產品特徵矩陣V(n*k)」的乘積來近似了。矩陣U、矩陣V以下圖所示:apache


U Vapp

2、交替最小二乘法(ALS)。

矩陣分解模型的損失函數爲:ide



有了損失函數以後,下面就開始談優化方法了,一般的優化方法分爲兩種:交叉最小二乘法(alternative least squares)和隨機梯度降低法(stochastic gradient descent)。本文使用算法的思想就是:咱們先隨機生成而後固定它求解,再固定求解,這樣交替進行下去,直到取得最優解min(C)。由於每步迭代都會下降偏差,而且偏差是有下界的,因此 ALS 必定會收斂。但因爲問題是非凸的,ALS 並不保證會收斂到全局最優解。但在實際應用中,ALS 對初始點不是很敏感,是否是全局最優解形成的影響並不大。函數

算法的執行步驟:優化

一、先隨機生成一個。通常能夠取0值或者全局均值。ui

二、固定(即:認爲是已知的常量),來求解lua

此時,損失函數爲:

因爲C中只有Vj一個未知變量,所以C的最優化問題轉化爲最小二乘問題,用最小二乘法求解Vj的最優解:

固定j(j=1,2,......,n),則:C的導數


,獲得:


即:


,則:

按照上式依次計算v1,v2,......,vn,從而獲得

三、固定(即:認爲是已知的量),來求解

此時,損失函數爲:

同理,用步驟2中相似的方法,能夠計算ui的值:

,獲得:


即:


,則:



依照上式依次計算u1,u2,......,um,從而獲得

四、循環執行步驟二、3,直到損失函數C的值收斂(或者設置一個迭代次數N,迭代執行步驟二、3 N次後中止)。這樣,就獲得了C最優解對應的矩陣U、V。


 

 

MovieLens 數據

數據集由用戶ID,影片ID,評分,時間戳組成

咱們只須要前3個字段

/* Load the raw ratings data from a file. Replace 'PATH' with the path to the MovieLens data */
val rawData = sc.textFile("/PATH/ml-100k/u.data")
rawData.first()
// 14/03/30 13:21:25 INFO SparkContext: Job finished: first at <console>:17, took 0.002843 s
// res24: String = 196    242    3    881250949

/* Extract the user id, movie id and rating only from the dataset */
val rawRatings = rawData.map(_.split("\t").take(3))
rawRatings.first()
// 14/03/30 13:22:44 INFO SparkContext: Job finished: first at <console>:21, took 0.003703 s
// res25: Array[String] = Array(196, 242, 3)

 

 

MLlib ALS模型

MLlib導入ALS模型:

import org.apache.spark.mllib.recommendation.ALS

咱們看一下ALS.train函數:

ALS.train
/*
    <console>:13: error: ambiguous reference to overloaded definition,
    both method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
    and  method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
    match expected type ?
                  ALS.train
                      ^ 
*/

 

咱們能夠得知train函數須要四個參數:ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double

 

1. ratings

org.apache.spark.mllib.recommendation.Rating類是對用戶ID,影片ID,評分的封裝

咱們能夠這樣生成Rating的org.apache.spark.rdd.RDD

 

val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }
ratings.first()
// 14/03/30 13:26:43 INFO SparkContext: Job finished: first at <console>:24, took 0.002808 s
// res28: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0)

 

2. rank

對應ALS模型中的因子個數,即「兩個小矩陣」中的k

3. iterations

對應運行時的迭代次數

4. lambda:

控制模型的正則化過程,從而控制模型的過擬合狀況。

 

由此,咱們能夠獲得模型:

/* Train the ALS model with rank=50, iterations=10, lambda=0.01 */
val model = ALS.train(ratings, 50, 10, 0.01)
// ...
// 14/03/30 13:28:44 INFO MemoryStore: ensureFreeSpace(128) called with curMem=7544924, maxMem=311387750
// 14/03/30 13:28:44 INFO MemoryStore: Block broadcast_120 stored as values to memory (estimated size 128.0 B, free 289.8 MB)
// model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@7c7fbd3b

/* Inspect the user factors */
model.userFeatures
// res29: org.apache.spark.rdd.RDD[(Int, Array[Double])] = FlatMappedRDD[1099] at flatMap at ALS.scala:231

/* Count user factors and force computation */
model.userFeatures.count
// ...
// 14/03/30 13:30:08 INFO SparkContext: Job finished: count at <console>:26, took 5.009689 s
// res30: Long = 943

model.productFeatures.count
// ...
// 14/03/30 13:30:59 INFO SparkContext: Job finished: count at <console>:26, took 0.247783 s
// res31: Long = 1682

/* Make a prediction for a single user and movie pair */ 
val predictedRating = model.predict(789, 123)

 

 

 

二. 使用推薦模型

 

用戶推薦

用戶推薦,向給定用戶推薦物品。這裏,咱們給用戶789推薦前10個他可能喜歡的電影。咱們能夠先解析下電影資料數據集

該數據集是由「|」分割,咱們只須要前兩個字段電影ID和電影名稱

val movies = sc.textFile("/PATH/ml-100k/u.item")
val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
titles(123)
// res68: String = Frighteners, The (1996)

咱們看一下預測的結果:

/* Make predictions for a single user across all movies */
val userId = 789
val K = 10
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString("\n"))
/* 
Rating(789,715,5.931851273771102)
Rating(789,12,5.582301095666215)
Rating(789,959,5.516272981542168)
Rating(789,42,5.458065302395629)
Rating(789,584,5.449949837103569)
Rating(789,750,5.348768847643657)
Rating(789,663,5.30832117499004)
Rating(789,134,5.278933936827717)
Rating(789,156,5.250959077906759)
Rating(789,432,5.169863417126231)
*/
topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
/*
(To Die For (1995),5.931851273771102)
(Usual Suspects, The (1995),5.582301095666215)
(Dazed and Confused (1993),5.516272981542168)
(Clerks (1994),5.458065302395629)
(Secret Garden, The (1993),5.449949837103569)
(Amistad (1997),5.348768847643657)
(Being There (1979),5.30832117499004)
(Citizen Kane (1941),5.278933936827717)
(Reservoir Dogs (1992),5.250959077906759)
(Fantasia (1940),5.169863417126231)
*/

咱們再來看一下實際上的結果是:

val moviesForUser = ratings.keyBy(_.user).lookup(789)
// moviesForUser: Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(789,1012,4.0), Rating(789,127,5.0), Rating(789,475,5.0), Rating(789,93,4.0), ...
// ...
println(moviesForUser.size)
// 33
moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)
/*
(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)
*/

很遺憾,一個都沒對上~不過,這很正常。由於預測的結果剛好都是用戶789沒看過的電影,其預測的評分都在5.0以上,而實際上的結果是根據用戶789已經看過的電影按評分排序得到的,這也體現的推薦系統的做用~

 

物品推薦

物品推薦,給定一個物品,哪些物品和它最類似。這裏咱們使用餘弦類似度

Cosine類似度計算

將查詢語句的特徵詞的權值組成向量 a

網頁中對應的特徵詞的權值組成向量 b

查詢語句與該網頁的Cosine類似度:

 

 

/* Compute the cosine similarity between two vectors */
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
    vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}

 

jblas線性代數庫

這裏MLlib庫須要依賴jblas線性代數庫,若是你們編譯jblas的jar包有問題,能夠到個人百度雲上獲取。把jar包加到lib文件夾後,記得在spark-env.sh添加配置:

SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:$SPARK_LIBRARY_PATH/jblas-1.2.4-SNAPSHOT.jar"

 

 

import org.jblas.DoubleMatrix
val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))
// aMatrix: org.jblas.DoubleMatrix = [1.000000; 2.000000; 3.000000]

 

 

求各個產品的餘弦類似度:

val sims = model.productFeatures.map{ case (id, factor) => 
    val factorVector = new DoubleMatrix(factor)
    val sim = cosineSimilarity(factorVector, itemVector)
    (id, sim)
}

求類似度最高的前10個相識電影。第一名確定是本身,因此要取前11個,再除去第1個:

val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("\n")
/* 
(Hideaway (1995),0.6932331537649621)
(Body Snatchers (1993),0.6898690594544726)
(Evil Dead II (1987),0.6897964975027041)
(Alien: Resurrection (1997),0.6891221044611473)
(Stephen King's The Langoliers (1995),0.6864214133620066)
(Liar Liar (1997),0.6812075443259535)
(Tales from the Crypt Presents: Bordello of Blood (1996),0.6754663844488256)
(Army of Darkness (1993),0.6702643811753909)
(Mystery Science Theater 3000: The Movie (1996),0.6594872765176396)
(Scream (1996),0.6538249646863378)
*/

 

 

 

 三.推薦模型評估

 

 

1.MSE/RMSE

均方差(MSE),就是對各個實際存在評分的項,pow(預測評分-實際評分,2)的值進行累加,在除以項數。而均方根差(RMSE)就是MSE開根號。

咱們先用ratings生成(user,product)RDD,做爲model.predict()的參數,從而生成以(user,product)爲key,value爲預測的rating的RDD。而後,用ratings生成以(user,product)爲key,實際rating爲value的RDD,並join上前者

val usersProducts = ratings.map{ case Rating(user, product, rating)  => (user, product)}
val predictions = model.predict(usersProducts).map{
    case Rating(user, product, rating) => ((user, product), rating)
}
val ratingsAndPredictions = ratings.map{
    case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)
ratingsAndPredictions.first()
//res21: ((Int, Int), (Double, Double)) = ((291,800),(2.0,2.052364223387371))

使用MLLib的評估函數,咱們要傳入一個(actual,predicted)的RDD。actual和predicted左右位置能夠交換

import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)
// Mean Squared Error = 0.08231947642632852
// Root Mean Squared Error = 0.2869137090247319

 

 

2. MAPK/MAP

K值平均準確率(MAPK)能夠簡單的這麼理解:

設定推薦K=10,即推薦10個物品。預測該用戶評分最高的10個物品ID做爲文本1,實際上用戶評分過全部物品ID做爲文本2,求兩者的相關度。(我的認爲該評估方法在這裏不是很適用)

咱們能夠按評分排序預測物品ID,再從頭遍歷,若是該預測ID出如今實際評分過ID的集合中,那麼就增長必定分數(固然,排名高的應該比排名低的增長更多的分數,由於前者更能體現推薦的準確性)。最後將累加獲得的分數除以min(K,actual.size)

若是是針對全部用戶,咱們須要把各個用戶的累加分數進行累加,在除以用戶數。

在MLlib裏面,使用的是全局平均準確率(MAP,不設定K)。它須要咱們傳入(predicted.Array,actual.Array)的RDD。

如今,咱們先來生成predicted

咱們先生成產品矩陣:

/* Compute recommendations for all users */
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)
// (1682,50)

以便工做節點可以訪問到,咱們把該矩陣以廣播變量的形式分發出去:

// broadcast the item factor matrix
val imBroadcast = sc.broadcast(itemMatrix)

」,矩陣相乘,計算出評分。scores.data.zipWithIndex,scores.data再按評分排序。生成recommendedIds,構建(userId, recommendedIds)RDD。

val allRecs = model.userFeatures.map{ case (userId, array) => 
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}

生成actual

// next get all the movie ids per user, grouped by user id
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
// userMovies: org.apache.spark.rdd.RDD[(Int, Seq[(Int, Int)])] = MapPartitionsRDD[277] at groupBy at <console>:21

生成(predicted.Array,actual.Array)的RDD,並使用評估函數:

import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => 
  val actual = actualWithIds.map(_._2)
  (predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
// Mean Average Precision = 0.07171412913757183
相關文章
相關標籤/搜索