咱們的第一個查詢協議是正確的,但沒有考慮分佈式應用程序執行。若是咱們想在查詢設備Actor中實現重發(因爲超時請求),或者若是咱們想查詢多個Actor,咱們須要可以關聯請求和響應。所以,咱們在消息中再添加一個字段,以便請求者能夠提供ID(咱們將在稍後的步驟中將此代碼添加到咱們的應用程序中):html
定義設備actor及其讀取協議git
正如咱們在Hello World示例中所瞭解到的,每一個actor都定義了它將接受的消息類型。咱們的設備Actor有責任對給定查詢的響應使用相同的ID參數,這將使其看起來以下所示。github
import java.util.Optional; import akka.actor.AbstractActor; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; class Device extends AbstractActor { private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); final String groupId; final String deviceId; public Device(String groupId, String deviceId) { this.groupId = groupId; this.deviceId = deviceId; } public static Props props(String groupId, String deviceId) { return Props.create(Device.class, groupId, deviceId); } public static final class ReadTemperature { long requestId; public ReadTemperature(long requestId) { this.requestId = requestId; } } public static final class RespondTemperature { long requestId; Optional<Double> value; public RespondTemperature(long requestId, Optional<Double> value) { this.requestId = requestId; this.value = value; } } Optional<Double> lastTemperatureReading = Optional.empty(); @Override public void preStart() { log.info("Device actor {}-{} started", groupId, deviceId); } @Override public void postStop() { log.info("Device actor {}-{} stopped", groupId, deviceId); } @Override public Receive createReceive() { return receiveBuilder() .match(ReadTemperature.class, r -> { getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); }) .build(); } }
請注意代碼中:分佈式
測試Actoride
基於上面的簡單Actor,咱們能夠編寫一個簡單的測試。在項目的測試目錄中中的com.lightbend.akka.sample包中,將如下代碼添加到DeviceTest.java文件中。post
您能夠經過運行mvn test或在sbt提示符下運行test來運行此測試。測試
@Test public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() { TestKit probe = new TestKit(system); ActorRef deviceActor = system.actorOf(Device.props("group", "device")); deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef()); Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class); assertEquals(42L, response.requestId); assertEquals(Optional.empty(), response.value); }
如今,Actor須要一種方法來從傳感器接收到消息時改變溫度狀態。
添加寫協議
寫入協議的目的是在actor接收包含溫度的消息時更新currentTemperature字段。一樣,很容易將寫協議定義爲一個很是簡單的消息,以下所示:
public static final class RecordTemperature { final double value; public RecordTemperature(double value) { this.value = value; } }
可是,這種方法沒有考慮到記錄溫度消息的發送者永遠沒法肯定消息是否被處理。咱們已經看到Akka不保證傳遞這些消息並將其留給應用程序以提供成功通知。在這種狀況下,咱們但願在更新上次溫度記錄後向發件人發送確認,例如TemperatureRecorded。就像溫度查詢和響應同樣,最好包含一個ID字段以提供最大的靈活性。
讀取和寫入消息的Actor
將讀寫協議放在一塊兒,設備Actor看起來像下面的例子:
import java.util.Optional; import akka.actor.AbstractActor; import akka.actor.AbstractActor.Receive; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; public class Device extends AbstractActor { private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); final String groupId; final String deviceId; public Device(String groupId, String deviceId) { this.groupId = groupId; this.deviceId = deviceId; } public static Props props(String groupId, String deviceId) { return Props.create(Device.class, groupId, deviceId); } public static final class RecordTemperature { final long requestId; final double value; public RecordTemperature(long requestId, double value) { this.requestId = requestId; this.value = value; } } public static final class TemperatureRecorded { final long requestId; public TemperatureRecorded(long requestId) { this.requestId = requestId; } } public static final class ReadTemperature { final long requestId; public ReadTemperature(long requestId) { this.requestId = requestId; } } public static final class RespondTemperature { final long requestId; final Optional<Double> value; public RespondTemperature(long requestId, Optional<Double> value) { this.requestId = requestId; this.value = value; } } Optional<Double> lastTemperatureReading = Optional.empty(); @Override public void preStart() { log.info("Device actor {}-{} started", groupId, deviceId); } @Override public void postStop() { log.info("Device actor {}-{} stopped", groupId, deviceId); } @Override public Receive createReceive() { return receiveBuilder() .match(RecordTemperature.class, r -> { log.info("Recorded temperature reading {} with {}", r.value, r.requestId); lastTemperatureReading = Optional.of(r.value); getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); }) .match(ReadTemperature.class, r -> { getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); }) .build(); } }
咱們如今還應該編寫一個新的測試用例,同時執行讀/查和寫/記錄功能:
@Test public void testReplyWithLatestTemperatureReading() { TestKit probe = new TestKit(system); ActorRef deviceActor = system.actorOf(Device.props("group", "device")); deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef()); assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef()); Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class); assertEquals(2L, response1.requestId); assertEquals(Optional.of(24.0), response1.value); deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef()); assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef()); Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class); assertEquals(4L, response2.requestId); assertEquals(Optional.of(55.0), response2.value); }
到目前爲止,咱們已經開始設計咱們的總體架構,而且咱們編寫了第一個直接對應領域的actor。咱們如今必須建立負責維護設備組和設備Actor自己的組件。
下節繼續!
原文:https://doc.akka.io/docs/akka/2.5/guide/tutorial_3.html
有什麼討論的內容,能夠加我公衆號: