如何基於 Spark Streaming 構建實時計算平臺

GitChat 做者:潘國慶
原文:如何基於 Spark Streaming 構建實時計算平臺
關注微信公衆號:「GitChat 技術雜談」 一本正經的講技術html

###前言前端

隨着互聯網技術的迅速發展,用戶對於數據處理的時效性、準確性與穩定性要求愈來愈高,如何構建一個穩定易用並提供齊備的監控與預警功能的實時計算平臺也成了不少公司一個很大的挑戰。java

自2015年攜程實時計算平臺搭建以來,通過兩年多不斷的技術演進,目前實時集羣規模已達上百臺,平臺涵蓋各個SBU與公共部門數百個實時應用,整年JStorm集羣穩定性達到100%。目前實時平臺主要基於JStorm與Spark Streaming構建而成,相信關注攜程實時平臺的朋友在去年已經看到一篇關於攜程實時平臺的分享:攜程實時大數據平臺實踐分享git

本次分享將着重於介紹攜程如何基於Spark Streaming構建實時計算平臺,文章將從如下幾個方面分別闡述平臺的構建與應用:github

  • Spark Streaming vs JStorm正則表達式

  • Spark Streaming設計與封裝redis

  • Spark Streaming在攜程的實踐算法

  • 曾經踩過的坑sql

  • 將來展望數據庫

###Spark Streaming vs JStorm

攜程實時平臺在接入Spark Streaming以前,JStorm已穩定運行有一年半,基本可以知足大部分的應用場景。接入Spark Streaming主要有如下幾點考慮:首先攜程使用的JStorm版本爲2.1.1版本,此版本的JStorm封裝與抽象程度較低,並無提供High Level抽象方法以及對窗口、狀態和Sql等方面的功能支持,這大大的提升了用戶使用JStorm實現實時應用的門檻以及開發複雜實時應用場景的難度。在這幾個方面,Spark Streaming表現就相對好的多,不但提供了高度集成的抽象方法(各類算子),而且用戶還能夠與Spark SQL相結合直接使用SQL處理數據。

其次,用戶在處理數據的過程當中每每須要維護兩套數據處理邏輯,實時計算使用JStorm,離線計算使用Hive或Spark。爲了下降開發和維護成本,實現流式與離線計算引擎的統一,Spark爲此提供了良好的支撐。

最後,在引入Spark Streaming以前,咱們重點分析了Spark與Flink兩套技術的引入成本。Flink當時的版本爲1.2版本,Spark的版本爲2.0.1。相比較於Spark,Flink在SQL與MLlib上的支持相對弱於Spark,而且公司許多部門都是基於Spark SQL與MLlib開發離線任務與算法模型,使得大大下降了用戶使用Spark的學習成本。

下圖簡單的給出了當前咱們使用Spark Streaming與JStorm的對比:

enter image description here
enter image description here

###Spark Streaming設計與封裝

在接入Spark Streaming的初期,首先須要考慮的是如何基於現有的實時平臺無縫的嵌入Spark Streaming。原先的實時平臺已經包含了許多功能:元數據管理、監控與告警等功能,因此第一步咱們先針對Spark Streaming進行了封裝並提供了豐富的功能。整套體系總共包含了Muise Spark Core、Muise Portal以及外部系統。

####Muise Spark Core

Muise Spark Core是咱們基於Spark Streaming實現的二次封裝,用於支持攜程多種消息隊列,其中Hermes Kafka與源生的Kafka基於Direct Approach的方式消費數據,Hermes Mysql與Qmq基於Receiver的方式消費數據。接下來將要講的諸多特性主要是針對Kafka類型的數據源。

Muise spark core主要包含了如下特性:

  • Kafka Offset自動管理

  • 支持Exactly Once與At Least Once語義

  • 提供Metric註冊系統,用戶可註冊自定義metric

  • 基於系統與用戶自定義metric進行預警

  • Long running on Yarn,提供容錯機制

Kafka Offset自動管理

封裝muise spark core的第一目標就是簡單易用,讓用戶以最簡單的方式可以上手使用Spark Streaming。首先咱們實現了幫助用戶自動讀取與存儲Kafka Offset的功能,用戶無需關心Offset是如何被處理的。其次咱們也對Kafka Offset的有效性進行了校驗,有的用戶的做業可能在中止了較長時間後從新運行會出現Offset失效的情形,咱們也對此做了對應的操做,目前的操做是將失效的Offset設置爲當前有效的最老的Offset。下圖展示了用戶基於muise spark core編寫一個Spark streaming做業的簡單示例,用戶只須要短短几行代碼便可完成代碼的初始化並建立好對應的DStream:

enter image description here
enter image description here

默認狀況下,做業每次都是基於上次存儲的Kafka Offset繼續消費,可是用戶也能夠自行決定Offset的消費起點。下圖中展現了設置消費起點的三種方式:

enter image description here
enter image description here

Exactly Once的實現

若是實時做業要實現端對端的exactly once則須要數據源、數據處理與數據存儲的三個階段都保證exactly once的語義。目前基於Kafka Direct API加上Spark RDD算子精確一次的保證可以實現端對端的exactly once的語義。在數據存儲階段通常實現exactly once須要保證存儲的過程是冪等操做或事務操做。不少系統自己就支持了冪等操做,好比相同數據寫hdfs同一個文件,這自己就是冪等操做,保證了屢次操做最終獲取的值仍是相同;HBase、ElasticSearch與redis等都可以實現冪等操做。對於關係型數據庫的操做通常都是可以支持事務性操做。

官方在建立DirectKafkaInputStream時只須要輸入消費Kafka的From Offset,而後其自行獲取本次消費的End Offset,也就是當前最新的Offset。保存的Offset是本批次的End Offset,下次消費從上次的End Offset開始消費。當程序宕機或重啓任務後,這其中存在一些問題。若是在數據處理完成前存儲Offset,則可能存在做業處理數據失敗與做業宕機等狀況,重啓後會沒法追溯上次處理的數據致使數據出現丟失。若是在數據處理完成後存儲Offset,可是存儲Offset過程當中發生失敗或做業宕機等狀況,則在重啓後會重複消費上次已經消費過的數據。並且此時又沒法保證重啓後消費的數據與宕機前的數據量相同數據至關,這又會引入另一個問題,若是是基於聚合統計指標做更新操做,這會帶來沒法判斷上次數據是否已經更新成功。

因此在muise spark core中咱們加入了本身的實現用以保證Exactly once的語義。具體的實現是咱們對Spark源碼進行了改造,保證在建立DirectKafkaInputStream能夠同時輸入From Offset與End Offset,而且咱們在存儲Kafka Offset的時候保存了每一個批次的起始Offset與結束Offset,具體格式以下:

enter image description here
enter image description here

如此作的用意在於可以確保不管是宕機仍是人爲重啓,重啓後的第一個批次與重啓前的最後一個批次數據如出一轍。這樣的設計使得後面用戶在後面對於第一個批次的數據處理很是靈活可變,若是用戶直接忽略第一個批次的數據,那此時保證的是at most once的語義,由於咱們沒法獲知重啓前的最後一個批次數據操做是否有成功完成;若是用戶依照原有邏輯處理第一個批次的數據,不對其作去重操做,那此時保證的是at least once的語義,最終結果中可能存在重複數據;最後若是用戶想要實現exactly once,muise spark core提供了根據topic、partition與offset生成UID的功能,只要確保兩個批次消費的Offset相同,則最終生成的UID也相同,用戶能夠根據此UID做爲判斷上個批次數據是否有存儲成功的依據。下面簡單的給出了重啓後第一個批次操做的行爲。

enter image description here
enter image description here

Metrics系統

Musie spark core基於Spark自己的metrics系統進行了改造,添加了許多定製的metrics,而且向用戶暴露了metrics註冊接口,用戶能夠很是方便的註冊本身的metrics並在程序中更新metrics的數值。最後全部的metrics會根據做業設定的批次間隔寫入Graphite,基於公司定製的預警系統進行報警,前端能夠經過Grafana展示各項metrics指標。

Muise spark core自己定製的metrics包含如下三種:

  • Fail 批次時間內spark task失敗次數超過4次便報警,用於監控程序的運行狀態。

  • Ack 批次時間內spark streaming處理的數據量小0便報警,用於監控程序是否在正常消費數據。

  • Lag 批次時間內數據消費延遲大於設定值便報警。

其中因爲咱們大部分做業開啓了Back Pressure功能,這就致使在Spark UI中看到每一個批次數據都能在正常時間內消費完成,然而可能此時kafka中已經積壓了大量數據,故每一個批次咱們都會計算當前消費時間與數據自己時間的一個平均差值,若是這個差值大於批次時間,說明自己數據消費就已經存在了延遲。

下圖展示了預警系統中,基於用戶自定義註冊的Metrics以及系統定製的Metrics進行預警。

enter image description here
enter image description here

容錯

其實在上面Exactly Once一章中已經詳細的描述了muise spark core如何在程序宕機後可以保證數據正確的處理。可是爲了可以讓Spark Sreaming可以長時間穩定的運行在Yarn集羣上,還須要添加許多配置,感興趣的朋友能夠查看:Long running Spark Streaming Jobs on Yarn Cluster

除了上述容錯保證以外,Muise Portal(後面會講)也提供了對Spark Streaming做業定時檢測的功能。目前每過5分鐘對當前全部數據庫中狀態標記爲Running的Spark Streaming做業進行狀態檢測,經過Yarn提供的REST APIs能夠根據每一個做業的Application Id查詢做業在Yarn上的狀態,若是狀態處於非運行狀態,則會嘗試重啓做業。

####Muise Portal

在封裝完全部的Spark Streaming以後,咱們就須要有一個平臺可以管理配置做業,Muise Portal就是這樣的存在。Muise Portal目前主要支持了Storm與Spark Streaming兩類做業,支持新建做業、Jar包發佈、做業運行與中止等一系列功能。下圖展示了新建做業的界面:

enter image description here
enter image description here

Spark Streaming做業基於Yarn Cluster模式運行,全部做業經過在Muise Portal上的Spark客戶端提交到Yarn集羣上運行。具體的一個做業運行流程以下圖所示:

enter image description here
enter image description here

####總體架構

最後這邊給出一下目前攜程實時平臺的總體架構。

enter image description here
enter image description here

###Spark Streaming在攜程的實踐

目前Spark Streaming在攜程的業務場景主要能夠分爲如下幾塊:ETL、實時報表統計、個性化推薦類的營銷場景以及風控與安全的應用。從抽象上來講,主要能夠分爲數據過濾抽取、數據指標統計與模型算法的使用。

####ETL

現在市面上有形形色色的工具能夠從Kafka實時消費數據並進行過濾清洗最終落地到對應的存儲系統,如:Camus、Flume等。相比較於此類產品,Spark Streaming的優點首先在於能夠支持更爲複雜的處理邏輯,其次基於Yarn系統的資源調度使得Spark Streaming的資源配置更加靈活,最後用戶能夠將Spark RDD數據轉換成Spark Dataframe數據,使得能夠與Spark SQL相結合,而且最終將數據輸出到HDFS和Alluxio等分佈式文件系統時能夠存儲爲Parquet之類的格式化數據,用戶在後續使用Spark SQL處理數據時更爲的簡便。

目前在ETL使用場景中較爲典型的是攜程度假部門的Data Lake應用,度假部門使用Spark Streaming對數據作ETL操做最終將數據存儲至Alluxio,期間基於muise-spark-core的自定義metric功能對數據的數據量、字段數、數據格式與重複數據進行了數據質量校驗與監控,具體的監控預警已在上面說過。

enter image description here
enter image description here

####實時報表統計

實時報表統計與展示也是Spark Streaming使用較多的一個場景,數據能夠基於Process Time統計,也能夠基於Event Time統計。因爲自己Spark Streaming不一樣批次的job能夠視爲一個個的滾動窗口,某個獨立的窗口中包含了多個時間段的數據,這使得使用Spark Streaming基於Event Time統計時存在必定的限制。通常較爲經常使用的方式是統計每一個批次中不一樣時間維度的累積值並導入到外部系統,如ES;而後在報表展示的時基於時間作二次聚合得到完整的累加值最終求得聚合值。下圖展現了攜程IBU基於Spark Streaming實現的實時看板。

enter image description here
enter image description here

####個性化推薦與風控安全

這兩類應用的共同點莫過於它們都須要基於算法模型對用戶的行爲做出相對應的預測或分類,攜程目前全部模型都是基於離線數據天天定時離線訓練。在引入Spark Streaming以後,許多部門開始積極的嘗試特徵的實時提取、模型的在線訓練。而且Spark Streaming能夠很好的與Spark MLlib相結合,其中最爲成功的案例爲信安部門之前是基於各種過濾條件抓取攻擊請求,後來他們採用離線模型訓練,Spark Streaming加Spark MLlib對用戶進行實時預測,性能上較JStorm(基於大量正則表達式匹配用戶,十分消耗CPU)提升了十倍,漏報率下降了20%。

###曾經踩過的坑

目前攜程的Spark Streaming做業運行的YARN集羣與離線做業同屬一個集羣,這對做業不管是性能仍是穩定性都帶來了諸多影響。尤爲是當YARN或者Hadoop集羣須要更新維護重啓服務時,在很大程度上會致使Spark Streaming做業出現報錯、掛掉等情況,雖然有諸多的容錯保障,但也會致使數據積壓數據處理延遲。後期將會獨立部署Hadoop與Yarn集羣,全部的實時做業都運行在獨立的集羣上,不受外部的影響,這也方便後期對於Flink做業的開發與維護。後期經過Alluxio實現主集羣與子集羣間的數據共享。

在使用過程當中,也遇到了形形色色不一樣的Bug,這邊簡單的介紹幾個較爲嚴重的問題。首先第一個問題是,Spark Streaming每一個批次Job都會經過DirectKafkaInputStream的comput方法獲取消費的Kafka Topic當前最新的offset,若是此時kafka集羣因爲某些緣由不穩定,就會致使 java.lang.RuntimeException: No leader found for partition xx的問題,因爲此段代碼運行在Driver端,若是沒有作任何配置和處理的狀況下,會致使程序直接掛掉。對應的解決方法是配置spark.streaming.kafka.maxRetries大於1,而且能夠經過配置refresh.leader.backoff.ms參數設置每次重試的間隔時間。

其次在使用Spark Streaming與Spark Sql相結合的過程當中,也會有諸多問題。好比在使用過程當中可能出現out of memory:PermGen space,這是因爲Spark sql使用code generator致使大量使用PermGen space,經過在spark.driver.extraJavaOptions中添加-XX:MaxPermSize=1024m -XX:PermSize=512m解決。還有Spark Sql須要建立Spark Warehouse,若是基於Yarn來運行,默承認能是在HDFS上建立相對應的目錄,若是沒有權限會報出Permission denied的問題,用戶能夠經過配置config("spark.sql.warehouse.dir", "file:${system:user.dir}/spark-warehouse")來解決。

###將來展望

上面主要針對Spark Streaming在攜程實時平臺中的運用作了詳細的介紹,在使用Spark Streaming過程當中仍是存在一些痛點,好比窗口功能比較單1、基於Event Time統計指標過於繁瑣以及官方在新的版本中基本沒有新的特性加入等,這使得咱們更加傾向於嘗試Flink。Flink基本實現了Google提出的各種實時處理的理念,引入了WaterMark的實現,感興趣的朋友能夠查看Google官方文檔:The world beyond batch: Streaming 102

目前Flink 1.4 release版本發佈在即,Spark 2.2.0基於kafka數據源的Structured Streaming也支持了更多的特性。前期咱們已對Flink作了充分的調研,下半年主要工做將放在Flink的對接上。在提供了諸多實時計算框架的支持後,隨之而來的是帶來了更多的學習成本,從此咱們的重心將放在如何使用戶更加容易的實現實時計算邏輯。其中Apache Beam對各類實時場景提供了良好的封裝並對多種實時計算引擎作了支持,其次基於Stream Sql實現複雜的實時應用場景都將是咱們主要調研的方向。


實錄:《潘國慶:基於 Spark Streaming 構建實時計算平臺實戰解析》

相關文章
相關標籤/搜索