主講人:陽石柏java
RxJava 在 GitHub 主頁上的自我介紹是 「a library for composing asynchronous and event-based programs using observable sequences for the Java VM」(一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫),這就是RxJava。然而對於剛接觸Rxjava的人來講,這種歸納顯然變得晦澀難懂,那麼RxJava究竟是什麼?其實初學RxJava只要把握兩點:觀察者模式和異步,就基本能夠熟練使用RxJava了。react
異步在這裏並不須要作太多的解釋,由於在概念和使用上,並無太多高深的東西。大概就是你腦子裏想能到的那些多線程,線程切換這些東西。我會在後面會講解它的用法。咱們先把觀察者模式說清楚,「按下開關,檯燈燈亮」api
觀察上圖,其實已經很明瞭了,不過須要指出一下幾點(對於下面理解RxJava很重要):多線程
這三點對於咱們理解RxJava很是重要。由於上述三條分別對應了RxJava中被觀察者(Observable),觀察者(Observer)和操做符的職能。而觀察者模式又是RxJava程序運行的骨架。RxJava也是基於觀察者模式來組建本身的程序邏輯的,就是構建被觀察者(Observable),觀察者(Observer/Subscriber),而後創建兩者的訂閱關係(就像那根電線,鏈接起檯燈和開關)實現觀察,在事件傳遞過程當中還能夠對事件作各類處理。架構
Observable switcher=Observable.create(new Observable.OnSubscribe<String>(){ @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("On"); subscriber.onNext("Off"); subscriber.onNext("On"); subscriber.onNext("On"); subscriber.onCompleted(); } });
這是最正宗的寫法,建立了一個開關類,產生了五個事件,分別是:開,關,開,開,結束。異步
Observable switcher=Observable.just("On","Off","On","On");
String [] kk={"On","Off","On","On"}; Observable switcher=Observable.from(kk);
Subscriber light=new Subscriber<String>() { @Override public void onCompleted() { //被觀察者的onCompleted()事件會走到這裏; Log.d("DDDDDD","結束觀察...\n"; } @Override public void onError(Throwable e) { //出現錯誤會調用這個方法 } @Override public void onNext(String s) { //處理傳過來的onNext事件 Log.d("DDDDD","handle this---"+s) }
Action1 light =new Action1<String>() { @Override public void call(String s) { Log.d("DDDDD","handle this---"+s) } }
switcher.subscribe(light); 這裏有不少初學者的疑問就是明明是檯燈觀察開關,正常來看就應是light.subscribe(switcher)纔對,之因此「開關訂閱檯燈」,是爲了保證流式API調用風格
//這就是RxJava的流式API調用 Observable.just("On","Off","On","On") //在傳遞過程當中對事件進行過濾操做 .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s!=null; } }) .subscribe(mSubscriber);
因爲被觀察者產生事件,是事件的起點,那麼開頭就是用Observable這個主體調用來建立被觀察者,產生事件,爲了保證流式API調用規則,就直接讓Observable做爲惟一的調用主體,一路調用下去。async
一句話,背後的真實的邏輯依然是檯燈訂閱了開關,可是在表面上,咱們讓開關「僞裝」訂閱了檯燈,以便於保持流式API調用風格不變。ide
好了,如今分解動做都完成了,已經架構了一個基本的RxJava事件處理流程。學習
咱們再來按照觀察者模式的運做流程和流式Api的寫法複習一遍:this
結合流程圖的相應代碼實例以下:
//建立被觀察者,是事件傳遞的起點 Observable.just("On","Off","On","On") //這就是在傳遞過程當中對事件進行過濾操做 .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s!=null; } }) //實現訂閱 .subscribe( //建立觀察者,做爲事件傳遞的終點處理事件 new Subscriber<String>() { @Override public void onCompleted() { Log.d("DDDDDD","結束觀察...\n"); } @Override public void onError(Throwable e) { //出現錯誤會調用這個方法 } @Override public void onNext(String s) { //處理事件 Log.d("DDDDD","handle this---"+s) } );
1.建立被觀察者,產生事件
2.設置事件傳遞過程當中的過濾,合併,變換等加工操做。
3.訂閱一個觀察者對象,實現事件最終的處理。
Tips: 當調用訂閱操做(即調用Observable.subscribe()方法)的時候,被觀察者才真正開始發出事件。
好比被觀察者產生的事件中只有圖片文件路徑;,可是在觀察者這裏只想要bitmap,那麼就須要類型變換。
Observable.just(getFilePath())
//指定了被觀察者執行的線程環境
.subscribeOn(Schedulers.newThread())
//將接下來執行的線程環境指定爲io線程
.observeOn(Schedulers.io())
//使用map操做來完成類型轉換
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//顯然自定義的createBitmapFromPath(s)方法,是一個極其耗時的操做
return createBitmapFromPath(s);
}
})
//將後面執行的線程環境切換爲主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
//建立觀察者,做爲事件傳遞的終點處理事件
new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
Log.d("DDDDDD","結束觀察...\n");
}
@Override
public void onError(Throwable e) {
//出現錯誤會調用這個方法
}
@Override
public void onNext(Bitmap s) {
//處理事件
showBitmap(s)
}
);
實際上在使用map操做時,new Func1() 就對應了類型的轉變的方向,String是原類型,Bitmap是轉換後的類型。在call()方法中,輸入的是原類型,返回轉換後的類型而進行讀取文件,建立bitmap多是一個耗時操做,那麼就應該在子線程中執行,主線程應該僅僅作展現。那麼線程切換通常就會是比較複雜的事情了。可是在Rxjava中,是很是方便的。
異步是相對於主線程來說的子線程操做,在這裏咱們不妨使用線程調度這個概念更加貼切。
首先介紹一下RxJava的線程環境有哪些選項:
使用代碼解釋更加簡潔方便:
//new Observable.just()執行在新線程 Observable.just(getFilePath()) //指定在新線程中建立被觀察者 .subscribeOn(Schedulers.newThread()) //將接下來執行的線程環境指定爲io線程 .observeOn(Schedulers.io()) //map就處在io線程 .map(mMapOperater) //將後面執行的線程環境切換爲主線程, //可是這一句依然執行在io線程 .observeOn(AndroidSchedulers.mainThread()) //指定線程無效,但這句代碼自己執行在主線程 .subscribeOn(Schedulers.io()) //執行在主線程 .subscribe(mSubscriber);
1.subscribeOn()它指示Observable在一個指定的調度器上建立(只做用於被觀察者建立階段)。只能指定一次,若是指定屢次則以第一次爲準
2.observeOn()指定在事件傳遞(加工變換)和最終被處理(觀察者)的發生在哪個調度器。可指定屢次,每次指定完都在下一步生效。
RxJava是一個觀察者模式的架構,當這個架構中被觀察者(Observable)和觀察者(Subscriber)處在不一樣的線程環境中時,因爲者各自的工做量不同,致使它們產生事件和處理事件的速度不同,這就會出現兩種狀況:
1.被觀察者產生事件慢一些,觀察者處理事件很快。那麼觀察者就會等着被觀察者發送事件,(比如觀察者在等米下鍋,程序等待,這沒有問題)。
2.被觀察者產生事件的速度很快,而觀察者處理很慢。那就出問題了,若是不做處理的話,事件會堆積起來,最終擠爆你的內存,致使程序崩潰。(比如被觀察者生產的大米沒人吃,堆積最後就會爛掉)。 下面咱們用代碼演示一下這種崩潰的場景:
//被觀察者在主線程中,每1ms發送一個事件 Observable.interval(1, TimeUnit.MILLISECONDS) //將觀察者的工做放在新線程環境中 .observeOn(Schedulers.newThread()) //觀察者處理每1000ms才處理一個事件 .subscribe(new Action1() { @Override public void call(Long aLong) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("TAG","---->"+aLong); } });
這段代碼運行以後:
... Caused by: rx.exceptions.MissingBackpressureException ... ...
拋出MissingBackpressureException每每就是由於,被觀察者發送事件的速度太快,而觀察者處理太慢,並且你尚未作相應措施,因此報異常.
簡而言之:背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的狀況下,一種告訴上游的被觀察者下降發送速度的策略,即背壓是流速控制的一種策略。
有兩點要強調的是:
1.背壓策略的一個前提是異步環境,也就是說,被觀察者和觀察者處在不一樣的線程環境中。
2.背壓(Backpressure)並非一個像flatMap同樣能夠在程序中直接使用的操做符,他只是一種控制事件流速的策略。
那麼咱們再回看上面的程序異常就很好理解了,就是當被觀察者發送事件速度過快的狀況下,咱們沒有作流速控制,致使了異常。
由以前咱們能夠了解到,在RxJava的觀察者模型中,被觀察者是主動的推送數據給觀察者,觀察者是被動接收的。而響應式拉取則反過來,觀察者主動從被觀察者那裏去拉取數據,而被觀察者變成被動的等待通知再發送數據。其結構示意圖以下:
總的來講:
1.背壓是一種策略,具體措施是下游觀察者通知上游的被觀察者發送事件
2.背壓策略很好的解決了異步環境下被觀察者和觀察者速度不一致的問題
3.在RxJava1.X中,一樣是Observable,有的不支持背壓策略,致使某些狀況下,顯得特別麻煩,出了問題也很難排查,使得RxJava的學習曲線變得十份陡峭。
首先要強調的一點是,RxJava以觀察者模式爲骨架,在2.0中依然如此。
不過這次更新中,出現了兩種觀察者模式:
Observable(被觀察者)/Observer(觀察者)
Flowable(被觀察者)/Subscriber(觀察者)
Observable正經常使用法: Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } }); Observer mObserver=new Observer<Integer>() { //這是新加入的方法,在訂閱後發送數據以前, //回首先調用這個方法,而Disposable可用於取消訂閱 @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; mObservable.subscribe(mObserver);
Flowable.range(0,10) .subscribe(new Subscriber<Integer>() { Subscription sub; //當訂閱後,會首先調用這個方法,其實就至關於onStart(), //傳入的Subscription s參數能夠用於請求數據或者取消訂閱 @Override public void onSubscribe(Subscription s) { Log.w("TAG","onsubscribe start"); sub=s; sub.request(1); Log.w("TAG","onsubscribe end"); } @Override public void onNext(Integer o) { Log.w("TAG","onNext--->"+o); sub.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { Log.w("TAG","onComplete"); } });
輸出以下:
onsubscribe start
onNext—>0
onNext—>1
onNext—>2
…
onNext—>9
onComplete
onsubscribe end
這一塊其實能夠說沒什麼改動,大部分以前你用過的操做符都沒變,即便有所變更,也只是包名或類名的改動。你們可能常常用到的就是Action和Function。
在1.0中,這類接口是從Action0,Action1…日後排(數字表明可接受的參數),如今作出了改動
Rx1.0———–Rx2.0
Action0——–Action
Action1——–Consumer
Action2——–BiConsumer
後面的Action都去掉了,只保留了ActionN
同上,也是命名方式的改變
上面那兩個類,和RxJava1.0相比,他們都增長了throws Exception,也就是說,在這些方法作某些操做就不須要try-catch。
例如:
Flowable.just("file.txt") .map(name -> Files.readLines(name)) .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
Files.readLines(name)這類io方法原本是須要try-catch的,如今直接拋出異常,就能夠放心的使用lambda ,保證代碼的簡潔優美。