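This article starts from an exception that often shows up while debugging, "TimeoutException: Heartbeat of TaskManager timed out", and uses it to walk you through Flink's heartbeat mechanism. The code in this article is based on Flink 1.10.
If you debug Flink often, you know the routine: you stop at a breakpoint, study the stack and the variables, and sink into thought. When you finally spot the likely culprit and happily resume the program, it refuses to run and prints: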
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out.
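This is really frustrating, and no programmer should have to put up with it. Since the message mentions Heartbeat, let's look at Flink's heartbeat mechanism together and see whether there is something we can change.
Flink has four core components: Dispatcher, JobMaster, ResourceManager, and TaskExecutor.
These four components communicate with each other via RPC.
Flink's underlying RPC is built on Akka. Akka is a framework for building concurrent, fault-tolerant, and scalable applications. It implements the Actor Model and resembles Erlang's concurrency model. In the actor model every entity is an independent actor, and actors communicate with each other by sending asynchronous messages.
The power of the actor model comes from asynchrony. An actor can also explicitly wait for a response, which makes synchronous operations possible, but synchronous messaging is strongly discouraged because it limits the scalability of the system.
The role of RPC is to make an asynchronous call look like a synchronous one.
Flink builds its low-level communication system on Akka and adds RPC calls on top. Nodes call each other through gateways, which hides the details of the communication layer and decouples the components. The main building blocks of Flink's communication framework are RpcEndpoint, RpcService, RpcServer, AkkaInvocationHandler, and AkkaRpcActor.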
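The main RPC-related interfaces are:
RpcEndpoint is the base class of Flink's RPC endpoints. Every distributed component that offers remote procedure calls must extend RpcEndpoint, and its functionality is backed by an RpcService.
The RpcEndpoint subclasses that matter here are the four core components, Dispatcher, JobMaster, ResourceManager, and TaskExecutor: these are the components in Flink that need, and therefore have, RPC capability.
Each RpcEndpoint corresponds to a path (determined jointly by endpointId and actorSystem), each path corresponds to an actor, and the endpoint implements the RpcGateway interface.
RpcService is a member variable of RpcEndpoint; it provides the RPC service and connects to remote servers. It has a single implementation, AkkaRpcService, which shows that Flink still communicates via Akka.
RpcService is used to start and connect to an RpcEndpoint; connecting to an RPC server returns an RpcGateway that can be used to call remote procedures.
All four core components are RpcEndpoint implementations, so each of them also initializes an RpcServer when it is constructed. For example, the first parameter of the JobMaster constructor is the RpcService it needs.
Flink's RPC protocol is defined through RpcGateway. As mentioned above, to talk to a remote actor you must provide its address (IP and port). In Flink-on-YARN mode, for example, the JobMaster starts its ActorSystem first, before the TaskExecutor's container has even been allocated; to communicate with the TaskExecutor later, the TaskExecutor must provide its address.
Dispatcher, JobMaster, ResourceManager, and TaskExecutor all implement gateways in one way or another. JobMaster, for example, implements the JobMasterGateway interface. Each component class keeps, as member variables, the gateway implementations of the other components it talks to, and makes RPC calls through those gateways.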
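To make the gateway idea concrete, here is a minimal sketch in plain Java. It is not Flink's code: PingGateway, PingEndpoint and GatewayFactory are hypothetical names, and a single-threaded ExecutorService stands in for an actor's mailbox. It borrows one real detail, visible in the debugger output later in this article: Flink's gateways are java.lang.reflect.Proxy instances backed by an InvocationHandler (AkkaInvocationHandler). The caller gets a CompletableFuture back, so the call itself is asynchronous; calling join() on it is the explicit wait for a response mentioned above.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical gateway: the RPC "protocol" of one endpoint. Every method
// returns a CompletableFuture, so callers are never blocked by default.
interface PingGateway {
    CompletableFuture<String> ping(String from);
}

// Hypothetical endpoint. Its state is only touched from the mailbox thread,
// so it needs no locking (like an actor processing one message at a time).
class PingEndpoint implements PingGateway {
    private int received;

    @Override
    public CompletableFuture<String> ping(String from) {
        received++;
        return CompletableFuture.completedFuture("pong #" + received + " to " + from);
    }
}

final class GatewayFactory {
    private GatewayFactory() {}

    // Returns a dynamic proxy for the endpoint. Each call is queued on the
    // single-threaded "mailbox" executor and answered through a future.
    static PingGateway connect(PingEndpoint endpoint, ExecutorService mailbox) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(endpoint, args); // toString, hashCode, equals
            }
            CompletableFuture<Object> reply = new CompletableFuture<>();
            mailbox.execute(() -> {
                try {
                    CompletableFuture<?> result = (CompletableFuture<?>) method.invoke(endpoint, args);
                    result.whenComplete((value, error) -> {
                        if (error != null) {
                            reply.completeExceptionally(error);
                        } else {
                            reply.complete(value);
                        }
                    });
                } catch (ReflectiveOperationException e) {
                    reply.completeExceptionally(e);
                }
            });
            return reply; // the caller decides whether to wait on it
        };
        return (PingGateway) Proxy.newProxyInstance(
                PingGateway.class.getClassLoader(),
                new Class<?>[] {PingGateway.class},
                handler);
    }
}
```

For example, `GatewayFactory.connect(new PingEndpoint(), Executors.newSingleThreadExecutor()).ping("tm-1").join()` returns "pong #1 to tm-1". Because every call is funneled through the one mailbox thread, the endpoint's counter needs no lock.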
There are two common approaches to heartbeat detection:
Flink implements the second approach.
Flink's heartbeat code lives in:
Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat
Four interfaces:
HeartbeatListener.java HeartbeatManager.java HeartbeatTarget.java HeartbeatMonitor.java
and the following classes:
HeartbeatManagerImpl.java HeartbeatManagerSenderImpl.java HeartbeatMonitorImpl.java HeartbeatServices.java NoOpHeartbeatManager.java
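A Flink cluster has several roles, such as ResourceManager, TaskManager, and JobManager, and each role has its own heartbeat logic. Flink's heartbeat mechanism only provides the interfaces and the basic functionality; the concrete behavior is implemented by each role.
Let us first assume there are two kinds of nodes in the heartbeat system: sender and receiver. Sender and receiver check on each other, but the check is always initiated by the sender: the sender actively sends a request to probe whether the receiver is alive. Since the receiver has just received the sender's probe, it also knows that the sender is alive, and it then replies with a heartbeat showing that it is alive too.
Some of Flink's terms differ from the usual concepts, so read the flow carefully, namely:
HeartbeatTarget is the abstraction of a monitored target. In terms of behavior, the heartbeat mechanism has two actions:
The methods of HeartbeatTarget are exactly these two actions:
Their parameters are simple: the resource ID of the side the call comes from, plus a payload. For a given node, the payload it receives and the payload it sends have the same type.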
```java
public interface HeartbeatTarget<I> {
    /**
     * Sends a heartbeat response to the target.
     * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
     */
    // heartbeatOrigin is the Receiver
    void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);

    /**
     * Requests a heartbeat from the target.
     * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
     */
    // requestOrigin is the Sender
    void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
}
```
HeartbeatMonitor wraps a HeartbeatTarget, so the manager operates on a target through its monitor. Its implementation class is described in detail later.
```java
public interface HeartbeatMonitor<O> {
    // Gets heartbeat target.
    HeartbeatTarget<O> getHeartbeatTarget();

    // Gets heartbeat target id.
    ResourceID getHeartbeatTargetId();

    // Report heartbeat from the monitored target.
    void reportHeartbeat();

    // Cancel this monitor.
    void cancel();

    // Gets the last heartbeat.
    long getLastHeartbeat();
}
```
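HeartbeatManager manages the heartbeat mechanism, for example starting and stopping the monitoring of a HeartbeatTarget and reporting its heartbeats. This interface extends HeartbeatTarget.
Besides the methods inherited from HeartbeatTarget, the interface has four methods: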
```java
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
    void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);

    void unmonitorTarget(ResourceID resourceID);

    void stop();

    long getLastHeartbeatFrom(ResourceID resourceId);
}
```
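HeartbeatListener must be implemented by the business logic to handle heartbeat results. It can be seen as the output of the heartbeat service and consists of three callbacks.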
```java
public interface HeartbeatListener<I, O> {
    void notifyHeartbeatTimeout(ResourceID resourceID);

    void reportPayload(ResourceID resourceID, I payload);

    O retrievePayload(ResourceID resourceID);
}
```
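We mentioned Sender and Receiver earlier; the following two classes correspond to these concepts.
A few key questions:
How is a heartbeat timeout detected?
Once the heartbeat service has started, the monitor uses a ScheduledFuture to schedule a task that handles the heartbeat timeout. The task only runs once the configured heartbeat timeout has elapsed.
If a heartbeat message from the component arrives within the timeout, the pending task is cancelled and scheduled again, which restarts the timeout countdown.
If no heartbeat from the component arrives within the timeout, the task runs and the listener is notified that the component has timed out.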
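The cancel-and-reschedule idea can be sketched with the JDK's ScheduledExecutorService. ResettableTimeout is a hypothetical class, not Flink's HeartbeatMonitorImpl (which is shown further below); it only illustrates the timing logic.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified timeout monitor: every heartbeat cancels the
// pending timeout task and schedules a fresh one, so onTimeout only runs
// after timeoutMs has passed without any heartbeat.
class ResettableTimeout {
    private final ScheduledExecutorService scheduler;
    private final long timeoutMs;
    private final Runnable onTimeout;
    private ScheduledFuture<?> pending; // guarded by "this"

    ResettableTimeout(ScheduledExecutorService scheduler, long timeoutMs, Runnable onTimeout) {
        this.scheduler = scheduler;
        this.timeoutMs = timeoutMs;
        this.onTimeout = onTimeout;
    }

    // Called whenever a heartbeat arrives: restart the countdown.
    synchronized void reportHeartbeat() {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Stop monitoring altogether.
    synchronized void cancel() {
        if (pending != null) {
            pending.cancel(false);
        }
    }
}
```

For instance, with a 300 ms timeout and a heartbeat every 50 ms the callback never runs; once heartbeats stop, it runs once, roughly 300 ms after the last one.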
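When do the two parties start heartbeat checks?
Heartbeat checks are bidirectional: one side (the Sender) actively sends heartbeat requests and the other side (the Receiver) responds to them. The two call each other via RPC, and each call resets the other side's monitor timeout task.
Take JobMaster and TaskManager as an example. On startup the JM starts a periodic schedule that sends heartbeat checks to the TMs registered with it by calling each TM's requestHeartbeat method via RPC. This resets the TM's timeout task for the JM, meaning the JM is healthy. After its requestHeartbeat has been called, the TM calls the JM's receiveHeartbeat via RPC, which resets the JM's timeout task for that TM, meaning the TM is healthy.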
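How is a heartbeat timeout handled?
The heartbeat service relies on a HeartbeatListener. When no heartbeat response arrives within the timeout, the timeout task fires and calls HeartbeatListener.notifyHeartbeatTimeout, which then either reconnects or simply disconnects.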
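Here is a summary, using RM and TM as the example:
RM: implements ResourceManagerGateway (so it can be called directly via RPC).
TM: implements TaskExecutorGateway (so it can be called directly via RPC).
RM: has a Sender HM, taskManagerHeartbeatManager, which owns the user-defined TaskManagerHeartbeatListener.
TM: has a Receiver HM, resourceManagerHeartbeatManager, which owns the user-defined ResourceManagerHeartbeatListener.
The HeartbeatManager keeps a ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>>, heartbeatTargets.
For every TM the RM has to monitor, a HeartbeatTarget is created, wrapped into a HeartbeatMonitor, and put into ResourceManager.taskManagerHeartbeatManager.
The monitor of each target has its own asynchronous task, a ScheduledFuture, which is repeatedly cancelled and re-created. If it is not cancelled within the timeout period, the user-defined listener is notified of the timeout.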
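The per-target bookkeeping in this summary can be sketched as follows. SimpleHeartbeatManager and its Monitor are hypothetical simplifications: String IDs replace ResourceID, and the timeout task is left out so that only the map handling remains. Returning -1 for a target that is not monitored is a choice of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified bookkeeping of a heartbeat manager:
// one monitor per monitored resource ID, kept in a ConcurrentHashMap.
class SimpleHeartbeatManager {

    // Stand-in for HeartbeatMonitor: only remembers the last heartbeat time.
    static final class Monitor {
        final String targetId;
        volatile long lastHeartbeat; // 0 until the first heartbeat arrives

        Monitor(String targetId) {
            this.targetId = targetId;
        }
    }

    private final Map<String, Monitor> heartbeatTargets = new ConcurrentHashMap<>();

    void monitorTarget(String resourceId) {
        // putIfAbsent keeps the existing monitor if a target registers twice
        heartbeatTargets.putIfAbsent(resourceId, new Monitor(resourceId));
    }

    void unmonitorTarget(String resourceId) {
        heartbeatTargets.remove(resourceId);
    }

    // Called when a heartbeat from resourceId arrives; unknown senders are ignored.
    boolean reportHeartbeat(String resourceId, long nowMillis) {
        Monitor monitor = heartbeatTargets.get(resourceId);
        if (monitor == null) {
            return false;
        }
        monitor.lastHeartbeat = nowMillis;
        return true;
    }

    // Counterpart of getLastHeartbeatFrom: -1 if the target is not monitored.
    long getLastHeartbeatFrom(String resourceId) {
        Monitor monitor = heartbeatTargets.get(resourceId);
        return monitor == null ? -1L : monitor.lastHeartbeat;
    }
}
```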
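HeartbeatManagerImpl is the concrete implementation of the receiver. It is created by the side being probed (the Receiver, e.g. the TM) and handles the heartbeat requests of the initiating side (the Sender, e.g. the JM). A heartbeat timeout triggers heartbeatListener.notifyHeartbeatTimeout.
Note: the receiving side never probes on its own. Its monitor is only fed (its timeout reset) when a heartbeat request arrives, i.e. when requestHeartbeat is called, so it works passively.
HeartbeatManagerImpl mainly maintains: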
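A heartbeat monitor map, <ResourceID, HeartbeatMonitor<O>> heartbeatTargets. This is a key-value association: the key is the ID of the component whose heartbeats are tracked (e.g. a TM), and the value is the HeartbeatMonitor created for that component, which triggers its heartbeat timeout. Keys and monitors correspond one to one.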
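When a heartbeat from a monitored machine arrives, the state of the corresponding monitor is updated (a new ScheduledFuture is started). When a monitor detects a heartbeat timeout, it notifies its HeartbeatListener.
A ScheduledExecutor, mainThreadExecutor, runs the heartbeat timeout notifications.
heartbeatListener: handles the heartbeat results.
The data structure of HeartbeatManagerImpl: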
```java
@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
    /** Heartbeat timeout interval in milli seconds. */
    private final long heartbeatTimeoutIntervalMs;

    /** Resource ID which is used to mark one own's heartbeat signals. */
    private final ResourceID ownResourceID;

    /** Heartbeat listener with which the heartbeat manager has been associated. */
    private final HeartbeatListener<I, O> heartbeatListener;

    /** Executor service used to run heartbeat timeout notifications. */
    private final ScheduledExecutor mainThreadExecutor;

    /** Map containing the heartbeat monitors associated with the respective resource ID. */
    private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

    /** Running state of the heartbeat manager. */
    protected volatile boolean stopped;
}
```
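The main methods implemented by HeartbeatManagerImpl are:
HeartbeatManagerSenderImpl extends HeartbeatManagerImpl and is created by the side that manages heartbeats (e.g. the JM). It implements run, so it can be scheduled as a task of its own, and starts its periodic schedule right after creation. Each round iterates over the heartbeat targets it manages and calls heartbeatTarget.requestHeartbeat, asking every target to respond with a heartbeat. In other words, it triggers heartbeat requests actively.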
```java
public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
    public void run() {
        if (!stopped) {
            for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
                requestHeartbeat(heartbeatMonitor);
            }
            // schedule the next round
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }

    // actively start a heartbeat check
    private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
        O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
        final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
        // call the target's requestHeartbeat
        heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
    }
}
```
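The self-rescheduling pattern in run() (schedule the next round at the end of the current one instead of using a fixed-rate timer) can be reproduced with plain JDK classes. PeriodicHeartbeatSender is a hypothetical name, and a Consumer<String> stands in for the RPC call to heartbeatTarget.requestHeartbeat.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sender loop modeled on the run() method above: each round asks
// every target for a heartbeat and then schedules the next round itself.
class PeriodicHeartbeatSender implements Runnable {
    private final ScheduledExecutorService executor;
    private final long periodMs;
    private final List<String> targets = new CopyOnWriteArrayList<>();
    private final Consumer<String> requestHeartbeat; // stand-in for the RPC call
    private volatile boolean stopped;

    PeriodicHeartbeatSender(ScheduledExecutorService executor, long periodMs,
                            Consumer<String> requestHeartbeat) {
        this.executor = executor;
        this.periodMs = periodMs;
        this.requestHeartbeat = requestHeartbeat;
    }

    void addTarget(String targetId) {
        targets.add(targetId);
    }

    void start() {
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS); // first round right away
    }

    void stop() {
        stopped = true; // the next run() neither probes nor reschedules
    }

    @Override
    public void run() {
        if (!stopped) {
            for (String target : targets) {
                requestHeartbeat.accept(target);
            }
            executor.schedule(this, periodMs, TimeUnit.MILLISECONDS);
        }
    }
}
```

A consequence of this design is that stopping needs no explicit cancellation: once the flag is set, the chain of rounds simply ends. Rounds also never overlap, because the next one is scheduled only after the current one has finished.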
The heartbeat monitor manages one heartbeat target and uses a ScheduledExecutor to schedule its timeout task.
```java
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
    /** Resource ID of the monitored heartbeat target. */
    private final ResourceID resourceID; // monitored resource ID

    /** Associated heartbeat target. */
    private final HeartbeatTarget<O> heartbeatTarget; // heartbeat target

    private final ScheduledExecutor scheduledExecutor;

    /** Listener which is notified about heartbeat timeouts. */
    private final HeartbeatListener<?, ?> heartbeatListener; // heartbeat listener

    /** Maximum heartbeat timeout interval. */
    private final long heartbeatTimeoutIntervalMs;

    private volatile ScheduledFuture<?> futureTimeout;

    // state machine kept in an AtomicReference
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    // time the last heartbeat was received
    private volatile long lastHeartbeat;

    // report a heartbeat
    public void reportHeartbeat() {
        // remember when the last heartbeat arrived
        lastHeartbeat = System.currentTimeMillis();
        // a heartbeat arrived, so reset the timeout task
        resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
    }

    // the heartbeat timed out: trigger the listener's notifyHeartbeatTimeout
    public void run() {
        // The heartbeat has timed out if we're in state running
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            heartbeatListener.notifyHeartbeatTimeout(resourceID);
        }
    }

    // reset the timeout
    void resetHeartbeatTimeout(long heartbeatTimeout) {
        if (state.get() == State.RUNNING) {
            // cancel the pending task first, then schedule a new one
            cancelTimeout();
            // schedule the timeout task
            futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);

            // Double check for concurrent accesses (e.g. a firing of the scheduled future)
            if (state.get() != State.RUNNING) {
                cancelTimeout();
            }
        }
    }

    // ... cancelTimeout(), cancel() and the getters are omitted
}
```
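The role of the AtomicReference<State> in run() can be isolated in a few lines. TimeoutStateMachine is a hypothetical reduction, assuming the three states RUNNING, TIMEOUT and CANCELED: whichever transition wins the compareAndSet decides the outcome, so a timeout task that fires after cancel(), or fires twice, notifies the listener at most once.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical reduction of the state handling in a heartbeat monitor:
// RUNNING can move to TIMEOUT or CANCELED, and only once.
class TimeoutStateMachine {
    enum State { RUNNING, TIMEOUT, CANCELED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final Runnable notifyHeartbeatTimeout;

    TimeoutStateMachine(Runnable notifyHeartbeatTimeout) {
        this.notifyHeartbeatTimeout = notifyHeartbeatTimeout;
    }

    // Body of the scheduled timeout task.
    void onTimeoutFired() {
        // Only the first successful transition out of RUNNING notifies.
        if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            notifyHeartbeatTimeout.run();
        }
    }

    // Stop monitoring; a timeout task that fires afterwards becomes a no-op.
    void cancel() {
        state.compareAndSet(State.RUNNING, State.CANCELED);
    }

    State state() {
        return state.get();
    }
}
```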
HeartbeatServices creates heartbeat receivers and heartbeat senders; it is the entry point the heartbeat package offers to the outside. Here we can see:
```java
public class HeartbeatServices {
    // Creates a heartbeat manager which does not actively send heartbeats.
    public <I, O> HeartbeatManager<I, O> createHeartbeatManager(...) {
        return new HeartbeatManagerImpl<>(...);
    }

    // Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
    public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(...) {
        return new HeartbeatManagerSenderImpl<>(...);
    }
}
```
The heartbeat services are created at the cluster entry point. Since we are debugging, that happens in MiniCluster.start.
```java
public void start() throws Exception {
    // ......
    heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
    // ......
}
```
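HeartbeatServices.fromConfiguration reads its settings from the Configuration:
This is the key to solving our original problem: start from the configuration and increase the heartbeat timeout.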
```java
public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
    this.heartbeatInterval = heartbeatInterval;
    this.heartbeatTimeout = heartbeatTimeout;
}

public static HeartbeatServices fromConfiguration(Configuration configuration) {
    long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
    long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
    return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}
```
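To see what fromConfiguration boils down to, here is a sketch that reads the same two keys from a java.util.Properties object instead of Flink's Configuration. HeartbeatSettings is a hypothetical class. The keys "heartbeat.interval" and "heartbeat.timeout" are Flink's real option keys, and the fallback values of 10000 ms and 50000 ms are Flink's documented defaults, which also match the debugger dumps later in this article. The checks in the constructor are this sketch's own addition.

```java
import java.util.Properties;

// Hypothetical stand-in for HeartbeatServices.fromConfiguration.
final class HeartbeatSettings {
    final long intervalMs;
    final long timeoutMs;

    private HeartbeatSettings(long intervalMs, long timeoutMs) {
        // Sanity checks added for this sketch: a timeout shorter than the
        // interval would expire before the next heartbeat request is even sent.
        if (intervalMs <= 0 || timeoutMs <= 0) {
            throw new IllegalArgumentException("interval and timeout must be positive");
        }
        if (timeoutMs < intervalMs) {
            throw new IllegalArgumentException("timeout must be >= interval");
        }
        this.intervalMs = intervalMs;
        this.timeoutMs = timeoutMs;
    }

    static HeartbeatSettings fromProperties(Properties props) {
        long interval = Long.parseLong(props.getProperty("heartbeat.interval", "10000"));
        long timeout = Long.parseLong(props.getProperty("heartbeat.timeout", "50000"));
        return new HeartbeatSettings(interval, timeout);
    }
}
```

Only the timeout decides when a silent peer is declared dead; the interval just controls how often the sender probes.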
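How many ResourceManagers are there? There is only one ResourceManager in the whole Flink cluster.
How many JobManagers are there? The JobManager manages job execution. By default a Flink cluster has a single JobManager instance, which acts as the master node of the cluster and is responsible for task management and resource management across the cluster.
How many TaskManagers are there? That depends on how the cluster is started. With Flink on YARN, for example, session mode lets you specify how many TaskManagers to launch. In per-job mode the number of TaskManagers is computed dynamically from the parallelism when the job is submitted: Number of TMs = Parallelism / numberOfTaskSlots. For example, a job with parallelism 10 and numberOfTaskSlots 1 gets 10 TaskManagers.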
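The per-job formula above can be written as a small helper. TaskManagerCount is hypothetical, and rounding up when the division is not exact is an assumption of this sketch (a partially used TaskManager is still a TaskManager).

```java
// Hypothetical helper for Number of TMs = Parallelism / numberOfTaskSlots,
// rounded up when the division is not exact (an assumption of this sketch).
final class TaskManagerCount {
    private TaskManagerCount() {}

    static int required(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("both values must be positive");
        }
        // Integer ceiling division: (a + b - 1) / b
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```

With the article's example, `required(10, 1)` gives 10; with 3 slots per TaskManager, `required(10, 3)` gives 4.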
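In Flink, the ResourceManager, JobMaster, and TaskExecutor monitor each other through heartbeats:
As explained earlier, HeartbeatManagerSenderImpl plays the Sender role and HeartbeatManagerImpl plays the Receiver role.
The ResourceManager ranks highest, so both of its HMs are Senders; they monitor the TaskManagers and the JobManagers.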
```java
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId>
        implements ResourceManagerGateway, LeaderContender {
    // (excerpt) both heartbeat managers are Senders
    taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(...);
    jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(...);
}
```
The JobMaster ranks in the middle: it has one Sender and one Receiver. It is monitored by the ResourceManager and monitors the TaskManagers.
```java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    // (excerpt) one Sender, one Receiver
    taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(...);
    resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(...);
}
```
The TaskExecutor ranks lowest: it has two Receivers and is monitored by both the JM and the RM.
```java
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    // (excerpt) two Receivers
    this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(...);
    this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(...);
}
```
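Again take the JobManager and TaskManager as the example. On startup the JM starts a periodic schedule that sends heartbeat checks to the TMs registered with it, calling each TM's requestHeartbeat method via RPC; this resets the TM's timeout task for the JM, meaning the JM is healthy. After its requestHeartbeat has been called, the TM calls the JM's receiveHeartbeat via RPC, which resets the JM's timeout task for that TM, meaning the TM is healthy.
During initialization the TM creates two Receiver HMs.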
```java
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    /** The heartbeat manager for job manager in the task manager. */
    private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;

    /** The heartbeat manager for resource manager in the task manager. */
    private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;

    public TaskExecutor(...) {
        // ... in the constructor:
        this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
        this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
    }
}
```
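When these HeartbeatManagers are created, ResourceManagerHeartbeatListener and JobManagerHeartbeatListener are registered with them.
At this point both HeartbeatManagerImpl instances are in place; their monitors are only driven by the heartbeat requests (requestHeartbeat) that the JM or RM sends.
The JM creates one Sender HM and one Receiver HM, registering TaskManagerHeartbeatListener and ResourceManagerHeartbeatListener with them.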
```java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;
    private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

    private void startHeartbeatServices() {
        taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
            resourceId,
            new TaskManagerHeartbeatListener(),
            getMainThreadExecutor(),
            log);

        resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
            resourceId,
            new ResourceManagerHeartbeatListener(),
            getMainThreadExecutor(),
            log);
    }
}
```
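When the ResourceManager starts, its startHeartbeatServices method creates two Sender HeartbeatManagers:
taskManagerHeartbeatManager: a HeartbeatManagerSenderImpl that keeps rescheduling a timer, periodically scanning the targets it has to probe and sending them heartbeat requests.
jobManagerHeartbeatManager: also a HeartbeatManagerSenderImpl, which likewise keeps rescheduling a timer to scan its targets and send heartbeat requests.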
```java
taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
    resourceId,
    new TaskManagerHeartbeatListener(),
    getMainThreadExecutor(),
    log);

jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
    resourceId,
    new JobManagerHeartbeatListener(),
    getMainThreadExecutor(),
    log);
```
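Let's take the interaction between TM and RM as the example. After the TaskExecutor starts, it has to register with the RM and the JM.
The flow is as follows: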
```
1. Run in Task Manager

TaskExecutor.onStart   // life cycle
  |
  +----> startTaskExecutorServices@TaskExecutor
  |        // start the TM services
  |
  +----> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  |        // start by connecting to the ResourceManager
  |
  +----> notifyLeaderAddress@ResourceManagerLeaderListener
  |        // called back here when the RM leader changes
  |        // (the listener for leader changes of the resource manager)
  |
  +----> reconnectToResourceManager@TaskExecutor
  |        // the next three calls progressively establish contact with the RM
  |
  +----> tryConnectToResourceManager@TaskExecutor
  |
  +----> connectToResourceManager()@TaskExecutor
  |        // mainly creates a TaskExecutorToResourceManagerConnection
  |
  +----> start@TaskExecutorToResourceManagerConnection
  |        // starts the RPC; goes to start() of its base class RegisteredRpcConnection
  |
  +----> start@RegisteredRpcConnection
  |        // RegisteredRpcConnection implements the basic RPC for registering one component with another
  |
~~~~~~~~ Akka RPC ~~~~~~~~

2. Run in Resource Manager
Execution has reached the RM; the main job here is adding a target to the RM's Sender HM.

registerTaskExecutor@ResourceManager
  |
  +----> taskExecutorGatewayFuture.handleAsync
  |        // reached asynchronously
  |
  +----> registerTaskExecutorInternal@ResourceManager
  |        // RM-internal implementation: registers the TM with the RM
  |
  +----> taskManagerHeartbeatManager.monitorTarget
  |        // creates a HeartbeatMonitor
  |
  +----> heartbeatTargets.put(resourceID, heartbeatMonitor);
  |        // puts the monitor into the HM in the RM, i.e. the RM starts monitoring the TM
  |
~~~~~~~~ Akka RPC ~~~~~~~~

3. Run in Task Manager
Execution is back in the TM; the main job here is adding a target to the TM's Receiver HM.

onRegistrationSuccess@TaskExecutorToResourceManagerConnection
  |
  +----> onRegistrationSuccess@ResourceManagerRegistrationListener
  |        // callback
  |
  +----> runAsync(establishResourceManagerConnection)
  |        // executed asynchronously
  |
  +----> establishResourceManagerConnection@TaskExecutor
  |        // the connection to the RM is established, so the TM can start monitoring the RM
  |
  +----> resourceManagerHeartbeatManager.monitorTarget
  |        // creates a HeartbeatMonitor
  |
  +----> heartbeatTargets.put(resourceID, heartbeatMonitor);
  |        // the RM is now registered in the TM as well
  |        // (monitor the resource manager as heartbeat target)
```
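The essence of the three stages above is that each side adds the other as a heartbeat target: the RM while handling the registration, the TM in its success callback. A minimal in-process sketch, assuming plain method calls in place of Akka RPC; RegistrationSketch, FakeResourceManager and FakeTaskExecutor are hypothetical names, and a Set of IDs stands in for each heartbeat manager.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, in-process reduction of the registration handshake.
class RegistrationSketch {

    static final class FakeResourceManager {
        final String id;
        final Set<String> monitoredTaskManagers = ConcurrentHashMap.newKeySet();

        FakeResourceManager(String id) {
            this.id = id;
        }

        // Stand-in for registerTaskExecutor: monitor the TM, then answer with our ID.
        CompletableFuture<String> registerTaskExecutor(String taskExecutorId) {
            monitoredTaskManagers.add(taskExecutorId); // ~ taskManagerHeartbeatManager.monitorTarget
            return CompletableFuture.completedFuture(id);
        }
    }

    static final class FakeTaskExecutor {
        final String id;
        final Set<String> monitoredResourceManagers = ConcurrentHashMap.newKeySet();

        FakeTaskExecutor(String id) {
            this.id = id;
        }

        // Stand-in for connectToResourceManager plus the onRegistrationSuccess callback.
        CompletableFuture<Void> connect(FakeResourceManager rm) {
            return rm.registerTaskExecutor(id)
                    // ~ establishResourceManagerConnection -> resourceManagerHeartbeatManager.monitorTarget
                    .thenAccept(monitoredResourceManagers::add);
        }
    }
}
```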
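Below is a more detailed description.
On the TM side, connecting to the RM starts with the leader retriever:
```java
private final LeaderRetrievalService resourceManagerLeaderRetriever;

// In this debugging setup, resourceManagerLeaderRetriever is actually provided by EmbeddedLeaderService:
// "A simple leader election service, which selects a leader among contenders and notifies listeners."
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
```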
On the RM side, registerTaskExecutorInternal registers the TM as a heartbeat target:
```java
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
    public void receiveHeartbeat(ResourceID resourceID, Void payload) {
        // the ResourceManager will always send heartbeat requests to the
        // TaskManager
    }

    public void requestHeartbeat(ResourceID resourceID, Void payload) {
        taskExecutorGateway.heartbeatFromResourceManager(resourceID);
    }
});
```
After registration, the internal structure of the RM's Sender HM looks like this; note the new target:
```
taskManagerHeartbeatManager = {HeartbeatManagerSenderImpl@8866}
  heartbeatPeriod = 10000
  heartbeatTimeoutIntervalMs = 50000
  ownResourceID = {ResourceID@8871} "040709f36ebf38f309fed518a88946af"
  heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872}
  mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8873}
  heartbeatTargets = {ConcurrentHashMap@8875} size = 1
    {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71" -> {HeartbeatMonitorImpl@9448}
      key = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
      value = {HeartbeatMonitorImpl@9448}
        resourceID = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
        heartbeatTarget = {ResourceManager$2@8868}
        scheduledExecutor = {RpcEndpoint$MainThreadExecutor@8873}
        heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872}
        heartbeatTimeoutIntervalMs = 50000
        futureTimeout = {ScheduledFutureAdapter@10140}
        state = {AtomicReference@9786} "RUNNING"
        lastHeartbeat = 0
```
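Via RPC, the RM then calls back into the TaskExecutor, which runs the sequence from stage 3 of the chart; monitorTarget in the TM's Receiver HM boils down to:
```java
HeartbeatMonitor<O> heartbeatMonitor = heartbeatMonitorFactory.createHeartbeatMonitor(...);
heartbeatTargets.put(resourceID, heartbeatMonitor);
```
After registration, the TM's Receiver HM looks like this: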
```
resourceManagerHeartbeatManager = {HeartbeatManagerImpl@10163}
  heartbeatTimeoutIntervalMs = 50000
  ownResourceID = {ResourceID@8882} "96a9b80c-dd97-4b63-9049-afb6662ea3e2"
  heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425}
  mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@10426}
  heartbeatTargets = {ConcurrentHashMap@10427} size = 1
    {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75" -> {HeartbeatMonitorImpl@10666}
      key = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
      value = {HeartbeatMonitorImpl@10666}
        resourceID = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
        heartbeatTarget = {TaskExecutor$1@10668}
        scheduledExecutor = {RpcEndpoint$MainThreadExecutor@10426}
        heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425}
        heartbeatTimeoutIntervalMs = 50000
        futureTimeout = {ScheduledFutureAdapter@10992}
        state = {AtomicReference@10667} "RUNNING"
        lastHeartbeat = 0
```
The TM–JM registration follows the same idea: the TM and the JM each register a monitor representing the other. On the TM side:
JobLeaderListenerImpl ----> establishJobManagerConnection
When the message reaches the JM, it does the following:
registerTaskManager ----> taskManagerHeartbeatManager.monitorTarget // monitor the task manager as heartbeat target
Once the job has been submitted, the normal heartbeat monitoring flow begins. We again use TM and RM for the demonstration.
First, a flow chart:
```
1. Run in Resource Manager

HeartbeatManagerSender in RM
  |
  +----> run@HeartbeatManagerSenderImpl
  |        // iterate over all monitors (targets) and call requestHeartbeat on each target
  |
  +----> requestHeartbeat@HeartbeatManagerSenderImpl
  |        // calls the custom method of the concrete monitored target:
  |        // heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
  |
  +----> getHeartbeatListener().retrievePayload
  |        // lands in TaskManagerHeartbeatListener@ResourceManager,
  |        // which returns null because the RM is nobody's Receiver
  |
  +----> requestHeartbeat@HeartbeatTarget
  |        // lands in the target, whose code lives in ResourceManager (assigned when the target was created)
  |
  +----> taskExecutorGateway.heartbeatFromResourceManager
  |        // RPC into the TM through the gateway: the RM actively sends a heartbeat request to the TM
  |
~~~~~~~~ Akka RPC ~~~~~~~~

2. Run in Task Manager
Execution has reached the TM; the main jobs are 1. resetting the TM's monitor and 2. returning some payload.

heartbeatFromResourceManager@TaskExecutor
  |
  +----> resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
  |        // enters the Receiver HM in the Task Manager
  |
  +----> requestHeartbeat@HeartbeatManager in TM
  |        // runs in the Receiver HM in the Task Manager
  |
  +----> reportHeartbeat@HeartbeatMonitor
  |        // records the time of this request, then calls resetHeartbeatTimeout
  |
  +----> resetHeartbeatTimeout@HeartbeatMonitor
  |        // if the monitor is still RUNNING, cancel the previous ScheduledFuture and create a new one.
  |        // Otherwise the old ScheduledFuture would eventually run HeartbeatMonitorImpl.run, whose
  |        // compareAndSet succeeds and reports a timeout via heartbeatListener.notifyHeartbeatTimeout.
  |        // Here the reset means the RM is healthy.
  |
  +----> heartbeatListener.reportPayload
  |        // hands the target's latest heartbeatPayload to the heartbeatListener,
  |        // which is supplied from outside and acts on the heartbeat records of its nodes
  |
  +----> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
  |
  +----> retrievePayload@ResourceManagerHeartbeatListener in TM
  |        // evaluated as the argument of heartbeatTarget.receiveHeartbeat
  |
  +----> return new TaskExecutorHeartbeatPayload
  |
  +----> receiveHeartbeat in TM
  |        // back in heartbeatTarget.receiveHeartbeat, the custom method defined when the TM
  |        // created the target: it sends a heartbeat response back to the RM
  |
  +----> resourceManagerGateway.heartbeatFromTaskManager
  |        // RPC into the ResourceManager through the gateway
  |
~~~~~~~~ Akka RPC ~~~~~~~~

3. Run in Resource Manager
Execution is back in the RM; the main jobs are 1. resetting the RM's monitor and 2. reporting the payload received from the TaskExecutor.

heartbeatFromTaskManager in RM
  |
  +----> taskManagerHeartbeatManager.receiveHeartbeat
  |        // this is a Sender HM
  |
  +----> HeartbeatManagerImpl.receiveHeartbeat
  |
  +----> HeartbeatManagerImpl.reportHeartbeat(heartbeatOrigin);
  |
  +----> heartbeatMonitor.reportHeartbeat();
  |        // resets the RM's monitor for this TM: cancelTimeout cancels the pending timeout task
  |        // and the next futureTimeout is scheduled. This means the TM is healthy.
  |
  +----> heartbeatListener.reportPayload
  |        // hands the target's latest heartbeatPayload to TaskManagerHeartbeatListener,
  |        // which is supplied from outside and acts on the heartbeat records of its nodes
  |
  +----> slotManager.reportSlotStatus(instanceId, payload.getSlotReport());
  |        // called from TaskManagerHeartbeatListener: reports the payload received from the TaskExecutor
```
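The round trip in this chart can be traced in a single process. The sketch below is hypothetical (HeartbeatRoundTrip, Node and Target are not Flink classes): direct method calls replace Akka RPC, a logical clock replaces System.currentTimeMillis(), and String payloads replace Flink's payload types. As in the chart, the receiver resets its record of the sender, reports any payload, and answers; the sender then records the answer and its payload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, single-threaded walk through one heartbeat round.
class HeartbeatRoundTrip {

    interface Target {
        void requestHeartbeat(String requestOrigin, String payload);

        void receiveHeartbeat(String heartbeatOrigin, String payload);
    }

    static final class Node implements Target {
        final String id;
        final Map<String, Target> targets = new ConcurrentHashMap<>();
        final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
        final List<String> reportedPayloads = new ArrayList<>();
        long clock; // logical time, set by the caller

        Node(String id) {
            this.id = id;
        }

        // Sender side, ~ HeartbeatManagerSenderImpl.run(): probe every target.
        void sendRound() {
            targets.forEach((targetId, target) -> target.requestHeartbeat(id, null));
        }

        // Receiver side, ~ HeartbeatManagerImpl.requestHeartbeat().
        @Override
        public void requestHeartbeat(String requestOrigin, String payload) {
            Target origin = targets.get(requestOrigin);
            if (origin == null) {
                return; // not monitoring this node: ignore the request
            }
            lastHeartbeat.put(requestOrigin, clock); // ~ reportHeartbeat, resets the timeout
            if (payload != null) {
                reportedPayloads.add(payload); // ~ heartbeatListener.reportPayload
            }
            origin.receiveHeartbeat(id, "payload from " + id); // ~ retrievePayload + reply
        }

        // Sender side, ~ HeartbeatManagerImpl.receiveHeartbeat().
        @Override
        public void receiveHeartbeat(String heartbeatOrigin, String payload) {
            if (targets.containsKey(heartbeatOrigin)) {
                lastHeartbeat.put(heartbeatOrigin, clock);
                reportedPayloads.add(payload);
            }
        }
    }
}
```

For example, after `rm.targets.put("tm-1", tm)`, `tm.targets.put("rm", rm)` and one `rm.sendRound()`, the TM has recorded a heartbeat from "rm" and the RM has recorded one from "tm-1" together with the payload "payload from tm-1".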
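Below is a more detailed description.
The heartbeat is initiated by the Sender. Here that is the ResourceManager's HeartbeatManagerSenderImpl, which is run on a schedule; it iterates over all monitors (targets) and calls requestHeartbeat on each target.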
```java
// Code in HeartbeatManagerSenderImpl
@Override
public void run() {
    if (!stopped) {
        for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
            // send one heartbeat request (carrying the payload) to the monitored node,
            // asking it to respond with a heartbeat
            requestHeartbeat(heartbeatMonitor);
        }
        getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
    }
}
```

Variables at runtime:

```
this = {HeartbeatManagerSenderImpl@9037}
  heartbeatPeriod = 10000
  heartbeatTimeoutIntervalMs = 50000
  ownResourceID = {ResourceID@8788} "d349506cae32cadbe99b9f9c49a01c95"
  heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8789}
  mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8790}
```

Call stack:

```
requestHeartbeat:711, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:702, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:92, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
run:81, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
```
The concrete monitored target then runs its own custom requestHeartbeat.
```java
// HeartbeatManagerSenderImpl
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
    O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
    final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
    // this is the concrete monitored target
    heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
}
```

```
heartbeatTarget = {ResourceManager$2@10688}
  taskExecutorGateway = {$Proxy42@9459} "org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler@6d0c8334"
  this$0 = {StandaloneResourceManager@9458}
```
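Note that every target is created by the ResourceManager: it registered this HeartbeatTarget earlier, when it set up the monitor.
This HeartbeatTarget is defined as follows. Its two methods are:
receiveHeartbeat: empty, because the RM always sends heartbeat requests to the TM and never has to answer one from it.
requestHeartbeat: aimed at the TM; it calls the TM's heartbeatFromResourceManager, via RPC of course.
So the call lands in requestHeartbeat as defined by the ResourceManager, which in turn calls the TM through the gateway; this is how the RM actively sends a heartbeat request to the TM.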
```java
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
    @Override
    public void receiveHeartbeat(ResourceID resourceID, Void payload) {
        // the ResourceManager will always send heartbeat requests to the TaskManager
    }

    @Override
    public void requestHeartbeat(ResourceID resourceID, Void payload) {
        // this is what gets called
        taskExecutorGateway.heartbeatFromResourceManager(resourceID);
    }
});
```
Through taskExecutorGateway, the heartbeat execution jumps from the RM to the TM via RPC.
The call taskExecutorGateway.heartbeatFromResourceManager means: go back into the TaskExecutor via RPC. The method is declared in TaskExecutorGateway.
```java
// TaskExecutor RPC gateway interface.
public interface TaskExecutorGateway extends RpcGateway { ... }
```
TaskExecutor implements TaskExecutorGateway, so the interface method is implemented inside TaskExecutor.
```java
@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
    // the RPC call from the RM lands here
    // ...........
    resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
```
In the TM, resourceManagerHeartbeatManager is defined as follows.
```java
/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
```
So what runs next is the TM's Receiver HM. This involves two processing steps:
Concretely, requestHeartbeat@HeartbeatManager is called, which:
```java
@Override
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
    if (!stopped) {
        log.debug("Received heartbeat request from {}.", requestOrigin);

        final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

        if (heartbeatTarget != null) {
            if (heartbeatPayload != null) {
                heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
            }

            heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
        }
    }
}
```
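Finally, resourceManagerGateway.heartbeatFromTaskManager calls into the ResourceManager.
After receiving the RPC request, the ResourceManager runs its heartbeatFromTaskManager method, which calls receiveHeartbeat on taskManagerHeartbeatManager. Again there are two processing steps:
That completes one full heartbeat round; the next round runs after heartbeatInterval.
First, if a heartbeat times out, HeartbeatMonitorImpl calls the listener.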
```java
public void run() {
    // The heartbeat has timed out if we're in state running
    if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
        heartbeatListener.notifyHeartbeatTimeout(resourceID);
    }
}
```
On the TM side this lands in ResourceManagerHeartbeatListener, which tries to reconnect to the RM.
```java
private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {
    @Override
    public void notifyHeartbeatTimeout(final ResourceID resourceId) {
        validateRunsInMainThread();
        // first check whether the timeout is still valid
        if (establishedResourceManagerConnection != null
                && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
            reconnectToResourceManager(new TaskManagerException(
                String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
        } else {
            // .....
        }
    }
    // ...
}
```
The RM, by contrast, is blunt and simply closes the connection:
```java
private class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {
    @Override
    public void notifyHeartbeatTimeout(final ResourceID resourceID) {
        validateRunsInMainThread();
        closeTaskManagerConnection(
            resourceID,
            new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
    }
}
```
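The two reactions can be contrasted in a small sketch. TimeoutReactions is hypothetical and only records what each listener would do. The TM-side handler first checks that the timed-out ID is still the RM it is connected to; what the elided else branch does in Flink is not shown above, so this sketch simply records the stale notification and ignores it. The RM-side handler always closes the TM connection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two timeout reactions; actions are only recorded.
class TimeoutReactions {
    final List<String> actions = new ArrayList<>();
    String connectedResourceManagerId; // null when not connected

    // ~ TaskExecutor's ResourceManagerHeartbeatListener.notifyHeartbeatTimeout
    void onResourceManagerTimeout(String resourceId) {
        if (connectedResourceManagerId != null && connectedResourceManagerId.equals(resourceId)) {
            actions.add("reconnect to RM " + resourceId);
        } else {
            // a late notification about an RM we are no longer connected to
            actions.add("ignore stale timeout for " + resourceId);
        }
    }

    // ~ ResourceManager's TaskManagerHeartbeatListener.notifyHeartbeatTimeout
    void onTaskManagerTimeout(String resourceId) {
        actions.add("close TM connection " + resourceId
                + ": The heartbeat of TaskManager with id " + resourceId + " timed out.");
    }
}
```

The validity check matters because a timeout task may fire for an RM the TM has already left, for example after a leader change; reconnecting in that case would be wrong.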
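That covers the heartbeat mechanism, but how do we fix the exception from the beginning? When creating the execution environment at the start of the program, just pass in a configuration with a much larger heartbeat timeout: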
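```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// heartbeat.timeout is in milliseconds: 18000000 ms = 5 hours
conf.setString("heartbeat.timeout", "18000000");
final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
```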
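Instead of hard-coding 18000000, the value can be derived from a readable java.time.Duration. HeartbeatTimeoutValue is a hypothetical helper; only Duration.toMillis from the JDK is used.

```java
import java.time.Duration;

// Hypothetical helper: turn a readable Duration into the millisecond string
// expected by the "heartbeat.timeout" option.
final class HeartbeatTimeoutValue {
    private HeartbeatTimeoutValue() {}

    static String millis(Duration timeout) {
        return Long.toString(timeout.toMillis());
    }
}
```

Usage: `conf.setString("heartbeat.timeout", HeartbeatTimeoutValue.millis(Duration.ofHours(5)));` sets the same 18000000 as above. Such a long timeout only makes sense for local debugging: it also delays the detection of a genuinely dead TaskManager by up to that long.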