糖豆實時推薦系統設計與實現

1.實時推薦系統與相關工做

1.1 緣由

實時計算可以及時捕獲用戶短時興趣,同時可以快速反饋分發當前系統的用戶興趣內容。大量實踐以及發表的文章都顯示了推薦系統實時化,對推薦精準度的提高的有效性和必要性。php

1.2 騰訊架構與實現

實時推薦相關工做很是多,騰訊和北大合做的兩篇SIGMOD文章是比較實際和詳細的實現,採用的計算框架可以支持大規模數據的實時推薦,如下將會分開簡述如下兩篇文章。python

2015年

Huang發表了基於Storm和KV存儲的大規模實時推薦系統 (TencentRec: Real-time Stream Recommendation in Practice)git

  1. 實現了一系列經典推薦算法的實時版本
  2. 實現了數種實時算法提升推薦精度
  3. 普遍應用於業務有效提升

騰訊採用使用storm緣由,支持實時數據流式計算,良好的可擴展性、可容錯性,採用簡單編程模型。
文章核心包括實時增量計算的ItemCF,以及用戶隱式反饋計算、實時剪枝算法、基於用戶畫像的數據稀疏性策略。應用在多個業務上都有不一樣程度的提高,最明顯的是騰訊視頻的全局表現提高高達30%。redis

全文核心應該是下圖六道公式,闡述騰訊如何具體實現的增量itemcf。算法

文章中的co-rating,其實就是咱們常說的user bias. 公式3和4解決了用戶隱式反饋問題,細節的計算能夠參考2016的文章,實際是一個log函數融合了用戶的瀏覽、點擊、分享、購買等行爲,轉化成rating.apache


corating.png

請注意公式4,因爲他們定義了corating,實際是將類似度的增量計算從L2範數的計算轉化成了L1範數計算.(當Rup取x的時候,y=1/x)。編程

可擴展的增量計算數據結構


itemcf.png

initemcf.png

2016年

騰訊視頻的推薦應用(Real-time Video Recommendation Exploration)架構

  1. 實時處理、大規模數據下的準確率和可擴展性。
  2. 開發了一個基於矩陣分解的大規模在線協同過濾算法,以及一系列的自適應更新策略。
  3. 經過增長包括視頻類別、時間因素影響、用戶畫像剪枝以及訓練等方法,提升實時TopN推薦的精度。

在咱們看來,全文核心在於實時計算的數據流轉,以下圖所示:app


tecvideo.png

基於storm的實時計算topology圖:


topo.png

2. 糖豆的設計與實現

2.1 架構

糖豆總體推薦框架,從離線,近線,在線三套計算流程組合而成。在線流程基於Spark Streaming框架實現,部署在近線集羣。 在線推薦框架實時根據用戶行爲,生成實時推薦列表,從而知足用戶瞬時興趣,提升推薦系統的推薦新鮮度。簡單架構圖以下:


糖豆實時架構.png

2.2 基於Spark Streaming的實現

2.2.1. 計算流程

實時計算流程以下圖所示:


實時計算流程圖


分解步驟:

  1. Spark Streaming 讀取Kafka,原始日誌ETL
  2. 提取用戶隱式反饋,生成候選集tuple (uid,vid)
  3. 天天凌晨會將離線計算好的ItemCF模型結果集導入Redis。itemcf數據結構是一個similarity vid list。
  4. 實時維護看過視頻set,對看過視頻的處理候選集tuple過濾該用戶看過的視頻
  5. 實時更新推薦過視頻set,候選集tuple過濾當天已經被推薦過的視頻
  6. 候選集寫入Redis推薦list

python實現:

if __name__ == "__main__":
    print sys.argv
    reload(sys)
    sys.setdefaultencoding('utf-8')
    sc = SparkContext(appName="real_time_etl")
    #20秒
    ssc = StreamingContext(sc, 15)
    brokers = "kafka-servers:9092"
    topic = "logstash"
    #讀取kafka
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
   #解析日誌、過濾無關數據、讀取類似視頻
    lines = kvs.map(lambda x : readJson(x[1])).filter(lambda x: x is not None).map(lambda x: getTopkfromRedis(x))
    #lines.pprint()
     #寫入推薦結果
    lines.foreachRDD(lambda rdd: list2Redis(rdd))  
    ssc.start()
    ssc.awaitTermination()

 

2.2.2 監控

部署在集羣Master節點的監控腳本會每30s掃描一次實時計算代碼進程,若是發現進程被failed,會自動拉起實時計算Spark Steaming進程。若是進程拉起失敗會觸發郵件、短信報警

#! /bin/sh

MOBILE="your phone numbers"
RT_HOME=/home/realtime/recommend.py

DIR=/data/rtdamon
PID_FILE=$DIR/.run/rt-litetl-damon.pid
LOG_FILE=$DIR/.log/rt-litetl-damon.log
t=$(date -d "today" +"%Y-%m-%d %H:%M:%S")

source /etc/profile 
echo $PID_FILE $LOG_FILE

if [ -e "$PID_FILE" ];
then
        pid=`cat $PID_FILE`
        echo $pid
        damon_process_exists=`ps v -p $pid | grep "rt-litetl-damon.sh" | grep -v grep|grep -v \<defunct\> `
        echo "damon process exists : $process_exists"
        if [ -n "$damon_process_exists" ]
        then
                echo "Process rt-litetl-damon.sh is running! $t" >> $LOG_FILE
                exit
        fi
fi

pid=$$
echo "$pid" > $PID_FILE

while :
do
        process_exists=`ps -ef|grep "$RT_HOME"|grep "spark"|grep -v grep|wc -l`
        echo "process exists : $process_exists" >>$LOG_FILE
        if [ "$process_exists" == "0" ]; then


/hadoop/spark/bin/spark-submit  --master yarn --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 --py-files /hadoop/user/rt/redis.zip --num-executors 10 --executor-cores 7 --executor-memory 6g /home/realtime/recommend.py>>/data/rtlog/rtrecommed.log  2>&1 &
    /usr/bin/php -f /data/rtdamon/yunsms.class.php "$MOBILE" "recommend.py"
                echo "realtime recommendation process already restarted at $t" >> $LOG_FILE
        fi

        #sleep `expr 3600 \* 3`
        sleep `expr 60 \* 1`
done

 

2.3 收益

根據咱們的AB測試數據來看,總體CTR提高25%。用推薦系統的A版對比無推薦的B版,用戶觀看時長提高47%。


recabdata.png

3. 問題與改進

    1. 較多代碼邏輯集中在Redis。目前Redis無災備措施,同時IO和負載也會出現Peak。
    2. Spark Streaming 目前實時級別在分鐘級。須要升級成storm的秒、毫秒級別。
    3. 須要用戶點擊等行爲纔會生產數據,容易召回不足。
相關文章
相關標籤/搜索