Akka中的Message Stash






object SomeApp {
  ..... //程序初始化 ActorSystem、ExecutorService ....
  val clusterhealthActor = system.actorOf(Props[AkkaClusterHealthActor], "clusterhealthActor") 
  def main(args: Array[String]): Unit = {
    val router = 
      path("healthState"){ //akka-http dsl
          imperativelyComplete {
            httpCtx =>
              clusterhealthActor ! httpCtx //向該actor發送http請求實例
class AkkaClusterHealthActor extends Actor with Stash {

  val cluster = Cluster(context.system) //獲取集羣狀態信息,包括節點的列表、地址、狀態等等

  override def preStart(): Unit = context.become(waitingForHttpRequest) //actor初始化時

  override def receive: Receive = {
    case _ =>

  def waitingForHttpRequest: Receive = {
    case httpCtx: ImperativeRequestContext => //等待http請求
      val members = cluster.state.members.toList //包含自身節點
      members.foreach { member =>
        context.actorSelection(s"${member.address}/user/akkaMonitorActor") ! "FetchState" //請求集羣的節點健康狀態 ,member.address akka.tcp://itoa@
      context.become(waitingForAkkaNode(httpCtx, members.length, Nil)) //利用FSM避免了局部變量的產生,members.length 須要等待幾個節點的返回,有可能超時
      context.setReceiveTimeout(3 seconds)

  def waitingForAkkaNode(ctx: ImperativeRequestContext, memberNum: Int, results: List[StateResult]): Receive = { //集羣個數 用於接收到的信息個數
    case nodeStateResult: StateResult =>
      (memberNum - 1) match {
        case 0 => //完成任務 取消超時設置
          unstashAll() //完成該請求,處理其餘請求
        case _ => //仍有部分節點沒有返回健康狀態結果,繼續等待
          context.become(waitingForAkkaNode(ctx, memberNum - 1, nodeStateResult :: results)) //這裏的代碼有點相似於 `尾遞歸`
    case ReceiveTimeout =>
      context.setReceiveTimeout(Duration.Inf) //完成任務 取消超時設置
      ctx.complete(results) //返回部分結果,完成http請求
      unstashAll() //完成該次請求,釋放暫存的http請求,處理其餘請求
    case _ =>
      stash() //處理某個請求中,可是收到了其餘http請求,暫存消息


trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics]


trait UnrestrictedStash extends Actor with StashSupport {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    try unstashAll() finally super.preRestart(reason, message)
  override def postStop(): Unit = try unstashAll() finally super.postStop()
private[akka] trait StashSupport {

  private[akka] def context: ActorContext

  private[akka] def self: ActorRef

  private var theStash = Vector.empty[Envelope]

  // ActorContext是ActorCell的子類
  private def actorCell = context.asInstanceOf[ActorCell] 
  private val capacity: Int =
    context.system.mailboxes.stashCapacity(context.props.dispatcher, context.props.mailbox)

  private[akka] val mailbox: DequeBasedMessageQueueSemantics = {
    actorCell.mailbox.messageQueue match {
      case queue: DequeBasedMessageQueueSemantics ⇒ queue
      case other ⇒ throw ActorInitializationException
  //1. 獲取當前的消息,不能將同一個消息存兩次(不能調用兩次`stash()`函數,`eq`用於比較對象之間的內存地址)
  //2. capacity默認-1,存儲容量不限
  //3. 暫存的消息被添加到`theStash`集合的頭部(遍歷或者迭代時,從尾向頭遍歷)
  def stash(): Unit = {
    val currMsg = actorCell.currentMessage
    if (theStash.nonEmpty && (currMsg eq theStash.last))
      throw new IllegalStateException(s"Can't stash the same message $currMsg more than once")
    if (capacity <= 0 || theStash.size < capacity) 
      theStash = theStash :+ currMsg 
    else throw new StashOverflowException

  private[akka] def prepend(others: immutable.Seq[Envelope]): Unit =
    theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)
  def unstashAll(): Unit = {
    try {
      val i = theStash.reverseIterator 
      while (i.hasNext) enqueueFirst(i.next())
    } finally {
      theStash = Vector.empty[Envelope]
  private def enqueueFirst(envelope: Envelope): Unit = {
    mailbox.enqueueFirst(self, envelope)
    envelope.message match {
      case Terminated(ref) ⇒ actorCell.terminatedQueuedFor(ref)
      case _               ⇒

    theStash = theStash :+ currMsgtheStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)val i = theStash.reverseIterator這幾句關於集合操做與迭代的代碼確保了:這些被釋放出來的消息依然按照它們被接收到順序依次處理,先來後到規則要遵照。函數
