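The most basic usage:
The roles involved are Observable (the thing being observed), Observer (the observer), and Subscriber (which can also be regarded as an observer).
The Observer interface defines three methods: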
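public interface Observer<T> {

    // The Observable notifies the Observer that data delivery has finished normally.
    // The Observable calls this method after its last call to onNext.
    void onCompleted();

    // The Observable notifies the Observer that an error occurred.
    // Once the Observable has called this method, it will call neither onCompleted nor onNext again.
    void onError(Throwable e);

    // Called by the Observable when an event the Observer subscribed to occurs.
    // The Observable may call this method many times, but never again after it has
    // called onCompleted or onError.
    void onNext(T t);
}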
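The contract in those comments (nothing is delivered after onCompleted or onError) can be illustrated with a small guard. This is a simplified sketch that runs without RxJava: `SimpleObserver`, `GuardedObserver` and `ObserverContractDemo` are names invented here, and the guard is single-threaded, so it is not RxJava's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for rx.Observer, so the sketch runs without RxJava.
interface SimpleObserver<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

// Wraps an observer and drops every event that arrives after a terminal event.
final class GuardedObserver<T> implements SimpleObserver<T> {
    private final SimpleObserver<T> actual;
    private boolean done; // true once onCompleted or onError has been delivered

    GuardedObserver(SimpleObserver<T> actual) {
        this.actual = actual;
    }

    @Override
    public void onNext(T t) {
        if (!done) {
            actual.onNext(t);
        }
    }

    @Override
    public void onError(Throwable e) {
        if (!done) {
            done = true;
            actual.onError(e);
        }
    }

    @Override
    public void onCompleted() {
        if (!done) {
            done = true;
            actual.onCompleted();
        }
    }
}

final class ObserverContractDemo {
    // Sends a misbehaving event sequence through the guard and records what gets through.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        SimpleObserver<String> recorder = new SimpleObserver<String>() {
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable e) { log.add("error"); }
            @Override public void onNext(String t) { log.add("next:" + t); }
        };
        SimpleObserver<String> guarded = new GuardedObserver<>(recorder);
        guarded.onNext("a");
        guarded.onCompleted();
        guarded.onNext("b");                      // dropped: already completed
        guarded.onError(new RuntimeException());  // dropped: already completed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:a, completed]
    }
}
```

A well-behaved source never sends the two dropped events in the first place; the guard only shows what the contract allows an observer to assume.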
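Subscriber is an abstract class that implements Observer and adds subscription management: registering subscriptions, unsubscribing, and checking the current subscription state.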
package rx;

import rx.internal.util.SubscriptionList;

public abstract class Subscriber<T> implements Observer<T>, Subscription {

    private static final long NOT_SET = Long.MIN_VALUE;

    private final SubscriptionList subscriptions;
    private final Subscriber<?> subscriber;
    private Producer producer;
    private long requested = NOT_SET; // default to not set

    protected Subscriber() {
        this(null, false);
    }

    protected Subscriber(Subscriber<?> subscriber) {
        this(subscriber, true);
    }

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null
                ? subscriber.subscriptions
                : new SubscriptionList();
    }

    /**
     * Adds the given Subscription to the list as long as the list has not been marked
     * as unsubscribed (SubscriptionList keeps a flag for this). If the list has already
     * been marked as unsubscribed, the given Subscription is unsubscribed as well.
     */
    public final void add(Subscription s) {
        subscriptions.add(s);
    }

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    /**
     * Indicates whether this Subscriber has unsubscribed from its list of subscriptions.
     */
    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }
}
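The behaviour described in the comment on add (a Subscription added after the list is unsubscribed is itself unsubscribed immediately) can be sketched as follows. All types here (`SimpleSubscription`, `FlagSubscription`, `SimpleSubscriptionList`, `SubscriptionListDemo`) are hypothetical stand-ins, and the sketch skips the synchronization a real implementation would need.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for rx.Subscription.
interface SimpleSubscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// A subscription that only remembers whether it was unsubscribed.
final class FlagSubscription implements SimpleSubscription {
    private boolean unsubscribed;
    @Override public void unsubscribe() { unsubscribed = true; }
    @Override public boolean isUnsubscribed() { return unsubscribed; }
}

// Single-threaded sketch of a subscription list (assumption: no concurrency).
final class SimpleSubscriptionList implements SimpleSubscription {
    private final List<SimpleSubscription> children = new ArrayList<>();
    private boolean unsubscribed; // the flag the comment on add refers to

    void add(SimpleSubscription s) {
        if (unsubscribed) {
            s.unsubscribe();   // list already cancelled: cancel the newcomer too
        } else {
            children.add(s);   // list still active: keep it for later
        }
    }

    @Override
    public void unsubscribe() {
        if (unsubscribed) {
            return;
        }
        unsubscribed = true;
        for (SimpleSubscription s : children) {
            s.unsubscribe();
        }
        children.clear();
    }

    @Override
    public boolean isUnsubscribed() {
        return unsubscribed;
    }
}

final class SubscriptionListDemo {
    // Returns { early before unsubscribe, early after unsubscribe, late after add }.
    static boolean[] run() {
        SimpleSubscriptionList list = new SimpleSubscriptionList();
        FlagSubscription early = new FlagSubscription();
        FlagSubscription late = new FlagSubscription();

        list.add(early);
        boolean earlyBefore = early.isUnsubscribed();
        list.unsubscribe();
        list.add(late); // added after the list was unsubscribed

        return new boolean[] { earlyBefore, early.isUnsubscribed(), late.isUnsubscribed() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [false, true, true]
    }
}
```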
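An Observable holds an OnSubscribe object. Calling the Observable's subscribe(Subscriber) method invokes the call method of that OnSubscribe, passing in the Subscriber. That is a big leap to take in at once, so for now read it as: calling subscribe means the observed side (Observable) fires its notifications, and the observer (Subscriber) does whatever it wants to do in response. How execution actually reaches call is analyzed later.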
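As a rough mental model, create only stores the OnSubscribe, and subscribe hands the Subscriber to its call method. The sketch below shows just that wiring with invented types (`MiniSubscriber`, `MiniOnSubscribe`, `MiniObservable`, `MiniSubscribeDemo`); it leaves out everything else RxJava does around the call, such as hooks, wrapping and error handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down counterpart of Subscriber.
interface MiniSubscriber<T> {
    void onNext(T t);
    void onCompleted();
}

// Hypothetical counterpart of Observable.OnSubscribe.
interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> subscriber);
}

// Hypothetical counterpart of Observable: it only remembers the OnSubscribe.
final class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe; // stored at creation, not run yet

    private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
        return new MiniObservable<>(onSubscribe);
    }

    void subscribe(MiniSubscriber<? super T> subscriber) {
        onSubscribe.call(subscriber); // subscribing is what triggers the emissions
    }
}

final class MiniSubscribeDemo {
    // Records the order in which creation, call and the callbacks happen.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObservable<String> observable = MiniObservable.create(new MiniOnSubscribe<String>() {
            @Override
            public void call(MiniSubscriber<? super String> s) {
                log.add("call");
                s.onNext("first");
                s.onCompleted();
            }
        });
        log.add("created"); // nothing has been emitted at this point

        observable.subscribe(new MiniSubscriber<String>() {
            @Override public void onNext(String t) { log.add("next:" + t); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [created, call, next:first, completed]
    }
}
```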
A simple usage example:
import rx.Observable;
import rx.Subscriber;

public class TestRxj {

    public static void main(String[] args) {
        // The observer: reacts to data, completion and errors.
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("Yeah!Complete!");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("No!Error!");
            }

            @Override
            public void onNext(String t) {
                processData(t);
            }

            private void processData(String data) {
                System.out.println("Hello,I'm received something,i'm going to process it " + data);
            }
        };

        // The emission logic: runs when a Subscriber subscribes.
        Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> t) {
                if (!t.isUnsubscribed()) {
                    t.onNext("this is the first data");
                    t.onCompleted();
                }
            }
        };

        Observable<String> observable = Observable.create(onSubscribe);
        observable.subscribe(subscriber);
    }
}

/** Output
 * Hello,I'm received something,i'm going to process it this is the first data
 * Yeah!Complete!
 */
What Observable's subscribe does:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }

    subscriber.onStart();

    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        if (subscriber.isUnsubscribed()) {
            RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
        } else {
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                RuntimeException r = new OnErrorFailedException(
                        "Error occurred attempting to subscribe [" + e.getMessage()
                        + "] and then again while trying to pass to onError.", e2);
                RxJavaHooks.onObservableError(r);
                throw r; // NOPMD
            }
        }
        return Subscriptions.unsubscribed();
    }
}
A snippet from Hystrix:
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
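One way to read this snippet: run() is not executed when the Observable is built, only when someone subscribes; an exception from run() reaches the subscriber as an error, and the subscribing thread is recorded first. The sketch below imitates that shape with plain Java so it runs without RxJava or Hystrix. `ResultListener`, `DeferredExecution` and `DeferredExecutionDemo` are invented names, and the ordering (thread recorded before the work runs) is my understanding of how doOnSubscribe composes with defer, stated here as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a subscriber.
interface ResultListener<R> {
    void onNext(R value);
    void onError(Throwable e);
    void onCompleted();
}

// Hypothetical analogue of the deferred execution: nothing runs until subscribe.
final class DeferredExecution<R> {
    private final Callable<R> work;
    final AtomicReference<Thread> executionThread = new AtomicReference<>();

    DeferredExecution(Callable<R> work) {
        this.work = work; // only stored here, like the Func0 passed to defer
    }

    void subscribe(ResultListener<? super R> listener) {
        executionThread.set(Thread.currentThread()); // role of doOnSubscribe
        R value;
        try {
            value = work.call();                     // role of just(run())
        } catch (Throwable ex) {
            listener.onError(ex);                    // role of Observable.error(ex)
            return;
        }
        listener.onNext(value);
        listener.onCompleted();
    }
}

final class DeferredExecutionDemo {
    private static ResultListener<Integer> recorder(final List<String> log) {
        return new ResultListener<Integer>() {
            @Override public void onNext(Integer v) { log.add("next:" + v); }
            @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        };
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        final AtomicInteger calls = new AtomicInteger();

        DeferredExecution<Integer> ok = new DeferredExecution<>(new Callable<Integer>() {
            @Override public Integer call() { return calls.incrementAndGet(); }
        });
        log.add("calls before subscribe: " + calls.get());
        ok.subscribe(recorder(log));

        DeferredExecution<Integer> failing = new DeferredExecution<>(new Callable<Integer>() {
            @Override public Integer call() { throw new IllegalStateException("boom"); }
        });
        failing.subscribe(recorder(log));

        log.add("thread recorded: " + (ok.executionThread.get() == Thread.currentThread()));
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```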