本文由京東搜索算法架構團隊分享,主要介紹 Apache Flink 在京東商品搜索排序在線學習中的應用實踐。文章的主要大綱以下:算法
一、背景緩存
二、京東搜索在線學習架構網絡
三、實時樣本生成多線程
四、Flink Online Learning架構
五、監控系統併發
六、規劃總結框架
在京東的商品搜索排序中,常常會遇到搜索結果多樣性不足致使系統非最優解的問題。爲了解決數據馬太效應帶來的模型商品排序多樣性的不足,咱們利用基於二項式湯普森採樣建模,可是該算法仍存在對全部用戶採用一致的策略,未有效考慮用戶和商品的個性化信息。基於該現狀,咱們採起在線學習,使深度學習和湯普森採樣融合,實現個性化多樣性排序方案,實時更新模型的關參數。機器學習
在該方案中,Flink 主要應用於實時樣本的生成和 online learning 的實現。在在線學習過程當中,樣本是模型訓練的基石,在超大規模樣本數據的處理上,咱們對比了 Flink、Storm 和 Spark Streaming 以後,最終選擇用 Flink 做爲實時樣本流數據的生產以及迭代 online learning 參數的框架。在線學習的總體鏈路特別長,涉及在線端特徵日誌、流式特徵處理、流式特徵與用戶行爲標籤關聯、異常樣本處理、模型動態參數實時訓練與更新等環節,online learning 對樣本處理和參數狀態處理的準確性和穩定性要求較高,任何一個階段都有可能出現問題,爲此咱們接入京東的 observer 體系,擁有完整的全鏈路監控系統,保證各個階段數據的穩定性和完整性;下面咱們首先介紹一下京東搜索在線學習架構。異步
京東搜索的排序模型系統架構主要包括如下幾個部分:分佈式
一、Predictor 是模型預估服務,在 load 模型中分爲 static 部分和 dynamic 部分,static 部分由離線數據訓練獲得,主要學習 user 和 doc 的稠密特徵表示,dynamic 部分主要包含 doc 粒度的權重向量,這部分由實時的 online learning 任務實時更新。 二、Rank 主要包括一些排序策略,在排序最終結果肯定以後,會實時落特徵日誌,將 doc 的特徵按順序寫入特徵數據流,做爲後續實時樣本的數據源(feature)。 三、Feature Collector 的任務是承接在線預估系統發出的特徵數據,對下游屏蔽緩存、去重、篩選等在線系統特有邏輯,產出 Query+Doc 粒度的特徵流。 四、Sample join 的任務將上面的 feature 數據、曝光、點擊、加購、下單等用戶行爲標籤數據做爲數據源,經過 Flink 的 union + timer 數據模型關聯成爲符合業務要求的樣本數據,算法可根據目標需求選擇不一樣的標籤做爲正負樣本標記。 五、**Online learning **任務負責消費上游生成的實時樣本作訓練,負責更新 model 的 dynamic 部分。
Online Learning 對於在線樣本生成的時效性和準確性都有很高的要求,同時也對做業的穩定性有很高的要求。在海量用戶日誌數據實時涌入的狀況下,咱們不只要保證做業的數據延時低、樣本關聯率高且任務穩定,並且做業的吞吐不受影響、資源使用率達到最高。
京東搜索排序在線樣本的主要流程以下:
一、數據源大體有曝光流、feature 流和用戶行爲流等做爲實時樣本的數據源,統一以 JDQ 管道流的形式,由京東實時計算平臺提供平臺支撐。 二、接到 feature 流和曝光流、label 流後,進行數據清洗,獲得任務須要的數據格式。 三、拿到各個標準流後,對各個流進行 union 操做,以後進行 keyby。 四、咱們在 process function 裏面添加 Flink timer 定時器,做爲樣本生成的實時窗口。 五、將生成的樣本實時落入 jdq 和 HDFS,jdq 能夠用做後面的 online learning 的 input,HDFS 持久存儲樣本數據,用於離線訓練、增量學習和數據分析。
在線樣本任務優化實踐:
京東搜索樣本數據吞吐量每秒達到 GB 規模,對分佈式處理分片、超大狀態和異常處理提出很高的優化要求。
使用 keyby 的時候,不免會有數據傾斜的狀況,這裏咱們假設 key 設計合理、 shuffle 方式選擇正確、任務沒有反壓且資源足夠使用,因爲任務 parallelism 設置致使的數據傾斜的狀況。咱們先看 Flink 裏面 key 是如何被分發到 subtask 上面的。
keygroup = assignToKeyGroup(key, maxParallelism) subtaskNum = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupId)複製代碼
假設咱們的併發設置的是 300,那麼 maxParallelism 就是 512,如此設計,必然致使有的 subtask 分佈 1 個 keygroup 有的分配兩個,同時也致使了數據天然傾斜。針對上述問題,有兩個解決方案:
● 設置並行度爲 2 的 n 次方; ● 設置最大並行度爲 並行度的 n 倍。
若是使用方案 1 ,調整併發的話只能調整 2 的冪次,建議使用方案 2,且假如 parallelism 爲 300,maxParallelism 設置爲 1200 的狀況下假如數據仍是有傾斜,能夠再相應的把 maxParallelism 設置大一些保證每一個 keygroup 的 key 少一些,如此也能夠下降數據傾斜的發生。
在線樣本用到了 Flink 的 state,咱們以前默認將 state 放到了內存裏面,可是隨着放量的增長,state 數據量激增,發現 GC 時間特別長,以後改變策略,將 state 放入了 RocksDB,GC 問題得以解決。咱們針對 checkpoint 作了以下配置:
● 開啓增量 checkpoint; ● 合理設置 checkpoint 的超時時間、間隔時間和最小暫停時間。
● 讓 Flink 本身管理 RocksDB 佔用的內存,對 RocksDB 的 blockcache、writebuffer 等進行調優。 ● 優化 state 的數據使用,將 state 數據放入多個 state object 裏面使用,下降序列化/反序列化的代價。
在任務調優的時候咱們發現咱們的任務訪問 RocksDB 的時間很是長,查看 jstack 發現,不少線程都在等待數據的序列化和反序列化,隨着算法特徵的逐漸增多,樣本中的特徵個數超過 500 個,使得每條數據的量級愈來愈大。可是在作樣本關聯的時候實際上是不須要特徵關聯的,只須要相應的主鍵關聯就能夠了,所以,咱們用 ValueState 存儲主鍵,用 MapState/ListState 存儲特徵等值。固然了還能夠將這些特徵值存儲到外部存儲裏面,這裏就須要對網絡 io 和 本地 io 之間的選擇作一個取捨。
● failure recovery 的時候開啓本地恢復。
因爲咱們的 checkpoint 數據達到了 TB 級別,一旦任務發生 failover,不論是針對 HDFS 仍是針對任務自己,壓力都很是大,所以,咱們優先使用本地進行 recovery,這樣,不只能夠下降 HDFS 的壓力還能夠增長 recovery 的速度。
對於 online learning,咱們先介紹一下伯努利湯普森採樣算法,假設每一個商品的 reward 機率服從 Beta 分佈,所以給每一個商品維護兩個參數成功次數 si 及失敗次數 fi,及全部商品的公共先驗參數成功次數 α 和失敗次數 β。
每次根據商品相應的 Beta 分佈採樣爲最優商品的指望 reward: Q(at) = θi,並選擇指望 reward 最大的商品展示給用戶。最後根據環境給出真實 reward,更新模型相應的參數達到 online learning 的效果。該參數表明一個商品特徵,用一個 n 維向量表示,該向量由原始特徵經過 MLP 網絡預測獲得。原始特徵通過 DNN 網絡獲得一個 N 維向量做爲該商品的個性化表徵,採用 Logistic Regression 函數建模似然函數,利用 Flink 構建該表徵和實時反饋所組成的實時樣本,用於不斷迭代近似更新參數分佈。
從 jdq 接過實時樣本以後,因爲以前並無保證數據的有序性,這裏採用 watermark 機制保證數據的有序性。
把只曝光無行爲的商品看作負樣本,有點擊及後續行爲的商品看作正樣本,當窗口將達到必定正負比例或數據量時進行一次 batch 訓練,迭代出新的參數向量,將商品 embedding 數據放到 Flink 的 state 裏面,以後做爲 model 的 dynamic 部分更新參數。
個性化 ee 參數在線學習採用異步更新方式的時候,存在參數更新順序錯亂問題,這會下降在線學習模型收斂速度,從而形成了流量的浪費,所以,參數異步更新方式更改成同步更新方式,避免參數讀寫錯亂問題。在同步更新的方式下,存儲在 status 中的參數向量須要在下一次訓練迭代時使用,若參數發生丟失會使該商品的迭代過程當中斷,爲防止系統風險形成參數丟失,設計了參數雙重保障。通常的任務異常或重啓後參數可從 checkpoint 或 savepoint 中恢復,若是意外狀況下參數沒法恢復,從遠程在線服務中取回上一版參數並記錄到 state。
在線學習任務使用同一個 Flink 任務來支持多個版本模型在不一樣實驗桶下進行 AB 實驗,經過版本號區分不一樣的 AB 流量桶,對應的實時樣本以 docid+version 做爲 key 進行處理,迭代過程互不影響。
爲了提升帶寬利用率以及性能的需求,咱們內部採用 pb 格式傳輸數據,通過調研,pb 的傳輸格式優於 Flink 的兜底的 general class 的 kryo 序列化方式,所以咱們採用了 Flink 的 custom serialization 解決方案,直接用 pb 格式在 op 之間傳輸數據。
這裏咱們區分業務全鏈路監控和任務穩定性相關監控,具體狀況下面將詳細介紹。
整個系統使用京東內部的 observer 平臺來實現業務全鏈路監控,主要包括 predictor 服務相關的監控、feature dump 的 QPS 監控、特徵和標籤質量監控、關聯狀況監控、train 相關的監控以及 AB 指標相關的一些監控,以下:
任務穩定性監控這裏主要是指 Flink 的任務穩定性監控,鏈路吞吐量達 GB/s規模,特徵消息 QPS 達 10W 規模,且 online learning 的不可間斷性,無論對於在線樣本任務仍是 online learning 的任務,相關監控報警都是必不可少的。
■ 容器的內存、cpu 監控、thread 個數,gc 監控
■ 樣本相關業務監控
Flink 在實時數據處理方面有優秀的性能、容災、吞吐等表現、算子豐富易上手使用、天然支持批流一體化,且目前已有在線學習的框架開源,作在線學習是個不二的選擇,隨着機器學習數據規模的擴大和對數據時效性、模型時效性要求的提高,在線學習不只僅做爲離線模型訓練的補充,更成爲模型系統效率發展的趨勢。爲此咱們作的規劃以下:
做者致謝:感謝實時計算研發部、搜索排序算法團隊的支持。