#研發解決方案介紹#Recsys-Evaluate(推薦評測)

關鍵詞: recsys 、推薦評測、Evaluation of Recommender System、piwik、flume、kafka、storm、redis、mysql
本文檔適用人員:研發
推薦系統可不只僅是圍着推薦算法打轉
先明確一下,咱們屬於工業領域。不少在學術論文裏行之有效的新特奇算法,在工業界是行不通的。當年咱們作語義聚合時,分詞、聚類、類似性計算、實體詞識別、情感分析等領域最終還都採用了工業界十幾年前乃至於幾十年前就流行的成熟算法。若是算法不能決定命運,那什麼是關鍵呢?
算法+規則庫+人工干預(整理語料、標識、調參數等),大都是髒活兒累活兒。
或者叫, 特徵+算法+人工干預,用 特徵縮小數據範圍或降維。
我在2009年曾經 寫道
在語義的世界裏,能夠近似地說:萬事萬物都是特徵提取。你只要找到特徵,事情就好辦。……
……你指望畢其功於一役嗎?天然語言處理的真實應用裏是很難有什麼場景找到一個通吃特徵的。都是一層一層特徵疊加的。
一層特徵去掉一部分垃圾數據。如此反覆,終成正果。注意方法論。
梁斌在2012年微博說道:
統計粗且糙,乃大錘。規則細而精,乃小錘。先大場後細棋。
規則庫怎麼來的? 得建設一些方便觀測的外圍系統,才能發現特徵、創建規則、調整參數、觀察效果。因此與此相似,作了推薦服務後,就須要推薦效果評測了。
推薦評測應用場景
電商推薦場景下有很是明確的指標:
  1. 推薦位展現次數商品投放次數
  2. 推薦位展現點擊率商品投放點擊率
  3. 最重要的是下單轉化率成單轉化率(或叫支付轉化率)這兩個硬指標。
那麼推薦評測系統應具有的功能有:
  1. 實時(至少是近乎實時)統計幾個展現性指標
    • 區分網站端和移動客戶端的推薦展現效果
    • 進一步區分不一樣客戶端,如 iOS 和 Android
  2. 數據概覽
  3. 按推薦位類型或推薦算法概括各類指標
    • 看了又看
    • 瀏覽過該商品的用戶購買了
    • 您可能對如下商品感興趣(猜你喜歡)
    • 商品周邊商品(注:只能是本地生活服務類商品)
    • 簽到彈窗推薦
    • 商品附近門店
    • 門店周邊美食
    • 附近吃喝玩樂
    • ……
  4. 常見的評測推薦效果的兩種實驗方法
    1. 離線試驗:
      • 作法:從日誌系統中取得用戶的行爲數據,而後將數據集分紅訓練數據和測試數據,好比80%的訓練數據和20%的測試數據(還能夠交叉驗證),而後在訓練數據集上訓練用戶的興趣模型,在測試集上進行測試
      • 優勢:它不須要實際用戶的交互
      • 缺點:離線實驗只能評測一個很狹窄的數據集切面,主要是關於算法預測或者評估的準確性
      • 目的:提早過濾掉性能較差的算法
    2. AB測試:
      • 作法:經過必定的規則把用戶隨機分紅幾組,並對不一樣組的用戶採用不一樣的推薦算法,這樣的話可以比較公平地得到不一樣算法在實際在線時的一些性能指標
  5. 推薦服務接口測試界面
    • 暴露出來,讓咱們手工就能夠提交,看看效果
推薦評測技術選型
說到實時日誌聚合和處理,還得是 flume+kafka+storm,因此技術選型是:
Piwik+Flume+Kafka+Storm+Redis+MySQL
推薦評測數據流轉流程
  1. 數據上報:——Piwik
    1. 主站自己部署了 開源流量統計系統 Piwik,因此在網頁的各類推薦位上按規則埋點便可
      • 實例:「瀏覽過該商品的用戶還購買了」推薦欄第一位商品的a元素增長了wwe屬性:wwe="t:goods,w:rec,id:ae45c145d1045c9d51c270c066018685,rec:101_01_103"
    2. 瀏覽器徹底加載完成後,Piwik JavaScript 會向服務器端發送埋點數據
    3. Piwik 服務器端收到後,寫磁盤日誌文件
  2. 數據採集:——Flume
    1. Piwik 集羣的每一臺服務器上都部署了Flume Agent
    2. agent 會向推薦數據收集 Flume 集羣 推送日誌,譬如配置爲每增長一行日誌就推送,或每5分鐘推送一次
    3. 手機客戶端的埋點日誌則存放在無線服務器端的 MySQL 中,因此咱們用腳本每分鐘讀取一次數據,放到 flume 的監控目錄下
  3. 數據接入:——Kafka
    1. 因爲數據採集速度和數據處理速度不必定匹配,所以添加一個消息中間件Linkedin Kafka 做爲緩衝
    2. 數據流轉方式爲 Flume Source--<Flume Channel--<Flume Sink,那麼咱們寫一個Kafka Sink 做爲消息生產者,將 sink 從 channel 裏接收到的日誌數據發送給消息消費者
  4. 流式計算:——Storm
    1. Storm 負責對採集到的數據進行實時計算
    2. Storm Spout 負責從外部系統不間斷地讀取數據,並組裝成 tuple 發射出去,tuple 被髮射後在 Topology 中傳播
    3. 因此咱們要寫一個Kafka Spout 做爲消息消費者拉日誌數據
    4. 再寫些 Storm Bolt 處理數據
      • ,一個Topology的結構示意圖
  5. 數據輸出:——Redis
    1. Storm Bolt 實時分析數據以後,將統計結果寫入 Redis
  6. 數據統計:——MySQL
    1. 評測系統實時數據直接從 Redis 中讀取,並查詢主站數據庫追蹤成單狀況,同步到 MySQL 中,做爲報表展現數據源
簡而言之,數據按以下方式流轉:
  1. piwik javascript
  2. piwik servers
  3. flume agent
  4. 自定義 kafka sink
  5. 自定義 kafka spout
  6. 自定義 storm bolt
  7. redis
  8. 評測系統計算
  9. mysql
  10. 評測系統報表展現
Flume+Kafka+Storm常見問題
雖然咱們的實時流量統計和推薦評測系統均採用了 flume+kafka+storm 方案,但要注意這個方案也有一些小坑。下面摘錄一些 第三方的結論
  • 若是配置爲每新增一條日誌就採集,那麼 flume 到 kafka 的實時數據可能會因爲單條過快,形成 storm spout 消費 kafka 消息速率跟不上。延時能夠是數據發射到 stream 中後進行 hbase 的計算操做引發的(注:hbase 的性能確實堪憂,不適合這種實時數據處理,尤爲是加了較多索引以後);
    • 可參考的一個數據:storm 單條流水線的處理能力大約爲 20000 tupe/s (每一個tuple大小爲1000字節);
    • tuple 過多,會因爲 kafka 的 message 須要 new String() 進行獲取,會報 gc 的異常;
    • tuple 在 stream 中的大量堆積,形成超時自動回調 fail() 的函數;
    • 能夠進行多 tuple 結構的優化,把多個 log 打包成一個 tuple
    • 就通常狀況而言,單條發射能扛得住
Kafka Sink 消息生產者代碼片斷
KafkaSink.java
import kafka.javaapi.producer. Producer;
……
public class KafkaSink extendsAbstractSink implements Configurable {
……
private Producerbyte[]< producer;
……
@ Override
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction tx = channel.getTransaction();
try{
tx.begin();
Evente = channel.take();
if (e == null) {
tx.rollback();
returnStatus.BACKOFF;
}
producer.send(newKeyedMessage< span style='font-size:12px;font-style:normal;font-weight:bold;color:rgb(255, 0, 0);' >String, byte[]<(topic, e.getBody()));
tx.commit();
returnStatus.READY;
} catch ( Exception e) {
Kafka Spout 消息消費者代碼片斷
spout 有多個,咱們挑 kafka spout 看下。
KafkaSpout.java
public abstract class KafkaSpout implementsIRichSpout {
……
@ Override
public void activate() {
……
for( final KafkaStream stream : streamList) {
executor.submit(new Runnable() {
@ Override
public void run() {
ConsumerIterator< span style='font-size:12px;font-style:normal;font-weight:normal;color:rgb(0, 112, 192);' >byte[], byte[]< iterator = stream.iterator();
while (iterator.hasNext()) {
if(spoutPending.get() < span>
sleep(1000);
continue;
}
MessageAndMetadata< span style='font-size:12px;font-style:normal;font-weight:normal;color:rgb(0, 112, 192);' >byte[], byte[]< next = iterator.next();
byte[] message = next.message();
List< span style='font-size:12px;font-style:normal;font-weight:normal;color:rgb(255, 0, 0);' >Object< tuple = null;
try{
tuple = generateTuple(message);
} catch(Exception e) {
e.printStackTrace();
}
if (tuple == null|| tuple.size() != outputFieldsLength) {
continue;
}
collector.emit(tuple);
spoutPending.decrementAndGet();
}
}
Storm Bolt 代碼片斷
有多個自定義 bolt,挑一個看下。
EvaluateBolt.java
public classEvaluateBolt extendsBaseBasicBolt {
……
@ Override
public void execute(Tuple input, BasicOutputCollector collector) {
……
if (LogWebsiteSpout.PAGE_EVENT_BROWSE.equals(event)) {
if (LogWebsiteSpout.PAGE_TYPE_GOODS.equals(pageType)) {
incrBaseStatistics(baseKeyMap, BROWSE_ALL, 1);
} else if (LogWebsiteSpout.PAGE_TYPE_PAY1.equals(pageType)) {
incrBaseStatistics(baseKeyMap, ORDER_ALL, 1);
}
String recDisplay = input.getStringByField(LogWebsiteSpout.FIELD_REC_DISPLAY);
recDisplayStatistics(recDisplay, time, pageType, baseKeyMap);
} else if (LogWebsiteSpout.PAGE_EVENT_CLICK.equals(event)) {
String recType = input.getStringByField(LogWebsiteSpout.FIELD_REC_TYPE);
評測指標定義:
  • 投放點擊率:推薦瀏覽量/推薦商品投放量
  • 展示點擊率:推薦瀏覽量/推薦位展示次數
  • 推薦展現率::推薦位展現次數/總瀏覽量
  • 推薦瀏覽量:經由推薦產生的瀏覽量
  • 推薦商品投放量:推薦位投放的推薦商品數量(如:用戶瀏覽A商品,那在瀏覽或購買推薦位產生的推薦商品爲5個,則推薦商品投放量+5)
  • 推薦位展示次數:若是推薦位有推薦商品並展現,計數+1
-over-
相關文章
相關標籤/搜索