akka cluster sharding source code 學習 (1/5) 替身模式

爲了使一個項目支持集羣,本身學習使用了 akka cluster 並在項目中實施了,今後,生活就變得有些痛苦。再配上 apache 作反向代理和負載均衡,debug 起來不要太酸爽。直到如今,我還對 akka cluster 輸出的 log 不是很熟悉,目前網絡上 akka cluster 的信息還比較少,想深刻了解這東西的話,仍是要本身讀 source code。前幾天,雪球那幫人說 akka 不推薦使用,有不少坑,這給我提了個醒,目前我對 akka 的理解是遠遠不夠的,須要深刻學習。html

 

akka cluster sharding 是 akka 的一個 extension。12年左右,有人在 google group 中開始討論dedicated actor for each entity 這個概念,通過不少討論,最終由 Patrik Nordwall 實現,以 experimental 的形式加入到 akka contri 庫裏。我原本不知道有這麼一個東西,甚至想過本身實現一個這樣玩意。我並無爲 cluster sharding 作過 benchmark,也不知道該怎麼作,http://dcaoyuan.github.io/papers/rpi_cluster/benchmark.html 作了一個在樹莓派上的benchmark,單個節點1000 qps,很像學習下他的 benchmark 的代碼。git

 

第一篇,學習下 cluster sharding 中是如何使用替身模式的。首先,什麼是替身模式:一個 actor 收到 request 後可能會作一些比較複雜的操做,典型的操做好比,彙集操做。舉個例子,primary 節點 爲了知道各個 replica 節點的狀態,他會 ping 全部的 replica,收集他們的反饋,記錄他們的存活狀態,這種場景下,就比較適合新建立一個 actor,它專門作着一件事。這樣作有幾個優勢,首先,primary actor 能夠把這部分邏輯放到其餘 actor 中,不會搞亂本身自己的邏輯,其實 actor 僅有一個 receive 函數,case 寫的多了會很亂的。其次,把這種事情交給其餘 actor,這個 actor 即使因異常重啓,也不會對系統有太大影響,重作一遍便可。總之,替身模式,就是指建立一個替身actor來單獨作一件事。github

 

在 cluster sharding 中,有兩個邏輯使用了 替身模式,一個是 stop cluster。apache

 

/**
* INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of
* them have terminated it replies with `ShardStopped`.
*/
private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any)
extends Actor {
import ShardCoordinator.Internal.ShardStopped

entities.foreach { a ⇒
context watch a
a ! stopMessage
}

var remaining = entities

def receive = {
case Terminated(ref) ⇒
remaining -= ref
if (remaining.isEmpty) {
replyTo ! ShardStopped(shard)
context stop self
}
}
}

 

第二個用法,也是用來 hande off 網絡

 

/** * INTERNAL API. Rebalancing process is performed by this actor. * It sends `BeginHandOff` to all `ShardRegion` actors followed by * `HandOff` to the `ShardRegion` responsible for the shard. * When the handoff is completed it sends [[RebalanceDone]] to its * parent `ShardCoordinator`. If the process takes longer than the * `handOffTimeout` it also sends [[RebalanceDone]]. */private[akka] class RebalanceWorker(shard: String, from: ActorRef, handOffTimeout: FiniteDuration,                                    regions: Set[ActorRef]) extends Actor {  import Internal._  regions.foreach(_ ! BeginHandOff(shard))  var remaining = regions  import context.dispatcher  context.system.scheduler.scheduleOnce(handOffTimeout, self, ReceiveTimeout)  def receive = {    case BeginHandOffAck(`shard`) ⇒      remaining -= sender()      if (remaining.isEmpty) {        from ! HandOff(shard)        context.become(stoppingShard, discardOld = true)      }    case ReceiveTimeout ⇒ done(ok = false)  }  def stoppingShard: Receive = {    case ShardStopped(shard) ⇒ done(ok = true)    case ReceiveTimeout      ⇒ done(ok = false)  }  def done(ok: Boolean): Unit = {    context.parent ! RebalanceDone(shard, ok)    context.stop(self)  }}
相關文章
相關標籤/搜索