Akka源碼分析-Cluster-ActorSystem

  前面幾篇博客,咱們依次介紹了local和remote的一些內容,其實再分析cluster就會簡單不少,後面關於cluster的源碼分析,可以省略的地方,就再也不貼源碼而是一句話帶過了,若是有不理解的地方,但願多翻翻以前的博客。html

  在使用cluster時,配置文件中的akka.actor.provider值是cluster,因此ActorSystem對應的provider就是akka.cluster.ClusterActorRefProvidernode

/**
 * INTERNAL API
 *
 * The `ClusterActorRefProvider` will load the [[akka.cluster.Cluster]]
 * extension, i.e. the cluster will automatically be started when
 * the `ClusterActorRefProvider` is used.
 */
private[akka] class ClusterActorRefProvider(
  _systemName:    String,
  _settings:      ActorSystem.Settings,
  _eventStream:   EventStream,
  _dynamicAccess: DynamicAccess) extends RemoteActorRefProvider(
  _systemName, _settings, _eventStream, _dynamicAccess) {

  override def init(system: ActorSystemImpl): Unit = {
    super.init(system)

    // initialize/load the Cluster extension
    Cluster(system)
  }

  override protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = {
    // make sure Cluster extension is initialized/loaded from init thread
    Cluster(system)

    import remoteSettings._
    val failureDetector = createRemoteWatcherFailureDetector(system)
    system.systemActorOf(ClusterRemoteWatcher.props(
      failureDetector,
      heartbeatInterval = WatchHeartBeatInterval,
      unreachableReaperInterval = WatchUnreachableReaperInterval,
      heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
  }

  /**
   * Factory method to make it possible to override deployer in subclass
   * Creates a new instance every time
   */
  override protected def createDeployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)

}

   上面是ClusterActorRefProvider的源碼,就問你驚不驚喜意不意外,它就是簡單的繼承了RemoteActorRefProvider,而後重寫了三個函數,好像跟remote差很少啊。那它關於cluster的功能是如何實現的呢?在init中有一行代碼值得注意:Cluster(system)。Cluster是什麼呢?編程

/**
 * Cluster Extension Id and factory for creating Cluster extension.
 */
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
  override def get(system: ActorSystem): Cluster = super.get(system)

  override def lookup = Cluster

  override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)

  /**
   * INTERNAL API
   */
  private[cluster] final val isAssertInvariantsEnabled: Boolean =
    System.getProperty("akka.cluster.assert", "off").toLowerCase match {
      case "on" | "true" ⇒ true
      case _             ⇒ false
    }
}

   很顯然Cluster是Akka的一個擴展,Cluster(system)會建立一個Cluster類的實例,因爲Cluster類的源碼過多,此處只貼出了它的UML圖。bootstrap

  以前的博客分析過Akka的擴展機制,若是讀過該文章,就必定知道Cluster是如何初始化的。由於Cluster的createExtension就是new了一個Cluster,因此全部的初始化過程都在Cluster類的主構造函數中,主構造函數是啥?Cluster中除了字段定義、方法定義,其餘的代碼塊都是。其實Cluster的這種實現,我是很不喜歡的,畢竟把類的初始化過程和定義柔和到一塊,是不便於分析的,沒辦法,還得硬着頭皮上啊。app

/**
 * This module is responsible cluster membership information. Changes to the cluster
 * information is retrieved through [[#subscribe]]. Commands to operate the cluster is
 * available through methods in this class, such as [[#join]], [[#down]] and [[#leave]].
 *
 * Each cluster [[Member]] is identified by its [[akka.actor.Address]], and
 * the cluster address of this actor system is [[#selfAddress]]. A member also has a status;
 * initially [[MemberStatus]] `Joining` followed by [[MemberStatus]] `Up`.
 */
class Cluster(val system: ExtendedActorSystem) extends Extension 

   首先分析Cluster這個擴展的定義,官方註釋說的也很清楚,這個模塊就是負責集羣的成員信息的,固然還能夠對集羣進行操做,好比加入集羣、關閉集羣、離開集羣等。集羣中的每一個成員被它的Address惟一標誌,每一個成員也有一個狀態,及其對應的生命週期。dom

  下面咱們把Cluster主構造函數中全部的代碼塊摘錄出來。tcp

class Cluster(val system: ExtendedActorSystem) extends Extension {

  logInfo("Starting up...")

  system.registerOnTermination(shutdown())

  if (JmxEnabled)
    clusterJmx = {
      val jmx = new ClusterJmx(this, log)
      jmx.createMBean()
      Some(jmx)
    }

  logInfo("Started up successfully")

}

   摘出來以後就清晰多了,拋開日誌打印,一共兩段代碼,調用system.registerOnTermination註冊shutdown函數,JmxEnabled爲true就初始化clusterJmx。JmxEnabled默認就是true。其實這樣看來,cluster主構造函數也沒有太多的邏輯。固然了字段初始化的過程,也能夠認爲是主構造函數的一部分,字段代碼這裏就不一一分析,用到的時候再具體分析。ide

  分析到這裏,但願讀者有一點認識:cluster模式是基於remote模式建立的,它額外使用Cluster這個Extension來管理集羣的狀態、成員信息、成員生命週期等。下面會基於Cluster的demo來分析,cluster是如何實現成員管理等功能的。函數

To enable cluster capabilities in your Akka project you should, at a minimum, add the Remoting settings, but with cluster. The akka.cluster.seed-nodes should normally also be added to your application.conf file.源碼分析

  上面是官網關於cluster的說明,很簡單,除了cluster的相關配置,remote相關的配置也要設置(akka.actor.provider必須是cluster),這也說明cluster是基於remote的,還有一個akka.cluster.seed-nodes須要配置。seed-nodes比較容易理解,其實就是集羣的種子節點,經過種子節點就能夠加入到指定的集羣了。固然了加入集羣還能夠經過編程的方式實現,這裏不作介紹。

akka {
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
  }
}

   上面是官網demo中的配置,能夠看出,很是簡單。

class SimpleClusterListener extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart(): Unit = {
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
  }
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) ⇒
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) ⇒
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) ⇒
      log.info(
        "Member is Removed: {} after {}",
        member.address, previousStatus)
    case _: MemberEvent ⇒ // ignore
  }
}  

   上面是SimpleClusterListener的源碼,這是一個普通的actor,沒有任何多餘的繼承信息。固然它覆蓋了preStart方法,這個方法調用了cluster的subscribe方法。

/**
   * Subscribe to one or more cluster domain events.
   * The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
   * or subclasses.
   *
   * If `initialStateMode` is `ClusterEvent.InitialStateAsEvents` the events corresponding
   * to the current state will be sent to the subscriber to mimic what you would
   * have seen if you were listening to the events when they occurred in the past.
   *
   * If `initialStateMode` is `ClusterEvent.InitialStateAsSnapshot` a snapshot of
   * [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the subscriber as the
   * first message.
   *
   * Note that for large clusters it is more efficient to use `InitialStateAsSnapshot`.
   */
  @varargs def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit = {
    require(to.length > 0, "at least one `ClusterDomainEvent` class is required")
    require(
      to.forall(classOf[ClusterDomainEvent].isAssignableFrom),
      s"subscribe to `akka.cluster.ClusterEvent.ClusterDomainEvent` or subclasses, was [${to.map(_.getName).mkString(", ")}]")
    clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode, to.toSet)
  }

   很簡單,SimpleClusterListener這個actor在啓動的時候調用cluster相關函數,監聽對應的集羣事件,而subscribe方法只不過把監聽消息InternalClusterAction.Subscribe發送給了clusterCore這個ActorRef。

private[cluster] val clusterCore: ActorRef = {
    implicit val timeout = system.settings.CreationTimeout
    try {
      Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration)
    } catch {
      case NonFatal(e) ⇒
        log.error(e, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'.")
        shutdown()
        // don't re-throw, that would cause the extension to be re-recreated
        // from shutdown() or other places, which may result in
        // InvalidActorNameException: actor name [cluster] is not unique
        system.deadLetters
    }
  }

   clusterCore是經過向clusterDaemons發送InternalClusterAction.GetClusterCoreRef消息獲取的。

// create supervisor for daemons under path "/system/cluster"
  private val clusterDaemons: ActorRef = {
    system.systemActorOf(Props(classOf[ClusterDaemon], settings, joinConfigCompatChecker).
      withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster")
  }

   clusterDaemons是一個ActorRef,對應的actor是ClusterDaemon,也就是說cluster在啓動時建立了一個actor,有該actor負責什麼呢?

/**
 * INTERNAL API.
 *
 * Supervisor managing the different Cluster daemons.
 */
@InternalApi
private[cluster] final class ClusterDaemon(settings: ClusterSettings, joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
  import InternalClusterAction._
  // Important - don't use Cluster(context.system) in constructor because that would
  // cause deadlock. The Cluster extension is currently being created and is waiting
  // for response from GetClusterCoreRef in its constructor.
  // Child actors are therefore created when GetClusterCoreRef is received
  var coreSupervisor: Option[ActorRef] = None

  val clusterShutdown = Promise[Done]()
  val coordShutdown = CoordinatedShutdown(context.system)
  coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") {
    val sys = context.system
    () ⇒
      if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down)
        Future.successful(Done)
      else {
        implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave))
        self.ask(CoordinatedShutdownLeave.LeaveReq).mapTo[Done]
      }
  }
  coordShutdown.addTask(CoordinatedShutdown.PhaseClusterShutdown, "wait-shutdown") { () ⇒
    clusterShutdown.future
  }

  override def postStop(): Unit = {
    clusterShutdown.trySuccess(Done)
    if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) {
      // if it was stopped due to leaving CoordinatedShutdown was started earlier
      coordShutdown.run(CoordinatedShutdown.ClusterDowningReason)
    }
  }

  def createChildren(): Unit = {
    coreSupervisor = Some(context.actorOf(Props(classOf[ClusterCoreSupervisor], joinConfigCompatChecker).
      withDispatcher(context.props.dispatcher), name = "core"))
    context.actorOf(Props[ClusterHeartbeatReceiver].
      withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
  }

  def receive = {
    case msg: GetClusterCoreRef.type ⇒
      if (coreSupervisor.isEmpty)
        createChildren()
      coreSupervisor.foreach(_ forward msg)
    case AddOnMemberUpListener(code) ⇒
      context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Up).withDeploy(Deploy.local))
    case AddOnMemberRemovedListener(code) ⇒
      context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Removed).withDeploy(Deploy.local))
    case CoordinatedShutdownLeave.LeaveReq ⇒
      val ref = context.actorOf(CoordinatedShutdownLeave.props().withDispatcher(context.props.dispatcher))
      // forward the ask request
      ref.forward(CoordinatedShutdownLeave.LeaveReq)
  }

}

   官方註釋說這是一個監督者,用來管理各個集羣daemon實例。它收到GetClusterCoreRef後,會調用createChildren建立兩個actor:ClusterCoreSupervisor、ClusterHeartbeatReceiver。而後把GetClusterCoreRef消息轉發給他們。

/**
 * INTERNAL API.
 *
 * ClusterCoreDaemon and ClusterDomainEventPublisher can't be restarted because the state
 * would be obsolete. Shutdown the member if any those actors crashed.
 */
@InternalApi
private[cluster] final class ClusterCoreSupervisor(joinConfigCompatChecker: JoinConfigCompatChecker) extends Actor with ActorLogging
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {

  // Important - don't use Cluster(context.system) in constructor because that would
  // cause deadlock. The Cluster extension is currently being created and is waiting
  // for response from GetClusterCoreRef in its constructor.
  // Child actors are therefore created when GetClusterCoreRef is received

  var coreDaemon: Option[ActorRef] = None

  def createChildren(): Unit = {
    val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
      withDispatcher(context.props.dispatcher), name = "publisher")
    coreDaemon = Some(context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher, joinConfigCompatChecker).
      withDispatcher(context.props.dispatcher), name = "daemon")))
  }

  override val supervisorStrategy =
    OneForOneStrategy() {
      case NonFatal(e) ⇒
        log.error(e, "Cluster node [{}] crashed, [{}] - shutting down...", Cluster(context.system).selfAddress, e.getMessage)
        self ! PoisonPill
        Stop
    }

  override def postStop(): Unit = Cluster(context.system).shutdown()

  def receive = {
    case InternalClusterAction.GetClusterCoreRef ⇒
      if (coreDaemon.isEmpty)
        createChildren()
      coreDaemon.foreach(sender() ! _)
  }
}

   首先ClusterCoreSupervisor這個actor收到GetClusterCoreRef後,又經過createChildren建立了兩個actor:ClusterDomainEventPublisher、ClusterCoreDaemon。而後把ClusterCoreDaemon的ActorRef返回。

  ClusterHeartbeatReceiver源碼再也不分析,他就是一個心跳檢測的actor,功能簡單獨立。

  分析到這裏,咱們知道了Cluster類中的clusterCore實際上是一個ClusterCoreDaemon實例的ActorRef。ClusterCoreDaemon源碼太長,這裏就不粘貼了,讀者只須要知道它是用來處理全部與集羣有關的消息的就能夠了。但它的preStart函數須要咱們關注一下。

override def preStart(): Unit = {
    context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])

    cluster.downingProvider.downingActorProps.foreach { props ⇒
      val propsWithDispatcher =
        if (props.dispatcher == Deploy.NoDispatcherGiven) props.withDispatcher(context.props.dispatcher)
        else props

      context.actorOf(propsWithDispatcher, name = "downingProvider")
    }

    if (seedNodes.isEmpty)
      logInfo("No seed-nodes configured, manual cluster join required")
    else
      self ! JoinSeedNodes(seedNodes)
  }

   最後一個if語句比較關鍵,它首先判斷當前seedNode是否是爲空,爲空則打印一條日誌,僅作提醒;若是不爲空,則給本身發一個JoinSeedNodes消息。這條日誌也驗證了咱們能夠經過編程的方式加入某個集羣中。

  經過上下文分析,咱們找到了seedNodes的最終定義。

val SeedNodes: immutable.IndexedSeq[Address] =
    immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(address) ⇒ address }.toVector

   很明顯,SeedNodes就是一個Address列表。

def uninitialized: Actor.Receive = ({
    case InitJoin ⇒
      logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender())
      sender() ! InitJoinNack(selfAddress)
    case ClusterUserAction.JoinTo(address) ⇒
      join(address)
    case JoinSeedNodes(newSeedNodes) ⇒
      resetJoinSeedNodesDeadline()
      joinSeedNodes(newSeedNodes)
    case msg: SubscriptionMessage ⇒
      publisher forward msg
    case Welcome(from, gossip) ⇒
      welcome(from.address, from, gossip)
    case _: Tick ⇒
      if (joinSeedNodesDeadline.exists(_.isOverdue))
        joinSeedNodesWasUnsuccessful()
  }: Actor.Receive).orElse(receiveExitingCompleted)

   根據以上定義,可知收到JoinSeedNodes後調用了兩個函數:resetJoinSeedNodesDeadline、joinSeedNodes。resetJoinSeedNodesDeadline再也不具體分析,其意義可參考shutdown-after-unsuccessful-join-seed-nodes這個配置的說明。

 # The joining of given seed nodes will by default be retried indefinitely until
    # a successful join. That process can be aborted if unsuccessful by defining this
    # timeout. When aborted it will run CoordinatedShutdown, which by default will
    # terminate the ActorSystem. CoordinatedShutdown can also be configured to exit
    # the JVM. It is useful to define this timeout if the seed-nodes are assembled
    # dynamically and a restart with new seed-nodes should be tried after unsuccessful
    # attempts.   
    shutdown-after-unsuccessful-join-seed-nodes = off

   joinSeedNodes方法源碼以下。

def joinSeedNodes(newSeedNodes: immutable.IndexedSeq[Address]): Unit = {
    if (newSeedNodes.nonEmpty) {
      stopSeedNodeProcess()

      seedNodes = newSeedNodes // keep them for retry
      seedNodeProcess =
        if (newSeedNodes == immutable.IndexedSeq(selfAddress)) {
          self ! ClusterUserAction.JoinTo(selfAddress)
          None
        } else {
          // use unique name of this actor, stopSeedNodeProcess doesn't wait for termination
          seedNodeProcessCounter += 1
          if (newSeedNodes.head == selfAddress) {
            Some(context.actorOf(Props(classOf[FirstSeedNodeProcess], newSeedNodes, joinConfigCompatChecker).
              withDispatcher(UseDispatcher), name = "firstSeedNodeProcess-" + seedNodeProcessCounter))
          } else {
            Some(context.actorOf(Props(classOf[JoinSeedNodeProcess], newSeedNodes, joinConfigCompatChecker).
              withDispatcher(UseDispatcher), name = "joinSeedNodeProcess-" + seedNodeProcessCounter))
          }
        }
    }
  }

   上面代碼顯示,若是配置的種子節點只有一個,並且就是當前節點,則給self發一個ClusterUserAction.JoinTo消息;不然會判斷,配置種子節點的第一個address是否是當前地址,若是是則建立FirstSeedNodeProcess,不然建立JoinSeedNodeProcess。

  首先分析配置的種子節點只有一個,且就是當前節點的狀況,當前節點收到ClusterUserAction.JoinTo(selfAddress)消息後,會調用join函數。

 /**
   * Try to join this cluster node with the node specified by `address`.
   * It's only allowed to join from an empty state, i.e. when not already a member.
   * A `Join(selfUniqueAddress)` command is sent to the node to join,
   * which will reply with a `Welcome` message.
   */
  def join(address: Address): Unit = {
    if (address.protocol != selfAddress.protocol)
      log.warning(
        "Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]",
        selfAddress.protocol, address.protocol)
    else if (address.system != selfAddress.system)
      log.warning(
        "Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]",
        selfAddress.system, address.system)
    else {
      require(latestGossip.members.isEmpty, "Join can only be done from empty state")

      // to support manual join when joining to seed nodes is stuck (no seed nodes available)
      stopSeedNodeProcess()

      if (address == selfAddress) {
        becomeInitialized()
        joining(selfUniqueAddress, cluster.selfRoles)
      } else {
        val joinDeadline = RetryUnsuccessfulJoinAfter match {
          case d: FiniteDuration ⇒ Some(Deadline.now + d)
          case _                 ⇒ None
        }
        context.become(tryingToJoin(address, joinDeadline))
        clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles)
      }
    }
  }

   因爲就是當前節點,因此會執行becomeInitialized()、joining(selfUniqueAddress, cluster.selfRoles)兩個函數。

def becomeInitialized(): Unit = {
    // start heartbeatSender here, and not in constructor to make sure that
    // heartbeating doesn't start before Welcome is received
    val internalHeartbeatSenderProps = Props(new ClusterHeartbeatSender()).withDispatcher(UseDispatcher)
    context.actorOf(internalHeartbeatSenderProps, name = "heartbeatSender")

    val externalHeartbeatProps = Props(new CrossDcHeartbeatSender()).withDispatcher(UseDispatcher)
    context.actorOf(externalHeartbeatProps, name = "crossDcHeartbeatSender")

    // make sure that join process is stopped
    stopSeedNodeProcess()
    joinSeedNodesDeadline = None
    context.become(initialized)
  }

   becomeInitialized這個函數建立了兩個心跳發送器,一個是集羣內,一個是跨數據中心的,最後修改當前行爲函數爲initialized,意味着初始化完成。

      // check by address without uid to make sure that node with same host:port is not allowed
      // to join until previous node with that host:port has been removed from the cluster
      localMembers.find(_.address == joiningNode.address) match {
        case Some(m) if m.uniqueAddress == joiningNode ⇒
          // node retried join attempt, probably due to lost Welcome message
          logInfo("Existing member [{}] is joining again.", m)
          if (joiningNode != selfUniqueAddress)
            sender() ! Welcome(selfUniqueAddress, latestGossip)
        case Some(m) ⇒
          // node restarted, same host:port as existing member, but with different uid
          // safe to down and later remove existing member
          // new node will retry join
          logInfo("New incarnation of existing member [{}] is trying to join. " +
            "Existing will be removed from the cluster and then new member will be allowed to join.", m)
          if (m.status != Down) {
            // we can confirm it as terminated/unreachable immediately
            val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, m.uniqueAddress)
            val newOverview = latestGossip.overview.copy(reachability = newReachability)
            val newGossip = latestGossip.copy(overview = newOverview)
            updateLatestGossip(newGossip)

            downing(m.address)
          }
        case None ⇒
          // remove the node from the failure detector
          failureDetector.remove(joiningNode.address)
          crossDcFailureDetector.remove(joiningNode.address)

          // add joining node as Joining
          // add self in case someone else joins before self has joined (Set discards duplicates)
          val newMembers = localMembers + Member(joiningNode, roles) + Member(selfUniqueAddress, cluster.selfRoles)
          val newGossip = latestGossip copy (members = newMembers)

          updateLatestGossip(newGossip)

          logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", "))
          if (joiningNode == selfUniqueAddress) {
            if (localMembers.isEmpty)
              leaderActions() // important for deterministic oldest when bootstrapping
          } else
            sender() ! Welcome(selfUniqueAddress, latestGossip)

          publishMembershipState()
      }

   因爲joining源碼太長,只貼上面部分代碼,很顯然會命中case None,第一次加入集羣嗎,確定是None。updateLatestGossip這個函數再也不分析,它應該是用來更新與Gossip協議相關的狀態的。因爲joiningNode == selfUniqueAddress且localMembers是空,由於此時尚未成員,因此會執行leaderActions()。leaderActions方法暫時也不分析,它應該就是計算當前集羣leader的,很顯然當前種子節點就是leader。

  而後咱們分析當前配置不止一個種子節點的狀況,此時會分兩種狀況,第一個節點啓東時會建立FirstSeedNodeProcess,其他節點啓動時會建立JoinSeedNodeProcess。

  先來看FirstSeedNodeProcess,在主構造函數中有一段代碼:self ! JoinSeedNode。也就是給本身發送了一個JoinSeedNode消息。

  收到JoinSeedNode消息後,給剩餘的節點發送了InitJoin消息,FirstSeedNodeProcess就算處理結束了。

  咱們來看JoinSeedNodeProcess,這個actor在preStart時給本身發了一個JoinSeedNode消息,收到消息後,會給其餘種子節點發送InitJoin消息,固然了,其餘節點也會返回InitJoinAck消息。

  請特別注意,上面兩個actor所說的其餘種子節點,是指其餘的ActorSystem系統的ClusterCoreDaemon這個actor。那麼收到InitJoin消息的節點ClusterCoreDaemon是如何處理的呢?

 def uninitialized: Actor.Receive = ({
    case InitJoin ⇒
      logInfo("Received InitJoin message from [{}], but this node is not initialized yet", sender())
      sender() ! InitJoinNack(selfAddress)
    case ClusterUserAction.JoinTo(address) ⇒
      join(address)
    case JoinSeedNodes(newSeedNodes) ⇒
      resetJoinSeedNodesDeadline()
      joinSeedNodes(newSeedNodes)
    case msg: SubscriptionMessage ⇒
      publisher forward msg
    case Welcome(from, gossip) ⇒
      welcome(from.address, from, gossip)
    case _: Tick ⇒
      if (joinSeedNodesDeadline.exists(_.isOverdue))
        joinSeedNodesWasUnsuccessful()
  }: Actor.Receive).orElse(receiveExitingCompleted)

   其實此時,對於其餘節點都是出於uninitialized狀態的,收到InitJoin時,會發送InitJoinNack消息。

case InitJoinNack(address) ⇒
      logInfo("Received InitJoinNack message from [{}] to [{}]", sender(), selfAddress)
      remainingSeedNodes -= address
      if (remainingSeedNodes.isEmpty) {
        // initialize new cluster by joining myself when nacks from all other seed nodes
        context.parent ! JoinTo(selfAddress)
        context.stop(self)
      }

   對於FirstSeedNodeProcess,收到全部其餘節點的InitJoinNack消息後會給parent也就是ClusterCoreDaemon發送JoinTo(selfAddress)消息,而後stop掉本身。若是Deadline.now + cluster.settings.SeedNodeTimeout指定的時間內沒有收到其餘節點的InitJoinNack消息,一樣給ClusterCoreDaemon發送JoinTo(selfAddress)消息,而後stop掉本身。

  對於JoinSeedNodeProcess收到InitJoinNack消息是不做任何處理的。

    case InitJoinNack(_) ⇒ // that seed was uninitialized

    case ReceiveTimeout ⇒
      if (attempt >= 2)
        log.warning(
          "Couldn't join seed nodes after [{}] attempts, will try again. seed-nodes=[{}]",
          attempt, seedNodes.filterNot(_ == selfAddress).mkString(", "))
      // no InitJoinAck received, try again
      self ! JoinSeedNode

   那麼此時FirstSeedNodeProcess所在節點ClusterCoreDaemon對JoinTo消息的處理就很重要了。

  很顯然第一個種子節點的ClusterCoreDaemon尚未初始化,對JoinTo消息的處理,前面已經分析過,就是調用join(address),而address就是selfAddress,即當前節點成功加入集羣,而且狀態變成已初始化,即當前集羣已經有一個節點了,就是第一個種子節點。那麼JoinSeedNodeProcess怎麼辦呢,好像沒人管他了呢。

  幸虧,它的主構造函數設置了timeout

context.setReceiveTimeout(SeedNodeTimeout)

   上面代碼顯示,收到timeout消息後,又給本身發送了JoinSeedNode消息,從新加入其它節點,此時第一個種子節點已經完成初始化。

def initialized: Actor.Receive = ({
    case msg: GossipEnvelope ⇒ receiveGossip(msg)
    case msg: GossipStatus   ⇒ receiveGossipStatus(msg)
    case GossipTick          ⇒ gossipTick()
    case GossipSpeedupTick   ⇒ gossipSpeedupTick()
    case ReapUnreachableTick ⇒ reapUnreachableMembers()
    case LeaderActionsTick   ⇒ leaderActions()
    case PublishStatsTick    ⇒ publishInternalStats()
    case InitJoin(joiningNodeConfig) ⇒
      logInfo("Received InitJoin message from [{}] to [{}]", sender(), selfAddress)
      initJoin(joiningNodeConfig)
    case Join(node, roles)                ⇒ joining(node, roles)
    case ClusterUserAction.Down(address)  ⇒ downing(address)
    case ClusterUserAction.Leave(address) ⇒ leaving(address)
    case SendGossipTo(address)            ⇒ sendGossipTo(address)
    case msg: SubscriptionMessage         ⇒ publisher forward msg
    case QuarantinedEvent(address, uid)   ⇒ quarantined(UniqueAddress(address, uid))
    case ClusterUserAction.JoinTo(address) ⇒
      logInfo("Trying to join [{}] when already part of a cluster, ignoring", address)
    case JoinSeedNodes(nodes) ⇒
      logInfo(
        "Trying to join seed nodes [{}] when already part of a cluster, ignoring",
        nodes.mkString(", "))
    case ExitingConfirmed(address) ⇒ receiveExitingConfirmed(address)
  }: Actor.Receive).orElse(receiveExitingCompleted)

   初始化的ClusterCoreDaemon收到InitJoin消息後,會去調用initJoin方法。

def initJoin(joiningNodeConfig: Config): Unit = {
    val selfStatus = latestGossip.member(selfUniqueAddress).status
    if (removeUnreachableWithMemberStatus.contains(selfStatus)) {
      // prevents a Down and Exiting node from being used for joining
      logInfo("Sending InitJoinNack message from node [{}] to [{}]", selfAddress, sender())
      sender() ! InitJoinNack(selfAddress)
    } else {
      logInfo("Sending InitJoinAck message from node [{}] to [{}]", selfAddress, sender())
      // run config compatibility check using config provided by
      // joining node and current (full) config on cluster side

      val configWithoutSensitiveKeys = {
        val allowedConfigPaths = JoinConfigCompatChecker.removeSensitiveKeys(context.system.settings.config, cluster.settings)
        // build a stripped down config instead where sensitive config paths are removed
        // we don't want any check to happen on those keys
        JoinConfigCompatChecker.filterWithKeys(allowedConfigPaths, context.system.settings.config)
      }

      joinConfigCompatChecker.check(joiningNodeConfig, configWithoutSensitiveKeys) match {
        case Valid ⇒
          val nonSensitiveKeys = JoinConfigCompatChecker.removeSensitiveKeys(joiningNodeConfig, cluster.settings)
          // Send back to joining node a subset of current configuration
          // containing the keys initially sent by the joining node minus
          // any sensitive keys as defined by this node configuration
          val clusterConfig = JoinConfigCompatChecker.filterWithKeys(nonSensitiveKeys, context.system.settings.config)
          sender() ! InitJoinAck(selfAddress, CompatibleConfig(clusterConfig))
        case Invalid(messages) ⇒
          // messages are only logged on the cluster side
          log.warning("Found incompatible settings when [{}] tried to join: {}", sender().path.address, messages.mkString(", "))
          sender() ! InitJoinAck(selfAddress, IncompatibleConfig)
      }

    }
  }

   其實InitJoin也就是把其餘節點的配置與當前節點的配置作了兼容性測試,而後發送InitJoinAck做爲InitJoin的應答,同時包含兼容性結果。而後種子actor(JoinSeedNodeProcess)收到應答後,給parent發送JoinTo(address)消息,將行爲函數改爲done,其實也就是在timeout消息後再stop(self),而不是當即stop。此時JoinSeedNodeProcess所在的ClusterCoreDaemon尚未完成初始化,因此會調用join(address),只不過這裏的address是第一個初始化的種子節點的地址。

val joinDeadline = RetryUnsuccessfulJoinAfter match {
          case d: FiniteDuration ⇒ Some(Deadline.now + d)
          case _                 ⇒ None
        }
        context.become(tryingToJoin(address, joinDeadline))
        clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles)

   因此join函數會執行以上代碼段。修改當前行爲函數爲tryingToJoin,而後給對應的address(也就是第一個種子節點)發送Join消息。因爲第一個種子節點已經完成初始化,因此會命中initialized函數中的一下分支

case Join(node, roles)                ⇒ joining(node, roles)

   上面代碼中的node對於第一個種子節點來講是JoinSeedNodeProcess所在節點的地址,也就是遠程節點地址,因此joining方法中下面的代碼會走else代碼塊。

if (joiningNode == selfUniqueAddress) {
            if (localMembers.isEmpty)
              leaderActions() // important for deterministic oldest when bootstrapping
          } else
            sender() ! Welcome(selfUniqueAddress, latestGossip)

   也就是給sender發送一個Welcome消息,因爲JoinSeedNodeProcess所在節點的ClusterCoreDaemon行爲函數仍是tryingToJoin。

def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({
    case Welcome(from, gossip) ⇒
      welcome(joinWith, from, gossip)
    case InitJoin ⇒
      logInfo("Received InitJoin message from [{}], but this node is not a member yet", sender())
      sender() ! InitJoinNack(selfAddress)
    case ClusterUserAction.JoinTo(address) ⇒
      becomeUninitialized()
      join(address)
    case JoinSeedNodes(newSeedNodes) ⇒
      resetJoinSeedNodesDeadline()
      becomeUninitialized()
      joinSeedNodes(newSeedNodes)
    case msg: SubscriptionMessage ⇒ publisher forward msg
    case _: Tick ⇒
      if (joinSeedNodesDeadline.exists(_.isOverdue))
        joinSeedNodesWasUnsuccessful()
      else if (deadline.exists(_.isOverdue)) {
        // join attempt failed, retry
        becomeUninitialized()
        if (seedNodes.nonEmpty) joinSeedNodes(seedNodes)
        else join(joinWith)
      }
  }: Actor.Receive).orElse(receiveExitingCompleted)

   收到消息後調用了welcome方法。

**
   * Accept reply from Join request.
   */
  def welcome(joinWith: Address, from: UniqueAddress, gossip: Gossip): Unit = {
    require(latestGossip.members.isEmpty, "Join can only be done from empty state")
    if (joinWith != from.address)
      logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith)
    else {
      membershipState = membershipState.copy(latestGossip = gossip).seen()
      logInfo("Welcome from [{}]", from.address)
      assertLatestGossip()
      publishMembershipState()
      if (from != selfUniqueAddress)
        gossipTo(from, sender())
      becomeInitialized()
    }
  }

   簡單點來講上述代碼的公共就是更新gossip協議相關的數據,而後調用becomeInitialized完成初始化,並修改行爲函數爲initialized,結束初始化過程。

  這樣,全部的種子節點都加入了集羣,其餘全部非種子節點,都會按照第二個種子節點的方法(也就是經過JoinSeedNodeProcess)加入集羣。至此,節點加入集羣的過程就分析清楚了,但還要一些細節沒有研究,好比gossip協議是怎樣實現的,種子節點restart以後如何從新加入集羣,非種子節點restart如何加入集羣。

  那麼咱們就能夠分析clusterCore對InternalClusterAction.Subscribe的處理了。

case msg: SubscriptionMessage         ⇒ publisher forward msg

   由於當前ClusterCoreDaemon已經完成了初始化,因此會命中initialized函數的上面的case,其實就是簡單的把消息轉發給了publisher,而publisher又是啥呢?根據建立的上下文publisher是ClusterDomainEventPublisher這個actor

 def receive = {
    case PublishChanges(newState)            ⇒ publishChanges(newState)
    case currentStats: CurrentInternalStats  ⇒ publishInternalStats(currentStats)
    case SendCurrentClusterState(receiver)   ⇒ sendCurrentClusterState(receiver)
    case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to)
    case Unsubscribe(subscriber, to)         ⇒ unsubscribe(subscriber, to)
    case PublishEvent(event)                 ⇒ publish(event)
  }

  ClusterDomainEventPublisher這個actor其實就是保存當前集羣中的各成員節點信息和狀態的,固然這些狀態是經過gossip協議以後發佈的。收到對應的Subscribe消息後,會把當前節點狀態發送給訂閱者。固然這裏有兩個模式,一個是把當前集羣成員狀態一次性發出去另外一個是把各個集羣成員狀態分別發送出去,其實能夠理解爲批量與單條的區別。

  無論如何,在preStart調用cluster.subscribe的Actor,都會受到集羣成員狀態的信息。不過讀者須要注意,集羣成員狀態,只是各個節點host、port、角色等信息,並非某一個特定actor的信息。那讀者要問了,我僅僅知道集羣中成員節點的host/port信息有啥用呢?我要跟集羣中某個actor通訊啊。嗯,你說的有道理,我以前也是這麼想的。但再想一想就不對了,既然你都知道了集羣中全部節點的host、port、角色信息了,確定能經過這些信息構造ActorPath傳給actorSelection給遠程節點任意actor發消息啊!!!

  下面咱們來分析一下在配置中沒有種子節點信息的狀況下,如何加入集羣。首先,確定不會報錯,前面分析過,只是打印了一條日誌而已。

val cluster = Cluster(system)
val list: List[Address] = ??? //your method to dynamically get seed nodes
cluster.joinSeedNodes(list)

   官方也說明了如何編程的方式加入集羣,是否是很簡單,就是調用了joinSeedNodes方法。

  /**
   * Join the specified seed nodes without defining them in config.
   * Especially useful from tests when Addresses are unknown before startup time.
   *
   * An actor system can only join a cluster once. Additional attempts will be ignored.
   * When it has successfully joined it must be restarted to be able to join another
   * cluster or to join the same cluster again.
   */
  def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
    clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))

   其實就是給clusterCore發送了JoinSeedNodes消息,還記得ClusterCoreDaemon在preStart發的是什麼消息麼?嗯,沒錯,就是這個消息。

  那cluster.joinSeedNodes應該在哪裏調用呢?其實吧,cluster這個變量只會初始化一次,你在哪裏調用均可以,main方法中,某個actor的主構造函數中,某個actor的preStart中。不過,我通常都在ActorSystem建立完成後緊接着調用,

val system = ActorSystem("ClusterSystem", config)
val cluster = Cluster(system)
val list: List[Address] = ??? //your method to dynamically get seed nodes
cluster.joinSeedNodes(list)

   cluster.joinSeedNodes(list)執行完畢後,就能夠建立你本身的actor了,actor在preStart訂閱成員信息的相關狀態就能夠了。

  怎麼樣,這樣分析下來,cluster是否是也很簡單呢?固然了,還有不少其餘細節沒有分析,好比Gossip協議、成員的生命週期什麼的。

相關文章
相關標籤/搜索