進階之路與項目使用

一、事件監聽註冊與反註冊

  • BehaviorSubject
  • PublishSubject
  • AsyncSubject
  • ReplaySubject
  • SerializedSubject
  • CompositeDisposable

 1.1   Subjecthtml

  • 都繼承Subject,既是Observable,也是Observer
  • 通常做爲Observable使用,能夠註冊多個Observer,就像咱們之前註冊多個listener同樣

 

1.1.1     AsyncSubjectjava

  • 一個AsyncSubject只在原始Observable完成後,發射來自原始Observable的最後一個值。(若是原始Observable沒有發射任何值,AsyncObject也不發射任何值)它會把這最後一個值發射給任何後續的觀察者。
  • 然而,若是原始的Observable由於發生了錯誤而終止,AsyncSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。

 

1.1.2     BehaviorSubject react

  • BehaviorSubject每次註冊會發射註冊以前最近發射的一個事件和註冊以後的事件(好比系統廣播,監聽網絡變化,只要設置監聽,就會發送一條網絡變化的廣播等等)
  • 然而,若是原始的Observable由於發生了一個錯誤而終止,BehaviorSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。

  // observer will receive all 4 events (including "default").
  BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
  subject.subscribe(observer);
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive the "one", "two" and "three" events, but not "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive only onComplete
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.onComplete();
  subject.subscribe(observer);

  // observer will receive only onError
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.onError(new RuntimeException("error"));
  subject.subscribe(observer);
  } </pre>
 *

 

1.1.3     PublishSubject  緩存

  • PublishSubject只發射註冊以後發射的事件(不關心以前發射的數據)
  • 若是原始的Observable由於發生了一個錯誤而終止,PublishSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。

  PublishSubject<Object> subject = PublishSubject.create();
  // observer1 will receive all onNext and onComplete events
  subject.subscribe(observer1);
  subject.onNext("one");
  subject.onNext("two");
  // observer2 will only receive "three" and onComplete
  subject.subscribe(observer2);
  subject.onNext("three");
  subject.onComplete();

 

1.1.4     ReplaySubject網絡

  •  ReplaySubject會發射全部來自原始Observable的數據給觀察者,不管它們是什麼時候訂閱的。也有其它版本的ReplaySubject,在重放緩存增加到必定大小的時候或過了一段時間後會丟棄舊的數據(原始Observable發射的)。

 

緩存2條數據:
ReplaySubject subject = ReplaySubject.createWithSize(2);
        subject.onNext(0);
        subject.onNext(1);
        subject.onNext(3);
        subject.onNext(4);
        subject.onNext(5);
 
        subject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
 
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "run: onComplete");
            }
        });
        subject.onNext(6);
        subject.onNext(7);
        subject.onComplete();


輸出結果:
accept: 4
accept: 5
accept: 6
accept: 7
run: onComplete

 

1.1.5     SerializedSubjectide

  • 若是你把 Subject 看成一個 Observable使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議,給Subject的結果增長了不肯定性。
  • 串行化:要避免此類問題,你能夠將 Subject 轉換爲一個 SerializedSubject ,直接調用Subject的toSerialized接口:post

    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }

 

1.2   CompositeDisposablethis

  • rxjava雖然好用,可是總所周知,容易遭層內存泄漏。也就說在訂閱了事件後沒有及時取閱,致使在activity或者fragment銷燬後仍然佔用着內存,沒法釋放。而disposable即是這個訂閱事件,能夠用來取消訂閱。可是在何時取消訂閱呢?能夠使用CompositeDisposable
  • 定義:一個disposable的容器,能夠容納多個disposable,添加和去除的複雜度爲O(1)。 
/**
 * A disposable container that can hold onto multiple other disposables and
 * offers O(1) add and removal complexity.
 */
  • 若是這個CompositeDisposable容器已是處於dispose的狀態,那麼全部加進來的disposable都會被自動切斷。因此說能夠建立一個BaseActivity,用CompositeDisposable來管理訂閱事件disposable,而後在acivity銷燬的時候,調用compositeDisposable.dispose()就能夠切斷全部訂閱事件,防止內存泄漏。
  • 源碼

1.3   項目spa

 

二、單事件監聽

  • Completable
  • Single
  • Maybe

 2.1  定義線程

  • 在Rxjava2中,Observale和Flowable都是用來發射數據流的,可是,咱們在實際應用中,不少時候,須要發射的數據並非數據流的形式,而只是一條單一的數據,或者一條完成通知,或者一條錯誤通知。在這種狀況下,咱們再使用Observable或者Flowable就顯得有點大材小用,因而,爲了知足這種單一數據或通知的使用場景,便出現了Observable的簡化版——Single、Completable、Maybe。
 2.2  Single
  • 只發射一條單一的數據,或者一條異常通知,不能發射完成通知,其中數據與通知只能發射一個。
  • 項目
 * Example:
 * <pre><code>
 * Disposable d = Single.just("Hello World")
 *    .delay(10, TimeUnit.SECONDS, Schedulers.io())
 *    .subscribeWith(new DisposableSingleObserver&lt;String&gt;() {
 *        &#64;Override
 *        public void onStart() {
 *            System.out.println("Started");
 *        }
 *
 *        &#64;Override
 *        public void onSuccess(String value) {
 *            System.out.println("Success: " + value);
 *        }
 *
 *        &#64;Override
 *        public void onError(Throwable error) {
 *            error.printStackTrace();
 *        }
 *    });
 * 
 * Thread.sleep(5000);
 * 
 * d.dispose();
 * </code></pre>

 

 2.3 Completable

  • 只發射一條完成通知,或者一條異常通知,不能發射數據,其中完成通知與異常通知只能發射一個 
  • 項目
 * Example:
 * <pre><code>
 * Disposable d = Completable.complete()
 *    .delay(10, TimeUnit.SECONDS, Schedulers.io())
 *    .subscribeWith(new DisposableCompletableObserver() {
 *        &#64;Override
 *        public void onStart() {
 *            System.out.println("Started");
 *        }
 *
 *        &#64;Override
 *        public void onError(Throwable error) {
 *            error.printStackTrace();
 *        }
 *
 *        &#64;Override
 *        public void onComplete() {
 *            System.out.println("Done!");
 *        }
 *    });
 * 
 * Thread.sleep(5000);
 * 
 * d.dispose();
 * </code></pre>

 

 2.4 Maybe

  • 可發射一條單一的數據,以及發射一條完成通知,或者一條異常通知,其中完成通知和異常通知只能發射一個,發射數據只能在發射完成通知或者異常通知以前,不然發射數據無效。 
 * Example:
 * <pre><code>
 * Disposable d = Maybe.just("Hello World")
 *    .delay(10, TimeUnit.SECONDS, Schedulers.io())
 *    .subscribeWith(new DisposableMaybeObserver&lt;String&gt;() {
 *        &#64;Override
 *        public void onStart() {
 *            System.out.println("Started");
 *        }
 *
 *        &#64;Override
 *        public void onSuccess(String value) {
 *            System.out.println("Success: " + value);
 *        }
 *
 *        &#64;Override
 *        public void onError(Throwable error) {
 *            error.printStackTrace();
 *        }
 *
 *        &#64;Override
 *        public void onComplete() {
 *            System.out.println("Done!");
 *        }
 *    });
 * 
 * Thread.sleep(5000);
 * 
 * d.dispose();
 * </code></pre>

 

三、EventBus用RxJava實現(即廣播)

  • 定義
public final class RxBus {

    private final Subject mBus;

    public static RxBus getInstance() {
        return Holder.INSTANCE;
    }

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
    }

    public <U> Observable<U> register(final Class<U> clazz) {
        return mBus.ofType(clazz);
    }

    public void post(Object o) {
        mBus.onNext(o);
    }

    public void unregister(Disposable d) {
        if (d != null && !d.isDisposed()) {
            d.dispose();
        }
    }

    private static final class Holder {
        private static final RxBus INSTANCE = new RxBus();
    }
}
  • 註冊
        Disposable disposable1 = RxBus.getInstance().register(ChapterOne.class)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ChapterOne>() {
                    @Override
                    public void accept(ChapterOne chapterOne) throws Exception {
                        Log.d(TAG, "receive event ChapterOne");
                    }
                });
        mDisposable1 = disposable1;
  • 發送事件
RxBus.getInstance().post(new ChapterOne());
  • 反註冊
    @Override
    protected void onDestroy() {
        super.onDestroy();
        RxBus.getInstance().unregister(mDisposable1);
    }
相關文章
相關標籤/搜索