本文是系列文章《Android和iOS開發中的異步處理》的第四篇。在本篇文章中,咱們主要討論在客戶端編程中常用的隊列結構,它的異步編程方式以及相關的接口設計問題。java
前幾天,有位同事跑過來一塊兒討論一個技術問題。狀況是這樣的,他最近在開發一款手遊,用戶在客戶端上的每次操做都須要向服務器同步數據。原本按照傳統的網絡請求處理方式,用戶發起操做後,須要等待操做完成,這時界面要顯示一個請求等待的過程(好比轉菊花)。當請求完成了,客戶端顯示層才更新,用戶也才能發起下一個操做。可是,這個遊戲要求用戶能在短期內連續作不少操做。若是每一個操做都要經歷一個請求等待的過程,無疑體驗是很糟糕的。android
其實呢,這裏就須要一個操做任務隊列。用戶不用等待一個操做完成,而是隻要把操做放入隊列裏,就能夠繼續進行下一步操做了。只是,當隊列中有操做出錯時,須要進入一個統一的錯誤處理流程。固然,服務器也要配合進行一些處理,好比要更加慎重地對待操做去重問題。git
本文要討論的就是跟隊列的設計和實現有關的那些問題。程序員
注:本系列文章中出現的代碼已經整理到GitHub上(持續更新),代碼庫地址爲:github
其中,當前這篇文章中出現的Java代碼,位於com.zhangtielei.demos.async.programming.queueing這個package中。編程
在客戶端編程中,使用隊列的場景實際上是不少的。這裏咱們列舉其中幾個。安全
爲了討論方便,咱們把這種對一系列操做進行排隊,並具有必定失敗重試能力的隊列稱爲「任務隊列」。服務器
下面本文分三個章節來討論異步任務和任務隊列的相關話題。微信
在多線程的環境下,提到隊列就不能不提TSQ。它是一個很經典的工具,在不一樣的線程之間提供了一條有序傳輸數據的通道。它的結構圖以下所示。markdown
消費者和生產者分屬不一樣的線程,這樣消費者和生產者才能解耦,生產不至於被消費所阻塞。若是把TSQ用於任務隊列,那麼生產至關於用戶的操做產生了任務,消費至關於任務的啓動和執行。
消費者線程運行在一個循環當中,它不停地嘗試從隊列裏取數據,若是沒有數據,則阻塞在隊列頭上。這種阻塞操做須要依賴操做系統的一些原語。
利用隊列進行解耦,是一個很重要的思想。說遠一點,TSQ的思想推廣到進程之間,就至關於在分佈式系統裏常用的Message Queue。它對於異構服務之間的解耦,以及屏蔽不一樣服務之間的性能差別,能夠起到關鍵做用。
而TSQ在客戶端編程中比較少見,緣由包括:
咱們在這裏提到TSQ,主要是由於它比較經典,也可以和其它方式作一個對比。咱們在這裏就不給出它的源碼演示了,想了解細節的同窗能夠參見GitHub。GitHub上的演示代碼使用了JDK中現成的TSQ的實現:LinkedBlockingQueue。
如上圖所示,生產者和消費者都運行在一個線程,即主線程。按照這種思路來實現任務隊列,咱們須要執行的任務自己必須是異步的,不然整個隊列的任務就無法異步化。
咱們定義要執行的異步任務的接口以下:
public interface Task { /** * 惟一標識當前任務的ID * @return */ String getTaskId(); /** * 因爲任務是異步任務, 那麼start方法被調用只是啓動任務; * 任務完成後會回調TaskListener. * * 注: start方法需在主線程上執行. */ void start(); /** * 設置回調監聽. * @param listener */ void setListener(TaskListener listener); /** * 異步任務回調接口. */ interface TaskListener { /** * 當前任務完成的回調. * @param task */ void taskComplete(Task task); /** * 當前任務執行失敗的回調. * @param task * @param cause 失敗緣由 */ void taskFailed(Task task, Throwable cause); } }複製代碼
因爲Task
是一個異步任務,因此咱們爲它定義了一個回調接口TaskListener
。
getTaskId
是爲了獲得一個能惟一標識當前任務的ID,便於對不一樣任務進行精確區分。
另外,爲了更通用的表達失敗緣由,咱們這裏選用一個Throwable對象來表達(注:在實際編程中這未必是一個值得效仿的作法,具體狀況請具體分析)。
有人可能會說:這裏把Task
接口定義成異步的,那若是想執行一個同步的任務該怎麼辦?這其實很好辦。把同步任務改形成異步任務是很簡單的,有不少種方法(反過來卻很難)。
任務隊列的接口,定義以下:
public interface TaskQueue { /** * 向隊列中添加一個任務. * @param task */ void addTask(Task task); /** * 設置監聽器. * @param listener */ void setListener(TaskQueueListener listener); /** * 銷燬隊列. * 注: 隊列在最後不用的時候, 應該主動銷燬它. */ void destroy(); /** * 任務隊列對外監聽接口. */ interface TaskQueueListener { /** * 任務完成的回調. * @param task */ void taskComplete(Task task); /** * 任務最終失敗的回調. * @param task * @param cause 失敗緣由 */ void taskFailed(Task task, Throwable cause); } }複製代碼
任務隊列TaskQueue
自己的操做也是異步的,addTask
只是將任務放入隊列,至於它何時完成(或失敗),調用者須要監聽TaskQueueListener
接口。
須要注意的一點是,TaskQueueListener
的taskFailed
,與前面TaskListener
的taskFailed
不一樣,它表示任務在通過必定次數的失敗後,最終放棄重試從而最終失敗。然後者只表示那個任務一次執行失敗。
咱們重點討論TaskQueue
的實現,而Task
的實現咱們這裏不關心,咱們只關心它的接口。TaskQueue
的實現代碼以下:
public class CallbackBasedTaskQueue implements TaskQueue, Task.TaskListener { private static final String TAG = "TaskQueue"; /** * Task排隊的隊列. 不須要thread-safe */ private Queue<Task> taskQueue = new LinkedList<Task>(); private TaskQueueListener listener; private boolean stopped; /** * 一個任務最多重試次數. * 重試次數超過MAX_RETRIES, 任務則最終失敗. */ private static final int MAX_RETRIES = 3; /** * 當前任務的執行次數記錄(當嘗試超過MAX_RETRIES時就最終失敗) */ private int runCount; @Override public void addTask(Task task) { //新任務加入隊列 taskQueue.offer(task); task.setListener(this); if (taskQueue.size() == 1 && !stopped) { //當前是第一個排隊任務, 當即執行它 launchNextTask(); } } @Override public void setListener(TaskQueueListener listener) { this.listener = listener; } @Override public void destroy() { stopped = true; } private void launchNextTask() { //取當前隊列頭的任務, 但不出隊列 Task task = taskQueue.peek(); if (task == null) { //impossible case Log.e(TAG, "impossible: NO task in queue, unexpected!"); return; } Log.d(TAG, "start task (" + task.getTaskId() + ")"); task.start(); runCount = 1; } @Override public void taskComplete(Task task) { Log.d(TAG, "task (" + task.getTaskId() + ") complete"); finishTask(task, null); } @Override public void taskFailed(Task task, Throwable error) { if (runCount < MAX_RETRIES && !stopped) { //能夠繼續嘗試 Log.d(TAG, "task (" + task.getTaskId() + ") failed, try again. runCount: " + runCount); task.start(); runCount++; } else { //最終失敗 Log.d(TAG, "task (" + task.getTaskId() + ") failed, final failed! runCount: " + runCount); finishTask(task, error); } } /** * 一個任務最終結束(成功或最終失敗)後的處理 * @param task * @param error */ private void finishTask(Task task, Throwable error) { //回調 if (listener != null && !stopped) { try { if (error == null) { listener.taskComplete(task); } else { listener.taskFailed(task, error); } } catch (Throwable e) { Log.e(TAG, "", e); } } task.setListener(null); //出隊列 taskQueue.poll(); //啓動隊列下一個任務 if (taskQueue.size() > 0 && !stopped) { launchNextTask(); } } }複製代碼
在這個實現中,咱們須要注意的幾點是:
offer
, peek
, take
)都運行在主線程,因此隊列數據結構再也不須要線程安全。咱們選擇了LinkedList的實現。addTask
的時候,若是原來隊列爲空(當前任務是第一個任務),那麼啓動它;MAX_RETRIES
,纔算最終失敗。runCount
記錄了當前任務的累計執行次數。CallbackBasedTaskQueue
的代碼揭示了任務隊列的基本實現模式。
任務隊列對於失敗任務的重試策略,大大提升了最終成功的機率。在GitHub上的演示程序中,我把Task
的失敗機率設置得很高(高達80%),在重試3次的配置下,當任務執行的時候仍然有比較大的機率能最終執行成功。
關於RxJava到底有什麼用?網上有不少討論。
有人說,RxJava就是爲了異步。這個固然沒錯,但說得不具體。
也有人說,RxJava的真正好處就是它提供的各類lift變換。還有人說,RxJava最大的用處是它的Schedulers機制,可以方便地切換線程。其實這些都不是革命性的關鍵因素。
那關鍵的是什麼呢?我我的認爲,是它對於回調接口設計產生的根本性的影響:它消除了爲每一個異步接口單獨定義回調接口的必要性。
這裏立刻就有一個例子。咱們使用RxJava對TaskQueue
接口從新進行改寫。
public interface TaskQueue { /** * 向隊列中添加一個任務. * * @param task * @param <R> 異步任務執行完要返回的數據類型. * @return 一個Observable. 調用者經過這個Observable獲取異步任務執行結果. */ <R> Observable<R> addTask(Task<R> task); /** * 銷燬隊列. * 注: 隊列在最後不用的時候, 應該主動銷燬它. */ void destroy(); }複製代碼
咱們仔細看一看這個修改後的TaskQueue
接口定義。
TaskQueueListener
沒有了。addTask
原來沒有返回值,如今返回了一個Observable。調用者拿到這個Observable,而後去訂閱它(subscribe),就能得到任務執行結果(成功或失敗)。這裏的改動很關鍵。原本addTask
什麼也不返回,要想得到結果必須監聽一個回調接口,這是典型的異步任務的運做方式。但這裏返回一個Observable以後,讓它感受上很是相似一個同步接口了。說得再抽象一點,這個Observable是咱們站在當下對於將來的一個指代,原本尚未運行的、發生在將來的虛無縹緲的任務,這時候有一個實實在在的東西被咱們抓在手裏了。並且咱們還能對它在當下就進行不少操做,並能夠和其它Observable結合。這是這一思想真正的強大之處。相應地,Task
接口原本也是一個異步接口,天然也能夠用這種方式進行修改:
/** * 異步任務接口定義. * * 再也不使用TaskListener傳遞迴調, 而是使用Observable. * * @param <R> 異步任務執行完要返回的數據類型. */ public interface Task <R> { /** * 惟一標識當前任務的ID * @return */ String getTaskId(); /** * * 啓動任務. * * 注: start方法需在主線程上執行. * * @return 一個Observable. 調用者經過這個Observable獲取異步任務執行結果. */ Observable<R> start(); }複製代碼
這裏把改成RxJava的接口討論清楚了,具體的隊列實現反而不重要了。具體實現代碼就不在這裏討論了,想了解詳情的同窗仍是參見GitHub。注意GitHub的實現中用到了一個小技巧:把一個異步的任務封裝成Observable,咱們可使用AsyncOnSubscribe。
咱們在文章開頭講述了TSQ,並指出它在客戶端編程中不多被使用。但並非說在客戶端環境中TSQ就沒有存在的意義。
實際上,客戶端的Run Loop(即Android的Looper)自己就是一個TSQ,要否則它也無法在不一樣線程之間安全地傳遞消息和調度任務。正是由於客戶端有了一個Run Loop,咱們纔有可能使用無鎖的方式來實現任務隊列。因此說,咱們在客戶端的編程,老是與TSQ有着千絲萬縷的聯繫。
順便說一句,Android中的android.os.Looper,最終會依賴Linux內核中大名鼎鼎的epoll事件機制。
本文的核心任務是要講解任務隊列的異步編程方式,因此忽略了一些設計細節。若是你要實現一個生產環境能使用的任務隊列,可能還須要考慮如下這些點:
本文最後運用了RxJava對任務隊列進行了重寫。咱們確實將接口簡化了許多,省去了回調接口的設計,也讓調用者能用統一的方式來處理異步任務。
可是,咱們也須要注意到RxJava帶來的一些問題:
addTask
中,而是有所延遲,延遲到調用者的subscribe開始執行後。並且其執行線程環境有可能受到調用者對於Schedulers的設置的影響(好比經過subscribeOn),有不在主線程執行的風險。考慮到RxJava帶來的這些問題,若是我要實現一個完整功能的任務隊列或者其它複雜的異步任務,特別是要把它開源出來的的時候,我有可能不會讓它對RxJava產生絕對的依賴。而是有可能像Retrofit那樣,同時支持本身的輕量的異步機制和RxJava。
在本文結束以前,我再提出一個有趣的開放性問題。本文GitHub上給出的代碼大量使用了匿名類(至關於Java 8的lambda表達式),這會致使對象之間的引用關係變得複雜。那麼,對於這些對象的引用關係的分析,會是一個頗有趣的話題。好比,這些引用關係開始是如何隨着程序執行創建起來的,最終銷燬的時候又是如何解除的?有沒有內存泄露呢?歡迎留言討論。
在下一篇,咱們將討論有關異步任務更復雜的一個問題:異步任務的取消。
(完)
其它精選文章: