同步轉異步是一種常見的優化手段,最近一次在作調優時便大量使用了這種方式。一般在一個業務場景中會包含多個操做,有些操做的結果須要讓用戶立馬知道,但有些操做則不須要。這些用戶不須要等待結果的操做,咱們在編程的時候即可以異步處理。這麼作最直接的效果就是縮短接口響應速度,提高用戶體驗。html
我這次優化的是下單場景。建立訂單時同步操做有: 查詢庫存,扣款,刷新庫存; 可異步的操做有: 通知風控系統,給買家發送扣款郵件和短信,通知賣家,建立一些定時任務。java
最初我用的方案是Spring提供的@Async機制。這是一種很輕量的作法,只須要在可異步調用的方法上加上@Async註解便可。可是這種作法也存在兩個問題: 1. 不支持類內部方法之間的調用。使用這種方式,我必需要把一些須要異步調用的方法轉移到一個新類裏,這點讓人不爽。2. 當系統crash的時候,緩存的任務就丟了。所以,這個方案並不特別理想。mysql
兩年以前用akka作過一個社交應用的後端服務,並且消息模型天生異步,因此天然想到了用akka。可是用akka的話也有一些地方須要注意。第一,Actor是單線程順序執行,若是任務比較多最好使用actor router。actor router管理多個actor,能夠作到必定限度的並行執行。第二,使用有持久化actor,確保任務不會丟失。我會以發push提醒爲例描述一下這個方案的實現細節。多數場景中發push提醒均可進行異步調用。sql
下單邏輯都放在OrderService中,下單成功給賣家發送push提醒時,Orderservice會給NotificationActor發送一個消息。數據庫
NotificationActor有兩個職責:1. 保存接收到的任務;2. 把消息轉發給NotificationWorker,當Worker執行成功以後把消息刪除。在最新版本的akka中可使用At-Least-Once Delivery實現這兩個功能。編程
NotificationWorkerRouter僅僅處理髮送邏輯。WorkerActor以Router方式進行部署,以實現並行處理,提升處理效率。後端
下邊看一下具體實現細節:緩存
public class NotificationActor extends UntypedPersistentActorWithAtLeastOnceDelivery { private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); private ActorRef notificationWorkers = null; private final String uniqueId = UUID.randomUUID().toString(); @Autowired public NotificationActor(final ActorSystemManager actorSystemManager) { this.notificationWorkers = actorSystemManager.notificationWorkers; } @Override public String persistenceId() { return "journal:notification-actor:" + uniqueId; } @Override public void onReceiveRecover(final Object msg) throws Throwable { if (msg instanceof NotificationMessage) { deliverAckMessage((NotificationMessage) msg); } } @Override public void onReceiveCommand(final Object msg) throws Throwable { if (msg instanceof NotificationMessage) { persist(msg, m -> { deliverAckMessage((NotificationMessage) m); }); } else if (msg instanceof Confirm) { Confirm confirm = (Confirm) msg; confirmMessage(new MsgConfirmed(confirm.deliveryId)); } else if (msg instanceof UnconfirmedWarning) { UnconfirmedWarning warning = (UnconfirmedWarning) msg; warning.getUnconfirmedDeliveries().forEach(d -> { log.error("[NOTIFICATION-ACTOR] Unconfirmed Messages: {}", d.message()); confirmMessage(new MsgConfirmed(d.deliveryId())); }); } else { unhandled(msg); } } private void deliverAckMessage(NotificationMessage event) { deliver(notificationWorkers.path(), (Function<Long, Object>) deliveryId -> new AckMessage(deliveryId, event)); } private void confirmMessage(final MsgConfirmed evt) { confirmDelivery(evt.deliveryId); deleteMessages(evt.deliveryId); } public interface NotificationMessage extends Event {} public static final @Data class PushMessage implements NotificationMessage { private final Long source; private final Long target; private final String trigger; private final ImmutableMap<String, Serializable> payload; } } public class NotificationWorkerActor extends UntypedActor { private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); private final @NonNull NotificationService notificationService; @Autowired public NotificationWorkerActor(final NotificationService notificationService) { this.notificationService = notificationService; } @Override public void onReceive(final Object event) throws Throwable { if (event instanceof AckMessage) { final AckMessage ackMessage = (AckMessage) event; NotificationMessage msg = (NotificationMessage) ackMessage.msg; log.info("[NOTIFICATION] receive message: {}", msg); if (msg instanceof PushMessage) { final PushMessage m = (PushMessage) msg; log.info("[NOTIFICATION] send push notification from: {} to: {}", m.getSource(), m.getTarget()); notificationService.notify(m.getSource(), m.getTarget(), m.getTrigger(), m.getPayload()); } sender().tell(new Confirm(ackMessage.deliveryId), self()); } else { unhandled(event); } } } public class OrderService { public void createOrder() { actorSystemManager.notificationActor.tell( new PushMessage(), ActorRef.noSender() ); } }
最先實施這個方案的時候遇到一個問題,說一下這個問題如何產生的。咱們一共有三臺服務器,三臺服務器都會部署一樣的代碼,以NotificationActor爲例,它會分別部署在三個機器上。actor journal咱們使用mysql存儲。akka persistent actor內部有一個sequence number用來對接收到的消息進行計數,這個數字是遞增的。同時這個數字也會在journal中記錄。最初個人persistenceId方法是這樣實現的:服務器
@Override public String persistenceId() { return "journal:notification-actor"; }
那麼,假如server1上的NotificationActor接收了一個消息,那麼它的sequence number會變成1,mysql中將會存儲的sequence number爲1的消息。這時server2上也接收到了一個消息,由於它的最初sequence number是0,因此它也會把如今接收到的消息的sequence number設置爲1。可是顯然這條消息是不能持久化的,由於它和數據庫記錄的sequence number衝突了。根本緣由是三臺服務器上的NotificationActor的persistenceId是同樣的。dom
上邊代碼中給出了一種方案,把persistenceId變成random的,每次actor啓動的時候都會獲得不一樣的persistenceId,這樣就解決了上述問題。還有一種方案是引入akka cluster,使用akka singleton。這種方案會在下一篇文章中詳細說明。