此次把這部份內容提到如今寫,是由於這段時間開發的項目恰好在這一塊遇到了一些難點,因此準備把經驗分享給你們,咱們在使用Akka時,會常常遇到一些存儲Actor內部狀態的場景,在系統正常運行的狀況下,咱們不須要擔憂什麼,可是當系統出錯,好比Actor錯誤須要重啓,或者內存溢出,亦或者整個系統崩潰,若是咱們不採起必定的方案的話,在系統重啓時Actor的狀態就會丟失,這會致使咱們丟失一些關鍵的數據,形成系統數據不一致的問題。Akka做爲一款成熟的生產環境應用,爲咱們提供了相應的解決方案就是Akka persistence。java
萬變不離其宗,數據的一致性是永恆的主題,一個性能再好的系統,不能保證數據的正確,也稱不上是一個好的系統,一個系統在運行的時候不免會出錯,如何保證系統在出錯後能正確的恢復數據,不讓數據出現混亂是一個難題。使用Actor模型的時候,咱們會有這麼一個想法,就是能不對數據庫操做就儘可能不對數據庫操做(這裏咱們假定咱們的數據庫是安全,可靠的,能保證數據的正確性和一致性,好比使用國內某雲的雲數據庫),一方面若是大量的數據操做會使數據庫面臨的巨大的壓力,致使崩潰,另外一方面即便數據庫能處理的過來,好比一些count,update的大表操做也會消耗不少的時間,遠沒有內存中直接操做來的快,大大影響性能。可是又有人說幾人內存操做這麼快,爲何不把數據都放內存中呢?答案顯而易見,當出現機器死機,或者內存溢出等問題時,數據頗有可能就丟失了致使沒法恢復。在這種背景下,咱們是否是有一種比較好的解決方案,既能知足需求又能用最小的性能消耗,答案就是上面咱們的說的Akka persistence。git
在具體深刻Akka persistence以前,咱們能夠先了解一下它的核心設計理念,其實簡單來講,咱們能夠利用一些thing來恢復Actor的狀態,這裏的thing能夠是日誌、數據庫中的數據,亦或者是文件,因此說它的本質很是容易理解,在Actor處理的時候咱們會保存一些數據,Actor在恢復的時候能根據這些數據恢復其自身的狀態。github
因此Akka persistence 有如下幾個關鍵部分組成:redis
PersistentActor:任何一個須要持久化的Actor都必須繼承它,並必須定義或者實現其中的三個關鍵屬性:sql
def persistenceId = "example" //做爲持久化Actor的惟一表示,用於持久化或者查詢時使用 def receiveCommand: Receive = ??? //Actor正常運行時處理處理消息邏輯,可在這部份內容裏持久化本身想要的消息 def receiveRecover: Receive = ??? //Actor重啓恢復是執行的邏輯
相比普通的Actor,除receiveCommand類似之外,還必須實現另外兩個屬性。
另外在持久化Actor中還有另外兩個關鍵的的概念就是Journal和Snapshot,前者用於持久化事件,後者用於保存Actor的快照,二者在Actor恢復狀態的時候都起到了相當重要的做用。數據庫
這裏我首先會用一個demo讓你們能對Akka persistence的使用有必定了解的,並能大體明白它的工做原理,後面再繼續講解一些實戰可能會遇到的問題。安全
假定如今有這麼一個場景,如今假設有一個1w元的大紅包,瞬間可能會不少人同時來搶,每一個人搶的金額也可能不同,場景很簡單,實現方式也有不少種,但前提是保證數據的正確性,好比最普通的使用數據庫保證,但對這方面有所瞭解的同窗都知道這並非一個很好的方案,由於須要鎖,並須要大量的數據庫操做,致使性能不高,那麼咱們是否能夠用Actor來實現這個需求麼?答案是固然能夠。架構
咱們首先來定義一個抽獎命令,app
case class LotteryCmd( userId: Long, // 參與用戶Id username: String, //參與用戶名 email: String // 參與用戶郵箱 )
而後咱們實現一個抽獎Actor,並繼承PersistentActor做出相應的實現:dom
case class LuckyEvent( //抽獎成功事件 userId: Long, luckyMoney: Int ) case class FailureEvent( //抽獎失敗事件 userId: Long, reason: String ) case class Lottery( totalAmount: Int, //紅包總金額 remainAmount: Int //剩餘紅包金額 ) { def update(luckyMoney: Int) = { copy( remainAmount = remainAmount - luckyMoney ) } } class LotteryActor(initState: Lottery) extends PersistentActor with ActorLogging{ override def persistenceId: String = "lottery-actor-1" var state = initState //初始化Actor的狀態 override def receiveRecover: Receive = { case event: LuckyEvent => updateState(event) //恢復Actor時根據持久化的事件恢復Actor狀態 case SnapshotOffer(_, snapshot: Lottery) => log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}") state = snapshot //利用快照恢復Actor的狀態 case RecoveryCompleted => log.info("the actor recover completed") } def updateState(le: LuckyEvent) = state = state.update(le.luckyMoney) //更新自身狀態 override def receiveCommand: Receive = { case lc: LotteryCmd => doLottery(lc) match { //進行抽獎,並獲得抽獎結果,根據結果作出不一樣的處理 case le: LuckyEvent => //抽到隨機紅包 persist(le) { event => updateState(event) increaseEvtCountAndSnapshot() sender() ! event } case fe: FailureEvent => //紅包已經抽完 sender() ! fe } case "saveSnapshot" => // 接收存儲快照命令執行存儲快照操做 saveSnapshot(state) case SaveSnapshotSuccess(metadata) => ??? //你能夠在快照存儲成功後作一些操做,好比刪除以前的快照等 } private def increaseEvtCountAndSnapshot() = { val snapShotInterval = 5 if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) { //當有持久化5個事件後咱們便存儲一次當前Actor狀態的快照 self ! "saveSnapshot" } } def doLottery(lc: LotteryCmd) = { //抽獎邏輯具體實現 if (state.remainAmount > 0) { val luckyMoney = scala.util.Random.nextInt(state.remainAmount) + 1 LuckyEvent(lc.userId, luckyMoney) } else { FailureEvent(lc.userId, "下次早點來,紅包已被抽完咯!") } } }
程序很簡單,關鍵位置我也給了註釋,相信你們對Actor有所瞭解的話很容易理解,固然要是有些疑惑,能夠看看我以前寫的文章,下面咱們就對剛纔寫的抽紅包Actor進行測試:
object PersistenceTest extends App { val lottery = Lottery(10000,10000) val system = ActorSystem("example-05") val lotteryActor = system.actorOf(Props(new LotteryActor(lottery)), "LotteryActor-1") //建立抽獎Actor val pool: ExecutorService = Executors.newFixedThreadPool(10) val r = (1 to 100).map(i => new LotteryRun(lotteryActor, LotteryCmd(i.toLong,"godpan","xx@gmail.com")) //建立100個抽獎請求 ) r.map(pool.execute(_)) //使用線程池來發起抽獎請求,模擬同時多人蔘加 Thread.sleep(5000) pool.shutdown() system.terminate() } class LotteryRun(lotteryActor: ActorRef, lotteryCmd: LotteryCmd) extends Runnable { //抽獎請求 implicit val timeout = Timeout(3.seconds) def run: Unit = { for { fut <- lotteryActor ? lotteryCmd } yield fut match { //根據不一樣事件顯示不一樣的抽獎結果 case le: LuckyEvent => println(s"恭喜用戶${le.userId}抽到了${le.luckyMoney}元紅包") case fe: FailureEvent => println(fe.reason) case _ => println("系統錯誤,請從新抽取") } } }
運行程序,咱們可能看到如下的結果:
下面我會把persistence actor在整個運行過程的步驟給出,幫助你們理解它的原理:
1.初始化Persistence Actor
1.1如果第一次初始化,則與正常的Actor的初始化一致。
1.2如果重啓恢復Actor,這根據Actor以前持久的數據恢復。
1.2.1從快照恢復,可快速恢復Actor,但並不是每次持久化事件都會保存快照,在快照完整的狀況下,Actor優先從快照恢復自身狀態。
1.2.2從事件(日誌,數據庫記錄等)恢復,經過重放持久化事件恢復Actor狀態,比較關鍵。
2.接收命令進行處理,轉化爲須要持久化的事件(持久化的事件儘可能只包含關鍵性的數據)使用Persistence Actor的持久化方法進行持久化(上述例子中的persist,後面我會講一下批量持久化),並處理持久化成功後的邏輯處理,好比修改Actor狀態,向外部Actor發送消息等。
3.如果咱們須要存儲快照,那麼能夠主動指定存儲快照的頻率,好比持久化事件100次咱們就存儲一次快照,這個頻率應該要考慮實際的業務場景,在存儲快照成功後咱們也能夠執行一些操做。
總的來講Persistence Actor運行時的大體操做就是以上這些,固然它是r如何持久化事件,恢復時的機制是怎麼樣的等有興趣的能夠看一下Akka源碼。
首先咱們必須加載相應的依賴包,在bulid.sbt
中加入如下依賴:
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.4.16", //Akka actor 核心依賴 "com.typesafe.akka" %% "akka-persistence" % "2.4.16", //Akka persistence 依賴 "org.iq80.leveldb" % "leveldb" % "0.7", //leveldb java版本依賴 "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8", //leveldb java版本依賴 "com.twitter" %% "chill-akka" % "0.8.0" //事件序列化依賴 )
另外咱們還需在application.conf
加入如下配置:
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.journal.leveldb.dir = "log/journal" akka.persistence.snapshot-store.local.dir = "log/snapshots" # DO NOT USE THIS IN PRODUCTION !!! # See also https://github.com/typesafehub/activator/issues/287 akka.persistence.journal.leveldb.native = false //由於咱們本地並無安裝leveldb,因此這個屬性置爲false,可是生產環境並不推薦使用 akka.actor.serializers { kryo = "com.twitter.chill.akka.AkkaSerializer" } akka.actor.serialization-bindings { "scala.Product" = kryo "akka.persistence.PersistentRepr" = kryo }
至此爲止咱們整個Akka persistence demo已經搭建好了,能夠正常運行了,有興趣的同窗能夠下載源碼。源碼連接
有同窗可能會問,我對leveldb不是很熟悉亦或者以爲單機存儲並非安全,有沒有支持分佈式數據存儲的插件呢,好比某爸的雲數據庫?答案固然是有咯,良心的我固然是幫大家都找好咯。
1.akka-persistence-sql-async: 支持MySQL和PostgreSQL,另外使用了全異步的數據庫驅動,提供異步非阻塞的API,我司用的就是它的變種版,6的飛起。項目地址
2.akka-persistence-cassandra: 官方推薦的插件,使用寫性能very very very fast的cassandra數據庫,是幾個插件中比較流行的一個,另外它還支持persistence query。項目地址
3.akka-persistence-redis: redis應該也很符合Akka persistence的場景,熟悉redis的同窗可使用看看。項目地址
4.akka-persistence-jdbc: 怎麼能少了jdbc呢?否則怎麼對的起java爸爸呢,支持scala和java哦。項目地址
相應的插件的具體使用能夠看該項目的具體介紹使用,我看了下相對來講都是比較容易的。
上面說到我司用的是akka-persistence-sql-async插件,因此咱們是將事件和快照持久化到數據庫的,一開始我也是像上面demo同樣,每次事件都會持久化到數據庫,可是後來在性能測試的時候,由於自己業務場景對數據庫的壓力也比較大,在當數據庫到達每秒1000+的讀寫量後,另外說明一下使用的是某雲數據庫,性能中配以上,發現每次持久化的時間將近要15ms,這樣換算一下的話Actor每秒只能處理60~70個須要持久化的事件,而實際業務場景要求Actor必須在3秒內返回處理結果,這種狀況下致使大量消息處理超時得不到反饋,另外還有大量的消息得不處處理,致使系統錯誤暴增,用戶體驗降低,既然咱們發現了問題,那麼咱們能不能進行優化呢?事實上固然是能夠,既然單個插入慢,那麼咱們能不能批量插入呢,Akka persistence爲咱們提供了persistAll方法,下面我就對上面的demo進行一下改造,讓其變成批量持久化:
class LotteryActorN(initState: Lottery) extends PersistentActor with ActorLogging{ override def persistenceId: String = "lottery-actor-2" var state = initState //初始化Actor的狀態 override def receiveRecover: Receive = { case event: LuckyEvent => updateState(event) //恢復Actor時根據持久化的事件恢復Actor狀態 case SnapshotOffer(_, snapshot: Lottery) => log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}") state = snapshot //利用快照恢復Actor的狀態 case RecoveryCompleted => log.info("the actor recover completed") } def updateState(le: LuckyEvent) = state = state.update(le.luckyMoney) //更新自身狀態 var lotteryQueue : ArrayBuffer[(LotteryCmd, ActorRef)] = ArrayBuffer() context.system.scheduler //定時器,定時觸發抽獎邏輯 .schedule( 0.milliseconds, 100.milliseconds, new Runnable { def run = { self ! "doLottery" } } ) override def receiveCommand: Receive = { case lc: LotteryCmd => lotteryQueue = lotteryQueue :+ (lc, sender()) //參與信息加入抽獎隊列 println(s"the lotteryQueue size is ${lotteryQueue.size}") if (lotteryQueue.size > 5) //當參與人數有5個時觸發抽獎 joinN(lotteryQueue) case "doLottery" => if (lotteryQueue.size > 0) joinN(lotteryQueue) case "saveSnapshot" => // 接收存儲快照命令執行存儲快照操做 saveSnapshot(state) case SaveSnapshotSuccess(metadata) => ??? //你能夠在快照存儲成功後作一些操做,好比刪除以前的快照等 } private def joinN(lotteryQueue: ArrayBuffer[(LotteryCmd, ActorRef)]) = { //批量處理抽獎結果 val rs = doLotteryN(lotteryQueue) val success = rs.collect { //獲得其中中獎的相應信息 case (event: LuckyEvent, ref: ActorRef) => event -> ref }.toMap val failure = rs.collect { //獲得其中未中獎的相應信息 case (event: FailureEvent, ref: ActorRef) => event -> ref } persistAll(success.keys.toIndexedSeq) { //批量持久化中獎用戶事件 case event => println(event) updateState(event) increaseEvtCountAndSnapshot() success(event) ! event } failure.foreach { case (event, ref) => ref ! event } this.lotteryQueue.clear() //清空參與隊列 } private def increaseEvtCountAndSnapshot() = { val snapShotInterval = 5 if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) { //當有持久化5個事件後咱們便存儲一次當前Actor狀態的快照 self ! "saveSnapshot" } } private def doLotteryN(lotteryQueue: ArrayBuffer[(LotteryCmd, ActorRef)]) = { //抽獎邏輯具體實現 var remainAmount = state.remainAmount lotteryQueue.map(lq => if (remainAmount > 0) { val luckyMoney = scala.util.Random.nextInt(remainAmount) + 1 remainAmount = remainAmount - luckyMoney (LuckyEvent(lq._1.userId, luckyMoney),lq._2) } else { (FailureEvent(lq._1.userId, "下次早點來,紅包已被抽完咯!"),lq._2) } ) } }
這是改造後的參與Actor,實現了批量持久的功能,固然這裏爲了給發送者返回消息,處理邏輯稍微複雜了一點,不過真實場景可能會更復雜,相關源碼也在剛纔的項目上。
另外Akka Persistence還提供了Query接口,用於須要查詢持久化事件的需求,這部份內容可能要根據實際業務場景考慮是否須要應用,我就不展開講了,另外我也寫了一個小demo在項目中,想要嘗試的同窗也能夠試試。