如何用 Hadoop/Spark 構建七牛數據平臺

數據平臺在大部分公司都屬於支撐性平臺,作的很差馬上會被吐槽,這點和運維部門很像。因此在技術選型上優先考慮現成的工具,快速出成果,不必去擔憂有技術負擔。早期,咱們走過彎路,認爲沒多少工做量,收集存儲和計算都本身研發,發現是吃力不討好。去年上半年開始,咱們全面擁抱開源工具,搭建本身的數據平臺。html

一、數據平臺設計理念

公司的主要數據來源是散落在各個業務服務器上的半結構化日誌,好比系統日誌、程序日誌、訪問日誌、審計日誌等。日誌是最原始的數據記錄,若是不是日誌,確定會有信息上的丟失。說個簡單的例子,需求是統計Nginx上每一個域名的的流量,這個徹底能夠經過一個簡單的Nginx模塊去完成,可是若是統計的是不一樣來源的流量就沒法作了。因此須要原始的完整的日誌。redis

有種手法是業務程序把日誌經過網絡直接發送出去,可是這並不可取,由於網絡和接收端並不徹底可靠,當出問題時會對業務形成影響或者日誌丟失。所以,對業務侵入最小最天然的方式是把日誌落到本地硬盤上。數據庫

二、數據平臺設計架構

 

2.1 Agent設計需求

每臺機器上會有一個Agent去同步這些日誌,這是個典型的隊列模型,業務進程在不斷的push,Agent在不停的pop。Agent須要有記憶功能,用來保存同步的位置(offset),這樣才儘量保證數據準確性,但不可能作到徹底準確。因爲發送數據和保存offset是兩個動做,不具備事務性,不可避免的會出現數據不一致性狀況,一般是發送成功後保存offset,那麼在Agent異常退出或機器斷電時可能會形成多餘的數據。安全

在這裏,Agent須要足夠輕,這主要體如今運維和邏輯兩個方面。Agent在每臺機器上都會部署,運維成本、接入成本是須要考慮的。Agent不該該有解析日誌、過濾、統計等動做,這些邏輯應該給數據消費者。假若Agent有較多的邏輯,那它是不可完成的,不可避免的常常會有升級變動動做。性能優化

2.2 數據收集流程

數據收集這塊的技術選擇,Agent是用Go本身研發的,消息中間件Kafka,數據傳輸工具Flume。說到數據收集常常有人拿Flume和Kafka作比較,我看來這二者定位是不一樣的,Flume更傾向於數據傳輸自己,Kakfa是典型的消息中間件用於解耦生產者消費者。服務器

具體架構上,Agent並沒把數據直接發送到Kafka,在Kafka前面有層由Flume構成的forward。這樣作有兩個緣由:網絡

1. Kafka的API對非JVM系的語言支持很不友好,forward對外提供更加通用的http接口。架構

2. forward層能夠作路由、Kafka topic和Kafka partition key等邏輯,進一步減小Agent端的邏輯。併發

forward層不含狀態,徹底能夠作到水平擴展,不用擔憂成爲瓶頸。出於高可用考慮,forward一般不止一個實例,這會帶來日誌順序問題,Agent按必定規則(round-robin、failover等)來選擇forward實例,即便Kafka partition key同樣,因爲forward層的存在,最終落入Kafka的數據順序和Agent發送的順序可能會不同。咱們對亂序是容忍的,由於產生日誌的業務基本是分佈式的,保證單臺機器的日誌順序意義不大。若是業務對順序性有要求,那得把數據直接發到Kafka,並選擇好partition key,Kafka只能保證partition級的順序性。運維

2.3 跨機房收集要點

多機房的情形,經過上述流程,先把數據匯到本地機房Kafka 集羣,而後匯聚到核心機房的Kafka,最終供消費者使用。因爲Kafka的mirror對網絡不友好,這裏咱們選擇更加的簡單的Flume去完成跨機房的數據傳送。Flume在不一樣的數據源傳輸數據仍是比較靈活的,但有幾個點須要注意:

1. memory-channel效率高但可能有丟數據的風險,file-channel安全性高但性能不高。咱們是用memory-channel,但把capacity設置的足夠小,使內存中的數據儘量少,在乎外重啓和斷電時丟的數據不多。我的比較排斥file-channel,效率是一方面,另外一個是對Flume的指望是數據傳輸,引入file-channel時,它的角色會向存儲轉變,這在整個流程中是不合適的。一般Flume的sink端是Kafka和HDFS這種可用性和擴張性比較好的系統,不用擔憂數據擁堵問題。

2. 默認的http souce 沒有設置線程池,有性能問題,若是有用到,須要本身修改代碼。

3. 單sink速度跟不上時,須要多個sink。像跨機房數據傳輸網絡延遲高單rpc sink吞吐上不去和HDFS sink效率不高情形,咱們在一個channel後會配十多個sink。

筆者注:

七牛遇到的flume入hdfs的性能問題我們這邊也遇到了。這邊開始也是提升sink的數量,可是這沒從根本解決問題,這個問題須要在flume收集日誌的機器進行消息合併纔是根本。單純增長sink數量會形成hdfs小文件過多,增長nn內存壓力。不是好的解決方案。

2.4 Kafka使用要點

Kafka在性能和擴展性很不錯,如下幾個點須要注意下:

1. topic的劃分,大topic對生產者有利且維護成本低,小topic對消費者比較友好。若是是徹底不相關的相關數據源且topic數不是發散的,優先考慮分topic。

2. Kafka的並行單位是partition,partition數目直接關係總體的吞吐量,但parition數並非越大越高,3個partition就能吃滿一塊普通硬盤IO了。因此partition數是由數據規模決定,最終仍是須要硬盤來抗。

3. partition key選擇不當,可能會形成數據傾斜。在對數據有順序性要求才需使用partition key。Kafka的producer sdk在沒指定partition key時,在必定時間內只會往一個partition寫數據,這種狀況下當producer數少於partition數也會形成數據傾斜,能夠提升producer數目來解決這個問題。

2.5 數據離線和實時計算

數據到Kafka後,一路數據同步到HDFS,用於離線統計。另外一路用於實時計算。因爲今天時間有限,接下來只能和你們分享下實時計算的一些經驗。

實時計算咱們選擇的Spark Streaming。咱們目前只有統計需求,沒迭代計算的需求,因此Spark Streaming使用比較保守,從Kakfa讀數據統計完落入mongo中,中間狀態數據不多。帶來的好處是系統吞吐量很大,但幾乎沒遇到內存相關問題

Spark Streaming對存儲計算結果的數據庫tps要求較高。好比有10萬個域名須要統計流量,batch interval爲10s,每一個域名有4個相關統計項,算下來平均是4萬 tps,考慮到峯值可能更高,固態硬盤上的mongo也只能抗1萬tps,後續咱們會考慮用redis來抗這麼高的tps

有外部狀態的task邏輯上不可重入的,當開啓speculation參數時候,可能會形成計算的結果不許確。說個簡單的例子。這個任務,若是被重作了,會形成落入mongo的結果比實際多。

有狀態的對象生命週期很差管理,這種對象不可能作到每一個task都去new一個。咱們的策略是一個JVM內一個對象,同時在代碼層面作好併發控制。相似下面。

 

在Spark1.3的後版本,引入了 Kafka Direct API試圖來解決數據準確性問題,使用Direct在必定程序能緩解準確性問題,但不可避免還會有一致性問題。爲何這樣說呢?Direct API 把Kafka consumer offset的管理暴露出來(之前是異步存入ZooKeeper),當保存計算結果和保存offset在一個事務裏,才能保證準確。

這個事務有兩種手段作到,一是用MySQL這種支持事務的數據庫保存計算結果offset,一是本身實現兩階段提交。這兩種方法在流式計算裏實現的成本都很大。其次Direct API 還有性能問題,由於它到計算的時候才實際從Kafka讀數據,這對總體吞吐有很大影響。

三、七牛數據平臺規模

 

要分享的就這些了,最後秀下咱們線上的規模:Flume + Kafka + Spark8臺高配機器,日均500億條數據,峯值80萬tps。

Refer:

[1] 美團 Spark 性能優化指南——基礎篇

http://tech.meituan.com/spark-tuning-basic.html

[2] 諸葛io基於Spark的用戶行爲路徑分析的產品化實踐

http://bit.ly/2n58OIA

相關文章
相關標籤/搜索