RxJava使用介紹

主講人:陽石柏java

  • RxJava基本概念
  • 背壓概念介紹
  • RxJava 2.0版本介紹及更新

一.RxJava基本概念

RxJava 在 GitHub 主頁上的自我介紹是 「a library for composing asynchronous and event-based programs using observable sequences for the Java VM」(一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫),這就是RxJava。然而對於剛接觸Rxjava的人來講,這種歸納顯然變得晦澀難懂,那麼RxJava究竟是什麼?其實初學RxJava只要把握兩點:觀察者模式和異步,就基本能夠熟練使用RxJava了。react

異步在這裏並不須要作太多的解釋,由於在概念和使用上,並無太多高深的東西。大概就是你腦子裏想能到的那些多線程,線程切換這些東西。我會在後面會講解它的用法。咱們先把觀察者模式說清楚,「按下開關,檯燈燈亮」api

在這個事件中,檯燈做爲觀察者,開關做爲被觀察者,檯燈透過電線來觀察開關的狀態來並作出相應的處理

image

觀察上圖,其實已經很明瞭了,不過須要指出一下幾點(對於下面理解RxJava很重要):多線程

開關(被觀察者)做爲事件的產生方(生產「開」和「關」這兩個事件),是主動的,是整個開燈事理流程的起點。

檯燈(觀察者)做爲事件的處理方(處理「燈亮」和「燈滅」這兩個事件),是被動的,是整個開燈事件流程的終點。

在起點和終點之間,即事件傳遞的過程當中是能夠被加工,過濾,轉換,合併等等方式處理的(上圖沒有體現,後面會講到)。

這三點對於咱們理解RxJava很是重要。由於上述三條分別對應了RxJava中被觀察者(Observable),觀察者(Observer)和操做符的職能。而觀察者模式又是RxJava程序運行的骨架。RxJava也是基於觀察者模式來組建本身的程序邏輯的,就是構建被觀察者(Observable),觀察者(Observer/Subscriber),而後創建兩者的訂閱關係(就像那根電線,鏈接起檯燈和開關)實現觀察,在事件傳遞過程當中還能夠對事件作各類處理。架構

建立被觀察者

正常模式:

Observable switcher=Observable.create(new Observable.OnSubscribe<String>(){ @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("On"); subscriber.onNext("Off"); subscriber.onNext("On"); subscriber.onNext("On"); subscriber.onCompleted(); } }); 

這是最正宗的寫法,建立了一個開關類,產生了五個事件,分別是:開,關,開,開,結束。異步

偷懶模式1

Observable switcher=Observable.just("On","Off","On","On"); 

偷懶模式2

String [] kk={"On","Off","On","On"}; Observable switcher=Observable.from(kk); 

偷懶模式是一種簡便的寫法,實際上也都是被觀察者把那些信息」On」,」Off」,」On」,」On」,包裝成onNext(」On」)這樣的事件依次發給觀察者,固然,它本身補上了onComplete()事件。

以上是最經常使用到的建立方式,好了,咱們就建立了一個開關類。

建立觀察者

正常模式

Subscriber light=new Subscriber<String>() { @Override public void onCompleted() { //被觀察者的onCompleted()事件會走到這裏; Log.d("DDDDDD","結束觀察...\n"; } @Override public void onError(Throwable e) { //出現錯誤會調用這個方法 } @Override public void onNext(String s) { //處理傳過來的onNext事件 Log.d("DDDDD","handle this---"+s) } 

偷懶模式(非正式寫法)

Action1 light =new Action1<String>() { @Override public void call(String s) { Log.d("DDDDD","handle this---"+s) } } 

之因此說它是非正式寫法,是由於Action1是一個單純的人畜無害的接口,和Observer沒有啥關係,只不過它能夠當作觀察者來使,專門處理onNext 事件,這是一種爲了簡便偷懶的寫法。固然還有Action0,Action2,Action3…,0,1,2,3分別表示call()這個方法能接受幾個參數。若是你還不懂,能夠暫時跳過。後面我也會盡可能使用new Subscriber方式,建立正統的觀察者,便於大家理解。

訂閱

如今已經建立了觀察者和被觀察者,可是二者尚未聯繫起來

switcher.subscribe(light); 這裏有不少初學者的疑問就是明明是檯燈觀察開關,正常來看就應是light.subscribe(switcher)纔對,之因此「開關訂閱檯燈」,是爲了保證流式API調用風格 

啥是流式API調用風格?

//這就是RxJava的流式API調用 Observable.just("On","Off","On","On") //在傳遞過程當中對事件進行過濾操做 .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s=null; } }) .subscribe(mSubscriber); 

上面就是一個很是簡易的RxJava流式API的調用:同一個調用主體一路調用下來,一鼓作氣。

因爲被觀察者產生事件,是事件的起點,那麼開頭就是用Observable這個主體調用來建立被觀察者,產生事件,爲了保證流式API調用規則,就直接讓Observable做爲惟一的調用主體,一路調用下去。async

一句話,背後的真實的邏輯依然是檯燈訂閱了開關,可是在表面上,咱們讓開關「僞裝」訂閱了檯燈,以便於保持流式API調用風格不變。ide

好了,如今分解動做都完成了,已經架構了一個基本的RxJava事件處理流程。學習

咱們再來按照觀察者模式的運做流程和流式Api的寫法複習一遍:this

image

結合流程圖的相應代碼實例以下:

//建立被觀察者,是事件傳遞的起點 Observable.just("On","Off","On","On") //這就是在傳遞過程當中對事件進行過濾操做 .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s=null; } }) //實現訂閱 .subscribe( //建立觀察者,做爲事件傳遞的終點處理事件 new Subscriber<String>() { @Override public void onCompleted() { Log.d("DDDDDD","結束觀察...\n"); } @Override public void onError(Throwable e) { //出現錯誤會調用這個方法 } @Override public void onNext(String s) { //處理事件 Log.d("DDDDD","handle this---"+s) } ); 

至此,基本上咱們就把RxJava的骨架就講完了,總結一下:

1.建立被觀察者,產生事件

2.設置事件傳遞過程當中的過濾,合併,變換等加工操做。

3.訂閱一個觀察者對象,實現事件最終的處理。

Tips: 當調用訂閱操做(即調用Observable.subscribe()方法)的時候,被觀察者才真正開始發出事件。

RxJava的操做符

Map操做

好比被觀察者產生的事件中只有圖片文件路徑;,可是在觀察者這裏只想要bitmap,那麼就須要類型變換。

Observable.just(getFilePath())
           //指定了被觀察者執行的線程環境
          .subscribeOn(Schedulers.newThread())
          //將接下來執行的線程環境指定爲io線程
          .observeOn(Schedulers.io())
            //使用map操做來完成類型轉換
            .map(new Func1<String, Bitmap>() {
              @Override
              public Bitmap call(String s) {
                //顯然自定義的createBitmapFromPath(s)方法,是一個極其耗時的操做
                  return createBitmapFromPath(s);
              }
          })
            //將後面執行的線程環境切換爲主線程
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                 //建立觀察者,做爲事件傳遞的終點處理事件    
                  new Subscriber<Bitmap>() {
                        @Override
                        public void onCompleted() {
                            Log.d("DDDDDD","結束觀察...\n");
                        }

                        @Override
                        public void onError(Throwable e) {
                            //出現錯誤會調用這個方法
                        }
                        @Override
                        public void onNext(Bitmap s) {
                            //處理事件
                            showBitmap(s)
                        }
                    );

實際上在使用map操做時,new Func1() 就對應了類型的轉變的方向,String是原類型,Bitmap是轉換後的類型。在call()方法中,輸入的是原類型,返回轉換後的類型而進行讀取文件,建立bitmap多是一個耗時操做,那麼就應該在子線程中執行,主線程應該僅僅作展現。那麼線程切換通常就會是比較複雜的事情了。可是在Rxjava中,是很是方便的。

異步(線程調度)

異步是相對於主線程來說的子線程操做,在這裏咱們不妨使用線程調度這個概念更加貼切。

首先介紹一下RxJava的線程環境有哪些選項:

image

使用代碼解釋更加簡潔方便:

//new Observable.just()執行在新線程 Observable.just(getFilePath()) //指定在新線程中建立被觀察者 .subscribeOn(Schedulers.newThread()) //將接下來執行的線程環境指定爲io線程 .observeOn(Schedulers.io()) //map就處在io線程 .map(mMapOperater) //將後面執行的線程環境切換爲主線程, //可是這一句依然執行在io線程 .observeOn(AndroidSchedulers.mainThread()) //指定線程無效,但這句代碼自己執行在主線程 .subscribeOn(Schedulers.io()) //執行在主線程 .subscribe(mSubscriber); 

實際上線程調度只有subscribeOn()和observeOn()兩個方法。對於初學者,只須要掌握兩點:

1.subscribeOn()它指示Observable在一個指定的調度器上建立(只做用於被觀察者建立階段)。只能指定一次,若是指定屢次則以第一次爲準

2.observeOn()指定在事件傳遞(加工變換)和最終被處理(觀察者)的發生在哪個調度器。可指定屢次,每次指定完都在下一步生效。

二.背壓概念介紹

RxJava是一個觀察者模式的架構,當這個架構中被觀察者(Observable)和觀察者(Subscriber)處在不一樣的線程環境中時,因爲者各自的工做量不同,致使它們產生事件和處理事件的速度不同,這就會出現兩種狀況:

1.被觀察者產生事件慢一些,觀察者處理事件很快。那麼觀察者就會等着被觀察者發送事件,(比如觀察者在等米下鍋,程序等待,這沒有問題)。

2.被觀察者產生事件的速度很快,而觀察者處理很慢。那就出問題了,若是不做處理的話,事件會堆積起來,最終擠爆你的內存,致使程序崩潰。(比如被觀察者生產的大米沒人吃,堆積最後就會爛掉)。 下面咱們用代碼演示一下這種崩潰的場景:

//被觀察者在主線程中,每1ms發送一個事件 Observable.interval(1, TimeUnit.MILLISECONDS) //將觀察者的工做放在新線程環境中 .observeOn(Schedulers.newThread()) //觀察者處理每1000ms才處理一個事件 .subscribe(new Action1() { @Override public void call(Long aLong) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.w("TAG","---->"+aLong); } }); 

在上面的代碼中,被觀察者發送事件的速度是觀察者處理速度的1000倍

這段代碼運行以後:

... Caused by: rx.exceptions.MissingBackpressureException ... ... 

拋出MissingBackpressureException每每就是由於,被觀察者發送事件的速度太快,而觀察者處理太慢,並且你尚未作相應措施,因此報異常.

簡而言之:背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的狀況下,一種告訴上游的被觀察者下降發送速度的策略,即背壓是流速控制的一種策略。

有兩點要強調的是:

1.背壓策略的一個前提是異步環境,也就是說,被觀察者和觀察者處在不一樣的線程環境中。

2.背壓(Backpressure)並非一個像flatMap同樣能夠在程序中直接使用的操做符,他只是一種控制事件流速的策略。

那麼咱們再回看上面的程序異常就很好理解了,就是當被觀察者發送事件速度過快的狀況下,咱們沒有作流速控制,致使了異常。

響應式拉取(reactive pull)

由以前咱們能夠了解到,在RxJava的觀察者模型中,被觀察者是主動的推送數據給觀察者,觀察者是被動接收的。而響應式拉取則反過來,觀察者主動從被觀察者那裏去拉取數據,而被觀察者變成被動的等待通知再發送數據。其結構示意圖以下: image

總的來講:

1.背壓是一種策略,具體措施是下游觀察者通知上游的被觀察者發送事件

2.背壓策略很好的解決了異步環境下被觀察者和觀察者速度不一致的問題

3.在RxJava1.X中,一樣是Observable,有的不支持背壓策略,致使某些狀況下,顯得特別麻煩,出了問題也很難排查,使得RxJava的學習曲線變得十份陡峭。

三.Rxjava 2.0版本介紹及更新

首先要強調的一點是,RxJava以觀察者模式爲骨架,在2.0中依然如此。

不過這次更新中,出現了兩種觀察者模式:

Observable(被觀察者)/Observer(觀察者)

Flowable(被觀察者)/Subscriber(觀察者)

image

RxJava2.X中,Observeable用於訂閱Observer,是不支持背壓的,而Flowable用於訂閱Subscriber,是支持背壓(Backpressure)的。

Observable/Observer(這種觀察者模型是不支持背壓的)

Observable正經常使用法: Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } }); Observer mObserver=new Observer<Integer>() { //這是新加入的方法,在訂閱後發送數據以前, //回首先調用這個方法,而Disposable可用於取消訂閱 @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; mObservable.subscribe(mObserver); 

Flowable/Subscriber

Flowable.range(0,10) .subscribe(new Subscriber<Integer>() { Subscription sub; //當訂閱後,會首先調用這個方法,其實就至關於onStart(), //傳入的Subscription s參數能夠用於請求數據或者取消訂閱 @Override public void onSubscribe(Subscription s) { Log.w("TAG","onsubscribe start"); sub=s; sub.request(1); Log.w("TAG","onsubscribe end"); } @Override public void onNext(Integer o) { Log.w("TAG","onNext--->"+o); sub.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { Log.w("TAG","onComplete"); } }); 

輸出以下:

onsubscribe start

onNext—>0

onNext—>1

onNext—>2

onNext—>9

onComplete

onsubscribe end

操做符相關

這一塊其實能夠說沒什麼改動,大部分以前你用過的操做符都沒變,即便有所變更,也只是包名或類名的改動。你們可能常常用到的就是Action和Function。

Action相關

在1.0中,這類接口是從Action0,Action1…日後排(數字表明可接受的參數),如今作出了改動

Rx1.0———–Rx2.0

Action0——–Action

Action1——–Consumer

Action2——–BiConsumer

後面的Action都去掉了,只保留了ActionN

Function相關

同上,也是命名方式的改變

上面那兩個類,和RxJava1.0相比,他們都增長了throws Exception,也就是說,在這些方法作某些操做就不須要try-catch。

例如:

Flowable.just("file.txt") .map(name -> Files.readLines(name)) .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace); 

Files.readLines(name)這類io方法原本是須要try-catch的,如今直接拋出異常,就能夠放心的使用lambda ,保證代碼的簡潔優美。

相關文章
相關標籤/搜索