AKKA Router路由

路由概念

大量的actor在並行工做的時候,處理到來的消息流,這時候就須要一個組件或者東西來引導消息從源到目的地Actor,這個組件或者東西就是Router
在Akka中,router也是一種actor 類型,它路由到來的消息到其餘的actors,其餘那些actors就叫作routees(被路由對象) java


Routing模式由Router和Routee組成:
Routee是負責具體運算的Actor
Router 是把外界發來消息按照某種指定的方式(默認提供了多種路由邏輯類)分配給Routee去運算,用於調度actor任務或進行負載均衡策略node

 

Pool 和 Group模式


Router又可分Pool和Group兩種模式:
Router-Pool模式中Router負責構建全部的Routee。路由器建立routee做爲子actor,並在該子actor終止時將它從路由器中移除。api

Router-Group模式中的Routees由外界其它Actor產生(自行建立,自行管理),特色是能實現靈活的Routee構建和監控app

 

路由邏輯類

akka.routing.RoundRobinRoutingLogic  輪詢
akka.routing.RandomRoutingLogic  隨機
akka.routing.SmallestMailboxRoutingLogic 空閒
akka.routing.BroadcastRoutingLogic 廣播
akka.routing.ScatterGatherFirstCompletedRoutingLogic
akka.routing.TailChoppingRoutingLogic
akka.routing.ConsistentHashingRoutingLogic

 

實現示例:

 

package router

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.japi.pf.ReceiveBuilder
import akka.routing.*
import org.slf4j.LoggerFactory

/**
 * Created by: tankx
 * Date: 2019/7/20
 * Description: 路由示例
 */
class WorkActor : AbstractActor() {
  var log = LoggerFactory.getLogger(WorkActor::class.java)

  override fun createReceive(): Receive {
    return ReceiveBuilder.create().matchAny(this::receive).build()
  }


  fun receive(msg: Any) {
    log.info(" {}: $msg", self.path())
  }
}

class RouterActor : AbstractActor() {

  var log = LoggerFactory.getLogger(RouterActor::class.java)

  private lateinit var router: Router;

  override fun preStart() {
    super.preStart()
    var list = arrayListOf<Routee>()

    for (i in 1..10) {
      var worker = context.actorOf(Props.create(WorkActor::class.java), "worker_$i")
      list.add(ActorRefRoutee(worker))
    }

    /**
     * 路由方式
     * RoundRobinRoutingLogic: 輪詢
     * BroadcastRoutingLogic: 廣播
     * RandomRoutingLogic: 隨機
     * SmallestMailboxRoutingLogic: 空閒
     */
    router = Router(RoundRobinRoutingLogic(), list)

  }

  override fun createReceive(): Receive {
    return ReceiveBuilder.create().matchAny(this::receive).build()
  }


  fun receive(msg: Any) {
    //log.info("RouterActor : $msg")
    router.route(msg, sender)
  }

}

fun main() {

  var actorSystem = ActorSystem.create("RouterSystem")

  var routerActor = actorSystem.actorOf(Props.create(RouterActor::class.java))



  for (i in 1..20) {

    Thread.sleep(2000)

    routerActor.tell("消息來了", ActorRef.noSender())
  }

}

 

指定path模式,遠程調用(集羣內部訪問)

package router.cluster

import akka.actor.AbstractActor
import akka.actor.Props
import akka.japi.pf.ReceiveBuilder
import org.slf4j.LoggerFactory
import router.WorkActor

/**
 * Created by: tankx
 * Date: 2019/7/20
 * Description:
 */
class WorkActor : AbstractActor() {
  var log = LoggerFactory.getLogger(WorkActor::class.java)

  override fun createReceive(): Receive {
    return ReceiveBuilder.create().matchAny(this::receive).build()
  }


  fun receive(msg: Any) {
    log.info(" {}: $msg", self.path())
  }

}

 

package router.cluster

import akka.actor.*
import akka.japi.pf.ReceiveBuilder
import org.slf4j.LoggerFactory
import router.WorkActor
import akka.remote.routing.RemoteRouterConfig
import akka.routing.BroadcastGroup
import akka.routing.BroadcastPool
import akka.routing.FromConfig
import akka.routing.RoundRobinPool
import com.typesafe.config.ConfigFactory


/**
 * Created by: tankx
 * Date: 2019/7/20
 * Description:
 */
class MyService : AbstractActor() {

  var log = LoggerFactory.getLogger(MyService::class.java)

  override fun createReceive(): Receive {
    return ReceiveBuilder.create().matchAny(this::receive).build()
  }


  fun receive(msg: Any) {
    log.info(" {}: $msg", self.path())
  }

}


fun getActorSystem(port: Int): ActorSystem {

  val config = ConfigFactory.parseString(
    "akka.remote.netty.tcp.port=$port"
  ).withFallback(
    ConfigFactory.load("application_router.conf")
  )

  var actorSystem = ActorSystem.create("RouterSystem", config);


  return actorSystem
}
//讀取配置文件方式
fun loadRouterByConfig() {

  val actorSystem = getActorSystem(3662);

  var router = actorSystem.actorOf(FromConfig.getInstance().props(), "workerRouter")

  for (i in 1..10) {
    router.tell("test", ActorRef.noSender())
    Thread.sleep(2000)
  }
}
//代碼方式
fun loadRouterByCode() {

  val actorSystem = getActorSystem(3662);

  var address = listOf<Address>(
    AddressFromURIString.parse("akka.tcp://RouterSystem@127.0.0.1:2663"),
    AddressFromURIString.parse("akka.tcp://RouterSystem@127.0.0.1:2661")
  )
  //var paths = listOf("/user/MyWorker")//能夠配置多個地址 akka://RouterSystem/user/MyWorker

  var router = actorSystem.actorOf(
    RemoteRouterConfig(
      BroadcastPool(2),
      address
    ).props(Props.create(router.cluster.WorkActor::class.java)), "workerRouter"
  )


  for (i in 1..20) {
    router.tell("test", ActorRef.noSender())
    Thread.sleep(2000)
  }
}


fun main() {
  //loadRouterByCode()

  loadRouterByConfig()

}

配置文件負載均衡

akka {
  actor {
    provider = "cluster"
  }

  remote {
    //log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2661
    }
  }

  # For the sample, just bind to loopback and do not allow access from the network
  # the port is overridden by the logic in main class
  //  remote.artery {
  //    enabled = on
  //    transport = tcp
  //    canonical.port = 2661
  //    canonical.hostname = 127.0.0.1
  //  }

  cluster {
    seed-nodes = [
      "akka.tcp://RouterSystem@127.0.0.1:2661"
    ]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}

 

分不一樣JVM啓動dom

fun main() {

  var actorSystem = getActorSystem(2661)
  var workActor = actorSystem.actorOf(Props.create(WorkActor::class.java), "MyWorker")

}

 

fun main() {

  var actorSystem = getActorSystem(2663)
  var workActor = actorSystem.actorOf(Props.create(WorkActor::class.java), "MyWorker")


}

 

結果調用輸出tcp

2019-07-22 12:25:42.589 [RouterSystem-akka.actor.default-dispatcher-21] INFO  router.WorkActor -  akka://RouterSystem/remote/akka.tcp/RouterSystem@127.0.0.1:3662/user/workerRouter/c2: test
2019-07-22 12:25:44.589 [RouterSystem-akka.actor.default-dispatcher-2] INFO  router.WorkActor -  akka://RouterSystem/remote/akka.tcp/RouterSystem@127.0.0.1:3662/user/workerRouter/c2: test
2019-07-22 12:25:46.590 [RouterSystem-akka.actor.default-dispatcher-2] INFO  router.WorkActor -  akka://RouterSystem/remote/akka.tcp/RouterSystem@127.0.0.1:3662/user/workerRouter/c2: test

 

經過以上示例基本能夠了解AKK的路由模式與應用了。ide

相關文章
相關標籤/搜索