Akka中的Message Stash

    Akka是JVM平臺上構建高併發、分佈式以及高度容錯應用的工具包,其基於Actor模型實現了m:n的線程模式(m大於n,m是actor實例的個數,n是線程數量)。akka程序中每一個actor實例都扮演一種角色或者實現某一個功能,每個actor實例都有對應的消息郵箱,actor從本身的郵箱中消費消息,actor之間經過向對方的郵箱發送消息互相交流。經常遇到這樣一種情形:某個actor須要在A和B兩種狀態下切換,以及分別在每種狀態下接收各自對應的信息Messsage-A類型和Messsage-B類型的消息實例。因爲當處於A狀態時,仍然會收到本該處於B狀態時處理的Messsage-B類型的消息實例;當處於B狀態時,亦是如此。
    解決思路是把當前沒法處理的消息暫存(stash),在切換對應的狀態前把全部的暫存的消息添加到消息隊列最前方,這些被釋放出來的消息仍然按照它們被接收到順序依次處理。node

簡單應用

    實現一個akka集羣監控功能(AkkaClusterHealthActor),等待http請求,在響應請求時向集羣中全部其餘節點請求健康狀態,等待全部節點所有返回健康信息,彙總信息後,再完成http請求,最後回到最初狀態:等待http請求。併發

編碼分析

    以上問題能夠大體分爲兩種狀態:等待http請求、等待其餘節點的健康狀態,可是在等待其餘節點健康狀態時,仍會收到http請求,須要暫存http請求,稍後釋放作進一步處理。tcp

object SomeApp {
  ..... //程序初始化 ActorSystem、ExecutorService ....
  val clusterhealthActor = system.actorOf(Props[AkkaClusterHealthActor], "clusterhealthActor") 
  ...
  def main(args: Array[String]): Unit = {
    ...
    val router = 
      path("healthState"){ //akka-http dsl
        get{
          imperativelyComplete {
            httpCtx =>
              clusterhealthActor ! httpCtx //向該actor發送http請求實例
          }
        }
      }
    ...
  }
}
class AkkaClusterHealthActor extends Actor with Stash {

  val cluster = Cluster(context.system) //獲取集羣狀態信息,包括節點的列表、地址、狀態等等

  override def preStart(): Unit = context.become(waitingForHttpRequest) //actor初始化時

  override def receive: Receive = {
    case _ =>
  }

  def waitingForHttpRequest: Receive = {
    case httpCtx: ImperativeRequestContext => //等待http請求
      val members = cluster.state.members.toList //包含自身節點
      members.foreach { member =>
        context.actorSelection(s"${member.address}/user/akkaMonitorActor") ! "FetchState" //請求集羣的節點健康狀態 ,member.address akka.tcp://itoa@127.0.0.1:2551
      }
      context.become(waitingForAkkaNode(httpCtx, members.length, Nil)) //利用FSM避免了局部變量的產生,members.length 須要等待幾個節點的返回,有可能超時
      context.setReceiveTimeout(3 seconds)
  }

  def waitingForAkkaNode(ctx: ImperativeRequestContext, memberNum: Int, results: List[StateResult]): Receive = { //集羣個數 用於接收到的信息個數
    case nodeStateResult: StateResult =>
      (memberNum - 1) match {
        case 0 => //完成任務 取消超時設置
          context.setReceiveTimeout(Duration.Inf)
          ctx.complete(nodeStateResult)
          context.become(waitingForHttpRequest)
          unstashAll() //完成該請求,處理其餘請求
        case _ => //仍有部分節點沒有返回健康狀態結果,繼續等待
          context.become(waitingForAkkaNode(ctx, memberNum - 1, nodeStateResult :: results)) //這裏的代碼有點相似於 `尾遞歸`
      }
    case ReceiveTimeout =>
      context.setReceiveTimeout(Duration.Inf) //完成任務 取消超時設置
      ctx.complete(results) //返回部分結果,完成http請求
      context.become(waitingForHttpRequest)
      unstashAll() //完成該次請求,釋放暫存的http請求,處理其餘請求
    case _ =>
      stash() //處理某個請求中,可是收到了其餘http請求,暫存消息
  }
}

    以上代碼大體實現了具體邏輯,actor在waitingForHttpRequestwaitingForAkkaNode兩種狀態之間來回切換,讓咱們來看看akka中關於該功能的源碼。分佈式

...
trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics]

    重啓以前(並非重啓以後)應當釋放全部暫存的消息,一樣的,並且須要在關閉以後釋放暫存的消息。ide

...
trait UnrestrictedStash extends Actor with StashSupport {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    try unstashAll() finally super.preRestart(reason, message)
  }
  override def postStop(): Unit = try unstashAll() finally super.postStop()
}
private[akka] trait StashSupport {

  private[akka] def context: ActorContext

  private[akka] def self: ActorRef

  //基於`scala.collection.immutable.Vector`,當存儲的消息數量很大時,也能夠得到很好的性能
  private var theStash = Vector.empty[Envelope]

  // ActorContext是ActorCell的子類
  private def actorCell = context.asInstanceOf[ActorCell] 
  
  //可爲暫存的消息設置數量限制,經過`stash-capacity`配置,默認爲-1,即容量不限
  private val capacity: Int =
    context.system.mailboxes.stashCapacity(context.props.dispatcher, context.props.mailbox)

  private[akka] val mailbox: DequeBasedMessageQueueSemantics = {
    actorCell.mailbox.messageQueue match {
      case queue: DequeBasedMessageQueueSemantics ⇒ queue
      case other ⇒ throw ActorInitializationException
    }
  }
  
  //1. 獲取當前的消息,不能將同一個消息存兩次(不能調用兩次`stash()`函數,`eq`用於比較對象之間的內存地址)
  //2. capacity默認-1,存儲容量不限
  //3. 暫存的消息被添加到`theStash`集合的頭部(遍歷或者迭代時,從尾向頭遍歷)
  def stash(): Unit = {
    val currMsg = actorCell.currentMessage
    if (theStash.nonEmpty && (currMsg eq theStash.last))
      throw new IllegalStateException(s"Can't stash the same message $currMsg more than once")
    if (capacity <= 0 || theStash.size < capacity) 
      theStash = theStash :+ currMsg 
    else throw new StashOverflowException
  }

   //當`others`很小時,而`theStash`很大,該方法很高效
   //從頭向尾依次將`others`中元素添加到`theStash`尾
  private[akka] def prepend(others: immutable.Seq[Envelope]): Unit =
    theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)
    ...
  
  //`reverseIterator`反序輸出`theStash`中的每一封郵件,依次添加到郵箱的頭部
  def unstashAll(): Unit = {
    try {
      val i = theStash.reverseIterator 
      while (i.hasNext) enqueueFirst(i.next())
    } finally {
      theStash = Vector.empty[Envelope]
    }
  }
  ...
  
  //把該封信放在郵箱的第一個位置
  private def enqueueFirst(envelope: Envelope): Unit = {
    mailbox.enqueueFirst(self, envelope)
    envelope.message match {
      case Terminated(ref) ⇒ actorCell.terminatedQueuedFor(ref)
      case _               ⇒
    }
  }
}

    基於scala.collection.immutable.Vector的實現,在存儲數量很大時,仍然能夠得到很好的性能。
    同一個消息不能被暫存兩次,不然程序拋出IllegalStateException兩次。
    theStash = theStash :+ currMsgtheStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)val i = theStash.reverseIterator這幾句關於集合操做與迭代的代碼確保了:這些被釋放出來的消息依然按照它們被接收到順序依次處理,先來後到規則要遵照。函數

相關文章
相關標籤/搜索