本文主要介紹 Apache Flink 在同程藝龍的應用實踐,從當前同程藝龍實時計算平臺現狀、建設過程、易用性提高、穩定性優化四方面分享了同城藝龍實時計算平臺的建設經驗,供你們參考。html
在 2015 年初,爲了可以採集到用戶在 PC,APP 等平臺上的行爲軌跡,咱們開始開發實時應用。那時可選的技術架構仍是比較少的,實時計算框架這塊,當時比較主流的有 Storm 和 Spark-streaming。綜合考慮實時性,接入難度,咱們最終選擇使用基於 Storm 構建了第一個版本的用戶行爲軌跡採集框架。後續隨着實時業務的增多,咱們發現 Storm 已經遠遠不能知足咱們對數據端到端處理準確一次(Exactly-Once)語義的需求,而且對於流量高峯來臨時也不能平滑的背壓(BackPressure),在大規模集羣的支持上 Storm 也存在問題。通過充分的調研後,咱們在 2018 年初選擇基於 Flink 開發同程藝龍新一代實時計算平臺。git
目前實時計算平臺已支撐近千個實時任務運行,服務公司的市場、機票、火車票、酒店、金服、國旅、研發等各個業務條線。下面主要結合實時計算平臺來分享下咱們在 Flink 落地過程當中的一些實踐經驗及思考。github
在開發實時計算平臺前,咱們有過大量實時應用業務的經驗,咱們發現使用實時計算的業務方主要有兩類:算法
爲了更好的爲兩類用戶提供支持,實時計算平臺同時支持兩種類型的任務:FlinkSQL 和 FlinkStream。平臺總體架構如圖所示: apache
上圖的後端 RTC-FlinkSQL 模塊便是用來執行提交 FlinkSQL 任務的服務,SQL 屬於聲明式語言,通過 30、40 年的發展,具備很高的易用性、靈活性和表達性。雖然 Flink 提供了 Table & SQL API,可是咱們當時基於的 Flink 1.4 及 1.6 版本自己語法也不支持像 Create Table 這樣的 DDL 語法,而且在須要關聯到外部數據源的時候 Flink 也沒有提供 SQL 相關的實現方式。編程
此外根據其提供的 API 接口編寫 TableSource 和 TableSink 異常繁瑣,不只要了解 Flink 各類 Operator 的 API,還要對各個組件的相關接入和調用方式有必定了解(好比 Kafka、RocketMQ、Elasticsearch、HBase、HDFS 等),所以對於只熟悉 SQL 進行數據分析的人員直接編寫 FlinkSQL 任務須要較大的學習成本。後端
鑑於以上緣由,咱們構建了實時計算平臺的 RTC-FlinkSQL 開發模塊並對 FlinkSQL 進行擴展,讓這部分用戶在使用 FlinkSQL 的時候只須要關心作什麼,而不須要關心怎麼作。不須要過多的關心程序的實現,而是專一於業務邏輯。api
經過 Yarn Client 提交構建好的 Flink 任務,提交成功返回 ApplicationID性能優化
這裏主要是根據上述 validator 階段獲取的 Source 配置信息,根據指定參數實例化出該對象,而後調用 registerTableSource 方法將 TableSource 註冊到 environment,從而完成了源表的註冊。 網絡
Flink Table 輸出 Operator 基類是 TableSink,咱們這裏繼承的是 AppendStreamTableSink,根據上述 validator 階段獲取的 Sink 配置信息,根據指定參數實例化出該對象,而後調用 registerTableSink 方法將 TableSink 註冊到 environment。
繼承 ScalarFunction 或者繼承 TableFunction,須要從用戶提交的 SQL 中獲取要使用的自定義函數類名, 以後經過反射獲取實例,判斷自定義 Function 屬於上述哪一種類型,而後調用 TableEnvironment.registerFunction 便可完成了 UDF 的註冊,最後用戶就能夠在 SQL中使用自定義的 UDF。
使用 Calcite 對上述 validator 階段獲取的可執行 SQL 進行解析,將 SQL 解析出一個語法樹,經過迭代的方式,搜索到對應的維表,並結合上述 validator 階段獲取的維表信息實例化對應的 SideOperator 對象,以後經過 RichAsyncFunction 算子生成新的 DataStream,最後從新註冊表並執行其餘 SQL,咱們同時支持帳號密碼直連和公司研發提供的 DAL 方式。
以下圖所示,能夠方便地在實時計算平臺上 FlinkSQL 編輯器內完成 FlinkSQL 任務的開發,目前線上運行有 500+ 的 FlinkSQL 任務在運行。
除了 FlinkSQL 外,平臺上還有一半的實時任務是一些業務場景更復雜,經過代碼來編寫開發的任務。對此咱們提供了 RTC-FlinkStream 模塊來讓用戶上傳本身本地打包後的 FAT-JAR,經過資源管理平臺來讓用戶對 JAR 作版本管理控制,方便用戶選擇運行指定的任務版本,FlinkStream 任務開發界面如圖所示。
這部分任務有些對資源使用需求比較大,咱們提供了任務容器配置的參數來讓用戶靈活的配置其 Task 併發,而且提供了自定義時間週期觸發保存點(savepoint)的功能。
平臺開發難度相對低,難的是如何提高平臺的易用性,由於開源組件如 Apache Flink 核心關注數據的處理流程,對於易用性這部分稍顯不足,因此在實時平臺功能開發過程當中要修改 Flink 組件的源碼來提高其易用性。
以 Flink 任務運行的指標(Metrics)監控來講,當 Flink 程序提交至集羣以後,咱們須要的是收集任務的實時運行 Metrics 數據,經過這些數據能夠實時監控任務的運行情況,例如,算子的 CPU 耗時、JVM 內存、線程數等。這些實時 Metrics 指標對任務的運維、調優等有着相當重要的做用,方便及時發現報警,進行調整。
經過對比現有的指標採集系統,包括 InfluxDB、StatsD、Datadog 等系統再結合公司的指標收集系統,咱們最終決定採用 Prometheus 做爲指標系統。可是在開發過程當中咱們發現 Flink 只支持 Prometheus 的拉模式收集數據,此模式須要提早知道集羣的運行主機以及端口等信息,適合於單集羣模式。
而做爲企業用戶,更多的是將 Flink 任務部署在 YARN 等集羣上,此時,Flink 的 JobManager、TaskManager 的運行是由 YARN 統一調度,主機以及是端口都是動態的,而 Flink 只支持的拉模式難以知足咱們需求。因此咱們經過增長 Prometheus 的 Pushgateway 來進行指標的收集,此模式屬於推模式,架構如圖所示。同時,咱們也積極的向社區貢獻了這個新特性[4] ,目前 PR 已經被合併,詳情見 FLINK-9187。
在完成 Flink Pushgateway 的相關工做後,爲了方便用戶查看本身 Flink 任務的吞吐量,處理延遲等重要監控信息,咱們爲用戶配置了監控頁面,方便用戶在實時計算平臺上快速定位出任務性能問題,如經過咱們實時平臺監控頁面提供的圖表,具體指標爲 flink_taskmanager_job_task_buffers_outPoolUsage 來快速判斷實時任務的 Operator 是否存在反壓狀況[2]。
在使用過程當中咱們也發現了 Flink Metrics 中衡量端到端的 Opertor Latency 的指標存在漂移,致使監控不許確問題。咱們也修復了該問題[5]並反饋給了社區,詳情見FLINK-11887。
提高平臺易用性還有一個重要的地方就是日誌,日誌分爲操做日誌,啓動日誌,業務日誌,運行歷史等日誌信息。其中比較難處理的就是用戶代碼中打印的業務日誌。由於 Flink 任務是分佈式執行的,不一樣的 TaskManager 的處理節點都會有一份日誌,業務看日誌要分別打開多個 TaskManager 的日誌頁面。
而且Flink任務是屬於長運行的任務,用戶代碼中打印的日誌是打印在 Flink WebUI 上。此時會面臨一個問題,當任務運行的時間越長,日誌量會愈來愈多,原生自帶的日誌頁面將沒法打開。爲了方便用戶查看日誌,解決用戶沒法獲取到實時任務的日誌信息,同時也爲了方便用戶根據關鍵詞進行歷史日誌的檢索,咱們在實時計算平臺爲用戶提供了一套實時日誌系統功能,開發人員能夠實時地搜索任務的日誌。
而且系統採用無侵入式架構,架構圖見下圖,在用戶程序無感知的狀況下,實時採集日誌,並同步到 Elasticsearch 中,當業務須要檢索日誌時,可經過 Elasticsearch 語法進行檢索。
計算組件每每處於大數據的中間位置,上游承接 MQ 等實時數據源,下游對接 HDFS、HBase 等大數據存儲,經過 Flink 這些實時組件將數據源和數據目標串聯在一塊兒。爲了不混亂,這個過程每每須要經過數據血緣來作管理。然而常見的數據血緣管理的開源項目如 Apache Atlas 等並未提供對 Flink 的支持,而 Flink 自身也沒有提供相應的 Hook 來抽取用戶代碼的中的數據源等信息。
爲了解決這個問題,咱們修改了 Flink Client 提交過程,在 CliFrontend 中增長一個 notify 環節,經過 ContextClassLoader 和反射在 Flink 任務提交階段將 Flink 生成的 StreamGraph 內的各個 StreamNode 抽取出來,這樣就能夠在提交時候獲取出用戶編寫的 Flink 任務代碼中關鍵數據源等配置信息,從而爲後續的 Flink 數據血緣管理提供支持。其關鍵代碼以下:
Flink 採用了 Chandy-Lamport 的快照算法來保證一致性和容錯性,在實時任務的運行期間是經過 Checkpoint [1]機制來保障的。若是升級程序,重啓程序,任務的運行週期結束,window 內的狀態或使用 mapstate 的帶狀態算子(Operator)所保存的數據就會丟失了,爲了解決這個問題,給用戶提供平滑升級程序方案從而保障數據準確處理,咱們實時計算平臺提供了從外部觸發 Savepoint 功能,在用戶手動重啓任務的時候,能夠選擇最近一段時間內執行成功的保存點來恢復本身的程序。平臺從保存點恢復任務操做如圖所示。
雖然咱們提供了通用的實時計算平臺,可是有些用戶想使用 Flink,除此以外還須要在平臺上增長些更符合其業務特色的功能,對此咱們也開放了咱們實時計算平臺的 API 接口給到業務方,讓業務根據其自身場景特色來加速實時應用的變現和落地。
前面介紹了咱們在實時計算平臺易用性方面如:SQL,監控,日誌,血緣,保存點等功能點上作的開發工做,其實除了平臺功能開發以外還有更多的工做內容是用戶沒有感知到的。如保障實時應用運行穩定性,在這方面咱們積累了不少實踐經驗,與此同時咱們也在 Github 上創建了 Tongcheng-Elong 組織,並將修復後的源代碼貢獻到 Apache 社區。其中有十幾個 patch 已經被社區接收合併。接下來分享一些咱們遇到的穩定性問題和提供的解決方案。
咱們在集羣運維過程當中發現,在偶發的狀況下,Flink 任務會在 YARN 集羣上空跑。此時,在 YARN 層面的現象是任務處於 RUNNING 狀態,可是進入到 Flink WebUI,會發現此時全部的 TaskManager 所有退出,並無任務在運行。這個狀況下,會形成的 YARN 資源的浪費,同時也給運維人員帶來困擾,爲何 TaskManager 都退出了,JobManager 不退出呢?甚至給平臺監控任務運行狀態帶來誤判,認爲任務還在運行,但實際任務早掛了。
這個問題比較難定位,首先發生這種狀況很少,可是一旦出現影響很大。其次,沒有異常堆棧信息,沒法定位到具體的根本緣由。咱們的解決方法是經過修改源碼,在多個可能的地方增長日誌埋點,以觀察並瞭解任務退出時 JobManager 所執行的處理邏輯。最終咱們定位到當任務失敗時,在默認的重試策略以後,會將信息歸檔到 HDFS 上。因爲是串行執行,因此若是在歸檔過程當中發生異常,則會中斷正常處理邏輯從而致使通知 JobManager 的過程不能成功執行。具體的執行邏輯見下圖。
梳理清楚邏輯以後,咱們發現社區也沒有修復這個問題。一樣,咱們也積極向社區進行提交PR修復6[8]。修復這個問題,須要經過 3 個 PR,逐步進行完善,詳情見 FLINK-1224六、FLINK-1221九、FLINK-12247。
咱們的存儲組件比較多,在使用 Flink-Connector 來讀寫相關存儲組件的如:RocketMQ、HDFS、Kudu、Elasticsearch 也發現過這些 Connector 的 Source/Sink 存在問題,咱們在修復以後也提交了 PR 反饋到社區:
對於該問題的臨時解決方案是在使用 Elasticsearch 6.x 的 RestHighLevelClient 的時候暫時中止使用 setBulkFlushInterval 配置, 而是經過 Flink 自身的 checkpoint 機制來觸發數據定時 Flush 到 ElasticSearch Server 端。真正完全解決辦法是構建單獨的線程池提供給 ReryHandler 來使用。隨後咱們也向 Elasticsearch 社區提交了 issue 及 PR 來修復這個問題 [10]。在這個過程當中發現也順便修復了 Flink 在任務重試時候 transport client 線程泄露[11]等問題詳情見 FLINK-11235。
咱們也遇到了 Flink 與 ZK 網絡問題,當 Jobmanager 與 ZK 的鏈接中斷以後,會將正在運行的任務當即中止。當集羣中任務不少時,可能因爲網絡抖動等緣由瞬斷時,會致使任務的重啓。而在咱們集羣上有上千的 Flink 應用,一旦出現網絡抖動,會使得大量 Flink 任務重啓,這個問題對集羣和任務的穩定性影響比較大。
根本緣由是 Flink 底層採用 Curator 的 LeaderLatch 作分佈式鎖服務,在 Curator-2.x 的版本中對於網絡瞬斷沒有容忍性,當由於網絡抖動、機器繁忙、zk集羣短暫無響應都會致使 curator 將狀態置爲 suspended,正是這個 suspended 狀態致使了全部任務的重啓。
咱們的解決辦法是先升級 Curator 版本到 4.x[12],而後在提高版本後再用 CuratorFrameworkFactory 來構造 CuratorFramework 時,經過使用 ConnectionStateErrorPolicy 將 StandardConnectionStateErrorPolicy 替換爲 SessionConnectionStateErrorPolicy,前者將 suspended 和 lost 都做爲 error,後者只是將 lost 做爲 error,而只有發生 error 的時候纔會取消 leadership,因此在通過修改以後,在進入 suspended 狀態時,再也不發生 leadership 的取消和從新選舉。咱們把這個問題和咱們的解決辦法也反饋給了社區,詳情見 FLINK-10052。
本文大體介紹了 Flink 在同程藝龍實時計算平臺實踐過程當中的一些工做和踩過的坑。對於大數據基礎設施來講平臺是基礎,除此以外還須要投入不少精力來提升 Flink 集羣的易用性和穩定性,這個過程當中要緊跟開源社區,由於隨着同程藝龍在大數據這塊應用場景愈來愈多,會遇到不少其它公司沒有遇到甚至沒有發現的問題,這個時候基礎設施團隊要有能力主動解決這些影響穩定性的風險點,而不是被動的等待社區來提供 patch。
因爲在 Flink 在 1.8 版本以前社區方向主要集中在 Flink Stream 處理這塊,咱們也主要應用 Flink 的流計算來替換 storm 及 spark streaming。可是隨着近期 Flink 1.9 的發佈,Blink 分支合併進入 Flink 主分支,咱們也打算在 Flink Batch 這塊嘗試一些應用來落地。
做者:同城藝龍數據中心 Flink 小分隊(謝磊、周生乾、李蘇興)
Reference:
[1]https://www.ververica.com/blo...
[2]https://www.cnblogs.com/Alone...
[3]https://www.cnblogs.com/Alone...
[4]https://issues.apache.org/jir...
[5]https://issues.apache.org/jir...
[6]https://issues.apache.org/jir...
[7]https://issues.apache.org/jir...
[8]https://issues.apache.org/jir...
[9]https://issues.apache.org/jir...
[10]https://github.com/elastic/el...
[11]https://issues.apache.org/jir...
[12]https://issues.apache.org/jir...
▼ Apache Flink 社區推薦 ▼
Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,目前正在徵集議題,限量早鳥票優惠ing。瞭解 Flink Forward Asia 2019 的更多信息,請查看:
https://developer.aliyun.com/...
首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊: