Akka 中的有類型 Actor 是 Active Objects 模式的一種實現. Smalltalk誕生之時,就已經缺省地將方法調用從同步操做發爲異步派發。 api
有類型 Actor 由兩 「部分」 組成, 一個public接口和一個實現, 若是你有 「企業級」 Java的開發經驗, 這對你應該很是熟悉。 對普通actor來講,你擁有一個外部API (public接口的實例) 來將方法調用異步地委託給其實現類的私有實例。 app
有類型Actor相對於普通Actor的優點在於有類型Actor擁有靜態的契約, 你不須要定義你本身的消息, 它的劣勢在於對你能作什麼和不能作什麼進行了一些限制,好比 你不能使用 become/unbecome. 異步
有類型Actor是使用 JDK Proxies 實現的,JDK Proxies提供了很是簡單的api來攔截方法調用。 ide
注意 函數
和普通Akka actor同樣,有類型actor也一次處理一個消息。 工具
何時使用有類型的Actor 測試
有類型的Actor很適合用在鏈接actor系統和非actor的代碼,由於它可使你能在外部編寫正常的OO模式的代碼。但切記不可濫用。 this
工具箱 spa
返回有類型actor擴展 Returns the Typed Actor Extension
TypedActorExtension extension =
TypedActor.get(system); //system is an instance of ActorSystem
.net
判斷一個引用是不是有類型actor代理 Returns whether the reference is a Typed Actor Proxy or not
TypedActor.get(system).isTypedActor(someReference);
返回一個外部有類型actor代理所表明的Akka actor Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor.get(system).getActorRefFor(someReference);
返回當前的ActorContext//Returns the current ActorContext,
此方法僅在一個TypedActor 實現的方法中有效 // method only valid within methods of a TypedActor implementation
ActorContext context = TypedActor.context();
返回當前有類型actor的外部代理//Returns the external proxy of the current Typed Actor,
此方法僅在一個TypedActor 實現的方法中有效// method only valid within methods of a TypedActor implementation
Squarer sq = TypedActor.<Squarer>self();
返回一個有類型Actor擴展的上下文實例//Returns a contextual instance of the Typed Actor Extension
這意味着若是你用它建立其它的有類型actor,它們會成爲當前有類型actor的子actor//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor.get(TypedActor.context());
具體例子及說明
package practise.akka.typedactors import akka.dispatch.Future import akka.japi.Option /** * 這個就是對外的接口,各函數就是Typed Actor的接口方法 */ public interface Squarer { void squareDontCare(int i); //fire-forget Future<Integer> square(int i); //non-blocking send-request-reply Option<Integer> squareNowPlease(int i);//blocking send-request-reply int squareNow(int i); //blocking send-request-reply }
package practise.akka.typedactors import akka.dispatch.Future import akka.dispatch.Futures import akka.actor.TypedActor import akka.japi.Option import akka.actor.ActorContext import groovy.util.logging.Log4j import akka.actor.ActorRef /** * 這個是接口實現。(實現akka.actor.TypedActor.Receiver接口就能接收actor發來的普通消息(非函數調用消息)。) */ @Log4j class SquarerImpl implements Squarer, akka.actor.TypedActor.Receiver { private String name; public SquarerImpl() { this.name = "default"; } public SquarerImpl(String name) { this.name = name; } public void squareDontCare(int i) { log.debug("squareDontCare,fire-and-forget只接收不返回結果,與ActorRef.tell徹底一致----" + i) //能夠從線程號看出是異步處理的 int sq = i * i; //Nobody cares :( //返回當前的ActorContext, // 此方法僅在一個TypedActor 實現的方法中有效 ActorContext context = TypedActor.context(); println "context ---- " + context //返回當前有類型actor的外部代理, // 此方法僅在一個TypedActor 實現的方法中有效 Squarer mysq = TypedActor.<Squarer> self(); println "--self --" + mysq } public Future<Integer> square(int i) { log.debug("square send-request-reply Future----" + i) //能夠從線程號看出是異步處理的 return Futures.successful(i * i, TypedActor.dispatcher()); } public Option<Integer> squareNowPlease(int i) { log.debug("squareNowPlease send-request-reply Option----" + i) //能夠從線程號看出是異步處理的 return Option.some(i * i); } public int squareNow(int i) { log.debug("squareNow send-request-reply result----" + i) //能夠從線程號看出是異步處理的 return i * i; } @Override void onReceive(Object o, ActorRef actorRef) { log.debug("TypedActor收到消息----${o}---from:${actorRef}") } }
package practise.akka.typedactors import akka.actor.ActorSystem import akka.actor.TypedActor import akka.actor.TypedProps import com.typesafe.config.ConfigFactory import akka.japi.Creator import groovy.util.logging.Log4j import akka.actor.ActorContext /** * 這裏建立Typed Actor. */ @Log4j class TypedActorsFactory { ActorSystem system private final String config = """akka { loglevel = "${log?.debugEnabled ? "DEBUG" : "INFO"}" actor.provider = "akka.remote.RemoteActorRefProvider" remote.netty.hostname = "127.0.0.1" remote.netty.port = 2552 remote.log-received-messages = on remote.log-sent-messages = on }""" TypedActorsFactory(String sysName) { this.system = ActorSystem.create(sysName, ConfigFactory.parseString(config)) } Squarer getTypedActorDefault() { Squarer mySquarer = TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class)); //這裏建立的是代理類型 return mySquarer } Squarer getTypedActor(String name) { Squarer otherSquarer = TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, new Creator<SquarerImpl>() { public SquarerImpl create() { return new SquarerImpl(name); } //這裏建立的是具體的實現類型 }), name); //這個name是actor的name:akka//sys@host:port/user/name return otherSquarer } }
下面用幾個測試用例實驗一下
package practise.akka.typedactors import akka.actor.ActorRef import akka.actor.TypedActor import akka.actor.UntypedActorContext import akka.dispatch.Future import com.baoxian.akka.AkkaClientNoReply import com.baoxian.akka.AkkaServerApp class TestTypedActors extends GroovyTestCase { def testTypeActor() { println("----") TypedActorsFactory factory = new TypedActorsFactory("typedServer") // Squarer squarer = factory?.getTypedActorDefault() //建立代理 Squarer squarer = factory?.getTypedActor("serv") //具體實現 squarer?.squareDontCare(10) Future future = squarer?.square(10) AkkaServerApp app = new AkkaServerApp("tmp", "127.0.0.1", 6666, "result") //這是我本身構建的接收器 app.messageProcessor = {msg, UntypedActorContext context -> log.info("結果爲" + msg) } app.startup() akka.pattern.Patterns.pipe(future).to(app.serverActor) //Future的返回結果pipe到接收器中了,在log中能看到結果 println "----" + squarer?.squareNowPlease(10)?.get() println "----" + squarer?.squareNow(10) //返回有類型actor擴展 TypedActor.get(factory.system) //返回一個外部有類型actor代理所表明的Akka actor ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer); actor.tell("消息") //這個消息將會在SquarerImpl的onReceive方法中接收到 sleep(1000 * 60 * 10) // TypedActor.get(factory.system).stop(squarer); //這將會盡快地異步終止與指定的代理關聯的有類型Actor TypedActor.get(factory.system).poisonPill(squarer);//這將會在有類型actor完成全部在當前調用以前對它的調用後異步地終止它 } def testRemoteTypedActor() { AkkaClientNoReply client = new AkkaClientNoReply("akka://typedServer@127.0.0.1:2552/user/serv") client.send("遠程消息") //這將會在SquarerImpl的onReceive方法中接收到 sleep(1000) client.shutdown() } }