Akka2使用探索4(Actors)

Actor模型爲編寫併發和分佈式系統提供了一種更高的抽象級別。它將開發人員從顯式地處理鎖和線程管理的工做中解脫出來,使編寫併發和並行式系統更加容易。java

Akka Actor的API與Scala Actor相似,而且從Erlang中借用了一些語法。數據庫

 

Actor類的定義api

定義一個Actor類須要繼承UntypedActor,並實現onReceive方法。服務器

 

Props網絡

Props是一個用來在建立actor時指定選項的配置類。 如下是使用如何建立Props實例的示例.併發

Props props1 = new Props();
Props props2 = new Props(MyUntypedActor.class);
Props props3 = new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
});
Props props4 = props1.withCreator(new UntypedActorFactory() {
  public UntypedActor create() {
    return new MyUntypedActor();
  }
});app

 

使用Props建立Actor負載均衡

Actor能夠經過將 Props 實例傳入 actorOf 工廠方法來建立。異步

ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor");

 

使用默認構造函數建立Actors分佈式

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
    ActorSystem system = ActorSystem.create("MySystem");
    ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");

actorOf返回ActorRef實例,它是你建立的UntypedActor實例的句柄,可用它與實際的Actor交互。ActorRef是不可變的,它也是可序列化的,可用於網絡傳輸,在遠程主機上它仍然表明原節點上的同一個Actor。

也能夠經過actor的context上下文來建立,它被建立他的actor監管,系統建立的actor將成爲頂級actor。

public class FirstUntypedActor extends UntypedActor {
ActorRef myActor = getContext().actorOf(new Props(MyActor.class), "myactor");

name 參數是可選的, 但建議你爲你的actor起一個合適的名字,由於它將在日誌信息中被用於標識各個actor. 名字不能夠爲空,也不能以以$開頭。若是給定的名字已經被賦給了同一個父actor的其它子actor,將會拋出InvalidActorNameException

Actor 在建立後將自動異步地啓動。當你建立UntypedActor時它會自動調用preStart回調方法,你能夠重載preStart方法,加入初始化代碼。

注意:

使用system.actorOf建立頂級actor是個阻塞操做,有可能發生死鎖。避免方法就是不要在actors或futures內部使用默認的dispatcher調用system.actorOf方法。

 

使用非缺省構造方法建立Actor

若是UntypedActor的構造方法有參數,就不能用actorOf(new Props(clazz))建立了。須要使用new Props(new UntypedActorFactory() {..})建立,例子以下:

// allows passing in arguments to the MyActor constructor
ActorRef myActor = system.actorOf(new Props(new UntypedActorFactory() {
  public UntypedActor create() {
      return new MyActor("...");
  }
}), "myactor");

 

UntypedActor API

UntypedActor只定義了一個抽象方法,就是上面提到的onReceive(Objectmessage), 用來實現actor的行爲。

若是當前actor的行爲與收到的消息不匹配,則會調用unhandled方法, 它的缺省實現是向actor系統的事件流中發佈一條 akka.actor.UnhandledMessage(message, sender, recipient)

另外,它還包括:

  • getSelf()表明本actor的 ActorRef
  • getSender()表明最近收到的消息的發送actor,一般用於下面將講到的迴應消息中
  • supervisorStrategy()用戶可重寫它來定義對子actor的監管策略
  • getContext()暴露actor和當前消息的上下文信息,如:
  • 用於建立子actor的工廠方法 (actorOf)
  • actor所屬的系統
  • 父監管者
  • 所監管的子actor
  • 生命週期監控
  • hotswap行爲棧

    其他的可見方法是能夠被用戶重寫的生命週期hook。

    public void preStart() { }

    public void preRestart(Throwable reason, Option message) { for (ActorRef each : getContext().getChildren()) getContext().stop(each); postStop(); }

    public void postRestart(Throwable reason) {
      preStart();
    }
    public void postStop() {
    }

     

    使用DeathWatch進行生命週期監控

    爲了能在其它actor結束時(永久終止, 而不是臨時的失敗和重啓)收到通知, actor能夠將本身註冊爲其它actor在終止時所發佈的 Terminated 消息的接收者. 這個服務是由actor系統的 DeathWatch 組件提供的。

    public static class WatchActor extends UntypedActor {
      final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
      {
        this.getContext().watch(child); // <-- this is the only call needed for registration
      }
      ActorRef lastSender = getContext().system().deadLetters();
      @Override
      public void onReceive(Object message) {
        if (message.equals("kill")) {
          getContext().stop(child);
          lastSender = getSender();
        } else if (message instanceof Terminated) {
          final Terminated t = (Terminated) message;
          if (t.getActor() == child) {
            lastSender.tell("finished");
          }
          } else {
            unhandled(message);
          }
        }
    }

    要注意 Terminated 消息的產生與註冊和終止行爲所發生的順序無關。屢次註冊並不表示會有多個消息產生,也不保證有且只有一個這樣的消息被接收到:若是被監控的actor已經生成了消息而且已經進入了隊列, 在這個消息被處理以前又發生了另外一次註冊,則會有第二個消息進入隊列,由於一個已經終止的actor註冊監控器會馬上致使Terminated 消息的發生。

    可使用 context.unwatch(target)來中止對另外一個actor的生存狀態的監控, 但很明顯這不能保證不會接收到Terminated 消息由於該消息可能已經進入了隊列。

     

    啓動 Hook

    actor啓動後,它的 preStart 會被當即執行。

    重啓 Hook

    全部的Actor都是被監管的, i.e. 以某種失敗處理策略與另外一個actor連接在一塊兒。 若是在處理一個消息的時候拋出的異常,Actor將被重啓。這個重啓過程包括上面提到的Hook:

  • 要被重啓的actor的 preRestart 被調用,攜帶着致使重啓的異常以及觸發異常的消息; 若是重啓並非由於消息的處理而發生的,所攜帶的消息爲 None , 例如,當一個監管者沒有處理某個異常繼而被它本身的監管者重啓時。 這個方法是用來完成清理、準備移交給新的actor實例的最佳位置。 它的缺省實現是終止全部的子actor並調用 postStop.
  • 最初 actorOf 調用的工廠方法將被用來建立新的實例。
  • 新的actor的 postRestart 方法被調用,攜帶着致使重啓的異常信息。 By default the preStart is called, just as in the normal start-up case.

    actor的重啓會替換掉原來的actor對象; 重啓不影響郵箱的內容, 因此對消息的處理將在 postRestart hook 返回後繼續. 觸發異常的消息不會被從新接收。在actor重啓過程當中全部發送到該actor的消息將象日常同樣被放進郵箱隊列中。

    終止 Hook

    一個Actor終止後,它的 postStop hook將被調用, 這能夠用來取消該actor在其它服務中的註冊. 這個hook保證在該actor的消息隊列被禁止後才運行, i.e. 以後發給該actor的消息將被重定向到 ActorSystemdeadLetters 中。

     

    標識 Actor

    每一個actor擁有一個惟一的邏輯路徑, 此路徑是由從actor系統的根開始的父子鏈構成;它還擁有一個物理路徑,若是監管鏈包含有遠程監管者,此路徑可能會與邏輯路徑不一樣。這些路徑用來在系統中查找actor,例如,當收到一個遠程消息時查找收件者, 可是它們的更直接的用處在於:actor能夠經過指定絕對或相對路徑(邏輯的或物理的)來查找其它的actor並隨結果獲取一個 ActorRef

  • context.actorFor("/user/serviceA/aggregator") // 查找絕對路徑
  • context.actorFor("../joe") // 查找同一父監管者下的兄弟
  • 其中指定的路徑被解釋爲一個 java.net.URI, 它以 / 分隔成路徑段. 若是路徑以 /開始, 表示一個絕對路徑,從根監管者 ( "/user"的父親)開始查找; 不然是從當前actor開始。若是某一個路徑段爲 .., 會找到當前所遍歷到的actor的上一級, 不然則會向下一級尋找具備該名字的子actor。 必須注意的是 actor路徑中的.. 老是表示邏輯結構,也就是其監管者。

    若是要查找的路徑不存在,會返回一個特殊的actor引用,它的行爲與actor系統的死信隊列相似,可是保留其身份(i.e. 查找路徑)。

    若是開啓了遠程調用,則遠程actor地址也能夠被查找。:

  • context.actorFor("akka://app@otherhost:1234/user/serviceB")
  • 這些查找動做當即返回一個(多是遠程的)actor引用, 因此你必須向它發送一個消息並等待其響應,來確認serviceB 是真正可訪問和運行的。

     

    發送消息

    向actor發送消息是使用下列方法之一。

  • tell 意思是「fire-and-forget」, e.g. 異步發送一個消息並當即返回。這是發送消息的推薦方式。 不會阻塞地等待消息。它擁有最好的併發性和可擴展性。
  • ask 異步發送一條消息並返回一個 Future表明一個可能的迴應。須要採用Future的處理模式。

    每個消息發送者分別保證本身的消息的次序. try {
      String result = operation();
      getSender().tell(result);
    } catch (Exception e) {
      getSender().tell(new akka.actor.Status.Failure(e));
      throw e;
    }

    ask使用方式以下:

    List<Future<Object>> futures = []
            AkkaClientNoReply client = new AkkaClientNoReply("akka://xw@127.0.0.1:8888/user/server")
            client.send("hello")
            0.upto(15) {
                futures << akka.pattern.Patterns.ask(client.akkaClient, it, 1000 * 60)
    //模擬客戶端給服務端發0——15消息,服務器處理(把數值+1返回給客戶端)
            }
    
            final Future<Iterable<Object>> aggregate = Futures.sequence(futures, client.system.dispatcher());
            final Future<Integer> transformed = aggregate.map(new Mapper<Iterable<Object>, Integer>() {
                public Integer apply(Iterable<Object> coll) {
                    final Iterator<Object> it = coll.iterator();
                    int count = 0;
                    while (it.hasNext()) {
                        int x = (Integer) it.next();
                        count = count + x
                    }
                    return new Integer(count);
                }
            });
    
            AkkaServerApp app = new AkkaServerApp("resultHandler", "127.0.0.1", 6666, "result")
            app.messageProcessor = {msg, UntypedActorContext context ->
                log.info("1到16之和爲" + msg)
            }
            app.startup()
    
            akka.pattern.Patterns.pipe(transformed).to(app.serverActor)

    若是服務端處理消息時發生了異常而致使沒有給客戶端迴應,那麼客戶端收到的結果將會收到Timeout的Failure:Failure(akka.pattern.AskTimeoutException: Timed out)。能夠將異常捕獲用Failure封裝異常發給客戶端:actor.tell(new akka.actor.Status.Failure(e))。

    Future的onComplete, onResult, 或 onTimeout 方法能夠用來註冊一個回調,以便在Future完成時獲得通知。從而提供一種避免阻塞的方法。

    警告

    在使用future回調如 onComplete, onSuccess, and onFailure時, 在actor內部你要當心避免捕捉該actor的引用, i.e. 不要在回調中調用該actor的方法或訪問其可變狀態。這會破壞actor的封裝,會引用同步bug和race condition, 由於回調會與此actor一同被併發調度。 不幸的是目前尚未一種編譯時的方法可以探測到這種非法訪問。

     

    轉發消息

    你能夠將消息從一個actor轉發給另外一個。雖然通過了一個‘中轉’,但最初的發送者地址/引用將保持不變。當實現功能相似路由器、負載均衡器、備份等的actor時會頗有用。

    myActor.forward(message, getContext());

     

    迴應消息

    getSender().tell(replyMsg)

    若是沒有sender (不是從actor發送的消息或者沒有future上下文) 那麼 sender 缺省爲「dead-letter」 actor的引用.

     

    初始化接收消息超時

    設置receiveTimeout 屬性並聲明一個處理 ReceiveTimeout 對象的匹配分支。

    public class MyReceivedTimeoutUntypedActor extends UntypedActor {
      public MyReceivedTimeoutUntypedActor() {
        getContext().setReceiveTimeout(Duration.parse("30 seconds"));
      }
      public void onReceive(Object message) {
        if (message.equals("Hello")) {
            getSender().tell("Hello world");
        } else if (message == Actors.receiveTimeout()) {
            throw new RuntimeException("received timeout");
        } else {
            unhandled(message);
        }
      }
    }

     

    終止Actor

    經過調用ActorRefFactory i.e. ActorContextActorSystemstop 方法來終止一個actor , 一般 context 用來終止子actor,而 system 用來終止頂級actor. 實際的終止操做是異步執行的, i.e. stop 可能在actor被終止以前返回。

    若是當前有正在處理的消息,對該消息的處理將在actor被終止以前完成,可是郵箱中的後續消息將不會被處理。缺省狀況下這些消息會被送到 ActorSystem死信, 可是這取決於郵箱的實現。

    actor的終止分兩步: 第一步actor將中止對郵箱的處理,向全部子actor發送終止命令,而後處理來自子actor的終止消息直到全部的子actor都完成終止, 最後終止本身 (調用 postStop, 銷燬郵箱, 向 DeathWatch 發佈 Terminated, 通知其監管者). 這個過程保證actor系統中的子樹以一種有序的方式終止, 將終止命令傳播到葉子結點並收集它們回送的確認消息給被終止的監管者。若是其中某個actor沒有響應 (i.e. 因爲處理消息用了太長時間以致於沒有收到終止命令), 整個過程將會被阻塞。

    ActorSystem.shutdown被調用時, 系統根監管actor會被終止,以上的過程將保證整個系統的正確終止。

    postStop hook 是在actor被徹底終止之後調用。

     

    PoisonPill

    你也能夠向actor發送 akka.actor.PoisonPill 消息, 這個消息處理完成後actor會被終止。 PoisonPill 與普通消息同樣被放進隊列,所以會在已經入隊列的其它消息以後被執行。

     

    優雅地終止

    若是你想等待終止過程的結束,或者組合若干actor的終止次序,可使用gracefulStop:

    try {

      Future<Boolean> stopped = akka.pattern.Patterns.gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system);

      Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));

      // the actor has been stopped

    } catch (ActorTimeoutException e) {

      // the actor wasn't stopped within 5 seconds

    }

     

    熱拔插 Become/Unbecome

    升級 Upgrade

    Akka支持在運行時對Actor消息循環 (e.g. 的實現)進行實時替換: 在actor中調用 context.become 方法。

    Become 要求一個 akka.japi.Procedure 參數做爲新的消息處理實現。 被替換的代碼被存在一個棧中,能夠被push和pop。

    降級

    因爲被熱替換掉的代碼存在棧中,你也能夠對代碼進行降級,只須要在actor中調用 context.unbecome 方法。

     

    Killing actor

    發送Kill消息給actor

     

    Actor 與 異常

    在消息被actor處理的過程當中可能會拋出異常,例如數據庫異常。

    消息會怎樣

    若是消息處理過程當中(即從郵箱中取出並交給receive後)發生了異常,這個消息將被丟失。必須明白它不會被放回到郵箱中。因此若是你但願重試對消息的處理,你須要本身抓住異常而後在異常處理流程中重試. 請確保你限制重試的次數,由於你不會但願系統產生活鎖 (從而消耗大量CPU而於事無補)。

    郵箱會怎樣

    若是消息處理過程當中發生異常,郵箱沒有任何變化。若是actor被重啓,郵箱會被保留。郵箱中的全部消息不會丟失。

    actor會怎樣

    若是拋出了異常,actor實例將被丟棄而生成一個新的實例。這個新的實例會被該actor的引用所引用(因此這個過程對開發人員來講是不可見的)。注意這意味着若是你不在preRestart 回調中進行保存,並在postRestart回調中恢復,那麼失敗的actor實例的當前狀態會被丟失。

  • 相關文章
    相關標籤/搜索