Akka路由_BroadcastRoutingLogicjava
路由actor是將收到的消息路由到目的actor的actor. 路由acto將消息發送給它所管理的稱爲 ‘routees’ 的actor。tcp
在Akka中,受Router管理的actor稱做 Routeeide
BroadcastRoutingLogic——這裏作了一個示例,講的是廣播機制的路由策略。大概是這樣的,建立3個actor,經過廣播機制給這三個actor廣播消息。spa
以下,繼承CustomRouterConfig實現router,code
package com.usoft10; import akka.actor.ActorSystem; import akka.actor.Props; import akka.routing.ActorRefRoutee; import akka.routing.BroadcastRoutingLogic; import akka.routing.CustomRouterConfig; import akka.routing.Routee; import akka.routing.Router; import java.util.ArrayList; import java.util.List; public class BurstyMessageRouter extends CustomRouterConfig { private int noOfInstances; public BurstyMessageRouter(int inNoOfInstances) { noOfInstances = inNoOfInstances; } @Override public Router createRouter(ActorSystem system) { final List<Routee> routees = new ArrayList<Routee>(noOfInstances); for (int i = 0; i < noOfInstances; i++) { routees.add(new ActorRefRoutee(system.actorOf( Props.create(MsgEchoActor.class), "actor-" + String.valueOf(i)))); } return new Router(new BroadcastRoutingLogic(), routees); } }
實現接收消息的Actor,以下,orm
package com.usoft10; import akka.actor.UntypedActor; public class MsgEchoActor extends UntypedActor { @Override public void onReceive(Object msg) throws Exception { System.out.println(String.format("Received Message '%s' in Actor %s", msg, getSelf().path())); } }
運行,router
package com.usoft10; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class Example { /** * @param args */ public static void main(String[] args) throws InterruptedException { ActorSystem _system = ActorSystem.create("CustomRouterExample"); ActorRef burstyMessageRouter = _system.actorOf(Props.create( MsgEchoActor.class).withRouter(new BurstyMessageRouter(3)), "MsgEchoActor"); //sends series of messages in a broadcast way to all the actors burstyMessageRouter.tell("are you ready?", ActorRef.noSender()); Thread.sleep(2000); _system.shutdown(); } }
運行結果,能夠看到三個actor經過廣播的router接收到了消息。。。繼承
[INFO] [01/20/2015 14:58:51.828] [main] [Remoting] Starting remoting路由
[INFO] [01/20/2015 14:58:52.529] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://CustomRouterExample@127.0.0.1:2552]rem
[INFO] [01/20/2015 14:58:52.532] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://CustomRouterExample@127.0.0.1:2552]
Received Message 'are you ready?' in Actor akka://CustomRouterExample/user/actor-0
Received Message 'are you ready?' in Actor akka://CustomRouterExample/user/actor-2
Received Message 'are you ready?' in Actor akka://CustomRouterExample/user/actor-1
[INFO] [01/20/2015 14:58:54.577] [CustomRouterExample-akka.remote.default-remote-dispatcher-6] [akka.tcp://CustomRouterExample@127.0.0.1:2552/system/remoting-terminator] Shutting down remote daemon.
[INFO] [01/20/2015 14:58:54.579] [CustomRouterExample-akka.remote.default-remote-dispatcher-6] [akka.tcp://CustomRouterExample@127.0.0.1:2552/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [01/20/2015 14:58:54.637] [ForkJoinPool-3-worker-7] [Remoting] Remoting shut down
[INFO] [01/20/2015 14:58:54.638] [CustomRouterExample-akka.remote.default-remote-dispatcher-6] [akka.tcp://CustomRouterExample@127.0.0.1:2552/system/remoting-terminator] Remoting shut down.
Process finished with exit code 0
============END============