第4章 離線推薦服務建設4.1 離線推薦服務4.2 離線統計服務4.2.1 離線統計服務主體框架4.2.2 歷史熱門商品統計4.2.3 最近熱門商品統計4.2.4 商品平均得分統計4.2.5 將 DF 數據寫入 MongoDB 數據庫對應的表中的方法4.3 基於隱語義模型的協同過濾推薦(類似推薦)4.3.1 用戶商品推薦列表4.3.2 商品類似度矩陣4.3.3 模型評估和參數選取第5章 實時推薦服務建設5.1 實時推薦服務5.2 實時推薦模型和代碼框架5.2.1 實時推薦模型算法設計5.2.2 實時推薦模塊框架5.3 實時推薦算法的實現5.3.1 獲取用戶的 K 次最近評分5.3.2 獲取當前商品最類似的 K 個商品5.3.3 商品推薦優先級計算5.3.4 將結果保存到 mongoDB5.3.5 更新實時推薦結果5.4 實時系統聯調5.4.1 啓動實時系統的基本組件5.4.2 啓動 zookeeper 集羣(使用羣起腳本)5.4.3 啓動 kafka 集羣(使用羣起腳本)5.4.4 構建 Kafka Streaming 程序5.4.5 配置並啓動 flume5.4.6 啓動業務系統後臺第6章 冷啓動問題處理第7章 其它形式的離線推薦服務(類似推薦)7.1 基於內容的協同過濾推薦(類似推薦)7.2 基於物品的協同過濾推薦(類似推薦)第8章 程序部署與運行html
離線推薦服務是綜合用戶全部的歷史數據,利用設定的離線統計算法
和離線推薦算法
週期性的進行結果統計與保存,計算的結果在必定時間週期內是固定不變的,變動的頻率取決於算法調度的頻率。
離線推薦服務主要計算一些能夠預先進行統計和計算的指標,爲實時計算和前端業務相應提供數據支撐。
離線推薦服務主要分爲統計推薦
、基於隱語義模型的協同過濾推薦
以及基於內容的類似推薦
和基於 Item-CF 的類似推薦
。咱們這一章主要介紹前兩部分,基於內容的推薦
和 基於 Item-CF 的推薦
在總體結構和實現上是相似的,咱們將在第 7 章詳細介紹。前端
在 recommender 下新建子項目 StatisticsRecommender,pom.xml 文件中只需引入 spark、scala 和 mongodb 的相關依賴:java
<dependencies>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<!-- 用於代碼方式鏈接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用於 Spark 和 MongoDB 的對接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
在 resources 文件夾下引入 log4j.properties,而後在 src/main/scala 下新建 scala 單例 object 對象 com.atguigu.statistics.StatisticsRecommender。
一樣,咱們應該先建好樣例類,在 main() 方法中定義配置、建立 SparkSession 並加載數據,最後關閉 spark。代碼以下:redis
src/main/scala/com.atguigu.statistics/StatisticsRecommender.scala算法
// 定義樣例類
// 注意:spark mllib 中有 Rating 類,爲了便於區別,咱們從新命名爲 ProductRating
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
object StatisticsRecommender {
// 定義 MongoDB 中存儲的表名
val MONGODB_RATING_COLLECTION = "Rating"
// 定義 MongoDB 中統計表的名稱
val RATE_MORE_PRODUCTS = "RateMoreProducts"
val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts"
val AVERAGE_PRODUCTS_SCORE = "AverageProductsScore"
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一個 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
// 建立一個 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一個 sparkContext
val sc = spark.sparkContext
// 聲明一個隱式的配置對象,方便重複調用(當屢次調用對 MongoDB 的存儲或讀寫操做時)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隱式轉換:在對 DataFrame 和 Dataset 進行操做許多操做都須要這個包進行支持
import spark.implicits._
// 將 MongoDB 中的數據加載進來,並轉換爲 DataFrame
val ratingDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.toDF()
// 建立一張名爲 ratings 的臨時表
ratingDF.createOrReplaceTempView("ratings")
// TODO: 用 sparK sql 去作不一樣的統計推薦結果
// 一、歷史熱門商品統計(按照商品的評分次數統計)數據結構是:productId, count
// 二、最近熱門商品統計,即統計以月爲單位每一個商品的評分個數(須要將時間戳轉換成 yyyyMM 格式後,按照商品的評分次數統計)數據結構是:productId, count, yearmonth
// 三、商品平均得分統計(即優質商品統計)數據結構是:productId,avg
// 關閉 Spark
spark.stop()
}
}
根據全部歷史評分數據,計算曆史評分次數最多的商品。
實現思路:經過 Spark SQL 讀取評分數據集,統計全部評分中評分個數最多的商品,而後按照從大到小排序,將最終結果寫入 MongoDB 的 RateMoreProducts 數據集中。sql
// 一、歷史熱門商品統計(按照商品的評分次數統計)數據結構是:productId, count
val rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId order by count desc")
storeDFInMongoDB(rateMoreProductsDF, RATE_MORE_PRODUCTS)
根據評分,按月爲單位計算最近時間的月份裏面評分數個數最多的商品集合。
實現思路:經過 Spark SQL 讀取評分數據集,經過 UDF 函數將評分的數據時間修改成月,而後統計每個月商品的評分數。統計完成以後將數據寫入到 MongoDB 的 RateMoreRecentlyProducts 數據集中。mongodb
// 二、最近熱門商品統計,即統計以月爲單位每一個商品的評分個數(須要將時間戳轉換成 yyyyMM 格式後,按照商品的評分次數統計)數據結構是:productId, count, yearmonth
// 建立一個日期格式化工具
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
// 註冊 UDF,將 時間戳 timestamp 轉化爲年月格式 yyyyMM,注意:時間戳 timestamp 的單位是 秒,而日期格式化工具中 Date 須要的是 毫秒,且 format() 的結果是 字符串,須要轉化爲 Int 類型
spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
// 把原始的 ratings 數據轉換成想要的數據結構:productId, score, yearmonth,而後建立對應的臨時表
val ratingOfYearMonthDF = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")
// 將新的數據集註冊成爲一張臨時表
ratingOfYearMonthDF.createOrReplaceTempView("ratingOfMonth")
val rateMoreRecentlyProductsDF = spark.sql("select productId, count(productId) as count, yearmonth from " +
"ratingOfMonth group by yearmonth, productId order by yearmonth desc, count desc")
storeDFInMongoDB(rateMoreRecentlyProductsDF, RATE_MORE_RECENTLY_PRODUCTS)
根據歷史數據中全部用戶對商品的評分,週期性的計算每一個商品的平均得分。
實現思路:經過 Spark SQL 讀取保存在 MongDB 中的 Rating 數據集,經過執行如下 SQL 語句實現對於商品的平均分統計。統計完成以後將生成的新的 DataFrame 寫出到 MongoDB 的 AverageProductsScore 集合中。數據庫
// 三、商品平均得分統計(即優質商品統計)數據結構是:productId,avg
val averageProductsScoreDF = spark.sql("select productId, avg(score) as avg from ratings group by productId order by avg desc")
storeDFInMongoDB(averageProductsScoreDF, AVERAGE_PRODUCTS_SCORE)
/**
* 將 DF 數據寫入 MongoDB 數據庫對應的表中的方法
*
* @param df
* @param collection_name
*/
def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig) = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}
項目採用 ALS(交替最小二乘法) 做爲協同過濾算法,根據 MongoDB 中的用戶評分表 計算離線的用戶商品推薦列表以及商品類似度矩陣。apache
經過 ALS 訓練出來的 Model 來計算全部當前用戶商品的推薦列表,主要思路以下:
一、userId 和 productId 作笛卡爾積,產生 (userId, productId) 的元組。
二、經過模型預測 (userId, productId) 對應的評分。
三、將預測結果經過預測分值進行排序。
四、返回分值最大的 K 個商品,做爲當前用戶的推薦列表。
最後生成的數據結構以下:將數據保存到 MongoDB 的 UserRecs 表中。
bootstrap
<dependencies>
<!-- java 線性代數的庫 -->
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<!-- 用於代碼方式鏈接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用於 Spark 和 MongoDB 的對接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
一樣通過前期的構建樣例類、聲明配置、建立 SparkSession 等步驟,能夠加載數據開始計算模型了。
核心代碼以下:
src/main/scala/com.atguigu.offline/OfflineRecommender.scala
// 定義樣例類
// 注意:spark mllib 中有 Rating 類,爲了便於區別,咱們從新命名爲 ProductRating
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
// 標準推薦對象,productId, score
case class Recommendation(productId: Int, score: Double)
// 用戶推薦列表
case class UserRecs(userId: Int, recs: Seq[Recommendation])
// 商品類似度列表(商品類似度矩陣/商品推薦列表)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object OfflineRecommender {
// 定義 MongoDB 中存儲的表名
val MONGODB_RATING_COLLECTION = "Rating"
// 定義 MongoDB 中推薦表的名稱
val USER_RECS = "UserRecs"
val PRODUCT_RECS = "ProductRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一個 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 建立一個 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一個 sparkContext
val sc = spark.sparkContext
// 聲明一個隱式的配置對象,方便重複調用(當屢次調用對 MongoDB 的存儲或讀寫操做時)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隱式轉換:在對 DataFrame 和 Dataset 進行操做許多操做都須要這個包進行支持
import spark.implicits._
// 將 MongoDB 中的數據加載進來,並轉換爲 RDD,以後進行 map 遍歷轉換爲 三元組形式的 RDD,並緩存
val ratingRDD = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(productRating => (productRating.userId, productRating.productId, productRating.score))
.cache()
// 提取出用戶和商品的數據集,並去重
val userRDD = ratingRDD.map(_._1).distinct()
val productRDD = ratingRDD.map(_._2).distinct()
// TODO: 核心計算過程 -- 基於 LFM 模型的協同過濾推薦(類似推薦)
// 一、訓練隱語義模型
// 建立訓練數據集
val trainDataRDD = ratingRDD.map(x => Rating(x._1, x._2, x._3)) // Rating(user, product, rating)
// rank 是模型中隱語義因子(特徵)的個數, iterations 是迭代的次數, lambda 是 ALS 的正則化參數
val (rank, iterations, lambda) = (5, 10, 0.001)
val model = ALS.train(trainDataRDD, rank, iterations, lambda)
// 二、獲取預測評分矩陣,獲得用戶的商品推薦列表(用戶推薦矩陣)
// 用 userRDD 和 productRDD 作一個笛卡爾積,獲得一個空的 userProductsRDD: RDD[(userId, productId)]
val userProductsRDD = userRDD.cartesian(productRDD)
// 執行模型預測,獲取預測評分矩陣,predictRatingRDD: RDD[Rating(userId, productId, rating)]
val predictRatingRDD = model.predict(userProductsRDD)
// 從預測評分矩陣中提取獲得用戶推薦列表
// (先過濾 filter,而後 map 轉換爲 KV 結構,再 groupByKey,再 map 封裝樣例類1,sortWith 後 take 再 map 封裝樣例類2)
val userRecsDF = predictRatingRDD.filter(_.rating > 0)
.map(
rating =>
(rating.user, (rating.product, rating.rating))
)
.groupByKey()
.map {
case (userId, recs) =>
// UserRecs(userId, recs.toList.sortBy(_._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
UserRecs(userId, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
} // userRecsRDD: RDD[(userId, Seq[(productId, score)])]
.toDF()
// 將 DF 數據寫入 MongoDB 數據庫對應的表中
storeDFInMongoDB(userRecsDF, USER_RECS)
// 三、利用商品的特徵矩陣,計算商品的類似度列表(商品類似度矩陣)
spark.stop()
}
經過 ALS 計算商品類似度矩陣,該矩陣用於查詢當前商品的類似商品併爲實時推薦系統服務。
核心代碼以下:
// 三、利用商品的特徵矩陣,計算商品的類似度列表(商品類似度矩陣)
// 經過訓練出的 model 的 productFeatures 方法,獲得 商品的特徵矩陣
// 數據格式 RDD[(scala.Int, scala.Array[scala.Double])]
val productFeaturesRDD = model.productFeatures.map {
case (productId, featuresArray) =>
(productId, new DoubleMatrix(featuresArray))
}
// 將 商品的特徵矩陣 和 商品的特徵矩陣 作一個笛卡爾積,獲得一個空的 productFeaturesCartRDD
val productFeaturesCartRDD = productFeaturesRDD.cartesian(productFeaturesRDD)
// 獲取 商品類似度列表(商品類似度矩陣/商品推薦列表)
val productSimDF = productFeaturesCartRDD
.filter { // 過濾掉本身與本身作笛卡爾積的數據
case (a, b) =>
a._1 != b._1
}
.map { // 計算餘弦類似度
case (a, b) =>
val simScore = this.consinSim(a._2, b._2)
// 返回一個二元組 productSimRDD: RDD[(productId, (productId, consinSim))]
(a._1, (b._1, simScore))
}
.filter(_._2._2 > 0.6)
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
} // productSimGroupRDD: RDD[(productId, Seq[(productId, consinSim)])]
.toDF()
// 將 DF 數據寫入 MongoDB 數據庫對應的表中
storeDFInMongoDB(productSimDF, PRODUCT_RECS)
其中,consinSim 是求兩個向量餘弦類似度的函數,代碼實現以下:
/**
* 計算兩個商品之間的餘弦類似度(使用的是向量點積公式)
*
* @param product1
* @param product2
* @return
*/
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
// dot 表示點積,norm2 表示模長,模長就是 L2範式
product1.dot(product2) / (product1.norm2() * product2.norm2()) // l1範數:向量元素絕對值之和;l2範數:即向量的模長(向量的長度)
}
在上述模型訓練的過程當中,咱們直接給定了隱語義模型的 rank,iterations,lambda 三個參數。對於咱們的模型,這並不必定是最優的參數選取,因此咱們須要對模型進行評估。一般的作法是計算均方根偏差(RMSE),考察預測評分與實際評分之間的偏差。
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一個 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ALSTrainer")
// 建立一個 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一個 sparkContext
val sc = spark.sparkContext
// 聲明一個隱式的配置對象,方便重複調用(當屢次調用對 MongoDB 的存儲或讀寫操做時)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隱式轉換:在對 DataFrame 和 Dataset 進行操做許多操做都須要這個包進行支持
import spark.implicits._
// 將 MongoDB 中的數據加載進來,並轉換爲 RDD,以後進行 map 遍歷轉換爲 RDD(樣例類是 spark mllib 中的 Rating),並緩存
val ratingRDD = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(productRating => Rating(productRating.userId, productRating.productId, productRating.score))
.cache()
// ratingRDD: RDD[Rating(user, product, rating)]
// 將一個 RDD 隨機切分紅兩個 RDD,用以劃分訓練集和測試集
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingDataRDD = splits(0)
val testinggDataRDD = splits(1)
// 輸出最優參數
adjustALSParams(trainingDataRDD, testinggDataRDD)
// 關閉 Spark
spark.close()
}
其中 adjustALSParams 方法是模型評估的核心,輸入一組訓練數據和測試數據,輸出計算獲得最小 RMSE 的那組參數。代碼實現以下:
/**
* 輸出最優參數的方法:輸入一組訓練數據和測試數據,輸出計算獲得最小 RMSE 的那組參數
*
* @param trainingDataRDD
* @param testinggData
*/
def adjustALSParams(trainingDataRDD: RDD[Rating], testinggData: RDD[Rating]) = {
// 這裏指定迭代次數爲 10,rank 和 lambda 在幾個值中選取調整
val result = for (rank <- Array(50, 100, 150, 200); lambda <- Array(1, 0.1, 0.01, 0.001))
yield { // yield 表示把 for 循環的每一次中間結果保存下來
val model = ALS.train(trainingDataRDD, rank, 10, lambda)
val rmse = getRMSE(model, testinggData)
(rank, lambda, rmse)
}
// 按照 rmse 排序
// println(result.sortBy(_._3).head)
println(result.minBy(_._3))
}
計算 RMSE 的函數 getRMSE 代碼實現以下:
/**
* 計算 RMSE
*
* @param model
* @param testinggDataRDD
*/
def getRMSE(model: MatrixFactorizationModel, testinggDataRDD: RDD[Rating]) = {
// 將 三元組數據 轉化爲 二元組數據
// testinggDataRDD: RDD[Rating(userId, productId, rating)]
val userProductsRDD = testinggDataRDD.map(rating => (rating.user, rating.product))
// 執行模型預測,獲取預測評分矩陣
// predictRatingRDD: RDD[Rating(userId, productId, rating)]
val predictRatingRDD = model.predict(userProductsRDD)
// 測試數據的真實評分
val realRDD = testinggDataRDD.map(rating => ((rating.user, rating.product), rating.rating))
// 測試數據的預測評分
val predictRDD = predictRatingRDD.map(rating => ((rating.user, rating.product), rating.rating))
// 計算 RMSE(測試數據的真實評分 與 測試數據的預測評分 作內鏈接操做)
sqrt(
realRDD.join(predictRDD).map {
case ((userId, productId), (real, predict)) =>
// 真實值和預測值之間的差
val err = real - predict
err * err
}.mean()
)
}
運行代碼,咱們就能夠獲得目前數據的最優模型參數。
實時計算與離線計算應用於推薦系統上最大的不一樣在於實時計算推薦結果應該反映最近一段時間用戶近期的偏好
,而離線計算推薦結果則是根據用戶從第一次評分起的全部評分記錄來計算用戶整體的偏好
。
用戶對物品的偏好隨着時間的推移老是會改變的。好比一個用戶 u 在某時刻對商品 p 給予了極高的評分,那麼在近期一段時候,u 極有可能很喜歡與商品 p 相似的其餘商品;而若是用戶 u 在某時刻對商品 q 給予了極低的評分,那麼在近期一段時候,u 極有可能不喜歡與商品 q 相似的其餘商品。因此對於實時推薦,當用戶對一個商品進行了評價後,用戶會但願推薦結果基於最近這幾回評分進行必定的更新,使得推薦結果匹配用戶近期的偏好,知足用戶近期的口味。
若是實時推薦繼續採用離線推薦中的 ALS 算法,因爲 ALS 算法運行時間巨大(好幾分鐘甚至好十幾分鍾)
,不具備實時獲得新的推薦結果的能力;而且因爲算法自己的使用的是用戶評分表,用戶本次評分後只更新了總評分表中的一項,使得算法運行後的推薦結果與用戶本次評分以前的推薦結果基本沒有多少差異,從而給用戶一種推薦結果一直沒變化的感受,很影響用戶體驗。
另外,在實時推薦中因爲時間性能上要知足實時或者準實時的要求,因此算法的計算量不能太大,避免複雜、過多的計算形成用戶體驗的降低。鑑於此,推薦精度每每不會很高。實時推薦系統更關心推薦結果的動態變化能力,只要更新推薦結果的理由合理便可
,至於推薦的精度要求則能夠適當放寬
。
因此對於實時推薦算法,主要有兩點需求:
(1)用戶本次評分後、或最近幾個評分後系統能夠明顯的更新推薦結果。
(2)計算量不大,知足響應時間上的實時或者準實時要求。
咱們在 recommender 下新建子項目 OnlineRecommender,引入 spark、scala、mongo、redis 和 kafka 的依賴:
<dependencies>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<!-- 用於代碼方式鏈接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用於 Spark 和 MongoDB 的對接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!-- Redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
代碼中首先定義樣例類和一個鏈接助手對象(用於創建 redis 和 mongo 鏈接),並在 OnlineRecommender 中定義一些常量:
src/main/scala/com.atguigu.online/OnlineRecommender.scala
package com.atguigu.online
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer
// 定義樣例類
// 鏈接助手對象(用於創建 redis 和 mongo 的鏈接)並序列化
object ConnHelper extends Serializable {
// 懶變量:使用的時候才初始化
lazy val jedis = new Jedis("hadoop102")
// 用於 MongoDB 中的一些複雜操做(讀寫以外的操做)
lazy val mongoClient = MongoClient(MongoClientURI("mongodb://hadoop102:27017/ECrecommender"))
}
// MongoDB 鏈接配置對象
case class MongoConfig(uri: String, db: String)
// 標準推薦對象,productId, score
case class Recommendation(productId: Int, score: Double)
// 用戶推薦列表
case class UserRecs(userId: Int, recs: Seq[Recommendation])
// 商品類似度列表(商品類似度矩陣/商品推薦列表)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object OnlineRecommender {
// 定義常量和表名
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_PRODUCT_RECS_COLLECTION = "ProductRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_PRODUCTS_NUM = 20
def main(args: Array[String]): Unit = {
}
}
實時推薦主體代碼以下:
object OnlineRecommender {
// 定義常量和表名
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_PRODUCT_RECS_COLLECTION = "ProductRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_PRODUCTS_NUM = 20
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender",
"kafka.topic" -> "ECrecommender"
)
// 建立一個 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OnlineRecommender")
// 建立一個 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一個 sparkContext
val sc = spark.sparkContext
// 建立一個 StreamContext
val ssc = new StreamingContext(sc, Seconds(2)) // 通常 500 毫秒以上
// 聲明一個隱式的配置對象,方便重複調用(當屢次調用對 MongoDB 的存儲或讀寫操做時)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隱式轉換:在對 DataFrame 和 Dataset 進行操做許多操做都須要這個包進行支持
import spark.implicits._
// 加載數據:加載 MongoDB 中 ProductRecs 表的數據(商品類似度列表/商品類似度矩陣/商品推薦列表)
val simProductsMatrixMap = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_RECS_COLLECTION)
.format("com.mongodb.spark.sql")
.load() // DF
.as[ProductRecs] // DS
.rdd // RDD
.map { recs =>
(recs.productId, recs.recs.map(item => (item.productId, item.score)).toMap)
}.collectAsMap() // Map[(productId, Map[(productId, score)])] 轉換成 Map 結構,這麼作的目的是:爲了後續查詢商品類似度方便
// 將 商品類似度 Map 廣播出去
val simProductsMatrixMapBroadCast = sc.broadcast(simProductsMatrixMap)
// 建立到 Kafka 的鏈接
val kafkaPara = Map(
"bootstrap.servers" -> "hadoop102:9092", // 使用的是 Kafka 的高級 API
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "ECrecommender",
"auto.offset.reset" -> "latest"
)
// 建立 kafka InputDStream
val kafkaInputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaPara)
)
// UID|PID|SCORE|TIMESTAMP
// userId|productId|score|timestamp
// 產生評分流
// ratingDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(userId, productId, score, timestamp)]
val ratingDStream = kafkaInputDStream.map {
case msg =>
val attr = msg.value().split("\\|")
(attr(0).trim.toInt, attr(1).trim.toInt, attr(2).trim.toDouble, attr(3).toInt)
}
// TODO: 對評分流的處理流程
ratingDStream.foreachRDD {
rdds =>
rdds.foreach {
case (userId, productId, score, timestamp) =>
println("rating data coming! >>>>>>>>>>>>>>>>>>>> ")
// TODO: 核心實時推薦算法流程
// 一、從 redis 中獲取 當前用戶最近的 K 次商品評分,保存成一個數組 Array[(productId, score)]
val userRecentlyRatings = getUserRecentlyRatings(MAX_USER_RATINGS_NUM, userId, ConnHelper.jedis)
// 二、從 MongoDB 的 商品類似度列表 中獲取 當前商品 p 的 K 個最類似的商品列表,做爲候選商品列表,保存成一個數組 Array[(productId)]
val candidateProducts = getTopSimProducts(MAX_SIM_PRODUCTS_NUM, productId, userId, simProductsMatrixMapBroadCast.value)
// 三、計算每個 候選商品 q 的 推薦優先級得分,獲得 當前用戶的實時推薦列表,保存成一個數組 Array[(productId, score)]
// 爲何不保存成 Recommendation 的列表呢?答:由於最後保存的過程中不用 DataFram 的 write() 方法了,而是將每個元素包裝成 MongoDBObject 對象,而後插入列表中去
val streamRecs = computeProductsScore(candidateProducts, userRecentlyRatings, simProductsMatrixMapBroadCast.value)
// 四、將 當前用戶的實時推薦列表數據 保存到 MongoDB
storeDataInMongDB(userId, streamRecs)
}
}
// 啓動 Streaming 程序
ssc.start()
println(">>>>>>>>>>>>>>>>>>>> streaming started!")
ssc.awaitTermination()
}
實時推薦算法的前提:
一、在 Redis 集羣中存儲了每個用戶最近對商品的 K 次評分。實時算法能夠快速獲取。
二、離線推薦算法已經將商品類似度矩陣提早計算到了 MongoDB 中。
三、Kafka 已經獲取到了用戶實時的評分數據。
算法過程以下:
實時推薦算法輸入爲一個評分流 <userId, productId, score, timestamp>
,而執行的核心內容包括:獲取 userId 最近 K 次商品評分、獲取 productId 最類似 K 個商品、計算候選商品的推薦優先級、更新對 userId 的實時推薦結果。
業務服務器在接收用戶評分的時候,默認會將該評分狀況以 userId, productId, score, timestamp
的格式插入到 Redis 中該用戶對應的隊列當中,在實時算法中,只須要經過 Redis 客戶端獲取相對應的隊列內容便可。
// 由於 redis 操做返回的是 java 類,爲了使用 map 操做須要引入轉換類
import scala.collection.JavaConversions._
/**
* 一、從 redis 中獲取 當前用戶最近的 K 次商品評分,保存成一個數組 Array[(productId, score)]
*
* @param MAX_USER_RATINGS_NUM
* @param userId
* @param jedis
*/
def getUserRecentlyRatings(MAX_USER_RATINGS_NUM: Int, userId: Int, jedis: Jedis) = {
// redis 中的列表類型(list)能夠存儲一個有序的字符串列表
// 從 redis 中 用戶的評分隊列 裏獲取評分數據,list 中的 鍵 userId:4867 值 457976:5.0
jedis.lrange("userId:" + userId.toString, 0, MAX_USER_RATINGS_NUM)
.map { item =>
val attr = item.split("\\:")
(attr(0).trim.toInt, attr(1).trim.toDouble)
}
.toArray
}
在離線算法中,已經預先將商品的類似度矩陣進行了計算,因此每一個商品 productId 的最類似的 K 個商品很容易獲取:從 MongoDB 中讀取 ProductRecs 數據,從 productId 在 candidateProducts 對應的子哈希表中獲取類似度前 K 大的那些商品。輸出是數據類型爲 Array[Int] 的數組,表示與 productId 最類似的商品集合,並命名爲 candidateProducts 以做爲候選商品集合。
/**
* 二、從 MongoDB 的 商品類似度列表 中獲取 當前商品 p 的 K 個最類似的商品列表,做爲候選商品列表,保存成一個數組 Array[(productId)]
*
* @param MAX_SIM_PRODUCTS_NUM
* @param productId
* @param userId
* @param simProductsMatrixMap
*/
def getTopSimProducts(MAX_SIM_PRODUCTS_NUM: Int, productId: Int, userId: Int, simProductsMatrixMap: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = {
// 一、從廣播變量 商品類似度矩陣 中拿到當前商品的類似度商品列表
// simProductsMatrixMap: Map[(productId, Map[(productId, score)])]
// allSimProducts: Array[(productId, score)]
val allSimProducts = simProductsMatrixMap(productId).toArray
// 二、定義經過 MongoDB 客戶端拿到的表操做對象
val ratingCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
// 獲取用戶已經評分過的商品(經過 MongoDBObject 對象)
val ratingExist = ratingCollection.find(MongoDBObject("userId" -> userId)).toArray.map(item => item.get("productId").toString.toInt)
// 三、過濾掉用戶已經評分過的商品,排序輸出
allSimProducts.filter(x => !ratingExist.contains(x._1))
.sortWith(_._2 > _._2)
.take(MAX_SIM_PRODUCTS_NUM)
.map(x => x._1)
}
對於候選商品集合 candidateProducts 和 userId 的最近 K 個評分 userRecentlyRatings,算法代碼內容以下:
/**
* 三、計算每個 候選商品 q 的 推薦優先級得分,獲得 當前用戶的實時推薦列表,保存成一個數組 Array[(productId, score)]
*
* @param candidateProducts
* @param userRecentlyRatings
* @param simProductsMatrixMap
*/
def computeProductsScore(candidateProducts: Array[Int], userRecentlyRatings: Array[(Int, Double)], simProductsMatrixMap: collection.Map[Int, Map[Int, Double]]) = {
// 一、定義一個長度可變的數組 scala ArrayBuffer,用於保存每個候選商品的基礎得分
val scores = ArrayBuffer[(Int, Double)]()
// 二、定義兩個可變的 scala HashMap,用於保存每個候選商品的加強因子和減弱因子
val increMap = HashMap[Int, Int]()
val decreMap = HashMap[Int, Int]()
// 三、對 每個候選商品 和 每個已經評分的商品 計算推薦優先級得分
for (candidateProduct <- candidateProducts; userRecentlyRating <- userRecentlyRatings) {
// 獲取當前 候選商品 和當前 最近評分商品 的類似度的得分
val simScore = getProductSimScore(candidateProduct, userRecentlyRating._1, simProductsMatrixMap)
if (simScore > 0.6) {
// 計算 候選商品 的基礎得分
scores += ((candidateProduct, simScore * userRecentlyRating._2))
// 計算 加強因子 和 減弱因子
if (userRecentlyRating._2 > 3) {
increMap(candidateProduct) = increMap.getOrDefault(candidateProduct, 0) + 1
} else {
decreMap(candidateProduct) = decreMap.getOrDefault(candidateProduct, 0) + 1
}
}
}
// 四、根據備選商品的 productId 作 groupBy,而後根據公式最後求出候選商品的 推薦優先級得分 並排序
scores.groupBy(_._1).map {
case (productId, scoreList) =>
(productId, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(productId, 1)) - log(decreMap.getOrDefault(productId, 1)))
}.toArray.sortWith(_._2 > _._2)
}
其中,getTopSimProducts 是取候選商品和已評分商品的類似度,代碼以下:
/**
* 獲取當前 候選商品 和當前 最近評分商品 的類似度的得分,獲得一個 Double
*
* @param productId1
* @param productId2
* @param simProductsMatrixMap
*/
def getProductSimScore(productId1: Int, productId2: Int, simProductsMatrixMap: collection.Map[Int, Map[Int, Double]]) = {
simProductsMatrixMap.get(productId1) match {
case Some(map) =>
map.get(productId2) match {
case Some(score) => score
case None => 0.0
}
case None => 0.0
}
}
而 log 是對數運算,這裏實現爲取 10 的對數(經常使用對數):
/**
* 求一個數以10爲底數的對數(使用換底公式)
*
* @param m
* @return
*/
def log(m: Int): Double = {
val N = 10
math.log(m) / math.log(N) // 底數爲 e => ln m / ln N = log m N = lg m
}
storeDataInMongDB 函數實現告終果的保存:
/**
* 四、將 當前用戶的實時推薦列表數據 保存到 MongoDB
*
* @param userId
* @param streamRecs
*/
def storeDataInMongDB(userId: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit = {
// 到 StreamRecs 的鏈接
val streaRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
streaRecsCollection.findAndRemove(MongoDBObject("userId" -> userId))
streaRecsCollection.insert(MongoDBObject("userId" -> userId, "recs" -> streamRecs.map(x => MongoDBObject("productId" -> x._1, "score" -> x._2))))
}
當計算出候選商品的推薦優先級的數組 updatedRecommends<productId, E>
後,這個數組將被髮送到 Web 後臺服務器,與後臺服務器上 userId 的上次實時推薦結果 recentRecommends<productId, E>
進行合併、替換並選出優先級 E 前 K 大的商品做爲本次新的實時推薦。具體而言:
a、合併:將 updatedRecommends 與 recentRecommends 並集合成爲一個新的 <productId, E>
數組;
b、替換(去重):當 updatedRecommends 與 recentRecommends 有重複的商品 productId 時,recentRecommends 中 productId 的推薦優先級因爲是上次實時推薦的結果,因而將做廢,被替換成表明了更新後的 updatedRecommends 的 productId 的推薦優先級;
c、選取 TopK:在合併、替換後的 <productId, E>
數組上,根據每一個 product 的推薦優先級,選擇出前 K 大的商品,做爲本次實時推薦的最終結果。
咱們的系統實時推薦的數據流向是:業務系統 -> 埋點日誌 -> flume 日誌採集 -> kafka streaming 數據清洗和預處理 -> spark streaming 流式計算。在咱們完成實時推薦服務的代碼後,應該與其它工具進行聯調測試,確保系統正常運行。
啓動實時推薦系統 OnlineRecommender 以及 mongodb、redis。
參考連接:https://www.cnblogs.com/chenmingjun/p/10914837.html
啓動 hadoop 集羣命令:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 ~]$ zkstart.sh
[atguigu@hadoop102 ~]$ kafka-start.sh
在 recommender 下新建 module,KafkaStreaming,主要用來作日誌數據的預處理,過濾出須要的內容。pom.xml 文件須要引入依賴:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
<build>
<finalName>kafkastream</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.kafkastream.Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
在 src/main/java 下新建 java 類 com.atguigu.kafkastreaming.Application
package com.atguigu.kafkastream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Properties;
public class Application {
public static void main(String[] args) {
String brokers = "hadoop102:9092";
String zookeepers = "hadoop102:2181";
// 定義輸入和輸出的 topic
String from = "log";
String to = "ECrecommender";
// 定義 kafka streaming 的配置
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
StreamsConfig config = new StreamsConfig(settings);
// 拓撲建構器
TopologyBuilder builder = new TopologyBuilder();
// 定義流處理的拓撲結構
builder.addSource("SOURCE", from)
.addProcessor("PROCESSOR", () -> new LogProcessor(), "SOURCE")
.addSink("SINK", to, "PROCESSOR");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
System.out.println("kafka stream started!");
}
}
這個程序會將 topic 爲 「log」 的信息流獲取來作處理,並以 「ECrecommender」 爲新的 topic 轉發出去。
流處理程序 LogProcess.java
package com.atguigu.kafkastream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(byte[] dummy, byte[] line) { // dummy 表示 啞變量,沒什麼用
// 把收集到的日誌信息用 String 表示
String input = new String(line);
// 根據前綴 PRODUCT_RATING_PREFIX: 從日誌信息中提取評分數據
if (input.contains("PRODUCT_RATING_PREFIX:")) {
System.out.println("product rating data coming! >>>>>>>>>>>>>>>>>>>> " + input);
input = input.split("PRODUCT_RATING_PREFIX:")[1].trim();
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
完成代碼後,啓動 Application。
在 flume 的 /opt/module/flume/job/ECrecommender 目錄下(該目錄任意)新建 flume-log-kafka.conf,對 flume 鏈接 kafka 作配置:
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
# For each one of the sources, the type is defined.
agent.sources.exectail.type = exec
# 下面這個路徑是須要收集日誌的絕對路徑,改成本身的日誌目錄(系統部署後的 tomcat 的日誌目錄)
agent.sources.exectail.command = tail –f /opt/module/tomcat/logs/catalina.out
agent.sources.exectail.interceptors = i1
agent.sources.exectail.interceptors.i1.type = regex_filter
# 定義日誌過濾前綴的正則
agent.sources.exectail.interceptors.i1.regex = .+PRODUCT_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel
# Each sink's type must be defined.
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
# Specify the channel the sink should use.
agent.sinks.kafkasink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel.
agent.channels.memoryChannel.capacity = 10000
配置好後,啓動 flume:
[atguigu@hadoop102 flume]$ bin/flume-ng agent \
--conf conf/ --name a1 --conf-file job/ECrecommender/flume-log-kafka.conf \
-Dflume.root.logger=INFO,console
將業務代碼加入系統中。注意在 src/main/resources/ 下的 log4j.properties 中,log4j.appender.file.File 的值應該替換爲本身的日誌目錄,與 flume 中的配置應該相同(當 flume 與 業務代碼在同一臺機器上時這麼作,不然 flume 指向的應該是系統部署後的 tomcat 的日誌目錄
)。
啓動業務系統後臺,訪問 localhost:8088/index.html;點擊某個商品進行評分,查看實時推薦列表是否會發生變化。
整個推薦系統更多的是依賴於用於的偏好信息進行商品的推薦,那麼就會存在一個問題,對於新註冊的用戶是沒有任何偏好信息記錄的,那這個時候推薦就會出現問題,致使沒有任何推薦的項目出現。
處理這個問題通常是經過當用戶首次登錄時,爲用戶提供交互式的窗口來獲取用戶對於物品的偏好,讓用戶勾選預設的興趣標籤
。
當獲取用戶的偏好以後,就能夠直接給出相應類型商品的推薦。
原始數據中的 tag 文件,是用戶給商品打上的標籤,這部份內容想要直接轉成評分並不容易,不過咱們能夠將標籤內容進行提取,獲得商品的內容特徵向量,進而能夠經過求取商品內容類似度矩陣
。這部分能夠與實時推薦系統直接對接,計算出與用戶當前評分商品的類似商品,實現基於內容的實時推薦
。爲了不熱門標籤對特徵提取的影響,咱們還能夠經過 TF-IDF 算法對標籤的權重進行調整,從而儘量地接近用戶偏好。
咱們在 recommender 下新建子項目 ContentRecommender,引入 spark、scala、mongo 和 jblas 的依賴:
<dependencies>
<!-- java 線性代數的庫 -->
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<!-- 用於代碼方式鏈接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用於 Spark 和 MongoDB 的對接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
基於以上思想,加入 TF-IDF 算法將商品的標籤內容進行提取,獲得商品的內容特徵向量 的核心代碼以下:
src/main/scala/com.atguigu.content/ContentRecommender.scala
case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String)
case class MongoConfig(uri: String, db: String)
// 標準推薦對象,productId, score
case class Recommendation(productId: Int, score: Double)
// 商品類似度列表(商品類似度矩陣/商品推薦列表)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object ContentRecommender {
// 定義 mongodb 中存儲的表名
val MONGODB_PRODUCT_COLLECTION = "Product"
val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs"
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一個 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
// 建立一個 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一個 sparkContext
val sc = spark.sparkContext
// 聲明一個隱式的配置對象,方便重複調用(當屢次調用對 MongoDB 的存儲或讀寫操做時)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隱式轉換:在對 DataFrame 和 Dataset 進行操做許多操做都須要這個包進行支持
import spark.implicits._
// 將 MongoDB 中的數據加載進來
val productTagsDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Product]
.map(product => (product.productId, product.name, product.tags.map(x => if (x == '|') ' ' else x))) // 由於 TF-IDF 默認使用的分詞器的分隔符是空格
.toDF("productId", "name", "tags")
.cache()
// TODO: 用 TF-IDF 算法將商品的標籤內容進行提取,獲得商品的內容特徵向量
// 一、實例化一個分詞器,用來作分詞,默認按照空格進行分詞(注意:org.apache.spark.ml._ 下的 API 都是針對 DF 來操做的)
val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
// 用分詞器作轉換後,獲得增長一個新列 words 的 DF
val wordsDataDF = tokenizer.transform(productTagsDF)
// 二、定義一個 HashingTF 工具,用於計算頻次 TF(映射特徵的過程使用的就是 Hash 算法,特徵的數量就是 Hash 的分桶數量,若分桶的數量太小,會出現 Hash 碰撞,默認分桶很大,後面作笛卡爾積性能不好)
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rowFeatures").setNumFeatures(1000)
// 用 HashingTF 作轉換
val featurizedDataDF = hashingTF.transform(wordsDataDF)
// 三、定義一個 IDF 工具,計算 TF-IDF
val idf = new IDF().setInputCol("rowFeatures").setOutputCol("features")
// 訓練一個 idf 模型,將詞頻數據傳入,獲得 idf 模型(逆文檔頻率)
val idfModel = idf.fit(featurizedDataDF)
// 經過 idf 模型轉換後,獲得增長一個新列 features 的 DF,即用 TF-IDF 算法獲得新的特徵矩陣
val rescaleDataDF = idfModel.transform(featurizedDataDF)
// 測試
// rescaleDataDF.show(truncate = false)
// 對數據進行轉換,獲得所須要的 RDD
// 從獲得的 rescaledDataDF 中提取特徵向量
val productFeaturesRDD = rescaleDataDF
.map {
row => // DF 轉換爲 二元組
(row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray)
}
.rdd
.map {
case (productId, featuresArray) =>
(productId, new DoubleMatrix(featuresArray))
}
// 將 商品的特徵矩陣 和 商品的特徵矩陣 作一個笛卡爾積,獲得一個稀疏的 productFeaturesCartRDD
val productFeaturesCartRDD = productFeaturesRDD.cartesian(productFeaturesRDD)
// 測試
// productFeaturesCartRDD.foreach(println(_))
// 獲取 商品類似度列表(商品類似度矩陣/商品推薦列表)
val productSimDF = productFeaturesCartRDD
.filter { // 過濾掉本身與本身作笛卡爾積的數據
case (a, b) =>
a._1 != b._1
}
.map { // 計算餘弦類似度
case (a, b) =>
val simScore = this.consinSim(a._2, b._2)
// 返回一個二元組 productSimRDD: RDD[(productId, (productId, consinSim))]
(a._1, (b._1, simScore))
}
.filter(_._2._2 > 0.6)
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
} // productSimGroupRDD: RDD[(productId, Seq[(productId, consinSim)])]
.toDF()
// 將 DF 數據寫入 MongoDB 數據庫對應的表中
storeDFInMongoDB(productSimDF, CONTENT_PRODUCT_RECS)
spark.stop()
}
/**
* 計算兩個商品之間的餘弦類似度(使用的是向量點積公式)
*
* @param product1
* @param product2
* @return
*/
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
// dot 表示點積,norm2 表示模長,模長就是 L2範式
product1.dot(product2) / (product1.norm2() * product2.norm2()) // l1範數:向量元素絕對值之和;l2範數:即向量的模長(向量的長度)
}
/**
* 將 DF 數據寫入 MongoDB 數據庫對應的表中的方法
*
* @param df
* @param collection_name
*/
def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig) = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}
}
而後經過商品特徵向量進而求出商品類似度矩陣,就能夠在商品詳情頁給出類似推薦了;一般在電商網站中,用戶瀏覽商品或者購買完成以後,都會顯示相似的推薦列表。
獲得的類似度矩陣也能夠爲實時推薦提供基礎,獲得用戶推薦列表。能夠看出,基於內容模型 和 基於隱語義模型
,目的都是爲了提取出物品的特徵向量
,從而能夠計算出物品的類似度矩陣
。而咱們的實時推薦系統算法正是基於類似度來定義的。
基於物品的協同過濾(Item-CF),只需收集用戶的常規行爲數據(好比點擊、收藏、購買等)就能夠獲得商品間的類似度,在實際項目中應用很廣。
咱們在 recommender 下新建子項目 ItemCFRecommender,引入 spark、scala、mongo 和 jblas 的依賴:
<dependencies>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<!-- 用於代碼方式鏈接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用於 Spark 和 MongoDB 的對接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
核心代碼實現以下:
src/main/scala/com.atguigu.itemcf/ItemCFRecommender.scala
// 定義樣例類
// 注意:spark mllib 中有 Rating 類,爲了便於區別,咱們從新命名爲 ProductRating
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
// 標準推薦對象,productId, score
case class Recommendation(productId: Int, score: Double)
// 商品類似度列表(商品類似度矩陣/商品推薦列表)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object ItemCFRecommender {
// 定義 MongoDB 中存儲的表名
val MONGODB_RATING_COLLECTION = "Rating"
// 定義 MongoDB 中推薦表的名稱
val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
val MAX_RECOMMENDATION = 10
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一個 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ItemCFRecommender")
// 建立一個 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一個 sparkContext
val sc = spark.sparkContext
// 聲明一個隱式的配置對象,方便重複調用(當屢次調用對 MongoDB 的存儲或讀寫操做時)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隱式轉換:在對 DataFrame 和 Dataset 進行操做許多操做都須要這個包進行支持
import spark.implicits._
// 將 MongoDB 中的數據加載進來,獲得 DF (userId, productId, count)
val ratingDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.map(productRating => (productRating.userId, productRating.productId, productRating.score))
.toDF("userId", "productId", "score")
.cache()
// TODO: 核心算法:基於物品的協同過濾推薦(類似推薦)--計算物品的同現類似度,獲得商品的類似度列表
// 一、統計每一個商品的評分個數,使用 ratingDF 按照 productId 作 groupBy,獲得 (productId, count)
val productRatingCountDF = ratingDF.groupBy("productId").count()
// 二、在原有的 rating 表中添加一列 count,獲得新的 評分表,將 ratingDF 和 productRatingCountDF 作內鏈接 join 便可,獲得 (productId, userId, score, count)
val ratingWithCountDF = ratingDF.join(productRatingCountDF, "productId")
// 三、將 新的評分表 按照 userId 作兩兩 join,統計兩個商品被同一個用戶評分過的次數,獲得 (userId, productId1, score1, count1, productId2, score2, count2)
val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId")
.toDF("userId", "productId1", "score1", "count1", "productId2", "score2", "count2") // 設置 DF 的列名稱
.select("userId", "productId1", "count1", "productId2", "count2") // 設置 DF 顯示的列
// 建立一個名爲 joined 的臨時表,用於寫 sql 查詢
joinedDF.createOrReplaceTempView("joined")
// 四、按照 productId1, productId2 作 groupBy,統計 userId 的數量,獲得對兩個商品同時評分的人數
val sql =
"""
|select productId1
|, productId2
|, count(userId) as cooCount
|, first(count1) as count1
|, first(count2) as count2
|from joined
|group by productId1, productId2
""".stripMargin
val cooccurrenceDF = spark.sql(sql).cache() // (productId1, productId2, cooCount, count1, count2)
val simDF = cooccurrenceDF
.map {
row =>
val coocSim = cooccurrenceSim(row.getAs[Long]("cooCount"), row.getAs[Long]("count1"), row.getAs[Long]("count2") )
(row.getInt(0), (row.getInt(1), coocSim))
}
.rdd
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList.filter(x => x._1 != productId).sortWith(_._2 > _._2).take(MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
} // productSimGroupRDD: RDD[(productId, Seq[(productId, consinSim)])]
.toDF()
// 測試
// simDF.show()
// 將 DF 數據寫入 MongoDB 數據庫對應的表中
storeDFInMongoDB(simDF, ITEM_CF_PRODUCT_RECS)
spark.stop()
}
}
其中,計算同現類似度的函數代碼實現以下:
/**
* 計算同現類似度
*
* @param cooCount
* @param count1
* @param count2
*/
def cooccurrenceSim(cooCount: Long, count1: Long, count2: Long) = {
cooCount / math.sqrt(count1 * count2)
}
其中,將 DF 數據寫入 MongoDB 數據庫對應的表中的函數代碼實現以下:
/**
* 將 DF 數據寫入 MongoDB 數據庫對應的表中的方法
*
* @param df
* @param collection_name
*/
def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig) = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}