角色模型對編寫併發、分佈式系統進行了高度抽象。它減輕了開發者必須對互斥鎖與線程管理的負擔,更容易編寫出正確的併發與並行系統。早在1973 年 Carl Hewitt 發表的論文中定義了角色,但一直流行於Erlang 語言中,隨後被愛立信公司應用於創建高併發、可靠通訊系統,取得了巨大成功。 html
Akka 框架裏面角色的API 跟Scala 框架裏面角色類似,後者一些語法曾經模仿Erlang語言。 java
注意:因爲Akka強迫父級監管者監督每個角色和(潛在的子級)監管者,建議你熟悉角色系統、監管、監控,這將可能幫助你閱讀角色參考、路徑和地址。 spring
在Java裏面,角色是經過繼承UntypedActor 類及實現onReceive方法來實現的.這個方法將message做爲參數。 數據庫
這裏有個例子: api
01 | import akka.actor.UntypedActor; |
02 | import akka.event.Logging; |
03 | import akka.event.LoggingAdapter; |
04 |
05 | public class MyUntypedActor extends UntypedActor { |
06 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
07 |
08 | public void onReceive(Object message) throws Exception { |
09 | if (message instanceof String) { |
10 | log.info("Received String message: {}", message); |
11 | getSender().tell(message, getSelf()); |
12 | } else |
13 | unhandled(message); |
14 | } |
15 | } |
Props 是一個配置類,它的做用是對建立角色確認選項。把它做爲不可變的、所以可自由共享規則對建立一個角色包括相關部署信息(例如:使用調度,詳見下文)。下面是如何建立一個Props 實例的一些例子: 安全
1 | import akka.actor.Props; |
2 | import akka.japi.Creator; |
1 | static class MyActorC implements Creator<MyActor> { |
2 | @Override public MyActor create() { |
3 | return new MyActor("..."); |
4 | } |
5 | } |
6 |
7 | Props props1 = Props.create(MyUntypedActor.class); |
8 | Props props2 = Props.create(MyActor.class, "..."); |
9 | Props props3 = Props.create(new MyActorC()); |
第二行顯示如何傳遞構造參數給Actor去建立。在構建Props對象時,存在匹配的構造是被驗證的,若是發現不存在或者存在多個匹配構造,會致使一個IllegalArgumentEception。 網絡
第三行驗證Creator使用。用來驗證Props構造的Creator必須是靜態。類型參數是用來判斷生成角色類的,若是充分擦除,將落回到Actor類,一個參數化工廠例子,能夠是: 併發
1 | static class ParametricCreator<T extends MyActor> implements Creator<T> { |
2 | @Override public T create() { |
3 | // ... fabricate actor here |
4 | } |
5 | } |
注意:
因爲郵箱要求——如使用雙端隊列爲基礎的郵箱使用的隱藏角色——被拾起,在建立以前,角色類型須要已知的,這是Creator類型參數容許的。所以對你用到角色必定儘量使用特定類型。 app
這是個好的主意在UntypedActor類裏面提供靜態工廠方法,該方法幫助建立儘量接近角色定義的合適Props 類。這也容許使用基於Creator方法,該方法靜態驗證所使用的構造函數確實存在,而不是隻在運行時檢查依賴。 負載均衡
01 | public class DemoActor extends UntypedActor { |
02 |
03 | /** |
04 | * Create Props for an actor of this type. |
05 | * @param magicNumber The magic number to be passed to this actor’s constructor. |
06 | * @return a Props for creating this actor, which can then be further configured |
07 | * (e.g. calling `.withDispatcher()` on it) |
08 | */ |
09 | public static Props props(final int magicNumber) { |
10 | return Props.create(new Creator<DemoActor>() { |
11 | private static final long serialVersionUID = 1L; |
12 |
13 | @Override |
14 | public DemoActor create() throws Exception { |
15 | return new DemoActor(magicNumber); |
16 | } |
17 | }); |
18 | } |
19 |
20 | final int magicNumber; |
21 |
22 | public DemoActor(int magicNumber) { |
23 | this.magicNumber = magicNumber; |
24 | } |
25 |
26 | @Override |
27 | public void onReceive(Object msg) { |
28 | // some behavior here |
29 | } |
30 |
31 | } |
32 |
33 | system.actorOf(DemoActor.props(42), "demo"); |
角色經過傳入Props實例進入actorOf 工廠方法,該工廠方法在ActorSystem 和ActorContext類中提供使用。
1 | import akka.actor.ActorRef; |
2 | import akka.actor.ActorSystem; |
1 | // ActorSystem is a heavy object: create only one per application |
2 | final ActorSystem system = ActorSystem.create("MySystem"); |
3 | final ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class), |
4 | "myactor"); |
使用ActorSystem 將建立頂級角色,由角色系統提供守護的角色監管,同時使用一個角色的上下文將建立一個子角色。
1 | class A extends UntypedActor { |
2 | final ActorRef child = |
3 | getContext().actorOf(Props.create(MyUntypedActor.class), "myChild"); |
4 | // plus some behavior ... |
5 | } |
建議建立一個包含子類、超子類等等的層次結構,使得它適合具備邏輯性故障處理應用程序結構,詳見Actor Systems。
actorOf 方法調用返回ActorRef實例。這是對角色實例處理,並與它進行交互的惟一途徑。該ActorRef是不可變的,並有一個與它表明的一對一關係角色。該ActorRef是可序列化的和具有網絡意識的。這意味着,你能夠把它進行序列化,將它經過網絡發送,在遠程主機上使用它,它仍然表明着在原始的節點上相同的角色,橫跨網絡。
名稱參數是可選的,可是你應該給你的角色起個更好名稱,由於這是用在日誌消息裏面,並肯定角色。該名稱不能爲空或以$開頭,但它可能包含URL編碼的字符(例如,%20表明空格)。若是給定的名稱已被相同父類中的其餘子類使用,那將拋出InvalidActorNameException異常。
角色是自動異步啓動當被建立時候。
若是你的未類型化的角色有一個攜帶參數的構造函數,而後那些須要Prosp的一部分,以及,如上所述。但在某些狀況下,必須使用一個工廠方法,例如當實際構造函數參數由一個依賴注入框架決定時。
1 | import akka.actor.Actor; |
2 | import akka.actor.IndirectActorProducer; |
01 | class DependencyInjector implements IndirectActorProducer { |
02 | final Object applicationContext; |
03 | final String beanName; |
04 |
05 | public DependencyInjector(Object applicationContext, String beanName) { |
06 | this.applicationContext = applicationContext; |
07 | this.beanName = beanName; |
08 | } |
09 |
10 | @Override |
11 | public Class<? extends Actor> actorClass() { |
12 | return MyActor.class; |
13 | } |
14 |
15 | @Override |
16 | public MyActor produce() { |
17 | MyActor result; |
18 | // obtain fresh Actor instance from DI framework ... |
19 | return result; |
20 | } |
21 | } |
22 |
23 | final ActorRef myActor = getContext().actorOf( |
24 | Props.create(DependencyInjector.class, applicationContext, "MyActor"), |
25 | "myactor3"); |
警告:
你可能有時會傾向於提供一個IndirectActorProducer它老是返回相同的實例,例如:經過使用一個靜態字段。這是不支持的,由於它違背了一個角色重啓含義,這是這裏所描述的含義:什麼從新啓動方式。當使用一個依賴注入框架時,角色Beans 必定不能是單例模式範圍。
依賴注入和依賴注入框架集成技術更深刻地介紹了使用Akka與依賴注入指導方針和在類型安全的活化劑方面的Akka Java Spring 指導。
當寫在角色外面的代碼,應與角色進行溝通,在ask模式能夠是一個解決方案(見下文),但有兩個事情不能作:接收多個回覆(例如:經過訂閱的ActorRef到通知服務)和監控其餘角色的生命週期。爲了這些目的這裏有個Inbox 類:
1 | final Inbox inbox = Inbox.create(system); |
2 | inbox.send(target, "hello"); |
3 | assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)).equals("world"); |
send方法包裝一個標準的tell和提供一個內部的角色引用做爲發送者。在最後一行將容許該回覆被接收。監控一個角色同時也十分簡單。
1 | final Inbox inbox = Inbox.create(system); |
2 | inbox.watch(target); |
3 | target.tell(PoisonPill.getInstance(), ActorRef.noSender()); |
4 | assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)) instanceof Terminated; |
UntypedActor 類僅僅定義一個抽象方法,就是上面提到onReceive(Object message)方法,該方法實現了角色行爲。若是當前角色行爲不匹配一個接收信息,建議調用unhandled 方法,該方法默認將發出一個new akka.actor.UnhandledMessage(message, sender, recipient)在系統角色事件流中(設置配置項akka.actor.debug.unhandled 到on 讓它們轉化爲實際調試信息)。另外,它提供:
剩餘的可見的方法是用戶可重寫的生命週期鉤子,將在如下描述:
01 | public void preStart() { |
02 | } |
03 |
04 | public void preRestart(Throwable reason, scala.Option<Object> message) { |
05 | for (ActorRef each : getContext().getChildren()) { |
06 | getContext().unwatch(each); |
07 | getContext().stop(each); |
08 | } |
09 | postStop(); |
10 | } |
11 |
12 | public void postRestart(Throwable reason) { |
13 | preStart(); |
14 | } |
15 |
16 | public void postStop() { |
17 | } |
上面顯示實現是默認由UntypedActor 類提供。
在角色系統中的路徑表明一個「地方」,這可能被一個存活着的的角色佔用着。最初,(除了系統初始化角色)的路徑是空的。當actorOf()被調用時,指定一個由經過Props 描述給定的路徑角色的化身。一個角色化身由路徑和一個UID肯定。從新啓動僅僅交換Props 定義的Actor 實例,但化身與UID依然是相同的。
當該角色中止時,化身的生命週期也相應結束了。在這一刻時間上相對應的生命週期事件也將被調用和監管角色也被通知終止結束。化身被中止以後,路徑也能夠重複被經過actorOf() 方法建立的角色使用。在這種狀況下,新的化身的名稱跟與前一個將是相同的而是UIDs將會有所不一樣。
一個ActorRef 老是表明一個化身(路徑和UID)而不僅是一個給定的路徑。所以,若是一個角色中止,一個新的具備相同名稱建立的舊化身的ActorRef不會指向新的。
在另外一方面ActorSelection 指向該路徑(或多個路徑在使用通配符時),而且是徹底不知道其化身當前佔用着它。 因爲這個緣由致使ActorSelection 不能被監視到。經過發送識別信息到將被回覆包含正確地引用(見經過角色選擇集識別角色)的 ActorIdentity 的ActorSelection 來解決當前化身ActorRef 存在該路徑之下。這也能夠用ActorSelection 類的resolveOne方法來解決,這將返回一個匹配ActorRef 的Future 。
當另外一個角色終止時,爲了通知被通知(即永久性地中止,而不是暫時的失敗和從新啓動),一個角色能夠本身註冊爲接收在終止上層的其餘角色發送的終止消息,其餘演員出動(請參閱中止演員)。這項服務是由角色系統的臨終看護組件提供。
註冊一個監視器是很容易的(見第四行,剩下的就是用於展現整個功能):
1 | import akka.actor.Terminated; |
01 | public class WatchActor extends UntypedActor { |
02 | final ActorRef child = this.getContext().actorOf(Props.empty(), "child"); |
03 | { |
04 | this.getContext().watch(child); // <-- the only call needed for registration |
05 | } |
06 | ActorRef lastSender = getContext().system().deadLetters(); |
07 |
08 | @Override |
09 | public void onReceive(Object message) { |
10 | if (message.equals("kill")) { |
11 | getContext().stop(child); |
12 | lastSender = getSender(); |
13 | } else if (message instanceof Terminated) { |
14 | final Terminated t = (Terminated) message; |
15 | if (t.getActor() == child) { |
16 | lastSender.tell("finished", getSelf()); |
17 | } |
18 | } else { |
19 | unhandled(message); |
20 | } |
21 | } |
22 | } |
可是應當注意的是,產生的終止消息獨立於註冊和終止發生的順序。特別是,監控角色將接收一個終止信息即便被監控角色已經被終止在註冊時候。
註冊屢次並沒必要然致使對多個消息被產生,但不保證只有一個對應這樣的消息被接收:若是被監控角色終止已經發生和發送的消息排隊等候着,在另外一個註冊完成以前,該消息已經處理完,而後第二消息將會排隊,是由於已經結束角色的監控的註冊致使終止信息馬上產生。
使用getContext().unwatch(target)方法從監控另外一個角色生命活力撤銷下來也是有可能的。這個工做即便已終止消息已經排隊於郵箱中,在調用unwatch方法後對於那個角色將沒有終止消息被處理。
在正確啓動角色以後,preStart方法被調用。
1 | @Override |
2 | public void preStart() { |
3 | child = getContext().actorOf(Props.empty()); |
4 | } |
第一次建立角色時,該方法被調用。在從新啓動期間,它被postRestart的默認實現調用,這意味着經過重寫該方法,你能夠選擇此方法中初始化代碼是否被調用,對這個角色或每次重啓僅只調用一次。在一個角色類的實例建立時,角色的構造函數的一部分的初始化代碼將每次都被調用,這發生在每次重啓時。
全部角色被監督着,即用故障處理策略連接到另外一個角色。當處理一個消息是,拋出一個異常的狀況下,演員可能從新啓動(見監管與監控)拋出一個異常。這重啓涉及上述提到鉤子:
1. 舊角色是經過調用preRestart方法進行通知的,這伴隨着形成重啓的異常與綁定該異常的消息;處理一個消息沒有形成這個重啓發生,則後者可能也沒有發生,例如,當一個監管者不捕獲該異常,則由其監管者重啓又或者若是因爲一個同類的失敗,一個角色將被從新啓動。若是消息是可用的,那麼該消息的發件人也能夠經過正常方式訪問的(即經過調用getSender())。
這個方法用在這些地方時最好的,例如:清除,準備交到新的角色實例等等。默認它中止全部子實例和調用postStop方法。
2. 來自actorOf方法調用的初始化工廠用來產生新的實例。
3. 新角色的postRestart方法被調用時這引發了重啓異常。默認狀況下,preStart 是被調用,就如同在正常啓動的狀況下。
一個角色重啓僅替換實際角色的對象;郵箱中的內容是不受重啓影響,因此消息的處理將在postRestart鉤子返回後恢復。引起異常的消息將不會再接收。當重啓時候,發送到角色的任何消息將像日常同樣排隊到它的郵箱。
注意:要知道,相關用戶失敗消息的順序是不肯定的。特別是,一個父類可能會從新啓動其子類以前它已經處理了在失敗以前子類發送故障的的最後消息。見討論:消息順序的詳細信息。
終止一個角色以後,其postStop鉤子被調用時,其可能用於例如從其餘服務註銷這個角色。在這個角色的消息隊列已禁用以後,這個鉤子仍保證運行,即送到已終止角色的信息將被重定向到ActorSystem的deadLetters。
做爲角色的引用,路徑和地址描述,每一個角色都有一個惟一的邏輯路徑,這是由如下的子類到父類直到達到角色系統的根的角色的鏈獲得的,它有一個物理路徑,若是監管鏈包括任何遠程監管者,這可能會有所不一樣。這些路徑是由系統使用來查找角色,如當接收到一個遠程的消息和收件人進行搜索,但他們也有更直接用法:角色能夠查找其餘角色經過指定絕對或相對路徑,邏輯或物理,並接收返回的結果的ActorSelection:
1 | // will look up this absolute path |
2 | getContext().actorSelection("/user/serviceA/actor"); |
3 | // will look up sibling beneath same supervisor |
4 | getContext().actorSelection("../joe"); |
其中指定的路徑被解釋爲一個java.net.URI, 它以 / 分隔成路徑段. 若是路徑以 /開始, 表示一個絕對路徑,從根監管者 ( 「/user」的父親)開始查找; 不然是從當前角色開始。若是某一個路徑段爲 .., 會找到當前所遍歷到的角色的上一級, 不然則會向下一級尋找具備該名字的子角色。 必須注意的是角色路徑中的.. 老是表示邏輯結構,也就是其監管者。
一個角色選擇集的路徑元素能夠包含通配符,容許消息額廣播到該選擇集:
1 | // will look all children to serviceB with names starting with worker |
2 | getContext().actorSelection("/user/serviceB/worker*"); |
3 | // will look up all siblings beneath same supervisor |
4 | getContext().actorSelection("../*"); |
信息能夠經過ActorSelection發送和當傳送的每一個消息時,查找ActorSelection的路徑。若是選擇集不匹配任何角色的消息將被丟棄。
爲了得到一個ActorSelection的ActorRef,你須要發送一個消息到選擇集和使用來自橘色的回覆的getSender引用。有一個內置的識別信息,即全部角色都理解並自動回覆一個包含ActorRef的ActorIdentity消息。此消息由該角色特殊處理,在這個意義上說是穿越的,若是一個具體的名稱查找失敗(即非通配符路徑元素不符合一個存在的角色)而後產生一個消極結果。請注意,這並不意味着傳遞的答覆是有保障的,但它仍然是一個正常的消息。
1 | import akka.actor.ActorIdentity; |
2 | import akka.actor.ActorSelection; |
3 | import akka.actor.Identify; |
01 | public class Follower extends UntypedActor { |
02 | final String identifyId = "1"; |
03 | { |
04 | ActorSelection selection = |
05 | getContext().actorSelection("/user/another"); |
06 | selection.tell(new Identify(identifyId), getSelf()); |
07 | } |
08 | ActorRef another; |
09 |
10 | final ActorRef probe; |
11 | public Follower(ActorRef probe) { |
12 | this.probe = probe; |
13 | } |
14 |
15 | @Override |
16 | public void onReceive(Object message) { |
17 | if (message instanceof ActorIdentity) { |
18 | ActorIdentity identity = (ActorIdentity) message; |
19 | if (identity.correlationId().equals(identifyId)) { |
20 | ActorRef ref = identity.getRef(); |
21 | if (ref == null) |
22 | getContext().stop(getSelf()); |
23 | else { |
24 | another = ref; |
25 | getContext().watch(another); |
26 | probe.tell(ref, getSelf()); |
27 | } |
28 | } |
29 | } else if (message instanceof Terminated) { |
30 | final Terminated t = (Terminated) message; |
31 | if (t.getActor().equals(another)) { |
32 | getContext().stop(getSelf()); |
33 | } |
34 | } else { |
35 | unhandled(message); |
36 | } |
37 | } |
38 | } |
您也能夠取得一個ActorSelection的ActorRef經過ActorSelection的resolveOne方法。它返回匹配ActorRef的Future,若是這樣一個角色存在。若是沒有這樣的角色存在或鑑定所提供的超時時間內沒有完成,它將已失敗了結akka.actor.ActorNotFound。
遠程角色地址也能夠查找,若是遠程被啓用:
1 | getContext().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB"); |
一個關於actor查找的示例見 遠程查找.
注意:在支持actorSelection,actorFor是被廢棄,由於用actorFor得到的角色引用對本地與遠程角色表現不一樣。在一個本地角色引用的狀況下,查找以前命名的演員須要存在,不然所獲取的引用將是一個EmptyLocalActorRef。即便在獲取角色引用以後,一個真實路徑的角色才被建立,這時也是能夠獲取的。對於遠程角色引用經過actorFor來獲取的行爲不一樣的,發送信息到該引用上將在覆蓋下經過在遠程系統給每個消息發送的路徑查找角色。
重要信息:消息能夠是任何類型的對象,但必須是不可變的。阿Aka不能強制不變性,因此這必須按照約定。 這裏是一個不變的消息的示例:
01 | public class ImmutableMessage { |
02 | private final int sequenceNumber; |
03 | private final List<String> values; |
04 |
05 | public ImmutableMessage(int sequenceNumber, List<String> values) { |
06 | this.sequenceNumber = sequenceNumber; |
07 | this.values = Collections.unmodifiableList(new ArrayList<String>(values)); |
08 | } |
09 |
10 | public int getSequenceNumber() { |
11 | return sequenceNumber; |
12 | } |
13 |
14 | public List<String> getValues() { |
15 | return values; |
16 | } |
17 | } |
向actor發送消息是使用下列方法之一。
每個消息發送者分別保證本身的消息的次序.
注意:使用ask會形成性能影響,由於當超時是,一些事情須要保持追蹤。這須要一些東西來將一個Promise鏈接進入ActorRef,而且須要經過遠程鏈接可到達的。因此老是使用tell更偏向性能,除非必須才用ask.
在全部這些方法你能夠傳遞本身的ActorRef。讓它這樣作,由於這將容許接收的角色纔可以回覆您的郵件,由於發件人引用隨該信息一塊兒發送的。
這是發送消息的推薦方式。 不會阻塞地等待消息。它擁有最好的併發性和可擴展性。
1 | // don’t forget to think about who is the sender (2nd argument) |
2 | target.tell(message, getSelf()); |
發送者引用是伴隨着消息傳遞的,在接收角色可用範圍內,當處理該消息時,經過getSender方法。在一個角色內部一般是getSelf,這應該爲發送者,但也多是這種狀況,回覆被路由到一些其餘角色即該父類的第二個參數tell將是不一樣的一個。在角色外部,若是沒有回覆,第二個參數能夠爲null;若是在角色外部須要一個回覆,你可使用問答模式描,下面描述..
ask 模式既包含actor也包含future, 因此它是做爲一種使用模式,而不是ActorRef的方法:
1 | import static akka.pattern.Patterns.ask; |
2 | import static akka.pattern.Patterns.pipe; |
3 | import scala.concurrent.Future; |
4 | import scala.concurrent.duration.Duration; |
5 | import akka.dispatch.Futures; |
6 | import akka.dispatch.Mapper; |
7 | import akka.util.Timeout; |
01 | final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS)); |
02 |
03 | final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>(); |
04 | futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout |
05 | futures.add(ask(actorB, "another request", t)); // using timeout from |
06 | // above |
07 |
08 | final Future<Iterable<Object>> aggregate = Futures.sequence(futures, |
09 | system.dispatcher()); |
10 |
11 | final Future<Result> transformed = aggregate.map( |
12 | new Mapper<Iterable<Object>, Result>() { |
13 | public Result apply(Iterable<Object> coll) { |
14 | final Iterator<Object> it = coll.iterator(); |
15 | final String x = (String) it.next(); |
16 | final String s = (String) it.next(); |
17 | return new Result(x, s); |
18 | } |
19 | }, system.dispatcher()); |
20 |
21 | pipe(transformed, system.dispatcher()).to(actorC); |
上面的例子展現了將 ask與 future上的 pipe 模式一塊兒使用,由於這是一種很是經常使用的組合。 請注意上面全部的調用都是徹底非阻塞和異步的: ask 產生 Future, 兩個經過Futures.sequence和map方法組合成一個新的Future,而後用 pipe 在future上安裝一個 onComplete-處理器來完成將收集到的 Result 發送到其它actor的動做。
使用 ask 將會像tell 同樣發送消息給接收方, 接收方必須經過getSender().tell(reply, getSelf()) 發送迴應來爲返回的 Future 填充數據。ask 操做包括建立一個內部actor來處理迴應,必須爲這個內部actor指定一個超時期限,過了超時期限內部actor將被銷燬以防止內存泄露。詳見下面:
注意:若是要以異常來填充future你須要發送一個 Failure 消息給發送方。這個操做不會在actor處理消息發生異常時自動完成。
1 | try { |
2 | String result = operation(); |
3 | getSender().tell(result, getSelf()); |
4 | } catch (Exception e) { |
5 | getSender().tell(new akka.actor.Status.Failure(e), getSelf()); |
6 | throw e; |
7 | } |
若是一個actor 沒有完成future , 它會在超時時限到來時過時, 明確做爲一個參數傳給ask方法,以 AskTimeoutException來完成Future。
關於如何等待或查詢一個future,更多信息請見Futures 。
Future的onComplete, onResult, 或 onTimeout 方法能夠用來註冊一個回調,以便在Future完成時獲得通知。從而提供一種避免阻塞的方法。
在使用future回調時,在角色內部你要當心避免關閉該角色的引用, 即不要在回調中調用該角色的方法或訪問其可變狀態。這會破壞角色的封裝,會引用同步bugbug和race condition, 由於回調會與此角色一同被併發調度。 不幸的是目前尚未一種編譯時的方法可以探測到這種非法訪問。 參閱: 角色與共享可變狀態
你能夠將消息從一個角色轉發給另外一個。雖然通過了一個‘中轉’,但最初的發送者地址/引用將保持不變。當實現功能相似路由器、負載均衡器、備份等的角色時會頗有用。同時你須要傳遞你的上下文變量。
1 | target.forward(result, getContext()); |
當一個角色收到被傳遞到onReceive方法的消息,這是在須要被定義的UntypedActor基類的抽象方法。
下面是個例子:
01 | import akka.actor.UntypedActor; |
02 | import akka.event.Logging; |
03 | import akka.event.LoggingAdapter; |
04 |
05 | public class MyUntypedActor extends UntypedActor { |
06 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
07 |
08 | public void onReceive(Object message) throws Exception { |
09 | if (message instanceof String) { |
10 | log.info("Received String message: {}", message); |
11 | getSender().tell(message, getSelf()); |
12 | } else |
13 | unhandled(message); |
14 | } |
15 | } |
除了使用IF-instanceof檢查,還有一種方法是使用Apache Commons MethodUtils調用指定的參數類型相匹配的消息類型方法。
若是你須要一個用來發送回應消息的目標,可使用 getSender, 它是一個角色引用。 你能夠用 getSender().tell(replyMsg, getSelf())向這個引用發送迴應消息。 你也能夠將這個ActorRef保存起來未來再做迴應。若是沒有sender (不是從actor發送的消息或者沒有future上下文) 那麼 sender 缺省爲 ‘死信’ 角色的引用。
1 | @Override |
2 | public void onReceive(Object msg) { |
3 | Object result = |
4 | // calculate result ... |
5 |
6 | // do not forget the second argument! |
7 | getSender().tell(result, getSelf()); |
8 | } |
在一個ReceiveTimeout消息發送觸發以後,該UntypedActorContext setReceiveTimeout定義不活動超時時間。當指定時,接收功能應該可以處理一個akka.actor.ReceiveTimeout消息。 1毫秒爲最小支持超時。
請注意,接受超時可能會觸發和在另外一條消息是入隊後,該ReceiveTimeout消息將重排隊;所以,它不能保證在接收到接收超時的必定有預先經過該方法所配置的空閒時段。
一旦設置,接收超時保持有效(即持續重複觸發超過不活動時間後)。傳遞Duration.Undefined關掉此功能。
01 | import akka.actor.ActorRef; |
02 | import akka.actor.ReceiveTimeout; |
03 | import akka.actor.UntypedActor; |
04 | import scala.concurrent.duration.Duration; |
05 |
06 | public class MyReceiveTimeoutUntypedActor extends UntypedActor { |
07 |
08 | public MyReceiveTimeoutUntypedActor() { |
09 | // To set an initial delay |
10 | getContext().setReceiveTimeout(Duration.create("30 seconds")); |
11 | } |
12 |
13 | public void onReceive(Object message) { |
14 | if (message.equals("Hello")) { |
15 | // To set in a response to a message |
16 | getContext().setReceiveTimeout(Duration.create("1 second")); |
17 | } else if (message instanceof ReceiveTimeout) { |
18 | // To turn it off |
19 | getContext().setReceiveTimeout(Duration.Undefined()); |
20 | } else { |
21 | unhandled(message); |
22 | } |
23 | } |
24 | } |
經過調用ActorRefFactory 即 ActorContext 或 ActorSystem 的 stop 方法來終止一個角色, 一般 context 用來終止子角色,而 system 用來終止頂級角色. 實際的終止操做是異步執行的, 即stop 可能在角色被終止以前返回。
若是當前有正在處理的消息,對該消息的處理將在actor被終止以前完成,可是郵箱中的後續消息將不會被處理。缺省狀況下這些消息會被送到 ActorSystem 的 死信, 可是這取決於郵箱的實現。
角色的終止分兩步: 第一步角色將中止對郵箱的處理,向全部子角色發送終止命令,而後處理來自子角色的終止消息直到全部的子角色都完成終止, 最後終止本身 (調用postStop, 銷燬郵箱, 向 DeathWatch 發佈 Terminated , 通知其監管者). 這個過程保證角色系統中的子樹以一種有序的方式終止, 將終止命令傳播到葉子結點並收集它們回送的確認消息給被終止的監管者。若是其中某個角色沒有響應 (即因爲處理消息用了太長時間以致於沒有收到終止命令), 整個過程將會被阻塞。
在 ActorSystem.shutdown被調用時, 系統根監管角色會被終止,以上的過程將保證整個系統的正確終止。
postStop hook 是在角色被徹底終止之後調用的。這是爲了清理資源:
1 | @Override |
2 | public void postStop() { |
3 | // clean up resources here ... |
4 | } |
注意:因爲角色的終止是異步的, 你不能立刻使用你剛剛終止的子角色的名字;這會致使 InvalidActorNameException. 你應該 watch 正在終止的 介紹而在最終到達的 Terminated消息的處理中建立它的替代者。
你也能夠向角色發送 akka.actor.PoisonPill 消息, 這個消息處理完成後角色會被終止。 PoisonPill 與普通消息同樣被放進隊列,所以會在已經入隊列的其它消息以後被執行。
像下面使用:
1 | myActor.tell(akka.actor.PoisonPill.getInstance(), sender); |
若是你想等待終止過程的結束,或者組合若干actor的終止次序,可使用gracefulStop:
1 | import static akka.pattern.Patterns.gracefulStop; |
2 | import scala.concurrent.Await; |
3 | import scala.concurrent.Future; |
4 | import scala.concurrent.duration.Duration; |
5 | import akka.pattern.AskTimeoutException; |
1 | try { |
2 | Future<Boolean> stopped = |
3 | gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN); |
4 | Await.result(stopped, Duration.create(6, TimeUnit.SECONDS)); |
5 | // the actor has been stopped |
6 | } catch (AskTimeoutException e) { |
7 | // the actor wasn't stopped within 5 seconds |
8 | } |
01 | public class Manager extends UntypedActor { |
02 |
03 | public static final String SHUTDOWN = "shutdown"; |
04 |
05 | ActorRef worker = getContext().watch(getContext().actorOf( |
06 | Props.create(Cruncher.class), "worker")); |
07 |
08 | public void onReceive(Object message) { |
09 | if (message.equals("job")) { |
10 | worker.tell("crunch", getSelf()); |
11 | } else if (message.equals(SHUTDOWN)) { |
12 | worker.tell(PoisonPill.getInstance(), getSelf()); |
13 | getContext().become(shuttingDown); |
14 | } |
15 | } |
16 |
17 | Procedure<Object> shuttingDown = new Procedure<Object>() { |
18 | @Override |
19 | public void apply(Object message) { |
20 | if (message.equals("job")) { |
21 | getSender().tell("service unavailable, shutting down", getSelf()); |
22 | } else if (message instanceof Terminated) { |
23 | getContext().stop(getSelf()); |
24 | } |
25 | } |
26 | }; |
27 | } |
當gracefulStop()成功返回時,角色的postStop()鉤子將被執行:存在一個狀況,happens-before 邊緣在postStop()結尾和gracefulStop()返回之間。
在上面的例子中一個自定義的Manager.SHUTDOWN消息被髮送到目標角色爲了初始化正在終止角色的處理。您可使用PoisonPill爲這一點,但在阻止目標的角色以前,你擁有不多機會與其餘角色進行交互。簡單的清除任務能夠在postStop中處理。
注意:請記住,一個角色終止和它的名字被註銷是互相異步發生的獨立事件。所以,在gracefulStop()後返回,它多是你會發現名稱仍然在使用。爲了保證正確的註銷,只能重複使用來自你控制監管者內與一個終止的消息的迴應的名稱,即不屬於頂級的角色。
Akka支持在運行時對角色消息循環 (例如它的的實現)進行實時替換: 在角色中調用getContext.become 方法。 熱替換的代碼被存在一個棧中,能夠被pushed(replacing 或 adding 在頂部)和popped。
注意:請注意角色被其監管者重啓後將恢復其最初的行爲。
熱替換角色使用getContext().become:
1 | import akka.japi.Procedure; |
01 | public class HotSwapActor extends UntypedActor { |
02 |
03 | Procedure<Object> angry = new Procedure<Object>() { |
04 | @Override |
05 | public void apply(Object message) { |
06 | if (message.equals("bar")) { |
07 | getSender().tell("I am already angry?", getSelf()); |
08 | } else if (message.equals("foo")) { |
09 | getContext().become(happy); |
10 | } |
11 | } |
12 | }; |
13 |
14 | Procedure<Object> happy = new Procedure<Object>() { |
15 | @Override |
16 | public void apply(Object message) { |
17 | if (message.equals("bar")) { |
18 | getSender().tell("I am already happy :-)", getSelf()); |
19 | } else if (message.equals("foo")) { |
20 | getContext().become(angry); |
21 | } |
22 | } |
23 | }; |
24 |
25 | public void onReceive(Object message) { |
26 | if (message.equals("bar")) { |
27 | getContext().become(angry); |
28 | } else if (message.equals("foo")) { |
29 | getContext().become(happy); |
30 | } else { |
31 | unhandled(message); |
32 | } |
33 | } |
34 | } |
become 方法還有不少其它的用處,一個特別好的例子是用它來實現一個有限狀態機(FSM)。這將代替當前行爲(即行爲棧頂部),這意味着你不用使用unbecome,而是下一個行爲將明確被安裝。
使用become另外一個方式:不代替而是添加到行爲棧頂部。這種狀況是必需要保證在長期運行中「pop」操做(即unbecome)數目匹配「push」數目,不然這個數目將致使內存泄露(這就是該行爲不是默認緣由)。
01 | public class UntypedActorSwapper { |
02 |
03 | public static class Swap { |
04 | public static Swap SWAP = new Swap(); |
05 |
06 | private Swap() { |
07 | } |
08 | } |
09 |
10 | public static class Swapper extends UntypedActor { |
11 | LoggingAdapter log = Logging.getLogger(getContext().system(), this); |
12 |
13 | public void onReceive(Object message) { |
14 | if (message == SWAP) { |
15 | log.info("Hi"); |
16 | getContext().become(new Procedure<Object>() { |
17 | @Override |
18 | public void apply(Object message) { |
19 | log.info("Ho"); |
20 | getContext().unbecome(); // resets the latest 'become' |
21 | } |
22 | }, false); // this signals stacking of the new behavior |
23 | } else { |
24 | unhandled(message); |
25 | } |
26 | } |
27 | } |
28 |
29 | public static void main(String... args) { |
30 | ActorSystem system = ActorSystem.create("MySystem"); |
31 | ActorRef swap = system.actorOf(Props.create(Swapper.class)); |
32 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
33 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
34 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
35 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
36 | swap.tell(SWAP, ActorRef.noSender()); // logs Hi |
37 | swap.tell(SWAP, ActorRef.noSender()); // logs Ho |
38 | } |
39 |
40 | } |
該UntypedActorWithStash類使一個角色臨時藏匿不能或不該該使用角色的當前行爲處理的消息。在改變角色的消息處理函數,即調用getContext().become()或getContext().unbecome(),全部藏匿的消息能夠「unstashed」,所以前面加上他們角色的郵箱。這樣一來,藏消息能夠在他們已經收到原先相同的順序進行處理。擴展UntypedActorWithStash角色將自動得到一個雙端隊列爲基礎的郵箱。
注意:抽象類UntypedActorWithStash實現標記接口RequiresMessageQueue這要求系統可以爲該角色自動選擇基於雙端隊列的郵箱實現。若是你想更多的控制權郵箱,請見郵箱文檔:郵箱
。
這裏是UntypedActorWithStash類中操做的示例:
1 | import akka.actor.UntypedActorWithStash; |
01 | public class ActorWithProtocol extends UntypedActorWithStash { |
02 | public void onReceive(Object msg) { |
03 | if (msg.equals("open")) { |
04 | unstashAll(); |
05 | getContext().become(new Procedure<Object>() { |
06 | public void apply(Object msg) throws Exception { |
07 | if (msg.equals("write")) { |
08 | // do writing... |
09 | } else if (msg.equals("close")) { |
10 | unstashAll(); |
11 | getContext().unbecome(); |
12 | } else { |
13 | stash(); |
14 | } |
15 | } |
16 | }, false); // add behavior on top instead of replacing |
17 | } else { |
18 | stash(); |
19 | } |
20 | } |
21 | } |
調用stash()將當前的消息(即角色最後收到的消息)到角色的藏匿處。當在處理默認狀況下在角色的消息處理函數來隱藏那些沒有被其餘案件處理的狀況時,這是典型調用。同一消息的兩次是非法藏匿;這樣作會致使一個IllegalStateException被拋出。藏匿也能夠此狀況下,調用stath()能會致使容量違規,這致使StashOverflowException。藏匿的容量可使用郵箱的配置的藏匿容量設置(一個Int類型)進行配置。
調用unstashAll()從藏匿到角色的郵箱進入隊列消息,直到信箱(若是有的話)已經達到的能力(請注意,從藏匿處的消息前加上郵箱)。若是一個有界的郵箱溢出,一個MessageQueueAppendFailedException被拋出。在調用unstashAll()後,藏匿保證爲空。
藏匿由scala.collection.immutable.Vector支持。這樣一來,即便是很是大量的消息在不會對性能產生重大影響下被藏匿。
注意,藏匿是短暫的角色狀態的一部分,該郵箱不像。所以,應該像具備相同屬性的角色狀態的其餘部分進行管理。該preRestart的UntypedActorWithStash的實現將調用unstashAll(),它一般是所指望的行爲。
注意:若是要強制執行,你的角色只能用一個無上限stash進行工做,那麼你應該使用UntypedActorWithUnboundedStash類代替。
您能夠經過發送一個
kill消息殺一個角色。這將致使角色拋出一個
ActorKilledException,引起故障。角色將暫停運做,其監管這將被要求如何處理失敗,這可能意味着恢復的角色,從新啓動,或徹底終止它。請見 監管手段以獲取更多信息。
使用Kill像下面:
1 | victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender()); |
在消息被actor處理的過程當中可能會拋出異常,例如數據庫異常。
若是消息處理過程當中(即從郵箱中取出並交給receive後)發生了異常,這個消息將被丟失。必須明白它不會被放回到郵箱中。因此若是你但願重試對消息的處理,你須要本身抓住異常而後在異常處理流程中重試. 請確保你限制重試的次數,由於你不會但願系統產生活鎖 (從而消耗大量CPU而於事無補)。另外一種可能性請查看PeekMailbox pattern
若是消息處理過程當中發生異常,郵箱沒有任何變化。若是actor被重啓,郵箱會被保留。郵箱中的全部消息不會丟失。
若是角色內代碼拋出了異常,那麼角色將被暫停,接着監管者處理開始(見監管與監控)。依賴監管者決策角色將被恢復(就像什麼事情沒發生),重啓(擦除內部狀態從新開始)或終止。
角色鉤子的豐富的生命週期提供了實現各類初始化模式的有用工具。在一個ActorRef的生命週期,一個角色可能會經歷屢次從新啓動後,當舊的實例替換爲新的,對外面觀察這是不可見的,僅僅看見ActorRef。
有人可能會想到「化身」的新實例。初始化可能須要一個角色的每個化身,但有時人們須要初始化僅發生在第一個實例誕生時,當ActorRef被建立。如下各節提供的模式爲不一樣的初始化需求。
使用構造函數初始化有着各類好處。首先,它使得有可能使用的val字段來存儲任何狀態,這並不在角色實例的生命週期中變化,使得角色實現更加健壯。該構造函數被角色的每個化身調用,因此角色的內部老是能夠認爲正確的初始化發生。這也是這種方法的缺點,由於有當一我的想避免在從新啓動時從新初始化的內部狀況。例如,在重啓過程,保持整個子角色一般是有用。如下部分提供了這種狀況下的模式。
在的第一個實例的初始化過程當中,一個角色的preStart()方法僅僅被直接調用一次,那就是,在ActorRef的建立。在從新啓動的狀況下,preStart()從postRestart()被調用,所以,若是不重寫,preStart()被每個化身調用。然而,覆蓋postRestart(),能夠禁用此行爲,並確保只調用一次preStart()。
這種模式的一個有用的用法是在從新啓動時禁止建立子類新的ActorRef。這能夠經過覆蓋preRestart()來實現:
01 | @Override |
02 | public void preStart() { |
03 | // Initialize children here |
04 | } |
05 |
06 | // Overriding postRestart to disable the call to preStart() |
07 | // after restarts |
08 | @Override |
09 | public void postRestart(Throwable reason) { |
10 | } |
11 |
12 | // The default implementation of preRestart() stops all the children |
13 | // of the actor. To opt-out from stopping the children, we |
14 | // have to override preRestart() |
15 | @Override |
16 | public void preRestart(Throwable reason, Option<Object> message) |
17 | throws Exception { |
18 | // Keep the call to postStop(), but no stopping of children |
19 | postStop(); |
20 | } |
請注意,該子角色還在從新啓動,但不會建立新的ActorRef。對子類能夠遞歸應用相同的原則,確保他們的preStart()方法被只在建立本身的引用時調用。
瞭解更多信息,請參閱What Restarting Means:
有這樣的狀況,在構造函數中,當它是不可能傳遞所需的全部角色初始化的信息,例如,在存在循環的依賴關係。在這種狀況下,角色應該聽一個初始化消息,並利用become()或有限狀態機的狀態對角色的初始化和未初始化的狀態進行編碼。
01 | private String initializeMe = null; |
02 |
03 | @Override |
04 | public void onReceive(Object message) throws Exception { |
05 | if (message.equals("init")) { |
06 | initializeMe = "Up and running"; |
07 | getContext().become(new Procedure<Object>() { |
08 | @Override |
09 | public void apply(Object message) throws Exception { |
10 | if (message.equals("U OK?")) |
11 | getSender().tell(initializeMe, getSelf()); |
12 | } |
13 | }); |
14 | } |
15 | } |
若是在初始化以前,角色能夠接收消息,一個有用的工具多是Stash,能夠保存消息直到初始化完成,在角色初始化以後從新放回。
注意:這個模式應當心使用,而且當上述的模式都不適用才使用。其中一個潛在的問題是,消息可能會在發送給遠程角色丟失。另外,在未初始化狀態發佈一個ActorRef可能致使在其接收用戶信息的初始化以前已經完成。