Akka基於Actor模型,提供了一個用於構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平臺。本文基本上是基於Akka的官方文檔(版本是2.3.12),經過本身的理解,來闡述Akka提供的一些組件或概念,另外總結了Akka的一些使用場景。 html
Actor 編程
維基百科這樣定義Actor模型: 後端
在計算科學領域,Actor模型是一個並行計算(Concurrent Computation)模型,它把actor做爲並行計算的基本元素來對待:爲響應一個接收到的消息,一個actor可以本身作出一些決策,如建立更多的actor,或發送更多的消息,或者肯定如何去響應接收到的下一個消息。 設計模式
Actor是Akka中最核心的概念,它是一個封裝了狀態和行爲的對象,Actor之間能夠經過交換消息的方式進行通訊,每一個Actor都有本身的收件箱(Mailbox)。
經過Actor可以簡化鎖及線程管理,能夠很是容易地開發出正確地併發程序和並行系統,Actor具備以下特性: 安全
- 提供了一種高級抽象,可以簡化在併發(Concurrency)/並行(Parallelism)應用場景下的編程開發
- 提供了異步非阻塞的、高性能的事件驅動編程模型
- 超級輕量級事件處理(每GB堆內存幾百萬Actor)
實現一個Actor,能夠繼承特質akka.actor.Actor,實現一個receive方法,應該在receive方法中定義一系列的case語句,基於標準Scala的模式匹配方法,來實現每一種消息的處理邏輯。
咱們先看一下Akka中特質Actor的定義: 網絡
05 |
type Receive = Actor.Receive |
07 |
implicit val context: ActorContext = { |
08 |
val contextStack = ActorCell.contextStack.get |
09 |
if ((contextStack.isEmpty) || (contextStack.head eq null)) |
10 |
throw ActorInitializationException( |
11 |
s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " + |
12 |
"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.") |
13 |
val c = contextStack.head |
14 |
ActorCell.contextStack.set(null :: contextStack) |
18 |
implicit final val self = context.self //MUST BE A VAL, TRUST ME |
20 |
final def sender(): ActorRef = context.sender() |
22 |
def receive: Actor.Receive // 這個是在子類中必定要實現的抽象方法 |
24 |
protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit =receive.applyOrElse(msg, unhandled) |
26 |
protected[akka] def aroundPreStart(): Unit = preStart() |
28 |
protected[akka] def aroundPostStop(): Unit = postStop() |
30 |
protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]):Unit = preRestart(reason, message) |
32 |
protected[akka] def aroundPostRestart(reason: Throwable): Unit =postRestart(reason) |
34 |
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy |
36 |
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest |
37 |
def preStart(): Unit = () // 啓動Actor以前須要執行的操做,默認爲空實現,能夠重寫該方法 |
39 |
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest |
40 |
def postStop(): Unit = () // 終止Actor以前須要執行的操做,默認爲空實現,能夠重寫該方法 |
42 |
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest |
43 |
def preRestart(reason: Throwable, message: Option[Any]): Unit = { // 重啓Actor以前須要執行的操做,默認終止該Actor所監督的全部子Actor,而後調用postStop()方法,能夠重寫該方法 |
44 |
context.children foreach { child ⇒ |
45 |
context.unwatch(child) |
51 |
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest |
52 |
def postRestart(reason: Throwable): Unit = { // 重啓Actor以前須要執行的操做,默認執行preStart()的實現邏輯,能夠重寫該方法 |
56 |
def unhandled(message: Any): Unit = { |
58 |
case Terminated(dead) ⇒ throw new DeathPactException(dead) |
59 |
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self)) |
上面特質中提供了幾個Hook,具體說明能夠看代碼中註釋,咱們能夠在繼承該特質時重寫Hook方法,實現本身的處理邏輯。
一個Actor是有生命週期(Lifecycle)的,以下圖所示:
經過上圖咱們能夠看到,一除了/system路徑下面的Actor外,一個Actor初始時路徑爲空,調用ActorSystem的actorOf方法建立一個Actor實例,返回一個引用ActorRef,它包括一個UID和一個Path,標識了一個Actor,能夠經過該引用向該Actor實例發送消息。 架構
ActorSystem 併發
在Akka中,一個ActorSystem是一個重量級的結構,他須要分配多個線程,因此在實際應用中,按照邏輯劃分的每一個應用對應一個ActorSystem實例。
一個ActorSystem是具備分層結構(Hierarchical Structure)的:一個Actor可以管理(Oversee)某個特定的函數,他可能但願將一個task分解爲更小的多個子task,這樣它就須要建立多個子Actor(Child Actors),並監督這些子Actor處理任務的進度等詳細狀況,實際上這個Actor建立了一個Supervisor來監督管理子Actor執行拆分後的多個子task,若是一個子Actor執行子task失敗,那麼就要向Supervisor發送一個消息說明處理子task失敗。須要知道的是,一個Actor能且僅能有一個Supervisor,就是建立它的那個Actor。基於被監控任務的性質和失敗的性質,一個Supervisor能夠選擇執行以下操做選擇: app
- 從新開始(Resume)一個子Actor,保持它內部的狀態
- 重啓一個子Actor,清除它內部的狀態
- 終止一個子Actor
- 擴大失敗的影響,從而使這個子Actor失敗
將一個Actor以一個監督層次結構視圖來看是很是重要的,由於它詮釋了上面第4種操做選擇的存在性,並且對前3種操做選擇也有影響:從新開始(Resume)一個Actor,則該Actor的全部子Actor都繼續工做;重啓一個Actor,則該Actor的全部子Actor都被從新啓動;終止一個Actor,則該Actor的全部子Actor都被終止。另外,一個Actor的preRestart方法的默認行爲是終止全部子Actor,若是咱們不想這樣,能夠在繼承Actor的實現中重寫preRestart方法的邏輯。
一個ActorSystem在建立過程當中,至少啓動3個Actor,以下圖所示:
上圖是一個相似樹狀層次結構,ActorSystem的Top-Level層次結構,與Actor關聯起來,稱爲Actor路徑(Actor Path),不一樣的路徑表明了不一樣的監督範圍(Supervision Scope)。下面說明ActorSystem的監督範圍: 負載均衡
- 「/」路徑:經過根路徑能夠搜索到全部的Actor
- 「/user」路徑:用戶建立的Top-Level Actor在該路徑下面,經過調用ActorSystem.actorOf來實現Actor的建立
- 「/system」路徑:系統建立的Top-Level Actor在該路徑下面
- 「/deadLetters」路徑:消息被髮送到已經終止,或者不存在的Actor,這些Actor都在該路徑下面
- 「/temp」路徑:被系統臨時建立的Actor在該路徑下面
- 「/remote」路徑:改路徑下存在的Actor,它們的Supervisor都是遠程Actor的引用
TypedActor
TypedActor是Akka基於Active對象(Active Object)設計模式的一個實現,關於Active對象模式,能夠看維基百科的定義:
Active對象模式解耦了在一個對象上執行方法和調用方法的邏輯,執行方法和調用方法分別在各自的線程執行上下文中。該模式的目標是經過使用異步方法調用和一個調度器來處理請求,從而實現並行計算處理,該模式由6個元素組成:
- 一個Proxy對象,提供一個面向客戶端的接口和一組公共的方法
- 一個接口,定義了請求一個Active對象上的方法的集合
- 一個來自客戶端請求的列表
- 一個調度器,肯定下一次處理哪個請求
- Active對象上方法的實現
- 一個回掉或者變量,供客戶端接收請求被處理後的結果
經過前面對Actor的瞭解,咱們知道Actor更適用於在Akka的Actor系統之間來實現並行計算處理,而TypedActor適用於橋接Actor系統和非Actor系統。TypedActor是基於JDK的Proxy來實現的,與Actor不一樣的是,Actor一次處理一個消息,而TypedActor一次處理一個調用(Call)。關於更多關於TypedActor,能夠查看Akka文檔。
Cluster
Akka Cluster提供了一個容錯(Fault-Tolerant)、去中心化(Decentralized)、基於P2P的集羣服務,並且不會出現單點故障(SPOF, Single Point Of Failure)。Akka基於Gossip實現集羣服務,並且支持服務自動失敗檢測。
關於Gossip協議的說明,維基百科說明以下所示:
Gossip協議是點對點(Computer-to-Computer)通訊協議的一種,它受社交網絡種的流言傳播的特色所啓發。如今分佈式系統經常使用Gossip協議來解決其餘方式所沒法解決的問題,或者是因爲底層網絡的超大特殊結構,或者是由於Gossip方案是解決這類問題最有效的一種方式。
一個Akka集羣由一組成員節點組成,每一個成員節點經過hostname:port:uid來惟一標識,而且每一個成員節點之間是解耦合的(Decoupled)。一個Akka應用程序是一個分佈式應用程序,它具備一個Actor的集合S,而每一個節點上能夠啓動這個Akka應用S的集合的的一部分Actor,而沒必要是全集S。若是一個新的成員節點須要加入到Akka集羣,只須要在集羣中任意一個成員節點上執行Join命令便可。
Akka集羣中各個成員節點之間的狀態關係,以下圖所示:
Akka集羣中任何一個成員節點都有可能成爲集羣的Leader,這是基於Gossip收斂(Convergence)過程獲得的肯定性結果,沒有通過選舉的過程。Leader只是一種角色,在各輪Gossip收斂過程當中Leader是不斷變化的。Leader的職責是使成員節點進入/離開集羣。
一個成員節點開始於joining狀態,一旦全部其節點都看到了該新加入Akka集羣的節點,則Leader會設置這個節點的狀態爲up。
若是一個節點安全離開Akka集羣,可預期地它的狀態會變爲leaving狀態,當Leader看到該節點爲leaving狀態,會將其狀態修改成exiting,而後當全部節點看到該節點狀態爲exiting,則Leader將該節點移除,狀態修改成removed狀態。
若是一個節點處於unreachable狀態,基於Gossip協議Leader是沒法執行任何操做收斂(Convergence)到該節點的,因此unreachable狀態的節點的狀態是必須被改變的,它必須變成reachable狀態或者down狀態。若是該節點想再次加入到Akka集羣,它必須須要從新啓動,而且從新加入集羣(經由joining狀態)。
Remoting
Akka Remoting的設計目標是基於P2P風格的網絡通訊,因此它存在以下限制:
- 不支持NAT(Network Address Translation)
- 不支持負載均衡器(Load Balancers)
Akka提供了種方式來使用Remoting功能:
- 經過調用actorSelection方法搜索一個actor,該方法輸入的參數的模式爲:akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
- 經過actorOf方法建立一個actor
下面看一下Remoting系統中故障恢復模型(Failure Recovery Model),以下圖所示:
上圖中,鏈接到一個遠程系統的過程當中,包括上面4種狀態:在進行任何通訊以前,系統處於Idle狀態;當第一次一個消息嘗試向遠程系統發送,或者當遠程系統鏈接過來,這時系統狀態變爲Active;當兩個系統通訊失敗,鏈接丟失,這時系統變爲Gated狀態;當系統通訊過程當中,因爲參與通訊的系統的狀態不一致致使系統沒法恢復,這時遠程系統變爲Quarantined狀態,只有從新啓動系統才能恢復,重啓後系統變爲Active狀態。
Persistence
Akka的持久性可以使得有狀態的Actor實例保存它的內部狀態,在Actor重啓後可以更快的進行恢復。須要強調的是,持久化的僅僅是Actor的內部狀態,而不是Actor當前的狀態,Actor內部狀態的變化會被一追加的方式存到到指定的存儲中,一旦追加完成存儲狀態,這些數據就不會被更新。有狀態的Actor經過重放(Replay)持久化的狀態來快速恢復,重建內部狀態。
Akka Persistence的架構有以下幾個要點:
它是一個持久的、有狀態的Actor,可以將持久化消息到一個日誌系統中。當一個PersistentActor重啓的時候,它可以重放記錄到日誌系統中的消息,從而基於這些消息來恢復一個Actor的內部狀態。
持久化視圖是一個持久的有狀態的Actor,它接收被記錄到一個PersistentActor中的消息,可是它自己並不記錄消息到日誌系統,而是經過複製一個PersistentActor的消息流,來更新本身內部狀態。
提供了一個消息至少傳遞一次(At-Lease-Once)的語義,在發送者和接收者所在的JVM崩潰的時候,將消息傳遞到目的地。
一個日誌系統存儲發送給一個PersistentActor的消息序列,能夠在應用程序中控制是否一個PersistentActor將消息序列記錄到日誌中。日誌系統是支持插件式的,默認狀況下,消息被記錄到本地文件系統中。
Akka Camel
Akka提供了一個模塊,可以與Apache Camel整合。Apache Camel是一個實現了EIP(Enterprise Integration Patterns)的整合框架,支持經過各類各樣的協議進行消息交換。因此Akka的Actor能夠經過Scala或Java API與其它系統進行通訊,協議好比HTTP、SOAP、TCP、FTP、SMTP、JMS。
Akka適用場景
Akka適用場景很是普遍,這裏根據一些已有的使用案例來總結一下,Akka可以在哪些應用場景下投入生產環境:
- 事務處理(Transaction Processing)
在線遊戲系統、金融/銀行系統、交易系統、投注系統、社交媒體系統、電信服務系統。
任何行業的任何類型的應用均可以使用,好比提供REST、SOAP等風格的服務,相似於一個服務總線,Akka支持縱向&橫向擴展,以及容錯/高可用(HA)的特性。
- 並行計算(Concurrency/Parallelism)
任何具備併發/並行計算需求的行業,基於JVM的應用均可以使用,如使用編程語言Scala、Java、Groovy、JRuby開發。
Master/Slave架構風格的計算系統、計算網格系統、MapReduce系統。
- 通訊Hub(Communications Hub)
電信系統、Web媒體系統、手機媒體系統。
- 復瑣事件流處理(Complex Event Stream Processing)
Akka自己提供的Actor就適合處理基於事件驅動的應用,因此能夠更加容易處理具備復瑣事件流的應用。
其它特性
Akka還支持不少其它特性,以下所示:
- 支持Future,能夠同步或異步地獲取發送消息的結果
- 支持基於事件的Dispatcher,將多個Actor與一個線程池綁定
- 支持消息路由,能夠提供不一樣的消息路由策略,如Akka支持以下策略:RoundRobinRoutingLogic、RandomRoutingLogic、SmallestMailboxRoutingLogic、BroadcastRoutingLogic、ScatterGatherFirstCompletedRoutingLogic、TailChoppingRoutingLogic、ConsistentHashingRoutingLogic
- 支持FSM,提供基於事件的狀態轉移
具體相關細節內容,能夠參考官網文檔。
參考連接