/**
 * INTERNAL API
 *
 * The `ClusterActorRefProvider` will load the [[akka.cluster.Cluster]]
 * extension, i.e. the cluster will automatically be started when
 * the `ClusterActorRefProvider` is used.
 *
 * It extends `RemoteActorRefProvider` and overrides three hooks:
 * `init`, `createRemoteWatcher` and `createDeployer`.
 */
private[akka] class ClusterActorRefProvider(
  _systemName:    String,
  _settings:      ActorSystem.Settings,
  _eventStream:   EventStream,
  _dynamicAccess: DynamicAccess) extends RemoteActorRefProvider(
  _systemName, _settings, _eventStream, _dynamicAccess) {

  /** Runs the remote provider's initialization, then eagerly loads the Cluster extension. */
  override def init(system: ActorSystemImpl): Unit = {
    super.init(system)

    // initialize/load the Cluster extension
    Cluster(system)
  }

  /**
   * Starts the "remote-watcher" system actor as a `ClusterRemoteWatcher` (instead of the
   * plain remote watcher), configured with the watch intervals from `remoteSettings`.
   */
  override protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = {
    // make sure Cluster extension is initialized/loaded from init thread
    Cluster(system)

    import remoteSettings._
    val failureDetector = createRemoteWatcherFailureDetector(system)
    system.systemActorOf(ClusterRemoteWatcher.props(
      failureDetector,
      heartbeatInterval = WatchHeartBeatInterval,
      unreachableReaperInterval = WatchUnreachableReaperInterval,
      heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
  }

  /**
   * Factory method to make it possible to override deployer in subclass
   * Creates a new instance every time
   */
  override protected def createDeployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
}
/**
 * Cluster Extension Id and factory for creating Cluster extension.
 */
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
  override def get(system: ActorSystem): Cluster = super.get(system)

  override def lookup = Cluster

  override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)

  /**
   * INTERNAL API
   *
   * True when the JVM system property `akka.cluster.assert` is `on` or `true`
   * (compared case-insensitively). Any other value, or no property, gives false.
   */
  private[cluster] final val isAssertInvariantsEnabled: Boolean = {
    val assertFlag = System.getProperty("akka.cluster.assert", "off").toLowerCase
    assertFlag == "on" || assertFlag == "true"
  }
}
/**
 * This module is responsible for cluster membership information. Changes to the cluster
 * information are retrieved through [[#subscribe]]. Commands to operate the cluster are
 * available through methods in this class, such as [[#join]], [[#down]] and [[#leave]].
 *
 * Each cluster [[Member]] is identified by its [[akka.actor.Address]], and
 * the cluster address of this actor system is [[#selfAddress]]. A member also has a status;
 * initially [[MemberStatus]] `Joining` followed by [[MemberStatus]] `Up`.
 */
class Cluster(val system: ExtendedActorSystem) extends Extension
class Cluster(val system: ExtendedActorSystem) extends Extension {
  // (excerpt: only the start-up steps of the constructor are shown)
  logInfo("Starting up...")

  // shut the cluster down when the actor system terminates
  system.registerOnTermination(shutdown())

  // create and register the cluster MBean only when `JmxEnabled` is set
  if (JmxEnabled)
    clusterJmx = {
      val jmx = new ClusterJmx(this, log)
      jmx.createMBean()
      Some(jmx)
    }

  logInfo("Started up successfully")
}
To enable cluster capabilities in your Akka project you should, at a minimum, add the Remoting settings, but with `cluster` as the `akka.actor.provider`.
The `akka.cluster.seed-nodes` setting
should normally also be added to your `application.conf` file:
akka {
  actor {
    # "cluster" selects the ClusterActorRefProvider, which starts the Cluster extension
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      # host name or IP address this node binds to
      hostname = "127.0.0.1"
      # 0 = pick a random free port
      port = 0
    }
  }

  cluster {
    # each entry is akka.tcp://<ActorSystem name>@<host>:<port> of a seed node
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
  }
}
/**
 * Example actor that logs cluster membership changes.
 * It subscribes to member events in `preStart` and unsubscribes in `postStop`.
 */
class SimpleClusterListener extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart(): Unit = {
    // InitialStateAsEvents: the current state is delivered to this actor as events
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
  }
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) ⇒
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) ⇒
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) ⇒
      log.info(
        "Member is Removed: {} after {}",
        member.address, previousStatus)
    case _: MemberEvent ⇒ // ignore
  }
}
/**
 * Subscribe to one or more cluster domain events.
 * The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
 * or subclasses.
 *
 * If `initialStateMode` is `ClusterEvent.InitialStateAsEvents` the events corresponding
 * to the current state will be sent to the subscriber to mimic what you would
 * have seen if you were listening to the events when they occurred in the past.
 *
 * If `initialStateMode` is `ClusterEvent.InitialStateAsSnapshot` a snapshot of
 * [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the subscriber as the
 * first message.
 *
 * Note that for large clusters it is more efficient to use `InitialStateAsSnapshot`.
 *
 * @param subscriber       actor that will receive the events
 * @param initialStateMode how the current state is delivered to the subscriber
 * @param to               event classes to subscribe to; at least one is required
 * @throws IllegalArgumentException if `to` is empty or contains a class that is not
 *                                  a `ClusterDomainEvent` (the `require` checks below)
 */
@varargs def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit = {
  require(to.length > 0, "at least one `ClusterDomainEvent` class is required")
  require(
    to.forall(classOf[ClusterDomainEvent].isAssignableFrom),
    s"subscribe to `akka.cluster.ClusterEvent.ClusterDomainEvent` or subclasses, was [${to.map(_.getName).mkString(", ")}]")
  // the subscription is sent as a message to the cluster core actor
  clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode, to.toSet)
}
// Reference to the cluster core actor. It is obtained synchronously while the extension is
// being constructed. On failure the cluster is shut down and `system.deadLetters` is used instead.
private[cluster] val clusterCore: ActorRef = {
  implicit val timeout = system.settings.CreationTimeout
  try {
    // blocks (up to the creation timeout) until the daemon supervisor replies with the core ref
    Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration)
  } catch {
    case NonFatal(e) ⇒
      log.error(e, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'.")
      shutdown()
      // don't re-throw, that would cause the extension to be re-recreated
      // from shutdown() or other places, which may result in
      // InvalidActorNameException: actor name [cluster] is not unique
      system.deadLetters
  }
}
// create supervisor for daemons under path "/system/cluster"
// (a ClusterDaemon system actor on the `UseDispatcher` dispatcher, always deployed locally)
private val clusterDaemons: ActorRef = {
  system.systemActorOf(Props(classOf[ClusterDaemon], settings, joinConfigCompatChecker).
    withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster")
}
/**
 * INTERNAL API.
 *
 * Supervisor managing the different Cluster daemons.
 *
 * It also registers the cluster "leave" and "wait-shutdown" tasks with CoordinatedShutdown.
 */
@InternalApi
private[cluster] final class ClusterDaemon(settings: ClusterSettings, joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
  import InternalClusterAction._
  // Important - don't use Cluster(context.system) in constructor because that would
  // cause deadlock. The Cluster extension is currently being created and is waiting
  // for response from GetClusterCoreRef in its constructor.
  // Child actors are therefore created when GetClusterCoreRef is received

  var coreSupervisor: Option[ActorRef] = None

  // completed in postStop; the "wait-shutdown" task below waits on its future
  val clusterShutdown = Promise[Done]()
  val coordShutdown = CoordinatedShutdown(context.system)

  // cluster-leave phase: nothing to do if the cluster is already terminated or this
  // member is Down; otherwise ask self for LeaveReq, bounded by the phase timeout
  coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") {
    val sys = context.system
    () ⇒
      if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down)
        Future.successful(Done)
      else {
        implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave))
        self.ask(CoordinatedShutdownLeave.LeaveReq).mapTo[Done]
      }
  }

  coordShutdown.addTask(CoordinatedShutdown.PhaseClusterShutdown, "wait-shutdown") { () ⇒
    clusterShutdown.future
  }

  override def postStop(): Unit = {
    clusterShutdown.trySuccess(Done)
    if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) {
      // if it was stopped due to leaving CoordinatedShutdown was started earlier
      coordShutdown.run(CoordinatedShutdown.ClusterDowningReason)
    }
  }

  // creates the "core" (ClusterCoreSupervisor) and "heartbeatReceiver" children
  def createChildren(): Unit = {
    coreSupervisor = Some(context.actorOf(Props(classOf[ClusterCoreSupervisor], joinConfigCompatChecker).
      withDispatcher(context.props.dispatcher), name = "core"))

    context.actorOf(Props[ClusterHeartbeatReceiver].
      withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
  }

  def receive = {
    case msg: GetClusterCoreRef.type ⇒
      // children are created lazily on the first request (see the constructor note above)
      if (coreSupervisor.isEmpty)
        createChildren()
      coreSupervisor.foreach(_ forward msg)
    case AddOnMemberUpListener(code) ⇒
      context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Up).withDeploy(Deploy.local))
    case AddOnMemberRemovedListener(code) ⇒
      context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Removed).withDeploy(Deploy.local))
    case CoordinatedShutdownLeave.LeaveReq ⇒
      val ref = context.actorOf(CoordinatedShutdownLeave.props().withDispatcher(context.props.dispatcher))
      // forward the ask request
      ref.forward(CoordinatedShutdownLeave.LeaveReq)
  }

}
/**
 * INTERNAL API.
 *
 * ClusterCoreDaemon and ClusterDomainEventPublisher can't be restarted because the state
 * would be obsolete. Shutdown the member if any of those actors crashed.
 */
@InternalApi
private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {

  // Important - don't use Cluster(context.system) in constructor because that would
  // cause deadlock. The Cluster extension is currently being created and is waiting
  // for response from GetClusterCoreRef in its constructor.
  // Child actors are therefore created when GetClusterCoreRef is received

  var coreDaemon: Option[ActorRef] = None

  // creates the "publisher" child and the watched "daemon" (ClusterCoreDaemon) child
  def createChildren(): Unit = {
    val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
      withDispatcher(context.props.dispatcher), name = "publisher")
    coreDaemon = Some(context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher, joinConfigCompatChecker).
      withDispatcher(context.props.dispatcher), name = "daemon")))
  }

  // any non-fatal child failure stops that child and, via PoisonPill, this supervisor too
  override val supervisorStrategy =
    OneForOneStrategy() {
      case NonFatal(e) ⇒
        log.error(e, "Cluster node [{}] crashed, [{}] - shutting down...", Cluster(context.system).selfAddress, e.getMessage)
        self ! PoisonPill
        Stop
    }

  // stopping this supervisor shuts down the Cluster extension
  override def postStop(): Unit = Cluster(context.system).shutdown()

  def receive = {
    case InternalClusterAction.GetClusterCoreRef ⇒
      if (coreDaemon.isEmpty)
        createChildren()
      // reply with the ClusterCoreDaemon ref
      coreDaemon.foreach(sender() ! _)
  }
}
override def preStart(): Unit = {
  // listen for remoting QuarantinedEvent on the event stream
  context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])

  // start the downing provider's actor, if it defines one, as child "downingProvider";
  // it uses this actor's dispatcher unless its Props already specify one
  cluster.downingProvider.downingActorProps.foreach { props ⇒
    val propsWithDispatcher =
      if (props.dispatcher == Deploy.NoDispatcherGiven) props.withDispatcher(context.props.dispatcher)
      else props

    context.actorOf(propsWithDispatcher, name = "downingProvider")
  }

  // without seed nodes nothing is started; otherwise begin joining them
  if (seedNodes.isEmpty)
    logInfo("No seed-nodes configured, manual cluster join required")
  else
    self ! JoinSeedNodes(seedNodes)
}
// Seed node addresses parsed from the `seed-nodes` list of `cc` (presumably the
// `akka.cluster` config section — confirm).
// NOTE(review): an entry that AddressFromURIString cannot parse is not matched by the
// pattern below, so it would fail with a MatchError when the settings are created.
val SeedNodes: immutable.IndexedSeq[Address] =
  immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(address) ⇒ address }.toVector
/**
 * Initial behavior, before this node has sent a Join or become a member.
 *
 * InitJoin requests from other nodes are answered with InitJoinNack, because this node
 * cannot be used as a contact point yet. JoinTo / JoinSeedNodes start the join process,
 * and a Welcome is accepted. On Tick, `joinSeedNodesWasUnsuccessful()` is called once the
 * join-seed-nodes deadline is overdue.
 */
def uninitialized: Actor.Receive = ({
  // InitJoin carries the joining node's config (it is matched as InitJoin(joiningNodeConfig)
  // in `initialized`), so it must be matched as a constructor pattern. A bare `case InitJoin`
  // only matches the companion object, and the request would never be answered.
  case InitJoin(_) ⇒
    logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender())
    sender() ! InitJoinNack(selfAddress)
  case ClusterUserAction.JoinTo(address) ⇒
    join(address)
  case JoinSeedNodes(newSeedNodes) ⇒
    resetJoinSeedNodesDeadline()
    joinSeedNodes(newSeedNodes)
  case msg: SubscriptionMessage ⇒
    publisher forward msg
  case Welcome(from, gossip) ⇒
    welcome(from.address, from, gossip)
  case _: Tick ⇒
    if (joinSeedNodesDeadline.exists(_.isOverdue))
      joinSeedNodesWasUnsuccessful()
}: Actor.Receive).orElse(receiveExitingCompleted)
# The joining of given seed nodes will by default be retried indefinitely until
# a successful join. That process can be aborted if unsuccessful by defining this
# timeout. When aborted it will run CoordinatedShutdown, which by default will
# terminate the ActorSystem. CoordinatedShutdown can also be configured to exit
# the JVM. It is useful to define this timeout if the seed-nodes are assembled
# dynamically and a restart with new seed-nodes should be tried after unsuccessful
# attempts.
# "off" means no timeout (retry indefinitely); set a duration to enable the abort.
shutdown-after-unsuccessful-join-seed-nodes = off
/**
 * Starts (or restarts) the seed-node join process. An empty `newSeedNodes` is ignored.
 *
 *  - if this node is the only seed node, it sends itself JoinTo(selfAddress);
 *  - if this node is the first seed node, a `FirstSeedNodeProcess` child is started;
 *  - otherwise a `JoinSeedNodeProcess` child is started.
 */
def joinSeedNodes(newSeedNodes: immutable.IndexedSeq[Address]): Unit = {
  if (newSeedNodes.nonEmpty) {
    stopSeedNodeProcess()
    seedNodes = newSeedNodes // keep them for retry
    seedNodeProcess =
      if (newSeedNodes == immutable.IndexedSeq(selfAddress)) { // self-join
        self ! ClusterUserAction.JoinTo(selfAddress)
        None
      } else {
        // use unique name of this actor, stopSeedNodeProcess doesn't wait for termination
        seedNodeProcessCounter += 1
        if (newSeedNodes.head == selfAddress) {
          Some(context.actorOf(Props(classOf[FirstSeedNodeProcess], newSeedNodes, joinConfigCompatChecker).
            withDispatcher(UseDispatcher), name = "firstSeedNodeProcess-" + seedNodeProcessCounter))
        } else {
          Some(context.actorOf(Props(classOf[JoinSeedNodeProcess], newSeedNodes, joinConfigCompatChecker).
            withDispatcher(UseDispatcher), name = "joinSeedNodeProcess-" + seedNodeProcessCounter))
        }
      }
  }
}
/**
 * Try to join this cluster node with the node specified by `address`.
 * It's only allowed to join from an empty state, i.e. when not already a member.
 * A `Join(selfUniqueAddress)` command is sent to the node to join,
 * which will reply with a `Welcome` message.
 *
 * An address with a different protocol or ActorSystem name is logged and ignored.
 * Joining `selfAddress` makes this node initialized immediately and adds itself
 * via `joining`, without waiting for a Welcome.
 *
 * @throws IllegalArgumentException if the latest gossip already contains members
 */
def join(address: Address): Unit = {
  if (address.protocol != selfAddress.protocol)
    log.warning(
      "Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]",
      selfAddress.protocol, address.protocol)
  else if (address.system != selfAddress.system)
    log.warning(
      "Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]",
      selfAddress.system, address.system)
  else {
    require(latestGossip.members.isEmpty, "Join can only be done from empty state")

    // to support manual join when joining to seed nodes is stuck (no seed nodes available)
    stopSeedNodeProcess()

    if (address == selfAddress) {
      becomeInitialized()
      joining(selfUniqueAddress, cluster.selfRoles)
    } else {
      // a non-finite RetryUnsuccessfulJoinAfter means no retry deadline
      val joinDeadline = RetryUnsuccessfulJoinAfter match {
        case d: FiniteDuration ⇒ Some(Deadline.now + d)
        case _                 ⇒ None
      }
      context.become(tryingToJoin(address, joinDeadline))
      clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles)
    }
  }
}
因爲要加入的地址就是當前節點(address == selfAddress),所以會執行becomeInitialized()、joining(selfUniqueAddress, cluster.selfRoles)兩個函數。
/**
 * Switches this actor to the `initialized` behavior.
 * Starts the "heartbeatSender" and "crossDcHeartbeatSender" children, stops any running
 * seed-node join process and clears the join-seed-nodes deadline.
 */
def becomeInitialized(): Unit = {
  // start heartbeatSender here, and not in constructor to make sure that
  // heartbeating doesn't start before Welcome is received
  val internalHeartbeatSenderProps = Props(new ClusterHeartbeatSender()).withDispatcher(UseDispatcher)
  context.actorOf(internalHeartbeatSenderProps, name = "heartbeatSender")

  val externalHeartbeatProps = Props(new CrossDcHeartbeatSender()).withDispatcher(UseDispatcher)
  context.actorOf(externalHeartbeatProps, name = "crossDcHeartbeatSender")

  // make sure that join process is stopped
  stopSeedNodeProcess()
  joinSeedNodesDeadline = None
  context.become(initialized)
}
// (excerpt from the body of `joining(joiningNode, roles)`)
// check by address without uid to make sure that node with same host:port is not allowed
// to join until previous node with that host:port has been removed from the cluster
localMembers.find(_.address == joiningNode.address) match {
  case Some(m) if m.uniqueAddress == joiningNode ⇒
    // node retried join attempt, probably due to lost Welcome message
    logInfo("Existing member [{}] is joining again.", m)
    if (joiningNode != selfUniqueAddress)
      sender() ! Welcome(selfUniqueAddress, latestGossip)
  case Some(m) ⇒
    // node restarted, same host:port as existing member, but with different uid
    // safe to down and later remove existing member
    // new node will retry join
    logInfo("New incarnation of existing member [{}] is trying to join. " +
      "Existing will be removed from the cluster and then new member will be allowed to join.", m)
    if (m.status != Down) {
      // we can confirm it as terminated/unreachable immediately
      val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, m.uniqueAddress)
      val newOverview = latestGossip.overview.copy(reachability = newReachability)
      val newGossip = latestGossip.copy(overview = newOverview)
      updateLatestGossip(newGossip)
      downing(m.address)
    }
  case None ⇒
    // no member with this host:port yet (always the case for the very first join)

    // remove the node from the failure detector
    failureDetector.remove(joiningNode.address)
    crossDcFailureDetector.remove(joiningNode.address)

    // add joining node as Joining
    // add self in case someone else joins before self has joined (Set discards duplicates)
    val newMembers = localMembers + Member(joiningNode, roles) + Member(selfUniqueAddress, cluster.selfRoles)
    val newGossip = latestGossip copy (members = newMembers)

    updateLatestGossip(newGossip)

    logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", "))
    if (joiningNode == selfUniqueAddress) {
      if (localMembers.isEmpty)
        leaderActions() // important for deterministic oldest when bootstrapping
    } else
      sender() ! Welcome(selfUniqueAddress, latestGossip)

    publishMembershipState()
}
因爲joining源碼太長,只貼了上面這部分代碼。很顯然會命中case None:第一次加入集羣嘛,肯定是None。updateLatestGossip這個函數不再分析,它應該是用來更新與Gossip協議相關的狀態的。因爲joiningNode == selfUniqueAddress,且localMembers是空的(此時還沒有任何成員),所以會執行leaderActions()。leaderActions方法暫時也不分析,它應該就是計算當前集羣leader的,很顯然當前種子節點就是leader。
先來看FirstSeedNodeProcess,在主構造函數中有一段代碼:self ! JoinSeedNode,也就是給自己發送了一個JoinSeedNode消息。
// (FirstSeedNodeProcess) each InitJoinNack removes that seed node from the ones still awaited
case InitJoinNack(address) ⇒
  logInfo("Received InitJoinNack message from [{}] to [{}]", sender(), selfAddress)
  remainingSeedNodes -= address
  if (remainingSeedNodes.isEmpty) {
    // initialize new cluster by joining myself when nacks from all other seed nodes
    context.parent ! JoinTo(selfAddress)
    context.stop(self)
  }
對於FirstSeedNodeProcess,收到所有其他種子節點的InitJoinNack消息後,會給parent(也就是ClusterCoreDaemon)發送JoinTo(selfAddress)消息,然後stop掉自己。如果在Deadline.now + cluster.settings.SeedNodeTimeout指定的時間內沒有收到其他節點的InitJoinNack消息,同樣會給ClusterCoreDaemon發送JoinTo(selfAddress)消息,然後stop掉自己。
// (excerpt from a seed-node join process; presumably JoinSeedNodeProcess — confirm)
case InitJoinNack(_) ⇒ // that seed was uninitialized
case ReceiveTimeout ⇒
  // warn only from the second attempt on
  if (attempt >= 2)
    log.warning(
      "Couldn't join seed nodes after [{}] attempts, will try again. seed-nodes=[{}]",
      attempt, seedNodes.filterNot(_ == selfAddress).mkString(", "))
  // no InitJoinAck received, try again
  self ! JoinSeedNode
/**
 * Behavior of a node that is a member of the cluster (entered via `becomeInitialized`).
 * It handles gossip, the periodic ticks, membership commands and InitJoin/Join requests
 * from other nodes. Further JoinTo / JoinSeedNodes requests are logged and ignored.
 */
def initialized: Actor.Receive = ({
  case msg: GossipEnvelope ⇒ receiveGossip(msg)
  case msg: GossipStatus   ⇒ receiveGossipStatus(msg)
  case GossipTick          ⇒ gossipTick()
  case GossipSpeedupTick   ⇒ gossipSpeedupTick()
  case ReapUnreachableTick ⇒ reapUnreachableMembers()
  case LeaderActionsTick   ⇒ leaderActions()
  case PublishStatsTick    ⇒ publishInternalStats()
  case InitJoin(joiningNodeConfig) ⇒
    logInfo("Received InitJoin message from [{}] to [{}]", sender(), selfAddress)
    initJoin(joiningNodeConfig)
  case Join(node, roles)                ⇒ joining(node, roles)
  case ClusterUserAction.Down(address)  ⇒ downing(address)
  case ClusterUserAction.Leave(address) ⇒ leaving(address)
  case SendGossipTo(address)            ⇒ sendGossipTo(address)
  case msg: SubscriptionMessage         ⇒ publisher forward msg
  case QuarantinedEvent(address, uid)   ⇒ quarantined(UniqueAddress(address, uid))
  case ClusterUserAction.JoinTo(address) ⇒
    logInfo("Trying to join [{}] when already part of a cluster, ignoring", address)
  case JoinSeedNodes(nodes) ⇒
    logInfo(
      "Trying to join seed nodes [{}] when already part of a cluster, ignoring",
      nodes.mkString(", "))
  case ExitingConfirmed(address) ⇒ receiveExitingConfirmed(address)
}: Actor.Receive).orElse(receiveExitingCompleted)
/**
 * Answers an InitJoin from a node that wants to join through this node.
 *
 * Replies InitJoinNack when this node's own status is in `removeUnreachableWithMemberStatus`
 * (the Down/Exiting case noted below). Otherwise it replies InitJoinAck carrying either
 * CompatibleConfig (the non-sensitive subset of this node's config for the keys the joining
 * node sent) or IncompatibleConfig when the compatibility check fails.
 * The "Sending InitJoinAck" line is logged before the check, so it also appears for
 * IncompatibleConfig replies.
 *
 * @param joiningNodeConfig config sent by the joining node in its InitJoin
 */
def initJoin(joiningNodeConfig: Config): Unit = {
  val selfStatus = latestGossip.member(selfUniqueAddress).status
  if (removeUnreachableWithMemberStatus.contains(selfStatus)) {
    // prevents a Down and Exiting node from being used for joining
    logInfo("Sending InitJoinNack message from node [{}] to [{}]", selfAddress, sender())
    sender() ! InitJoinNack(selfAddress)
  } else {
    logInfo("Sending InitJoinAck message from node [{}] to [{}]", selfAddress, sender())
    // run config compatibility check using config provided by
    // joining node and current (full) config on cluster side

    val configWithoutSensitiveKeys = {
      val allowedConfigPaths = JoinConfigCompatChecker.removeSensitiveKeys(context.system.settings.config, cluster.settings)
      // build a stripped down config instead where sensitive config paths are removed
      // we don't want any check to happen on those keys
      JoinConfigCompatChecker.filterWithKeys(allowedConfigPaths, context.system.settings.config)
    }

    joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys) match {
      case Valid ⇒
        val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joiningNodeConfig, cluster.settings)
        // Send back to joining node a subset of current configuration
        // containing the keys initially sent by the joining node minus
        // any sensitive keys as defined by this node configuration
        val clusterConfig = JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config)
        sender() ! InitJoinAck(selfAddress, CompatibleConfig(clusterConfig))
      case Invalid(messages) ⇒
        // messages are only logged on the cluster side
        log.warning("Found incompatible settings when [{}] tried to join: {}", sender().path.address, messages.mkString(", "))
        sender() ! InitJoinAck(selfAddress, IncompatibleConfig)
    }
  }
}
// (excerpt from `join`, target is another node) set an optional retry deadline,
// switch to `tryingToJoin` and send Join to the target node's cluster core
val joinDeadline = RetryUnsuccessfulJoinAfter match {
  case d: FiniteDuration ⇒ Some(Deadline.now + d)
  case _                 ⇒ None
}
context.become(tryingToJoin(address, joinDeadline))
clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles)
// (in `initialized`) a Join request from another node is handled by `joining`
case Join(node, roles) ⇒ joining(node, roles)
// (end of the `case None` branch of `joining`) a self-join into an empty member set runs
// leaderActions() immediately; any other joining node is answered with a Welcome
if (joiningNode == selfUniqueAddress) {
  if (localMembers.isEmpty)
    leaderActions() // important for deterministic oldest when bootstrapping
} else
  sender() ! Welcome(selfUniqueAddress, latestGossip)
/**
 * Behavior after a Join has been sent to `joinWith`, while waiting for the Welcome.
 *
 * @param joinWith address the Join was sent to; passed on to `welcome`
 * @param deadline when overdue (checked on Tick) the join is retried; `None` disables the retry
 */
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({
  case Welcome(from, gossip) ⇒
    welcome(joinWith, from, gossip)
  // InitJoin carries the joining node's config (it is matched as InitJoin(joiningNodeConfig)
  // in `initialized`), so it must be matched as a constructor pattern. A bare `case InitJoin`
  // only matches the companion object, and the request would never be Nack'ed.
  case InitJoin(_) ⇒
    logInfo("Received InitJoin message from [{}], but this node is not a member yet", sender())
    sender() ! InitJoinNack(selfAddress)
  case ClusterUserAction.JoinTo(address) ⇒
    becomeUninitialized()
    join(address)
  case JoinSeedNodes(newSeedNodes) ⇒
    resetJoinSeedNodesDeadline()
    becomeUninitialized()
    joinSeedNodes(newSeedNodes)
  case msg: SubscriptionMessage ⇒ publisher forward msg
  case _: Tick ⇒
    if (joinSeedNodesDeadline.exists(_.isOverdue))
      joinSeedNodesWasUnsuccessful()
    else if (deadline.exists(_.isOverdue)) {
      // join attempt failed, retry
      becomeUninitialized()
      if (seedNodes.nonEmpty) joinSeedNodes(seedNodes)
      else join(joinWith)
    }
}: Actor.Receive).orElse(receiveExitingCompleted)
/**
 * Accept reply from Join request.
 *
 * A Welcome whose sender address differs from `joinWith` is logged and ignored.
 * Otherwise the received gossip is adopted via `membershipState.copy(latestGossip = gossip).seen()`
 * and then published. If the Welcome did not come from this node itself, the gossip is sent
 * back to the sender. Finally the node becomes initialized.
 *
 * @param joinWith address this node sent its Join to
 * @param from     unique address of the node that sent the Welcome
 * @param gossip   cluster state carried by the Welcome
 * @throws IllegalArgumentException if the latest gossip already contains members
 */
def welcome(joinWith: Address, from: UniqueAddress, gossip: Gossip): Unit = {
  require(latestGossip.members.isEmpty, "Join can only be done from empty state")
  if (joinWith != from.address)
    logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith)
  else {
    membershipState = membershipState.copy(latestGossip = gossip).seen()
    logInfo("Welcome from [{}]", from.address)
    assertLatestGossip()
    publishMembershipState()
    if (from != selfUniqueAddress)
      gossipTo(from, sender())
    becomeInitialized()
  }
}
// subscription requests are forwarded (original sender preserved) to the publisher actor
case msg: SubscriptionMessage ⇒ publisher forward msg
// Message handling of the publisher actor (presumably ClusterDomainEventPublisher — confirm).
// Each message is delegated to the helper with the matching name.
def receive = {
  case PublishChanges(newState)            ⇒ publishChanges(newState)
  case currentStats: CurrentInternalStats  ⇒ publishInternalStats(currentStats)
  case SendCurrentClusterState(receiver)   ⇒ sendCurrentClusterState(receiver)
  case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to)
  case Unsubscribe(subscriber, to)         ⇒ unsubscribe(subscriber, to)
  case PublishEvent(event)                 ⇒ publish(event)
}
// Join using seed nodes obtained at runtime instead of akka.cluster.seed-nodes in the config.
val cluster = Cluster(system)
val list: List[Address] = ??? // your method to dynamically get seed nodes (??? throws NotImplementedError until replaced)
cluster.joinSeedNodes(list)
/**
 * Join the specified seed nodes without defining them in config.
 * Especially useful from tests when Addresses are unknown before startup time.
 *
 * An actor system can only join a cluster once. Additional attempts will be ignored.
 * When it has successfully joined it must be restarted to be able to join another
 * cluster or to join the same cluster again.
 *
 * Every address is passed through `fillLocal`. The results are then sent as a `Vector`
 * to the cluster core actor in a `JoinSeedNodes` message.
 */
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit = {
  val completedSeedNodes = seedNodes.iterator.map(fillLocal).toVector
  clusterCore ! InternalClusterAction.JoinSeedNodes(completedSeedNodes)
}
