在某項目裏,有個 actor 須要作一些持久化的操做,這些操做耗時比較久,理應使用異步的代碼來寫,可是需求又強調每次只能作一個持久化操做,後來的請求應該等待。一個顯然的作法是阻塞式的寫,這樣就能比較簡單的實現順序花操做。api
代碼寫完之後,我以爲在 actor 中 block 不夠完美,就想其餘的解決方案。實際上,藉助 akka actor 的一些函數,能夠實如今不阻塞的狀況下實現順序執行請求的功能的。這種辦法的核心是使用 become, unbecome 函數:actor 設置兩種狀態 free 和 busy,當 free 的時,處理消息,當 busy 時,暫時將消息存儲起來,處理消息後,給 actor 返回 done 指令,actor 的狀態從新返回到 free,準備處理下一個請求。具體的實現又有不少細節能夠考慮,好比當 busy 時到來的請求存儲到哪裏,是 stash 起來仍是在 actor 內部維護一個 queue。請求的處理邏輯是寫在 actor 內部,仍是借鑑 cameo 模式,再建立一個 actor。異步
網上已有一種實現,我看了下,以爲應該沒有問題。只不過 actor 內部維護了一個 queue,這可能會形成 actor 死亡後重啓數據丟失的狀況。更好的辦法應該是 cameo 模式建立新的 actor 來處理可能出現異常(危險)的工做,其次是把 actor 的 mailbox 當作那個 queue,不要本身維護,按照 doc 縮寫,actor 重啓後,mailbox 的消息不會丟失。函數
package actors import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import akka.actor.{ Actor, ActorRef } import play.api.libs.concurrent.Akka import play.api.Logger import play.api.Play.current trait SequentialActor { this: Actor => import SequentialActor._ // Actor defines type Receive as PartialFunction[Any, Unit] type ReceiveAsync = PartialFunction[Any, Future[_]] private val queue = scala.collection.mutable.Queue[Job]() private def enqueue(job: Job): Unit = queue enqueue job private def dequeue: Option[Job] = if (queue.isEmpty) None else Some(queue.dequeue) private var _senderAsync: ActorRef = _ def senderAsync = _senderAsync def receive: Receive = { case msg => context become busy process(Job(msg, sender)) } def busy: Receive = { case Done => dequeue match { case None => context.unbecome case Some(job) => process(job) } case msg => enqueue(Job(msg, sender)) } def process(job: Job) { _senderAsync = job.sender (receiveAsync orElse fallback)(job.msg).onComplete { _ => self ! Done } } def receiveAsync: ReceiveAsync def fallback: ReceiveAsync = { case msg => Logger.error(s"Unhandled message: $msg") Future.successful{ () } } } object SequentialActor { case object Done case class Job(msg: Any, sender: ActorRef) }