正如其它RPC或者RMI框架那樣,Akka也提供了遠程調用的能力。服務端在監聽的端口上接收客戶端的調用。本文將在《Spring與Akka的集成》一文的基礎上介紹Akka的remote調用,本文不少代碼和例子來源於Akka官網的代碼示例,也包含了一些適用於Spring集成的改造,本文旨在介紹Akka的遠程調用的開發過程。html
服務端開發java
Akka的默認配置文件爲application.conf,若是不特別指明,Akka System都會默認加載此配置。若是你想自定義符合你習慣的名字,可使用以下代碼:spring
final ActorSystem system = ActorSystem.create("YourSystem", ConfigFactory.load("yourconf"));
上述代碼中的yourconf不包含文件後綴名,在你的資源路徑下實際是yourconf.conf。編程
我不太想自定義加載的配置文件,而是繼續使用application.conf,這裏先列出其配置:json
include "common" akka { # LISTEN on tcp port 2552 remote.netty.tcp.port = 2552 }
這裏的remote.netty.tcp.port配置屬性表示使用Netty框架在TCP層的監聽端口是2552。include與java裏的import或者jsp頁面中的include標籤的做用相似,表示引用其餘配置文件中的配置。因爲common.conf中包含了Akka的一些公共配置,因此能夠這樣引用,common.conf的配置以下:服務器
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { netty.tcp { hostname = "127.0.0.1" } } }
common配置中的provider屬性表示Actor的引用提供者是akka.remote.RemoteActorRefProvider,即遠程ActorRef的提供者。這裏的hostname屬性表示服務器的主機名。從common配置咱們還能夠看出Akka的配置有點相似於json,也是一種嵌套結構。此外,Akka還能夠採用一種扁平的配置方式,例如:架構
akka.actor.provider = "..." akka.remote.netty.tcp.hostname = "127.0.0.1"
它們所表明的做用是同樣的。至於選擇扁平仍是嵌套的,一方面依據你的我的習慣,一方面依據配置的多寡——隨着配置項的增多,你會發現嵌套會讓你的配置文件更加清晰。app
因爲官網的例子比較簡潔並能說明問題,因此本文對Akka官網的例子進行了一些改造來介紹服務端與客戶端之間的遠程調用。服務端的配置已在上一小節列出,本小節着重介紹服務端的實現。框架
咱們的服務端是一個簡單的提供基本的加、減、乘、除的服務的CalculatorActor,這些運算都直接封裝在CalculatorActor的實現中(在實際的業務場景中,Actor應該只接收、回覆及調用具體的業務接口,這裏的加減乘除運算應當由指定的Service接口實現,特別是在J2EE或者與Spring集成後),CalculatorActor的實現見代碼清單1。dom
代碼清單1
@Named("CalculatorActor") @Scope("prototype") public class CalculatorActor extends UntypedActor { private static Logger logger = LoggerFactory.getLogger(CalculatorActor.class); @Override public void onReceive(Object message) { if (message instanceof Op.Add) { Op.Add add = (Op.Add) message; logger.info("Calculating " + add.getN1() + " + " + add.getN2()); Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2()); getSender().tell(result, getSelf()); } else if (message instanceof Op.Subtract) { Op.Subtract subtract = (Op.Subtract) message; logger.info("Calculating " + subtract.getN1() + " - " + subtract.getN2()); Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(), subtract.getN2(), subtract.getN1() - subtract.getN2()); getSender().tell(result, getSelf()); } else if (message instanceof Op.Multiply) { Op.Multiply multiply = (Op.Multiply) message; logger.info("Calculating " + multiply.getN1() + " * " + multiply.getN2()); Op.MultiplicationResult result = new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(), multiply.getN1() * multiply.getN2()); getSender().tell(result, getSelf()); } else if (message instanceof Op.Divide) { Op.Divide divide = (Op.Divide) message; logger.info("Calculating " + divide.getN1() + " / " + divide.getN2()); Op.DivisionResult result = new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1() / divide.getN2()); getSender().tell(result, getSelf()); } else { unhandled(message); } } }
Add、Subtract、Multiply、Divide都繼承自MathOp,這裏只列出MathOp和Add的實現,見代碼清單2所示。
代碼清單2
public interface MathOp extends Serializable { } public static class Add implements MathOp { private static final long serialVersionUID = 1L; private final int n1; private final int n2; public Add(int n1, int n2) { this.n1 = n1; this.n2 = n2; } public int getN1() { return n1; } public int getN2() { return n2; } }
服務端應當啓動CalculatorActor實例,以提供服務,代碼以下:
logger.info("Start calculator"); final ActorRef calculator = actorSystem.actorOf(springExt.props("CalculatorActor"), "calculator"); actorMap.put("calculator", calculator); logger.info("Started calculator");
客戶端調用遠程CalculatorActor提供的服務後,還要接收其回覆信息,所以也須要監聽端口。客戶端和服務端若是在同一臺機器節點上,那麼客戶端的監聽端口不能與服務端衝突,我給出的配置示例以下:
include "common" akka { remote.netty.tcp.port = 2553 }
客戶端經過遠程Actor的路徑得到ActorSelection,而後向遠程的Akka System獲取遠程CalculatorActor的ActorRef,進而經過此引用使用遠端CalculatorActor提供的服務。在詳細的說明實現細節以前,先來看看LookupActor的實現,見代碼清單3所示。
代碼清單3
@Named("LookupActor") @Scope("prototype") public class LookupActor extends UntypedActor { private static Logger logger = LoggerFactory.getLogger(LookupActor.class); private final String path; private ActorRef calculator = null; public LookupActor(String path) { this.path = path; sendIdentifyRequest(); } private void sendIdentifyRequest() { getContext().actorSelection(path).tell(new Identify(path), getSelf()); getContext().system().scheduler().scheduleOnce(Duration.create(3, SECONDS), getSelf(), ReceiveTimeout.getInstance(), getContext().dispatcher(), getSelf()); } @Override public void onReceive(Object message) throws Exception { if (message instanceof ActorIdentity) { calculator = ((ActorIdentity) message).getRef(); if (calculator == null) { logger.info("Remote actor not available: " + path); } else { getContext().watch(calculator); getContext().become(active, true); } } else if (message instanceof ReceiveTimeout) { sendIdentifyRequest(); } else { logger.info("Not ready yet"); } } Procedure<Object> active = new Procedure<Object>() { @Override public void apply(Object message) { if (message instanceof Op.MathOp) { // send message to server actor calculator.tell(message, getSelf()); } else if (message instanceof Op.AddResult) { Op.AddResult result = (Op.AddResult) message; logger.info(String.format("Add result: %d + %d = %d\n", result.getN1(), result.getN2(), result.getResult())); ActorRef sender = getSender(); logger.info("Sender is: " + sender); } else if (message instanceof Op.SubtractResult) { Op.SubtractResult result = (Op.SubtractResult) message; logger.info(String.format("Sub result: %d - %d = %d\n", result.getN1(), result.getN2(), result.getResult())); ActorRef sender = getSender(); logger.info("Sender is: " + sender); } else if (message instanceof Terminated) { logger.info("Calculator terminated"); sendIdentifyRequest(); getContext().unbecome(); } else if (message instanceof ReceiveTimeout) { // ignore } else { unhandled(message); } } }; }
LookupActor的構造器須要傳遞遠端CalculatorActor的路徑,而且調用了sendIdentifyRequest方法,sendIdentifyRequest的做用有兩個:
logger.info("start lookup"); final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2552/user/calculator"; final ActorRef lookup = actorSystem.actorOf(springExt.props("LookupActor", path), "lookup"); final Random r = new Random(); actorSystem.scheduler().schedule(Duration.create(1, SECONDS), Duration.create(1, SECONDS), new Runnable() { @Override public void run() { if (r.nextInt(100) % 2 == 0) { lookup.tell(new Op.Add(r.nextInt(100), r.nextInt(100)), null); } else { lookup.tell(new Op.Subtract(r.nextInt(100), r.nextInt(100)), null); } } }, actorSystem.dispatcher());
個人客戶端和服務端都運行於本地,客戶端tcp監聽端口是2553,服務端監聽端口是2552,因爲本例子的代碼較爲健壯,因此客戶端、服務端能夠以任意順序啓動。客戶端運行後的日誌以下圖所示:
服務端的運行日誌以下圖所示:
Akka的遠端調用是你們在使用時最經常使用的特性之一,掌握起來不是什麼難事,如何實現處理多種消息,並考慮其穩定性、健壯性是須要詳細考慮的。