【AKKA 官方文檔翻譯】第三部分:與設備Actor一塊兒工做

第三部分:與設備Actor一塊兒工做

akka版本2.5.8
版權聲明:本文爲博主原創文章,未經博主容許不得轉載。程序員

在以前的話題中,咱們解釋瞭如何在高層次來看待actor系統,即要如何去表示組件,如何安排actor的層次結構。在本節中,咱們會看到如何實現其中的設備actor。數據庫

若是咱們使用對象,咱們會將API設計爲接口,並擁有一組會被實現類實現的抽象的方法。可是在actor的世界裏,協議(protocols)取代了接口。雖然咱們不能在編程語言內形式化通用協議,可是咱們能夠編寫它們最基本的元素——消息。所以,咱們會從定義咱們但願發給設備的消息開始咱們的程序。編程

給設備的消息

設備actor的工做很簡單:安全

一、收集溫度測量信息
二、當被查詢時,報告最後一次的測量值網絡

然而,在設備啓動時不會馬上就得到溫度測量信息,所以,咱們須要考慮溫度測量信息不存在的狀況。這也容許咱們的actor在沒有寫模塊的時候來測試讀模塊,由於設備能夠簡單地報告一個空結果。架構

從設備獲取但前溫度的協議很簡單,actor須要:框架

一、等待取當前溫度的請求
二、迴應這個請求:編程語言

①擁有當前的溫度數據
②標識當前溫度數據還不可用分佈式

咱們須要兩個消息,一個用來請求,一個用來回復。咱們的第一次嘗試可能以下所示:ide

final case object ReadTemperature
final case class RespondTemperature(value: Option[Double])

這兩條消息貌似涵蓋了全部咱們所須要的功能,然而,咱們選擇方法的時候必需要考慮應用程序的分佈式特性。雖然actor在JVM本地通訊與遠程通訊的基本機制相同,可是咱們須要牢記如下幾點:

一、本地信息與遠程信息的傳輸延遲有很大的不一樣,有些因素,如網絡帶寬、信息大小都會產生做用。
二、可靠性必須被重視,由於在遠程信息傳遞中會涉及到不少的步驟,這也會增大失敗的概率。
三、本地消息僅僅是在JVM內部傳遞引用,所以不會對消息有不少的限制,可是遠程傳輸可能會限制消息的大小。

另外,在JVM內部傳遞消息顯然是可靠性很高的,可是當actor由於程序員的錯誤而在處理信息時失敗了,那麼系統的表現就會和遠程網絡請求中遠程處理消息崩潰一致。儘管這是兩個場景,服務一會就會被恢復(actor會被監管者重啓,主機會被操做員或監控系統重啓),可是個別的請求可能會在故障中丟失。所以,咱們要悲觀一些,在丟失任何信息的狀況下都要保證系統安全

進一步理解協議中的靈活性需求,將有助於咱們去考慮Akka消息順序和消息傳遞保證。Akka爲消息發送提供瞭如下行爲:

一、最多隻有一次傳遞,即不保證送達
二、信息是被每一個發送者接收者對來維護的

如下章節將討論行爲中的更多細節:

一、信息傳遞
二、信息排序

信息傳遞

消息傳遞子系統提供的消息傳遞語義一般分爲如下幾類:

一、最多傳遞一次(At-most-once delivery),每一個消息被髮送零或一次,這意味着信息可能會丟失,但永遠不會被重複接收到
二、至少傳遞一次(At-least-once delivery),每一個消息均可能被潛在地發送不少次,直到有一次成功。這意味着信息可能會被重複接收,但永遠不會丟失
三、準確地發送一次(Exactly-once delivery),每一個消息都被精準地發送給接收者一次,消息不會丟失也不會重複接收

Akka使用第一種行爲,它是最節省資源的,而且性能最好。它擁有最小的實現開銷,由於可使用發送即忘(fire-and-forget)策略,而不用在發送者內保存發送狀態。第二點,也不須要對傳輸丟失進行計數。這些增長了發送結束後保持狀態、發送完畢確認的開銷。準確地發送一次信息的方式開銷是最大的,因爲其不好的性能表現,除了在發送端增長上述所說的開銷外,還須要在接收端增長過濾重複消息的機制。

在actor系統中,咱們須要肯定一個消息被保證的含義,在哪一種狀況下認爲傳輸已經完成:

一、當消息被送出到網絡上時?
二、當消息被接收者主機接收到時?
三、當消息被放到接收者actor的郵箱裏時?
四、當消息接收者actor開始處理這個信息時?
五、當消息接受者actor處理完這個消息時

大多數框架和協議聲稱保證傳輸,實際上它們提供了相似於4和5的東西。雖然這聽起來是合理的,可是實際上真的有用嗎?要理解其中的含義,請考慮一個簡單的問題:用戶嘗試下一個訂單,而且咱們認爲一旦它進入了訂單數據庫,就表明它已經被成功處理了。

若是咱們依賴於第五點,即消息被成功處理,那麼actor須要儘快在處理完後報告成功狀態,這個actor就有義務在訂單被提交到它的API後進行校驗、處理,而後放入訂單數據庫。不幸的是,當API被調用後,這些狀況可能會發生:

一、主機崩潰
二、反序列化失敗
三、校驗失敗
四、數據庫不可訪問
五、發生程序錯誤

這說明傳輸保證不能被認爲是領域級別的保證。咱們只想讓它在徹底處理完訂單並將其持久化後報告成功狀態。惟一能報告成功狀態的實體是應用程序自己,由於只有它瞭解領域內保證傳輸須要有哪些需求。沒有一個通用的系統能夠搞清楚某個特定領域中什麼狀況纔會被認爲是成功。

在這個特定的例子中,咱們只想在成功寫入數據庫以後發出成功信號,數據庫確認已經安全地將訂單存儲起來。因爲這些緣由,Akka將保證程序的責任提高給了應用程序自己,即你必須本身去實現這些。這給了你徹底的控制權,讓你能夠保護你須要保護的內容。如今,讓咱們考慮下Akka爲咱們提供的消息排序,以便輕鬆推理應用程序邏輯。

信息排序

在Akka裏對於一個給定的發送接收actor對。直接從A到B的消息不會被無序接收。直接這個詞強調這隻適用於直接向接收者發動消息,而不包括中間有協調員的狀況。

若是:

一、actor A1A2 發送了信息 M1M2M3
二、actor A3A2 發送了信息 M4M5M6

這意味着對於Akka消息:

一、M1必須在M2M3前被髮送
二、M2必須在M3前被髮送
三、M4必須在M5M6前被髮送
四、M5必須在M6前被髮送
五、A2看到的A1A3的信息多是交錯出現的
六、當前咱們沒有保證傳輸,全部消息都有可能會被丟棄,好比沒有到達A2

這些保證達到了一個很好的平衡:從一個actor接收到有序的消息使咱們能夠方便地構建易於推理的系統。另外一方面,容許不一樣actor的消息交錯接受給了咱們足夠的自由度,讓咱們能夠實現高性能的actor系統。

有關傳輸保證的完整細節,棄權那個參考參考頁面。

爲設備消息添加靈活性

咱們的第一個查詢協議是正確的,可是沒有考慮分佈式應用程序的執行。若是咱們想在actor中實現重傳(由於請求超時),以便查詢設備actor,或者咱們想在查詢多個actor時關聯請求和回覆。所以,咱們在消息裏添加了一個字段,以便請求者能夠提供一個ID(咱們會在接下來的步驟裏把代碼添加到應用程序裏):

final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])

定義設備actor和讀取協議

正如咱們在Hello World實例裏學習到的,每一個actor定義了其能接受到的消息種類。咱們的設備actor有義務使用相同的ID參數來回應請求,這將看起來以下所示:

import akka.actor.{ Actor, ActorLogging, Props }

object Device {
  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  final case class ReadTemperature(requestId: Long)
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  import Device._

  var lastTemperatureReading: Option[Double] = None

  override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
  override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)

  override def receive: Receive = {
    case ReadTemperature(id) ⇒
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }

}

注意代碼中的:

一、伴生對象定義瞭如何建立 Device actor,期中props方法的參數包含設備的ID和所屬的組ID,這在以後將會用到。
二、伴生對象包含了咱們以前所述的消息的定義。
三、在 Device 類裏,lastTemperatureReading的值初始化爲None,而且actor能夠簡單地將它返回。

測試actor

基於上面的簡單actor,咱們能夠寫一個簡單的測試用例。在測試代碼路徑下的com.lightbend.akka.sample包裏添加DeviceSpec.scala文件。(咱們使用ScalaTest,你也可使用其餘測試框架)

你能夠經過在sbt提示符下運行test來運行測試。

"reply with empty reading if no temperature is known" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
  val response = probe.expectMsgType[Device.RespondTemperature]
  response.requestId should ===(42)
  response.value should ===(None)
}

如今當actor接收到傳感器的信息時,須要一種方式來改變其溫度狀態。

添加一個寫入協議

寫入協議的目的是在接受到包含溫度的信息時更新currentTemperature字段。一樣,咱們使用一個簡單的消息來定義寫入協議,就像這樣:

final case class RecordTemperature(value: Double)

然而,這種方式沒有考慮讓發送者知道溫度記錄是否被處理,咱們已經看到Akka並不保證消息傳輸,而且把提供消息成功提示留給了應用程序來作。在咱們的場景下,咱們但願在更新溫度以後給發送者一個確認消息。例如:final case class TemperatureRecorded(requestId: Long)。就像以前場景中溫度的請求和迴應同樣,添加一個ID字段提供了極大的靈活性。

有讀寫消息的actor

將讀寫協議放在一塊兒,設備actor看起來就會像這樣:

import akka.actor.{ Actor, ActorLogging, Props }

object Device {
  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  final case class RecordTemperature(requestId: Long, value: Double)
  final case class TemperatureRecorded(requestId: Long)

  final case class ReadTemperature(requestId: Long)
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  import Device._
  var lastTemperatureReading: Option[Double] = None

  override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
  override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)

  override def receive: Receive = {
    case RecordTemperature(id, value) ⇒
      log.info("Recorded temperature reading {} with {}", value, id)
      lastTemperatureReading = Some(value)
      sender() ! TemperatureRecorded(id)

    case ReadTemperature(id) ⇒
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }
}

咱們如今還須要寫一個新的測試用例,同時執行讀/請求和寫/記錄:

"reply with latest temperature reading" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 1))

  deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
  val response1 = probe.expectMsgType[Device.RespondTemperature]
  response1.requestId should ===(2)
  response1.value should ===(Some(24.0))

  deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 3))

  deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
  val response2 = probe.expectMsgType[Device.RespondTemperature]
  response2.requestId should ===(4)
  response2.value should ===(Some(55.0))
}

接下來

到目前爲止,咱們已經開始設計咱們的總體架構,而且咱們編寫了與領域直接對應的第一個actor。咱們以後須要建立一個用來維護設備組和設備actor的組件。

相關文章
相關標籤/搜索