美圖大數據平臺架構實踐

本文系美圖互聯網技術沙龍第 11 期嘉賓分享內容,公衆號後臺回覆「美圖大數據平臺」獲取 PPT,點擊閱讀原文可觀看完整視頻回放。node

現在大數據在各行業的應用愈來愈普遍:運營基於數據關注運營效果,產品基於數據分析關注轉化率狀況,開發基於數據衡量系統優化效果等。美圖公司有美拍、美圖秀秀、美顏相機等十幾個 app,每一個 app 都會基於數據作個性化推薦、搜索、報表分析、反做弊、廣告等,總體對數據的業務需求比較多、應用也比較普遍。python

所以美圖數據技術團隊的業務背景主要體如今:業務線多以及應用比較普遍。這也是促使咱們搭建數據平臺的一個最主要的緣由,由業務驅動linux

圖 1json

舉幾個美圖的數據應用案例。如圖 1 所示,左起第一張是美圖自研的數據可視化平臺 DataFace,支持業務方自由拖拽生成可視化報表,便於高效的作數據報表以及後續的分析;第二張是美拍 APP 的首頁,熱門個性化推薦,基於用於的行爲數據,爲用戶推薦可能喜歡、感興趣的視頻列表;第三張是基於用戶的做弊的數據,根據必定的模型與策略進行反做弊,有效判斷、過濾用戶的做弊行爲。除此以外,包括搜索、a/b 實驗、渠道跟蹤、廣告等方面都有普遍應用。緩存

當前美圖每個月有 5 億活躍用戶,這些用戶天天產生接近 200 億條的行爲數據,總體的量級相對來講仍是比較大,集羣機器達到千量級,以及有 PB 級的歷史總數據量。安全

美圖有比較多的業務線,而且各業務線比較普遍地運用到數據,以及總體的用戶規模比較大,以上因素都促使咱們必須構建對應的數據平臺,來驅動這些業務增加,更高效地使用數據。
服務器


美圖數據平臺總體架構

如圖 2 所示是咱們數據平臺的總體架構。在數據收集這部分,咱們構建一套採集服務端日誌系統 Arachnia,支持各 app 集成的客戶端 SDK,負責收集 app 客戶端數據;同時也有基於 DataX 實現的數據集成(導入導出);Mor 爬蟲平臺支持可配置的爬取公網數據的任務開發。網絡

圖 2
架構


數據存儲層主要是根據業務特色來選擇不一樣的存儲方案,目前主要有用到 HDFS、MongoDB、Hbase、ES 等。在數據計算部分,當前離線計算主要仍是基於 Hive&MR、實時流計算是 Storm 、 Flink 以及還有另一個自研的 bitmap 系統 Naix。app

在數據開發這塊咱們構建了一套數據工坊、數據總線分發、任務調度等平臺。數據可視化與應用部分主要是基於用戶需求構建一系列數據應用平臺,包括:A/B 實驗平臺、渠道推廣跟蹤平臺、數據可視化平臺、用戶畫像等等。

圖 2 右側展現的是一些各組件均可能依賴的基礎服務,包括地理位置、元數據管理、惟一設備標識等。

如圖 3 所示是基本的數據架構流圖,典型的 lamda 架構,從左端數據源收集開始,Arachnia、AppSDK 分別將服務端、客戶端數據上報到代理服務 collector,經過解析數據協議,把數據寫到 kafka,而後實時流會通過一層數據分發,最終業務消費 kafka 數據進行實時計算。

圖 3


離線會由 ETL 服務負責從 Kafka dump 數據到 HDFS,而後異構數據源(好比 MySQL、Hbase 等)主要基於 DataX 以及 Sqoop 進行數據的導入導出,最終經過 hive、kylin、spark 等計算把數據寫入到各種的存儲層,最後經過統一的對外 API 對接業務系統以及咱們本身的可視化平臺等。


數據平臺的階段性發展

企業級數據平臺建設主要分三個階段:

  • 剛開始是基本使用免費的第三方平臺,這個階段的特色是能快速集成並看到 app 的一些統計指標,可是缺點也很明顯,沒有原始數據除了那些第三方提供的基本指標其餘分析、推薦等都沒法實現。因此有從 0 到 1 的過程,讓咱們本身有數據能夠用;

  • 在有數據可用後,由於業務線、需求量的爆發,須要提升開發效率,讓更多的人蔘與數據開發、使用到數據,而不只僅侷限於數據研發人員使用,因此就涉及到把數據、計算存儲能力開放給各個業務線,而不是握在本身手上;

  • 在當數據開放了之後,業務方會要求數據任務可否跑得更快,可否秒出,可否更實時;另一方面,爲了知足業務需求集羣的規模愈來愈大,所以會開始考慮知足業務的同時,如何實現更節省資源。

美圖如今是處於第二與第三階段的過渡期,在不斷完善數據開放的同時,也逐步提高查詢分析效率,以及開始考慮如何進行優化成本。接下來會重點介紹 0 到 1 以及數據開放這兩個階段咱們平臺的實踐以及優化思路。


從 0 到 1

從 0 到 1 解決從數據採集到最終可使用數據。如圖 4 所示是數據收集的演進過程,從剛開始使用相似 umeng、flurry 這類的免費第三方平臺,到後面快速使用 rsync 同步日誌到一臺服務器上存儲、計算,再到後面快速開發了一個簡單的python腳本支持業務服務器上報日誌,最終咱們開發了服務端日誌採集系統 Arachnia 以及客戶端 AppSDK。

圖 4


數據採集是數據的源頭,在整個數據鏈路中是相對重要的環節,須要更多關注:數據是否完整、數據是否支持實時上報、數據埋點是否規範準確、以及維護管理成本。所以咱們的日誌採集系統須要知足如下需求:

  • 能集成管理維護,包括 Agent 能自動化部署安裝升級卸載、配置熱更、延遲方面的監控;

  • 在可靠性方面至少須要保證 at least once;

  • 美圖如今有多 IDC 的狀況,須要能支持多個 IDC 數據採集彙總到數據中心;

  • 在資源消耗方面儘可能小,儘可能作到不影響業務。

基於以上需求咱們沒有使用 flume、scribe、fluentd,最終選擇本身開發一套採集系統 Arachnia。

圖 5


圖 5 是 Arachnia 的簡易架構圖,它經過系統大腦進行集中式管理。puppet 模塊主要做爲單個 IDC 內統一彙總 Agent 的 metrics,中轉轉發的 metrics 或者配置熱更命令。採集器 Agent 主要是運維平臺負責安裝、啓動後從 brain 拉取到配置,並開始採集上報數據到 collector。

接着看 Arachnia 的實踐優化,首先是 at least once 的可靠性保證。很多的系統都是採用把上報失敗的數據經過 WAL 的方式記錄下來,重試再上報,以避免上報失敗丟失。咱們的實踐是去掉 WAL,增長了 coordinator 來統一的分發管理 tx 狀態。

圖 6

開始採集前會從 coordinator 發出 txid,source 接收到信號後開始採集,並交由 sink 發送數據,發送後會ack tx,告訴 coordinator 已經 commit。coordinator 會進行校驗確認,而後再發送 commit 的信號給 source、sink 更新狀態,最終 tx 完 source 會更新採集進度到持久層(默認是本地 file)。該方式若是在前面 3 步有問題,則數據沒有發送成功,不會重複執行;若是後面 4 個步驟失敗,則數據會重複,該 tx 會被重放。

基於上文的 at least once 可靠性保證,有些業務方是須要惟一性的,咱們這邊支持爲每條日誌生成惟一 ID 標識。另一個數據採集系統的主要實踐是:惟必定位一個文件以及給每條日誌作惟一的 MsgID,方便業務方能夠基於 MsgID 在發生日誌重複時能在後面作清洗。

咱們一開始是使用 filename,後面發現 filename 不少業務方都會變動,因此改成 inode,可是 inode linux 會回收重複利用,最後是以 inode & 文件頭部內容作 hash 來做爲fileID。而 MsgID 是經過 agentID & fileID & offset 來惟一確認。

數據上報以後由 collector 負責解析協議推送數據到 Kafka,那麼 Kafka 如何落地到 HDFS 呢? 首先看美圖的訴求:

  • 支持分佈式處理;

  • 涉及到較多業務線所以有多種數據格式,因此須要支持多種數據格式的序列化,包括 json、avro、特殊分隔符等;

  • 支持由於機器故障、服務問題等致使的數據落地失敗重跑,並且須要能有比較快速的重跑能力,由於一旦這塊故障,會影響到後續各個業務線的數據使用;

  • 支持可配置的 HDFS 分區策略,能支持各個業務線相對靈活的、不同的分區配置;

  • 支持一些特殊的業務邏輯處理,包括:數據校驗、過時過濾、測試數據過濾、注入等;

基於上述訴求痛點,美圖從 Kafka 落地到 HDFS 的數據服務實現方式如圖 7 所示。


圖 7


基於 Kafka 和 MR 的特色,針對每一個 kafka topic 的 partition,組裝 mapper 的 inputsplit,而後起一個 mapper 進程處理消費這個批次的 kafka 數據,通過數據解析、業務邏輯處理、校驗過濾、最終根據分區規則落地寫到目標 HDFS 文件。落地成功後會把此次處理的 meta 信息(包括 topic、partition、開始的 offset、結束的offset)存儲到 MySQL。下次再處理的時候,會從上次處理的結束的 offset 開始讀取消息,開始新一批的數據消費落地。

實現了基本功能後不免會遇到一些問題,好比不一樣的業務 topic 的數據量級是不同的,這樣會致使一次任務須要等待 partition 數據量最多以及處理時間最長的 mapper 結束,才能結束整個任務。那咱們怎麼解決這個問題呢?系統設計中有個不成文原則是:分久必合、合久必分,針對數據傾斜的問題咱們採用了相似的思路。

圖 8


首先對數據量級較小的 partition 合併到一個 inputsplit,達到一個 mapper 能夠處理多個業務的 partition 數據,最終落地寫多份文件。

圖 9


另外對數據量級較大的 partition 支持分段拆分,平分到多個 mapper 處理同一個 partition,這樣就實現了更均衡的 mapper 處理,能更好地應對業務量級的突增。

除了數據傾斜的問題,還出現各類緣由致使數據 dump 到 HDFS 失敗的狀況,好比由於 kafka 磁盤問題、hadoop 集羣節點宕機、網絡故障、外部訪問權限等致使該 ETL 程序出現異常,最終可能致使由於未 close HDFS 文件致使文件損壞等,須要重跑數據。那咱們的數據時間分區基本都是以天爲單位,用原來的方式可能會致使一個天粒度的文件損壞,解析沒法讀取。

圖 10


咱們採用了分兩階段處理的方式:mapper 1 先把數據寫到一個臨時目錄,mapper 2 把 Hdfs 的臨時目錄的數據 append 到目標文件。這樣當 mapper1 失敗的時候能夠直接重跑這個批次,而不用重跑成天的數據;當 mapper2 失敗的時候能直接從臨時目錄 merge 數據替換最終文件,減小了從新 ETL 天粒度的過程。

在數據的實時分發訂閱寫入到 kafka1 的數據基本是每一個業務的全量數據,可是針對需求方大部分業務都只關注某個事件、某小類別的數據,而不是任何業務都消費全量數據作處理,因此咱們增長了一個實時分發 Databus 來解決這個問題。

圖 11


Databus 支持業務方自定義分發 rules 往下游的 kafka 集羣寫數據,方便業務方訂閱處理本身想要的數據,而且支持更小粒度的數據重複利用。

圖 12


圖 12 能夠看出 Databus 的實現方式,它的主體基於 Storm 實現了 databus topology。Databus 有兩個 spout,一個支持拉取全量以及新增的 rules,而後更新到下游的分發 bolt 更新緩存規則,另一個是從 kafka 消費的 spout。而 distributionbolt 主要是負責解析數據、規則 match,以及把數據往下游的 kafka 集羣發送。


數據開放

有了原始數據而且能作離線、實時的數據開發之後,隨之而來的是數據開發需求的井噴,數據研發團隊目不暇接。因此咱們經過數據平臺的方式開放數據計算、存儲能力,賦予業務方有數據開發的能力。

對實現元數據管理、任務調度、數據集成、DAG 任務編排、可視化等不一一贅述,主要介紹數據開放後,美圖對穩定性方面的實踐心得。

數據開放和系統穩定性是相愛相殺的關係:一方面,開放了以後再也不是有數據基礎的研發人員來作,常常會遇到提交非法、高資源消耗等問題的數據任務,給底層的計算、存儲集羣的穩定性形成了比較大的困擾;另一方面,其實也是由於數據開放,纔不斷推動咱們必須提升系統穩定性。

針對很多的高資源、非法的任務,咱們首先考慮可否在 HiveSQL 層面能作一些校驗、限制。如圖 13 所示是 HiveSQL 的整個解析編譯爲可執行的 MR 的過程:

圖 13


首先基於 Antlr 作語法的解析,生成 AST,接着作語義解析,基於AST 會生成 JAVA 對象 QueryBlock。基於 QueryBlock 生成邏輯計劃後作邏輯優化,最後生成物理計劃,進行物理優化後,最終轉換爲一個可執行的 MR 任務。

咱們主要在語義解析階段生成 QueryBlock 後,拿到它作了很多的語句校驗,包括:非法操做、查詢條件限制、高資源消耗校驗判斷等。

第二個在穩定性方面的實踐,主要是對集羣的優化,包括:

  • 咱們完整地對 Hive、Hadoop 集羣作了一次升級。主要是由於以前在低版本有 fix 一些問題以及合併一些社區的 patch,在後面新版本都有修復;另一個緣由是新版本的特性以及性能方面的優化。咱們把 Hive 從 0.13 版本升級到 2.1 版本,Hadoop 從 2.4 升級到 2.7;

  • 對 Hive 作了 HA 的部署優化。咱們把 HiveServer 和 MetaStoreServer 拆分開來分別部署了多個節點,避免合併在一個服務部署運行相互影響;

  • 以前執行引擎基本都是 On MapReduce 的,咱們也在作 Hive On Spark 的遷移,逐步把線上任務從 Hive On MR 切換到 Hive On Spark;

  • 拉一個內部分支對平時遇到的一些問題作 bugfix 或合併社區 patch 的特性;

在平臺穩定性方面的實踐最後一部分是提升權限、安全性,防止對集羣、數據的非法訪問、攻擊等。提升權限主要分兩塊:API 訪問與集羣。

圖 14


  • API Server :上文提到咱們有 OneDataAPI,提供給各個業務系統訪問數據的統一 API。這方面主要是額外實現了一個統一認證 CA 服務,業務系統必須接入 CA 拿到 token 後來訪問OneDataAPI,OneDataAPI 在 CA 驗證事後,合法的才容許真正訪問數據,從而防止業務系統能夠任意訪問全部數據指標。

  • 集羣:目前主要是基於 Apache Ranger 來統一各種集羣,包括 Kafka、Hbase、Hadoop 等作集羣的受權管理和維護;

以上就是美圖在搭建完數據平臺並開放給各個業務線使用後,對平臺穩定性作的一些實踐和優化。

那接下來對數據平臺建設過程作一個簡單的總結。

  • 首先在搭建數據平臺以前,必定要先瞭解業務,看業務的總體體量是否比較大、業務線是否比較廣、需求量是否多到嚴重影響咱們的生產力。若是都是確定答案,那能夠考慮儘快搭建數據平臺,以更高效、快速提升數據的開發應用效率。若是自己的業務量級、需求很少,不必定非得套大數據或者搭建多麼完善的數據平臺,以快速知足支撐業務優先。

  • 在平臺建設過程當中,須要重點關注數據質量、平臺的穩定性,好比關注數據源採集的完整性、時效性、設備的惟一標識,多在平臺的穩定性方面作優化和實踐,爲業務方提供一個穩定可靠的平臺。

  • 在提升分析決策效率以及規模逐漸擴大後須要對成本、資源作一些優化和思考。

相關文章
相關標籤/搜索