在單機環境下訂閱與發佈是很經常使用的,然而在集羣環境是比較麻煩和很差實現的;java
AKKA已經提供了相應的實現,集羣環境各節點之間的actor相互訂閱發佈感興的主題的消息,node
關鍵依賴媒介actor: akka.cluster.pubsub.DistributedPubSubMediatorapp
DistributedPubSubMediator.Subscribe方法將actor註冊到本地中介者。
成功的訂閱和取消訂閱由DistributedPubSubMediator.SubscribeAck和DistributedPubSubMediator.UnsubscribeAck應答確認。這個確認消息意味着訂閱已經註冊了,可是它仍然須要花費一些時間複製到其它的節點上。節點之間發現與註冊會有必定延遲,可能形成消息不會當即送達!tcp
你經過向本地的中介者發送DistributedPubSubMediator.Publish消息來發布消息。
當actor終止時,它們會自動從註冊表移除,或者你能夠明確的使用DistributedPubSubMediator.Unsubscribe移除。ide
package pubsub import akka.actor.AbstractActor import akka.actor.ActorRef import akka.actor.ActorSystem import org.slf4j.LoggerFactory import scala.PartialFunction import scala.runtime.BoxedUnit import akka.cluster.pubsub.DistributedPubSubMediator import akka.actor.Nobody.tell import akka.actor.Props import java.time.Clock.system import akka.cluster.pubsub.DistributedPubSub import akka.actor.Nobody.tell import com.typesafe.config.ConfigFactory /** * Created by: tankx * Date: 2019/7/16 * Description: 發佈訂閱模式 */ /** * 定義發佈者 */ class Pub() : AbstractActor() { private var log = LoggerFactory.getLogger(Pub::class.java) var mediator: ActorRef = DistributedPubSub.get(context.system).mediator() override fun createReceive(): Receive { return receiveBuilder().matchAny(this::receive).build() } private fun receive(msg: Any) { log.info("派發事件:$msg") if (msg is String) { mediator.tell(DistributedPubSubMediator.Publish(topA, msg), self) } } } /** * 定義訂閱者 */ class Sub() : AbstractActor() { private var log = LoggerFactory.getLogger(Sub::class.java) override fun preStart() { //註冊訂閱 var mediator = DistributedPubSub.get(getContext().system()).mediator() mediator.tell(DistributedPubSubMediator.Subscribe(topA, self), self) println("註冊訂閱") //ActorRef.noSender()不會接收訂閱信息DistributedPubSubMediator.SubscribeAck //mediator.tell(DistributedPubSubMediator.Subscribe(topA, self), ActorRef.noSender()) //移除訂閱 //DistributedPubSub.get(getContext().system()).mediator().tell(DistributedPubSubMediator.Unsubscribe(topA, self), ActorRef.noSender()) } override fun createReceive(): Receive { return receiveBuilder().matchAny(this::receive).build() } private fun receive(msg: Any) { when (msg) { is String -> log.info("收到事件: $msg") is DistributedPubSubMediator.SubscribeAck -> log.info("訂閱事件:$msg") else -> log.info("無對應類型") } } } //定義主題 var topA: String = "topa" fun getSystem(port: Int): ActorSystem { val config = ConfigFactory.parseString( "akka.remote.netty.tcp.port=$port" ).withFallback( ConfigFactory.load("application_pub.conf") ) var actorSystem = ActorSystem.create("custerPubSystem", config); return actorSystem } fun main() { var system = getSystem(3660); var subActor = system.actorOf(Props.create(Sub::class.java)) Thread.sleep(1000)//讓sub 徹底起來 // var pubActor = system.actorOf(Props.create(Pub::class.java)) // pubActor.tell("hello", ActorRef.noSender()) // // pubActor.tell("world", ActorRef.noSender()) // // Thread.sleep(3000) }
上面訂閱啓動後,再啓動一個節點派發事件ui
package pubsub import akka.actor.ActorRef import akka.actor.Props import akka.cluster.pubsub.DistributedPubSub import akka.cluster.pubsub.DistributedPubSubMediator /** * Created by: tankx * Date: 2019/7/18 * Description: */ fun main() { var system = getSystem(3661); Thread.sleep(3000) var mediator: ActorRef = DistributedPubSub.get(system).mediator() for (i in 1..1000) { mediator.tell(DistributedPubSubMediator.Publish(topA, "消息XXXXXX"), ActorRef.noSender()) Thread.sleep(2000) } }
配置文件this
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } cluster { seed-nodes = [ "akka.tcp://custerPubSystem@127.0.0.1:3660" ] } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 0 } } }
依賴JARspa
compile("com.typesafe.akka:akka-actor_2.13:$akkaVersion") compile("com.typesafe.akka:akka-remote_2.13:$akkaVersion") compile("com.typesafe.akka:akka-cluster-tools_2.13:$akkaVersion")
結果:.net
2019-07-18 20:19:55.941 [custerPubSystem-akka.actor.default-dispatcher-4] INFO pubsub.Sub 77- 收到事件: 消息XXXXXX 2019-07-18 20:19:55.942 [custerPubSystem-akka.actor.default-dispatcher-4] INFO pubsub.Sub 77- 收到事件: 消息XXXXXX
AKKA 集羣中的發佈與訂閱在節點之間的Actor之間廣播消息,監聽本身關心的主題消息作相應邏輯,是很是方便與不少場景適用的scala