RxJava簡單的介紹

1. RxJava簡介

Rx(ReactiveX,響應式編程)是一種事件驅動的基於異步數據流的編程模式,整個數據流就像一條河流,它能夠被觀測(監聽),過濾,操控或者與其餘數據流合併爲一條新的數據流。而RxJava是.Net Rx在JVM上的實現。RxJava能夠應用於大部分基於JVM的語言,如Scala,Groovy等。整個RxJava+RxAndroid的包大小爲(1125kb+10kb)java

2.RxJava特色

  • 函數響應式編程(Functional Reactive Programming,FRP)
  • 異步
  • 事件驅動的
  • 基於觀察者模式
  • 專門的出錯處理,當使用RxJava出現錯誤時,它不會直接拋出異常,而是會執行OnError()方法;
  • 併發,能夠很容易實現多線程

3.RxJava的基本概念

RxJava最核心的兩個東西是Observables(被觀察者,事件源)和Subscribers(觀察者),Observables發出一系列事件,Subscribers處理這些事件。而RxJavaObservables是擴展自設計模式中的觀察者模式,添加了如下幾個能力:android

  • onCompleted(),當沒有新的可用數據時,通知Observables
  • onError(),當發生錯誤時,通知Observables,但不會直接將錯誤或異常直接拋出;

3.1 四個關鍵概念

  • Observable,產生事件(事件源)
  • Observer, 根據事件做出相應的響應
  • Subscriber,實現了Observer的抽象類,
  • Subjects,Observable + Observer

3.1.1 Observable

Observable在存活期間,生命週期包含三個可能的事件,與迭代器的生命週期很相似:git

Events Iterable(pull) Observable(push)
獲得數據 T next() onNext(T)
發現錯誤 throws Exception onError(Throwable)
完成 !hasNext() onCompleted()

與使用迭代器的區別:在使用迭代器的時候,線程會阻塞直到他們須要的數據到來。而使用Observable,是使用異步的方式將數據推送到Observer;github

而根據推送機制的不一樣,Observable分爲熱Observable和冷Observable:編程

  • 熱Observable,當他建立時新開始執行它的職責,這樣全部訂閱了這個Observable的Observer就能夠直接大中途觀察了(但可能會丟失前面發送的數據(事件));
  • 冷Observable,只有等到有訂閱(subscribes)了這個Observable的Observer纔開始執行它的職責:發送數據;

下面是一個簡單的建立觀察者的代碼:設計模式

Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber){}
});

// example
Observable<Integer> ob=
Observable.create(new Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> subscriber){
for(int i = 0; i < 5; i++){
observer.onNext(i);
}
observer.onCompleted();
}
});
// 並不用關心有多少數據,
Subscription subscriptionPrint = 
 observableString.subscribe(new Observer<Integer(){
 @Override
 public void onCompleted(){
 System.out.println("Observable completed");
 }
  @Override
 public void onError(Throwable e){
  System.out.println("Oh no! Something wrong happened!");
 }
 
  @Override
 public void onNext(Integer item)
    System.out.println("Item is " + item); 
 })

Observable的構造方法:網絡

  • create(subscribe),須要一個subscribe做爲參數來構造,
  • from(list)// 用來從一個已知的列表的產生數據,和前面的create做用相似;
  • just(funnction),用來接收從一個方法的返回值(最多能夠有9個參數),若是返回的是List,它不會去逐個遍歷List的Items,而是直接輸出整個List ;
  • empty(),不輸出數據,但能夠正常結束;
  • never(),不輸出數據,而且不會終止;
  • throw(),不輸出數據,但在發生錯誤時終止;
  • interval(),建立一個按固定間隔發送整數序列的Observable,
  • timer(),建立一個Observable在給定的延遲後發送一個特殊的值;

3.1.2 Subject

Subject = Observable + Observer
這意味着一個Subject能夠同時是觀察者和被觀察者,事件源(Observable),也就是說Subject能夠像觀察者同樣訂閱一個事件源,而且能夠像Observable同樣輸出它們收到的事件。RxJava提供了四種不一樣類型的subjects:多線程

  • PublishSubject,
  • BehaviorSubject,輸出它觀察到的大部分最近的Items和隨後觀察的Items到全部的訂閱者,初始化時須要一個初始值來作爲最近的Items
  • ReplaySubject,將它觀察到的全部數據重複發送到全部訂閱了的觀察者;
  • AsyncSubject, 在整個Observable完成後,將最後觀察到的Items發送給每個訂閱者;

4. RxJava的操做符

4.1 過濾

  • filter(),過濾掉不須要的數據,只有返回true 的數據纔會被使用;
  • take(int n),只取返回數據中的前n個,skip(int n)跳過前n個數據;
  • takeLast(int n),只取數據的最後n個,skipLast(int n);
  • distinct(),會幫助咱們處理重複的數據,但若是數據太大的話,內存須要比較大
  • distinctUntilChanged(),只有當新數據與先前的不一樣,纔會輸出,
  • first(),last();
  • firstOrDefault(),lastOrDefault,若是Observable沒有輸出任何數據時,咱們能夠給一個默認值;
  • elementAt(int n),輸出第n個位置上的數據(從0 開始)
  • timeout(),若是在給定時間間隔內,沒有輸出有效數據,則會執行onError();
  • delay() 用於事件流中,延遲一段時間再發送來自Observable的結果;

4.2 映射

  • map(),用來映射簡單的數據
  • flatMap(),用來映射隊列等,但可能會改變數據的順序
  • concatMap(),解決了fmp的交錯的問題
  • flatMapIterable(),將生成的Iterable與Items進行對應起來(相似於key-value);
  • switchMap(),
    這幾個方法都是將輸入的數據以一種新的形式輸出;
  • Scan(),相似於一個累加的方法,後一個item是前面item的後再加上原來的item;
  • GroupBy(),
  • buffer(int n),將數據做爲列表(每n個數據做爲一個列表)輸出而不是單個的Items;
  • cast() 相似於map();併發

    4.3 合併

  • merge()能夠將多個輸入整合成一個輸出(並不會合併Items);
  • zip(),能夠將多個輸入整合成一個輸出(會合並Items);app

4.3 重試

  • retryWhen(),當接收到onError()事件時,觸發從新訂閱(發生某些錯誤時,須要作什麼工做);
  • repeat(),當接收到onComplete()事件時,觸發從新訂閱

    4.4 線程的調度(Schedulers)

    RxJava提供了5種類型的調度者:
    Schedulers
  • .io(), 使用線程池來爲IO操做進行調度,但沒有騎士線程池的大小 做限制,所以使用時須要考慮內存的使用
  • .computation(),與IO無關的計算型調度,有不少RxJava相關的默認方法:buffer(),debounce(),delay(),interval()等默認是在該類線程中執行;
  • .immediate(),在當前線程中快速開始某項操做,是方法:timeout(),timeInterval等的默認調度器;
  • .newThread(), 開啓新線程來執行某項操做
  • .trampoline(),爲一些不須要當即執行的任務進行調度,會依次執行隊列裏的任務,是方法:repeat(),retry()的默認調度器;

RxAndroid還提供了一個Android特有的調度器:AndroidSchedulers.mainThread()來讓代碼在UI主線程中執行;

RxJava提供了一個每個Observables均可以使用的subscribeOn(),ObserveOn()方法,將Scheduler與Observables創建聯繫,咱們能夠這樣來使用:

.subscribeOn(Schedulers.io())  // 讓事件的產生髮生在IO線程,屢次調用,以最後一次調用的結果爲準
.observeOn(AndroidSchedulers.mainThread()) // 讓事件的回調發生在UI主線程中
.subscribe(....)

5. RxAndroid

前面版本的RxAndroid還提供了AppObservable,ViewObservable,WidgetObservable,LifecycleObservable 等,但最新版本的RxAndroid直接刪掉了這些;

相關的使用例子可參考這個,也能夠參考我寫的 小Demo

6.參考文獻:

1
Grokking RxJava,Part 4
RxJava處理網絡失敗
RxJava Essentials
給Android開發者的RxJava詳解
拆RxJava

相關文章
相關標籤/搜索