推薦系統-多路召回

介紹
第一部分,已經介紹協同推薦,並利用它實現了一個簡單的推薦系統,在第二部分,咱們簡要的分析了咱們擁有的數據,包括每一個字段的分佈,常見的統計信息等,爲接下來的多路召回提供了很好的指引。html

回想一下baseline的思路,咱們首先計算了item的之間的類似度,而後基於用戶的正反饋item列表,找到與列表中每個item類似度最高的topn個item,組成一個列表,最後直接按照類似度得分進行排序,獲得最後的推薦結果。python

在實際的推薦場景中,一般會有兩個階段,第一個階段是召回階段,第二個階段是排序階段。第一個階段召回那些類似度較高的N個item列表,它更關注的是召回率,較爲粗略;而排序階段會使用更爲複雜的模型進行監督學習(轉化爲分類的任務),獲得分類機率,也就是置信度,最後按照置信度進行排序,取top K個item做爲最後的推薦列表。git

baseline中其實只包含了召回這個階段,雖然但就這個任務而言,它已經夠了。本節介紹的是多路召回,什麼是多路召回呢,好比在一個推薦場景中,咱們能夠選擇ItemCF或者UserCF以及基於熱點的召回策略等等,由於咱們召回層的目的是爲了儘量的確保召回,因此基於單個的策略確定是效果不如多個策略的,這裏就引出了多路召回的概念,也就是多個策略並行的進行召回。下面這個圖表示了多路召回的一個例子。
image.pnggithub

在多路召回中,每一個策略之間絕不相關,可使用多種不一樣的策略來獲取用戶排序的候選商品集合,而具體使用哪些召回策略實際上是與業務強相關的 ,針對不一樣的任務就會有對於該業務真實場景下須要考慮的召回規則。例如新聞推薦,召回規則能夠是「熱門視頻」、「導演召回」、「演員召回」、「最近上映「、」流行趨勢「、」類型召回「等等。算法

導入相關庫segmentfault

import pandas as pd  
import numpy as np
from tqdm import tqdm  
from collections import defaultdict  
import os, math, warnings, math, pickle
from tqdm import tqdm
import faiss
import collections
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from datetime import datetime
from deepctr.feature_column import SparseFeat, VarLenSparseFeat
from sklearn.preprocessing import LabelEncoder
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
from deepmatch.models import *
from deepmatch.utils import sampledsoftmaxloss

warnings.filterwarnings('ignore')
data_path = './data_raw/'
save_path = './temp_results/'
# 作召回評估的一個標誌, 若是不進行評估就是直接使用全量數據進行召回
metric_recall = False

讀取數據架構

在通常的推薦系統比賽中讀取數據部分主要分爲三種模式, 不一樣的模式對應的不一樣的數據集:app

  1. Debug模式: 這個的目的是幫助咱們基於數據先搭建一個簡易的baseline並跑通, 保證寫的baseline代碼沒有什麼問題。 因爲推薦比賽的數據每每很是巨大, 若是一上來直接採用所有的數據進行分析,搭建baseline框架, 每每會帶來時間和設備上的損耗, 因此這時候咱們每每須要從海量數據的訓練集中隨機抽取一部分樣原本進行調試(train_click_log_sample), 先跑通一個baseline。
  2. 線下驗證模式: 這個的目的是幫助咱們在線下基於已有的訓練集數據, 來選擇好合適的模型和一些超參數。 因此咱們這一塊只須要加載整個訓練集(train_click_log), 而後把整個訓練集再分紅訓練集和驗證集。 訓練集是模型的訓練數據, 驗證集部分幫助咱們調整模型的參數和其餘的一些超參數。
  3. 線上模式: 咱們用debug模式搭建起一個推薦系統比賽的baseline, 用線下驗證模式選擇好了模型和一些超參數, 這一部分就是真正的對於給定的測試集進行預測, 提交到線上, 因此這一塊使用的訓練數據集是全量的數據集(train_click_log+test_click_log)

下面就分別對這三種不一樣的數據讀取模式先創建不一樣的代導入函數, 方便後面針對不一樣的模式下導入數據。框架

# debug模式: 從訓練集中劃出一部分數據來調試代碼
def get_all_click_sample(data_path, sample_nums=10000):
    """
        訓練集中採樣一部分數據調試
        data_path: 原數據的存儲路徑
        sample_nums: 採樣數目(這裏因爲機器的內存限制,能夠採樣用戶作)
    """
    all_click = pd.read_csv(data_path + 'train_click_log.csv')
    all_user_ids = all_click.user_id.unique()

    sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False) 
    all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click

# 讀取點擊數據,這裏分紅線上和線下,若是是爲了獲取線上提交結果應該講測試集中的點擊數據合併到總的數據中
# 若是是爲了線下驗證模型的有效性或者特徵的有效性,能夠只使用訓練集
def get_all_click_df(data_path='./data', offline=True):
    if offline:
        all_click = pd.read_csv(data_path + '/train_click_log.csv')
    else:
        trn_click = pd.read_csv(data_path + '/train_click_log.csv')
        tst_click = pd.read_csv(data_path + '/testA_click_log.csv')

        all_click = trn_click.append(tst_click)
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click
# 讀取文章的基本屬性
def get_item_info_df(data_path):
    item_info_df = pd.read_csv(data_path + 'articles.csv')
    
    # 爲了方便與訓練集中的click_article_id拼接,須要把article_id修改爲click_article_id
    item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
    
    return item_info_df
# 讀取文章的Embedding數據
def get_item_emb_dict(data_path):
    item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
    
    item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
    # 進行歸一化
    item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)

    item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np))
    pickle.dump(item_emb_dict, open(save_path + 'item_content_emb.pkl', 'wb'))
    
    return item_emb_dict
# min-max 歸一化函數
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
# 採樣數據
# all_click_df = get_all_click_sample(data_path)

# 全量訓練集
all_click_df = get_all_click_df(offline=False)

# 對時間戳進行歸一化,用於在關聯規則的時候計算權重
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)

item_info_df = get_item_info_df(data_path)

item_emb_dict = get_item_emb_dict(data_path)

工具函數
獲取用戶-文章-時間函數dom

這個在基於關聯規則的用戶協同過濾的時候會用到

# 根據點擊時間獲取用戶的點擊文章序列   {user1: [(item1, time1), (item2, time2)..]...}
def get_user_item_time(click_df):
    
    click_df = click_df.sort_values('click_timestamp')
    
    def make_item_time_pair(df):
        return list(zip(df['click_article_id'], df['click_timestamp']))
    
    user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\
                                                            .reset_index().rename(columns={0: 'item_time_list'})
    user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
    
    return user_item_time_dict

獲取文章-用戶-時間函數
這個在基於關聯規則的文章協同過濾的時候會用到

# 根據時間獲取商品被點擊的用戶序列  {item1: [(user1, time1), (user2, time2)...]...}
# 這裏的時間是用戶點擊當前商品的時間,好像沒有直接的關係。
def get_item_user_time_dict(click_df):
    def make_user_time_pair(df):
        return list(zip(df['user_id'], df['click_timestamp']))
    
    click_df = click_df.sort_values('click_timestamp')
    item_user_time_df = click_df.groupby('click_article_id')['user_id', 'click_timestamp'].apply(lambda x: make_user_time_pair(x))\
                                                            .reset_index().rename(columns={0: 'user_time_list'})
    
    item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list']))
    return item_user_time_dict

獲取歷史和最後一次點擊

這個在評估召回結果, 特徵工程和製做標籤轉成監督學習測試集的時候回用到

# 獲取當前數據的歷史點擊和最後一次點擊
def get_hist_and_last_click(all_click):
    
    all_click = all_click.sort_values(by=['user_id', 'click_timestamp'])
    click_last_df = all_click.groupby('user_id').tail(1)

    # 若是用戶只有一個點擊,hist爲空了,會致使訓練的時候這個用戶不可見,此時默認泄露一下
    def hist_func(user_df):
        if len(user_df) == 1:
            return user_df
        else:
            return user_df[:-1]

    click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)

    return click_hist_df, click_last_df

獲取文章屬性特徵

# 獲取文章id對應的基本屬性,保存成字典的形式,方便後面召回階段,冷啓動階段直接使用
def get_item_info_dict(item_info_df):
    max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
    item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler)
    
    item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
    item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))
    item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
    
    return item_type_dict, item_words_dict, item_created_time_dict

獲取用戶歷史點擊的文章信息

def get_user_hist_item_info_dict(all_click):
    
    # 獲取user_id對應的用戶歷史點擊文章類型的集合字典
    user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index()
    user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id']))
    
    # 獲取user_id對應的用戶點擊文章的集合
    user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index()
    user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id']))
    
    # 獲取user_id對應的用戶歷史點擊的文章的平均字數字典
    user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index()
    user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count']))
    
    # 獲取user_id對應的用戶最後一次點擊的文章的建立時間
    all_click_ = all_click.sort_values('click_timestamp')
    user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index()
    
    max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
    user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler)
    
    user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'], \
                                                user_last_item_created_time['created_at_ts']))
    
    return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict

獲取點擊次數最多的Top-k個文章
獲取近期點擊最多的文章

def get_item_topk_click(click_df, k):
    topk_click = click_df['click_article_id'].value_counts().index[:k]
    return topk_click

定義多路召回字典
獲取文章的屬性信息,保存成字典的形式方便查詢

item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)

# 定義一個多路召回的字典,將各路召回的結果都保存在這個字典當中
user_multi_recall_dict =  {'itemcf_sim_itemcf_recall': {},
                           'embedding_sim_item_recall': {},
                           'youtubednn_recall': {},
                           'youtubednn_usercf_recall': {}, 
                           'cold_start_recall': {}}
                           
                           
# 提取最後一次點擊做爲召回評估,若是不須要作召回評估直接使用全量的訓練集進行召回(線下驗證模型)
# 若是不是召回評估,直接使用全量數據進行召回,不用將最後一次提取出來
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)

召回效果評估
作完了召回有時候也須要對當前的召回方法或者參數進行調整以達到更好的召回效果,由於召回的結果決定了最終排序的上限,下面也會提供一個召回評估的方法

# 依次評估召回的前10, 20, 30, 40, 50個文章中的擊中率
def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=5):
    last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id']))
    user_num = len(user_recall_items_dict)
    
    for k in range(10, topk+1, 10):
        hit_num = 0
        for user, item_list in user_recall_items_dict.items():
            # 獲取前k個召回的結果
            tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]]
            if last_click_item_dict[user] in set(tmp_recall_items):
                hit_num += 1
        
        hit_rate = round(hit_num * 1.0 / user_num, 5)
        print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)

計算類似性矩陣
這一部分主要是經過協同過濾以及向量檢索獲得類似性矩陣,類似性矩陣主要分爲user2user和item2item,下面依次獲取基於itemCF的item2item的類似性矩陣。

itemCF i2i_sim
借鑑KDD2020的去偏商品推薦,在計算item2item類似性矩陣時,使用關聯規則,使得計算的文章的類似性還考慮到了:

  1. 用戶點擊的時間權重
  2. 用戶點擊的順序權重
  3. 文章建立的時間權重
def itemcf_sim(df, item_created_time_dict):
    """
        文章與文章之間的類似性矩陣計算
        :param df: 數據表
        :item_created_time_dict:  文章建立時間的字典
        return : 文章與文章的類似性矩陣
        
        思路: 基於物品的協同過濾(詳細請參考上一期推薦系統基礎的組隊學習) + 關聯規則
    """
    
    user_item_time_dict = get_user_item_time(df)
    
    # 計算物品類似度
    i2i_sim = {}
    item_cnt = defaultdict(int)
    for user, item_time_list in tqdm(user_item_time_dict.items()):
        # 在基於商品的協同過濾優化的時候能夠考慮時間因素
        for loc1, (i, i_click_time) in enumerate(item_time_list):
            item_cnt[i] += 1
            i2i_sim.setdefault(i, {})
            for loc2, (j, j_click_time) in enumerate(item_time_list):
                if(i == j):
                    continue
                    
                # 考慮文章的正向順序點擊和反向順序點擊    
                loc_alpha = 1.0 if loc2 > loc1 else 0.7
                # 位置信息權重,其中的參數能夠調節
                loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
                # 點擊時間權重,其中的參數能夠調節
                click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time))
                # 兩篇文章建立時間的權重,其中的參數能夠調節
                created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
                i2i_sim[i].setdefault(j, 0)
                # 考慮多種因素的權重計算最終的文章之間的類似度
                i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
                
    i2i_sim_ = i2i_sim.copy()
    for i, related_items in i2i_sim.items():
        for j, wij in related_items.items():
            i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])
    
    # 將獲得的類似性矩陣保存到本地
    pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))
    
    return i2i_sim_

i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
100%|██████████| 250000/250000 [14:20<00:00, 290.38it/s]

userCF u2u_sim
在計算用戶之間的類似度的時候,也可使用一些簡單的關聯規則,好比用戶活躍度權重,這裏將用戶的點擊次數做爲用戶活躍度的指標

def get_user_activate_degree_dict(all_click_df):
    all_click_df_ = all_click_df.groupby('user_id')['click_article_id'].count().reset_index()
    
    # 用戶活躍度歸一化
    mm = MinMaxScaler()
    all_click_df_['click_article_id'] = mm.fit_transform(all_click_df_[['click_article_id']])
    user_activate_degree_dict = dict(zip(all_click_df_['user_id'], all_click_df_['click_article_id']))
    
    return user_activate_degree_dict
def usercf_sim(all_click_df, user_activate_degree_dict):
    """
        用戶類似性矩陣計算
        :param all_click_df: 數據表
        :param user_activate_degree_dict: 用戶活躍度的字典
        return 用戶類似性矩陣
        
        思路: 基於用戶的協同過濾(詳細請參考上一期推薦系統基礎的組隊學習) + 關聯規則
    """
    item_user_time_dict = get_item_user_time_dict(all_click_df)
    
    u2u_sim = {}
    user_cnt = defaultdict(int)
    for item, user_time_list in tqdm(item_user_time_dict.items()):
        for u, click_time in user_time_list:
            user_cnt[u] += 1
            u2u_sim.setdefault(u, {})
            for v, click_time in user_time_list:
                u2u_sim[u].setdefault(v, 0)
                if u == v:
                    continue
                # 用戶平均活躍度做爲活躍度的權重,這裏的式子也能夠改善
                activate_weight = 100 * 0.5 * (user_activate_degree_dict[u] + user_activate_degree_dict[v])   
                u2u_sim[u][v] += activate_weight / math.log(len(user_time_list) + 1)
    
    u2u_sim_ = u2u_sim.copy()
    for u, related_users in u2u_sim.items():
        for v, wij in related_users.items():
            u2u_sim_[u][v] = wij / math.sqrt(user_cnt[u] * user_cnt[v])
    
    # 將獲得的類似性矩陣保存到本地
    pickle.dump(u2u_sim_, open(save_path + 'usercf_u2u_sim.pkl', 'wb'))

    return u2u_sim_
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# 因爲usercf計算時候太耗費內存了,這裏就不直接運行了
# 若是是採樣的話,是能夠運行的
user_activate_degree_dict = get_user_activate_degree_dict(all_click_df)
u2u_sim = usercf_sim(all_click_df, user_activate_degree_dict)

item embedding sim
使用Embedding計算item之間的類似度是爲了後續冷啓動的時候能夠獲取未出如今點擊數據中的文章,後面有對冷啓動專門的介紹,這裏簡單的說一下faiss。

aiss是Facebook的AI團隊開源的一套用於作聚類或者類似性搜索的軟件庫,底層是用C++實現。Faiss由於超級優越的性能,被普遍應用於推薦相關的業務當中.

faiss工具包通常使用在推薦系統中的向量召回部分。在作向量召回的時候要麼是u2u,u2i或者i2i,這裏的u和i指的是user和item.咱們知道在實際的場景中user和item的數量都是海量的,咱們最容易想到的基於向量類似度的召回就是使用兩層循環遍歷user列表或者item列表計算兩個向量的類似度,可是這樣作在面對海量數據是不切實際的,faiss就是用來加速計算某個查詢向量最類似的topk個索引向量。

faiss查詢的原理:

faiss使用了PCA和PQ(Product quantization乘積量化)兩種技術進行向量壓縮和編碼,固然還使用了其餘的技術進行優化,可是PCA和PQ是其中最核心部分。

  1. PCA降維算法細節參考下面這個連接進行學習
    主成分分析(PCA)原理總結 4
  2. PQ編碼的細節下面這個連接進行學習
    實例理解product quantization算法 4

faiss使用

faiss官方教程 7

# 向量檢索類似度計算
# topk指的是每一個item, faiss搜索後返回最類似的topk個item
def embdding_sim(click_df, item_emb_df, save_path, topk):
    """
        基於內容的文章embedding類似性矩陣計算
        :param click_df: 數據表
        :param item_emb_df: 文章的embedding
        :param save_path: 保存路徑
        :patam topk: 找最類似的topk篇
        return 文章類似性矩陣
        
        思路: 對於每一篇文章, 基於embedding的類似性返回topk個與其最類似的文章, 只不過因爲文章數量太多,這裏用了faiss進行加速
    """
    
    # 文章索引與文章id的字典映射
    item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id']))
    
    item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)
    # 向量進行單位化
    item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
    
    # 創建faiss索引
    item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
    item_index.add(item_emb_np)
    # 類似度查詢,給每一個索引位置上的向量返回topk個item以及類似度
    sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表
    
    # 將向量檢索的結果保存成原始id的對應關係
    item_sim_dict = collections.defaultdict(dict)
    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)):
        target_raw_id = item_idx_2_rawid_dict[target_idx]
        # 從1開始是爲了去掉商品自己, 因此最終得到的類似商品只有topk-1
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
            rele_raw_id = item_idx_2_rawid_dict[rele_idx]
            item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
    
    # 保存i2i類似度矩陣
    pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb'))   
    
    return item_sim_dict
item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10) # topk能夠自行設置

召回
這個就是咱們開篇提到的那個問題, 面的36萬篇文章, 20多萬用戶的推薦, 咱們又有哪些策略來縮減問題的規模? 咱們就能夠再召回階段篩選出用戶對於點擊文章的候選集合, 從而下降問題的規模。召回經常使用的策略:

  • Youtube DNN 召回
  • 基於文章的召回

    • 文章的協同過濾
    • 基於文章embedding的召回
  • 基於用戶的召回

    • 用戶的協同過濾
    • 用戶embedding

上面的各類召回方式一部分在基於用戶已經看得文章的基礎上去召回與這些文章類似的一些文章, 而這個類似性的計算方式不一樣, 就獲得了不一樣的召回方式, 好比文章的協同過濾, 文章內容的embedding等。還有一部分是根據用戶的類似性進行推薦,對於某用戶推薦與其類似的其餘用戶看過的文章,好比用戶的協同過濾和用戶embedding。 還有一種思路是相似矩陣分解的思路,先計算出用戶和文章的embedding以後,就能夠直接算用戶和文章的類似度, 根據這個類似度進行推薦, 好比YouTube DNN。 咱們下面詳細來看一下每個召回方法:

YoutubeDNN召回

(這一步是直接獲取用戶召回的候選文章列表)

Youtubednn召回架構
image.png
關於YoutubeDNN原理和應用推薦看王喆的兩篇博客:

# 獲取雙塔召回時的訓練驗證數據
# negsample指的是經過滑窗構建樣本的時候,負樣本的數量
def gen_data_set(data, negsample=0):
    data.sort_values("click_timestamp", inplace=True)
    item_ids = data['click_article_id'].unique()

    train_set = []
    test_set = []
    for reviewerID, hist in tqdm(data.groupby('user_id')):
        pos_list = hist['click_article_id'].tolist()
        
        if negsample > 0:
            candidate_set = list(set(item_ids) - set(pos_list))   # 用戶沒看過的文章裏面選擇負樣本
            neg_list = np.random.choice(candidate_set,size=len(pos_list)*negsample,replace=True)  # 對於每一個正樣本,選擇n個負樣本
            
        # 長度只有一個的時候,須要把這條數據也放到訓練集中,否則的話最終學到的embedding就會有缺失
        if len(pos_list) == 1:
            train_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list)))
            test_set.append((reviewerID, [pos_list[0]], pos_list[0],1,len(pos_list)))
            
        # 滑窗構造正負樣本
        for i in range(1, len(pos_list)):
            hist = pos_list[:i]
            
            if i != len(pos_list) - 1:
                train_set.append((reviewerID, hist[::-1], pos_list[i], 1, len(hist[::-1])))  # 正樣本 [user_id, his_item, pos_item, label, len(his_item)]
                for negi in range(negsample):
                    train_set.append((reviewerID, hist[::-1], neg_list[i*negsample+negi], 0,len(hist[::-1]))) # 負樣本 [user_id, his_item, neg_item, label, len(his_item)]
            else:
                # 將最長的那一個序列長度做爲測試數據
                test_set.append((reviewerID, hist[::-1], pos_list[i],1,len(hist[::-1])))
                
    random.shuffle(train_set)
    random.shuffle(test_set)
    
    return train_set, test_set

# 將輸入的數據進行padding,使得序列特徵的長度都一致
def gen_model_input(train_set,user_profile,seq_max_len):

    train_uid = np.array([line[0] for line in train_set])
    train_seq = [line[1] for line in train_set]
    train_iid = np.array([line[2] for line in train_set])
    train_label = np.array([line[3] for line in train_set])
    train_hist_len = np.array([line[4] for line in train_set])

    train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post', truncating='post', value=0)
    train_model_input = {"user_id": train_uid, "click_article_id": train_iid, "hist_article_id": train_seq_pad,
                         "hist_len": train_hist_len}

    return train_model_input, train_label
def youtubednn_u2i_dict(data, topk=20):    
    sparse_features = ["click_article_id", "user_id"]
    SEQ_LEN = 30 # 用戶點擊序列的長度,短的填充,長的截斷
    
    user_profile_ = data[["user_id"]].drop_duplicates('user_id')
    item_profile_ = data[["click_article_id"]].drop_duplicates('click_article_id')  
    
    # 類別編碼
    features = ["click_article_id", "user_id"]
    feature_max_idx = {}
    
    for feature in features:
        lbe = LabelEncoder()
        data[feature] = lbe.fit_transform(data[feature])
        feature_max_idx[feature] = data[feature].max() + 1
    
    # 提取user和item的畫像,這裏具體選擇哪些特徵還須要進一步的分析和考慮
    user_profile = data[["user_id"]].drop_duplicates('user_id')
    item_profile = data[["click_article_id"]].drop_duplicates('click_article_id')  
    
    user_index_2_rawid = dict(zip(user_profile['user_id'], user_profile_['user_id']))
    item_index_2_rawid = dict(zip(item_profile['click_article_id'], item_profile_['click_article_id']))
    
    # 劃分訓練和測試集
    # 因爲深度學習須要的數據量一般都是很是大的,因此爲了保證召回的效果,每每會經過滑窗的形式擴充訓練樣本
    train_set, test_set = gen_data_set(data, 0)
    # 整理輸入數據,具體的操做能夠看上面的函數
    train_model_input, train_label = gen_model_input(train_set, user_profile, SEQ_LEN)
    test_model_input, test_label = gen_model_input(test_set, user_profile, SEQ_LEN)
    
    # 肯定Embedding的維度
    embedding_dim = 16
    
    # 將數據整理成模型能夠直接輸入的形式
    user_feature_columns = [SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim),
                            VarLenSparseFeat(SparseFeat('hist_article_id', feature_max_idx['click_article_id'], embedding_dim,
                                                        embedding_name="click_article_id"), SEQ_LEN, 'mean', 'hist_len'),]
    item_feature_columns = [SparseFeat('click_article_id', feature_max_idx['click_article_id'], embedding_dim)]
    
    # 模型的定義 
    # num_sampled: 負採樣時的樣本數量
    model = YoutubeDNN(user_feature_columns, item_feature_columns, num_sampled=5, user_dnn_hidden_units=(64, embedding_dim))
    # 模型編譯
    model.compile(optimizer="adam", loss=sampledsoftmaxloss)  
    
    # 模型訓練,這裏能夠定義驗證集的比例,若是設置爲0的話就是全量數據直接進行訓練
    history = model.fit(train_model_input, train_label, batch_size=256, epochs=1, verbose=1, validation_split=0.0)
    
    # 訓練完模型以後,提取訓練的Embedding,包括user端和item端
    test_user_model_input = test_model_input
    all_item_model_input = {"click_article_id": item_profile['click_article_id'].values}

    user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding)
    item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)
    
    # 保存當前的item_embedding 和 user_embedding 排序的時候可能可以用到,可是須要注意保存的時候須要和原始的id對應
    user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12)
    item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)
    
    # embedding保存以前歸一化一下
    user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True)
    item_embs = item_embs / np.linalg.norm(item_embs, axis=1, keepdims=True)
    
    # 將Embedding轉換成字典的形式方便查詢
    raw_user_id_emb_dict = {user_index_2_rawid[k]: 
                                v for k, v in zip(user_profile['user_id'], user_embs)}
    raw_item_id_emb_dict = {item_index_2_rawid[k]: 
                                v for k, v in zip(item_profile['click_article_id'], item_embs)}
    # 將Embedding保存到本地
    pickle.dump(raw_user_id_emb_dict, open(save_path + 'user_youtube_emb.pkl', 'wb'))
    pickle.dump(raw_item_id_emb_dict, open(save_path + 'item_youtube_emb.pkl', 'wb'))
    
    # faiss緊鄰搜索,經過user_embedding 搜索與其類似性最高的topk個item
    index = faiss.IndexFlatIP(embedding_dim)
    # 上面已經進行了歸一化,這裏能夠不進行歸一化了
#     faiss.normalize_L2(user_embs)
#     faiss.normalize_L2(item_embs)
    index.add(item_embs) # 將item向量構建索引
    sim, idx = index.search(np.ascontiguousarray(user_embs), topk) # 經過user去查詢最類似的topk個item
    
    user_recall_items_dict = collections.defaultdict(dict)
    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(test_user_model_input['user_id'], sim, idx)):
        target_raw_id = user_index_2_rawid[target_idx]
        # 從1開始是爲了去掉商品自己, 因此最終得到的類似商品只有topk-1
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
            rele_raw_id = item_index_2_rawid[rele_idx]
            user_recall_items_dict[target_raw_id][rele_raw_id] = user_recall_items_dict.get(target_raw_id, {})
                                                                    .get(rele_raw_id, 0) + sim_value
            
    user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()}
    # 將召回的結果進行排序
    
    # 保存召回的結果
    # 這裏是直接經過向量的方式獲得了召回結果,相比於上面的召回方法,上面的只是獲得了i2i及u2u的類似性矩陣,還須要進行協同過濾召回才能獲得召回結果
    # 能夠直接對這個召回結果進行評估,爲了方即可以統一寫一個評估函數對全部的召回結果進行評估
    pickle.dump(user_recall_items_dict, open(save_path + 'youtube_u2i_dict.pkl', 'wb'))
    return user_recall_items_dict
# 因爲這裏須要作召回評估,因此講訓練集中的最後一次點擊都提取了出來
if not metric_recall:
    user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(all_click_df, topk=20)
else:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
    user_multi_recall_dict['youtubednn_recall'] = youtubednn_u2i_dict(trn_hist_click_df, topk=20)
    # 召回效果評估
    metrics_recall(user_multi_recall_dict['youtubednn_recall'], trn_last_click_df, topk=20)

itemCF recall

上面已經經過協同過濾,Embedding檢索的方式獲得了文章的類似度矩陣,下面使用協同過濾的思想,給用戶召回與其歷史文章類似的文章。
這裏在召回的時候,也是用了關聯規則的方式:

  1. 考慮類似文章與歷史點擊文章順序的權重(細節看代碼)
  2. 考慮文章建立時間的權重,也就是考慮類似文章與歷史點擊文章建立時間差的權重
  3. 考慮文章內容類似度權重(使用Embedding計算類似文章類似度,可是這裏須要注意,在Embedding的時候並無計算全部商品兩兩之間的類似度,因此類似的文章與歷史點擊文章不存在類似度,須要作特殊處理)
# 基於商品的召回i2i
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim):
    """
        基於文章協同過濾的召回
        :param user_id: 用戶id
        :param user_item_time_dict: 字典, 根據點擊時間獲取用戶的點擊文章序列   {user1: [(item1, time1), (item2, time2)..]...}
        :param i2i_sim: 字典,文章類似性矩陣
        :param sim_item_topk: 整數, 選擇與當前文章最類似的前k篇文章
        :param recall_item_num: 整數, 最後的召回文章數量
        :param item_topk_click: 列表,點擊次數最多的文章列表,用戶召回補全
        :param emb_i2i_sim: 字典基於內容embedding算的文章類似矩陣
        
        return: 召回的文章列表 [(item1, score1), (item2, score2)...]
        
    """
    # 獲取用戶歷史交互的文章
    user_hist_items = user_item_time_dict[user_id]
    user_hist_items_ = {user_id for user_id, _ in user_hist_items }

    item_rank = {}
    for loc, (i, click_time) in enumerate(user_hist_items):
        for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
            if j in user_hist_items_:
                continue
            
            # 文章建立時間差權重
            created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
            # 類似文章和歷史點擊文章序列中歷史文章所在的位置權重
            loc_weight = (0.9 ** (len(user_hist_items) - loc))
            
            content_weight = 1.0
            if emb_i2i_sim.get(i, {}).get(j, None) is not None:
                content_weight += emb_i2i_sim[i][j]
            if emb_i2i_sim.get(j, {}).get(i, None) is not None:
                content_weight += emb_i2i_sim[j][i]
                
            item_rank.setdefault(j, 0)
            item_rank[j] += created_time_weight * loc_weight * content_weight * wij
    
    # 不足10個,用熱門商品補全
    if len(item_rank) < recall_item_num:
        for i, item in enumerate(item_topk_click):
            if item in item_rank.items(): # 填充的item應該不在原來的列表中
                continue
            item_rank[item] = - i - 100 # 隨便給個負數就行
            if len(item_rank) == recall_item_num:
                break
    
    item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
        
    return item_rank

itemCF sim召回

# 先進行itemcf召回, 爲了召回評估,因此提取最後一次點擊

if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)

i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))
emb_i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl', 'rb'))

sim_item_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)

for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, \
                                                        i2i_sim, sim_item_topk, recall_item_num, \
                                                        item_topk_click, item_created_time_dict, emb_i2i_sim)

user_multi_recall_dict['itemcf_sim_itemcf_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['itemcf_sim_itemcf_recall'], open(save_path + 'itemcf_recall_dict.pkl', 'wb'))

if metric_recall:
    # 召回效果評估
    metrics_recall(user_multi_recall_dict['itemcf_sim_itemcf_recall'], trn_last_click_df, topk=recall_item_num)

embedding sim 召回

# 這裏是爲了召回評估,因此提取最後一次點擊
if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))

sim_item_topk = 20
recall_item_num = 10

item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)

for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, 
                                                        recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
    
user_multi_recall_dict['embedding_sim_item_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['embedding_sim_item_recall'], open(save_path + 'embedding_sim_item_recall.pkl', 'wb'))

if metric_recall:
    # 召回效果評估
    metrics_recall(user_multi_recall_dict['embedding_sim_item_recall'], trn_last_click_df, topk=recall_item_num)

userCF召回
基於用戶協同過濾,核心思想是給用戶推薦與其類似的用戶歷史點擊文章,由於這裏涉及到了類似用戶的歷史文章,這裏仍然能夠加上一些關聯規則來給用戶可能點擊的文章進行加權,這裏使用的關聯規則主要是考慮類似用戶的歷史點擊文章與被推薦用戶歷史點擊商品的關係權重,而這裏的關係就能夠直接借鑑基於物品的協同過濾類似的作法,只不過這裏是對被推薦物品關係的一個累加的過程,下面是使用的一些關係權重,及相關的代碼:

  1. 計算被推薦用戶歷史點擊文章與類似用戶歷史點擊文章的類似度,文章建立時間差,相對位置的總和,做爲各自的權重
# 基於用戶的召回 u2u2i
def user_based_recommend(user_id, user_item_time_dict, u2u_sim, sim_user_topk, recall_item_num, 
                         item_topk_click, item_created_time_dict, emb_i2i_sim):
    """
        基於文章協同過濾的召回
        :param user_id: 用戶id
        :param user_item_time_dict: 字典, 根據點擊時間獲取用戶的點擊文章序列   {user1: [(item1, time1), (item2, time2)..]...}
        :param u2u_sim: 字典,文章類似性矩陣
        :param sim_user_topk: 整數, 選擇與當前用戶最類似的前k個用戶
        :param recall_item_num: 整數, 最後的召回文章數量
        :param item_topk_click: 列表,點擊次數最多的文章列表,用戶召回補全
        :param item_created_time_dict: 文章建立時間列表
        :param emb_i2i_sim: 字典基於內容embedding算的文章類似矩陣
        
        return: 召回的文章列表 [(item1, score1), (item2, score2)...]
    """
    # 歷史交互
    user_item_time_list = user_item_time_dict[user_id]    # {item1: time1, item2: time2...}
    user_hist_items = set([i for i, t in user_item_time_list])   # 存在一個用戶與某篇文章的屢次交互, 這裏得去重
    
    items_rank = {}
    for sim_u, wuv in sorted(u2u_sim[user_id].items(), key=lambda x: x[1], reverse=True)[:sim_user_topk]:
        for i, click_time in user_item_time_dict[sim_u]:
            if i in user_hist_items:
                continue
            items_rank.setdefault(i, 0)
            
            loc_weight = 1.0
            content_weight = 1.0
            created_time_weight = 1.0
            
            # 當前文章與該用戶看的歷史文章進行一個權重交互
            for loc, (j, click_time) in enumerate(user_item_time_list):
                # 點擊時的相對位置權重
                loc_weight += 0.9 ** (len(user_item_time_list) - loc)
                # 內容類似性權重
                if emb_i2i_sim.get(i, {}).get(j, None) is not None:
                    content_weight += emb_i2i_sim[i][j]
                if emb_i2i_sim.get(j, {}).get(i, None) is not None:
                    content_weight += emb_i2i_sim[j][i]
                
                # 建立時間差權重
                created_time_weight += np.exp(0.8 * np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
                
            items_rank[i] += loc_weight * content_weight * created_time_weight * wuv
        
    # 熱度補全
    if len(items_rank) < recall_item_num:
        for i, item in enumerate(item_topk_click):
            if item in items_rank.items(): # 填充的item應該不在原來的列表中
                continue
            items_rank[item] = - i - 100 # 隨便給個複數就行
            if len(items_rank) == recall_item_num:
                break
        
    items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]    
    
    return items_rank

userCF sim召回

# 這裏是爲了召回評估,因此提取最後一次點擊
# 因爲usercf中計算user之間的類似度的過程太費內存了,全量數據這裏就沒有跑,跑了一個採樣以後的數據
if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df
    
user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)

u2u_sim = pickle.load(open(save_path + 'usercf_u2u_sim.pkl', 'rb'))

sim_user_topk = 20
recall_item_num = 10
item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)

for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \
                                                        recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)    

pickle.dump(user_recall_items_dict, open(save_path + 'usercf_u2u2i_recall.pkl', 'wb'))

if metric_recall:
    # 召回效果評估
    metrics_recall(user_recall_items_dict, trn_last_click_df, topk=recall_item_num)

user embedding sim召回

雖然沒有直接跑usercf的計算用戶之間的類似度,爲了驗證上述基於用戶的協同過濾的代碼,下面使用了YoutubeDNN過程當中產生的user embedding來進行向量檢索每一個user最類似的topk個user,在使用這裏獲得的u2u的類似性矩陣,使用usercf進行召回,具體代碼以下

# 使用Embedding的方式獲取u2u的類似性矩陣
# topk指的是每一個user, faiss搜索後返回最類似的topk個user
def u2u_embdding_sim(click_df, user_emb_dict, save_path, topk):
    
    user_list = []
    user_emb_list = []
    for user_id, user_emb in user_emb_dict.items():
        user_list.append(user_id)
        user_emb_list.append(user_emb)
        
    user_index_2_rawid_dict = {k: v for k, v in zip(range(len(user_list)), user_list)}    
    
    user_emb_np = np.array(user_emb_list, dtype=np.float32)
    
    # 創建faiss索引
    user_index = faiss.IndexFlatIP(user_emb_np.shape[1])
    user_index.add(user_emb_np)
    # 類似度查詢,給每一個索引位置上的向量返回topk個item以及類似度
    sim, idx = user_index.search(user_emb_np, topk) # 返回的是列表
   
    # 將向量檢索的結果保存成原始id的對應關係
    user_sim_dict = collections.defaultdict(dict)
    for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(user_emb_np)), sim, idx)):
        target_raw_id = user_index_2_rawid_dict[target_idx]
        # 從1開始是爲了去掉商品自己, 因此最終得到的類似商品只有topk-1
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]): 
            rele_raw_id = user_index_2_rawid_dict[rele_idx]
            user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
    
    # 保存i2i類似度矩陣
    pickle.dump(user_sim_dict, open(save_path + 'youtube_u2u_sim.pkl', 'wb'))   
    return user_sim_dict
# 讀取YoutubeDNN過程當中產生的user embedding, 而後使用faiss計算用戶之間的類似度
# 這裏須要注意,這裏獲得的user embedding其實並非很好,由於YoutubeDNN中使用的是用戶點擊序列來訓練的user embedding,
# 若是序列廣泛都比較短的話,其實效果並非很好
user_emb_dict = pickle.load(open(save_path + 'user_youtube_emb.pkl', 'rb'))
u2u_sim = u2u_embdding_sim(all_click_df, user_emb_dict, save_path, topk=10)

經過YoutubeDNN獲得的user_embedding

# 使用召回評估函數驗證當前召回方式的效果
if metric_recall:
    trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
else:
    trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
u2u_sim = pickle.load(open(save_path + 'youtube_u2u_sim.pkl', 'rb'))

sim_user_topk = 20
recall_item_num = 10

item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \
                                                        recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim)
    
user_multi_recall_dict['youtubednn_usercf_recall'] = user_recall_items_dict
pickle.dump(user_multi_recall_dict['youtubednn_usercf_recall'], open(save_path + 'youtubednn_usercf_recall.pkl', 'wb'))

if metric_recall:
    # 召回效果評估
    metrics_recall(user_multi_recall_dict['youtubednn_usercf_recall'], trn_last_click_df, topk=recall_item_num)

冷啓動問題

冷啓動問題能夠分紅三類:文章冷啓動,用戶冷啓動,系統冷啓動。

  • 文章冷啓動:對於一個平臺系統新加入的文章,該文章沒有任何的交互記錄,如何推薦給用戶的問題。(對於咱們場景能夠認爲是,日誌數據中沒有出現過的文章均可以認爲是冷啓動的文章)
  • 用戶冷啓動:對於一個平臺系統新來的用戶,該用戶尚未文章的交互信息,如何給該用戶進行推薦。(對於咱們場景就是,測試集中的用戶是否在測試集對應的log數據中出現過,若是沒有出現過,那麼能夠認爲該用戶是冷啓動用戶。可是有時候並無這麼嚴格,咱們也能夠本身設定某些指標來判別哪些用戶是冷啓動用戶,好比經過使用時長,點擊率,留存率等等)
  • 系統冷啓動:就是對於一個平臺剛上線,尚未任何的相關歷史數據,此時就是系統冷啓動,其實也就是前面兩種的一個綜合。

當前場景下冷啓動問題的分析:

對當前的數據進行分析會發現,日誌中全部出現過的點擊文章只有3w多個,而整個文章庫中卻有30多萬,那麼測試集中的用戶最後一次點擊是否會點擊沒有出如今日誌中的文章呢?若是存在這種狀況,說明用戶點擊的文章以前沒有任何的交互信息,這也就是咱們所說的文章冷啓動。經過數據分析還能夠發現,測試集用戶只有一次點擊的數據佔得比例還很多,其實僅僅經過用戶的一次點擊就給用戶推薦文章使用模型的方式也是比較難的,這裏其實也能夠考慮用戶冷啓動的問題,可是這裏只給出物品冷啓動的一些解決方案及代碼,關於用戶冷啓動的話提一些可行性的作法。

  1. 文章冷啓動(沒有冷啓動的探索問題)
    其實咱們這裏不是爲了作文章的冷啓動而作冷啓動,而是猜想用戶可能會點擊一些沒有在log數據中出現的文章,咱們要作的就是如何從將近27萬的文章中選擇一些文章做爲用戶冷啓動的文章,這裏其實也能夠當作是一種召回策略,咱們這裏就採用簡單的比較好理解的基於規則的召回策略來獲取用戶可能點擊的未出如今log數據中的文章。
    如今的問題變成了:如何給每一個用戶考慮從27萬個商品中獲取一小部分商品?隨機選一些多是一種方案。下面給出一些參考的方案。

    1. 首先基於Embedding召回一部分與用戶歷史類似的文章
    2. 從基於Embedding召回的文章中經過一些規則過濾掉一些文章,使得留下的文章用戶更可能點擊。咱們這裏的規則,能夠是,留下那些與用戶歷史點擊文章主題相同的文章,或者字數相差不大的文章。而且留下的文章儘可能是與測試集用戶最後一次點擊時間更接近的文章,或者是當天的文章也行。
  2. 用戶冷啓動
    這裏對測試集中的用戶點擊數據進行分析會發現,測試集中有百分之20的用戶只有一次點擊,那麼這些點擊特別少的用戶的召回是否是能夠單獨作一些策略上的補充呢?或者是在排序後直接基於規則加上一些文章呢?這些均可以去嘗試,這裏沒有提供具體的作法。

注意:

這裏看似和基於embedding計算的item之間類似度而後作itemcf是一致的,可是如今咱們的目的不同,咱們這裏的目的是找到類似的向量,而且尚未出如今log日誌中的商品,再加上一些其餘的冷啓動的策略,這裏須要找回的數量會偏多一點,否則被篩選完以後可能都沒有文章了

# 先進行itemcf召回,這裏不須要作召回評估,這裏只是一種策略
trn_hist_click_df = all_click_df

user_recall_items_dict = collections.defaultdict(dict)
user_item_time_dict = get_user_item_time(trn_hist_click_df)
i2i_sim = pickle.load(open(save_path + 'emb_i2i_sim.pkl','rb'))

sim_item_topk = 150
recall_item_num = 100 # 稍微召回多一點文章,便於後續的規則篩選

item_topk_click = get_item_topk_click(trn_hist_click_df, k=50)
for user in tqdm(trn_hist_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, 
                                                        recall_item_num, item_topk_click,item_created_time_dict, emb_i2i_sim)
pickle.dump(user_recall_items_dict, open(save_path + 'cold_start_items_raw_dict.pkl', 'wb'))
# 基於規則進行文章過濾
# 保留文章主題與用戶歷史瀏覽主題類似的文章
# 保留文章字數與用戶歷史瀏覽文章字數相差不大的文章
# 保留最後一次點擊當天的文章
# 按照類似度返回最終的結果

def get_click_article_ids_set(all_click_df):
    return set(all_click_df.click_article_id.values)

def cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \
                     user_last_item_created_time_dict, item_type_dict, item_words_dict, 
                     item_created_time_dict, click_article_ids_set, recall_item_num):
    """
        冷啓動的狀況下召回一些文章
        :param user_recall_items_dict: 基於內容embedding類似性召回來的不少文章, 字典, {user1: [item1, item2, ..], }
        :param user_hist_item_typs_dict: 字典, 用戶點擊的文章的主題映射
        :param user_hist_item_words_dict: 字典, 用戶點擊的歷史文章的字數映射
        :param user_last_item_created_time_idct: 字典,用戶點擊的歷史文章建立時間映射
        :param item_tpye_idct: 字典,文章主題映射
        :param item_words_dict: 字典,文章字數映射
        :param item_created_time_dict: 字典, 文章建立時間映射
        :param click_article_ids_set: 集合,用戶點擊過得文章, 也就是日誌裏面出現過的文章
        :param recall_item_num: 召回文章的數量, 這個指的是沒有出如今日誌裏面的文章數量
    """
    
    cold_start_user_items_dict = {}
    for user, item_list in tqdm(user_recall_items_dict.items()):
        cold_start_user_items_dict.setdefault(user, [])
        for item, score in item_list:
            # 獲取歷史文章信息
            hist_item_type_set = user_hist_item_typs_dict[user]
            hist_mean_words = user_hist_item_words_dict[user]
            hist_last_item_created_time = user_last_item_created_time_dict[user]
            hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time)
            
            # 獲取當前召回文章的信息
            curr_item_type = item_type_dict[item]
            curr_item_words = item_words_dict[item]
            curr_item_created_time = item_created_time_dict[item]
            curr_item_created_time = datetime.fromtimestamp(curr_item_created_time)

            # 首先,文章不能出如今用戶的歷史點擊中, 而後根據文章主題,文章單詞數,文章建立時間進行篩選
            if curr_item_type not in hist_item_type_set or \
                item in click_article_ids_set or \
                abs(curr_item_words - hist_mean_words) > 200 or \
                abs((curr_item_created_time - hist_last_item_created_time).days) > 90: 
                continue
                
            cold_start_user_items_dict[user].append((item, score))      # {user1: [(item1, score1), (item2, score2)..]...}
    
    # 須要控制一下冷啓動召回的數量
    cold_start_user_items_dict = {k: sorted(v, key=lambda x:x[1], reverse=True)[:recall_item_num] \
                                  for k, v in cold_start_user_items_dict.items()}
    
    pickle.dump(cold_start_user_items_dict, open(save_path + 'cold_start_user_items_dict.pkl', 'wb'))
    
    return cold_start_user_items_dict
all_click_df_ = all_click_df.copy()
all_click_df_ = all_click_df_.merge(item_info_df, how='left', on='click_article_id')
user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(all_click_df_)
click_article_ids_set = get_click_article_ids_set(all_click_df)
# 須要注意的是
# 這裏使用了不少規則來篩選冷啓動的文章,因此前面再召回的階段就應該儘量的多召回一些文章,不然很容易被刪掉
cold_start_user_items_dict = cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, 
                                              user_last_item_created_time_dict, item_type_dict, item_words_dict, 
                                              item_created_time_dict, click_article_ids_set, recall_item_num)

user_multi_recall_dict['cold_start_recall'] = cold_start_user_items_dict

多路召回合併

多路召回合併就是將前面全部的召回策略獲得的用戶文章列表合併起來,下面是對前面全部召回結果的彙總

  1. 基於itemcf計算的item之間的類似度sim進行的召回
  2. 基於embedding搜索獲得的item之間的類似度進行的召回
  3. YoutubeDNN召回
  4. YoutubeDNN獲得的user之間的類似度進行的召回
  5. 基於冷啓動策略的召回

注意:
在作召回評估的時候就會發現有些召回的效果不錯有些召回的效果不好,因此對每一路召回的結果,咱們能夠認爲的定義一些權重,來作最終的類似度融合

def combine_recall_results(user_multi_recall_dict, weight_dict=None, topk=25):
    final_recall_items_dict = {}
    
    # 對每一種召回結果按照用戶進行歸一化,方便後面多種召回結果,相同用戶的物品之間權重相加
    def norm_user_recall_items_sim(sorted_item_list):
        # 若是冷啓動中沒有文章或者只有一篇文章,直接返回,出現這種狀況的緣由多是冷啓動召回的文章數量太少了,
        # 基於規則篩選以後就沒有文章了, 這裏還能夠作一些其餘的策略性的篩選
        if len(sorted_item_list) < 2:
            return sorted_item_list
        
        min_sim = sorted_item_list[-1][1]
        max_sim = sorted_item_list[0][1]
        
        norm_sorted_item_list = []
        for item, score in sorted_item_list:
            if max_sim > 0:
                norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0
            else:
                norm_score = 0.0
            norm_sorted_item_list.append((item, norm_score))
            
        return norm_sorted_item_list
    
    print('多路召回合併...')
    for method, user_recall_items in tqdm(user_multi_recall_dict.items()):
        print(method + '...')
        # 在計算最終召回結果的時候,也能夠爲每一種召回結果設置一個權重
        if weight_dict == None:
            recall_method_weight = 1
        else:
            recall_method_weight = weight_dict[method]
        
        for user_id, sorted_item_list in user_recall_items.items(): # 進行歸一化
            user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list)
        
        for user_id, sorted_item_list in user_recall_items.items():
            # print('user_id')
            final_recall_items_dict.setdefault(user_id, {})
            for item, score in sorted_item_list:
                final_recall_items_dict[user_id].setdefault(item, 0)
                final_recall_items_dict[user_id][item] += recall_method_weight * score  
    
    final_recall_items_dict_rank = {}
    # 多路召回時也能夠控制最終的召回數量
    for user, recall_item_dict in final_recall_items_dict.items():
        final_recall_items_dict_rank[user] = sorted(recall_item_dict.items(), key=lambda x: x[1], reverse=True)[:topk]

    # 將多路召回後的最終結果字典保存到本地
    pickle.dump(final_recall_items_dict, open(os.path.join(save_path, 'final_recall_items_dict.pkl'),'wb'))

    return final_recall_items_dict_rank
# 這裏直接對多路召回的權重給了一個相同的值,其實能夠根據前面召回的狀況來調整參數的值
weight_dict = {'itemcf_sim_itemcf_recall': 1.0,
               'embedding_sim_item_recall': 1.0,
               'youtubednn_recall': 1.0,
               'youtubednn_usercf_recall': 1.0, 
               'cold_start_recall': 1.0}
               
               
# 最終合併以後每一個用戶召回150個商品進行排序
final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=150)

總結

上述實現了以下召回策略:

  1. 基於關聯規則的itemcf
  2. 基於關聯規則的usercf
  3. youtubednn召回
  4. 冷啓動召回

對於上述實現的召回策略其實都不是最優的結果,咱們只是作了個簡單的嘗試,其中還有不少地方能夠優化,包括已經實現的這些召回策略的參數或者新加一些,修改一些關聯規則均可以。固然還能夠嘗試更多的召回策略,好比對新聞進行熱度召回等等。

Reference

  1. 重讀Youtube深度學習推薦系統論文,字字珠璣,驚爲神文
  2. YouTube深度學習推薦系統的十大工程問題
  3. YouTubeDNN原理
  4. Word2Vec知乎衆贊文章 — word2vec放到排序中的w2v的介紹部分
相關文章
相關標籤/搜索