使用akka做異步任務處理

同步轉異步是一種常見的優化手段,最近一次在作調優時便大量使用了這種方式。一般在一個業務場景中會包含多個操做,有些操做的結果須要讓用戶立馬知道,但有些操做則不須要。這些用戶不須要等待結果的操做,咱們在編程的時候即可以異步處理。這麼作最直接的效果就是縮短接口響應速度,提高用戶體驗。html

我這次優化的是下單場景。建立訂單時同步操做有: 查詢庫存,扣款,刷新庫存; 可異步的操做有: 通知風控系統,給買家發送扣款郵件和短信,通知賣家,建立一些定時任務。java

最初我用的方案是Spring提供的@Async機制。這是一種很輕量的作法,只須要在可異步調用的方法上加上@Async註解便可。可是這種作法也存在兩個問題: 1. 不支持類內部方法之間的調用。使用這種方式,我必需要把一些須要異步調用的方法轉移到一個新類裏,這點讓人不爽。2. 當系統crash的時候,緩存的任務就丟了。所以,這個方案並不特別理想。mysql

兩年以前用akka作過一個社交應用的後端服務,並且消息模型天生異步,因此天然想到了用akka。可是用akka的話也有一些地方須要注意。第一,Actor是單線程順序執行,若是任務比較多最好使用actor router。actor router管理多個actor,能夠作到必定限度的並行執行。第二,使用有持久化actor,確保任務不會丟失。我會以發push提醒爲例描述一下這個方案的實現細節。多數場景中發push提醒均可進行異步調用。sql

classes.png

下單邏輯都放在OrderService中,下單成功給賣家發送push提醒時,Orderservice會給NotificationActor發送一個消息。數據庫

NotificationActor有兩個職責:1. 保存接收到的任務;2. 把消息轉發給NotificationWorker,當Worker執行成功以後把消息刪除。在最新版本的akka中可使用At-Least-Once Delivery實現這兩個功能。編程

NotificationWorkerRouter僅僅處理髮送邏輯。WorkerActor以Router方式進行部署,以實現並行處理,提升處理效率。後端

下邊看一下具體實現細節:緩存

public class NotificationActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private ActorRef notificationWorkers = null;
    private final String uniqueId = UUID.randomUUID().toString();

    @Autowired
    public NotificationActor(final ActorSystemManager actorSystemManager) {
        this.notificationWorkers = actorSystemManager.notificationWorkers;
    }

    @Override public String persistenceId() {
        return "journal:notification-actor:" + uniqueId;
    }

    @Override public void onReceiveRecover(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            deliverAckMessage((NotificationMessage) msg);
        }
    }

    @Override public void onReceiveCommand(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            persist(msg, m -> { deliverAckMessage((NotificationMessage) m); });
        } else if (msg instanceof Confirm) {
            Confirm confirm = (Confirm) msg;
            confirmMessage(new MsgConfirmed(confirm.deliveryId));
        } else if (msg instanceof UnconfirmedWarning) {
            UnconfirmedWarning warning = (UnconfirmedWarning) msg;
            warning.getUnconfirmedDeliveries().forEach(d -> {
                log.error("[NOTIFICATION-ACTOR] Unconfirmed Messages: {}", d.message());

                confirmMessage(new MsgConfirmed(d.deliveryId()));
            });
        } else {
            unhandled(msg);
        }
    }

    private void deliverAckMessage(NotificationMessage event) {
        deliver(notificationWorkers.path(), (Function<Long, Object>) deliveryId -> new AckMessage(deliveryId, event));
    }

    private void confirmMessage(final MsgConfirmed evt) {
        confirmDelivery(evt.deliveryId);
        deleteMessages(evt.deliveryId);
    }

    public interface NotificationMessage extends Event {}

    public static final @Data class PushMessage implements NotificationMessage {
        private final Long source;
        private final Long target;
        private final String trigger;
        private final ImmutableMap<String, Serializable> payload;
    }
}

public class NotificationWorkerActor extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private final @NonNull NotificationService notificationService;

    @Autowired
    public NotificationWorkerActor(final NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @Override public void onReceive(final Object event) throws Throwable {
        if (event instanceof AckMessage) {
            final AckMessage ackMessage = (AckMessage) event;
            NotificationMessage msg = (NotificationMessage) ackMessage.msg;
            log.info("[NOTIFICATION] receive message: {}", msg);

            if (msg instanceof PushMessage) {
                final PushMessage m = (PushMessage) msg;
                log.info("[NOTIFICATION] send push notification from: {} to: {}", m.getSource(), m.getTarget());
                notificationService.notify(m.getSource(), m.getTarget(), m.getTrigger(), m.getPayload());
            }
            sender().tell(new Confirm(ackMessage.deliveryId), self());
        } else {
            unhandled(event);
        }
    }
}

public class OrderService {
    public void createOrder() {
        actorSystemManager.notificationActor.tell(
          new PushMessage(), ActorRef.noSender()
        );
    }
}

最先實施這個方案的時候遇到一個問題,說一下這個問題如何產生的。咱們一共有三臺服務器,三臺服務器都會部署一樣的代碼,以NotificationActor爲例,它會分別部署在三個機器上。actor journal咱們使用mysql存儲。akka persistent actor內部有一個sequence number用來對接收到的消息進行計數,這個數字是遞增的。同時這個數字也會在journal中記錄。最初個人persistenceId方法是這樣實現的:服務器

@Override public String persistenceId() {
    return "journal:notification-actor";
}

那麼,假如server1上的NotificationActor接收了一個消息,那麼它的sequence number會變成1,mysql中將會存儲的sequence number爲1的消息。這時server2上也接收到了一個消息,由於它的最初sequence number是0,因此它也會把如今接收到的消息的sequence number設置爲1。可是顯然這條消息是不能持久化的,由於它和數據庫記錄的sequence number衝突了。根本緣由是三臺服務器上的NotificationActor的persistenceId是同樣的。dom

上邊代碼中給出了一種方案,把persistenceId變成random的,每次actor啓動的時候都會獲得不一樣的persistenceId,這樣就解決了上述問題。還有一種方案是引入akka cluster,使用akka singleton。這種方案會在下一篇文章中詳細說明。

相關文章
相關標籤/搜索