歡迎你們前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~python
1、數據源nginx
一、類似人羣數據存在TDW庫中,數據字典說明:算法
CREATE TABLE sim_people_tdw_tbl( uid STRING COMMENT 'reader id', sim_uids STRING COMMENT 'sim_uids', sim_num BIGINT COMMENT 'sim_num', update_date STRING COMMENT 'update_date' )
字段 | 類型 | 含義 |
---|---|---|
uid | string | 用戶標識 |
sim_uids | string | 與uid喜愛類似的人羣,格式爲用戶編號:相同閱讀量,類似用戶之間以逗號分隔 |
sim_num | BIGINT | 類似人羣的人數 |
update_date | string | 數據日期 |
二、基礎用戶畫像存在MongoDB中mongodb
基礎用戶畫像數據庫
字段 | 含義 |
---|---|
_id | 用戶id |
profile(離線)positive(實時) | 用戶正畫像(喜歡),每一個維度以分號間隔,每一個子維度以逗號間隔,值格式爲key_id:weight,維度含義依次爲一級分類、二級分類、關鍵字、topic、閱讀來源 |
negative | 負畫像(不喜歡),其餘字段的含義與正畫像同樣 |
update_time | 更新時間 |
cityCode或city | 城市編碼 |
三、類似人羣畫像也存在MongoDB中微信
2、總體思路app
因爲TESLA集羣沒法直接操做MongoDB,須要將TDW裏面的用戶畫像數據,經過洛子系統導出至HDFS,再與MongoDB中原有羣畫像進行合併。python2.7
總體流程ui
3、算法流程
算法流程圖
4、核心代碼
#! /usr/bin/python2.7 # -*- coding: utf8 -*- import decimal import time import math import sys import os import param_map from pymongo.collection import Collection from decimal import Decimal import datetime reload(sys) sys.setdefaultencoding("utf-8") sys.path.append("../") from utils import mongoUtils, confUtils decimal.getcontext().prec = 6 BATCH_NUM = 100000 now_time = datetime.datetime.now() delta = datetime.timedelta(days=30) delta30 = now_time - delta time_limit = int(time.mktime(delta30.timetuple())) print(time_limit) def split_uid_similarity(uid_num_str): """ 拆分uid和類似度,並分別返回 :param uid_num_str: :return:uid,類似度 """ uid_num = uid_num_str.split(":") return uid_num[0], float(uid_num[1]) def split_uid_sim_user(user_hd): """ 拆分uid和類似人羣,並分別返回 :param user_hd: :return: uid,類似人羣 """ uid_sim_user = user_hd.strip().split("\t") return uid_sim_user[0], uid_sim_user[1] def dimension_profile_limit(dimension_profile, min_i, max_i, limit, cluster_profile_str): """ :param dimension_profile: :param min_i: :param max_i: :param limit: :param cluster_profile_str: :return: 返回前limit個特徵標籤,並對特徵權重進行映射 """ if len(dimension_profile) != 0: # 先排序 dimension_profile = sorted(dimension_profile.iteritems(), key=lambda c: c[1], reverse=True) # 再對前limit條記錄進行映射 size = limit if len(dimension_profile) > limit else len(dimension_profile) for i in range(size): tag = dimension_profile[i] tag_id = tag[0] tag_value = tag[1] tag_value = max_i if tag_value > max_i else tag_value if tag_value >= min_i: cluster_profile_str = cluster_profile_str + str(tag_id) + ":" + str(tag_value) + "," if len(dimension_profile) != 0: # 假如長度不爲0,將最後一個逗號刪掉 cluster_profile_str = cluster_profile_str[:-1] return cluster_profile_str def cluster_profile_dic2list(cluster_profile, dimension_param_dic): """ 類似用戶羣畫像閾值過濾,dic->list :param dimension_param_dic: 維度閾值 :return: 類似用戶羣特徵list :param cluster_profile:羣體畫像 """ cluster_profile_str = "" if len(cluster_profile) == 0: return None for key, dimension_profile in cluster_profile.items(): # 取出維度的閾值 dimention_param = dimension_param_dic.get(str(key)) if dimention_param is not None: min_i = dimention_param.get("min") max_i = dimention_param.get("max") limit = dimention_param.get("limit") if dimension_profile is not None: cluster_profile_str = dimension_profile_limit(dimension_profile, min_i, max_i, limit, cluster_profile_str) # values爲不爲None 都須要追加一個分號 cluster_profile_str = cluster_profile_str + ";" cluster_profile_list = cluster_profile_str[:-1].split(";") return cluster_profile_list def sim_users_dic2list(cluster_dic, sim_users_max_size): """ # 類似人羣數量限制,dic->list :param sim_users_max_size: 類似人羣的最大值 :type cluster_dic: 字典表 :param cluster_dic:類似人羣字典表 :return: 類似度最高的類似人羣 """ user_similarity_list = sorted(cluster_dic.iteritems(), key=lambda b: b[1], reverse=True) sim_users_s = "" i = 0 new_cluster_dic = {} for i in range(len(user_similarity_list)): if i < sim_users_max_size: user_similarity = user_similarity_list[i] key = user_similarity[0] value = user_similarity[1] new_cluster_dic[key] = value sim_users_s = sim_users_s + key + ":" + str(value) + "," else: break i = i + 1 sim_users_list = sim_users_s[:-1].split(",") return sim_users_list, new_cluster_dic class ClusterProfileComputer(object): cf = confUtils.getConfig("../conf/setting.conf") def __init__(self, environment): self.xw_database, self.xw_client = mongoUtils.getMongodb("XW") self.pac_database, self.pac_client = mongoUtils.getMongodb("PAC") self.om_database, self.pac_client = mongoUtils.getMongodb("OM") item = "LOCAL_SIM_USERS_PATH" if environment == "local" else "SIM_USERS_PATH" self.sim_users_path = confUtils.getFilePath(self.cf, "SIM_USERS", item) self.decay_factor = param_map.SIM_USERS_PARAM.get("decay_factor") self.sim_users_max_size = param_map.SIM_USERS_PARAM.get("sim_users_max_size") self.similarity_low = param_map.SIM_USERS_PARAM.get("similarity_low") self.similarity_high = param_map.SIM_USERS_PARAM.get("similarity_high") @staticmethod def basic_cursor2dic(platform, mongodb_cursor): """ mongodb取出的基礎畫像存到字典表 :param platform: 平臺 :param mongodb_cursor: :return: """ users_profile_map = {} for user_profile in mongodb_cursor: _uid = user_profile["name"] if platform == "PAC" else user_profile["_id"] users_profile_map[_uid] = user_profile return users_profile_map @staticmethod def get_sim_users_profile(all_users_profile, users_similarity): """ :param all_users_profile: :param users_similarity: :return:類似人羣的畫像 """ rs = [] for uid_similarity in users_similarity: uid, similarity = split_uid_similarity(uid_similarity) profile = all_users_profile.get(uid) if profile is not None: rs.append(profile) return rs def dump_basic_profile(self, all_uid, batch_num, platform, profile_collection): # type: (list, int) -> dict """ :return: 平臺基礎畫像 :param platform: 平臺 :return: 基礎畫像字典表 :param profile_collection: 數據庫集合 :param all_uid:用戶的編號列表 :type batch_num: int """ rs = {} # 數據庫查詢全部人羣用戶畫像,此畫像中沒有類似人羣 for x in xrange(0, int(math.ceil(len(all_uid) / float(batch_num)))): key = "name" if platform == "PAC" else "_id" cursor = profile_collection.find({"$and": [{key: {'$in': all_uid[x * batch_num:(x + 1) * batch_num]}}, {"update_time": {"$gt": time_limit}}]}, no_cursor_timeout=True) rs.update(self.basic_cursor2dic(platform, cursor)) cursor.close() return rs def compute_single_file(self, path, xw_profile_collection, pac_profile_collection, om_profile_collection): users = open(path) all_uid_list = [] uid_sim_map = {} # uid_sim_map["1_291083852"] = ["1_757155427:8"] for user_str in users: # 從hdfs中取出udi的類似人羣 uid_hf, sim_users_hd = split_uid_sim_user(user_str) uid_sim_map[uid_hf] = sim_users_hd.split(",") all_uid_list.append(uid_hf) print("uid_sim_map : %d" % len(uid_sim_map)) # 數據庫查詢全部用戶的基礎畫像,此畫像中沒有類似人羣 platform_basic_profile_dic = {} xw_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "XW", xw_profile_collection) platform_basic_profile_dic["XW"] = xw_users_basic_profile_map pac_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "PAC", pac_profile_collection) platform_basic_profile_dic["PAC"] = pac_users_basic_profile_map om_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "OM", om_profile_collection) platform_basic_profile_dic["OM"] = om_users_basic_profile_map # print("dump basic profile %d records" % len(pac_all_users_profile_map)) # 數據庫查詢類似人羣畫像 cluster_profile_collection = self.xw_database.get_collection( param_map.MONGODB_CLUSTER_PROFILE_MAP["Cluster"]) # type: Collection old_cluster_profile_map = dump_cluster_profile_history(self, all_uid_list, cluster_profile_collection, BATCH_NUM) print("dump cluster profile %d records" % len(old_cluster_profile_map)) #index = 0 for uid, sim_users_list in uid_sim_map.items(): print ("uid = %s" % uid) # 合併新老類似人羣,並使用衰減因子來計算類似度 users_similarity_dic = merge_sim_users(uid, sim_users_list, self.decay_factor, self.similarity_low, self.similarity_high, old_cluster_profile_map) # 類似人羣---->將字典錶轉化爲list,存儲mongodb sim_users_list, users_similarity_dic = sim_users_dic2list(users_similarity_dic, self.sim_users_max_size) print("similar people len: %d" % len(sim_users_list)) platform_cluster_profile_list = [] for platform_name, platform_basic_profile in platform_basic_profile_dic.items(): # 取出用戶i類似人羣的畫像 sim_users_profile_list = self.get_sim_users_profile(platform_basic_profile, sim_users_list) cluster_profile_dic = cluster_profile_compute(platform_name, sim_users_profile_list, users_similarity_dic) # 結果區間映射,類似人羣畫像特徵----->字典錶轉list,便於存儲mongodb cluster_profile_list = cluster_profile_dic2list(cluster_profile_dic, param_map.DIMENSION_PARAM) platform_cluster_profile_list.append(cluster_profile_list) xw_cluster_profile = platform_cluster_profile_list[0] pac_cluster_profile = platform_cluster_profile_list[1] om_cluster_profile = platform_cluster_profile_list[2] old_profile = cluster_profile_collection.find_one({"_id": uid}) if old_profile is None: create_time = int(time.time()) else: create_time = old_profile["create_time"] document = ({"_id": uid, "sim_users": sim_users_list, "xw_cluster_profile": xw_cluster_profile, "pac_cluster_profile": pac_cluster_profile, "om_cluster_profile": om_cluster_profile, "create_time": create_time, "update_time": int(time.time())}) cluster_profile_collection.save(document) #if index >= 100: # break #index = index + 1 print("end") users.close() def run(self): # 類似人羣HDFS xw_profile_collection = self.xw_database.get_collection(param_map.MONGODB_PROFILE_MAP["XW"]) pac_profile_collection = self.pac_database.get_collection(param_map.MONGODB_PROFILE_MAP["PAC"]) om_profile_collection = self.om_database.get_collection(param_map.MONGODB_PROFILE_MAP["OM"]) for dir_path, dir_names, file_names in os.walk(self.sim_users_path): print(dir_names) for file_name in file_names: if "attempt_" in file_name: print(file_name) path = os.path.join(dir_path, file_name) self.compute_single_file(path, xw_profile_collection, pac_profile_collection, om_profile_collection) def accumulate_dimension_profile(cluster_dimension_feature, user_dimension, ratio): """ 將user指定維度的特徵累加到羣畫像 :param cluster_dimension_feature:羣畫像某個維度的特徵 :param user_dimension:用戶某個維度的特徵 :param ratio:user的權重,公式爲類似度/(類似度+10),區間爲(1/3,10/11) :return:指定維度的羣畫像 """ if user_dimension != "": user_feature_list = user_dimension.split(",") for feature in user_feature_list: atom = feature.split(":") if len(atom) == 2: k = atom[0] w = atom[1] if cluster_dimension_feature.get(k) is None: cluster_dimension_feature[k] = Decimal(w) * ratio else: cluster_dimension_feature[k] = Decimal(w) * ratio + Decimal(cluster_dimension_feature[k]) return cluster_dimension_feature def dump_cluster_profile_history(self, all_uid, collection, batch_num): rs = {} for x in xrange(0, int(math.ceil(len(all_uid) / float(batch_num)))): cursor = collection.find({'_id': {'$in': all_uid[x * batch_num:(x + 1) * batch_num]}}, no_cursor_timeout=True) rs.update(cluster_cursor2dic(cursor)) cursor.close() return rs def cluster_cursor2dic(mongodb_cursor): """ mongodb取出的人羣畫像存到字典表 :param mongodb_cursor: :return: """ users_profile_map = {} for user_profile in mongodb_cursor: _uid = user_profile["_id"] users_profile_map[_uid] = user_profile return users_profile_map def merge_sim_users(uid_hdf, sim_users_new, decay_factor, similarity_low, similarity_high, old_cluster_profile_dic): """ 合併類似人羣 :param similarity_low: 類似度最低值 :param similarity_high: 類似度最高值 :param uid_hdf: 用戶編號 :param sim_users_new: 最新的類似用戶 :param decay_factor: 衰減因子 :param old_cluster_profile_dic:老羣體畫像 :return:最新的類似人羣 """ cluster_union_dic = {} # 提取uid和類似度到字典表 for user_similarity in sim_users_new: _uid, similarity = split_uid_similarity(user_similarity) cluster_union_dic[_uid] = similarity # 從mongodb中讀取老畫像 old = old_cluster_profile_dic.get(uid_hdf) if old is not None: sim_users_old = old['sim_users'] for uid_similarity_old in sim_users_old: uid_similarity_old_list = uid_similarity_old.split(":") if len(uid_similarity_old_list) == 2: sim_uid_old = uid_similarity_old_list[0] try: weight_old = float(uid_similarity_old_list[1]) * float(decay_factor) except IndexError: pass else: if (cluster_union_dic.get(sim_uid_old) is None) and (weight_old >= similarity_low): cluster_union_dic[sim_uid_old] = weight_old else: weight_new = weight_old + cluster_union_dic[sim_uid_old] if weight_new > similarity_high: weight_new = similarity_high if weight_new < similarity_low: del cluster_union_dic[sim_uid_old] else: cluster_union_dic[sim_uid_old] = weight_new return cluster_union_dic def cluster_profile_compute(platform, sim_users_profile_array, sim_users_dic): # type: (String, list, dic) -> dic """ 類似人羣特徵計算 :param platform:平臺 :param sim_users_profile_array: 從mongodb中查出來的類似人羣的畫像 :param sim_users_dic: 類似人羣的類似度字典表 :return: 類似人羣畫像字典表 """ cluster_profile_rs = {} for sim_user_obj in sim_users_profile_array: key = "name" if platform == "PAC" else "_id" sim_user_id = sim_user_obj.get(key) # 獲取兩兩用戶的類似度 similarity = sim_users_dic.get(sim_user_id) if similarity is not None: sim_num = Decimal(similarity) # 用戶對應的權重 rate = Decimal(sim_num / (10 + sim_num)) # 取出某一我的的畫像 profile = sim_user_obj.get("profile") if sim_user_obj.get("profile") is not None else "" dimension_list = profile.split(";") i = 0 for u_dimension in dimension_list: # 獲取羣體維度i的特徵 dimension_feature = cluster_profile_rs.get(i) if dimension_feature is None: dimension_feature = {} # 更新維度i的特徵 cluster_profile_rs[i] = accumulate_dimension_profile(dimension_feature, u_dimension, rate) i = i + 1 return cluster_profile_rs if __name__ == "__main__": if len(sys.argv) == 2: env = sys.argv[1] else: env = "local" computer = ClusterProfileComputer(env) computer.run()
問答
相關閱讀
此文已由做者受權騰訊雲+社區發佈,原文連接:https://cloud.tencent.com/developer/article/1159230?fromSource=waitui
歡迎你們前往騰訊雲+社區或關注雲加社區微信公衆號(QcloudCommunity),第一時間獲取更多海量技術實踐乾貨哦~
海量技術實踐經驗,盡在雲加社區!