flink中的rpc框架使用的akka。在本節並不詳細講述akka,而是就flink中rpc來說述akka的部份內容。本節,我從AkkaRpcActor.handleRpcInvocation方法講起。
看過hadoop、yarn、hive、hbase、presto的rpc框架,感受flink的通訊框架是最容易讓人繞暈的。雖然以前也看過一點spark中akka的通訊,但如今早已忘得一乾二淨。現在重拾akka通訊,感受仍是挺複雜的。所以,這裏特地拿出一節來說解。
1.這裏首先要講述的是flink中關於心跳的rpc交互。這裏也是akka中第一種遠程通訊方式,也就是說經過tell方式異步傳輸。
這裏咱們從HeartbeatTarget.requestHeartbeat開始講。真正調用的是ResourceManager.registerTaskExecutorInternal方法中
類型爲HeartbeatTarget的匿名類,其內部調用了taskExecutorGateway.heartbeatFromResourceManager。這裏的taskExecutorGateway是一個代理類,其invocationHandler爲AkkaInvocationHandler。所以,這裏首先調用的是AkkaInvocationHandler.invoke,因爲這裏要調用的並不是本地方法,所以接着調用了方法AkkaInvocationHandler.invokeRpc。在該方法中首先經過方法createRpcInvocationMessage封裝了發現taskmanager端的請求RemoteRpcInvocation,接着獲取了欲調用方法的返回值(這裏的判斷是爲了後面使用不一樣的akka通訊方式)。咱們這裏的返回值爲Void。而後調用了AkkaInvocationHandler.tell。這裏的入參是剛剛封裝的RemoteRpcInvocation,該方法內部調用了ActorRef.tell。該actor就是taskmanager端的化生,發送了RemoteRpcInvocation(可序列化)。jobmanager端,也就是resourcemanager端的流程到這裏就結束了,由於咱們遠程調用的方法是無返回值的。
接着,咱們來到taskmanager端,這裏的AkkaRpcActor.onReceive接收到resourcemanager端發來的消息。根據類型的匹配,咱們來到AkkaRpcActor.handleRpcMessage。因爲這裏的信息是RemoteRpcInvocation,實現了接口RpcInvocation,所以,咱們來到AkkaRpcActor.handleRpcInvocation方法。這裏首先調用方法lookupRpcMethod根據方法名獲取taskmanager端對應的方法,也就是TaskExecutor中對應的方法。接着,設置了其訪問屬性後,便開始反射調用。因爲咱們這裏的方法返回值類型爲Void,所以,在調用了TaskExecutor.heartbeatFromResourceManager後再無後續操做。
2.接着是akka中的第二種通訊方式——異步返回。我這裏的使用的是taskmanager向resourcemanager遠程註冊的例子來說解。
從TaskExecutorToResourceManagerConnection.ResourceManagerRegistration.invokeRegistration講起。該方法內部調用了resourceManager.registerTaskExecutor。這裏的resourceManager實際類型是FencedAkkaInvocationHandler。FencedAkkaInvocationHandler繼承自AkkaInvocationHandler。這裏的部分調用流程與上面的異步無返回相似,我就從其中不一樣的地方講起。因爲咱們這裏的返回值類型爲CompletableFuture<RegistrationResponse>,不是Void類型,所以,這裏首先調用了FencedAkkaInvocationHandler.ask,接着調用了FencedAkkaInvocationHandler.fenceMessage將信息類型封裝爲RemoteFencedMessage,接着調用AkkaInvocationHandler.ask。這裏是比較複雜的地方。首先調用了Patterns.ask(ActorRef, message),這裏的ActorRef是resourcemanager端的化身,Patterns.ask是akka用於遠程異步調用的一種方式。其返回值爲scala.concurrent.Future,也就是scala類型的Future。該類型有方法onComplete,做用是當該Future完成是,不管是拋出異常或返回值完成此將來時,調用該方法入參中的函數。這裏咱們經過FutureUtils.toJava將scala中的Future轉換爲java中的CompletableFuture。獲得CompletableFuture後,taskmanager端接着調用CompletableFuture.thenApply方法,內部調用了返回值的deserializeValue方法,也就是獲取到遠程的序列化的返回值後,將其反序列化。因爲咱們這裏rpc調用的方法返回值是CompletableFuture類型,所以這裏並不阻塞,直接返回。
而後,咱們來到resourcemanager端,這裏的AkkaRpcActor.onReceive方法被調用(注意,這裏的實際類型是FencedAkkaRpcActor),因爲傳入的類型爲RemoteFencedMessage,這裏接着調用了FencedAkkaRpcActor.handleRpcMessage。通過幾個判斷後,這裏調用了AkkaRpcActor.handleRpcMessage,此時,這裏的入參爲RemoteFencedMessage.getPayload,也就是RemoteRpcInvocation。接下來的流程我在上面已經提到,這裏就不贅述了。所不一樣的是,咱們這裏的返回爲類型爲CompletableFuture,所以,這裏接着會調用AkkaRpcActor.sendAsyncResponse。這裏首先調用了方法——Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender),這裏的promise是scala中的Promise.DefaultPromise類型,該方法的做用其實就是講java中的CompletableFuture轉換爲scala中的類型DefaultPromise,畢竟,java中的CompletableFuture類型沒法實現rpc。sendAsyncResponse方法的做用就是,當入參asyncResponse完成後,會調用Promise.DefaultPromise的相應方法(success或failure)被調用。此時,因爲Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender)已經被調用,所以,taskmanager端調用Patterns.ask方法的返回的future爲完成狀態,也就是調用了其onComplete。接着,在taskmanager端將返回值反序列化,完成異步rpc的調用。
3.接着是akka的最後通訊方式——阻塞返回。在flink中的對應的方法是AkkaRpcActor.sendSyncResponse(這裏在flink中不多用到,所以我這裏並無舉例)。
這裏rpc調用方法的返回值爲非CompletableFuture類型,前面的調用流程與上面講述的異步返回同樣,所不一樣的是,因爲方法返回值類型爲非CompletableFuture,所以,這裏調用了CompletableFuture.get,這裏會一直阻塞,直待該CompletableFuture的完成。這裏的CompletableFuture其實就是經過FutureUtils.toJava實現了將scala中的future轉換爲java中的CompletableFuture。也就是說,這裏會一直等到遠程方法Promise.DefaultPromise的相應方法(success或failure)被調用,這裏的阻塞纔會被打斷。
好了,到這裏爲止,關於flink中應用akka完成其rpc通訊框架的流程就結束了,感謝你們的關注。