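When a large number of actors work in parallel on an incoming stream of messages, some component has to direct each message from its source to a destination actor. That component is the Router.
In Akka, a router is itself a kind of actor: it forwards incoming messages to other actors, and those actors are called routees.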
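The routing pattern consists of a Router and its Routees:
A Routee is the actor that performs the actual computation.
The Router distributes messages arriving from outside to the Routees according to a chosen strategy (Akka ships several routing logic classes by default). It is used to schedule actor work or to implement load balancing.
Routers come in two modes, Pool and Group:
In Router-Pool mode, the Router builds all of its Routees. The router creates each routee as a child actor and removes it from the router when that child terminates.
In Router-Group mode, the Routees are created by other actors outside the router (created and managed on their own). The advantage is that Routees can be constructed and monitored flexibly.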
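To make the difference between the two modes concrete, here is a minimal sketch using Akka's classic Java API from Kotlin. `EchoWorker`, the system name, and the actor names `w1`, `w2`, `poolRouter`, and `groupRouter` are hypothetical and exist only for this illustration. The pool is handed routee `Props` and creates the children itself, while the group is only given the paths of actors that were started elsewhere.

```kotlin
import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.japi.pf.ReceiveBuilder
import akka.routing.RoundRobinGroup
import akka.routing.RoundRobinPool

// Hypothetical routee used only for this sketch: prints whatever it receives.
class EchoWorker : AbstractActor() {
    override fun createReceive(): Receive =
        ReceiveBuilder.create()
            .matchAny { msg -> println("${self.path()}: $msg") }
            .build()
}

fun main() {
    val system = ActorSystem.create("PoolGroupSketch")

    // Pool: the router creates three EchoWorker children itself.
    val pool = system.actorOf(
        RoundRobinPool(3).props(Props.create(EchoWorker::class.java)),
        "poolRouter"
    )

    // Group: the routees are created outside the router,
    // which is only told the paths it should send to.
    system.actorOf(Props.create(EchoWorker::class.java), "w1")
    system.actorOf(Props.create(EchoWorker::class.java), "w2")
    val group = system.actorOf(
        RoundRobinGroup(listOf("/user/w1", "/user/w2")).props(),
        "groupRouter"
    )

    for (i in 1..4) {
        pool.tell("pool message $i", ActorRef.noSender())
        group.tell("group message $i", ActorRef.noSender())
    }

    Thread.sleep(1000) // crude wait so the asynchronous output can appear
    system.terminate()
}
```

In the printed paths, pool routees should appear as children of `poolRouter`, whereas group routees keep their own paths under `/user`.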
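akka.routing.RoundRobinRoutingLogic: round-robin, routees are picked in turn
akka.routing.RandomRoutingLogic: random, a routee is picked at random
akka.routing.SmallestMailboxRoutingLogic: smallest mailbox, the routee with the fewest queued messages (the least busy one) is picked
akka.routing.BroadcastRoutingLogic: broadcast, the message goes to every routee
akka.routing.ScatterGatherFirstCompletedRoutingLogic: the message goes to every routee and the first reply is used
akka.routing.TailChoppingRoutingLogic: the message goes to one randomly picked routee, then after a short delay to further routees, and the first reply is used
akka.routing.ConsistentHashingRoutingLogic: the routee is picked by consistent hashing of the message's hash key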
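To show what some of these strategies boil down to, here is a small plain-Kotlin sketch with no Akka dependency. It is not Akka's implementation: `FakeRoutee`, `RoundRobin`, `smallestMailbox`, and `broadcast` are hypothetical names that only model how a routee is selected.

```kotlin
// Hypothetical stand-in for a routee: a name and a count of queued messages.
data class FakeRoutee(val name: String, var queued: Int = 0)

// Round-robin: hand out routees in turn, wrapping around at the end of the list.
class RoundRobin(private val routees: List<FakeRoutee>) {
    private var next = 0

    fun select(): FakeRoutee {
        val chosen = routees[next % routees.size]
        next++
        return chosen
    }
}

// Smallest mailbox: pick the routee with the fewest queued messages.
fun smallestMailbox(routees: List<FakeRoutee>): FakeRoutee =
    routees.minByOrNull { it.queued } ?: error("no routees")

// Broadcast: every routee receives the message.
fun broadcast(routees: List<FakeRoutee>): List<FakeRoutee> = routees

fun main() {
    val routees = listOf(FakeRoutee("a", 3), FakeRoutee("b", 0), FakeRoutee("c", 5))
    val rr = RoundRobin(routees)

    println((1..4).map { rr.select().name })     // prints [a, b, c, a]
    println(smallestMailbox(routees).name)       // prints b
    println(broadcast(routees).map { it.name })  // prints [a, b, c]
}
```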
    package router

    import akka.actor.AbstractActor
    import akka.actor.ActorRef
    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.japi.pf.ReceiveBuilder
    import akka.routing.*
    import org.slf4j.LoggerFactory

    /**
     * Created by: tankx
     * Date: 2019/7/20
     * Description: routing example
     */
    class WorkActor : AbstractActor() {
        private val log = LoggerFactory.getLogger(WorkActor::class.java)

        override fun createReceive(): Receive {
            return ReceiveBuilder.create().matchAny(this::receive).build()
        }

        // Log every message together with the path of the routee that handled it.
        fun receive(msg: Any) {
            log.info(" {}: {}", self.path(), msg)
        }
    }

    class RouterActor : AbstractActor() {
        private val log = LoggerFactory.getLogger(RouterActor::class.java)
        private lateinit var router: Router

        override fun preStart() {
            super.preStart()
            // Create ten workers as children and register each one as a routee.
            val list = arrayListOf<Routee>()
            for (i in 1..10) {
                val worker = context.actorOf(Props.create(WorkActor::class.java), "worker_$i")
                list.add(ActorRefRoutee(worker))
            }
            /**
             * Routing strategies
             * RoundRobinRoutingLogic: round-robin
             * BroadcastRoutingLogic: broadcast
             * RandomRoutingLogic: random
             * SmallestMailboxRoutingLogic: smallest mailbox
             */
            router = Router(RoundRobinRoutingLogic(), list)
        }

        override fun createReceive(): Receive {
            return ReceiveBuilder.create().matchAny(this::receive).build()
        }

        // Hand every incoming message to the router, keeping the original sender.
        fun receive(msg: Any) {
            //log.info("RouterActor : $msg")
            router.route(msg, sender)
        }
    }

    fun main() {
        val actorSystem = ActorSystem.create("RouterSystem")
        val routerActor = actorSystem.actorOf(Props.create(RouterActor::class.java))
        // Send one message every two seconds.
        for (i in 1..20) {
            Thread.sleep(2000)
            routerActor.tell("message arrived", ActorRef.noSender())
        }
    }
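A pool router removes a routee when that child terminates, but a hand-built `Router` like the one above does not do this on its own. One common way to handle it is to watch each worker and swap in a replacement when a `Terminated` message arrives. The following is a minimal sketch of that idea, not part of the original example; `SketchWorker` and `WatchingRouterActor` are hypothetical names.

```kotlin
import akka.actor.AbstractActor
import akka.actor.Props
import akka.actor.Terminated
import akka.japi.pf.ReceiveBuilder
import akka.routing.ActorRefRoutee
import akka.routing.RoundRobinRoutingLogic
import akka.routing.Routee
import akka.routing.Router

// Hypothetical worker for this sketch: prints whatever it receives.
class SketchWorker : AbstractActor() {
    override fun createReceive(): Receive =
        ReceiveBuilder.create()
            .matchAny { msg -> println("${self.path()}: $msg") }
            .build()
}

class WatchingRouterActor : AbstractActor() {
    private lateinit var router: Router

    override fun preStart() {
        val routees = ArrayList<Routee>()
        for (i in 1..3) {
            val worker = context.actorOf(Props.create(SketchWorker::class.java))
            context.watch(worker) // ask to be notified when the worker stops
            routees.add(ActorRefRoutee(worker))
        }
        router = Router(RoundRobinRoutingLogic(), routees)
    }

    override fun createReceive(): Receive =
        ReceiveBuilder.create()
            .match(Terminated::class.java) { t ->
                // Router is immutable: removeRoutee/addRoutee return a new Router.
                router = router.removeRoutee(t.actor())
                val replacement = context.actorOf(Props.create(SketchWorker::class.java))
                context.watch(replacement)
                router = router.addRoutee(ActorRefRoutee(replacement))
            }
            .matchAny { msg -> router.route(msg, sender) }
            .build()
}
```

Whether a stopped worker should be replaced or simply dropped depends on the application; the sketch replaces it so that the number of routees stays constant.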
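    package router.cluster

    import akka.actor.AbstractActor
    import akka.japi.pf.ReceiveBuilder
    import org.slf4j.LoggerFactory
    // Note: this explicit import takes precedence over the WorkActor declared below,
    // which is why the logger in the output further down is named router.WorkActor.
    import router.WorkActor

    /**
     * Created by: tankx
     * Date: 2019/7/20
     * Description: worker actor started on the cluster nodes
     */
    class WorkActor : AbstractActor() {
        private val log = LoggerFactory.getLogger(WorkActor::class.java)

        override fun createReceive(): Receive {
            return ReceiveBuilder.create().matchAny(this::receive).build()
        }

        // Log every message together with the path of the routee that handled it.
        fun receive(msg: Any) {
            log.info(" {}: {}", self.path(), msg)
        }
    }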
    package router.cluster

    import akka.actor.*
    import akka.japi.pf.ReceiveBuilder
    import akka.remote.routing.RemoteRouterConfig
    import akka.routing.BroadcastPool
    import akka.routing.FromConfig
    import com.typesafe.config.ConfigFactory
    import org.slf4j.LoggerFactory

    /**
     * Created by: tankx
     * Date: 2019/7/20
     * Description: cluster routing example
     */
    class MyService : AbstractActor() {
        private val log = LoggerFactory.getLogger(MyService::class.java)

        override fun createReceive(): Receive {
            return ReceiveBuilder.create().matchAny(this::receive).build()
        }

        fun receive(msg: Any) {
            log.info(" {}: {}", self.path(), msg)
        }
    }

    // Build an ActorSystem on the given port; everything else comes from application_router.conf.
    fun getActorSystem(port: Int): ActorSystem {
        val config = ConfigFactory.parseString(
            "akka.remote.netty.tcp.port=$port"
        ).withFallback(
            ConfigFactory.load("application_router.conf")
        )
        return ActorSystem.create("RouterSystem", config)
    }

    // Variant 1: the router is defined in the configuration file
    fun loadRouterByConfig() {
        val actorSystem = getActorSystem(3662)
        val workerRouter = actorSystem.actorOf(FromConfig.getInstance().props(), "workerRouter")
        for (i in 1..10) {
            workerRouter.tell("test", ActorRef.noSender())
            Thread.sleep(2000)
        }
    }

    // Variant 2: the router is defined in code
    fun loadRouterByCode() {
        val actorSystem = getActorSystem(3662)
        // Nodes on which the pool deploys its routees.
        val addresses = listOf<Address>(
            AddressFromURIString.parse("akka.tcp://RouterSystem@127.0.0.1:2663"),
            AddressFromURIString.parse("akka.tcp://RouterSystem@127.0.0.1:2661")
        )
        // var paths = listOf("/user/MyWorker") // several paths can be configured, e.g. akka://RouterSystem/user/MyWorker
        // The local variable is deliberately not called "router", so it cannot be confused with the package name.
        val workerRouter = actorSystem.actorOf(
            RemoteRouterConfig(
                BroadcastPool(2),
                addresses
            ).props(Props.create(WorkActor::class.java)),
            "workerRouter"
        )
        for (i in 1..20) {
            workerRouter.tell("test", ActorRef.noSender())
            Thread.sleep(2000)
        }
    }

    fun main() {
        //loadRouterByCode()
        loadRouterByConfig()
    }
Load balancing via the configuration file
    akka {
      actor {
        provider = "cluster"
      }
      remote {
        //log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 2661
        }
      }

      # For the sample, just bind to loopback and do not allow access from the network
      # the port is overridden by the logic in main class
      // remote.artery {
      //   enabled = on
      //   transport = tcp
      //   canonical.port = 2661
      //   canonical.hostname = 127.0.0.1
      // }

      cluster {
        seed-nodes = [
          "akka.tcp://RouterSystem@127.0.0.1:2661"
        ]

        # auto downing is NOT safe for production deployments.
        # you may want to use it during development, read more about it in the docs.
        auto-down-unreachable-after = 10s
      }
    }
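The configuration as shown contains no `akka.actor.deployment` section, yet `FromConfig` looks up the router definition under the path of the actor it creates (`/workerRouter` here). The fragment below is a sketch of what such a section might look like, assuming a cluster-aware round-robin group that routes to a `/user/MyWorker` actor on each member node. The router type and the cluster settings are assumptions, not taken from the original configuration.

```
akka.actor.deployment {
  /workerRouter {
    router = round-robin-group          # assumed routing logic
    routees.paths = ["/user/MyWorker"]  # path of the worker actor on each node
    cluster {
      enabled = on                      # look up routees on cluster member nodes
      allow-local-routees = off         # the node hosting the router runs no worker
    }
  }
}
```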
Start each node in a separate JVM
    // Node 1, port 2661 (the seed node)
    fun main() {
        val actorSystem = getActorSystem(2661)
        actorSystem.actorOf(Props.create(WorkActor::class.java), "MyWorker")
    }
    // Node 2, port 2663
    fun main() {
        val actorSystem = getActorSystem(2663)
        actorSystem.actorOf(Props.create(WorkActor::class.java), "MyWorker")
    }
Output of the run
    2019-07-22 12:25:42.589 [RouterSystem-akka.actor.default-dispatcher-21] INFO router.WorkActor - akka://RouterSystem/remote/akka.tcp/RouterSystem@127.0.0.1:3662/user/workerRouter/c2: test
    2019-07-22 12:25:44.589 [RouterSystem-akka.actor.default-dispatcher-2] INFO router.WorkActor - akka://RouterSystem/remote/akka.tcp/RouterSystem@127.0.0.1:3662/user/workerRouter/c2: test
    2019-07-22 12:25:46.590 [RouterSystem-akka.actor.default-dispatcher-2] INFO router.WorkActor - akka://RouterSystem/remote/akka.tcp/RouterSystem@127.0.0.1:3662/user/workerRouter/c2: test
The examples above should give a basic understanding of Akka's routing patterns and how to apply them.