AKKA學習筆記總結java
Akka基於Actor模型,提供了一個用於構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平臺。多線程
目前大多數的分佈式架構底層通訊都是經過RPC(進程間通訊)實現的,好比Hadoop項目的RPC通訊框架,可是Hadoop在設計之初就是爲了運行長達數小時的批量而設計的,在某些極端的狀況下,任務提交的延遲很高,全部Hadoop的RPC顯得有些笨重。架構
Spark 的RPC是經過Akka類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具備高可靠、高性能、可擴展、分佈式等特色,使用Akka能夠輕鬆實現分佈式RPC功能。併發
Actor模型:在計算機科學領域,Actor模型是一個並行計算模型,它把actor做爲並行計算的基本元素來對待:爲響應一個接收到的消息,一個actor可以本身作出一些決策,如建立更多的actor,或發送更多的消息,或者肯定如何去響應接收到的下一個消息。
框架
(1) 傳統的併發是經過線程(thread)來實現的。在傳統的併發模型中,程序被分紅若干份同時執行的任務,而且全部任務都對一塊共享的內存進行操做。在傳統的併發模型會引發競爭問題,能夠採起鎖機制避免競爭問題,但同時這可能帶來死鎖等問題。異步
(2) 在Scala中,多線程的基礎就是Actor,核心思想是用消息傳遞來進行線程間的信息共享和同步。它是基於事件模型的併發機制,Scala是運用消息(message)的發送、接收來實現多線程的。使用Scala可以更容易地實現多線程應用的開發。tcp
Actor模型是另外一種不一樣的併發模型,它很好地解決了在傳統併發模型中競爭和死鎖等問題。咱們能夠把一個由actor模型實現的併發程序當作是一個星系同樣,星系裏面有不少星球,每一個星球都是一個actor,星球之間不共享任何資源,可是它們之間有通道來相互傳遞信息。分佈式
每一個星球(actor)都有一個信箱來接受來自其它星球的任意信息,它會按照信息接收的順序來處理,處理完一個信息而後接着處理下一個信息。能夠按照信息類型來觸發不一樣的行爲。ide
同時,每一個星球(actor)能夠異步地(也能夠同步,但不是這裏談論的重點)向其它任意星球發送任意消息,就是說,它發送消息以後不會等待返回信息而是直接執行接下來的操做。oop
好比:
object MyActor1 extends Actor{ //從新act方法 def act(){ for(i <- 1 to 20){ println("actor-1 " + i) Thread.sleep(1000) } } }
啓動線程:
//啓動Actor MyActor1.start()
關於Actor之間的消息傳遞不是詳情見以後的一篇學習筆記。
Scala在2.11.x版本中將Akka加入其中,做爲其默認的Actor,老版本的Actor已經廢棄。
ActorSystem
在Akka中,ActorSystem是一個重量級的結構,他按需分配多個線程,因此在實際應用中,ActorSystem一般是一個單例對象,咱們可使用這個ActorSystem建立不少Actor。
在Akka中,Actor負責通訊,在Actor中有一些重要的生命週期方法。
1.preStart()方法:該方法在Actor對象構造方法執行後執行,整個Actor生命週期中僅執行一次。
2.receive()方法:該方法在Actor的preStart方法執行完成後執行,用於接收消息,會被反覆執行。
Master類
package wrd.akka import akka.actor.Actor import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import akka.actor.Props class Master_old extends Actor { println("constructor invoked") override def preStart(): Unit = { println("preStart invoked") } //用於接收消息,sender就是發送者的代理 def receive: Actor.Receive = { case "connect" => { println("a client connected") sender ! "reply" } case "hello" => { println("hello") } } } object Master_old { def main(args: Array[String]): Unit = { val host = "127.0.0.1" val port = 8888 // 準備配置 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,輔助建立和監控下面的Actor,他是單例的 val actorSystem = ActorSystem("MasterSystem", config) val master = actorSystem.actorOf(Props(new Master_old), "Master") //Master主構造器會執行 master ! "hello" //發送信息 actorSystem.awaitTermination() //讓進程等待着, 先別結束 } }
Worker類
package wrd.akka import akka.actor.Actor import akka.actor.ActorSelection import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import akka.actor.Props class Worker_old(val masterHost: String, val masterPort: Int) extends Actor { var master: ActorSelection = _ //創建鏈接 override def preStart(): Unit = { //在master啓動時會打印下面的那個協議, 能夠先用這個作一個標誌, 鏈接哪一個master //繼承actor後會有一個context, 能夠經過它來鏈接 master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //須要有/user, Master要和master那邊建立的名字保持一致 master ! "connect" } def receive: Actor.Receive = { case "reply" => { println("a reply from master") } } } object Worker_old { def main(args: Array[String]): Unit = { val host = "127.0.0.1" val port = 9999 val masterHost = "127.0.0.1" val masterPort = 8888 // 準備配置 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,輔助建立和監控下面的Actor,他是單例的 val actorSystem = ActorSystem("WorkerSystem", config) actorSystem.actorOf(Props(new Worker_old(masterHost, masterPort)), "Worker") actorSystem.awaitTermination() } }
暫時先記錄到這兒吧,再完善。 參考:《http://blog.csdn.net/fancylovejava/article/details/24724395》