本文主要介紹 Flink Runtime 的做業執行的核心機制。首先介紹 Flink Runtime 的總體架構以及 Job 的基本執行流程,而後介紹在這個過程,Flink 是怎麼進行資源管理、做業調度以及錯誤恢復的。最後,本文還將簡要介紹 Flink Runtime 層當前正在進行的一些工做。算法
Flink 的總體架構如圖 1 所示。Flink 是能夠運行在多種不一樣的環境中的,例如,它能夠經過單進程多線程的方式直接運行,從而提供調試的能力。它也能夠運行在 Yarn 或者 K8S 這種資源管理系統上面,也能夠在各類雲環境中執行。緩存
針對不一樣的執行環境,Flink 提供了一套統一的分佈式做業執行引擎,也就是 Flink Runtime 這層。Flink 在 Runtime 層之上提供了 DataStream 和 DataSet 兩套 API,分別用來編寫流做業與批做業,以及一組更高級的 API 來簡化特定做業的編寫。本文主要介紹 Flink Runtime 層的總體架構。網絡
Flink Runtime 層的主要架構如圖 2 所示,它展現了一個 Flink 集羣的基本結構。Flink Runtime 層的整個架構主要是在 FLIP-6 中實現的,總體來講,它採用了標準 master-slave 的結構,其中左側白色圈中的部分便是 master,它負責管理整個集羣中的資源和做業;而右側的兩個 TaskExecutor 則是 Slave,負責提供具體的資源並實際執行做業。數據結構
其中,Master 部分又包含了三個組件,即 Dispatcher、ResourceManager 和 JobManager。其中,Dispatcher 負責接收用戶提供的做業,而且負責爲這個新提交的做業拉起一個新的 JobManager 組件。ResourceManager 負責資源的管理,在整個 Flink 集羣中只有一個 ResourceManager。JobManager 負責管理做業的執行,在一個 Flink 集羣中可能有多個做業同時執行,每一個做業都有本身的 JobManager 組件。這三個組件都包含在 AppMaster 進程中。多線程
基於上述結構,當用戶提交做業的時候,提交腳本會首先啓動一個 Client進程負責做業的編譯與提交。它首先將用戶編寫的代碼編譯爲一個 JobGraph,在這個過程,它還會進行一些檢查或優化等工做,例如判斷哪些 Operator 能夠 Chain 到同一個 Task 中。而後,Client 將產生的 JobGraph 提交到集羣中執行。此時有兩種狀況,一種是相似於 Standalone 這種 Session 模式,AM 會預先啓動,此時 Client 直接與 Dispatcher 創建鏈接並提交做業便可。另外一種是 Per-Job 模式,AM 不會預先啓動,此時 Client 將首先向資源管理系統 (如Yarn、K8S)申請資源來啓動 AM,而後再向 AM 中的 Dispatcher 提交做業。架構
看成業到 Dispatcher 後,Dispatcher 會首先啓動一個 JobManager 組件,而後 JobManager 會向 ResourceManager 申請資源來啓動做業中具體的任務。這時根據 Session 和 Per-Job 模式的區別, TaskExecutor 可能已經啓動或者還沒有啓動。若是是前者,此時 ResourceManager 中已有記錄了 TaskExecutor 註冊的資源,能夠直接選取空閒資源進行分配。不然,ResourceManager 也須要首先向外部資源管理系統申請資源來啓動 TaskExecutor,而後等待 TaskExecutor 註冊相應資源後再繼續選擇空閒資源進程分配。目前 Flink 中 TaskExecutor 的資源是經過 Slot 來描述的,一個 Slot 通常能夠執行一個具體的 Task,但在一些狀況下也能夠執行多個相關聯的 Task,這部份內容將在下文進行詳述。ResourceManager 選擇到空閒的 Slot 以後,就會通知相應的 TM 「將該 Slot 分配分 JobManager XX 」,而後 TaskExecutor 進行相應的記錄後,會向 JobManager 進行註冊。JobManager 收到 TaskExecutor 註冊上來的 Slot 後,就能夠實際提交 Task 了。併發
TaskExecutor 收到 JobManager 提交的 Task 以後,會啓動一個新的線程來執行該 Task。Task 啓動後就會開始進行預先指定的計算,並經過數據 Shuffle 模塊互相交換數據。負載均衡
以上就是 Flink Runtime 層執行做業的基本流程。能夠看出,Flink 支持兩種不一樣的模式,即 Per-job 模式與 Session 模式。如圖 3 所示,Per-job 模式下整個 Flink 集羣只執行單個做業,即每一個做業會獨享 Dispatcher 和 ResourceManager 組件。此外,Per-job 模式下 AppMaster 和 TaskExecutor 都是按需申請的。所以,Per-job 模式更適合運行執行時間較長的大做業,這些做業對穩定性要求較高,而且對申請資源的時間不敏感。與之對應,在 Session 模式下,Flink 預先啓動 AppMaster 以及一組 TaskExecutor,而後在整個集羣的生命週期中會執行多個做業。能夠看出,Session 模式更適合規模小,執行時間短的做業。dom
本節對 Flink 中資源管理與做業調度的功能進行更深刻的說明。實際上,做業調度能夠看作是對資源和任務進行匹配的過程。如上節所述,在 Flink 中,資源是經過 Slot 來表示的,每一個 Slot 能夠用來執行不一樣的 Task。而在另外一端,任務即 Job 中實際的 Task,它包含了待執行的用戶邏輯。調度的主要目的就是爲了給 Task 找到匹配的 Slot。邏輯上來講,每一個 Slot 都應該有一個向量來描述它所能提供的各類資源的量,每一個 Task 也須要相應的說明它所須要的各類資源的量。可是實際上在 1.9 以前,Flink 是不支持細粒度的資源描述的,而是統一的認爲每一個 Slot 提供的資源和 Task 須要的資源都是相同的。從 1.9 開始,Flink 開始增長對細粒度的資源匹配的支持的實現,但這部分功能目前仍在完善中。分佈式
做業調度的基礎是首先提供對資源的管理,所以咱們首先來看下 Flink 中資源管理的實現。如上文所述,Flink 中的資源是由 TaskExecutor 上的 Slot 來表示的。如圖 4 所示,在 ResourceManager 中,有一個子組件叫作 SlotManager,它維護了當前集羣中全部 TaskExecutor 上的 Slot 的信息與狀態,如該 Slot 在哪一個 TaskExecutor 中,該 Slot 當前是否空閒等。當 JobManger 來爲特定 Task 申請資源的時候,根據當前是 Per-job 仍是 Session 模式,ResourceManager 可能會去申請資源來啓動新的 TaskExecutor。當 TaskExecutor 啓動以後,它會經過服務發現找到當前活躍的 ResourceManager 並進行註冊。在註冊信息中,會包含該 TaskExecutor中全部 Slot 的信息。 ResourceManager 收到註冊信息後,其中的 SlotManager 就會記錄下相應的 Slot 信息。當 JobManager 爲某個 Task 來申請資源時, SlotManager 就會從當前空閒的 Slot 中按必定規則選擇一個空閒的 Slot 進行分配。當分配完成後,如第 2 節所述,RM 會首先向 TaskManager 發送 RPC 要求將選定的 Slot 分配給特定的 JobManager。TaskManager 若是尚未執行過該 JobManager 的 Task 的話,它須要首先向相應的 JobManager 創建鏈接,而後發送提供 Slot 的 RPC 請求。在 JobManager 中,全部 Task 的請求會緩存到 SlotPool 中。當有 Slot 被提供以後,SlotPool 會從緩存的請求中選擇相應的請求並結束相應的請求過程。
當 Task 結束以後,不管是正常結束仍是異常結束,都會通知 JobManager 相應的結束狀態,而後在 TaskManager 端將 Slot 標記爲已佔用但未執行任務的狀態。JobManager 會首先將相應的 Slot 緩存到 SlotPool 中,但不會當即釋放。這種方式避免了若是將 Slot 直接還給 ResourceManager,在任務異常結束以後須要重啓時,須要馬上從新申請 Slot 的問題。經過延時釋放,Failover 的 Task 能夠儘快調度回原來的 TaskManager,從而加快 Failover 的速度。當 SlotPool 中緩存的 Slot 超過指定的時間仍未使用時,SlotPool 就會發起釋放該 Slot 的過程。與申請 Slot 的過程對應,SlotPool 會首先通知 TaskManager 來釋放該 Slot,而後 TaskExecutor 通知 ResourceManager 該 Slot 已經被釋放,從而最終完成釋放的邏輯。
除了正常的通訊邏輯外,在 ResourceManager 和 TaskExecutor 之間還存在定時的心跳消息來同步 Slot 的狀態。在分佈式系統中,消息的丟失、錯亂不可避免,這些問題會在分佈式系統的組件中引入不一致狀態,若是沒有定時消息,那麼組件沒法從這些不一致狀態中恢復。此外,當組件之間長時間未收到對方的心跳時,就會認爲對應的組件已經失效,並進入到 Failover 的流程。
在 Slot 管理基礎上,Flink 能夠將 Task 調度到相應的 Slot 當中。如上文所述,Flink 還沒有徹底引入細粒度的資源匹配,默認狀況下,每一個 Slot 能夠分配給一個 Task。可是,這種方式在某些狀況下會致使資源利用率不高。如圖 5 所示,假如 A、B、C 依次執行計算邏輯,那麼給 A、B、C 分配分配單獨的 Slot 就會致使資源利用率不高。爲了解決這一問題,Flink 提供了 Share Slot 的機制。如圖 5 所示,基於 Share Slot,每一個 Slot 中能夠部署來自不一樣 JobVertex 的多個任務,可是不能部署來自同一個 JobVertex 的 Task。如圖5所示,每一個 Slot 中最多能夠部署同一個 A、B 或 C 的 Task,可是能夠同時部署 A、B 和 C 的各一個 Task。當單個 Task 佔用資源較少時,Share Slot 能夠提升資源利用率。 此外,Share Slot 也提供了一種簡單的保持負載均衡的方式。
基於上述 Slot 管理和分配的邏輯,JobManager 負責維護做業中 Task執行的狀態。如上文所述,Client 端會向 JobManager 提交一個 JobGraph,它表明了做業的邏輯結構。JobManager 會根據 JobGraph 按並發展開,從而獲得 JobManager 中關鍵的 ExecutionGraph。ExecutionGraph 的結構如圖 5 所示,與 JobGraph 相比,ExecutionGraph 中對於每一個 Task 與中間結果等均建立了對應的對象,從而能夠維護這些實體的信息與狀態。
在一個 Flink Job 中是包含多個 Task 的,所以另外一個關鍵的問題是在 Flink 中按什麼順序來調度 Task。如圖 7 所示,目前 Flink 提供了兩種基本的調度邏輯,即 Eager 調度與 Lazy From Source。Eager 調度如其名子所示,它會在做業啓動時申請資源將全部的 Task 調度起來。這種調度算法主要用來調度可能沒有終止的流做業。與之對應,Lazy From Source 則是從 Source 開始,按拓撲順序來進行調度。簡單來講,Lazy From Source 會先調度沒有上游任務的 Source 任務,當這些任務執行完成時,它會將輸出數據緩存到內存或者寫入到磁盤中。而後,對於後續的任務,當它的前驅任務所有執行完成後,Flink 就會將這些任務調度起來。這些任務會從讀取上游緩存的輸出數據進行本身的計算。這一過程繼續進行直到全部的任務完成計算。
在 Flink 做業的執行過程當中,除正常執行的流程外,還有可能因爲環境等緣由致使各類類型的錯誤。總體上來講,錯誤可能分爲兩大類:Task 執行出現錯誤或 Flink 集羣的 Master 出現錯誤。因爲錯誤不可避免,爲了提升可用性,Flink 須要提供自動錯誤恢復機制來進行重試。
對於第一類 Task 執行錯誤,Flink 提供了多種不一樣的錯誤恢復策略。如圖 8 所示,第一種策略是 Restart-all,即直接重啓全部的 Task。對於 Flink 的流任務,因爲 Flink 提供了 Checkpoint 機制,所以當任務重啓後能夠直接從上次的 Checkpoint 開始繼續執行。所以這種方式更適合於流做業。第二類錯誤恢復策略是 Restart-individual,它只適用於 Task 之間沒有數據傳輸的狀況。這種狀況下,咱們能夠直接重啓出錯的任務。
因爲 Flink 的批做業沒有 Checkpoint 機制,所以對於須要數據傳輸的做業,直接重啓全部 Task 會致使做業從頭計算,從而致使必定的性能問題。爲了加強對 Batch 做業,Flink 在1.9中引入了一種新的Region-Based的Failover策略。在一個 Flink 的 Batch 做業中 Task 之間存在兩種數據傳輸方式,一種是 Pipeline 類型的方式,這種方式上下游 Task 之間直接經過網絡傳輸數據,所以須要上下游同時運行;另一種是 Blocking 類型的試,如上節所述,這種方式下,上游的 Task 會首先將數據進行緩存,所以上下游的 Task 能夠單獨執行。基於這兩種類型的傳輸,Flink 將 ExecutionGraph 中使用 Pipeline 方式傳輸數據的 Task 的子圖叫作 Region,從而將整個 ExecutionGraph 劃分爲多個子圖。能夠看出,Region 內的 Task 必須同時重啓,而不一樣 Region 的 Task 因爲在 Region 邊界存在 Blocking 的邊,所以,能夠單獨重啓下游 Region 中的 Task。
基於這一思路,若是某個 Region 中的某個 Task 執行出現錯誤,能夠分兩種狀況進行考慮。如圖 8 所示,若是是因爲 Task 自己的問題發生錯誤,那麼能夠只重啓該 Task 所屬的 Region 中的 Task,這些 Task 重啓以後,能夠直接拉取上游 Region 緩存的輸出結果繼續進行計算。
另外一方面,如圖若是錯誤是因爲讀取上游結果出現問題,如網絡鏈接中斷、緩存上游輸出數據的 TaskExecutor 異常退出等,那麼還須要重啓上游 Region 來從新產生相應的數據。在這種狀況下,若是上游 Region 輸出的數據分發方式不是肯定性的(如 KeyBy、Broadcast 是肯定性的分發方式,而 Rebalance、Random 則不是,由於每次執行會產生不一樣的分發結果),爲保證結果正確性,還須要同時重啓上游 Region 全部的下游 Region。
除了 Task 自己執行的異常外,另外一類異常是 Flink 集羣的 Master 進行發生異常。目前 Flink 支持啓動多個 Master 做爲備份,這些 Master 能夠經過 ZK 來進行選主,從而保證某一時刻只有一個 Master 在運行。當前活路的 Master 發生異常時,某個備份的 Master 能夠接管協調的工做。爲了保證 Master 能夠準確維護做業的狀態,Flink 目前採用了一種最簡單的實現方式,即直接重啓整個做業。實際上,因爲做業自己可能仍在正常運行,所以這種方式存在必定的改進空間。
Flink目前仍然在Runtime部分進行不斷的迭代和更新。目前來看,Flink將來可能會在如下幾個方式繼續進行優化和擴展:
本文爲雲棲社區原創內容,未經容許不得轉載。