友信金服公司推行全域的數據體系戰略,經過打通和整合集團各個業務線數據,利用大數據、人工智能等技術構建統一的數據資產,如 ID-Mapping、用戶標籤等。友信金服用戶畫像項目正是以此爲背景成立,旨在實現「數據驅動業務與運營」的集團戰略。目前該系統支持日處理數據量超 10 億,接入上百種合規數據源。算法
做者 | 楊毅,穆超峯,賀小兵,胡夕數據庫
導讀:當此生活節奏日益加快,企業面對不斷增長的海量信息,其信息篩選和處理效率低下的困擾與日俱增。因爲用戶營銷不夠細化,企業 App 中許多不合時宜或不合偏好的消息推送很大程度上影響了用戶體驗,甚至引起了用戶流失。在此背景下,友信金服公司推行全域的數據體系戰略,經過打通和整合集團各個業務線數據,利用大數據、人工智能等技術構建統一的數據資產,如 ID-Mapping、用戶標籤等。友信金服用戶畫像項目正是以此爲背景成立,旨在實現「數據驅動業務與運營」的集團戰略。目前該系統支持日處理數據量超 10 億,接入上百種合規數據源。
傳統基於 Hadoop 生態的離線數據存儲計算方案已在業界大規模應用,但受制於離線計算的高時延性,愈來愈多的數據應用場景已從離線轉爲實時。這裏引用一張表格對目前主流的實時計算框架作個對比。後端
Apache Storm 的容錯機制須要對每條數據進行應答(ACK),所以其吞吐量備受影響,在數據大吞吐量的場景下會有問題,所以不適用此項目的需求。瀏覽器
Apache Spark 整體生態更爲完善,且在機器學習的集成和應用性暫時領先,但 Spark 底層仍是採用微批(Micro Batching)處理的形式。安全
Apache Flink 在流式計算上有明顯優點:首先其流式計算屬於真正意義上的單條處理,即每一條數據都會觸發計算。在這一點上明顯與 Spark 的微批流式處理方式不一樣。其次,Flink 的容錯機制較爲輕量,對吞吐量影響較小,使得 Flink 能夠達到很高的吞吐量。最後 Flink 還擁有易用性高,部署簡單等優點。相比之下咱們最終決定採用基於 Flink 的架構方案。性能優化
用戶畫像系統目前爲集團線上業務提供用戶實時標籤數據服務。爲此咱們的服務須要打通多種數據源,對海量的數字信息進行實時不間斷的數據清洗、聚類、分析,從而將它們抽象成標籤,並最終爲應用方提供高質量的標籤服務。在此背景下,咱們設計用戶畫像系統的總體架構以下圖所示:cookie
總體架構分爲五層:數據結構
在總體架構設計方案設計完成以後,咱們針對數據也設計了詳盡的處理方案。在數據處理階段,鑑於 Kafka 高吞吐量、高穩定性的特色,咱們的用戶畫像系通通一採用 Kafka 做爲分佈式發佈訂閱消息系統。數據清洗階段利用 Flink 來實現用戶惟一性識別、行爲數據的清洗等,去除冗餘數據。這一過程支持交互計算和多種複雜算法,並支持數據實時 / 離線計算。目前咱們數據處理流程迭代了兩版,具體方案以下:架構
總體數據來源包含兩種:app
根據不一樣業務的指標需求咱們直接從集團數據倉庫抽取數據並落入 Kafka,或者直接從業務端以 CDC(Capture Data Change)的方式寫入 Kafka。在計算層,數據被導入到 Flink 中,經過 DataStream 生成 ID-Mapping、用戶標籤碎片等數據,而後將生成數據存入 JanusGraph(JanusGraph 是以 HBase 做爲後端存儲的圖數據庫介質)與 Kafka,並由 Flink 消費落入 Kafka 的用戶標籤碎片數據,進行聚合生成最新的用戶標籤碎片(用戶標籤碎片是由用戶畫像系統獲取來自多種渠道的碎片化數據塊處理後生成的)。
服務層將存儲層存儲的用戶標籤碎片數據,經過 JanusGraph Spark On Yarn 模式,執行 TinkerPop OLAP 計算生成全量用戶 Yids 列表文件。Yid 是用戶畫像系統中定義的集團級用戶 ID 標識。結合 Yids 列表文件,在 Flink 中批量讀取 HBase 聚合成完整用戶畫像數據,生成 HDFS 文件,再經過 Flink 批量操做新生成的數據生成用戶評分預測標籤,將用戶評分預測標籤落入 Phoenix,以後數據即可經過統一數據服務接口進行獲取。下圖完整地展現了這一流程。
爲了實現用戶標籤的整合,用戶 ID 之間的強打通,咱們將用戶 ID 標識當作圖的頂點、ID pair 關係看做圖的邊,好比已經識別瀏覽器 Cookie 的用戶使用手機號登錄了公司網站就造成了<cookie,mobile>對應關係。這樣全部用戶 ID 標識就構成了一張大圖,其中每一個小的連通子圖 / 連通分支就是一個用戶的所有標識 ID 信息。
ID-Mapping 數據由圖結構模型構建,圖節點包含 UserKey、Device、IdCard、Phone 等類型,分別表示用戶的業務 ID、設備 ID、身份證以及電話等信息。節點之間邊的生成規則是經過解析數據流中包含的節點信息,以必定的優先級順序進行節點之間的鏈接,從而生成節點之間的邊。好比,識別了用戶手機系統的 Android_ID,以後用戶使用郵箱登錄了公司 App,在系統中找到了業務線 UID 就造成了<Android_ID,mail>和<mail,UID>關係的 ID pair,而後系統根據節點類型進行優先級排序,生成 Android_ID、mail、UID 的關係圖。數據圖結構模型以下圖所示:
<p style="text-align:center">Gephi</p>
1.0 版本數據處理流程在系統初期較好地知足了咱們的平常需求,但隨着數據量的增加,該方案遇到了一些性能瓶頸:
<p style="text-align:center">Gephi</p>
鑑於這些問題,咱們提出了 2.0 版本的解決方案。在 2.0 版本中,咱們經過利用 HBase 列式存儲、修改圖數據結構等優化方案嘗試解決以上三個問題。
以下圖所示,2.0 版本數據處理流程大部分承襲了 1.0 版本。新版本數據處理流程在如下幾個方面作了優化:
<p style="text-align:center">2.0 版本數據處理流程</p>
<p style="text-align:center">Gephi</p>
目前,線上部署的用戶畫像系統中的數據絕大部分是來自於 Kafka 的實時數據。隨着數據量愈來愈多,系統的壓力也愈來愈大,以致於出現了 Flink 背壓與 Checkpoint 超時等問題,致使 Flink 提交 Kafka 位移失敗,從而影響了數據一致性。這些線上出現的問題讓咱們開始關注 Flink 的可靠性、穩定性以及性能。針對這些問題,咱們進行了詳細的分析並結合自身的業務特色,探索並實踐出了一些相應的解決方案。
下圖展現了 Flink 中 checkpointing 執行流程圖:
<p style="text-align:center">Flink 中 checkpointing 執行流程</p>
經過以上流程分析,咱們經過三種方式來提升 Checkpointing 性能。這些方案分別是:
CheckPoint 存儲方式有 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。由官方文檔可知,不一樣 StateBackend 之間的性能以及安全性是有很大差別的。一般狀況下,MemoryStateBackend 適合應用於測試環境,線上環境則最好選擇 RocksDBStateBackend。
這有兩個緣由:首先,RocksDBStateBackend 是外部存儲,其餘兩種 Checkpoint 存儲方式都是 JVM 堆存儲。受限於 JVM 堆內存的大小,Checkpoint 狀態大小以及安全性可能會受到必定的制約;其次,RocksDBStateBackend 支持增量檢查點。增量檢查點機制(Incremental Checkpoints)僅僅記錄對先前完成的檢查點的更改,而不是生成完整的狀態。與完整檢查點相比,增量檢查點能夠顯著縮短 checkpointing 時間,但代價是須要更長的恢復時間。
Checkpointing 須要對每一個 Task 進行數據狀態採集。單個 Task 狀態數據越多則 Checkpointing 越慢。因此咱們能夠經過增長 Task 並行度,減小單個 Task 狀態數據的數量來達到縮短 CheckPointing 時間的效果。
Flink 算子鏈(Operator Chains)越長,Task 也會越多,相應的狀態數據也就更多,Checkpointing 也會越慢。經過縮短算子鏈長度,能夠減小 Task 數量,從而減小系統中的狀態數據總量,間接的達到優化 Checkpointing 的目的。下面展現了 Flink 算子鏈的合併規則:
基於以上這些規則,咱們在代碼層面上合併了相關度較大的一些 Task,使得平均的操做算子鏈長度至少縮短了 60%~70%。
在 Flink 運行過程當中,每個操做算子都會消費一箇中間 / 過渡狀態的流,並對它們進行轉換,而後生產一個新的流。這種機制能夠類比爲:Flink 使用阻塞隊列做爲有界的緩衝區。跟 Java 裏阻塞隊列同樣,一旦隊列達到容量上限,處理速度較慢的消費者會阻塞生產者向隊列發送新的消息或事件。下圖展現了 Flink 中兩個操做算子之間的數據傳輸以及如何感知到背壓的:
首先,Source 中的事件進入 Flink 並被操做算子 1 處理且被序列化到 Buffer 中,而後操做算子 2 從這個 Buffer 中讀出該事件。當操做算子 2 處理能力不足的時候,操做算子 1 中的數據便沒法放入 Buffer,從而造成背壓。背壓出現的緣由可能有如下兩點:
實踐中咱們經過如下方式解決背壓問題。首先,縮短算子鏈會合理的合併算子,節省出資源。其次縮短算子鏈也會減小 Task(線程)之間的切換、消息的序列化 / 反序列化以及數據在緩衝區的交換次數,進而提升系統的總體吞吐量。最後,根據數據特性將不須要或者暫不須要的數據進行過濾,而後根據業務需求將數據分別處理,好比有些數據源須要實時的處理,有些數據是能夠延遲的,最後經過使用 keyBy 關鍵字,控制 Flink 時間窗口大小,在上游算子處理邏輯中儘可能合併更多數據來達到下降下游算子的處理壓力。
通過以上優化,在天天億級數據量下,用戶畫像能夠作到實時信息實時處理並沒有持續背壓,Checkpointing 平均時長穩定在 1 秒之內。
目前用戶畫像部分數據都是從 Hive 數據倉庫拿到的,數據倉庫自己是 T+1 模式,數據延時性較大,因此爲了提升數據實時性,端到端的實時流處理頗有必要。
端到端是指一端採集原始數據,另外一端以報表 / 標籤 / 接口的方式對這些對數進行呈現與應用,鏈接兩端的是中間實時流。在後續的工做中,咱們計劃將現有的非實時數據源所有切換到實時數據源,統一通過 Kafka 和 Flink 處理後再導入到 Phoenix/JanusGraph/HBase。強制全部數據源數據進入 Kafka 的一個好處在於它可以提升總體流程的穩定性與可用性:首先 Kafka 做爲下游系統的緩衝,能夠避免下游系統的異常影響實時流的計算,起到「削峯填谷」的做用;其次,Flink 自 1.4 版本開始正式支持與 Kafka 的端到端精確一次處理語義,在一致性方面上更有保證。
做者介紹:楊毅:友信金服計算平臺部 JAVA 工程師穆超峯:友信金服計算平臺部數據開發高級工程師賀小兵:友信金服計算平臺部數據開發工程師胡夕:友信金服計算平臺部技術總監