AKKA 集羣中的發佈與訂閱Distributed Publish Subscribe in Cluster

Distributed Publish Subscribe in Cluster

基本定義

在單機環境下訂閱與發佈是很經常使用的,然而在集羣環境是比較麻煩和很差實現的;java

AKKA已經提供了相應的實現,集羣環境各節點之間的actor相互訂閱發佈感興的主題的消息,node

關鍵依賴媒介actor: akka.cluster.pubsub.DistributedPubSubMediatorapp


訂閱:

DistributedPubSubMediator.Subscribe方法將actor註冊到本地中介者。
成功的訂閱和取消訂閱由DistributedPubSubMediator.SubscribeAck和DistributedPubSubMediator.UnsubscribeAck應答確認。這個確認消息意味着訂閱已經註冊了,可是它仍然須要花費一些時間複製到其它的節點上。節點之間發現與註冊會有必定延遲,可能形成消息不會當即送達!tcp

發佈:

你經過向本地的中介者發送DistributedPubSubMediator.Publish消息來發布消息。
當actor終止時,它們會自動從註冊表移除,或者你能夠明確的使用DistributedPubSubMediator.Unsubscribe移除。ide

 

實現示例

package pubsub

import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import org.slf4j.LoggerFactory
import scala.PartialFunction
import scala.runtime.BoxedUnit
import akka.cluster.pubsub.DistributedPubSubMediator
import akka.actor.Nobody.tell
import akka.actor.Props
import java.time.Clock.system
import akka.cluster.pubsub.DistributedPubSub
import akka.actor.Nobody.tell
import com.typesafe.config.ConfigFactory


/**
 * Created by: tankx
 * Date: 2019/7/16
 * Description: 發佈訂閱模式
 */
/**
 * 定義發佈者
 */
class Pub() : AbstractActor() {

    private var log = LoggerFactory.getLogger(Pub::class.java)

    var mediator: ActorRef = DistributedPubSub.get(context.system).mediator()

    override fun createReceive(): Receive {
        return receiveBuilder().matchAny(this::receive).build()
    }

    private fun receive(msg: Any) {

        log.info("派發事件:$msg")
        if (msg is String) {
            mediator.tell(DistributedPubSubMediator.Publish(topA, msg), self)
        }


    }


}

/**
 * 定義訂閱者
 */
class Sub() : AbstractActor() {

    private var log = LoggerFactory.getLogger(Sub::class.java)

    override fun preStart() {
        //註冊訂閱
        var mediator = DistributedPubSub.get(getContext().system()).mediator()
        mediator.tell(DistributedPubSubMediator.Subscribe(topA, self), self)

        println("註冊訂閱")
        //ActorRef.noSender()不會接收訂閱信息DistributedPubSubMediator.SubscribeAck
        //mediator.tell(DistributedPubSubMediator.Subscribe(topA, self), ActorRef.noSender())

        //移除訂閱
        //DistributedPubSub.get(getContext().system()).mediator().tell(DistributedPubSubMediator.Unsubscribe(topA, self), ActorRef.noSender())
    }


    override fun createReceive(): Receive {
        return receiveBuilder().matchAny(this::receive).build()
    }

    private fun receive(msg: Any) {

        when (msg) {
            is String -> log.info("收到事件: $msg")
            is DistributedPubSubMediator.SubscribeAck -> log.info("訂閱事件:$msg")
            else -> log.info("無對應類型")
        }


    }


}

//定義主題
var topA: String = "topa"

fun getSystem(port: Int): ActorSystem {

    val config = ConfigFactory.parseString(
        "akka.remote.netty.tcp.port=$port"
    ).withFallback(
        ConfigFactory.load("application_pub.conf")
    )

    var actorSystem = ActorSystem.create("custerPubSystem", config);


    return actorSystem
}


fun main() {

    var system = getSystem(3660);

    var subActor = system.actorOf(Props.create(Sub::class.java))
    Thread.sleep(1000)//讓sub 徹底起來

//    var pubActor = system.actorOf(Props.create(Pub::class.java))
//    pubActor.tell("hello", ActorRef.noSender())
//
//    pubActor.tell("world", ActorRef.noSender())
//
//    Thread.sleep(3000)

}

 

上面訂閱啓動後,再啓動一個節點派發事件ui

package pubsub

import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator

/**
 * Created by: tankx
 * Date: 2019/7/18
 * Description:
 */
fun main() {

    var system = getSystem(3661);

    Thread.sleep(3000)

    var mediator: ActorRef = DistributedPubSub.get(system).mediator()
  
    for (i in 1..1000) {
        mediator.tell(DistributedPubSubMediator.Publish(topA, "消息XXXXXX"), ActorRef.noSender())

        Thread.sleep(2000)
    }

}

 

配置文件this

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  cluster {
    seed-nodes = [
      "akka.tcp://custerPubSystem@127.0.0.1:3660"
    ]
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

}

 

依賴JARspa

    compile("com.typesafe.akka:akka-actor_2.13:$akkaVersion")
    compile("com.typesafe.akka:akka-remote_2.13:$akkaVersion")
    compile("com.typesafe.akka:akka-cluster-tools_2.13:$akkaVersion")

 

結果:.net

2019-07-18 20:19:55.941 [custerPubSystem-akka.actor.default-dispatcher-4] INFO  pubsub.Sub 77- 收到事件: 消息XXXXXX
2019-07-18 20:19:55.942 [custerPubSystem-akka.actor.default-dispatcher-4] INFO  pubsub.Sub 77- 收到事件: 消息XXXXXX

 

結論:

AKKA 集羣中的發佈與訂閱在節點之間的Actor之間廣播消息,監聽本身關心的主題消息作相應邏輯,是很是方便與不少場景適用的scala

相關文章
相關標籤/搜索