攜程實時計算平臺架構與實踐丨DataPipeline

文 | 潘國慶 攜程大數據平臺實時計算平臺負責人前端

圖片描述

本文主要從攜程大數據平臺概況、架構設計及實現、在實現當中踩坑及填坑的過程、實時計算領域詳細的應用場景,以及將來規劃五個方面闡述攜程實時計算平臺架構與實踐,但願對須要構建實時數據平臺的公司和同窗有所借鑑。node

1、攜程大數據平臺之整體架構sql

攜程大數據平臺結構分爲三層:
應用層:開發平臺Zeus(分爲調度系統、Datax數據傳輸系統、主數據系統、數據質量系統)、查詢平臺(ArtNova報表系統、Adhoc查詢)、機器學習(基於tensorflow、spark等開源框架進行開發;GPU雲平臺基於K8S實現)、實時計算平臺Muise;數據庫

中間層:基於開源的大數據基礎架構,分爲分佈式存儲和計算框架、實時計算框架;
離線主要是基於Hadoop、HDFS分佈式存儲、分佈式離線計算基於Hive及Spark、KV存儲基於HBase、Presto和Kylin用於Adhoc以及報表系統;
實時計算框架底層是基於Kafka封裝的消息隊列系統Hermes, Qmq是攜程自研的消息隊列, Qmq主要用於定單交易系統,確保百分之百不丟失數據而打造的消息隊列。apache

底層:資源監控與運維監控,分爲自動化運維繫統、大數據框架設施監控、大數據業務監控。服務器

圖片描述

2、架構設計與實現架構

1.Muise平臺介紹框架

1)Muise是什麼
Muise,取自希臘神話的文藝女神繆斯之名,是攜程的實時數據分析和處理的平臺;Muise平臺底層基於消息隊列和開源的實時處理系統JStorm、Spark Streaming和Flink,可以支持秒級,甚至是毫秒級延遲的流式數據處理。運維

2)Muise的功能
數據源:Hermes Kafka/Mysql、Qmq;
數據處理:提供Muise JStorm/Spark/FlinkCore API消費Hermes或Qmq數據,底層使用Jstorm、Spark或實時處理數據,並提供本身封裝的API給用戶使用。API對接了全部數據源系統,方便用戶直接使用;
做業管理:Portal提供對於JStorm、Spark Streaming和Flink做業的管理,包含新建做業,上傳jar包以及發佈生產等功能;
監控和告警:使用Jstorm、Spark和Flink提供的Metrics框架,支持自定義的metrics;metrics信息中心化管理,接入Ops的監控和告警系統,提供全面的監控和告警支持,幫助用戶在第一時間內監控到做業是否發生問題。機器學習

2.Muise平臺現狀

平臺現狀:
Jstorm 2.1.一、Spark 2.0.一、Flink1.6.0、Kafka 2.0;
集羣規模:
13個集羣、200+臺機器150+Jstorm、50+Yarn、100+ Kafka;
做業規模:
11個業務線、350+Jstorm做業、120+SS/Flink做業;
消息規模:
Topic 1300+、增量 100T+ PD、Avg 200K TPS、Max 900K TPS;
消息延時:
Hermes 200ms之內、Storm 20ms之內;
消息處理成功率:
99.99%。

3.Muise平臺演進之路

2015 Q2~2015 Q3 :基於Storm開發實時計算平臺;
2016 Q1~2016 Q2 :Storm遷移JStorm、引入StreamCQL;
2017 Q1~2017 Q2 :Spark Streaming調研與接入;
2017 Q3~2018 Q1 :Flink調研與接入。

4.Muise平臺架構

1)Muise平臺架構
應用層:Muise Portal 目前主要支持了 Storm 與 Spark Streaming兩類做業,支持新建做業、Jar包發佈、做業運行與中止等一系列功能;

中間層:對底層Infrastructure作了封裝,爲用戶提供基於Storm、Spark、Flink相對應的API以及各方面Services;

底層:Hermes & Qmq是數據源、Redis、HBase、HDFS、DB等做爲外部的數據存儲、Graphite、Grafana、ES主要用於監控。

圖片描述

2)Muise實時計算流程
Producer端:用戶先申請Kafka的topic,而後將數據實時寫到Kafka中;
Muise Portal端:用戶基於咱們提供的API作開發,開發完之後經過Muise Portal配置、上傳和啓動做業;做業啓動後,jar包會分發到各個對應的集羣消費Kafka數據;
存儲端:數據在被消費以後能夠寫回QMQ或Kafka,也能夠存儲到外部系統Redis、HBase、HDFS/Hive、DB。

圖片描述

5.平臺設計 ——易用性

首先:做爲一個平臺設計第一要點就是要簡單易用,咱們提供綜合的Portal,便於用戶本身新建管理它的做業,方便開發實時做業第一時間可以上線;
其次:咱們封裝了不少Core API,支持多套實時計算框架:

  • 支持HermesKafka/MySQL 、QMQ;
  • 集成Jstorm、Spark Streaming、Flink;
  • 做業資源管控;
  • 提供DB、Redis、HBase和HDFS輸出組件;
  • 基於內置Metric系統定製多項metric進行做業預警監控;
  • 用戶可自定義Metric用於監控與預警;
  • 支持AtLeast Once 與Exactly Once語義。

上文講到平臺設計要易用,下面講平臺的容錯,確保數據必定不能出問題。

6.平臺設計——容錯

  • Jstorm:基於Acker機制確保At Least Once;
  • Spark Streaming:基於Checkpoint實現Exactly Once、基於Kafka Offset回溯實現At Least Once;
  • Flink:基於Flinktwo-phase commit + Kafka 0.11事務性支持實現Exactly Once。

7.Exactly Once

1)Direct Approach
當前大部分拿Spark Streaming消費Kafka的話,都是用Direct Approach的方式:
優勢:記錄每一個批次消費的Offset,做業可經過offset回溯;
缺點:數據存儲與offset存儲異步:
數據保存成功,應用宕機,offset未保存 (致使數據重複);
offset保存成功,應用宕機,數據保存失敗 (致使數據丟失);

2)CheckPoint

優勢:默認記錄每一個批次的運行狀態與源數據,宕機時可從cp目錄恢復;
缺點:
1.非100%保證ExactlyOnce;
https://www.iteblog.com/archi... 描述了沒法保證Exactly once的場景;
https://issues.apache.org/jir... 也存在doCheckPoint時出現塊丟失的狀況;
2.啓用cp帶來額外性能影響;
3.Streaming做業邏輯改變沒法從cp恢復。

適用場景:比較適合有狀態計算的場景;

使用方式:建議程序本身存儲offset,當發生宕機時,若是spark代碼邏輯沒有發生改變,則根據checkpoint目錄建立StreamingContext。若是發生改變,則根據實現本身存儲的offset建立context並設立新的checkpoint點。

圖片描述

8.平臺設計——監控與告警

如何可以第一時間幫用戶發現做業問題,是一個重中之重。

集羣監控

  • 服務器監控:考量的指標有Memory、CPU、Disk IO、Net IO;
  • 平臺監控:Ganglia;

做業監控

  • 基於實時計算框架原生Metric系統;
  • 定製Metrics反應做業狀態;
  • 採集原生與定製Metrics用於監控和告警;
  • 存儲:Graphite展 現:Grafana 告警:Appmon;

咱們如今定製的不少Metrics當中比較通用的是:

  • Fail:按期時間內,Jstorm數據處理失敗數量、Spark task Fail數量;
  • Ack:按期時間內,處理的數據量;
  • Lag:按期時間內,數據產生與被消費的中間延遲(kafka 2.0基於自帶bornTime)。

攜程開發了本身告警系統,將Metrics代入系統以後基於規則作告警。經過做業監控看板完成相關指標的監控和查看,咱們會把Flink做爲比較關心的Metrics指標,全都導入到Graphite數據庫裏面,而後基於前端Grafana作展示。經過做業監控看板,咱們可以直接看到Kafka to Flink Delay(Lag),至關於數據從產生到被Flink做業消費,中間延遲是62毫秒,速度相對比較快的。其次咱們監控了每次從Kafka中獲取數據的速度。由於從Kafka獲取數據是基於一小塊一小塊去獲取,咱們設置的是每次拉2兆的數據量。經過做業監控看板能夠監控到每次從Kafka拉取數據時候的平均延遲是25毫秒,Max是 760毫秒。

圖片描述

接下來說講咱們在這幾年踩到的一些坑以及如何填坑的。

3、踩坑與填坑

坑1:HermesUBT數據量大,埋點信息衆多,服務端與客戶端均承受巨大壓力;
解決方案:提供統一分流做業,基於特定規則與配置將數據分流至不一樣topic。

坑2:Kafka沒法保證全局有序;
解決方案:若是在強制全局有序的場景下,使用單Partition;若是在部分有序的狀況下,可基於某個字段做Hash,保證Partition內部有序。

坑3:Kafka沒法根據時間精確回溯到某時間段的數據;
解決方案:平臺提供過濾功能,過濾時間早於設定時間的數據(kafka 0.10以後每條數據都帶有本身的時間戳,因此這個問題在升級kafka以後天然而然的就解決了)。

坑4:最初,攜程全部的Spark Streaming、Flink做業都是跑在主機羣上面的,是一個大Hadoop集羣,目前是幾千臺規模,離線和實時是混布的,一旦一個大的離線做業上來時,會對實時做業有影響;其次是Hadoop集羣常常會作一些升級改造,因此可能會重啓Name Node或者Node Manager,這會致使做業有時會掛掉;
解決方案:咱們採用分開部署,單獨搭建實時集羣,獨立運行實時做業。離線歸離線,實時歸實時的,實時集羣單獨跑Spark Streaming跟Yarn的做業,離線專門跑離線的做業。

當分開部署後,會遇到新的問題,部分實時做業須要去一些離線做業作一些Join或 Feature的操做,因此也是須要訪問主機羣數據。這至關於有一個跨集羣訪問的問題。

坑5:Hadoop實時集羣跨集羣訪問主機羣;
解決方案:Hdfs-site.xml配置ns-prod、ns雙重namespace,分別指向本地與主機羣;

Spark配置spark.yarn.access.namenodes or hadoopFlieSystems

坑6:不管是Jstorm仍是接Storm都會遇到一個CPU搶佔的問題,當你上了一個大的做業,尤爲是那種消耗CPU特別厲害的,可能我給它分開了一個Worker,一個CPU Core,可是它最後有可能會給我用到3個甚至4個;
解決方案:啓用cgroup限制cpu使用率。

4、應用場景

1.實時報表統計

實時報表統計與展示也是Spark Streaming使用較多的一個場景,數據能夠基於Process Time統計,也能夠基於Event Time統計。因爲自己Spark Streaming不一樣批次的job能夠視爲一個個的滾動窗口,某個獨立的窗口中包含了多個時間段的數據,這使得使用SparkStreaming基於Event Time統計時存在必定的限制。通常較爲經常使用的方式是統計每一個批次中不一樣時間維度的累積值並導入到外部系統,如ES;而後在報表展示的時基於時間作二次聚合得到完整的累加值最終求得聚合值。下圖展現了攜程IBU基於Spark Streaming實現的實時看板。

圖片描述

2.實時數倉

1)Spark Streaming近實時存儲數據
現在市面上有形形色色的工具能夠從Kafka實時消費數據並進行過濾清洗最終落地到對應的存儲系統,如:Camus、Flume等。相比較於此類產品,Spark Streaming的優點首先在於能夠支持更爲複雜的處理邏輯,其次基於Yarn系統的資源調度使得Spark Streaming的資源配置更加靈活,用戶採用Spark Streaming實時把數據寫到HDFS或者寫到Hive裏面去。

2)基於各類規則做數據質量檢測
基於Spark Streaming,自定義metric功能對數據的數據量、字段數、數據格式與重複數據進行了數據質量校驗與監控。

3)基於自定義metric實時預警
基於咱們封裝提供的Metric註冊系統肯定一些規則,而後每一個批次基於這些規則作一個校驗,返回一個結果。這個結果會基於Metric sink吐出來,吐出來基於metrics的結果作一個監控。當前咱們採用Flink加載TensorFlow模型實時作預測。基本時效性是數據一旦到達兩秒鐘以內就可以把告警信息告出來,給用戶很是好的體驗。

圖片描述

5、將來規劃

1.Flink on K8S

在攜程內部有一些不一樣的計算框架,有實時計算的,有機器學習的,還有離線計算的,因此須要一個統一的底層框架來進行管理,所以在將來將Flink遷移到了K8S上,進行統一的資源管控。

2.Muise平臺接入Flink SQL

Muise平臺雖然接入了Flink,可是用戶仍是得手寫代碼,咱們開發了一個實時特徵平臺,用戶只須要寫SQL,即基於Flink的SQL就能夠實時採集用戶所須要的模型裏面或者用到的特徵。以後會把實時特徵平臺跟實時計算平臺作進行合併,用戶最後只須要寫SQL就能夠實現全部的實時做業實現。

3.Jstorm全面啓用Cgroup

當前因爲部分歷史緣由致使如今不少做業跑在Jstorm上面,所以出現了資源分配不均衡的狀況,以後會全面啓用Cgroup。

4.在線模型訓練

攜程部分部門須要實時在線模型訓練,經過用Spark訓練了模型以後,而後使用Spark Streaming的模型,實時作一個攔截或者控制,應用在風控等場景。

—end—

相關文章
相關標籤/搜索