In many application scenarios a system needs a unique instance (only instance) of some type of Actor. In a cluster environment that instance may live on any node, but it is guaranteed to be unique. Akka's Cluster-Singleton provides support for this Singleton Actor pattern: when the node hosting the instance fails and has to leave the cluster, an identical Actor is automatically constructed on another node and control is handed over to it. Of course, since a brand-new Actor is constructed, internal state is lost in the process. Typical uses of a Singleton Actor include a program interface that can only support a single external connection, or an accumulating Actor (aggregator) whose internal state is built up from the results of many other Actors. If a Singleton Actor with internal state is needed, a PersistentActor can be used to recover that state automatically. This makes Cluster-Singleton a very practical pattern that can be applied in many situations.
Precisely because of its uniqueness, however, the Cluster-Singleton pattern also carries some risks that deserve special attention: the singleton can easily become an overload bottleneck, it cannot be guaranteed to stay online at all times, and message delivery to it cannot be guaranteed. Users need to handle these cases explicitly in their code.
Let's design an example to get to know Cluster-Singleton. First, the functionality of the SingletonActor:
class SingletonActor extends PersistentActor with ActorLogging {
  import SingletonActor._
  val cluster = Cluster(context.system)

  var freeHoles = 0
  var freeTrees = 0
  var ttlMatches = 0

  override def persistenceId = self.path.parent.name + "-" + self.path.name

  def updateState(evt: Event): Unit = evt match {
    case AddHole =>
      if (freeTrees > 0) {
        ttlMatches += 1
        freeTrees -= 1
      } else freeHoles += 1
    case AddTree =>
      if (freeHoles > 0) {
        ttlMatches += 1
        freeHoles -= 1
      } else freeTrees += 1
  }

  override def receiveRecover: Receive = {
    case evt: Event => updateState(evt)
    case SnapshotOffer(_, ss: State) =>
      freeHoles = ss.nHoles
      freeTrees = ss.nTrees
      ttlMatches = ss.nMatches
  }

  override def receiveCommand: Receive = {
    case Dig =>
      persist(AddHole) { evt =>
        updateState(evt)
      }
      sender() ! AckDig   //notify sender message received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Plant =>
      persist(AddTree) { evt =>
        updateState(evt)
      }
      sender() ! AckPlant   //notify sender message received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Disconnect =>   //this node exits cluster. expect switch to another node
      log.info(s"${cluster.selfAddress} is leaving cluster ...")
      cluster.leave(cluster.selfAddress)
    case CleanUp =>
      //clean up ...
      self ! PoisonPill
  }
}
This SingletonActor is a special kind of Actor: it extends PersistentActor, so it must implement PersistentActor's abstract members. SingletonActor maintains several pieces of internal state, the running totals freeHoles, freeTrees and ttlMatches. It simulates a tree-planting scenario: receiving a Dig command produces an AddHole event that registers a tree hole and updates the state; receiving a Plant command produces an AddTree event and updates the state likewise. Because the Cluster-Singleton pattern cannot guarantee message delivery, an acknowledgement mechanism is added: AckDig and AckPlant let the sender resend messages as needed. We use Cluster.selfAddress to confirm which cluster node the singleton is currently running on.
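The AckDig/AckPlant acknowledgements only help if the sender actually keeps track of what has not yet been confirmed. As a minimal sketch of that bookkeeping, independent of Akka and not part of the demo code (Pending and Outbox are hypothetical names invented for illustration): commands are registered before sending, removed on acknowledgement, and everything still pending is a candidate for resending on a schedule.

```scala
// Hypothetical at-least-once bookkeeping on the sender side.
// Each command gets an id before it is sent; it stays in the outbox
// until the singleton's Ack removes it, so unacknowledged commands
// can be re-delivered after a node switch.
final case class Pending(id: Long, cmd: String)

final case class Outbox(nextId: Long = 1L, pending: Vector[Pending] = Vector.empty) {
  // Register a command before sending; keep it until acknowledged.
  def send(cmd: String): (Pending, Outbox) = {
    val p = Pending(nextId, cmd)
    (p, copy(nextId = nextId + 1, pending = pending :+ p))
  }
  // Drop a command once the singleton has acknowledged it.
  def ack(id: Long): Outbox = copy(pending = pending.filterNot(_.id == id))
  // Everything still pending should be resent periodically.
  def toResend: Vector[Pending] = pending
}
```

In a real sender this would sit inside the Actor, with a scheduler tick driving `toResend` and the Ack messages carrying the command id back.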
We need to construct and deploy a ClusterSingletonManager on every cluster node that can host the SingletonActor, as follows:
def create(port: Int) = {
  val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
    .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
    .withFallback(ConfigFactory.load())
  val singletonSystem = ActorSystem("SingletonClusterSystem", config)

  startupSharedJournal(singletonSystem, (port == 2551),
    path = ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))

  val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
    singletonProps = Props[SingletonActor],
    terminationMessage = CleanUp,
    settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
  ), name = "singletonManager")
}
As you can see, ClusterSingletonManager is itself an Actor, and the SingletonActor it manages is configured through ClusterSingletonManager.props. Our main goal is to verify that when the current hosting node fails and has to leave the cluster, the SingletonActor automatically migrates to another node that is still online. ClusterSingletonManager works like this: it is constructed and deployed on all the selected cluster nodes, the SingletonActor is started on the earliest-deployed (oldest) node, and when that node becomes unreachable the SingletonActor is automatically rebuilt on the next-oldest node.
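How quickly the hand-over happens depends on failure detection and the manager's hand-over settings. As an illustrative sketch (the keys below exist in Akka 2.4's reference configuration; the values shown are only examples, and the defaults are usually fine), these could be tuned in application.conf:

```
akka.cluster.singleton {
  # actor name of the child singleton actor
  singleton-name = "singleton"
  # how often the new oldest node retries taking over
  hand-over-retry-interval = 1s
  # retries before giving up waiting for the previous oldest
  min-number-of-hand-over-retries = 10
}
```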
ClusterSingletonProxy, also an Actor, reaches the SingletonActor by exchanging messages with ClusterSingletonManager. It dynamically tracks the currently active SingletonActor and provides users with its ActorRef. We can invoke the SingletonActor with the following code:
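While the singleton is moving between nodes the proxy has no known target; it buffers incoming messages up to a configurable limit and drops the oldest beyond it, which is one reason delivery cannot be guaranteed. A sketch of the relevant settings (keys from Akka 2.4's reference configuration; values are illustrative only):

```
akka.cluster.singleton-proxy {
  # must match the manager's singleton-name
  singleton-name = "singleton"
  # how often the proxy re-identifies the active singleton
  singleton-identification-interval = 1s
  # messages buffered while no singleton is known; 0 disables buffering
  buffer-size = 1000
}
```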
object SingletonUser {
  def create = {
    val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
      .withFallback(ConfigFactory.load())
    val suSystem = ActorSystem("SingletonClusterSystem", config)

    val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
      singletonManagerPath = "/user/singletonManager",
      settings = ClusterSingletonProxySettings(suSystem).withRole(None)
    ), name = "singletonUser")

    import suSystem.dispatcher
    //send Dig messages every 3 seconds to SingletonActor through proxy
    suSystem.scheduler.schedule(0.seconds, 3.seconds, singletonProxy, SingletonActor.Dig)
    //send Plant messages every 2 seconds to SingletonActor through proxy
    suSystem.scheduler.schedule(1.seconds, 2.seconds, singletonProxy, SingletonActor.Plant)
    //send Disconnect message to the hosting node every 15 seconds
    suSystem.scheduler.schedule(10.seconds, 15.seconds, singletonProxy, SingletonActor.Disconnect)
  }
}
At different intervals we send Dig and Plant messages to the SingletonActor through the ClusterSingletonProxy, and every 15 seconds we send a Disconnect message telling the node hosting the SingletonActor to start leaving the cluster. Then we run it with the following code:
package clustersingleton.demo

import clustersingleton.sa.SingletonActor
import clustersingleton.frontend.SingletonUser

object ClusterSingletonDemo extends App {
  SingletonActor.create(2551)   //seed-node

  SingletonActor.create(0)      //ClusterSingletonManager node
  SingletonActor.create(0)
  SingletonActor.create(0)
  SingletonActor.create(0)

  SingletonUser.create          //ClusterSingletonProxy node
}
The output looks like this:
[INFO] [07/09/2017 20:17:28.210] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.334] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:2551]
[INFO] [07/09/2017 20:17:28.489] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.493] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55839]
[INFO] [07/09/2017 20:17:28.514] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.528] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55840]
[INFO] [07/09/2017 20:17:28.566] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.571] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55841]
[INFO] [07/09/2017 20:17:28.595] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.600] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55842]
[INFO] [07/09/2017 20:17:28.620] [main] [akka.remote.Remoting] Starting remoting
[INFO] [07/09/2017 20:17:28.624] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55843]
[INFO] [07/09/2017 20:17:28.794] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:28.817] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=0,ttlMatches=0
[INFO] [07/09/2017 20:17:29.679] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=1,freeTrees=0,ttlMatches=0
...
[INFO] [07/09/2017 20:17:38.676] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:2551 is leaving cluster ...
[INFO] [07/09/2017 20:17:39.664] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=4
[INFO] [07/09/2017 20:17:40.654] [SingletonClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=2,ttlMatches=4
[INFO] [07/09/2017 20:17:41.664] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=5
[INFO] [07/09/2017 20:17:42.518] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:43.653] [SingletonClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=2,ttlMatches=5
[INFO] [07/09/2017 20:17:43.672] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=1,ttlMatches=6
[INFO] [07/09/2017 20:17:45.665] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=2,ttlMatches=6
[INFO] [07/09/2017 20:17:46.654] [SingletonClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=3,ttlMatches=6
...
[INFO] [07/09/2017 20:17:53.673] [SingletonClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:55839 is leaving cluster ...
[INFO] [07/09/2017 20:17:55.654] [SingletonClusterSystem-akka.actor.default-dispatcher-13] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=4,ttlMatches=9
[INFO] [07/09/2017 20:17:55.664] [SingletonClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=3,ttlMatches=10
[INFO] [07/09/2017 20:17:56.646] [SingletonClusterSystem-akka.actor.default-dispatcher-5] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton]
[INFO] [07/09/2017 20:17:57.662] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55840:freeHoles=0,freeTrees=4,ttlMatches=10
[INFO] [07/09/2017 20:17:58.652] [SingletonClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55840:freeHoles=0,freeTrees=5,ttlMatches=10
From the output we can observe that as nodes leave the cluster, the SingletonActor automatically moves to another cluster node and keeps running.
It is worth emphasizing again that such a complex clustered, distributed program can be implemented with such simple code; Akka really is a practical programming tool with broad prospects!
Below is the complete source code for this demonstration:
build.sbt
name := "cluster-singleton"

version := "1.0"

scalaVersion := "2.11.9"

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

val akkaversion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaversion,
  "com.typesafe.akka" %% "akka-remote" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaversion,
  "com.typesafe.akka" %% "akka-persistence" % "2.4.8",
  "com.typesafe.akka" %% "akka-contrib" % akkaversion,
  "org.iq80.leveldb" % "leveldb" % "0.7",
  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8")
application.conf
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://SingletonClusterSystem@127.0.0.1:2551"]
    log-info = off
  }
  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshots"
  }
}
SingletonActor.scala
package clustersingleton.sa

import akka.actor._
import akka.cluster._
import akka.persistence._
import com.typesafe.config.ConfigFactory
import akka.cluster.singleton._
import scala.concurrent.duration._
import akka.persistence.journal.leveldb._
import akka.util.Timeout
import akka.pattern._

object SingletonActor {
  sealed trait Command
  case object Dig extends Command
  case object Plant extends Command
  case object AckDig extends Command     //acknowledge
  case object AckPlant extends Command   //acknowledge
  case object Disconnect extends Command //force node to leave cluster
  case object CleanUp extends Command    //clean up when actor ends

  sealed trait Event
  case object AddHole extends Event
  case object AddTree extends Event

  case class State(nHoles: Int, nTrees: Int, nMatches: Int)

  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
      .withFallback(ConfigFactory.load())
    val singletonSystem = ActorSystem("SingletonClusterSystem", config)

    startupSharedJournal(singletonSystem, (port == 2551),
      path = ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))

    val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
      singletonProps = Props[SingletonActor],
      terminationMessage = CleanUp,
      settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
    ), name = "singletonManager")
  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) =>
        SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }
}

class SingletonActor extends PersistentActor with ActorLogging {
  import SingletonActor._
  val cluster = Cluster(context.system)

  var freeHoles = 0
  var freeTrees = 0
  var ttlMatches = 0

  override def persistenceId = self.path.parent.name + "-" + self.path.name

  def updateState(evt: Event): Unit = evt match {
    case AddHole =>
      if (freeTrees > 0) {
        ttlMatches += 1
        freeTrees -= 1
      } else freeHoles += 1
    case AddTree =>
      if (freeHoles > 0) {
        ttlMatches += 1
        freeHoles -= 1
      } else freeTrees += 1
  }

  override def receiveRecover: Receive = {
    case evt: Event => updateState(evt)
    case SnapshotOffer(_, ss: State) =>
      freeHoles = ss.nHoles
      freeTrees = ss.nTrees
      ttlMatches = ss.nMatches
  }

  override def receiveCommand: Receive = {
    case Dig =>
      persist(AddHole) { evt =>
        updateState(evt)
      }
      sender() ! AckDig   //notify sender message received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Plant =>
      persist(AddTree) { evt =>
        updateState(evt)
      }
      sender() ! AckPlant   //notify sender message received
      log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    case Disconnect =>   //this node exits cluster. expect switch to another node
      log.info(s"${cluster.selfAddress} is leaving cluster ...")
      cluster.leave(cluster.selfAddress)
    case CleanUp =>
      //clean up ...
      self ! PoisonPill
  }
}
SingletonUser.scala
package clustersingleton.frontend

import akka.actor._
import clustersingleton.sa.SingletonActor
import com.typesafe.config.ConfigFactory
import akka.cluster.singleton._
import scala.concurrent.duration._

object SingletonUser {
  def create = {
    val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
      .withFallback(ConfigFactory.load())
    val suSystem = ActorSystem("SingletonClusterSystem", config)

    val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
      singletonManagerPath = "/user/singletonManager",
      settings = ClusterSingletonProxySettings(suSystem).withRole(None)
    ), name = "singletonUser")

    import suSystem.dispatcher
    //send Dig messages every 3 seconds to SingletonActor through proxy
    suSystem.scheduler.schedule(0.seconds, 3.seconds, singletonProxy, SingletonActor.Dig)
    //send Plant messages every 2 seconds to SingletonActor through proxy
    suSystem.scheduler.schedule(1.seconds, 2.seconds, singletonProxy, SingletonActor.Plant)
    //send Disconnect message to the hosting node every 15 seconds
    suSystem.scheduler.schedule(10.seconds, 15.seconds, singletonProxy, SingletonActor.Disconnect)
  }
}
ClusterSingletonDemo.scala
package clustersingleton.demo

import clustersingleton.sa.SingletonActor
import clustersingleton.frontend.SingletonUser

object ClusterSingletonDemo extends App {
  SingletonActor.create(2551)   //seed-node

  SingletonActor.create(0)      //ClusterSingletonManager node
  SingletonActor.create(0)
  SingletonActor.create(0)
  SingletonActor.create(0)

  SingletonUser.create          //ClusterSingletonProxy node
}