RxJava重溫基礎

RxJava是什麼

a library for composing asynchronous and event-based programs using observable sequences for the Java VM
解釋:一個對於構成使用的Java虛擬機觀察序列異步和基於事件的程序庫
java

 

RxJava 是一個響應式編程框架,採用觀察者設計模式。因此天然少不了 Observable 和 Subscriber 這兩個東東了。
RxJava 是一個開源項目,地址:https://github.com/ReactiveX/RxJava
RxAndroid,用於 Android 開發,添加了 Android 用的接口。地址: https://github.com/ReactiveX/RxAndroid
android

 

基本概念

網上關於RxJava的博文也有不少,我也看過許多,其中不乏有優秀的文章,但絕大部分文章都有一個共同點,就是側重於講RxJava中各類強大的操做符,而忽略了最基本的東西——概念,因此一開始我也看的一臉懵逼,看到後面又忘了前面的,腦子裏全是問號,這個是什麼,那個又是什麼,這兩個長得怎麼那麼像。舉個不太恰當的例子,概念之於初學者,就像食物之於人,當你餓了,你會想吃麪包、牛奶,那你爲何不去吃土呢,由於你知道麪包牛奶是用來幹嗎的,土是用來幹嗎的。同理,前面已經說過,RxJava無非是發送數據與接收數據,那麼什麼是發射源,什麼是接收源,這就是你應該明確的事,也是RxJava的入門條件之一,下面就依我我的理解,對發射源和接收源作個歸類,以及RxJava中頻繁出現的幾個「單詞」解釋一通;git

  • Observable:發射源,英文釋義「可觀察的」,在觀察者模式中稱爲「被觀察者」或「可觀察對象」;github

  • Observer:接收源,英文釋義「觀察者」,沒錯!就是觀察者模式中的「觀察者」,可接收ObservableSubject發射的數據;web

  • SubjectSubject是一個比較特殊的對象,既可充當發射源,也可充當接收源,爲避免初學者被混淆,本章將不對Subject作過多的解釋和使用,重點放在ObservableObserver上,先把最基本方法的使用學會,後面再學其餘的都不是什麼問題;數據庫

  • Subscriber:「訂閱者」,也是接收源,那它跟Observer有什麼區別呢?Subscriber實現了Observer接口,比Observer多了一個最重要的方法unsubscribe( ),用來取消訂閱,當你再也不想接收數據了,能夠調用unsubscribe( )方法中止接收,Observer 在 subscribe() 過程當中,最終也會被轉換成 Subscriber 對象,通常狀況下,建議使用Subscriber做爲接收源;編程

  • Subscription :Observable調用subscribe( )方法返回的對象,一樣有unsubscribe( )方法,能夠用來取消訂閱事件;設計模式

  • Action0RxJava中的一個接口,它只有一個無參call()方法,且無返回值,一樣還有Action1,Action2…Action9等,Action1封裝了含有 1 個參的call()方法,即call(T t),Action2封裝了含有 2 個參數的call方法,即call(T1 t1,T2 t2),以此類推;api

  • Func0:與Action0很是類似,也有call()方法,可是它是有返回值的,一樣也有Func0、Func1…Func9;網絡

 

RxJava最核心的兩個東西是Observables(被觀察者,事件源)和Subscribers(觀察者)。Observables發出一系列事件,Subscribers處理這些事件。這裏的事件能夠是任何你感興趣的東西(觸摸事件,web接口調用返回的數據…)

一個Observable能夠發出零個或者多個事件,知道結束或者出錯。每發出一個事件,就會調用它的SubscriberonNext方法,最後調用Subscriber.onNext()或者Subscriber.onError()結束。

Rxjava的看起來很想設計模式中的觀察者模式,可是有一點明顯不一樣,那就是若是一個Observerble沒有任何的的Subscriber,那麼這個Observable是不會發出任何事件的。

基本用法

Observable的建立

使用create( ),最基本的建立方式:

1 Observable<String>  myObservable  = Observable.create(new Observable.OnSubscribe<String>() {
2   @Override
3   public void call(Subscriber<? super String> subscriber) {
4       subscriber.onNext("Hello, world!"); //發射一個"Hello, world!"的String
5       subscriber.onCompleted();//發射完成,這種方法須要手動調用onCompleted,纔會回調Observer的onCompleted方法
6   }});

能夠看到,這裏傳入了一個 OnSubscribe 對象做爲參數。OnSubscribe 會被存儲在返回的 Observable 對象中,它的做用至關於一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調用,事件序列就會依照設定依次觸發(對於上面的代碼,就是觀察者Subscriber將會被調用一次 onNext() 和一次 onCompleted())。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式

這個例子很簡單:事件的內容是字符串,而不是一些複雜的對象;事件的內容是已經定好了的,而不像有的觀察者模式同樣是待肯定的(例如網絡請求的結果在請求返回以前是未知的);全部事件在一瞬間被所有發送出去,而不是夾雜一些肯定或不肯定的時間間隔或者通過某種觸發器來觸發的。總之,這個例子看起來毫無實用價值。但這是爲了便於說明,實質上只要你想,各類各樣的事件發送規則你均可以本身來寫。至於具體怎麼作,後面都會講到,但如今不行。只有把基礎原理先說明白了,上層的運用才能更容易說清楚。


Subscriber的建立

上面定義的Observable對象僅僅發出一個Hello World字符串,而後就結束了。接着咱們建立一個Subscriber來處理Observable對象發出的字符串:

 1 Subscriber<String> mySubscriber = new Subscriber<String>() {  
 2     @Override  
 3     public void onNext(String s) {
 4          System.out.println(s); //打印出"Hello, world!"
 5     }  
 6   
 7     @Override  
 8     public void onCompleted() { }  
 9   
10     @Override  
11     public void onError(Throwable e) { }  
12 };

除了 Observer 接口以外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是徹底同樣的:

 1 Observer<String> myObserver = new Observer<String>() {  
 2     @Override  
 3     public void onNext(String s) {
 4          System.out.println(s); //打印出"Hello, world!"
 5     }  
 6   
 7     @Override  
 8     public void onCompleted() { }  
 9   
10     @Override  
11     public void onError(Throwable e) { }  
12 };

 

不只基本使用方式同樣,實質上,在 RxJava 的 subscribe 過程當中,Observer 也老是會先被轉換成一個 Subscriber 再使用。因此若是你只想使用基本功能,選擇 Observer 和 Subscriber 是徹底同樣的。它們的區別對於使用者來講主要有兩點:

  1. onStart(): 這是 Subscriber 增長的方法。它會在 subscribe 剛開始,而事件還未發送以前被調用,能夠用於作一些準備工做,例如數據的清零或重置。這是一個可選方法,默認狀況下它的實現爲空。須要注意的是,若是對準備工做的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行),onStart() 就不適用了,由於它老是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來作準備工做,可使用 doOnSubscribe() 方法,具體能夠在後面的文中看到。

  2. unsubscribe(): 這是 Subscriber 所實現的另外一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber 將再也不接收事件。通常在這個方法調用前,可使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe()這個方法很重要,由於在 subscribe() 以後, Observable 會持有 Subscriber 的引用,這個引用若是不能及時被釋放,將有內存泄露的風險。因此最好保持一個原則:要在再也不使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關係,以免內存泄露的發生。


 

Observable與Subscriber的關聯

這裏subscriber僅僅就是打印observable發出的字符串。經過subscribe函數就能夠將咱們定義的myObservable對象和mySubscriber對象關聯起來,這樣就完成了subscriberobservable的訂閱。

1 myObservable.subscribe(myObserver);
2 // 或者:
3 myObservable.subscribe(mySubscriber);

一旦mySubscriber訂閱了myObservablemyObservable就是調用mySubscriber對象的onNextonComplete方法,mySubscriber 就會打印出Hello World!

訂閱(Subscriptions)

當調用Observable.subscribe(),會返回一個Subscription對象。這個對象表明了被觀察者和訂閱者之間的聯繫。

1 Subscription subscription = Observable.just("Hello, World!")
2     .subscribe(s -> System.out.println(s));

你能夠在後面使用這個Subscription對象來操做被觀察者和訂閱者之間的聯繫.

1 subscription.unsubscribe();//接觸訂閱關係
2 System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
3 // Outputs "Unsubscribed=true"

 

RxJava的另一個好處就是它處理unsubscribing的時候,會中止整個調用鏈。若是你使用了一串很複雜的操做符,調用unsubscribe將會在他當前執行的地方終止。不須要作任何額外的工做!


 

簡化代碼(Observable與Subscriber)

簡化Observable

是否是以爲僅僅爲了打印一個hello world要寫這麼多代碼太囉嗦?我這裏主要是爲了展現RxJava背後的原理而採用了這種比較囉嗦的寫法,RxJava其實提供了不少便捷的函數來幫助咱們減小代碼。

首先來看看如何簡化Observable對象的建立過程。RxJava內置了不少簡化建立Observable對象的函數,好比Observable.just就是用來建立只發出一個事件就結束的Observable對象,上面建立Observable對象的代碼能夠簡化爲一行:

1 Observable<String> myObservable = Observable.just("Hello, world!"); //發送"Hello, world!"

其餘方法:

1.使用just( ),將爲你建立一個Observable並自動爲你調用onNext( )發射數據:

1 justObservable = Observable.just("just1","just2");//依次發送"just1"和"just2"

2.使用from( ),遍歷集合,發送每一個item:

1 List<String> list = new ArrayList<>();
2 list.add("from1");
3 list.add("from2");
4 list.add("from3");
5 fromObservable = Observable.from(list);  //遍歷list 每次發送一個
6 /** 注意,just()方法也能夠傳list,可是發送的是整個list對象,而from()發送的是list的一個item** /

3.使用defer( ),有觀察者訂閱時才建立Observable,而且爲每一個觀察者建立一個新的Observable:

1 deferObservable = Observable.defer(new Func0<Observable<String>>() {
2   @Override
3   //注意此處的call方法沒有Subscriber參數
4   public Observable<String> call() {
5       return Observable.just("deferObservable");
6   }});

4.使用interval( ),建立一個按固定時間間隔發射整數序列的Observable,可用做定時器:

1 intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒發送一次

5.使用range( ),建立一個發射特定整數序列的Observable,第一個參數爲起始值,第二個爲發送的個數,若是爲0則不發送,負數則拋異常:

1 rangeObservable = Observable.range(10, 5);//將發送整數10,11,12,13,14

6.使用timer( ),建立一個Observable,它在一個給定的延遲後發射一個特殊的值,等同於Android中Handler的postDelay( )方法:

1 timeObservable = Observable.timer(3, TimeUnit.SECONDS);  //3秒後發射一個值

7.使用repeat( ),建立一個重複發射特定數據的Observable:

1 repeatObservable = Observable.just("repeatObservable").repeat(3);//重複發射3次

 

簡化Subscriber

接下來看看如何簡化Subscriber,上面的例子中,咱們其實並不關心OnComplete和OnError,咱們只須要在onNext的時候作一些處理,這時候就可使用Action1類。

1 Action1<String> onNextAction = new Action1<String>() {  
2     @Override  
3     public void call(String s) {  
4         System.out.println(s);  
5     }  
6 };

subscribe方法有一個重載版本,接受三個Action1類型的參數,分別對應OnNext,OnComplete, OnError函數:

1 myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);  

這裏咱們並不關心onError和onComplete,因此只須要第一個參數就能夠

1 myObservable.subscribe(onNextAction);  
2 // Outputs "Hello, world!"  

上面的代碼最終能夠寫成這樣:

1 Observable.just("Hello, world!")  
2     .subscribe(new Action1<String>() {  
3         @Override  
4         public void call(String s) {  
5               System.out.println(s);  
6         }  
7     });

 

使用java8的lambda可使代碼更簡潔:

不熟悉Lambda的能夠看我以前寫的:Java8之Lambda表達式(Android用法)

1 Observable.just("Hello, world!")  
2     .subscribe(s -> System.out.println(s));

 

簡單解釋一下這段代碼中出現的 Action1 和 Action0 Action0 是 RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的;因爲 onCompleted() 方法也是無參無返回值的,所以 Action0 能夠被當成一個包裝對象,將 onCompleted() 的內容打包起來將本身做爲一個參數傳入 subscribe() 以實現不完整定義的回調。這樣其實也能夠看作將onCompleted() 方法做爲參數傳進了 subscribe(),至關於其餘某些語言中的『閉包』。 Action1 也是一個接口,它一樣只有一個方法 call(T param),這個方法也無返回值,但有一個參數;與 Action0 同理,因爲 onNext(T obj) 和 onError(Throwable error)也是單參數無返回值的,所以 Action1 能夠將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回調。事實上,雖然 Action0 和 Action1 在 API 中使用最普遍,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的,它們能夠被用以包裝不一樣的無返回值的方法。

 

注:正如前面所提到的,Observer 和 Subscriber 具備相同的角色,並且 Observer 在 subscribe() 過程當中最終會被轉換成 Subscriber 對象,所以,從這裏開始,後面的描述我將用 Subscriber 來代替 Observer ,這樣更加嚴謹。

操做符(Operators)

操做符就是爲了解決對Observable對象的 變換(關鍵詞) 的問題,操做符用於在Observable和最終的Subscriber之間修改Observable發出的事件。RxJava提供了不少頗有用的操做符。 好比map操做符,就是用來把把一個事件轉換爲另外一個事件的。

map()操做符:

 1 Observable.just("images/logo.png") // 輸入類型 String
 2     .map(new Func1<String, Bitmap>() {
 3         @Override
 4         public Bitmap call(String filePath) { // 參數類型 String
 5             return getBitmapFromPath(filePath); // 返回類型 Bitmap
 6         }
 7     })
 8     .subscribe(new Action1<Bitmap>() {
 9         @Override
10         public void call(Bitmap bitmap) { // 參數類型 Bitmap
11             showBitmap(bitmap);
12         }
13     });

使用lambda能夠簡化爲:

1 Observable.just("images/logo.png") // 輸入類型 String
2     .map(
3                 filePath -> getBitmapFromPath(filePath); // 返回類型 Bitmap
4         )
5     .subscribe(
6                    bitmap -> showBitmap(bitmap);
7               );

 

能夠看到,map() 方法將參數中的 String 對象轉換成一個 Bitmap 對象後返回,而在通過 map() 方法後,事件的參數類型也由 String 轉爲了 Bitmap。這種直接變換對象並返回的,是最多見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不只能夠針對事件對象,還能夠針對整個事件隊列,這使得 RxJava 變得很是靈活。

map()操做符進階:

1 Observable.just("Hello, world!")  
2     .map(s -> s.hashCode())  
3     .map(i -> Integer.toString(i))  
4     .subscribe(s -> System.out.println(s));

是否是很酷?map()操做符就是用於變換Observable對象的,map操做符返回一個Observable對象,這樣就能夠實現鏈式調用,在一個Observable對象上屢次使用map操做符,最終將最簡潔的數據傳遞給Subscriber對象。

flatMap()操做符:

假設我有這樣一個方法: 這個方法根據輸入的字符串返回一個網站的url列表

1 Observable<List<String>> query(String text);   

Observable.flatMap()接收一個Observable的輸出做爲輸入,同時輸出另一個Observable。直接看代碼:

1 query("Hello, world!")  
2     .flatMap(new Func1<List<String>, Observable<String>>() {  
3         @Override  
4         public Observable<String> call(List<String> urls) {  
5             return Observable.from(urls);  
6         }  
7     })  
8     .subscribe(url -> System.out.println(url));

這裏我貼出了整個的函數代碼,以方便你瞭解發生了什麼,使用lambda能夠大大簡化代碼長度:

1 query("Hello, world!")  
2     .flatMap(urls -> Observable.from(urls))  
3     .subscribe(url -> System.out.println(url));

flatMap()是否是看起來很奇怪?爲何它要返回另一個Observable呢?理解flatMap的關鍵點在於,flatMap輸出的新的Observable正是咱們在Subscriber想要接收的。如今Subscriber再也不收到List<String>,而是收到一些列單個的字符串,就像Observable.from()的輸出同樣。

flatMap() 和map()有一個相同點:它也是把傳入的參數轉化以後返回另外一個對象。但須要注意,和 map() 不一樣的是, flatMap() 中返回的是個 Observable 對象,而且這個 Observable 對象並非被直接發送到了 Subscriber 的回調方法中。flatMap() 的原理是這樣的:

  1. 使用傳入的事件對象建立一個 Observable 對象;
  2. 並不發送這個 Observable, 而是將它激活,因而它開始發送事件;
  3. 每個建立出來的 Observable 發送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回調方法。這三個步驟,把事件拆成了兩級,經過一組新建立的 Observable 將初始的對象『鋪平』以後經過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat

值得注意的是.from()是Observable建立時候用的,.flatMap()纔是操做符;

其餘操做符:

目前爲止,咱們已經接觸了兩個操做符,RxJava中還有更多的操做符,那麼咱們如何使用其餘的操做符來改進咱們的代碼呢?

更多RxJava的操做符請查看:RxJava操做符大全

getTitle()返回null若是url不存在。咱們不想輸出」null」,那麼咱們能夠從返回的title列表中過濾掉null值!

1 query("Hello, world!")  
2     .flatMap(urls -> Observable.from(urls))  
3     .flatMap(url -> getTitle(url))  
4     .filter(title -> title != null)  
5     .subscribe(title -> System.out.println(title));

filter()輸出和輸入相同的元素,而且會過濾掉那些不知足檢查條件的。

若是咱們只想要最多5個結果:

1 query("Hello, world!")  
2     .flatMap(urls -> Observable.from(urls))  
3     .flatMap(url -> getTitle(url))  
4     .filter(title -> title != null)  
5     .take(5)  
6     .subscribe(title -> System.out.println(title));

take()輸出最多指定數量的結果。

若是咱們想在打印以前,把每一個標題保存到磁盤:

1 query("Hello, world!")  
2     .flatMap(urls -> Observable.from(urls))  
3     .flatMap(url -> getTitle(url))  
4     .filter(title -> title != null)  
5     .take(5)  
6     .doOnNext(title -> saveTitle(title))  
7     .subscribe(title -> System.out.println(title));

doOnNext()容許咱們在每次輸出一個元素以前作一些額外的事情,好比這裏的保存標題。

看到這裏操做數據流是多麼簡單了麼。你能夠添加任意多的操做,而且不會搞亂你的代碼。

RxJava包含了大量的操做符。操做符的數量是有點嚇人,可是很值得你去挨個看一下,這樣你能夠知道有哪些操做符可使用。弄懂這些操做符可能會花一些時間,可是一旦弄懂了,你就徹底掌握了RxJava的威力。

感受如何?

好吧,你是一個懷疑主義者,而且還很難被說服,那爲何你要關心這些操做符呢?

由於操做符可讓你對數據流作任何操做。

將一系列的操做符連接起來就能夠完成複雜的邏輯。代碼被分解成一系列能夠組合的片斷。這就是響應式函數編程的魅力。用的越多,就會越多的改變你的編程思惟。

線程控制(Scheduler)

假設你編寫的Android app須要從網絡請求數據。網絡請求須要花費較長的時間,所以你打算在另一個線程中加載數據。那麼問題來了!

編寫多線程的Android應用程序是很難的,由於你必須確保代碼在正確的線程中運行,不然的話可能會致使app崩潰。最多見的就是在非主線程更新UI。

在不指定線程的狀況下, RxJava 遵循的是線程不變的原則,即:在哪一個線程調用 subscribe(),就在哪一個線程生產事件;在哪一個線程生產事件,就在哪一個線程消費事件。若是須要切換線程,就須要用到 Scheduler (調度器)。

使用RxJava,你可使用subscribeOn()指定觀察者代碼運行的線程,使用observerOn()指定訂閱者運行的線程

Scheduler 的 API

在RxJava 中,Scheduler ——調度器,至關於線程控制器,RxJava 經過它來指定每一段代碼應該運行在什麼樣的線程。RxJava已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:

  • Schedulers.immediate(): 直接在當前線程運行,至關於不指定線程。這是默認的 Scheduler。

  • Schedulers.newThread(): 老是啓用新線程,並在新線程執行操做。

  • Schedulers.io(): I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行爲模式和 newThread() 差很少,區別在於 io() 的內部實現是是用一個無數量上限的線程池,能夠重用空閒的線程,所以多數狀況下 io() 比 newThread() 更有效率。不要把計算工做放在 io() 中,能夠避免建立沒必要要的線程。

  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。

  • 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操做將在 Android 主線程運行。

有了以上這幾個 Scheduler ,就可使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。

  • subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫作事件產生的線程。

  • observeOn(): 指定 Subscriber 所運行在的線程。或者叫作事件消費的線程。

注意:observeOn() 指定的是 Subscriber 的線程,而這個 Subscriber 並不必定是 subscribe() 參數中的 Subscriber(這塊參考RxJava變換部分),而是 observeOn() 執行時的當前 Observable 所對應的 Subscriber ,即它的直接下級 Subscriber 。

換句話說,observeOn() 指定的是它以後的操做所在的線程。所以若是有屢次切換線程的需求,只要在每一個想要切換線程的位置調用一次 observeOn() 便可。

代碼示例:

1 Observable.just(1, 2, 3, 4)
2     .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
3     .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
4     .subscribe(new Action1<Integer>() {
5         @Override
6         public void call(Integer number) {
7             Log.d(tag, "number:" + number);
8         }
9     });

上面這段代碼中,因爲 subscribeOn(Schedulers.io()) 的指定,被建立的事件的內容 一、二、三、4 將會在 IO 線程發出; 而因爲 observeOn(AndroidScheculers.mainThread()) 的指定,所以 subscriber 數字的打印將發生在主線程 。 事實上,這種在 subscribe() 以前寫上兩句subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式很是常見,它適用於多數的 『後臺線程取數據,主線程顯示』的程序策略。

下面的實例,在Observable.OnSubscribecall()中模擬了長時間獲取數據過程,在SubscribernoNext()中顯示數據到UI。

 1 Observable.create(new Observable.OnSubscribe<String>() {
 2      @Override
 3      public void call(Subscriber<? super String> subscriber) {
 4          subscriber.onNext("info1");
 5   
 6          SystemClock.sleep(2000);
 7          subscriber.onNext("info2-sleep 2s");
 8   
 9          SystemClock.sleep(3000);
10          subscriber.onNext("info2-sleep 3s");
11   
12          SystemClock.sleep(5000);
13          subscriber.onCompleted();
14      }
15  })
16 .subscribeOn(Schedulers.io()) //指定 subscribe() 發生在 IO 線程
17 .observeOn(AndroidSchedulers.mainThread()) //指定 Subscriber 的回調發生在主線程
18 .subscribe(new Subscriber<String>() {
19     @Override
20     public void onCompleted() {
21         Log.v(TAG, "onCompleted()");
22     }
23   
24     @Override
25     public void onError(Throwable e) {
26         Log.v(TAG, "onError() e=" + e);
27     }
28   
29     @Override
30     public void onNext(String s) {
31         showInfo(s); //UI view顯示數據
32     }
33 });

至此,咱們能夠看到call()將會發生在 IO 線程,而showInfo(s)則被設定在了主線程。這就意味着,即便加載call()耗費了幾十甚至幾百毫秒的時間,也不會形成絲毫界面的卡頓。

值得注意:subscribeOn () 與 observeOn()都會返回了一個Observable,所以若不是採用上面這種直接流方式,而是分步調用方式,須要將新返回的Observable賦給原來的Observable,不然線程調度將不會起做用。

使用下面方式,最後發現「OnSubscribe」仍是在默認線程中運行;緣由是subscribeOn這類操做後,返回的是一個新的Observable。

1 observable.subscribeOn(Schedulers.io());
2 observable.observeOn(AndroidSchedulers.mainThread());
3 observable .subscribe(subscribe);

能夠修改成下面兩種方式:

1 observable = observable.subscribeOn(Schedulers.io());
2 observable = observable.observeOn(AndroidSchedulers.mainThread());
3 observable .subscribe(subscribe);
4 //OR
5 observable.subscribeOn(Schedulers.io())
6 .observeOn(AndroidSchedulers.mainThread())
7 .subscribe(subscribe);

 

前面講到了,能夠利用 subscribeOn() 結合 observeOn() 來實現線程控制,讓事件的產生和消費發生在不一樣的線程。但是在瞭解了 map() flatMap() 等變換方法後,有些好事的(其實就是當初剛接觸 RxJava 時的我)就問了:能不能多切換幾回線程?

答案是:能。 由於 observeOn() 指定的是 Subscriber 的線程,而這個 Subscriber 並非(嚴格說應該爲『不必定是』,但這裏不妨理解爲『不是』)subscribe() 參數中的 Subscriber ,而是 observeOn() 執行時的當前 Observable 所對應的 Subscriber ,即它的直接下級 Subscriber 。換句話說,observeOn() 指定的是它以後的操做所在的線程。所以若是有屢次切換線程的需求,只要在每一個想要切換線程的位置調用一次 observeOn() 便可。上代碼:

1 Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
2     .subscribeOn(Schedulers.io())
3     .observeOn(Schedulers.newThread())
4     .map(mapOperator) // 新線程,由 observeOn() 指定
5     .observeOn(Schedulers.io())
6     .map(mapOperator2) // IO 線程,由 observeOn() 指定
7     .observeOn(AndroidSchedulers.mainThread) 
8     .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

如上,經過 observeOn() 的屢次調用,程序實現了線程的屢次切換。

不過,不一樣於 observeOn() , subscribeOn() 的位置放在哪裏均可以,但它是隻能調用一次的。

又有好事的(其實仍是當初的我)問了:若是我非要調用屢次 subscribeOn() 呢?會有什麼效果?

這個問題先放着,咱們仍是從 RxJava 線程控制的原理提及吧。

Scheduler 的原理

其實, subscribeOn() 和 observeOn() 的內部實現,也是用的 lift()。具體看圖(不一樣顏色的箭頭表示不一樣的線程):

subscribeOn()原理圖:

observeOn() 原理圖:

從圖中能夠看出,subscribeOn() 和 observeOn() 都作了線程切換的工做(圖中的 「schedule…」 部位)。不一樣的是, subscribeOn() 的線程切換髮生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件尚未開始發送,所以 subscribeOn() 的線程控制能夠從事件發出的開端就形成影響;而 observeOn() 的線程切換則發生在它內建的 Subscriber 中,即發生在它即將給下一級 Subscriber 發送事件時,所以 observeOn() 控制的是它後面的線程。

最後,我用一張圖來解釋當多個 subscribeOn() 和 observeOn() 混合使用時,線程調度是怎麼發生的(因爲圖中對象較多,相對於上面的圖對結構作了一些簡化調整):

圖中共有 5 處含有對事件的操做。由圖中能夠看出,①和②兩處受第一個 subscribeOn() 影響,運行在紅色線程;③和④處受第一個 observeOn() 的影響,運行在綠色線程;⑤處受第二個 onserveOn() 影響,運行在紫色線程;而第二個 subscribeOn() ,因爲在通知過程當中線程就被第一個 subscribeOn() 截斷,所以對整個流程並無任何影響。這裏也就回答了前面的問題:當使用了多個 subscribeOn() 的時候,只有第一個 subscribeOn() 起做用。

延伸:doOnSubscribe()

doOnSubscribe()通常用於執行一些初始化操做.

然而,雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程以前倒是能夠利用的。

在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 能夠用做流程開始前的初始化。然而 onStart() 因爲在 subscribe() 發生時就被調用了,所以不能指定線程,而是隻能執行在 subscribe() 被調用時的線程。這就致使若是 onStart()中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar,這必須在主線程執行),將會有線程非法的風險,由於有時你沒法預測 subscribe() 將會在什麼線程執行。

而與 Subscriber.onStart() 相對應的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 一樣是在 subscribe() 調用後並且在事件發送前執行,但區別在於它能夠指定線程。默認狀況下, doOnSubscribe() 執行在 subscribe() 發生的線程;而若是在 doOnSubscribe() 以後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的線程。

示例:

 1 Observable.create(onSubscribe)
 2     .subscribeOn(Schedulers.io())
 3     .doOnSubscribe(new Action0() {
 4         @Override
 5         public void call() {
 6             progressBar.setVisibility(View.VISIBLE); // 須要在主線程執行
 7         }
 8     })
 9     .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
10     .observeOn(AndroidSchedulers.mainThread())
11     .subscribe(subscriber);

 

如上,在 doOnSubscribe() 的後面跟一個 subscribeOn() ,就能指定準備工做的線程了。

 

RxJava 的適用場景和使用方式

RxJava + Retrofit

 

Retrofit 是 Square 的一個著名的網絡請求庫。對於Retrofit不瞭解的同窗 能夠參考我以前寫的文章:全新的網絡加載框架Retrofit2,上位的小三

 

Retrofit 除了提供了傳統的 Callback 形式的 API,還有 RxJava 版本的 Observable 形式 API。下面我用對比的方式來介紹 Retrofit 的 RxJava 版 API 和傳統版本的區別。

 

以獲取一個 MovieEntity 對象的接口做爲例子。使用Retrofit 的傳統 API,你能夠用這樣的方式來定義請求:

1 @GET("top250")
2 Call<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);//正常返回Call對象

咱們來寫getMovie方法的代碼:

 

 1 //進行網絡請求
 2 private void getMovie(){
 3     String baseUrl = "https://api.douban.com/v2/movie/";
 4 
 5     Retrofit retrofit = new Retrofit.Builder()
 6             .baseUrl(baseUrl)
 7             .addConverterFactory(GsonConverterFactory.create())
 8             .build();
 9 
10     MovieService movieService = retrofit.create(MovieService.class);
11     Call<MovieEntity> call = movieService.getTopMovie(0, 10);
12     call.enqueue(new Callback<MovieEntity>() {
13         @Override
14         public void onResponse(Call<MovieEntity> call, Response<MovieEntity> response) {
15             resultTV.setText(response.body().toString());
16         }
17 
18         @Override
19         public void onFailure(Call<MovieEntity> call, Throwable t) {
20             resultTV.setText(t.getMessage());
21         }
22     });
23 }

 

以上爲沒有通過封裝的、原生態的Retrofit寫網絡請求的代碼。

 

而使用 RxJava 形式的 API,定義一樣的請求是這樣的:

1 @GET("top250")
2   Observable<MovieEntity> getTopMovie(@Query("start") int start, @Query("count") int count);//RxJava返回Observable對象

Retrofit自己對Rxjava提供了支持,getMovie方法改成:

 1 //進行網絡請求
 2 private void getMovie(){
 3     String baseUrl = "https://api.douban.com/v2/movie/";
 4 
 5     Retrofit retrofit = new Retrofit.Builder()
 6             .baseUrl(baseUrl)
 7             .addConverterFactory(GsonConverterFactory.create())
 8             .addCallAdapterFactory(RxJavaCallAdapterFactory.create())//提供RXjava支持
 9             .build();
10 
11     MovieService movieService = retrofit.create(MovieService.class);
12 
13     movieService.getTopMovie(0, 10)//返回Observable對象
14             .subscribeOn(Schedulers.io())
15             .observeOn(AndroidSchedulers.mainThread())
16             .subscribe(new Subscriber<MovieEntity>() {
17                 @Override
18                 public void onCompleted() {
19                     Toast.makeText(MainActivity.this, "Get Top Movie Completed", Toast.LENGTH_SHORT).show();
20                 }
21 
22                 @Override
23                 public void onError(Throwable e) {
24                     resultTV.setText(e.getMessage());
25                 }
26 
27                 @Override
28                 public void onNext(MovieEntity movieEntity) {
29                     resultTV.setText(movieEntity.toString());
30                 }
31             });
32 }

 

這樣基本上就完成了RetrofitRxjava的結合,你們能夠本身進行封裝;那麼用上了RxJava,咱們就能夠用它強大的操做符來對數據進行處理和操做,各位看官能夠具體去實現,我在這裏不作多作贅述。

 

參考文章:RxJava 與 Retrofit 結合的最佳實踐

RxBinding

RxBinding 是 Jake Wharton 的一個開源庫,它提供了一套在 Android 平臺上的基於 RxJava 的 Binding API。所謂 Binding,就是相似設置 OnClickListener 、設置 TextWatcher 這樣的註冊綁定對象的 API

 

舉個設置點擊監聽的例子。使用 RxBinding ,能夠把事件監聽用這樣的方法來設置:

1 Button button = ...;
2 RxView.clickEvents(button) // 以 Observable 形式來反饋點擊事件
3     .subscribe(new Action1<ViewClickEvent>() {
4         @Override
5         public void call(ViewClickEvent event) {
6             // Click handling
7         }
8     });

看起來除了形式變了沒什麼區別,實質上也是這樣。甚至若是你看一下它的源碼,你會發現它連實現都沒什麼驚喜:它的內部是直接用一個包裹着的 setOnClickListener() 來實現的。然而,僅僅這一個形式的改變,卻剛好就是 RxBinding 的目的:擴展性。經過 RxBinding 把點擊監聽轉換成 Observable 以後,就有了對它進行擴展的可能。擴展的方式有不少,根據需求而定。一個例子是前面提到過的 throttleFirst() 操做符,用於去抖動,也就是消除手抖致使的快速連環點擊:

1 RxView.clickEvents(button)
2     .throttleFirst(500, TimeUnit.MILLISECONDS)
3     .subscribe(clickAction);

若是想對 RxBinding 有更多瞭解,能夠去它的 GitHub 項目 下面看看。

RxLifecyle

RxLifecycle 配合 Activity/Fragment 生命週期來管理訂閱的。 因爲 RxJava Observable 訂閱後(調用 subscribe 函數),通常會在後臺線程執行一些操做(好比訪問網絡請求數據),當後臺操做返回後,調用 Observer 的 onNext 等函數,而後在 更新 UI 狀態。 可是後臺線程請求是須要時間的,若是用戶點擊刷新按鈕請求新的微博信息,在刷新尚未完成的時候,用戶退出了當前界面返回前面的界面,這個時候刷新的 Observable 若是不取消訂閱,則會致使以前的 Activity 沒法被 JVM 回收致使內存泄露。 這就是 Android 裏面的生命週期管理須要注意的地方,RxLifecycle 就是用來幹這事的。好比下面的示例:

1 myObservable
2     .compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTROY))
3     .subscribe();

這樣Activitydestroy的時候就會自動取消這個observer

 

RxBus

 

RxBus並非一個庫,而是一種模式。相信大多數開發者都使用過EventBus或者Otto,做爲事件總線通訊庫,若是你的項目已經加入RxJavaEventBus,不妨用RxBus代替EventBus,以減小庫的依賴。RxJava也能夠輕鬆實現事件總線,由於它們都依據於觀察者模式。

 

拓展連接: 用RxJava實現事件總線(Event Bus) [深刻RxBus]:支持Sticky事件

 

RxPermission

 

RxPermission是基於RxJava開發的用於幫助在Android 6.0中處理運行時權限檢測的框架。在Android 6.0中,系統新增了部分權限的運行時動態獲取。而再也不是在之前的版本中安裝的時候授予權限。

 

拓展連接: 使用RxPermission框架對android6.0權限進行檢測

 

總結

 

簡而言之Rxjava是一個很牛逼的庫,若是你的項目中尚未使用RxJava的話,建議能夠嘗試去集成使用;對大多數人而已RxJava是一個比較難上手的庫了,不亞於Dagger的上手難度;不過當你認識學習使用過了,你就會發現RxJava的魅力所在;若是看一遍沒有看懂的童鞋,建議多看幾回;動手寫寫代碼,我想信本文能夠給到大家一些幫助;大家真正的體會到什麼是 從入門到放棄再到不離不棄 ;這就是RxJava的魅力所在。

 

 

拓展閱讀:

 

我所理解的RxJava——上手其實很簡單

 

深刻淺出RxJava - 大頭鬼

 

給 Android 開發者的 RxJava 詳解 - 拋物線

相關文章
相關標籤/搜索