[EventBus源碼解析] 訂閱者處理消息的四種ThreadMode

前言

  在前面,咱們探討了如何在本身的代碼中引入EventBus,進行基本的事件分發/監聽;對註冊觀察者與事件發送的過程進行了淺析。從以前的學習中,咱們瞭解到,EventBus一共有4種onEvent方法,要根據實際需求的不一樣選用不一樣的事件處理方法。本篇blog中,咱們集中研究一下這四種事件處理方法內部分別作了什麼事情,是如何實現的。html

  本篇會較多涉及java中的併發/多線程技術,這部分基礎不夠紮實的朋友,能夠趁機黑練一下。(實際上是說我本身 -_-!)java

post的最後一步

  EventBus.post 是分發事件時調用的方法,post時,根據EventType找到對應的Subscription列表,遍歷列表依次調用postToSubscription。方法以下多線程

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case PostThread:
                invokeSubscriber(subscription, event);
                break;
            case MainThread:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BackgroundThread:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case Async:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

  根據EventBus提供的4種消息處理方式(ThreadMode)進行switch/case,這4種ThreadMode的含義,在以前的文章中已經給出過說明。結合代碼,能看出在這四個case語句裏面,處理方式無非如下兩種:併發

  1. 若當前線程是ThreadMode所指明的線程,直接調用 invokeSubscriber(subscription, event);
  2. 當前線程不是ThreadMode所指明的線程,將 subscription、event 加入到目標線程的隊列中(enqueue)。

  接下來逐一分析這兩種處理方式。異步

交由當前線程處理

  從方法名與方法所起的做用來看,invokeSubscriber必定是用到了反射機制,代碼中是如何作的呢?async

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

 

  如此簡單,subscription中保存了以前解析出的以「onEvent」開頭的方法,直接調用之。固然要catch住invoke可能會拋出的三個Exception:IllegalArgumentException, InvocationTargetException, IllegalAccessException,不知爲什麼這裏只catch了其中兩個。我判斷緣由是,以前解析subscription時,已經就參數類型作過過濾了,能夠保證傳入的event必定是方法所須要的,因此就無需再去catch參數類型不匹配的Exception。ide

交由非當前線程處理

  在EventBus中,聲明瞭以下三個Posteroop

    private final HandlerPoster mainThreadPoster;
    private final BackgroundPoster backgroundPoster;
    private final AsyncPoster asyncPoster;

 

  這三個Poster裏,都有enqueue方法,用來將事件壓入隊列。可是這三種處理方式是略有不一樣的,依次來看。post

HandlerPoster

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

  HandlerPoster,人如其名,繼承了Handler類,內部維護了一個PendingPostQueue,每當有時間enqueue時,判斷當前Poster是否處於激活狀態。對於未激活的將其激活。激活中的Poster,會經過sendMessage(obtainMessage())來依次處理PendingPostQueue中的事件,這是經過重載Handler中的handleMessage來作的。學習

BackgroundPoster

final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

  與HandlerPoster不一樣的是,BackgroundPoster並無繼承自Handler,而是實現了Runnable接口。這麼作的緣由是,在用到BackgroudPoster的場合,必須是新建一個進程來處理事件,這裏就使用了eventBus.getExecutorService().execute(this),從線程池裏取出線程來執行。能夠看到後面的AsyncPoster也是採用相似的方法。

AsyncPoster

class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}

  一樣是用Runnable來實現,爲啥AsyncPoster的代碼比BackgroundPoster的簡單這麼多呢?仔細對比一下,就會發現BackgroudPoster中多出的代碼,都是用來處理同步的。因此,緣由在於,BackgroudPoster必須保證事件處理的順序,先進先出。而AsyncQueue則沒有這個顧慮。因此在須要按順序處理事件的場合,就不要使用AsyncPoster啦!

小結

  本篇blog簡單介紹了四種ThreadMode內部的機制,若是須要執行事件的線程是當前線程,則直接用反射調用方法;不然將事件壓入對應Poster的隊列中,依次(HandlerPoster,BackgroundPoster)或異步(AsyncPoster)執行。

下期預告

  從目錄結構上,對EventBus項目進行總體解析。

相關文章
相關標籤/搜索