上一篇文章咱們講了Akka Remote,理解了Akka中的遠程通訊,其實Akka Cluster能夠當作Akka Remote的擴展,由原來的兩點變成由多點組成的通訊網絡,這種模式相信你們都很瞭解,就是集羣,它的優點主要有兩點:系統伸縮性高,容錯性更好。node
不少人很容易把分佈式和集羣的概念搞錯,包括我也是,我一開始也覺得它們兩個是同樣的概念,只是叫法不一樣而已,但其實否則,雖然它們在實際場景中都是部署在不一樣的機器上,但它們所提供的功能並非同樣的。舉個簡單的例子來看看它們之間的不一樣:git
爲了保持整個系列連續性,我又以抽獎爲基礎舉一個例子:github
假定咱們如今抽獎流程包括,抽獎分配獎品和用戶根據連接領取指定獎品,用戶先抽獎而後獲取獎品連接,點擊連接填寫相應信息領取獎品。算法
咱們如今把抽獎分配獎品和用戶根據連接領取指定獎品分別部署在兩臺機器上,忽然有一天很不幸,抽獎活動進行到一半,抽獎分配獎品那臺機子所在的區域停電了,很顯然,後續的用戶參與抽獎就不能進行了,由於咱們只有一臺抽獎分配獎品的機子,但因爲咱們將領取獎品的業務部署在另外一臺機器上,因此前面那些中獎的用戶仍是能夠正常的領取獎品,具體相關定義可參考《分佈式系統概念與設計》中對分佈式系統的定義。數據庫
如今咱們仍是有兩臺機器,可是咱們在兩個機器上都部署了抽獎分配獎品和用戶根據連接領取指定獎品的業務邏輯,忽然有一天,有一臺所在的區域停電了,但這時咱們並擔憂,由於另外一臺服務器仍是能夠正常的運行處理用戶的全部請求。bash
它們的各自特色:服務器
總的來講: 分佈式是以分離任務縮短期來提升效率,而集羣是在單位時間內處理更多的任務來提升效率。網絡
在前面的文章Akka Actor的工做方式,咱們能夠將一個任務分解成一個個小任務,而後分配給它的子Actor執行,其實這就能夠當作一個小的分佈式系統,那麼在Akka中,集羣又是一種怎樣的概念呢?負載均衡
其實往簡單裏說,就是一些相同的ActorSystem的組合,它們具備着相同的功能,咱們須要執行的任務能夠隨機的分配到目前可用的ActorSystem上,這點跟Nginx的負載均衡很相似,根據算法和配置將請求轉發給運行正常的服務器去,Akka集羣的表現形式也是這樣,固然它背後的理論基礎是基於gossip協議的,目前不少分佈式的數據庫的數據同步都採用這個協議,有興趣的同窗能夠本身去研究研究,只是我也是隻知其一;不知其二,這裏就不寫了,怕誤導了你們。frontend
下面我來說講Akka Cluster中比較重要的幾個概念:
Seed Nodes能夠看過是種子節點或者原始節點,它的一個主要做用用於能夠自動接收新加入集羣的節點的信息,並與之通訊,使用方式能夠用配置文件或者運行時指定,推薦使用配置文件方式,好比:
akka.cluster.seed-nodes = [
"akka.tcp://ClusterSystem@host1:2552",
"akka.tcp://ClusterSystem@host2:2552"]複製代碼
seed-nodes列表中的第一個節點會集羣啓動的時候初始化,而其餘節點則是在有須要時再初始化。
固然你也能夠不指定seed nodes,但你能夠須要手動或者在程序中寫相關邏輯讓相應的節點加入集羣,具體使用方式可參考官方文檔。
Cluster Events字面意思是集羣事件,那麼這是什麼意思呢?其實它表明着是一個節點的各類狀態和操做,舉個例子,假設你在打一局王者5v5的遊戲,那麼你能夠把十我的當作一個集羣,咱們每一個人都是一個節點,咱們的任何操做和狀態都能被整個系統捕獲到,好比A殺了B、A超神了,A離開了遊戲,A從新鏈接了遊戲等等,這些狀態和操做在Cluster Events中就至關於節點之於集羣,那麼它具體是怎麼使用的呢?
首先咱們必須將節點註冊到集羣中,或者說節點訂閱了某個集羣,咱們能夠這麼作:
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])複製代碼
具體代碼相關的使用我會再下面寫一個demo例子,來講明是如何具體使用它們的。
從上面的代碼咱們能夠看到有一個MemberEvent的概念,這個其實就是每一個成員所可能擁有的events,那麼一個成員在它的生命週期中有如下的events
狀態說明:
雖然上面說到集羣中的各個節點的功能是同樣的,其實並不必定,好比咱們將分佈式和集羣融合到一塊兒,集羣中的一部分節點負責接收請求,一部分用於計算,一部分用於數據存儲等等,因此Akka Cluster提供了一種Roles的概念,用來表示該節點的功能特性,咱們能夠在配置文件中指定,好比:
akka.cluster.roles = request
akka.cluster.roles = compute
akka.cluster.roles = store複製代碼
ClusterClient是一個集羣客戶端,主要用於集羣外部系統與集羣通訊,使用它很是方便,咱們只須要將集羣中的任意指定一個節點做爲集羣客戶端,而後將其註冊爲一個該集羣的接待員,最後咱們就能夠在外部系統直接與之通訊了,使用ClusterClient須要作相應的配置:
akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]複製代碼
假設咱們如今我一個接待的Actor,叫作frontend,咱們就能夠這樣作:
val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
ClusterClientReceptionist(system).registerService(frontend)複製代碼
上面講了集羣概念和Akka Cluster中相對重要的概念,下面咱們就來寫一個Akka Cluster的demo,
demo需求:
線假設須要執行一些相同任務,頻率爲2s一個,如今咱們須要將這些任務分配給Akka集羣中的不一樣節點去執行,這裏使用ClusterClient做爲集羣與外部的通訊接口。
首先咱們先來定義一些命令:
package sample.cluster.transformation
final case class TransformationJob(text: String) // 任務內容
final case class TransformationResult(text: String) // 執行任務結果
final case class JobFailed(reason: String, job: TransformationJob) //任務失敗相應緣由
case object BackendRegistration // 後臺具體執行任務節點註冊事件複製代碼
而後咱們實現具體執行任務邏輯的後臺節點:
class TransformationBackend extends Actor {
val cluster = Cluster(context.system)
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) //在啓動Actor時將該節點訂閱到集羣中
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case TransformationJob(text) => { // 接收任務請求
val result = text.toUpperCase // 任務執行獲得結果(將字符串轉換爲大寫)
sender() ! TransformationResult(text.toUpperCase) // 向發送者返回結果
}
case state: CurrentClusterState =>
state.members.filter(_.status == MemberStatus.Up) foreach register // 根據節點狀態向集羣客戶端註冊
case MemberUp(m) => register(m) // 將剛處於Up狀態的節點向集羣客戶端註冊
}
def register(member: Member): Unit = { //將節點註冊到集羣客戶端
context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
BackendRegistration
}
}複製代碼
相應節點的配置文件信息,我這裏就不貼了,請從相應的源碼demo裏獲取。源碼連接
接着咱們來實現集羣客戶端:
class TransformationFrontend extends Actor {
var backends = IndexedSeq.empty[ActorRef] //任務後臺節點列表
var jobCounter = 0
def receive = {
case job: TransformationJob if backends.isEmpty => //目前暫無執行任務節點可用
sender() ! JobFailed("Service unavailable, try again later", job)
case job: TransformationJob => //執行相應任務
jobCounter += 1
implicit val timeout = Timeout(5 seconds)
val backend = backends(jobCounter % backends.size) //根據相應算法選擇執行任務的節點
println(s"the backend is ${backend} and the job is ${job}")
val result = (backend ? job)
.map(x => x.asInstanceOf[TransformationResult]) // 後臺節點處理獲得結果
result pipeTo sender //向外部系統發送執行結果
case BackendRegistration if !backends.contains(sender()) => // 添加新的後臺任務節點
context watch sender() //監控相應的任務節點
backends = backends :+ sender()
case Terminated(a) =>
backends = backends.filterNot(_ == a) // 移除已經終止運行的節點
}
}複製代碼
最後咱們實現與集羣客戶端交互的邏輯:
class ClientJobTransformationSendingActor extends Actor {
val initialContacts = Set(
ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"))
val settings = ClusterClientSettings(context.system)
.withInitialContacts(initialContacts)
val c = context.system.actorOf(ClusterClient.props(settings), "demo-client")
def receive = {
case TransformationResult(result) => {
println(s"Client response and the result is ${result}")
}
case Send(counter) => {
val job = TransformationJob("hello-" + counter)
implicit val timeout = Timeout(5 seconds)
val result = Patterns.ask(c,ClusterClient.Send("/user/frontend", job, localAffinity = true), timeout)
result.onComplete {
case Success(transformationResult) => {
self ! transformationResult
}
case Failure(t) => println("An error has occured: " + t.getMessage)
}
}
}
}複製代碼
下面咱們開始運行這個domo:
object DemoClient {
def main(args : Array[String]) {
TransformationFrontendApp.main(Seq("2551").toArray) //啓動集羣客戶端
TransformationBackendApp.main(Seq("8001").toArray) //啓動三個後臺節點
TransformationBackendApp.main(Seq("8002").toArray)
TransformationBackendApp.main(Seq("8003").toArray)
val system = ActorSystem("OTHERSYSTEM")
val clientJobTransformationSendingActor =
system.actorOf(Props[ClientJobTransformationSendingActor],
name = "clientJobTransformationSendingActor")
val counter = new AtomicInteger
import system.dispatcher
system.scheduler.schedule(2.seconds, 2.seconds) { //定時發送任務
clientJobTransformationSendingActor ! Send(counter.incrementAndGet())
}
StdIn.readLine()
system.terminate()
}
}複製代碼
運行結果:
從結果能夠看到,咱們將任務根據算法分配給不一樣的後臺節點進行執行,最終返回結果。
本文的demo例子已上傳github:源碼連接