第1章 項目體系架構設計1.1 項目系統架構1.2 項目數據流程1.3 數據模型1.4 離線統計服務1.5 離線推薦服務(基於 LFM 模型)1.6 實時推薦服務(基於自定義模型)1.7 離線推薦服務--基於內容的協同過濾推薦(類似推薦)1.8 離線推薦服務--基於物品的協同過濾推薦(類似推薦)1.9 混合推薦--分區混合第2章 工具環境搭建第3章 建立項目並初始化業務數據3.1 在IDEA中建立maven項目3.1.1 項目框架搭建3.1.2 聲明項目中工具的版本信息3.1.3 添加項目依賴3.2 數據加載準備3.2.1 Products 數據集3.2.2 Ratings 數據集3.2.3 日誌管理配置文件3.3 數據初始化到 MongoDB3.3.1 啓動 MongoDB 數據庫(略)3.3.2 數據加載程序主體實現3.3.3 將數據寫入 MongoDBcss
項目以推薦系統建設領域知名的通過修改過的中文亞馬遜電商數據集做爲依託,以某電商網站真實業務數據架構爲基礎,構建了符合教學體系的一體化的電商推薦系統,包含了離線推薦與實時推薦體系,綜合利用了協同過濾算法以及基於內容的推薦方法來提供混合推薦。提供了從前端應用、後臺服務、算法設計實現、平臺部署等多方位的閉環的業務實現。html
【數據存儲部分】
業務數據庫:項目採用普遍應用的文檔數據庫 MongDB 做爲主數據庫,主要負責平臺業務邏輯數據的存儲。
緩存數據庫:項目採用 Redis 做爲緩存數據庫,主要用來支撐實時推薦系統部分對於數據的高速獲取需求。
【離線推薦部分】
離線統計服務:批處理統計性業務採用 Spark Core + Spark SQL 進行實現,實現對指標類數據的統計任務。
離線推薦服務:離線推薦業務採用 Spark Core + Spark MLlib 進行實現,採用 ALS 算法進行實現。
【實時推薦部分】
日誌採集服務:經過利用 Flume-ng 對業務平臺中用戶對於商品的一次評分行爲進行採集,實時發送到 Kafka 集羣。
消息緩衝服務:項目採用 Kafka 做爲流式數據的緩存組件,接受來自 Flume 的數據採集請求。並將數據推送到項目的實時推薦系統部分。
實時推薦服務:項目採用 Spark Streaming 做爲實時推薦系統,經過接收 Kafka 中緩存的數據,經過設計的推薦算法實現對實時推薦的數據處理,並將結構合併更新到 MongoDB 數據庫。前端
【系統初始化部分】
0、經過 Spark SQL 將系統初始化數據加載到 MongoDB 中。
【離線推薦部分】
一、離線統計服務從 MongoDB 中加載數據,將【商品平均評分統計】、【商品評分個數統計】、【最近商品評分個數統計】三個統計算法進行運行實現,並將計算結果回寫到 MongoDB 中;離線推薦服務從 MongoDB 中加載數據,經過 ALS 算法分別將【用戶推薦結果矩陣】、【影片類似度矩陣】回寫到 MongoDB 中。
【實時推薦部分】
二、Flume 從綜合業務服務的運行日誌中讀取日誌更新,並將更新的日誌實時推送到 Kafka 中;Kafka 在收到這些日誌以後,經過 kafkaStream 程序對獲取的日誌信息進行過濾處理,獲取用戶評分數據流【UID|PID|SCORE|TIMESTAMP】,併發送到另一個 Kafka 隊列;Spark Streaming 監聽 Kafka 隊列,實時獲取 Kafka 過濾出來的用戶評分數據流,融合存儲在 Redis 中的用戶最近評分隊列數據,提交給實時推薦算法,完成對用戶新的推薦結果計算;計算完成以後,將新的推薦結構和 MongDB 數據庫中的推薦結果進行合併。
【業務系統部分】
三、推薦結果展現部分,從 MongoDB 中將離線推薦結果、實時推薦結果、內容推薦結果進行混合,綜合給出相對應的數據。
四、商品信息查詢服務經過對接 MongoDB 實現對商品信息的查詢操做。
五、商品評分部分,獲取用戶經過 UI 給出的評分動做,後臺服務進行數據庫記錄後,一方面將數據推進到 Redis 羣中,另外一方面,經過預設的日誌框架輸出到 Tomcat 中的日誌中。
六、商品標籤部分,項目提供用戶對商品打標籤服務。java
各數據表解析redis
數據源解析 以及 主要數據模型算法
咱們的項目中用到了多種工具進行數據的存儲、計算、採集和傳輸,本章主要簡單介紹設計的工具環境搭建。
若是機器的配置不足,推薦只採用一臺虛擬機進行配置,而非徹底分佈式,將該虛擬機CPU的內存設置的儘量大,推薦爲CPU > 四、MEM > 4GB。
參考連接:https://www.cnblogs.com/chenmingjun/p/10914837.htmlsql
咱們的項目主體用 Scala 編寫,採用 IDEA 做爲開發環境進行項目編寫,採用 maven 做爲項目構建和管理工具。mongodb
打開 IDEA,建立一個 maven 項目,命名爲 ECommerceRecommendSystem。爲了方便後期的聯調,咱們會把業務系統的代碼也添加進來,因此咱們能夠以 ECommerceRecommendSystem 做爲父項目,並在其下建一個名爲 recommender 的子項目,而後再在下面搭建多個子項目用於提供不一樣的推薦服務。數據庫
在 ECommerceRecommendSystem 下新建一個 maven module 做爲子項目,命名爲 recommender。一樣的,再以 recommender 爲父項目,新建一個 maven module 做爲子項目。咱們的第一步是初始化業務數據,因此子項目命名爲 DataLoader。
父項目只是爲了規範化項目結構,方便依賴管理,自己是不須要代碼實現的,因此 ECommerceRecommendSystem 和 recommender 下的 src 文件夾均可以刪掉。
目前的總體項目框架以下:apache
咱們整個項目須要用到多個工具,它們的不一樣版本可能會對程序運行形成影響,因此應該在最外層的 ECommerceRecommendSystem 中聲明全部子項目共用的版本信息。
在pom.xml中加入如下配置:
ECommerceRecommendSystem/pom.xml
<properties>
<log4j.version>1.2.17</log4j.version><!-- 日誌的具體實現 -->
<slf4j.version>1.7.22</slf4j.version><!-- 日誌接口 -->
<mongodb-spark.version>2.0.0</mongodb-spark.version><!-- mongodb 與 spark 之間的鏈接器 -->
<casbah.version>3.1.1</casbah.version><!-- mongodb 與 scala 之間的 Driver -->
<redis.version>2.9.0</redis.version>
<kafka.version>0.10.2.1</kafka.version>
<spark.version>2.1.1</spark.version>
<scala.version>2.11.8</scala.version>
<jblas.version>1.2.1</jblas.version><!-- java 線性代數的庫 -->
</properties>
首先,對於整個項目而言,應該有一樣的日誌管理,咱們在 ECommerceRecommendSystem 中引入公有依賴:
ECommerceRecommendSystem/pom.xml
<dependencies>
<!-- 引入共同的日誌管理工具 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
一樣,對於 maven 項目的構建,能夠引入公有的插件:
<build>
<!-- 聲明並引入子項目共有的插件 -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<!-- 全部的編譯用 JDK1.8 -->
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<!-- 聲明但不引入子項目共有的插件,子項目若是依賴須要自行引入 -->
<plugins>
<!-- maven 的打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 該插件用於將 scala 代碼編譯成 class 文件-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<!-- 綁定到 maven 的編譯階段-->
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
而後,在 recommender 模塊中,咱們能夠爲全部的推薦模塊聲明 spark 相關依賴(這裏的 dependencyManagement 表示僅聲明相關信息,子項目若是依賴須要自行引入):
ECommerceRecommendSystem/recommender/pom.xml
<dependencyManagement>
<!-- 聲明但不引入子項目共有的插件,子項目若是依賴須要自行引入 -->
<dependencies>
<!-- 引入 Spark 相關的 Jar 包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
因爲各推薦模塊都是 scala 代碼,還應該引入 scala-maven-plugin 插件,用於 scala 程序的編譯。由於插件已經在父項目中聲明,因此這裏不須要再聲明版本和具體配置:
<build>
<plugins>
<!-- 父項目已聲明該 plugin,子項目在引入的時候,不用聲明版本和已經聲明的配置 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
對於具體的 DataLoader 子項目,須要 spark 相關組件,還須要 mongodb 的相關依賴,咱們在 pom.xml 文件中引入全部依賴(在父項目中已聲明的不須要再加詳細信息):
ECommerceRecommendSystem/recommender/DataLoader/pom.xml
<!-- 對於具體的子項目而言,須要 spark 相關組件,還須要 mongodb 的相關依賴,咱們引入全部依賴(在父項目中已聲明的不須要再加詳細信息) -->
<dependencies>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
至此,咱們作數據加載須要的依賴都已配置好,能夠開始寫代碼了。
在 src/main/ 目錄下,能夠看到已有的默認源文件目錄是 java,咱們能夠將其更名爲 scala。將數據文件 products.csv,ratings.csv 複製到資源文件目錄 src/main/resources 下,咱們將從這裏讀取數據並加載到 mongodb 中。
數據格式:
productId,name,categoryIds,amazonId,imageUrl,categories,tags
例如:
3982^Fuhlen 富勒 M8眩光舞者時尚節能無線鼠標(草綠)(眩光.悅動.時尚炫舞鼠標 12個月免換電池 高精度光學尋跡引擎 超細微接收器10米傳輸距離)^1057,439,736^B009EJN4T2^https://images-cn-4.ssl-images-amazon.com/images/I/31QPvUDNavL._SY300_QL70_.jpg^外設產品|鼠標|電腦/辦公^富勒|鼠標|電子產品|好用|外觀漂亮
Products 數據集有 7 個字段,每一個字段之間經過 「^」 符號進行分割。其中的 categoryIds、amazonId 對於內容特徵沒有實質幫助,咱們只須要其它5個字段:
數據格式:
userId,prudcutId,rating,timestamp
例如:
4867,457976,5.0,1395676800
Rating 數據集有 4 個字段,每一個字段之間經過 「,」 分割。
log4j 對日誌的管理,須要經過配置文件來生效。在 src/main/resources 下新建配置文件 log4j.properties,寫入如下內容:
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
咱們會爲原始數據定義幾個樣例類,經過 SparkContext 的 textFile 方法從文件中讀取數據,並轉換成 DataFrame,再利用 Spark SQL 提供的 write 方法進行數據的分佈式插入。
在 DataLoader/src/main/scala 下新建 package,命名爲 com.atguigu.recommender,新建名爲 DataLoader 的 scala 單例 object 對象文件。
程序主體代碼以下:
DataLoader/src/main/scala/com.atguigu.recommerder/DataLoader.scala
// 定義樣例類
case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String)
// 注意:spark mllib 中有 Rating 類,爲了便於區別,咱們從新命名爲 ProductRating
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
object DataLoader {
// 定義數據文件路徑
val PRODUCT_DATA_PATH = "D:\\learn\\JetBrains\\workspace_idea3\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\products.csv"
val RATING_DATA_PATH = "D:\\learn\\JetBrains\\workspace_idea3\\ECommerceRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"
// 定義 MongoDB 中存儲的表名
val MONGODB_PRODUCT_COLLECTION = "Product"
val MONGODB_RATING_COLLECTION = "Rating"
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一個 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
// 建立一個 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一個 sparkContext
val sc = spark.sparkContext
// 加入隱式轉換:在對 DataFrame 和 Dataset 進行操做許多操做都須要這個包進行支持
import spark.implicits._
// 將 Products、Ratings 數據集加載進來
val productRDD = sc.textFile(PRODUCT_DATA_PATH)
// 將 prodcutRDD 裝換爲 DataFrame
val productDF = productRDD.map(item => {
// productId,name,categoryIds,amazonId,imageUrl,categories,tags
val attr = item.split("\\^")
// 取出數據,轉換成樣例類
Product(attr(0).trim.toInt, attr(1).trim, attr(4).trim, attr(5).trim, attr(6).trim)
}).toDF()
val ratingRDD = sc.textFile(RATING_DATA_PATH)
//將 ratingRDD 轉換爲 DataFrame
val ratingDF = ratingRDD.map(item => {
// userId,prudcutId,rating,timestamp
val attr = item.split(",")
// 取出數據,轉換成樣例類
ProductRating(attr(0).trim.toInt, attr(1).trim.toInt, attr(2).trim.toDouble, attr(3).trim.toInt)
}).toDF()
// 聲明一個隱式的配置對象,方便重複調用(當屢次調用對 MongoDB 的存儲或讀寫操做時)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 將數據保存到 MongoDB 中
storeDataInMongDB(productDF, ratingDF)
// 關閉 Spark
spark.stop()
}
接下來,實現 storeDataInMongo 方法,將數據寫入 mongodb 中:
/**
* 將數據寫入 MongoDB 中
*
* @param productDF
* @param ratingDF
* @param mongoConfig
*/
def storeDataInMongDB(productDF: DataFrame, ratingDF: DataFrame)(implicit mongoConfig: MongoConfig) = {
// 建立一個到 MongoDB 的鏈接客戶端
val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))
// 定義經過 MongoDB 客戶端拿到的表操做對象
val productCollection = mongoClient(mongoConfig.db)(MONGODB_PRODUCT_COLLECTION)
val ratingCollection = mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
// 若是 MongoDB 中已有對應的表,那麼應該刪除
productCollection.dropCollection()
ratingCollection.dropCollection()
// 將當前數據寫入到 MongoDB 對應的表中
productDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 對數據表建立索引
productCollection.createIndex(MongoDBObject("productId" -> 1))
ratingCollection.createIndex(MongoDBObject("userId" -> 1))
ratingCollection.createIndex(MongoDBObject("productId" -> 1))
// 關閉 MongoDB 的鏈接 mongoClient.close() }