探究 flink1.11 Application 模式

隨着流式計算的興起,實時分析成爲現代商業的利器。愈來愈多的平臺和公司基於Apache Flink 構建他們的實時計算平臺,並saas化。html

這些平臺旨在經過簡化應用的提交來下降最終用戶的使用負擔。一般的作法是,會提供一個諸如管理平臺的web程序,方便使用者提交應用,而且該平臺集成了一些權限,監控等內容。這個管理平臺我能夠叫做部署服務。web

可是如今這些平臺遇到一個大問題是部署服務是一個消耗資源比較大的服務,而且很難計算出實際資源限制。好比,若是咱們取負載的平均值,則可能致使部署服務的資源真實所需的值遠遠大於限制值,最壞的狀況是在必定時間影響全部的線上應用。可是若是咱們將取負載的最大值,又會形成不少沒必要要的浪費。基於此,Flink 1.11 引入了另一種部署選項 Application Mode, 該模式容許更加輕量級,可擴展的應用提交進程,將以前客戶端的應用部署能力均勻分散到集羣的每一個節點上。apache

爲了理解這個問題以及Application Mode 是如何解決這個問題,咱們將在下文介紹當前flink中應用執行的模式。緩存

Flink 中的應用執行

Flink中應用的執行會涉及到三部分:_Client,JobManager 和 TaskManagers。_Client 負責提交應用到集羣,JobManager 負責應用執行期間一些必要的記錄工做,TaskManager 負責具體的應用執行。具體的架構圖以下:網絡

當前部署模式

在引入Application Mode(Flink1.11) 以前,Flink 支持 Session 和 Per-Job 兩種mode,這兩種有不一樣的集羣生命週期和資源隔離。session

Session 模式

Session 模式假定已經存在一個集羣,並任何的提交的應用都在該集羣裏執行。所以會致使資源的競爭。該模式的優點是你無需爲每個提交的任務花費精力去分解集羣。可是,若是Job異常或是TaskManager 宕掉,那麼該TaskManager運行的其餘Job都會失敗。除了影響到任務,也意味着潛在須要更多的恢復操做,重啓全部的Job,會併發訪問文件系統,會致使該文件系統對其餘服務不可用。此外,單集羣運行多個Job,意味着JobManager更大的負載。這種模式適合啓動延遲很是重要的短時間做業。架構

Per-Job 模式

在Per-Job模式下,集羣管理器框架(例如YARN或Kubernetes)用於爲每一個提交的Job啓動一個 Flink 集羣。Job完成後,集羣將關閉,全部殘留的資源(例如文件)也將被清除。此模式能夠更好地隔離資源,由於行爲異常的Job不會影響任何其餘Job。另外,因爲每一個應用程序都有其本身的JobManager,所以它將記錄的負載分散到多個實體中。考慮到前面提到的Session模式的資源隔離問題,Per-Job模式適合長期運行的Job,這些Job能夠接受啓動延遲的增長以支持彈性。 併發

總而言之,在Session 模式下,集羣生命週期獨立於集羣上運行的任何Job,而且集羣上運行的全部Job共享其資源。Per-Job模式選擇爲每一個提交的Job承擔拆分集羣的費用,以提供更好的資源隔離保證,由於資源不會在Job之間共享。在這種狀況下,集羣的生命週期將與job的生命週期綁定在一塊兒。app

應用提交

Flink 應用的執行包含兩個階段:框架

  • pre-flight: 在main()方法調用以後開始。
  • runtime: 一旦用戶代碼調用 execute() 就會觸發該階段。

main()方法使用Flink的API(DataStream API,Table API,DataSet API)之一構造用戶程序。當main()方法調用env.execute()時,用戶定義的pipeline將轉換爲Flink運行時能夠理解的形式,稱爲job graph,並將其傳送到集羣中。

儘管有一些不一樣,可是 對於 Session 模式 和 Per-Job模式 , pre-flight 階段都是在客戶端完成的。

對於那些在本身本地計算機上提交任務的場景(本地計算機包含了全部運行Job所需的依賴),這一般不是問題。可是,對於經過諸如部署服務之類的遠程進行提交的場景,此過程包括:

  • 下載應用所需的依賴
  • 執行main()方法提取 job graph
  • 將依賴和 job graph 傳輸到集羣
  • 有可能須要等待結果

這樣客戶端大量消耗資源,由於它可能須要大量的網絡帶寬來下載依賴項並將二進制文件運送到集羣,而且須要CPU週期來執行main()方法。隨着更多用戶共享同一客戶端,此問題會更加明顯。

紅色,藍色和綠色表明3個應用程序,每一個應用程序三個併發。黑色矩形表明不一樣的進程:TaskManagers,JobManagers和 Deployer(集中式部署服務)。而且咱們假設在全部狀況下都只有一個Deployer進程。彩色三角形表示提交進程的負載,而彩色矩形表示TaskManager和JobManager進程的負載。如圖所示,不論是per-job 仍是 session 模式, 部署程序承擔相同的負載。它們的區別在於Job的分配和JobManager的負載。在session模式下,集羣中的全部做業只有一個JobManager,而在per-job模式下,每一個Job都有一個JobManager。另外,在session 模式下的Job 被隨機分配給TaskManager,而在per-job 模式下,每一個TaskManager只有單個Job。

Application 模式

Application 模式 嘗試去將per-job 模式的資源隔離性和輕量級,可擴展的應用提交進程相結合。爲了實現這個目的,它會每一個Job 建立一個集羣,可是 應用的main()將被在JobManager 執行。

每一個應用程序建立一個集羣,能夠看做建立僅在特定應用程序的Job之間共享的session集羣,並在應用程序完成時銷燬。經過這種架構,Application模式能夠提供與 per-job 模式相同的資源隔離和負載平衡保證,但前提是保證一個完整應用程序的粒度。顯然,屬於同一應用程序的Job應該被關聯起來,並視爲一個單元。

在JobManager 中執行 main()方法,更大大減輕客戶端的資源消耗。更進一步講,因爲每一個應用程序有一個JobManager,所以能夠更平均地分散網絡負載。上圖對此進行了說明,在該圖中,此次客戶端負載已轉移到每一個應用程序的JobManager。

在Application 模式下,與其餘模式不同的是,main() 方法在集羣上而不是在客戶端執行。這可能會對您的代碼產生影響,例如,您必須使用應用程序的JobManager能夠訪問使用registerCachedFile()在環境中註冊的任何路徑。

與per-job 模式相比,Application 模式容許提交由多個Job組成的應用程序。Job執行的順序不受部署模式的影響,但受啓動Job的調用的影響。使用阻塞的 execute()方法,將是一個順序執行的效果,結果就是"下一個"Job的執行被推遲到「該」Job完成爲止。相反,一旦提交當前做業,非阻塞executeAsync()方法將當即繼續提交「下一個」Job。

減小網絡需求

如上所述,經過在JobManager上執行應用程序的main()方法,Application 模式能夠節省不少提交應用所需的資源。可是仍有改進的空間。

專一於YARN, 由於社區對於yarn的優化支持更全面。即便使用 Application 模式,仍然須要客戶端將用戶jar發送到JobManager。此外,對於每一個應用程序,客戶端都必須將「 flink-dist」路徑輸送到集羣,該目錄包含框架自己的二進制文件,包括flink-dist.jarlib/plugin/ 目錄。這兩個能夠佔用客戶端大量的帶寬。此外,在每一個提交中傳送相同的flink-dist二進制文件不只浪費帶寬,並且浪費存儲空間,只需容許應用程序共享相同的二進制文件就能夠緩解。

對於Flink1.11 , 引入了下面的兩個選項可供你們使用:

  1. 指定目錄的遠程路徑,YARN能夠在該目錄中找到Flink分發二進制文件
  2. 指定YARN能夠在其中找到用戶jar的遠程路徑。

對於1.,咱們利用YARN的分佈式緩存,並容許應用程序共享這些二進制文件。所以,若是因爲先前在同一TaskManager上執行的應用程序而致使某個應用程序恰巧在其TaskManager的本地存儲上找到Flink的副本,則它甚至沒必要在內部下載它。

注意兩種優化均可用於YARN上的全部部署模式,而不只僅是Application模式。

示例: Application 模式 on Yarn

有關完整說明,請參閱正式的Flink文檔,尤爲是涉及集羣管理框架,例如YARN或Kubernetes。在這裏,咱們將提供有關YARN的一些示例:

Application 模式下,使用如下語句提交一個應用:

./bin/flink run-application -t yarn-application ./MyApplication.jar

使用此命令,全部配置參數均可以經過其配置選項(以-D爲前綴)來指定。有關可用配置選項的目錄,請參閱Flink的配置頁面

例如,用於指定JobManager和TaskManager的內存大小的命令以下所示:

./bin/flink run-application -t yarn-application \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    ./MyApplication.jar

爲了進一步節省將Flink發行版傳送到集羣的帶寬,請考慮將Flink發行版預上傳到YARN能夠訪問的位置,並使用yarn.provided.lib.dirs配置選項,以下所示:

./bin/flink run-application -t yarn-application \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    -Dyarn.provided.lib.dirs="hdfs://myhdfs/remote-flink-dist-dir" \
    ./MyApplication.jar

最後,爲了進一步節省提交應用程序jar所需的帶寬,您能夠將其預上傳到HDFS,並指定指向./MyApplication.jar的遠程路徑,以下所示:

./bin/flink run-application -t yarn-application \
    -Djobmanager.memory.process.size=2048m \
    -Dtaskmanager.memory.process.size=4096m \
    -Dyarn.provided.lib.dirs="hdfs://myhdfs/remote-flink-dist-dir" \
    hdfs://myhdfs/jars/MyApplication.jar

這將使Job提交特別輕巧,由於所需的Flink jar和應用程序jar將從指定的遠程位置獲取,而不是由客戶端傳送到集羣。客戶端將惟一傳送到集羣的是你的應用程序配置,其中包括上述全部路徑。

PS: 本文屬於翻譯,原文

相關文章
相關標籤/搜索