akka-cluster對每一個節點的每種狀態變化都會在系統消息隊列裏發佈相關的事件。經過訂閱有關節點狀態變化的消息就能夠獲取每一個節點的狀態。這部分已經在以前關於akka-cluster的討論裏介紹過了。因爲akka-typed裏採用了新的消息交流協議,而系統消息的發佈和訂閱也算是消息交換,也受交流協議約束。因此想經過重寫之前示範的ClusterMemberStatus來了解一下akka-typed環境下節點狀態變化消息監聽的一些機制。java
咱們須要一個actor來訂閱系統發佈的節點狀態變化消息。這裏涉及到系統、actor兩端的信息交流。假設向系統訂閱是一種消息的發送,那麼獲得的節點狀態變化消息就是系統的response了。先看看actor裏的消息定義:node
object MonitorActor { sealed trait ClusterEvent private case class MemberStatus(event: MemberEvent) extends ClusterEvent private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx => val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus) val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus) Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter,classOf[MemberEvent]) Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter,classOf[ReachabilityEvent]) ... }
首先,response 分爲 MemberEvent, ReachabilityEvent兩種。MonitorActor處理的消息類型是ClusterEvent。爲了處理系統返回的response類型,即MemberEvent,ReachabilityEvent,必須提供這兩種類型到ClusterEvent的轉換。經過ctx.messageAdapter登記MemberEvent -> MemberStatus, ReachabilityEvent -> ReachStatus兩種類型轉換機制使MonitorActor能夠接收到MemberStatus, ReachStatus兩種消息:app
object MonitorActor { sealed trait ClusterEvent private case class MemberStatus(event: MemberEvent) extends ClusterEvent private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx => val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus) val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus) Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter,classOf[MemberEvent]) Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter,classOf[ReachabilityEvent]) Behaviors.receiveMessage { event =>
event match { case MemberStatus(status) => status match { case MemberJoined(member) => ctx.log.info("**************** Member joined: [{}] ***************", member.address) case MemberJoined(member) => ctx.log.info("**************** Member joined: [{}] ***************", member.address) case MemberUp(member) => ctx.log.info("**************** Member is Up: [{}] ***************", member.address) case MemberRemoved(member, previousStatus) => ctx.log.info("**************** Member is Removed: [{}] after {} ***************", member.address, previousStatus) case MemberLeft(member) => ctx.log.info("**************** Member left: [{}] ***************", member.address) case MemberExited(member) => ctx.log.info("**************** Member exited: [{}] ***************", member.address) case _: MemberEvent => // ignore
} case ReachStatus(status) => status match { case UnreachableMember(member) => ctx.log.info("**************** Member detected as unreachable: [{}] ***************", member) case ReachableMember(member) => ctx.log.info("**************** Member back to reachable: [{}] ***************", member) } } Behaviors.same } } }
還須要一個actor, 什麼都不幹。存粹構建一個MonitorActor:ide
object RootActor { def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] {ctx => ctx.spawn(MonitorActor(),"listner") Behaviors.empty } }
好了,看看main是怎麼實現的吧:測試
object ClusterMemberStatus { import com.typesafe.config.ConfigFactory def main(args: Array[String]): Unit = { val ports =
if (args.isEmpty) Seq(25251, 25252, 0) else args.toSeq.map(_.toInt) ports.foreach { port => startup(port) } } def startup(port: Int): Unit = { val config = ConfigFactory.parseString(s""" akka.remote.artery.canonical.port=$port """).withFallback(ConfigFactory.load("cluster.conf"))
ActorSystem[Nothing](RootActor(),"ClusterSystem",config) } }
下面是測試結果顯示:ui
22:14:52.755 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25251] *************** 22:14:52.810 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:51081] - Received InitJoinAck message from [Actor[akka://ClusterSystem@127.0.0.1:25251/system/cluster/core/daemon#313431252]] to [akka://ClusterSystem@127.0.0.1:51081] 22:14:52.825 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Node [akka://ClusterSystem@127.0.0.1:51081] is JOINING, roles [dc-default] 22:14:52.825 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:51081] *************** 22:14:52.829 [ClusterSystem-akka.actor.internal-dispatcher-7] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:25251] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:51081,567025403336682144)] 22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:51081] - Welcome from [akka://ClusterSystem@127.0.0.1:25251] 22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25251] *************** 22:14:52.858 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:51081] *************** 22:14:52.858 [ClusterSystem-akka.actor.internal-dispatcher-13] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:51081] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:25251,6076326462170320177)] 22:14:53.044 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Leader is moving node [akka://ClusterSystem@127.0.0.1:51081] to [Up] 22:14:53.044 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:51081] *************** 22:14:53.679 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:51081] *************** 22:14:57.707 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Received InitJoin message from [Actor[akka://ClusterSystem@127.0.0.1:25252/system/cluster/core/daemon/joinSeedNodeProcess-1#1472023843]] to [akka://ClusterSystem@127.0.0.1:25251] 22:14:57.707 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Sending InitJoinAck message from node [akka://ClusterSystem@127.0.0.1:25251] to [Actor[akka://ClusterSystem@127.0.0.1:25252/system/cluster/core/daemon/joinSeedNodeProcess-1#1472023843]] (version [2.6.5]) 22:14:57.732 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25252] - Received InitJoinAck message from [Actor[akka://ClusterSystem@127.0.0.1:25251/system/cluster/core/daemon#313431252]] to [akka://ClusterSystem@127.0.0.1:25252] 22:14:57.734 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Node [akka://ClusterSystem@127.0.0.1:25252] is JOINING, roles [dc-default] 22:14:57.735 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:25252] *************** 22:14:57.735 [ClusterSystem-akka.actor.internal-dispatcher-26] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:25251] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:25252,-6913064885699273532)] 22:14:57.737 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25252] - Welcome from [akka://ClusterSystem@127.0.0.1:25251] 22:14:57.737 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25251] *************** 22:14:57.738 [ClusterSystem-akka.actor.internal-dispatcher-30] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:25252] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:25251,6076326462170320177)] 22:14:57.738 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:25252] *************** 22:14:57.738 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:51081] *************** 22:14:57.738 [ClusterSystem-akka.actor.internal-dispatcher-30] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:25252] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:51081,567025403336682144)] 22:14:57.740 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member joined: [akka://ClusterSystem@127.0.0.1:25252] *************** 22:14:57.740 [ClusterSystem-akka.actor.internal-dispatcher-16] DEBUG akka.cluster.typed.internal.receptionist.ClusterReceptionist - ClusterReceptionist [akka://ClusterSystem@127.0.0.1:51081] - Node added [UniqueAddress(akka://ClusterSystem@127.0.0.1:25252,-6913064885699273532)] 22:14:58.134 [ClusterSystem-akka.actor.default-dispatcher-3] INFO akka.cluster.Cluster - Cluster Node [akka://ClusterSystem@127.0.0.1:25251] - Leader is moving node [akka://ClusterSystem@127.0.0.1:25252] to [Up] 22:14:58.134 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25252] *************** 22:14:58.755 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25252] *************** 22:14:59.146 [ClusterSystem-akka.actor.default-dispatcher-14] INFO com.learn.akka.MonitorActor$ - **************** Member is Up: [akka://ClusterSystem@127.0.0.1:25252] ***************
下面是本次示範的所有源代碼:spa
build.sbtscala
name := "learn-akka-typed" version := "0.1" scalaVersion := "2.13.1" scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint") javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation") val AkkaVersion = "2.6.5" val AkkaPersistenceCassandraVersion = "1.0.0" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion, "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion, "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion, "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion, "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion, "ch.qos.logback" % "logback-classic" % "1.2.3" )
cluster.confcode
akka { actor { provider = cluster serialization-bindings { "com.learn.akka.CborSerializable" = jackson-cbor } } remote { artery { canonical.hostname = "127.0.0.1" canonical.port = 0 } } cluster { seed-nodes = [ "akka://ClusterSystem@127.0.0.1:25251", "akka://ClusterSystem@127.0.0.1:25252"] } }
ClusterMemberStatus.scalablog
package com.learn.akka import akka.actor.typed._ import akka.actor.typed.scaladsl.Behaviors import akka.cluster.ClusterEvent._ import akka.cluster.typed.Subscribe import akka.cluster.typed.Cluster import akka.actor.typed.ActorSystem object MonitorActor { sealed trait ClusterEvent private case class MemberStatus(event: MemberEvent) extends ClusterEvent private case class ReachStatus(event: ReachabilityEvent) extends ClusterEvent def apply(): Behavior[ClusterEvent] = Behaviors.setup[ClusterEvent] { ctx => val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberStatus) val reachEventAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(ReachStatus) Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter,classOf[MemberEvent]) Cluster(ctx.system).subscriptions ! Subscribe(reachEventAdapter,classOf[ReachabilityEvent]) Behaviors.receiveMessage { event => event match { case MemberStatus(status) => status match { case MemberJoined(member) => ctx.log.info("**************** Member joined: [{}] ***************", member.address) case MemberJoined(member) => ctx.log.info("**************** Member joined: [{}] ***************", member.address) case MemberUp(member) => ctx.log.info("**************** Member is Up: [{}] ***************", member.address) case MemberRemoved(member, previousStatus) => ctx.log.info("**************** Member is Removed: [{}] after {} ***************", member.address, previousStatus) case MemberLeft(member) => ctx.log.info("**************** Member left: [{}] ***************", member.address) case MemberExited(member) => ctx.log.info("**************** Member exited: [{}] ***************", member.address) case _: MemberEvent => // ignore } case ReachStatus(status) => status match { case UnreachableMember(member) => ctx.log.info("**************** Member detected as unreachable: [{}] ***************", member) case ReachableMember(member) => ctx.log.info("**************** Member back to reachable: [{}] ***************", member) } } Behaviors.same } } } object RootActor { def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] {ctx => ctx.spawn(MonitorActor(),"listner") Behaviors.empty } } object ClusterMemberStatus { import com.typesafe.config.ConfigFactory def main(args: Array[String]): Unit = { val ports = if (args.isEmpty) Seq(25251, 25252, 0) else args.toSeq.map(_.toInt) ports.foreach { port => startup(port) } } def startup(port: Int): Unit = { val config = ConfigFactory.parseString(s""" akka.remote.artery.canonical.port=$port """).withFallback(ConfigFactory.load("cluster.conf")) ActorSystem[Nothing](RootActor(),"ClusterSystem",config) } }