1.1 Subject
1.1.1 AsyncSubject
1.1.2 BehaviorSubject
// observer will receive all 4 events (including "default").
BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");

// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");

// observer will receive only onComplete
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onComplete();
subject.subscribe(observer);

// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
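The first two cases above come down to one rule: a new observer first gets the most recent item (or the default, if nothing has been emitted yet), then everything that follows. Here is a minimal plain-JDK sketch of that rule. `LatestValueModel` and its methods are hypothetical names made up for illustration; this is not how RxJava implements BehaviorSubject, and terminal events (onComplete, onError) are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-JDK model of "latest value" caching; not RxJava's BehaviorSubject.
final class LatestValueModel<T> {
    private T latest;          // most recent item, or the default value
    private boolean hasValue;  // false until a default or an item is stored
    private final List<Consumer<T>> observers = new ArrayList<>();

    LatestValueModel() {
    }

    LatestValueModel(T defaultValue) {
        this.latest = defaultValue;
        this.hasValue = true;
    }

    // Remember the item as the latest value, then notify current observers.
    void onNext(T item) {
        latest = item;
        hasValue = true;
        for (Consumer<T> o : observers) {
            o.accept(item);
        }
    }

    // A new observer gets the latest value first (if any), then later items.
    void subscribe(Consumer<T> observer) {
        if (hasValue) {
            observer.accept(latest);
        }
        observers.add(observer);
    }

    // Mirrors the first case above: default value, subscribe, then three items.
    static List<String> withDefault() {
        LatestValueModel<String> subject = new LatestValueModel<>("default");
        List<String> received = new ArrayList<>();
        subject.subscribe(received::add);
        subject.onNext("one");
        subject.onNext("two");
        subject.onNext("three");
        return received;
    }

    // Mirrors the second case above: the observer subscribes after "zero" and "one".
    static List<String> lateSubscriber() {
        LatestValueModel<String> subject = new LatestValueModel<>();
        List<String> received = new ArrayList<>();
        subject.onNext("zero");
        subject.onNext("one");
        subject.subscribe(received::add);
        subject.onNext("two");
        subject.onNext("three");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(withDefault());
        System.out.println(lateSubscriber());
    }
}
```

The late subscriber never sees "zero" because only a single slot is kept, and "one" had already overwritten it.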
1.1.3 PublishSubject
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onComplete events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onComplete
subject.subscribe(observer2);
subject.onNext("three");
subject.onComplete();
1.1.4 ReplaySubject
Cache the 2 most recent items:

ReplaySubject<Integer> subject = ReplaySubject.createWithSize(2);
subject.onNext(0);
subject.onNext(1);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
subject.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "accept: " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) {
        // errors are ignored in this example
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "run: onComplete");
    }
});
subject.onNext(6);
subject.onNext(7);
subject.onComplete();

Output:
accept: 4
accept: 5
accept: 6
accept: 7
run: onComplete
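To see why the observer starts at 4 rather than 0, it can help to model the size-bounded buffer by hand. The following is a rough plain-JDK sketch, not RxJava's implementation: `BoundedReplayModel` is a hypothetical class invented for this illustration, and onComplete/onError are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-JDK model of size-bounded replay; not RxJava's ReplaySubject.
final class BoundedReplayModel<T> {
    private final int maxSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<Consumer<T>> observers = new ArrayList<>();

    BoundedReplayModel(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    // Store the item, evicting the oldest one when the buffer is full,
    // then notify the observers that are already subscribed.
    void onNext(T item) {
        if (buffer.size() == maxSize) {
            buffer.removeFirst();
        }
        buffer.addLast(item);
        for (Consumer<T> o : observers) {
            o.accept(item);
        }
    }

    // A new observer first receives the buffered items, then all later ones.
    void subscribe(Consumer<T> observer) {
        for (T item : buffer) {
            observer.accept(item);
        }
        observers.add(observer);
    }

    // Repeats the call sequence of the example above and returns what the observer saw.
    static List<Integer> demo() {
        BoundedReplayModel<Integer> subject = new BoundedReplayModel<>(2);
        List<Integer> received = new ArrayList<>();
        for (int i : new int[] {0, 1, 3, 4, 5}) {
            subject.onNext(i);
        }
        subject.subscribe(received::add);
        subject.onNext(6);
        subject.onNext(7);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [4, 5, 6, 7]
    }
}
```

By the time the observer subscribes, 0, 1 and 3 have already been evicted, so only 4 and 5 are replayed before the live items 6 and 7 arrive.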
1.1.5 SerializedSubject
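When you use a Subject as an Observable, take care not to call its onNext method (or any of its other on* methods) from multiple threads. Doing so can lead to concurrent (non-sequential) calls, which violates the Observable contract and makes the Subject's output unpredictable. Serialization: to avoid this kind of problem, you can convert the Subject into a SerializedSubject by calling the Subject's toSerialized method directly: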
public final Subject<T> toSerialized() {
    if (this instanceof SerializedSubject) {
        return this;
    }
    return new SerializedSubject<T>(this);
}
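The guarantee this wrapper gives is that the downstream observer never sees two overlapping calls, even when several threads emit at once. Below is a deliberately simplified plain-JDK sketch of that guarantee using a lock. `SerializedModel` is a hypothetical class for illustration only; RxJava's SerializedSubject is implemented differently, and a plain `synchronized` wrapper is an assumption made here to keep the idea visible.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical plain-JDK sketch; RxJava's SerializedSubject is implemented differently.
final class SerializedModel<T> implements Consumer<T> {
    private final Consumer<T> downstream;

    SerializedModel(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Only one thread at a time may deliver an item downstream.
    @Override
    public synchronized void accept(T item) {
        downstream.accept(item);
    }

    // Starts several producer threads and returns the highest number of
    // overlapping downstream calls that was observed.
    static int maxConcurrentCalls(int threads, int itemsPerThread) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Consumer<Integer> observer = item -> {
            int now = inFlight.incrementAndGet();
            max.accumulateAndGet(now, Math::max);
            inFlight.decrementAndGet();
        };
        SerializedModel<Integer> serialized = new SerializedModel<>(observer);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    serialized.accept(i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrentCalls(4, 1000)); // 1
    }
}
```

If the observer were handed to the producer threads directly, without the wrapper, the reported maximum could exceed 1, which is exactly the contract violation described above.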
1.2 CompositeDisposable
/**
 * A disposable container that can hold onto multiple other disposables and
 * offers O(1) add and removal complexity.
 */
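The container described in that comment can be modeled in a few lines. This is a plain-JDK sketch under stated assumptions, not RxJava's CompositeDisposable: `CompositeModel` and its nested `Resource` interface are hypothetical names, and a `HashSet` is used here simply because it gives expected O(1) add and remove.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical plain-JDK model of a disposable container; not RxJava's CompositeDisposable.
final class CompositeModel {
    // Minimal stand-in for a disposable resource.
    interface Resource {
        void dispose();
    }

    private Set<Resource> resources = new HashSet<>();
    private boolean disposed;

    // Adds the resource, or disposes it right away if the container is already disposed.
    boolean add(Resource r) {
        synchronized (this) {
            if (!disposed) {
                resources.add(r);
                return true;
            }
        }
        r.dispose();
        return false;
    }

    // Removes the resource from the container without disposing it.
    synchronized boolean delete(Resource r) {
        return !disposed && resources.remove(r);
    }

    // Disposes every held resource exactly once; later calls do nothing.
    void dispose() {
        Set<Resource> toDispose;
        synchronized (this) {
            if (disposed) {
                return;
            }
            disposed = true;
            toDispose = resources;
            resources = new HashSet<>();
        }
        for (Resource r : toDispose) {
            r.dispose();
        }
    }

    // Returns {number of dispose calls seen, 1 if the late add was accepted else 0}.
    static int[] demo() {
        int[] disposeCalls = {0};
        CompositeModel composite = new CompositeModel();
        composite.add(() -> disposeCalls[0]++);
        composite.add(() -> disposeCalls[0]++);
        composite.dispose();                                        // disposes both resources
        boolean accepted = composite.add(() -> disposeCalls[0]++);  // disposed immediately
        return new int[] {disposeCalls[0], accepted ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println(result[0] + " " + result[1]); // 3 0
    }
}
```

The late `add` is the detail worth noticing: once the container has been disposed, anything added afterwards is disposed on the spot, so a subscription created after teardown cannot leak.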
In a BaseActivity, use a CompositeDisposable to manage the subscription disposables. Then, when the activity is destroyed, calling compositeDisposable.dispose() cuts off all subscriptions and prevents memory leaks.
1.3 Project
2.1 Definition
2.2 Single
Example:

Disposable d = Single.just("Hello World")
        .delay(10, TimeUnit.SECONDS, Schedulers.io())
        .subscribeWith(new DisposableSingleObserver<String>() {
            @Override
            public void onStart() {
                System.out.println("Started");
            }

            @Override
            public void onSuccess(String value) {
                System.out.println("Success: " + value);
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }
        });

Thread.sleep(5000);

d.dispose();
2.3 Completable
Example:

Disposable d = Completable.complete()
        .delay(10, TimeUnit.SECONDS, Schedulers.io())
        .subscribeWith(new DisposableCompletableObserver() {
            @Override
            public void onStart() {
                System.out.println("Started");
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        });

Thread.sleep(5000);

d.dispose();
2.4 Maybe
Example:

Disposable d = Maybe.just("Hello World")
        .delay(10, TimeUnit.SECONDS, Schedulers.io())
        .subscribeWith(new DisposableMaybeObserver<String>() {
            @Override
            public void onStart() {
                System.out.println("Started");
            }

            @Override
            public void onSuccess(String value) {
                System.out.println("Success: " + value);
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        });

Thread.sleep(5000);

d.dispose();
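import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public final class RxBus {
    // Serialized so that post() can be called safely from any thread.
    private final Subject<Object> mBus;

    public static RxBus getInstance() {
        return Holder.INSTANCE;
    }

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
    }

    // Returns a stream that only emits events of the given type.
    public <U> Observable<U> register(final Class<U> clazz) {
        return mBus.ofType(clazz);
    }

    public void post(Object o) {
        mBus.onNext(o);
    }

    public void unregister(Disposable d) {
        if (d != null && !d.isDisposed()) {
            d.dispose();
        }
    }

    // Initialization-on-demand holder: the singleton is created lazily and thread-safely.
    private static final class Holder {
        private static final RxBus INSTANCE = new RxBus();
    }
}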
Disposable disposable1 = RxBus.getInstance().register(ChapterOne.class)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<ChapterOne>() {
            @Override
            public void accept(ChapterOne chapterOne) throws Exception {
                Log.d(TAG, "receive event ChapterOne");
            }
        });
mDisposable1 = disposable1;
RxBus.getInstance().post(new ChapterOne());
@Override
protected void onDestroy() {
    super.onDestroy();
    RxBus.getInstance().unregister(mDisposable1);
}
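The register/post/unregister cycle above relies on filtering events by class. To make that mechanism visible without any RxJava or Android dependency, here is a small plain-JDK sketch of a type-filtered bus. `TypedBusModel` is a hypothetical class for illustration and is not the RxBus shown above; returning a `Runnable` for unregistering is an assumption chosen to stand in for the Disposable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical plain-JDK model of a type-filtered event bus; not the RxBus class above.
final class TypedBusModel {
    private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();

    // Registers a listener that only sees events that are instances of clazz.
    // The returned Runnable removes the listener again.
    <U> Runnable register(Class<U> clazz, Consumer<U> listener) {
        Consumer<Object> filtered = event -> {
            if (clazz.isInstance(event)) {
                listener.accept(clazz.cast(event));
            }
        };
        listeners.add(filtered);
        return () -> listeners.remove(filtered);
    }

    // Delivers the event to every registered listener; the filter decides who reacts.
    void post(Object event) {
        for (Consumer<Object> l : listeners) {
            l.accept(event);
        }
    }

    // Registers two listeners, posts events, unregisters one, and posts again.
    static List<String> demo() {
        TypedBusModel bus = new TypedBusModel();
        List<String> log = new ArrayList<>();
        Runnable unregister = bus.register(String.class, s -> log.add("string:" + s));
        bus.register(Integer.class, i -> log.add("int:" + i));
        bus.post("a");
        bus.post(1);
        unregister.run();
        bus.post("b"); // no String listener is left
        bus.post(2);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [string:a, int:1, int:2]
    }
}
```

After `unregister.run()`, the "b" event is dropped silently, which is the same effect as disposing mDisposable1 in onDestroy.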