G7在實時計算的探索與實踐

做者: 張皓java

G7業務快覽

G7主要經過在貨車上的傳感器感知車輛的軌跡、油耗、點熄火、載重、溫度等數據,將車輛、司機、車隊、貨主鏈接到一塊兒,優化貨物運輸的時效、安全、成本等痛點問題。node

整個數據是經過車載的傳感器設備採集,好比公司的Smart盒子,CTBox盒子,油感設備,溫度探頭等,將車輛數據上報到後端平臺,在後端平臺計算和處理,最後展現到用戶面前。mysql

G7的業務場景是典型的IoT場景:redis

  • 傳感器數據
  • 數據種類多
  • 數據質量差
  • 數據低延遲
  • 數據量大

其中,數據質量差的緣由是整個鏈條會很是的長,從傳感器採集的車輛的數據,經過網絡運營商將數據上報到後端服務器,再通過解析,mq,過濾,調用三方接口,業務處理,入庫,整個過程很是的長,形成數據在傳輸過程當中出現數據重複,數據缺失等。另一點,IoT場景須要數據傳輸的延遲很是低,好比進出區域報警,當車輛進入到某個電子圍欄中的時候須要觸發報警,這個時候須要快速產生報警事件,一般不能超過30s,不然時間太長車輛已經經過了某個電子圍欄區域再報警就沒有價值了。再一個,數據量也是很是大的,如今天天產生軌跡點20億+,天天產生數據量100億+,對計算性能的要求很是高。spring

實時計算選型

從上面的場景咱們能夠感知到,在G7的IoT場景須要的是一個低延遲,處理速度快的實時計算引擎。最開始咱們的一些架構是基於Lambda架構的,好比軌跡點計算,會使用實時計算引擎計算出實時數據,這份數據延遲比較低,可是數據不是很準確,另外須要用離線批量再計算一遍,這份數據一般比較準確,能夠用來修復實時數據。這樣作的缺點也比較明顯,一是程序須要維護兩套代碼:實時程序和離線程序,二是實時數據不許確,準確的數據延遲又過高。後來咱們驚喜的發現一種基於實時處理的架構體系Kappa。sql

Kappa的架構是強調數據的實時性,爲了保證數據的實時性有些延遲太多的數據它會建議丟棄,全部的計算邏輯只有在實時計算中,整個計算只有一套邏輯,數據從MQ中獲取,通過數據處理層計算和加工,最後落入到數據存儲層,對外提供數據查詢功能。相對Lambda架構,Kappa架構更加適合IoT領域。數據庫

針對Kappa架構,咱們對行業主流的實時流計算框架進行了對比:編程

分別對主流的流計算框架:Storm,Storm Trident,Spark Streaming,Google Cloud Dataflow,Flink作了對比。基於微批量的Spark Streaming和Storm Trident延遲比較高,從這點就不適合咱們的場景。Storm的延遲很低,可是數據一致性是At Least once,容錯機制比較複雜,流控會比較抖動,這些方面都是不太適合。其中,Flink的一致性保證(1.4版本還支持了end-to-end一致性),延遲比較低,容錯機制的開銷是比較小的(基於Chandy-Lamport的分佈式快照),流控是比較優雅的(算子之間的數據傳輸是基於分佈式內存阻塞隊列),應用邏輯與容錯是分離的(算子處理和分佈式快照checkpoint),基於以上咱們認爲flink是比較適合IoT這個場景的。後端

G7業務應用案例 Flink目前在G7的應用場景,主要有三方面:

  • 實時計算
  • 實時ETL
  • 統計分析

下面分別介紹下以上三個場景的使用。緩存

實時計算

在G7的場景中,有不少業務都屬於實時計算的範疇,好比進出區域事件,超速事件,怠速事件,超速事件,疲勞報警事件,危險駕駛報警,油耗計算,里程計算等。其中疲勞報警計算是最先開始嘗試使用flink來落地的。

疲勞報警業務模型

這是G7針對客戶推出的G7大屏,其中風險相關的部分是根據疲勞計算得出。

根據G7的大數據計算,由於疲勞駕駛形成貨車事故的比重佔到整個事故的20%。對疲勞駕駛進行報警和預警就顯得特別重要,能夠有效下降事故發生的可能性。

根據車輛行駛的里程,駕駛員行駛的里程,駕駛時長,判斷是否存在疲勞駕駛。若是超過報警閥值則報警,若是在報警閥值下面在預警閥值上面則預警。報警和預警都是下發語音到貨車駕駛室提醒司機。

這個業務場景中面臨的最大挑戰是實時性,穩定性。只有用最短的時間、最穩定的方式將告警下發到相關人員才能最大程度減小風險。

業務流程

在整個處理流程中,首先會去獲取疲勞配置,根據車輛的狀態信息和司機打卡信息與疲勞配置結合,判斷是否出現預警和報警。計算過程當中會把疲勞駕駛開始的狀態緩存起來,疲勞駕駛結束的時候獲取以前的狀態數據,匹配成功以後會生成一條完整的疲勞事件。中間會調用一些接口服務好比dubbo獲取車輛的配置數據、狀態數據,產生的疲勞報警則會調用下發語音的接口,疲勞事件結果也會存儲到hbase、mysql、kafka等。

Streaming模型

最後開發成Flink的程序,從頭到到尾分別由如下算子構成:消費kafka算子、類型轉換算子、數據過濾算子、異步調用第三方接口算子,窗口排序算子,疲勞處理業務邏輯算子,數據入庫算子組成。

這個過程,也是踩了很多坑,咱們也有一些心得體會:

  1. 算子表達儘可能單一
  2. 每一個算子儘可能內聚,算子間儘可能低耦合
  3. 算子打散,異步+多線程的性能發揮更好
  4. 單獨設置每一個算子單元的並行度,性能更優
  5. hash和balance根據狀況選擇:只有須要使用keyby和valuestate地方纔使用hash從新分佈數據。其餘地方儘可能使用balance而且上下游並行度一致,會將task串聯成一個線程,不會走網絡IO性能更高
  6. 使用Asynchronous I/O 調用dubbo接口,zuul,db,hbase等外部接口

實時ETL

有部分場景是數據簡單採集、處理,入庫,也就是實時ETL,包括從Kafka採集數據到HDFS、DB、HBase、ES、Kafka等,這部分工做能夠抽象成Flink的算子表達:Source -> Transformation -> Sink。

這部分一般能夠FlinkKafkaConumser、MapFunction、JDBCAppendTableSink這類代碼。以下:

統計分析

有部分場景須要有一些實時的統計分析,好比統計最近一小時內全國各城市,車輛總數,司機總數,疲勞事件,進出區域事件,打卡次數,點熄火事件等。這種場景,一般可使用Flink SQl的作實時分析,sql+窗口函數(固定窗口,滑動窗口)。代碼大體以下:

實時計算平臺開發和現狀

在業務上的成功落地,咱們也但願能把打造一個實時計算平臺,服務各條業務線,通過差很少3個月的打磨,內部代號爲Glink的實時計算平臺上線,大體的架構以下:

Glink主要由如下部分組成:

  1. HDFS分佈式文件系統。用來存儲flink任務中產生的checkpoint/savepoint數據,任務報、第三方依賴包的存儲和分發,任務運行中產生的臨時數據等;
  2. Yarn統一計算資源平臺。用來提供統一的分佈式計算資源平臺,任務提交,任務調度,任務執行,資源隔離功能。目前全部的flink任務都是經過yarn進行統一的計算資源管理;
  3. 性能監控AMP工具。使用點評開源的Cat,在此基礎上作二次開發並取名「天樞系統」。能夠提供程序的耗時9五、99線、平均耗時、最大耗時、java GC監控、線程監控、堆棧信息等;
  4. 集羣監控管理。機器資源監控使用zabbix,提供cpu、內存、磁盤io、網絡io、鏈接數、句柄監控。集羣資源監控和管理使用開源Ambari,提供自動化安裝、配置、集羣總體任務、內存、cpu資源、hdfs空間、yarn資源大小監控報警;
  5. 任務監控報警。使用flink提供的statsD reporter將數據上傳導時序數據庫InfluxDB,經過掃描Infludb數據繪製出task的處理流量,經過監控流量閥值低於預期值報警;
  6. 診斷調試。使用成熟的日誌查詢系統 es+logstash+kibana,經過採集每一個節點的日誌寫入到es中, 能夠在kibana中查詢關鍵信息獲取日誌內存,提供診斷和調優程序的線索;
  7. Flink APP 程序應用層。具體開發的flink應用程序,一般解決實時etl,統計分析,業務計算的場景;
  8. Glink任務管控平臺。將如下的功能進行封裝,提供統一的任務管理,運維管理功能。

實時計算平臺展現-任務管理

實時計算平臺展現-日誌和性能監控

平臺的部分功能介紹:

  1. 任務管理功能。提供任務發佈,修改,升級,中止,申請資源,資源審覈,啓動日誌查看功能;
  2. 運維管理功能。提供日誌查看,程序監控,任務監控,流量監控,異常報警等功能。

以上Glink實時計算平臺的功能,基本上知足用戶獨立完成從程序開發,發佈,調優,上線,運維的工做。

Glink-Framework開發框架

除了提供相應的平臺功能,還須要在flink的生態上提供比較好的封裝和工具類,所以咱們提供了開發工具的腳手架:Glink-Framework框架。

Glink-Framework提供如下封裝:

  1. 簡化pom文件,減小大量的依賴、插件配置;
  2. 三方調用集成:dubbo,zuul;
  3. 三方數據庫集成:mysql,redis;
  4. 多環境管理;
  5. 依賴版本管理;
  6. 代碼監測工具:checkstyle,pmd,findbugs。

平臺與業務方BP合做方式

另一方面,咱們認爲flink是有必定的技術門檻,特別對於以前沒有併發編程、集羣開發經驗的小夥伴,須要有一段時間的學習才能上手,針對這個痛點,咱們提出了技術BP的技術合做方式。咱們會根據業務的複雜度,平臺指派一至多名技術人員參與到業務方的整個開發和運維工做中,從需求分析到上線落地全程參與,後期還會有持續的技術分享和培訓幫助業務方學習開發能力。

踩坑

在整個平臺化,以及業務開發的過程當中,flink也踩坑很多,比較典型的下面一些。

  1. 並行度太多形成barrier對齊的花費時間更長,有個並行度28的子任務的對齊時間超過50s;
  2. Valuestate不能跨算子共享;
  3. flink1.3 kafka connector不支持partition增長;
  4. 與spring整合,出現handler匹配的問題;
  5. hadoop的包衝突形成,程序沒法正常啓動的問題且無異常;

其中比較有意思的是並行度太多,形成barrier對齊花費時間太多的問題。要理解這個問題首先要了解flink在生成checkpoint的過程當中,會在source的插入barrier與正常消息一塊兒往下游發射,算子中等到指定的brrier後會觸發checkpoint。以下圖所:

這是在一個流的狀況下,若是有多個流同時進入一個算子處理就會複雜一點。flink在作checkpoint的時候,發現有多個流進入一個算子,先進入這個算子的barrier對應的那段消息就會buffer到算子中等待另外的流對應的barrier也到達纔會觸發checkpoint,這個buffer再等待的過程稱爲checkpoint alignment(barrier對齊),以下圖:

在線上運行的某個程序的一些算子由於barrier對齊的時間超過50s,形成程序 checkpoint超時失敗。對於這個問題,咱們的調優策略是兩種,一是儘可能減小並行度,就是讓流入一個算子的流盡可能少,若是在4個之內barrier對齊的時間是比較少的。另一種方式,使用at least once的語義替換exactly once的語義,這樣checkpoint的時候不會去作barrier對齊,數據到了算子立刻作checkpoint併發送下游。目前 咱們的解決辦法是根據不一樣的業務場景來區分,若是使用at least once數據保證就能知足業務需求的儘可能用at least once語義。若是不支持的,就減小並行度以此減小barrier對齊的數據量和時間。

平臺收益

經過近段時間的平臺化建設,在」降本增效「方面的收益主要體如今如下幾個方面:

  1. 資源利用率提升。目前經過對整個集羣的監控,在混合部署的狀況下平均cpu利用率在20%左右,在某些cpu密集計算的業務cpu利用率會更高一些;
  2. 開發效率提高。好比ETL採集程序的開發,傳統開發採集數據、轉化、入庫大概須要1天左右時間,經過平臺化的方式開發簡單的ETL程序在1小時內完成開發;
  3. 數據處理量大。平均天天處理數據量在80億條以上;
  4. 業務覆蓋面廣。平臺上線業務30+,預計年內突破100+。服務於公司各條業務線,IoT平臺,EMS,FMS,智能掛車,企業解決方案,SaaS,硬件部門等。

將來規劃

將來對於flink的規劃,咱們主要仍是會圍繞「降本增效,提供統一的計算平臺」爲目標,主要聚焦在如下幾個方面:
1 .資源隔離更完全。目前的資源隔離使用yarn的默認隔離方式只是對內存隔離,後續須要使用yarn+cgroup對內存和cpu都作隔離。另外會考慮使用yarn的node label作完全機器級別隔離,針對不一樣的業務劃分不一樣類型的機器資源,例如高CPU的任務對應CPU密集型的機器,高IO的任務對應IO比較好的機器;

  1. 平臺易用性提升。平臺包括代碼發佈、debug、調試、監控、問題排查,一站式解決問題;
  2. 減小Code。經過使用Flink SQL+UDF函數的方式,將經常使用的方法和函數進行封裝,儘可能用SQL表達業務,提升開發效率。另外也會考慮CEP的模式匹配支持,目前不少業務均可以用動態CEP去支持;
  3. 通用的腳手架。在Glink-Framework上持續開發,提供更多的source、sink、工具等,業務封裝,簡化開發;

此篇文章,摘自於張皓在 「Flink China社區線下 Meetup·成都站」 的技術分享。

更多資訊請訪問 Apache Flink 中文社區網站

相關文章
相關標籤/搜索