AKKA學習筆記

AKKA學習筆記總結java

01. AKKA

1. 介紹:

Akka基於Actor模型,提供了一個用於構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平臺。多線程

2. Spark中的RPC

目前大多數的分佈式架構底層通訊都是經過RPC(進程間通訊)實現的,好比Hadoop項目的RPC通訊框架,可是Hadoop在設計之初就是爲了運行長達數小時的批量而設計的,在某些極端的狀況下,任務提交的延遲很高,全部Hadoop的RPC顯得有些笨重。架構

Spark 的RPC是經過Akka類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具備高可靠、高性能、可擴展、分佈式等特色,使用Akka能夠輕鬆實現分佈式RPC功能。併發

3. Actor模型

1. 介紹

Actor模型:在計算機科學領域,Actor模型是一個並行計算模型,它把actor做爲並行計算的基本元素來對待:爲響應一個接收到的消息,一個actor可以本身作出一些決策,如建立更多的actor,或發送更多的消息,或者肯定如何去響應接收到的下一個消息。
框架

2. Scala中的多線程

(1) 傳統的併發是經過線程(thread)來實現的。在傳統的併發模型中,程序被分紅若干份同時執行的任務,而且全部任務都對一塊共享的內存進行操做。在傳統的併發模型會引發競爭問題,能夠採起鎖機制避免競爭問題,但同時這可能帶來死鎖等問題。異步

(2) 在Scala中,多線程的基礎就是Actor,核心思想是用消息傳遞來進行線程間的信息共享和同步。它是基於事件模型的併發機制,Scala是運用消息(message)的發送、接收來實現多線程的。使用Scala可以更容易地實現多線程應用的開發。tcp

Actor模型是另外一種不一樣的併發模型,它很好地解決了在傳統併發模型中競爭和死鎖等問題。咱們能夠把一個由actor模型實現的併發程序當作是一個星系同樣,星系裏面有不少星球,每一個星球都是一個actor,星球之間不共享任何資源,可是它們之間有通道來相互傳遞信息。分佈式

每一個星球(actor)都有一個信箱來接受來自其它星球的任意信息,它會按照信息接收的順序來處理,處理完一個信息而後接着處理下一個信息。能夠按照信息類型來觸發不一樣的行爲。ide

同時,每一個星球(actor)能夠異步地(也能夠同步,但不是這裏談論的重點)向其它任意星球發送任意消息,就是說,它發送消息以後不會等待返回信息而是直接執行接下來的操做。oop

好比:

object MyActor1 extends Actor{
 //從新act方法
 def act(){
 for(i <- 1 to 20){
   println("actor-1 " + i)
   Thread.sleep(1000)
  }
}
}

啓動線程:

//啓動Actor
MyActor1.start()

關於Actor之間的消息傳遞不是詳情見以後的一篇學習筆記。

4. AKKA實現RPC

Scala在2.11.x版本中將Akka加入其中,做爲其默認的Actor,老版本的Actor已經廢棄。

1. 架構圖

2. 重要類和方法

ActorSystem
在Akka中,ActorSystem是一個重量級的結構,他按需分配多個線程,因此在實際應用中,ActorSystem一般是一個單例對象,咱們可使用這個ActorSystem建立不少Actor。

在Akka中,Actor負責通訊,在Actor中有一些重要的生命週期方法。
1.preStart()方法:該方法在Actor對象構造方法執行後執行,整個Actor生命週期中僅執行一次。
2.receive()方法:該方法在Actor的preStart方法執行完成後執行,用於接收消息,會被反覆執行。

3. 實現

Master類

package wrd.akka

import akka.actor.Actor
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props

class Master_old extends Actor {

  println("constructor invoked")

  override def preStart(): Unit = {
    println("preStart invoked")
  }

  //用於接收消息,sender就是發送者的代理
  def receive: Actor.Receive = {
    case "connect" => {
      println("a client connected")
      sender ! "reply"
    }

    case "hello" => {
      println("hello")
    }
  }

}

object Master_old {

  def main(args: Array[String]): Unit = {
    val host = "127.0.0.1"
    val port = 8888
    // 準備配置
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,輔助建立和監控下面的Actor,他是單例的
    val actorSystem = ActorSystem("MasterSystem", config)

    val master = actorSystem.actorOf(Props(new Master_old), "Master") //Master主構造器會執行
    master ! "hello" //發送信息
    actorSystem.awaitTermination() //讓進程等待着, 先別結束
    }
}

Worker類

package wrd.akka

import akka.actor.Actor
import akka.actor.ActorSelection
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props

class Worker_old(val masterHost: String, val masterPort: Int) extends Actor {

  var master: ActorSelection = _

  //創建鏈接
  override def preStart(): Unit = {
    //在master啓動時會打印下面的那個協議, 能夠先用這個作一個標誌, 鏈接哪一個master
    //繼承actor後會有一個context, 能夠經過它來鏈接
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
    //須要有/user, Master要和master那邊建立的名字保持一致
    master ! "connect"
  }

  def receive: Actor.Receive = {
    case "reply" => {
      println("a reply from master")
    }
  }

}

object Worker_old {

  def main(args: Array[String]): Unit = {

    val host = "127.0.0.1"
    val port = 9999

    val masterHost = "127.0.0.1"
    val masterPort = 8888
    // 準備配置
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,輔助建立和監控下面的Actor,他是單例的
    val actorSystem = ActorSystem("WorkerSystem", config)
    actorSystem.actorOf(Props(new Worker_old(masterHost, masterPort)), "Worker")
    actorSystem.awaitTermination()
  }
}

暫時先記錄到這兒吧,再完善。 參考:《http://blog.csdn.net/fancylovejava/article/details/24724395》

相關文章
相關標籤/搜索