第1章 項目體系架構設計1.1 項目系統架構1.2 項目數據流程1.3 數據模型第2章 工具環境搭建第3章 建立項目並初始化業務數據3.1 在 IDEA 中建立 maven 項目3.1.1 項目框架搭建3.1.2 聲明項目中工具的版本信息3.1.3 添加項目依賴3.2 數據加載準備3.2.1 movie.csv3.2.2 ratings.csv3.2.3 tags.csv3.2.4 日誌管理配置文件3.3 數據初始化到 MongoDB3.3.1 啓動 MongoDB 數據庫(略)3.3.2 數據加載程序主體實現3.3.3 將數據寫入 MongoDB3.3.4 MongoDB 中查看結果3.4 數據初始化到 ElasticSearch3.4.1 啓動 ElasticSearch 服務器(略)3.4.2 將數據寫入 ElasticSearch3.4.3 ElasticSearch 中查看結果第4章 離線推薦服務建設4.1 離線推薦服務4.2 離線統計服務4.2.1 歷史熱門電影統計4.2.2 最近熱門電影統計4.2.3 電影平均得分統計4.2.4 每一個類別優質電影統計4.2.5 測試查看4.3 基於隱語義模型的協同過濾推薦4.3.1 用戶電影推薦矩陣4.3.2 電影類似度矩陣4.3.3 模型評估和參數選取第5章 實時推薦服務建設5.1 實時推薦服務5.2 實時推薦算法設計5.3 實時推薦算法的實現5.3.1 獲取用戶的 K 次最近評分5.3.2 獲取當前電影最類似的 K 個電影5.3.3 電影推薦優先級計算5.3.4 將結果保存到 mongoDB5.3.5 更新實時推薦結果5.4 實時系統聯調5.4.1 啓動實時系統的基本組件5.4.2 先啓動 zookeeper 集羣5.4.3 再啓動 kafka 集羣5.4.4 構建 Kafka Streaming 程序(簡單的 ETL)5.4.5 配置並啓動 flume5.4.6 啓動業務系統後臺第6章 冷啓動問題處理第七章 基於內容的推薦服務建設7.1 基於內容的推薦服務7.2 基於內容推薦的實現第8章 程序部署與運行html
項目以推薦系統建設領域知名的通過修改過的 MovieLens 數據集做爲依託,以某科技公司電影網站真實業務數據架構爲基礎,構建了符合教學體系的一體化的電影推薦系統,包含了離線推薦與實時推薦體系,綜合利用了協同過濾算法以及基於內容的推薦方法來提供混合推薦。提供了從前端應用、後臺服務、算法設計實現、平臺部署等多方位的閉環的業務實現。
前端![]()
用戶可視化:主要負責實現和用戶的交互以及業務數據的展現, 主體採用 AngularJS2 進行實現,部署在 Apache 服務上。(或者能夠部署在 Nginx 上)
綜合業務服務:主要實現 JavaEE 層面總體的業務邏輯,經過 Spring 進行構建,對接業務需求。部署在 Tomcat 上。
【數據存儲部分】
業務數據庫:項目採用普遍應用的文檔數據庫 MongDB 做爲主數據庫,主要負責平臺業務邏輯數據的存儲。
搜索服務器:項目採用 ElasticSearch 做爲模糊檢索服務器,經過利用 ES 強大的匹配查詢能力實現基於內容的推薦服務。
緩存數據庫:項目採用 Redis 做爲緩存數據庫,主要用來支撐實時推薦系統部分對於數據的高速獲取需求。
【離線推薦部分】
離線統計服務:批處理統計性業務採用 Spark Core + Spark SQL 進行實現,實現對指標類數據的統計任務。
離線推薦服務:離線推薦業務採用 Spark Core + Spark MLlib 進行實現,採用 ALS 算法進行實現。
工做調度服務:對於離線推薦部分須要以必定的時間頻率對算法進行調度,採用 Azkaban 進行任務的調度。
【實時推薦部分】
日誌採集服務:經過利用 Flume-ng 對業務平臺中用戶對於電影的一次評分行爲進行採集,實時發送到 Kafka 集羣。
消息緩衝服務:項目採用 Kafka 做爲流式數據的緩存組件,接受來自 Flume 的數據採集請求。並將數據推送到項目的實時推薦系統部分。
實時推薦服務:項目採用 Spark Streaming 做爲實時推薦系統,經過接收 Kafka 中緩存的數據,經過設計的推薦算法實現對實時推薦的數據處理,並將結果合併更新到 MongoDB 數據庫。
![]()
【系統初始化部分】
0、經過 Spark SQL 將系統初始化數據加載到 MongoDB 和 ElasticSearch 中。
【離線推薦部分】
一、經過 Azkaban 實現對於離線統計服務以離線推薦服務的調度,經過設定的運行時間完成對任務的觸發執行。
二、離線統計服務從 MongoDB 中加載數據,將【電影平均評分統計】、【電影評分個數統計】、【最近電影評分個數統計】三個統計算法進行運行實現,並將計算結果回寫到 MongoDB 中;離線推薦服務從 MongoDB 中加載數據,經過 ALS 算法分別將【用戶推薦結果矩陣】、【影片類似度矩陣】回寫到 MongoDB 中。
【實時推薦部分】
三、Flume 從綜合業務服務的運行日誌中讀取日誌更新,並將更新的日誌實時推送到 Kafka 中;Kafka 在收到這些日誌以後,經過 KafkaStream 程序對獲取的日誌信息進行過濾處理,獲取用戶評分數據流 (UID|MID|SCORE|TIMESTAMP),併發送到另一個Kafka 隊列;Spark Streaming 監聽 Kafka 隊列,實時獲取 Kafka 過濾出來的用戶評分數據流,融合存儲在 Redis 中的用戶最近評分隊列數據,提交給實時推薦算法,完成對用戶新的推薦結果計算;計算完成以後,將新的推薦結構和 MongDB 數據庫中的推薦結果進行合併。
【業務系統部分】
四、推薦結果展現部分,從 MongoDB、ElasticSearch 中將離線推薦結果、實時推薦結果、內容推薦結果進行混合,綜合給出相對應的數據。
五、電影信息查詢服務經過對接 MongoDB 實現對電影信息的查詢操做。
六、電影評分部分,獲取用戶經過 UI 給出的評分動做,後臺服務進行數據庫記錄後,一方面將數據推進到 Redis 羣中,另外一方面,經過預設的日誌框架輸出到 Tomcat 中的日誌中。
七、項目經過 ElasticSearch 實現對電影的模糊檢索。
八、電影標籤部分,項目提供用戶對電影打標籤服務。
一、Movie【電影數據表】java
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
mid | Int | 電影的 ID | 無 |
name | String | 電影的名稱 | 無 |
descri | String | 電影的描述 | 無 |
timelong | String | 電影的時長 | 無 |
issue | String | 電影發佈時間 | 無 |
shoot | String | 電影拍攝時間 | 無 |
language | String | 電影的語言 | 無 |
genres | String | 電影所屬類別 | 無 |
actors | String | 電影的演員 | 無 |
directors | String | 電影的導演 | 無 |
二、Rating【用戶評分表】node
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
uid | Int | 用戶的 ID | 無 |
mid | Int | 電影的 ID | 無 |
score | Double | 電影的分值 | 無 |
timestamp | Long | 評分的時間 | 無 |
三、Tag【電影標籤表】linux
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
uid | Int | 用戶的 ID | 無 |
mid | Int | 電影的 ID | 無 |
tag | String | 電影的標籤 | 無 |
timestamp | Long | 評分的時間 | 無 |
四、User【用戶表】redis
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
uid | Int | 用戶的 ID | 無 |
username | String | 用戶名 | 無 |
password | String | 用戶密碼 | 無 |
first | Boolean | 用因而否第一次登陸 | 無 |
genres | List< String> | 用戶偏心的電影類型 | 無 |
timestamp | Long | 用戶建立的時間 | 無 |
五、RateMoreMoviesRecently【最近電影評分個數統計表】算法
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
mid | Int | 電影的 ID | 無 |
count | Int | 電影的評分數 | 無 |
yearmonth | String | 評分的時段 | yyyymm |
六、RateMoreMovies【電影評分個數統計表】sql
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
mid | Int | 電影的 ID | 無 |
count | Int | 電影的評分數 | 無 |
七、AverageMoviesScore【電影平均評分表】mongodb
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
mid | Int | 電影的 ID | 無 |
avg | Double | 電影的平均評分 | 無 |
八、MovieRecs【電影類似性矩陣】數據庫
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
mid | Int | 電影的 ID | 無 |
recs | Array[(mid: Int, score: Double)] | 該電影最類似的電影集合 | 無 |
九、UserRecs【用戶電影推薦矩陣】
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
uid | Int | 用戶的 ID | 無 |
recs | Array[(mid: Int, score: Double)] | 推薦給該用戶的電影集合 | 無 |
十、StreamRecs【用戶實時電影推薦矩陣】
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
uid | Int | 用戶的 ID | 無 |
recs | Array[(mid: Int, score: Double)] | 實時推薦給該用戶的電影集合 | 無 |
十一、GenresTopMovies【電影類別 TOP10】
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
genres | String | 電影類型 | 無 |
recs | Array[(mid: Int, score: Double)] | TOP10 電影 | 無 |
咱們的項目中用到了多種工具進行數據的存儲、計算、採集和傳輸,本章主要簡單介紹設計的工具環境搭建。若是機器的配置不足, 推薦只採用一臺虛擬機進行配置,而非徹底分佈式,將該虛擬機 CPU 的內存設置的儘量大,推薦爲 CPU > 四、MEM > 4GB。
注意:本章節沒有實操過!!!爲了保持項目的完整。
咱們的項目主體用 Scala 編寫,採用 IDEA 做爲開發環境進行項目編寫,採用 maven 做爲項目構建和管理工具。
打開 IDEA,建立一個 maven 項目,命名爲 MovieRecommendSystem。爲了方便後期的聯調,咱們會把業務系統的代碼也添加進來,因此咱們能夠以 MovieRecommendSystem 做爲父項目,並在其下建一個名爲 recommender 的子項目,而後再在下面搭建多個子項目用於提供不一樣的推薦服務。
在 MovieRecommendSystem 的 pom.xml 文件中加入元素<packaging>pom</packaging>
,而後新建一個 maven module 做爲子項目, 命名爲 recommender。一樣的,再以 recommender 爲父項目,在它的 pom.xml 中加入<packing>pom</packaging>
,而後新建一個 maven module 做爲子項目。咱們的第一步是初始化業務數據,因此子項目命名爲 DataLoader。
父項目只是爲了規範化項目結構,方便依賴管理,自己是不須要代碼實現的,因此 MovieRecommendSystem 和 recommender 下的 src 文件夾均可以刪掉。目前的總體項目框架以下:
咱們整個項目須要用到多個工具,它們的不一樣版本可能會對程序運行形成影響, 因此應該在最外層的 MovieRecommendSystem 中聲明全部子項目共用的版本信息。
在 pom.xml 中加入如下配置:
MovieRecommendSystem/pom.xml
<properties>
<log4j.version>1.2.17</log4j.version><!-- log4j 日誌系統 -->
<!-- SLF4J,即簡單日誌門面(Simple Logging Facade for Java),不是具體的日誌解決方案,它只服務於各類各樣的日誌系統。
按照官方的說法,SLF4J 是一個用於日誌系統的簡單Facade,容許最終用戶在部署其應用時使用其所但願的日誌系統。 -->
<slf4j.version>1.7.22</slf4j.version>
<mongodb-spark.version>2.0.0</mongodb-spark.version>
<casbah.version>3.1.1</casbah.version><!-- mongodb 在 scala 上的驅動器 -->
<elasticsearch-spark.version>5.6.2</elasticsearch-spark.version>
<elasticsearch.version>5.6.2</elasticsearch.version>
<redis.version>2.9.0</redis.version>
<kafka.version>0.10.2.1</kafka.version>
<spark.version>2.1.1</spark.version>
<scala.version>2.11.8</scala.version>
<jblas.version>1.2.1</jblas.version><!-- Java 中線性代數相關的庫 -->
</properties>
首先,對於整個項目而言,應該有一樣的日誌管理,咱們在 MovieRecommendSystem 中引入公有依賴:
MovieRecommendSystem/pom.xml
<dependencies>
<!-- 引入共同的日誌管理工具 -->
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/jcl-over-slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
一樣,對於 maven 項目的構建, 能夠引入公有的插件:
<build>
<!-- 聲明並引入子項目共有的插件 -->
<plugins>
<!-- 直接引入插件,全部子模塊都會引入 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<!-- 全部的編譯用 JDK1.8 -->
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<!-- 只是聲明插件,並不引入,若是子模塊須要用的時候再引入便可 -->
<plugins>
<!-- maven 的打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 該插件用於將 scala 代碼編譯成 class 文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<!-- 綁定到 maven 的編譯階段 -->
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
而後,在 recommender 模塊中,咱們能夠爲全部的推薦模塊聲明 spark 相關依賴(這裏的 dependencyManagement 表示僅聲明相關信息,子項目若是依賴須要自行引入便可):
MovieRecommendSystem/recommender/pom.xml
<dependencyManagement>
<dependencies>
<!-- 引入 Spark 相關的 Jar 包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
因爲各推薦模塊都是 scala 代碼,還應該引入 scala-maven-plugin 插件,用於 scala 程序的編譯。由於插件已經在父項目中聲明, 因此這裏不須要再聲明版本和具體配置:
<build>
<plugins>
<!-- 父項目已聲明該 plugin,子項目在引入的時候,不用聲明版本和已經聲明的配置 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
對於具體的 DataLoader 子項目,須要 spark 相關組件,還須要 mongodb、elasticsearch 的相關依賴,咱們在 pom.xml 文件中引入全部依賴(在父項目中已聲明的不須要再加詳細信息):
MovieRecommendSystem/recommender/DataLoader/pom.xml
<dependencies>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!-- 加入 ElasticSearch 的驅動 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>${elasticsearch-spark.version}</version>
<!-- 將不須要依賴的包從依賴路徑中除去 -->
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
至此,咱們作數據加載須要的依賴都已配置好,能夠開始寫代碼了。
在 src/main/目錄下,能夠看到已有的默認源文件目錄是 java,咱們能夠將其更名爲 scala。將數據文件 movies.csv,ratings.csv,tags.csv 複製到資源文件目錄 src/main/resources 下,咱們將從這裏讀取數據並加載到 mongodb 和 elastic search 中。
數據格式:
mid,name,descri,timelong,issue,shoot,language,genres,actors,directors
e.g.
1^Toy Story (1995)^ ^81 minutes^March 20, 2001^1995^English
^Adventure|Animation|Children|Comedy|Fantasy ^Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn ^John Lasseter
movie.csv 有 10 個字段,每一個字段之間經過 「^」 符號進行分割。
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
mid | Int | 電影的 ID | 無 |
name | String | 電影的名稱 | 無 |
descri | String | 電影的描述 | 無 |
timelong | String | 電影的時長 | 無 |
issue | String | 電影發佈時間 | 無 |
shoot | String | 電影拍攝時間 | 無 |
language | Array[String] | 電影的語言 | 每一項用豎槓分割 |
genres | Array[String] | 電影所屬類別 | 每一項用豎槓分割 |
actors | Array[String] | 電影的演員 | 每一項用豎槓分割 |
directors | Array[String] | 電影的導演 | 每一項用豎槓分割 |
數據格式:
uid,mid,score,timestamp
e.g.
1,31,2.5,1260759144
ratings.csv 有 4 個字段, 每一個字段之間經過 「,」 分割。
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
uid | Int | 用戶的 ID | 無 |
mid | Int | 電影的 ID | 無 |
score | Double | 電影的分值 | 無 |
timestamp | Long | 評分的時間 | 無 |
數據格式:
uid,mid,tag,timestamp
e.g.
1,31,action,1260759144
tags.csv 有 4 個字段, 每一個字段之間經過 「,」 分割。
字段名 | 字段類型 | 字段描述 | 字段備註 |
---|---|---|---|
uid | Int | 用戶的 ID | 無 |
mid | Int | 電影的 ID | 無 |
tag | String | 電影的標籤 | 無 |
timestamp | Long | 評分的時間 | 無 |
log4j 對日誌的管理,須要經過配置文件來生效。在 src/main/resources 下新建配置文件 log4j.properties,寫入如下內容:
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
#log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
#log4j.appender.syslog=com.c4c.dcos.commons.logger.appender.SyslogAppenderExt
#log4j.appender.syslog.SyslogHost= 192.168.22.237
#log4j.appender.syslog.Threshold=INFO
#log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
#log4j.appender.syslog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%20t] %-130c:(line:%4L) : %m%n
#demo|FATAL|2014-Jul-03 14:34:34,194|main|com.c4c.logdemo.App:(line:15)|send a log
參看文章連接:http://www.javashuo.com/article/p-fvkzpduw-r.html
咱們會爲原始數據定義幾個樣例類,經過 SparkContext 的 textFile 方法從文件中讀取數據,並轉換成 DataFrame,再利用 Spark SQL 提供的 write 方法進行數據的分佈式插入。
在 DataLoader/src/main/scala 下新建 package,命名爲 com.atguigu.recommender,新建名爲 DataLoader 的 scala class 文件。
程序主體代碼以下:
DataLoader/src/main/scala/com.atguigu.recommerder/DataLoader.scala
package com.atguigu.recommender
import java.net.InetAddress
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
* Movie 數據集
*
* 260 電影ID,mid
* Star Wars: Episode IV - A New Hope (1977) 電影名稱,name
* Princess Leia is captured and held hostage 詳情描述,descri
* 121 minutes 時長,timelong
* September 21, 2004 發行時間,issue
* 1977 拍攝時間,shoot
* English 語言,language
* Action|Adventure|Sci-Fi 類型,genres
* Mark Hamill|Harrison Ford|Carrie Fisher 演員表,actors
* George Lucas 導演,directors
*/
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
shoot: String, language: String, genres: String, actors: String, directors: String)
/**
* Rating 數據集
*
* 1,31,2.5,1260759144
*/
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Long)
/**
* Tag 數據集
*
* 15,1955,dentist,1193435061
*/
case class Tag(uid: Int, mid: Int, tag: String, timestamp: Long)
// 把 MongoDB 和 Elasticsearch 的配置封裝成樣例類
/**
* @param uri MongDB 的鏈接
* @param db MongDB 的 數據庫
*/
case class MongoConfig(uri: String, db: String)
/**
* @param httpHosts ES 的 http 主機列表,逗號分隔
* @param transportHosts ES 的 http 端口列表,逗號分隔
* @param index 須要操做的索引庫,即數據庫
* @param clusterName 集羣名稱:默認是 my-application
*/
case class ESConfig(httpHosts: String, transportHosts: String, index: String, clusterName: String)
object DataLoader {
// 定義常量
// 以 Window 下爲例,需替換成本身的路徑,linux 下爲 /YOUR_PATH/resources/movies.csv
val MOVIE_DATA_PATH = "D:\\learn\\JetBrains\\workspace_idea\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\movies.csv"
val TATING_DATA_PATH = "D:\\learn\\JetBrains\\workspace_idea\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"
val TAG_DATA_PATH = "D:\\learn\\JetBrains\\workspace_idea\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\tags.csv"
// 定義 MongoDB 數據庫中的一些表名
val MONGODB_MOVIE_COLLECTION = "Movie"
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_TAG_COLLECTION = "Tag"
// 定義 ES 中的一些索引(即數據庫)
val ES_MOVIE_INDEX = "Movie"
// 主程序的入口
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender",
"es.httpHosts" -> "hadoop102:9200",
"es.transportHosts" -> "hadoop102:9300",
"es.index" -> "recommender",
"es.cluster.name" -> "my-application"
)
// 建立一個 SparkConf 對象
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
// 建立一個 SparkSession 對象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 在對 DataFrame 和 Dataset 進行許多操做都須要這個包進行支持
import spark.implicits._
// 加載數據,將 Movie、Rating、Tag 數據集加載進來
// 數據預處理
val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)
// 將 movieRDD 轉換爲 DataFrame
val movieDF = movieRDD.map(
item => {
val attr = item.split("\\^")
Movie(attr(0).toInt, attr(1).trim, attr(2).trim, attr(3).trim, attr(4).trim, attr(5).trim, attr(6).trim, attr(7).trim, attr(8).trim, attr(9).trim)
}
).toDF()
val ratingRDD = spark.sparkContext.textFile(TATING_DATA_PATH)
// 將 ratingRDD 轉換爲 DataFrame
val ratingDF = ratingRDD.map(
item => {
val attr = item.split(",")
Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toLong)
}
).toDF()
val tagRDD = spark.sparkContext.textFile(TAG_DATA_PATH)
// 將 tagRDD 裝換爲 DataFrame
val tagDF = tagRDD.map(
item => {
val attr = item.split(",")
Tag(attr(0).toInt, attr(1).toInt, attr(2).trim, attr(3).toLong)
}
).toDF()
// 聲明一個隱式的配置對象
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 將數據保存到 MongoDB 中
storeDataInMongDB(movieDF, ratingDF, tagDF)
import org.apache.spark.sql.functions._
// 數據預處理,把 movie 對應的 tag 信息添加進去,加一列,使用 「|」 分隔:tag1|tag2|...
/**
* mid,tags
* tags: tag1|tag2|tag3|...
*/
val newTag = tagDF.groupBy($"mid")
.agg(concat_ws("|", collect_set($"tag")).as("tags")) // groupby 爲對原 DataFrame 進行打包分組,agg 爲聚合(其操做包括 max、min、std、sum、count)
.select("mid", "tags")
// 將 movie 和 newTag 做 左外鏈接,把數據合在一塊兒
val movieWithTagsDF = movieDF.join(newTag, Seq("mid", "mid"), "left")
// 聲明一個隱式的配置對象
implicit val esConfig = ESConfig(config("es.httpHosts"), config("es.transportHosts"), config("es.index"), config("es.cluster.name"))
// 將數據保存到 ES 中
storeDataInES(movieWithTagsDF)
// 關閉 SparkSession
spark.stop()
}
}
接下來,實現 storeDataInMongo 方法,將數據寫入 mongodb 中:
def storeDataInMongDB(movieDF: DataFrame, ratingDF: DataFrame, tagDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
// 新建一個到 MongoDB 的鏈接
val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))
// 若是 MongoDB 中已有相應的數據庫,則先刪除
mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).dropCollection()
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).dropCollection()
mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).dropCollection()
// 將 DF 數據寫入對應的 MongoDB 表中
movieDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
tagDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_TAG_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 對 MongoDB 中的數據表建索引
mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
// 關閉 MongoDB 的鏈接
mongoClient.close()
}
參看文章連接:https://www.cnblogs.com/chenmingjun/p/10817378.html#h23elasticsearchlinux
與上節相似,一樣主要經過 Spark SQL 提供的 write 方法進行數據的分佈式插入,實現 storeDataInES 方法:
def storeDataInES(movieWithTagsDF: DataFrame)(implicit esConfig: ESConfig): Unit = {
// 新建一個 es 的配置
val settings: Settings = Settings.builder().put("cluster.name", esConfig.clusterName).build()
// 新建一個 es 的客戶端
val esClient = new PreBuiltTransportClient(settings)
// 須要將 TransportHosts 添加到 esClient 中
val REGEX_HOST_PORT = "(.+):(\\d+)".r
esConfig.transportHosts.split(",").foreach {
case REGEX_HOST_PORT(host: String, port: String) => {
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
}
}
// 須要先清除掉 ES 中遺留的數據
if (esClient.admin().indices().exists(new IndicesExistsRequest(esConfig.index))
.actionGet()
.isExists
) {
esClient.admin().indices().delete(new DeleteIndexRequest(esConfig.index))
}
esClient.admin().indices().create(new CreateIndexRequest(esConfig.index))
// 將數據寫入到 ES 中
movieWithTagsDF.write
.option("es.nodes", esConfig.httpHosts)
.option("es.http.timeout", "100m")
.option("es.mapping.id", "mid") // 映射主鍵
.mode("overwrite")
.format("org.elasticsearch.spark.sql")
.save(esConfig.index + "/" + ES_MOVIE_INDEX)
}
網頁端查看:
在 Linux 中 crul 命令查看
離線推薦服務是綜合用戶全部的歷史數據,利用設定的離線統計算法和離線推薦算法週期性的進行結果統計與保存,計算的結果在必定時間週期內是固定不變的,變動的頻率取決於算法調度的頻率。
離線推薦服務主要計算一些能夠預先進行統計和計算的指標,爲實時計算和前端業務相應提供數據支撐。
離線推薦服務主要分爲統計性算法、基於 ALS 的協同過濾推薦算法以及基於 ElasticSearch 的內容推薦算法。
在 recommender 下新建子項目 StatisticsRecommender,pom.xml 文件中只需引入 spark、scala 和 mongodb 的相關依賴:
<dependencies>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<!-- 用於代碼方式鏈接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用於 Spark 和 MongoDB 的對接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
在 resources 文件夾下引入 log4j.properties,而後在 src/main/scala 下新建 scala 單例對象 com.atguigu.statistics.StatisticsRecommender。
一樣,咱們應該先建好樣例類,在 main() 方法中定義配置、建立 SparkSession 並加載數據,最後關閉 spark。代碼以下:
src/main/scala/com.atguigu.statistics/StatisticsRecommender.scala
package com.atguigu.statistics
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
shoot: String, language: String, genres: String, actors: String, directors: String)
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Long)
// 把 MongoDB 和 Elasticsearch 的配置封裝成樣例類
/**
* @param uri MongDB 的鏈接
* @param db MongDB 的 數據庫
*/
case class MongoConfig(uri: String, db: String)
case class Recommendation(mid: Int, score: Double)
// 定義電影類別 top10 推薦對象(每種類型的電影集合中評分最高的 10 個電影)
case class GenresRecommendation(genres: String, recs: Seq[Recommendation])
object StatisticsRecommender {
// 定義 MongoDB 數據庫中的一些表名
val MONGODB_MOVIE_COLLECTION = "Movie"
val MONGODB_RATING_COLLECTION = "Rating"
val RATE_MORE_MOVIES = "RateMoreMovies" // 電影評分個數統計表
val RATE_MORE_RECENTLY_MOVIES = "RateMoreRecentlyMovies" // 最近電影評分個數統計表
val AVERAGE_MOVIES_Score = "AverageMoviesScore" // 電影平均評分表
val GENRES_TOP_MOVIES = "GenresTopMovies" // 電影類別 TOP10
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 建立一個 SparkConf 對象
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
// 建立一個 SparkSession 對象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 在對 DataFrame 和 Dataset 進行許多操做都須要這個包進行支持
import spark.implicits._
// 聲明一個隱式的配置對象
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 從 MongoDB 中加載數據
val movieDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Movie] // DataSet
.toDF()
// 從 MongoDB 中加載數據
val ratingDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Rating] // DataSet
.toDF()
// 建立臨時表,名爲 ratings
ratingDF.createOrReplaceTempView("ratings")
// TODO:不一樣的統計推薦結果
// ......
// 關閉 SparkSession
spark.stop()
}
def storeDFInMongDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}
}
根據全部歷史評分數據,計算曆史評分次數最多的電影。
實現思路:經過 Spark SQL 讀取評分數據集,統計全部評分中評分個數最多的電影,而後按照從大到小排序,將最終結果寫入 MongoDB 的 RateMoreMovies【電影評分個數統計表】數據集中。
// 一、歷史熱門電影統計:根據全部歷史評分數據,計算曆史評分次數最多的電影。mid,count
val rateMoreMoviesDF = spark.sql("select mid, count(mid) as count from ratings group by mid")
// 把結果寫入對應的 MongoDB 表中
storeDFInMongDB(rateMoreMoviesDF, RATE_MORE_MOVIES)
根據評分次數,按月爲單位計算最近時間的月份裏面評分次數最多的電影集合。
實現思路:經過 Spark SQL 讀取評分數據集,經過 UDF 函數將評分的數據時間修改成月,而後統計每個月電影的評分數。統計完成以後將數據寫入到 MongoDB 的 RateMoreRecentlyMovies【最近電影評分個數統計表】數據集中。
// 二、最近熱門電影統計:根據評分次數,按月爲單位計算最近時間的月份裏面評分次數最多的電影集合。mid,count,yearmonth
// 建立一個日期格式化工具
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
// 註冊一個 UDF 函數,用於將 timestamp 轉換成年月格式 1260759144000 => 201605
spark.udf.register("changeDate", (x: Long) => {
simpleDateFormat.format(new Date(x * 1000)).toInt
})
// 對原始數據 ratings 作預處理,去掉 uid,保存成臨時表,名爲 ratingOfMonth
val ratingOfYearMonth = spark.sql("select mid, score, changeDate(timestamp) as yearmonth from ratings")
ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")
// 根據評分次數,按月爲單位計算最近時間的月份裏面評分次數最多的電影集合
val rateMoreRecentlyMoviesDF = spark.sql("select mid, count(mid) as count, yearmonth from ratingOfMonth group by yearmonth, mid order by yearmonth desc, count desc")
// 把結果寫入對應的 MongoDB 表中
storeDFInMongDB(rateMoreRecentlyMoviesDF, RATE_MORE_RECENTLY_MOVIES)
根據歷史數據中全部用戶對電影的評分,週期性的計算每一個電影的平均得分。
實現思路:經過 Spark SQL 讀取保存在 MongDB 中的 Rating 數據集,經過執行如下 SQL 語句實現對於電影的平均分統計:
// 三、電影平均得分統計:根據歷史數據中全部用戶對電影的評分,週期性的計算每一個電影的平均得分。mid,avg
val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")
// 把結果寫入對應的 MongoDB 表中
storeDFInMongDB(averageMoviesDF, AVERAGE_MOVIES_Score)
統計完成以後將生成的新的 DataFrame 寫出到 MongoDB 的 AverageMoviesScore【電影平均評分表】集合中。
根據提供的全部電影類別,分別計算每種類型的電影集合中評分最高的 10 個電影。
實現思路:在計算完整個電影的平均得分以後,將影片集合與電影類型作笛卡爾積,而後過濾掉電影類型不符合的條目,將 DataFrame 輸出到 MongoDB 的 GenresTopMovies【電影類別 TOP10】集合中。
// 四、每一個類別優質電影統計:根據提供的全部電影類別,分別計算每種類型的電影集合中評分最高的 10 個電影。
// 定義全部的類別
val genres = List("Action", "Adventure", "Animation", "Comedy", "Crime", "Documentary", "Drama", "Famil y", "Fantasy",
"Foreign", "History", "Horror", "Music", "Mystery", "Romance", "Science", "Tv", "Thriller", "War", "Western")
// 把電影的平均評分加入到 movie 表中,使用 inner join,不知足條件的不顯示
val movieWithScore = movieDF.join(averageMoviesDF, Seq("mid", "mid"))
// 爲作笛卡爾積,咱們須要把 genres 轉成 RDD
val genresRDD = spark.sparkContext.makeRDD(genres)
// 計算類別 top10,首先對類別和電影作笛卡爾積,而後進行過濾
val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd)
.filter {
// 條件過濾:找出 movieRow 中的字段 genres 值包含當前類別 genres 的那些
case (genres, movieRow) => movieRow.getAs[String]("genres").toLowerCase().contains(genres.toLowerCase())
}
.map { // 將整個數據集的數據量減少,生成 RDD[String, Iter[mid, avg]]
case (genres, movieRow) => (genres, (movieRow.getAs[Int]("mid"), movieRow.getAs[Double]("avg")))
}
.groupByKey()
.map {
case (genres, items) => GenresRecommendation(genres, items.toList.sortWith(_._2 > _._2).take(10).map(item =>
Recommendation(item._1, item._2)
))
}
.toDF()
// 把結果寫入對應的 MongoDB 表中
storeDFInMongDB(genresTopMoviesDF, GENRES_TOP_MOVIES)
項目採用 ALS 做爲協同過濾算法, 分別根據 MongoDB 中的用戶評分表和電影數據集計算用戶電影推薦矩陣以及電影類似度矩陣。
經過 ALS 訓練出來的 Model 來計算全部當前用戶電影的推薦矩陣,主要思路以下:
一、uid 和 mid 作笛卡爾積,產生 (uid,mid) 的元組。
二、經過模型預測 (uid,mid) 的元組。
三、將預測結果經過預測分值進行排序。
四、返回分值最大的 K 個電影,做爲當前用戶的推薦。
最後生成的數據結構以下:將數據保存到 MongoDB 的 UserRecs【用戶電影推薦矩陣】表中。
<dependencies>
<!-- Java 中線性代數相關的庫 -->
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<!-- 用於代碼方式鏈接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用於 Spark 和 MongoDB 的對接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
一樣通過前期的構建樣例類、聲明配置、建立 SparkSession 等步驟,能夠加載數據開始計算模型了。
核心代碼以下:
src/main/scala/com.atguigu.offline/OfflineRecommender.scala
package com.atguigu.offline
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
// 基於評分數據的 LFM,只須要 rating 數據(用戶評分表)注意:spark mllib 中有 Rating 類,爲了便於區別,咱們從新命名爲 MovieRating
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Long)
case class MongoConfig(uri: String, db: String)
// 標準推薦對象
case class Recommendation(mid: Int, score: Double)
// 用戶推薦列表
case class UserRecs(uid: Int, recs: Seq[Recommendation])
// 電影類似度(電影推薦)
case class MovieRecs(mid: Int, recs: Seq[Recommendation])
object OfflineRecommender {
// 定義 MongoDB 數據庫中的一些表名
val MONGODB_RATING_COLLECTION = "Rating"
// 推薦表的名稱
val USER_RECS = "UserRecs"
val MOVIE_RECS = "MovieRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 建立一個 SparkConf 對象
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 建立一個 SparkSession 對象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 在對 DataFrame 和 Dataset 進行許多操做都須要這個包進行支持
import spark.implicits._
// 聲明一個隱式的配置對象
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 從 MongoDB 中加載數據
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRating] // DataSet
.rdd
.map(rating => (rating.uid, rating.mid, rating.score)) // 轉換成 RDD,而且去掉時間戳
.cache()
// 從 ratingRDD 數據中提取全部的 uid 和 mid ,並去重
val userRDD = ratingRDD.map(_._1).distinct()
val movieRDD = ratingRDD.map(_._2).distinct()
// 訓練隱語義模型
val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
val (rank, iterations, lambda) = (50, 5, 0.01)
val model = ALS.train(trainData, rank, iterations, lambda)
// 基於用戶和電影的隱特徵,計算預測評分,獲得用戶推薦列表
// user 和 movie 作笛卡爾積,獲得一個空評分矩陣,即產生 (uid,mid) 的元組
val userMovies = userRDD.cartesian(movieRDD)
// 調用 model 的 predict 方法進行預測評分
val preRatings = model.predict(userMovies)
val userRecs = preRatings
.filter(_.rating > 0) // 過濾出評分大於零的項
.map(rating => (rating.user, (rating.product, rating.rating)))
.groupByKey()
.map {
case (uid, recs) => UserRecs(uid, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
}
.toDF()
// 把結果寫入對應的 MongoDB 表中
userRecs.write
.option("uri", mongoConfig.uri)
.option("collection", USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// TODO:計算電影類似度矩陣
spark.stop()
}
核心代碼以下:
// 基於電影的隱特徵,計算類似度矩陣,獲得電影的類似度列表
val movieFeatures = model.productFeatures.map {
case (mid, features) => (mid, new DoubleMatrix(features))
}
// 對全部電影兩兩計算它們的類似度,先作笛卡爾積
val movieRecs = movieFeatures.cartesian(movieFeatures)
.filter {
// 把本身跟本身的配對過濾掉
case (a, b) => a._1 != b._1
}
.map {
case (a, b) => {
val simScore = this.consinSim(a._2, b._2)
(a._1, (b._1, simScore))
}
}
.filter(_._2._2 > 0.6) // 過濾出類似度大於 0.6 的
.groupByKey()
.map {
case (mid, recs) => MovieRecs(mid, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
}
.toDF()
// 把結果寫入對應的 MongoDB 表中
movieRecs.write
.option("uri", mongoConfig.uri)
.option("collection", MOVIE_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 求兩個向量的餘弦類似度
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix): Double = {
movie1.dot(movie2) / (movie1.norm2() * movie2.norm2()) // l1範數:向量元素絕對值之和;l2範數:即向量的模長(向量的長度),向量元素的平方和再開方
}
在 scala/com.atguigu.offline/ 下新建單例對象 ALSTrainer,代碼主體架構以下:
package com.atguigu.offline
import breeze.numerics.sqrt
import com.atguigu.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object ALSTrainer {
def main(args: Array[String]): Unit = {
// 定義用到的配置參數
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 建立一個 SparkConf 對象
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 建立一個 SparkSession 對象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 在對 DataFrame 和 Dataset 進行許多操做都須要這個包進行支持
import spark.implicits._
// 聲明一個隱式的配置對象
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 從 MongoDB 中加載數據
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRating] // DataSet
.rdd
.map(rating => Rating(rating.uid, rating.mid, rating.score)) // 轉換成 RDD,而且去掉時間戳
.cache()
// 將一個 RDD 隨機切分紅兩個 RDD,用以劃分訓練集和測試集
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingRDD = splits(0)
val testingRDD = splits(1)
// 模型參數選擇,輸出最優參數
adjustALSParam(trainingRDD, testingRDD)
spark.close()
}
其中 adjustALSParams 方法是模型評估的核心,輸入一組訓練數據和測試數據,輸出計算獲得最小 RMSE 的那組參數。代碼實現以下:
def adjustALSParam(trainData: RDD[Rating], testData: RDD[Rating]): Unit = {
// 這裏指定迭代次數爲 5,rank 和 lambda 在幾個值中選取調整
val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(1, 0.1, 0.01, 0.001))
yield {
val model = ALS.train(trainData, rank, 5, lambda)
val rmse = getRMSE(model, testData)
(rank, lambda, rmse)
}
// 控制檯打印輸出
// println(result.sortBy(_._3).head)
println(result.minBy(_._3))
}
計算 RMSE 的函數 getRMSE 代碼實現以下:
def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
// 計算預測評分
val userProducts = data.map(item => (item.user, item.product))
val predictRating = model.predict(userProducts)
// 以 uid,mid 做爲外鍵,將 實際觀測值 和 預測值 使用內鏈接
val observed = data.map(item => ((item.user, item.product), item.rating))
val predicted = predictRating.map(item => ((item.user, item.product), item.rating))
// 內鏈接,獲得 (uid, mid), (observe, predict)
// 計算 RMSE
sqrt(
observed.join(predicted).map {
case ((uid, mid), (observe, predict)) =>
val err = observe - predict
err * err
}.mean()
)
}
運行代碼,咱們就能夠獲得目前數據的最優模型參數。
實時計算與離線計算應用於推薦系統上最大的不一樣在於實時計算推薦結果應該反映最近一段時間用戶近期的偏好,而離線計算推薦結果則是根據用戶從第一次評分起的全部評分記錄來計算用戶整體的偏好。
用戶對物品的偏好隨着時間的推移老是會改變的。好比一個用戶 u 在某時刻對電影 p 給予了極高的評分,那麼在近期一段時候,u 極有可能很喜歡與電影 p 相似的其餘電影;而若是用戶 u 在某時刻對電影 q 給予了極低的評分,那麼在近期一段時候,u 極有可能不喜歡與電影 q 相似的其餘電影。因此對於實時推薦,當用戶對一個電影進行了評價後,用戶會但願推薦結果基於最近這幾回評分進行必定的更新,使得推薦結果匹配用戶近期的偏好,知足用戶近期的口味。
若是實時推薦繼續採用離線推薦中的 ALS 算法,因爲算法運行時間巨大,不具備實時獲得新的推薦結果的能力;而且因爲算法自己的使用的是評分表,用戶本次評分後只更新了總評分表中的一項,使得算法運行後的推薦結果與用戶本次評分以前的推薦結果基本沒有多少差異,從而給用戶一種推薦結果一直沒變化的感受,很影響用戶體驗。
另外,在實時推薦中因爲時間性能上要知足實時或者準實時的要求,因此算法的計算量不能太大,避免複雜、過多的計算形成用戶體驗的降低。鑑於此,推薦精度每每不會很高。實時推薦系統更關心推薦結果的動態變化能力, 只要更新推薦結果的理由合理便可,至於推薦的精度要求則能夠適當放寬。
因此對於實時推薦算法,主要有兩點需求:
一、用戶本次評分後、或最近幾個評分後系統能夠明顯的更新推薦結果。
二、計算量不大,知足響應時間上的實時或者準實時要求。
咱們在 recommender 下新建子項目 StreamingRecommender,引入 spark、scala、mongo、redis 和 kafka 的依賴:
<dependencies>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驅動 -->
<!-- 用於代碼方式鏈接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用於 Spark 和 MongoDB 的對接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!-- redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
代碼中首先定義樣例類和一個鏈接助手對象(用於創建 redis 和 mongo 鏈接),並在 StreamingRecommender 中定義一些常量:
src/main/scala/com.atguigu.streaming/StreamingRecommender.scala
package com.atguigu.streaming
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
// 定義鏈接助手對象並序列化
object ConnHelper extends Serializable {
lazy val jedis = new Jedis("hadoop102")
lazy val mongoClient = MongoClient(MongoClientURI("mongodb://localhost:27017/recommender"))
}
case class MongoConfig(uri: String, db: String)
// 定義一個基準推薦對象
case class Recommendation(mid: Int, score: Double)
// 定義基於預測評分的用戶推薦列表
case class UserRecs(uid: Int, recs: Seq[Recommendation])
// 定義基於LFM電影特徵向量的電影類似度列表
case class MovieRecs(mid: Int, recs: Seq[Recommendation])
object StreamingRecommender {
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_MOVIES_NUM = 20
def main(args: Array[String]): Unit = {
}
}
實時推薦主體代碼以下:
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender",
"kafka.topic" -> "recommender"
)
// 建立一個 SparkConf 對象
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender").set("spark.ui.port", "44040" )
// 建立一個 SparkSession 對象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 獲取 Streaming Context
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(2)) // 微批次處理時間間隔
// 在對 DataFrame 和 Dataset 進行許多操做都須要這個包進行支持
import spark.implicits._
// 聲明一個隱式的配置對象
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加載數據:電影類似度矩陣數據,轉換成爲 Map[Int, Map[Int, Double]],把它廣播出去
val simMovieMatrix = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_RECS_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRecs]
.rdd
.map { movieRecs => // 爲了查詢類似度方便,轉換成 KV
(movieRecs.mid, movieRecs.recs.map(x => (x.mid, x.score)).toMap)
}.collectAsMap()
val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)
// 定義 Kafka 的鏈接參數
val kafkaParam = Map(
"bootstrap.servers" -> "hadoop102:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "recommender",
"auto.offset.reset" -> "latest" // 偏移量的初始設置
)
// 經過 Kafka 建立一個 DStream
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam)
)
// 把原始數據 uid|mid|score|timestamp 轉換成評分流
val ratingStream = kafkaStream.map {
msg =>
val attr = msg.value().split("\\|")
(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toLong)
}
// 繼續作流式處理,實時算法部分
ratingStream.foreachRDD {
rdds =>
rdds.foreach {
case (uid, mid, score, timestamp) => {
println("rating data coming! >>>>>>>>>>>>>>>>>>>>")
// 一、從 redis 中獲取當前用戶最近的 K 次評分,保存成 Array[(mid, score)]
val userRecentlyRatings = getUserRecentlyRatings(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis)
// 二、從電影類似度矩陣中取出與當前電影最類似的 K 個電影,做爲備選電影列表,Array[mid]
val candidateMovies = getTopsSimMovies(MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value)
// 三、對每一個備選電影,計算推薦優先級,獲得當前用戶的實時推薦列表,Array[(mid, score)]
val streamRecs = computeMovieScores(candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value)
// 四、把當前用戶的實時推薦數據保存到 MongoDB 中
storeDataInMongDB(uid, streamRecs)
}
}
}
// 開始接收和處理數據
ssc.start()
println(">>>>>>>>>>>>>>>>>>>> streaming started!")
ssc.awaitTermination()
}
實時推薦算法的前提:
一、在 Redis 集羣中存儲了每個用戶最近對電影的 K 次評分。實時算法能夠快速獲取。
二、離線推薦算法已經將電影類似度矩陣提早計算到了 MongoDB 中。
三、Kafka 已經獲取到了用戶實時的評分數據。算法過程以下:
實時推薦算法輸入爲一個評分<uid, mid, rate, timestamp>
,而執行的核心內容包括:獲取 uid 最近 K 次評分、獲取 mid 最類似 K 個電影、計算候選電影的推薦優先級、更新對 uid 的實時推薦結果。
業務服務器在接收用戶評分的時候,默認會將該評分狀況以 uid, mid, rate, timestamp 的格式插入到 Redis 中該用戶對應的隊列當中,在實時算法中,只須要經過 Redis 客戶端獲取相對應的隊列內容便可。
// 由於 redis 操做返回的是 java 類,爲了使用 map 操做須要引入轉換類
import scala.collection.JavaConversions._
/**
* 獲取當前最近的 K 次電影評分
*
* @param num 評分的個數
* @param uid 誰的評分
* @return
*/
def getUserRecentlyRatings(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
// 從 redis 中讀取數據,用戶評分數據保存在 uid:UID 爲 key 的隊列中,裏面的 value 是 MID:SCORE
jedis.lrange("uid:" + uid.toString, 0, num) // 從用戶的隊列中取出 num 個評分
.map {
item => // 具體的每個評分是以冒號分割的兩個值
val attr = item.split("\\:")
(attr(0).trim.toInt, attr(1).trim.toDouble)
}
.toArray
}
在離線算法中,已經預先將電影的類似度矩陣進行了計算,因此每一個電影 mid 的最類似的 K 個電影很容易獲取:從 MongoDB 中讀取 MovieRecs 數據, 從 mid 在 simHash 對應的子哈希表中獲取類似度前 K 大的那些電影。輸出是數據類型爲 Array[Int] 的數組, 表示與 mid 最類似的電影集合, 並命名爲 candidateMovies 以做爲候選電影集合。
/**
* 獲取與當前電影 K 個類似的電影,做爲備選電影
*
* @param num 類似電影的數量
* @param mid 當前電影的 ID
* @param uid 當前的評分用戶 ID
* @param simMovies 電影類似度矩陣的廣播變量值
* @param mongoConfig MongoDB 的配置
* @return 過濾以後的備選電影列表
*/
def getTopsSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
(implicit mongoConfig: MongoConfig): Array[Int] = {
// 一、從類似度矩陣中拿到全部類似的電影
val allSimMovies = simMovies(mid).toArray
// 二、從 MongnDB 中查詢用戶已經看過的電影
val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
.find(MongoDBObject("uid" -> uid))
.toArray
.map {
item => item.get("mid").toString.toInt
}
// 三、把看過的過濾掉,獲得備選電影的輸出列表
allSimMovies.filter(x => !ratingExist.contains(x._1))
.sortWith(_._2 > _._2)
.take(num)
.map(x => x._1)
}
對於候選電影集合 simiHash 和 uid 的最近 K 個評分 recentRatings, 算法代碼內容以下:
/**
* 計算待選電影的推薦分數
*
* @param candidateMovies 與當前電影最類似的 K 個電影(待選電影)
* @param userRecentlyRatings 用戶最近的 K 次評分
* @param simMovies 電影類似度矩陣的廣播變量值
* @return
*/
def computeMovieScores(candidateMovies: Array[Int], userRecentlyRatings: Array[(Int, Double)],
simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] = {
// 定義一個 ArrayBuffer,用於保存每個備選電影的基礎得分
val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
// 定義一個 HashMap,保存每個備選電影的加強減弱因子
val increMap = scala.collection.mutable.HashMap[Int, Int]()
val decreMap = scala.collection.mutable.HashMap[Int, Int]()
for (candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings) {
// 獲取備選電影和最近評分電影的類似度的得分
val simScore = getMoviesSimScore(candidateMovie, userRecentlyRating._1, simMovies)
if (simScore > 0.7) {
// 計算候選電影的基礎推薦得分
scores += ((candidateMovie, simScore * userRecentlyRating._2))
if (userRecentlyRating._2 > 3) {
increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
} else {
decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
}
}
}
// 根據備選電影的 mid 作 groupBy,根據公式求最後的推薦得分,並排序
scores.groupBy(_._1).map {
// groupBy 以後獲得的數據是 Map(mid -> ArrayBuffer[(mid, score)])
case (mid, scoreList) =>
(mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)))
}.toArray.sortWith(_._2 > _._2)
}
其中,getMovieSimScore 是取候選電影和已評分電影的類似度,代碼以下:
/**
* 獲取備選電影和最近評分電影的類似度的得分
*
* @param mid1 備選電影
* @param mid2 最近評分電影
* @param simMovies 電影類似度矩陣的廣播變量值
* @return
*/
def getMoviesSimScore(mid1: Int, mid2: Int,
simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = {
simMovies.get(mid1) match {
case Some(sims) => sims.get(mid2) match {
case Some(score) => score
case None => 0.0
}
case None => 0.0
}
}
而 log 是對數運算, 這裏實現爲取 10 的對數(經常使用對數):
/**
* 求一個數以10爲底數的對數(使用換底公式)
*
* @param m
* @return
*/
def log(m: Int): Double = {
val N = 10
math.log(m) / math.log(N) // 底數爲 e => ln m / ln N = log m N = lg m
}
saveRecsToMongoDB 函數實現告終果的保存:
/**
* 把結果寫入對應的 MongoDB 表中
*
* @param uid
* @param streamRecs 流式的推薦結果
* @param mongoConfig MongoDB 的配置
*/
def storeDataInMongDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit = {
// 定義到 MongoDB 中 StreamRecs 表的鏈接
val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
// 若是表中已有 uid 對應的數據,則刪除
streamRecsCollection.findAndRemove(MongoDBObject("uid" -> uid))
// 將新的 streamRecs 存入表 StreamRecs 中
streamRecsCollection.insert(MongoDBObject("uid" -> uid, "recs" -> streamRecs.map(x => MongoDBObject("mid" -> x._1, "score" -> x._2))))
}
當計算出候選電影的推薦優先級的數組 updatedRecommends<mid, E>
後,這個數組將被髮送到 Web 後臺服務器,與後臺服務器上 uid 的上次實時推薦結果 recentRecommends<mid, E>
進行合併、替換並選出優先級 E 前 K 大的電影做爲本次新的實時推薦。具體而言:
a、合併:將 updatedRecommends 與 recentRecommends 並集合成爲一個新的 <mid, E>
數組;
b、替換(去重):當 updatedRecommends 與 recentRecommends 有重複的電影 mid 時,recentRecommends 中 mid 的推薦優先級因爲是上次實時推薦的結果,因而將做廢,被替換成表明了更新後的 updatedRecommends 的 mid 的推薦優先級;
c、選取 TopK:在合併、替換後的 <mid, E>
數組上,根據每一個 movie 的推薦優先級,選擇出前 K 大的電影,做爲本次實時推薦的最終結果。
咱們的系統實時推薦的數據流向是:業務系統(評分數據) -> 日誌 -> flume 日誌採集 -> kafka streaming 數據清洗和預處理 -> spark streaming 流式計算。在咱們完成實時推薦服務的代碼後,應該與其它工具進行聯調測試,確保系統正常運行。
啓動實時推薦系統 StreamingRecommender 以及 MongoDB、Redis。
一、運行 StreamingRecommender.scala 代碼
二、啓動 MongoDB
C:\Windows\system32>mongod
三、啓動 Redis,並進行連通測試
[atguigu@hadoop102 bin]$ pwd
/usr/local/bin
[atguigu@hadoop102 bin]$ ./redis-server /opt/module/redis-3.0.4/myredis/redis.conf
[atguigu@hadoop102 bin]$ ./redis-cli -p 6379
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
在 recommender 下新建模塊 KafkaStreaming,主要用來作日誌數據的預處理,過濾出須要的內容。pom.xml 文件須要引入依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>recommender</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>KafkaStreaming</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
<build>
<finalName>kafkastream</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.kafkastream.Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
在 src/main/java 下新建 java 類 com.atguigu.kafkastreaming.Application
package com.atguigu.kafkastream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Properties;
public class Application {
public static void main(String[] args) {
String brokers = "hadoop102:9092";
String zookeepers = "hadoop102:2181";
// 輸入和輸出的 topic
String from = "log";
String to = "recommender";
// 定義 kafka streaming 的配置
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
// 建立 kafka streaming 配置對象
StreamsConfig config = new StreamsConfig(settings);
// 建立一個拓撲建構器
TopologyBuilder builder = new TopologyBuilder();
// 定義流處理的拓撲結構
builder.addSource("SOURCE", from)
.addProcessor("PROCESSOR", () -> new LogProcessor(), "SOURCE")
.addSink("SINK", to, "PROCESSOR");
// 建立 KafkaStreams 對象
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
System.out.println("kafka stream started! >>>>>>>>>>>>>>>>>>>>");
}
}
這個程序會將 topic 爲 「log」 的信息流獲取來作處理,並以 「recommender」 爲新的 topic 轉發出去。
流處理程序 LogProcess.java
package com.atguigu.kafkastream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(byte[] dummy, byte[] line) { // dummy 表示 啞變量,沒什麼用
// 把收集到的日誌信息用 String 表示
String input = new String(line);
// 根據前綴 MOVIE_RATING_PREFIX: 從日誌信息中提取評分數據
if (input.contains("MOVIE_RATING_PREFIX:")) {
System.out.println("movie rating data coming! >>>>>>>>>>>>>>>>>>>>" + input);
input = input.split("MOVIE_RATING_PREFIX:")[1].trim();
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long l) {
}
@Override
public void close() {
}
}
完成代碼後,啓動 Application。
在 flume 的 conf 目錄下新建 flume-log-kafka.conf,對 flume 鏈接 kafka 作配置:
flume-log-kafka.conf
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
# For each one of the sources, the type is defined.
agent.sources.exectail.type = exec
# 下面這個路徑是須要收集日誌的絕對路徑,改成本身的日誌目錄
agent.sources.exectail.command = tail –f /opt/module/flume/log/agent.log
agent.sources.exectail.interceptors = i1
agent.sources.exectail.interceptors.i1.type = regex_filter
# 定義日誌過濾前綴的正則
agent.sources.exectail.interceptors.i1.regex = .+MOVIE_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel
# Each sink's type must be defined.
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
# Specify the channel the sink should use.
agent.sinks.kafkasink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel.
agent.channels.memoryChannel.capacity = 10000
配置好後,啓動 flume:
[atguigu@hadoop102 flume]$ bin/flume-ng agent \
--conf conf/ --name a1 --conf-file job/flume-log-kafka.conf \
-Dflume.root.logger=INFO,console
將業務代碼加入系統中。注意在 src/main/resources/ 下的 log4j.properties 中,log4j.appender.file.File 的值應該替換爲本身的日誌目錄,與 flume 中的配置應該相同。
啓動業務系統後臺,訪問 localhost:8088/index.html;點擊某個電影進行評分, 查看實時推薦列表是否會發生變化。
整個推薦系統更多的是依賴於用於的偏好信息進行電影的推薦,那麼就會存在一個問題,對於新註冊的用戶是沒有任何偏好信息記錄的,那這個時候推薦就會出現問題,致使沒有任何推薦的項目出現。
處理這個問題通常是經過當用戶首次登錄時,爲用戶提供交互式的窗口來獲取用戶對於物品的偏好。
在本項目中,當用戶第一次登錄的時候,系統會詢問用戶對於影片類別的偏好。以下:
![]()
當獲取用戶的偏好以後,對應於須要經過用戶偏好信息獲取的推薦結果,則更改成經過對影片的類型的偏好的推薦。
原始數據中的 tag 文件,是用戶給電影打上的標籤,這部份內容想要直接轉成評分並不容易,不過咱們能夠將標籤內容進行提取,獲得電影的內容
特徵向量
,進而能夠經過求取類似度矩陣。這部分能夠與實時推薦系統直接對接,計算出與用戶當前評分電影的類似電影,實現基於內容的實時推薦。爲了不熱門標籤對特徵提取的影響,咱們還能夠經過TF-IDF 算法對標籤的權重進行調整
,從而儘量地接近用戶偏好。
基於以上思想,加入 TF-IDF 算法的求取電影特徵向量的核心代碼以下:
package com.atguigu.content
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
// 須要的數據源是電影內容信息
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
shoot: String, language: String, genres: String, actors: String, directors: String)
case class MongoConfig(uri: String, db: String)
// 定義一個基準推薦對象
case class Recommendation(mid: Int, score: Double)
// 定義電影內容信息提取出的特徵向量的電影類似度列表
case class MovieRecs(mid: Int, recs: Seq[Recommendation])
object ContentRecommender {
// 定義表名和常量
val MONGODB_MOVIE_COLLECTION = "Movie"
val CONTENT_MOVIE_RECS = "ContentMovieRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 建立一個 SparkConf 對象
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
// 建立一個 SparkSession 對象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 聲明一個隱式的配置對象
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 在對 DataFrame 和 Dataset 進行許多操做都須要這個包進行支持
import spark.implicits._
// 加載數據,並做預處理
val movieTagsDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Movie]
.map { // 提取 mid,name,genres 三項做爲原始的內容特徵,分詞器默認分隔符是空格
x => (x.mid, x.name, x.genres.map(c => if (c == '|') ' ' else c))
}
.toDF("mid", "name", "genres")
.cache()
// TODO:從內容信息中提取電影特徵的特徵向量
// 建立一個分詞器,默認按照空格分詞
val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")
// 用分詞器對原始數據進行轉換,生成新的一列words
val wordsData = tokenizer.transform(movieTagsDF)
// 引入 HashingTF 工具,該工具能夠將詞語序列轉換成對應的詞頻
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(50)
val featurizeData = hashingTF.transform(wordsData)
// 測試
// wordsData.show()
// featurizeData.show()
// featurizeData.show(truncate = false) // 不壓縮顯示
// 引入 IDF 工具,該工具能夠獲得 IDF 模型
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// 訓練 IDF 模型,獲得每一個詞的逆文檔頻率
val idfModel = idf.fit(featurizeData)
// 用 IDF 模型對原數據進行處理,獲得文檔中每一個詞的 TF-IDF,做爲新的特徵向量
val rescaleData = idfModel.transform(featurizeData)
// 測試
// rescaleData.show(truncate = false) // 不壓縮顯示
val movieFeatures = rescaleData.map(
row => (row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray)
).rdd.map(
x => (x._1, new DoubleMatrix(x._2))
)
// 測試
// movieFeatures.collect().foreach(println)
// 對全部電影兩兩計算它們的類似度,先作笛卡爾積
val movieRecs = movieFeatures.cartesian(movieFeatures)
.filter {
// 把本身跟本身的配對過濾掉
case (a, b) => a._1 != b._1
}
.map {
case (a, b) => {
val simScore = this.consinSim(a._2, b._2)
(a._1, (b._1, simScore))
}
}
.filter(_._2._2 > 0.6) // 過濾出類似度大於 0.6 的
.groupByKey()
.map {
case (mid, recs) => MovieRecs(mid, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
}
.toDF()
// 把結果寫入對應的 MongoDB 表中
movieRecs.write
.option("uri", mongoConfig.uri)
.option("collection", CONTENT_MOVIE_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
// 求兩個向量的餘弦類似度
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix): Double = {
movie1.dot(movie2) / (movie1.norm2() * movie2.norm2()) // l1範數:向量元素絕對值之和;l2範數:即向量的模長(向量的長度),向量元素的平方和再開方
}
}
而後經過電影特徵向量進而求出類似度矩陣,就能夠爲實時推薦提供基礎,獲得用戶推薦列表了。能夠看出,基於內容和基於隱語義模型,目的都是爲了提取出物品的特徵向量,從而能夠計算出類似度矩陣。而咱們的實時推薦系統算法正是基於類似度來定義的。
注意:本章節沒有實操過!!!爲了保持項目的完整。