Android 異步框架 RxJava2

觀察者模式的概念

RxJava是android的異步框架,官方介紹是可觀測的序列,組成異步基於事件程序的庫。特色是觀察者模式,基於事件流的鏈式調用,隨着異步操做調度過程複雜的狀況下,程序邏輯也變得愈來愈複雜,但RxJava依然可以保持簡潔。java

簡單的說觀察者A與被觀察者B創建訂閱關係,當被觀察者B發生某種改變時,當即通知觀察者Areact

添加依賴

compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

基本模式

Observable被觀察者

注意各地方添加泛型避免大片警告,onNext()是事件的回調,onComplete()是事件的結尾。onComplete()與onError互斥須要保持惟一性,並只能調用一次。android

Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("消息1");
                e.onNext("消息2");
                e.onNext("消息3");
                e.onComplete();
    }
});

Observer觀察者

建立觀察者時回調的onSubscribe能夠獲取Disposable對象,在合適的時候判斷條件,調用dispose()便可接觸訂閱關係緩存

Observer<String> observer=new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //經過判斷解除訂閱關係
         d.dispose();
    }

    @Override
    public void onNext(String o) {
        //對應observable的onNext方法
    }

    @Override
    public void onError(Throwable e) {
        //對應observable的onError方法
    }

    @Override
    public void onComplete() {
        //對應observable的onComplete方法
    }
};

創建訂閱關係

observable.subscribeOn(Schedulers.io()) //指定事件生產在子線程
          .observeOn(AndroidSchedulers.mainThread()) //指定事件消費在UI線程
          .subscribe(observer);

Observable被觀察者的其餘模式

//just模式,將自動發送onNext()事件
Observable<String> observable = Observable.just("發送消息");

//fromIterable模式,遍歷集合,並自動發送onNext()事件
Observable<String> observable = Observable.fromIterable((Iterable<String>) mList);

//interval模式,定時自動發送整數序列,從0開始每隔2秒計數,
Observable<Long> observable = Observable.interval(0,2, TimeUnit.SECONDS)

//range模式,自動發送特定的整數序列,0表示不發送,負數會拋異常,從1開始發送到20
Observable<Integer> observable = Observable.range(1,20);

//timer模式,定時執行觀察者的onNext()方法
Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);

Observable被觀察者的更多建立方式以及操做符

如建立操做,數據過濾操做,條件操做,轉載如下博客,很詳細:框架

RxJava操做符大全

Scheduler調度器

四種常見模式

Schedulers.immediate() 默認模式,在當前線程運行異步

Schedulers.newThread() 建立新的子線程運行ide

Schedulers.io() 建立新的子線程運行,內部使用的是無上限的線程池,可重用空閒的線程,效率高spa

 AndroidSchedulers.mainThread() 在UI主線程運行線程

訂閱事件時的生產與消費線程

subscribeOn() 指定Observable(被觀察者)所在的線程,或者叫作事件產生的線程code

observeOn() 指定 Observer(觀察者)所運行在的線程,或者叫作事件消費的線程

新的觀察者模式

Flowable被觀察者

Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {
                e.onNext("hello RxJava!");
                e.onComplete();
    }
},BackpressureStrategy.BUFFER);//增長背壓模式

Subscriber觀察者

onSubscribe()會返回Subscription對象,調用cancel()便可取消訂閱關係,request()便可指定消費事件的數量 

Subscriber<String> subscriber=new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
         s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String s) {
        Log.i("RxJava", "onNext: "+s);
    }

    @Override
    public void onError(Throwable t) {
        Log.i("RxJava", "onError");
    }

    @Override
    public void onComplete() {
        Log.i("RxJava", "onComplete");
    }
};
flowable.subscribe(subscriber);//創建訂閱關係

Backpressure背壓模式

若是生產者和消費者不在同一線程的狀況下,若是生產者的速度大於消費者的速度,就會產生Backpressure問題。即異步狀況下,Backpressure問題纔會存在。

BUFFER

所謂BUFFER就是把RxJava中默認的只能存128個事件的緩存池換成一個大的緩存池,支持存不少不少的數據。
這樣,消費者經過request()即便傳入一個很大的數字,生產者也會生產事件,並將處理不了的事件緩存。
可是這種方式任然比較消耗內存,除非是咱們比較瞭解消費者的消費能力,可以把握具體狀況,不會產生OOM。

DROP

當消費者處理不了事件,就丟棄。
消費者經過request()傳入其需求n,而後生產者把n個事件傳遞給消費者供其消費。其餘消費不掉的事件就丟掉

LATEST

LATEST與DROP功能基本一致,惟一的區別就是LATEST總能使消費者可以接收到生產者產生的最後一個事件

ERROR

這種方式會在產生Backpressure問題的時候直接拋出一個異常,這個異常就是著名的MissingBackpressureException

相關文章
相關標籤/搜索