美圖離線ETL實踐

美圖收集的日誌須要經過 ETL 程序清洗、規整,並持久化地落地於 HDFS / Hive,便於後續的統一分析處理。mysql


什麼是 ETL?

ETL 即 Extract-Transform-Load,用來描述將數據歷來源端通過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。ETL 一詞較經常使用在數據倉庫,但其對象並不限於數據倉庫。android

在美圖特有的業務環境下,ETL 須要作到如下需求:ios

1.大數據量、高效地清洗落地。美圖業務繁多、用戶基數大、數據量龐大,除此以外業務方但願數據採集後就能快速地查詢到數據。sql

2.靈活配置、知足多種數據格式。因爲不斷有新業務接入,當有新業務方數據接入時要作到靈活通用、增長一個配置信息就能夠對新業務數據進行清洗落地;同時每一個業務方的數據格式各式各樣,ETL 須要兼容多種通用數據格式,以知足不一樣業務的需求(如 json、avro、DelimiterText 等)。數據庫

3.約束、規範。須要知足數據庫倉庫規範,數據按不一樣層(STG 層、ODS 層等)、不一樣庫(default.db、meipai.db 等)、不一樣分區(必須指定時間分區)落地。json

4.容錯性。考慮業務日誌採集可能存在必定的髒數據,須要在達到特定的閾值時進行告警;而且可能出現 Hadoop 集羣故障、Kafka 故障等各類情況,所以須要支持數據重跑恢復。多線程


ETL 有兩種形式:實時流 ETL 和 離線 ETL。架構

以下圖所示,實時流 ETL 一般有兩種形式:一種是經過 Flume 採集服務端日誌,再經過 HDFS 直接落地;另外一種是先把數據採集到 Kafka,再經過 Storm 或 Spark streaming 落地 HDFS,實時流 ETL 在出現故障的時候很難進行回放恢復。美圖目前僅使用實時流 ETL 進行數據注入和清洗的工做。app

根據 Lambda 結構,若是實時流 ETL 出現故障須要離線 ETL 進行修補。離線 ETL 是從 Kafka拉取消息,通過 ETL 再從 HDFS 落地。爲了提升實時性及減輕數據壓力,離線 ETL 是每小時 05 分調度,清洗上一個小時的數據。爲了減輕 HDFS NameNode 的壓力、減小小文件,日期分區下同個 topic&partition 的數據是 append 追加到同一個日誌文件。負載均衡


離線 ETL 的架構設計及實現原理

離線 ETL 採用 MapReduce 框架處理清洗不一樣業務的數據,主要是採用了分而治之的思想,可以水平擴展數據清洗的能力;


如上圖所示,離線 ETL 分爲三個模塊:

  • Input(InputFormat):主要對數據來源(Kafka 數據)進行解析分片,按照必定策略分配到不一樣的 Map 進程處理;建立 RecordReader,用於對分片數據讀取解析,生成 key-value 傳送給下游處理。

  • Map(Mapper):對 key-value 數據進行加工處理。

  • Output (OutputFormat):建立 RecordWriter 將處理過的 key-value 數據按照庫、表、分區落地;最後在 commit 階段檢測消息處理的完整性。


離線 ETL 工做流程

上圖是離線 ETL 的基本工做流程:

1.kafka-etl 將業務數據清洗過程當中的公共配置信息抽象成一個 etl schema ,表明各個業務不一樣的數據;

2.在 kafka-etl 啓動時會從 zookeeper 拉取本次要處理的業務數據 topic&schema 信息;

3.kafka-etl 將每一個業務數據按 topic、partition 獲取的本次要消費的 offset 數據(beginOffset、endOffset),並持久化 mysql;

4.kafka-etl 將本次須要處理的 topic&partition 的 offset 信息抽象成 kafkaEvent,而後將這些 kafkaEvent 按照必定策略分片,即每一個 mapper 處理一部分 kafkaEvent;

5.RecordReader 會消費這些 offset 信息,解析 decode 成一個個 key-value 數據,傳給下游清洗處理;

6.清洗後的 key-value 統一經過 RecordWriter 數據落地 HDFS。


離線 ETL 的模塊實現

數據分片(Split)

咱們從 kafka 獲取當前 topic&partition 最大的 offset 以及上次消費的截止 offset ,組成本次要消費的[beginOffset、endOffset]kafkaEvent,kafkaEvent 會打散到各個 Mapper 進行處理,最終這些 offset 信息持久化到 mysql 表中。

那麼如何保證數據不傾斜呢?首先經過配置自定義 mapper 個數,並建立對應個數的 ETLSplit。因爲 kafkaEevent 包含了單個 topic&partition 以前消費的 Offset 以及將要消費的最大 Offset,便可得到每一個 kafkaEvent 須要消費的消息總量。最後遍歷全部的 kafkaEevent,將當前 kafkaEevent 加入當前最小的 ETLSplit(經過比較須要消費的數據量總和,便可得出),經過這樣生成的 ETLSplit 能儘可能保證數據均衡。


數據解析清洗(Read)

如上圖所示,首先每一個分片會有對應的 RecordReader 去解析,RecordReade 內包含多個 KafkaConsumerReader ,就是對每一個 KafkaEevent 進行消費。每一個 KafkaEevent 會對應一個 KafkaConsumer,拉取了字節數據消息以後須要對此進行 decode 反序列化,此時就涉及到 MessageDecoder 的結構。MessageDecoder 目前支持三種格式:

格式

涉及 topic

Avro

android、ios、ad_sdk_android...

Json

app-server-meipai、anti-spam...

DelimiterText

app-server-youyan、app-server-youyan-im...


MessageDecoder 接收到 Kafka 的 key 和 value 時會對它們進行反序列化,最後生成 ETLKey 和 ETLValue。同時 MessageDecoder 內包含了 Injector,它主要作了以下事情:

  • 注入 Aid:針對 arachnia agent 採集的日誌數據,解析 KafkaKey 注入日誌惟一標識 Aid;

  • 注入 GeoIP 信息:根據 GeoIP 解析 ip 信息注入地理信息(如 country_id、province_id、city_id);

  • 注入 SdkDeviceInfo: 自己實時流 ETL 會作注入 gid、is_app_new 等信息,可是離線 ETL 檢測這些信息是否完整,作進一步保障。


過程當中還有涉及到 DebugFilter,它將 SDK 調試設備的日誌過濾,不落地到 HDFS。


多文件落地(Write)

因爲 MapReduce 自己的 RecordWriter 不支持單個落地多個文件,須要進行特殊處理,而且 HDFS 文件是不支持多個進程(線程)writer、append,因而咱們將 KafkaKey+ 業務分區+ 時間分區 + Kafka partition 定義一個惟一的文件,每一個文件都是會到帶上 kafka partition 信息。同時對每一個文件建立一個 RecordWriter。

如上圖所示,每一個 RecordWriter 包含多個 Writer ,每一個 Writer 對應一個文件,這樣能夠避免同一個文件多線程讀寫。目前是經過 guava cache 維護 writer 的數量,若是 writer 太多或者太長時間沒有寫訪問就會觸發 close 動做,待下批有對應目錄的 kafka 消息在建立 writer 進行 append 操做。這樣咱們能夠作到在同一個 map 內對多個文件進行寫入追加。


檢測數據消費完整性 (Commit)


MapReduce Counter 爲提供咱們一個窗口,觀察統計 MapReduce job 運行期的各類細節數據。而且它自帶了許多默認 Counter,能夠檢測數據是否完整消費:

reader_records: 解析成功的消息條數;

decode_records_error: 解析失敗的消息條數;

writer_records: 寫入成功的消息條數;

...

最後經過本次要消費 topic offset 數量、reader_records 以及 writer_records 數量是否一致,來確認消息消費是否完整。

*容許必定比例的髒數據,若超出限度會生成短信告警


ETL 系統核心特徵

數據補跑及其優化

ETL 是如何實現數據補跑以及優化的呢?首先了解一下須要重跑的場景:

*當用戶調用 application kill 時會經歷三個階段:1) kill SIGTERM(-15) pid;2) Sleep for 250ms;3)kill SIGKILL(-9) pid 。


那麼有哪些重跑的方式呢?

以下圖所示是第三種重跑方式的總體流程,ETL 是按照小時調度的,首先將數據按小時寫到臨時目錄中,若是消費失敗會告警通知並重跑消費當前小時。若是落地成功則合併到倉庫目錄的目標文件,合併失敗一樣會告警通知並人工重跑,將小文件合併成目標文件。


優化後的重跑狀況分析以下表所示:


自動水平擴展

如今離線 Kafka-ETL 是每小時 05 分調度,每次調度的 ETL 都會獲取每一個 topic&partition 當前最新、最大的 latest offset,同時與上個小時消費的截止 offset 組合成本地要消費的 kafkaEvent。因爲每次獲取的 latest offset 是不可控的,有些狀況下某些 topic&partition 的消息 offset 增加很是快,同時 kafka topic 的 partition 數量來不及調整,致使 ETL 消費處理延遲,影響下游的業務處理流程:

  • 因爲擴容、故障等緣由須要補採集漏採集的數據或者歷史數據,這種狀況下 topic&&partition 的消息 offset 增加很是快,僅僅依賴 kafka topic partiton 擴容是不靠譜的,補採集完後面還得刪除擴容的 partition;

  • 週末高峯、節假日、6.1八、雙十一等用戶流量高峯期,收集的用戶行爲數據會比平時翻幾倍、幾十倍,可是一樣遇到來不及擴容 topic partition 個數、擴容後須要縮容的狀況;


Kafka ETL 是否能自動水平擴展不強依賴於 kafka topic partition 的個數。若是某個 topic kafkaEvent 須要處理的數據過大,評估在合理時間範圍單個 mapper 能消費的最大的條數,再將 kafkaEvent 水平拆分紅多個子 kafkaEvent,並分配到各個 mapper 中處理,這樣就避免單個 mapper 單次須要處理過大 kafkaEvent 而致使延遲,提升水平擴展能力。拆分的邏輯以下圖所示:

後續咱們將針對如下兩點進行自動水平擴展的優化:

  • 若是單個 mapper 處理的總消息數據比較大,將考慮擴容 mapper 個數並生成分片 split 進行負載均衡。

  • 每種格式的消息處理速度不同,分配時可能出現一些 mapper 負擔比較重,將給每一個格式配置必定的權重,根據消息條數、權重等結合一塊兒分配 kafkaEvent。

相關文章
相關標籤/搜索