在 Actor 模型中全部的 Actor 之間有且只有一種通訊模式,那就是 tell 的方式,也就是 fire and forget 的方式。可是在實際的開發過程當中工程師們逐漸總結出了一些經常使用的通訊模式。本文以 akka-typed(2.6.0-M8) 框架爲例,介紹存在於 actor 模型中最基本的一些通訊模式,討論消息是如何在 actor 之間流動的。前端
咱們首先看到最基礎的模式 fire and forget 即發送後遺忘。這種模式是直觀地嘗試向一個 actor 發送一條消息後就再也不關心此次通訊。此處的發送消息僅保證到 fire 的發送,即消息以發出,可是可能因爲網絡緣由被丟失。這種通訊模式使用的是所謂的 at most once 送達語義。java
在給出一個可運行的代碼例子以前,咱們簡單介紹一下 akka-typed 框架這個 actor 模型的特定實現。在 akka-typed 中,消息的傳輸是經過 ActorRef<T>
的引用來進行的,ActorRef
是爲位置透明的 actor 的引用,T
表示了 actor 能接收的消息類型。這點跟典型的 actor 模型有所不一樣。這是由於典型的 actor 模型是非肯定性的,實現爲 akka untyped 那樣的框架,即任意 actor 均可以接受任意類型的信息,而且所以可以經過 become
/unbecome
來改變本身的行爲(behavior)。在 akka-typed 中經過帶有類型標籤的引用,保證調用 actorRef.tell(message)
的時候不會意外的傳遞錯誤的信息,同時類型信息有助於推理代碼,而不是在一堆任意引用任意類型的 ActorRef 和 Object 類型的消息中費力的分析。一個對應的缺點就是沒法實現任意靈活度的 become
/unbecome
,若是須要實現相似狀態機的功能,須要加入額外的域而且在 Behaviors
中針對狀態分別討論,但全部能接受的消息都一定是 T
的類型。數據庫
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.Behaviors; import java.util.concurrent.CountDownLatch; public class FireAndForget { public static final CountDownLatch LATCH = new CountDownLatch(2); public static class PrintMe { public final String message; public PrintMe(String message) { this.message = message; } } public static final Behavior<PrintMe> printerBehavior = Behaviors.receive((ctx, printMe) -> { ctx.getLog().info(printMe.message); LATCH.countDown(); return Behaviors.same(); }); public static void main(String[] args) throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.setup(ctx -> { ActorRef<PrintMe> printer = ctx.spawn(printerBehavior, "printer"); printer.tell(new PrintMe("message 1")); printer.tell(new PrintMe("message 2")); return Behaviors.ignore(); }), "fire-and-forget"); LATCH.await(); system.terminate(); } }
這裏須要用 CountDownLatch
來同步保證消息送達後再關閉 ActorSystem
,不然因爲執行順序沒有保證,頗有可能在看到 log 打印以前因爲 ActorSystem
的關閉 printer actor 被中止,從然後續被調度的消息傳遞失敗。由此也能夠看出 fire and forget 的方式僅保證在 tell
的調用點發出消息,至於消息能不能送到,會被怎麼處理,發送方就無論了。編程
請求-響應模式是另外一種很是常見的通訊模式,即 actor-1 向 actor-2 發送消息,actor-2 收到消息後向 actor-1 返回相應的消息。注意這裏咱們只提到了消息的發送,兩個方向均是 fire and forget 的,actor-1 收到的 actor-2 的消息跟它收到的任何一條其餘信息沒有任何區別。除非 actor-2 發回的消息中攜帶有跟迴應相關的信息,不然 actor-1 並不知道這條消息就是迴應剛纔發送的某一條信息的。此外,特化到 akka-typed 中,甚至因爲消息中再也不自然包括 sender 信息,actor-1 並不知道這條消息是 actor-2 發過來的,必須在消息中顯式的包含全部須要的信息。後端
因爲 request 和 response 是鬆耦合的,這和咱們下面要講到的使用 ask pattern 的模式不一樣,不能做爲面向對象編程在 actor 模型中的投影。設計模式
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.Terminated; import akka.actor.typed.javadsl.Behaviors; import scala.runtime.Nothing$; @SuppressWarnings("WeakerAccess") public class RequestResponseTypedActors { public static final class Request { public final String message; public final ActorRef<Response> replyTo; public Request(String message, ActorRef<Response> replyTo) { this.message = message; this.replyTo = replyTo; } } public static final class Response { public final String message; public Response(String message) { this.message = message; } } private static final Behavior<Request> responder = Behaviors .receiveMessage(request -> { System.out.println("got request: " + request.message); request.replyTo.tell(new Response("got it!")); return Behaviors.same(); }); private static Behavior<Response> requester(ActorRef<Request> responder) { return Behaviors .setup(ctx -> { responder.tell(new Request("hello", ctx.getSelf())); return Behaviors.receiveMessage(response -> { System.out.println("got message " + response.message); return Behaviors.stopped(); }); }); } public static void main(String[] args) { ActorSystem.create(Behaviors.setup(ctx -> { ActorRef<Request> res = ctx.spawn(responder, "responder"); ActorRef<Response> req = ctx.spawn(requester(res), "requester"); ctx.watch(req); return Behaviors.receiveSignal((context, sig) -> { if (sig instanceof Terminated) { return Behaviors.stopped(); } else { return Behaviors.unhandled(); } }); }), "ReqResTyped"); } }
能夠看到,responder 收到 requester 的消息中包含了 requester 的地址,所以能夠發送回傳信息。requester 之因此知道 responder 的地址是由開發者在建立 requester 的時候直接告訴它的。這樣一個顯然的問題就是 requester 是怎麼天生地知道 responder 的地址的呢?在分佈式系統中,這能夠經過預配置固定地址或名稱服務來達到這個目的。在 akka-typed 中,經過 ActorSelection
使用 actor 地址尋址的方式已經很難作到的(除非先用 untyped 強行獲取再轉回去,很是複雜並且 hacky),主要是經過 Receptionist
的名稱服務來實現的。這個服務在 cluster 模式下要求必須使用 akka-cluster-typed,實在很有些臃腫。緩存
這裏提到的 Query 模式便是所謂的 ask 模式,也即發送一個消息後等待一個綁定到該消息上的迴應。因爲網絡的滯後性和不肯定性,阻塞地等待這個迴應不只會形成性能問題,更有可能因爲迴應消息的丟失或目標機器宕機而使得當前系統不可用。爲此,咱們須要解耦請求和響應的過程,將本地返回的類型從直接的結果值變爲包裹在 Future 中的值。而後,請求方的業務流程提高到 Future 域中,使用變換組合字來組合後續步驟。因爲 Future 盒子是當即返回的,當前線程能夠繼續其餘工做,在 Future 值被填充後根據此前組合的步驟執行相應的邏輯。這裏順帶提一句,Future 也是函數式編程中著名的概念 Monad 的一個實例,上面這種域的提高和組合的方式是函數式編程中應對狀態變化的一種經常使用手法。網絡
因爲返回的是一個 Future,咱們須要仔細的討論這個 Future 完成的條件。首先,抽象地說,這個 Future 會在收到回覆的時候完成。可是這個請求回覆的雙方若是是兩個業務 actor 之間完成的話,業務 actor 就須要有一張表來緩存全部的 Future,而且在請求和回覆中都帶有 Future 的惟一標識符,以在接受到回覆時自動的完成 Future。因爲 Query 模式不是天生就伴隨在 actor 模型裏的,而是一個後期總結的設計模式,這麼作會致使全部 actor 都承擔這一沒必要要的開銷。併發
所以在 akka 當中採用的方法是,對於一個請求回覆週期,使用一個專門的 PromiseActor
來負責和遠端的 actor 交互,即請求消息由 PromiseActor
發出,收到回覆時也由 PromiseActor
完成其中惟一的專用的 Future。在 akka-typed 中,還額外加入了 createRequest
和 applyToResponse
參數,前者用於將 PromiseActor
的引用傳入發送的消息中,後者用於接收回復後適配爲上層 actor 能接受的消息類型。app
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.AskPattern; import akka.actor.typed.javadsl.Behaviors; import java.time.Duration; import java.util.concurrent.CompletableFuture; public class Ask { public static class Pong { } public static class Ping { public final ActorRef<Pong> replyTo; public Ping(ActorRef<Pong> replyTo) { this.replyTo = replyTo; } } public static final Behavior<Ping> pongBehavior = Behaviors.receiveMessage(ping -> { ping.replyTo.tell(new Pong()); return Behaviors.stopped(); }); public static void main(String[] args) { ActorSystem system = ActorSystem.create(Behaviors.setup(ctx -> { ActorRef<Ping> ref = ctx.spawn(pongBehavior, "pong"); CompletableFuture<Pong> future = AskPattern.ask( ref, Ping::new, Duration.ofMillis(2000L), ctx.getSystem().scheduler()).toCompletableFuture(); future.whenComplete((pong, throwable) -> { ctx.getLog().info("pong={}, throwable={}", pong, throwable); ctx.getSystem().terminate(); }); return Behaviors.ignore(); }), "ask"); } }
關於 ask 模式還有一個特別須要注意的點,即若是在 actor 中使用 AskSupport 的 ask,實際上這個 ask 中會有一個 spawn 出來的 PromiseActor。這一點在上面已經提過了,可是絕對值得再強調一遍。由於當你在 actor-1 中使用 AskSupport 應用 ask 模式和 actor-2 通訊時,實際通訊的是 PromiseActor。所以,akka 提供的關於兩個 actor 之間消息的有序性對於這第三個 actor PromiseActor 是不成立的。這有可能致使很是微妙的併發競爭。
最後簡單說起兩種消息流模式 Pipe 和 Aggregate,這兩種模式常常配合 ask 模式使用。
Pipe 模式將一個 Future 在完成時發送到另外一個 actor 上,其基本形式爲 pipe(future, replyTo)
,常常用於 actor-1 向 actor-2 請求,actor-2 進而向 actor-3 請求,隨後在 actor-2 中將向 actor-3 請求的 future pipe 到 actor-1 的回覆中。舉個例子,actor-1 是 client,actor-2 是數據庫前端,actor-3 是數據庫後端。
Aggregate 模式能夠視爲 Ask 模式的一個天然擴展。Ask 模式一次只能處理一個請求及其迴應,而 Aggregate 模式簡單的擴展到一個顯式建立的子 actor 來處理多個請求響應,很是直觀。