魅族大數據之流平臺設計部署實踐--轉

原文地址:http://mp.weixin.qq.com/s/-RZB0gCj0gCRUq09EMx1fAjava

沈輝煌   魅族數據架構師 node

2010年加入魅族,負責大數據、雲服務相關設計與研發;web

專一於分佈式服務、分佈式存儲、海量數據下rdb與nosql融合等技術。正則表達式

主要技術點:推薦算法、文本處理、ranking算法算法

 

本篇文章內容來自第八期魅族開放日魅族數據架構師沈輝煌的現場分享,由IT大咖說提供現場速錄,由msup整理編輯。sql

 

導讀:魅族大數據的流平臺系統擁有自設計的採集SDK,自設計支持多種數據源採集的Agent組件,還結合了Flume、Spark、Metaq、Storm、Kafka、Hadoop等技術組件,本文就魅族流平臺對大量數據的採集、實時計算、系統分析方法,全球多機房數據採集等問題進行介紹。數據庫

 

流平臺是魅族大數據平臺的重要部分,包括數據採集、數據處理、數據存儲、數據計算等模塊,流平臺爲大數據提供了強大的支撐能力。後端

 

文章還介紹了魅族大數據流平臺的架構、設計方式、經常使用組件、核心技術框架等方面的內容,還原魅族大數據平臺的搭建過程及遇到的問題。跨域

 

1、魅族大數據平臺架構緩存

 

如圖所示即是魅族的大數據平臺架構。

 

                                           

  • 左邊是多樣性的數據源接入;

  • 右上是離線數據的採集;

  • 下面是流平臺(也是今天分享的主角);

  • 中間是集羣的部署;

  • 右邊是ETL的數據挖掘、算法庫和一些數據模型;

  • 左上角是數據開發平臺,好比webIDE可使得開發人員更便捷地作一些數據查詢和管理;

  • 最右邊的是一個數據產品門戶,包括咱們的用戶畫像、統計系統等,這裏麪包含大數據的不少組件,好比數據採集、數據處理、數據存儲、數據挖掘等,最後產生大數據的雛形。

 

2、流平臺介紹

 

 

流平臺是大數據平臺一個比較重要的部分,主要包括四個部分:數據採集、數據處理、數據存儲、計算能力。

 

  • 數據採集

 

 「誰擁有了整個世界的數據,他就是最大的贏家」,這句話雖然有點誇張,可是卻表達了數據採集的重要性。一個大數據平臺數據的多樣性、數據量的級別很大程度上決定了大數據的能力和豐富程度。

 

  • 數據處理

 

這裏講的數據處理並非像末端那麼專業的數據清洗,更多的是爲後續入庫作一些簡單處理,以及實時計算。

 

  • 數據存儲

計算能力,包括離線計算和實時計算

 

流平臺爲大數據提供很是強大的支撐,數據統計分析、數據挖掘、神經網絡的圖形計算等均可以依靠計算能力進行。

 

實時計算是指在必定單位的時間延遲範圍內,基於增量的數據推算出結果,再結合歷史數據獲得指望的分析結果。這個時間是根據業務需求而定。

 

一、流平臺架構

                                            

上圖是咱們的流平臺架構圖

 

  • 左邊是數據源,像NoSQL、RDB、文件類型;

  • 最右邊是集羣,下面還有其餘的一些Hadoop(存儲);

  • 中間的框是核心,也就是流平臺;

  • 最上面的是AS-Manager(咱們的流管理平臺),承載了很是多的管理功能;

  • 下面是Zookeeper,這是一個很是流行的集成管理中心,魅族的一些架構都會用到它,流平臺也不例外,Zookeeper能夠說貫穿了咱們整個流平臺的架構;

  • 最下面是AS-Protocol,咱們本身設計的流平臺的數據對象協議,打通了整個流平臺的數據鏈路;

  • 中間四個框是核心的四個模塊:採集模塊、數據中轉模塊、緩存模塊、實時計算模塊,也叫合併層。

 

二、具體架構介紹

 

這是咱們的具體架構圖。

 

業務規模:從這邊採集數據到通過流平臺最後通過實時計算或入庫,它的數據量量級在千億級別。

 

三、組件

 

  • 數據源渠道

 

前面提到採集數據源渠道的多樣性決定了大數據平臺的相應能力和綜合程度。咱們這邊首先會有一個文件類的業務數據,包括業務日誌、業務數據、數據庫文件,這些都會通過採集服務採集。

 

下面這一塊包括一些網站的js訪問、手機各APP埋點、特色的應用日誌文件(它會經過手機端的一些埋點上訪到咱們的埋點服務)。

 

  • 數據採集

 

數據採集分爲兩個部分:採集服務、獨立部署的埋點服務。圖中只顯示了一個埋點服務,裏面還會有不少的第三方業務,第三方業務經過這個紅色的插件接入咱們的採集。

 

  • 數據中轉

 

經過採集模塊把數據流轉到中轉模塊,中轉模塊採用的是目前比較流行的flume組件,紅色sink是咱們本身開發的。

 

  • Cache

 

sink把前面的數據轉給緩存層,緩存層裏有metaq和Kafka。

 

  • Streaming

 

實時計算模塊上線了Spark和Storm,較早上線的是Spark,目前兩個都在用的緣由是它會適應不一樣的業務場景。

 

  • Store

 

最後面是咱們提供給落地的store層,像HIVE、Hbase等等。

 

  • 流管理平臺

 

最下面是流管理平臺,圖中有四條線連着四個核心模塊,對這四個模塊進行很是重要且很是豐富的邏輯管理,包括數據管理、對各節點的監控、治理、實時命令的下發等。

 

 

3、流平臺設計

 

一、概念解讀

 

Message,就是一條消息,是最小的數據單位。業務方給的一條數據就是一個message;咱們去採集文件的話,一行數據就是一個message。

 

AS-Protocol,是咱們本身設計的流平臺數據的對象,它會對一批量的message進行打包,而後再加上一些必要的變量作一個封裝。

 

Evnet,會提供一個相似的標準接口,這個地方其實更多的是爲了打通採集的流平臺。它最重要的一個變量是Topic,就是說我拿到了個人AS-Protocol就能夠根據對應的Topic發到相應的登陸去緩存提取,由於咱們的AS-Protocol除了起始端和結束端之外,中間層是不用解析協議的。

 

Type,數據格式目前是Json和Hive格式,能夠根據業務去擴展。

 

Compress,Hive格式在空間上也是很是有優點的,很是適合於網絡傳輸壓縮。當壓縮數據源質量沒有達到必定量的程度的時候會越壓越大,因此咱們要判斷是否須要壓縮。咱們壓縮採用的是一個全系統

Data_timestamp,數據的時間是最上面的message,每個message會攜帶一個數據時間.這個比較好理解,就是入庫以後會用作數據統計和分析的。

 

Send_timestamp,發送時間會攜帶在咱們的AS-Protocol裏,它聲明瞭每個數據包發送的時間。

 

Unique Key,每個數據包都有一個惟一的標識,這個也是很是重要的,它會跟着AS-Protocol和Event走通整個平臺的數據鏈路,在作數據定位、問題定位的時候很是有用,能夠明確查到每一個數據包在哪一個鏈路經歷了什麼事情。

 

Topic。這個不需多言。

 

Data_Group,數據分組是咱們很是核心的一個設計思想,原則上咱們是一個業務對應一個數據分組。

 

Protobuf序列化,咱們會對Event數據作一個PT序列化,而後再往上面傳,這是爲了節省數據流量。

 

二、協議設計

 

                                           

如圖所示爲Event、As-Protocol和Message的關係。

 

最上層是Event,裏面有一個Unique Key和Topic包括了咱們的As-Protocol,而後是數據格式、發動時間是否壓縮、用什麼方式壓縮,還攜帶一些額外的變量。最後面是一個Body,Body其實就是一個message的宿主,以字節流的方式存儲。這個就是咱們一個數據對象的協議設計。

接下來看數據在整個架構裏是如何流轉和傳輸的。

 

首先是數據源渠道,最左邊的是message,任何業務方的數據過來都是一條message,通過數據採集把一批message打包封裝成Event,再發給數據中轉模塊,也叫flume。把Event拆出來,有一個topic,最後把As-protocol放到相應位置緩存,消費對應的Topic,拿到對應的As-Protocol,並把這個數據包解析出來,獲得一條一條的message,這時就能夠進行處理、入庫或實時計算。

 

須要特別注意的是message和Event。每一個Message的業務量級是不同的,有幾十B、幾百B、幾千B的差異,打包成As-Protocol的時候要試試批量的數目有多少,原則上壓縮後的數據有個建議值,這個建議值視業務而定,DataGroup打包的數量是能夠配的。

 

三、數據分組設計

 

 

如圖所示是咱們的DataGroup設計。首先看最上面,一個Topic能夠定義N個DataGroup。往下是Topic和streaming Job一比一的關係,就是說一個實時的Group只須要對應一個Topic,若是兩個業務不相關就對應的兩個Topic,用兩個Job去處理,最後獲得想要的關係。

 

從架構圖能夠看到DataGroup的扭轉關係。最初數據採集每個節點會聲明它是屬於哪個DataGroup,上傳數據會處於這個DataGroup,通過數據中轉發給咱們的分佈式緩存也對應了Topic下面不一樣的分組數據。最後Streaming交給我Topic,我能夠帥選出在最上面的關係,去配置DataGroup,能夠很是靈活地組合。這就是DataGroup的設計思想。

 

 

4、採集組件Agent

 

 

一、概述

 

                                          

如圖所示,這是徹底由咱們本身設計和實現的一款組件。右邊是採集組件,分爲兩部分:一個是基於java環境的獨立工做程序;另外一個是jar插件。插件叫Agen-Stub.jar;獨立層是Agent-File.zip,Agent-File有一個paresr支持不一樣的文件類型,目前支持的file和Binlog,可擴展。根據須要能夠增長parser,也是接入Agent-stub,擁有Agent-stub的一些特性。

 

如上圖右側的示意圖,Agent-stub接入多個Business,前面提到的一個埋點服務就是一個Business,它把數據交給Agent-stub,Agent-stub會日後發展,與file和mysQL相對應的是file parser,出來是Agent-stub,流程是同樣的。

 

二、Agent-Stub.jar

 

接下來看Agent-Stub是如何設計的。

 

多線程、異步這個毫無疑問,作插件化確定是這樣考慮的,不能阻塞上層業務。

 

內存小隊列+磁盤壓縮隊列。這是咱們改進最大的一個地方,早期版本中咱們採用的是內存大隊列,若是隻有內存大隊列缺點很是明顯:

 

程序正常啓動的時候大隊列裏的數據怎麼辦?要等他發完嗎?仍是不發完?當大隊列塞滿的時候,還有對上層業務的侵入性怎麼辦?程序遇到問題時怎麼辦?大隊列多是50萬、100萬甚至更多。

 

採用了內存小隊列+磁盤壓縮隊列後能夠解決正常程序的啓停,保證數據沒有問題,還能夠解決空間的佔用清空性的問題,以此同時,磁盤壓縮隊列還能夠在程序出錯的時候加速發送。

 

解釋一下磁盤壓縮隊列, 此次咱們設計協議的思想很簡單:壓縮以後獲得一個字節速度,存在磁盤的文件裏,這個文件按照小時存儲,這時對於二次發送帶來的損耗並不大,不須要從新阻斷數據也不須要解析和壓縮,只須要讀出來發出去。後面還有一個提高就是磁盤發送隊列跟內存發送隊列是單獨分開的,這樣更能提高二次數據的發送性能。

 

無損啓停。正常的啓動和中止,數據是不會中止不會丟失的。

 

Agent的版本號自動上報平臺。這個很是重要,咱們早期的版本是沒有的,能夠想象一下當你的Agent節點是幾千上萬,若是沒有一個平臺直觀地管理,那將是一個怎樣恐怖的局面。如今咱們每個Agent啓動的時候都會建立一個node path,把版本號放到path裏,在管理平臺解析這個path,而後作分類,咱們的版本就是這樣上報的。

 

自動識別接入源,智能歸類。這個其實和上面那點是同樣的,在早期版本中咱們作一個Agent的標識,其實就是一個IP+一個POD,就是說你有幾千個IP+POD量表須要人工管理,工做量很是大且乏味。咱們優化了一個自動識別,把DataGroup放到Agent的node path裏,管理平臺能夠作到自動識別。

 

Agent的全面實時監控。包括內存隊列數、磁盤隊列數、運行狀態、出錯狀態、qps等,均可以Agent上報,而且在管理平臺直觀地看到哪個節點是什麼樣子的。其作法也依賴於zookeeper的實現和承載,這裏其實就是對zk node的應用,咱們有一個定時線程收集當前Agent必要的數據,而後傳到node的data上去,管理平臺會獲取這些date,最後作一個平臺化的展現。

 

支持實時命令。包括括限流,恢復限流、中止、調整心跳值等,大大提升了運維能力。其實現原理也是依賴於Agent,這裏咱們建立一個Data Group,經過管理平臺操做以後把數據放到Data Group裏,而後會有一個監聽者去監聽獲取數據的變化並做出相應的邏輯。

 

兼容Docker。目前魅族在用Doker,Doker對咱們這邊的Agent來說是一個挑戰,它的啓動和中止是很是態化的,就是你可能認爲相同的Docker容器不會重啓第二次。

 

三、Agent-File.zip   

 

接入Agent-Stub。Agent-file首先是接入Agent-stub,擁有Agent-stub的一些特性。

 

兼容Docker。由於啓動和中止的常態,假設咱們剛剛一個業務接入了Agent-stub,那中止的時候它會通知我,Agent-stub會把小隊列裏的數據抓到磁盤壓縮隊列裏去。可是這裏須要注意的是:磁盤壓縮隊列不能放到Docker本身的文件系統裏,否則它停了以後數據就沒有人可以獲得了。

 

當Agent-stub停的時候,會有一個標識說磁盤要作隊列,咱們的數據有沒有發完,磁盤壓縮隊列裏有一個評級的標識文件,這時要用到Agent-file,Agent-file有一個單獨的掃描線程一個個地去掃描Docker目錄,掃到這個文件的時候判斷其數據有沒有發完,若是沒發完就只能當作一個發送者。

 

支持重發歷史數據。作大數據的可能都知道這些名詞,好比昨天的數據已經採集完了,但因爲某些緣由有可能數據有遺漏,須要再跑一次後端的補貼邏輯,或者上馬訓練,這時就要作數據重發。咱們在管理平臺上就會有一個支持這種特定文件或特定時間段的選擇,Agent接收到這個命令的時候會把相應的數據發上去,固然前提是數據不要被清了。

 

管理平臺自助升級。這個能夠理解成軟件升級,Agent能夠說是很是常見的組件,可是咱們從新設計時把自動升級考慮在內,這也是咱們爲何設計本身作而不是用開源的組件。這樣作帶來的好處是很是大的,咱們幾千個Agent在平臺裏只須要一鍵就能夠完成自動升級。

 

文件名正則表達式匹配。文件名的掃描是用自動錶達式。

 

源目錄定時掃描 and Jnotify。重點介紹文件掃描機制。早期的版本是基於Agent-fire和KO-F二者結合作的數據採集:Agent-file是加碼裏對文件變動的事件鑑定,包括重命名、刪除、建立都有一個事件產生;KO-F是拿到文件下的最佳數據。假設源目錄裏有一千個文件,KO-F現場就是一千個,Agent-file對應的文件變革賦予的追加、重命名等均可能會產生一系列事件,邏輯複雜。

 

因此咱們設計了源目錄定時掃描的機制,首先有一個目標,就是咱們的文件隊列,包括爲未讀文件、已讀文件作區別,區別以後掃描,固然還會有像文件摘要等的存在這裏不細講,掃描以後更新未讀文件、已讀文件列表。

 

之因此加Jnotify是由於咱們發現只用定製掃描不能解決全部業務場景的問題,jootify在這裏起到補充定製掃描的做用,解決文件風險和文件產程的問題。

 

單文件讀取。早期版本中這一點依賴於文件列表,當文件很是多時程序變得很是不穩定,由於可能要開幾百個或幾千個線程。後來咱們改爲了單文件的讀取,上文提到的掃描機制會產生一個文件隊列,而後從文件隊列裏讀取,這樣一個個文件、一段段圖,程序就很是穩定了。

 

文件方式存儲offset,無損啓停。早期採用切入式PTE作存儲,銜接很是重,後來咱們改爲文件方式存儲,設計很是簡單就只有兩個文件:一個是目錄下面全部文件的offset;一個是正在讀的文件的offset。這裏涉及到無損啓停和策略的問題,咱們定了一個5次算法:就是每讀了5次就會刷盤一次,但只刷在讀文件,別的文件不會變化,因此能夠想象獲得,當這個程序被替換走的時候,最多也就是重複5條數據,大會致使數據丟失。

 

四、Agent示意圖

 

 

如圖是Agent示意圖。上面是Agent-file和數據對象。Agent啓動的時候要把裏面的offset文件取來,就會產生未讀文件和已讀文件列表,掃描文件目錄,而後更新文件隊列,還有一個fileJNotify是相對應的文件隊列。而後有一個比較重要的fileReader,我會先從文件隊列裏拿到再去讀實際文件,讀完刷盤以後這一塊就成功了,我會根據個人刷盤去刷新offset。

 

上圖左邊有一個業務加了一個Agent-stub,最後變成flume,這裏有一個QueueReceiver(隊列接收者),filereader和業務方的DataSender會把message發過來,QueueReceiver接受的數據就是一條條的message,而後發送到內存小隊列裏,當這邊的小隊列滿了怎麼辦呢?中間有一個額外的固定大小的性能提高的地方用於message歸類,當這個fIieReader往這個內存小隊列發的時候發現塞不進去了,就會在規定大小的隊列裏發,當一個固定大小的隊列滿了以後就會打包壓縮,以字節處理的方式存到磁盤壓縮隊列。

 

再來講說咱們爲何會提出二次數據的發送,其實就是多了一個countsender即壓縮隊列的發送者,直接的數據來源是磁盤壓縮隊列,與上面的並生沒有任何衝突。Countsender的數據對帳功能是咱們整個平臺的核心功能之一,基於這個統計的數據確保了其完整性,少一條數據咱們都知道,在採集層有一個countsender,以另一個渠道發出去,和真正的數據源渠道不同,會更加的輕量化更加可靠,且數值很是小。    

  

最後是前文提到的監控和命令的實現,一邊是Agentnode,一邊是數據管理。

 

五、Agent的坑

 

丟數據。如前文提到內存大隊列帶來的問題。

 

版本管理的問題。

 

tailf -f的問題。

 

網絡緣由致使zk刪節點問題。網絡不穩定的時候,ZK會有一個節點的心跳檢測,不穩定的時候監測會覺得節點已經不存在了而把節點刪掉,這會致使管理平臺的節點監控、文件下發所有都失效。解決辦法就是在message加一層控制檢查線程,發現節點不在了再建立一遍。

 

亂碼的問題。可能會跟一些遠程訪問的軟件相關,原則上咱們假設第二次啓動的時候沒有配置咱們的編碼,默認與系統一致,但當遠程軟件啓動的時候可能會發生不同的地方,因此不要依賴於默認值,必定要在啓動程序裏設置但願的編碼。

 

日誌問題,在插件化的時候確定要考慮到業務方的日誌,咱們把業務方的日誌刷死了,當網絡出現問題的時候每發送一條就失敗一條,那是否是都要打印出來?咱們的考慮是第一條不打印,後面可能十條打印一次,一百條打印一次,一千條打印一次,這個量取決於業務。補充一點,咱們有一個統計線程,能夠根據統計線程觀察Agent的正常與否。

 

 

5、流管理平臺

 

 

如圖所示,咱們的流管理平臺界面比較簡單,但功能很是豐富,包括:

  • 接入業務的管理、發佈、上線;

  • 對Agent節點進行實時監測、管理、命令;

  • 對Flume進行監測、管理;

  • 對實時計算的job的管理;

  • 對全鏈路的數據流量對賬,這是咱們自檢的功能;

  • 智能監控報警,咱們有一個很是人性化的報警閥值的建議。取一個平均值,好比一週或一天,設定一個閥值,好比一天的流量訪問次數多是一千次,咱們設計的報警是2000次,當連續一週都是2000次的時候就得改進。

 

 

6、數據中轉

 

 

一、背景

 

業務發展可能從1到100再到1000,或者當公司互聯網發展到必定程度的時候業務可能遍及世界各地,魅族的雲服務數據分爲海外服務和國內服務,咱們把業務拆分開來,大數據採集確定也要跟着走,這就面臨着數據中轉的問題。

 

                                            

如圖所示是咱們兩個案例的示意圖。黑色的是內網的線,橙色的是跨界性的線,有公網的、雲端的、專線的,各類各樣的網絡狀況。

 

上面的是Agent集羣,B-IDC也有一個Agent集羣,直接訪問咱們登陸的集羣。

 

這裏第一個問題是咱們的鏈接很是多,訪問Agent節點的時候有幾千個Agent節點就得訪問幾千個節點,這是不太友好的事情。另外一個問題是當咱們作升級遷移的時候,Agent要作修改和配置,必須得重啓,當整個B-IDC遷移到A-IDC,咱們加了一個Flyme集羣。一樣是一個Agent集羣,下面有一個Flume集羣,這樣的好處:一是裏面的鏈接很是少,線上的Flume一個ID就三臺;二是這邊承載了全部的Agent,除了Agent還有其餘的採集都在A-IDC裏中轉,當這個片區要作升級的時候上面的業務是透明的,靈活性很是高。

 

二、Flume介紹

 

Flume裏有三個核心的部分:Source、Channel、Sink,Source是數據結構源;Channel至關於內存大隊列,Sink是輸出到不一樣的目標。官方提供了不少組件:Avro、HTTP、Thrift、Memory、File、Spillable Memory、Avro、Thrift、Hdfs、Hive。

 

三、Flume實踐

 

無Group,採用Zookeeper作集羣

 

Agent採用LB作負載均衡,動態感知。結合Zookeeper能夠感知到Agent列表,這時會採用負載均衡的作法找到當前的那個Flume,到後端的Flume直接變化的時候能夠感知到從而下線。

 

硬盤緩存、無損啓停。採用memory可能會帶來些很差的問題,若是內存隊列改爲文件就沒有這個問題。由於內存速度快,存儲強制刷新的時候就沒有數據了,因此咱們作了優化:仍是採用memory,在Flume停的時候把數據採集下來,下一次啓動的時候把數據發出去,這時就能夠作到無損啓停,可是有一點千萬要注意:磁盤實際上是固化在機器裏面,當這臺機器停下再也不啓動的時候,別忘了把數據移走發出去。

 

中止順序優化。在作優化的時候遇到源碼的修改,其實就是Flume中止順序的優化。原生裏好像先中止Channel,而後提升sink,這就會致使想要作這個功能的時候作不到。咱們應該先把這個數據改掉再去中止sink最後中止Channel,這樣就保證Channel裏的數據能夠所有固化到硬盤裏。

 

多種轉發方式。咱們如今是全球的RBC,支持公網、內網、跨域性專線,咱們提供一個很是好的功能:http sink,它也是一個安全的支持ssl的轉換方式。

 

自定義Sink,多線程發送(channel的get只能單線程)。

 

四、中止順序

 

 

如圖是中止順序的修改。這是一個sourceRunner、sink、channel。

 

五、Memory的capacity

 

 

選擇內存以後,這個內存大小到底多少比較合適?如圖所示,左邊Flume是從500-1000,channel容量是5萬、10萬,還有Agent的個數、線程,咱們發如今10萬的時候它的fullGC是很是頻繁的,因此咱們最後定的大小是5萬。固然不一樣的機器根據不一樣的測試獲得本身的值,這個值不是恆定的。

 

包大小從10K到30K到50K有什麼不同呢?很明顯TPS從1萬多降到了2000多,由於包越大網卡就越慢了,這裏看到其實已經到了200兆(雙網卡),把網卡跑滿了。咱們作流平臺設計的時候,不但願鏈路被跑滿,因此咱們給了個建議值,大小在5-10K。固然,線上咱們採用的萬兆網卡。

 

 

7、實時計算

 

 

一、實時計算集羣

 

在SparkZK裏直接寫HA,能夠減小沒必要要的MR提升IO,減小IO消耗。

Kafka+Strom (ZK)

 

二、Spark實踐

 

直接寫HDFS底層文件

 

自動建立不存在的Hive分區

 

相應Metaq的日誌切割,這一點上如今的Kafka是沒有問題的,當時的日誌切割會致使網絡鏈接超時,咱們查看源代碼發現確實會堵塞,咱們的解決方法是把切割調成多色或分區調多。

 

不要定時的killJob。早期的Spark版本由於大批量的killJob致使一些不穩定的狀況,某些job實際上是沒有被徹底覆蓋,假死在那裏的。

相關文章
相關標籤/搜索