【Flink】flink 內部 Akka and Actors

使用Akka,全部遠程過程調用如今都實現爲異步消息。 這主要影響JobManager,TaskManager和JobClient的組件。 未來,甚至可能會將更多的組件轉換爲參與者,從而容許它們發送和處理異步消息。安全

Akka and the Actor Model

Akka是開發併發,容錯和可擴展應用程序的框架。 它是參與者模型的實現,所以相似於Erlang的併發模型。 在參與者模型的上下文中,全部代理實體都被視爲獨立的參與者。 角色經過彼此發送異步消息與其餘角色進行通訊。 參與者模型的優點在於這種異步性。 也能夠顯式等待響應,以便您執行同步操做。 可是,強烈建議不要使用同步消息,由於它們會限制系統的可伸縮性。 每一個參與者都有一個郵箱,其中存儲了收到的消息。 此外,每一個參與者都保持本身的孤立狀態。 下面是幾個參與者的示例網絡。網絡

角色具備單個處理線程,該線程輪詢角色的郵箱並連續處理接收到的消息。 做爲已處理消息的結果,參與者能夠更改其內部狀態,發送新消息或生成新參與者。 若是一個actor的內部狀態是從其處理線程內部專門操縱的,則無需使actor的狀態線程安全。 即便單個參與者本質上是順序的,由多個參與者組成的系統也是高度併發且可擴展的,由於處理線程在全部參與者之間共享。 這種共享也是爲何從不該該從參與者線程內部調用阻塞調用的緣由。 這樣的調用將阻止該線程被其餘參與者用來處理本身的消息。併發

Actor Systems

Actor系統是全部演員生活的容器。 它提供共享服務,例如計劃,配置和日誌記錄。 參與者系統還包含線程池,全部參與者線程都從該線程池中募集。框架

多角色系統能夠共存於一臺機器上。 若是actor系統以RemoteActorRefProvider啓動,則能夠從可能駐留在遠程計算機上的另外一個actor系統進行訪問。 演員系統自動識別演員消息是發給同一個演員系統中仍是遠程演員系統中的演員的。 在本地通訊的狀況下,可使用共享內存有效地傳輸消息。 在遠程通訊的狀況下,消息是經過網絡堆棧發送的。異步

全部參與者都按層次結構組織。 每一個新建立的actor都會將其建立actor做爲父級分配。 該層次結構用於監督。 每一個父母都有對其子女的監護權。 若是其中一個子項發生錯誤,則會通知他。 若是演員能夠解決問題,那麼他能夠繼續或從新開始孩子。 若是問題超出了他的處理範圍,他能夠將錯誤上報給本身的父母。 逐步升級錯誤僅表示當前層之上的層次結構層如今負責解決問題。分佈式

系統建立的第一個參與者由系統提供的守護者參與者/用戶監督。 角色層次在此進行了詳細說明。ide

Actors in Flink

Actors自己就是狀態和行爲的容器。 它是actor線程順序處理傳入的消息。 由於一個actor一次僅活動一個線程,因此它使用戶擺脫了易於出錯的鎖定和線程管理任務。 可是,必須確保僅今後參與者線程訪問參與者的內部狀態。 actor的行爲由接收函數定義,該函數爲每一個消息包含在接收到此消息時執行的某些邏輯。函數

Flink系統由三個必須通訊的分佈式組件組成:JobClient,JobManager和TaskManager。 JobClient從用戶那裏獲取Flink做業,並將其提交給JobManager。 而後JobManager負責編排做業執行。 首先,它分配所需的資源量。 這主要包括TaskManager上的執行插槽。spa

分配資源後,JobManager將做業的各個任務部署到相應的TaskManager中。一旦收到任務,TaskManager會生成一個執行任務的線程。 狀態更改(例如開始計算或完成計算)將發送回JobManager。 根據這些狀態更新,JobManager將引導做業執行直到完成。 做業完成後,其結果將發送回給JobClient,以告知用戶相關信息。 下圖描述了做業執行過程。線程

JobManager & TaskManager

JobManager是中央控制單元,負責執行Flink做業。 所以,它控制着資源分配,任務調度和狀態報告。

必須先啓動一個JobManager和一個或多個TaskManager,而後才能執行任何Flink做業。 而後TaskManager經過向JobManager發送RegisterTaskManager消息在JobManager上註冊。 JobManager經過``確認註冊''消息確認註冊成功。 若是TaskManager已在JobManager上註冊,則因爲發送了多個RegisterTaskManager消息,則JobManager返回一個AlreadyRegistered消息。 若是註冊被拒絕,則JobManager將以RefuseRegistration消息做爲響應。

經過向做業管理器發送帶有相應JobGraph的SubmitJob消息向做業管理器提交做業。 收到JobGraph後,JobManager將在JobGraph中建立一個ExecutionGraph,做爲分佈式執行的邏輯表示。 ExecutionGraph包含有關必須執行才能部署到TaskManager的任務的信息。

JobManager的調度程序負責在可用TaskManager上分配執行插槽。 在TaskManager上分配執行插槽後,帶有執行任務所需的全部必要信息的SubmitTask消息將發送到相應的TaskManager。 TaskOperationResult確認任務部署成功。 一旦部署並運行了提交做業的源,做業提交也被認爲是成功的。 JobManager經過發送帶有相應做業ID的成功消息來通知JobClient此狀態。

在TaskManager上運行的單個任務的狀態更新經過UpdateTaskExecutionState消息發送回JobManager。 使用這些更新消息,能夠更新ExecutionGraph以反映執行的當前狀態。

JobManager還充當數據源的輸入拆分分配器。 它負責在全部TaskManager之間分配工做,以便在可能的狀況下保留數據局部性。 爲了動態平衡負載,任務在完成對舊輸入的處理後,會請求新的輸入拆分。 該請求是經過將RequestNextInputSplit發送到JobManager來實現的。 JobManager用NextInputSplit消息響應。 若是沒有更多輸入拆分,則消息中包含的輸入拆分爲null。

任務被延遲部署到任務管理器。 這意味着消耗數據的任務僅在其生產者之一完成生產某些數據以後才部署。 生產者執行此操做後,就會將ScheduleOrUpdateConsumers消息發送到JobManager。 此消息代表,消費者如今能夠讀取新生成的數據。 若是使用任務還沒有運行,它將被部署到TaskManager。

JobClient

JobClient表明分佈式系統的面向用戶的組件。 它用於與JobManager進行通訊,所以它負責提交Flink做業,查詢已提交做業的狀態並接收當前正在運行的做業的狀態消息。

JobClient仍是您經過消息與之通訊的參與者。 存在與做業提交有關的兩條消息:SubmitJobDetached和SubmitJobWait。 第一條消息提交做業,並從接收任何狀態消息和最終做業結果中註銷。 若是您想以丟臉的方式將做業提交到Flink羣集,則分離模式很是有用。

SubmitJobWait消息將做業提交到JobManager並註冊以接收該做業的狀態消息。 在內部,這是經過生成輔助角色來完成的,該輔助角色用做狀態消息的接收者。 做業終止後,由JobManager將帶有持續時間和累加器結果的JobResultSuccess發送給產生的助手角色。 收到此消息後,輔助角色將消息轉發給客戶端,該客戶端最初發出了SubmitJobWait消息,而後終止。

Asynchronous vs. Synchronous Messages

Flink儘量嘗試使用異步消息並將響應做爲未來處理。 期貨和少數現有的阻塞調用都有一個超時,在此以後該操做將被視爲失敗。 這樣能夠防止消息丟失或分佈式組件崩潰時系統陷入僵局。 可是,若是您碰巧擁有很是大的羣集或緩慢的網絡,則可能會錯誤地觸發超時。 所以,能夠經過配置中的「 akka.ask.timeout」指定這些操做的超時時間。

演員能夠與其餘演員交談以前,必須爲其檢索ActorRef。 此操做的查找也須要超時。 爲了使Actor未啓動時系統快速故障,將查找超時設置爲比常規超時更小的值。 若是遇到查找超時的狀況,能夠經過配置中的「 akka.lookup.timeout」來增長查找時間。

Akka的另外一個特色是它設置了能夠發送的最大郵件大小的限制。 緣由是它保留了相同大小的序列化緩衝區,而且不想浪費內存。 若是因爲消息超出最大大小而遇到傳輸錯誤,則能夠經過配置中的「 akka.framesize」來增長幀大小。

Failure Detection

分佈式系統中的故障檢測對其魯棒性相當重要。 在商品集羣上運行時,老是會發生某些組件發生故障或沒法再訪問的狀況。 此類故障的緣由是多態的,從硬件故障到網絡中斷均可能形成故障。 一個強大的分佈式系統應該可以檢測出故障的組件並從中恢復。

Flink經過使用Akka的DeathWatch機制來檢測故障組件。 即便沒有受到該演員的監督,甚至不在另外一個演員系統中,DeathWatch也可讓演員觀看其餘演員。 一旦被觀看的演員死亡或沒法聯繫,終止消息就會發送給觀看的演員。 所以,在接收到這樣的消息時,系統能夠針對它採起步驟。 在內部,DeathWatch被實現爲心跳和故障檢測器,它基於心跳間隔,聽音暫停和故障閾值來估計演員什麼時候可能死亡。 能夠經過在配置中設置「 akka.watch.heartbeat.interval」值來控制心跳間隔。 能夠經過「 akka.watch.heartbeat.pause」指定可接受的心跳暫停。 心跳暫停應爲心跳間隔的倍數,不然丟失的心跳將直接觸發DeathWatch。 能夠經過「 akka.watch.threshold」指定故障閾值,它能夠有效地控制故障檢測器的靈敏度。 您能夠在此處找到有關DeathWatch機制和故障檢測器的更多詳細信息。

在Flink中,JobManager監視全部已註冊的TaskManager,而TaskManager監視JobManager。 這樣,兩個組件都知道什麼時候再也不可訪問另外一個組件。 JobManager的反應是將各個TaskManager標記爲已死,以防止未來的任務部署到該TaskManager。 此外,它將使當前正在此任務管理器上運行的全部任務失敗,並在其餘TaskManager上從新安排其執行時間。 若是TaskManager僅因暫時的鏈接丟失而被標記爲死,那麼一旦從新創建鏈接,它就能夠在JobManager中簡單地從新註冊本身。

TaskManager還監視JobManager。 此監視容許TaskManager在檢測到JobManager失敗時經過使全部當前正在運行的任務失敗來進入清除狀態。 此外,若是觸發的死亡僅由網絡擁塞或鏈接丟失引發,TaskManager將嘗試從新鏈接到JobManager。

原文地址

相關文章
相關標籤/搜索