Akka Source Code Analysis: A Closer Look at ActorRef & ActorPath

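In the previous section we looked closely at ActorRef and its related concepts, but the relationship between ActorRef and ActorPath still needs further analysis. The official documentation actually explains it most clearly.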

"A path in an actor system represents a 'place' which might be occupied by a living actor. Initially (apart from system initialized actors) a path is empty. When actorOf() is called it assigns an incarnation of the actor described by the passed Props to the given path. An actor incarnation is identified by the path and a UID."

"A restart only swaps the Actor instance defined by the Props but the incarnation and hence the UID remains the same. As long as the incarnation is same, you can keep using the same ActorRef. Restart is handled by the Supervision Strategy of actor’s parent actor, and there is more discussion about what restart means.

The lifecycle of an incarnation ends when the actor is stopped. At that point the appropriate lifecycle events are called and watching actors are notified of the termination. After the incarnation is stopped, the path can be reused again by creating an actor with actorOf(). In this case the name of the new incarnation will be the same as the previous one but the UIDs will differ. An actor can be stopped by the actor itself, another actor or the ActorSystem."

"An ActorRef always represents an incarnation (path and UID) not just a given path. Therefore if an actor is stopped and a new one with the same name is created an ActorRef of the old incarnation will not point to the new one."

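Let us summarize the official explanation. Whenever a developer-defined Actor is created through actorOf, it is assigned a UID. The actor's path (its position in the hierarchy) plus that UID uniquely identifies one actor incarnation, and that incarnation is what an ActorRef refers to.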

@tailrec final def newUid(): Int = {
    // Note that this uid is also used as hashCode in ActorRef, so be careful
    // to not break hashing if you change the way uid is generated
    val uid = ThreadLocalRandom.current.nextInt()
    if (uid == undefinedUid) newUid
    else uid
  }

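Above is the method that generates the UID. It is simply a random Int, drawn again if it happens to equal undefinedUid. A random value does not strictly guarantee uniqueness, but it makes it very unlikely that two incarnations at the same path receive the same UID. As the comment in the code notes, this uid is also used as the hashCode of the ActorRef.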

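Over the whole lifecycle of an Actor, that is, as long as it has not been terminated, the UID does not change, even if the Actor is restarted. Note what restart means here: the actor threw an exception while processing a message, the supervisor handled the failure and called the restart method. This is completely different from stopping the Actor and then creating it again with actorOf. Recreating an actor under the same name with actorOf changes the UID, so an existing ActorRef will not point to the newly created Actor, even though the Actor's path (its position in the hierarchy) is the same.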

  final override def hashCode: Int = {
    if (path.uid == ActorCell.undefinedUid) path.hashCode
    else path.uid
  }

  /**
   * Equals takes path and the unique id of the actor cell into account.
   */
  final override def equals(that: Any): Boolean = that match {
    case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path
    case _               ⇒ false
  }

Looking at how abstract class ActorRef defines hashCode and equals makes the role of the UID clear, and it matches our analysis. Now let us see how ActorPath defines equals.

  override def equals(other: Any): Boolean = {
    @tailrec
    def rec(left: ActorPath, right: ActorPath): Boolean =
      if (left eq right) true
      else if (left.isInstanceOf[RootActorPath]) left equals right
      else if (right.isInstanceOf[RootActorPath]) right equals left
      else left.name == right.name && rec(left.parent, right.parent)

    other match {
      case p: ActorPath ⇒ rec(this, p)
      case _            ⇒ false
    }
  }

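A brief look at the code above shows that ActorPath equality does not involve the UID at all. Apart from the reference-equality shortcut (left eq right) and the delegation to RootActorPath's own equals at the root, it simply recurses up the parent chain and compares the name of each path element. ActorPath does define a uid value, and the Actor's uid is in fact stored in the ActorPath, but that uid is an internal field that is only exposed for ActorRef to use.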
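The difference between the two equals definitions can be sketched with a small stand-alone model. ModelPath, ModelRef and UndefinedUid are hypothetical names invented for this illustration; they only mimic the comparison rules quoted above and are not Akka's real classes.

```scala
// Simplified stand-ins for illustration only, NOT Akka's ActorPath/ActorRef.
object RefEqualityModel {
  // Assumed sentinel meaning "no uid", playing the role of undefinedUid.
  val UndefinedUid = 0

  // Path equality looks only at the chain of names; the uid is ignored.
  final class ModelPath(val elements: List[String], val uid: Int) {
    override def equals(other: Any): Boolean = other match {
      case p: ModelPath => elements == p.elements
      case _            => false
    }
    override def hashCode: Int = elements.hashCode
    override def toString: String = elements.mkString("/", "/", "")
  }

  // Ref equality requires the same path AND the same uid.
  final class ModelRef(val path: ModelPath) {
    override def equals(other: Any): Boolean = other match {
      case r: ModelRef => path.uid == r.path.uid && path == r.path
      case _           => false
    }
    // The uid doubles as the hash code unless it is undefined.
    override def hashCode: Int =
      if (path.uid == UndefinedUid) path.hashCode else path.uid
  }

  // Two incarnations living at the same path, with different uids.
  val oldRef = new ModelRef(new ModelPath(List("user", "worker"), uid = 42))
  val newRef = new ModelRef(new ModelPath(List("user", "worker"), uid = 77))
}
```

In this model oldRef.path == newRef.path holds while oldRef == newRef does not, which is the behavior the official documentation describes for a stopped and recreated actor.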

Now let us look at how an Actor is restarted.

 final def invoke(messageHandle: Envelope): Unit = {
    val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
    try {
      currentMessage = messageHandle
      if (influenceReceiveTimeout)
        cancelReceiveTimeout()
      messageHandle.message match {
        case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
        case msg                      ⇒ receiveMessage(msg)
      }
      currentMessage = null // reset current message after successful invocation
    } catch handleNonFatalOrInterruptedException { e ⇒
      handleInvokeFailure(Nil, e)
    } finally {
      if (influenceReceiveTimeout)
        checkReceiveTimeout // Reschedule receive timeout
    }
  }

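By now it should be clear what invoke does: every user message sent to the mailbox is processed by a call to invoke. If receiveMessage throws an exception and the error is not fatal, handleInvokeFailure is called to deal with it.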

final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
    // prevent any further messages to be processed until the actor has been restarted
    if (!isFailed) try {
      suspendNonRecursive()
      // suspend children
      val skip: Set[ActorRef] = currentMessage match {
        case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) }
        case _                                ⇒ { setFailed(self); Set.empty }
      }
      suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
      t match {
        // tell supervisor
        case _: InterruptedException ⇒
          // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
        case _ ⇒
          // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          parent.sendSystemMessage(Failed(self, t, uid))
      }
    } catch handleNonFatalOrInterruptedException { e ⇒
      publish(Error(e, self.path.toString, clazz(actor),
        "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
      try children foreach stop
      finally finishTerminate()
    }
  }

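We have analyzed handleInvokeFailure before as well. After suspending the actor and its children, it sends a Failed message to the parent actor, indicating that a child actor has failed.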

Failed is a system message, so it is processed in invokeAll, which calls handleFailure to deal with the exception.

 final protected def handleFailure(f: Failed): Unit = {
    currentMessage = Envelope(f, f.child, system)
    getChildByRef(f.child) match {
      /*
       * only act upon the failure, if it comes from a currently known child;
       * the UID protects against reception of a Failed from a child which was
       * killed in preRestart and re-created in postRestart
       */
      case Some(stats) if stats.uid == f.uid ⇒
        if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
      case Some(stats) ⇒
        publish(Debug(self.path.toString, clazz(actor),
          "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
      case None ⇒
        publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
    }
  }

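handleFailure uses the ActorRef of the failed Actor to look up the corresponding child record and checks that its uid matches the uid carried by the Failed message. It then asks the supervisor strategy of the current (parent) actor how to handle the exception. If the strategy returns false, meaning it does not handle the failure, the cause is thrown again so that it is escalated further up.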

  def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
    val directive = decider.applyOrElse(cause, escalateDefault)
    directive match {
      case Resume ⇒
        logFailure(context, child, cause, directive)
        resumeChild(child, cause)
        true
      case Restart ⇒
        logFailure(context, child, cause, directive)
        processFailure(context, true, child, cause, stats, children)
        true
      case Stop ⇒
        logFailure(context, child, cause, directive)
        processFailure(context, false, child, cause, stats, children)
        true
      case Escalate ⇒
        logFailure(context, child, cause, directive)
        false
    }
  }

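The current supervisor strategy decides how the exception is handled. The default supervisor strategy is a OneForOneStrategy whose default decider answers Restart for an ordinary Exception (it answers Stop for ActorInitializationException, ActorKilledException and DeathPactException, and escalates other Throwables), so in the common case processFailure is called with restart = true.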
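How a decider maps an exception to a directive, with Escalate as the fallback, can be illustrated with a stand-alone sketch. Everything here (DeciderModel, the Directive objects, the particular exception-to-directive mapping) is a hypothetical model written for this article, not Akka's SupervisorStrategy; only the applyOrElse pattern is taken from the code above.

```scala
// Illustrative model only; the directive names mirror Akka's but are local objects.
object DeciderModel {
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  type Decider = PartialFunction[Throwable, Directive]

  // A hypothetical decider chosen for the example, not Akka's default one.
  val decider: Decider = {
    case _: ArithmeticException      => Resume
    case _: IllegalArgumentException => Stop
    case _: Exception                => Restart
  }

  // Anything the decider does not cover is escalated.
  private val escalateDefault: Throwable => Directive = _ => Escalate

  // Returns the chosen directive and whether the failure was handled here
  // (true) or must be passed on to the next supervisor (false).
  def handle(cause: Throwable): (Directive, Boolean) = {
    val directive = decider.applyOrElse(cause, escalateDefault)
    (directive, directive != Escalate)
  }
}
```

For instance, DeciderModel.handle(new RuntimeException("boom")) yields (Restart, true), while a Throwable that is not an Exception, such as new Error("x"), falls through to (Escalate, false).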

def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
    if (restart && stats.requestRestartPermission(retriesWindow))
      restartChild(child, cause, suspendFirst = false)
    else
      context.stop(child) //TODO optimization to drop child here already?
  }

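Above is OneForOneStrategy's implementation of processFailure. If a restart was requested and the child still has restart permission within the retries window, it calls restartChild; otherwise it stops the child.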
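The "restart while permission remains, otherwise stop" decision can be sketched as follows. RestartBudget is a hypothetical, count-only simplification invented for this example: it ignores the time window that the real requestRestartPermission also takes into account.

```scala
// Simplified model for illustration; not Akka's ChildRestartStats.
object RestartBudgetModel {
  sealed trait Outcome
  case object Restarted extends Outcome
  case object Stopped extends Outcome

  // Count-only budget (assumption: no time window is modelled).
  // None means "unlimited restarts".
  final class RestartBudget(maxRetries: Option[Int]) {
    private var used = 0

    def requestPermission(): Boolean = maxRetries match {
      case Some(max) if max < 1 => false
      case Some(max) =>
        used += 1
        used <= max
      case None => true
    }
  }

  // Mirrors the shape of processFailure: restart only if it was requested
  // and the budget still allows it, otherwise stop the child.
  def processFailure(restart: Boolean, budget: RestartBudget): Outcome =
    if (restart && budget.requestPermission()) Restarted else Stopped
}
```

With a budget of Some(2), the first two failures lead to Restarted and the third to Stopped; with restart = false the child is stopped regardless of the budget.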

  final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = {
    val c = child.asInstanceOf[InternalActorRef]
    if (suspendFirst) c.suspend()
    c.restart(cause)
  }

restartChild in turn calls the restart method of the failed Actor, and it does so through the ActorRef. From the earlier analysis we know that this ActorRef is ultimately a RepointableActorRef.

def restart(cause: Throwable): Unit = underlying.restart(cause)

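Above is the definition of restart. It simply delegates to underlying.restart, which is rather roundabout. What is underlying? It is the ActorCell that the ActorRef refers to. Yet if we search the ActorCell code itself, there is no implementation of restart. We find it instead in the Dispatch trait that ActorCell mixes in.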

final def restart(cause: Throwable): Unit = try dispatcher.systemDispatch(this, Recreate(cause)) catch handleException

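This is simple: it uses the dispatcher to send a Recreate system message to the current ActorCell. From the earlier look at invokeAll we know that receiving Recreate leads to faultRecreate. We have analyzed that function too: it calls aroundPreRestart on the existing Actor and then calls finishRecreate.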

  private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
    // need to keep a snapshot of the surviving children before the new actor instance creates new ones
    val survivors = children

    try {
      try resumeNonRecursive()
      finally clearFailed() // must happen in any case, so that failure is propagated

      val freshActor = newActor()
      actor = freshActor // this must happen before postRestart has a chance to fail
      if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.

      freshActor.aroundPostRestart(cause)
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))

      // only after parent is up and running again do restart the children which were not stopped
      survivors foreach (child ⇒
        try child.asInstanceOf[InternalActorRef].restart(cause)
        catch handleNonFatalOrInterruptedException { e ⇒
          publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
        })
    } catch handleNonFatalOrInterruptedException { e ⇒
      clearActorFields(actor, recreate = false) // in order to prevent preRestart() from happening again
      handleInvokeFailure(survivors, PostRestartException(self, e, cause))
    }
  }

finishRecreate calls newActor to produce a new Actor instance, calls aroundPostRestart on that instance, and finally calls restart on each surviving child actor, if there are any.

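Throughout the restart process we found nothing that modifies or updates the ActorPath or the ActorRef, and nothing that changes the uid. This means that a restart invalidates neither the ActorRef nor the ActorPath. Recall the actorOf process: one of its steps calls makeChild, which calls newUid to generate a new uid for the ActorRef. So once an Actor has been stopped and then recreated with actorOf, the old ActorRef is of course no longer valid.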

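A simple way to think about it is: ActorRef = "ActorPathString" + UID. The Actor class a developer writes is a static concept. When it is created through actorOf, an Actor instance comes into being. If that Actor fails for some reason and the system restarts it, the system creates a new Actor instance, but the UID stays the same, so the ActorRef still points to the actor at that path. ActorPath identifies the Actor's position in the tree. It can be used to find whatever instance currently lives at that path, but it does not care whether that instance's UID is the same as before.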
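The whole argument can be condensed into a toy model. Parent, Cell, Ref and Behavior are hypothetical names made up for this sketch and do not correspond to Akka's internals. The uid here comes from a counter rather than a random number, purely so that the example is deterministic.

```scala
import scala.collection.mutable

// Toy model for illustration only; not Akka's ActorCell or ActorRef.
object IncarnationModel {
  final class Behavior                          // stands in for a user-defined Actor instance
  final case class Ref(path: String, uid: Int)  // "ActorRef = path + UID"

  // One incarnation: the uid is fixed, the actor instance can be swapped.
  final class Cell(val uid: Int, var actor: Behavior)

  final class Parent(parentPath: String) {
    private val children = mutable.Map.empty[String, Cell]
    private var lastUid = 0

    // Assumption: a counter instead of a random Int, to keep the demo deterministic.
    private def newUid(): Int = { lastUid += 1; lastUid }

    // Creating a child occupies the path with a fresh incarnation and uid.
    def actorOf(name: String): Ref = {
      require(!children.contains(name), s"actor name [$name] is not unique")
      val cell = new Cell(newUid(), new Behavior)
      children(name) = cell
      Ref(s"$parentPath/$name", cell.uid)
    }

    // Restart swaps the actor instance but keeps the cell and its uid.
    def restart(name: String): Unit =
      children.get(name).foreach(cell => cell.actor = new Behavior)

    // Stop ends the incarnation and frees the path for reuse.
    def stop(name: String): Unit = { children.remove(name); () }

    // A ref is valid only while the same incarnation occupies its path.
    def isValid(ref: Ref): Boolean = {
      val name = ref.path.stripPrefix(parentPath + "/")
      children.get(name).exists(_.uid == ref.uid)
    }

    def currentInstance(name: String): Option[Behavior] =
      children.get(name).map(_.actor)
  }
}
```

After a restart, currentInstance returns a different Behavior object while isValid still accepts the original Ref. After stop followed by actorOf with the same name, the new Ref has the same path but a different uid, and the original Ref is rejected.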
