Akka系列(九):Akka分佈式之Akka Remote

Akka做爲一個天生用於構建分佈式應用的工具,固然提供了用於分佈式組件即Akka Remote,那麼咱們就來看看如何用Akka Remote以及Akka Serialization來構建分佈式應用。java

背景

不少同窗在程序的開發中都會遇到一個問題,當業務需求變得愈來愈複雜,單機服務器已經不足以承載相應的請求的時候,咱們都會考慮將服務部署到不一樣的服務器上,但服務器之間可能須要相互調用,那麼系統必須擁有相互通訊的接口,用於相應的數據交互,這時候一個好的遠程調用方案是一個絕對的利器,主流的遠程通訊有如下幾種選擇:git

  • RPC(Remote Procedure Call Protocol)github

  • Web Service設計模式

  • RMI (Remote Method Invocation)服務器

  • JMS(Java Messaging Service)網絡

這幾種方式都是被採用比較普遍的通訊方案,有興趣的同窗能夠本身去了解一下,這裏我會講一下RMI和JMS。app

JAVA遠程調用

RMI和JMS相信不少寫過Java程序的同窗都知道,是Java程序用來遠程通訊的主要方式,那麼RMI和JMS又有什麼區別呢?異步

1.RMI

i.特徵:
  • 同步通訊:在使用RMI調用遠程方法時,線程會持續等待直到結果返回,因此它是一個同步阻塞操做;tcp

  • 強耦合:請求的系統中須要使用的RMI服務進行接口聲明,返回的數據類型有必定的約束;分佈式

ii.優勢:
  • 實現相對簡單,方法調用形式通俗易理解,接口聲明服務功能清晰。

iii.缺點:
  • 只侷限支持JVM平臺;

  • 對沒法兼容Java語言的其餘語言也不適用;

2.JMS

i.特徵:
  • 異步通訊:JMS發送消息進行通訊,在通訊過程當中,線程不會被阻塞,沒必要等待請求迴應,因此是一個異步操做;

  • 鬆耦合:不須要接口聲明,返回的數據類型能夠是各類各樣,好比JSON,XML等;

ii.通訊方式:

(1)點對點消息傳送模型

顧名思義,點對點能夠理解爲兩個服務器的定點通訊,發送者和接收者都能明確知道對方是誰,大體模型以下:

jms-point-to-point

(2)發佈/訂閱消息傳遞模型

點對點模型有些場景並非很適用,好比有一臺主服務器,它產生一條消息須要讓全部的從服務器都能收到,若採用點對點模型的話,那主服務器須要循環發送消息,後續如有新的從服務器增長,還要改主服務器的配置,這樣就會致使沒必要要的麻煩,那麼發佈/訂閱模型是怎麼樣的呢?其實這種模式跟設計模式中的觀察者模式很類似,相信不少同窗都很熟悉,它最大的特色就是較鬆耦合,易擴展等特色,因此發佈/訂閱模型的大體結構以下:

jms-topic

iii.優勢:
  • 因爲使用異步通訊,不須要線程暫停等待,性能相對較高。

iiii.缺點:
  • 技術實現相對複雜,並須要維護相關的消息隊列;

更通俗的說:

RMI能夠當作是用打電話的方式進行信息交流,而JMS更像是發短信。

總的來講兩種方式沒有孰優孰劣,咱們也不用比較到底哪一種方式比較好,存在即合理,更重要的是哪一種選擇可能更適合你的系統。

Akka Remote

上面講到JAVA中遠程通訊的方式,但咱們以前說過Akka也是基於JVM平臺的,那麼它的通訊方式又有什麼不一樣呢?

在我看來,Akka的遠程通訊方式更像是RMI和JMS的結合,但更偏向於JMS的方式,爲何這麼說呢,咱們先來看一個示例:

咱們先來建立一個遠程的Actor:

class RemoteActor extends Actor {
  def receive = {
    case msg: String =>
      println(s"RemoteActor received message '$msg'")
      sender ! "Hello from the RemoteActor"
  }
}

如今咱們在遠程服務器上啓動這個Actor:

val system = ActorSystem("RemoteDemoSystem")
val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")

那麼如今咱們假若有一個系統須要向這個Actor發送消息應該怎麼作呢?

首先咱們須要相似RMI發佈本身的服務同樣,咱們須要爲其餘系統調用遠程Actor提供消息通訊的接口,在Akka中,設置很是簡單,不須要代碼侵入,只需簡單的在配置文件裏配置便可:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = $localIp  //好比127.0.0.1
      port = $port //好比2552
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

咱們只需配置相應的驅動,傳輸方式,ip,端口等屬性就可簡單完成Akka Remote的配置。

固然本地服務器也須要配置這些信息,由於Akka之間是須要相互通訊的,固然配置除了hostname有必定的區別外,其餘配置信息可一致,本例子是在同一臺機器上,因此這裏hostname是相同的。

這時候咱們就能夠在本地的服務器向這個Actor發送消息了,首先咱們能夠建立一個本地的Actor:

case object Init
case object SendNoReturn

class LocalActor extends Actor{

  val path = ConfigFactory.defaultApplication().getString("remote.actor.name.test")
  implicit val timeout = Timeout(4.seconds)
  val remoteActor = context.actorSelection(path)

  def receive: Receive = {
    case Init => "init local actor"
    case SendNoReturn => remoteActor ! "hello remote actor"
  }
}

其中的remote.actor.name.test的值爲:「akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor」,另外咱們能夠看到咱們使用了context.actorSelection(path)來獲取的是一個ActorSelection對象,如果須要得到ActorRef,咱們能夠調用它的resolveOne(),它返回的是是一個Future[ActorRef],這裏是否是很熟悉,由於它跟本地獲取Actor方式是同樣的,由於Akka中Actor是位置透明的,獲取本地Actor和遠程Actor是同樣的。

最後咱們首先啓動遠程Actor的系統:

object RemoteDemo extends App  {
  val system = ActorSystem("RemoteDemoSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")
  remoteActor ! "The RemoteActor is alive"
}

而後咱們在本地系統中啓動這個LocalActor,並向它發送消息:

object LocalDemo extends App {

  implicit val system = ActorSystem("LocalDemoSystem")
  val localActor = system.actorOf(Props[LocalActor], name = "LocalActor")

  localActor ! Init
  localActor ! SendNoReturn
}

咱們能夠看到RemoteActor收到了一條消息:

send-no-return

從以上的步驟和結果看出能夠看出,Akka的遠程通訊跟JMS的點對點模式彷佛更類似一點,可是它有不須要咱們維護消息隊列,而是使用Actor自身的郵箱,另外咱們利用context.actorSelection獲取的ActorRef,能夠當作遠程Actor的副本,這個又和RMI相關概念相似,因此說Akka遠程通訊的形式上像是RMI和JMS的結合,固然底層仍是經過TCP、UDP等相關網絡協議進行數據傳輸的,從配置文件的相應內容即可以看出。

上述例子演示的是sendNoReturn的模式,那麼假如咱們須要遠程Actor給咱們一個回覆應該怎麼作呢?

首先咱們建立一個消息:

case object SendHasReturn

 def receive: Receive = {
    case SendHasReturn =>
      for {
        r <- remoteActor.ask("hello remote actor")
      } yield r
  }

咱們從新運行LocalActor並像RemoteActor發送一條消息:

send-has-return

能夠看到LocalActor在發送消息後並收到了RemoteActor返回來的消息,另外咱們這裏設置了超時時間,若在規定的時間內沒有獲得反饋,程序就會報錯。

Akka Serialization

其實這一部分本能夠單獨拿出來寫,可是相信序列化這塊你們都應該有所瞭解了,因此就不許備講太多序列化的知識了,怕班門弄斧,主要講講Akka中的序列化。

繼續上面的例子,假如咱們這時向RemoteActor發送一個自定義的對象,好比一個case class對象,可是咱們這是是在網絡中傳輸這個消息,那麼怎麼保證這個對象類型和值呢,在同一個JVM系統中咱們不須要擔憂這個,由於對象就在堆中,咱們只要傳遞相應的地址便可就行,可是在不一樣的環境中,咱們並不能這麼作,咱們在網絡中只能傳輸字節數據,因此咱們必須將對象作特殊的處理,在傳輸的時候轉化成特定的由一連串字節組成的數據,並且咱們又能夠根據這些數據恢復成一個相應的對象,這即是序列化。

咱們先定義一個參與的case class, 並修改一下上面發送消息的語句:

case object SendSerialization
case class JoinEvt(
    id: Long,
    name: String
)
def receive: Receive = {
    case SendSerialization =>
      for {
        r <- remoteActor.ask(JoinEvt(1L,"godpan"))
      } yield println(r)
  }

這時咱們從新啓動RemoteActor和LocalActor所在的系統,發送這條消息:

send-serialization

有同窗可能會以爲奇怪,咱們明明沒有對JoinEvt進行過任何序列化的標識和處理,爲何程序還能運行成功呢?

其實否則,只不過是有人替咱們默認作了,不用說,確定是貼心的Akka,它爲咱們提供了一個默認的序列化策略,那就是咱們熟悉又糾結的java.io.Serializable,沉浸在它的易使用性上,又對它的性能深惡痛絕,尤爲是當有大量對象須要傳輸的分佈式系統,若是是小系統,當我沒說,畢竟存在即合理。

又有同窗說,既然Akka是一個天生分佈式組件,爲何還用低效的java.io.Serializable,你問我我也不知道,可能當時的做者偷了偷懶,固然Akka如今可能覺醒了,首先它支持第三方的序列化工具,固然若是你有特殊需求,你也能夠本身實現一個,並且在最新的文檔中說明,在Akka 2.5x以後Akka內核消息全面廢棄java.io.Serializable,用戶自定義的消息暫時仍是支持使用java.io.Serializable的,可是不推薦用,由於它是低效的,容易被攻擊,因此在這裏我也推薦你們再Akka中儘可能不要在使用了java.io.Serializable。

那麼在Akka中咱們如何使用第三方的序列化工具呢?

這裏我推薦一個在Java社區已經久負盛名的序列化工具:kryo,有興趣的同窗能夠去了解一下:kryo,並且它也提供Akka使用的相關包,這裏咱們就使用它做爲示例:

這裏我貼上整個項目的build.sbt, kryo的相關依賴也在裏面:

import sbt._
import sbt.Keys._

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.5.3",
    "com.typesafe.akka" %% "akka-remote" % "2.5.3",
    "com.twitter" %% "chill-akka" % "0.8.4"
  )

lazy val commonSettings = Seq(
  name := "AkkaRemoting",
  version := "1.0",
  scalaVersion := "2.11.11",
  libraryDependencies := AllLibraryDependencies
)

lazy val remote = (project in file("remote"))
  .settings(commonSettings: _*)
  .settings(
    // other settings
  )

lazy val local = (project in file("local"))
  .settings(commonSettings: _*)
  .settings(
    // other settings
  )

而後咱們只需將application.conf中的actor配置替換成如下的內容:

actor {
    provider = "akka.remote.RemoteActorRefProvider"
    serializers {
      kryo = "com.twitter.chill.akka.AkkaSerializer"
    }
    serialization-bindings {
      "java.io.Serializable" = none
      "scala.Product" = kryo
    }
  }

其實其中的"java.io.Serializable" = none能夠省略,由於如果有其餘序列化的策略則會替換掉默認的java.io.Serializable的策略,這裏只是爲了更加仔細的說明。

至此咱們就可使用kryo了,整個過程是否是很easy,火燒眉毛開始寫demo了,那就快快開始吧。

整個例子的相關的源碼已經上傳到akka-demo中:源碼連接

相關文章
相關標籤/搜索