美圖收集的日誌須要經過 ETL 程序清洗、規整,並持久化地落地於 HDFS / Hive,便於後續的統一分析處理。mysql
圖 1sql
ETL?即 Extract-Transform-Load,用來描述將數據歷來源端通過抽取(extract)、轉換(transform)、加載(load)至目的端的過程。ETL?一詞較經常使用在數據倉庫,但其對象並不限於數據倉庫。數據庫
在美圖特有的業務環境下,ETL 須要作到如下需求:json
1.大數據量、高效地清洗落地。美圖業務繁多、用戶基數大、數據量龐大,除此以外業務方但願數據採集後就能快速地查詢到數據。多線程
2.靈活配置、知足多種數據格式。因爲不斷有新業務接入,當有新業務方數據接入時要作到靈活通用、增長一個配置信息就能夠對新業務數據進行清洗落地;同時每一個業務方的數據格式各式各樣,ETL 須要兼容多種通用數據格式,以知足不一樣業務的需求(如 json、avro、DelimiterText 等)。架構
3.約束、規範。須要知足數據庫倉庫規範,數據按不一樣層(STG 層、ODS 層等)、不一樣庫(default.db、meipai.db 等)、不一樣分區(必須指定時間分區)落地。app
4.容錯性。考慮業務日誌採集可能存在必定的髒數據,須要在達到特定的閾值時進行告警;而且可能出現?Hadoop 集羣故障、Kafka 故障等各類情況,所以須要支持數據重跑恢復。框架
ETL 有兩種形式:實時流 ETL 和 離線 ETL。oop
如圖 2 所示,實時流 ETL?一般有兩種形式:一種是經過 Flume 採集服務端日誌,再經過 HDFS 直接落地;另外一種是先把數據採集到 Kafka,再經過 Storm 或 Spark streaming 落地 HDFS,實時流 ETL 在出現故障的時候很難進行回放恢復。美圖目前僅使用實時流 ETL 進行數據注入和清洗的工做。大數據
圖 2
根據 Lambda 結構,若是實時流 ETL 出現故障須要離線 ETL 進行修補。離線 ETL 是從 Kafka拉取消息,通過 ETL 再從 HDFS 落地。爲了提升實時性及減輕數據壓力,離線 ETL 是每小時 05 分調度,清洗上一個小時的數據。爲了減輕? HDFS NameNode 的壓力、減小小文件,日期分區下同個 topic&partition 的數據是 append 追加到同一個日誌文件。
離線 ETL 採用 MapReduce 框架處理清洗不一樣業務的數據,主要是採用了分而治之的思想,可以水平擴展數據清洗的能力;
圖 3:離線 ETL 架構
如圖 3 所示,離線 ETL 分爲三個模塊:
離線 ETL 工做流程
圖 4
如圖 4 所示是離線 ETL 的基本工做流程:
1.kafka-etl 將業務數據清洗過程當中的公共配置信息抽象成一個 etl schema ,表明各個業務不一樣的數據;
2.在 kafka-etl 啓動時會從 zookeeper 拉取本次要處理的業務數據 topic&schema 信息;
3.kafka-etl 將每一個業務數據按 topic、partition 獲取的本次要消費的 offset 數據(beginOffset、endOffset),並持久化 mysql;
4.kafka-etl 將本次須要處理的 topic&partition 的 offset 信息抽象成 kafkaEvent,而後將這些 kafkaEvent 按照必定策略分片,即每一個 mapper 處理一部分 kafkaEvent;
5.RecordReader 會消費這些 offset 信息,解析 decode 成一個個 key-value 數據,傳給下游清洗處理;
6.清洗後的 key-value 統一經過 RecordWriter 數據落地 HDFS。
離線 ETL 的模塊實現
數據分片(Split)
咱們從 kafka 獲取當前 topic&partition 最大的 offset 以及上次消費的截止 offset ,組成本次要消費的[beginOffset、endOffset]kafkaEvent,kafkaEvent 會打散到各個 Mapper 進行處理,最終這些 offset 信息持久化到 mysql 表中。
圖 5
那麼如何保證數據不傾斜呢?首先經過配置自定義mapper個數,並建立對應個數的ETLSplit。因爲kafkaEevent包含了單個topic&partition以前消費的Offset以及將要消費的最大Offset,便可得到每一個 kafkaEvent 須要消費的消息總量。最後遍歷全部的 kafkaEevent,將當前 kafkaEevent 加入當前最小的 ETLSplit(經過比較須要消費的數據量總和,便可得出),經過這樣生成的 ETLSplit 能儘可能保證數據均衡。
數據解析清洗(Read)
圖 6
如圖 6 所示,首先每一個分片會有對應的 RecordReader 去解析,RecordReade 內包含多個 KafkaConsumerReader ,就是對每一個 KafkaEevent 進行消費。每一個 KafkaEevent 會對應一個?KafkaConsumer,拉取了字節數據消息以後須要對此進行 decode 反序列化,此時就涉及到 MessageDecoder 的結構。MessageDecoder 目前支持三種格式:
MessageDecoder 接收到?Kafka 的 key 和 value 時會對它們進行反序列化,最後生成 ETLKey 和 ETLValue。同時?MessageDecoder 內包含了 Injector,它主要作了以下事情:
過程當中還有涉及到 DebugFilter,它將 SDK 調試設備的日誌過濾,不落地到 HDFS。
多文件落地(Write)
因爲 MapReduce 自己的 RecordWriter 不支持單個落地多個文件,須要進行特殊處理,而且 HDFS 文件是不支持多個進程(線程)writer、append,因而咱們將 KafkaKey+ 業務分區+ 時間分區 + Kafka partition 定義一個惟一的文件,每一個文件都是會到帶上 kafka partition 信息。同時對每一個文件建立一個?RecordWriter。
圖 7
如圖 7 所示,每一個?RecordWriter 包含多個?Writer ,每一個 Writer 對應一個文件,這樣能夠避免同一個文件多線程讀寫。目前是經過 guava cache 維護 writer 的數量,若是 writer 太多或者太長時間沒有寫訪問就會觸發 close 動做,待下批有對應目錄的 kafka 消息在建立 writer 進行 append 操做。這樣咱們能夠作到在同一個 map 內對多個文件進行寫入追加。?
檢測數據消費完整性 (Commit)
圖 8
MapReduce Counter 爲提供咱們一個窗口,觀察統計 MapReduce job 運行期的各類細節數據。而且它自帶了許多默認 Counter,能夠檢測數據是否完整消費:
reader_records: 解析成功的消息條數;
decode_records_error: 解析失敗的消息條數;
writer_records: 寫入成功的消息條數;
...
最後經過本次要消費 topic offset 數量、reader_records 以及 writer_records 數量是否一致,來確認消息消費是否完整。
*容許必定比例的髒數據,若超出限度會生成短信告警
?/ ETL 系統核心特徵 /?
數據補跑及其優化
ETL 是如何實現數據補跑以及優化的呢?首先了解一下須要重跑的場景:
*當用戶調用 application kill 時會經歷三個階段:1) kill SIGTERM(-15) pid;2) Sleep for 250ms;3)kill SIGKILL(-9) pid 。
那麼有哪些重跑的方式呢?
如圖 9 所示是第三種重跑方式的總體流程,ETL 是按照小時調度的,首先將數據按小時寫到臨時目錄中,若是消費失敗會告警通知並重跑消費當前小時。若是落地成功則合併到倉庫目錄的目標文件,合併失敗一樣會告警通知並人工重跑,將小文件合併成目標文件。
圖 9
優化後的重跑狀況分析以下表所示:
自動水平擴展
如今離線 Kafka-ETL 是每小時 05 分調度,每次調度的 ETL 都會獲取每一個 topic&partition 當前最新、最大的 latest offset,同時與上個小時消費的截止 offset 組合成本地要消費的 kafkaEvent。因爲每次獲取的 latest offset 是不可控的,有些狀況下某些 topic&partition 的消息 offset 增加很是快,同時 kafka topic 的 partition 數量來不及調整,致使 ETL 消費處理延遲,影響下游的業務處理流程:
Kafka ETL 是否能自動水平擴展不強依賴於 kafka topic partition 的個數。若是某個 topic kafkaEvent 須要處理的數據過大,評估在合理時間範圍單個 mapper 能消費的最大的條數,再將 kafkaEvent 水平拆分紅多個子 kafkaEvent,並分配到各個 mapper 中處理,中老年品牌這樣就避免單個 mapper 單次須要處理過大 kafkaEvent 而致使延遲,提升水平擴展能力。拆分的邏輯如圖 10 所示:
圖 10
後續咱們將針對如下兩點進行自動水平擴展的優化: