Akka是JVM平臺上構建高併發、分佈式以及高度容錯應用的工具包,其基於Actor模型實現了m:n的線程模式(m大於n,m是actor實例的個數,n是線程數量)。akka程序中每一個actor實例都扮演一種角色或者實現某一個功能,每個actor實例都有對應的消息郵箱,actor從本身的郵箱中消費消息,actor之間經過向對方的郵箱發送消息互相交流。經常遇到這樣一種情形:某個actor須要在A和B兩種狀態下切換,以及分別在每種狀態下接收各自對應的信息Messsage-A
類型和Messsage-B
類型的消息實例。因爲當處於A狀態時,仍然會收到本該處於B狀態時處理的Messsage-B
類型的消息實例;當處於B狀態時,亦是如此。
解決思路是把當前沒法處理的消息暫存(stash
),在切換對應的狀態前把全部的暫存的消息添加到消息隊列最前方,這些被釋放出來的消息仍然按照它們被接收到順序依次處理。node
實現一個akka集羣監控功能(AkkaClusterHealthActor
),等待http請求,在響應請求時向集羣中全部其餘節點請求健康狀態,等待全部節點所有返回健康信息,彙總信息後,再完成http請求,最後回到最初狀態:等待http請求。併發
以上問題能夠大體分爲兩種狀態:等待http請求、等待其餘節點的健康狀態,可是在等待其餘節點健康狀態時,仍會收到http請求,須要暫存http請求,稍後釋放作進一步處理。tcp
object SomeApp { ..... //程序初始化 ActorSystem、ExecutorService .... val clusterhealthActor = system.actorOf(Props[AkkaClusterHealthActor], "clusterhealthActor") ... def main(args: Array[String]): Unit = { ... val router = path("healthState"){ //akka-http dsl get{ imperativelyComplete { httpCtx => clusterhealthActor ! httpCtx //向該actor發送http請求實例 } } } ... } }
class AkkaClusterHealthActor extends Actor with Stash { val cluster = Cluster(context.system) //獲取集羣狀態信息,包括節點的列表、地址、狀態等等 override def preStart(): Unit = context.become(waitingForHttpRequest) //actor初始化時 override def receive: Receive = { case _ => } def waitingForHttpRequest: Receive = { case httpCtx: ImperativeRequestContext => //等待http請求 val members = cluster.state.members.toList //包含自身節點 members.foreach { member => context.actorSelection(s"${member.address}/user/akkaMonitorActor") ! "FetchState" //請求集羣的節點健康狀態 ,member.address akka.tcp://itoa@127.0.0.1:2551 } context.become(waitingForAkkaNode(httpCtx, members.length, Nil)) //利用FSM避免了局部變量的產生,members.length 須要等待幾個節點的返回,有可能超時 context.setReceiveTimeout(3 seconds) } def waitingForAkkaNode(ctx: ImperativeRequestContext, memberNum: Int, results: List[StateResult]): Receive = { //集羣個數 用於接收到的信息個數 case nodeStateResult: StateResult => (memberNum - 1) match { case 0 => //完成任務 取消超時設置 context.setReceiveTimeout(Duration.Inf) ctx.complete(nodeStateResult) context.become(waitingForHttpRequest) unstashAll() //完成該請求,處理其餘請求 case _ => //仍有部分節點沒有返回健康狀態結果,繼續等待 context.become(waitingForAkkaNode(ctx, memberNum - 1, nodeStateResult :: results)) //這裏的代碼有點相似於 `尾遞歸` } case ReceiveTimeout => context.setReceiveTimeout(Duration.Inf) //完成任務 取消超時設置 ctx.complete(results) //返回部分結果,完成http請求 context.become(waitingForHttpRequest) unstashAll() //完成該次請求,釋放暫存的http請求,處理其餘請求 case _ => stash() //處理某個請求中,可是收到了其餘http請求,暫存消息 } }
以上代碼大體實現了具體邏輯,actor在waitingForHttpRequest
和waitingForAkkaNode
兩種狀態之間來回切換,讓咱們來看看akka中關於該功能的源碼。分佈式
... trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics]
重啓以前(並非重啓以後)應當釋放全部暫存的消息,一樣的,並且須要在關閉以後釋放暫存的消息。ide
... trait UnrestrictedStash extends Actor with StashSupport { override def preRestart(reason: Throwable, message: Option[Any]): Unit = { try unstashAll() finally super.preRestart(reason, message) } override def postStop(): Unit = try unstashAll() finally super.postStop() }
private[akka] trait StashSupport { private[akka] def context: ActorContext private[akka] def self: ActorRef //基於`scala.collection.immutable.Vector`,當存儲的消息數量很大時,也能夠得到很好的性能 private var theStash = Vector.empty[Envelope] // ActorContext是ActorCell的子類 private def actorCell = context.asInstanceOf[ActorCell] //可爲暫存的消息設置數量限制,經過`stash-capacity`配置,默認爲-1,即容量不限 private val capacity: Int = context.system.mailboxes.stashCapacity(context.props.dispatcher, context.props.mailbox) private[akka] val mailbox: DequeBasedMessageQueueSemantics = { actorCell.mailbox.messageQueue match { case queue: DequeBasedMessageQueueSemantics ⇒ queue case other ⇒ throw ActorInitializationException } } //1. 獲取當前的消息,不能將同一個消息存兩次(不能調用兩次`stash()`函數,`eq`用於比較對象之間的內存地址) //2. capacity默認-1,存儲容量不限 //3. 暫存的消息被添加到`theStash`集合的頭部(遍歷或者迭代時,從尾向頭遍歷) def stash(): Unit = { val currMsg = actorCell.currentMessage if (theStash.nonEmpty && (currMsg eq theStash.last)) throw new IllegalStateException(s"Can't stash the same message $currMsg more than once") if (capacity <= 0 || theStash.size < capacity) theStash = theStash :+ currMsg else throw new StashOverflowException } //當`others`很小時,而`theStash`很大,該方法很高效 //從頭向尾依次將`others`中元素添加到`theStash`尾 private[akka] def prepend(others: immutable.Seq[Envelope]): Unit = theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s) ... //`reverseIterator`反序輸出`theStash`中的每一封郵件,依次添加到郵箱的頭部 def unstashAll(): Unit = { try { val i = theStash.reverseIterator while (i.hasNext) enqueueFirst(i.next()) } finally { theStash = Vector.empty[Envelope] } } ... //把該封信放在郵箱的第一個位置 private def enqueueFirst(envelope: Envelope): Unit = { mailbox.enqueueFirst(self, envelope) envelope.message match { case Terminated(ref) ⇒ actorCell.terminatedQueuedFor(ref) case _ ⇒ } } }
基於scala.collection.immutable.Vector
的實現,在存儲數量很大時,仍然能夠得到很好的性能。
同一個消息不能被暫存兩次,不然程序拋出IllegalStateException
兩次。
theStash = theStash :+ currMsg
、theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)
和val i = theStash.reverseIterator
這幾句關於集合操做與迭代的代碼確保了:這些被釋放出來的消息依然按照它們被接收到順序依次處理,先來後到
規則要遵照。函數