使用Akka的遠程調用

概述

  正如其它RPC或者RMI框架那樣,Akka也提供了遠程調用的能力。服務端在監聽的端口上接收客戶端的調用。本文將在《Spring與Akka的集成》一文的基礎上介紹Akka的remote調用,本文不少代碼和例子來源於Akka官網的代碼示例,也包含了一些適用於Spring集成的改造,本文旨在介紹Akka的遠程調用的開發過程。html

服務端開發java

配置

  Akka的默認配置文件爲application.conf,若是不特別指明,Akka System都會默認加載此配置。若是你想自定義符合你習慣的名字,可使用以下代碼:spring

final ActorSystem system = ActorSystem.create("YourSystem", ConfigFactory.load("yourconf"));  

上述代碼中的yourconf不包含文件後綴名,在你的資源路徑下實際是yourconf.conf。編程

  我不太想自定義加載的配置文件,而是繼續使用application.conf,這裏先列出其配置:json

include "common"  
  
akka {  
  # LISTEN on tcp port 2552  
  remote.netty.tcp.port = 2552  
}  

這裏的remote.netty.tcp.port配置屬性表示使用Netty框架在TCP層的監聽端口是2552。include與java裏的import或者jsp頁面中的include標籤的做用相似,表示引用其餘配置文件中的配置。因爲common.conf中包含了Akka的一些公共配置,因此能夠這樣引用,common.conf的配置以下:服務器

akka {  
  
  actor {  
    provider = "akka.remote.RemoteActorRefProvider"  
  }  
  
  remote {  
    netty.tcp {  
      hostname = "127.0.0.1"  
    }  
  }  
  
}  

common配置中的provider屬性表示Actor的引用提供者是akka.remote.RemoteActorRefProvider,即遠程ActorRef的提供者。這裏的hostname屬性表示服務器的主機名。從common配置咱們還能夠看出Akka的配置有點相似於json,也是一種嵌套結構。此外,Akka還能夠採用一種扁平的配置方式,例如:架構

akka.actor.provider = "..."  
akka.remote.netty.tcp.hostname = "127.0.0.1"  

它們所表明的做用是同樣的。至於選擇扁平仍是嵌套的,一方面依據你的我的習慣,一方面依據配置的多寡——隨着配置項的增多,你會發現嵌套會讓你的配置文件更加清晰。app

服務端

  因爲官網的例子比較簡潔並能說明問題,因此本文對Akka官網的例子進行了一些改造來介紹服務端與客戶端之間的遠程調用。服務端的配置已在上一小節列出,本小節着重介紹服務端的實現。框架

咱們的服務端是一個簡單的提供基本的加、減、乘、除的服務的CalculatorActor,這些運算都直接封裝在CalculatorActor的實現中(在實際的業務場景中,Actor應該只接收、回覆及調用具體的業務接口,這裏的加減乘除運算應當由指定的Service接口實現,特別是在J2EE或者與Spring集成後),CalculatorActor的實現見代碼清單1。dom

代碼清單1

@Named("CalculatorActor")  
@Scope("prototype")  
public class CalculatorActor extends UntypedActor {  
      
    private static Logger logger = LoggerFactory.getLogger(CalculatorActor.class);  
  
    @Override  
    public void onReceive(Object message) {  
  
        if (message instanceof Op.Add) {  
            Op.Add add = (Op.Add) message;  
            logger.info("Calculating " + add.getN1() + " + " + add.getN2());  
            Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Subtract) {  
            Op.Subtract subtract = (Op.Subtract) message;  
            logger.info("Calculating " + subtract.getN1() + " - " + subtract.getN2());  
            Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(), subtract.getN2(),  
                    subtract.getN1() - subtract.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Multiply) {  
            Op.Multiply multiply = (Op.Multiply) message;  
            logger.info("Calculating " + multiply.getN1() + " * " + multiply.getN2());  
            Op.MultiplicationResult result = new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(),  
                    multiply.getN1() * multiply.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Divide) {  
            Op.Divide divide = (Op.Divide) message;  
            logger.info("Calculating " + divide.getN1() + " / " + divide.getN2());  
            Op.DivisionResult result = new Op.DivisionResult(divide.getN1(), divide.getN2(),  
                    divide.getN1() / divide.getN2());  
            getSender().tell(result, getSelf());  
  
        } else {  
            unhandled(message);  
        }  
    }  
}  

Add、Subtract、Multiply、Divide都繼承自MathOp,這裏只列出MathOp和Add的實現,見代碼清單2所示。

代碼清單2

public interface MathOp extends Serializable {  
}  
  
public static class Add implements MathOp {  
    private static final long serialVersionUID = 1L;  
    private final int n1;  
    private final int n2;  
  
    public Add(int n1, int n2) {  
        this.n1 = n1;  
        this.n2 = n2;  
    }  
  
    public int getN1() {  
        return n1;  
    }  
  
    public int getN2() {  
        return n2;  
    }  
} 

服務端應當啓動CalculatorActor實例,以提供服務,代碼以下:

logger.info("Start calculator");  
final ActorRef calculator = actorSystem.actorOf(springExt.props("CalculatorActor"), "calculator");  
actorMap.put("calculator", calculator);  
logger.info("Started calculator"); 

客戶端

  客戶端調用遠程CalculatorActor提供的服務後,還要接收其回覆信息,所以也須要監聽端口。客戶端和服務端若是在同一臺機器節點上,那麼客戶端的監聽端口不能與服務端衝突,我給出的配置示例以下:

include "common"  
  
akka {  
  remote.netty.tcp.port = 2553  
}  

客戶端經過遠程Actor的路徑得到ActorSelection,而後向遠程的Akka System獲取遠程CalculatorActor的ActorRef,進而經過此引用使用遠端CalculatorActor提供的服務。在詳細的說明實現細節以前,先來看看LookupActor的實現,見代碼清單3所示。

代碼清單3

@Named("LookupActor")  
@Scope("prototype")  
public class LookupActor extends UntypedActor {  
      
    private static Logger logger = LoggerFactory.getLogger(LookupActor.class);  
  
    private final String path;  
    private ActorRef calculator = null;  
  
    public LookupActor(String path) {  
        this.path = path;  
        sendIdentifyRequest();  
    }  
  
    private void sendIdentifyRequest() {  
        getContext().actorSelection(path).tell(new Identify(path), getSelf());  
        getContext().system().scheduler().scheduleOnce(Duration.create(3, SECONDS), getSelf(),  
                ReceiveTimeout.getInstance(), getContext().dispatcher(), getSelf());  
    }  
  
    @Override  
    public void onReceive(Object message) throws Exception {  
        if (message instanceof ActorIdentity) {  
            calculator = ((ActorIdentity) message).getRef();  
            if (calculator == null) {  
                logger.info("Remote actor not available: " + path);  
            } else {  
                getContext().watch(calculator);  
                getContext().become(active, true);  
            }  
  
        } else if (message instanceof ReceiveTimeout) {  
            sendIdentifyRequest();  
  
        } else {  
            logger.info("Not ready yet");  
  
        }  
    }  
  
    Procedure<Object> active = new Procedure<Object>() {  
        @Override  
        public void apply(Object message) {  
            if (message instanceof Op.MathOp) {  
                // send message to server actor  
                calculator.tell(message, getSelf());  
  
            } else if (message instanceof Op.AddResult) {  
                Op.AddResult result = (Op.AddResult) message;  
                logger.info(String.format("Add result: %d + %d = %d\n", result.getN1(), result.getN2(), result.getResult()));  
                ActorRef sender = getSender();  
                logger.info("Sender is: " + sender);  
  
            } else if (message instanceof Op.SubtractResult) {  
                Op.SubtractResult result = (Op.SubtractResult) message;  
                logger.info(String.format("Sub result: %d - %d = %d\n", result.getN1(), result.getN2(), result.getResult()));  
                ActorRef sender = getSender();  
                logger.info("Sender is: " + sender);  
                  
            } else if (message instanceof Terminated) {  
                logger.info("Calculator terminated");  
                sendIdentifyRequest();  
                getContext().unbecome();  
  
            } else if (message instanceof ReceiveTimeout) {  
                // ignore  
  
            } else {  
                unhandled(message);  
            }  
  
        }  
    };  
}

LookupActor的構造器須要傳遞遠端CalculatorActor的路徑,而且調用了sendIdentifyRequest方法,sendIdentifyRequest的做用有兩個:

  1. 經過向ActorSelection向遠端的Akka System發送Identify消息,並獲取遠程CalculatorActor的ActorRef;
  2. 啓動定時調度,3秒後向CalculatorActor的執行上下文發送ReceiveTimeout消息,而LookupActor處理ReceiveTimeout消息時,再次調用了sendIdentifyRequest方法。
爲什麼要循環調用sendIdentifyRequest方法呢?因爲遠端服務有可能由於進程奔潰、系統重啓等緣由致使已經得到的ActorRef過時或失效,所以須要一個監測機制。sendIdentifyRequest的循環調用就是一個簡單的檢測機制。
遠端的Akka System在接收到Identify消息後,會給LookupActor回覆ActorIdentity消息,LookupActor收到ActorIdentity消息後即可以解析出消息中載有的CalculatorActor的ActorRef,LookupActor而後調用getContext().watch(calculator)實現對子Actor的監管,一旦CalculatorActor重啓或終止,LookupActor即可以接收到Terminated消息(有關Actor的監管機制,能夠閱讀官方文檔)。
因爲LookupActor的onReceive沒法處理加、減、乘、除及Terminated消息,因此這裏用到了一個Akka Actor的狀態轉換,經過使用getContext().become(active, true)。這裏的active是一個內部類,其繼承了Procedure並重寫了apply方法,其中封裝了對於對於加減乘除的計算以及結果、Terminated消息的處理。經過getContext().become(active, true),使得active接替onReceive方法處理接收到的消息。正如Akka官網所述——Actor的這一特性很是適合於開發實現FSM(有限狀態自動機)。
active的功能主要分爲三類:
  • 若是收到MathOp的消息,說明是加減乘除的消息,則將消息進一步告知遠端的CalculatorActor並由其進行處理;
  • 若是收到AddResult或者SubtractResult,這說明CalculatorActor已經處理完了加或者減的處理,並回復了處理結果,所以對計算結果進行使用(本例只是簡單的打印);
  • 若是收到了Terminated消息,說明遠端的CalculatorActor中止或者重啓了,所以須要從新調用sendIdentifyRequest獲取最新的CalculatorActor的ActorRef。最後還須要取消active,恢復爲默認接收消息的狀態;
啓動客戶端的代碼示例以下:
logger.info("start lookup");  
final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2552/user/calculator";  
final ActorRef lookup = actorSystem.actorOf(springExt.props("LookupActor", path), "lookup");  
final Random r = new Random();  
actorSystem.scheduler().schedule(Duration.create(1, SECONDS), Duration.create(1, SECONDS), new Runnable() {  
  
    @Override  
    public void run() {  
        if (r.nextInt(100) % 2 == 0) {  
            lookup.tell(new Op.Add(r.nextInt(100), r.nextInt(100)), null);  
        } else {  
            lookup.tell(new Op.Subtract(r.nextInt(100), r.nextInt(100)), null);  
        }  
  
    }  
}, actorSystem.dispatcher());   
這裏的客戶端示例以1秒的週期,向LookupActor隨機發送Add或者Subtract的消息。

Actor遠端調用模型

  不管是本地Actor仍是遠端Actor,Actor之因此可以接收消息,是由於每一個Actor都有其自身的郵箱,你能夠定製本身的郵箱(能夠用java中的各類隊列)。本地應用若是想要調用遠端的Actor服務並接收返回信息也就必須擁有本身的郵箱,不然郵遞員投遞信件時因爲沒法找到你家的郵箱,可能會打回郵件、放在你家的門縫下甚至丟棄。所以Actor的調用不管是本地的仍是遠端的都最好遵照Actor的編程模型,就像下圖所示。
 

運行結果

  個人客戶端和服務端都運行於本地,客戶端tcp監聽端口是2553,服務端監聽端口是2552,因爲本例子的代碼較爲健壯,因此客戶端、服務端能夠以任意順序啓動。客戶端運行後的日誌以下圖所示:

 

服務端的運行日誌以下圖所示:

 

總結

  Akka的遠端調用是你們在使用時最經常使用的特性之一,掌握起來不是什麼難事,如何實現處理多種消息,並考慮其穩定性、健壯性是須要詳細考慮的。

後記:通過近一年的準備,《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
 
售賣連接以下:
相關文章
相關標籤/搜索