集羣(多臺節點機) —> 每臺節點機(1個片區) —> 每一個片區(多個分片) —> 每一個分片(多個實體)java
實體: 分片管理的 Actor
Shards :分片是統一管理的一組實體
ShardRegion : 片區,部署在每一個集羣節點上,對分片進行管理
ShardCoordinator : cluster-singleton 集羣單例, 決定分片屬於哪一個片區node
ShardRegion 在節點上啓動api
帶實體ID的消息--> 片區ShardRegion ,請求分片位置-->ShardCoordinator-->決定哪一個ShardRegion將擁有Shard-->tcp
ShardRegion 確認請求並建立 Shard supervisor 作爲子actor -->shard actor 建立 entity -->ShardRegion和Shard 定位entity分佈式
全部片區組成分佈式分片管理層,帶實體ID的消息直接發給本機片區,分片管理層路由消息, ShardRegion建立須要提供基於ShardRegion.MessageExtractor的實現 ,必須提供從消息抽取分片和實體ID的函數ide
package shard import akka.actor.AbstractActor import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.Props import akka.cluster.sharding.ClusterSharding import akka.cluster.sharding.ClusterShardingSettings import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.ShardRegion import akka.japi.Option import akka.japi.pf.ReceiveBuilder import com.typesafe.config.ConfigFactory import org.slf4j.LoggerFactory import java.io.Serializable import java.time.Clock.system import java.time.Clock.system import java.util.* import java.time.Clock.system /** * Created by: tankx * Date: 2019/7/16 * Description: 集羣分片示例 */ //分佈到集羣環境中 class DogActor : AbstractActor() { var log = LoggerFactory.getLogger(DogActor::class.java) override fun createReceive(): Receive { return ReceiveBuilder.create().matchAny(this::receive).build() } fun receive(obj: Any) { log.info("收到消息: $obj") if (obj is DogMsg) { log.info("${obj.id} ${obj.msg}") } } } //定義消息(必須帶有實體ID進行分片) data class DogMsg(var id: Int, var msg: String) : Serializable //分片規則 class ShardExtractor : ShardRegion.MessageExtractor { //提取實體ID,實體對應的actor override fun entityId(message: Any?): String { if (message is DogMsg) { return message.id.toString() } else { throw RuntimeException("沒法識別消息類型 $message") } } //根據實體ID,計算出對應分片ID override fun shardId(message: Any?): String { //var numberOfShards: Int = 10 //簡單的分區數取模 return message.id%numberOfShards if (message is DogMsg) { //return (message.id % 10).toString() return message.id.toString() } else { throw RuntimeException("沒法識別消息類型 $message") } } //對消息可進行拆封操做 override fun entityMessage(message: Any): Any { return message } } //分區中止時會派發的消息類型 object handOffStopMessage fun createActorSystem(port: Int): ActorSystem { val config = ConfigFactory.parseString( "akka.remote.artery.canonical.port=$port" ).withFallback( ConfigFactory.load() ) var actorSystem = ActorSystem.create("custerA", config); return actorSystem } fun startShardRegion(port: Int) { var actorSystem = createActorSystem(port) val settings = ClusterShardingSettings.create(actorSystem)//.withRole("ClusterShardRole") val shardReg = ClusterSharding.get(actorSystem).start( "dogShard", Props.create(DogActor::class.java), settings, ShardExtractor(), ShardCoordinator.LeastShardAllocationStrategy(10, 1), handOffStopMessage ) for (i in 1..10) { shardReg.tell(DogMsg(i, " wang"), ActorRef.noSender()) Thread.sleep(3000) } } fun shardRegProxy() { var actorSystem = createActorSystem(2663) //startProxy 代理模式,即它不會承載任何實體自己,但知道如何將消息委託到正確的位置 ClusterSharding.get(actorSystem) .startProxy("dogShard", Optional.empty(), ShardExtractor()) .let { println(" shard proxy $it started.") } Thread.sleep(3000) var shardReg = ClusterSharding.get(actorSystem).shardRegion("dogShard") for (i in 1..100) { shardReg.tell(DogMsg(i, "C wang"), ActorRef.noSender()) Thread.sleep(1500) } }
再分別啓動入口函數
fun main() { startShardRegion(2661) }
fun main() { startShardRegion(2662) }
fun main() {
shardRegProxy()
}
配置文件:oop
akka { actor { provider = "cluster" } # For the sample, just bind to loopback and do not allow access from the network # the port is overridden by the logic in main class remote.artery { enabled = on transport = tcp canonical.port = 0 canonical.hostname = 127.0.0.1 } cluster { seed-nodes = [ "akka://custerA@127.0.0.1:2661", "akka://custerA@127.0.0.1:2662"] # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. auto-down-unreachable-after = 10s } }
以上示例需在同屬一個集羣,消息才能正常中轉,因此必定要保障 ActorSystem.create(name,config) name 爲一致!(調了半天消息一直沒有發送成功,原來是這裏的問題,KAO!)ui
同一個集羣同一個ActorSystem name!this
否則會報異常:
No coordinator found to register. Probably, no seed-nodes configured and manual cluster join not performed