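Once the shard coordinator (roughly this distributed system's equivalent of ZooKeeper) starts, it starts a timer and, at a fixed interval, tries to balance the load across the nodes of the cluster. It does this by moving actors from heavily loaded nodes to lightly loaded ones. I had misunderstood this point before: I thought the shardRegion was the smallest unit that could be moved.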
val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
When the coordinator receives RebalanceTick, it starts trying to balance the system load:
case RebalanceTick ⇒
  if (persistentState.regions.nonEmpty) {
    val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress)
    shardsFuture.value match {
      case Some(Success(shards)) ⇒
        continueRebalance(shards)
      case _ ⇒
        // continue when future is completed
        shardsFuture.map { shards ⇒
          RebalanceResult(shards)
        }.recover {
          case _ ⇒ RebalanceResult(Set.empty)
        }.pipeTo(self)
    }
  }
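I follow the logic above, but I did not understand how the Future is being used. Written the usual way, once shardsFuture has failed the code should simply run RebalanceResult(Set.empty).pipeTo(self); I do not see why it still appears to wait on the Future after a failure.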
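One possible reading of that branch: `shardsFuture.value` is `None` while the future is still pending and `Some(Failure(_))` once it has failed, so `case _` covers both. For an already failed future the `recover` step then yields the empty result without any real waiting. Below is a minimal sketch of that behaviour in plain Scala, without Akka. The names `RebalanceFutureSketch` and `handle` are made up for illustration, shard ids are simplified to `String`, and the `Right` branch stands in for what the real code pipes to `self`.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Success

object RebalanceFutureSketch {
  // Hypothetical stand-in for the coordinator's RebalanceResult message.
  final case class RebalanceResult(shards: Set[String])

  // Left  = shards handled synchronously (the Some(Success(...)) fast path).
  // Right = the future that the real code would pipeTo(self).
  def handle(shardsFuture: Future[Set[String]]): Either[Set[String], Future[RebalanceResult]] =
    shardsFuture.value match {
      case Some(Success(shards)) =>
        Left(shards)
      case _ =>
        // Reached both for a pending future (None) and a failed one (Some(Failure(_))).
        Right(
          shardsFuture
            .map(shards => RebalanceResult(shards))
            .recover { case _ => RebalanceResult(Set.empty) })
    }
}
```

Under this reading, the fast path only saves a message round trip when the strategy answers immediately; the failure case and the pending case share one code path instead of needing a third branch.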
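allocationStrategy comes with a default implementation, and you can also plug in your own load-balancing strategy. The rebalance function returns (inside a Future) a Set[ShardId], i.e. the shards that are to be moved.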
When the coordinator receives RebalanceResult, it starts the rebalance logic:
def continueRebalance(shards: Set[ShardId]): Unit =
  shards.foreach { shard ⇒
    if (!rebalanceInProgress(shard)) {
      persistentState.shards.get(shard) match {
        case Some(rebalanceFromRegion) ⇒
          rebalanceInProgress += shard
          log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
          context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
            persistentState.regions.keySet ++ persistentState.regionProxies)
            .withDispatcher(context.props.dispatcher))
        case None ⇒
          log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
      }
    }
  }
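rebalanceInProgress is a Set that records the shards currently being moved. As I see it, at the start of a new rebalance round rebalanceInProgress can only be non-empty if the previous round has not finished yet. I am not sure whether it would be better to report an error or to keep rebalancing in that case. My first thought was that the allocation strategy would not take an unfinished previous round into account, but rebalanceInProgress is passed to allocationStrategy.rebalance in the RebalanceTick handler, so a strategy can see it, and continueRebalance skips any shard that is already in the set.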
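To make the overlap case concrete, here is a small sketch of the filtering that continueRebalance appears to perform, written as a pure function without Akka. `InProgressSketch`, `selectShards` and the `String` ids are assumptions made for illustration; `owners` plays the role of persistentState.shards.

```scala
object InProgressSketch {
  // Returns the shards to start moving now, plus the updated in-progress set.
  // A proposed shard is skipped if it is already being moved or has no known owner.
  def selectShards(
      proposed: Set[String],
      inProgress: Set[String],
      owners: Map[String, String]): (Set[String], Set[String]) = {
    val toStart = proposed.filter(s => !inProgress(s) && owners.contains(s))
    (toStart, inProgress ++ toStart)
  }
}
```

So if the previous round is still moving s1 and the strategy proposes s1 again, s1 is simply ignored rather than treated as an error; only shards that are not yet in flight get a new worker.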
Next, the coordinator starts a rebalanceWorker, the stand-in actor mentioned in the previous post.
private[akka] class RebalanceWorker(shard: String, from: ActorRef, handOffTimeout: FiniteDuration,
                                    regions: Set[ActorRef]) extends Actor {
  import Internal._
  regions.foreach(_ ! BeginHandOff(shard))
  var remaining = regions

  import context.dispatcher
  context.system.scheduler.scheduleOnce(handOffTimeout, self, ReceiveTimeout)

  def receive = {
    case BeginHandOffAck(`shard`) ⇒
      remaining -= sender()
      if (remaining.isEmpty) {
        from ! HandOff(shard)
        context.become(stoppingShard, discardOld = true)
      }
    case ReceiveTimeout ⇒ done(ok = false)
  }

  def stoppingShard: Receive = {
    case ShardStopped(shard) ⇒ done(ok = true)
    case ReceiveTimeout      ⇒ done(ok = false)
  }

  def done(ok: Boolean): Unit = {
    context.parent ! RebalanceDone(shard, ok)
    context.stop(self)
  }
}
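Akka's logic is built on message passing, and code like this is honestly hard to read. While the rebalanceWorker runs, quite a few actors are involved: first the coordinator; then the shardRegion that hosts the shard actor to be migrated; then the shard actor itself; and finally every shardRegion in the system, since they all have to take part as well. At this point I could not help turning my monitor to portrait orientation.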
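1. The RebalanceWorker first sends a BeginHandOff message to every ShardRegion to announce that the hand-off is starting, then waits for their replies.
2. When a ShardRegion receives BeginHandOff, it updates its own bookkeeping, erasing what it remembers about the HostShardRegion and the shard actor for that shard: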
case BeginHandOff(shard) ⇒
  log.debug("BeginHandOff shard [{}]", shard)
  if (regionByShard.contains(shard)) {
    val regionRef = regionByShard(shard)
    val updatedShards = regions(regionRef) - shard
    if (updatedShards.isEmpty) regions -= regionRef
    else regions = regions.updated(regionRef, updatedShards)
    regionByShard -= shard
  }
  sender() ! BeginHandOffAck(shard)
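Finally it sends BeginHandOffAck to tell the rebalanceWorker that it is ready (after that, these shardRegions have nothing more to do).
3. Back in the rebalanceWorker: it sends HandOff to the ShardRegion that hosts the shard actor, telling it that it can now do its own cleanup. It then switches its state to stoppingShard and waits for a ShardStopped message, which can come from two sources: the HostShardRegion or the shard actor.
4. When the HostShardRegion receives the HandOff message: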
case msg @ HandOff(shard) ⇒
  log.debug("HandOff shard [{}]", shard)

  // must drop requests that came in between the BeginHandOff and now,
  // because they might be forwarded from other regions and there
  // is a risk or message re-ordering otherwise
  if (shardBuffers.contains(shard)) {
    shardBuffers -= shard
    loggedFullBufferWarning = false
  }

  if (shards.contains(shard)) {
    handingOff += shards(shard)
    shards(shard) forward msg
  } else
    sender() ! ShardStopped(shard)
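If the HostShardRegion no longer contains the shard actor, it replies with ShardStopped straight away; otherwise it adds the shard actor to the handingOff set and forwards HandOff to the shard actor.
5. Reading the code once more, I found that the shard actor and the entity actor are two different things: the shard actor sits between the entity actors and the shard region.
For now I still do not know how the entity actor relates to the shard region.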
def getEntity(id: EntityId): ActorRef = {
  val name = URLEncoder.encode(id, "utf-8")
  context.child(name).getOrElse {
    log.debug("Starting entity [{}] in shard [{}]", id, shardId)

    val a = context.watch(context.actorOf(entityProps, name))
    idByRef = idByRef.updated(a, id)
    refById = refById.updated(id, a)
    state = state.copy(state.entities + id)
    a
  }
}
Judging from this code, the relationship between shard actor and entity actor is one-to-many.
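The one-to-many shape can be illustrated with a stripped-down model of the lookup-or-create pattern in getEntity. This is only a sketch without Akka: `ShardEntitiesSketch`, `EntityRef` and `entityCount` are hypothetical names, and a plain map replaces the actor's children.

```scala
import java.net.URLEncoder

object ShardEntitiesSketch {
  // Hypothetical stand-in for an entity's ActorRef.
  final case class EntityRef(name: String)

  final class Shard(val shardId: String) {
    private var refById = Map.empty[String, EntityRef]

    // Look up the child for this entity id, creating it on first use.
    def getEntity(id: String): EntityRef =
      refById.getOrElse(id, {
        val ref = EntityRef(URLEncoder.encode(id, "utf-8"))
        refById = refById.updated(id, ref)
        ref
      })

    def entityCount: Int = refById.size
  }
}
```

Asking the same shard for two different ids creates two children, while asking twice for the same id returns the existing one, which is what makes a single shard the parent of many entities.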
def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match {
  case HandOff(`shardId`) ⇒ handOff(sender())
  case HandOff(shard)     ⇒ log.warning("Shard [{}] can not hand off for another Shard [{}]", shardId, shard)
  case _                  ⇒ unhandled(msg)
}

def handOff(replyTo: ActorRef): Unit = handOffStopper match {
  case Some(_) ⇒ log.warning("HandOff shard [{}] received during existing handOff", shardId)
  case None ⇒
    log.debug("HandOff shard [{}]", shardId)

    if (state.entities.nonEmpty) {
      handOffStopper = Some(context.watch(context.actorOf(
        handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage))))

      //During hand off we only care about watching for termination of the hand off stopper
      context become {
        case Terminated(ref) ⇒ receiveTerminated(ref)
      }
    } else {
      replyTo ! ShardStopped(shardId)
      context stop self
    }
}
def receiveTerminated(ref: ActorRef): Unit = {
  if (handOffStopper.exists(_ == ref))
    context stop self
  else if (idByRef.contains(ref) && handOffStopper.isEmpty)
    entityTerminated(ref)
}
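My first reading of this code was that shard actor and entity actor are one-to-one, because the shard actor also stops itself once the entity has stopped. Looking more closely, though, the actor whose termination triggers `context stop self` is the handOffStopper, and it was created with idByRef.keySet, i.e. all of the shard's entities, so this still seems consistent with the one-to-many reading. It reminds me of the last assignment of the Coursera reactive programming course, and makes me wonder why that one also had something like one shard actor per entity.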
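To tie the hand-off steps together, the RebalanceWorker's behaviour can be modelled as a small state machine. The sketch below is my own simplification, not Akka code: regions are plain `String` names, `HandOffSketch`, `step` and `run` are made-up names, and the messages the worker sends out (BeginHandOff, HandOff, RebalanceDone) appear only as comments on the states.

```scala
object HandOffSketch {
  sealed trait State
  // BeginHandOff has been sent to all regions; waiting for their acks.
  final case class AwaitingAcks(remaining: Set[String]) extends State
  // HandOff has been sent to the hosting region; waiting for ShardStopped.
  case object StoppingShard extends State
  // RebalanceDone(shard, ok) has been reported to the coordinator.
  final case class Done(ok: Boolean) extends State

  sealed trait Event
  final case class BeginHandOffAck(region: String) extends Event
  case object ShardStopped extends Event
  case object Timeout extends Event

  def step(state: State, event: Event): State = (state, event) match {
    case (AwaitingAcks(remaining), BeginHandOffAck(region)) =>
      val left = remaining - region
      if (left.isEmpty) StoppingShard else AwaitingAcks(left)
    case (StoppingShard, ShardStopped) => Done(ok = true)
    case (Done(ok), _)                 => Done(ok) // already finished
    case (_, Timeout)                  => Done(ok = false)
    case (other, _)                    => other    // unexpected message: ignore
  }

  // Replays a sequence of events against a worker created for the given regions.
  def run(regions: Set[String], events: Seq[Event]): State =
    events.foldLeft[State](AwaitingAcks(regions))(step)
}
```

With two regions, the sequence ack, ack, ShardStopped ends in `Done(true)`, whereas a timeout before the second ack ends in `Done(false)`, which matches the two ways the worker in the listing calls `done`.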