RxJava

Basic usage:

The roles are: Observable (the observed side), Observer (the observer), and Subscriber (the subscriber, which can also be regarded as an observer).

The Observer interface defines three methods:

public interface Observer<T> {
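    // The Observable notifies the Observer that it has finished emitting data normally; the Observable calls this method after its last call to onNext.
    void onCompleted();
    // The Observable notifies the Observer that it hit an error; once the Observable has called this method it will call neither onCompleted nor onNext again.
    void onError(Throwable e);
    // Called by the Observable to notify the Observer when an event it subscribed to occurs. The Observable may call this method multiple times, but never after it has called onCompleted or onError.
    void onNext(T t);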
}
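
The contract described in the comments above (zero or more onNext calls, then at most one of onCompleted or onError, and nothing afterwards) can be illustrated with a small guard. This is only a dependency-free sketch: MiniObserver and ContractGuard are hypothetical names that mirror the shape of rx.Observer; they are not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

class ObserverContractDemo {
    // Feeds a deliberately misbehaving event sequence through the guard
    // and records which events actually reach the wrapped observer.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObserver<String> recorder = new MiniObserver<String>() {
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
            @Override public void onNext(String t) { log.add("next:" + t); }
        };
        MiniObserver<String> guarded = new ContractGuard<String>(recorder);
        guarded.onNext("a");
        guarded.onNext("b");
        guarded.onCompleted();
        guarded.onNext("c");                           // dropped: already completed
        guarded.onError(new RuntimeException("late")); // dropped: already completed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical local mirror of rx.Observer, so the sketch needs no RxJava dependency.
interface MiniObserver<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

// Hypothetical wrapper that drops every event arriving after a terminal event.
class ContractGuard<T> implements MiniObserver<T> {
    private final MiniObserver<T> actual;
    private boolean done; // true once onCompleted or onError has been delivered

    ContractGuard(MiniObserver<T> actual) {
        this.actual = actual;
    }

    @Override
    public void onCompleted() {
        if (!done) {
            done = true;
            actual.onCompleted();
        }
    }

    @Override
    public void onError(Throwable e) {
        if (!done) {
            done = true;
            actual.onError(e);
        }
    }

    @Override
    public void onNext(T t) {
        if (!done) {
            actual.onNext(t);
        }
    }
}
```

Running main prints [next:a, next:b, completed]: the two events sent after onCompleted never reach the recorder.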

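Subscriber is an abstract class that implements Observer (and Subscription). It adds the ability to attach subscriptions, to unsubscribe, and to check whether it is currently unsubscribed.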

package rx;

import rx.internal.util.SubscriptionList;


public abstract class Subscriber<T> implements Observer<T>, Subscription {
    private static final long NOT_SET = Long.MIN_VALUE;

    private final SubscriptionList subscriptions;
    private final Subscriber<?> subscriber;
    private Producer producer;
    private long requested = NOT_SET; // default to not set

    protected Subscriber() {
        this(null, false);
    }

    
    protected Subscriber(Subscriber<?> subscriber) {
        this(subscriber, true);
    }

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    /**
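     * Adds the given Subscription to the list as long as the list has not been marked as unsubscribed (SubscriptionList keeps a flag for this). If the list has already been marked as unsubscribed, this method unsubscribes the given Subscription as well.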
     */
    public final void add(Subscription s) {
        subscriptions.add(s);
    }

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    /**
     * Indicates whether this Subscriber has unsubscribed from its list of subscriptions.
     */
    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }    
}
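
The behaviour that the comment on add describes (a Subscription added after the list has been unsubscribed is cancelled immediately) can be sketched as follows. This is a simplified, single-threaded model: MiniSubscription, FlagSubscription and MiniSubscriptionList are hypothetical names, and the real rx.internal.util.SubscriptionList additionally has to cope with concurrent callers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SubscriptionListDemo {
    // Returns {early before unsubscribe, early after unsubscribe, late after being added}.
    static boolean[] run() {
        MiniSubscriptionList list = new MiniSubscriptionList();
        FlagSubscription early = new FlagSubscription();
        FlagSubscription late = new FlagSubscription();

        list.add(early);
        boolean earlyBefore = early.isUnsubscribed();
        list.unsubscribe(); // cancels everything already held by the list
        list.add(late);     // the list is already unsubscribed, so 'late' is cancelled at once
        return new boolean[] { earlyBefore, early.isUnsubscribed(), late.isUnsubscribed() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}

// Hypothetical local mirror of rx.Subscription.
interface MiniSubscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// Trivial subscription that only remembers whether it was cancelled.
class FlagSubscription implements MiniSubscription {
    private boolean unsubscribed;

    @Override public void unsubscribe() { unsubscribed = true; }
    @Override public boolean isUnsubscribed() { return unsubscribed; }
}

// Simplified, non-thread-safe stand-in for SubscriptionList.
class MiniSubscriptionList implements MiniSubscription {
    private final List<MiniSubscription> items = new ArrayList<>();
    private boolean unsubscribed; // the flag mentioned in the comment on add

    void add(MiniSubscription s) {
        if (unsubscribed) {
            s.unsubscribe(); // too late to join the list: cancel immediately
            return;
        }
        items.add(s);
    }

    @Override
    public void unsubscribe() {
        if (unsubscribed) {
            return;
        }
        unsubscribed = true;
        for (MiniSubscription s : items) {
            s.unsubscribe();
        }
        items.clear();
    }

    @Override
    public boolean isUnsubscribed() {
        return unsubscribed;
    }
}
```

Running main prints [false, true, true].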

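An Observable holds an OnSubscribe object (not the Subscriber itself). Calling the observable's subscribe(Subscriber) method executes the call method of that OnSubscribe, passing the Subscriber in. That is a big leap in the description; for now, read it as: when subscribe is called, the observed side (Observable) fires its notifications, and the observer (Subscriber) performs whatever it wants to do in response. How execution actually reaches call is analyzed later.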

A simple usage example:

import rx.Observable;
import rx.Subscriber;

public class TestRxj {
    public static void main(String[] args) {
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("Yeah!Complete!");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("No!Error!");
            }

            @Override
            public void onNext(String t) {
                processData(t);
            }

            private void processData(String data) {
                System.out.println("Hello,I'm received something,i'm going to process it "+data);
            }

        };

        Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> t) {
                if(!t.isUnsubscribed()){
                    t.onNext("this is the first data");
                    t.onCompleted();
                }
            }
        };
        Observable<String> observable = Observable.create(onSubscribe);

        observable.subscribe(subscriber);
    }
}


/** Output
*Hello,I'm received something,i'm going to process it this is the first data
*Yeah!Complete!
*/

What Observable's subscribe does:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // validate the arguments
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }

    // let the subscriber initialize itself before any event is emitted
    subscriber.onStart();

    // wrap the subscriber in a SafeSubscriber (which enforces the Observer contract) unless it already is one
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        // this is where OnSubscribe.call(subscriber) is finally invoked, possibly decorated by a hook
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        // fatal errors are rethrown instead of being handled here
        Exceptions.throwIfFatal(e);
        if (subscriber.isUnsubscribed()) {
            // the subscriber is already unsubscribed: report the error to the global hook
            RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
        } else {
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // onError itself failed: wrap the failure and rethrow it
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                RxJavaHooks.onObservableError(r);
                throw r; // NOPMD
            }
        }
        return Subscriptions.unsubscribed();
    }
}
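
Stripped of the hooks and the SafeSubscriber wrapper, the flow in the source above is: validate the subscriber, call onStart, invoke onSubscribe.call(subscriber), and route any exception thrown by call to the subscriber's onError. The following dependency-free sketch models only that flow. MiniObservable, MiniOnSubscribe and MiniSubscriber are hypothetical stand-ins for the RxJava types; fatal-error handling, unsubscription and RxJavaHooks are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

class SubscribeFlowDemo {
    // Subscribes once and returns the order in which things happened.
    static List<String> run(final boolean failInCall) {
        final List<String> log = new ArrayList<>();

        MiniObservable<String> observable = MiniObservable.create(s -> {
            log.add("call");
            if (failInCall) {
                throw new IllegalStateException("boom");
            }
            s.onNext("this is the first data");
            s.onCompleted();
        });

        observable.subscribe(new MiniSubscriber<String>() {
            @Override void onStart() { log.add("start"); }
            @Override void onNext(String t) { log.add("next:" + t); }
            @Override void onError(Throwable e) { log.add("error:" + e.getMessage()); }
            @Override void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}

// Hypothetical stand-in for rx.Subscriber (without the Subscription part).
abstract class MiniSubscriber<T> {
    void onStart() { }
    abstract void onNext(T t);
    abstract void onError(Throwable e);
    abstract void onCompleted();
}

// Hypothetical stand-in for Observable.OnSubscribe.
interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> subscriber);
}

// Hypothetical stand-in for rx.Observable, reduced to create and subscribe.
final class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe;

    private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
        return new MiniObservable<T>(onSubscribe);
    }

    void subscribe(MiniSubscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        subscriber.onStart();
        try {
            onSubscribe.call(subscriber); // the step that "reaches call"
        } catch (Throwable e) {
            // simplification: the real code first rethrows fatal errors
            subscriber.onError(e);
        }
    }
}
```

Running main prints [start, call, next:this is the first data, completed] for the normal path and [start, call, error:boom] when call throws.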

A snippet from Hystrix:

final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
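
The snippet uses defer so that run() is not invoked when getExecutionObservable() is called, only when something subscribes; an exception from run() becomes an error signal instead of propagating to the caller, and the doOnSubscribe action records the subscribing thread before run() starts. The sketch below models just that ordering in plain Java. DeferredCall and its subscribe method are hypothetical names, not RxJava or Hystrix API, and the executionThread field here is a local stand-in for the one in the snippet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class DeferDemo {
    // Local stand-in for the executionThread field used in the Hystrix snippet.
    static final AtomicReference<Thread> executionThread = new AtomicReference<>();

    static List<String> run(final boolean fail) {
        final List<String> log = new ArrayList<>();

        DeferredCall<String> deferred = new DeferredCall<String>(
            () -> {
                log.add("run");
                if (fail) {
                    throw new IllegalStateException("boom");
                }
                return "result";
            },
            () -> {
                executionThread.set(Thread.currentThread());
                log.add("subscribed");
            });

        log.add("created"); // nothing has executed yet: the work is deferred
        deferred.subscribe(
            v -> log.add("next:" + v),
            e -> log.add("error:" + e.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}

// Hypothetical model of "defer + doOnSubscribe": work runs only on subscribe.
final class DeferredCall<T> {
    private final Callable<T> work;
    private final Runnable onSubscribe;

    DeferredCall(Callable<T> work, Runnable onSubscribe) {
        this.work = work;
        this.onSubscribe = onSubscribe;
    }

    void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        onSubscribe.run(); // runs first, on the subscribing thread
        T value;
        try {
            value = work.call(); // the deferred work starts only now
        } catch (Throwable ex) {
            onError.accept(ex); // failure is delivered as a signal, not thrown
            return;
        }
        onNext.accept(value);
    }
}
```

Running main prints [created, subscribed, run, next:result] and then [created, subscribed, run, error:boom].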
 
 