Akka做爲一種成熟的生產環境併發解決方案,必須擁有一套完善的錯誤異常處理機制,本文主要講講Akka中的監管和容錯。html
看過我上篇文章的同窗應該對Actor系統的工做流程有了必定的瞭解Akka系列(二):Akka中的Actor系統,它的很重要的概念就是分而治之,既然咱們把任務分配給Actor去執行,那麼咱們必須去監管相應的Actor,當Actor出現了失敗,好比系統環境錯誤,各類異常,能根據咱們制定的相應監管策略進行錯誤恢復,就是後面咱們會說到的容錯。git
既然有監管這一事件,那必然存在着監管者這麼一個角色,那麼在ActorSystem中是如何肯定這種角色的呢?github
咱們先來看下ActorSystem中的頂級監管者:併發
一個actor系統在其建立過程當中至少要啓動三個actor,如上圖所示,下面來講說這三個Actor的功能:app
/
: 根監管者顧名思義,它是一個老大,它監管着ActorSystem中全部的頂級Actor,頂級Actor有如下幾種:ide
/user
: 是全部由用戶建立的頂級actor的監管者;用ActorSystem.actorOf建立的actor在其下。測試
/system
: 是全部由系統建立的頂級actor的監管者,如日誌監聽器,或由配置指定在actor系統啓動時自動部署的actor。this
/deadLetters
: 是死信actor,全部發往已經終止或不存在的actor的消息會被重定向到這裏。spa
/temp
:是全部系統建立的短時actor的監管者,例如那些在ActorRef.ask的實現中用到的actor。scala
/remote
: 是一我的造虛擬路徑,用來存放全部其監管者是遠程actor引用的actor。
跟咱們日常打交道最多的就是/user
,它是咱們在程序中用ActorSystem.actorOf建立的actor的監管者,下面的容錯咱們重點關心的就是它下面的失敗處理,其餘幾種頂級Actor具體功能定義已經給出,有興趣的也能夠去了解一下。
根監管者監管着全部頂級Actor,對它們的各類失敗狀況進行處理,通常來講若是錯誤要上升到根監管者,整個系統就會中止。
/user
: 頂級actor監管者上面已經講過/user
是全部由用戶建立的頂級actor的監管者,即用ActorSystem.actorOf建立的actor,咱們能夠本身制定相應的監管策略,但因爲它是actor系統啓動時就產生的,因此咱們須要在相應的配置文件裏配置,具體的配置能夠參考這裏Akka配置
/system
: 系統監管者/system
全部由系統建立的頂級actor的監管者,好比Akka中的日誌監聽器,由於在Akka中日誌自己也是用Actor實現的,/system
的監管策略以下:對收到的除ActorInitializationException
和ActorKilledException
以外的全部Exception
無限地執行重啓,固然這也會終止其全部子actor。全部其餘Throwable被上升到根監管者,而後整個actor系統將會關閉。
用戶建立的普通actor的監管:
上一篇文章介紹了Actor系統的組織結構,它是一種樹形結構,其實這種結構對actor的監管是很是有利的,Akka實現的是一種叫「父監管」的形式,每個被建立的actor都由其父親所監管,這種限制使得actor的監管結構隱式符合其樹形結構,因此咱們能夠得出一個結論:
一個被建立的Actor確定是一個被監管者,也多是一個監管者,它監管着它的子級Actor
上面咱們對ActorSystem中的監管角色有了必定的瞭解,那麼究竟是如何制定相應的監管策略呢?Akka中有如下4種策略:
恢復下屬,保持下屬當前積累的內部狀態
重啓下屬,清除下屬的內部狀態
永久地中止下屬
升級失敗(沿監管樹向上傳遞失敗),由此失敗本身
這其實很好理解,下面是一個簡單例子:
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException => Resume //恢復 case _: NullPointerException => Restart //重啓 case _: IllegalArgumentException => Stop //中止 case _: Exception => Escalate //向上級傳遞 }
咱們能夠根據異常的不一樣使用不一樣監管策略,在後面我會具體給出一個示例程序幫助你們理解。咱們在實現本身的策略時,須要複寫Actor中的supervisorStrategy
,由於Actor的默認監管策略以下:
final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart }
它對除了它指定的異常進行中止,其餘異常都是對下屬進行重啓。
Akka中有兩種類型的監管策略:OneForOneStrategy
和AllForOneStrategy
,它們的主要區別在於:
OneForOneStrategy
: 該策略只會應用到發生故障的子actor上。
AllForOneStrategy
: 該策略會應用到全部的子actor上。
咱們通常都使用OneForOneStrategy
來進行制定相關監管策略,固然你也能夠根據具體需求選擇合適的策略。另外咱們能夠給咱們的策略配置相應參數,好比上面maxNrOfRetries,withinTimeRange等,這裏的含義是每分鐘最多進行10次重啓,若超出這個界限相應的Actor將會被中止,固然你也可使用策略的默認配置,具體配置信息能夠參考源碼。
本示例主要演示Actor在發生錯誤時,它的監管者會根據相應的監管策略進行不一樣的處理。源碼連接
由於這個例子比較簡單,這裏我直接貼上相應代碼,後面根據具體的測試用例來解釋各類監管策略所進行的響應:
class Supervisor extends Actor { //監管下屬,根據下屬拋出的異常進行相應的處理 override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate } var childIndex = 0 //用於標示下屬Actor的序號 def receive = { case p: Props => childIndex += 1 //返回一個Child Actor的引用,因此Supervisor Actor是Child Actor的監管者 sender() ! context.actorOf(p,s"child${childIndex}") } } class Child extends Actor { val log = Logging(context.system, this) var state = 0 def receive = { case ex: Exception => throw ex //拋出相應的異常 case x: Int => state = x //改變自身狀態 case s: Command if s.content == "get" => log.info(s"the ${s.self} state is ${state}") sender() ! state //返回自身狀態 } } case class Command( //相應命令 content: String, self: String )
如今咱們來看看具體的測試用例:
首先咱們先構建一個測試環境:
class GuardianSpec(_system: ActorSystem) extends TestKit(_system) with WordSpecLike with Matchers with ImplicitSender { def this() = this(ActorSystem("GuardianSpec")) "A supervisor" must { "apply the chosen strategy for its child" in { code here... val supervisor = system.actorOf(Props[Supervisor], "supervisor") //建立一個監管者 supervisor ! Props[Child] val child = expectMsgType[ActorRef] // 從 TestKit 的 testActor 中獲取迴應 } } }
1.TestOne:正常運行
child ! 50 // 將狀態設爲 50 child ! Command("get",child.path.name) expectMsg(50)
正常運行,測試經過。
2.TestTwo:拋出ArithmeticException
child ! new ArithmeticException // crash it child ! Command("get",child.path.name) expectMsg(50)
你們猜這時候測試會經過嗎?答案是經過,緣由是根據咱們制定的監管策略,監管者在面對子級Actor拋出ArithmeticException
異常時,它會去恢復相應出異常的Actor,並保持該Actor的狀態,因此此時Actor的狀態值仍是50,測試經過。
3.TestThree:拋出NullPointerException
child ! new NullPointerException // crash it harder child ! "get" expectMsg(50)
這種狀況下測試還會經過嗎?答案是不經過,緣由是根據咱們制定的監管策略,監管者在面對子級Actor拋出NullPointerException
異常時,它會去重啓相應出異常的Actor,其狀態會被清除,因此此時Actor的狀態值應該是0,測試不經過。
4.TestFour:拋出IllegalArgumentException
supervisor ! Props[Child] // create new child val child2 = expectMsgType[ActorRef] child2 ! 100 // 將狀態設爲 100 watch(child) // have testActor watch 「child」 child ! new IllegalArgumentException // break it expectMsgPF() { case Terminated(`child`) => (println("the child stop")) } child2 ! Command("get",child2.path.name) expectMsg(100)
這裏首先咱們又建立了一個Child Actor爲child2,並將它的狀態置爲100,這裏咱們監控前面建立的child1,而後給其發送一個IllegalArgumentException
的消息,讓其拋出該異常,測試結果:
the child stop 測試經過
從結果中咱們能夠看出,child在拋出IllegalArgumentException
後,會被其監管着中止,但監管者下的其餘Actor仍是正常工做。
5.TestFive:拋出一個自定義異常
watch(child2) child2 ! Command("get",child2.path.name) // verify it is alive expectMsg(100) supervisor ! Props[Child] // create new child val child3 = expectMsgType[ActorRef] child2 ! new Exception("CRASH") // escalate failure expectMsgPF() { case t @ Terminated(`child2`) if t.existenceConfirmed => ( println("the child2 stop") ) } child3 ! Command("get",child3.path.name) expectMsg(0)
這裏首先咱們又建立了一個Child Actor爲child3,這裏咱們監控前面建立的child2,而後給其發送一個Exception("CRASH")
的消息,讓其拋出該異常,測試結果:
the child2 stop 測試不經過
不少人可能會疑惑爲何TestFour能夠經過,這裏就通不過不了呢?由於這裏錯誤Actor拋出的異常其監管者沒法處理,只能將失敗上溯傳遞,而頂級actor的缺省策略是對全部的Exception狀況(ActorInitializationException和ActorKilledException例外)進行重啓. 因爲缺省的重啓指令會中止全部的子actor,因此咱們這裏的child3也會被中止。致使測試不經過。固然這裏你也能夠複寫默認的重啓方法,好比:
override def preRestart(cause: Throwable, msg: Option[Any]) {}
這樣重啓相應Actor時就不會中止其子級下的全部Actor了。
本文主要介紹了Actor系統中的監管和容錯,這一部份內容在Akka中也是很重要的,它與Actor的樹形組織結構巧妙結合,本文大量參考了Akka官方文檔的相應章節,有興趣的同窗能夠點擊這裏Akka docs。也能夠下載個人示例程序,裏面包含了一個官方的提供的容錯示例。