前言
在《使用Akka持久化——持久化與快照》一文中介紹瞭如何使用Akka持久化消息及生成快照。對於集羣應用來講,發送者發出消息,只有當收到了接受者的成功回覆才應當認爲是一次完整的請求和應答(一些RPC框架只提供了遠程調用、序列化/反序列化的機制,可是具體調用的成功與否實際是拋給了開發者本人),利用Akka的應答機制很容易實現這些功能。特殊狀況下發送者發送了消息,可是最終接受者卻沒有接收到消息,致使這一狀況發生的因素不少(例如:發送者調用完發送接口後,發送者所在進程奔潰了;網絡故障;接收者不存在等)。若是這些消息的成功接收與處理對於整個應用而言具備強一致性的要求,那麼這些都會致使不少困擾,好在咱們可使用Akka的持久化機制。web
發送者在發送消息以前先對消息進行持久化,那麼不管任何緣由致使沒有收到接收者的成功回覆時,咱們總能有辦法從持久化信息中找出那些未被成功回覆的消息進行重發(這說明接收者接到的消息有可能會重複,此時須要保證接收者的實現是冥等的)。當接收者收到消息進行處理後須要向發送者發送成功回覆,發送者收到回覆後的第一個動做應當是對回覆內容的持久化,不然有可能在還未真正對成功回覆處理時宕機或進程奔潰致使回覆消息丟失(在持久化與收到消息之間仍然會存在宕機和進程奔潰的狀況,只不過這個時間很是短,所以丟失回覆的可能會很低),當持久化回覆消息完成後,能夠本身慢慢來處理這些確認信息,而不用擔憂它們丟失了。spring
本文將根據Akka官網的例子,對其作一些適應性改造後,向你們展現Akka持久化的另外一個強大武器——At least once delivery!緩存
消息投遞規則
通常而言,消息投遞有下面三種狀況:服務器
at-most-once 意味着每條應用了這種機制的消息會被投遞0次或1次。能夠說這條消息可能會丟失。
at-least-once 意味着每條應用了這種機制的消息潛在的存在屢次投遞嘗試並保證至少會成功一次。就是說這條消息可能會重複可是不會丟失。
exactly-once 意味着每條應用了這種機制的消息只會向接收者準確的發送一次。換言之,這種消息既不會丟失也不會重複。
at-most-once的成本最低且性能最高,由於它在發送完消息後不會嘗試去記錄任何狀態,而後這條消息將被他拋之腦後。at-least-once須要發送者必須認識它所發送過的消息,並對沒有收到回覆的消息進行發送重試。exactly-once的成本是三者中最高的並且性能卻又是三者中最差的,它除了要求發送者有記憶和重試能力,還要求接收者可以認識接收過的消息並能過濾出那些重複的消息投遞。Akka的Actor模型通常提供的消息都屬於at-most-once,那是由於大多數場景都不須要有狀態的消息投遞,例如web服務器請求。當你有強一致性需求時,才應該啓用Akka的at-least-once機制,那就是你的Actor再也不繼承自UntypedActor,而是繼承自UntypedPersistentActorWithAtLeastOnceDelivery。
配置
若是要使用,那麼須要在中增長如下的一些配置:
網絡
at-least-once-delivery { redeliver-interval = 20000 redelivery-burst-limit = 100 }
redeliver-interval用於配置從新進行投遞嘗試的時間間隔,單位是毫秒。redelivery-burst-limit用於配置每次從新執行投遞嘗試時發送的最大消息條數。app
一致性消息例子
咱們首先來看看本例中用到的消息體MsgSent、Msg、Confirm及MsgConfirmed。MsgSent表明將要發送的消息,可是隻用於持久化,持久化完成後會將MsgSent轉換爲Msg進行發送。也就是說Msg纔會被真正用於消息發送。接收者收到Msg消息後將向發送者回復Confirm消息,須要注意的是Msg和Confirm都有屬性deliveryId,此deliveryId由發送者的持久化功能生成,一條Msg消息和其對應的Confirm回覆的deliveryId必須一致,不然在利用UntypedPersistentActorWithAtLeastOnceDelivery對回覆消息進行確認時會產生嚴重的bug。發送者收到接收者的Confirm回覆後首先將其轉換爲MsgConfirmed,而後對MsgConfirmed進行持久化,最後調用UntypedPersistentActorWithAtLeastOnceDelivery提供的confirmDelivery方法對回覆進行確認。MsgSent、Msg、Confirm及MsgConfirmed的代碼實現以下:框架
public interface Persistence { public static class Msg implements Serializable { private static final long serialVersionUID = 1L; public final long deliveryId; public final String s; public Msg(long deliveryId, String s) { this.deliveryId = deliveryId; this.s = s; } } public static class Confirm implements Serializable { private static final long serialVersionUID = 1L; public final long deliveryId; public Confirm(long deliveryId) { this.deliveryId = deliveryId; } } public static class MsgSent implements Serializable { private static final long serialVersionUID = 1L; public final String s; public MsgSent(String s) { this.s = s; } } public static class MsgConfirmed implements Serializable { private static final long serialVersionUID = 1L; public final long deliveryId; public MsgConfirmed(long deliveryId) { this.deliveryId = deliveryId; } } }
服務端
本例中的服務端很是簡單,是一個接收處理Msg消息,並向發送者回復Confirm消息的Actor,代碼以下:tcp
@Named("MyDestination") @Scope("prototype") public class MyDestination extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); public void onReceive(Object message) throws Exception { if (message instanceof Msg) { Msg msg = (Msg) message; log.info("receive msg : " + msg.s + ", deliveryId : " + msg.deliveryId); getSender().tell(new Confirm(msg.deliveryId), getSelf()); } else { unhandled(message); } } }
服務端的啓動代碼以下:ide
logger.info("Start myDestination"); final ActorRef myDestination = actorSystem.actorOf(springExt.props("MyDestination"), "myDestination"); logger.info("Started myDestination");
客戶端
具體介紹客戶端以前,先來列出其實現,代碼以下:性能
@Named("MyPersistentActor") @Scope("prototype") public class MyPersistentActor extends UntypedPersistentActorWithAtLeastOnceDelivery { LoggingAdapter log = Logging.getLogger(getContext().system(), this); private final ActorSelection destination; @Override public String persistenceId() { return "persistence-id"; } public MyPersistentActor(ActorSelection destination) { this.destination = destination; } @Override public void onReceiveCommand(Object message) { if (message instanceof String) { String s = (String) message; log.info("receive msg : " + s); persist(new MsgSent(s), new Procedure<MsgSent>() { public void apply(MsgSent evt) { updateState(evt); } }); } else if (message instanceof Confirm) { Confirm confirm = (Confirm) message; log.info("receive confirm with deliveryId : " + confirm.deliveryId); persist(new MsgConfirmed(confirm.deliveryId), new Procedure<MsgConfirmed>() { public void apply(MsgConfirmed evt) { updateState(evt); } }); } else if (message instanceof UnconfirmedWarning) { log.info("receive unconfirmed warning : " + message); // After a number of delivery attempts a AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self. The re-sending will still continue, but you can choose to call confirmDelivery to cancel the re-sending. List<UnconfirmedDelivery> list = ((UnconfirmedWarning) message).getUnconfirmedDeliveries(); for (UnconfirmedDelivery unconfirmedDelivery : list) { Msg msg = (Msg) unconfirmedDelivery.getMessage(); confirmDelivery(msg.deliveryId); } } else { unhandled(message); } } @Override public void onReceiveRecover(Object event) { updateState(event); } void updateState(Object event) { if (event instanceof MsgSent) { final MsgSent evt = (MsgSent) event; deliver(destination, new Function<Long, Object>() { public Object apply(Long deliveryId) { return new Msg(deliveryId, evt.s); } }); } else if (event instanceof MsgConfirmed) { final MsgConfirmed evt = (MsgConfirmed) event; confirmDelivery(evt.deliveryId); } } }
正如咱們以前所述——要使用at-least-once的能力,就必須繼承UntypedPersistentActorWithAtLeastOnceDelivery。有關MsgSent、Msg、Confirm及MsgConfirmed等消息的處理過程已經介紹過,這裏再也不贅述。咱們注意到onReceiveCommand方法還處理了一種名爲UnconfirmedWarning的消息,這類消息將在at-least-once機制下進行無限或者必定數量的投遞嘗試後發送給當前Actor,這裏的數量能夠經過在at-least-once-delivery配置中增長配置項warn-after-number-of-unconfirmed-attempts來調整,例如:
at-least-once-delivery { redeliver-interval = 20000 redelivery-burst-limit = 100 warn-after-number-of-unconfirmed-attempts = 6 }
當你收到UnconfirmedWarning的消息時,說明已經超出了你指望的最大重試次數,此時能夠作一些控制了,例如:對於這些消息發送報警、丟棄等。本例中選擇了丟棄。
UntypedPersistentActorWithAtLeastOnceDelivery的狀態由那些還沒有被確認的消息和一個序列號組成。UntypedPersistentActorWithAtLeastOnceDelivery自己不會存儲這些狀態,依然須要你在調用deliver方法投遞消息以前,調用persist方法持久化這些事件或消息,以便於當持久化Actor可以在恢復階段恢復。在恢復階段,deliver方法並不會將發出消息,此時持久化Actor一面恢復,一面只能等待接收回復。當恢復完成,deliver將發送那些被緩存的消息(除了收到回覆,並調用confirmDelivery方法的消息)。
運行例子
本文將率先啓動客戶端並向服務端發送hello-1,hello-2,hello-3這三消息,可是因爲服務端此時並未啓動,因此客戶端會不斷重試,直到重試達到上限或者受到回覆並確認。服務端發送消息的代碼以下:
logger.info("Start myPersistentActor"); final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2551/user/myDestination"; final ActorSelection destination = actorSystem.actorSelection(path); final ActorRef myPersistentActor = actorSystem.actorOf(springExt.props("MyPersistentActor", destination), "myPersistentActor"); actorMap.put("myPersistentActor", myPersistentActor); logger.info("Started myPersistentActor"); myPersistentActor.tell("hello-1", null); myPersistentActor.tell("hello-2", null); myPersistentActor.tell("hello-3", null);
客戶端發送三條消息後,日誌中立馬打印出瞭如下內容:
可是一直未受到回覆信息,而後咱們啓動服務端,不一會就看到了如下日誌輸出:
咱們再來看看客戶端,發現已經收到了回覆,內容以下:
總結
經過使用UntypedPersistentActorWithAtLeastOnceDelivery提供的persist、deliver及confirmDelivery等方法能夠對整個應用的at-least-once需求,輕鬆實如今框架層面上一致的實現。
其它Akka應用的博文以下:
《Spring與Akka的集成》; 《使用Akka的遠程調用》; 《使用Akka構建集羣(一)》; 《使用Akka構建集羣(二)》; 《使用Akka持久化——持久化與快照》; 《使用Akka持久化——消息發送與接收》;