Akka源碼分析-Cluster-ClusterClient

  ClusterClient能夠與某個集羣通訊,而自己節點沒必要是集羣的一部分。它只須要知道一個或多個節點的位置做爲聯繫節點。它會跟ClusterReceptionist 創建鏈接,來跟集羣中的特定節點發送消息。並且必須把provider改爲remote或cluster。receptionist須要在集羣全部節點或一組節點內啓動,它能夠自行啓動或經過ClusterReceptionist 擴展來啓動。ClusterClient能夠進行通訊的actor必須是經過ClusterReceptionis擴展註冊過的actor。html

  看到這裏,你是否是想罵人了,這麼簡單的功能我均可以本身實現了。不過akka就是這樣,一些看似很是簡單的功能,框架自己提供的功能更加穩定、更加通用,但性能不必定是最優的。廢話很少說,咱們來看看ClusterClient的具體實現。node

/**
 * This actor is intended to be used on an external node that is not member
 * of the cluster. It acts like a gateway for sending messages to actors
 * somewhere in the cluster. From the initial contact points it will establish
 * a connection to a [[ClusterReceptionist]] somewhere in the cluster. It will
 * monitor the connection to the receptionist and establish a new connection if
 * the link goes down. When looking for a new receptionist it uses fresh contact
 * points retrieved from previous establishment, or periodically refreshed
 * contacts, i.e. not necessarily the initial contact points.
 *
 * You can send messages via the `ClusterClient` to any actor in the cluster
 * that is registered in the [[ClusterReceptionist]].
 * Messages are wrapped in [[ClusterClient.Send]], [[ClusterClient.SendToAll]]
 * or [[ClusterClient.Publish]].
 *
 *  Use the factory method [[ClusterClient#props]]) to create the
 * [[akka.actor.Props]] for the actor.
 *
 * If the receptionist is not currently available, the client will buffer the messages
 * and then deliver them when the connection to the receptionist has been established.
 * The size of the buffer is configurable and it can be disabled by using a buffer size
 * of 0. When the buffer is full old messages will be dropped when new messages are sent
 * via the client.
 *
 * Note that this is a best effort implementation: messages can always be lost due to the distributed
 * nature of the actors involved.
 */
final class ClusterClient(settings: ClusterClientSettings) extends Actor with ActorLogging

   經過ClusterClient的定義和官方註釋來看,就是一個普通的actor,它能夠集羣中的特定actor(ClusterReceptionist)進行通訊。它經過初始的聯繫點(其實就是ActorPath)與集羣內的ClusterReceptionist發消息,同時會監控receptionist的連接狀態,以確保連接正常。ClusterClient沒有重定義preStart,那就看它的主構造函數吧。api

sendGetContacts()
scheduleRefreshContactsTick(establishingGetContactsInterval)
self ! RefreshContactsTick

   分別調用了上面三段代碼。緩存

def sendGetContacts(): Unit = {
    val sendTo =
      if (contacts.isEmpty) initialContactsSel
      else if (contacts.size == 1) initialContactsSel union contacts
      else contacts
    if (log.isDebugEnabled)
      log.debug(s"""Sending GetContacts to [${sendTo.mkString(",")}]""")
    sendTo.foreach { _ ! GetContacts }
  }

   sendGetContacts很簡單就是給當前的聯繫點發送GetContacts消息。安全

def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = {
    refreshContactsTask foreach { _.cancel() }
    refreshContactsTask = Some(context.system.scheduler.schedule(
      interval, interval, self, RefreshContactsTick))
  }

   scheduleRefreshContactsTick啓動定時器在interval以後,每隔interval時間,給本身發送RefreshContactsTick消息。網絡

  第三段給本身發送了RefreshContactsTick消息。感受後面兩個代碼有點重複,定時器第一個參數直接設置成0不就行了?省略了第三段代碼的調用。app

case RefreshContactsTick ⇒ sendGetContacts()

   收到RefreshContactsTick消息怎麼處理?仍是調用sendGetContacts。那請問在主構造函數裏面調用sendGetContacts幹啥呢?框架

var contactPaths: HashSet[ActorPath] =
    initialContacts.to[HashSet]
  val initialContactsSel =
    contactPaths.map(context.actorSelection)
  var contacts = initialContactsSel

   initialContactsSel、contacts、contactPaths、initialContacts是否是很類似呢?ide

  其中initialContactsSel最關鍵,這是把initialContacts給map成了ActorSelection,同時還給initialContacts發送了Identity消息。ActorPath是遠程的actor,怎麼select呢?還記得上文說過麼?必須把provider配置成remote或者cluster,爲啥?你猜。函數

case ActorIdentity(_, Some(receptionist)) ⇒
        log.info("Connected to [{}]", receptionist.path)
        scheduleRefreshContactsTick(refreshContactsInterval)
        sendBuffered(receptionist)
        context.become(active(receptionist) orElse contactPointMessages)
        connectTimerCancelable.foreach(_.cancel())
        failureDetector.heartbeat()
        self ! HeartbeatTick // will register us as active client of the selected receptionist

   收到ActorIdentity以後調用scheduleRefreshContactsTick從新設置定時器,把緩存的消息發送給receptionist ,修改當前行爲變成active。至此就能夠經過Send、SendToAll、Publish給集羣內特定的actor轉發消息了。

def active(receptionist: ActorRef): Actor.Receive = {
    case Send(path, msg, localAffinity) ⇒
      receptionist forward DistributedPubSubMediator.Send(path, msg, localAffinity)
    case SendToAll(path, msg) ⇒
      receptionist forward DistributedPubSubMediator.SendToAll(path, msg)
    case Publish(topic, msg) ⇒
      receptionist forward DistributedPubSubMediator.Publish(topic, msg)
    case HeartbeatTick ⇒
      if (!failureDetector.isAvailable) {
        log.info("Lost contact with [{}], reestablishing connection", receptionist)
        reestablish()
      } else
        receptionist ! Heartbeat
    case HeartbeatRsp ⇒
      failureDetector.heartbeat()
    case RefreshContactsTick ⇒
      receptionist ! GetContacts
    case Contacts(contactPoints) ⇒
      // refresh of contacts
      if (contactPoints.nonEmpty) {
        contactPaths = contactPoints.map(ActorPath.fromString).to[HashSet]
        contacts = contactPaths.map(context.actorSelection)
      }
      publishContactPoints()
    case _: ActorIdentity ⇒ // ok, from previous establish, already handled
    case ReceptionistShutdown ⇒
      if (receptionist == sender()) {
        log.info("Receptionist [{}] is shutting down, reestablishing connection", receptionist)
        reestablish()
      }
  }

   總結下ClusterClient的行爲,它經過配置的initialContacts給遠程的actor(集羣內的ClusterReceptionist)發送ActorSelection消息,而後在收到第一個ActorIdentity消息後,就算聯繫上了集羣。(剩下的ActorIdentity消息被忽略,其實就是最快返回的做爲聯繫點)。定時第一個返回ActorIdentity消息的ClusterReceptionist發送GetContacts消息,獲取全部的ClusterReceptionist實例的位置。那如何判斷第一個聯繫點失去聯繫了呢?看到HeartbeatTick了嗎?

val heartbeatTask = context.system.scheduler.schedule(
    heartbeatInterval, heartbeatInterval, self, HeartbeatTick)

   咱們剛纔忽略了heartbeatTask的定義,其實這是一個定時器,每隔heartbeatInterval秒給本身發送HeartbeatTick消息。其實關於在變量定義過程當中寫代碼,我是不喜歡的,不利於分析源碼的啊。

  收到HeartbeatTick消息就給receptionist發送了Heartbeat消息,在收到HeartbeatRsp後更新failureDetector當前的心跳信息。若是failureDetector檢測到失敗則調用reestablish方法,從新創建連接。

  ClusterClient的源碼就分析到這裏,下面咱們來看看Cluster內的ClusterReceptionist的實現,以前說過,咱們能夠用actorOf啓動或者ClusterReceptionist擴展來啓動。固然優先看ClusterReceptionist擴展了啊。

object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider {
  override def get(system: ActorSystem): ClusterClientReceptionist = super.get(system)

  override def lookup() = ClusterClientReceptionist

  override def createExtension(system: ExtendedActorSystem): ClusterClientReceptionist =
    new ClusterClientReceptionist(system)
}

   上面是ExtensionId的定義,很顯然它還擴展了ExtensionIdProvider,也就是說,經過配置這個Extension就能夠啓動了,無需代碼顯式的啓動。

/**
 * Extension that starts [[ClusterReceptionist]] and accompanying [[akka.cluster.pubsub.DistributedPubSubMediator]]
 * with settings defined in config section `akka.cluster.client.receptionist`.
 * The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSub]] extension.
 */
final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension

   有沒有發現關於重要的類,官方註釋都很清晰?這個擴展啓動ClusterReceptionist和DistributedPubSubMediator,而DistributedPubSubMediator由DistributedPubSub擴展啓動,關於DistributedPubSub後面再分析。

/**
   * The [[ClusterReceptionist]] actor
   */
  private val receptionist: ActorRef = {
    if (isTerminated)
      system.deadLetters
    else {
      val name = config.getString("name")
      val dispatcher = config.getString("use-dispatcher") match {
        case "" ⇒ Dispatchers.DefaultDispatcherId
        case id ⇒ id
      }
      // important to use val mediator here to activate it outside of ClusterReceptionist constructor
      val mediator = pubSubMediator
      system.systemActorOf(ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config))
        .withDispatcher(dispatcher), name)
    }
  }

  /**
   * Returns the underlying receptionist actor, particularly so that its
   * events can be observed via subscribe/unsubscribe.
   */
  def underlying: ActorRef =
    receptionist
/**
   * Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]].
   */
  private def pubSubMediator: ActorRef = DistributedPubSub(system).mediator

  ClusterClientReceptionist定義中有上面源碼,很是關鍵,它啓動了一個ClusterReceptionist,其餘源碼都是註冊和註銷服務的,咱們先忽略。

/**
 * [[ClusterClient]] connects to this actor to retrieve. The `ClusterReceptionist` is
 * supposed to be started on all nodes, or all nodes with specified role, in the cluster.
 * The receptionist can be started with the [[ClusterClientReceptionist]] or as an
 * ordinary actor (use the factory method [[ClusterReceptionist#props]]).
 *
 * The receptionist forwards messages from the client to the associated [[akka.cluster.pubsub.DistributedPubSubMediator]],
 * i.e. the client can send messages to any actor in the cluster that is registered in the
 * `DistributedPubSubMediator`. Messages from the client are wrapped in
 * [[akka.cluster.pubsub.DistributedPubSubMediator.Send]], [[akka.cluster.pubsub.DistributedPubSubMediator.SendToAll]]
 * or [[akka.cluster.pubsub.DistributedPubSubMediator.Publish]] with the semantics described in
 * [[akka.cluster.pubsub.DistributedPubSubMediator]].
 *
 * Response messages from the destination actor are tunneled via the receptionist
 * to avoid inbound connections from other cluster nodes to the client, i.e.
 * the `sender()`, as seen by the destination actor, is not the client itself.
 * The `sender()` of the response messages, as seen by the client, is `deadLetters`
 * since the client should normally send subsequent messages via the `ClusterClient`.
 * It is possible to pass the original sender inside the reply messages if
 * the client is supposed to communicate directly to the actor in the cluster.
 *
 */
final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings)
  extends Actor with ActorLogging

   加上咱們以前的分析和官方註釋,這個actor就很好理解了。ClusterClient就是發送GetContracts消息給這個actor的,ClusterReceptionist在集羣內全部節點或一組節點啓動。它能夠經過ClusterClientReceptionist這個擴展啓動,或者做爲普通的actor啓動(actorOf)。ClusterReceptionist把ClusterClient轉發的消息再吃給你信轉發給DistributedPubSubMediator或註冊的DistributedPubSubMediator(也就是咱們註冊的Service)。目標actor返回的消息經過DistributedPubSubMediator打的「洞」返回給客戶端,其實就是修改了sender。

  這個定義也能夠看出,它就是一個很是普通的actor。從源碼來看,主構造函數和preStart函數都沒有須要特別注意的地方,那就直接看receive嘍。

case GetContacts ⇒
      // Consistent hashing is used to ensure that the reply to GetContacts
      // is the same from all nodes (most of the time) and it also
      // load balances the client connections among the nodes in the cluster.
      if (numberOfContacts >= nodes.size) {
        val contacts = Contacts(nodes.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut))
        if (log.isDebugEnabled)
          log.debug("Client [{}] gets contactPoints [{}] (all nodes)", sender().path, contacts.contactPoints.mkString(","))
        sender() ! contacts
      } else {
        // using toStringWithAddress in case the client is local, normally it is not, and
        // toStringWithAddress will use the remote address of the client
        val a = consistentHash.nodeFor(sender().path.toStringWithAddress(cluster.selfAddress))
        val slice = {
          val first = nodes.from(a).tail.take(numberOfContacts)
          if (first.size == numberOfContacts) first
          else first union nodes.take(numberOfContacts - first.size)
        }
        val contacts = Contacts(slice.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut))
        if (log.isDebugEnabled)
          log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(","))
        sender() ! contacts
      }

   對GetContacts消息的處理咱們須要特別關注,畢竟ClusterClient就是發送這個消息來獲取集羣內service信息的。第一個if語句的註釋也很明白,有一個一致性hash來保證全部節點對GetContacts消息的返回都是一致的。

case msg @ (_: Send | _: SendToAll | _: Publish) ⇒
      val tunnel = responseTunnel(sender())
      tunnel ! Ping // keep alive
      pubSubMediator.tell(msg, tunnel)

   上面就是收到Send、SendToAll、Publish消息的處理邏輯。好像就是把消息發送給了pubSubMediator,這裏出現了前面註釋中說的「打洞」

def responseTunnel(client: ActorRef): ActorRef = {
    val encName = URLEncoder.encode(client.path.toSerializationFormat, "utf-8")
    context.child(encName) match {
      case Some(tunnel) ⇒ tunnel
      case None ⇒
        context.actorOf(Props(classOf[ClientResponseTunnel], client, responseTunnelReceiveTimeout), encName)
    }
  }

   它在幹啥,又建立了一個ClientResponseTunnel這個actor?把這個actor做爲service消息的返回者?而後還有一個responseTunnelReceiveTimeout超時時間?

/**
     * Replies are tunneled via this actor, child of the receptionist, to avoid
     * inbound connections from other cluster nodes to the client.
     */
    class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging {
      context.setReceiveTimeout(timeout)

      private val isAsk = {
        val pathElements = client.path.elements
        pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$")
      }

      def receive = {
        case Ping ⇒ // keep alive from client
        case ReceiveTimeout ⇒
          log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path)
          context stop self
        case msg ⇒
          client.tell(msg, Actor.noSender)
          if (isAsk)
            context stop self
      }
    }

   這個actor功能很簡單,就是給client轉發消息,這尼瑪有點太繞了啊。在本地給各個client有建立了一個代理actor啊,返回的消息都經過這個actor返回啊,爲啥不直接在服務端就把消息發送給client了呢?其實想一想這是很是合理且必要的。有可能service所在的節點,與客戶端網絡是不通的。或者爲了安全管理不能直接通訊,經過這個代理回送消息就很必要了。無論怎麼樣吧,akka的都是對的,akka的都是好的。

case Heartbeat ⇒
      if (verboseHeartbeat) log.debug("Heartbeat from client [{}]", sender().path)
      sender() ! HeartbeatRsp
      updateClientInteractions(sender())

   還有就是對客戶端發送的Heartbeat消息的處理,處理邏輯很簡單,但有一點須要注意,那就是對客戶端列表的一個維護。也就是說在每一個ClusterReceptionist都是有客戶端列表的。其實吧,這一點我是很是不贊同的。畢竟客戶端有多是海量的,光是維護這個列表就很是耗內存了。弄這個列表雖然功能上很是豐富,但容易形成OOM啊。若是客戶端很少,說明akka尚未正式被你們所熟知或者被大公司使用啊。

  好了,ClusterClient就分析到這裏了。聰明的讀者可能會問,我尚未看到消息是如何經過ClusterReceptionist發送給實際的服務actor啊,pubSubMediator.tell(msg, tunnel)這段代碼是如何路由消息的呢?嗯,確實,不過別急,這個會在下一章節(DistributedPubSubMediator)講解。畢竟官方在ClusterClient的文檔中,直接推薦用DistributedPubSubMediator來實現相似的功能。我以爲吧,這又是一個坑,既然你都推薦DistributedPubSubMediator了,還提供ClusterClient模塊幹啥呢?直接廢棄掉啊。

相關文章
相關標籤/搜索