Android和iOS開發中的異步處理(四)——異步任務和隊列

本文是系列文章《Android和iOS開發中的異步處理》的第四篇。在本篇文章中,咱們主要討論在客戶端編程中常用的隊列結構,它的異步編程方式以及相關的接口設計問題。java


前幾天,有位同事跑過來一塊兒討論一個技術問題。狀況是這樣的,他最近在開發一款手遊,用戶在客戶端上的每次操做都須要向服務器同步數據。原本按照傳統的網絡請求處理方式,用戶發起操做後,須要等待操做完成,這時界面要顯示一個請求等待的過程(好比轉菊花)。當請求完成了,客戶端顯示層才更新,用戶也才能發起下一個操做。可是,這個遊戲要求用戶能在短期內連續作不少操做。若是每一個操做都要經歷一個請求等待的過程,無疑體驗是很糟糕的。android

其實呢,這裏就須要一個操做任務隊列。用戶不用等待一個操做完成,而是隻要把操做放入隊列裏,就能夠繼續進行下一步操做了。只是,當隊列中有操做出錯時,須要進入一個統一的錯誤處理流程。固然,服務器也要配合進行一些處理,好比要更加慎重地對待操做去重問題。git

本文要討論的就是跟隊列的設計和實現有關的那些問題。程序員

注:本系列文章中出現的代碼已經整理到GitHub上(持續更新),代碼庫地址爲:github

其中,當前這篇文章中出現的Java代碼,位於com.zhangtielei.demos.async.programming.queueing這個package中。編程

概述

在客戶端編程中,使用隊列的場景實際上是不少的。這裏咱們列舉其中幾個。安全

  • 發送聊天消息。如今通常的聊天軟件都容許用戶連續輸入多條聊天消息,也就是說,用戶不用等待前一條消息發送成功了,再鍵入第二條消息。系統會保證用戶的消息有序,並且因爲網絡情況很差而發送失敗的消息會經歷若干次重試,從而保證消息盡力送達。這其實背後有一個消息發送隊列,它對消息進行排隊處理,而且在錯誤發生時進行有限的重試。
  • 一次上傳多張照片。若是用戶可以一次性選中多張照片進行上傳操做,這個上傳過程時間會比較長,通常須要一個或多個隊列。隊列的重試功能還可以容許文件的斷點續傳(固然這要求服務端要有相應的支持)。
  • 將關鍵的高頻操做異步化,提高體驗。好比前面提到的那個遊戲連續操做的例子,再好比在微信朋友圈發照片或者評論別人,都不須要等待本次網絡請求結束,就能夠進行後續操做。這背後也隱藏着一個隊列機制。

爲了討論方便,咱們把這種對一系列操做進行排隊,並具有必定失敗重試能力的隊列稱爲「任務隊列」。服務器

下面本文分三個章節來討論異步任務和任務隊列的相關話題。微信

  1. 介紹傳統的線程安全隊列TSQ(Thread-Safe Queue)。
  2. 適合客戶端編程環境的無鎖隊列。這一部分遵循異步任務的經典回調方式(Callback)來設計接口。關於異步任務的回調相關的詳細討論,請參見這個系列的第二篇
  3. 基於RxJava響應式編程的思想實現的隊列。在這一部分,咱們會看到RxJava對於異步任務的接口設計會產生怎樣的影響。

Thread-Safe Queue

在多線程的環境下,提到隊列就不能不提TSQ。它是一個很經典的工具,在不一樣的線程之間提供了一條有序傳輸數據的通道。它的結構圖以下所示。markdown

消費者和生產者分屬不一樣的線程,這樣消費者和生產者才能解耦,生產不至於被消費所阻塞。若是把TSQ用於任務隊列,那麼生產至關於用戶的操做產生了任務,消費至關於任務的啓動和執行。

消費者線程運行在一個循環當中,它不停地嘗試從隊列裏取數據,若是沒有數據,則阻塞在隊列頭上。這種阻塞操做須要依賴操做系統的一些原語。

利用隊列進行解耦,是一個很重要的思想。說遠一點,TSQ的思想推廣到進程之間,就至關於在分佈式系統裏常用的Message Queue。它對於異構服務之間的解耦,以及屏蔽不一樣服務之間的性能差別,能夠起到關鍵做用。

而TSQ在客戶端編程中比較少見,緣由包括:

  • 它須要額外啓動一個單獨的線程做爲消費者。
  • 更適合客戶端環境的「主線程->異步線程->主線程」的編程模式(參見這個系列的第一篇中Run Loop那一章節的相關描述),使得生產者和消費者能夠都運行在主線程中,這樣就不須要一個Thread-Safe的隊列,而是隻須要一個普通隊列就好了(下一章要講到)。

咱們在這裏提到TSQ,主要是由於它比較經典,也可以和其它方式作一個對比。咱們在這裏就不給出它的源碼演示了,想了解細節的同窗能夠參見GitHub。GitHub上的演示代碼使用了JDK中現成的TSQ的實現:LinkedBlockingQueue。

基於Callback的任務隊列

如上圖所示,生產者和消費者都運行在一個線程,即主線程。按照這種思路來實現任務隊列,咱們須要執行的任務自己必須是異步的,不然整個隊列的任務就無法異步化。

咱們定義要執行的異步任務的接口以下:

public interface Task {
    /** * 惟一標識當前任務的ID * @return */
    String getTaskId();

    /** * 因爲任務是異步任務, 那麼start方法被調用只是啓動任務; * 任務完成後會回調TaskListener. * * 注: start方法需在主線程上執行. */
    void start();

    /** * 設置回調監聽. * @param listener */
    void setListener(TaskListener listener);

    /** * 異步任務回調接口. */
    interface TaskListener {
        /** * 當前任務完成的回調. * @param task */
        void taskComplete(Task task);
        /** * 當前任務執行失敗的回調. * @param task * @param cause 失敗緣由 */
        void taskFailed(Task task, Throwable cause);
    }
}複製代碼

因爲Task是一個異步任務,因此咱們爲它定義了一個回調接口TaskListener

getTaskId是爲了獲得一個能惟一標識當前任務的ID,便於對不一樣任務進行精確區分。

另外,爲了更通用的表達失敗緣由,咱們這裏選用一個Throwable對象來表達(注:在實際編程中這未必是一個值得效仿的作法,具體狀況請具體分析)。

有人可能會說:這裏把Task接口定義成異步的,那若是想執行一個同步的任務該怎麼辦?這其實很好辦。把同步任務改形成異步任務是很簡單的,有不少種方法(反過來卻很難)。

任務隊列的接口,定義以下:

public interface TaskQueue {
    /** * 向隊列中添加一個任務. * @param task */
    void addTask(Task task);

    /** * 設置監聽器. * @param listener */
    void setListener(TaskQueueListener listener);

    /** * 銷燬隊列. * 注: 隊列在最後不用的時候, 應該主動銷燬它. */
    void destroy();

    /** * 任務隊列對外監聽接口. */
    interface TaskQueueListener {
        /** * 任務完成的回調. * @param task */
        void taskComplete(Task task);
        /** * 任務最終失敗的回調. * @param task * @param cause 失敗緣由 */
        void taskFailed(Task task, Throwable cause);
    }
}複製代碼

任務隊列TaskQueue自己的操做也是異步的,addTask只是將任務放入隊列,至於它何時完成(或失敗),調用者須要監聽TaskQueueListener接口。

須要注意的一點是,TaskQueueListenertaskFailed,與前面TaskListenertaskFailed不一樣,它表示任務在通過必定次數的失敗後,最終放棄重試從而最終失敗。然後者只表示那個任務一次執行失敗。

咱們重點討論TaskQueue的實現,而Task的實現咱們這裏不關心,咱們只關心它的接口。TaskQueue的實現代碼以下:

public class CallbackBasedTaskQueue implements TaskQueue, Task.TaskListener {
    private static final String TAG = "TaskQueue";

    /** * Task排隊的隊列. 不須要thread-safe */
    private Queue<Task> taskQueue = new LinkedList<Task>();

    private TaskQueueListener listener;
    private boolean stopped;

    /** * 一個任務最多重試次數. * 重試次數超過MAX_RETRIES, 任務則最終失敗. */
    private static final int MAX_RETRIES = 3;
    /** * 當前任務的執行次數記錄(當嘗試超過MAX_RETRIES時就最終失敗) */
    private int runCount;

    @Override
    public void addTask(Task task) {
        //新任務加入隊列
        taskQueue.offer(task);
        task.setListener(this);

        if (taskQueue.size() == 1 && !stopped) {
            //當前是第一個排隊任務, 當即執行它
            launchNextTask();
        }
    }

    @Override
    public void setListener(TaskQueueListener listener) {
        this.listener = listener;
    }

    @Override
    public void destroy() {
        stopped = true;
    }

    private void launchNextTask() {
        //取當前隊列頭的任務, 但不出隊列
        Task task = taskQueue.peek();
        if (task == null) {
            //impossible case
            Log.e(TAG, "impossible: NO task in queue, unexpected!");
            return;
        }

        Log.d(TAG, "start task (" + task.getTaskId() + ")");
        task.start();
        runCount = 1;
    }

    @Override
    public void taskComplete(Task task) {
        Log.d(TAG, "task (" + task.getTaskId() + ") complete");
        finishTask(task, null);
    }

    @Override
    public void taskFailed(Task task, Throwable error) {
        if (runCount < MAX_RETRIES && !stopped) {
            //能夠繼續嘗試
            Log.d(TAG, "task (" + task.getTaskId() + ") failed, try again. runCount: " + runCount);
            task.start();
            runCount++;
        }
        else {
            //最終失敗
            Log.d(TAG, "task (" + task.getTaskId() + ") failed, final failed! runCount: " + runCount);
            finishTask(task, error);
        }
    }

    /** * 一個任務最終結束(成功或最終失敗)後的處理 * @param task * @param error */
    private void finishTask(Task task, Throwable error) {
        //回調
        if (listener != null && !stopped) {
            try {
                if (error == null) {
                    listener.taskComplete(task);
                }
                else {
                    listener.taskFailed(task, error);
                }
            }
            catch (Throwable e) {
                Log.e(TAG, "", e);
            }
        }
        task.setListener(null);

        //出隊列
        taskQueue.poll();

        //啓動隊列下一個任務
        if (taskQueue.size() > 0 && !stopped) {
            launchNextTask();
        }
    }

}複製代碼

在這個實現中,咱們須要注意的幾點是:

  • 進出隊列的全部操做(offer, peek, take)都運行在主線程,因此隊列數據結構再也不須要線程安全。咱們選擇了LinkedList的實現。
  • 任務的啓動執行,依賴兩個機會:
    • 任務進隊列addTask的時候,若是原來隊列爲空(當前任務是第一個任務),那麼啓動它;
    • 一個任務執行完成(成功了,或者最終失敗了)後,若是隊列裏有排隊的其它任務,那麼取下一個任務啓動執行。
  • 任務一次執行失敗,並不算失敗,還要通過若干次重試。若是重試次數超過MAX_RETRIES,纔算最終失敗。runCount記錄了當前任務的累計執行次數。

CallbackBasedTaskQueue的代碼揭示了任務隊列的基本實現模式。

任務隊列對於失敗任務的重試策略,大大提升了最終成功的機率。在GitHub上的演示程序中,我把Task的失敗機率設置得很高(高達80%),在重試3次的配置下,當任務執行的時候仍然有比較大的機率能最終執行成功。

基於RxJava的任務隊列

關於RxJava到底有什麼用?網上有不少討論。

有人說,RxJava就是爲了異步。這個固然沒錯,但說得不具體。

也有人說,RxJava的真正好處就是它提供的各類lift變換。還有人說,RxJava最大的用處是它的Schedulers機制,可以方便地切換線程。其實這些都不是革命性的關鍵因素。

那關鍵的是什麼呢?我我的認爲,是它對於回調接口設計產生的根本性的影響:它消除了爲每一個異步接口單獨定義回調接口的必要性

這裏立刻就有一個例子。咱們使用RxJava對TaskQueue接口從新進行改寫。

public interface TaskQueue {
    /** * 向隊列中添加一個任務. * * @param task * @param <R> 異步任務執行完要返回的數據類型. * @return 一個Observable. 調用者經過這個Observable獲取異步任務執行結果. */
    <R> Observable<R> addTask(Task<R> task);

    /** * 銷燬隊列. * 注: 隊列在最後不用的時候, 應該主動銷燬它. */
    void destroy();
}複製代碼

咱們仔細看一看這個修改後的TaskQueue接口定義。

  • 原來的回調接口TaskQueueListener沒有了。
  • 異步接口addTask原來沒有返回值,如今返回了一個Observable。調用者拿到這個Observable,而後去訂閱它(subscribe),就能得到任務執行結果(成功或失敗)。這裏的改動很關鍵。原本addTask什麼也不返回,要想得到結果必須監聽一個回調接口,這是典型的異步任務的運做方式。但這裏返回一個Observable以後,讓它感受上很是相似一個同步接口了。說得再抽象一點,這個Observable是咱們站在當下對於將來的一個指代,原本尚未運行的、發生在將來的虛無縹緲的任務,這時候有一個實實在在的東西被咱們抓在手裏了。並且咱們還能對它在當下就進行不少操做,並能夠和其它Observable結合。這是這一思想真正的強大之處。

相應地,Task接口原本也是一個異步接口,天然也能夠用這種方式進行修改:

/** * 異步任務接口定義. * * 再也不使用TaskListener傳遞迴調, 而是使用Observable. * * @param <R> 異步任務執行完要返回的數據類型. */
public interface Task <R> {
    /** * 惟一標識當前任務的ID * @return */
    String getTaskId();

    /** * * 啓動任務. * * 注: start方法需在主線程上執行. * * @return 一個Observable. 調用者經過這個Observable獲取異步任務執行結果. */
    Observable<R> start();
}複製代碼

這裏把改成RxJava的接口討論清楚了,具體的隊列實現反而不重要了。具體實現代碼就不在這裏討論了,想了解詳情的同窗仍是參見GitHub。注意GitHub的實現中用到了一個小技巧:把一個異步的任務封裝成Observable,咱們可使用AsyncOnSubscribe。

總結

再說一下TSQ

咱們在文章開頭講述了TSQ,並指出它在客戶端編程中不多被使用。但並非說在客戶端環境中TSQ就沒有存在的意義。

實際上,客戶端的Run Loop(即Android的Looper)自己就是一個TSQ,要否則它也無法在不一樣線程之間安全地傳遞消息和調度任務。正是由於客戶端有了一個Run Loop,咱們纔有可能使用無鎖的方式來實現任務隊列。因此說,咱們在客戶端的編程,老是與TSQ有着千絲萬縷的聯繫。

順便說一句,Android中的android.os.Looper,最終會依賴Linux內核中大名鼎鼎的epoll事件機制。

本文的任務隊列設計中所忽略的

本文的核心任務是要講解任務隊列的異步編程方式,因此忽略了一些設計細節。若是你要實現一個生產環境能使用的任務隊列,可能還須要考慮如下這些點:

  • 本文只設計了任務的成功和失敗回調,沒有執行進度回調。
  • 本文沒有涉及到任務取消和暫停的問題(咱們下一篇文章會涉及這個話題)。
  • 任務隊列的一些細節參數應該是能夠由使用者設置的,好比最大重試次數。
  • 長生命週期的隊列和短生命週期的頁面之間的交互,本文沒有考慮。在GitHub實現的演示代碼中,爲了簡單起見,演示頁面關閉後,任務隊列也銷燬了。但實際中不該該是這樣的。關於「長短生命週期的交互」,我後來發現也是一個比較重要的問題,也許後面咱們有機會再討論。
  • 在Android中,相似任務隊列這種可能長時間後臺運行的組件,通常外層會使用Service進行封裝。
  • 任務隊列對於失敗重試的處理,要求服務器慎重地對待去重問題。
  • 監聽到任務隊列失敗發生以後,錯誤處理變得複雜。

RxJava的優缺點

本文最後運用了RxJava對任務隊列進行了重寫。咱們確實將接口簡化了許多,省去了回調接口的設計,也讓調用者能用統一的方式來處理異步任務。

可是,咱們也須要注意到RxJava帶來的一些問題:

  • RxJava是個比較重的框架,它很是抽象,難以理解。它對於接口的調用者簡單,而對於接口的實現者來講,是個難題。在實現一個異步接口的時候,如何返回一個恰當的Observable實例,有時候並非那麼顯而易見。
  • Observable依賴subscribe去驅動它的上游開始運行。也就是說,你若是隻是添加一個任務,但不去觀察它,它就不會執行!若是你只是想運行一個任務,但並不關心結果,那麼,這辦不到。舉個不恰當的例子,這有點像量子力學,觀察對結果形成影響......
  • 受前一點影響,在本文給出的GitHub代碼的實現中,第一個任務的真正啓動運行,並非在addTask中,而是有所延遲,延遲到調用者的subscribe開始執行後。並且其執行線程環境有可能受到調用者對於Schedulers的設置的影響(好比經過subscribeOn),有不在主線程執行的風險。
  • RxJava在調試時會出現奇怪的、讓人難以理解的調用棧。

考慮到RxJava帶來的這些問題,若是我要實現一個完整功能的任務隊列或者其它複雜的異步任務,特別是要把它開源出來的的時候,我有可能不會讓它對RxJava產生絕對的依賴。而是有可能像Retrofit那樣,同時支持本身的輕量的異步機制和RxJava。


在本文結束以前,我再提出一個有趣的開放性問題。本文GitHub上給出的代碼大量使用了匿名類(至關於Java 8的lambda表達式),這會致使對象之間的引用關係變得複雜。那麼,對於這些對象的引用關係的分析,會是一個頗有趣的話題。好比,這些引用關係開始是如何隨着程序執行創建起來的,最終銷燬的時候又是如何解除的?有沒有內存泄露呢?歡迎留言討論。

在下一篇,咱們將討論有關異步任務更復雜的一個問題:異步任務的取消。

(完)

其它精選文章

相關文章
相關標籤/搜索