EventBus源碼閱讀(二)——Poster

  上一節在閱讀了EventBus的消息發送後,停在了postToSubscription方法上:async

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

   在不一樣的模式下,當前方法在不一樣線程中調用,會使用不一樣的Poster。上次對這一點沒有深究,今天來粗略的看一看。ide

  從上面的代碼中,咱們能夠看到,除了直接執行方法 invokeSubscriber(subscription, event);外,還有幾個特殊的Poster,分別是mainThreadPoster,backgroundPoster,和asyncPoster。oop

  咱們來一一閱讀。post

  mainThreadPoster的類叫做HandlerPoster,代碼不長,總共80來行。ui

  

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

  能夠看到HandlerPoster繼承了Handler。方法enqueue主要作了兩件事。this

  1、將事件請求放入請求隊列queue中。spa

  2、發送一個message到MainLooper中。線程

  每當HandlerPoster發送message,其中的handleMessage就要開始工做。EventBus爲其設置了最大工做時間。在工做時間內,程序會不斷地從queue中poll出請求,在主線程中執行它,直到queue隊列爲空,或是到達最大工做時間,纔會結束。咱們能夠看到,HandlerPoster其實就是對消息的發送作了處理,經過Handler,將其置於MainThread中來執行。code

 

  BackgroundPoster:blog

  BackgroundPoster則是實現了Runnable接口。

final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

 

 

  BackgroundPoster也維護了一個請求隊列。與HandlerPoster不一樣的是,它再也不使用message來發動任務的執行。而是eventBus.getExecutorService().execute(this);這個方法最終能夠追溯到EventBusBuilder中的DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();也就是說,在BackgroundPoster中執行的任務必定不在主線程。另外一個與HandlerPoster不一樣的點是,BackgroundPoster沒有最大工做時間,但它有個最大等待時間——1000ms。若是隊列爲空,在等待1000ms後,隊列中依然沒有新的事件加入,再表示事件請求已所有執行。

 

AsyncPoster:

  AsyncPoster的實現最爲簡單,它的功能是,不能當前處於什麼線程,必定都會新開一個線程來執行這個任務。

  

class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

 

   代碼比較簡單,咱們能夠看出AsyncPoster是在BackgroundPoster上作的減法。去掉了線程保護(由於新開),去掉了wait(也是由於新開)。

 

  經過以上閱讀,咱們大體對EventBus的事件發送機制有了瞭解。下一節,咱們將開始閱讀,它的訂閱機制。

 

Done~

相關文章
相關標籤/搜索