實時計算可以及時捕獲用戶短時興趣,同時可以快速反饋分發當前系統的用戶興趣內容。大量實踐以及發表的文章都顯示了推薦系統實時化,對推薦精準度的提高的有效性和必要性。php
實時推薦相關工做很是多,騰訊和北大合做的兩篇SIGMOD文章是比較實際和詳細的實現,採用的計算框架可以支持大規模數據的實時推薦,如下將會分開簡述如下兩篇文章。python
Huang發表了基於Storm和KV存儲的大規模實時推薦系統 (TencentRec: Real-time Stream Recommendation in Practice)git
騰訊採用使用storm緣由,支持實時數據流式計算,良好的可擴展性、可容錯性,採用簡單編程模型。
文章核心包括實時增量計算的ItemCF,以及用戶隱式反饋計算、實時剪枝算法、基於用戶畫像的數據稀疏性策略。應用在多個業務上都有不一樣程度的提高,最明顯的是騰訊視頻的全局表現提高高達30%。redis
全文核心應該是下圖六道公式,闡述騰訊如何具體實現的增量itemcf。算法
文章中的co-rating,其實就是咱們常說的user bias. 公式3和4解決了用戶隱式反饋問題,細節的計算能夠參考2016的文章,實際是一個log函數融合了用戶的瀏覽、點擊、分享、購買等行爲,轉化成rating.apache
請注意公式4,因爲他們定義了corating,實際是將類似度的增量計算從L2範數的計算轉化成了L1範數計算.(當Rup取x的時候,y=1/x)。編程
可擴展的增量計算數據結構
騰訊視頻的推薦應用(Real-time Video Recommendation Exploration)架構
在咱們看來,全文核心在於實時計算的數據流轉,以下圖所示:app
基於storm的實時計算topology圖:
糖豆總體推薦框架,從離線,近線,在線三套計算流程組合而成。在線流程基於Spark Streaming框架實現,部署在近線集羣。 在線推薦框架實時根據用戶行爲,生成實時推薦列表,從而知足用戶瞬時興趣,提升推薦系統的推薦新鮮度。簡單架構圖以下:
實時計算流程以下圖所示:
分解步驟:
python實現:
if __name__ == "__main__": print sys.argv reload(sys) sys.setdefaultencoding('utf-8') sc = SparkContext(appName="real_time_etl") #20秒 ssc = StreamingContext(sc, 15) brokers = "kafka-servers:9092" topic = "logstash" #讀取kafka kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) #解析日誌、過濾無關數據、讀取類似視頻 lines = kvs.map(lambda x : readJson(x[1])).filter(lambda x: x is not None).map(lambda x: getTopkfromRedis(x)) #lines.pprint() #寫入推薦結果 lines.foreachRDD(lambda rdd: list2Redis(rdd)) ssc.start() ssc.awaitTermination()
部署在集羣Master節點的監控腳本會每30s掃描一次實時計算代碼進程,若是發現進程被failed,會自動拉起實時計算Spark Steaming進程。若是進程拉起失敗會觸發郵件、短信報警
#! /bin/sh MOBILE="your phone numbers" RT_HOME=/home/realtime/recommend.py DIR=/data/rtdamon PID_FILE=$DIR/.run/rt-litetl-damon.pid LOG_FILE=$DIR/.log/rt-litetl-damon.log t=$(date -d "today" +"%Y-%m-%d %H:%M:%S") source /etc/profile echo $PID_FILE $LOG_FILE if [ -e "$PID_FILE" ]; then pid=`cat $PID_FILE` echo $pid damon_process_exists=`ps v -p $pid | grep "rt-litetl-damon.sh" | grep -v grep|grep -v \<defunct\> ` echo "damon process exists : $process_exists" if [ -n "$damon_process_exists" ] then echo "Process rt-litetl-damon.sh is running! $t" >> $LOG_FILE exit fi fi pid=$$ echo "$pid" > $PID_FILE while : do process_exists=`ps -ef|grep "$RT_HOME"|grep "spark"|grep -v grep|wc -l` echo "process exists : $process_exists" >>$LOG_FILE if [ "$process_exists" == "0" ]; then /hadoop/spark/bin/spark-submit --master yarn --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 --py-files /hadoop/user/rt/redis.zip --num-executors 10 --executor-cores 7 --executor-memory 6g /home/realtime/recommend.py>>/data/rtlog/rtrecommed.log 2>&1 & /usr/bin/php -f /data/rtdamon/yunsms.class.php "$MOBILE" "recommend.py" echo "realtime recommendation process already restarted at $t" >> $LOG_FILE fi #sleep `expr 3600 \* 3` sleep `expr 60 \* 1` done
根據咱們的AB測試數據來看,總體CTR提高25%。用推薦系統的A版對比無推薦的B版,用戶觀看時長提高47%。